In [None]:
from time import sleep

def inc(x):
    """Return ``x + 1`` after a deliberate one-second pause.

    The pause makes each call take a noticeable amount of time, which is
    what the notebook's timing cells measure.
    """
    sleep(1)  # fixed one-second delay before doing the work
    incremented = x + 1
    return incremented

def add(x, y):
    """Return ``x + y`` after a deliberate one-second pause.

    Like ``inc``, the pause makes the call slow enough for the notebook's
    timing cells to show the cost of running it.
    """
    sleep(1)  # fixed one-second delay before doing the work
    combined = x + y
    return combined

In [None]:
%%time

# Plain sequential pipeline: the two increments run one after the other,
# and the addition only starts once both results are available.
x, y = inc(1), inc(2)
z = add(x, y)

## Parallelize with `dask.delayed`

In [None]:
from dask import delayed

@delayed
def inc_d(x):
    """Delayed increment: pause for one second, then produce ``x + 1``.

    Because of the ``@delayed`` decorator, calling this function returns a
    lazy Dask object; the body runs only when that object is computed.
    """
    sleep(1)  # fixed one-second delay before doing the work
    incremented = x + 1
    return incremented

@delayed
def add_d(x, y):
    """Delayed addition: pause for one second, then produce ``x + y``.

    Because of the ``@delayed`` decorator, calling this function returns a
    lazy Dask object; the body runs only when that object is computed.
    """
    sleep(1)  # fixed one-second delay before doing the work
    combined = x + y
    return combined

In [None]:
%%time

# Same three-step pipeline built from the delayed functions. These calls
# only describe the work; it is executed later, when `z` is computed.
x, y = inc_d(1), inc_d(2)
z = add_d(x, y)

In [None]:
# z.visualize()

In [None]:
%%time

# Execute the task graph behind `z` and return the concrete result.
z.compute()

## Parallelizing a `for` loop

In [None]:
# Input values for the loop examples: the integers 1 through 8.
data = list(range(1, 9))

In [None]:
%%time
# Sequential baseline: every inc() call finishes (including its sleep)
# before the next one starts.
results = []
for x in data:
    y = inc(x)
    results.append(y)

# Add up the incremented values once the loop is done.
total = sum(results)

In [None]:
%%time
# Same loop as the sequential version, but each call goes through delayed().
results = []

for x in data:
    # Wrap inc on the fly; this yields a lazy task rather than a number.
    y = delayed(inc)(x)
    results.append(y)

# The reduction is wrapped as well, so `total` is itself a lazy object.
total = delayed(sum)(results)

# Run everything `total` depends on and return the final sum.
total.compute()

In [None]:
# total.visualize()

# Dask DataFrame

In [None]:
# Download the NYC Flights dataset to our workspace.
# Import the submodule explicitly: a bare `import urllib` does not guarantee
# that `urllib.request` has been loaded, so the attribute lookup below could
# fail with AttributeError on a fresh interpreter.
import urllib.request

print("- Downloading NYC Flights dataset... ", end='', flush=True)
url = 'https://storage.googleapis.com/dask-tutorial-data/nycflights.tar.gz'
# urlretrieve returns the local path it wrote to plus the response headers;
# `filename` is what the extraction step opens afterwards.
filename, headers = urllib.request.urlretrieve(url, 'nycflights.tar.gz')
print('Done!', flush=True)

In [None]:
import tarfile

# Extract the .csv files from the tar file.
# The archive was fetched over the network, so ask tarfile to reject unsafe
# members (absolute paths, `..` traversal, links escaping the target, device
# files) when the running Python supports extraction filters. Interpreters
# without that feature fall back to the plain call.
with tarfile.open(filename, mode='r:gz') as flights:
    if hasattr(tarfile, 'data_filter'):
        flights.extractall('data/', filter='data')
    else:
        flights.extractall('data/')

In [None]:
import os
import dask.dataframe as dd

# Open every CSV under data/nycflights as one Dask DataFrame. The first
# three columns are combined and parsed into a single `Date` column.
df = dd.read_csv(
    os.path.join('data', 'nycflights', '*.csv'),
    parse_dates={'Date': [0, 1, 2]},
)

# Display the resulting Dask DataFrame object.
df

In [None]:
# Preview the first few rows of the dataset.
df.head()

In [None]:
# df.tail()   # ValueError: Mismatched dtypes found in `pd.read_csv`/`pd.read_table`

In [None]:
# Some columns hold mixed values, so automatic dtype detection is not
# reliable for them; state the intended dtype of those columns explicitly.
df = dd.read_csv(
    os.path.join('data', 'nycflights', '*.csv'),
    parse_dates={'Date': [0, 1, 2]},
    dtype={'TailNum': str, 'CRSElapsedTime': float, 'Cancelled': bool},
)

In [None]:
# Show the last few rows; with the dtypes given explicitly this no longer
# hits the mismatched-dtypes ValueError.
df.tail()

## Computations with `dask.dataframe`

In [None]:
# Time the computation of the largest value in the DepDelay column.
%time df.DepDelay.max().compute()

In [None]:
# df.DepDelay.max().visualize(rankdir='LR', size='12, 12!')

## Machine Learning with Dask

In [None]:
# You can grab columns from the Dask DataFrame just as you would with Pandas:
# indexing with a list of names keeps only those three columns.
df_train = df[['CRSDepTime', 'CRSArrTime', 'Cancelled']]

# NOTE(review): on a Dask DataFrame the row count in `.shape` is presumably
# lazy, so this may not display a concrete number of rows — confirm.
df_train.shape

In [None]:
# Count the missing values in each selected column; .compute() runs the
# calculation and returns the concrete per-column totals.
df_train.isnull().sum().compute()