## DASK intro
Original source: https://www.machinelearningplus.com/python/dask-tutorial/

In [None]:
from time import sleep

### Dask DataFrames
A Dask DataFrame is a large parallel DataFrame composed of many smaller Pandas DataFrames, split along the index. 

In [None]:
# `import dask.dataframe as dd` only binds the alias `dd`; the bare name `dask`
# must be imported explicitly, otherwise `dask.datasets` raises NameError on a
# fresh kernel. Importing `dask.datasets` also guarantees the submodule is loaded.
import dask
import dask.dataframe as dd
import dask.datasets

# Demo time-series Dask DataFrame; the cells below use its `name` and `y` columns.
data_frame = dask.datasets.timeseries()

In [None]:
# Standard deviation of column `y` within each `name` group.
# The result is still lazy here; a later cell calls .compute() on it.
df = data_frame.groupby('name')['y'].std()
df

In [None]:
# Show the column labels of the demo DataFrame.
data_frame.columns

In [None]:
%%time
# Execute the lazy groupby/std graph and pull the result into local memory;
# %%time reports how long that takes.
pandas_df = df.compute()
# Display the concrete type returned by compute() (expected to be a pandas
# object, hence the variable name).
type(pandas_df)

In [None]:
# persist() computes the collection and keeps the results in memory, but hands
# back a Dask collection rather than a concrete in-memory object as compute() does.
df = df.persist()
# Display the type to contrast it with the result of compute() in the previous cell.
type(df)

### Dask Bags
Dask Bag (`dask.bag`) is a high-level Dask collection that serves as a parallel alternative to regular Python lists and similar iterables. The main difference is that Dask Bags are lazy and distributed.

In [None]:
# The aliased imports used in this notebook (`import dask.dataframe as dd`,
# `import dask.bag as db`) do not bind the bare name `dask`, so import it
# explicitly before reading the version.
import dask

dask.__version__

In [None]:
import dask.bag as db

In [None]:
# Build a Dask Bag from the contents of Dask's README, fetched from the URL.
a = db.from_url(
    'http://raw.githubusercontent.com/dask/dask/master/README.rst'
)
# Number of partitions the bag is split into.
a.npartitions

In [None]:
# Draw the task graph behind the bag `a`.
# NOTE(review): rendering typically needs the optional graphviz dependency — confirm it is installed.
a.visualize()

In [None]:
# Convert the bag `a` into a Dask DataFrame.
adf = a.to_dataframe()

In [None]:
# Preview the first rows of the DataFrame built from the bag.
adf.head()

### DASK distributed

Install https://github.com/dask/dask-labextension for more features!

In [None]:
# Standard library first, then third-party packages.
import time

import pandas as pd
from dask.distributed import Client

# Start a local client; with processes=False the workers run as threads
# inside this kernel's process instead of separate processes.
client = Client(processes=False)
# Display the client summary as the cell output.
client

In [None]:
# Intentionally disabled: the client created above is still needed by the final
# cell of this notebook (client.scatter / client.submit / client.gather).
# Closing it at this point makes those calls fail on a top-to-bottom run.
# Uncomment and run this line once you are completely done with the client.
# client.close()

In [None]:
# Read the CSV file into a pandas DataFrame and prepare it.
#
# The load used to be commented out, which left `df` bound to whatever an
# earlier cell assigned (a Dask Series) and broke the `df.columns` lookup in
# the next cell. The load is restored here so the cell works on a fresh kernel.
#
# This is a large file, and the cells below issue one call/task per
# (row, column) pair, so only the first MAX_ROWS rows are read by default.
CRIMES_CSV = '/v/courses/dataesp2023.public/Datasets/D-LargeData/crcrimes.csv'
MAX_ROWS = 1_000  # set to None to read the whole (large) file

df = (
    pd.read_csv(CRIMES_CSV, sep='\t', nrows=MAX_ROWS)
    .drop('Unnamed: 0', axis=1)   # drop the column labelled 'Unnamed: 0'
    .set_index('itm_nb')
)
df.head()

In [None]:
# Column labels to iterate over in the loops below.
# NOTE(review): requires `df` to be a DataFrame (something with `.columns`) — confirm the load cell above actually ran.
cols = df.columns

In [None]:
# Placeholder workload used by both the sequential and the Dask-based loops.
def do_operation(df, index, col):
    """Look up ``df[col]`` and discard the result.

    Parameters
    ----------
    df : any object supporting ``df[col]``
    index : accepted for the caller's convenience; not used in the body
    col : key used to index ``df``

    Returns
    -------
    None
        The looked-up value is not returned.
    """
    selected = df[col]  # evaluated only for the lookup itself; never used
    return None

In [None]:
%%time
# Sequential baseline: call do_operation once for every (row label, column)
# pair, one after another. %%time reports the total duration for comparison
# with the Dask-based cell below.
for index in df.index:
    for col in cols:
        do_operation(df, index, col)

In [None]:
%%time
# Use Dask client to parallelize the workload.

# Create a futures array to store the futures returned by Dask
futures = []

# Scatter the dataframe beforehand
df_future = client.scatter(df)

for index in df.index:
    for col in cols:
        # Submit tasks to the dask client in parallel
        future = client.submit(do_operation, df_future, index, col)
        # Store the returned future in futures list
        futures.append(future)

# Gather the results.
_ = client.gather(futures)