### Universidad Nacional de Lujan - Bases de Datos Masivas (11088) - Cavasin Nicolas #143501

# TP06: Frameworks de procesamiento distribuido

## Hadoop MapReduce:
El archivo *ventas.txt* posee las ventas de una empresa con los siguientes datos: *id_vendedor*, *id_coordinador*, *cantidad_de_productos_vendidos*, *cantidad_de_dinero*.

Genere un esquema bajo el paradigma *MapReduce* para resolver las siguientes consignas:

- Produzca un mapper y un reducer para responder cuál es el bonus obtenido por cada vendedor siendo que cada vendedor obtiene el 3% del total del dinero vendido.

- Produzca un mapper y un reducer para obtener la cantidad de productos vendidos por cada vendedor, agrupado por coordinador.



In [6]:
!wget https://raw.githubusercontent.com/bdm-unlu/2020/master/TPs/TP06/data/ventas.txt

--2020-12-04 01:38:42--  https://raw.githubusercontent.com/bdm-unlu/2020/master/TPs/TP06/data/ventas.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3705191 (3.5M) [text/plain]
Saving to: ‘ventas.txt’


2020-12-04 01:38:42 (34.6 MB/s) - ‘ventas.txt’ saved [3705191/3705191]



Para emular el funcionamiento de Hadoop, se utilizarán 3 scripts:

1. ``mapper.py``: 
    - Realiza la transformación al formato deseado ``<key, value>`` quedando ``<id_vendedor, cantidad_de_dinero>``.
    - Escribe el resultado a un archivo *01-mapped_a.txt* en formato *.tsv* para que lo consuma el siguiente script.

2. ``sorter.py``: 
    - Ordena la salida del ``script_mapper`` por *id_vendedor*.
    - Escribe un nuevo archivo *02-sorted_b.txt* en formato *.tsv*.

3. ``reducer.py``:
    - Consume la salida del ``script_sorter``.
    - Acumula y calcula el 3% del total de dinero que le corresponde a cada *id_vendedor*.
    - Escribe los resultados en nuevo archivo *03-reduced_b.txt* en formato *.tsv*.

___

# Paso 1 - Mapping:

In [10]:
# Abro el archivo ventas
with open("ventas.txt") as file_hdfs:

    # Creo el archivo mapper.txt
    with open('01-mapped_a.txt', 'w') as map_file:

        # Leo cada linea del archivo
        for line in file_hdfs:

            # Se eliminan los espacios en blanco iniciales y finales
            line = line.strip()

            # Separo la linea en palabras y obtengo una lista 
            words = line.split()

            # Imprimo el id_vendedor y cantidad_de_dinero
            map_file.write(f'{words[0]}\t{words[3]}\n')

# 'with' cierra automaticamente todos los archivos

# Paso 2 - Sorting:

In [11]:
# Abro el archivo mapped
with open('01-mapped_a.txt', 'r') as map_file:

    # Creo el archivo sorted
    with open('02-sorted_a.txt', 'w') as sort_file:

        # Ordeno y escribo el nuevo archivo ordenado
        for line in sorted(map_file):
            sort_file.write(line)

# Paso 3 - Reducing:

In [20]:
# Referencias
vendedor_actual = None
dinero_actual = 0

# Abro el archivo ordenado por id_vendedor
with open('02-sorted_a.txt', 'r') as sort_file:

    # Creo el archivo reducer
    with open('03-reduced_a.txt', 'w') as red_file:

        # Por cada linea del sorter
        for line in sort_file:

            # Obtengo el <key, value> de cada linea 
            vendedor, dinero = line.split('\t')

            # Convierto dinero a float
            dinero = float(dinero)

            # Si no hubo un cambio de vendedor acumulo
            if vendedor == vendedor_actual:
                dinero_actual += dinero
            else:
                # Si hubo un cambio de vendedor imprimo y actualizo

                if vendedor:
                    # Calculo el bono
                    bono = dinero_actual * 0.03

                    # Informo por consola
                    print(vendedor_actual, '\t', bono)

                    # Escribo tambien el archivo reduced
                    red_file.write(f'{vendedor_actual}\t{bono}\n')

                    # Modifico los 'actuales'
                    vendedor_actual = vendedor
                    dinero_actual = dinero


# Agrego la ultima linea procesada
bono = (dinero_actual + dinero)* 0.03

print(vendedor_actual, '\t', bono)

with open('03-reduced_a.txt', 'a') as red_file:
    red_file.write(f'{vendedor_actual}\t{bono}\n')

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
28138 	 272.3980338217683
28139 	 168.08776134256644
28142 	 92.90485393782366
28145 	 231.91605069596105
28146 	 139.74001609596442
28149 	 151.5498231010279
28150 	 285.46684262522825
28154 	 170.85310505612452
28155 	 96.45120447299027
28156 	 262.9958950242941
28157 	 178.97220940430287
28158 	 242.531807601141
28160 	 226.00773650091512
28162 	 124.97123844408048
28164 	 65.2783910954736
28166 	 291.32894141903563
28168 	 270.5256950852117
2817 	 327.38927926429966
28170 	 167.51288586571465
28171 	 426.0686442526374
28174 	 240.86794861409246
28175 	 129.26209754220315
28176 	 10.39883150532786
28177 	 209.23630866205139
28179 	 133.17210911380684
28185 	 281.90595917753217
28188 	 166.1693548190264
28189 	 204.99953107060847
28194 	 205.53978729468741
28196 	 79.34100210971252
2820 	 153.82895060341264
28202 	 90.23912894457526
28203 	 209.58049657716467
28205 	 360.9844662737962
28206 	 221.5547460786789
28208 	 2

___

A continuación se realiza el mismo proceso pero para el punto b:

1. ``mapper.py``: 
    - Realiza la transformación al formato deseado ``<key, value>`` quedando ``<[id_coordinador, id_vendedor], cantidad_de_productos_vendidos>``.
    - Escribe el resultado a un archivo *01-mapped_b.txt* en formato *.tsv* para que lo consuma el siguiente script.

2. ``sorter.py``: 
    - Ordena la salida del ``script_mapper`` por *id_coordinador* + *id_vendedor*.
    - Escribe un nuevo archivo *02-sorted_b.txt* en formato *.tsv*.

3. ``reducer.py``:
    - Consume la salida del ``script_sorter``.
    - Calcula el total de productos vendidos agrupados por coordinador y vendedor.
    - Escribe los resultados en nuevo archivo *03-reduced_b.txt* en formato *.tsv*.

In [23]:
# Abro el archivo ventas
with open("ventas.txt") as file_hdfs:

    # Creo el archivo mapper.txt
    with open('01-mapped_b.txt', 'w') as map_file:

        # Leo cada linea del archivo
        for line in file_hdfs:

            # Se eliminan los espacios en blanco iniciales y finales
            line = line.strip()

            # Separo la linea en palabras y obtengo una lista 
            words = line.split()

            # Imprimo el id_coordinador, id_vendedor y cantidad_de_productos_vendidos
            map_file.write(f'{words[1]}\t{words[0]}\t{words[2]}\n')

# 'with' cierra automaticamente todos los archivos

# Abro el archivo mapped
with open('01-mapped_b.txt', 'r') as map_file:

    # Creo el archivo sorted
    with open('02-sorted_b.txt', 'w') as sorted_file:

        # Ordeno y escribo el nuevo archivo ordenado
        for line in sorted(map_file):
            sorted_file.write(line)

# Referencias
coordinador_actual = None
vendedor_actual = None
cantidad_actual = 0

# Abro el archivo ordenado por id_vendedor
with open('02-sorted_b.txt', 'r') as sorted_file:

    # Creo el archivo reducer
    with open('03-reduced_b.txt', 'w') as red_file:

        # Por cada linea del sorted
        for line in sorted_file:

            # Obtengo el <key, value> de cada linea 
            coordinador, vendedor, cantidad = line.split('\t')

            # Convierto de string a integer
            cantidad = int(dinero)

            # Si no hubo un cambio de clave, acumulo
            if coordinador == coordinador_actual and vendedor == vendedor_actual:
                cantidad_actual += cantidad
            else:
                # Si hubo un cambio de coordinador/vendedor imprimo y actualizo

                # Informo por consola
                print(coordinador_actual, '\t', vendedor_actual, '\t', cantidad_actual)

                # Escribo tambien el archivo reduced
                red_file.write(f'{coordinador_actual}\t{vendedor_actual}\t{cantidad_actual}\n')

                # Modifico los 'actuales'
                coordinador_actual = coordinador
                vendedor_actual = vendedor
                cantidad_actual = cantidad

# Agrego la ultima linea procesada
print(coordinador_actual, '\t', vendedor_actual, '\t', cantidad_actual+cantidad)

with open('03-reduced_b.txt', 'a') as red_file:
    red_file.write(f'{coordinador_actual}\t{vendedor_actual}\t{cantidad_actual+cantidad}\n')

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
28255 	 3576 	 2164
28255 	 4667 	 4869
28255 	 4678 	 3246
28255 	 5547 	 2164
28255 	 8738 	 4328
28305 	 19320 	 5951
28305 	 21452 	 3787
28305 	 22131 	 2705
28305 	 23167 	 3787
28305 	 24340 	 4869
28305 	 24616 	 4328
28305 	 27687 	 5951
28305 	 3133 	 3246
28305 	 3246 	 2164
28305 	 9420 	 3787
28319 	 12833 	 4328
28319 	 13623 	 2705
28319 	 14160 	 3787
28319 	 24536 	 3246
28319 	 25648 	 1623
28319 	 6347 	 6492
28319 	 8917 	 3787
28319 	 8985 	 3787
28377 	 11281 	 3246
28377 	 14829 	 2164
28377 	 19695 	 4869
28377 	 21351 	 5410
28377 	 23693 	 4869
28377 	 24276 	 3246
28377 	 27155 	 5951
28377 	 5185 	 4869
28377 	 6750 	 4328
28382 	 12162 	 2705
28382 	 1474 	 2164
28382 	 18202 	 3787
28382 	 20726 	 4869
28382 	 21312 	 2164
28382 	 23155 	 3787
28382 	 25006 	 2164
28382 	 27436 	 4328
28382 	 3785 	 4328
28388 	 1043 	 2164
28388 	 10768 	 2705
28388 	 19526 	 4328
28388 	 2064 	 5951
28388 	

## Apache Spark con PySpark:
Resuelva el ejercicio anterior con PySpark.


In [3]:
# Instalo pyspark y configuro el entorno
!pip install pyspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 73kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 35.4MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=695a7f97d64bc319b43c411c7f478ca0cb0419b24b2ddae56d01d51c6d325976
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


In [4]:
# Importo Spark
from pyspark import SparkConf, SparkContext

# Seteo el master al entorno local y defino el nombre de la app para identificarla
conf = SparkConf().setMaster("local").setAppName("Bono vendedores")

# Inicializo el Spark Context
sc = SparkContext(conf = conf)

# Verifico inicializacion
sc

In [9]:
# Leo el archivo de ventas y separo cada valor 
rdd_ventas = sc.textFile("ventas.txt").\
                map(lambda line: line.split("\t"))

# Muestro la primer linea del archivo
print(f'Primer linea del archivo: {rdd_ventas.first()}')
print(f'Ocurrencias en archivo: {rdd_ventas.count()}.\n')

# Mapeo seleccionando id_vendedor y cantidad_de_dinero (columnas 0 y 3)
# Ademas convierto dinero a float y ordeno por id_vendedor
rdd_vendedor_dinero = rdd_ventas.\
                        map(lambda values: (values[0], float(values[3]))).\
                        sortByKey()

# Muestro el primer mapeo
print(f'Primer elemento del mapper: <{rdd_vendedor_dinero.first()}>')
print(f'Ocurrencias en mapper: {rdd_vendedor_dinero.count()}.\n')

# Ahora reduzco por id_vendedor y acumulo el dinero
rdd_vendedor_dinero = rdd_vendedor_dinero.reduceByKey(lambda id, dinero: id +dinero)

# Mapeo nuevamente para poder aplicar una multiplicacion y asi
# calcular el bono de cada vendedor
rdd_vendedor_bono = rdd_vendedor_dinero.map(lambda values: (values[0], values[1]*0.03))

# Muestro primer reduccion y ocurrencias
print(f'Primer elemento del reducer: <{rdd_vendedor_bono.first()}>')
print(f'Ocurrencias en reducer: {rdd_vendedor_bono.count()}.')
print()

Primer linea del archivo: ['17493', '6012', '21', '207.509827822219']
Ocurrencias en archivo: 119582.

Primer elemento del mapper: <('100', 341.811971935356)>
Ocurrencias en mapper: 119582.

Primer elemento del reducer: <('100', 286.54551868222)>
Ocurrencias en reducer: 15522.



Por último se muestra todo el contenido del RDD. Es decir, todos los pares ``<id_vendedor, bono>``:

In [None]:
# Muestro todos los id_vendedor con su correspondiente bono
for vendedor in rdd_vendedor_bono.collect():
    print(vendedor)

A continuación se realiza el punto b:

In [46]:
# Reutilizo el rdd del punto anterior 
# Mapeo <[id_coordinador, id_vendedor], cantidad_productos_vendidos> 
# Ademas, convierto cantidades a integer y ordeno por clave
rdd_coord_ventas = rdd_ventas.map(lambda values: ((values[1], values[0]), int(values[2]))).sortByKey()

# Muestro el primer mapeo
print(f'Primer elemento del mapper: <{rdd_coord_ventas.first()}>')
print(f'Ocurrencias en mapper: {rdd_coord_ventas.count()}.')
print()

# Acumulo las cantidades por clave
rdd_coord_cantidades = rdd_coord_ventas.reduceByKey(lambda coord, cant: coord  +cant)

# Muestro la primer reduccion
print(f'Primer elemento del reducer: <{rdd_coord_cantidades.first()}>')
print(f'Ocurrencias en reducer: {rdd_coord_cantidades.count()}.')
print()

Primer elemento del mapper: <(('10008', '11947'), 50)>
Ocurrencias en reducer: 119582.

Primer elemento del reducer: <(('10008', '11947'), 292)>
Ocurrencias en reducer: 15522.



In [None]:
# Muestro todo el contenido del rdd reducido
for val in rdd_coord_cantidades.collect():
    print(val)

In [48]:
# Finalizo la aplicacion
sc.stop()