# Dask graph computation

Here we illustrate Dask task graphs for computations, the lazy (delayed) mode, and parallelism with `dask.distributed`.

In [1]:
import dask.array as da

## Graphs for chunked operations

In [2]:
x = da.ones((4, 1), chunks=(2, 1))
x.visualize()

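If `visualize()` raises `RuntimeError: No visualization engine detected, please install graphviz or ipycytoscape`, install one of these two engines (for graphviz, both the Python package and the system Graphviz binaries are needed).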

In [None]:
y = 3 - da.ones((4, 1), chunks=(2, 1))
y.visualize()
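Each box in these graphs corresponds to one block (chunk) of the array. As a minimal sketch, the block layout can also be inspected directly with the `chunks` and `numblocks` attributes of a Dask array, without any visualization engine:

```python
import dask.array as da

# Same arrays as in the cells above: a 4x1 column split into two 2x1 blocks
x = da.ones((4, 1), chunks=(2, 1))
y = 3 - da.ones((4, 1), chunks=(2, 1))

# .chunks lists the block sizes along each axis; .numblocks counts blocks per axis
print(x.chunks)     # ((2, 2), (1,))
print(x.numblocks)  # (2, 1)

# Element-wise arithmetic keeps the chunk structure of its input
print(y.chunks)     # ((2, 2), (1,))
```

Two blocks along the first axis is why each of these graphs has two parallel branches.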

In [None]:
z = x.T * y
z.visualize()
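Here `x.T` has shape (1, 4) and `y` has shape (4, 1), so the product is broadcast to a (4, 4) array, split into a 2x2 grid of blocks. A short sketch checking the shape, the resulting chunks, and that the lazy result matches the eager NumPy computation:

```python
import numpy as np
import dask.array as da

x = da.ones((4, 1), chunks=(2, 1))
y = 3 - da.ones((4, 1), chunks=(2, 1))

# (1, 4) * (4, 1) broadcasts to (4, 4); chunks are combined along each axis
z = x.T * y
print(z.shape)   # (4, 4)
print(z.chunks)  # ((2, 2), (2, 2))

# The lazy Dask result equals the same computation done eagerly in NumPy
expected = np.ones((4, 1)).T * (3 - np.ones((4, 1)))
print(np.array_equal(z.compute(), expected))  # True
```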

In [None]:
x = da.triu(da.ones((3, 3), chunks=(1, 1)))
x.visualize()

In [None]:
print("Compare this print of x:")
print(x)
print("\nWith this one:")
print(x.compute())
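The first print only shows metadata (shape, dtype, chunk size) because `x` is still a lazy Dask array; `compute()` returns a concrete NumPy array. A minimal sketch making the type difference explicit and comparing against NumPy's own `np.triu`:

```python
import numpy as np
import dask.array as da

x = da.triu(da.ones((3, 3), chunks=(1, 1)))

# Before compute(): a dask Array that only describes the result
print(type(x).__name__)       # Array

# After compute(): a NumPy array holding the actual values
result = x.compute()
print(type(result).__name__)  # ndarray

# Same values as NumPy's eager upper-triangle function
print(np.array_equal(result, np.triu(np.ones((3, 3)))))  # True
```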

In [None]:
x = da.triu(da.ones((4, 4), chunks=(2, 2)))
x.visualize()
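Comparing the two `triu` graphs, finer chunks produce more boxes. As a rough sketch, the number of tasks can be counted from the graph returned by `__dask_graph__()`, which behaves like a mapping from task keys to tasks (the exact counts depend on the Dask version, so only the comparison is shown):

```python
import dask.array as da

fine = da.triu(da.ones((3, 3), chunks=(1, 1)))    # 3x3 grid of blocks
coarse = da.triu(da.ones((4, 4), chunks=(2, 2)))  # 2x2 grid of blocks

# Count the tasks in each graph
n_fine = len(fine.__dask_graph__())
n_coarse = len(coarse.__dask_graph__())
print(n_fine, n_coarse)

# More blocks means more tasks for the scheduler to manage
print(n_fine > n_coarse)  # True
```

This is the usual trade-off when choosing chunk sizes: smaller chunks expose more parallelism but add scheduling overhead per task.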

## Graph for lazy operations

See https://github.com/dask/dask-examples/blob/master/delayed.ipynb for more details.

In [None]:
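# Define dummy functions that simulate slow work with time.sleep
import time

def inc(x):
    time.sleep(2)  # pretend this takes 2 s
    return x + 1

def dec(x):
    time.sleep(3)  # pretend this takes 3 s
    return x - 1

def add(x, y):
    time.sleep(1)  # pretend this takes 1 s
    return x + y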

In [None]:
%%time
# Time this sequence of function calls:
x = inc(1) # Return 1 + 1 = 2
y = dec(2) # Return 2 - 1 = 1
z = add(x, y) # Return 2 + 1 = 3
print("z =",z,"\n")

Note that the `x` and `y` computations are performed sequentially, hence the ~6 s of wall time (2 s + 3 s + 1 s).

In [None]:
# Make these functions "lazy", so that they are executed only when their result is needed:
import dask
inc = dask.delayed(inc)
dec = dask.delayed(dec)
add = dask.delayed(add)
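Wrapping an existing function with `dask.delayed(...)` is equivalent to decorating it with `@dask.delayed` at definition time. A minimal sketch of the decorator form, using two hypothetical functions `double` and `total` (not part of the notebook):

```python
import dask

# Hypothetical example functions, made lazy with the decorator form
@dask.delayed
def double(x):
    return 2 * x

@dask.delayed
def total(a, b):
    return a + b

# Calling them only records tasks in a graph; nothing runs yet
out = total(double(1), double(2))
print(type(out).__name__)  # Delayed
print(out.compute())       # 6
```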

In [None]:
%%time
# Time the same sequence of calls in "lazy" mode: Dask only records them in a task graph, nothing is executed yet:
x = inc(1)
y = dec(2)
z = add(x, y)
print("z =",z,"\n")

Note that:

- `z` is now a Dask `Delayed` object; the result (3) is not returned yet
- the wall time is much shorter, because the computation was not performed: only the graph was built
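One way to convince yourself that nothing runs before `compute()` is to have each function log its own execution. A small sketch, with the sleeps removed and a hypothetical `calls` list added for illustration:

```python
import dask

calls = []  # hypothetical log: records which functions actually ran

@dask.delayed
def inc(x):
    calls.append("inc")
    return x + 1

@dask.delayed
def dec(x):
    calls.append("dec")
    return x - 1

@dask.delayed
def add(x, y):
    calls.append("add")
    return x + y

z = add(inc(1), dec(2))
calls_before_compute = list(calls)
print(calls_before_compute)  # [] : only the graph exists so far

print(z.compute())           # 3
print(sorted(calls))         # ['add', 'dec', 'inc']
```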

In [None]:
# This is the graph:
z.visualize(rankdir='LR')

In [None]:
%%time
# To actually run the computation, simply call "compute":
print("z =",z.compute(),"\n")
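`compute()` accepts a `scheduler` keyword that selects how the graph is executed. The sketch below compares the single-threaded `"synchronous"` scheduler with the `"threads"` scheduler (limited here to 2 workers via `num_workers`). The sleeps are shortened to 0.2 s, 0.3 s and 0.1 s to keep it quick; measured times are approximate and machine-dependent:

```python
import time
import dask

@dask.delayed
def inc(x):
    time.sleep(0.2)
    return x + 1

@dask.delayed
def dec(x):
    time.sleep(0.3)
    return x - 1

@dask.delayed
def add(x, y):
    time.sleep(0.1)
    return x + y

z = add(inc(1), dec(2))

def timed(**kwargs):
    # Run the graph with the given compute() options and measure wall time
    start = time.perf_counter()
    result = z.compute(**kwargs)
    return result, time.perf_counter() - start

# One task after another: about 0.2 + 0.3 + 0.1 = 0.6 s
seq_result, seq_time = timed(scheduler="synchronous")
# inc and dec can overlap: about max(0.2, 0.3) + 0.1 = 0.4 s
par_result, par_time = timed(scheduler="threads", num_workers=2)

print(f"synchronous: {seq_result} in {seq_time:.2f}s")
print(f"threads:     {par_result} in {par_time:.2f}s")
```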

## Lazy operations in parallel with distributed

*This won't work under Google Colab*

In [None]:
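from dask.distributed import Client
# Start a local cluster with 1 worker running 4 threads
client = Client(threads_per_worker=4, n_workers=1)
client  # in Jupyter, this displays the cluster info and the dashboard link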
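Outside a notebook, a `Client` can also be used as a context manager so that it (and the local cluster it started) is closed automatically. A minimal sketch, assuming `processes=False` (workers run as threads inside the current process, which avoids starting a separate worker process) and using `client.compute`, which returns a `Future` immediately:

```python
import dask
from dask.distributed import Client

@dask.delayed
def inc(x):
    return x + 1

@dask.delayed
def dec(x):
    return x - 1

@dask.delayed
def add(x, y):
    return x + y

z = add(inc(1), dec(2))

# processes=False is an assumption for this sketch, not the notebook's setting
with Client(n_workers=1, threads_per_worker=4, processes=False) as client:
    print(client.dashboard_link)  # dashboard URL (empty if the dashboard is unavailable)
    future = client.compute(z)    # submit the graph without blocking
    result = future.result()      # wait for the cluster to finish
print(result)  # 3
```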

In [None]:
%%time
print("z =",z.compute(),"\n")

The point here is to look at the Task Stream panel of the dashboard (linked from the `client` output above), which should look like the figure below.

We see the ``dec`` (magenta) and ``inc`` (green) calls running in parallel, lasting 3 s and 2 s respectively, followed by the ``add`` (blue) call lasting 1 s. The total execution time is therefore about 4 s (max(3, 2) + 1), instead of the 6 s of the sequential run.

![](https://github.com/obidam/ds2-2025/raw/main/practice/environment/Dask_Graph.png)
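The 4 s figure is the length of the graph's critical path: a task can start only once all of its dependencies are done. A small sketch of that arithmetic, assuming unlimited workers and the durations of the dummy functions above:

```python
# Durations (seconds) and dependencies of the three tasks in the graph
durations = {"inc": 2, "dec": 3, "add": 1}
deps = {"inc": [], "dec": [], "add": ["inc", "dec"]}

def finish_time(task):
    # A task starts when its last dependency finishes (0 if it has none)
    start = max((finish_time(d) for d in deps[task]), default=0)
    return start + durations[task]

sequential = sum(durations.values())  # one task after another
parallel = finish_time("add")         # longest dependency chain
print(sequential, parallel)           # 6 4
```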

In [None]:
client.close()