# Introduction to Dask
by Dr Stef Garasto

<p>
Dask is a popular Python library for parallel computing. It is designed to integrate easily with existing Python code and with common libraries such as NumPy and pandas. In this introductory tutorial you will get to know some fundamental Dask tools, such as dask.delayed and dask.dataframe, along with typical workflows. There are also hands-on coding exercises for you to practise your newfound Dask knowledge.
</p>

### References (and sources of inspiration):
[1] Dask tutorial https://github.com/dask/dask-tutorial/

[2] Dask documentation https://docs.dask.org/en/latest/

Technical reminder: press shift-enter to execute single cells in this notebook.

# Setup

In [None]:
# Setup
LOCAL = True
if not LOCAL:
    # Install some requirements
    !pip install snakeviz
    %pip install "tornado>=5" 
    !pip -q install dask
    !pip -q install distributed
    #!pip -q install --upgrade --ignore-installed numpy pandas scipy sklearn
    !pip -q install graphviz 
    !apt-get install graphviz -qq
    !pip -q install pydot
    !pip -q install bokeh
    !pip -q install 'fsspec>=0.3.3'


In [None]:
if not LOCAL:
    # to get the data
    !rm -r dask-mini-tutorial
    !git clone https://github.com/stefgrs/dask-mini-tutorial

In [None]:
# collect the dataset
%run prep.py -d flights

In [None]:
# setup the Dask scheduler (just go with it for now!)
from dask.distributed import Client, progress

client = Client(n_workers=4, processes = False)
client

#import dask
#dask.config.set(scheduler='threads')


# Pandas dataframes

Pandas is a Python library for data manipulation and analysis. The dataframe is the main data structure in pandas. Very often, a pandas dataframe is created by reading a '.csv' file from disk.

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# let's have a look at a pandas DataFrame
# load sample data
titanic = pd.read_csv('data/titanic.csv')

# visualise the dataframe
titanic.head()


In [None]:
titanic.tail()


In [None]:
# each row is uniquely identified by its index
titanic.index


In [None]:
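# pandas lets us easily compute summary statistics on a dataframe ...
# (numeric_only=True skips text columns, which recent pandas versions refuse to average)
titanic.mean(numeric_only=True)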


In [None]:
# ... or the full distribution for a variable
# this counts how many instances there are for each unique value in a column
titanic['Pclass'].value_counts()


In [None]:
# Plots are nicely handled as well
titanic['Age'].plot(kind='hist')
_ = plt.xlabel('Age')


In [None]:
# How much memory does a dataframe use?
titanic.info()
print('On disk, the file size is 59 KB')


In [None]:
# let's try another one
import os  # needed for os.path.join

df_pandas = pd.read_csv(os.path.join('data', 'nycflights', '1990.csv'))

df_pandas.info()
print('On disk, the file size is around 22 MB')


The RAM footprint of a pandas dataframe can be higher than the space it occupies on disk! So always check from within Python (rather than in the file explorer) that the dataset will fit in memory.
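
As a rough illustration of this point, the sketch below builds a small made-up dataframe (the column names and values are invented for this example, not taken from the flights data) and compares its size as CSV text with its size in RAM. `memory_usage(deep=True)` is the pandas call that reports memory per column.

```python
import io

import pandas as pd

# Hypothetical toy data: a short text column and a small integer column
pdf = pd.DataFrame({
    "carrier": ["AA", "UA", "DL", "US"] * 2500,
    "delay": [1, 2, 3, 4] * 2500,
})

# Size of the data serialised as CSV text (a stand-in for the file size on disk)
buffer = io.StringIO()
pdf.to_csv(buffer, index=False)
csv_bytes = len(buffer.getvalue().encode("utf-8"))

# Size in RAM; deep=True also counts the contents of the text column
ram_bytes = int(pdf.memory_usage(deep=True).sum())

print(f"CSV size: {csv_bytes} bytes, in-memory size: {ram_bytes} bytes")
```

Here each small integer takes one or two characters in the CSV but eight bytes as an `int64` in memory, which is one reason the in-memory size can exceed the size on disk.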

# Dask dataframes

At times, we might have lots of data (think terabytes). We already saw how `dask.delayed` can be applied to custom algorithms. When we are operating on dataframe-like data, though, it is more convenient to use Dask's out-of-the-box tool `dask.dataframe` instead. It is particularly useful for working with data that doesn't fit in RAM (although we won't work with data this big today).

A Dask dataframe is basically a collection of pandas dataframes. These dataframes are called partitions, or chunks. The split happens along the index, not along the columns. A single method call on a Dask dataframe consists of multiple calls on the underlying pandas dataframes, whose results Dask then combines to obtain the final result.

Because the work is organised by partition, only the partitions currently being processed have to fit fully into RAM. Once Dask is done processing a partition, it can release the data from memory and load the next one. This is why Dask allows us to work with datasets that are much larger than memory, as long as each partition (a regular pandas dataframe) fits in memory.

<img src="http://dask.pydata.org/en/latest/_images/dask-dataframe.svg" width="40%">

Source image: Dask tutorial.
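
The "one partition at a time" idea can be mimicked with plain pandas. The sketch below assumes a made-up CSV held in memory instead of a real file, and it is only an analogy for what Dask automates, not a description of how Dask is implemented. Each chunk is a regular pandas dataframe; we keep only small running totals and let each chunk go before reading the next.

```python
import io

import pandas as pd

# Hypothetical in-memory CSV standing in for a large file on disk
csv_text = "DayOfWeek,Cancelled\n" + "".join(
    f"{i % 7 + 1},{int(i % 10 == 0)}\n" for i in range(1000)
)

total_rows = 0
total_cancelled = 0

# chunksize=250 yields pandas dataframes of at most 250 rows, one at a time
for chunk in pd.read_csv(io.StringIO(csv_text), chunksize=250):
    total_rows += len(chunk)
    total_cancelled += int(chunk["Cancelled"].sum())

print(total_rows, total_cancelled)
```

With Dask, the per-chunk work and the final combination are expressed as a single method call on the Dask dataframe.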

## Creating a dask dataframe

In theory, we can create a dask dataframe from a pandas dataframe. But if we can load the full dataframe with pandas, we don't really need to use Dask!

There are two main ways of creating a dask dataframe. Both use the function `dask.dataframe.read_csv`, only with slightly different arguments. One way consists of creating one dask dataframe from multiple files on disk; the other consists of reading one large file on disk in a 'chunk-by-chunk' way.

### Dask reading multiple files

At times, the data is already organised into multiple files (e.g. by year): each of these files fits into memory, but taken together they are too large. In this case we can create a dask dataframe where each partition corresponds to one of the files.

In [None]:
import dask.dataframe as dd 
import os
filenames = os.path.join('data', 'nycflights', '*.csv')
# '*.csv' means: collect all files whose names follow that pattern.
# Look into the folder data/nycflights: can you spot the pattern?
df = dd.read_csv(filenames, parse_dates={'Date': [0, 1, 2]})


What just happened?

*    Dask investigated the input path and found that there are 10 matching files
*    Dask created a chunk (or partition) for each file

But what's the object it created?



In [None]:
# it's kind of empty, really! Because Dask is lazy: it knows where the data is and it's
# ready to load it, but it won't actually load it until it absolutely has to!
# However, note that we have information about how many partitions Dask created.
# In this case it's 10, exactly equal to the number of files that have been read.
df 


In [None]:
# for example, Dask will load the first few rows if we call:
df.head()


In [None]:
# can we compute how many rows there are in the dataframe?
len(df) # this fails


Why did it fail?

The error is as explicit as it can be. It says 'ValueError: Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.'

But what does it mean? Because Dask is lazy, it sometimes makes assumptions that don't hold up to further scrutiny... Specifically, Dask infers the datatype of each column only from a sample taken from the beginning of the file. It then applies the same datatypes to the rest of the dataframe. But sometimes things change! Imagine for example if a certain piece of data is only collected from 1993 onwards...

When this happens, there is a 'mismatched datatype' and Dask breaks. To make sure this doesn't happen, the ideal solution is to specify the data types directly using the `dtype` keyword. Always check the schema for your data! What would you expect to have?
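
The effect of sample-based inference can be reproduced in miniature with plain pandas. The two CSV snippets below are made up for illustration (they are not rows from the flights data): the same column looks numeric in one and textual in the other, so the inferred types disagree unless we declare the type ourselves.

```python
import io

import pandas as pd

# Hypothetical snippets: the column is numeric early on, text appears later
early = "Year,TailNum\n1990,123\n1990,456\n"
late = "Year,TailNum\n1993,N123AA\n1993,N456UA\n"


def tailnum_dtype(csv_text, **kwargs):
    """Return the dtype (as a string) that pandas assigns to 'TailNum'."""
    return str(pd.read_csv(io.StringIO(csv_text), **kwargs).dtypes["TailNum"])


# Left to inference, the two snippets get different types
inferred_early = tailnum_dtype(early)
inferred_late = tailnum_dtype(late)
print(inferred_early, inferred_late)

# Declaring the dtype up front makes both reads agree
explicit_early = tailnum_dtype(early, dtype={"TailNum": str})
explicit_late = tailnum_dtype(late, dtype={"TailNum": str})
print(explicit_early, explicit_late)
```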

In [None]:
df = dd.read_csv(os.path.join('data', 'nycflights', '*.csv'),
                 parse_dates={'Date': [0, 1, 2]},
                 dtype={'TailNum': str,
                        'CRSElapsedTime': float,
                        'Cancelled': bool})

# all should be good now!
len(df)

### Dask reading one large file in chunks

What if we have one big file to load?

In [None]:
# We can load one big file in chunks as well
df_from_large = dd.read_csv(os.path.join('data', 'nycflights', '1990.csv'),
                            blocksize='5MB',  # we can specify how large we want each block to be
                            parse_dates={'Date': [0, 1, 2]},
                            dtype={'TailNum': str,
                                   'CRSElapsedTime': float,
                                   'Cancelled': bool})

# based on blocksize, dask will decide how many partitions to create. 
# how many are there?
print(f'Number of partitions created: {df_from_large.npartitions}')
print()

# show the (lazy) dataframe
df_from_large


#### Exercise: create dask dataframe partitions from one large data file

In [None]:
# Exercise
# How can we create a dask dataframe where the size of each partition is 10MB?
# How many partitions do we get?
######################
### Your code here ###
######################


## Using dask dataframes

Using a dask dataframe is generally very similar to using a pandas dataframe! The same functions apply, with a few caveats:

- We have to call 'compute' to get the final results, since that's what prompts the task graph to actually execute. Note that execution includes loading the data from scratch every time.
- Not all pandas functions are available (yet!)
- Some operations are more expensive than in pandas, with one notable example being 'setting the index'. This is because setting the index involves reshuffling rows, which means that all partitions have to talk to each other. However, it's much easier for communication to happen WITHIN a partition than BETWEEN partitions.


### Exercise: query the dask dataframe

In [None]:
# For example, how would you check whether flights are evenly distributed across
# the days of the week? That is, how many flights have left on each day of
# the week?
# (hint: there is a column called 'DayOfWeek'. Also, remember the method '.value_counts'?)
######################
### Your code here ###
######################


In [None]:
# What if we wanted to know how many flights got cancelled?
# hint: there is a column called 'Cancelled' that holds boolean values
######################
### Your code here ###
######################

# yep, same function as pandas + compute!


In [None]:
# Just out of curiosity, let's see what the computation graph looks like...
df.Cancelled.sum().visualize()


In [None]:
df.DayOfWeek.value_counts().visualize()


## Dask-specific functions

We can create our own custom operations on a Dask dataframe. One useful function to know is [map_partitions](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.map_partitions).

One can use `map_partitions` to apply a custom function to each partition. The output is the combined result from all partitions.



In [None]:
# Let's say we want to know how many morning flights there are among those that didn't get cancelled
def get_morning_flights_nb(df):
    # keep only the flights that were not cancelled
    df_subset = df[~df.Cancelled]
    # boolean Series: True for each flight that left before 12:00 (noon)
    return df_subset.DepTime < 1200

df.map_partitions(get_morning_flights_nb).sum().compute()


### Exercise: apply 'map_partitions'

In [None]:
# What if we wanted to know how many morning flights there are among those that didn't get cancelled 
# and that left from JFK?
# hint: check out the possible values in the column 'Origin' (JFK, EWR, LGA)
df.Origin.value_counts().compute()

######################
### Your code here ###
######################




## Questions?