# What is Dask?

Dask provides multi-core and distributed parallel execution on larger-than-memory datasets.

- process data that doesn't fit into memory by breaking it into blocks and specifying task chains
- parallelize execution of tasks across cores and even nodes of a cluster
- move computation to the data rather than the other way around, to minimize communication overheads

## Prereqs

This notebook assumes that you have the *Python 3.6* version of Anaconda installed:
https://www.continuum.io/downloads

> Anaconda provides the majority of packages necessary for dask to run

Libraries needed for dask:
```
# Latest dask & distributed scheduler
python -m pip install "dask[complete]"
```

In addition, you'll need `graphviz` installed to view the computation graphs generated by *dask*.

```
# install binaries on mac
brew install graphviz

# install related python library
python -m pip install graphviz
```

## What We'll Cover

Dask fundamentals

- Dask Task graphs
- Dask Collections
- Running Distributed



## Task graphs

Using code very similar to standard python, dask builds up a task graph for the operations you want to perform.

This task graph is then used by the scheduler to determine how to parallelize the computation.
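
As a minimal sketch of how such a graph gets built, the example below wraps two ordinary functions with `dask.delayed`. The helper names `inc` and `add` and the input values are hypothetical, and `total` here is only an illustrative stand-in for a delayed object like the one inspected in this section, not necessarily how it is defined elsewhere in the notebook.

```python
from dask import delayed

# hypothetical helper functions; any plain Python callables would do
def inc(x):
    return x + 1

def add(x, y):
    return x + y

# wrapping a call in delayed() records the call instead of running it
a = delayed(inc)(1)
b = delayed(inc)(2)
total = delayed(add)(a, b)   # depends on a and b; still nothing computed

# .compute() hands the graph to a scheduler and returns the result
result = total.compute()
print(result)
```

Because `a` and `b` do not depend on each other, a scheduler is free to run them in parallel before running `add`.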

The task graph can be viewed with the delayed object's `.visualize()` method.
Calling `.visualize()` does not compute the result; it only renders the task graph so you can inspect it.

> Requires graphviz to be installed

In [None]:
total.visualize()

## Why delay?

Delayed operations allow dask to specify *how* a collection of operations should be executed. 
This *calculation specification* can then be split up and distributed across a cluster of cores or even machines.

> This is similar to how Spark or Airflow execute a DAG.
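
The following sketch illustrates the deferral itself: wrapping a call only records it, and the function body runs once `.compute()` is called. The function `double` and the `calls` list are hypothetical names used purely for illustration, and the `synchronous` scheduler is chosen here so that everything runs in the current process.

```python
from dask import delayed

calls = []  # records every time the function body really runs

def double(x):
    calls.append(x)
    return x * 2

lazy = delayed(double)(21)     # builds a specification only
calls_before = list(calls)     # still empty: nothing has run yet

value = lazy.compute(scheduler="synchronous")  # now the task executes
calls_after = list(calls)
print(calls_before, value, calls_after)
```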

## delayed functions and standard operators

Supported operations include arithmetic operators, item or slice selection, attribute access and method calls - essentially anything that could be phrased as a lambda expression.

> Unsupported operations include mutation, setter methods, iteration (`for` loops) and use in a boolean context (such as `if` predicates).
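
A short sketch of both sides of this rule, using a small hypothetical list as input: item selection, slicing, arithmetic and method calls all produce new delayed objects, while truth-testing and iterating raise a `TypeError` because the underlying value is not known yet.

```python
from dask import delayed

data = delayed([1, 2, 3, 4])

first_plus_last = data[0] + data[-1]   # item selection + arithmetic
middle = data[1:3]                     # slice selection
threes = data.count(3)                 # method call

results = (first_plus_last.compute(), middle.compute(), threes.compute())

# truth-testing and iteration are rejected on a delayed object
unsupported = []
try:
    bool(data)
except TypeError:
    unsupported.append("bool")
try:
    for _ in data:
        pass
except TypeError:
    unsupported.append("iteration")
print(results, unsupported)
```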


## Dask Task Graphs

For any dask object, the task graph is accessible through the object's `.dask` attribute.
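
As a hedged illustration, the sketch below builds a small delayed object from hypothetical `inc` and `add` helpers (named `example` to avoid clashing with the notebook's own objects) and lists the keys stored in its graph.

```python
from dask import delayed

# hypothetical helpers used only to build a small graph
def inc(x):
    return x + 1

def add(x, y):
    return x + y

example = delayed(add)(delayed(inc)(1), delayed(inc)(2))

graph = example.dask          # the task graph behind the object
keys = list(graph)            # the graph behaves like a mapping of keys to tasks
for key in keys:
    print(key)

# the object's own key identifies its final task within that graph
has_own_key = example.key in keys
```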

In [None]:
# total object from above
total.dask

## Dask Collections

Dask provides a number of objects or data types in the form of collections.
Using these collections, dask supports functional, NumPy and pandas style syntax.

- Bag
    - A simple collection suited to functional-style programming; it supports `map`, `filter`, and other PySpark-style operations.
- Dataframe
    - Mimics the pandas dataframe, with some limitations
- Array
    - Mimics NumPy arrays; allows for more dimensions than the Dataframe object
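
A minimal sketch of the Array and Bag collections, using small made-up inputs (the Dataframe collection is covered later in this notebook). The chunk size, partition count and the explicit `scheduler="threads"` choice are assumptions made to keep the example small.

```python
import dask.array as da
import dask.bag as db

# Array: ten numbers split into two blocks of five, summed block by block
numbers = da.arange(10, chunks=5)
array_sum = int(numbers.sum().compute())

# Bag: functional-style pipeline over a sequence split into two partitions
bag = db.from_sequence(range(10), npartitions=2)
even_squares = (
    bag.filter(lambda n: n % 2 == 0)
       .map(lambda n: n * n)
       .compute(scheduler="threads")
)
print(array_sum, even_squares)
```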
   
## Dask task schedulers

Dask includes a number of task schedulers, but in general the *distributed* scheduler is recommended for all usage, as it allows you to parallelize tasks not only across cores, but across machines as well.
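
For comparison, a scheduler can also be picked per call through the `scheduler=` keyword of `.compute()`. The sketch below runs the same graph on two of the single-machine schedulers; the `square` helper and the input range are hypothetical.

```python
from dask import delayed

def square(x):
    return x * x

# eight independent tasks feeding one final sum
total_of_squares = delayed(sum)([delayed(square)(i) for i in range(8)])

# same graph, different single-machine schedulers
threaded = total_of_squares.compute(scheduler="threads")
single = total_of_squares.compute(scheduler="synchronous")
print(threaded, single)
```

The `synchronous` scheduler runs everything in the calling thread, which can be handy when debugging.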

## Dask distributed clusters

![Dask Dataframe](./images/dask-distributed-architecture.png)


A dask cluster consists of a scheduler and 1 or more workers.
To interface with a cluster, you create a client object with the appropriate connection information.

By default, if no connection information is passed to `Client()`, a scheduler is started locally along with a set of local workers (the number is chosen from the machine's available cores).
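
A hedged sketch of the local case, with the cluster created explicitly through `LocalCluster` so its size is visible. The worker counts, `processes=False` and the disabled dashboard are assumptions chosen to keep the example lightweight, not recommended settings.

```python
from dask.distributed import Client, LocalCluster

# assumption: a tiny in-process cluster, sized by hand for illustration
cluster = LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    processes=False,          # keep workers in this process
    dashboard_address=None,   # skip the diagnostics dashboard
)
client = Client(cluster)

# submit a single task to the cluster and wait for its result
future = client.submit(sum, [1, 2, 3])
answer = future.result()
print(answer)

client.close()
cluster.close()
```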


## Dask Dataframe

The Dataframe collection mimics the pandas DataFrame API, providing a familiar way to manipulate large datasets.

![Dask Dataframe](./images/dask-dataframe-structure.png)



In [None]:
%%time
import os
import pandas as pd

# taxi data available at:
# http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml
directory = os.path.abspath('./data/nyctaxi')
taxi_data_files = [os.path.join(directory, f) for f in os.listdir(directory)]

# pandas makes it easy to analyze data IF it is in a single data frame
# -- attempt to create a huge dataframe to get the sum of passengers
dataframes = []
for filepath in taxi_data_files:
    df = pd.read_csv(filepath, parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])
    dataframes.append(df)
    
# make 1 big dataframe (6.8GB)
pandas_big_df = pd.concat(dataframes)

result = pandas_big_df.passenger_count.sum()
# however, you may hit memory errors OR wait a long time (~10 minutes)

In [None]:
# Yes, there are workarounds for memory issues...

# load each file and process sequentially, summing outside of pandas
passenger_total = 0
for filepath in taxi_data_files:
    df = pd.read_csv(filepath, parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])
    passenger_total += df.passenger_count.sum()

print(passenger_total)



In [None]:
%%time
# you could even parallelize this a bit using a thread or process pool
from concurrent import futures

def calculate_csv_column_sum(args):
    # each work item is a (filepath, column_name) pair
    filepath, column_name = args
    df = pd.read_csv(filepath, parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])
    return df[column_name].sum()

args = [(filepath, 'passenger_count') for filepath in taxi_data_files]
with futures.ThreadPoolExecutor() as executor:
    result = sum(executor.map(calculate_csv_column_sum, args))

print(result)


# ... but what if you want to do something more complex?
# > Compute average trip distance grouped by passenger count

## With Dask Dataframe

In [None]:
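# connect to a scheduler (or start a local scheduler and local workers)
from dask.distributed import Client

# to connect to an existing cluster, first start a scheduler with
# dask-scheduler from the command line, then pass its IP:PORT below
client = Client(ENTER_IPPORT_HERE)  # if IP:PORT is not supplied, Client() starts a local scheduler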
client

In [None]:
%%time
import os
import dask.dataframe as dd

directory = '/tmp/nyctaxi'
files_for_analysis = os.path.join(directory, '*.csv')  

df = dd.read_csv(files_for_analysis, parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])
df = client.persist(df)  # persist data to cluster; keep the returned, persisted collection

# let's get that passenger count!
operation = df.passenger_count.sum()
result = operation.compute()

In [None]:
# .dask is where the task graph is stored
operation.dask

# .visualize() can be used to visualize the task graph...
# ...however, when the graph is too big you may not be able to display it without adjusting your notebook output IO settings.
operation.visualize()

In [None]:
%%time

operation.compute()

In [None]:
# how about something more complex

# Compute average trip distance grouped by passenger count
operation = df.groupby(df.passenger_count).trip_distance.mean()

In [None]:
%%time
operation.compute()