# Transformaciones

## Transformaciones Narrow

#### Map

In [0]:
# Cargamos un archivo de texto donde cada línea es un dato del RDD
rdd = sc.textFile("/FileStore/tables/empleados.txt") 
rdd.count()                 # 4 - cantidad de líneas
resultRDD = rdd.map(len)    # obtenemos la cantidad de caracteres cada línea
resultRDD.collect() 

In [0]:
# Muestra los datos de los empleados
empleados = rdd.collect()
for empleado in empleados:
    print(empleado)

#### FlatMap

In [0]:
# Se obtiene una única lista donde cada atributo es un elemento de la lista
rdd = sc.textFile("/FileStore/tables/empleados.txt") 
resultFM = rdd.flatMap(lambda x: x.split("|"))
resultFM.collect()

#### Filter

In [0]:
rdd = sc.textFile("/FileStore/tables/empleados.txt") 
hombres = rdd.filter(lambda x: x.split("|")[2].split(",")[0] == "Male")
resultFM.collect()

#### Distinct

In [0]:
rdd = sc.parallelize([1,1,2,2,3,4,5])
resultRDD = rdd.distinct()
resultRDD.collect()     

### Conjuntos

In [0]:
# Declaracion de los RDD
rdd1 = sc.parallelize([1,2,3,4])
rdd2 = sc.parallelize([3,4,5,6])

In [0]:
# Union de conjuntos
resultRDD = rdd1.union(rdd2)
resultRDD.collect()

In [0]:
# Intersección de conjuntos
resultRDD = rdd1.intersection(rdd2)
resultRDD.collect()

In [0]:
# Resta del primer RDD los que están en un segundo
resultRDD = rdd1.subtract(rdd2)
resultRDD.collect()

## RDD de Pares

#### zip
Une dos listas del mismo tamaño

In [0]:
lista1 = ['a','b','c','e','f','g','h']
lista2 = [4, 5, 6, 7, 8, 9, 10]
rddZip = sc.parallelize(lista1).zip(sc.parallelize(lista2))
rddZip.collect()

In [0]:
rddZipSecuencia = sc.parallelize(zip(lista1,range(len(lista1)))) 
rddZipSecuencia.collect()

#### map
Asigna a cada elemento un valor o cálculo basado en él mismo

In [0]:
lista  = ['Hola', 'Adiós', 'Hasta luego']
rddMap = sc.parallelize(lista).map(lambda x: (x, len(x)))
rddMap.collect()

#### keyBy
crea las claves a partir de cada elemento

In [0]:
# creamos una clave con la primera letra
rddKeyBy = sc.parallelize(lista).keyBy(lambda x: x[0])
rddKeyBy.collect()

### Código de ejemplo

Otras transformaciones 
* **keys**: devuelve las claves
* **values**: devuelve los valores
* **mapValues**: Aplica la función sobre los valores
* **flatMapValues**: Aplica la función sobre los valores y los aplana.

In [0]:
listaTuplas = [('a',1), ('z',3), ('b',4), ('c',3), ('a',4)]
rddTuplas= sc.parallelize(listaTuplas)

claves = rddTuplas.keys()       
claves.collect()

In [0]:
valores = rddTuplas.values()
valores.collect()

In [0]:
rddMapValues = rddTuplas.mapValues(lambda x: (x,x*2))
rddMapValues.collect()

In [0]:
rddFMV = rddTuplas.flatMapValues(lambda x: (x,x*2))
rddFMV.collect()

## Transformaciones Wide

#### ReduceByKey

In [0]:
# Cálculo del número de ventas por país, a partir del fichero pdi_sales_small.csv
rdd = sc.textFile("/FileStore/tables/pdi_sales_small.csv")
# Recogemos el país y las unidades de las ventas
parPais1 = rdd.map(lambda x: (x.split(";")[-1].strip(), 1))
parPais1.collect()

In [0]:
# Elimina el encabezado
header = parPais1.first()
parPais1SinHeader = parPais1.filter(lambda linea: linea != header)

# Reducción por la clave
paisesTotal = parPais1SinHeader.reduceByKey(lambda a,b: a+b)
paisesTotal.collect()

In [0]:
# Otra reducción calculando el total de unidades por país (cuarto y último campo de cada línea)
rdd = sc.textFile("/FileStore/tables/pdi_sales_small.csv")
# Recogemos el país y las unidades de las ventas
paisesUnidades = rdd.map(lambda x: (x.split(";")[-1].strip(), x.split(";")[3]))
# Le quitamos el encabezado
header = paisesUnidades.first()
paisesUnidadesSinHeader = paisesUnidades.filter(lambda linea: linea != header)
# Pasamos las unidades a un número entero
paisesUnidadesInt = paisesUnidadesSinHeader.map(lambda x: (x[0], int(x[1])))
# Reducimos por el país y sumamos las unidades
paisesTotalUnidades = paisesUnidadesInt.reduceByKey(lambda a,b: a+b)
paisesTotalUnidades.collect()

#### GroupByKey

In [0]:
# A partir del ejemplo anterior, obtenemos una lista para cada país
rdd = sc.textFile("/FileStore/tables/pdi_sales_small.csv")
# Creamos un RDD de pares con el nombre del país como clave, y una lista con los valores
ventas = rdd.map(lambda x: (x.split(";")[-1].strip(), x.split(";")))
# Quitamos el primer elemento que es el encabezado del CSV
header = ventas.first()
ventasSinHeader = ventas.filter(lambda linea: linea != header)
# Agrupamos las ventas por nombre del país
ventasAgrupados = ventasSinHeader.groupByKey()
ventasAgrupados.collect()

In [0]:
# Transforma los iterables de cada país a una lista
ventasAgrupadosLista = ventasAgrupados.map(lambda x: (x[0], list(x[1])))
ventasAgrupadosLista.collect()

**Otro ejemplo de groupByKey**

A partir de las compras de 3 días:

* día 1: pan 3€, agua 2€, azúcar 1€, leche 2€, pan 4€
* día 2: pan 1€, cereales 3€, agua 0.5€, leche 2€, filetes 5€
* día 3: filetes 2€, cereales 1€

Obtener lo gastado en cada producto y el gasto medio realizado en cada uno de ellos

In [0]:
# Generación de una lista por día
dia1 = [('pan',3), ('agua',2), ('azúcar',1), ('leche',2), ('pan',4)]
dia2 = [('pan',1), ('cereales',3), ('agua',0.5), ('leche',2), ('filetes',5)]
dia3 = [('filetes',2), ('cereales',1)]

In [0]:
# unión de las listas como RDD y se agrupan por productos (clave)
rdd = sc.parallelize(dia1).union(sc.parallelize(dia2)).union(sc.parallelize(dia3))
rdd1=rdd.groupByKey()

In [0]:
# Se crea una lista a partir del RDD agrupado rdd1
rdd1a = [(x,list(y)) for x,y in rdd1.collect()]
print(rdd1a)

In [0]:
# Obtención del gasto por producto
rdd1b = rdd1.map(lambda x: (x[0], list(x[1])))
rdd1b.collect()

In [0]:
# Obtención de la media de gasto por producto
rdd2 = rdd1.map(lambda x: (x[0], sum(x[1])/len(x[1])))
rdd2.collect()

#### SortByKey

In [0]:
# Utilizamos las líneas del ejemplo de ReduceByKey con paises y ventas con un archivo con más líneas
rdd = sc.textFile("/FileStore/tables/pdi_sales.csv")
# Recogemos el país y las unidades de las ventas
paisesUnidades = rdd.map(lambda x: (x.split(";")[-1].strip(), x.split(";")[3]))
# Le quitamos el encabezado
header = paisesUnidades.first()
paisesUnidadesSinHeader = paisesUnidades.filter(lambda linea: linea != header)

# A cada país le asignamos sus ventas totales
paisesTotalUnidades = paisesUnidadesSinHeader.reduceByKey(lambda a,b: int(a)+int(b))
paisesTotalUnidades.collect()


In [0]:
# Le damos la vuelta a la lista
unidadesPaises = paisesTotalUnidades.map(lambda x: (x[1],x[0]))
unidadesPaises.collect()

In [0]:
# Ordenación en orden ascendente (por defecto)
unidadesPaisesOrdenadas = unidadesPaises.sortByKey()
unidadesPaisesOrdenadas.collect()

In [0]:
# Ordenación en orden descendente (argumento a False)
unidadesPaisesOrdenadasDesc = unidadesPaises.sortByKey(False)
unidadesPaisesOrdenadasDesc.collect()

#### SortBy

In [0]:
#Ordenación por la cantidad vendida ascendente
paisesTotalUnidades.sortBy(lambda x: x[1]).collect()