In [9]:
import time

import pandas as pd

import dask
from dask.diagnostics import ProgressBar, Profiler
from dask.callbacks import Callback
import dask.dataframe as dd
from dask.distributed import Client, progress, LocalCluster

<function dask.distributed.profile.get_profile(history, recent=None, start=None, stop=None, key=None)>

In [2]:
def dirprint(obj, private=False):
    keys = dir(obj)
    if not private:
        keys = filter(lambda x: not x.startswith('__'), keys)
    
    key = 'NAME'
    atype = 'TYPE'
    val = 'VALUE'
    pattern = '{key:<20}    {atype:<20}   {val}'
    print(pattern.format(key=key, atype=atype, val=val))
    for key in sorted(keys):
        attr = getattr(obj, key)
        atype = attr.__class__.__name__[:20]
        val = str(attr)[:50]
        print(pattern.format(key=key, atype=atype, val=val))

def last_response_to_progress(response):
    all_ = response['all']
    rem = response['remaining']
    total = sum(all_.values())
    complete = sum(rem.values())
    progress = float(complete) / total
    percent = f'{progress * 100:.2f}%'
    tasks_completed = sorted(list(filter(lambda x: rem[x] == 0, rem.keys())))
    tasks_remaining = sorted(list(filter(lambda x: rem[x] != 0, rem.keys())))

    output = dict(
        total=total,
        complete=complete,
        progress=progress,
        percent=percent,
        tasks_completed=tasks_completed,
        tasks_remaining=tasks_remaining,
    )
    return output

def func(x):
    time.sleep(0.1)
    return x

def get_data(n=100):
    d = pd.DataFrame()
    d['a'] = ([0] * n) + ([1] * n)
    d['b'] = list(range(2 * n))
    return d

In [3]:
workers = 2
cluster = LocalCluster(n_workers=workers)
client = Client(cluster)

d = get_data(n=200)
with dask.annotate(step='read'):
    d = dd.from_pandas(d, workers)
with dask.annotate(step='wait'):
    d = d.applymap(func)
with dask.annotate(step='group'):
    d = d.groupby('a')
with dask.annotate(step='sum'):
    d = d.sum()
with dask.annotate(step='wait'):
    d = d.apply(func, axis=1)
# d = d.persist()
# prog = progress(d, notebook=True, multi=True)
# prog


# with Profiler() as prof:
#     d.compute()
#     print(prof.results)

You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta={'b': 'int64'})



In [7]:
dsk = d.to_delayed()
dsk

[Delayed(('apply-791941795047997703bfafee9df8a46a', 0))]

In [28]:
for i in range(100):
    time.sleep(1)
    if hasattr(prog, '_last_response'):
        print(last_response_to_progress(prog._last_response))

In [5]:
# !cat /home/ubuntu/.local/lib/python3.7/site-packages/distributed/diagnostics/progressbar.py

In [6]:
# !cat /home/ubuntu/.local/lib/python3.7/site-packages/dask/diagnostics/progress.py