Programa MapReduce para la unión interna de datos de dos tablas mediante una clave común. Este es de mi cosecha; habría que mejorarlo para las otras 
uniones: izquierda, derecha, completa y la antiunión.

In [1]:
%config IPCompleter.greedy=True

In [5]:
%%writefile reduceSideJoin.py
#!/usr/bin/env python
from mrjob.job import MRJob
import re,os

class reduceSideJoin(MRJob):
    
    #Función que limpia lo que el parámetro devuelve en forma "file://nombre_fichero" para dejarlo solo con el nombre 
    #del fichero: "nombre_fichero"
    def limpiarNombreArchivo(self,archivo):
        tamano=len(archivo)
        return archivo[7:tamano]
    
    def mapper_init(self):
        self.namefile=self.limpiarNombreArchivo(os.getenv('map_input_file')) #Usamos el parámetro para saber que nos llega desde streaming
        #map_input_file devuelve una cadena de caracteres correspondiente al archivo de entrada por el stream en formato:
        # file://nombre_fichero
        
    def mapper(self,_,line):
        linea=line.split(';')
        encontrado=re.search('[a-zA-Z]',linea[0])#Para que no tenga en cuenta las cabeceras de las tablas
        if encontrado==None:
            if self.namefile=="clientes.csv":
                #linea.append(self.namefile)#Añadimos al registro el nombre del archivo
                clave=linea[0] #Esta clave es la común de la tabla 1, que me permite hacer la unión con la de la tabla 2 
                yield clave,linea
            else:
                #linea.append(self.namefile) #Aquí estoy usando el nombre del archivo, pero puedo usar otro identificador
                                        #como en el libro Dessign Pattern que usa A y B. Añadimos al registro el nombre
                                        #del archivo.
                clave=linea[6] #Esta es la otra clave común de la tabla 2. SI CAMBIAMOS LE ORIGEN DE DATOS ESTO HAY QUE VARIARLO
                yield clave,linea
    
    
    def reducer(self,key,values):
        lista=[]
        union=[]
        for valor in values:
            lista.append(valor)
        
        for elemento in range(1,len(lista)):
            union.append(lista[0]+lista[elemento])
            
        for registro in union:
            yield key,registro
      
            
if __name__ == '__main__':
    reduceSideJoin.run()

Overwriting reduceSideJoin.py


In [6]:
!python reduceSideJoin.py clientes.csv ventas.csv > resultadoVentas.txt

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory C:\Users\MRSANC~1\AppData\Local\Temp\reduceSideJoin.mrsanchez.20200812.195315.406881
Running step 1 of 1...
job output is in C:\Users\MRSANC~1\AppData\Local\Temp\reduceSideJoin.mrsanchez.20200812.195315.406881\output
Streaming final output from C:\Users\MRSANC~1\AppData\Local\Temp\reduceSideJoin.mrsanchez.20200812.195315.406881\output...
Removing temp directory C:\Users\MRSANC~1\AppData\Local\Temp\reduceSideJoin.mrsanchez.20200812.195315.406881...


A continuación vamos a investigar con una versión en la que no vamos a identificar el nombre del archivo que entra por Stream; vamos a ver si MRjob organiza las claves valor.
RESULTADO: efectivamente MRJob organiza de forma autónoma cada clave con su registro correspondiente independientemente de las tablas. Lo que tenemos que tener en cuenta es donde está la clave común en cada una de las tablas, y aquí sí tenemos que conocer el nombre del archivo que está entrando, por que puede darse el caso que la clave esté en distintas columnas en cada una de las tablas implicadas en la unión. Habrá ocasiones en que podamos modificar la posición de la columna en el origen de datos, pero en otras ocasiones no será posible, que sería lo más comun esto último.

In [35]:
%%writefile pruebaJoin.py
#!/usr/bin/env python
from mrjob.job import MRJob
import re,sys,os

class pruebaJoin(MRJob):
    
    #Función que limpia lo que el parámetro devuelve en forma "file://nombre_fichero" para dejarlo solo con el nombre 
    #del fichero: "nombre_fichero"
    def limpiarNombreArchivo(self,archivo):
        tamano=len(archivo)
        return archivo[7:tamano]
    
    def mapper_init(self):
        self.namefile=self.limpiarNombreArchivo(os.getenv('map_input_file')) #Usamos el parámetro para saber que nos llega desde streaming
        #map_input_file devuelve una cadena de caracteres correspondiente al archivo de entrada por el stream en formato:
        # file://nombre_fichero
        
    def mapper(self,_,line):
        linea=line.split(';')
        encontrado=re.search('[a-zA-Z]',linea[0])#Para que no tenga en cuenta las cabeceras de las tablas
        if encontrado==None:
            clave=linea[0] #Esta clave es la común de la tabla 1, que me permite hacer la unión con la de la tabla 2 
            yield clave,linea
            
    
    def reducer(self,key,values):
        lista=[]
        union=[]
        for valor in values:
            lista.append(valor)
        
        for elemento in range(1,len(lista)):
            union.append(lista[0]+lista[elemento])
            
        for registro in union:
            yield key,registro
      
            
if __name__ == '__main__':
    pruebaJoin.run()

Writing pruebaJoin.py


In [36]:
!python pruebaJoin.py tablaA.csv tablaB.csv > resultado_prueba

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory C:\Users\MRSANC~1\AppData\Local\Temp\pruebaJoin.mrsanchez.20200811.104540.565129
Running step 1 of 1...
job output is in C:\Users\MRSANC~1\AppData\Local\Temp\pruebaJoin.mrsanchez.20200811.104540.565129\output
Streaming final output from C:\Users\MRSANC~1\AppData\Local\Temp\pruebaJoin.mrsanchez.20200811.104540.565129\output...
Removing temp directory C:\Users\MRSANC~1\AppData\Local\Temp\pruebaJoin.mrsanchez.20200811.104540.565129...


Uniones varias: interna, por la izquierda, por la derecha, completa y antiunión. Basado en el libro de Donald Miner y Adam Shook. Este archivo es el ejemplo de donde extraemos todos los algoritmos de unión.

In [46]:
%%writefile unionesVarias.py
#!/usr/bin/env python
from mrjob.job import MRJob
import re,sys,os

class unionesVarias(MRJob):
    
    #Función que limpia lo que el parámetro devuelve en forma "file://nombre_fichero" para dejarlo solo con el nombre 
    #del fichero: "nombre_fichero"
    def limpiarNombreArchivo(self,archivo):
        tamano=len(archivo)
        return archivo[7:tamano]
    def mapper_init(self):
        self.namefile=self.limpiarNombreArchivo(os.getenv('map_input_file')) #Usamos el parámetro para saber que nos llega desde streaming
        #map_input_file devuelve una cadena de caracteres correspondiente al archivo de entrada por el stream en formato:
        # file://nombre_fichero
        
    def mapper(self,_,line):
        clave=""
        linea=line.split(';')
        encontrado=re.search('[a-zA-Z]',linea[0])#Para que no tenga en cuenta las cabeceras de las tablas
        if encontrado==None:
            if self.namefile=="tablaA.csv":
                linea.append(self.namefile)#Añadimos al registro el nombre del archivo
                clave=linea[0] #Esta clave es la común de la tabla 1, que me permite hacer la unión con la de la tabla 2 
                yield clave,linea
            else:
                linea.append(self.namefile) #Aquí estoy usando el nombre del archivo, pero puedo usar otro identificador
                                        #como en el libro Dessign Pattern que usa A y B. Añadimos al registro el nombre
                                        #del archivo.
                clave=linea[0] #Esta es la otra clave común de la tabla 2
                yield clave,linea
    
    def reducer_init(self):
        self.listaA=[]
        self.listaB=[]
        
    def reducer(self,key,values):
        #Llenamos las dos listas
        for valor in values:
            if valor[len(valor)-1]=="tablaA.csv":
                self.listaA.append(valor)
            else:
                self.listaB.append(valor)
        
        # Union interna ########################
        if self.listaA and self.listaB:
            for A in self.listaA:
                for B in self.listaB:
                    yield A, B
        
        # Union por la izquierda ####################
        for A in self.listaA:
            if self.listaB:
                for B in self.listaB:
                    yield A, B
            else:
                #Si la listaB está vacía
                yield A, "null" 
                
        # Union por la derecha ################
        for B in self.listaB:
            if self.listaA:
                for A in self.listaA:
                    yield A, B
            else:
                #Else, output A by itself
                yield "null",B
        
        # Union completa ######################
        # Si listaA no está vacía, commprobamos cada una de sus entradas
        if self.listaA: 
        # Por cada entrada en la listaA
            for A in self.listaA:
            #Si la listaB no está vacía, unimos A con B
                if self.listaB:
                    for B in self.listaB:
                        yield A, B
                else:
                #Si no es el caso, sacamos A con union nula
                    yield A, "null"
        else:
        #En cambio si la listaA está vacía, sacamos solo los elementos de la listaB
            for B in self.listaB:
                yield "null", B
        
        # Antiunion ############################
        #Si la listaA o la listaB están vacías
        if not self.listaA or not self.listaB:

        # Iteramos los valores vacíos de las dos listas
            for A in self.listaA:
                yield A, "null"
            for B in self.listaB:
                yield "null", B

if __name__ == '__main__':
    unionesVarias.run()

Overwriting unionesVarias.py


In [47]:
!python unionesVarias.py tablaA.csv tablaB.csv > unionesVarias

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory C:\Users\cyber\AppData\Local\Temp\unionesVarias.manuel.20200811.173445.916581
Running step 1 of 1...
job output is in C:\Users\cyber\AppData\Local\Temp\unionesVarias.manuel.20200811.173445.916581\output
Streaming final output from C:\Users\cyber\AppData\Local\Temp\unionesVarias.manuel.20200811.173445.916581\output...
Removing temp directory C:\Users\cyber\AppData\Local\Temp\unionesVarias.manuel.20200811.173445.916581...


=============================================UNIÓN INTERNA=================================

In [1]:
%%writefile unionInterna.py
#!/usr/bin/env python
from mrjob.job import MRJob
import re,sys,os

class unionInterna(MRJob):
    
    #Función que limpia lo que el parámetro devuelve en forma "file://nombre_fichero" para dejarlo solo con el nombre 
    #del fichero: "nombre_fichero"
    def limpiarNombreArchivo(self,archivo):
        tamano=len(archivo)
        return archivo[7:tamano]
    def mapper_init(self):
        self.namefile=self.limpiarNombreArchivo(os.getenv('map_input_file')) #Usamos el parámetro para saber que nos llega desde streaming
        #map_input_file devuelve una cadena de caracteres correspondiente al archivo de entrada por el stream en formato:
        # file://nombre_fichero
        
    def mapper(self,_,line):
        clave=""
        linea=line.split(';')
        encontrado=re.search('[a-zA-Z]',linea[0])#Para que no tenga en cuenta las cabeceras de las tablas
        if encontrado==None:
            if self.namefile=="tablaA.csv":
                linea.append(self.namefile)#Añadimos al registro el nombre del archivo
                clave=linea[0] #Esta clave es la común de la tabla 1, que me permite hacer la unión con la de la tabla 2 
                yield clave,linea
            else:
                linea.append(self.namefile) #Aquí estoy usando el nombre del archivo, pero puedo usar otro identificador
                                        #como en el libro Dessign Pattern que usa A y B. Añadimos al registro el nombre
                                        #del archivo.
                clave=linea[0] #Esta es la otra clave común de la tabla 2
                yield clave,linea
        
    def reducer(self,key,values):
        listaA=[]
        listaB=[]
        #Llenamos las dos listas
        for valor in values:
            if valor[len(valor)-1]=="tablaA.csv":
                listaA.append(valor)
            else:
                listaB.append(valor)
        
        # Union interna ########################
        if listaA and listaB:
            for A in listaA:
                for B in listaB:
                    yield key,(A, B)
if __name__ == '__main__':
    unionInterna.run()

Overwriting unionInterna.py


In [2]:
!python unionInterna.py tablaA.csv tablaB.csv > unionInterna.txt

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory C:\Users\cyber\AppData\Local\Temp\unionInterna.manuel.20200813.081851.269326
Running step 1 of 1...
job output is in C:\Users\cyber\AppData\Local\Temp\unionInterna.manuel.20200813.081851.269326\output
Streaming final output from C:\Users\cyber\AppData\Local\Temp\unionInterna.manuel.20200813.081851.269326\output...
Removing temp directory C:\Users\cyber\AppData\Local\Temp\unionInterna.manuel.20200813.081851.269326...


========================UNIÓN POR LA IZQUIERDA====================================

In [1]:
%%writefile unionIzquierda.py
#!/usr/bin/env python
from mrjob.job import MRJob
import re,sys,os

class unionIzquierda(MRJob):
    
    #Función que limpia lo que el parámetro devuelve en forma "file://nombre_fichero" para dejarlo solo con el nombre 
    #del fichero: "nombre_fichero"
    def limpiarNombreArchivo(self,archivo):
        tamano=len(archivo)
        return archivo[7:tamano]
    def mapper_init(self):
        self.namefile=self.limpiarNombreArchivo(os.getenv('map_input_file')) #Usamos el parámetro para saber que nos llega desde streaming
        #map_input_file devuelve una cadena de caracteres correspondiente al archivo de entrada por el stream en formato:
        # file://nombre_fichero
        
    def mapper(self,_,line):
        clave=""
        linea=line.split(';')
        encontrado=re.search('[a-zA-Z]',linea[0])#Para que no tenga en cuenta las cabeceras de las tablas
        if encontrado==None:
            if self.namefile=="tablaA.csv":
                linea.append(self.namefile)#Añadimos al registro el nombre del archivo
                clave=linea[0] #Esta clave es la común de la tabla 1, que me permite hacer la unión con la de la tabla 2 
                yield clave,linea
            else:
                linea.append(self.namefile) #Aquí estoy usando el nombre del archivo, pero puedo usar otro identificador
                                        #como en el libro Dessign Pattern que usa A y B. Añadimos al registro el nombre
                                        #del archivo.
                clave=linea[0] #Esta es la otra clave común de la tabla 2
                yield clave,linea
    
   
       
    #########################################
        
    def reducer(self,key,values):
        listaA=[]
        listaB=[]
        #Llenamos las dos listas
        for valor in values:
            if valor[len(valor)-1]=="tablaA.csv":
                listaA.append(valor)
            else:
                listaB.append(valor)
        
        # Union por la izquierda ####################
        for A in listaA:
            if listaB:
                for B in listaB:
                    yield A, B
            else:
                #Si la listaB está vacía
                yield A, "null" 
                
if __name__ == '__main__':
    unionIzquierda.run()

Writing unionIzquierda.py


In [2]:
!python unionIzquierda.py tablaA.csv tablaB.csv > unionIzquierda.txt

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory C:\Users\MRSANC~1\AppData\Local\Temp\unionIzquierda.mrsanchez.20200820.184128.655718
Running step 1 of 1...
job output is in C:\Users\MRSANC~1\AppData\Local\Temp\unionIzquierda.mrsanchez.20200820.184128.655718\output
Streaming final output from C:\Users\MRSANC~1\AppData\Local\Temp\unionIzquierda.mrsanchez.20200820.184128.655718\output...
Removing temp directory C:\Users\MRSANC~1\AppData\Local\Temp\unionIzquierda.mrsanchez.20200820.184128.655718...


========================UNIÓN POR LA DERECHA====================================

In [6]:
%%writefile unionDerecha.py
#!/usr/bin/env python
from mrjob.job import MRJob
import re,sys,os

class unionDerecha(MRJob):
    
    #Función que limpia lo que el parámetro devuelve en forma "file://nombre_fichero" para dejarlo solo con el nombre 
    #del fichero: "nombre_fichero"
    def limpiarNombreArchivo(self,archivo):
        tamano=len(archivo)
        return archivo[7:tamano]
    def mapper_init(self):
        self.namefile=self.limpiarNombreArchivo(os.getenv('map_input_file')) #Usamos el parámetro para saber que nos llega desde streaming
        #map_input_file devuelve una cadena de caracteres correspondiente al archivo de entrada por el stream en formato:
        # file://nombre_fichero
        
    def mapper(self,_,line):
        clave=""
        linea=line.split(';')
        encontrado=re.search('[a-zA-Z]',linea[0])#Para que no tenga en cuenta las cabeceras de las tablas
        if encontrado==None:
            if self.namefile=="tablaA.csv":
                linea.append(self.namefile)#Añadimos al registro el nombre del archivo
                clave=linea[0] #Esta clave es la común de la tabla 1, que me permite hacer la unión con la de la tabla 2 
                yield clave,linea
            else:
                linea.append(self.namefile) #Aquí estoy usando el nombre del archivo, pero puedo usar otro identificador
                                        #como en el libro Dessign Pattern que usa A y B. Añadimos al registro el nombre
                                        #del archivo.
                clave=linea[0] #Esta es la otra clave común de la tabla 2
                yield clave,linea
    
        
    def reducer(self,key,values):
        listaA=[]
        listaB=[]
        #Llenamos las dos listas
        for valor in values:
            if valor[len(valor)-1]=="tablaA.csv":
                listaA.append(valor)
            else:
                listaB.append(valor)
        
        # Union por la derecha ####################
        for B in listaB:
            if listaA:
                for A in listaA:
                    yield A, B
            else:
                #Si la listaA está vacía
                yield "null",B
                
if __name__ == '__main__':
    unionDerecha.run()

Overwriting unionDerecha.py


In [7]:
!python unionDerecha.py tablaA.csv tablaB.csv > unionDerecha.txt

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory C:\Users\cyber\AppData\Local\Temp\unionDerecha.manuel.20200813.152455.281172
Running step 1 of 1...
job output is in C:\Users\cyber\AppData\Local\Temp\unionDerecha.manuel.20200813.152455.281172\output
Streaming final output from C:\Users\cyber\AppData\Local\Temp\unionDerecha.manuel.20200813.152455.281172\output...
Removing temp directory C:\Users\cyber\AppData\Local\Temp\unionDerecha.manuel.20200813.152455.281172...


In [9]:
%%writefile unionCompleta.py
#!/usr/bin/env python
from mrjob.job import MRJob
import re,sys,os

class unionCompleta(MRJob):
    
    #Función que limpia lo que el parámetro devuelve en forma "file://nombre_fichero" para dejarlo solo con el nombre 
    #del fichero: "nombre_fichero"
    def limpiarNombreArchivo(self,archivo):
        tamano=len(archivo)
        return archivo[7:tamano]
    def mapper_init(self):
        self.namefile=self.limpiarNombreArchivo(os.getenv('map_input_file')) #Usamos el parámetro para saber que nos llega desde streaming
        #map_input_file devuelve una cadena de caracteres correspondiente al archivo de entrada por el stream en formato:
        # file://nombre_fichero
        
    def mapper(self,_,line):
        clave=""
        linea=line.split(';')
        encontrado=re.search('[a-zA-Z]',linea[0])#Para que no tenga en cuenta las cabeceras de las tablas
        if encontrado==None:
            if self.namefile=="tablaA.csv":
                linea.append(self.namefile)#Añadimos al registro el nombre del archivo
                clave=linea[0] #Esta clave es la común de la tabla 1, que me permite hacer la unión con la de la tabla 2 
                yield clave,linea
            else:
                linea.append(self.namefile) #Aquí estoy usando el nombre del archivo, pero puedo usar otro identificador
                                        #como en el libro Dessign Pattern que usa A y B. Añadimos al registro el nombre
                                        #del archivo.
                clave=linea[0] #Esta es la otra clave común de la tabla 2
                yield clave,linea
    
        
    def reducer(self,key,values):
        listaA=[]
        listaB=[]
        #Llenamos las dos listas
        for valor in values:
            if valor[len(valor)-1]=="tablaA.csv":
                listaA.append(valor)
            else:
                listaB.append(valor)
        
        # Si listaA no está vacía, commprobamos cada una de sus entradas
        if listaA: 
        # Por cada entrada en la listaA
            for A in listaA:
            #Si la listaB no está vacía, unimos A con B
                if listaB:
                    for B in listaB:
                        yield A, B
                else:
                #Si no es el caso, sacamos A con union nula
                    yield A, "null"
        else:
        #En cambio si la listaA está vacía, sacamos solo los elementos de la listaB
            for B in listaB:
                yield "null", B
                
if __name__ == '__main__':
    unionCompleta.run()

Overwriting unionCompleta.py


In [10]:
!python unionCompleta.py tablaA.csv tablaB.csv > unionCompleta.txt

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory C:\Users\cyber\AppData\Local\Temp\unionCompleta.manuel.20200813.153727.272390
Running step 1 of 1...
job output is in C:\Users\cyber\AppData\Local\Temp\unionCompleta.manuel.20200813.153727.272390\output
Streaming final output from C:\Users\cyber\AppData\Local\Temp\unionCompleta.manuel.20200813.153727.272390\output...
Removing temp directory C:\Users\cyber\AppData\Local\Temp\unionCompleta.manuel.20200813.153727.272390...


In [14]:
%%writefile antiunion.py
#!/usr/bin/env python
from mrjob.job import MRJob
import re,sys,os

class antiunion(MRJob):
    
    #Función que limpia lo que el parámetro devuelve en forma "file://nombre_fichero" para dejarlo solo con el nombre 
    #del fichero: "nombre_fichero"
    def limpiarNombreArchivo(self,archivo):
        tamano=len(archivo)
        return archivo[7:tamano]
    def mapper_init(self):
        self.namefile=self.limpiarNombreArchivo(os.getenv('map_input_file')) #Usamos el parámetro para saber que nos llega desde streaming
        #map_input_file devuelve una cadena de caracteres correspondiente al archivo de entrada por el stream en formato:
        # file://nombre_fichero
        
    def mapper(self,_,line):
        clave=""
        linea=line.split(';')
        encontrado=re.search('[a-zA-Z]',linea[0])#Para que no tenga en cuenta las cabeceras de las tablas
        if encontrado==None:
            #Modificar el nombre del archivo que contiene la tablaA, cuando queramos usar otras tablas
            if self.namefile=="tablaA.csv":
                linea.append(self.namefile)#Añadimos al registro el nombre del archivo
                #Clave de la tablaA
                clave=linea[0] #Esta clave es la común de la tabla 1, que me permite hacer 
                                #la unión con la de la tabla 2 
                yield clave,linea
            else:
                linea.append(self.namefile) #Aquí estoy usando el nombre del archivo, pero puedo usar otro identificador
                                        #como en el libro Dessign Pattern que usa A y B. Añadimos al registro el nombre
                                        #del archivo.
                #Clave de la tablaB
                clave=linea[0] #Esta es la otra clave común de la tabla 2
                yield clave,linea
    
        
    def reducer(self,key,values):
        listaA=[]
        listaB=[]
        #Llenamos las dos listas
        for valor in values:
            if valor[len(valor)-1]=="tablaA.csv":
                listaA.append(valor)
            else:
                listaB.append(valor)
        
        # Antiunion ############################
        #Si la listaA o la listaB están vacías
        if not listaA or not listaB:

        # Iteramos los valores vacíos de las dos listas
            for A in listaA:
                yield A, "null"
            for B in listaB:
                yield "null", B
       
                
if __name__ == '__main__':
    antiunion.run()

Overwriting antiunion.py


In [12]:
!python antiunion.py tablaA.csv tablaB.csv > antiunion.txt

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory C:\Users\cyber\AppData\Local\Temp\antiunion.manuel.20200813.154009.022142
Running step 1 of 1...
job output is in C:\Users\cyber\AppData\Local\Temp\antiunion.manuel.20200813.154009.022142\output
Streaming final output from C:\Users\cyber\AppData\Local\Temp\antiunion.manuel.20200813.154009.022142\output...
Removing temp directory C:\Users\cyber\AppData\Local\Temp\antiunion.manuel.20200813.154009.022142...


Ejemplo para comprobar que fichero entra en el map. Esto lo necesitaremos para desarrollar el patron de unión, por que dependiendo del fichero que entre, debermos de realizar distintas acciones.
Usando un parámetro configurado (map_input_file), puedo saber que archivo Hadoop Streaming está leyendo
de esta forma controlaremos lo que entra para desarrollar el patrón de unión y saber que clave/valor ha de ir a cada estructura, que luego usaremos en el reducer para hacer la unión. Mas informacón de parámetros configurados en:
http://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html#Configured+Parameters
EJEMPLO DE USO:
https://www-it--swarm-dev.cdn.ampproject.org/v/s/www.it-swarm.dev/es/hadoop/como-obtener-el-nombre-del-archivo-de-entrada-en-el-mapeador-en-un-programa-hadoop/1042140842/amp/?usqp=mq331AQFKAGwASA%3D&amp_js_v=0.1#aoh=15964086144637&referrer=https%3A%2F%2Fwww.google.com&amp_tf=From%20%251%24s&ampshare=https%3A%2F%2Fwww.it-swarm.dev%2Fes%2Fhadoop%2Fcomo-obtener-el-nombre-del-archivo-de-entrada-en-el-mapeador-en-un-programa-hadoop%2F1042140842%2F
Este ejemplo lo voy a coger como base para desarrollar el patrón

In [21]:
%%writefile nombrefichero.py
#!/usr/bin/env python
from mrjob.job import MRJob
import re,sys,fileinput,os

class nombrefichero(MRJob):
    
    def mapper_init(self):
        self.namefile=os.getenv('map_input_file') #Usamos el parámetro para saber que nos llega desde streaming
        self.namefile2="file://ventas_ESP.csv" #Tenemos que compararlo con el otro u otros archivos que le metemos para
                                                # hacer la unión
        linea=[] #Declaramos una lista que le pasaremos al reducer con el valor.
        
    def mapper(self,_,line):
        if self.namefile2!=self.namefile:      
            linea=line.split(';')
            yield self.namefile,linea
        else:
            linea=line.split(';')
            yield "Fichero nuevo: " + self.namefile,linea
        
    """def reducer(self, key, values):
        for record in values:
            yield key,record"""

if __name__ == '__main__':
    nombrefichero.run()

Overwriting nombrefichero.py


In [22]:
!python nombrefichero.py ventas_AUS.csv ventas_ESP.csv > resultados

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory C:\Users\cyber\AppData\Local\Temp\nombrefichero.manuel.20200808.110407.188379
Running step 1 of 1...
job output is in C:\Users\cyber\AppData\Local\Temp\nombrefichero.manuel.20200808.110407.188379\output
Streaming final output from C:\Users\cyber\AppData\Local\Temp\nombrefichero.manuel.20200808.110407.188379\output...
Removing temp directory C:\Users\cyber\AppData\Local\Temp\nombrefichero.manuel.20200808.110407.188379...
