# Deploy a dask cluster with AWS cloudformation

## Objectives

Create a deploy a dask cluster using AWS cloud formation (success see file blabla). The current template creates:
- 1 dask-scheduler and 3 dask-workers. 
- A security group that allows a local machine to connect to the cluster (see the other notebook)
- EC2 role and connects this all instances to allow S3 access.
- S3 bucket in which we can store csv files

Things you have to do manually:
- upload the csv files to the s3 bucket
- create a user with programmatic access and configure the aws cli to work with these keys.

### TODO
- scheduler can be t2.micro but I want workers to be larger.
- use parameters to select the instance type for the scheduler and worker.
- To have 3 workers template of one is copied 3 times. It is better to maybe use auto scaling groups and define the number workers

### Blogs to do
- `https://docs.coiled.io/blog/tpch.html`
- `https://medium.com/@shubham27/introduction-to-dask-insights-on-nyc-parking-dataset-using-dask-b34019aa44b`

### CloudFormation Template

execute the cell below if you want to see the entire cloud formation template in the notebook

In [None]:
%load cf-dask-cluster.yaml

You can use this template to create a CloudFormation stack on AWS. 

Once the stack is completed and all EC2 instances are running you can take the public IP of the scheduler and copy this in the code snippet below.

Everything should work smoothly (for me it did)

In [1]:
import dask
from dask.distributed import Client

# copy the public ip of scheduler from the output section of cloudformation
ip = "3.81.208.197"
address = f"tcp://{ip}:8786"
dashboard = f"http://{ip}:8787/status"

print(f"Use the link below to connect to the cluster dashboard:\n{dashboard}")

print(address)
client = Client(address=address)

client

Use the link below to connect to the cluster dashboard:
http://3.81.208.197:8787/status
tcp://3.81.208.197:8786



+-------------+----------+-----------+----------+
| Package     | Client   | Scheduler | Workers  |
+-------------+----------+-----------+----------+
| cloudpickle | 3.0.0    | 3.1.0     | 3.1.0    |
| dask        | 2024.2.1 | 2024.9.1  | 2024.9.1 |
| distributed | 2024.2.1 | 2024.9.1  | 2024.9.1 |
| lz4         | 4.3.2    | 4.3.3     | 4.3.3    |
| msgpack     | 1.0.3    | 1.1.0     | 1.1.0    |
| numpy       | 1.26.4   | 2.1.2     | 2.1.2    |
| toolz       | 0.12.0   | 1.0.0     | 1.0.0    |
+-------------+----------+-----------+----------+


0,1
Connection method: Direct,
Dashboard: http://3.81.208.197:8787/status,

0,1
Comm: tcp://172.31.38.50:8786,Workers: 3
Dashboard: http://172.31.38.50:8787/status,Total threads: 6
Started: 8 minutes ago,Total memory: 11.46 GiB

0,1
Comm: tcp://172.31.88.111:45981,Total threads: 2
Dashboard: http://172.31.88.111:38627/status,Memory: 3.82 GiB
Nanny: tcp://172.31.88.111:44787,
Local directory: /tmp/dask-scratch-space/worker-64iuzyq4,Local directory: /tmp/dask-scratch-space/worker-64iuzyq4
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 2.0%,Last seen: Just now
Memory usage: 142.75 MiB,Spilled bytes: 0 B
Read bytes: 258.1714237600052 B,Write bytes: 1.45 kiB

0,1
Comm: tcp://172.31.88.31:38951,Total threads: 2
Dashboard: http://172.31.88.31:35509/status,Memory: 3.82 GiB
Nanny: tcp://172.31.88.31:40437,
Local directory: /tmp/dask-scratch-space/worker-6mr2b13d,Local directory: /tmp/dask-scratch-space/worker-6mr2b13d
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 2.0%,Last seen: Just now
Memory usage: 141.62 MiB,Spilled bytes: 0 B
Read bytes: 257.96850307763765 B,Write bytes: 1.45 kiB

0,1
Comm: tcp://172.31.95.115:37363,Total threads: 2
Dashboard: http://172.31.95.115:45683/status,Memory: 3.82 GiB
Nanny: tcp://172.31.95.115:38711,
Local directory: /tmp/dask-scratch-space/worker-3laygsxy,Local directory: /tmp/dask-scratch-space/worker-3laygsxy
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 2.0%,Last seen: Just now
Memory usage: 143.36 MiB,Spilled bytes: 0 B
Read bytes: 258.48775346639593 B,Write bytes: 1.45 kiB


In [2]:
import dask.array as da

a_da = da.ones(10, chunks=5)
a_da

Unnamed: 0,Array,Chunk
Bytes,80 B,40 B
Shape,"(10,)","(5,)"
Dask graph,2 chunks in 1 graph layer,2 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 80 B 40 B Shape (10,) (5,) Dask graph 2 chunks in 1 graph layer Data type float64 numpy.ndarray",10  1,

Unnamed: 0,Array,Chunk
Bytes,80 B,40 B
Shape,"(10,)","(5,)"
Dask graph,2 chunks in 1 graph layer,2 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [3]:
a_da_sum = a_da.sum()
a_da_sum

Unnamed: 0,Array,Chunk
Bytes,8 B,8 B
Shape,(),()
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
Array Chunk Bytes 8 B 8 B Shape () () Dask graph 1 chunks in 3 graph layers Data type float64 numpy.ndarray,,

Unnamed: 0,Array,Chunk
Bytes,8 B,8 B
Shape,(),()
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [4]:
a_da_sum.compute()


+-------------+----------+-----------+----------+
| Package     | Client   | Scheduler | Workers  |
+-------------+----------+-----------+----------+
| cloudpickle | 3.0.0    | 3.1.0     | 3.1.0    |
| dask        | 2024.2.1 | 2024.9.1  | 2024.9.1 |
| distributed | 2024.2.1 | 2024.9.1  | 2024.9.1 |
| lz4         | 4.3.2    | 4.3.3     | 4.3.3    |
| msgpack     | 1.0.3    | 1.1.0     | 1.1.0    |
| numpy       | 1.26.4   | 2.1.2     | 2.1.2    |
| toolz       | 0.12.0   | 1.0.0     | 1.0.0    |
+-------------+----------+-----------+----------+


CancelledError: ('sum-aggregate-f94bc8eefef5009aefd1caecd8fe73e1',)

In [94]:
xd = da.random.normal(10, 0.1, size=(30_000, 30_000), chunks=(3000, 3000))
xd

Unnamed: 0,Array,Chunk
Bytes,6.71 GiB,68.66 MiB
Shape,"(30000, 30000)","(3000, 3000)"
Dask graph,100 chunks in 1 graph layer,100 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 6.71 GiB 68.66 MiB Shape (30000, 30000) (3000, 3000) Dask graph 100 chunks in 1 graph layer Data type float64 numpy.ndarray",30000  30000,

Unnamed: 0,Array,Chunk
Bytes,6.71 GiB,68.66 MiB
Shape,"(30000, 30000)","(3000, 3000)"
Dask graph,100 chunks in 1 graph layer,100 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [95]:
%%time
xd = da.random.normal(10, 0.1, size=(30_000, 30_000), chunks=(3000, 3000))
yd = xd.mean(axis=0)
yd.compute()

CPU times: total: 0 ns
Wall time: 7.01 s


array([10.00046721, 10.00009501,  9.99963736, ...,  9.99928946,
       10.00008436,  9.99983567])

In [9]:
!aws s3 ls

2024-10-15 14:08:28 cf-templates-v59jdi5oa9wz-us-east-1
2024-10-15 14:10:17 dask-input-data


## Dask dataframes

In this part we point to data in S3.

Source for examples:

`https://tutorial.dask.org/01_dataframe.html`

In [96]:
import dask.dataframe as dd

# Read all CSV files from the root of the bucket
ddf = dd.read_csv("s3://dask-input-data/*.csv", 
                  dtype={"TailNum": str, "CRSElapsedTime": float, "Cancelled": bool},
                  blocksize="25MB" )


ddf

Unnamed: 0_level_0,Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
npartitions=10,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1
,int64,int64,int64,int64,float64,int64,float64,int64,string,int64,string,float64,float64,float64,float64,float64,string,string,float64,float64,float64,bool,int64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [11]:
%%time
len(ddf)

CPU times: total: 15.6 ms
Wall time: 5.58 s


2611892

In [12]:
%%time
ddf.head(2)

CPU times: total: 62.5 ms
Wall time: 2.48 s


Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,...,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
0,1990,1,1,1,1621.0,1540,1747.0,1701,US,33,...,,46.0,41.0,EWR,PIT,319.0,,,False,0
1,1990,1,2,2,1547.0,1540,1700.0,1701,US,33,...,,-1.0,7.0,EWR,PIT,319.0,,,False,0


In [14]:
%%time
result = ddf.DepDelay.max()
result.compute()

CPU times: total: 31.2 ms
Wall time: 4.6 s


np.float64(1435.0)

In [15]:
%%time
len(ddf[~ddf.Cancelled])

CPU times: total: 46.9 ms
Wall time: 7.95 s


2540961

In [16]:
%%time
ddf[~ddf.Cancelled].groupby("Origin")["Origin"].count().compute()

CPU times: total: 31.2 ms
Wall time: 3.34 s


Origin
EWR    1139451
JFK     427243
LGA     974267
Name: Origin, dtype: int64

In [17]:
%%time
ddf.groupby("Origin").DepDelay.mean().compute()

CPU times: total: 62.5 ms
Wall time: 2.93 s


Origin
EWR    10.295469
JFK    10.351299
LGA     7.431142
Name: DepDelay, dtype: float64

In [18]:
%%time
ddf.groupby("DayOfWeek").DepDelay.mean().idxmax().compute()

CPU times: total: 62.5 ms
Wall time: 2.71 s


np.int64(5)

## Sharing Intermediate Results

#### Example 1

In [19]:
non_canceled = ddf[~ddf.Cancelled]
mean_delay = non_canceled.DepDelay.mean()
std_delay = non_canceled.DepDelay.std()

If you compute them with two calls to compute, there is no sharing of intermediate computations.

In [20]:
%%time

mean_delay_res = mean_delay.compute()
std_delay_res = std_delay.compute()

CPU times: total: 219 ms
Wall time: 9.8 s


But let’s try by passing both to a single compute call

In [23]:
%%time

mean_delay_res, std_delay_res = dask.compute(mean_delay, std_delay)

CPU times: total: 15.6 ms
Wall time: 3.74 s


#### Example 2

In [26]:
non_cancelled = ddf[~ddf.Cancelled]
ddf_jfk = non_cancelled[non_cancelled.Origin == "JFK"]

In [27]:
%%time
ddf_jfk.DepDelay.mean().compute()
ddf_jfk.DepDelay.sum().compute()

CPU times: total: 109 ms
Wall time: 6.58 s


np.float64(4422520.0)

In [28]:
ddf_jfk = ddf_jfk.persist() 

In [29]:
%%time
ddf_jfk.DepDelay.mean().compute()
ddf_jfk.DepDelay.std().compute()

CPU times: total: 15.6 ms
Wall time: 592 ms


np.float64(31.242509798271147)

## Custom code with Dask DataFrame

In [31]:
import pandas as pd
import dask.dataframe as dd

In [64]:
df1 = pd.DataFrame({'x': [1, 2, 3, 4, 5],
                   'y': [1., 2., 3., 4., 5.]})

ddf = dd.from_pandas(df, npartitions=2)
ddf

Unnamed: 0_level_0,x,y
npartitions=2,Unnamed: 1_level_1,Unnamed: 2_level_1
0,int64,float64
3,...,...
4,...,...


One can use map_partitions to apply a function on each partition. Extra arguments and keywords can optionally be provided, and will be passed to the function after the partition.

In [67]:
def myadd(df1, a, b=1):
    return df.x + df.y + a + b

#using pandas
display(df1.apply(myadd, args=(1,2), axis=1))

res= ddf.map_partitions(myadd, 1, b=2)
print(res.dtype)
res.compute()

Unnamed: 0,0,1,2,3,4
0,5.0,7.0,9.0,11.0,13.0
1,5.0,7.0,9.0,11.0,13.0
2,5.0,7.0,9.0,11.0,13.0
3,5.0,7.0,9.0,11.0,13.0
4,5.0,7.0,9.0,11.0,13.0


float64


0     5.0
1     7.0
2     9.0
3    11.0
4    13.0
0     5.0
1     7.0
2     9.0
3    11.0
4    13.0
dtype: float64

In [54]:
res = ddf.map_partitions(myadd, 1, b=2, meta=(None, 'f8'))
res.compute()

0     5.0
1     7.0
2     9.0
3    11.0
4    13.0
dtype: float64

In [56]:
res = ddf.map_partitions(lambda df: df.assign(z=df.x * df.y))
res.dtypes

x      int64
y    float64
z    float64
dtype: object

In [97]:
ddf = dd.read_csv("s3://dask-input-data/*.csv", 
                  dtype={"TailNum": str, "CRSElapsedTime": float, "Cancelled": bool},
                  blocksize="25MB" )


In [103]:
%%time
ddfD = ddf[~ddf.Distance.isna()]
dask.compute(len(ddf),len(ddfD))

CPU times: total: 46.9 ms
Wall time: 11.1 s


(2611892, 2610397)

In [99]:
ddfD['Distance']= ddfD.Distance.astype('float64')

In [100]:
def my_custom_converter(df, multiplier=1):
    return df * multiplier

meta = pd.Series(name="Distance", dtype="float64")

distance_km = ddfD.Distance.map_partitions(
    my_custom_converter, multiplier=0.6, meta=meta
)

distance_km.compute()

0         191.4
1         191.4
2         191.4
3         191.4
4         191.4
          ...  
269176    971.4
269177    971.4
269178    971.4
269179    971.4
269180    971.4
Name: Distance, Length: 2610397, dtype: float64

In [89]:
client.close()