## Dask dependency management plugins

Dask’s plugin system lets you run custom Python code when certain events occur. There are plugins specific to schedulers, workers, and nannies. A worker plugin, for example, runs custom Python code on all of your workers at a given point in each worker’s lifecycle (e.g. when the worker process starts). The dependency management plugins use this mechanism to install packages on the workers:

In [1]:
from dask.distributed import Client

client = Client("tls://localhost:8786")
client



Client (connection method: Direct; dashboard: /user/alexander.held@cern.ch/proxy/8787/status)

| Scheduler | Workers | Total threads | Total memory | Started |
|---|---|---|---|---|
| tls://172.16.134.69:8786 | 8 | 8 | 22.89 GiB | 3 hours ago |

Each worker has 1 thread, a 2.86 GiB memory limit, and 0 B spilled:

| Worker | Nanny | Memory usage | CPU usage |
|---|---|---|---|
| tls://c073.af.uchicago.edu:42081 | tls://172.17.0.2:40915 | 457.07 MiB | 4.0% |
| tls://c073.af.uchicago.edu:38739 | tls://172.17.0.4:38797 | 386.97 MiB | 2.0% |
| tls://c002.af.uchicago.edu:37165 | tls://172.17.0.2:36465 | 413.85 MiB | 0.0% |
| tls://c076.af.uchicago.edu:33043 | tls://172.17.0.2:33307 | 438.54 MiB | 4.0% |
| tls://c008.af.uchicago.edu:39235 | tls://172.17.0.2:33971 | 447.47 MiB | 2.0% |
| tls://c028.af.uchicago.edu:33823 | tls://172.17.0.2:33227 | 451.62 MiB | 2.0% |
| tls://c086.af.uchicago.edu:44451 | tls://172.17.0.2:45319 | 486.14 MiB | 6.0% |
| tls://c048.af.uchicago.edu:39719 | tls://172.17.0.2:38047 | 485.70 MiB | 2.0% |


In [2]:
from dask.distributed import PipInstall

plugin = PipInstall(packages=["py-spy"])

client.register_plugin(plugin)

# py-spy (https://github.com/benfred/py-spy) is a low-overhead sampling profiler for Python programs.
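You can check that `PipInstall` actually made a package available by querying package metadata on every worker. The sketch below assumes the distribution names match what you passed to `PipInstall`. The helper `installed_versions` is hypothetical.

```python
from importlib import metadata


def installed_versions(packages):
    """Return {distribution name: installed version, or None if it is missing}."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


# On the cluster, Client.run executes the function on every worker and
# returns {worker address: result}:
#   client.run(installed_versions, ["py-spy"])
print(installed_versions(["surely-not-an-installed-package"]))
```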

Alternatively, you can run a custom setup function on every worker:

In [3]:
def worker_setup(dask_worker):
    import subprocess
    # Alternative: install ROOT instead with
    # ["mamba", "install", "-y", "-c", "conda-forge", "root"]
    install_cmd = ["mamba", "install", "-y", "-c", "conda-forge", "hepconvert"]
    # check=True raises if mamba fails, so a failed install is not reported as success
    subprocess.run(install_cmd, check=True)

# hepconvert (https://github.com/scikit-hep/hepconvert) is a bridge between columnar file formats, currently ROOT and Parquet, with HDF5 support planned. It aims to simplify file conversions in Python.

In [4]:
client.register_worker_callbacks(worker_setup)

{'tls://c002.af.uchicago.edu:37165': {'status': 'OK'},
 'tls://c008.af.uchicago.edu:39235': {'status': 'OK'},
 'tls://c028.af.uchicago.edu:33823': {'status': 'OK'},
 'tls://c048.af.uchicago.edu:39719': {'status': 'OK'},
 'tls://c073.af.uchicago.edu:38739': {'status': 'OK'},
 'tls://c073.af.uchicago.edu:42081': {'status': 'OK'},
 'tls://c076.af.uchicago.edu:33043': {'status': 'OK'},
 'tls://c086.af.uchicago.edu:44451': {'status': 'OK'}}
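A callback registered with `register_worker_callbacks` also runs on workers that start later, for instance after a restart. Reinstalling the package on each of those workers wastes time. The sketch below skips the install when the module is already importable. `make_install_callback` is a hypothetical helper, and the module name and command are whatever you would otherwise hardcode.

```python
import importlib.util
import subprocess


def make_install_callback(module_name, install_cmd):
    """Build a worker setup callback that installs only if module_name is missing."""

    def setup(dask_worker=None):
        # Skip the (slow) install when the module can already be imported.
        if importlib.util.find_spec(module_name) is not None:
            return "already installed"
        # check=True raises CalledProcessError if the installer fails,
        # instead of silently continuing as os.system would.
        subprocess.run(install_cmd, check=True)
        return "installed"

    return setup


# Usage on the cluster (names as in the cells above):
#   client.register_worker_callbacks(make_install_callback(
#       "hepconvert", ["mamba", "install", "-y", "-c", "conda-forge", "hepconvert"]))
print(make_install_callback("json", ["false"])())  # json is stdlib, so nothing is run
```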

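Or set up a CMSSW environment. Note that `os.system` runs these commands in a child shell. The variables that `cmsenv` exports therefore disappear when that shell exits and are not visible to the worker process or its tasks: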

In [8]:
def worker_cmssw_setup(dask_worker):
    import os
    # Runs in a child shell: environment changes made here do not persist in the worker process
    cmssw_setup_cmd = "source /cvmfs/cms.cern.ch/cmsset_default.sh; cd /cvmfs/cms.cern.ch/${SCRAM_ARCH}/cms/cmssw/CMSSW_12_6_5; cmsenv"
    os.system(cmssw_setup_cmd)

In [9]:
client.register_worker_callbacks(worker_cmssw_setup)

{}
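To make such a setup visible to tasks, the environment it produces has to be copied into the worker's own `os.environ`. One common workaround is to run the setup script in `bash` and have the same shell print its final environment, then load that output in Python. This is offered as an assumption, not an official CMSSW recipe, and `load_env_from_shell` is a hypothetical helper. In non-interactive scripts `cmsenv` is usually written out as ``eval `scramv1 runtime -sh` ``, because it is defined as a shell alias; verify that substitution on your site.

```python
import json
import os
import shlex
import subprocess
import sys


def load_env_from_shell(setup_script):
    """Run setup_script in bash and copy the environment it leaves into os.environ."""
    # After the setup script, the same bash process runs a Python one-liner
    # that prints its full environment as a single JSON line.
    dump_env = (
        f"{shlex.quote(sys.executable)} -c "
        "'import json, os; print(json.dumps(dict(os.environ)))'"
    )
    result = subprocess.run(
        ["bash", "-c", f"{setup_script} && {dump_env}"],
        check=True,  # raise if the setup script fails
        capture_output=True,
        text=True,
    )
    # Setup scripts may print messages first; the JSON dump is the last line.
    new_env = json.loads(result.stdout.strip().splitlines()[-1])
    os.environ.update(new_env)
    return new_env


# Hypothetical wrapper for the CMSSW case (paths copied from the cell above):
# def worker_cmssw_env(dask_worker):
#     load_env_from_shell(
#         "source /cvmfs/cms.cern.ch/cmsset_default.sh && "
#         "cd /cvmfs/cms.cern.ch/${SCRAM_ARCH}/cms/cmssw/CMSSW_12_6_5 && "
#         "eval `scramv1 runtime -sh`"
#     )
# client.register_worker_callbacks(worker_cmssw_env)

load_env_from_shell("export DASK_DEMO_VAR=hello")
print(os.environ["DASK_DEMO_VAR"])  # hello
```

Copying the whole environment also brings along shell bookkeeping variables such as `PWD`. For a dedicated worker process this is usually harmless.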

Or set an environment variable on every worker:

In [5]:
def set_env(dask_worker):
    import os
    import pathlib
    # Point HOME_DIR at this worker's local scratch directory
    path = str(pathlib.Path(dask_worker.local_directory))
    os.environ["HOME_DIR"] = path

In [6]:
client.register_worker_callbacks(set_env)

{'tls://c002.af.uchicago.edu:37165': {'status': 'OK'},
 'tls://c008.af.uchicago.edu:39235': {'status': 'OK'},
 'tls://c028.af.uchicago.edu:33823': {'status': 'OK'},
 'tls://c038.af.uchicago.edu:39487': {'status': 'OK'},
 'tls://c048.af.uchicago.edu:39719': {'status': 'OK'},
 'tls://c073.af.uchicago.edu:38739': {'status': 'OK'},
 'tls://c073.af.uchicago.edu:42081': {'status': 'OK'},
 'tls://c076.af.uchicago.edu:33043': {'status': 'OK'},
 'tls://c086.af.uchicago.edu:44451': {'status': 'OK'}}
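The callback receives the worker object as `dask_worker`, so you can exercise it locally with a stand-in object before registering it. The sketch uses `types.SimpleNamespace` as a fake worker; this is an assumption for local testing only. The `read_env` helper is hypothetical. On the cluster you could pass it to `client.run` to read the variable back from every worker.

```python
import os
import pathlib
import types


def set_env(dask_worker):
    # Same callback as in the notebook cell: export the worker's scratch directory.
    os.environ["HOME_DIR"] = str(pathlib.Path(dask_worker.local_directory))


def read_env(name):
    # Hypothetical helper: look up an environment variable wherever it runs.
    return os.environ.get(name)


# Local check with a fake worker that only has the attribute set_env needs.
fake_worker = types.SimpleNamespace(local_directory="/tmp/dask-scratch-space/worker-demo")
set_env(fake_worker)
print(read_env("HOME_DIR"))

# On the cluster: client.run(read_env, "HOME_DIR") returns {worker address: value}.
```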

You can also write your own plugins. The following `WorkerPlugin` subclass logs every task that ends in an error:

In [7]:
from dask.distributed import WorkerPlugin
class ErrorLogger(WorkerPlugin):
    def __init__(self, logger):
        self.logger = logger

    def setup(self, worker):
        self.worker = worker

    def transition(self, key, start, finish, *args, **kwargs):
        if finish == 'error':
            ts = self.worker.tasks[key]
            exc_info = (type(ts.exception), ts.exception, ts.traceback)
            self.logger.error(
                "Error during computation of '%s'.", key,
                exc_info=exc_info
            )

In [8]:
import logging
plugin = ErrorLogger(logging)
client.register_plugin(plugin) 

{'tls://c002.af.uchicago.edu:37165': {'status': 'OK'},
 'tls://c008.af.uchicago.edu:39235': {'status': 'OK'},
 'tls://c028.af.uchicago.edu:33823': {'status': 'OK'},
 'tls://c048.af.uchicago.edu:39719': {'status': 'OK'},
 'tls://c073.af.uchicago.edu:38739': {'status': 'OK'},
 'tls://c073.af.uchicago.edu:42081': {'status': 'OK'},
 'tls://c076.af.uchicago.edu:33043': {'status': 'OK'},
 'tls://c086.af.uchicago.edu:44451': {'status': 'OK'}}
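The `transition` hook fires for every task state change on the worker, not only for errors, which is why `ErrorLogger` filters on `finish == 'error'`. The hypothetical plugin below counts transitions by their target state. The guarded import exists only so the snippet also runs where `distributed` is not installed. The plugin name used in the commented usage is chosen for illustration.

```python
from collections import Counter

try:
    from distributed import WorkerPlugin
except ImportError:  # fallback only so the sketch runs without distributed
    WorkerPlugin = object


class TransitionCounter(WorkerPlugin):
    """Hypothetical plugin: count task transitions by their target state."""

    def __init__(self):
        self.counts = Counter()

    def transition(self, key, start, finish, *args, **kwargs):
        # Called by the worker for every state change of every task.
        self.counts[finish] += 1


# On the cluster:
#   client.register_plugin(TransitionCounter(), name="transition-counter")
#   client.run(lambda dask_worker: dict(dask_worker.plugins["transition-counter"].counts))

# Local check: drive the hook by hand with made-up transitions.
counter = TransitionCounter()
for start, finish in [("waiting", "ready"), ("ready", "executing"), ("executing", "error")]:
    counter.transition("task-1", start, finish)
print(counter.counts["error"])  # 1
```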

You can also upload a file to every worker with the `UploadFile` plugin (`UploadDirectory` does not currently work on coffea-casa; a fix is in progress):

In [19]:
from distributed.diagnostics.plugin import UploadFile

client.register_plugin(UploadFile("upload_directory/bar.py"))  

{'tls://c002.af.uchicago.edu:37165': {'status': 'OK'},
 'tls://c008.af.uchicago.edu:39235': {'status': 'OK'},
 'tls://c028.af.uchicago.edu:33823': {'status': 'OK'},
 'tls://c048.af.uchicago.edu:39719': {'status': 'OK'},
 'tls://c073.af.uchicago.edu:38739': {'status': 'OK'},
 'tls://c073.af.uchicago.edu:42081': {'status': 'OK'},
 'tls://c076.af.uchicago.edu:33043': {'status': 'OK'},
 'tls://c086.af.uchicago.edu:44451': {'status': 'OK'}}

In [22]:
def check_env(_):
    import bar
    return bar.foo()

In [23]:
client.gather(client.map(check_env, [None]))

[5]
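The notebook does not show the contents of `upload_directory/bar.py`. To reproduce these cells, you can use a minimal layout consistent with the `[5]` results: a package directory with an `__init__.py` and a `bar.py` whose `foo()` returns 5. This layout is an assumption, and the helper `write_demo_package` is hypothetical.

```python
import runpy
import tempfile
from pathlib import Path


def write_demo_package(root="."):
    """Create root/upload_directory/ with an empty __init__.py and bar.py defining foo()."""
    pkg = Path(root) / "upload_directory"
    pkg.mkdir(parents=True, exist_ok=True)
    (pkg / "__init__.py").write_text("")
    (pkg / "bar.py").write_text("def foo():\n    return 5\n")
    return pkg


# Quick check in a temporary directory; in the notebook you would call
# write_demo_package() in the working directory instead.
with tempfile.TemporaryDirectory() as tmp:
    bar_path = write_demo_package(tmp) / "bar.py"
    print(runpy.run_path(str(bar_path))["foo"]())  # 5
```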

## Cloudpickle "magic"

By default, functions and classes that are attributes of an importable
module are pickled by reference, that is, they are re-imported from
that module at load time.

If `register_pickle_by_value(module)` is called, all of the module's
functions and classes are subsequently pickled by value, meaning that
they can be loaded in Python processes where the module is not importable.

This is especially useful when developing a module in a distributed
execution environment: restarting the client Python process with the new
source code is enough: there is no need to re-install the new version
of the module on all the worker nodes nor to restart the workers.
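The standard-library `pickle` module always pickles module-level functions by reference, which makes the limitation easy to see. cloudpickle's by-value mode is what removes it. The sketch writes a throwaway module under the hypothetical name `scratch_mod` and pickles one of its functions. It then tries to load the payload after making the module unimportable, as on a worker that lacks the file.

```python
import importlib
import pickle
import sys
import tempfile
from pathlib import Path

# Write a throwaway module to a temporary directory and import it.
tmp_dir = Path(tempfile.mkdtemp())
(tmp_dir / "scratch_mod.py").write_text("def foo():\n    return 5\n")
sys.path.insert(0, str(tmp_dir))
importlib.invalidate_caches()
scratch_mod = importlib.import_module("scratch_mod")

# By-reference pickling stores only the module and function names, not the code.
payload = pickle.dumps(scratch_mod.foo)
print(b"scratch_mod" in payload and b"foo" in payload)  # True

# Simulate a worker on which the module cannot be imported.
sys.path.remove(str(tmp_dir))
del sys.modules["scratch_mod"]
try:
    pickle.loads(payload)
    load_error = None
except ModuleNotFoundError as exc:
    load_error = exc
print("loading failed:", load_error)
```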

In [5]:
import cloudpickle
import upload_directory.bar  # binds upload_directory and makes sure its bar submodule is loaded

cloudpickle.register_pickle_by_value(upload_directory)

In [17]:
def check_env_pickle(_):
    return upload_directory.bar.foo()

In [18]:
client.gather(client.map(check_env_pickle, [None]))

[5]