# K8s Dask notebook

# 1️⃣ Import and access the cluster

In [1]:
from dask_kubernetes import HelmCluster
cluster = HelmCluster(release_name="mydask")
cluster

☝️ Look at how much RAM and how many threads you have access to! 🏋️‍♀️

# 2️⃣ Connect the client to the cluster

Run the code below 👇 to connect the client to the **cluster**. You may see some warnings about mismatched package versions between the client, the scheduler and the workers ⚠️. This is acceptable as long as the mismatched library is not one that the code we distribute relies on!

In [2]:
from dask.distributed import Client

client = Client(cluster)
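To check programmatically how much RAM and how many threads the cluster gives you, you can total the per-worker entries reported by the scheduler. This is a minimal sketch, assuming the layout returned by `client.scheduler_info()` in recent `distributed` releases (a `"workers"` mapping whose entries carry `"nthreads"` and `"memory_limit"` in bytes); `summarize_workers` is a hypothetical helper name, and the sample dict is made up so the function can be tried without a cluster.

```python
from typing import Any, Dict


def summarize_workers(info: Dict[str, Any]) -> Dict[str, float]:
    """Total workers, threads and memory (GB) from a scheduler_info()-style dict.

    ASSUMPTION: info["workers"] maps each worker address to a dict that has
    "nthreads" and "memory_limit" (bytes) keys.
    """
    workers = info["workers"]
    return {
        "workers": len(workers),
        "threads": sum(w["nthreads"] for w in workers.values()),
        "memory_gb": sum(w["memory_limit"] for w in workers.values()) / 1e9,
    }


# Made-up example with two workers (addresses are placeholders)
sample_info = {
    "workers": {
        "tcp://10.0.0.1:40000": {"nthreads": 3, "memory_limit": 4_000_000_000},
        "tcp://10.0.0.2:40000": {"nthreads": 3, "memory_limit": 4_000_000_000},
    }
}
print(summarize_workers(sample_info))  # {'workers': 2, 'threads': 6, 'memory_gb': 8.0}
```

On the live cluster, the call would be `summarize_workers(client.scheduler_info())`; if your `distributed` version reports workers differently, adapt the keys accordingly.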

### 🧪 Testing our cluster

You can even scale the number of worker pods from inside Dask!

Let's start by scaling the cluster down to **1** worker.

In [3]:
cluster.scale(1)

Run this super-heavy computation on a random array of **2.5 billion** elements (50,000 × 50,000) and check the Status page of your Dask dashboard!

In [6]:
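%%time
import dask.array as da

axis_size = 50_000
chunk = 5_000

# 50,000 x 50,000 random array, split into 5,000 x 5,000 blocks (100 blocks)
x = da.random.random((axis_size, axis_size), chunks=(chunk, chunk))
y = x + x.T
# Every other row, right half of the columns; // keeps the slice bound an integer
z = y[::2, axis_size // 2:].mean(axis=1)

# Nothing has run yet: compute() triggers the distributed computation
z.compute()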

☝️ You should see 3 threads running in parallel on your dashboard!
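Some back-of-the-envelope arithmetic explains why this cell is heavy, and a tiny NumPy version of the same pipeline shows what `z` holds. This is a sketch that assumes `float64` values (the default dtype of `da.random.random`); the `pipeline` helper and the 10 × 10 test array are illustrative and not part of the notebook.

```python
import numpy as np

# Sizes from the cell above
axis_size = 50_000
chunk = 5_000
itemsize = np.dtype("float64").itemsize  # 8 bytes per element

blocks_per_axis = axis_size // chunk        # 10
n_blocks = blocks_per_axis ** 2             # 100 blocks in x
block_mb = chunk * chunk * itemsize / 1e6   # size of one block in MB
total_gb = axis_size ** 2 * itemsize / 1e9  # size of the full x array in GB
print(n_blocks, block_mb, total_gb)  # 100 200.0 20.0


def pipeline(x: np.ndarray) -> np.ndarray:
    """Same steps as the Dask cell, on an in-memory square array."""
    n = x.shape[0]
    y = x + x.T                           # symmetric matrix
    return y[::2, n // 2:].mean(axis=1)   # every other row, right half, row means


small = np.random.default_rng(0).random((10, 10))
z_small = pipeline(small)
print(z_small.shape)  # (5,)
```

So `x` alone would need about 20 GB if it were materialised at once; splitting it into 200 MB blocks is what lets the workers process it piece by piece.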

In [7]:
# Now scale up to the maximum of 6 workers, then re-run the cell above!
cluster.scale(6)

☝️ 18 threads (6 workers × 3 threads each) should now run in parallel to speed up this CPU-bound task!
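The thread counts above follow from workers × threads per worker. The rough sketch below assumes 3 threads per worker (inferred from the 1-worker case above; check your Helm chart values) and treats the 100 blocks of `x` as one batch of equal tasks to get an idealised number of scheduling rounds ("waves"). The real task graph has several stages that Dask overlaps, so this is an estimate only; `threads_and_waves` is a hypothetical helper.

```python
import math
from typing import Tuple


def threads_and_waves(n_workers: int, threads_per_worker: int = 3, n_tasks: int = 100) -> Tuple[int, int]:
    """Total threads, and how many rounds it takes to run n_tasks equal tasks on them."""
    total_threads = n_workers * threads_per_worker
    waves = math.ceil(n_tasks / total_threads)
    return total_threads, waves


for n_workers in (1, 6):
    threads, waves = threads_and_waves(n_workers)
    print(f"{n_workers} worker(s): {threads} threads, about {waves} waves of block tasks")
# 1 worker(s): 3 threads, about 34 waves of block tasks
# 6 worker(s): 18 threads, about 6 waves of block tasks
```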

# 3️⃣ Using this power for a real use case 💪

🎯 Our goal is to compute the summary statistics (mean, std, ...) of the NYC yellow taxi dataset for the two years `2020-2021`.

This real dataset comprises `60` million rows, each with `19` columns!

It is split into `24` monthly Parquet files.
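A quick size estimate shows why distributing this work is worthwhile. It rests on a rough assumption that every value takes 8 bytes once loaded (true for 64-bit numbers and timestamps, not exactly for text columns), and real monthly files vary in size around the average:

```python
n_rows = 60_000_000
n_cols = 19
n_files = 24
bytes_per_value = 8  # ASSUMPTION: every column stored as a 64-bit value

rows_per_file = n_rows // n_files                       # average rows per monthly file
in_memory_gb = n_rows * n_cols * bytes_per_value / 1e9  # rough in-memory size of all 24 months
print(rows_per_file, round(in_memory_gb, 2))  # 2500000 9.12
```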

## 3.1) One file / one month

In [3]:
import pandas as pd

In [4]:
%%time
df = pd.read_parquet("https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2020-01.parquet")

In [5]:
df

In [6]:
%%time
description = df.describe()

In [7]:
round(description, 2)
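`describe()` returns one row per statistic and one column per numeric column, which is why later cells can select a statistic with `.loc["mean", :]`. Here is a small illustration on made-up values, reusing two column names from the taxi files (`trip_distance`, `fare_amount`):

```python
import pandas as pd

# Made-up values, not real taxi data
toy = pd.DataFrame({
    "trip_distance": [1.0, 2.0, 3.0, 4.0],
    "fare_amount": [5.0, 7.5, 10.0, 12.5],
})

description = toy.describe()
print(list(description.index))
# ['count', 'mean', 'std', 'min', '25%', '50%', '75%', 'max']
print(description.loc["mean", :].tolist())  # [2.5, 8.75]
print(round(description, 2))
```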

## 3.2) 24 months at once

You have now seen how long it takes to download and describe just one of the **twenty-four** Parquet files we need!

### 3.2.1) Pure pandas

Here is the code for the **naive** pandas version:

In [18]:
monthly_statistics = []

In [19]:
%%time 
for i in range(1, 13):
    month = f"{i:02d}"  # zero-pad the month: 1 -> "01"
    print(f"downloading 2020-{month}...")
    df = pd.read_parquet(f"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2020-{month}.parquet")
    monthly_statistics.append(df.describe())
    print(f"downloading 2021-{month}...")
    df = pd.read_parquet(f"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-{month}.parquet")
    monthly_statistics.append(df.describe())

In [26]:
# We computed our 24 monthly statistics sequentially, in a single thread... but it took ages!
len(monthly_statistics)

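Computing the 2-year averages from these 24 summaries is now very quick (note that this is a simple average of the monthly means, so every month weighs the same regardless of its number of trips):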

In [26]:
%%time
global_means = sum([statistic.loc["mean",:] for statistic in monthly_statistics])/len(monthly_statistics)
global_means
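Averaging the 24 monthly means gives every month the same weight. Because each `describe()` table also carries a `count` row (non-null values per column), the exact overall mean can be recovered as Σ(count × mean) / Σ(count), column by column. A sketch on two made-up "months":

```python
import pandas as pd

# Two toy "months" (made-up values, not real taxi data)
month_a = pd.DataFrame({"trip_distance": [1.0, 2.0, 3.0]})
month_b = pd.DataFrame({"trip_distance": [10.0]})
monthly_statistics = [month_a.describe(), month_b.describe()]

# Unweighted: every month counts equally, whatever its number of rows
unweighted = sum(s.loc["mean", :] for s in monthly_statistics) / len(monthly_statistics)

# Weighted by row count: sum(count * mean) / sum(count), column by column
total_count = sum(s.loc["count", :] for s in monthly_statistics)
weighted = sum(s.loc["count", :] * s.loc["mean", :] for s in monthly_statistics) / total_count

print(unweighted["trip_distance"])  # 6.0
print(weighted["trip_distance"])    # 4.0, the mean of all four values
```

Applied to the notebook's own `monthly_statistics`, the same two weighted lines give the exact 2-year means, provided every month exposes the same columns (pandas aligns Series on their index, so a column missing from one month yields `NaN`).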

### 3.2.2) Dask 💪

In [8]:
import pandas as pd
import dask

Next, we decorate the functions we want to distribute with `@dask.delayed`. We created two separate functions (one per year) so that you can tell their tasks apart on the dashboard!

In [21]:
@dask.delayed
def twenty_twenty_monthly_describe(i: int):
    # Download one month of 2020 and summarise it (runs on a worker)
    df = pd.read_parquet(f"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2020-{i:02d}.parquet")
    return df.describe()

@dask.delayed
def twenty_twenty_one_monthly_describe(i: int):
    # Same task for 2021, kept separate so both years show up as distinct tasks on the dashboard
    df = pd.read_parquet(f"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-{i:02d}.parquet")
    return df.describe()

We then build a similar list of 24 monthly descriptions:

In [22]:
monthly_data = []

for i in range(1,13):
    monthly_data.append(twenty_twenty_monthly_describe(i))
    monthly_data.append(twenty_twenty_one_monthly_describe(i))

In [23]:
monthly_data

☝️ These are all `Delayed` objects: **Dask** is lazy, so nothing is downloaded until we call **compute()**, which then works out the result of each object!
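To make the "lazy until `compute()`" idea concrete, here is a toy stand-in written in plain Python. It is NOT how Dask implements `dask.delayed`: Dask builds a task graph and runs independent tasks in parallel on the workers, while this toy runs them one by one. `ToyDelayed` and `fake_monthly_describe` are hypothetical names used only for illustration.

```python
from typing import Any, Callable, List, Tuple


class ToyDelayed:
    """Toy lazy task: stores a function call instead of running it (not Dask's implementation)."""

    def __init__(self, func: Callable[..., Any], *args: Any) -> None:
        self.func = func
        self.args = args

    def compute(self) -> Any:
        # The real work only happens here
        return self.func(*self.args)


calls: List[Tuple[int, int]] = []


def fake_monthly_describe(year: int, month: int) -> str:
    calls.append((year, month))  # record that the "work" actually ran
    return f"stats for {year}-{month:02d}"


tasks = [ToyDelayed(fake_monthly_describe, year, month)
         for month in range(1, 13) for year in (2020, 2021)]
print(len(tasks), len(calls))  # 24 0 -> nothing has run yet

results = [task.compute() for task in tasks]
print(len(results), results[0])  # 24 stats for 2020-01
```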

👉 Run this, watch the dashboard, and see Dask tear through these tasks!

In [32]:
%%time
# dask.compute returns a tuple with one entry per argument: [0] is our list of 24 DataFrames
monthly_statistics = dask.compute(monthly_data)[0]

In [35]:
len(monthly_statistics)

In [36]:
%%time
global_means = sum([statistic.loc["mean",:] for statistic in monthly_statistics])/len(monthly_statistics)
global_means

❓ **Dig into one specific task's profile and try to figure out the share of time spent on I/O (downloading the Parquet files) vs. CPU work**

<img src="https://wagon-public-datasets.s3.amazonaws.com/data-engineering/W3D3-processing/dask/task-dashboard.png">
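To cross-check what you read on the dashboard, you can time the two phases of one month yourself. This is a minimal sketch using `time.perf_counter`; `timed` is a hypothetical helper, and the `time.sleep` and arithmetic workloads only stand in for the download and `describe()` steps. Timings taken in the notebook reflect the notebook machine's network and CPU, not those of the worker pods.

```python
import time
from typing import Any, Callable, Tuple


def timed(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Tuple[Any, float]:
    """Call func and return (result, elapsed wall-clock seconds)."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    return result, time.perf_counter() - start


def fake_download() -> list:
    time.sleep(0.2)  # stands in for waiting on the network
    return list(range(100_000))


def fake_describe(values: list) -> float:
    return sum(v * v for v in values) / len(values)  # stands in for CPU work


data, t_io = timed(fake_download)
_, t_cpu = timed(fake_describe, data)
io_share = t_io / (t_io + t_cpu)
print(f"I/O: {io_share:.0%} of the time, CPU: {1 - io_share:.0%}")
```

With a real monthly URL, the calls would become `df, t_io = timed(pd.read_parquet, url)` and `_, t_cpu = timed(df.describe)`.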

# 🚨 If you want to delete your cluster now

```bash
gcloud container clusters delete mydaskcluster --zone=europe-west1-b
```

### Otherwise, you can play with other introductory notebooks made by the Dask team by following step 4️⃣ in the `README.md`