# Dask DataFrames

<img src="images/dask-dataframe.svg" 
     align="right"
     width="20%"
     alt="Dask dataframes are blocked Pandas dataframes">
     
Dask DataFrames coordinan muchos DataFrames de Pandas, particionados a lo largo de un índice. Admiten una gran parte de la API de Pandas.

## Iniciar Dask Client para el Panel de Control

Iniciar el Cliente Dask es opcional. Proporcionará un panel de control 
que es útil para obtener información sobre la computación. 

El enlace al panel de control se hará visible cuando crees el cliente a continuación. Recomendamos tenerlo abierto en un lado de tu pantalla mientras usas tu cuaderno en el otro lado. Puede llevar algún esfuerzo organizar tus ventanas, pero ver ambas a la vez es muy útil al aprender.

In [None]:
from dask.distributed import Client

cliente = Client(n_workers=2, threads_per_worker=2, memory_limit="1GB")
cliente


## Crear DataFrame Aleatorio

Creamos una serie temporal aleatoria con las siguientes características:

1. Almacena un registro para cada segundo en el mes de enero del año 2000
2. Divide ese mes por días, manteniendo cada día como un DataFrame particionado
3. Junto con un índice de fecha y hora, tiene columnas para nombres, identificadores y valores numéricos

Este es un conjunto de datos pequeño de alrededor de 240 MB. Aumenta el número de días o reduce el intervalo de tiempo entre puntos de datos para practicar con un conjunto de datos más grande configurando algunos de los argumentos de [`dask.datasets.timeseries()`](https://docs.dask.org/en/stable/api.html#dask.datasets.timeseries).

In [None]:
import dask

df = dask.datasets.timeseries()


A diferencia de Pandas, los DataFrames de Dask son _perezosos_, lo que significa que los datos solo se cargan cuando se necesitan para una computación. No se imprimen datos aquí, en su lugar se reemplazan por puntos suspensivos (`...`).

In [None]:
df


Sin embargo, los nombres de las columnas y los tipos de datos son conocidos.

In [None]:
df.dtypes


Algunas operaciones mostrarán automáticamente los datos.

In [None]:
# Esto establece algunos parámetros de formato para los datos mostrados.
import pandas as pd

pd.options.display.precision = 2
pd.options.display.max_rows = 10


In [None]:
df.head(3)


## Usar Operaciones Estándar de Pandas

La mayoría de las operaciones comunes de Pandas se pueden usar de la misma manera en los DataFrames de Dask. Este ejemplo muestra cómo cortar los datos según una condición de máscara y luego determinar la desviación estándar de los datos en la columna `x`.

In [None]:
df2 = df[df.y > 0]
df3 = df2.groupby("name").x.std()
df3


Observa que los datos en `df3` todavía están representados por puntos suspensivos. Todas las operaciones en la celda anterior son operaciones perezosas. Puedes llamar a `.compute()` cuando desees tu resultado como un DataFrame o Serie de Pandas.

Si iniciaste `Client()` arriba, puedes ver la página de estado durante la computación para ver el progreso.

In [None]:
df3_calculado = df3.compute()
type(df3_calculado)


In [None]:
df3_calculado


Observa que los datos calculados ahora se muestran en la salida.

Otro ejemplo de cálculo es agregar varias columnas, como se muestra a continuación. Nuevamente, el panel de control mostrará el progreso de la computación.

In [None]:
df4 = df.groupby("name").aggregate({"x": "sum", "y": "max"})
df4.compute()


Los DataFrames de Dask también se pueden unir como los DataFrames de Pandas. En este ejemplo, unimos los datos agregados en `df4` con los datos originales en `df`. Dado que el índice en `df` es la serie temporal y `df4` está indexado por nombres, usamos `left_on="name"` y `right_index=True` para definir las columnas de fusión. También configuramos sufijos para cualquier columna que sea común entre los dos DataFrames para que podamos distinguirlas.

Finalmente, como `df4` es pequeño, también nos aseguramos de que sea un DataFrame de una sola partición.

In [None]:
df4 = df4.repartition(npartitions=1)
unido = df.merge(
    df4, left_on="name", right_index=True, suffixes=("_original", "_agregado")
)
unido.head()


## Persistir datos en memoria

Si tienes la RAM disponible para tu conjunto de datos, puedes persistir los datos en memoria.  

Esto permite que las futuras computaciones sean mucho más rápidas.

In [None]:
df = df.persist()


## Operaciones de Series Temporales

Dado que `df` tiene un índice de fecha y hora, las operaciones de series temporales funcionan de manera eficiente.

El primer ejemplo a continuación vuelve a muestrear los datos a intervalos de 1 hora para reducir el tamaño total del DataFrame. Luego se toma la media de las columnas `x` e `y`.

In [None]:
df[["x", "y"]].resample("1h").mean().head()


El siguiente ejemplo vuelve a muestrear los datos a intervalos de 24 horas y grafica los valores medios. Observa que `plot()` se llama después de `compute()` porque `plot()` no funcionará hasta que los datos estén calculados.

In [None]:
%matplotlib inline
df[['x', 'y']].resample('24h').mean().compute().plot();

Este último ejemplo calcula la media móvil de 24 horas de los datos.

In [None]:
df[["x", "y"]].rolling(window="24h").mean().head()


El acceso aleatorio es económico a lo largo del índice, pero ya que el DataFrame de Dask es perezoso, debe calcularse para materializar los datos.

In [None]:
df.loc["2000-01-05"]


In [None]:
%time df.loc['2000-01-05'].compute()

## Establecer Índice

Los datos están ordenados por la columna de índice.

In [None]:
df5 = df.set_index("name")
df5


Dado que restablecer el índice para este conjunto de datos es costoso y podemos ajustarlo en nuestra RAM disponible, persistimos el conjunto de datos en memoria.

In [None]:
df5 = df5.persist()
df5


Dask ahora sabe dónde residen todos los datos, indexados por nombre. Como resultado, operaciones como el acceso aleatorio son baratas y eficientes.

In [None]:
%time df5.loc['Alice'].compute()

## Groupby Apply con Scikit-Learn

Ahora que nuestros datos están ordenados por nombre, podemos hacer operaciones de bajo costo como el acceso aleatorio por nombre, o groupby-apply con funciones personalizadas.

Aquí entrenamos un modelo de regresión lineal de scikit-learn diferente en cada nombre.

In [None]:
from sklearn.linear_model import LinearRegression


def train(partition):
    if not len(partition):
        return
    est = LinearRegression()
    est.fit(partition[["x"]].values, partition.y.values)
    return est


El argumento `partition` para `train()` será una de las instancias de grupo de `DataFrameGroupBy`. Si no hay datos en la partición, no necesitamos continuar. Si hay datos, queremos ajustar el modelo de regresión lineal y devolverlo como el valor para este grupo.

Ahora, trabajando con `df5`, cuyo índice son los nombres de `df`, podemos agrupar por la columna `names`. Esto también resulta ser el índice, pero está bien. Luego usamos `.apply()` para ejecutar `train()` en cada grupo en el `DataFrameGroupBy` generado por `.groupby()`.

El argumento `meta` le dice a Dask cómo crear el `DataFrame` o `Series` que contendrá el resultado de `.apply()`. En este caso, `train()` devuelve un solo valor, por lo que `.apply()` creará una `Series`. Esto significa que necesitamos decirle a Dask cuál debe ser el tipo de esa única columna y, opcionalmente, darle un nombre.

La forma más fácil de especificar una sola columna es con una tupla. El nombre de la columna es el primer elemento de la tupla. Dado que se trata de una serie de regresiones lineales, nombraremos la columna como `"LinearRegression"`. El segundo elemento de la tupla es el tipo del valor de retorno de `train`. En este caso, Pandas almacenará el resultado como un `object` general, que debería ser el tipo que pasamos.

In [None]:
df6 = df5.groupby("name").apply(
    train, meta=("LinearRegression", object)
).compute()
df6

## Lecturas Adicionales

Para una introducción más profunda a los DataFrames de Dask, consulta el [tutorial de Dask](https://github.com/dask/dask-tutorial), notebooks 04 y 07.