In [0]:
# Iniciamos sesion en SPARK
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

## *Transformaciones Wide*. 
Las siguientes transformaciones, además de trabajar con RDD de pares, mezclan los datos de las particiones mediante el shuffle de los elementos.
Usamos el siguiente fichero: pdi_sales.csv  

Haremos la carga tal y como se comentó en el notebook de Transformaciones Narrow

In [0]:
rdd = spark.read.options(delimiter=";", header=True).csv("/FileStore/tables/pdi_sales.csv") # Cargamos el fichero manteniendo los encabezados.
rdd.show()

+---------+---------+---------------+-----+-------+-------+
|ProductID|     Date|            Zip|Units|Revenue|Country|
+---------+---------+---------------+-----+-------+-------+
|      725|1/15/1999|41540          |    1|  115.5|Germany|
|      787| 6/6/2002|41540          |    1|  314.9|Germany|
|      788| 6/6/2002|41540          |    1|  314.9|Germany|
|      940|1/15/1999|22587          |    1|  687.7|Germany|
|      396|1/15/1999|22587          |    1|  857.1|Germany|
|      734|4/10/2003|22587          |    1|  330.7|Germany|
|      769|2/15/1999|22587          |    1|  257.2|Germany|
|      499|1/15/1999|12555          |    1|  846.3|Germany|
|     2254|1/15/1999|40217          |    1|   57.7|Germany|
|       31|5/31/2002|40217          |    1|  761.2|Germany|
|      475|2/15/1999|13583          |    1|  970.2|Germany|
|      510|1/15/1999|22337          |    1|  837.1|Germany|
|      499| 6/5/2002|22337          |    1|    883|Germany|
|      289|2/15/1999|13587          |   

### *ReduceByKey*
Mediante la transformación *reduceByKey* los datos se calculan utilizando una función de reducción a partir de la clave combinando en la misma máquina las parejas con la misma clave antes de que los datos se barajen.

Vamos a comenzar con un ejemplo simple donde se va a contar las veces que aparece cada palabra en el fichero:

In [0]:
rdd = sc.textFile("/FileStore/tables/pdi_sales.csv")
parPais1 = rdd.map(lambda x: (x.split(";")[-1].strip(), 1))

# Quitamos el encabezado
header = parPais1.first()
parPais1SinHeader = parPais1.filter(lambda linea: linea != header)

# Aplicamos la reducción por clave (reduceByKey())
paisesTotal = parPais1SinHeader.reduceByKey(lambda a,b: a+b)

# Mostramos por pantalla
paisesTotal.collect()

Out[8]: [('Mexico', 217007),
 ('France', 314750),
 ('Germany', 234072),
 ('Canada', 75318)]

#### *Funciones Lambda en Reduce*
Al aplicar una transformación de tipo reduce, la función lambda recibirá dos parámetros, siendo el primero el valor acumulado y el segundo el valor del elemento a operar

Veamos otro ejemplo, en este caso vamos a calcular el total de unidades vendidas por país, de manera que vamos a coger el nombre del país (Country) y las unidades (Units) vendidas:

In [0]:
rdd = sc.textFile("/FileStore/tables/pdi_sales.csv")

# Recogemos el país y las unidades de las ventas
paisesUnidades = rdd.map(lambda x: (x.split(";")[-1].strip(), x.split(";")[3]))

# Le quitamos el encabezado
header = paisesUnidades.first()
paisesUnidadesSinHeader = paisesUnidades.filter(lambda linea: linea != header)

# Pasamos las unidades a un número entero
paisesUnidadesInt = paisesUnidadesSinHeader.map(lambda x: (x[0], int(x[1])))

# Reducimos por el país y sumamos las unidades
paisesTotalUnidades = paisesUnidadesInt.reduceByKey(lambda a,b: a+b)
paisesTotalUnidades.collect()

Out[9]: [('Mexico', 223463),
 ('France', 327730),
 ('Germany', 244265),
 ('Canada', 77609)]

### *EJERCICIO*


Dada la siguiente lista de compra:
```python
lista = [('pan',3), ('agua',2), ('azúcar',1), ('leche',2), ('pan',1), ('cereales',3), ('agua',0.5), ('leche',2), ('filetes',5)]
```
Calcular:

1. El total que se ha gastado por cada producto
2. Cuánto es lo máximo que se ha pagado por cada producto

### *GroupByKey*
Permite agrupar los datos a partir de una clave, repartiendo los resultados (shuffle) entre todos los nodos:

In [0]:
rdd = sc.textFile("/FileStore/tables/pdi_sales.csv")

# Creamos un RDD de pares con el nombre del país como clave, y una lista con los valores
ventas = rdd.map(lambda x: (x.split(";")[-1].strip(), x.split(";")))

# Quitamos el primer elemento que es el encabezado del CSV
header = paisesUnidades.first()
paisesUnidadesSinHeader = paisesUnidades.filter(lambda linea: linea != header)

# Agrupamos las ventas por nombre del país
paisesAgrupados = ventas.groupByKey()
paisesAgrupados.collect()

Out[11]: [('Country', <pyspark.resultiterable.ResultIterable at 0x7f7886664b50>),
 ('Mexico', <pyspark.resultiterable.ResultIterable at 0x7f7887c504f0>),
 ('France', <pyspark.resultiterable.ResultIterable at 0x7f7891010a90>),
 ('Germany', <pyspark.resultiterable.ResultIterable at 0x7f78914328e0>),
 ('Canada', <pyspark.resultiterable.ResultIterable at 0x7f7890e59b50>)]

Se obtiene para cada país, un iterable con todos sus datos, tal y como se visualiza en el resultado anterior.
Para transformar los iterables en una lista ejecutamos el siguiente código:

In [0]:
paisesAgrupadosLista = paisesAgrupados.map(lambda x: (x[0], list(x[1])))
paisesAgrupadosLista.collect()

Out[12]: [('Country', [['ProductID', 'Date', 'Zip', 'Units', 'Revenue', 'Country']]),
 ('Mexico',
  [['2235', '1/15/1999', '8650           ', '1', '65.6', 'Mexico '],
   ['837', '2/15/1999', '8650           ', '1', '840', 'Mexico '],
   ['491', '2/15/1999', '8650           ', '1', '815.3', 'Mexico '],
   ['426', '5/31/2002', '8650           ', '1', '843.1', 'Mexico '],
   ['400', '6/6/2002', '8650           ', '1', '734.7', 'Mexico '],
   ['1131', '2/15/1999', '8650           ', '1', '419.9', 'Mexico '],
   ['2277', '3/15/1999', '8650           ', '1', '251.9', 'Mexico '],
   ['491', '3/15/1999', '8650           ', '1', '815.3', 'Mexico '],
   ['467', '6/8/2002', '8650           ', '1', '900.6', 'Mexico '],
   ['565', '4/12/2003', '2000           ', '1', '890.7', 'Mexico '],
   ['604', '4/12/2003', '2000           ', '1', '494.8', 'Mexico '],
   ['940', '3/15/1999', '2000           ', '1', '687.7', 'Mexico '],
   ['2277', '1/15/1999', '8640           ', '1', '251.9', 'Mexico '],
   ['7

### *EJERCICIO*

Ahora tenemos las cuentas de las compras de 3 días:

- día 1: pan 3€, agua 2€, azúcar 1€, leche 2€, pan 4€
- día 2: pan 1€, cereales 3€, agua 0.5€, leche 2€, filetes 5€
- día 3: filetes 2€, cereales 1€

Dada la siguiente lista de compra:
```python
dia1 = [('pan',3), ('agua',2), ('azúcar',1), ('leche',2), ('pan',4)]
dia2 = [('pan',1), ('cereales',3), ('agua',0.5), ('leche',2), ('filetes',5)]
dia3 = [('filetes',2), ('cereales',1)]
```


1. ¿Cómo obtenemos lo que hemos gastado en cada producto?
2. ¿Y el gasto medio que hemos realizado en cada uno de ellos?

**NOTA**: Si el tipo de operación a realizar es posible mediante una operación de *reduce*, su rendimiento será una solución más eficiente!!!

## *SortByKey*.  

*sortByKey* permite ordenar los datos a partir de una clave (Ojo!!! Ordena por CLAVE). Los pares de la misma máquina se ordenan primero por la misma clave, y luego los datos de las diferentes particiones se barajan.

Para ello crearemos una tupla, siendo el primer elemento un valor numérico por el cual ordenar, y el segundo el dato asociado.

Vamos a partir del ejemplo anterior para ordenar los paises por la cantidad de ventas:

In [0]:
rdd = sc.textFile("/FileStore/tables/pdi_sales.csv")
paisesUnidades = rdd.map(lambda x: (x.split(";")[-1].strip(), x.split(";")[3]))
header = paisesUnidades.first()
paisesUnidadesSinHeader = paisesUnidades.filter(lambda linea: linea != header)
paisesTotalUnidades = paisesUnidadesSinHeader.reduceByKey(lambda a,b: int(a)+int(b))

# Le damos la vuelta a la lista
unidadesPaises = paisesTotalUnidades.map(lambda x: (x[1],x[0]))
unidadesPaises.collect()

Out[13]: [(223463, 'Mexico'),
 (327730, 'France'),
 (244265, 'Germany'),
 (77609, 'Canada')]

A continuación, procedemos a la ordenación usando *sortByKey*

In [0]:
unidadesPaisesOrdenadas = unidadesPaises.sortByKey()
unidadesPaisesOrdenadas.collect()

Out[14]: [(77609, 'Canada'),
 (223463, 'Mexico'),
 (244265, 'Germany'),
 (327730, 'France')]

Si quisiéramos obtener los datos en orden descendente, le pasamos False a la transformación:

In [0]:
unidadesPaisesOrdenadasDesc = unidadesPaises.sortByKey(False)
unidadesPaisesOrdenadasDesc.collect()

Out[16]: [(327730, 'France'),
 (244265, 'Germany'),
 (223463, 'Mexico'),
 (77609, 'Canada')]

Otro ejemplo más "compacto"

In [0]:
prdd   = sc.parallelize([('a', 2), ('b', 5), ('a', 8), ('b', 6), ('b', 2)]).cache()
print("RDD completo: {0}".format(prdd.collect()))
print("RDD con las claves: {0}".format(prdd.keys().collect()))
print("RDD con los valores: {0}".format(prdd.values().collect()))
print("RDD con las claves ordenadas: {0}".format(prdd.sortByKey().collect()))
print("RDD con las claves ordenadas de forma descendente: {0}".format(prdd.sortByKey(False).collect()))

RDD completo: [('a', 2), ('b', 5), ('a', 8), ('b', 6), ('b', 2)]
RDD con las claves: ['a', 'b', 'a', 'b', 'b']
RDD con los valores: [2, 5, 8, 6, 2]
RDD con las claves ordenadas: [('a', 2), ('a', 8), ('b', 5), ('b', 6), ('b', 2)]
RDD con las claves ordenadas de forma descendente: [('b', 5), ('b', 6), ('b', 2), ('a', 2), ('a', 8)]


#### Nota sobre JOIN
Aunque los RDD permitan realizar operaciones tipo *join*, realmente este tipo de operaciones se realizan mediante DataFrames, por lo que se verá en otro Notebook.