# Final project: Dask execution

In this part you will adapt your preprocessing code (from the first part) so that it runs distributed with Dask.

We will instantiate 2 different kinds of Dask clusters:
- a local one, distributing tasks on the 2 threads of this JupyterLab instance
- a multi-node one, consisting of 6 single-threaded worker nodes

We will then compare their performance and, hopefully, observe some improvement.

## Multi-node cluster

## Initiate a local Dask scheduler on this machine

In [1]:
from distributed import Client
c_local = Client()
c_local

| Field | Value |
|---|---|
| Connection method | Cluster object |
| Cluster type | distributed.LocalCluster |
| Dashboard | http://127.0.0.1:8787/status |
| Status | running |
| Using processes | True |
| Scheduler comm | tcp://127.0.0.1:38985 |
| Started | Just now |
| Workers | 3 |
| Total threads | 3 |
| Total memory | 4.39 GiB |

| Worker comm | Threads | Memory | Dashboard | Nanny | Local directory |
|---|---|---|---|---|---|
| tcp://127.0.0.1:44563 | 1 | 1.46 GiB | http://127.0.0.1:45221/status | tcp://127.0.0.1:44221 | /tmp/dask-worker-space/worker-pg7nvgm1 |
| tcp://127.0.0.1:33863 | 1 | 1.46 GiB | http://127.0.0.1:41991/status | tcp://127.0.0.1:34231 | /tmp/dask-worker-space/worker-y0ggzl11 |
| tcp://127.0.0.1:43689 | 1 | 1.46 GiB | http://127.0.0.1:34375/status | tcp://127.0.0.1:40957 | /tmp/dask-worker-space/worker-0r_tvmra |
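The intro talks about the 2 threads of this JupyterLab instance, while `Client()` above started 3 single-threaded workers on its own. A minimal sketch, assuming you want to pin the local cluster to exactly 2 single-threaded workers; `processes=False` (workers as threads of one process), `dashboard_address=":0"` (a free port, so it does not clash with `c_local`) and `set_as_default=False` (so `c_local` stays the default client) are our own choices, and the names `cluster_2t`/`client_2t` are hypothetical.

```python
from distributed import Client, LocalCluster

# 2 workers x 1 thread each = 2 threads in total, matching the intro
cluster_2t = LocalCluster(n_workers=2, threads_per_worker=1,
                          processes=False, dashboard_address=":0")
client_2t = Client(cluster_2t, set_as_default=False)
client_2t.wait_for_workers(2)

# Client.nthreads() maps each worker address to its number of threads
n_threads = sum(client_2t.nthreads().values())
print(f"workers: {len(client_2t.nthreads())}, threads: {n_threads}")

# release the resources once done
client_2t.close()
cluster_2t.close()
```

With the default `processes=True` the same sizes apply; the workers would simply live in separate processes, as in `c_local`.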


## Initiate a distributed scheduler with 6 workers

In [2]:
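from dask_kubernetes.operator import KubeCluster

username = "mbarbetti"

# The heading targets 6 workers, but the run recorded below used n_workers=1
# (see the output of the next cell): set n_workers=6, or call cluster.scale(6).
cluster = KubeCluster(
    name=username,
    namespace=f'user-{username}',
    image='dodasts/sosc22:v1',
    n_workers=1,
    # each worker: 1 thread, 2 GB Dask memory limit, scratch space in temp_dir,
    # shut down after 60 s without reaching the scheduler
    worker_command=[
        'dask-worker',
        '--name=$(DASK_WORKER_NAME)',
        '--local-directory=temp_dir',
        '--nthreads', '1',
        '--death-timeout', '60',
        '--memory-limit', '2G',
    ],
    # Kubernetes requests/limits for the cluster pods
    resources={
        "requests": {"memory": "2Gi", "cpu": "1"},
        "limits": {"memory": "3Gi", "cpu": "1"},
    },
)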

In [3]:
c_distributed = Client(cluster)
c_distributed


+-------------+----------------+----------------+----------------+
| Package     | Client         | Scheduler      | Workers        |
+-------------+----------------+----------------+----------------+
| dask        | 2022.11.0      | 2022.10.0      | 2022.10.0      |
| distributed | 2022.11.0      | 2022.10.0      | 2022.10.0      |
| lz4         | 4.0.2          | 4.0.0          | 4.0.0          |
| numpy       | 1.23.5         | 1.23.4         | 1.23.4         |
| pandas      | 1.5.2          | 1.5.1          | 1.5.1          |
| python      | 3.10.8.final.0 | 3.10.6.final.0 | 3.10.6.final.0 |
+-------------+----------------+----------------+----------------+


| Field | Value |
|---|---|
| Connection method | Cluster object |
| Cluster type | dask_kubernetes.KubeCluster |
| Dashboard | http://mbarbetti-scheduler.user-mbarbetti:8787/status |
| Scheduler comm | tcp://10.42.49.16:8786 |
| Scheduler dashboard | http://10.42.49.16:8787/status |
| Started | 2 hours ago |
| Workers | 1 |
| Total threads | 1 |
| Total memory | 1.86 GiB |

| Worker comm | Threads | Memory | Dashboard | Nanny | Local directory |
|---|---|---|---|---|---|
| tcp://10.42.175.33:44819 | 1 | 1.86 GiB | http://10.42.175.33:42349/status | tcp://10.42.175.33:43835 | /home/jovyan/temp_dir/dask-worker-space/worker-6c0itpp_ |


In [4]:
#cluster.scale(1)
#c_distributed.close()

You can now open the dashboard of your distributed cluster at `http://${YOUR JHUB USERNAME}-dask.131.154.96.42.myip.cloud.infn.it/status`.

In [5]:
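# Hint: Client.upload_file ships a local file (for instance a compiled C library, cough cough...)
# to the scheduler and to every worker of your cluster
from subprocess import run as run_shell

libpath = "lib.288931.so"
# compile src.C into a shared library; check=True raises if gcc fails
run_shell(['gcc', '--shared', '-fPIC', '-o', "/tmp/" + libpath, 'src.C'], check=True)
c_distributed.upload_file("/tmp/" + libpath)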

{'tcp://10.42.175.33:44819': {'status': 'OK'}}

## Ex 1.a: Distribute the butt_filtfilt function on the local Dask cluster

In [6]:
import pandas as pd

data = pd.read_csv("https://pandora.infn.it/public/806aa1/dl/exoTrain.csv")
cols = data.columns[1:]

In [7]:
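# cols already excludes LABEL, so cols[1:] also drops the first flux column;
# the shape below, (5087, 3196), reflects this
data = data[cols[1:]]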

In [8]:
data.values.shape

(5087, 3196)

In [9]:
import dask.array as da

# 750 rows per chunk; -1 keeps all columns (whole light curves) in a single chunk
data_da = da.from_array(data.values, chunks=(750, -1))
data_da

| | Array | Chunk |
|---|---|---|
| Bytes | 124.04 MiB | 18.29 MiB |
| Shape | (5087, 3196) | (750, 3196) |
| Dask graph | 7 chunks in 1 graph layer | |
| Data type | float64 numpy.ndarray | float64 numpy.ndarray |


In [10]:
import dask
import numpy as np
from functools import partial
from scipy.signal import butter, filtfilt
from distributed.client import temp_default_client


def preprocess(data, filter_order=3, filter_cutoff=(0.01, 0.2)):
    b, a = butter(filter_order, filter_cutoff, btype='bandpass')
    filtered = filtfilt(b, a, data)
    hf = data - filtered
    hf = (hf - hf.mean(axis=1, keepdims=True))/hf.std(axis=1, keepdims=True)
    hf = (hf - hf.mean(axis=0, keepdims=True))/hf.std(axis=0, keepdims=True)
    return hf

prep_delay = dask.delayed(partial(preprocess, filter_order=3, filter_cutoff=(0.01, 0.2)))

In [11]:
preprocess(data_da)

| | Array | Chunk |
|---|---|---|
| Bytes | 124.04 MiB | 18.29 MiB |
| Shape | (5087, 3196) | (750, 3196) |
| Dask graph | 7 chunks in 20 graph layers | |
| Data type | float64 numpy.ndarray | float64 numpy.ndarray |


In [12]:
prep_delay(data_da)

Delayed('preprocess-38e6dff2-0c65-4545-91d1-d5b85bb782dc')

In [13]:
res_np = preprocess(data.values)
res_da = preprocess(data_da).compute()
assert np.allclose(res_np, res_da, rtol=1e-4)

In [14]:
res_np = preprocess(data.values)
res_da_delay = prep_delay(data_da).compute()
assert np.allclose(res_np, res_da_delay, rtol=1e-4)

In [15]:
%%timeit
res_np = preprocess(data.values)

774 ms ± 21.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [16]:
%%timeit
with temp_default_client(c_local):
    res_da = preprocess(data_da).compute()

3.24 s ± 51.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [17]:
%%timeit
with temp_default_client(c_local):
    res_da_delay = prep_delay(data_da).compute()

1.88 s ± 77.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
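A likely reason why the Dask-array version is slower than plain NumPy: `scipy.signal.filtfilt` converts its input with `np.asarray`, so `preprocess(data_da)` probably materializes the whole array and filters it in the notebook process, leaving only the subtraction and the normalizations to Dask. `prep_delay(data_da)`, instead, hands the whole concatenated array to a single task. A minimal sketch of an alternative, assuming every chunk holds complete rows (as `chunks=(750, ...)` does): filter and row-standardize each chunk with `map_blocks`, then do the column standardization, which needs statistics over all rows, with Dask reductions. `preprocess_blocks` and `_filter_rows` are hypothetical names.

```python
import numpy as np
import dask.array as da
from scipy.signal import butter, filtfilt


def _filter_rows(block, filter_order=3, filter_cutoff=(0.01, 0.2)):
    # each block holds complete rows, so filtering along axis=-1 never crosses block borders
    b, a = butter(filter_order, filter_cutoff, btype='bandpass')
    hf = block - filtfilt(b, a, block, axis=-1)
    # row-wise standardization only needs the row itself, so it is block-local too
    return (hf - hf.mean(axis=1, keepdims=True)) / hf.std(axis=1, keepdims=True)


def preprocess_blocks(x, filter_order=3, filter_cutoff=(0.01, 0.2)):
    # make sure every chunk spans all columns (whole light curves)
    x = x.rechunk({1: -1})
    hf = x.map_blocks(_filter_rows, filter_order=filter_order,
                      filter_cutoff=filter_cutoff, dtype=np.float64)
    # column-wise standardization needs statistics over all rows: Dask reductions across chunks
    return (hf - hf.mean(axis=0, keepdims=True)) / hf.std(axis=0, keepdims=True)
```

It can be checked like the cells above, e.g. `np.allclose(res_np, preprocess_blocks(data_da).compute(), rtol=1e-4)`, and timed inside `with temp_default_client(c_local):`.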


## Ex 1.b: distribute the same function on the remote cluster

In [18]:
%%timeit
with temp_default_client(c_distributed):
    res_da = preprocess(data_da).compute()

9.06 s ± 106 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [19]:
%%timeit
with temp_default_client(c_distributed):
    res_da_delay = prep_delay(data_da).compute()

4.04 s ± 79.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


## Ex 2: finalize the preparation of the array for the get_period function

In [20]:
# give it to me good 🤯
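One way to prepare the array for the C period search, sketched under our own assumptions: `best_matching_period` receives a `float64` pointer, so each row should be a C-contiguous NumPy array, not a Python list and not a strided view. The hypothetical helper `to_row_bag` turns each row-chunk of a Dask array into one Dask bag partition of contiguous rows, without first collecting the array in the notebook.

```python
import numpy as np
import dask
import dask.array as da
import dask.bag as db


def _contiguous_rows(block):
    # copy the block into C order so that each row is a contiguous float64 buffer
    block = np.ascontiguousarray(block, dtype=np.float64)
    return list(block)  # one 1-D array per row


def to_row_bag(x):
    # whole rows per chunk, then one Delayed object per row-chunk
    blocks = x.rechunk({1: -1}).to_delayed().ravel()
    parts = [dask.delayed(_contiguous_rows)(blk) for blk in blocks]
    # each Delayed evaluates to a list of rows and becomes one bag partition
    return db.from_delayed(parts)
```

For example, `to_row_bag(preprocess(data_da))` would give one partition per row-chunk, i.e. 7 with the chunks used above.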

## Ex 3.a: distribute the compute_period function on the local Dask cluster and measure its duration
- also check the period distribution plot 

In [21]:
libpath = "lib.288931.so"
run_shell(['gcc', '--shared', '-fPIC', '-o', libpath, 'src.C'])

CompletedProcess(args=['gcc', '--shared', '-fPIC', '-o', 'lib.288931.so', 'src.C'], returncode=0)

In [22]:
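import os
import ctypes

# load the copy compiled in the previous cell (written to the working directory, not /tmp)
lib = ctypes.CDLL(os.path.abspath(libpath))
lib.best_matching_period.restype = ctypes.c_int
lib.best_matching_period.argtypes = [
    ctypes.c_int,                              # number of samples in the row
    ctypes.c_double,                           # alpha
    np.ctypeslib.ndpointer(dtype=np.float64),  # row data
    np.ctypeslib.ndpointer(dtype=np.int32),    # output: best period
    np.ctypeslib.ndpointer(dtype=np.float64),  # output: minimum score
]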

In [31]:
import dask.bag as db

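# the ndpointer argtypes need NumPy arrays, not Python lists: one C-contiguous float64 array per row
rows = list(np.ascontiguousarray(data.values, dtype=np.float64))
data_db = db.from_sequence(rows, npartitions=7)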
data_db

dask.bag<from_sequence, npartitions=7>

In [43]:
def compute_periods(row, alpha):    
    best_period = np.array(0, dtype=np.int32)
    min_score = np.array(9999., dtype=np.float64)
    lib.best_matching_period(len(row), alpha, row, best_period, min_score);
    return int(best_period), float(min_score)

In [44]:
res_py = map(partial(compute_periods, alpha=0.5), list(data.values)[:10])
list(res_py)

[(1171, -378214.51033354603),
 (1171, -378214.51033354603),
 (1171, -378214.51033354603),
 (1171, -378214.51033354603),
 (1171, -378214.51033354603),
 (1171, -378214.51033354603),
 (1171, -378214.51033354603),
 (1171, -378214.51033354603),
 (1171, -378214.51033354603),
 (1171, -378214.51033354603)]
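For the period distribution plot, a minimal matplotlib sketch, assuming the results come as a list of `(best_period, min_score)` tuples like the output above. `plot_period_distribution` is a hypothetical helper; saving to a PNG file is our own choice (it also yields a file that can be stored elsewhere, as Ex 4 asks).

```python
import matplotlib.pyplot as plt
import numpy as np


def plot_period_distribution(results, bins=50, path="periods.png"):
    # keep only the best period of each (best_period, min_score) pair
    periods = np.array([p for p, _ in results])
    fig, ax = plt.subplots()
    ax.hist(periods, bins=bins)
    ax.set_xlabel("best matching period")
    ax.set_ylabel("number of light curves")
    fig.savefig(path)
    plt.close(fig)  # drop this line to also see the figure inline in the notebook
    return periods
```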

In [33]:
with temp_default_client(c_local):
    data_db.map(partial(compute_periods, alpha=0.5)).compute()

2022-12-01 16:55:51,592 - distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/distributed/protocol/core.py", line 109, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/opt/conda/lib/python3.10/site-packages/msgpack/__init__.py", line 38, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Pac

CancelledError: ('compute_periods-1d176cc4d34d4d8fec9ced48bdc2c444', 2)
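The `Failed to Serialize` error above most likely comes from `compute_periods` referring to the module-level `lib`: a `ctypes.CDLL` handle, with its cached function pointers, cannot be pickled and sent to the workers. A sketch of one workaround, under our own assumptions: pass only the library path (a plain string) and load the library inside the task, once per bag partition, via `map_partitions`. The sketch also copies each row into a C-contiguous `float64` buffer and adds `flags="C_CONTIGUOUS"` to the `ndpointer` types, because `ndpointer` without `flags` does not check contiguity. If `data.values` is Fortran-ordered (common for a single-dtype DataFrame; `data.values.flags` tells), each row is a strided view, which might explain why all ten rows in `In [44]` return the same result. `load_period_fn` and `periods_for_partition` are hypothetical names; the C signature mirrors the `argtypes` declared above.

```python
import ctypes
import numpy as np


def load_period_fn(lib_path):
    # runs inside the task, i.e. on the worker: no ctypes object is ever pickled
    f64 = np.ctypeslib.ndpointer(dtype=np.float64, flags="C_CONTIGUOUS")
    i32 = np.ctypeslib.ndpointer(dtype=np.int32, flags="C_CONTIGUOUS")
    fn = ctypes.CDLL(lib_path).best_matching_period
    fn.restype = ctypes.c_int
    fn.argtypes = [ctypes.c_int, ctypes.c_double, f64, i32, f64]
    return fn


def periods_for_partition(rows, lib_path, alpha=0.5):
    fn = load_period_fn(lib_path)  # once per partition, not once per row
    results = []
    for row in rows:
        row = np.ascontiguousarray(row, dtype=np.float64)
        best_period = np.zeros(1, dtype=np.int32)  # output buffer written by C
        min_score = np.full(1, 9999.0)             # output buffer, same start value as above
        fn(len(row), alpha, row, best_period, min_score)
        results.append((int(best_period[0]), float(min_score[0])))
    return results
```

With the local cluster this could run as `data_db.map_partitions(periods_for_partition, lib_path=os.path.abspath(libpath), alpha=0.5).compute()` inside `with temp_default_client(c_local):`; an absolute path works there because the local workers share the notebook's filesystem.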

## Ex 3.b: distribute the compute_period function on the remote cluster and measure its duration
- also check the period distribution plot 

In [None]:
libpath = "lib.288931.so"
run_shell(['gcc', '--shared', '-fPIC', '-o', "/tmp/" + libpath, 'src.C'])
c_distributed.upload_file("/tmp/" + libpath)
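On the remote cluster the notebook's `/tmp` is not visible to the workers. `Client.upload_file` writes the file into each worker's local directory, so a task can rebuild the path on the worker side before calling `ctypes.CDLL`. A minimal sketch, assuming the uploaded library keeps its file name; `worker_lib_path` is a hypothetical helper that falls back to a local path when called outside a worker.

```python
import os
from distributed import get_worker


def worker_lib_path(filename):
    """Path of a file shipped with Client.upload_file, as seen by the current task."""
    try:
        # inside a task: uploaded files land in the worker's local directory
        return os.path.join(get_worker().local_directory, os.path.basename(filename))
    except ValueError:
        # get_worker() raises ValueError when no worker is available (e.g. in the notebook)
        return os.path.abspath(filename)
```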

## Ex 4 (BONUS): execute the steps of Ex 1 and 2 (with local Dask) in an Argo workflow

- Run the get_period task after the first preprocessing step is done.
- Store the period plot on MinIO.