In [1]:
# Modules needed: mkl, dask, numpy

In [2]:
import mkl
import numpy as np
import dask.array as da

# Process an array with multiple threads

Multiple threads can process different parts of the same array simultaneously. `dask` provides this automatically once the `numpy` functions are replaced with their `dask` equivalents. The key concept is the chunk: the array is split into chunks, each chunk is processed as a separate task, and different tasks can run on different threads. For a matrix, for example, we define a 2D block size; each block can be processed independently, and the per-block results are then combined to produce the final answer.
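To make the chunk idea concrete, here is a hand-rolled sketch that uses only `numpy` and the standard library's `concurrent.futures.ThreadPoolExecutor`. It illustrates the concept and is not how `dask` is implemented internally; the helper names (`block_slices`, `map_blocks_threaded`, `sum_blocks_threaded`), the array size, and the block shape are hypothetical choices. It relies on `numpy` releasing the GIL inside its numeric ufuncs, which is what lets the threads overlap.

```python
import numpy as np
from concurrent.futures import ThreadPoolExecutor

# Hypothetical helpers for illustration only; not part of numpy or dask.


def block_slices(shape, block_shape):
    """Return (row_slice, col_slice) pairs covering a 2D array; edge blocks may be smaller."""
    rows, cols = block_shape
    return [
        (slice(i, i + rows), slice(j, j + cols))
        for i in range(0, shape[0], rows)
        for j in range(0, shape[1], cols)
    ]


def f(block):
    # Same elementwise expression that this notebook benchmarks
    return block**2 + np.sin(block) * block * np.log(block)


def map_blocks_threaded(a, block_shape, num_workers=2):
    """Elementwise case: each thread fills its own disjoint block of the output."""
    out = np.empty_like(a)

    def work(idx):
        out[idx] = f(a[idx])

    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # Consuming the iterator re-raises any exception from a worker
        list(pool.map(work, block_slices(a.shape, block_shape)))
    return out


def sum_blocks_threaded(a, block_shape, num_workers=2):
    """Reduction case: each thread sums one block, then the partial sums are accumulated."""
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        partials = list(pool.map(lambda idx: a[idx].sum(), block_slices(a.shape, block_shape)))
    return sum(partials)


# Shifted away from 0 so that log() stays finite
A_small = np.random.default_rng(0).random((1000, 400)) + 0.1
B_small = map_blocks_threaded(A_small, (250, 100))
print(np.allclose(B_small, f(A_small)))  # True
print(np.isclose(sum_blocks_threaded(A_small, (250, 100)), A_small.sum()))  # True
```

An elementwise operation only needs each block written back to its own place, while a reduction such as a sum also needs the per-block partial results combined at the end.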

### Library Dependencies

This notebook needs `mkl`, `numpy`, and `dask`. Install them with pip: `pip install mkl`, `pip install numpy`, and `pip install "dask[array]"`.

In [3]:
# On some platforms numpy is already multithreaded through Intel MKL;
# for this example we limit MKL to a single thread
mkl.set_num_threads(1)

10

In [5]:
A = np.random.rand(20000, 4000)  # 20000 x 4000 float64 values drawn uniformly from [0, 1)

`%whos` is an IPython magic command that lists the defined variables with their type and a short summary; for `numpy` arrays this includes their memory consumption.

In [6]:
%whos

Variable   Type       Data/Info
-------------------------------
A          ndarray    20000x4000: 80000000 elems, type `float64`, 640000000 bytes (610.3515625 Mb)
da         module     <module 'dask.array' from<...>/dask/array/__init__.py'>
mkl        module     <module 'mkl' from '/cm/s<...>ackages/mkl/__init__.py'>
np         module     <module 'numpy' from '/ho<...>kages/numpy/__init__.py'>
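The size reported for `A` follows directly from its shape and dtype. A quick check of that arithmetic, computed without allocating the array (the variable names are just for illustration):

```python
import numpy as np

# Reproduce the size that %whos reports for A without allocating the array
shape = (20000, 4000)
itemsize = np.dtype(np.float64).itemsize  # 8 bytes per float64 element
n_elems = shape[0] * shape[1]
n_bytes = n_elems * itemsize
print(n_elems)          # 80000000
print(n_bytes)          # 640000000
print(n_bytes / 2**20)  # 610.3515625
```

The last value matches the `610.3515625 Mb` in the `%whos` output, so that figure is in mebibytes (units of 2**20 bytes).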


In [7]:
A

array([[0.00313438, 0.8623137 , 0.18285884, ..., 0.04099879, 0.06696881,
        0.23650964],
       [0.87851713, 0.50328227, 0.56782502, ..., 0.70481854, 0.09592343,
        0.15063186],
       [0.04992656, 0.21432047, 0.32482416, ..., 0.20797934, 0.04942724,
        0.23446435],
       ...,
       [0.04700344, 0.65313864, 0.23519254, ..., 0.03974845, 0.18292909,
        0.53856287],
       [0.30874109, 0.17632578, 0.90900571, ..., 0.68716664, 0.76345713,
        0.38443535],
       [0.33337904, 0.10007628, 0.38080112, ..., 0.75666445, 0.02641877,
        0.17604549]])

First, let's perform some operations on the matrix in pure `numpy`, using a single thread.

In [8]:
%time B = A**2 + np.sin(A) * A * np.log(A)

CPU times: user 729 ms, sys: 321 ms, total: 1.05 s
Wall time: 1.05 s


## Processing with dask

First, create a chunked `dask` array from the `numpy` array, using blocks of 2000 rows by 1000 columns.

In [9]:
A_dask = da.from_array(A, chunks=(2000, 1000))

In [10]:
A_dask.numblocks

(10, 4)
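The block count per axis is the axis length divided by the chunk length, rounded up so that a final partial block still counts. A small sketch of that arithmetic; `num_blocks` is a hypothetical helper, and the `(20500, 4000)` shape is a made-up example of a length that does not divide evenly:

```python
import math

import numpy as np


def num_blocks(shape, chunks):
    """Hypothetical helper: blocks per axis = ceil(size / chunk)."""
    return tuple(math.ceil(n / c) for n, c in zip(shape, chunks))


print(num_blocks((20000, 4000), (2000, 1000)))  # (10, 4), matching A_dask.numblocks
print(num_blocks((20500, 4000), (2000, 1000)))  # (11, 4): the last row of blocks is only 500 rows tall

# Memory of one full chunk of float64 values
chunk_bytes = 2000 * 1000 * np.dtype(np.float64).itemsize
print(chunk_bytes)  # 16000000 (about 15.3 MiB)
```

So `A_dask` consists of 40 chunks of about 15.3 MiB each, and each chunk is a unit of work that a thread can pick up.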

Then replace each function with its `dask` equivalent; `dask.array` implements most `numpy` functions and operations. This step only builds a graph of tasks; nothing is computed until `.compute()` is called.

In [11]:
compute_B = (A_dask**2 + da.sin(A_dask) * A_dask * da.log(A_dask))
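A small, self-contained version of the same pipeline makes the two phases visible. The array size, the chunk shape, and the name `A_demo` are arbitrary choices for the sketch. Building the expression yields a `dask` array, and `.compute(scheduler="threads", num_workers=2)` runs it on the threaded scheduler and returns an ordinary `numpy` array.

```python
import numpy as np
import dask.array as da

# Small stand-in for A; shifted away from 0 so that log() stays finite
A_demo = np.random.default_rng(0).random((1000, 400)) + 0.1
A_demo_dask = da.from_array(A_demo, chunks=(250, 100))  # 4 x 4 blocks

# Building the expression only records tasks; no arithmetic happens here
expr = A_demo_dask**2 + da.sin(A_demo_dask) * A_demo_dask * da.log(A_demo_dask)
print(type(expr).__name__)  # Array (still a dask array)
print(expr.numblocks)       # (4, 4): elementwise operations keep the input chunking

# .compute() executes the graph on a thread pool and returns a numpy array
result = expr.compute(scheduler="threads", num_workers=2)
expected = A_demo**2 + np.sin(A_demo) * A_demo * np.log(A_demo)
print(np.allclose(result, expected))  # True
```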

In [12]:
%time B_dask = compute_B.compute(num_workers=1)

CPU times: user 1.06 s, sys: 268 ms, total: 1.33 s
Wall time: 1.34 s


In [13]:
%time B_dask = compute_B.compute(num_workers=2)

CPU times: user 1.09 s, sys: 257 ms, total: 1.34 s
Wall time: 786 ms
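The wall times above can be summarized as speedups. This quick calculation uses only the numbers reported in this notebook; each comes from a single `%time` run, so treat the ratios as rough. Note that `dask` with one worker (1.34 s) is slower than plain `numpy` (1.05 s), which likely reflects the cost of scheduling many small per-chunk tasks.

```python
# Wall times copied from the %time outputs in this notebook, in seconds
t_numpy = 1.05    # pure numpy, MKL limited to one thread
t_dask_1 = 1.34   # dask, num_workers=1
t_dask_2 = 0.786  # dask, num_workers=2

speedup_vs_1 = t_dask_1 / t_dask_2     # gain from doubling the workers
efficiency = speedup_vs_1 / 2          # fraction of ideal 2x scaling
speedup_vs_numpy = t_numpy / t_dask_2  # gain over plain numpy
print(f"{speedup_vs_1:.2f} {efficiency:.2f} {speedup_vs_numpy:.2f}")  # 1.70 0.85 1.34
```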


In [14]:
#%time B_dask = compute_B.compute(num_workers=12)

In [15]:
#num_workers = os.cpu_count()  # all logical CPUs; requires `import os`
#%time B_dask = compute_B.compute(num_workers=num_workers)

In [16]:
assert np.allclose(B, B_dask)  # dask result matches the numpy result within floating-point tolerance