# Preface

Over the past year, I have had the opportunity to work with **out-of-core** datasets (larger than available memory) in both data engineering and data analysis tasks.  
This article documents my experiments with Dask as a key tool for tackling these challenges.  

## Simulating data for a continuous data integration workflow
The raw test data comes from the [Synthetic credit card transaction data](https://www.kaggle.com/datasets/ealtman2019/credit-card-transactions) dataset: a single .csv file of about 2.3 GB.  
To simulate continuous data ingestion, the code below splits the single CSV file into multiple files, one per year.

In [3]:
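import os
from pathlib import Path

# Set the data path: Kaggle mounts the dataset under /kaggle/input
if os.environ.get('KAGGLE_KERNEL_RUN_TYPE', 'Localhost') == 'Interactive':
    data_path = Path("/kaggle/input/credit-card-transactions")
else:
    # Assumption: a local copy of the dataset sits in this folder; adjust as needed
    data_path = Path("credit-card-transactions")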

In [4]:
os.listdir(data_path)

['sd254_users.csv',
 'User0_credit_card_transactions.csv',
 'sd254_cards.csv',
 'credit_card_transactions-ibm_v2.csv']

In [5]:
import pandas as pd
import numpy as np

pdf = pd.read_csv(data_path/"credit_card_transactions-ibm_v2.csv", dtype={'Errors?': 'object'})

In [6]:
uniq_year = pdf["Year"].unique().tolist()

In [7]:
pdf.dtypes

User                int64
Card                int64
Year                int64
Month               int64
Day                 int64
Time               object
Amount             object
Use Chip           object
Merchant Name       int64
Merchant City      object
Merchant State     object
Zip               float64
MCC                 int64
Errors?            object
Is Fraud?          object
dtype: object

In [8]:
# Sort the years so the files are written in chronological order
uniq_year.sort()
uniq_year

[1991, 1992, 1993, 1994, 1995, 1996, 1997, 1998, 1999, 2000,
 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020]

In [9]:
# Write one CSV file per year to simulate yearly data arrivals
for y in uniq_year:
    print(y)
    yrly_pdf = pdf[pdf["Year"] == y]
    yrly_pdf.to_csv(f"/kaggle/working/yearly_data_{y}.csv", index=False)

1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
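Cell In [5] loads the whole ~2.3 GB CSV into pandas before splitting it. That only works because the Kaggle machine has enough RAM.

Below is a minimal streaming sketch of the same split, assuming the same `Year` column layout. `pandas.read_csv(chunksize=...)` reads the file piece by piece, and each chunk's rows are appended to one CSV per year. `split_csv_by_year`, the chunk size, and the demo file are hypothetical.

```python
import tempfile
from pathlib import Path

import pandas as pd


def split_csv_by_year(src, out_dir, chunksize=1_000_000):
    """Stream `src` in chunks and append each chunk's rows to one CSV per year.

    `out_dir` should start empty: files are opened in append mode, so a rerun
    into the same folder would duplicate rows.
    """
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    seen = set()  # years whose output file already has a header row
    for chunk in pd.read_csv(src, chunksize=chunksize, dtype={"Errors?": "object"}):
        for year, group in chunk.groupby("Year"):
            path = out_dir / f"yearly_data_{year}.csv"
            # Write the header only the first time a year is seen
            group.to_csv(path, mode="a", header=year not in seen, index=False)
            seen.add(year)
    return sorted(seen)


# Demo on a hypothetical 5-row file, read 2 rows at a time
demo_dir = Path(tempfile.mkdtemp())
demo_src = demo_dir / "transactions.csv"
pd.DataFrame(
    {"User": [0, 0, 1, 1, 2], "Year": [1991, 1992, 1991, 1993, 1992], "Errors?": [None] * 5}
).to_csv(demo_src, index=False)
years = split_csv_by_year(demo_src, demo_dir / "yearly", chunksize=2)
print(years)
```

To apply it to the real data, point `src` at `data_path/"credit_card_transactions-ibm_v2.csv"` and `out_dir` at an empty folder. Peak memory then grows with the chunk size rather than the file size.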


# Dask Cluster & Dashboard 

Dask can run locally without explicitly starting a cluster. The benefit of starting one explicitly is the cluster dashboard, which is very useful for monitoring and optimizing the dataflow.  

Some Dask versions conflict with the `msgpack-python` library, and the cluster dashboard cannot be displayed. [To fix the issue, upgrade msgpack-python to version 1.0.5](https://github.com/dask/distributed/issues/8038)  

## Cluster configuration

There are three important parameters: the number of workers, the memory configuration, and the shuffle method.

### Number of workers
This parameter defines the memory size of each worker and the level of parallelism.  
**n_workers**: the number of workers (shown as _processes_ in the client summary). By default (`memory_limit="auto"`), Dask splits the machine's total memory equally among the workers. Part of that memory is still used by the OS and other services, so it is safer to budget for them. For example, on a local machine with 16 GB of memory and about 2 GB used by other services, 4 workers leave roughly (16 - 2) / 4 = 3.5 GB per worker.  
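The per-worker budget above can be sketched as follows. This assumes the 2 GB reserved for other services is a manual estimate, not something Dask sets aside itself.

`per_worker_memory` and `start_cluster` are hypothetical helpers. `memory_limit` and `threads_per_worker` are `LocalCluster` parameters, and `memory_limit` applies to each worker.

```python
from dask.distributed import LocalCluster
from dask.utils import format_bytes, parse_bytes


def per_worker_memory(total, reserved, n_workers):
    """Bytes per worker after setting `reserved` aside for the OS and other services."""
    return (parse_bytes(total) - parse_bytes(reserved)) // n_workers


limit = per_worker_memory("16GB", "2GB", n_workers=4)
print(format_bytes(limit))


def start_cluster(n_workers=4, memory_limit=limit):
    # memory_limit is applied per worker, not to the cluster as a whole
    return LocalCluster(n_workers=n_workers, threads_per_worker=1, memory_limit=memory_limit)
```

Calling `start_cluster().get_client()` would start the cluster with this explicit limit instead of the default equal split.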

**Reference**
- [Official documentation](https://distributed.dask.org/en/stable/api.html#distributed.LocalCluster)
- [Threads vs. processes in LocalCluster (Stack Overflow)](https://stackoverflow.com/questions/57760475/difference-between-dask-distributed-localcluster-with-threads-vs-processes)

### Memory configuration

These settings define how Dask manages data in each worker's memory, and when it spills data to disk while working with data larger than the worker memory.

`distributed.worker.memory` thresholds (these values are also Dask's defaults):  
- `target: 0.60`: fraction of managed memory at which the worker starts spilling to disk  
- `spill: 0.70`: fraction of process memory at which the worker starts spilling to disk  
- `pause: 0.80`: fraction of process memory at which the worker threads are paused  
- `terminate: 0.95`: fraction of process memory at which the worker is terminated
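To see what these fractions mean in bytes, here is a short sketch that multiplies them by a worker's memory limit. The 7.5 GiB limit is taken from the cluster summary printed later in this notebook (30 GiB split across 4 workers). The fractions are set in a `dask.config.set` context so they are read back exactly as listed above.

```python
import dask
from dask.utils import format_bytes, parse_bytes

# Same fractions as listed above
fractions = {
    "distributed.worker.memory.target": 0.60,
    "distributed.worker.memory.spill": 0.70,
    "distributed.worker.memory.pause": 0.80,
    "distributed.worker.memory.terminate": 0.95,
}
worker_limit = parse_bytes("7.5GiB")  # per-worker limit from the cluster summary

thresholds = {}
with dask.config.set(fractions):
    for key in fractions:
        name = key.rsplit(".", 1)[-1]  # "target", "spill", ...
        thresholds[name] = int(worker_limit * dask.config.get(key))

for name, nbytes in thresholds.items():
    # "target" is compared with managed memory, the other three with process memory
    print(f"{name:>9}: {format_bytes(nbytes)}")
```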


**Reference**
- [Official documentation](https://distributed.dask.org/en/stable/worker-memory.html#thresholds-configuration)
- [How to set memory parameters for LocalCluster (Stack Overflow)](https://stackoverflow.com/questions/55784232/right-way-to-set-memory-parameters-for-localcluster-in-dask)

### Shuffle method

Shuffling is how data is transferred between workers when operations such as `set_index`, `sort_values`, `merge`, or `groupby` are called.  
Currently, Dask defaults to the peer-to-peer (`P2P`) shuffle, which helps reduce the worker memory footprint.

**Reference**
- [Shuffling large data at constant memory (Coiled blog)](https://docs.coiled.io/blog/shuffling-large-data-at-constant-memory.html)


In [14]:
import dask
from dask.distributed import LocalCluster
from dask import dataframe as dd

# Memory thresholds (fractions of the per-worker memory limit) and the shuffle method,
# set before the cluster starts
dask.config.set({"distributed.worker.memory.target": 0.6,
                 "distributed.worker.memory.spill": 0.7,
                 "distributed.worker.memory.pause": 0.8,
                 "distributed.worker.memory.terminate": 0.95,
                 "dataframe.shuffle.method": "p2p"})

# Start 4 worker processes and connect a client to them
client = LocalCluster(n_workers=4).get_client()

In [15]:
# Print the client to see the cluster information
print(client)

<Client: 'tcp://127.0.0.1:36143' processes=4 threads=4, memory=30.00 GiB>


In [17]:
# Display the cluster summary (the dashboard link does not work on Kaggle)
client

Cluster: distributed.LocalCluster (connection method: Cluster object), status: running, using processes: True  
Scheduler: tcp://127.0.0.1:36143, started 1 minute ago  
Dashboard: http://127.0.0.1:8787/status  
Workers: 4, total threads: 4, total memory: 30.00 GiB  

| Worker (comm) | Nanny | Threads | Memory | Dashboard | Local directory |
|---|---|---|---|---|---|
| tcp://127.0.0.1:42113 | tcp://127.0.0.1:38581 | 1 | 7.50 GiB | http://127.0.0.1:36743/status | /tmp/dask-scratch-space/worker-8_rrctt_ |
| tcp://127.0.0.1:38399 | tcp://127.0.0.1:42461 | 1 | 7.50 GiB | http://127.0.0.1:33173/status | /tmp/dask-scratch-space/worker-sk408wpu |
| tcp://127.0.0.1:39721 | tcp://127.0.0.1:40057 | 1 | 7.50 GiB | http://127.0.0.1:33281/status | /tmp/dask-scratch-space/worker-l7u3bkxp |
| tcp://127.0.0.1:46311 | tcp://127.0.0.1:42675 | 1 | 7.50 GiB | http://127.0.0.1:35561/status | /tmp/dask-scratch-space/worker-8m8fzshr |


In [18]:
# Show the dashboard link
client.dashboard_link

'http://127.0.0.1:8787/status'

# Data ingestion workflow & test cases

This part tests Dask's capabilities for data engineering tasks by appending each year of data to a datamart in Parquet format.


## Case 1) Datamart without Hive partitioning
**Configuration**
- Append each raw yearly file to the Parquet dataset.
- Set the index on column `Card` for each ingested raw file. Since `Card` is non-unique and the same card values recur in every yearly file, the divisions of each newly appended file overlap those already written. The division check must therefore be turned off (`ignore_divisions=True`) when appending to the .parquet dataset.
- `set_index` sorts the data by the index column by default.
- Non-Hive-style folder layout (no `partition_on` parameter).

In [None]:
schema = {
    "User": "int64",
    "Card": "int64",
    "Year": "int64",
    "Month": "int64",
    "Day": "int64",
    "Time": "string",
    "Amount": "string",
    "Use Chip": "string",
    "Merchant Name": "int64",
    "Merchant City": "string",
    "Merchant State": "string",
    "Zip": "float64",
    "MCC": "int64",
    "Errors?": "string",
    "Is Fraud?": "string"
}

In [None]:
yr_rng = list(range(1991, 2021))
data_combined = data_path/"credit"/"data_combined_no_hive_sort_idx_no_div.parquet"

for y in yr_rng:
    print(y)
    yrly_ddf = dd.read_csv(data_path/"credit"/f"yearly_data_{y}.csv", dtype=schema)
    yrly_ddf = yrly_ddf.set_index("Card", partition_size="100MB")
    
    if data_combined.exists():    
        yrly_ddf.to_parquet(data_combined, append=True, ignore_divisions=True)
    else:
        yrly_ddf.to_parquet(data_combined)

## Query performance
References with tips for optimizing query performance:  

Indexing  
- https://stackoverflow.com/questions/75915860/dask-and-best-practices-with-multiple-indices
- https://stackoverflow.com/questions/16626058/what-is-the-performance-impact-of-non-unique-indexes-in-pandas

Joining performance 
- https://stackoverflow.com/questions/71233619/why-do-i-get-always-a-memory-error-after-i-run-dask-with-big-dataframes
- https://docs.dask.org/en/latest/dataframe-joins.html#sorted-joins
- https://www.coiled.io/blog/dask-dataframe-merge-join

The duplicated index (`Card`) forces the output to be written without division metadata.

In [None]:
parquet_path = data_path/"credit"/"data_combined_no_hive_sort_idx_no_div.parquet"
ddf = dd.read_parquet(parquet_path)

In [None]:
# Number of partitions = number of ingested raw files (each yearly file is small enough not to be split)
ddf.npartitions

In [None]:
# Divisions are unknown: the appended files have overlapping Card ranges, and read_parquet does not calculate divisions by default
ddf.known_divisions

In [None]:
# Check the index dtype and name ('Card')
ddf.index.head()

In [None]:
# The remaining DataFrame columns (the index column is excluded)
ddf.columns

In [None]:
%%timeit
ddf.groupby("Card")["MCC"].nunique().compute()

In [None]:
%%timeit
# reset_index() is needed to group by and aggregate the index column
ddf.reset_index().groupby("User")["Card"].nunique().compute()

In [None]:
%%timeit
ddf.groupby("Year")["Zip"].sum().compute()

In [None]:
%%timeit
ddf.query("Year == 2010")["User"].count().compute()

In [None]:
ddf.query("Year == 2010")["User"].count().visualize()

# Case 2) Datamart with Hive partitioning
**Reference**   
- https://docs.dask.org/en/latest/dataframe-hive.html

**Configuration**
- Append each raw yearly file to the Parquet dataset.
- Set the index on column `Card` for each ingested raw file. Since `Card` is non-unique and the same card values recur in every yearly file, the divisions of each newly appended file overlap those already written. The division check must therefore be turned off (`ignore_divisions=True`) when appending to the .parquet dataset.
- `set_index` sorts the data by the index column by default.
- Hive-style partition folders on column "Year" (`partition_on=["Year"]`).

In [None]:
schema = {
    "User": "int64",
    "Card": "int64",
    "Year": "int64",
    "Month": "int64",
    "Day": "int64",
    "Time": "string",
    "Amount": "string",
    "Use Chip": "string",
    "Merchant Name": "int64",
    "Merchant City": "string",
    "Merchant State": "string",
    "Zip": "float64",
    "MCC": "int64",
    "Errors?": "string",
    "Is Fraud?": "string"
}

In [None]:
yr_rng = list(range(1991, 2021))
data_combined = data_path/"credit"/"data_combined_with_hive_sort_idx_no_div.parquet"

for y in yr_rng:
    print(y)
    yrly_ddf = dd.read_csv(data_path/"credit"/f"yearly_data_{y}.csv", dtype=schema)
    yrly_ddf = yrly_ddf.set_index("Card", partition_size="100MB")
    
    if data_combined.exists():    
        yrly_ddf.to_parquet(data_combined, append=True, ignore_divisions=True, partition_on=["Year"])
    else:
        yrly_ddf.to_parquet(data_combined, partition_on=["Year"])

## Query performance

In [None]:
parquet_path = data_path/"credit"/"data_combined_with_hive_sort_idx_no_div.parquet"
ddf2 = dd.read_parquet(parquet_path)

In [None]:
# Number of partitions = number of ingested raw files (each yearly file is small enough not to be split)
ddf2.npartitions

In [None]:
# Divisions are unknown: the appended files have overlapping Card ranges, and read_parquet does not calculate divisions by default
ddf2.known_divisions

In [None]:
# Check the index dtype and name ('Card')
ddf2.index.head()

In [None]:
# The remaining DataFrame columns (the index column is excluded)
ddf2.columns

With Hive partitioning, there is extra overhead from scanning the partition folders.

In [None]:
%%timeit
ddf2.groupby("User")["MCC"].nunique().compute()

In [None]:
%%timeit
ddf2.reset_index().groupby("User")["Card"].nunique().compute()

In [None]:
%%timeit
ddf2.groupby("Year")["Zip"].sum().compute()

In [None]:
%%timeit
ddf2.query("Year == 2010")["User"].count().compute()

In [None]:
ddf2.query("Year == 2010")["User"].count().visualize()

To take full advantage of the Hive partitions, the filter must be defined in `read_parquet` through the `filters` parameter.  
In the Dask version used here, there is no predicate pushdown during query optimization as in Spark.

In [None]:
parquet_path = data_path/"credit"/"data_combined_with_hive_sort_idx_no_div.parquet"
year_2010 = dd.read_parquet(parquet_path, filters=[("Year", "==", 2010)])
year_2010.query("Year == 2010")["User"].count().visualize()

In [None]:
%%timeit
year_2010.query("Year == 2010")["User"].count().compute()