# Lab 6 Reverse Lecture

In this reverse lecture, we'll explore in more detail a subset of the functionality of [dask](https://dask.org), a parallel data processing library for Python. We'll spend most of our time on dask's [`DataFrame`](https://docs.dask.org/en/latest/dataframe.html) API, and we'll also examine concepts such as lazy computation and how it relates to dask's different schedulers.

We'll follow that with data processing on columnar binary data (Parquet), and end with a brief overview of dask's [`Bag`](https://docs.dask.org/en/latest/bag.html) API, using a JSON data "scraper" as a working example.

We close with a list of additional resources for learning more about dask, including examples of how to use it for ML tasks. Finally, for the take-home portion of this lab, you'll solve two parallel programming assignments, which we detail in the [GitHub repo](http://github.com/mitdbg/datascienceclass/tree/master/lab_6/README.md).

## Running this notebook

To execute code from a cell, you can either click `Run` at the top or press `Shift+Enter` after selecting the cell. You can run the entire notebook (`Restart & Run All` from the `Kernel` drop-down) or run each cell individually. If you choose the latter, run the cells in order, as later cells depend on earlier ones. Note that to `Run All` successfully, you'll first have to write the code that answers some of the questions in the notebook.

Once you open the notebook in your browser and check that the cells render correctly (e.g., by running the "Python packages" cell below), you're good to go.

## Python packages we'll need

First, import the Python packages we'll be using for the lab:

In [None]:
import time
from time import perf_counter

# Data processing.
import json
import numpy as np
import pandas as pd

# Q6: For loading multiple CSVs into a single (pandas) dataframe.
import glob
import os

# Plotting.
import matplotlib.pyplot as plt
%matplotlib notebook
import seaborn as sns
sns.set_style('whitegrid')

# "Vanilla" python parallelism.
import multiprocessing

# Scalable data analytics: dask.
import dask
import dask.bag as db
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
import graphviz

# Unused: scalable data analytics using Spark.
#from pyspark.sql import SparkSession

# For GC large pandas dataframes after use.
import gc

# Ignore warnings.
import warnings
warnings.simplefilter("ignore")

## Part 1: dask

Dask is a Python library for parallel computing. It comprises two main components: a task scheduler, and a collection of data structures suited to different parallel programming tasks. Take a look at the [main page of their documentation](https://docs.dask.org/en/latest/) for more details.

In this part of the reverse lecture, we'll introduce you to a subset of these features, as well as cover example tasks using the two different categories of dask schedulers.

Of special interest to our course, dask provides interfaces that closely mirror those of numpy arrays and pandas dataframes. We'll see that this makes it easy to speed up some of the data analysis tasks we've covered so far in the course.
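To make the "task scheduler" component a bit more concrete: in dask's classic graph specification, a computation is a plain Python dict mapping keys to tasks, where a task is a tuple whose first element is a callable and whose remaining elements are arguments (which may be other keys). The sketch below is a toy, single-threaded executor for graphs in that style; `run_graph` is a hypothetical helper written for illustration, not dask's scheduler, and it assumes string keys for simplicity.

```python
from operator import add, mul

def run_graph(graph, key, cache=None):
    """Toy, single-threaded executor for a dask-style task graph (hypothetical).

    graph: dict mapping keys to either literal values or tasks, where a task
    is a tuple (callable, arg1, arg2, ...). Arguments that are themselves
    keys of the graph are computed first (depth-first, memoized in `cache`).
    """
    if cache is None:
        cache = {}
    if key in cache:
        return cache[key]
    value = graph[key]
    if isinstance(value, tuple) and value and callable(value[0]):
        func, *args = value
        # Resolve arguments that refer to other keys in the graph.
        resolved = [
            run_graph(graph, a, cache) if isinstance(a, str) and a in graph else a
            for a in args
        ]
        result = func(*resolved)
    else:
        result = value  # literal data, e.g. an already-loaded chunk
    cache[key] = result
    return result

# Two independent chunks whose partial results are combined at the end.
graph = {
    "chunk-0": [1, 2, 3],
    "chunk-1": [4, 5, 6],
    "sum-0": (sum, "chunk-0"),
    "sum-1": (sum, "chunk-1"),
    "total": (add, "sum-0", "sum-1"),
    "doubled": (mul, "total", 2),
}

print(run_graph(graph, "total"))    # 21
print(run_graph(graph, "doubled"))  # 42
```

Note that `sum-0` and `sum-1` do not depend on each other: that independence is exactly what allows a real scheduler to run them in parallel.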


### Example pandas vs dask comparison: data loading

Before getting into the details of the different task schedulers, let's look at a simple runtime comparison between pandas and dask. Specifically, let's load the same Spotify dataset from Lab 4 with both libraries and see how long each one takes:

In [None]:
%%time
# load dataset using pandas:
df = pd.read_csv('data/spotify_songs.csv')

In [None]:
%%time
# load dataset using dask:
dd_df = dd.read_csv('data/spotify_songs.csv')

Here we already see a 20x difference, which is impressive. However, this is largely because dask computes *lazily*: `dd.read_csv` only reads a small sample of the file to infer column names and dtypes, while the data itself stays on disk. Dask loads the contents into memory (one partition at a time) only when a computation actually needs them.
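A minimal sketch of what "lazy" means here, using only the standard library: the hypothetical `LazyCSV` class below only remembers *how* to read the data and which operations were requested; nothing is read until `compute()` is called. Real dask dataframes go further (they sample the file up front to infer dtypes, and split it into partitions), so treat this as an illustration of the idea, not of dask's implementation.

```python
import csv
import io

class LazyCSV:
    """Hypothetical toy lazy reader: records operations, reads on compute()."""

    def __init__(self, opener, ops=(), stats=None):
        self._opener = opener    # zero-argument callable returning a text stream
        self._ops = tuple(ops)   # pending per-row operations, applied in order
        # Shared counter so we can observe when the source is actually read.
        self.stats = stats if stats is not None else {"reads": 0}

    def map(self, fn):
        # Return a new lazy object with one more pending operation; no I/O here.
        return LazyCSV(self._opener, self._ops + (fn,), self.stats)

    def compute(self):
        # Only now do we open and parse the source, then apply the pending ops.
        self.stats["reads"] += 1
        with self._opener() as f:
            rows = list(csv.DictReader(f))
        for fn in self._ops:
            rows = [fn(r) for r in rows]
        return rows

data = "name,acousticness\nsong_a,0.1\nsong_b,0.7\n"
lazy = LazyCSV(lambda: io.StringIO(data)).map(lambda r: float(r["acousticness"]))

print(lazy.stats["reads"])  # 0: building the pipeline did not touch the data
print(lazy.compute())       # [0.1, 0.7]
print(lazy.stats["reads"])  # 1
```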

We can observe this effect in action when we ask dask to compute an operation that requires inspecting the entire contents of the dataset.

**Q1. Measure the runtime of calling `describe()` on each of the two dataframes (pandas and dask). How does the pandas output compare to that of dask?**

In [None]:
%%time
# Q1: YOUR CODE GOES HERE (pandas)

In [None]:
%%time
# Q1: YOUR CODE GOES HERE (dask)

**Q2. Clicker question: just by running `describe()` (and nothing else) on both `df` (pandas) and `dd_df` (dask), which of the following is true?**

**a)** both output the same

**b)** pandas output is more informative

**c)** dask output is more informative

So if we want dask to actually perform computations for us, we need to call `compute()`. For example, in the case of `describe()` above:

In [None]:
%%time
dd_df.describe().compute()

We can also inspect the task graph that dask assembles and executes for the computation:

In [None]:
dd_df.describe().visualize()

As we can see above, dask performs as much work as possible independently on each block (partition), and only merges and aggregates the per-block results when needed.
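The "compute per block, then combine" pattern can be sketched in plain Python. Below, each partition is reduced to a small partial result (count, sum, min, max), and the partials are merged pairwise in a tree, similar in spirit to how dask combines per-partition results (dask's reductions expose a `split_every` parameter that controls this fan-in). The helper names are hypothetical, and they cover only a few of the statistics that `describe()` reports.

```python
import statistics

def partial_stats(chunk):
    # Per-partition work: reduce one chunk to a small summary.
    return {"count": len(chunk), "sum": sum(chunk), "min": min(chunk), "max": max(chunk)}

def combine(a, b):
    # Merge two summaries; the merge is associative, so grouping does not matter.
    return {
        "count": a["count"] + b["count"],
        "sum": a["sum"] + b["sum"],
        "min": min(a["min"], b["min"]),
        "max": max(a["max"], b["max"]),
    }

def tree_reduce(parts):
    # Combine partials pairwise, level by level (a binary reduction tree).
    while len(parts) > 1:
        nxt = [combine(parts[i], parts[i + 1]) for i in range(0, len(parts) - 1, 2)]
        if len(parts) % 2:  # an odd one out moves up a level unchanged
            nxt.append(parts[-1])
        parts = nxt
    return parts[0]

chunks = [[0.1, 0.5], [0.9, 0.2, 0.4], [0.3], [0.8, 0.6]]
summary = tree_reduce([partial_stats(c) for c in chunks])
mean = summary["sum"] / summary["count"]
print(summary["count"], summary["min"], summary["max"])  # 8 0.1 0.9

# Sanity check against a single pass over all values.
assert abs(mean - statistics.mean([x for c in chunks for x in c])) < 1e-12
```

Statistics such as quantiles don't decompose this neatly, which is one reason the quantile part of the task graph above looks different from the simple sums and counts.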

**Q3. Clicker question: why do we have 11 "pipelines" in parallel for computing quantiles in the task graph above?**

**a)** This is the number of workers dask is using for the computation.

**b)** This is the number of categorical columns the dataframe has.

**c)** This is the number of numerical columns the dataframe has.

**d)** This is the number of threads dask is using for the computation.

### Task scheduler

Dask provides two categories of task schedulers: *single machine* and *distributed*. From the client setup [documentation](https://docs.dask.org/en/latest/setup.html?highlight=client#setup): 

> **Single machine scheduler:** This scheduler provides basic features on a local process or thread pool. This scheduler was made first and is the default. It is simple and cheap to use. It can only be used on a single machine and does not scale.
>
> **Distributed scheduler:** This scheduler is more sophisticated. It offers more features, but also requires a bit more effort to set up. It can run locally or distributed across a cluster.

If you load data into a dask dataframe and call `compute()` without having created a distributed `Client`, dask runs the computation on the *single machine scheduler* (for dataframes, a local thread pool by default). For example, the sum over `acousticness` below:

In [None]:
%%time
# sum values for acousticness column using pandas
df.acousticness.sum()

In [None]:
%%time
# sum values for acousticness column using dask.
# This computation uses the single machine scheduler:
dd_df.acousticness.sum().compute()
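Without a distributed `Client`, you can pick which single-machine scheduler runs a given `compute()` call through its `scheduler=` keyword (`"threads"`, `"processes"`, or `"synchronous"`, the last being handy for debugging), or for a whole block of code with `dask.config.set`. Once a distributed `Client` is created, as we do below, it becomes the default for subsequent `compute()` calls. A minimal sketch with `dask.delayed` on toy data; `chunk_sum` is a hypothetical stand-in for per-partition work:

```python
import dask
from dask import delayed

@delayed
def chunk_sum(values):
    # Stand-in for per-partition work, e.g. summing one block of a column.
    return sum(values)

chunks = [[0.1, 0.2], [0.3, 0.4], [0.5]]
total = delayed(sum)([chunk_sum(c) for c in chunks])  # still lazy: nothing has run

# Choose the scheduler per call...
res_threads = total.compute(scheduler="threads")
res_sync = total.compute(scheduler="synchronous")  # single thread, easy to debug

# ...or for a whole block of code.
with dask.config.set(scheduler="synchronous"):
    res_cfg = total.compute()

print(round(res_threads, 6), round(res_sync, 6), round(res_cfg, 6))  # 1.5 1.5 1.5
```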

To use the *distributed scheduler* (even on a single machine), you need to explicitly create a dask [`Client`](https://docs.dask.org/en/latest/setup.html?highlight=client#setup). Starting a `Client` is therefore optional: without one, dask falls back to the single machine scheduler. We'll go through this step anyway, as it shows how to set up a [`LocalCluster`](https://distributed.dask.org/en/latest/local-cluster.html). Creating a dask client also gives us a dashboard that we can use to monitor the dask workers while they compute results for us.

There are several ways to create a dask client that connects to a `LocalCluster`. We'll do each step explicitly, although if you create a `Client` without specifying which cluster to connect to, dask will by default create a `LocalCluster` instance (with its own task scheduler) for you.

First, we check how many cores we have in our server:

In [None]:
n_cores = multiprocessing.cpu_count()
print('number of cores we have: ', n_cores)

We'll use that to size the number of workers for our `LocalCluster` instance, and create a `Client` connected to it:

In [None]:
# multithreaded:
# cluster = LocalCluster(n_workers=1, processes=False, threads_per_worker=4)
# multiprocessing:
# cluster = LocalCluster(n_workers=n_cores, processes=True)

# If we start the dask-scheduler from the CLI in the Docker container using:
# $ dask-scheduler --host 0.0.0.0 --dashboard-address 8787
#
# Then we specify the address for the client explicitly:
# client = Client(address='0.0.0.0:8786')

# If we start the dask-scheduler from this Jupyter notebook, then we need
# to set "ip=None" for the status dashboard to work correctly via Docker.
# See:
# https://github.com/dask/distributed/issues/1875#issuecomment-387519880
cluster = LocalCluster(ip=None, n_workers=n_cores, processes=True)
client = Client(cluster)
client

In [None]:
# Info on each worker, together with how many threads per worker.
client.ncores()

# To restart the client and scheduler
#client.restart()
#
# To shutdown the client and scheduler
#client.shutdown()

We can scale the cluster down:

In [None]:
# scale() expects an integer number of workers; keep at least one.
n_workers = max(1, n_cores // 2)
print('Scale cluster down: ')
cluster.scale(n_workers)

print('\nWait a bit for scaling to take effect...')
time.sleep(1)

print('\nCluster workers:')
client.ncores()

And scale it back up again:

In [None]:
n_workers = n_cores
print('Scale cluster up again: ')
cluster.scale(n_workers)

print('\nWait a bit for scaling to take effect...')
time.sleep(1)

print('\nCluster workers:')
client.ncores()

**Q4. Measure the time to compute the total sum of `acousticness` for `dd_df` on a local cluster with a varying number of workers. Start with 1 worker, and double the number of workers up to 8, or up to however many cores you have available (if fewer than 8). See our usage of `scale()` above for how to scale your cluster up or down, and [`time.perf_counter`](https://docs.python.org/3/library/time.html#time.perf_counter) for how to measure elapsed time.**
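Two small standard-library pieces you may find handy for Q4: a hypothetical `time_ms` helper built on `time.perf_counter` (taking the best of a few runs reduces noise from other processes), and the doubling sequence of worker counts capped at the number of available cores. Wiring these up to `cluster.scale()` and the dask computation is left to you.

```python
import os
from time import perf_counter

def time_ms(fn, *args, repeat=3, **kwargs):
    """Hypothetical helper: best-of-`repeat` wall-clock time of fn(...) in ms."""
    best = float("inf")
    result = None
    for _ in range(repeat):
        start = perf_counter()
        result = fn(*args, **kwargs)
        best = min(best, (perf_counter() - start) * 1000)
    return best, result

def doubling_worker_counts(max_workers, cap=8):
    # 1, 2, 4, ... up to min(cap, max_workers).
    limit = min(cap, max_workers)
    counts, n = [], 1
    while n <= limit:
        counts.append(n)
        n *= 2
    return counts

print(doubling_worker_counts(os.cpu_count() or 1))
elapsed, total = time_ms(sum, range(1_000_000))
print(f"sum of range: {total} in {elapsed:.2f} ms")
```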

**Compare it against the time you get from using pandas. Was dask faster or slower than pandas?**

In [None]:
# Q4: YOUR CODE GOES HERE.

As you may have observed above, the Spotify 218k songs dataset is not large enough to justify dask's additional scheduling overhead. In general, on datasets that comfortably fit within available memory, pandas is often faster than dask; keep in mind that the head start dask seemed to have when loading the CSV was largely due to lazy evaluation. Take a look at the ["Best Practices"](https://docs.dask.org/en/latest/dataframe-best-practices.html) section of the dask documentation for more details.

So let's use a larger dataset instead. We can use dask's `demo` module to create a synthetic timeseries dataset:

In [None]:
# This lazily creates a timeseries dataset for us with around 7.6M rows.
dd_df = dd.demo.make_timeseries(start='2018-01-01',
                                end='2018-03-30',
                                # Use the one above; I'll use this larger one in class.
                                #start='2008-01-01',
                                #end='2018-03-30',
                                dtypes={'x': float, 'y': float, 'id': int},
                                freq='1s',
                                partition_freq='24h')

dd_df

As we've seen so far, dask computations are lazy until we either call `compute()` explicitly, or call a method that calls `compute()` internally (such as `head()` or `len()`).

This is why creating the large dataset above is almost instant. When we call `make_timeseries()`, dask only works out how to partition the data for the parallel computation (we asked for one partition per 24h over that date range). It performs the actual computation needed to generate the data, or brings the required chunks into memory, only once we actually need them.
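We can sanity-check the "around 7.6M rows" figure with back-of-the-envelope arithmetic, assuming one row per second (`freq='1s'`) and one partition per 24h window (`partition_freq='24h'`) between `start` and `end`. The exact count dask reports may differ slightly at the range boundaries.

```python
from datetime import datetime, timedelta

start = datetime(2018, 1, 1)
end = datetime(2018, 3, 30)

span = end - start                            # 31 + 28 + 29 = 88 days
n_partitions = span // timedelta(hours=24)    # one partition per 24h window
rows_per_partition = 24 * 60 * 60             # one row per second
n_rows = n_partitions * rows_per_partition

print(span.days, n_partitions, n_rows)  # 88 88 7603200
```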

We can see below that dask uses lazy computations even for the shape attribute:

In [None]:
print('pandas shape: ', df.shape)
print('dask shape: ', dd_df.shape)

We can retrieve the actual number of rows by calling `compute()` on it, or by calling `len()`, which itself calls `compute()` behind the scenes:

In [None]:
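# shape[0] (number of rows) is lazy; shape[1] (number of columns) is already known.
print('dask shape (compute): ', (dd_df.shape[0].compute(), dd_df.shape[1]))
print('dask number of rows (len): ', len(dd_df.index))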

Similarly, computing the dataframe's memory usage, as well as its number of rows, will both call `compute()` behind the scenes:

In [None]:
dd_df.info(memory_usage='deep')
print('Number of rows: ', len(dd_df))

**Q5. Compare the runtimes of running `head()` and `len(dd_df.index)` on the dask dataframe. Which one was faster? Why do you think that was the case?**

In [None]:
# Q5: YOUR CODE GOES HERE.

Next, let's compare the total runtime of a sequence of operations over the dask and pandas dataframes:

In [None]:
%%time
# Filter and groupby using dask.
dd_df2 = dd_df[dd_df.y > 0]
dd_df3 = dd_df2.groupby('id').x.std()

# Calling head() will also call compute() behind the scenes.
dd_df3.head()
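A groupby `std()` can't be computed by simply averaging per-partition standard deviations. One common approach, similar in spirit to a map-reduce style groupby, is for each partition to emit per-group *sufficient statistics* (count, sum, sum of squares) and then combine those. The sketch below does this in plain Python on made-up rows; the helper names are hypothetical, and it uses the sample standard deviation (`ddof=1`, pandas' default). The sum-of-squares formula can lose precision on large values, so real implementations may use more numerically stable updates.

```python
import math
import statistics
from collections import defaultdict

def partition_stats(rows):
    # Map step: per-group [count, sum, sum of squares] of x, for rows with y > 0.
    acc = defaultdict(lambda: [0, 0.0, 0.0])
    for r in rows:
        if r["y"] > 0:  # the filter step
            s = acc[r["id"]]
            s[0] += 1
            s[1] += r["x"]
            s[2] += r["x"] ** 2
    return acc

def merge(parts):
    # Reduce step: add up the per-group statistics from every partition.
    total = defaultdict(lambda: [0, 0.0, 0.0])
    for p in parts:
        for key, (n, sx, sxx) in p.items():
            t = total[key]
            t[0] += n
            t[1] += sx
            t[2] += sxx
    return total

def group_std(total):
    # Sample std (ddof=1); NaN for groups with fewer than two rows.
    return {
        key: math.sqrt((sxx - sx * sx / n) / (n - 1)) if n > 1 else float("nan")
        for key, (n, sx, sxx) in total.items()
    }

partitions = [
    [{"id": 1, "x": 1.0, "y": 0.5}, {"id": 2, "x": 4.0, "y": 0.1}],
    [{"id": 1, "x": 3.0, "y": 0.2}, {"id": 1, "x": 9.0, "y": -1.0}],
    [{"id": 2, "x": 6.0, "y": 0.3}, {"id": 1, "x": 5.0, "y": 0.9}],
]
stds = group_std(merge(partition_stats(p) for p in partitions))
print(stds)  # {1: 2.0, 2: 1.4142135623730951}

# Sanity check: id 1 keeps x = 1, 3, 5 after the y > 0 filter.
assert math.isclose(stds[1], statistics.stdev([1.0, 3.0, 5.0]))
```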

In [None]:
%%time
# This gives us a pandas dataframe from the dask dataframe.
df = dd_df.compute()

# Filter and groupby using pandas.
df2 = df[df.y > 0]
df3 = df2.groupby('id').x.std()

# Print explicitly: only a cell's last expression is displayed automatically.
print(df3.head())

# Garbage collect the pandas dataframes we created.
#
# XXX: pandas seems to be a lot less memory efficient than dask.
# Even with explicit GC, perf monitors still show leaky behavior.
# https://github.com/pandas-dev/pandas/issues?utf8=%E2%9C%93&q=is%3Aissue+leak+
del df, df2, df3
gc.collect()

Operations over a sorted index are where dask tends to do well relative to pandas (see ["Best Practices"](https://docs.dask.org/en/latest/dataframe-best-practices.html)):

In [None]:
%%time
# Dask: rolling standard deviation of 'x' column on 1m windows over the Datetime index.
# Return index of first occurrence of max value out of those rolling 1m stddev.
dd_df.x.rolling('1min').std().loc['2018-01-01':'2018-02-15'].idxmax().compute()

In [None]:
%%time
# pandas: rolling standard deviation of 'x' column on 1m windows over the Datetime index.
# Return index of first occurrence of max value out of those rolling 1m stddev.
df = dd_df.compute()
print(df.x.rolling('1min').std().loc['2018-01-01':'2018-02-15'].idxmax())

# GC pandas df.
del df
gc.collect()
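Part of why index-based selections can be cheap in dask is that a dataframe with a sorted index may know its *divisions*: the index value at which each partition starts, plus the end of the last one. A `.loc` range then only needs the partitions whose ranges overlap it; how much dask can exploit this in a given expression depends on the operations involved. The sketch below mimics the bookkeeping with `bisect` on daily ISO date strings; `partitions_for_range` is a hypothetical helper, not dask's implementation.

```python
from bisect import bisect_right
from datetime import date, timedelta

# Divisions for daily partitions: the start of each partition plus the final end.
first = date(2018, 1, 1)
divisions = [(first + timedelta(days=i)).isoformat() for i in range(89)]
# divisions[0] == '2018-01-01', divisions[-1] == '2018-03-30' -> 88 partitions

def partitions_for_range(divisions, lo, hi):
    """Indices of partitions [divisions[i], divisions[i+1]) overlapping [lo, hi]."""
    n_parts = len(divisions) - 1
    start = max(bisect_right(divisions, lo) - 1, 0)
    stop = min(bisect_right(divisions, hi), n_parts)
    return list(range(start, stop))

needed = partitions_for_range(divisions, "2018-01-01", "2018-02-15")
print(len(divisions) - 1, len(needed))  # 88 46
```

ISO date strings sort the same way as the dates they represent, which is what lets plain string comparison stand in for timestamps here.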

## Reading and loading data

Let's look at processing data stored in a column-oriented binary format, [`Parquet`](https://en.wikipedia.org/wiki/Apache_Parquet). As we've seen in class, Parquet is an open-source, column-oriented data storage format. Besides storing our dataframe data column by column, it supports a number of encoding and compression techniques (e.g., dictionary encoding, RLE, and bit packing, as we've seen in class).
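To make the techniques listed above concrete, here is a toy sketch of dictionary encoding followed by run-length encoding on a single low-cardinality column. Parquet's actual on-disk encodings (e.g., its hybrid RLE/bit-packing scheme) are more involved; the helpers below are hypothetical and only illustrate why storing a column contiguously makes such encodings effective.

```python
from itertools import groupby

def dictionary_encode(values):
    # Map each distinct value to a small integer code, in first-seen order.
    dictionary, codes, index = [], [], {}
    for v in values:
        if v not in index:
            index[v] = len(dictionary)
            dictionary.append(v)
        codes.append(index[v])
    return dictionary, codes

def run_length_encode(codes):
    # Collapse runs of repeated codes into (code, run_length) pairs.
    return [(code, len(list(run))) for code, run in groupby(codes)]

def decode(dictionary, runs):
    # Expand the runs and map codes back to the original values.
    return [dictionary[code] for code, length in runs for _ in range(length)]

column = ["GitHub"] * 4 + ["GitLab"] * 2 + ["GitHub"] * 3
dictionary, codes = dictionary_encode(column)
runs = run_length_encode(codes)
print(dictionary)  # ['GitHub', 'GitLab']
print(runs)        # [(0, 4), (1, 2), (0, 3)]
assert decode(dictionary, runs) == column
```

With only two distinct values, each code fits in a single bit, which is where bit packing comes in.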

In this part of the lab, we'll briefly look at how to store and read data written in Parquet format, and compare it to loading the same dataset from CSV.

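First, let's save the timeseries dataset in both Parquet and CSV format. We select a date range with `.loc` first, since saving a large dataset as CSV takes a really long time (with the default 2018 date range above, this selection covers the whole dataset; it only makes a difference with the larger 2008 to 2018 range used in class).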

In [None]:
subset_df = dd_df.loc['2018-01-01':'2018-03-30']

# Measure time for saving as Parquet files.
t1_start = perf_counter()
subset_df.to_parquet('data/parquet/')
t1_stop = perf_counter()
print('to_parquet() (ms): %s' % ((t1_stop - t1_start)*1000))

# Measure time for saving as CSV files.
t1_start = perf_counter()
subset_df.to_csv('data/csv/timeseries-*.csv')
t1_stop = perf_counter()
print('to_csv() (ms): %s' % ((t1_stop - t1_start)*1000))

Even though we used dask to write the Parquet files, both pandas and dask support writing to and reading from Parquet. Their interfaces are very similar: [pandas's `read_parquet`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_parquet.html) vs [dask's `read_parquet`](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.read_parquet). Both also let you choose which Parquet engine to use via the `engine` argument. There are some [performance differences between the two engines](https://stackoverflow.com/questions/51361356/a-comparison-between-fastparquet-and-pyarrow), but for the time being we'll use the default (if you're using our Docker container, you have both `pyarrow` and `fastparquet` installed, and the default is `fastparquet`; note that pandas' `engine='auto'` tries `pyarrow` first).

**Q6. Measure the runtime of reading a dataframe from the same Parquet and CSV data we saved above. Compare using both pandas and dask, as well as fetching only column `x` vs the entire dataset.**

**NOTE: When we saved our data as Parquet and CSV, dask wrote multiple files (#files = #dask partitions). While dask can read multiple CSV files directly into a single dask dataframe, pandas' `read_csv` reads one file at a time. Take a look at [`glob`](https://docs.python.org/3/library/glob.html) for expanding the `timeseries-*.csv` pattern into a list of file names, and at [`pd.concat`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.concat.html) for combining the resulting dataframes into a single pandas dataframe.**

In [None]:
# Q6: YOUR CODE GOES HERE.

## Dask bag

Next, we'll look at processing JSON data hosted on the web using the dask [`Bag`](https://docs.dask.org/en/latest/bag.html) data structure ([full API](https://docs.dask.org/en/latest/bag-api.html)). 

There is some overlap between what you can do with bags and with dataframes, as we'll see below. However, bags are useful when the data you're analyzing is more naturally represented as Python objects (e.g., dicts) than as tabular data (dataframes). That's often the case with JSON data, whose objects map almost 1:1 onto Python dicts.

From the official documentation:

>Dask Bag implements operations like `map`, `filter`, `groupby` and `aggregations` on collections of Python objects. It does this in parallel and in small memory using Python iterators. It is similar to a parallel version of itertools or a Pythonic version of the PySpark RDD.
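As a rough mental model of the Bag operations quoted above, the sketch below applies `map`, `filter`, and a `pluck`-style field extraction lazily within each partition using plain Python iterators, then merges per-partition counts, which is roughly the shape of a `frequencies()` computation. The partition layout and records are made up for illustration; dask additionally runs the per-partition work in parallel.

```python
import json
from collections import Counter

# Toy partitions of JSON lines (hypothetical records).
partitions = [
    ['{"lang": "python", "pkg": "dask"}', '{"lang": "r", "pkg": "dplyr"}'],
    ['{"lang": "python", "pkg": "dask"}', '{"lang": "python", "pkg": "pandas"}'],
]

def per_partition(lines):
    records = map(json.loads, lines)                            # map
    python_only = filter(lambda r: r["lang"] == "python", records)  # filter
    pkgs = (r["pkg"] for r in python_only)                      # like pluck("pkg")
    return Counter(pkgs)                                        # partial frequencies

# Combine the partial counts from every partition.
counts = sum((per_partition(p) for p in partitions), Counter())
print(counts.most_common(2))  # [('dask', 2), ('pandas', 1)]
```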


### Example: using Bag to process JSON data

Below we'll use event data in JSON format from [mybinder.org](http://mybinder.org), a web service that runs Jupyter notebooks. Every time a user launches a notebook from a repository hosted on a platform such as GitHub or GitLab, mybinder records an event and publishes it in publicly accessible JSON Lines files (one per day).

For example, we can look at the first 5 events published on Monday, October 28, 2019 by running:

In [None]:
%%time
db.read_text('https://archive.analytics.mybinder.org/events-2019-10-28.jsonl').take(5)

[`mybinder.org`](http://mybinder.org) also publishes an index listing all the other JSON files they currently host:

In [None]:
# Load index JSON file, inspect its contents.
db.read_text('https://archive.analytics.mybinder.org/index.jsonl').map(json.loads).take(5)

Using bag's [`pluck`](https://docs.dask.org/en/latest/bag-api.html#dask.bag.Bag.pluck), we can extract only the `name` field of each record. We'll use it to build a list of URLs for the files listed in the index:

In [None]:
urls = (db.read_text('https://archive.analytics.mybinder.org/index.jsonl')
                    .map(json.loads)
                    .pluck('name')
                    .compute())
urls = ['https://archive.analytics.mybinder.org/' + u for u in urls]
urls[:3]

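Passing the list of URLs to `db.read_text` gives us a bag over the lines of all those files (by default, one partition per file); mapping `json.loads` over it parses each line into a Python dict: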

In [None]:
notebook_runs = db.read_text(urls).map(json.loads)
notebook_runs.take(2)
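The `.jsonl` files above are in the JSON Lines format: each line is a complete JSON document. That means a large file can be cut into blocks at line boundaries and each block parsed independently, similar in spirit to what `db.read_text` does when you pass a `blocksize`. A standard-library sketch; `split_on_lines` is a hypothetical helper, and the records are made up (not the real mybinder schema):

```python
import json

def split_on_lines(data, blocksize):
    """Cut `data` (bytes) into chunks of at least `blocksize` bytes, extending
    each chunk to the next newline so that no line is split across chunks."""
    if blocksize < 1:
        raise ValueError("blocksize must be positive")
    chunks, start = [], 0
    while start < len(data):
        cut = min(start + blocksize, len(data)) - 1  # last byte of the nominal block
        newline = data.find(b"\n", cut)
        end = len(data) if newline == -1 else newline + 1
        chunks.append(data[start:end])
        start = end
    return chunks

# Hypothetical JSON Lines records.
records = [{"event": "launch", "repo": f"org/{name}"} for name in "abc"]
data = "".join(json.dumps(r) + "\n" for r in records).encode("utf-8")

chunks = split_on_lines(data, blocksize=40)
# Each chunk holds whole lines, so it can be parsed without the other chunks.
parsed = [json.loads(line) for chunk in chunks for line in chunk.splitlines() if line.strip()]
print(len(chunks), parsed == records)  # 2 True
```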

**Q7. Using the Python dicts in `notebook_runs`, filter for runs whose provider was "GitHub" and that happened in September 2019. What were the top-3 most run notebooks that month, and how many times was each one run?**

**To answer this question, you have two options. You can either use only `Bag` functions, such as [`filter`](https://docs.dask.org/en/latest/bag-api.html#dask.bag.Bag.filter) and [`frequencies`](https://docs.dask.org/en/latest/bag-api.html#dask.bag.Bag.frequencies) (take a look at some [usage examples here](https://examples.dask.org/bag.html)). Alternatively, you can also use `Bag`'s [`to_dataframe`](https://docs.dask.org/en/latest/bag-api.html#dask.bag.Bag.to_dataframe) and do your processing as dataframe-style computations.**

In [None]:
# Q7: YOUR CODE GOES HERE.

# Additional Resources

## Dask examples and tutorials

* Dask examples: https://examples.dask.org/
* Dask for ML tasks (e.g., incremental learning and hyperparameter tuning): https://ml.dask.org/examples.html
* YouTube playlist with introductory Dask concepts: https://www.youtube.com/playlist?list=PLTgRMOcmRb3OlkfAdqJWyGGrQM7eU-mi7

## Dask benchmarks
* On a 512 core server: https://matthewrocklin.com/blog/work/2017/07/03/scaling
* On GCS: https://gist.github.com/mrocklin/4c198b13e92f881161ef175810c7f6bc#file-scaling-gcs-ipynb
