In [1]:
import dask.array as da
import dask
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
import numpy as np
import os

  from distributed.utils import tmpfile


In [2]:
# Toggle between a single-machine Dask client (True) and a SLURM-backed
# cluster (False). Everything below only depends on `client` existing.
LOCAL = True

if LOCAL:
    # Client() with no arguments creates a single-machine dask client
    client = Client()
else:
    # Create a SLURM-backed dask cluster and a client connected to it.
    # Logging outputs will be stored in /scratch/{your-netid}

    # Resolve the netid once. SLURM_JOB_USER is only defined inside a SLURM
    # allocation, so fall back to the login name instead of failing with a
    # bare KeyError when the notebook is started some other way.
    netid = os.environ.get('SLURM_JOB_USER') or os.environ['USER']

    cluster = SLURMCluster(
        # Memory and core limits should be sufficient here
        memory='4GB',
        cores=2,
        # Ensure that Dask uses the correct version of Python on the cluster:
        # the interpreter path is keyed by the locally installed dask version.
        python='/scratch/work/public/dask/{}/bin/python'.format(dask.__version__),
        # Place the output logs in an accessible location
        job_extra=['--output=/scratch/{}/slurm-%j.out'.format(netid)],
    )

    # NOTE(review): presumably 'slurm' is a site-specific submission wrapper
    # that replaces the default submit command — confirm for your cluster.
    cluster.submit_command = 'slurm'
    cluster.scale(50)

    display(cluster)
    client = Client(cluster)

display(client)

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 12,Total memory: 32.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:34511,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 12
Started: Just now,Total memory: 32.00 GiB

0,1
Comm: tcp://127.0.0.1:35061,Total threads: 3
Dashboard: http://127.0.0.1:34579/status,Memory: 8.00 GiB
Nanny: tcp://127.0.0.1:42111,
Local directory: /home/ss16270/dask-worker-space/worker-q2jck9hu,Local directory: /home/ss16270/dask-worker-space/worker-q2jck9hu

0,1
Comm: tcp://127.0.0.1:44121,Total threads: 3
Dashboard: http://127.0.0.1:35765/status,Memory: 8.00 GiB
Nanny: tcp://127.0.0.1:44373,
Local directory: /home/ss16270/dask-worker-space/worker-g658n0ol,Local directory: /home/ss16270/dask-worker-space/worker-g658n0ol

0,1
Comm: tcp://127.0.0.1:43737,Total threads: 3
Dashboard: http://127.0.0.1:40397/status,Memory: 8.00 GiB
Nanny: tcp://127.0.0.1:46883,
Local directory: /home/ss16270/dask-worker-space/worker-n8mqbxok,Local directory: /home/ss16270/dask-worker-space/worker-n8mqbxok

0,1
Comm: tcp://127.0.0.1:36245,Total threads: 3
Dashboard: http://127.0.0.1:46039/status,Memory: 8.00 GiB
Nanny: tcp://127.0.0.1:39079,
Local directory: /home/ss16270/dask-worker-space/worker-hr5_o55d,Local directory: /home/ss16270/dask-worker-space/worker-hr5_o55d


In [3]:
%%time
X_numpy = np.random.normal(3, 1, size=(20000, 20000))
y_numpy = X_numpy.mean(axis=1)
y_numpy

CPU times: user 9.61 s, sys: 795 ms, total: 10.4 s
Wall time: 9.64 s


array([3.01016503, 2.99119233, 3.0018328 , ..., 2.9867571 , 3.0026876 ,
       2.99428741])

In [4]:
%%time
X_dask = da.random.normal(3, 1, size=(20000, 20000), chunks=(2000, 2000))
y_dask = X_dask.mean(axis=1)
y_dask.compute()

CPU times: user 290 ms, sys: 66.8 ms, total: 357 ms
Wall time: 1.28 s


array([3.00739022, 2.99650675, 3.00030292, ..., 3.00528557, 3.00477995,
       3.00719328])

In [5]:
# Show the Dask array's summary (size, shape, chunking, task count).
# No .compute() is called here, so only the array's metadata is displayed.
X_dask

Unnamed: 0,Array,Chunk
Bytes,2.98 GiB,30.52 MiB
Shape,"(20000, 20000)","(2000, 2000)"
Count,100 Tasks,100 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 2.98 GiB 30.52 MiB Shape (20000, 20000) (2000, 2000) Count 100 Tasks 100 Chunks Type float64 numpy.ndarray",20000  20000,

Unnamed: 0,Array,Chunk
Bytes,2.98 GiB,30.52 MiB
Shape,"(20000, 20000)","(2000, 2000)"
Count,100 Tasks,100 Chunks
Type,float64,numpy.ndarray


In [6]:
# Show the summary of the row-mean result; it is still a Dask array
# (no .compute() here), so the task and chunk counts are displayed, not values.
y_dask

Unnamed: 0,Array,Chunk
Bytes,156.25 kiB,15.62 kiB
Shape,"(20000,)","(2000,)"
Count,240 Tasks,10 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 156.25 kiB 15.62 kiB Shape (20000,) (2000,) Count 240 Tasks 10 Chunks Type float64 numpy.ndarray",20000  1,

Unnamed: 0,Array,Chunk
Bytes,156.25 kiB,15.62 kiB
Shape,"(20000,)","(2000,)"
Count,240 Tasks,10 Chunks
Type,float64,numpy.ndarray


In [7]:
%%time
# NumPy baseline: standard deviation along axis 1 of the in-memory array.
X_numpy.std(axis=1)

CPU times: user 1.25 s, sys: 587 ms, total: 1.84 s
Wall time: 1.68 s


array([1.00547804, 1.00352003, 1.00654121, ..., 0.99610562, 1.00059365,
       0.99794757])

In [8]:
%%time
# NumPy baseline: matrix product of the in-memory array with itself.
X_numpy @ X_numpy

CPU times: user 3min 14s, sys: 21.9 s, total: 3min 36s
Wall time: 18.4 s


array([[180436.97605383, 180708.24546818, 180463.14750191, ...,
        181000.35235261, 181396.07382818, 180645.76115852],
       [179241.94181006, 179743.19534252, 179368.80558871, ...,
        179742.2534341 , 180351.9882196 , 179399.67411673],
       [179925.76693553, 180167.10848543, 180085.83825226, ...,
        180454.01579663, 180843.72119688, 179920.53752469],
       ...,
       [178955.85297113, 179241.0968985 , 179127.14844463, ...,
        179584.29634204, 179792.36921491, 178854.72016186],
       [180024.62623219, 180431.63401211, 179912.16412549, ...,
        180594.26270894, 180830.71561409, 179933.20589139],
       [179252.12413717, 179525.67351942, 179568.40329789, ...,
        179841.01626185, 180410.02918464, 179644.48238211]])

In [9]:
%%time
# Dask version of the matrix product; .compute() evaluates the graph and
# returns the full result to this process.
# NOTE(review): X_dask is never persisted in this notebook, so this timing
# presumably also includes generating the random chunks — confirm before
# comparing it directly with the NumPy cell.
(X_dask @ X_dask).compute() 

  out = blockwise(


CPU times: user 7.68 s, sys: 6.95 s, total: 14.6 s
Wall time: 58.5 s


array([[180704.15615296, 180100.4575045 , 180776.34944376, ...,
        180399.79604826, 181056.79200284, 180595.82064826],
       [179998.40770391, 179412.4345194 , 180189.73249852, ...,
        180038.22030797, 180686.05071812, 179753.41927094],
       [180310.9303314 , 179690.28990538, 180456.48146614, ...,
        180248.94260419, 180615.68693251, 180113.29057765],
       ...,
       [180565.64470435, 179860.80598532, 180628.81575302, ...,
        180539.65599365, 180991.40281409, 180246.66591581],
       [180806.47591307, 179966.05813791, 180415.40792433, ...,
        180500.65864919, 181113.30764626, 180293.13380262],
       [180836.87871797, 179956.13799349, 180931.4947152 , ...,
        180568.81384184, 181228.73148639, 180499.25919425]])

Dask warns that the chunk size we specified is not optimal for this operation! Usually we don't have to specify the chunk size; Dask can select a suitable one automatically.

In [10]:
%%time
# Dask version of the standard deviation along axis 1; .compute() triggers
# the evaluation and returns the result to this process.
X_dask.std(axis=1).compute()

CPU times: user 252 ms, sys: 35.9 ms, total: 288 ms
Wall time: 1.2 s


array([0.99518638, 0.99959358, 1.00717424, ..., 1.00218345, 0.99621492,
       0.99878286])