In [1]:
# ! pip install flowcept[dask]

In [2]:
! python reset_dask_nb_exec_counts.py   
! rm -f output.log
# Starting the Dask cluster triggers a bug in this notebook's execution counts.
# The first command in this cell resets the notebook's execution counts to work around it.
# Just save the notebook (cmd+s) after you run the Dask cluster setup.

In [3]:
def dummy_func1(x):
    """Return ``x`` multiplied by two."""
    doubled = x * 2
    return doubled


def dummy_func2(y):
    """Return ``y`` added to itself."""
    summed = y + y
    return summed


def calculate_batch_and_epochs(z, w):
    """Derive a batch size and an epoch count from two numbers.

    Args:
        z: Numerator of the epoch ratio; also contributes to the batch size.
        w: Denominator of the epoch ratio; also contributes to the batch size.

    Returns:
        dict: ``{"batch_size": int(z + w + 16), "epochs": e}`` where ``e`` is
        ``int(z / w) + 1`` but never less than 2.
    """
    batch_size = int(z + w + 16)
    epochs = int(z / w) + 1
    # Enforce the lower bound of two epochs.
    if epochs < 2:
        epochs = 2
    return {"batch_size": batch_size, "epochs": epochs}

### Set the env var pointing to the conf file where the ports, hostnames, and other conf variables are read from.

There is an example conf file available in the `resources` directory of the Flowcept repository. You can use it as is if you are running this notebook on your local laptop.

In [4]:
def setup_local_dask_cluster():
    """Start a two-worker local Dask cluster with Flowcept's worker adapter registered.

    Returns:
        tuple: ``(client, cluster)`` -- the ``Client`` connected to the cluster's
        scheduler address, and the ``LocalCluster`` itself.
    """
    # Imports are kept local to this function.
    from dask.distributed import Client, LocalCluster
    from flowcept import FlowceptDaskWorkerAdapter

    local_cluster = LocalCluster(n_workers=2)
    connected_client = Client(local_cluster.scheduler.address)

    # Register the Flowcept worker adapter as a plugin on the client.
    worker_adapter = FlowceptDaskWorkerAdapter()
    connected_client.register_plugin(worker_adapter)

    return connected_client, local_cluster

## Start Local Dask Cluster

In [5]:
# Start the local cluster and keep both handles; they are closed at the end of the notebook.
dask_client, dask_cluster = setup_local_dask_cluster()
# Bare last expression: the client is displayed as the cell output.
dask_client

0,1
Connection method: Direct,
Dashboard: http://127.0.0.1:8787/status,

0,1
Comm: tcp://127.0.0.1:63485,Workers: 2
Dashboard: http://127.0.0.1:8787/status,Total threads: 10
Started: Just now,Total memory: 16.00 GiB

0,1
Comm: tcp://127.0.0.1:63492,Total threads: 5
Dashboard: http://127.0.0.1:63493/status,Memory: 8.00 GiB
Nanny: tcp://127.0.0.1:63488,
Local directory: /var/folders/jx/23j21rtx1czb2tpqht16pz907m8f48/T/dask-scratch-space/worker-hudxh27h,Local directory: /var/folders/jx/23j21rtx1czb2tpqht16pz907m8f48/T/dask-scratch-space/worker-hudxh27h
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 66.14 MiB,Spilled bytes: 0 B
Read bytes: 0.0 B,Write bytes: 0.0 B

0,1
Comm: tcp://127.0.0.1:63495,Total threads: 5
Dashboard: http://127.0.0.1:63496/status,Memory: 8.00 GiB
Nanny: tcp://127.0.0.1:63489,
Local directory: /var/folders/jx/23j21rtx1czb2tpqht16pz907m8f48/T/dask-scratch-space/worker-wlgrkw5v,Local directory: /var/folders/jx/23j21rtx1czb2tpqht16pz907m8f48/T/dask-scratch-space/worker-wlgrkw5v
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 65.47 MiB,Spilled bytes: 0 B
Read bytes: 0.0 B,Write bytes: 0.0 B


## Start Flowcept's Consumer

In [6]:
from flowcept import Flowcept

# Build a Flowcept instance from 'dask' and the running client, then start it.
# The value returned by start() is kept so it can be stopped later.
flowcept = Flowcept('dask', dask_client=dask_client).start()

In [7]:
# Save the current workflow id; it is used later to query the submit-based workflow's tasks.
submit_based_wf_id = Flowcept.current_workflow_id
submit_based_wf_id

'cad11985-57cc-4c27-97e7-13dc40cbd176'

## Client.Submit-based Workflow

In [8]:
import numpy as np

# Single random input for the task chain (the original drew it twice and
# discarded the first value).
i1 = np.random.random()
# Futures returned by submit() are passed as arguments to the next task,
# chaining dummy_func1 -> dummy_func2 -> calculate_batch_and_epochs.
o1 = dask_client.submit(dummy_func1, i1)
o2 = dask_client.submit(dummy_func2, o1)
o3 = dask_client.submit(calculate_batch_and_epochs, o1, o2)
print(f"Task3_id={o3.key}")
# result() waits for the final task and returns its value.
print(f"Result={o3.result()}")

Task3_id=calculate_batch_and_epochs-76806a6ddd587733988b045df413ddb8
Result={'batch_size': 17, 'epochs': 2}


In [9]:
# Stop the Flowcept instance that was started before the submit-based workflow.
flowcept.stop()

## Map-based Workflow

In [10]:
def incr(n):
    """Return ``n`` increased by one."""
    successor = n + 1
    return successor

# Start a new Flowcept instance for this workflow and save the current workflow id
# so the map-based tasks can be queried later.
flowcept = Flowcept('dask', dask_client=dask_client).start()
map_based_wf_id = Flowcept.current_workflow_id
# Map incr over 0..999 and gather the results.
futures = dask_client.map(incr, range(1000))
results = dask_client.gather(futures)
print(len(results))

1000


## Stopping Flowcept and Dask cluster

In [11]:
# Close the Dask client and cluster, then stop the Flowcept instance
# that was started for the map-based workflow.
dask_client.close()
dask_cluster.close()
flowcept.stop()

## Query the database

In [12]:
# Query Flowcept's database, filtering on the submit-based workflow id.
_filter = {"workflow_id": submit_based_wf_id}
tasks = Flowcept.db.query(_filter)
# Bare last expression: the returned tasks are displayed as the cell output.
tasks

[{'task_id': 'dummy_func1-e47d47e3a393a41b92690e2e94dc9046',
  'telemetry_at_start': {'cpu': {'times_avg': {'user': 491618.87,
     'nice': 0.0,
     'system': 330923.21,
     'idle': 3163655.12},
    'percent_all': 60.1,
    'frequency': 3228,
    'times_per_cpu': [{'user': 139411.37,
      'nice': 0.0,
      'system': 105750.13,
      'idle': 152264.4},
     {'user': 139086.73, 'nice': 0.0, 'system': 101976.15, 'idle': 156455.91},
     {'user': 73259.98, 'nice': 0.0, 'system': 45222.1, 'idle': 279395.59},
     {'user': 45617.13, 'nice': 0.0, 'system': 26766.28, 'idle': 326092.26},
     {'user': 26676.77, 'nice': 0.0, 'system': 17574.1, 'idle': 354560.85},
     {'user': 19625.87, 'nice': 0.0, 'system': 13526.0, 'idle': 365814.28},
     {'user': 18081.27, 'nice': 0.0, 'system': 7156.86, 'idle': 373912.01},
     {'user': 12399.84, 'nice': 0.0, 'system': 5196.11, 'idle': 381665.69},
     {'user': 9373.46, 'nice': 0.0, 'system': 4136.18, 'idle': 385827.79},
     {'user': 8086.45, 'nice': 

In [13]:
# Query by the map-based workflow id; the number of returned tasks must match
# the number of results gathered from the map-based workflow.
_filter = {"workflow_id": map_based_wf_id}
tasks = Flowcept.db.query(_filter)
assert len(tasks) == len(results)

In [14]:
# Display how many tasks the last query returned.
len(tasks)

1000

In [15]:
# NOTE(review): presumably ends the kernel session once the demo is finished -- confirm this is intended.
exit()