## Dask intro
Original source: https://www.machinelearningplus.com/python/dask-tutorial/

### Dask DataFrames
A Dask DataFrame is a large parallel DataFrame composed of many smaller Pandas DataFrames, split along the index. 

In [2]:
import dask.dataframe as dd
import dask
data_frame = dask.datasets.timeseries()

In [3]:
%%time
# Apply a groupby operation (lazy: this only builds the task graph)
df = data_frame.groupby('name').y.std()
df

CPU times: user 20.2 ms, sys: 3.79 ms, total: 24 ms
Wall time: 40.3 ms


Dask Series Structure:
npartitions=1
    float64
        ...
Name: y, dtype: float64
Dask Name: sqrt, 67 tasks

In [4]:
data_frame.columns

Index(['id', 'name', 'x', 'y'], dtype='object')

In [5]:
%%time
pandas_df = df.compute()
type(pandas_df)

CPU times: user 2.2 s, sys: 454 ms, total: 2.65 s
Wall time: 1.63 s


pandas.core.series.Series

In [6]:
# persist() runs the computation but keeps the result in memory as a Dask collection
df = df.persist()
type(df)

dask.dataframe.core.Series

### Dask Bags
Dask Bag (`dask.bag`) is a high-level Dask collection that serves as an alternative to regular Python lists and similar collections. The main difference is that Dask Bags are lazy and can be distributed.
Dask Bags are often used to parallelize simple computations on unstructured or semi-structured data such as text, log files, JSON records, or user-defined Python objects.
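The JSON-records use case could look roughly like the sketch below. The log lines, the field names `user` and `status`, and the choice of the threaded scheduler are assumptions made for this example, not something taken from the original tutorial.

```python
import json
import dask.bag as db

# Hypothetical log lines, one JSON record per line.
lines = [
    '{"user": "ann", "status": 200}',
    '{"user": "bob", "status": 500}',
    '{"user": "ann", "status": 404}',
    '{"user": "cat", "status": 200}',
]

# Build a bag with two partitions from an in-memory sequence.
bag = db.from_sequence(lines, npartitions=2)

# These steps are lazy: they only describe the computation.
records = bag.map(json.loads)
errors = records.filter(lambda r: r["status"] >= 400).pluck("user")

# Run the graph; the threaded scheduler is chosen here (an assumption)
# so the snippet does not need to start worker processes.
error_users = errors.compute(scheduler="threads")
print(error_users)
```

Each `map`, `filter`, and `pluck` call only extends the task graph; the records are parsed when `compute()` runs.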


In [6]:
dask.__version__

'2022.7.0'

In [7]:
import dask.bag as db

In [8]:
a = db.from_url('http://raw.githubusercontent.com/dask/dask/master/README.rst')
a.npartitions

1

In [9]:
a.visualize()

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

In [10]:
adf = a.to_dataframe()

In [11]:
adf.head()

   0
0  b'Dask\n'
1  b'====\n'
2  b'\n'
3  b'|Build Status| |Coverage| |Doc Status| |Disc...
4  b'\n'


### Dask distributed

Install the Dask JupyterLab extension (https://github.com/dask/dask-labextension) for more features!

In [None]:
# Import dask.distributed.Client and pandas
from dask.distributed import Client
import pandas as pd
import time

# Initialize a client (processes=False runs the workers as threads in this process)
client = Client(processes=False)

In [28]:
client

Client
    Connection method: Cluster object
    Cluster type: distributed.LocalCluster
    Dashboard: http://172.18.157.180:8787/status

LocalCluster
    Dashboard: http://172.18.157.180:8787/status
    Workers: 1
    Total threads: 5
    Total memory: 28.00 GiB
    Status: running
    Using processes: False

Scheduler
    Comm: inproc://172.18.157.180/155/9
    Workers: 1
    Dashboard: http://172.18.157.180:8787/status
    Total threads: 5
    Started: 6 minutes ago
    Total memory: 28.00 GiB

Worker
    Comm: inproc://172.18.157.180/155/12
    Total threads: 5
    Dashboard: http://172.18.157.180:40829/status
    Memory: 28.00 GiB
    Nanny: None
    Local directory: /tmp/dask-worker-space/worker-yxh1nnc_


In [14]:
#client.close()

In [35]:
# Read the CSV file into a pandas DataFrame and process it.
# The full file is large, so only the first 10,000 rows are loaded.
df = pd.read_csv('/v/courses/dataexp2024.public/Datasets/D-LargeData/crimes.csv', nrows=10000)
df.head()

Unnamed: 0,ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,...,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
0,11034701,JA366925,01/01/2001 11:00:00 AM,016XX E 86TH PL,1153,DECEPTIVE PRACTICE,FINANCIAL IDENTITY THEFT OVER $ 300,RESIDENCE,False,False,...,8.0,45.0,11,,,2001,08/05/2017 03:50:08 PM,,,
1,11227287,JB147188,10/08/2017 03:00:00 AM,092XX S RACINE AVE,281,CRIM SEXUAL ASSAULT,NON-AGGRAVATED,RESIDENCE,False,False,...,21.0,73.0,2,,,2017,02/11/2018 03:57:41 PM,,,
2,11227583,JB147595,03/28/2017 02:00:00 PM,026XX W 79TH ST,620,BURGLARY,UNLAWFUL ENTRY,OTHER,False,False,...,18.0,70.0,5,,,2017,02/11/2018 03:57:41 PM,,,
3,11227293,JB147230,09/09/2017 08:17:00 PM,060XX S EBERHART AVE,810,THEFT,OVER $500,RESIDENCE,False,False,...,20.0,42.0,6,,,2017,02/11/2018 03:57:41 PM,,,
4,11227634,JB147599,08/26/2017 10:00:00 AM,001XX W RANDOLPH ST,281,CRIM SEXUAL ASSAULT,NON-AGGRAVATED,HOTEL/MOTEL,False,False,...,42.0,32.0,2,,,2017,02/11/2018 03:57:41 PM,,,


In [36]:
cols = df.columns

In [37]:
# A placeholder for the desired per-cell operation.
# It only selects the column; `index` is unused and nothing is returned.
def do_operation(df, index, col):
    new_df = df[col]

In [38]:
%%time
# Loop through the indices and columns and call the function.
for index in df.index:
    for col in cols:
        do_operation(df, index, col)

CPU times: user 557 ms, sys: 16.2 ms, total: 573 ms
Wall time: 562 ms


In [39]:
%%time
# Use Dask client to parallelize the workload.

# Create a futures array to store the futures returned by Dask
futures = []

# Scatter the dataframe beforehand
df_future = client.scatter(df)

for index in df.index:
    for col in cols:
        # Submit tasks to the dask client in parallel
        future = client.submit(do_operation, df_future, index, col)
        # Store the returned future in futures list
        futures.append(future)

# Gather the results.
_ = client.gather(futures)

KeyboardInterrupt: 
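The loop above submits one task per (row, column) pair, that is, 10,000 times the number of columns. Every task carries some scheduler overhead (on the order of a millisecond or less), so this many tiny tasks can take far longer than the plain serial loop. A coarser split usually works better. The sketch below submits one task per column instead; the small synthetic DataFrame and the helper `summarize_column` are hypothetical stand-ins and not part of the original notebook.

```python
import pandas as pd
from dask.distributed import Client


def summarize_column(df, col):
    """Hypothetical per-column operation: count the non-null values."""
    return col, int(df[col].notna().sum())


# In-process client; dashboard_address=None disables the dashboard.
client = Client(processes=False, dashboard_address=None)

# Small synthetic frame standing in for the crimes data (an assumption).
df = pd.DataFrame(
    {
        "a": [1, 2, None],
        "b": ["x", None, None],
        "c": [1.0, 2.0, 3.0],
    }
)

# Scatter the DataFrame once so every task reuses the same copy.
df_future = client.scatter(df)

# One task per column instead of one task per cell.
futures = [client.submit(summarize_column, df_future, col) for col in df.columns]

# Gather the (column, count) pairs into a dictionary.
counts = dict(client.gather(futures))
print(counts)

client.close()
```

With this split the number of tasks equals the number of columns, so the per-task overhead stays negligible relative to the work each task does.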

Another example:

https://www.vantage-ai.com/en/blog/4-strategies-how-to-deal-with-large-datasets-in-pandas