# Optimizando flujos de datos con PySpark



### Mónica Zamudio

## ¿Qué es Apache Spark?

> Motor de cómputo unificado + conjunto de bibliotecas para procesamiento distribuido de datos

*(términos clave: motor de cómputo, unificado, bibliotecas)*

![Una mirada rápida](images/spdg_0101.png)

## Algunos detalles importantes sobre Spark

- Basado en Hadoop MapReduce

- Implementado en Scala

- APIs para varios lenguajes: Python, R, SQL, Java

## ¿Eso qué implica?

- Paradigma funcional

- Python se habla con Java

![](images/sad.jpg)

# Abstracciones básicas

## Lógica de ejecución

- Transformations: instrucciones para mutar estructuras de datos (withColumn, groupBy, cast, filter, etc.)

- Actions: detonan la ejecución de una serie de transformaciones (collect, agg, etc.)

## RDDs: Resilient Distributed Datasets

- Colección particionada, inmutable y tolerante a fallas

- Abstracción "base", permite mucho control sobre las transformaciones que hacemos en los datos

- Puede procesar datos no estructurados y estructurados, pero no infiere el schema de nuestros datos estructurados

## Dataframes

- NO son DataFrames de Pandas/R

- Estructuras que representan tablas (datos estructurados/semiestructurados) de forma particionada

- Implementa distintas fases de optimización antes de tener el plan de ejecución definitivo

## Arquitectura de Spark

![](images/spark_architecture.jpg)

![](images/spark_architecture_2.png)

## ¿Eso qué implica?

- Overhead (latencia) para inicializar workers

- Mensajes de error: Python + Java

```
20/09/29 22:32:35 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, localhost, executor driver): TaskKilled (Stage cancelled) Traceback (most recent call last): File "pyspark_job.py", line 61, in main() File "pyspark_job.py", line 58, in main process_events_data(spark, input_path) File "pyspark_job.py", line 41, in process_events_data df = spark.read.format('json').load(input_path) File "spark_dir\spark-2.4.7_h2.7\python\lib\pyspark.zip\pyspark\sql\readwriter.py", line 166, in load File "spark_dir\spark-2.4.7_h2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 1257, in call File "spark_dir\spark-2.4.7_h2.7\python\lib\pyspark.zip\pyspark\sql\utils.py", line 63, in deco File "spark_dir\spark-2.4.7_h2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py", line 328, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o42.load. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 138345560; received: 71284276

```

![](images/sad.jpg)

- Doble serialización

![](images/avocado.jpg)

# Entonces ¿Cómo lo optimizo?

## 1. Reutilización de datos

Python es un lenguaje interpretado. No puede preveer que vamos a reutilizar datos después de ejecutar una primera serie de transformaciones.

```
rdd = sc.textFile('mis_datos.txt')
palabras = rdd.flatMap(lambda x: x.split(' '))
palabras_tuplas = palabras.map(lambda w: (w, 1))
palabras_agrupadas = wordPairs.groupByKey()
grouped.mapValues(lambda counts: sum(counts))
grouped.saveAsTextFile('cuenta_palabras')

errores = rdd.filter(lambda x: x.lower.find('ERROR') != -1).count()
```

Podemos:

- Persistir los datos en memoria si no son tan grandes (`cache`)

- Persistir los datos en memoria, disco o ambos (`persist`)

```
rdd = sc.textFile('mis_datos.txt').cache()
palabras = rdd.flatMap(lambda x: x.split(' '))
palabras_tuplas = palabras.map(lambda w: (w, 1))
palabras_agrupadas = wordPairs.groupByKey()
grouped.mapValues(lambda counts: sum(counts))
grouped.saveAsTextFile('cuenta_palabras')

errores = rdd.filter(lambda x: x.lower.find('error') != -1).count()
```

## 2. Agrupaciones en RDDs: ReduceByKey vs GroupByKey

```
rdd = sc.textFile('mis_datos.txt')
palabras = rdd.flatMap(lambda x: x.split(' '))
palabras_tuplas = palabras.map(lambda w: (w, 1))
palabras_agrupadas = wordPairs.groupByKey()
grouped.mapValues(lambda counts: sum(counts))
grouped.saveAsTextFile('cuenta_palabras')

errores = rdd.filter(lambda x: x.lower.find('error') != -1).count()
```

![GroupByKey](images/groupByKey.png)

![ReduceByKey](images/reduceByKey.png)

## 3. Utiliza DataFrames cuando sea posible, y evita las UDFs

- Los DataFrames traducen código de Python directamente en un Plan de ejecución que puede ejecutarse directamente en la JVM. ¡Nos libramos de la doble serialización! Y no transferimos datos entre Python y Java

- Los planes de ejecución pasan por una (o más) capa de optimización antes de ejecutarse

- ¡Las UDFs (User-Defined Functions) son una caja negra para la JVM! Necesitaríamos pasarle los datos al intérprete de Python para que ejecute nuestra UDF y serialice el resultado y lo regrese a la JVM. Mantente en expresiones nativas de PySpark siempre que puedas.

## 4. Optimiza el I/O de datos cuando te sea posible

### Mi sugerencia: Parquet

- Formato columnar distribuido de almacenamiento de datos

- Compresión y codificación muy eficientes

- Permite selección y particionado a partir de campos

## 5. No hagas Full Scans innecesarios. Es en serio.

Parece chiste, pero es anécdota:

```
mis_datos = spark.read.parquet('mis_datos')
mis_datos_agrupados = mis_datos.groupBy('anio', 'mes', 'id_cliente').agg(sum('ventas'))

mis_datos_cliente = mis_datos_agrupados.filter(col('id_cliente') = '001')
```

## 6. Broadcast joins

- Joins comunes entre dos dataframes con tamaños muy distintos.

- Evita el reparticionado entre nodos de datos

- Spark hace broadcast joins en la medida en que uno de los datasets es más pequeño que `spark.sql.autoBroadcastJoinThreshold` (propiedad configurable)

- Se puede "sobrepasar" esa configuración con la función `broadcast`:

```
print(df1.join(broadcast(df2),df2.id==df1._id).take(10))
```

## 7. ¡No hagas ciclos `for`!

- Al momento de hacer transformaciones en ciclos `for`, PySpark no ejecuta de forma secuencial esas transformaciones: PySpark genera una serie de planes de ejecución que compone y manda ejecutar *de forma simultánea* al momento de aplicar las acciones.

- Necesitamos darle a PySpark flujos de control que pueda traducir a `map` + `reduce`. De lo contrario:

```
org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 5136 tasks (1024.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
```

### Algunas ideas:

- `rdd.map()`, `spark.sql.functions.create_map()`

- Implementar esa misma lógica via joins 

- Para dos ciclos `for` anidados: implementación de *flat maps* en Python
    
    - for + extend
    
    - doble comprensión de listas
    
    - map + reduce (Python)
    
    - map + sum

## 8. Prueba distintas configuraciones de número de particiones

> Idealmente, queremos que las particiones correspondan (más o menos) con el tamaño y número de nodos ejecutores. Queremos que la carga de procesamiento esté balanceada entre los nodos ejecutores.

- Particiones demasiado grandes no cabrán en la memoria de un nodo ejecutor

- Muy pocas particiones podrían hacer que los nodos ejecutores pasen mucho tiempo sin actividad

- Muchas particiones inducen mucho *overhead* de orquestación

- Queremos mínimo 2x el número de núcleos en el clúster

- Revisa el tiempo que las tareas toman en ejecutarse. Si cada tarea individual toma muy poco, podrías estar perdiendo mucho tiempo en *scheduling* que podría optimizarse agrandando las particiones.

![](images/monitor_executors.png)

# Shameless Self-promotion

## ¡Estamos contratando!

![](images/opi.png)
![](images/navidopi.jpg)

## Referencias

- Spark: The Definitive Guide - Bill Chambers, Matei Zaharia. (2018) O'Reilly

- Improving PySpark Performance: Spark performance beyond the JVM - Holden Karau https://www.youtube.com/watch?v=jGhju2bw3RQ&list=PLRLebp9QyZtaoIpE2iaF3Q8itJOcdgYoX&index=37&ab_channel=PyConAU

- Apache Spark RDD vs DataFrame vs DataSet - Data Flair https://data-flair.training/blogs/apache-spark-rdd-vs-dataframe-vs-dataset/

- The Internals of Spark SQL - Jacek Laskowski (2020) https://the-internals-of-spark-sql.readthedocs.io/spark-sql/

- Flat map in Python - Tomasz Urbaszek (2020) https://dev.to/turbaszek/flat-map-in-python-3g98
