In [11]:
import pandas as pd
import dask
import dask.dataframe as dd
import dask.array as da
import numpy as np
import os
import glob
import time
import dask
import dask.array as da
import sys
import time

# Dask Delayed

### With decorators

In [12]:
@dask.delayed
def slow_increment(x):
    """Return ``x + 1`` after a one-second pause that stands in for real work.

    The ``dask.delayed`` decorator makes a call to this function record a
    task and hand back a lazy object instead of running the body right away.
    """
    time.sleep(1)  # artificial latency so the effect of parallelism is visible
    incremented = x + 1
    return incremented

@dask.delayed
def add_nums(x, y):
    """Return ``x + y`` after a one-second pause that stands in for real work.

    The ``dask.delayed`` decorator makes a call to this function record a
    task and hand back a lazy object instead of running the body right away.
    """
    time.sleep(1)  # artificial latency so the effect of parallelism is visible
    total = x + y
    return total

In [13]:
# Build a three-level task graph: four increments feed two additions, which
# feed one final addition.  Nothing is executed until z.compute() is called.
z = add_nums(
    add_nums(slow_increment(1), slow_increment(2)),
    add_nums(slow_increment(3), slow_increment(4)),
)

In [14]:
# Draw the task graph held by z; this only renders it, no task is executed.
z.visualize()

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

In [15]:
%%time
# Execute the graph.  It holds seven one-second tasks arranged in three
# dependent levels, so running independent tasks concurrently should take
# about three seconds of wall time rather than seven.
z.compute()

CPU times: total: 0 ns
Wall time: 3.02 s


14

### Without decorators

In [16]:
def slow_increment(x):
    """Return ``x + 1`` after a one-second pause that stands in for real work."""
    time.sleep(1)  # artificial latency so the effect of parallelism is visible
    incremented = x + 1
    return incremented

def add_nums(x, y):
    """Return ``x + y`` after a one-second pause that stands in for real work."""
    time.sleep(1)  # artificial latency so the effect of parallelism is visible
    total = x + y
    return total

In [17]:
ds = dask.delayed(slow_increment)
da = dask.delayed(add_nums)

In [18]:
z = da(da(ds(1), ds(2)), da(ds(3), ds(4)))

In [19]:
# Draw the task graph held by z; this only renders it, no task is executed.
z.visualize()

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

In [10]:
# Execute the recorded tasks and return the final sum.
z.compute()

14

### Without decorators, wrapping only the outermost call

In [20]:
def slow_increment(x):
    """Return ``x + 1`` after a one-second pause that stands in for real work."""
    time.sleep(1)  # artificial latency so the effect of parallelism is visible
    incremented = x + 1
    return incremented

def add_nums(x, y):
    """Return ``x + y`` after a one-second pause that stands in for real work."""
    time.sleep(1)  # artificial latency so the effect of parallelism is visible
    total = x + y
    return total

In [21]:
# Only the outermost add_nums is wrapped with dask.delayed.  The inner
# add_nums / slow_increment calls are ordinary function calls, so they run
# immediately (and sleep) while this line executes; only the final addition
# is deferred.
z = dask.delayed(add_nums)(
    add_nums(slow_increment(1), slow_increment(2)),
    add_nums(slow_increment(3), slow_increment(4)),
)

In [22]:
# Draw the task graph held by z.  Only the outer add_nums call was wrapped,
# so the graph is expected to contain just that one task.
z.visualize()

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

# Simple case (processing a list in parallel)

In [23]:
# 200 slices of 1000 x 10000 uniform random float64 values: 2e9 elements,
# i.e. roughly 16 GB of RAM.  The generator is not seeded, so the values
# differ on every run.
img = np.random.random(size=[200, 1000, 10000])

### Without dask

In [24]:
def calculate_garbage(a):
    """Return a throwaway per-element result used only to keep the CPU busy.

    The standard deviation and mean of ``a`` are multiplied together and the
    product is divided element-wise by ``sqrt(a)``, so the result has the
    same shape as ``a``.
    """
    spread = np.std(a)
    centre = np.mean(a)
    return spread * centre / np.sqrt(a)

In [25]:
%%time
# Serial baseline: apply calculate_garbage to each slice along the first
# axis of img, one after another.
means = list(map(calculate_garbage, img))

CPU times: total: 30.2 s
Wall time: 30.3 s


### With dask

In [26]:
@dask.delayed
def calculate_garbage_delayed(a):
    """Lazy version of the throwaway per-element computation.

    Multiplies the standard deviation and mean of ``a`` and divides the
    product element-wise by ``sqrt(a)``.  Because of the ``dask.delayed``
    decorator, a call records a task instead of computing the result.
    """
    scale = np.std(a) * np.mean(a)
    return scale / np.sqrt(a)

In [29]:
%%time
# dask.compute() returns a tuple with one entry per positional argument.
# Unpack the single entry so `means` is a plain list of arrays, the same
# structure the non-dask version produces, rather than a 1-tuple wrapping it.
# NOTE(review): the recorded wall time is about the same as the serial run,
# so the 4 workers are not giving a speedup here - cause not established.
(means,) = dask.compute([calculate_garbage_delayed(a) for a in img], num_workers=4)

CPU times: total: 30.9 s
Wall time: 30.9 s
