![Dask Icon](images/dask_horizontal_black.gif "Dask Icon")
![Pandas Icon](images/pandas_logo.png "Pandas Icon")

# Gotchas from Pandas to Dask

https://github.com/sephib/dask_pyconil2019

This notebook highlights some key differences when transferring code from `Pandas` to run in a `Dask` environment.  
Most issues have a link to the [Dask documentation](https://docs.dask.org/en/latest/) for additional information.

# Agenda  
1. Intro to `Dask` framework
2. Basic setup
3. 

![Dask Icon](images/dask_horizontal_black.gif "Dask Icon")

Dask is a flexible library for parallel computing in Python.

![Dask Framework](images/dask_graph_outline.gif)



Dask is composed of two parts:

1. *Dynamic task scheduling* optimized for computation. This is similar to Airflow, Luigi, Celery, or Make, but optimized for interactive computational workloads.
2. *“Big Data” collections* like parallel arrays, dataframes, and lists that extend common interfaces like NumPy, Pandas, or Python iterators to larger-than-memory or distributed environments. These parallel collections run on top of dynamic task schedulers.

[link to documentation](https://docs.dask.org/en/latest/)

Dask emphasizes the following virtues:

* Familiar: Provides parallelized NumPy array and Pandas DataFrame objects
* Flexible: Provides a task scheduling interface for more custom workloads and integration with other projects.
* Native: Enables distributed computing in pure Python with access to the PyData stack.
* Fast: Operates with low overhead, low latency, and minimal serialization necessary for fast numerical algorithms
* Scales up: Runs resiliently on clusters with 1000s of cores
* Scales down: Trivial to set up and run on a laptop in a single process
* Responsive: Designed with interactive computing in mind, it provides rapid feedback and diagnostics to aid humans


See the [dask.distributed documentation (separate website)](https://distributed.dask.org/en/latest/) for more technical information on Dask’s distributed scheduler.

In [3]:
# Dask is under active development, so note the versions this example was run with
import dask
import dask.dataframe as dd
import pandas as pd
print(f'Dask version: {dask.__version__}')
print(f'Pandas version: {pd.__version__}')

Dask version: 1.2.2
Pandas version: 0.24.2


## Dask `Distributed` scheduler  
* When running code within a script, create the `Client` inside a `context manager`, under an `if __name__ == '__main__':` guard  

```python   
import dask.dataframe as dd  
from dask.distributed import Client  

df = dd.read_csv(...) # do something
``` 
vs
```python   
if __name__ == '__main__':
    with Client() as client:
        df = dd.read_csv(...) # do something
```


* See this [Stack Overflow answer](https://stackoverflow.com/a/53520917/5817977) for details  
* To get the dashboard URL, use the [inner function](https://github.com/dask/distributed/issues/2083#issue-337057906) described in this issue  


## Start Dask Client for Dashboard
![Dask Dashboard](images/dask_dashboard.png)


In [4]:
from dask.distributed import Client
# client = Client(n_workers=1, threads_per_worker=4, processes=False, memory_limit='2GB')
client = Client()
client

Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.


| Client | Cluster |
|---|---|
| Scheduler: tcp://127.0.0.1:50403 | Workers: 4 |
| Dashboard: http://127.0.0.1:50406/status | Cores: 4 |
| | Memory: 8.50 GB |


Starting the Dask Client is optional. In this example we are running on a `LocalCluster`, which also provides a dashboard that is useful for gaining insight into the computation.  
For additional information see the [Dask Client documentation](https://docs.dask.org/en/latest/setup.html?highlight=client#setup)  

The link to the dashboard becomes visible when you create a client (as in the output above).  
When running in `Jupyter Lab`, an [extension](https://github.com/dask/dask-labextension) can be installed to view the various dashboard widgets. 

See the [documentation for additional cluster configuration](http://distributed.dask.org/en/latest/local-cluster.html)

# Create 2 DataFrames for comparison: 
* `Dask framework` is **lazy**  ![lazy python](images/Sleeping-snake.jpg)

In [13]:
ddf = dask.datasets.timeseries() #  Dask comes with builtin dataset samples, we will use this sample for our example. 
ddf

| npartitions=30 | id | name | x | y |
|---|---|---|---|---|
| 2000-01-01 | int32 | object | float64 | float64 |
| 2000-01-02 | ... | ... | ... | ... |
| ... | ... | ... | ... | ... |
| 2000-01-30 | ... | ... | ... | ... |
| 2000-01-31 | ... | ... | ... | ... |


In order to see the result we need to run [compute()](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.compute)  
(or `head()`, which calls `compute()` under the hood)

In [14]:
ddf.compute()

| timestamp | id | name | x | y |
|---|---|---|---|---|
| 2000-01-01 00:00:00 | 949 | Laura | -0.523592 | -0.692488 |
| 2000-01-01 00:00:01 | 1083 | Edith | 0.931568 | 0.510529 |
| 2000-01-01 00:00:02 | 1005 | Tim | -0.424328 | -0.696765 |
| 2000-01-01 00:00:03 | 1034 | Alice | 0.202373 | 0.311415 |
| 2000-01-01 00:00:04 | 964 | Sarah | -0.043097 | 0.390466 |
| 2000-01-01 00:00:05 | 993 | Quinn | 0.060435 | 0.631855 |
| 2000-01-01 00:00:06 | 992 | Patricia | -0.878349 | -0.387187 |
| 2000-01-01 00:00:07 | 989 | Victor | 0.704567 | 0.923895 |
| 2000-01-01 00:00:08 | 1007 | Kevin | 0.467177 | 0.589763 |
| 2000-01-01 00:00:09 | 989 | Michael | 0.009290 | -0.164473 |


## Pandas
To create a `Pandas` dataframe from a `Dask` dataframe we can use `compute()`.

In [15]:
pdf = ddf.compute()  
print(type(pdf))
pdf.head()

<class 'pandas.core.frame.DataFrame'>


| timestamp | id | name | x | y |
|---|---|---|---|---|
| 2000-01-01 00:00:00 | 949 | Laura | -0.523592 | -0.692488 |
| 2000-01-01 00:00:01 | 1083 | Edith | 0.931568 | 0.510529 |
| 2000-01-01 00:00:02 | 1005 | Tim | -0.424328 | -0.696765 |
| 2000-01-01 00:00:03 | 1034 | Alice | 0.202373 | 0.311415 |
| 2000-01-01 00:00:04 | 964 | Sarah | -0.043097 | 0.390466 |


## Creating a `Dask dataframe` from `Pandas`

In order to use `Dask` capabilities on an existing `Pandas dataframe` (pdf) we need to convert the `Pandas dataframe` into a `Dask dataframe` (ddf) with the [from_pandas](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.from_pandas) method. 
You must supply either the number of partitions (`npartitions`) or the `chunksize` that will be used to generate the Dask dataframe.

In [16]:
ddf2 = dd.from_pandas(pdf, npartitions=10)
print(type(ddf2))
ddf2.head() 

<class 'dask.dataframe.core.DataFrame'>


| timestamp | id | name | x | y |
|---|---|---|---|---|
| 2000-01-01 00:00:00 | 949 | Laura | -0.523592 | -0.692488 |
| 2000-01-01 00:00:01 | 1083 | Edith | 0.931568 | 0.510529 |
| 2000-01-01 00:00:02 | 1005 | Tim | -0.424328 | -0.696765 |
| 2000-01-01 00:00:03 | 1034 | Alice | 0.202373 | 0.311415 |
| 2000-01-01 00:00:04 | 964 | Sarah | -0.043097 | 0.390466 |


## Partitions in Dask Dataframes

Notice that when we created a `Dask dataframe` we needed to supply the `npartitions` argument.  
The number of partitions determines how `Dask` splits the data and parallelizes the computation.  
Each partition is a *separate* dataframe. For additional information see [partition documentation](https://docs.dask.org/en/latest/dataframe-design.html?highlight=meta%20utils#partitions)  



Using the `reset_index()` method we can examine the partitions:  
First let's look at the `Pandas` dataframe

In [17]:
pdf2 = pdf.reset_index()
# Only 1 row
pdf2.iloc[0]

timestamp    2000-01-01 00:00:00
id                           949
name                       Laura
x                      -0.523592
y                      -0.692488
Name: 0, dtype: object

Now let's look at a `Dask` dataframe

In [18]:
ddf2 = ddf2.reset_index() 
ddf2.loc[0].compute()  # each partition has an index=0
# ddf2.loc[0].visualize()

| | timestamp | id | name | x | y |
|---|---|---|---|---|---|
| 0 | 2000-01-01 | 949 | Laura | -0.523592 | -0.692488 |
| 0 | 2000-01-04 | 1020 | George | 0.096655 | -0.964252 |
| 0 | 2000-01-07 | 994 | Oliver | 0.33011 | 0.54771 |
| 0 | 2000-01-10 | 1009 | Quinn | 0.135462 | 0.201359 |
| 0 | 2000-01-13 | 978 | Victor | 0.063773 | 0.398564 |
| 0 | 2000-01-16 | 1079 | Yvonne | -0.740677 | 0.205735 |
| 0 | 2000-01-19 | 948 | Jerry | 0.065367 | -0.164667 |
| 0 | 2000-01-22 | 1060 | Norbert | 0.706973 | 0.018618 |
| 0 | 2000-01-25 | 980 | George | -0.315419 | -0.398249 |
| 0 | 2000-01-28 | 1032 | Wendy | 0.323498 | -0.52307 |


## dataframe.shape  
Since `Dask` is lazy, we cannot get the full shape without triggering a computation, for example with `len`

In [19]:
print(f'Pandas shape: {pdf.shape}')
print('---------------------------')
print(f'Dask lazy shape: {ddf.shape}') 

Pandas shape: (2592000, 4)
---------------------------
Dask lazy shape: (Delayed('int-0ba98bb8-9fdf-4fe4-9886-55cad7624884'), 4)


In [20]:
print(f'Dask computed shape: {len(ddf.index)}')  # expensive

Dask computed shape: 2592000


Now that we have a `dask` (ddf) and a `pandas` (pdf) dataframe we can start to compare the interactions with them.

# Moving from Update to Insert/Delete


![inplaceTrue](images/inplace_true.png "inplace_true")


Dask does not update dataframes in place - thus there are no arguments such as `inplace=True`, which exist in Pandas.  
For more details see [issue#653 on github](https://github.com/dask/dask/issues/653)

### Rename Columns

In [23]:
# Pandas 
print(pdf.columns)

pdf.rename(columns={'id':'ID'}, inplace=True)

pdf.columns

Index(['ID', 'name', 'x', 'y'], dtype='object')


Index(['ID', 'name', 'x', 'y'], dtype='object')

* Using `inplace=True` is *not* considered to be *best practice*. 

In [None]:
# Dask - Error
# ddf.rename(columns={'id':'ID'}, inplace=True)
# ddf.columns

'''
---------------------------------------------------------------------------  
TypeError                                 Traceback (most recent call last)  
<ipython-input-12-3e70ff3a549e> in <module>  
      1 # Dask - Error  
----> 2 ddf.rename(columns={'id':'ID'}, inplace=True)  
      3 ddf.columns  
TypeError: rename() got an unexpected keyword argument 'inplace'  
'''

In [22]:
# Dask or Pandas
print(ddf.columns)
ddf = ddf.rename(columns={'id':'ID'})
ddf.columns

Index(['id', 'name', 'x', 'y'], dtype='object')


Index(['ID', 'name', 'x', 'y'], dtype='object')

## Data manipulations  
There are several differences when manipulating data.  

### loc - Pandas

In [27]:
mask_cond = (pdf['x']>0.5) & (pdf['x']<0.8)

pdf.loc[mask_cond, ['y']] = pdf['y']* 100
pdf[mask_cond].head(2)

| timestamp | ID | name | x | y |
|---|---|---|---|---|
| 2000-01-01 00:00:07 | 989 | Victor | 0.704567 | 923894.616916 |
| 2000-01-01 00:00:14 | 1003 | Victor | 0.603634 | 186217.826416 |


### Dask - use mask/where

In [None]:
# Error
# cond_dask = (ddf['x']>0.5) & (ddf['x']<0.8)
# ddf.loc[cond_dask, ['y']] = ddf['y']* 100

'''
> TypeError                                 Traceback (most recent call last)  
> <ipython-input-16-2bbb2ae570bd> in <module> 
>       2 # Error  
> ----> 3 ddf.loc[cond_dask, ['y']] = ddf['y']* 100  
> TypeError: '_LocIndexer' object does not support item assignment  
'''

In [29]:
cond_dask = (ddf['x']>0.5) & (ddf['x']<0.8)

ddf['y'] = ddf['y'].mask(cond_dask, ddf['y']* 100)
ddf[cond_dask].head(2)

| timestamp | ID | name | x | y |
|---|---|---|---|---|
| 2000-01-01 00:00:07 | 989 | Victor | 0.704567 | 9238.946169 |
| 2000-01-01 00:00:14 | 1003 | Victor | 0.603634 | 1862.178264 |


[dask mask documentation](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.mask)

## Meta argument

> `meta` is the prescription of the names/types of the computation output   
[see stack overflow answer](https://stackoverflow.com/questions/44432868/dask-dataframe-apply-meta)

![crystal python](images/crystalBallsnake.png "crystal snake")
Since `Dask` builds a DAG for the computation, it needs to know the output (names and types) of each calculation ahead of time (see [meta documentation](https://docs.dask.org/en/latest/dataframe-design.html?highlight=meta%20utils#metadata))

In [None]:
pdf['initials'] = pdf['name'].apply(lambda x: x[0]+x[1])
pdf.head(2)

In [None]:
ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1])
ddf.head(2)

In [None]:
# Describe the outcome type of the calculation
meta_cal = pd.Series(dtype='object', name='initials')

In [None]:
ddf['initials'] = ddf['name'].apply(lambda x: x[0]+x[1]
                                    , meta = meta_cal)
ddf.head(2)

In [None]:
def func(row, col1, col2):
    if (row[col1]> 0):
        return row[col1] * 1000  
    else:
        return row[col2] * -1

In [None]:
# ddf['z'] = ddf.apply(func, args=('coor_x', 'coor_y'), axis=1, meta=('z', 'float'))
ddf['z'] = ddf.apply(func,args=('x', 'y'), axis=1, meta=('z', 'float'))
ddf.head()

### Map partitions
* We can supply an ad-hoc function to run on each partition using the [map_partitions](https://dask.readthedocs.io/en/latest/dataframe-api.html#dask.dataframe.DataFrame.map_partitions) method.   
This is mainly useful for functions that are not implemented in `Dask` or `Pandas`. 
* The function can return a new `dataframe`, whose columns and types need to be described in the `meta` argument.  
The function can also take additional arguments.

In [None]:
import numpy as np
def func2(df, coor_x, coor_y, drop_cols):
    df['dist'] =  np.sqrt ( (df[coor_x] - df[coor_x].shift())**2  
                           +  (df[coor_y] - df[coor_y].shift())**2 )
    df = df.drop(drop_cols, axis=1)
    return df

In [None]:
# meta describes the output columns and their dtypes as a {name: dtype} mapping
ddf2 = ddf.map_partitions(func2
                          , coor_x='x'
                          , coor_y='y'
                          , drop_cols=['initials', 'z']
                          , meta={'ID':'i8'
                                  , 'name':'object'
                                  , 'x':'f8'
                                  , 'y':'f8'
                                  , 'dist':'f8'})
ddf2.head()

### Convert index into DateTime column

In [None]:
# Only Pandas
pdf = pdf.assign(times=pd.to_datetime(pdf.index).time)
pdf.head(2)

In [None]:
# ddf = ddf.assign(Time= dask.dataframe.to_datetime(ddf.index, format='%Y-%m-%d'). )
ddf = ddf.assign(times= dd.to_datetime(ddf.index).dt.time
                , dates = dd.to_datetime(ddf.index).dt.date)                 
ddf.head(2)

In [None]:
# Dask or Pandas
ddf = ddf.assign(times=ddf.index.astype('M8[ns]'))
ddf['dates'] = ddf['times'].dt.date
ddf['times'] = ddf['times'].dt.time
ddf.head()

## Drop NA on column

In [None]:
# Pandas
pdf = pdf.assign(colna = None)
print(pdf.head(2))
pdf = pdf.dropna(axis=1, how='all')
print(pdf.head(2))

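In order for `Dask` to drop a column in which all values are `na`, we first check (and compute) whether the column is entirely null and then drop it explicitly: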

In [None]:
# Dask
ddf = ddf.assign(colna = None)
# check if all values in the column are null - expensive, since it triggers a computation
if ddf.colna.isnull().all().compute():
    ddf = ddf.drop(labels=['colna'],axis=1)
print(ddf.head(2))

##  Reset Index

In [None]:
# Pandas
pdf = pdf.reset_index(drop=True)
pdf.head(2)

In [None]:
# Dask
ddf = ddf.reset_index()
ddf = ddf.drop(labels=['timestamp'], axis=1 )
ddf.head(2)

# Read / Save files

When working with `pandas` and `dask`, preferably work with [parquet](https://docs.dask.org/en/latest/dataframe-best-practices.html?highlight=parquet#store-data-in-apache-parquet-format).  
Either way, when working with `Dask` the files can be read by multiple workers.  
Most `kwargs` are applicable for reading and writing files [see documentation](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.to_csv) (including the option for output file naming).  
e.g. `ddf = dd.read_csv('data/pd2dd/ddf*.csv', compression='gzip', header=None)`  

However, some are not available, such as `nrows`.

## Save files

In [None]:
# Pandas
from pathlib import Path
output_file = 'pdf_single_file.csv'
output_dir = Path('data/')
output_dir.mkdir(parents=True, exist_ok=True)
pdf.to_csv(output_dir / output_file)

In [None]:
list(Path(output_dir).glob('*.csv'))

`Dask`  
Notice the `*` in the file name: `Dask` writes one file per partition and replaces the `*` with the partition number. 



In [None]:
output_dask_dir = Path('data/pd2dd/')
output_dask_dir.mkdir(parents=True, exist_ok=True)

In [None]:
# Dask
ddf.to_csv(f'{output_dask_dir}/ddf*.csv', index = False)

To find the number of partitions, which determines the number of output files, use [dask.dataframe.npartitions](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.npartitions)  

In [None]:
ddf.npartitions

In [None]:
list(Path(output_dask_dir).glob('*.csv'))

To change the number of output files use [repartition](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.repartition) which is an expensive operation.

## Read files

For `pandas` it is possible to iterate and concat the files [see answer from stack overflow](https://stackoverflow.com/questions/20906474/import-multiple-csv-files-into-pandas-and-concatenate-into-one-dataframe).

In [None]:
%%time
# Pandas
dir_path = Path(r'data/pd2dd')
concat_df = pd.concat([pd.read_csv(f) for f in list(dir_path.glob('*.csv'))])
len(concat_df)

In [None]:
%%time
# Dask
_ddf = dd.read_csv('data/pd2dd/ddf*.csv')
len(_ddf)

## Consider using Persist
Since Dask is lazy, it may run the **entire** graph/DAG (again) even if it has already run part of the calculation in a previous cell. Use [persist](https://docs.dask.org/en/latest/dataframe-best-practices.html?highlight=parquet#persist-intelligently) to keep the results in memory. 
```python
ddf = client.persist(ddf)
```
This is different from Pandas, which keeps all the data in memory once a variable has been created.  
Additional information can be found in this [stackoverflow answer](https://stackoverflow.com/questions/45941528/how-to-efficiently-send-a-large-numpy-array-to-the-cluster-with-dask-array/45941529#45941529) or see an example in [this post](http://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes)   
The same concept should also be applied when running code within a script (rather than a Jupyter notebook) that contains loops.

In [None]:
_ddf = dd.read_csv('data/pd2dd/ddf*.csv')
ddf = client.persist(_ddf)
ddf.head(2)

# Group By - custom aggregations
In addition to the [groupby notebook example](https://github.com/dask/dask-examples/blob/master/dataframes/02-groupby.ipynb), 
this is another example of how to avoid using `groupby.apply`.  
In this example we group by a column and collect the values of other columns into lists of unique values.

In [None]:
# prepare pandas dataframe
pdf = pdf.assign(Time=pd.to_datetime(pdf.index).time)
pdf['seconds'] = pdf.Time.astype(str).str[-2:]
pdf.head()

In [None]:
%%time
pdf_gb = pdf.groupby(pdf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [pdf_gb[att_col_gr].apply(lambda x: list(set(x.to_list())))
               for att_col_gr in gp_col]
df_edge_att = pdf_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
    df_edge_att = df_edge_att.join(ser.to_frame(), how='left')

In [None]:
df_edge_att.head(2)

In some cases using Pandas is more efficient (assuming that you can load all the data into RAM).  
In this case Pandas is faster.

In [None]:
def set_list_att(x: pd.Series):
    # groupby.apply passes each group to the function as a pandas Series
    return list(set(x.values))
ddf['seconds'] = ddf.times.astype(str).str[-2:]
ddf = client.persist(ddf)
ddf.head(2)

In [None]:
%%time
# Dask option1 using apply
# notice the meta argument in the apply function
df_gb = ddf.groupby(ddf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [df_gb[att_col_gr].apply(set_list_att
                ,meta=pd.Series(dtype='object', name=f'{att_col_gr}_att')) 
               for att_col_gr in gp_col]
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
    df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head(2)

Using [dask custom aggregation](https://docs.dask.org/en/latest/dataframe-api.html?highlight=dropna#dask.dataframe.groupby.Aggregation) is considerably better

In [None]:
# Dask
import itertools
custom_agg = dd.Aggregation(
    'custom_agg', 
    lambda s: s.apply(set), 
    lambda s: s.apply(lambda chunks: list(set(itertools.chain.from_iterable(chunks)))),)

In [None]:
%%time
# Dask option2 using the custom aggregation
df_gb = ddf.groupby(ddf.name)
gp_col = ['ID', 'seconds']
list_ser_gb = [df_gb[att_col_gr].agg(custom_agg) for att_col_gr in gp_col]
df_edge_att = df_gb.size().to_frame(name="Weight")
for ser in list_ser_gb:
    df_edge_att = df_edge_att.join(ser.to_frame(), how='left')
df_edge_att.head(2)  

In [None]:
df_edge_att.head()

## Debugging
Debugging may be more challenging since:
1. When using a client, multiprocessing complicates things
2. Sometimes introducing a faulty command into a graph (such as in a Jupyter notebook) requires discarding the graph and starting the process from the beginning

## Corrupted DAG

In [None]:
# start from a fresh dataframe
ddf = dask.datasets.timeseries()

In [None]:
# returns an error: ^ is bitwise XOR in Python, not the power operator (**)
def func_dist2(df, coor_x, coor_y):
    dist =  np.sqrt ( (df[coor_x] - df[coor_x].shift())^2  
                     +  (df[coor_y] - df[coor_y].shift())^2 )
    return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
                                , meta=('float'))

In [None]:
ddf.head()

* Even if the function is corrected, the DAG is still corrupted

In [None]:
# Still results in an error
def func_dist2(df, coor_x, coor_y):
    dist =  np.sqrt ( (df[coor_x] - df[coor_x].shift())**2  
                     +  (df[coor_y] - df[coor_y].shift())**2 )
    return dist
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y'
                                , meta=('float'))
ddf.head(2)

We need to reset the dataframe

In [None]:
# Re-create the dataframe so the faulty task is no longer part of the graph
ddf = dask.datasets.timeseries()
def func_dist2(df, coor_x, coor_y):
    dist =  np.sqrt ( (df[coor_x] - df[coor_x].shift())**2  +  (df[coor_y] - df[coor_y].shift())**2 )
    return dist
# meta as a (name, dtype) tuple describing the returned Series
ddf['col'] = ddf.map_partitions(func_dist2, coor_x='x', coor_y='y', meta=('col', 'float'))
ddf.head(2)

# Summary
1. `Dask` is lazy but efficient (parallel computing)
2. Useful when coming from `Pandas` (instead of `pyspark`) 
3. Distributed environments - from a single laptop to clusters with thousands of cores (including visibility into the computation)
4. But beware of:
  * missing functionality from the `Pandas` API
  * corrupted DAGs


js.berry@gmail.com
