In [1]:
import dask
import time
import sys
from datetime import timedelta
from dask import delayed, compute
import numpy as np
import dask.array as da
from dask.distributed import Client
from IPython.display import SVG, display


In [2]:
%%time

def func1(a):
    """Announce the call, pause for two seconds, and return ``a + a``."""
    print('Func1')
    time.sleep(2)  # fixed two-second pause
    doubled = a + a
    return doubled


def func2(a):
    """Announce the call, pause for two seconds, and return ``a * a``."""
    print('Func2')
    time.sleep(2)  # fixed two-second pause
    squared = a * a
    return squared


def func3(a):
    """Announce the call, pause for two seconds, and return ``a * a * a``."""
    print('Func3')
    time.sleep(2)  # fixed two-second pause
    squared = a * a
    return squared * a


def func4(a, b, c):
    """Announce the call and return ``a + b + c`` (no pause)."""
    print('Func4')
    partial = a + b
    return partial + c

def serial(a, b, c):
    """Run func1, func2 and func3 in order and print func4 of their results.

    Each argument feeds exactly one stage: ``a`` -> func1, ``b`` -> func2,
    ``c`` -> func3.  Nothing is returned; the combined value is printed.
    """
    doubled, squared, cubed = func1(a), func2(b), func3(c)
    combined = func4(doubled, squared, cubed)
    print(combined)
    
serial(1, 2, 3)
# Runs func1, func2 and func3 one after another, then passes their results to func4.

Func1
Func2
Func3
Func4
33
CPU times: user 2.99 ms, sys: 793 µs, total: 3.78 ms
Wall time: 6.01 s


In [3]:
%%time

@delayed
def func1(a):
    """Lazy version of func1: when computed, prints, pauses 2 s, returns ``a + a``.

    Re-using the name ``func1`` replaces the eager definition from the
    earlier cell.
    """
    print('Func1')
    time.sleep(2)  # fixed two-second pause
    doubled = a + a
    return doubled

@delayed
def func2(a):
    """Lazy version of func2: when computed, prints, pauses 2 s, returns ``a * a``.

    Re-using the name ``func2`` replaces the eager definition from the
    earlier cell.
    """
    print('Func2')
    time.sleep(2)  # fixed two-second pause
    squared = a * a
    return squared

@delayed
def func3(a):
    """Lazy version of func3: when computed, prints, pauses 2 s, returns ``a * a * a``.

    Re-using the name ``func3`` replaces the eager definition from the
    earlier cell.
    """
    print('Func3')
    time.sleep(2)  # fixed two-second pause
    squared = a * a
    return squared * a

@delayed
def func4(a, b, c):
    """Lazy version of func4: when computed, prints and returns ``a + b + c``.

    Re-using the name ``func4`` replaces the eager definition from the
    earlier cell.
    """
    print('Func4')
    partial = a + b
    return partial + c

def parallel(a, b, c):
    """Build the func1/func2/func3 -> func4 task graph, compute it, print the result.

    The three first-stage calls only create lazy objects; the work happens
    when ``.compute()`` is called on the func4 node.  Nothing is returned.
    """
    branches = (func1(a), func2(b), func3(c))
    total = func4(*branches)
    print(total.compute())
    
parallel(1, 2, 3)

# func1, func2 and func3 run in parallel; their results are then passed to func4.

Func1Func2
Func3

Func4
33
CPU times: user 5.85 ms, sys: 4.88 ms, total: 10.7 ms
Wall time: 2.02 s


In [4]:
def show_graph(a, b, c):
    """Render the task graph of the func1/func2/func3 -> func4 pipeline.

    The graph is written to ``test.svg`` and shown inline.  Nothing is
    computed: only the lazy graph is visualised.

    Parameters
    ----------
    a, b, c
        Inputs forwarded to func1, func2 and func3 respectively.
    """
    graph = func4(func1(a), func2(b), func3(c))
    # visualize() returns a displayable object; print() would only show its
    # repr, so hand it to IPython's display() to render the picture.
    display(graph.visualize(filename='test.svg'))
    
# Generates the task-graph diagram of the parallel pipeline.

In [5]:
def test_numpy_array(x, y):
    a = np.random.random((x, y))
    a[a < 0.5] = 0
    s = a + a.T * a.T

    print(np.sum(s))

In [6]:
%%time

# NumPy baseline on a 10000 x 10000 matrix.
test_numpy_array(10000, 10000)

66662521.31315274
CPU times: user 6.49 s, sys: 630 ms, total: 7.12 s
Wall time: 2.49 s


In [7]:
def test_dask_array(x, y):
    """Dask counterpart of the NumPy test: print the sum of ``a + a.T * a.T``.

    ``a`` is an ``(x, y)`` dask array of random values with every entry below
    0.5 set to zero.  Since it is combined with its transpose, the shapes
    ``(x, y)`` and ``(y, x)`` must be compatible (in practice ``x == y``).
    The sum is printed, not returned.
    """
    values = da.random.random(size=(x, y))
    values[values < 0.5] = 0
    flipped = values.T
    total = da.sum(values + flipped * flipped)

    print(total.compute())

In [8]:
%%time 

# Same 10000 x 10000 workload, evaluated through dask.
test_dask_array(10000,10000)

66656708.3104638
CPU times: user 15.8 s, sys: 14.7 s, total: 30.5 s
Wall time: 1.42 s


In [None]:
def test_dask_array_graph(x, y):
    """Render the task graph of the dask-array sum to ``da_array.svg``.

    Builds the same lazy expression as the dask-array test (random ``(x, y)``
    array, entries below 0.5 zeroed, ``a + a.T * a.T`` summed) but visualises
    the graph instead of computing it.  ``x`` and ``y`` must give shapes that
    are compatible with the transpose (in practice ``x == y``).
    """
    a = da.random.random(size=(x, y))
    a[a < 0.5] = 0
    s = a + a.T * a.T

    # visualize() returns a displayable object; print() would only show its
    # repr, so hand it to IPython's display() to render the picture.
    display(da.sum(s).visualize('da_array.svg'))

In [10]:
# Run on multiple processes instead of threads.

def test_dask_array_process(x, y):
    """Print the sum of ``a + a.T * a.T`` computed with the 'Processes' scheduler.

    Same expression as the other dask-array test; only the scheduler passed
    to ``compute`` differs.  ``x`` and ``y`` must give shapes compatible with
    the transpose (in practice ``x == y``).  The sum is printed, not returned.
    """
    values = da.random.random(size=(x, y))
    values[values < 0.5] = 0
    flipped = values.T
    total = da.sum(values + flipped * flipped)

    print(total.compute(scheduler='Processes'))

In [11]:
%%time

# Same 10000 x 10000 workload, using the process-based scheduler.
test_dask_array_process(10000,10000)

66672006.76043082
CPU times: user 1.97 s, sys: 3.94 s, total: 5.91 s
Wall time: 5.71 s


In [12]:
# 8x8 matrix holding 0..63, wrapped as a dask array with chunks of at most 5x5.
x = np.arange(64).reshape((8, 8))
d = da.from_array(x, chunks=(5, 5))
# Only a cell's last expression is echoed, so show the chunk layout explicitly.
display(d.chunks)
np.array(d)


array([[ 0,  1,  2,  3,  4,  5,  6,  7],
       [ 8,  9, 10, 11, 12, 13, 14, 15],
       [16, 17, 18, 19, 20, 21, 22, 23],
       [24, 25, 26, 27, 28, 29, 30, 31],
       [32, 33, 34, 35, 36, 37, 38, 39],
       [40, 41, 42, 43, 44, 45, 46, 47],
       [48, 49, 50, 51, 52, 53, 54, 55],
       [56, 57, 58, 59, 60, 61, 62, 63]])

In [13]:
# Extend every chunk of d by 2 elements along axis 0 and 1 along axis 1.
# The outer edge is filled with the constant 100 on axis 0 and reflected on axis 1.
g = da.overlap.overlap(d, depth={0: 2, 1: 1},
                       boundary={0: 100, 1: 'reflect'})
# Only a cell's last expression is echoed, so show the chunk layout explicitly.
display(g.chunks)
np.array(g)

array([[100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100],
       [100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100],
       [  0,   0,   1,   2,   3,   4,   5,   4,   5,   6,   7,   7],
       [  8,   8,   9,  10,  11,  12,  13,  12,  13,  14,  15,  15],
       [ 16,  16,  17,  18,  19,  20,  21,  20,  21,  22,  23,  23],
       [ 24,  24,  25,  26,  27,  28,  29,  28,  29,  30,  31,  31],
       [ 32,  32,  33,  34,  35,  36,  37,  36,  37,  38,  39,  39],
       [ 40,  40,  41,  42,  43,  44,  45,  44,  45,  46,  47,  47],
       [ 48,  48,  49,  50,  51,  52,  53,  52,  53,  54,  55,  55],
       [ 24,  24,  25,  26,  27,  28,  29,  28,  29,  30,  31,  31],
       [ 32,  32,  33,  34,  35,  36,  37,  36,  37,  38,  39,  39],
       [ 40,  40,  41,  42,  43,  44,  45,  44,  45,  46,  47,  47],
       [ 48,  48,  49,  50,  51,  52,  53,  52,  53,  54,  55,  55],
       [ 56,  56,  57,  58,  59,  60,  61,  60,  61,  62,  63,  63],
       [100, 100, 100, 100, 100, 1

In [14]:
# Same overlap depths as before, but axis 1 now wraps around ('periodic')
# instead of reflecting; axis 0 is still padded with the constant 100.
g = da.overlap.overlap(d, depth={0: 2, 1: 1},
                       boundary={0: 100, 1: 'periodic'})
# Only a cell's last expression is echoed, so show the chunk layout explicitly.
display(g.chunks)
np.array(g)

array([[100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100],
       [100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100],
       [  7,   0,   1,   2,   3,   4,   5,   4,   5,   6,   7,   0],
       [ 15,   8,   9,  10,  11,  12,  13,  12,  13,  14,  15,   8],
       [ 23,  16,  17,  18,  19,  20,  21,  20,  21,  22,  23,  16],
       [ 31,  24,  25,  26,  27,  28,  29,  28,  29,  30,  31,  24],
       [ 39,  32,  33,  34,  35,  36,  37,  36,  37,  38,  39,  32],
       [ 47,  40,  41,  42,  43,  44,  45,  44,  45,  46,  47,  40],
       [ 55,  48,  49,  50,  51,  52,  53,  52,  53,  54,  55,  48],
       [ 31,  24,  25,  26,  27,  28,  29,  28,  29,  30,  31,  24],
       [ 39,  32,  33,  34,  35,  36,  37,  36,  37,  38,  39,  32],
       [ 47,  40,  41,  42,  43,  44,  45,  44,  45,  46,  47,  40],
       [ 55,  48,  49,  50,  51,  52,  53,  52,  53,  54,  55,  48],
       [ 63,  56,  57,  58,  59,  60,  61,  60,  61,  62,  63,  56],
       [100, 100, 100, 100, 100, 1

In [15]:
# Remove the overlap (2 along axis 0, 1 along axis 1) that was added to g.
n = da.overlap.trim_overlap(g, depth={0:2, 1:1})
np.array(n)

# What output do you expect?

array([[ 0,  1,  2,  3,  4,  5,  6,  7],
       [ 8,  9, 10, 11, 12, 13, 14, 15],
       [16, 17, 18, 19, 20, 21, 22, 23],
       [24, 25, 26, 27, 28, 29, 30, 31],
       [32, 33, 34, 35, 36, 37, 38, 39],
       [40, 41, 42, 43, 44, 45, 46, 47],
       [48, 49, 50, 51, 52, 53, 54, 55],
       [56, 57, 58, 59, 60, 61, 62, 63]])

In [16]:
# Compute with overlap: the mapped function takes no extra parameters.
def func(a):
    """Add each row's successor to it; the last row is left unchanged.

    Parameters
    ----------
    a : numpy.ndarray
        Block to process (indexed along its first axis).

    Returns
    -------
    numpy.ndarray
        A new array where ``out[i] = a[i] + a[i + 1]`` for every row except
        the last.  The input block is not modified, so a chunk that is shared
        with other tasks stays intact.
    """
    out = a.copy()
    # Vectorised form of the row-by-row loop; reads only original values of a.
    out[:-1] += a[1:]
    return out

# Array.map_overlap avoids the deprecated da.map_overlap(array, func, ...)
# argument order; dtype is taken from the input d instead of another cell's g.
# trim=False keeps the overlap rows/columns in the result.
k = d.map_overlap(func, depth={0: 2, 1: 1}, boundary={0: 100, 1: 'reflect'},
                  dtype=d.dtype, trim=False)
np.array(k)

array([[200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200],
       [100, 100, 101, 102, 103, 104, 105, 104, 105, 106, 107, 107],
       [  8,   8,  10,  12,  14,  16,  18,  16,  18,  20,  22,  22],
       [ 24,  24,  26,  28,  30,  32,  34,  32,  34,  36,  38,  38],
       [ 40,  40,  42,  44,  46,  48,  50,  48,  50,  52,  54,  54],
       [ 56,  56,  58,  60,  62,  64,  66,  64,  66,  68,  70,  70],
       [ 72,  72,  74,  76,  78,  80,  82,  80,  82,  84,  86,  86],
       [ 88,  88,  90,  92,  94,  96,  98,  96,  98, 100, 102, 102],
       [ 48,  48,  49,  50,  51,  52,  53,  52,  53,  54,  55,  55],
       [ 56,  56,  58,  60,  62,  64,  66,  64,  66,  68,  70,  70],
       [ 72,  72,  74,  76,  78,  80,  82,  80,  82,  84,  86,  86],
       [ 88,  88,  90,  92,  94,  96,  98,  96,  98, 100, 102, 102],
       [104, 104, 106, 108, 110, 112, 114, 112, 114, 116, 118, 118],
       [156, 156, 157, 158, 159, 160, 161, 160, 161, 162, 163, 163],
       [200, 200, 200, 200, 200, 2

In [17]:
def func(a):
    """Add each row's successor to it; the last row is left unchanged.

    Same definition as the ``func`` in the previous cell, repeated so this
    cell can be run on its own.

    Parameters
    ----------
    a : numpy.ndarray
        Block to process (indexed along its first axis).

    Returns
    -------
    numpy.ndarray
        A new array where ``out[i] = a[i] + a[i + 1]`` for every row except
        the last.  The input block is not modified.
    """
    out = a.copy()
    # Vectorised form of the row-by-row loop; reads only original values of a.
    out[:-1] += a[1:]
    return out

# trim is left at its default, so the extra overlap data is trimmed off again
# and the result has the same 8x8 shape as d.
# Array.map_overlap avoids the deprecated da.map_overlap(array, func, ...)
# argument order; dtype is taken from the input d instead of another cell's g.
k = d.map_overlap(func, depth={0: 2, 1: 1}, boundary={0: 100, 1: 'reflect'},
                  dtype=d.dtype)
np.array(k)

array([[  8,  10,  12,  14,  16,  18,  20,  22],
       [ 24,  26,  28,  30,  32,  34,  36,  38],
       [ 40,  42,  44,  46,  48,  50,  52,  54],
       [ 56,  58,  60,  62,  64,  66,  68,  70],
       [ 72,  74,  76,  78,  80,  82,  84,  86],
       [ 88,  90,  92,  94,  96,  98, 100, 102],
       [104, 106, 108, 110, 112, 114, 116, 118],
       [156, 157, 158, 159, 160, 161, 162, 163]])

In [None]:
# Mapped function that takes extra parameters.

def func(a, x1, y1):
    """Add each row's successor to it, then add ``x1`` and ``y1`` to every element.

    Parameters
    ----------
    a : numpy.ndarray
        Block to process (indexed along its first axis).
    x1, y1
        Offsets added (in that order) to the whole result.

    Returns
    -------
    numpy.ndarray
        A new array: ``a[i] + a[i + 1]`` for every row except the last (which
        keeps its values), plus ``x1 + y1``.  The input block is not modified.
    """
    out = a.copy()
    # Vectorised form of the row-by-row loop; reads only original values of a.
    out[:-1] += a[1:]
    return out + x1 + y1

# Extra keyword arguments (x1, y1) are forwarded to func for every block.
# Array.map_overlap avoids the deprecated da.map_overlap(array, func, ...)
# argument order; dtype is taken from the input d instead of another cell's g.
k = d.map_overlap(func, depth={0: 2, 1: 1}, boundary={0: 100, 1: 'reflect'},
                  dtype=d.dtype, x1=1, y1=2)
np.array(k)