# mrjob: Joining datasets

A task of great practical interest is the join of datasets, that is, combining two datasets through a shared attribute. In this exercise we start from two datasets, which contain information about countries and clients.
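As a point of reference before moving to MapReduce, the idea of a join on a shared attribute can be sketched in plain Python. This is only an illustrative in-memory version, not the approach used later in the notebook: the function name `join_on_code` and the toy rows are assumptions, with the rows taken from the two CSV listings shown in this notebook.

```python
def join_on_code(countries, clients):
    """Inner join: pair every client with the country that shares its code."""
    # Index the countries by their two-letter code for constant-time lookups.
    name_by_code = {code: name for name, code in countries}
    joined = []
    for client, rating, code in clients:
        # Clients whose code has no matching country are dropped (inner join).
        if code in name_by_code:
            joined.append((code, name_by_code[code], client, rating))
    return joined


# Toy rows (hypothetical subset of countries.csv and clients.csv).
countries = [("Canada", "CA"), ("Argentina", "AR")]
clients = [
    ("Lennie Dunkerson", "bueno", "CA"),
    ("Altagracia Merced", "regular", "CA"),
    ("Alana Zufelt", "regular", "ES"),
]

result = join_on_code(countries, clients)
for row in result:
    print(row)
```

The dictionary lookup only works while one of the datasets fits in memory; the MapReduce version in this notebook avoids that requirement by grouping both datasets by country code instead.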

In [1]:
! mkdir -p mrjob/join

In [2]:
import os
os.chdir("/media/notebooks/mrjob/join")

FileNotFoundError: [Errno 2] No such file or directory: '/media/notebooks/mrjob/join'

In [3]:
! pwd

/media/notebooks/Untitled Folder


## The files countries.csv and clients.csv must be downloaded to the folder /media/notebooks/mrjob/join

In [5]:
cat countries.csv

Name,Code
Afghanistan,AF
Åland Islands,AX
Albania,AL
Algeria,DZ
American Samoa,AS
Andorra,AD
Angola,AO
Anguilla,AI
Antarctica,AQ
Antigua and Barbuda,AG
Argentina,AR
Armenia,AM
Aruba,AW
Australia,AU
Austria,AT
Azerbaijan,AZ
Bahamas,BS
Bahrain,BH
Bangladesh,BD
Barbados,BB
Belarus,BY
Belgium,BE
Belize,BZ
Benin,BJ
Bermuda,BM
Bhutan,BT
"Bolivia, Plurinational State of",BO
"Bonaire, Sint Eustatius and Saba",BQ
Bosnia and Herzegovina,BA
Botswana,BW
Bouvet Island,BV
Brazil,BR
British Indian Ocean Territory,IO
Brunei Darussalam,BN
Bulgaria,BG
Burkina Faso,BF
Burundi,BI
Cambodia,KH
Cameroon,CM
Canada,CA
Cape Verde,CV
Cayman Islands,KY
Central African Republic,CF
Chad,TD
Chile,CL
China,CN
Christmas Island,CX
Cocos (Keeling) Islands,CC
Colombia,CO
Comoros,KM
Congo,CG
"Congo, the Democratic Republic of the",CD
Cook Islands,CK
Costa Rica,CR
Côte d'Ivoire,CI
Croatia,HR
Cuba,CU
Curaçao,C

In [6]:
cat clients.csv

Bertram Pearcy  ,bueno,SO
Steven Ulman  ,regular,ZA
Enid Follansbee  ,malo,GS
Candie Jacko  ,malo,SS
Alana Zufelt  ,regular,ES
Craig Pinkett  ,malo,LK
Carson Levey  ,bueno,GU
Reanna Calabrese  ,regular,GT
Elliott Kosak  ,malo,GG
Yuette Steinman  ,bueno,GN
Grisel Wines  ,regular,GW
Kathryne Dieguez  ,regular,AE
Donna Raabe  ,malo,GB
Norine Mundt  ,bueno,US
Brittaney Amaro  ,bueno,ES
Penni Husted  ,bueno,ES
Delmer Semon  ,malo,IT
Lennie Dunkerson  ,bueno,CA
Mayra Bobb  ,regular,IT
Altagracia Merced  ,regular,CA
Verda Belgrave  ,malo,GB
Jonnie Urban  ,malo,US
Chung Frankum  ,malo,ES
Vincenzo Samples  ,regular,TT
Dominick Barkan  ,bueno,GU
Carisa Ellingwood  ,bueno,TR
Garret Wess  ,regular,TM
Zoraida Muise  ,bueno,GU
Samantha Cusson  ,bueno,PT
Jenine Greenburg  ,regular,PR
Geri Paddock  ,bueno,QA
Antonia Klosterman  ,regular,RE
Moriah Galey  ,malo,RO
Nyla Eckard  ,malo,GB
Arlean Harries  ,malo,US
Kenyatta Lippold  ,malo,ES
Samuel Knipe  ,malo,MV
Jamison

In [7]:
%%writefile mrjob-ejercicio.py
#!/usr/bin/env python3
import csv
from mrjob.job import MRJob

class MRJoin(MRJob):

    # Enable secondary sort: the values of each key reach the reducer sorted
    SORT_VALUES = True

    def mapper(self, _, line):
        # Parse with the csv module so that quoted names containing commas
        # (e.g. "Bolivia, Plurinational State of") stay in a single field
        splits = [field.strip() for field in next(csv.reader([line]), [])]

        if len(splits) == 2:  # country record: Name,Code
            if splits == ['Name', 'Code']:  # skip the header row
                return
            symbol = 'A'  # 'A' sorts countries before client records
            country2digit = splits[1]
            yield country2digit, [symbol, splits]
        elif len(splits) == 3:  # client record: name,rating,country code
            symbol = 'B'
            country2digit = splits[2]
            yield country2digit, [symbol, splits]

    def reducer(self, key, values):
        countries = []  # countries arrive first because they carry the 'A' tag
        for value in values:
            if value[0] == 'A':
                countries.append(value)
            elif value[0] == 'B':
                # Emit one joined record per matching country
                for country in countries:
                    yield key, country[1:] + value[1:]

if __name__ == '__main__':
    MRJoin.run()

Writing mrjob-ejercicio.py
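The job above is a reduce-side join: each record is tagged, grouped by country code, and the tag ordering lets the reducer see the country before its clients. The sketch below imitates that flow without mrjob or Hadoop, so the logic can be checked on a few lines. The helper names (`map_line`, `reduce_group`, `simulate`) are hypothetical, and Python's `sorted` is used here only as a stand-in for the shuffle plus the secondary sort that `SORT_VALUES = True` requests. It also assumes the lines are parsed with the `csv` module so that quoted country names containing commas stay in one field.

```python
import csv
from itertools import groupby


def map_line(line):
    """Tag a CSV line as a country ('A') or a client ('B'), keyed by country code."""
    fields = [f.strip() for f in next(csv.reader([line]), [])]
    if len(fields) == 2:      # country record: name, code
        yield fields[1], ["A", fields]
    elif len(fields) == 3:    # client record: name, rating, code
        yield fields[2], ["B", fields]


def reduce_group(key, values):
    """Emit one joined record per (country, client) pair that shares the key."""
    countries = []
    for tag, fields in values:
        if tag == "A":
            countries.append(fields)
        else:
            for country in countries:
                yield key, [country, fields]


def simulate(lines):
    """Map every line, sort by (key, value), then reduce each key group."""
    pairs = sorted(pair for line in lines for pair in map_line(line))
    out = []
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        out.extend(reduce_group(key, (value for _, value in group)))
    return out


# A few lines taken from the listings above, deliberately out of order.
lines = [
    "Lennie Dunkerson  ,bueno,CA",
    '"Bolivia, Plurinational State of",BO',
    "Canada,CA",
    "Altagracia Merced  ,regular,CA",
]

joined = simulate(lines)
for record in joined:
    print(record)
```

Because the country record carries the tag `'A'`, it sorts ahead of the `'B'` client records for the same code, so the reducer never needs to buffer clients. A country with no clients, such as the Bolivia line here, produces no output.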


First we run the code locally and then on Hadoop.

In [26]:
! python3 mrjob-ejercicio.py countries.csv clients.csv > outputlocal

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/mrjob-ejercicio.root.20221103.192411.245825
Running step 1 of 1...
job output is in /tmp/mrjob-ejercicio.root.20221103.192411.245825/output
Streaming final output from /tmp/mrjob-ejercicio.root.20221103.192411.245825/output...
Removing temp directory /tmp/mrjob-ejercicio.root.20221103.192411.245825...


In [27]:
! ls

tail: cannot open 'outputlocal' for reading: No such file or directory


In [29]:
! hdfs dfs -mkdir /tmp/mrjoin
! hdfs dfs -put countries.csv  /tmp/mrjoin
! hdfs dfs -put clients.csv  /tmp/mrjoin

mkdir: `/tmp/mrjoin': File exists


In [30]:
! hdfs dfs -ls  /tmp/mrjoin

Found 2 items
-rw-r--r--   3 root supergroup       1289 2022-11-03 20:27 /tmp/mrjoin/clients.csv
-rw-r--r--   3 root supergroup       4120 2022-11-03 20:27 /tmp/mrjoin/countries.csv


In [31]:
! hdfs dfs -rm /tmp/carpeta/mrjob-join-output/*
! hdfs dfs -rmdir /tmp/carpeta/mrjob-join-output

rm: `/tmp/carpeta/mrjob-join-output/*': No such file or directory
rmdir: `/tmp/carpeta/mrjob-join-output': No such file or directory


In [33]:
! python3 mrjob-ejercicio.py hdfs:///tmp/mrjoin/* -r hadoop --python-bin /opt/anaconda/bin/python3.7 \
--output-dir hdfs:///tmp/carpeta/mrjob-join-output

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /app/hadoop-3.3.1/bin...
Found hadoop binary: /app/hadoop-3.3.1/bin/hadoop
Using Hadoop version 3.3.1
Looking for Hadoop streaming jar in /app/hadoop-3.3.1...
Found Hadoop streaming jar: /app/hadoop-3.3.1/share/hadoop/tools/lib/hadoop-streaming-3.3.1.jar
Creating temp directory /tmp/mrjob-ejercicio.root.20221103.192948.857903
uploading working dir files to hdfs:///user/root/tmp/mrjob/mrjob-ejercicio.root.20221103.192948.857903/files/wd...
Copying other local files to hdfs:///user/root/tmp/mrjob/mrjob-ejercicio.root.20221103.192948.857903/files/
Running step 1 of 1...
  packageJobJar: [/tmp/hadoop-unjar5576048610416006465/] [] /tmp/streamjob8911985583310908246.jar tmpDir=null
  Connecting to ResourceManager at yarnmaster/172.18.0.6:8032
  Connecting to ResourceManager at yarnmaster/172.18.0.6:8032
  Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.sta

  Task Id : attempt_1667501234871_0002_m_000001_2, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 127
	at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:326)
	at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:539)
	at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
	at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
	at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:466)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:350)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:178)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:172)

   m

In [14]:
! hdfs dfs -tail /tmp/carpeta/mrjob-join-output/part-00000

tail: `/tmp/carpeta/mrjob-join-output/part-00000': No such file or directory
