<img src="https://drive.google.com/uc?export=view&id=1YjAWn06OMcVhlyixBZBDnY17rnn7Otg5" width="100%">

# Dataframes de Dask

En este notebook veremos una introducción práctica al procesamiento distribuido con la librería `dask`, primero lo instalaremos:

In [None]:
!pip install dask[complete] h5py

## **1. ¿Qué son los DataFrames de Dask?**
---

Los DataFrames de `dask` son una estructura de datos tabular que está compuesta de múltiples DataFrames de `pandas`:

<img src="https://drive.google.com/uc?export=view&id=1YU3e3pTdLNsmsfinUCPnGSN2PK_zrAUL" width="50%">

Este tipo de estructura de datos permite coordinar, paralelizar y distribuir series y dataframes de `pandas` dandonos una forma de uso muy similar a los `pd.DataFrame`.

Generalmente usamos los `DataFrame` de `dask` cuando:

* Tenemos conjuntos de datos grandes que no caben en la memoria RAM.
* Queremos acelerar operaciones sobre datasets usando varios núcleos de un computador o varios nodos.

Veamos los detalles de este tipo de estructuras, primero importamos el módulo para usar DataFrames:

In [None]:
import numpy as np
import pandas as pd
import dask.dataframe as dd

## **2. Creación**
---

Existen diversas funciones para crear `DataFrames` en `dask`, veamos algunos casos:

* `from_pandas`: podemos crear un `DataFrame` de `dask` desde un `pd.DataFrame`:

In [None]:
df = pd.DataFrame(
        {
            "A": np.random.uniform(-1, 1, 100),
            "B": np.random.randint(1, 5, 100)
            },
        )
df

Creamos la tabla en `dask`:

In [None]:
df_dask = dd.from_pandas(df, npartitions=2)
df_dask

El parámetro `n_partitions` nos permite especificar en cuántos chunks se divide el `DataFrame`, también puede usar el parámetro `chunks` de forma equivalente a los arreglos de `dask`:

In [None]:
df_dask = dd.from_pandas(df, chunksize=10)
df_dask

* `from_array`: permite crear un `DataFrame` a partir de un arreglo de `numpy`:

In [None]:
df = dd.from_array(
        np.random.uniform(0, 1, size=(10, 2)),
        columns=["grade", "value"],
        )
df

Como vimos, es posible crear `DataFrames` de `dask` desde multiples estructuras de datos clásicas de _Python_, no obstante, estos enfoques requieren que los datos estén en la memoria RAM (lo cual no es posible con grandes cantidades de datos). Por ello, normalmente estaremos creando `DataFrames` de `dask` por medio de funciones de lectura para distintos tipos de formatos de datos tabulares, algunos ejemplos comunes son:

* `dd.read_csv`: funciona igual que la función de `pandas` (tiene los mismos parámetros), no obstante, agrega el parámetro `blocksize` (tamaño de las particiones en bytes) para controlar las particiones.
* `dd.read_json`: funciona igual que la función de `pandas` y también agrega el parámetro `blocksize`.
* `dd.read_sql_query`: funciona igual que la función `pd.read_sql` pero agrega el parámetro `npartitions` para controlar el número de particiones.
* `dd.read_parquet`: permite cargar archivos en formato `parquet`, el cual es un tipo de formato que ya viene particionado y resulta ser muy compatible con `dask` como lo veremos más adelante.

Veamos un ejemplo donde cargamos un conjunto de datos desde `dask`, usaremos el conjunto de datos [Netflix Data: Cleaning, Analysis and Visualization](https://www.kaggle.com/datasets/ariyoomotade/netflix-data-cleaning-analysis-and-visualization), el cual incluye información acerca del contenido añadido a la plataforma de streaming *Netflix* entre el $2008$ y el $2021$. Está conformado por 10 columnas, las cuales son:

* `show_id`: corresponde a la llave primaria de la tabla. Tiene un formato establecido el cual corresponde a una <i>s</i> seguida de un número en secuencia, por ejemplo: s34.
* `type`: indica el tipo de show ofrecido (Película o Serie).
* `title`: señala el nombre de la serie o la película.
* `director`: indica el nombre de quién dirigió la película o serie.
* `country`: indica el lugar de producción del show.
* `date_added`: muestra la fecha de publicación de la serie o película en la plataforma con el formato <i>MM, DD, AAAA</i>.
* `release_year`: muestra el año de publicación original de la película o serie.
* `rating`: muestra las calificaciones o el nivel de conveniencia de la película según su contenido, por ejemplo: PG-13, TV-PG, etc.
* `duration`: corresponde a la duración en minutos en el caso de las películas y la cantidad de temporadas en el caso de las series.
* `listed_in`: indica el género o categoría donde se clasifica la serie o película dentro de la plataforma.

Primero, descargamos el conjunto de datos:

In [None]:
!wget 'https://drive.google.com/uc?export=view&id=1B0Cgf1mlulbRCvbZ5DytUCI1FI6UCP9u' -O 'netflix2.zip'

Ahora, lo descomprimimos:

In [None]:
![[ -d 'netflix2.parquet' ]] && rm -rf 'netflix2.parquet'
!unzip 'netflix2.zip'

## **3. Apache Parquet**
---

En este caso, el conjunto de datos se encuentra en formato `parquet`, se trata de un formato de código abierto, orientado a columnas (como _Cassandra_) que está pensado para un almacenamiento eficiente y de rápida lectura:

<img src="https://drive.google.com/uc?export=view&id=1Bjig-W9_-0JJ_I6IYxZa6oJkMKTpla4E	" width="70%">

Este formato tiene tres componentes:

* **Header**: guarda información general del archivo (por ejemplo, el id de la partición que estamos manejando).
* **Data block**: almacena la información como chunks columnares de los datos.
* **Footer**: almacena metadatos del archivo (por ejemplo, fecha de creación, versión del formato, esquema de columnas, tipos, entre otros).

Este formato es muy popular hoy en día para almacenar conjuntos de datos por los siguientes motivos:

* Almacena los tipos de cada columna.
* Es particionado, lo cual facilita la tranferencia de datos.
* Se lee bastante rápido, lo cual hace que sea preferido sobre formatos clásicos como `csv` o `excel`.

El formato `parquet` es normalmente usado para crear _Data Lakes_. Veamos cómo cargar este conjunto de datos con `dask`:

In [None]:
df = dd.read_parquet("netflix2.parquet")
df

Como se puede ver, el archivo ya trae `20` particiones (nativas del formato `parquet`), veamos algunos detalles de los `DataFrames` de `dask` con este conjunto de datos.

## **4. Atributos y Propiedades**
---

Los `DataFrame` de `dask` tienen una forma de uso muy parecida a los de `pandas`, sin embargo, en `dask` no tenemos los resultados cargados directamente en la memoria RAM. Funcionan de una forma muy equivalente a los arreglos de `dask` y terminan siendo promesas de `DataFrame` de `pandas`. Veamos los atributos más comunes que se usan en `dask`:

* `columns`: permite obtener el nombre de las columnas del `DataFrame`:

In [None]:
print(df.columns)

* `dtypes`: permite extraer los tipos que tiene cada columna del `DataFrame`:

In [None]:
print(df.dtypes)

* `shape`: permite extraer el tamaño del `DataFrame`:

In [None]:
print(df.shape)

Note que el resultado tiene un tipo `Delayed` que no es directamente un número, esto se debe a que `dask` no conoce qué tamaño va a tener el arreglo en memoria (no ha sido cargado). Podemos calcular el tamaño con el método `compute`:

In [None]:
print(df.shape[0].compute())

* `npartitions`: permite obtener el número de particiones del `DataFrame`:

In [None]:
print(df.npartitions)

* También podemos acceder a una columna específica por medio de la notación punto, por ejemplo, podemos obtener una serie de `Dask` al acceder a la propiedad `title` (nombre de columna) del `DataFrame`:

In [None]:
col = df.title
col

## **5. Métodos**
---

Los métodos de los `DataFrames` en `dask` tratan de ser lo más cercanos posibles a los métodos en `pandas`.

Al igual que con los arreglos de `dask`, hay un método clave que nos permite evaluar los resultados directamente en memoria, por ejemplo, el siguiente código extrae los primeros `5` registros usando el método `head`:

In [None]:
res = df.head(5)
res

El resultado obtenido es un `DataFrame` de `pandas`:

In [None]:
print(type(res))

Otro método específico en `dask` es `repartition`, el cual permite cambiar el número de particiones de un `DataFrame`:

In [None]:
df2 = df.repartition(npartitions=40)
df2

Veamos algunos de los métodos más comunes que se usan con los `DataFrames` de `dask`:

* `info`: permite obtener una descripción muy general del `DataFrame` (mucho más compacta que la de `pandas` ya que no hemos cargado el conjunto de datos completo):

In [None]:
print(df.info())

* `describe`: permite obtener estadísticas generales del conjunto de datos, recuerde que el parámetro `include` funciona como en `pandas` y permite seleccionar los tipos de columnas a describir.

In [None]:
desc = df.describe(include="all")
desc

Debemos evaluarlo para ver el resultado:

In [None]:
desc.compute()

* `mean`: permite obtener el promedio por columnas del `DataFrame`:

In [None]:
mean = df.mean()
print(mean)

Debemos evaluarlo para ver el resultado (únicamente aplica sobre las columnas numéricas):

In [None]:
print(mean.compute())

* `std`: permite obtener la desviación estándar por columnas del `DataFrame`:

In [None]:
std = df.std()
print(std)

Debemos evaluarlo para ver el resultado (únicamente aplica sobre las columnas numéricas):

In [None]:
print(std.compute())

* `value_counts`: permite obtener un recuento de valores en una columna dada:

In [None]:
type_counts = df.type.value_counts()
print(type_counts)

Debemos evaluarlo para ver el resultado:

In [None]:
print(type_counts.compute())

* `rename`: permite cambiar el esquema del `DataFrame`:

In [None]:
df2 = df.rename(columns={"title": "titulo"})
print(df2.columns)

* `astype`: permite cambiar los tipos:

In [None]:
new_col = df.release_year.astype("string")
print(new_col)

También funciona sobre varias columnas de un `DataFrame`:

In [None]:
df2 = df.astype({"release_year": "string", "title": "string"})
print(df2.dtypes)

* `isna`: permite detectar valores faltantes, por ejemplo, la siguiente celda calcula el número de valores faltantes por columna:

In [None]:
nas = df.isna().sum()
print(nas)

Evaluamos el resultado:

In [None]:
print(nas.compute())

* `dropna`: permite eliminar valores faltantes, funciona exactamente como lo hace la función de `pandas`:

In [None]:
res = df.dropna()
res

* `fillna`: permite reemplazar valores faltante, funciona exactamente como lo hace la función de `pandas`:

In [None]:
res = df.fillna(0)
res

* `apply`: esta función tiene la misma utilidad que en `pandas`, no obstante, resulta ser bastante importante en `dask` ya que nos permite ejecutar una función de _Python_ de forma distribuida y paralelizada sobre un `DataFrame`, por ejemplo, la siguiente función calcula los dos últimos dígitos del año de publicación de un show:

In [None]:
def get_digits(year):
    return year % 100

Veamos algunos ejemplos:

In [None]:
print(get_digits(2009))

In [None]:
print(get_digits(1996))

In [None]:
print(get_digits(2022))

Podemos aplicarla sobre la columna `release_year` con `dask`.

In [None]:
digits = (
        df.release_year.apply(get_digits, meta=("release_year", "int64")).compute()
        )
digits

En este caso, agregamos el parámetro `meta` para dar más información a `dask` sobre el tipo de columna que es `release_year`. Ya que `dask` por defecto trabaja sobre unos tipos inferidos de la carga de datos que pueden ser erróneos.

* `map`: permite mapear una tabla de referencia a una columna:

In [None]:
maps = {"movie": "pelicula", "tv show": "television"}
new_types = df.type.map(maps)
new_types

Veamos el resultado evaluado:

In [None]:
print(new_types.compute())

* `assign`: permite crear nuevas columnas:

In [None]:
df2 = df.assign(
        new_col = 1,
        types_spa = df.type.map(maps)
        )
df2

Veamos los primeros 5 registros:

In [None]:
df2.head(5)

* `groupby`: funciona igual que su analogo en `pandas` pero este se ejecuta de forma distribuida:

In [None]:
res = df.groupby("type").agg({"title": "count"})
res

Veamos el resultado:

In [None]:
print(res.compute())

* `merge`: al igual que en `pandas`, podemos unir dos o más `DataFrames` por medio de las operaciones `merge` y `join` (recuerde que `dask` lo hace de forma distribuida, lo cual es muy útil para cruzar tablas muy grandes), veamos un ejemplo donde definimos dos `DataFrames`:

In [None]:
data = pd.DataFrame({
        "col1": [1, 2, 3, 4, 5],
        "col2": ["a", "b", "c", "d", "e"]
        })
df1 = dd.from_pandas(data, npartitions=1)
df1

In [None]:
data = pd.DataFrame({
        "col1": [1, 2, 3, 4],
        "col3": ["perro", "gato", "pajaro", "pez"]
        })
df2 = dd.from_pandas(data, npartitions=1)
df2

Vamos a unir los dos `DataFrame` con la operación `merge`:

In [None]:
res = df1.merge(df2, on="col1")
res

Veamos el resultado:

In [None]:
res.compute()

## **6. Filtrado**
---

La sintaxis para la selección de valores en `DataFrame` de `dask` es muy parecida a `pandas`, no obstante, hay algunas consideraciones en cuanto a que no se recomienda el indexado posicional, en especial por que tenemos datos distribuidos de los que no conocemos directamente su tamaño. Veamos algunos ejemplos:

* Para seleccionar columnas, podemos indexar el `DataFrame` como si fuera un diccionario, por ejemplo:

In [None]:
df2 = df[["type", "show_id"]]
print(df2.columns)

* Podemos usar el método `loc` para extraer datos con respecto a su índice:

In [None]:
df2 = df.loc[20:30]
df2

No obstante, el método `iloc` (indexado posicional) no funciona correctamente:

In [None]:
try:
    df2 = df.iloc[:10]
except Exception as e:
    print(e)

* Podemos hacer selecciones condicionales de la misma forma que en `pandas`, por ejemplo, seleccionamos todos los registros correspondientes al tipo `movie`:

In [None]:
df2 = df[df.type == "movie"]
df2

Calculamos el resultado:

In [None]:
df2.compute()

* También podemos usar el método `query` para seleccionar valores de acuerdo a un criterio, tal y como funciona en `pandas`:

In [None]:
df2 = df.query("type == 'movie'")
df2

Calculamos el resultado:

In [None]:
df2.compute()

## **7. Comparativa con Pandas**
---

Veamos una comparativa en tiempo y memoria con respecto a los `DataFrame` de `pandas`, para ello, usaremos las librerías `psutil` y `time`:

In [None]:
import psutil, time

Veamos la diferencia en memoria entre la carga del `DataFrame` desde `pandas` y desde `dask`:

* `pandas`:

In [None]:
memory = psutil.virtual_memory()[2]
print(f"RAM inicial: {memory:.2f}%")

Cargamos el conjunto de datos con `pandas`:

In [None]:
df_pandas = pd.read_parquet("netflix2.parquet")

Veamos qué tanto subió la memoria RAM:

In [None]:
memory = psutil.virtual_memory()[2]
print(f"RAM final: {memory:.2f}%")

* `dask`:

In [None]:
memory = psutil.virtual_memory()[2]
print(f"RAM inicial: {memory:.2f}%")

Cargamos el conjunto de datos con `dask`:

In [None]:
df_dask = dd.read_parquet("netflix2.parquet")

Veamos qué tanto subió la memoria RAM:

In [None]:
memory = psutil.virtual_memory()[2]
print(f"RAM final: {memory:.2f}%")

Como puede ver `dask` no carga directamente el conjunto de datos en memoria. Ahora veamos una comparativa en tiempo en operaciones de agregación (se ven beneficiadas de paralelismo).

* `pandas`:

In [None]:
t0 = time.time()
res = df_pandas.groupby(["type", "country"]).agg({"release_year": "mean"})
delta_t = time.time() - t0
print(f"Segundos: {delta_t:.2f}")

* `dask`:

In [None]:
t0 = time.time()
res = (
        df_dask
        .groupby(["type", "country"])
        .agg({"release_year": "mean"})
        .compute()
        )
delta_t = time.time() - t0
print(f"Segundos: {delta_t:.2f}")

Como podemos ver, el resultado demora más en `dask`. Recuerde que esta herramienta debe coordinar y gestionar multiples `DataFrame` que están contenidos en varias particiones, esto permite trabajar con grandes cantidades de datos. No obstante, si el conjunto de datos es pequeño `pandas` sigue siendo la mejor opción.

## **8. Recursos Adicionales**
---

* [DataFrames de Dask](https://docs.dask.org/en/stable/dataframe.html).
* [Dask - Talks & tutorials](https://docs.dask.org/en/stable/presentations.html).

## **9. Créditos**
---

**Profesor**

- [Jorge E. Camargo, PhD](https://dis.unal.edu.co/~jecamargom/)

**Diseño, desarrollo del notebook y material audiovisual**

- [Juan S. Lara MSc](https://www.linkedin.com/in/juan-sebastian-lara-ramirez-43570a214/)

**Universidad Nacional de Colombia** - *Facultad de Ingeniería*