Initialize Dask. For more information, see [dask tutorial](https://tutorial.dask.org/).

In [1]:
from dask.distributed import Client
import dask

client = Client(n_workers=4)
client


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 16,Total memory: 31.21 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:45497,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 16
Started: Just now,Total memory: 31.21 GiB

0,1
Comm: tcp://127.0.0.1:36501,Total threads: 4
Dashboard: http://127.0.0.1:42607/status,Memory: 7.80 GiB
Nanny: tcp://127.0.0.1:33359,
Local directory: /tmp/dask-scratch-space/worker-ftegb00w,Local directory: /tmp/dask-scratch-space/worker-ftegb00w

0,1
Comm: tcp://127.0.0.1:44453,Total threads: 4
Dashboard: http://127.0.0.1:39973/status,Memory: 7.80 GiB
Nanny: tcp://127.0.0.1:40387,
Local directory: /tmp/dask-scratch-space/worker-xzv8pwbv,Local directory: /tmp/dask-scratch-space/worker-xzv8pwbv

0,1
Comm: tcp://127.0.0.1:43283,Total threads: 4
Dashboard: http://127.0.0.1:45915/status,Memory: 7.80 GiB
Nanny: tcp://127.0.0.1:41183,
Local directory: /tmp/dask-scratch-space/worker-dwi_wkvx,Local directory: /tmp/dask-scratch-space/worker-dwi_wkvx

0,1
Comm: tcp://127.0.0.1:34723,Total threads: 4
Dashboard: http://127.0.0.1:35155/status,Memory: 7.80 GiB
Nanny: tcp://127.0.0.1:33495,
Local directory: /tmp/dask-scratch-space/worker-bigozcx6,Local directory: /tmp/dask-scratch-space/worker-bigozcx6


Let's import necessary dask and numpy functions, so that we could compare their performance on our 16-core machine with 32 Gb RAM!

In [2]:
from dask.array.core import Array as dask_array
import dask.array as da
import numpy as np


def generate_trajectory(
    positions: list[float], scale: float = 0.1, length: int = 1_000_000
):
    traj = np.random.normal(
        positions, scale=positions * scale, size=(length, len(positions))
    ).astype(np.float16)

    return traj


class Dask_performance:
    def __init__(self, traj: np.ndarray) -> dask_array:
        self.traj = da.from_array(traj, chunks={0: -1, 1: "auto"})

    def rmsf(self) -> dask_array:
        traj = self.traj
        ref = traj[0]
        return ((traj - ref) ** 2).mean(axis=0)  # note that there is no .compute()

    def rmsd(self) -> dask_array:
        traj = self.traj
        ref = traj[0]
        return da.sqrt(
            ((traj - ref) ** 2).mean(axis=1)
        )  # note that there is no .compute()


class Numpy_performance:
    def __init__(self, traj) -> np.ndarray:
        self.traj = traj

    def rmsf(self) -> np.ndarray:
        traj = self.traj
        ref = traj[0]
        return ((traj - ref) ** 2).mean(axis=0)

    def rmsd(self) -> np.ndarray:
        traj = self.traj
        ref = traj[0]
        return np.sqrt(((traj - ref) ** 2).mean(axis=1))


And the same for numpy code

In [3]:
%%time

positions = np.ones(20_000)
traj = generate_trajectory(positions, length=20_000)

CPU times: user 14.8 s, sys: 301 ms, total: 15.1 s
Wall time: 14.7 s


Let's have a look at numpy performance

In [4]:
%%time

numpy_obj = Numpy_performance(traj)

CPU times: user 17 µs, sys: 1 µs, total: 18 µs
Wall time: 26.2 µs


In [5]:
%%time

print(f'rmsd: {numpy_obj.rmsd().sum()}, rmsf: {numpy_obj.rmsf().sum()}')

rmsd: 2822.0, rmsf: 398.25
CPU times: user 13.7 s, sys: 118 ms, total: 13.8 s
Wall time: 13.4 s


Now it's dask time!

In [6]:
%%time

dask_obj = Dask_performance(traj)

CPU times: user 623 ms, sys: 156 µs, total: 623 ms
Wall time: 606 ms


In [7]:
%%time

d_rmsd, d_rmsf = dask.compute(dask_obj.rmsd(), dask_obj.rmsf())
print(f'rmsd: {dask_obj.rmsd().sum()}, rmsf: {dask_obj.rmsf().sum()}')

This may cause some slowdown.
Consider scattering data ahead of time and using futures.


rmsd: dask.array<sum-aggregate, shape=(), dtype=float16, chunksize=(), chunktype=numpy.ndarray>, rmsf: dask.array<sum-aggregate, shape=(), dtype=float16, chunksize=(), chunktype=numpy.ndarray>
CPU times: user 1.88 s, sys: 1.17 s, total: 3.05 s
Wall time: 4.8 s


We got almost 3x performance increase, which is almost linear! Isn't that amazing?

Also, we potentially can work with objects much larger than RAM and more complicated computations.

This is kinda cool.

Now we will play around with PIDs of the processes and lay the groundwork for the testing with gh-actions.

In [8]:
from time import sleep
from os import getpid

In [9]:
@dask.delayed
def pid_compute(sleep_time: float = 0.01):
    sleep(sleep_time)
    return getpid()


In [10]:
# should be single PID
set([getpid() for _ in range(10)])


{9025}

In [11]:
# should be 4 different PIDs -- as num_workers in the first cell
len(set((pid_compute().compute()) for _ in range(100)))


4