# Dask graph computation

This notebook illustrates Dask task graphs, lazy evaluation, and parallelism with `dask.distributed`.

In [None]:
import dask.array as da

## Graph for chunked operations

In [None]:
x = da.ones((4, 1), chunks=(2, 1))
x.visualize()

In [None]:
y = 3 - da.ones((4, 1), chunks=(2,1))
y.visualize()

In [None]:
z = x.T * y
z.visualize()
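
The chunk layout behind these graphs can also be checked as text, without rendering anything. The following is a minimal sketch using the `chunks`, `numblocks`, and `shape` attributes of Dask arrays; it assumes `dask` and `numpy` are installed.

```python
import dask.array as da

x = da.ones((4, 1), chunks=(2, 1))
y = 3 - da.ones((4, 1), chunks=(2, 1))
z = x.T * y  # (1, 4) * (4, 1) broadcasts to a (4, 4) array

print("x chunks:", x.chunks)        # chunk sizes along each axis
print("z shape:", z.shape)
print("z numblocks:", z.numblocks)  # number of chunks along each axis

# Nothing has been computed so far; compute() runs the graph.
result = z.compute()
print(result)
```

Each chunk of `z` corresponds to one multiplication task in the graph, which is why the drawing grows with the number of chunks.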

In [None]:
x = da.triu(da.ones((3, 3), chunks=(1, 1)))
x.visualize()

In [None]:
print("Compare this print of x:")
print(x)
print("\nWith this one:")
print(x.compute())

In [None]:
x = da.triu(da.ones((4, 4), chunks=(2, 2)))
x.visualize()

## Graph for lazy operations

See https://github.com/dask/dask-examples/blob/master/delayed.ipynb for more examples.

In [3]:
# Init dummy functions
import time
import random

def inc(x):
    time.sleep(2)
    return x + 1

def dec(x):
    time.sleep(3)
    return x - 1
    
def add(x, y):
    time.sleep(1)
    return x + y

In [4]:
%%time
# Time this sequential list of function calls:
x = inc(1) # Return 1 + 1 = 2
y = dec(2) # Return 2 - 1 = 1
z = add(x, y) # Return 2 + 1 = 3
print("z =",z,"\n")

z = 3 

CPU times: user 406 ms, sys: 56.1 ms, total: 462 ms
Wall time: 6 s


Note that the `x` and `y` computations run sequentially, hence the 6 s of wall time (2 s + 3 s + 1 s).

In [5]:
# Make these functions "lazy", so that they are executed only when needed:
import dask
inc = dask.delayed(inc)
dec = dask.delayed(dec)
add = dask.delayed(add)

In [6]:
%%time
# Time the same sequence of function calls, but let Dask defer the computation until it is needed ("lazy" mode):
x = inc(1)
y = dec(2)
z = add(x, y)
print("z =",z,"\n")

z = Delayed('add-f38bdbb6-18c9-43c8-b620-5fc3a927730c') 

CPU times: user 1.32 ms, sys: 1.22 ms, total: 2.54 ms
Wall time: 1.44 ms


Note that:

- `z` is now a Dask `Delayed` object; the result (3) is not returned
- the wall time is much shorter because the computation was not performed; only the graph was built
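
To make the idea of deferred execution concrete, here is a toy sketch written with the standard library only. The `Lazy` class and the `lazy` wrapper are hypothetical names invented for this illustration; they are not Dask's implementation, which also handles scheduling, graph optimization, and much more.

```python
class Lazy:
    """Toy stand-in for a delayed call: stores the function and its arguments."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def compute(self):
        # Resolve lazy arguments first, then call the function.
        args = [a.compute() if isinstance(a, Lazy) else a for a in self.args]
        return self.func(*args)


def lazy(func):
    """Wrap func so that calling it records the call instead of running it."""
    def wrapper(*args):
        return Lazy(func, *args)
    return wrapper


calls = []  # records which functions actually ran

def inc(x):
    calls.append("inc")
    return x + 1

def dec(x):
    calls.append("dec")
    return x - 1

def add(x, y):
    calls.append("add")
    return x + y

x = lazy(inc)(1)
y = lazy(dec)(2)
z = lazy(add)(x, y)
calls_before = list(calls)  # nothing has run yet

result = z.compute()        # the three functions run only now
print(calls_before, result, calls)
```

Building `z` only records the calls; the functions run when `compute()` is called, which mirrors the behavior observed with `dask.delayed`.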

In [7]:
# This is the graph:
z.visualize(rankdir='LR')

RuntimeError: Drawing dask graphs requires the `graphviz` python library and the `graphviz` system library to be installed.
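
When graphviz is not installed, the graph can still be inspected as text. The following is a sketch that relies on the `__dask_graph__()` method of the Dask collection protocol; the key names contain generated tokens and will differ from run to run. The `sleep` calls are left out because nothing is computed here.

```python
import dask

def inc(x):
    return x + 1

def dec(x):
    return x - 1

def add(x, y):
    return x + y

x = dask.delayed(inc)(1)
y = dask.delayed(dec)(2)
z = dask.delayed(add)(x, y)

# The graph behaves like a mapping from task keys to task definitions.
graph = dict(z.__dask_graph__())
task_keys = sorted(graph)
for key in task_keys:
    print(key)
```

One key per delayed call is expected, with the key of `z` being the `add` task.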

In [8]:
%%time
# To actually run the computation, simply call "compute":
print("z =",z.compute(),"\n")

z = 3 

CPU times: user 516 ms, sys: 146 ms, total: 662 ms
Wall time: 4.33 s
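
The wall time is about 4 s rather than 6 s. This is consistent with `inc` and `dec` running concurrently, since Dask's default scheduler for delayed objects is thread-based: roughly max(2 s, 3 s) for the two independent tasks, plus 1 s for `add`. The sketch below reproduces that timing pattern with the standard library's thread pool. It is only an analogy, not how Dask schedules tasks internally, and the sleep durations are shortened by a factor of ten.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def inc(x):
    time.sleep(0.2)
    return x + 1

def dec(x):
    time.sleep(0.3)
    return x - 1

def add(x, y):
    time.sleep(0.1)
    return x + y

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=2) as pool:
    # inc and dec do not depend on each other, so they can overlap.
    future_x = pool.submit(inc, 1)
    future_y = pool.submit(dec, 2)
    # add needs both results, so it has to wait for them.
    z = add(future_x.result(), future_y.result())
elapsed = time.perf_counter() - start

print("z =", z)
print(f"elapsed: {elapsed:.2f} s (sequential would be about 0.6 s)")
```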


## Lazy operations in parallel with distributed

*This won't work under Google Colab*

In [1]:
from dask.distributed import Client, progress
client = Client(threads_per_worker=4, n_workers=1)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 55929 instead


- Connection method: Cluster object
- Cluster type: distributed.LocalCluster
- Dashboard: http://127.0.0.1:55929/status

- Dashboard: http://127.0.0.1:55929/status
- Workers: 1
- Total threads: 4
- Total memory: 16.00 GiB
- Status: running
- Using processes: True

- Comm: tcp://127.0.0.1:55930
- Workers: 1
- Dashboard: http://127.0.0.1:55929/status
- Total threads: 4
- Started: Just now
- Total memory: 16.00 GiB

- Comm: tcp://127.0.0.1:55936
- Total threads: 4
- Dashboard: http://127.0.0.1:55937/status
- Memory: 16.00 GiB
- Nanny: tcp://127.0.0.1:55933
- Local directory: /Users/gmaze/git/github/obidam/ds2-2023/practice/environment/dask-worker-space/worker-mwit42pv


In [9]:
%%time
print("z =",z.compute(),"\n")

z = 3 

CPU times: user 304 ms, sys: 36.2 ms, total: 340 ms
Wall time: 4.02 s


The point here is to look at the Task Stream panel of the dashboard, which shows `inc` and `dec` running at the same time on separate threads, followed by `add`.

In [None]:
client.close()