# SPARK - RDD
RDD, o Resilient Distributed Dataset, es una abstracción fundamental de Spark que representa una colección inmutable y distribuida de objetos que pueden ser procesados en paralelo.

Características:
* Resistencia
* Inmutabilidad
* Distribución
* Paralelización
* Tolerancia a fallos

Antes de la versión 2.0, el principal interfaz para programar en Spark eran los RDD. Desde la versión 2.0, también existen los Dataset, que son RDD fuertemente tipados que además están optimizados a bajo nivel.

Sobre los RDD se pueden hacer dos tipos de operaciones:
* **Transformaciones**: operaciones que crean un nuevo RDD a partir del existente (p. ej., map, filter, flatMap, reduceByKey, groupBy).
* **Acciones**: operaciones que devuelven un valor después de ejecutar un cálculo sobre el RDD (p. ej., collect, count, reduce, take).

## CREAR SparkSession Y SparkContext

In [3]:
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("pyspark_rdd").getOrCreate()
#spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

## CREACIÓN DE UN RDD

### CREAR UN RDD A PARTIR DE UN FICHERO DE TEXTO
Podemos crear un RDD directamente desde un archivo de texto utilizando el método textFile del SparkContext. Cada línea del archivo se convierte en un elemento del RDD.

In [4]:
rdd = sc.textFile("datos/el_quijote.txt")
rdd.collect()

['DON QUIJOTE DE LA MANCHA',
 'Miguel de Cervantes Saavedra',
 '',
 'PRIMERA PARTE',
 'CAPÍTULO 1: Que trata de la condición y ejercicio del famoso hidalgo D. Quijote de la Mancha',
 'En un lugar de la Mancha, de cuyo nombre no quiero acordarme, no ha mucho tiempo que vivía un hidalgo de los de lanza en astillero, adarga antigua, rocín flaco y galgo corredor. Una olla de algo más vaca que carnero, salpicón las más noches, duelos y quebrantos los sábados, lentejas los viernes, algún palomino de añadidura los domingos, consumían las tres partes de su hacienda. El resto della concluían sayo de velarte, calzas de velludo para las fiestas con sus pantuflos de lo mismo, los días de entre semana se honraba con su vellori de lo más fino. Tenía en su casa una ama que pasaba de los cuarenta, y una sobrina que no llegaba a los veinte, y un mozo de campo y plaza, que así ensillaba el rocín como tomaba la podadera. Frisaba la edad de nuestro hidalgo con los cincuenta años, era de 

### CREAR UN RDD A PARTIR DE UN CSV
Para crear un RDD a partir de un archivo CSV, primero debemos leer el archivo CSV como un DataFrame y luego convertirlo en un RDD. Spark proporciona funciones específicas para leer archivos CSV.

In [6]:
df = spark.read.csv("datos/pdi_sales.csv", header=True, inferSchema=True)
rdd = df.rdd
rdd.collect()

[Row(ProductID;Date;Zip;Units;Revenue;Country='725;1/15/1999;41540          ;1;115.5;Germany'),
 Row(ProductID;Date;Zip;Units;Revenue;Country='787;6/6/2002;41540          ;1;314.9;Germany'),
 Row(ProductID;Date;Zip;Units;Revenue;Country='788;6/6/2002;41540          ;1;314.9;Germany'),
 Row(ProductID;Date;Zip;Units;Revenue;Country='940;1/15/1999;22587          ;1;687.7;Germany'),
 Row(ProductID;Date;Zip;Units;Revenue;Country='396;1/15/1999;22587          ;1;857.1;Germany'),
 Row(ProductID;Date;Zip;Units;Revenue;Country='734;4/10/2003;22587          ;1;330.7;Germany'),
 Row(ProductID;Date;Zip;Units;Revenue;Country='769;2/15/1999;22587          ;1;257.2;Germany'),
 Row(ProductID;Date;Zip;Units;Revenue;Country='499;1/15/1999;12555          ;1;846.3;Germany'),
 Row(ProductID;Date;Zip;Units;Revenue;Country='2254;1/15/1999;40217          ;1;57.7;Germany'),
 Row(ProductID;Date;Zip;Units;Revenue;Country='31;5/31/2002;40217          ;1;761.2;Germany'),
 Row(ProductID;Date;Zip;Units;Revenue;Count

### CREAR UN RDD A PARTIR DE UNA LISTA
Podemos crear un RDD directamente desde una lista de Python utilizando el método parallelize del SparkContext. Esto distribuirá la lista a través de los nodos del clúster.

In [None]:
lista = [1, 2, 3, 4, 5]
rdd = sc.parallelize(lista)
rdd.collect()

[1, 2, 3, 4, 5]

En algunos ejemplos se ha usado SparkSession y en otros SparkContext a la hora de crear RDD:
* SparkSession se utiliza para leer archivos CSV debido a que esta funcionalidad está integrada en la API de DataFrame, que es más adecuada para manejar datos estructurados y ofrece más características para la lectura y manipulación de estos datos. Por otro lado,
* SparkContext se utiliza para crear RDDs de formas más básicas y directas, como a partir de archivos de texto o listas, donde no se necesitan las capacidades avanzadas de la API de DataFrame.

## ACCIONES
Las acciones son operaciones en RDD que se ejecutan inmediatamente.

Mientras que las transformaciones devuelven otro RDD, las acciones devuelven estructuras de datos nativas.

Acciones frecuentes en RDD:
* collect
* count
* first
* take
* takeSample
* takeOrdered
* reduce

[Más información sobre las acciones de Spark](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions)


### collect
Devuelve todos los elementos para el RDD dado.

In [None]:
rdd = sc.parallelize([1, 2, 3, 4, 2])
rdd.collect()

[1, 2, 3, 4, 2]

La acción collect duvuelve todos los datos que se distribuyen entre los diferentes trabajadores del clúster. Todos los datos viajan a través de la red de los trabajadores al master o driver y éste los tiene que almacenar.
Si estamos trabajando con una gran cantidad de datos esto penalizará el rendimiento de nuestra aplicación.

### count
Devuelve el número de elementos en el RDD dado.

In [None]:
rdd.count()

                                                                                

5

### first
Devuelve el primer elemento del RDD dado.

In [None]:
rdd.first()

1

### take
Devuelve los primeros N elementos, donde N es un parámetro que le tenemos que indicar.

In [None]:
rdd.take(3)

[1, 2, 3]

### takeSample
Devuelve una muestra con el número de elementos que le indiquemos.


In [None]:
rdd.takeSample(False, 3)

[4, 5, 3]

El primer parámetro indica si se pueden repetir los elementos de la muestra y el segundo fija la cantidad de elementos a devolver

### takeOrdered
Devuelve los primero N elementos, donde N es un parámetro que le tenemos que indicar, después de ordenarlos según su orden natural o según cualquier otro orden que le indiquemos.

In [None]:
rdd.takeOrdered(3)

[1, 2, 3]

In [None]:
rdd.takeOrdered(3, lambda x: -x)

[5, 4, 3]

Hay que tener cuidado si el conjunto de datos es muy grande, porque tanto take, como takeSample y takeOrdered  llevarán todos los datos a memoria.

### reduce
Combina los elementos de un RDD (Resilient Distributed Dataset) de manera que se obtenga un único valor. Funciona aplicando una función que toma dos argumentos y devuelve un nuevo valor, de forma iterativa, a los elementos del RDD.

In [None]:
rdd = sc.parallelize([1, 2, 3, 4, 5])

def sumar(a, b):
    return a + b

suma_total = rdd.reduce(sumar)
suma_total

15

## TRANSFORMACIONES
En Spark, las estructuras de datos son inmutables, de manera que una vez creadas no se pueden modificar. Para poder modificar un RDD/DataFrame/Dataset, hace falta realizar una transformación.

Todas las transformaciones en Spark se evalúan de manera perezosa (**lazy evaluation**), de manera que los resultados no se computan inmediatamente, sino que se retrasa el cálculo hasta que el valor sea necesario. Para ello, se van almacenando los pasos necesarios y se ejecutan únicamente cuando una acción requiere devolver un resultado al driver. Este diseño facilita un mejor rendimiento.

Así pues, las acciones provocan la evaluación de todas las transformaciones previas que se habían evaluado de forma perezosa y estaban a la espera.

Por defecto, cada transformación se puede recalcular cada vez que se ejecute una acción. Sin embargo, podemos persistir un RDD en memoria mediante los métodos persist (o cache ), de manera que Spark mantendrá los datos para un posterior acceso más eficiente. También podemos persistir RDD en disco o replicarlo en múltiples nodos.

Existen dos tipos de transformaciones, dependiendo de las dependencias entre las particiones de datos:
* Transformaciones Narrow: consisten en dependencias estrechas en las que cada partición de entrada contribuye a una única partición de salida.
* Transformaciones Wide: consisten en dependencias anchas de manera que varias particiones de entrada contribuyen a muchas otras particiones de salida, es decir, cada partición de salida depende de diferentes particiones de entrada. Este proceso también se conoce como shuffle, ya que Spark baraja los datos entre las particiones del clúster.

![Ejemplo](Datos/transformaciones.jpg "Título de la imagen")

Transformaciones frecuentes en RDD:
* map
* filter
* flatMap
* distinct
* union
* intersection
* subtract

[Más información sobre las transformaciones de Spark](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations)



### map
Aplica la función recibida a cada elemento del RDD, de manera que se puede añadir una nueva columna, modificar una existente, etc...

In [None]:
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd = rdd.map(lambda x: x * 2)
rdd.collect()

[2, 4, 6, 8, 10]

In [None]:
rddEmpleados = sc.textFile("Datos/empleados.txt")
rddEmpleados.collect()

['Michael|Montreal,Toronto|Male,30|DB:80|Product:Developer Lead',
 'Will|Montreal|Male,35|Perl:85|Product:Lead,Test:Lead',
 'Shelley|New York|Female,27|Python:80|Test:Lead,COE:Architect',
 'Lucy|Vancouver|Female,57|Sales:89,HR:94|Sales:Lead']

In [None]:
for empleado in rddEmpleados.collect():
    print(empleado)

Michael|Montreal,Toronto|Male,30|DB:80|Product:Developer Lead
Will|Montreal|Male,35|Perl:85|Product:Lead,Test:Lead
Shelley|New York|Female,27|Python:80|Test:Lead,COE:Architect
Lucy|Vancouver|Female,57|Sales:89,HR:94|Sales:Lead


In [None]:
rdd = rddEmpleados.map(len)
rdd.collect()

[61, 52, 60, 50]

### flatMap
Es muy similar a la anterior, pero en vez de devolver una lista con un elemento por cada entrada, devuelve una única lista deshaciendo las colecciones en elementos individuales

In [None]:
rdd= rddEmpleados.flatMap(lambda x: x.split("|"))
rdd.collect()

['Michael',
 'Montreal,Toronto',
 'Male,30',
 'DB:80',
 'Product:Developer Lead',
 'Will',
 'Montreal',
 'Male,35',
 'Perl:85',
 'Product:Lead,Test:Lead',
 'Shelley',
 'New York',
 'Female,27',
 'Python:80',
 'Test:Lead,COE:Architect',
 'Lucy',
 'Vancouver',
 'Female,57',
 'Sales:89,HR:94',
 'Sales:Lead']

In [None]:
rddLista = sc.parallelize([5, 2, 3, 4, 2])
rdd = rddLista.flatMap(lambda x: range(1, x))
rdd.collect()

[1, 2, 3, 4, 1, 1, 2, 1, 2, 3, 1]

### filter
Permite filtrar los elementos que cumplen una condición.

In [None]:
rddLista = sc.parallelize([1, 2, 3, 4, 5])
rdd = rddLista.filter(lambda x: x % 2 == 0)
rdd.collect()

[2, 4]

También podemos anidar diferentes transformaciones. Para este ejemplo, vamos a crear tuplas formadas por un número y su cuadrado, y luego quitar los que no coincide el número con su cuadrado (sólo coinciden el 0 y el 1), y luego aplanarlo en una lista.

In [None]:
rdd10 = sc.parallelize(range(10 + 1))
rdd = (
    rdd10.map(lambda x: (x, x ** 2))
    .filter(lambda x: (x[0] != x[1]))
    .flatMap(lambda x: x)
)
rdd.collect()

[2, 4, 3, 9, 4, 16, 5, 25, 6, 36, 7, 49, 8, 64, 9, 81, 10, 100]

Veamos otro ejemplo. Retomamos los datos de los empleados y si queremos filtrar los empleados que son hombres, primero separamos por las | y nos quedamos con el tercer elemento que contiene el sexo y la edad. A continuación, separamos por la coma para quedarnos con el sexo en la posición 0 y la edad en el 1, y comparamos con el valor deseado.

In [None]:
hombres = rddEmpleados.filter(lambda x: x.split("|")[2].split(",")[0] == "Male")
hombres.collect()

['Michael|Montreal,Toronto|Male,30|DB:80|Product:Developer Lead',
 'Will|Montreal|Male,35|Perl:85|Product:Lead,Test:Lead']

### distinct
Elimina los elementos repetidos

In [None]:
rddLista = sc.parallelize([1, 1, 2, 2, 3, 4, 5])
rdd = rddLista.distinct()
rdd.collect()

[1, 2, 3, 4, 5]

### union
Une dos rdd en uno

In [None]:
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize([5, 6, 7, 8])
rddUnion = rdd1.union(rdd2)
rddUnion.collect()

[1, 2, 3, 4, 5, 5, 6, 7, 8]

### intersection
Devuelve un rdd con los elementos en común de dos rdd

In [None]:
rdd1 = sc.parallelize([1, 2, 3, 4])
rdd2 = sc.parallelize([3, 4, 5, 6])
rddIntersection = rdd1.intersection(rdd2)
rddIntersection.collect()

                                                                                

[3, 4]

### subtract
Devuelve los elementos de un rdd que no están en el segundo rdd

In [None]:
rdd1 = sc.parallelize([1, 2, 3, 4])
rdd2 = sc.parallelize([3, 4, 5, 6])
rddSubtract = rdd1.subtract(rdd2)
rddSubtract.collect()

                                                                                

[1, 2]

## RDD DE PARES
Una técnica muy común a la hora de trabajar con RDD es hacerlo con elementos que tienen el formato (clave, valor), pudiendo las claves y los valores ser de cualquier tipo.

### CREACIÓN DE UN RDD DE PARES

#### A PARTIR DE UNA LISTA DE TUPLAS

In [None]:
listaTuplas = [(1, "a"), (2, "b"), (3, "c"), (4, "d")]
rddTuplas = sc.parallelize(listaTuplas)

#### zip
Une dos RDDs del mismo tamaño

In [None]:
lista1 = ['a','b','c','e','f','g','h']
lista2 = [4, 5, 6, 7, 8, 9, 10]
rddZip = sc.parallelize(lista1).zip(sc.parallelize(lista2))
rddZip.collect()

[('a', 4), ('b', 5), ('c', 6), ('e', 7), ('f', 8), ('g', 9), ('h', 10)]

In [None]:
rddZipSecuencia = sc.parallelize(zip(lista1, range(len(lista1))))
rddZipSecuencia.collect()

[('a', 0), ('b', 1), ('c', 2), ('e', 3), ('f', 4), ('g', 5), ('h', 6)]

Otros métodos relacionados son zipWithIndex y zipWithUniqueId

In [None]:
lista = ['a', 'b', 'c', 'd']
rdd = sc.parallelize(lista)
rddZipWithIndex = rdd.zipWithIndex()
rddZipWithIndex.collect()


[('a', 0), ('b', 1), ('c', 2), ('d', 3)]

In [None]:
rdd = sc.parallelize(['a', 'b', 'c', 'd'])
rddWithUniqueId = rdd.zipWithUniqueId()
rddWithUniqueId.collect()


[('a', 3), ('b', 7), ('c', 11), ('d', 15)]

#### map
Asigna a cada elemento un valor o cálculo sobre él mismo

In [None]:
lista = ["Hola", "Adiós", "Hasta luego"]
rddMap = sc.parallelize(lista).map(lambda x: (x, len(x)))
rddMap.collect()

[('Hola', 4), ('Adiós', 5), ('Hasta luego', 11)]

#### keyBy
Permite crear las claves a partir de cada elemento

In [None]:
rddKeyBy = sc.parallelize(lista).keyBy(lambda x: x[0])
rddKeyBy.collect()

[('H', 'Hola'), ('A', 'Adiós'), ('H', 'Hasta luego')]

### TRASNFORMACIONES

#### keys
Devuelve las claves

In [None]:
listaTuplas = [("a", 1), ("z", 3), ("b", 4), ("c", 3), ("a", 4)]
rddTuplas = sc.parallelize(listaTuplas)
claves = rddTuplas.keys()
claves.collect()

['a', 'z', 'b', 'c', 'a']

#### values
Devuelve los valores

In [None]:
valores = rddTuplas.values()
valores.collect()

[1, 3, 4, 3, 4]

#### mapValues
Aplica una función sobre los valores

In [None]:
rddMapValues = rddTuplas.mapValues(lambda x: (x, x*2))
rddMapValues.collect()

[('a', (1, 2)), ('z', (3, 6)), ('b', (4, 8)), ('c', (3, 6)), ('a', (4, 8))]

#### flatMapValues
Aplica la función sobre los valores y los aplana

In [None]:
rddFlatMapValues = rddTuplas.flatMapValues(lambda x: (x, x * 2))
rddFlatMapValues.collect()

[('a', 1),
 ('a', 2),
 ('z', 3),
 ('z', 6),
 ('b', 4),
 ('b', 8),
 ('c', 3),
 ('c', 6),
 ('a', 4),
 ('a', 8)]

#### reduceByKey
Esta función reduce los pares agrupando por las claves y aplicando la funcion indicada a los values.

In [1]:
rddSales = sc.textFile("Datos/pdi_sales_small.csv")
rddSales.take(3)

NameError: name 'sc' is not defined

In [None]:
parPais1 = rddSales.map(lambda x: (x.split(";")[-1].strip(), 1))
parPais1.collect()

[('Country', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1

Hemos creado un RDD de pares compuesto por el nombre del país y el número uno, para luego en la fase de reducción sumar estos valores. Pero si nos fijamos, el archivo csv contiene el encabezado con los datos, el cual debemos quitar

In [None]:
header = parPais1.first()
parPais1SinHeader = parPais1.filter(lambda linea: linea != header)
parPais1SinHeader.collect()

[('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1),
 ('Germany', 1

Y finalmente, ya podemos reducir por la clave:

In [None]:
paisesTotal = parPais1SinHeader.reduceByKey(lambda a, b: a + b)
paisesTotal.collect()

[('Mexico', 30060), ('France', 30060), ('Germany', 30059), ('Canada', 30060)]

In [None]:
pairs = [ ("a", 5), ("b", 7), ("c", 2), ("a", 3), ("b", 1), ("c", 4)]
pair_rdd = sc.parallelize(pairs)
output = pair_rdd.reduceByKey(lambda x, y : x + y)
result = output.collect()
print(*result, sep='\n')

('b', 8)
('c', 6)
('a', 8)


Veamos otro ejemplo, en este caso vamos a calcular el total de unidades vendidas por país, de manera que vamos a coger el nombre del país (Country) y las unidades (Units) vendidas:

In [None]:
paisesUnidades = rddSales.map(lambda x: (x.split(";")[-1].strip(), x.split(";")[3]))
header = paisesUnidades.first()
paisesUnidadesSinHeader = paisesUnidades.filter(lambda linea: linea != header)
paisesUnidadesInt = paisesUnidadesSinHeader.map(lambda x: (x[0], float(x[1])))
paisesTotalUnidades = paisesUnidadesInt.reduceByKey(lambda a, b: a + b)
paisesTotalUnidades.collect()

[('Mexico', 10588066.300000923),
 ('France', 9453441.700000618),
 ('Germany', 31746.0),
 ('Canada', 31148.0)]

#### groupByKey
Esta función es otra función ByKey que puede operar en un par (clave, valor) RDD pero esto solo agrupará los valores basados en las claves.

In [None]:
ventas = rddSales.map(lambda x: (x.split(";")[-1].strip(), x.split(";")))
header = ventas.first()
ventasSinHeader = ventas.filter(lambda linea: linea != header)
paisesAgrupados = ventasSinHeader.groupByKey()
paisesAgrupados.collect()

[('Mexico', <pyspark.resultiterable.ResultIterable at 0x131bc6740>),
 ('France', <pyspark.resultiterable.ResultIterable at 0x131987dc0>),
 ('Germany', <pyspark.resultiterable.ResultIterable at 0x1319872b0>),
 ('Canada', <pyspark.resultiterable.ResultIterable at 0x1319879a0>)]

Obtendremos para cada país, un iterable con todos sus datos, asi que podemos transformar los iterables a una lista:

In [None]:
paisesAgrupadosLista = paisesAgrupados.map(lambda x: (x[0], list(x[1])))
paisesAgrupadosLista.collect()

[('Mexico',
  [['2235', '1/15/1999', '86501', '65.6', 'Mexico '],
   ['837', '2/15/1999', '86501', '840', 'Mexico '],
   ['491', '2/15/1999', '86501', '815.3', 'Mexico '],
   ['426', '5/31/2002', '86501', '843.1', 'Mexico '],
   ['400', '6/6/2002', '86501', '734.7', 'Mexico '],
   ['1131', '2/15/1999', '86501', '419.9', 'Mexico '],
   ['2277', '3/15/1999', '86501', '251.9', 'Mexico '],
   ['491', '3/15/1999', '86501', '815.3', 'Mexico '],
   ['467', '6/8/2002', '86501', '900.6', 'Mexico '],
   ['565', '4/12/2003', '20001', '890.7', 'Mexico '],
   ['604', '4/12/2003', '20001', '494.8', 'Mexico '],
   ['940', '3/15/1999', '20001', '687.7', 'Mexico '],
   ['2277', '1/15/1999', '86401', '251.9', 'Mexico '],
   ['739', '1/15/1999', '20081', '170.6', 'Mexico '],
   ['2188', '1/15/1999', '20081', '233.6', 'Mexico '],
   ['1894', '1/15/1999', '20081', '682.2', 'Mexico '],
   ['757', '4/11/2003', '20081', '73.5', 'Mexico '],
   ['758', '4/11/2003', '20081', '73.5', 'Mexico '],
   ['2373', '3/15

#### sortByKey
Permite ordenar los datos a partir de una clave

In [None]:
rdd = sc.textFile("Datos/pdi_sales.csv")
paisesUnidades = rdd.map(lambda x: (x.split(";")[-1].strip(), x.split(";")[3]))
header = paisesUnidades.first()
paisesUnidadesSinHeader = paisesUnidades.filter(lambda linea: linea != header)
paisesTotalUnidades = paisesUnidadesSinHeader.reduceByKey(lambda a, b: float(a) + float(b))
print(paisesTotalUnidades.collect())
unidadesPaises = paisesTotalUnidades.map(lambda x: (int(x[1]), x[0]))
unidadesPaises.collect()

                                                                                

[('Mexico', 223463.0), ('France', 327730.0), ('Germany', 244265.0), ('Canada', 77609.0)]


[(223463, 'Mexico'),
 (327730, 'France'),
 (244265, 'Germany'),
 (77609, 'Canada')]

Y a continuación los ordenamos:

In [None]:
unidadesPaisesOrdenadas = unidadesPaises.sortByKey()
unidadesPaisesOrdenadas.collect()

[(11003916, 'France'),
 (34114930, 'Canada'),
 (103645802, 'Mexico'),
 (131930510, 'Germany')]

Veamos otro ejemplo, pero en este caso la ordenación va a ser de forma descendente:

In [None]:
pairs = [("a", 5), ("d", 7), ("c", 2), ("b", 3)]
raw_rdd = sc.parallelize(pairs)
sortkey_rdd = raw_rdd.sortByKey(ascending=False)
sortkey_rdd.collect()

[('d', 7), ('c', 2), ('b', 3), ('a', 5)]

#### sortBy
Ordena los datos según la función especificada.

In [None]:
unidadesPaises.sortBy(lambda x: x[1]).collect()

[(34114930, 'Canada'),
 (11003916, 'France'),
 (131930510, 'Germany'),
 (103645802, 'Mexico')]

Veamos otro ejemplo:

In [None]:
pairs = [ ("a", 5, 10), ("d", 7, 12), ("c", 2, 11), ("b", 3, 9)]
raw_rdd = sc.parallelize(pairs)
sort_out = raw_rdd.sortBy(lambda x : x[2])
result = sort_out.collect()
print(*result, sep='\n')

('b', 3, 9)
('a', 5, 10)
('c', 2, 11)
('d', 7, 12)


## PARTICIONES
Spark organiza los datos en particiones, considerándolas divisiones lógicas de los datos entre los nodos del clúster.

Cada una de las particiones va a llevar asociada una tarea de ejecución, de manera que a más particiones, mayor paralelización del proceso.

Veamos con código como podemos trabajar con las particiones:

In [None]:
rdd = sc.parallelize([1, 1, 2, 2, 3, 3, 4, 5])
rdd.getNumPartitions()

16

In [None]:
rdd = sc.parallelize([1, 1, 2, 2, 3, 3, 4, 5], 2)
rdd.getNumPartitions()

2

In [None]:
rddE = sc.textFile("Datos/empleados.txt")
rddE.getNumPartitions()

2

In [None]:
rddE = sc.textFile("Datos/empleados.txt", 3)
rddE.getNumPartitions()

3

La mayoría de las operaciones que trabajan con los datos admiten un parámetro extra indicando la cantidad de particiones con las que queremos trabajar.

### mapPartitions
A diferencia de la transformación map que se invoca por cada elemento, mapPartitions se llama por cada partición.

La función que recibe como parámetro recogerá como entrada un iterador con los elementos de cada partición:

In [None]:
rdd = sc.parallelize([1, 1, 2, 2, 3, 3, 4, 5], 2)

In [None]:
rddParticiones = rdd.mapPartitions(lambda x: [list(x)])
rddParticiones.collect()

[[1, 1, 2, 2], [3, 3, 4, 5]]

In [None]:
resultadoRdd = rdd.mapPartitions(lambda x: [sum(x)])
resultadoRdd.collect()

[6, 15]

En el ejemplo, ha dividido los datos en dos particiones, la primera con [1, 1, 2, 2] y la otra con [3, 3, 4, 5], y de ahí el resultado de sumar sus elementos es [6, 15].

### mapPartitionsWithIndex
De forma similar al caso anterior, pero ahora recibe una función cuyos parámetros son el índice de la partición y el iterador con los datos de la misma.

In [None]:
def mpwi(indice, iterador):
    return [(indice, list(iterador))]

resultadoRdd = rdd.mapPartitionsWithIndex(mpwi)
resultadoRdd.collect()

[(0, [1, 1, 2, 2]), (1, [3, 3, 4, 5])]

### coalesce
Obtenemos un nuevo RDD con el nuevo número de particiones que queremos (más pequeño, coalesce es para reducir)

In [None]:
rdd = sc.parallelize([1, 1, 2, 2, 3, 3, 4, 5], 3)
rdd.getNumPartitions()

3

In [None]:
rdd1p = rdd.coalesce(1)
rdd1p.getNumPartitions()

1

### repartition
Obtenemos un nuevo RDD con la cantidad exacta de particiones deseadas (al reducir las particiones, repartition realiza un shuffle para redistribuir los datos, por lo tanto, si queremos reducir la cantidad de particiones, es más eficiente utilizar coalesce):

In [None]:
rdd = sc.parallelize([1, 1, 2, 2, 3, 3, 4, 5], 3)
rdd.getNumPartitions()

3

In [None]:
rdd2p = rdd.repartition(4)
rdd2p.getNumPartitions()

4