## Dask

### What is dask?
A package in python that does __distributed programming__. It is useful as a higher level interface for doing complicated distributed programming in an intuitive way. Distributed programming is useful when you don't have the ability to read things entirely into memory, and so of greate use when we have large datasets.

It is similar in function to __Apache Spark__ or the `pyspark` package for Python.

Before we learned how to manually parallelize operations, but in `dask` it will more or less be done automatically. A great advantage is that the interface mimics `pandas` and `numpy` and so can be used easily.

### Delayed tasks

One of the first handy things that `dask` can do is do functions in a batch using the `delayed` framework

Before doing that, we need to initiate a client for us to work with:

In [None]:
from dask.distributed import Client, progress
client = Client(threads_per_worker=2, n_workers=4)
client

Say that we wanted to run this sequence of functions:

In [None]:
import time
import random
import pandas as pd

def inc(x):
    time.sleep(1)
    return x + 1

def dec(x):
    time.sleep(1)
    return x - 1

def add(x, y):
    time.sleep(1)
    return x + y

In [None]:
%%time
x = inc(1)
y = dec(2)
z = add(x, y)
z 

Why did it take so long? It turns out that calling these functions takes quite a bit of time. Notice, however, that the function `inc` and the function `dec` could be called independently! You only need to add them at the end.

The power of the dask delayed framework is its ability to lay out the best way to execute it before it needs to be done:

In [None]:
import dask
inc_d = dask.delayed(inc)
dec_d = dask.delayed(dec)
add_d = dask.delayed(add)

In [None]:
x = inc_d(1)
y = dec_d(2)
z = add_d(x, y)
z

That didn't take any time at all. Why? Because it hasn't run it yet.

In [None]:
z.visualize(rankdir='LR')

In [None]:
%%time
z.compute()

In [None]:
zs_s = []
for i in range(20):
    x = inc(i)
    y = dec(x)
    z = add(x, y)
    zs_s.append(z)

In [None]:
zs_f = []
for i in range(20):
    x = inc_d(i)
    y = dec_d(x)
    z = add_d(x, y)
    zs_f.append(z)

In [None]:
zs = dask.compute(*zs_f)

Using Dask makes parallelizing quite easy!

## Distributed Programming

### Dask Dataframe

One of the most useful implementations of `dask` is using the `dataframe` API, which mimicks an actual DataFrame.

For this example and your homework, we will look at a bilateral trade flows dataset called BACI. We will be analyzing the years 2012-2018.

In [None]:
df18 = pd.read_csv("P:trade_data/BACI_HS12_Y2018_V202001.csv")
df18

Each row represents the trade flows from country $i$ to country $j$ for product $k$ in time $t$ in terms of either value or quantity.

Since we have 8 million rows of data in one year, this means we can expect about 56 million rows in the final dataset. This will be hard to work with all in memory at the same time!

Some options:
1. Chunk all our operations and run on each file.
2. Use the `dask` `dataframe` API framework.

We got a sense for how to use option 1, so now let's go to option 2.

In [None]:
import dask.dataframe as dd

In [None]:
df = dd.read_csv("P:trade_data/BACI_HS12_Y*.csv")
df

Notice I used read_csv to find __all__ the files in that directory and read them into one structure.

In [None]:
import glob
glob.glob("P:trade_data/BACI_HS12_Y*.csv")

But why did it give me 42 partitions if I only have 7 csv files?

In [None]:
df.map_partitions(len).compute().plot(kind='bar')

So all the partitions are about the same number of obs.

In [None]:
def mem_usage(x):
    return x.memory_usage().sum()

df.map_partitions(mem_usage).compute().plot(kind='bar')

And about the same memory usage (around 70-80 mb per partition).

__Rule of thumb__: partitions should be around 100 mb each.

Here `dask` tries to do it automatically.

My strategic use of the `map_partitions` will map a function to each partition.

All 7 of these files now exist as "partitions" in this dataframe object. Now when we do an operation it will be "distributed" to all the partitions.

So now the advantage of this framework is that we can run operations on all the data but `dask` does the chunking and parallelizing for us!

__Advantage__: no need to do everything in memory!

__Disadvantage__: will be slower than in-memory.

Thus only an advantage when your data will not fit in memory.

#### How it works:
- Run as many operations as you need to on the symbolic dataframe object.
- Call "compute" when you actually need to see the results.

In [None]:
df

Note that the read_csv function will have to run 42 times in the current flow.

What happens if we add an operation?

In [None]:
df.loc[df.t==2018]

So it added a couple more tasks after I did a slicing.

Let's add one more:

In [None]:
df.loc[df.t==2018].groupby("k")['q'].mean()

So taking a group average for the year 2018 takes 267 tasks.

When we actually need the results, we can run `compute`:

In [None]:
df.loc[df.t==2018].groupby("k")['q'].mean().compute()

So now we can calculate a useful summary statistic of the entire dataset without having to read the entire dataset!

#### Persisting memory

In case you want one set of results in memory, you can use the `persist` function to put your results in memory.

Suppose I want to persist the product with the code 10121:

In [None]:
df10121 = df[df.k==10121].persist()

In [None]:
df10121

Now it will apply all these changes in the background so we can keep working. 
 
Notice that it's still not a real "dataframe" yet. This is because its keeping the results distributed across the nodes. We can keep calling more functions on this flow and when it's done it will finish much quicker.

The downside of this is that it has transferred some of this (but not all of it) into the distributed RAM.

The upside here is that it can be called very quickly:

In [None]:
df10121.compute()

So what is the difference between `persist` and `compute`?

The command `persist` puts it into distributed memory (each worker holding a part of the data) so there are still as many partitions as before. The command `compute` puts it into one python object in your environment. 

__Why we use `persist`__:
- We want to call something into the distributed memory to calculate with `compute`.
- We need it to run asynchronously.
- We need to "set up" a computation that we will eventually compute.
    - Essentially saves us from having to wait a very long time

### Merging

Doing merges can be accomplished the usual way:

In [None]:
products = pd.read_csv("P:trade_data/product_codes_HS12_V202001.csv")
products = products.rename(columns={"code":"k"})

In [None]:
products

In [None]:
df_wdes = df.merge(products)

So now we've merged in a text description of our product codes

In [None]:
countries = pd.read_csv("P:trade_data/country_codes_V202001.csv",encoding = "ISO-8859-1")
countries = countries.rename(columns={"country_code":"i","country_name_full":"name_i"})
df_wdes = df_wdes.merge(countries[['i','name_i']])

countries = pd.read_csv("P:trade_data/country_codes_V202001.csv",encoding = "ISO-8859-1")
countries = countries.rename(columns={"country_code":"j","country_name_full":"name_j"})
df_wdes = df_wdes.merge(countries[['j','name_j']])

In [None]:
df_wdes.head()

### Which dataframe operations the dask API supports:

__Element by element operations__

In [None]:
df['avg_price'] = df['v']/df['q']

Since its delaying the operations, it won't run until I call `compute`, though notice that it just added a column to this dataset.

In [None]:
df

In [None]:
df.head()

__Selecting and slicing__

In [None]:
avg18 = df.loc[df.t==2018]['avg_price']

Now call compute to actually make a DataFrame:

In [None]:
ex = avg18.compute()

In [None]:
ex

__Statistical functions__

In [None]:
avg18.mean().compute()

In [None]:
avg18.var().compute()

__Typical pandas functionality__

In [None]:
df['k'].value_counts().compute()

In [None]:
df['q'].nlargest(10).compute()

In this case, `nlargest` returns the n largest elements of a column.

It also does groupby in parallel:

In [None]:
df.groupby("i")['v'].sum().compute()

You'll notice another advantage of this framework is that you can do standard `pandas` operations but in parallel.

## Dask Bags
This works with dataframes well, but about with python objects?

This is what `bags` are for. Bags essentially hold a list of python objects which can be parsed with any function necessary. 

This is handier for something like text analysis or image analysis where our objects are unstructured.

In [None]:
import dask.bag as db
import json

doge = db.read_text('../../Data/Text/doge_tweets*.json')

In [None]:
doge.take(1)

So now we've extracted a tweet! We could apply a function so that they become json objects:

To take each one individually, have to do some parsing of the strings so I wrote this function:

In [None]:
def make_json(x):
    if x[0] =="[":
        extract = x[1:-2]
    elif x[-1] =="]":
        extract = x[:-1]
    else:
        extract = x[:-2]
    return json.loads(extract)

In [None]:
doge = doge.map(make_json)

So now what if we wanted to extract all of the timestamps from the data?

In [None]:
times = doge.map(lambda x: x['created_at'])

In [None]:
time_stamps = times.compute()

In [None]:
pd.Series(time_stamps)

We could also write a function that processes the data in some way:

In [None]:
import re

In [None]:
find_hashtags = re.compile("#\w+")

In [None]:
def get_hashtags(x):
    text = x['text']
    return find_hashtags.findall(text)

In [None]:
doge_hash = doge.map(get_hashtags)

In [None]:
res = doge_hash.compute()

In [None]:
res[:10]

In [None]:
def flatten_list(_2d_list):
    flat_list = []
    # Iterate through the outer list
    for element in _2d_list:
        if type(element) is list:
            # If the element is of type list, iterate through the sublist
            for item in element:
                flat_list.append(item)
        else:
            flat_list.append(element)
    return flat_list

In [None]:
hash_list = flatten_list(doge_hash)

In [None]:
pd.Series(hash_list).value_counts()

We can also filter our tweets accordingly:

In [None]:
doge.filter(lambda x: pd.to_datetime(x['created_at']) > pd.to_datetime('2021-02-04T15:00:00-05:00')).take(1)

#### Takeaway message: we can process any kind of data with dask without having to put anything in memory.

In this case, we could be reading text using dask, processing it in parallel, and writing it out each time.

When could we apply this?

1. In Homework 1, we needed to read large amounts of tweets in. Rather than read them all in at once, we could have filtered them as json files using a `dask` "bag."
    - The Dask `bag` is frequently used in unstructured data processing.
2. In Homework 2, we could have read several .tif files in and process them in a distributed framework. 
    - Very useful if you had several raster files that you need to read in.

### Dask Arrays

Just as there is a dataframe object, there is also a `dask` equivalent to a `numpy` array.

As an example, let's make a giant array:

In [None]:
import dask.array as da
x = da.random.random((80000, 80000), chunks=(1000, 1000))

In [None]:
x

My machine will not read this in since it exceeds the memory usage of my machine.

However, that doesn't mean we can't calculate some basic statistics:

In [None]:
x.mean(axis=0)[::10].compute()

These arrays support nearly every operation that numpy arrays support. Notably, it supports __linear algebra__.

In [None]:
from dask.array import linalg
x = da.random.random((20000, 20000), chunks=(1000, 1000))
x_inv = linalg.inv(x)

In [None]:
x

In [None]:
x_inv[:100,:10].compute()

Being able to take an inverse of an array in parallel with minimal memory is __very powerful__ and __very useful__ in many cases!

One of the uses of dask arrays would be processing high granularity satelite data.

Your workflow might look like:
- Read in a numpy array from a raster.
- Process the numpy array in the desired way (e.g. clipping, transformations).
- Write out the processed file.

Potentially useful for doing linear algebra on very large arrays as well.

## Other Application of Dask
- Machine learning
    - Dask has a package that blends seamlessly with the `sklearn` machine learning packages, which makes processing, cross-validation, and hyperparameter tuning all possible in the `dask` framework.
- Cluster programming
    - One of the best uses of `dask` is working on __multiple computers__, which is what this framework is usually used for.
    - While its useful on a desktop, it is the most powerful when you have multiple clusters on a server you can use.


### Next Topic: Using Clusters and Cloud Computing