![RDD key pair](media/04.rdd_key_pair.png)

---
# 03 - RDDs con pares clave/valor (aka Pair RDDs)

-   Tipos de datos muy usados en Big Data (MapReduce)
-   Spark dispone de operaciones especiales para su manejo

In [1]:
!pip install pyspark

[33mYou are using pip version 9.0.1, however version 18.0 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [5]:
# Create apache spark context
from pyspark import SparkContext
sc = SparkContext(master="local", appName="Mi app")

In [3]:
# Stop apache spark context
sc.stop()

## Creación de *Pair RDDs*
Los RDDs clave/valor pueden crearse a partir de una lista de tuplas, a partir de otro RDD o mediante un zip de dos RDDs.
-   A partir de una lista de tuplas
-   A partir de otro RDD

In [6]:
prdd = sc.parallelize([('a',2), ('b',5), ('a',3)])
print(prdd.collect())

prdd = sc.parallelize(zip(['a', 'b', 'c'], range(3)))
print(prdd.collect())

[('a', 2), ('b', 5), ('a', 3)]
[('a', 0), ('b', 1), ('c', 2)]


In [179]:
# Ejemplo usando un fichero
# Para cada línea ontenemos una tupla, siendo el primer elemento
# la primera palabra de la línes, y el segundo la línea completa
linesrdd = sc.textFile("data/quijote.txt")
prdd = linesrdd.map(lambda x: (x.split(" ")[0], x))
print('Par (1ª palabra, línea): {0}\n'.format(prdd.takeSample(False, 1)))

Par (1ª palabra, línea): [('arenga.', 'arenga. Al cabo de lo cual, dijo:')]



In [181]:
nrdd = sc.parallelize(range(2,5))
prdd = nrdd.keyBy(lambda x: x*x)

print(prdd.collect())

[(4, 2), (9, 3), (16, 4)]


In [182]:
# zipWithIndex(): Zipea el RDD con los índices de sus elementos.
rdd = sc.parallelize(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'], 3)
prdd = rdd.zipWithIndex()
print(rdd.glom().collect())

print(prdd.collect())

# Este método dispara un Spark job cuando el RDD tiene más de una partición.

[['a', 'b'], ['c', 'd'], ['e', 'f', 'g', 'h']]
[('a', 0), ('b', 1), ('c', 2), ('d', 3), ('e', 4), ('f', 5), ('g', 6), ('h', 7)]


In [183]:
# zipWithUniqueId(): Zipea el RDD con identificadores únicos (long) para cada elemento.
# Los elementos en la partición k-ésima obtienen los ids k, n+k, 2*n+k,... siendo n = nº de particiones
# No dispara un trabajo Spark
rdd = sc.parallelize(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'], 3)
print("Particionado del RDD: {0}".format(rdd.glom().collect()))
prdd = rdd.zipWithUniqueId()

print(prdd.collect())

Particionado del RDD: [['a', 'b'], ['c', 'd'], ['e', 'f', 'g', 'h']]
[('a', 0), ('b', 3), ('c', 1), ('d', 4), ('e', 2), ('f', 5), ('g', 8), ('h', 11)]


* Mediante un zip de dos RDDs
    * Los RDDs deben tener el mismo número de particiones y el mismo número de elementos en cada partición

In [186]:
rdd1 = sc.parallelize(range(0, 5), 2)
rdd2 = sc.parallelize(range(1000, 1005), 2)
prdd = rdd1.zip(rdd2)

print(prdd.collect())

[(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]


## Transformaciones sobre un único RDD clave/valor

Sobre un único RDD clave/valor podemos efectuar transformaciones de agregación a nivel de clave y transformaciones que afectan a las claves o a los valores

### Transformaciones de agregación

* `reduceByKey(func)`/`foldByKey(func)`
    -   Devuelven un RDD, agrupando los valores asociados a la misma clave mediante `func`
    -   Similares a `reduce` y `fold` sobre RDDs simples

In [188]:
from operator import add
prdd   = sc.parallelize([('a', 2), ('b', 5), ('a', 8), ('b', 6), ('b', 2)]).cache()
redrdd = prdd.reduceByKey(add)

print(redrdd.collect())

[('a', 10), ('b', 13)]


-   `groupByKey()` agrupa valores asociados a misma clave
    - Operación muy costosa en comunicaciones
    - Mejor usar operaciones de reducción

In [190]:
grouprdd = prdd.groupByKey()

print(grouprdd.collect())
print

lista = [(k, list(v)) for k, v in grouprdd.collect()]
print(lista)

[('a', <pyspark.resultiterable.ResultIterable object at 0x7f07c711f860>), ('b', <pyspark.resultiterable.ResultIterable object at 0x7f07c711fe80>)]
[('a', [2, 8]), ('b', [5, 6, 2])]


- `combineByKey(createCombiner(func1), mergeValue(func2), mergeCombiners(func3))`
    - Método general para agregación por clave, similar a `aggregate`
    - Especifica tres funciones:

     1.  `createCombiner` al recorrer los elementos de cada partición, si nos encontramos una clave nueva se crea un acumulador y se inicializa con `func1`

     2.  `mergeValue` mezcla los valores de cada clave en cada partición usando `func2`

     3.  `mergeCombiners` mezcla los resultados de las diferentes particiones mediante `func3`

- Los valores del RDD de salida pueden tener un tipo diferente al de los valores del RDD de entrada.

In [192]:
# Para cada clave, obten una tupla que tenga la suma y el número de valores
sumCount = prdd.combineByKey(
                            (lambda x: (x, 1)),
                            (lambda x, y: (x[0]+y, x[1]+1)),
                            (lambda x, y: (x[0]+y[0], x[1]+y[1])))

print(sumCount.collect())

# Con el RDD anterior, obtenemos la media de los valores
m = sumCount.mapValues(lambda v: float(v[0])/v[1])
print(m.collect())

[('a', (10, 2)), ('b', (13, 3))]
[('a', 5.0), ('b', 4.333333333333333)]


### Transformaciones sobre claves o valores
-   `keys()` devuelve un RDD con las claves
-   `values()` devuelve un RDD con los valores
-   `sortByKey()` devuelve un RDD clave/valor con las claves ordenadas

In [206]:
prdd   = sc.parallelize([('a', 2), ('b', 5), ('a', 8), ('b', 6), ('b', 2)]).cache()
print("RDD completo: {0}".format(prdd.collect()))
print("RDD con las claves: {0}".format(prdd.keys().collect()))
print("RDD con los valores: {0}".format(prdd.values().collect()))
print("RDD con las claves ordenadas: {0}".format(prdd.sortByKey().collect()))

RDD completo: [('a', 2), ('b', 5), ('a', 8), ('b', 6), ('b', 2)]
RDD con las claves: ['a', 'b', 'a', 'b', 'b']
RDD con los valores: [2, 5, 8, 6, 2]
RDD con las claves ordenadas: [('a', 2), ('a', 8), ('b', 5), ('b', 6), ('b', 2)]


-   `mapValues(func)` devuelve un RDD aplicando una función sobre los valores
-   `flatMapValues(func)` devuelve un RDD aplicando una función sobre los valores y “aplanando” la salida

In [208]:
mapv = prdd.mapValues(lambda x: (x, 10*x))
print(mapv.collect())

fmapv = prdd.flatMapValues(lambda x: (x, 10*x))
print(fmapv.collect())

[('a', (2, 20)), ('b', (5, 50)), ('a', (8, 80)), ('b', (6, 60)), ('b', (2, 20))]
[('a', 2), ('a', 20), ('b', 5), ('b', 50), ('a', 8), ('a', 80), ('b', 6), ('b', 60), ('b', 2), ('b', 20)]


### Transformaciones sobre dos RDDs clave/valor
Combinan dos RDDs de tipo clave/valor para obtener un tercer RDD.

`join`/`leftOuterJoin`/`rightOuterJoin`/`fullOuterJoin` realizan inner/outer/full joins entre los dos RDDs

In [214]:
rdd1 = sc.parallelize([("a", 2), ("b", 5), ("a", 8)]).cache()
rdd2 = sc.parallelize([("c", 7), ("a", 1)]).cache()

rdd3 = rdd1.join(rdd2)
print("Join: ", rdd3.collect())

rdd3 = rdd1.leftOuterJoin(rdd2)
print("LeftOuterJoin: ", rdd3.collect())

rdd3 = rdd1.rightOuterJoin(rdd2)
print("RightOuterJoin: ", rdd3.collect())

rdd3 = rdd1.fullOuterJoin(rdd2)
print("FullOuterJoin: ", rdd3.collect())

Join:  [('a', (2, 1)), ('a', (8, 1))]
LeftOuterJoin:  [('b', (5, None)), ('a', (2, 1)), ('a', (8, 1))]
RightOuterJoin:  [('c', (None, 7)), ('a', (2, 1)), ('a', (8, 1))]
FullOuterJoin:  [('c', (None, 7)), ('b', (5, None)), ('a', (2, 1)), ('a', (8, 1))]


-   `subtractByKey` elimina elementos con una clave presente en otro RDD

In [220]:
rdd3 = rdd1.subtractByKey(rdd2)

print("rd1: ", rdd1.collect())
print("rd2: ", rdd2.collect())
print("subtractByKey: ", rdd3.collect())

rd1:  [('a', 2), ('b', 5), ('a', 8)]
rd2:  [('c', 7), ('a', 1)]
subtractByKey:  [('b', 5)]


-   `cogroup` agrupa los datos que comparten la misma clave en ambos RDDs

In [223]:
rdd3 = rdd1.cogroup(rdd2)

print(rdd3.collect())

map = rdd3.mapValues(lambda v: [list(l) for l in v]).collectAsMap()
print("rd1: ", rdd1.collect())
print("rd2: ", rdd2.collect())
print(map)

[('c', (<pyspark.resultiterable.ResultIterable object at 0x7f07c71feda0>, <pyspark.resultiterable.ResultIterable object at 0x7f07c71fecf8>)), ('b', (<pyspark.resultiterable.ResultIterable object at 0x7f07c71feba8>, <pyspark.resultiterable.ResultIterable object at 0x7f07c71fea90>)), ('a', (<pyspark.resultiterable.ResultIterable object at 0x7f07c71fe7b8>, <pyspark.resultiterable.ResultIterable object at 0x7f07c731db00>))]
rd1:  [('a', 2), ('b', 5), ('a', 8)]
rd2:  [('c', 7), ('a', 1)]
{'b': [[5], []], 'a': [[2, 8], [1]], 'c': [[], [7]]}


---
## Acciones sobre RDDs clave/valor
Sobre los RDDs clave/valor podemos aplicar las acciones para RDDs simples y algunas adicionales.

-   `collectAsMap()` obtiene el RDD en forma de mapa

In [7]:
prdd = sc.parallelize([("a", 7), ("b", 5), ("a", 8)]).cache()

rddMap = prdd.collectAsMap()

print(rddMap)

{'b': 5, 'a': 8}


-   `countByKey()` devuelve un mapa indicando el número de ocurrencias de cada clave

In [8]:
countMap = prdd.countByKey()

print(countMap)

defaultdict(<class 'int'>, {'b': 1, 'a': 2})


In [None]:
- lookup(key) devuelve una lista con los valores asociados con una clave