# Introducción a Dask

Dask es una biblioteca de Python diseñada para facilitar el cálculo en paralelo y distribuido, lo que lo convierte en una herramienta extremadamente útil para trabajar con grandes volúmenes de datos y realizar cómputos complejos que no caben en la memoria de un solo equipo o que requieren ser ejecutados más rápidamente.

Uno de sus mayores atractivos es que está diseñado para integrarse bien con las bibliotecas más populares en el ecosistema de datos de Python, como Pandas, NumPy y Scikit-learn. Dask permite ampliar estas bibliotecas a clusters de múltiples máquinas sin cambiar prácticamente el código que ya funciona en un entorno local.

Principales componentes de Dask
- Dask Arrays: Extiende las capacidades de NumPy a grandes conjuntos de datos que no caben en memoria.
- Dask DataFrames: Una extensión de los DataFrames de Pandas para trabajar con datos más grandes que la memoria disponible.
- Dask Bags: Trabaja con datos semiestructurados o no estructurados, similares a listas o generadores de Python.
- Dask Delayed: Permite paralelizar funciones personalizadas y crear gráficos de tareas que se pueden ejecutar en paralelo.
- Dask Futures: Una API para el procesamiento asíncrono y paralelismo interactivo.

Beneficios de usar Dask
- Escalabilidad: Desde un solo equipo hasta clústeres distribuidos de miles de máquinas.
- Memoria eficiente: Permite procesar grandes volúmenes de datos que no caben en la memoria dividiendo el trabajo en pequeñas piezas.
- Fácil de usar: La integración con Pandas, NumPy y Scikit-learn facilita la transición a Dask.
- Paralelismo: Aprovecha múltiples núcleos de CPU en tu máquina o distribuye las tareas en un clúster para acelerar el procesamiento.
- Dinamismo: Soporta tanto el procesamiento interactivo como por lotes.
- Flexibilidad: Permite trabajar tanto con datos estructurados como no estructurados.


Para instalar dask lo ideal es instalarla al completo, ya que en caso contrario deberemos instalar de manera independiente cada subrama.

```
pip install dask[complete]
```

## Dask arrays

In [1]:
%%time

import numpy as np
x = np.ones((20,20),dtype=int)

x.sum()

CPU times: user 1.29 s, sys: 54.4 ms, total: 1.34 s
Wall time: 584 ms


np.int64(400)

Vemos como esto lo hace facil y rapido con numpy, pero y un array de mayor tamaño?

In [3]:
%%time
x = np.ones((20000,20000),dtype=int)

x.sum()

CPU times: user 210 ms, sys: 1.09 s, total: 1.3 s
Wall time: 1.3 s


np.int64(400000000)

In [4]:
%%time
x = np.ones((100000,100000),dtype=int)

x.sum()

MemoryError: Unable to allocate 74.5 GiB for an array with shape (100000, 100000) and data type int64

Da error debido al tamaño, por lo que es donde nos puede ayudar dask, ya que porejemplo dask array implementa una subset de la interfaz de numpy, pero con técnicas de optimizacion de uso de memoria, dividiendo grandes arrays en muchos mas pequeños, de manera que puede computar arrays mas grandes que la propia memoria del dispositivo.

Primero instaciamos el cliente, para visualizar como juega con ello en memoria. Puedes ir visualizandolo en otra ventana el dashboard.

In [10]:
from dask.distributed import Client
client=Client()
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 46009 instead


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:46009/status,

0,1
Dashboard: http://127.0.0.1:46009/status,Workers: 4
Total threads: 16,Total memory: 7.39 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:45795,Workers: 4
Dashboard: http://127.0.0.1:46009/status,Total threads: 16
Started: Just now,Total memory: 7.39 GiB

0,1
Comm: tcp://127.0.0.1:37719,Total threads: 4
Dashboard: http://127.0.0.1:33853/status,Memory: 1.85 GiB
Nanny: tcp://127.0.0.1:39853,
Local directory: /tmp/dask-scratch-space/worker-pp5vr4l8,Local directory: /tmp/dask-scratch-space/worker-pp5vr4l8

0,1
Comm: tcp://127.0.0.1:43279,Total threads: 4
Dashboard: http://127.0.0.1:35759/status,Memory: 1.85 GiB
Nanny: tcp://127.0.0.1:39401,
Local directory: /tmp/dask-scratch-space/worker-u1dj3hyk,Local directory: /tmp/dask-scratch-space/worker-u1dj3hyk

0,1
Comm: tcp://127.0.0.1:43123,Total threads: 4
Dashboard: http://127.0.0.1:37279/status,Memory: 1.85 GiB
Nanny: tcp://127.0.0.1:37307,
Local directory: /tmp/dask-scratch-space/worker-59kfdvcz,Local directory: /tmp/dask-scratch-space/worker-59kfdvcz

0,1
Comm: tcp://127.0.0.1:39119,Total threads: 4
Dashboard: http://127.0.0.1:45801/status,Memory: 1.85 GiB
Nanny: tcp://127.0.0.1:37831,
Local directory: /tmp/dask-scratch-space/worker-ngw691ug,Local directory: /tmp/dask-scratch-space/worker-ngw691ug


In [12]:
%%time
import dask.array as da
x = da.ones((20000,20000), chunks=(1000,1000))
x

CPU times: user 2.12 ms, sys: 0 ns, total: 2.12 ms
Wall time: 2.09 ms


Unnamed: 0,Array,Chunk
Bytes,2.98 GiB,7.63 MiB
Shape,"(20000, 20000)","(1000, 1000)"
Dask graph,400 chunks in 1 graph layer,400 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 2.98 GiB 7.63 MiB Shape (20000, 20000) (1000, 1000) Dask graph 400 chunks in 1 graph layer Data type float64 numpy.ndarray",20000  20000,

Unnamed: 0,Array,Chunk
Bytes,2.98 GiB,7.63 MiB
Shape,"(20000, 20000)","(1000, 1000)"
Dask graph,400 chunks in 1 graph layer,400 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [13]:
%%time
result=x.sum()
print(result.compute())

400000000.0
CPU times: user 357 ms, sys: 11.9 ms, total: 368 ms
Wall time: 394 ms


Comparemos la diferencia de tiempo entre numpy y dask, ha tardado menos pero por ejemplo para un array mayor tambien lo hara.
Diviendo el array en pequeños trozos, llamados chucks que permite operaciones en paralelo

In [11]:
%%time
y = da.ones((100000,100000), chunks=(1000,1000))
result=y.sum()
print(result.compute())

10000000000.0
CPU times: user 7.78 s, sys: 614 ms, total: 8.39 s
Wall time: 9.22 s


In [15]:
client.close() #no olvidar cerrar la conexion

Esto es igual para dataframe por ejemplo o para elemento de puython como listas o diccisonarios.

In [20]:
import dask.dataframe as dd

df = dd.read_csv("cars.csv",sep=";")
df

Unnamed: 0_level_0,Car,MPG,Cylinders,Displacement,Horsepower,Weight,Acceleration,Model,Origin
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
,string,string,string,string,string,string,string,string,string
,...,...,...,...,...,...,...,...,...


In [21]:
df.head()

Unnamed: 0,Car,MPG,Cylinders,Displacement,Horsepower,Weight,Acceleration,Model,Origin
0,STRING,DOUBLE,INT,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,CAT
1,Chevrolet Chevelle Malibu,18.0,8,307.0,130.0,3504.,12.0,70,US
2,Buick Skylark 320,15.0,8,350.0,165.0,3693.,11.5,70,US
3,Plymouth Satellite,18.0,8,318.0,150.0,3436.,11.0,70,US
4,AMC Rebel SST,16.0,8,304.0,150.0,3433.,12.0,70,US
