In [1]:
# ! pip install flowcept[dask]

In [2]:
! python reset_dask_nb_exec_counts.py   
! rm -f output.log
# This notebook is causing a silly bug after starting dask cluster. 
# This command just resets the nb's execution counts to fix it. 
# Just save the notebook (cmd+s) after you run dask cluster setup.

In [4]:
def dummy_func1(x):
    return x * 2


def dummy_func2(y):
    return y + y


def calculate_batch_and_epochs(z, w):
    return {
        "batch_size": int(z + w + 16),
        "epochs": max(int(z/w)+1, 2)
    }

### Set the env var pointing to the conf file where the ports, hostnames, and other conf variables are read from.

There is an exemplary conf file available in the `resources` directory in FlowCept repository. You can use it as is if running this Notebook on your local laptop.

In [5]:
def setup_local_dask_cluster():
    from dask.distributed import Client, LocalCluster
    from flowcept import FlowceptDaskWorkerAdapter
    
    cluster = LocalCluster(n_workers=2)
    scheduler = cluster.scheduler
    client = Client(scheduler.address)

    # Register Worker Adapter
    client.register_plugin(FlowceptDaskWorkerAdapter())
    
    return client, cluster

## Start Local Dask Cluster

In [6]:
dask_client, dask_cluster = setup_local_dask_cluster()
dask_client

0,1
Connection method: Direct,
Dashboard: http://127.0.0.1:8787/status,

0,1
Comm: tcp://127.0.0.1:60853,Workers: 2
Dashboard: http://127.0.0.1:8787/status,Total threads: 10
Started: Just now,Total memory: 16.00 GiB

0,1
Comm: tcp://127.0.0.1:60860,Total threads: 5
Dashboard: http://127.0.0.1:60862/status,Memory: 8.00 GiB
Nanny: tcp://127.0.0.1:60856,
Local directory: /var/folders/jx/23j21rtx1czb2tpqht16pz907m8f48/T/dask-scratch-space/worker-_o40lygg,Local directory: /var/folders/jx/23j21rtx1czb2tpqht16pz907m8f48/T/dask-scratch-space/worker-_o40lygg
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 65.78 MiB,Spilled bytes: 0 B
Read bytes: 0.0 B,Write bytes: 0.0 B

0,1
Comm: tcp://127.0.0.1:60861,Total threads: 5
Dashboard: http://127.0.0.1:60863/status,Memory: 8.00 GiB
Nanny: tcp://127.0.0.1:60857,
Local directory: /var/folders/jx/23j21rtx1czb2tpqht16pz907m8f48/T/dask-scratch-space/worker-uw18wljg,Local directory: /var/folders/jx/23j21rtx1czb2tpqht16pz907m8f48/T/dask-scratch-space/worker-uw18wljg
Tasks executing:,Tasks in memory:
Tasks ready:,Tasks in flight:
CPU usage: 0.0%,Last seen: Just now
Memory usage: 65.66 MiB,Spilled bytes: 0 B
Read bytes: 0.0 B,Write bytes: 0.0 B


## Start Flowcept's Consumer

In [7]:
from flowcept import Flowcept
from flowcept.flowceptor.adapters.dask.dask_plugins import register_dask_workflow

flowcept = Flowcept('dask')
flowcept.start()

<flowcept.flowcept_api.flowcept_controller.Flowcept at 0x12c4ea830>

In [8]:
submit_based_wf_id = register_dask_workflow(dask_client)
print(f"Workflow_Id={submit_based_wf_id}")

Workflow_Id=01ab07bf-a373-4edb-bc16-f30b651545b6


In [9]:
submit_based_wf_id

'01ab07bf-a373-4edb-bc16-f30b651545b6'

## Client.Submit-based Workflow

In [10]:
import numpy as np
i1 = np.random.random()
i1 = np.random.random()
o1 = dask_client.submit(dummy_func1, i1)
o2 = dask_client.submit(dummy_func2, o1)
o3 = dask_client.submit(calculate_batch_and_epochs, o1, o2)
print(f"Task3_id={o3.key}")
print(f"Result={o3.result()}")

Task3_id=calculate_batch_and_epochs-1b4c2c8b4e2158d1c192687e74f52532
Result={'batch_size': 18, 'epochs': 2}


## Map-based Workflow

In [11]:
def incr(n):
    return n+1

map_based_wf_id = register_dask_workflow(dask_client)
futures = dask_client.map(incr, range(1000))
results = dask_client.gather(futures)
print(len(results))

1000


## Stopping Flowcept and Dask cluster

In [12]:
dask_client.close()
dask_cluster.close()
flowcept.stop()

## Query the database

In [13]:
_filter = {"workflow_id": submit_based_wf_id}
tasks = Flowcept.db.query(_filter)
tasks

[{'task_id': 'dummy_func1-3556b1e6f4fac706ba2ecae737fb8790',
  'telemetry_at_start': {'cpu': {'times_avg': {'user': 102114.83,
     'nice': 0.0,
     'system': 64370.41,
     'idle': 740298.72},
    'percent_all': 45.9,
    'frequency': 3228,
    'times_per_cpu': [{'user': 30044.2,
      'nice': 0.0,
      'system': 22043.37,
      'idle': 38299.98},
     {'user': 29819.77, 'nice': 0.0, 'system': 21430.73, 'idle': 39154.36},
     {'user': 13774.64, 'nice': 0.0, 'system': 7423.76, 'idle': 69318.13},
     {'user': 9760.09, 'nice': 0.0, 'system': 4698.01, 'idle': 76190.93},
     {'user': 5921.55, 'nice': 0.0, 'system': 2996.1, 'idle': 81810.56},
     {'user': 4232.81, 'nice': 0.0, 'system': 2219.85, 'idle': 84313.04},
     {'user': 3412.66, 'nice': 0.0, 'system': 1419.81, 'idle': 85977.61},
     {'user': 2211.28, 'nice': 0.0, 'system': 888.56, 'idle': 87730.38},
     {'user': 1591.38, 'nice': 0.0, 'system': 673.45, 'idle': 88578.53},
     {'user': 1346.45, 'nice': 0.0, 'system': 576.77, '

In [14]:
_filter = {"workflow_id": map_based_wf_id}
tasks = Flowcept.db.query(_filter)
assert len(tasks) == len(results)

In [15]:
len(tasks)

1000

In [16]:
exit()