# Dask

Adapted from this [tutorial](https://colab.research.google.com/github/dask/dask-examples/blob/main/dataframes/01-data-access.ipynb#scrollTo=gAg2OgAbX-0h)

Because datasets and computations are growing faster than CPU and RAM capacity, we need ways to scale our computations across multiple machines.

[Dask](https://docs.dask.org/en/latest/) provides ways to scale Pandas, Scikit-Learn, and NumPy workflows in a way that feels native to those libraries, with minimal rewriting. It integrates well with these tools because it copies most of their API and uses their data structures internally.

## Dask vs Spark

Dask is smaller and lighter than Spark. This means it has fewer features and is instead used together with other libraries, especially those in the numeric Python ecosystem.

The framework is written in Python and only really supports Python (unlike Spark, whose core is written in Scala). It interoperates well with C/C++/Fortran/LLVM or other natively compiled code linked through Python.


## Installation

In [13]:
!pip install "dask[complete]"    # Install everything
!pip install dask distributed --upgrade

Requirement already up-to-date: dask in /usr/local/lib/python3.7/dist-packages (2021.4.0)
Requirement already up-to-date: distributed in /usr/local/lib/python3.7/dist-packages (2021.4.0)


## Dask Client

Starting the Dask Client is optional. It provides a useful dashboard for inspecting the computation.

The dashboard link becomes visible when you create the client below. We recommend opening it in a separate tab.

In [14]:
# Note: not accessible from Google Colab
# from dask.distributed import Client
# client = Client(n_workers=1, threads_per_worker=4, 
#                 processes=False, memory_limit='2GB')
# client

## Creating a synthetic dataset

In [15]:
import os
import datetime
import dask

df = dask.datasets.timeseries()
df.head()

timestamp,id,name,x,y
2000-01-01 00:00:00,982,Frank,0.372798,-0.671421
2000-01-01 00:00:01,1030,Zelda,-0.265306,0.282275
2000-01-01 00:00:02,1023,Alice,-0.70673,0.832769
2000-01-01 00:00:03,1001,Patricia,-0.98702,-0.68723
2000-01-01 00:00:04,986,Dan,-0.138893,-0.305021


In [16]:
df.tail()

timestamp,id,name,x,y
2000-01-30 23:59:55,995,Ursula,-0.155304,-0.431605
2000-01-30 23:59:56,963,Zelda,-0.390994,0.485158
2000-01-30 23:59:57,998,Bob,-0.922478,-0.450349
2000-01-30 23:59:58,1025,Zelda,-0.357398,-0.88804
2000-01-30 23:59:59,960,Oliver,-0.309276,0.559729


In [17]:

os.makedirs('data', exist_ok=True)

def name(i):
    """ Provide date for filename given index
    
    Examples
    --------
    >>> name(0)
    '2000-01-01'
    >>> name(10)
    '2000-01-11'
    """
    return str(datetime.date(2000, 1, 1) + i * datetime.timedelta(days=1))
    
df.to_csv('data/*.csv', name_function=name);

In [18]:
!ls data/*.csv | head

data/2000-01-01.csv
data/2000-01-02.csv
data/2000-01-03.csv
data/2000-01-04.csv
data/2000-01-05.csv
data/2000-01-06.csv
data/2000-01-07.csv
data/2000-01-08.csv
data/2000-01-09.csv
data/2000-01-10.csv
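
The file names listed above follow from how `to_csv` treats the `*` in the path: each partition index is passed to `name_function`, and the result is substituted for the asterisk. The sketch below imitates that substitution with plain string replacement. It illustrates the naming scheme only and is not Dask's internal code; `paths` and `last_day` are names introduced here.

```python
import datetime


def name(i):
    """Return the ISO date that is i days after 2000-01-01."""
    return str(datetime.date(2000, 1, 1) + i * datetime.timedelta(days=1))


# Imitate the '*' substitution for the first three partitions
paths = ["data/*.csv".replace("*", name(i)) for i in range(3)]

# Index 29 corresponds to the last day seen in the tail() output
last_day = name(29)

print(paths)
print(last_day)
```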


In [19]:
!head data/2000-01-01.csv

timestamp,id,name,x,y
2000-01-01 00:00:00,982,Frank,0.37279827235717056,-0.6714212255522889
2000-01-01 00:00:01,1030,Zelda,-0.26530616867534285,0.2822752724887949
2000-01-01 00:00:02,1023,Alice,-0.7067303321390788,0.832768637714254
2000-01-01 00:00:03,1001,Patricia,-0.9870202387565903,-0.6872295813292968
2000-01-01 00:00:04,986,Dan,-0.13889310641262664,-0.305020666519799
2000-01-01 00:00:05,962,Wendy,0.4659816575664466,-0.11166972686196552
2000-01-01 00:00:06,1019,Michael,0.7298711473135215,0.836366540355163
2000-01-01 00:00:07,1010,Alice,-0.4845917994009965,0.4769086053244449
2000-01-01 00:00:08,999,Ray,0.7312452870485233,0.3129597989115833


## Reading multiple files into a single dataframe

In [20]:
import dask.dataframe as dd

df = dd.read_csv('data/2000-*-*.csv')
df.head()

,timestamp,id,name,x,y
0,2000-01-01 00:00:00,982,Frank,0.372798,-0.671421
1,2000-01-01 00:00:01,1030,Zelda,-0.265306,0.282275
2,2000-01-01 00:00:02,1023,Alice,-0.70673,0.832769
3,2000-01-01 00:00:03,1001,Patricia,-0.98702,-0.68723
4,2000-01-01 00:00:04,986,Dan,-0.138893,-0.305021


## Saving to Parquet

In [21]:
df.to_parquet('data/2000-01.parquet', engine='pyarrow')

### Partitioning

In [22]:
df['year-month-day'] = dd.to_datetime(df['timestamp']).dt.strftime('%Y-%m-%d')
df.head()

,timestamp,id,name,x,y,year-month-day
0,2000-01-01 00:00:00,982,Frank,0.372798,-0.671421,2000-01-01
1,2000-01-01 00:00:01,1030,Zelda,-0.265306,0.282275,2000-01-01
2,2000-01-01 00:00:02,1023,Alice,-0.70673,0.832769,2000-01-01
3,2000-01-01 00:00:03,1001,Patricia,-0.98702,-0.68723,2000-01-01
4,2000-01-01 00:00:04,986,Dan,-0.138893,-0.305021,2000-01-01


In [24]:
df.to_parquet('data/mydata.parquet', engine='pyarrow',
              partition_on=['year-month-day'])