# Ejemplos de Transformaciones y Acciones usando PySpark

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/oramosul/abd-files/blob/main/spark/1-spark-rdd/2-Transformaciones-acciones.ipynb)

Si se utiliza un entorno en el que no está instalado por defecto Spark, es neceario instalarlo (usando pip) y luego crear un contexto de Spark. En este caso, el contexto de Spark se crea con un solo hilo en local, y el nombre de la aplicación es `ejemplo_transformaciones`

In [1]:
!pip install -q pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:
from pyspark import SparkContext

sc = SparkContext("local[1]", "ejemplo_transformaciones")

## 1.&nbsp;Transformaciones

### 1.1. Map

Se utiliza para realizar un mapeo de un RDD a otro RDD término a término. El mapeo se especifica mediante una función, que puede ser anónima (`lambda`) o puede haber sido definida usando `def`.

In [3]:
# Creación de un RDD
rdd = sc.parallelize([1, 4, 6, 3])

# Map: se sumará 10 a cada elemento del RDD
rdd2 = rdd.map(lambda x: x+10)

# Mostrar el contenido del RDD creado usando map
rdd2.collect()

[11, 14, 16, 13]

El siguiente ejemplo crea un RDD conteniendo un conjunto de cadenas de caracteres. Luego, se utiliza un `map` para obtener la cantidad de letras que tiene cada cadena de caracteres.

In [4]:
# Creación del RDD
rdd = sc.parallelize(['este es un texto', 'este es otro texto de prueba', 'prueba 1'])

# Map: obtiene las cantidades de letras de cada cadena de caracteres (usando "len")
rdd2 = rdd.map(lambda x: len(x))

# Se muestra el resultado (utilizando la acción "collect")
rdd2.collect()

[16, 28, 8]

In [5]:
# De manera alternativa se puede realizar el map y mostrarlo en una sola instrucción
rdd.map(lambda x: len(x)).collect()

[16, 28, 8]

### 1.2. Filter (filtro)

* Ejemplo 1: Filtro que solo mantiene aquellos valores mayores o iguales a 6. En este caso se utiliza una función anónima `lambda`.

In [6]:
# Creación de un RDD
rdd = sc.parallelize([1, 4, 6, 3, 10, 15, 16])

# Filtro: solo mantiene aquellos elementos del RDD mayores o iguales a 6
rdd2 = rdd.filter(lambda x: x>=6)

# Mostrar el resultado usando la acción "collect"
rdd2.collect()

[6, 10, 15, 16]

* Ejemplo 2: Filtro que solo mantiene aquellas cadenas de caracteres que tengan menos de 20 caracteres

In [7]:
# Creación del RDD
rdd = sc.parallelize(['este es un texto', 'este es otro texto de prueba', 'prueba 1'])

# Función que retorna el valor de entrada si tiene una longitud menor a 20
def fmenor20(x):
    if (len (x) < 20):
        return x

# Filtro aplicando la función anterior
rdd.filter(fmenor20).collect()

['este es un texto', 'prueba 1']

### 1.3. Map y FlatMap

In [11]:
# Creación de un RDD
rdd = sc.parallelize([1, 4, 6, 3])

* Ejemplo 1: Aplicación de un `map` o `flatMap` para multiplicar a cada elemento por 10 creando como salida una lista formada por el valor original y el valor multiplicado por 10

In [13]:
# Aplicación de un mapa
rdd_map = rdd.map(lambda x: [x, 10*x])

# La salida mantiene la estructura deseada de la lista: [x, 10x]
rdd_map.collect()

[[1, 10], [4, 40], [6, 60], [3, 30]]

In [14]:
# Aplicación de un flatmap
rdd_flatmap = rdd.flatMap(lambda x: [x, 10*x])

# La salida no mantiene la estructura de lista
rdd_flatmap.collect()

[1, 10, 4, 40, 6, 60, 3, 30]

* Ejemplo 2: separar una cadena de caracteres en palabras.

In [15]:
# Creación del rdd
rdd = sc.parallelize(["hola mundo", "hoy es jueves"])

rdd.collect()

['hola mundo', 'hoy es jueves']

In [16]:
# Aplicación de map
rdd_map = rdd.map(lambda x: x.split(" "))
rdd_map.collect()

[['hola', 'mundo'], ['hoy', 'es', 'jueves']]

In [17]:
# Aplicación de flatmap
rdd_flatmap = rdd.flatMap(lambda x: x.split(" "))
rdd_flatmap.collect()

['hola', 'mundo', 'hoy', 'es', 'jueves']

### 1.4. GroupByKey

In [24]:
rdd = sc.parallelize([('a',7), ('b',4), ('a',1), ('b',6), ('c',3)])
rdd.collect()

[('a', 7), ('b', 4), ('a', 1), ('b', 6), ('c', 3)]

In [25]:
# Aplicación de agrupamiento por clave
rdd2 = rdd.groupByKey()

rdd2.collect()

[('a', <pyspark.resultiterable.ResultIterable at 0x7e9b10eb7f40>),
 ('b', <pyspark.resultiterable.ResultIterable at 0x7e9b10eb6ec0>),
 ('c', <pyspark.resultiterable.ResultIterable at 0x7e9b10eb49a0>)]

Como se observa, al utilizar `groupByKey`, los valores se encuentran en formato de iterable de Spark. Para poder visualizarlos se debe convertir estos valores iterables en una lista. Esta conversión se puede realizar utilizando `mapValues(list)` que mapea solo los valores a listas

In [26]:
rdd2.mapValues(list).collect()

[('a', [7, 1]), ('b', [4, 6]), ('c', [3])]

In [27]:
# El resultado es similar al caso anterior
rdd2.mapValues(lambda v: list(v)).collect()

[('a', [7, 1]), ('b', [4, 6]), ('c', [3])]

Alternativamente, se puede utilizar el método `.data` de los iterables, que permiten recuperar los datos. Con este fin, se puede utilizar nuevamente `mapValues`.

In [28]:
rdd2.mapValues(lambda v: v.data).collect()

[('a', [7, 1]), ('b', [4, 6]), ('c', [3])]

### 1.5. Cogroup

In [29]:
rdd = sc.parallelize([('a',7), ('b',4), ('a',1), ('b',6), ('c',3)])
rdd2 = sc.parallelize([('a',5), ('b',9)])

rdd3 = rdd.cogroup(rdd2)
rdd3.collect()

[('b',
  (<pyspark.resultiterable.ResultIterable at 0x7e9b29b404c0>,
   <pyspark.resultiterable.ResultIterable at 0x7e9b29b406a0>)),
 ('c',
  (<pyspark.resultiterable.ResultIterable at 0x7e9b29b41270>,
   <pyspark.resultiterable.ResultIterable at 0x7e9b29b412a0>)),
 ('a',
  (<pyspark.resultiterable.ResultIterable at 0x7e9b29b411e0>,
   <pyspark.resultiterable.ResultIterable at 0x7e9b11b4e080>))]

En este caso, los valores de cada elemento son tuplas de dos iterables. Por tanto, se puede utilizar `mapValues` para mapear a cada valor una conversión a lista, o el uso de `.data`, que también recupera los datos del iterable.

In [30]:
rdd3.mapValues(lambda x: (x[0].data, x[1].data)).collect()

[('b', ([4, 6], [9])), ('c', ([3], [])), ('a', ([7, 1], [5]))]

### 1.6. Join

In [31]:
rdd = sc.parallelize([('a',7), ('b',4), ('a',1), ('b',6), ('c',3)])
rdd2 = sc.parallelize([('a',5), ('b',9), ('a',100)])

rdd3 = rdd.join(rdd2)
rdd3.collect()

[('b', (4, 9)),
 ('b', (6, 9)),
 ('a', (7, 5)),
 ('a', (7, 100)),
 ('a', (1, 5)),
 ('a', (1, 100))]

### 1.7. Operadores de Conjuntos

In [36]:
rdd1 = sc.parallelize([1, 3, 5, 7])
rdd2 = sc.parallelize([20, 7, 40, 60, 5])

rdd1.union(rdd2).collect()

[1, 3, 5, 7, 20, 7, 40, 60, 5]

In [37]:
rdd1.intersection(rdd2).collect()

[5, 7]

In [38]:
rdd1.subtract(rdd2).collect()

[1, 3]

In [39]:
rdd1.cartesian(rdd2).collect()

[(1, 20),
 (1, 7),
 (1, 40),
 (1, 60),
 (1, 5),
 (3, 20),
 (3, 7),
 (3, 40),
 (3, 60),
 (3, 5),
 (5, 20),
 (5, 7),
 (5, 40),
 (5, 60),
 (5, 5),
 (7, 20),
 (7, 7),
 (7, 40),
 (7, 60),
 (7, 5)]

### 1.8. ReduceByKey

Requiere que el RDD de entrada contenga tuplas, y actúa reduciendo los valores de las tuplas que tienen la misma clave.

In [40]:
# RDD de entrada
rdd = sc.parallelize([('a',7), ('b',4), ('a',1), ('a',100), ('b',6), ('c',3)])
rdd.collect()

[('a', 7), ('b', 4), ('a', 1), ('a', 100), ('b', 6), ('c', 3)]

En este ejemplo, se suma los valores que tienen las mismas tuplas (como la reducción de MapReduce)

In [41]:
rdd.reduceByKey(lambda x,y: x+y).collect()

[('a', 108), ('b', 10), ('c', 3)]

### 1.9. SortByKey

Ordena según las claves. Requiere que el RDD utilizado tenga datos en forma de tuplas.

In [42]:
rdd = sc.parallelize([('a',7), ('b',4), ('a',1), ('a',100), ('b',6), ('c',3)])
rdd.collect()

[('a', 7), ('b', 4), ('a', 1), ('a', 100), ('b', 6), ('c', 3)]

In [43]:
rdd.sortByKey().collect()

[('a', 7), ('a', 1), ('a', 100), ('b', 4), ('b', 6), ('c', 3)]

In [44]:
rdd.sortByKey(ascending=False).collect()

[('c', 3), ('b', 4), ('b', 6), ('a', 7), ('a', 1), ('a', 100)]

## 2.&nbsp;Acciones

### 2.1. Collect, take, top

In [45]:
# Creación de un RDD
lista = [40, 20, 50, 10, 70, 30]
rdd = sc.parallelize(lista)

* `collect`: recupera todo el RDD y lo convierte a lista

In [46]:
# Collect
rdd.collect()

[40, 20, 50, 10, 70, 30]

* `take(n)`: toma `n` valores del RDD

In [47]:
# Take
rdd.take(3)

[40, 20, 50]

* `top(n)`: Toma los `n` valores más altos del RDD

In [48]:
# Top: 3 valores más altos
rdd.top(3)

[70, 50, 40]

### 2.2. Reduce

Reduce los valores según lo especificado por alguna función. En el siguiente ejemplo, reduce utilizando la suma de los elementos del RDD

In [49]:
# Ejemplo usando la suma
rdd.reduce(lambda x,y: x+y)

220

### 2.3. saveAsTextFile

Almacena el RDD como un archivo de texto

In [50]:
# Creación del RDD
rdd = sc.parallelize(["Este es un archivo", "de texto de ejemplo"])

# Almacenamiento del RDD (especificando el nombre de la carpeta)
rdd.saveAsTextFile("rdd_salida")

### 2.4. Ejemplo de procesamiento usando MapReduce

In [51]:
# Creación de un RDD
rdd = sc.parallelize(['Hola este es un texto', 'este es un párrafo', 'es'])
rdd.collect()

['Hola este es un texto', 'este es un párrafo', 'es']

In [52]:
# Separación en palabras
rdd.flatMap(lambda x: x.split(" ")).collect()

['Hola', 'este', 'es', 'un', 'texto', 'este', 'es', 'un', 'párrafo', 'es']

In [53]:
# Mapeo a 1 de cada palabra
rdd.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1)).collect()

[('Hola', 1),
 ('este', 1),
 ('es', 1),
 ('un', 1),
 ('texto', 1),
 ('este', 1),
 ('es', 1),
 ('un', 1),
 ('párrafo', 1),
 ('es', 1)]

In [54]:
# Reducción: suma de valores para las mismas claves
rdd.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1)).reduceByKey(lambda x,y: x+y).collect()

[('Hola', 1), ('este', 2), ('es', 3), ('un', 2), ('texto', 1), ('párrafo', 1)]