# Operaciones sobre `RDD`s

Como se mencionó en la presentación, hay dos tipos de operaciones sobre los `RDD`s: _transformaciones_ y _acciones_.

- Las _transformaciones_ construyen un `RDD` nuevo a partir del anterior.
  - Cada transformación queda guardada por =Spark= en el /lineage graph/ un *DAG*.

- Las _acciones_ calculan un resultado basado en el `RDD`.

- La diferencia es que las `RDD` son computadas en forma _lazy_, sólo son ejecutadas hasta la acción.

- Si quieres usarlo una `RDD` varias veces debes de persistirla (con `persist()`).

## Flujo típico de trabajo

1. Crear un `RDD` a partir de datos externos.
2. Transformarlo a nuevos `RDDs`.
3. Persistir algunos `RDDs` para su uso posterior.
4. Lanzar acciones.

Obtenemos el `SparkContext` para poder trabajar

In [1]:
import pyspark
sc = pyspark.SparkContext('local[*]')

## Transformaciones

Las principales transformaciones (o por lo menos las más usadas) se listan a continuación

  - `map`
    - Usa una función y la aplica a cada elemento del `RDD`, el resultado se guarda en un nuevo `RDD`.
  - `filter`
    - Usa una función y devuelve sólo los elementos que pasan la función (que devuelven verdadero) en el nuevo `RDD`.
  - `flatMap`
    - Como el `map` pero regresa un iterador por cada elemento
      - Por ejemplo una función que divide una cadena.
  - `distinct`
  - `sample`
  - `join`
  - `cogroup`
  - `coalesce`
  - `union`, `intersection`, `substract`, `cartesian`


**NOTA** En los ejemplos que siguen usaremos `collect()`,  `count()`, `take()`. Estas funciones **no** son _transformaciones_, sino _acciones_ que se explican más abajo.

Creamos un `RDD` a partir de enteros (justo como antes)

In [2]:
numeros = sc.parallelize(range(1000))

Usaremos una _función anónima_ para elevar los números al cuadrado

In [3]:
cuadrados = numeros.map(lambda x: x*x)

In [4]:
cuadrados.take(5)

[0, 1, 4, 9, 16]

In [5]:
muestra = cuadrados.sample(fraction=0.3, withReplacement=False)

In [6]:
muestra.count()

287

In [7]:
muestra.take(5)

[1, 9, 36, 100, 144]

In [8]:
pares = muestra.filter(lambda x: x%2 == 0)

In [9]:
pares.take(5)

[36, 100, 144, 484, 576]

In [10]:
pares.count()

149

El `DAG` está formado por `numeros -> cuadrados -> muestra -> pares` 

Estar verificando en cada paso, no es muy eficiente, de hecho, una manera de programar muy utilizada es la siguiente:

In [11]:
pares2 = numeros.map(lambda x: x*x)\
                .sample(fraction=0.3, withReplacement=False)\
                .filter(lambda x: x%2 == 0)

In [12]:
pares2.take(5)

[0, 16, 144, 256, 400]

In [13]:
pares2.count()

148

El `DAG` tiene la misma estructura (con `numeros` como raíz), pero sus nodos son anónimos.

Una transformación que causa confusión es `flatMap`, veamos un ejemplo

La función `.split()` de `python`, toma una cadena y devuelve una lista

In [14]:
"Hola a todos".split(" ")

['Hola', 'a', 'todos']

¿Qué efectos tendría en un `map`?

In [15]:
frases = sc.parallelize(["hola a todos", "taller nacional de big data", "Análisis de redes sociales"])

In [16]:
palabras = frases.map(lambda frase: frase.split(" ")).collect()
palabras

[['hola', 'a', 'todos'],
 ['taller', 'nacional', 'de', 'big', 'data'],
 ['Análisis', 'de', 'redes', 'sociales']]

In [17]:
palabras[1]

['taller', 'nacional', 'de', 'big', 'data']

Obtenemos un arreglo de arreglos y quizá esto no sea lo que necesitamos. Usando `flatMap` "aplanamos" el `RDD` resultante.

In [18]:
palabras = frases.flatMap(lambda frase: frase.split(" ")).collect()
palabras

['hola',
 'a',
 'todos',
 'taller',
 'nacional',
 'de',
 'big',
 'data',
 'Análisis',
 'de',
 'redes',
 'sociales']

## Acciones

- `first`
- `take`, `takeSample`
- `reduce`
  - Opera en dos elementos del mismo tipo del `RDD` y regresa un elemento del mismo tipo.
- `aggregate`
  - Nos permite implementar acumuladores.
- `collect`
  - Regresa el `RDD` completo.
- `count`, `countByValue`, `top`, `foreach`, `countByKey`
- `saveAsTextFile`


Es importante notar que todos estas operaciones acaban con datos en el _driver_.

In [19]:
numeros.first()

0

In [20]:
numeros.take(5)

[0, 1, 2, 3, 4]

In [21]:
numeros.takeSample(num=30, withReplacement=False)

[563,
 121,
 914,
 842,
 57,
 509,
 714,
 94,
 197,
 667,
 788,
 395,
 862,
 588,
 585,
 778,
 242,
 894,
 517,
 513,
 225,
 359,
 510,
 615,
 409,
 717,
 515,
 171,
 82,
 108]

In [22]:
suma = numeros.reduce(lambda x, y: x + y)
suma

499500

In [23]:
pares.top(10)

[996004,
 988036,
 968256,
 964324,
 960400,
 952576,
 929296,
 898704,
 894916,
 876096]

Para los ejemplos que siguen generaremos un conjunto falso de transacciones, usando las bibliotecas de `python` `random` y `uuid`

In [24]:
import random

In [25]:
random.randint(10,1000)

24

In [26]:
accion = ['RETIRO', 'COMPRA', 'CONSULTA']
random.choice(accion)

'CONSULTA'

In [27]:
import uuid

In [28]:
clientes = [str(uuid.uuid4()), str(uuid.uuid4()), str(uuid.uuid4()), str(uuid.uuid4()), str(uuid.uuid4())]

In [29]:
clientes

['1ecf3bf6-91ce-4612-b80f-10c9770b88a2',
 '92c807e4-5dba-4cf3-b5c6-6966863966cc',
 '08ab52c3-cf99-45ab-8cc9-5f8822da20ab',
 '8da344a6-490d-46a9-8ac0-96482348df66',
 'e64e49a1-8b94-4ef6-8f81-bebe2e703995']

In [30]:
def generate_transaction():
    """
    Regresa una transacción falsa, la primera columna es el número de tarjeta ofuscado, las demás
    columnas son el comercio, la acción realizada en el comercio y el monto de la acción.
    Devuelve una cadena separada por pipes (|)
    """
    comercio = ['ARENA COLISEO', 'SUPERCITO', 'RESTAURANTE EL TRABAJO']
    accion = ['RETIRO', 'COMPRA']
    
    return "%s|%s|%s|%s" % (random.choice(clientes), random.choice(comercio), random.choice(accion), random.randint(10, 10000))

In [31]:
?generate_transaction

In [32]:
generate_transaction()

'08ab52c3-cf99-45ab-8cc9-5f8822da20ab|RESTAURANTE EL TRABAJO|COMPRA|301'

In [33]:
def generate_transactions(number=10000):
    """
    Regresa una lista de transacciones falsa.
    """
    txs = []
    for i in range(number):
        txs.append(generate_transaction())
    return txs

In [34]:
generate_transactions(number=10)

['1ecf3bf6-91ce-4612-b80f-10c9770b88a2|ARENA COLISEO|COMPRA|8521',
 '08ab52c3-cf99-45ab-8cc9-5f8822da20ab|SUPERCITO|COMPRA|5787',
 '8da344a6-490d-46a9-8ac0-96482348df66|RESTAURANTE EL TRABAJO|COMPRA|6565',
 'e64e49a1-8b94-4ef6-8f81-bebe2e703995|SUPERCITO|RETIRO|9263',
 '1ecf3bf6-91ce-4612-b80f-10c9770b88a2|RESTAURANTE EL TRABAJO|COMPRA|1823',
 '1ecf3bf6-91ce-4612-b80f-10c9770b88a2|RESTAURANTE EL TRABAJO|RETIRO|441',
 '92c807e4-5dba-4cf3-b5c6-6966863966cc|SUPERCITO|COMPRA|1735',
 'e64e49a1-8b94-4ef6-8f81-bebe2e703995|SUPERCITO|COMPRA|4752',
 '1ecf3bf6-91ce-4612-b80f-10c9770b88a2|RESTAURANTE EL TRABAJO|RETIRO|4773',
 '08ab52c3-cf99-45ab-8cc9-5f8822da20ab|SUPERCITO|COMPRA|3540']

In [35]:
txs = sc.parallelize(generate_transactions(number=10000))

In [36]:
txs.first()

'8da344a6-490d-46a9-8ac0-96482348df66|ARENA COLISEO|RETIRO|1621'

In [37]:
txs.count()

10000

Guardamos estas transacciones para usarlas posteriormente

In [None]:
! ls -lh output/raw/transacciones

In [None]:
! rm -R output/raw/transacciones

rm: remove write-protected directory ‘output/raw/transacciones’? 

In [None]:
txs.saveAsTextFile("output/raw/transacciones")

Como está distribuido, los archivos en realidad se guardan como carpeta.

In [None]:
! ls -lh output/raw/transacciones

Supongamos que queremos realizar un conteo por tarjeta, los pasos serían los siguientes:

Designamos el número de tarjeta como la **llave** (_key_)

In [None]:
kv_txs = txs.map(lambda x: x.split("|"))\
            .map(lambda x: (x[0], x[1:])) # x[0] contiene el número de tarjeta ofuscado
kv_txs.take(5)

In [None]:
kv_txs.keys().first()

In [None]:
kv_txs.values().first()

In [None]:
kv_txs.count()

In [None]:
kv_txs.countByKey()