# Custom modules in Dask
We've seen in other tutorials how to use Dask to speed up a number of calculations. These calculations have so far included Python built-ins and pip-installable packages. Common scientific workflows often include *custom* modules and packages built by users that aren't easily pip-installable. What then?

This tutorial will demonstrate how to use Dask to accelerate code that uses custom modules. We'll show how to do this with custom code of increasing complexity, starting from a single Python file and ending with a full custom package cloned from GitHub.

Before we get too far, let's set up Dask as usual.

In [2]:
from platform import python_version

print(python_version())

3.11.7


In [3]:
!pip install "dask[dataframe]==2024.1.0" distributed==2024.1.0 msgpack==1.0.7 pandas==2.1.4 numpy==1.26.3 toolz==0.12.0 tornado==6.4 dask_gateway



In [4]:
# Standard library
import os
import time
from datetime import datetime
from functools import partial

# Scientific Python stack
import astropy.units as u
import corner
import emcee
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import psutil
import scipy
import seaborn as sns
from astropy.coordinates import SkyCoord
from astroquery.gaia import Gaia
from astroquery.mast import Observations, Catalogs
from IPython.display import display, HTML
from tqdm import tqdm

# Dask components
import dask
import dask.array as da
# import dask.dataframe as dd
from dask import delayed, compute
from dask.dataframe.utils import make_meta
from dask.diagnostics import ProgressBar
from dask.distributed import Client, performance_report, wait
from dask_gateway import Gateway, GatewayCluster
from distributed.diagnostics.plugin import PipInstall

In [29]:
gateway = Gateway(address="http://traefik-dask-gateway", auth="jupyterhub")

In [30]:
cluster = gateway.new_cluster()

In [38]:
cluster


In [32]:
# Adaptively scale between 2 and 3 workers
cluster.adapt(minimum=2, maximum=3)

In [33]:
client = cluster.get_client()
client

Connection method: Cluster object
Cluster type: dask_gateway.GatewayCluster
Dashboard: http://traefik-dask-gateway/clusters/default.f37d74ee89e1459385f8c1905a07d1d8/status


2025-06-25 14:37:35,548 - distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client


In [34]:
plugin = PipInstall(packages=[
    "tornado==6.4.1",
    "lz4==4.4.4",
])

# Registering the plugin pip-installs these packages on each worker in the cluster
client.register_plugin(plugin)

## Importing a custom module
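A Python module is simply a `.py` file containing definitions such as functions and classes. Here, `python_test_module.py` lives in the notebook's working directory, so we can import a function from it as usual: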

In [11]:
from python_test_module import sum_to_num

The `sum_to_num` function is now available in the notebook's local Python session:

In [12]:
sum_to_num(4)

10
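The source of `python_test_module.py` is not shown in this tutorial. As a minimal sketch, assuming `sum_to_num(n)` adds the integers from 1 through `n` (which is consistent with `sum_to_num(4)` returning `10` above), the file might look something like this:

```python
# python_test_module.py (hypothetical contents; the real file may differ)


def sum_to_num(n):
    """Return the sum of the integers from 1 through n."""
    total = 0
    for i in range(1, n + 1):
        total += i
    return total
```

The important point is not the function itself but that it lives in a separate file, which only exists on the machine running the notebook.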

Great! Let's try to do the same with a `Dask` workflow.

In [13]:
numbers = [10, 50, 100, 200, 500, 1000]

futures = client.map(sum_to_num, numbers)

Exception: Tried sending message after closing.  Status: closed
Message: {'op': 'update-graph', 'graph_header': {'serializer': 'pickle', 'writeable': ()}, 'graph_frames': [b"\x80\x05\x95f\x02\x00\x00\x00\x00\x00\x00\x8c\x1edistributed.protocol.serialize\x94\x8c\x08ToPickle\x94\x93\x94)\x81\x94}\x94\x8c\x04data\x94\x8c\x13dask.highlevelgraph\x94\x8c\x0eHighLevelGraph\x94\x93\x94)\x81\x94}\x94(\x8c\x0cdependencies\x94}\x94\x8a\x06\x80\xad'\xb4\x1f\x7f\x8f\x94s\x8c\x10key_dependencies\x94}\x94\x8c\x06layers\x94}\x94\x8a\x06\x80\xad'\xb4\x1f\x7fh\x06\x8c\x11MaterializedLayer\x94\x93\x94)\x81\x94}\x94(\x8c\x0bannotations\x94N\x8c\x16collection_annotations\x94N\x8c\x07mapping\x94}\x94(\x8c+sum_to_num-833b1cd032bd15aad92344b10b620428\x94\x8c\x12python_test_module\x94\x8c\nsum_to_num\x94\x93\x94K\n\x86\x94\x8c+sum_to_num-ef99fc33fd82ac56422b6c28b0f59af3\x94h\x1dK2\x86\x94\x8c+sum_to_num-ab9da737ab6c33e617ba234b92a59423\x94h\x1dKd\x86\x94\x8c+sum_to_num-faefd3ec60f0a5a006f6ed59468482d1\x94h\x1dK\xc8\x86\x94\x8c+sum_to_num-93f38f092176324142121774523349d7\x94h\x1dM\xf4\x01\x86\x94\x8c+sum_to_num-13d95e6ffdc3ac2f36b1df845ab42dc7\x94h\x1dM\xe8\x03\x86\x94uubsubsb."], 'keys': ['sum_to_num-833b1cd032bd15aad92344b10b620428', 'sum_to_num-ef99fc33fd82ac56422b6c28b0f59af3', 'sum_to_num-ab9da737ab6c33e617ba234b92a59423', 'sum_to_num-faefd3ec60f0a5a006f6ed59468482d1', 'sum_to_num-93f38f092176324142121774523349d7', 'sum_to_num-13d95e6ffdc3ac2f36b1df845ab42dc7'], 'internal_priority': {'sum_to_num-833b1cd032bd15aad92344b10b620428': 0, 'sum_to_num-ef99fc33fd82ac56422b6c28b0f59af3': 1, 'sum_to_num-ab9da737ab6c33e617ba234b92a59423': 2, 'sum_to_num-faefd3ec60f0a5a006f6ed59468482d1': 3, 'sum_to_num-93f38f092176324142121774523349d7': 4, 'sum_to_num-13d95e6ffdc3ac2f36b1df845ab42dc7': 5}, 'submitting_task': None, 'fifo_timeout': '100 ms', 'actors': False, 'code': <ToPickle: ()>, 'annotations': <ToPickle: {}>}

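This led to a pretty scary error! Two things are worth untangling. First, this particular message (`Tried sending message after closing. Status: closed`) indicates that the client had already lost its connection to the scheduler, which matches the `Failed to reconnect to scheduler` log above; the cluster and client need to be recreated before anything can be submitted. Second, even with a healthy connection, we skipped the step that we took for the pip-installed packages: while the client (i.e., the Jupyter notebook) has `python_test_module.py` on disk and can use all the functions within it, the Dask cluster has no copy of the file and cannot import it. We can fix this with a simple command: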

In [35]:
client.upload_file('python_test_module.py')

{}
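To confirm that the upload worked before resubmitting, one option is to ask every worker whether it can find the module. The sketch below is an assumption rather than part of the original workflow: `can_import` is a hypothetical helper, and the commented-out `client.run` call relies on the `client` object created earlier still being connected.

```python
import importlib.util


def can_import(module_name):
    """Return True if a top-level module can be found in the current environment."""
    return importlib.util.find_spec(module_name) is not None


# Local check in the notebook process, using a standard-library module
print(can_import("json"))

# On the cluster (assumes `client` from above is connected):
# client.run(can_import, "python_test_module")
# This returns a dict mapping each worker address to True or False.
```

If any worker reports `False`, the upload did not reach it, and tasks that need the module will likely fail there.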

In [36]:
futures = client.map(sum_to_num, numbers)

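`client.map` returns immediately with a list of `Future` objects, one per input. Each future is a handle to a task that is queued or running on the cluster: its status starts as `pending` and changes to `finished` (or `error`) once the task completes. The results themselves stay on the cluster until we explicitly ask for them.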

In [37]:
futures

[<Future: pending, key: sum_to_num-833b1cd032bd15aad92344b10b620428>,
 <Future: pending, key: sum_to_num-ef99fc33fd82ac56422b6c28b0f59af3>,
 <Future: pending, key: sum_to_num-ab9da737ab6c33e617ba234b92a59423>,
 <Future: pending, key: sum_to_num-faefd3ec60f0a5a006f6ed59468482d1>,
 <Future: pending, key: sum_to_num-93f38f092176324142121774523349d7>,
 <Future: pending, key: sum_to_num-13d95e6ffdc3ac2f36b1df845ab42dc7>]
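Dask's futures interface is modeled on Python's built-in `concurrent.futures`, so the same submit-then-collect pattern can be tried locally without a cluster. The sketch below is an analogy only: it uses a thread pool in place of Dask workers, and a stand-in `sum_to_num` whose definition is assumed (the real module's source isn't shown here).

```python
from concurrent.futures import ThreadPoolExecutor


def sum_to_num(n):
    # Stand-in definition (assumption): sum of the integers 1..n
    return sum(range(1, n + 1))


numbers = [10, 50, 100, 200, 500, 1000]

with ThreadPoolExecutor(max_workers=2) as pool:
    # submit() returns immediately with a Future, much like client.submit / client.map
    futures = [pool.submit(sum_to_num, n) for n in numbers]
    # result() blocks until each value is ready, much like client.gather
    results = [f.result() for f in futures]

print(results)
```

With Dask, the difference is that the work happens on remote workers, which is why the function's module has to be available there too.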

In [26]:
results = client.gather(futures)

CancelledError: sum_to_num-833b1cd032bd15aad92344b10b620428

In [None]:
results

In [28]:
cluster.shutdown()

## Next up: a full package from GitHub
The example below uses the `cortecs` package, cloned into the working directory as `./cortecs`. Any other locally available package would work the same way.

In principle, you could pip install a package directly from GitHub. We won't do that here, so that we can demonstrate a workflow that also applies when pip installing from a remote source isn't possible (e.g., a locally developed package).

In [None]:
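import os


def install_local_package(client, package_path):
    """Install a local package in editable mode on all workers.

    Assumes `package_path` exists at the same location on every worker
    (e.g., a local cluster or a shared filesystem).
    """
    # Use an absolute path so the install does not depend on each worker's working directory
    package_path = os.path.abspath(package_path)

    def install_package(pkg_path):
        # Imports live inside the function because it executes on the workers
        import subprocess
        import sys

        # Install in development (editable) mode with the worker's own interpreter
        result = subprocess.run(
            [sys.executable, '-m', 'pip', 'install', '-e', pkg_path],
            capture_output=True,
            text=True
        )

        if result.returncode == 0:
            return f"SUCCESS: Installed {pkg_path}"
        else:
            return f"ERROR: {result.stderr}"

    # Upload the entire package directory first
    # (This is the tricky part - need to recreate directory structure)

    # Then install it properly; client.run returns {worker address: status message}
    results = client.run(install_package, pkg_path=package_path)
    return results

# Usage:
# With no arguments, Client() starts a local cluster by default,
# so the workers share this machine's filesystem
client = Client()
install_local_package(client, './cortecs')

# Now you can do normal imports inside functions that run on the workers.
# If the import fails, the workers may need a restart (client.restart())
# to pick up the new install.
def log_prob(theta):
    import cortecs
    # `my_likelihood_function` is a placeholder; substitute a real function from the package
    return cortecs.my_likelihood_function(theta)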
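When the workers do not share a filesystem with the notebook, the package directory has to be shipped to them first. For pure-Python packages, one option is to zip the package and pass the archive to `client.upload_file`, which accepts `.zip` files as well as `.py` files. The sketch below is an assumption rather than the method used above: `zip_package` is a hypothetical helper, a throwaway package named `demo_pkg` stands in for a real one, and the Dask call is left commented out because it needs a connected `client`.

```python
import os
import sys
import tempfile
import zipfile


def zip_package(package_dir, zip_path):
    """Zip the .py files of `package_dir`, keeping the package folder at the archive root."""
    package_dir = os.path.abspath(package_dir)
    parent = os.path.dirname(package_dir)
    with zipfile.ZipFile(zip_path, "w") as zf:
        for root, _dirs, files in os.walk(package_dir):
            for name in files:
                if name.endswith(".py"):
                    full_path = os.path.join(root, name)
                    # Store paths relative to the parent so imports resolve as `demo_pkg.*`
                    zf.write(full_path, os.path.relpath(full_path, parent))
    return zip_path


# Build a tiny stand-in package to demonstrate (hypothetical names throughout)
workdir = tempfile.mkdtemp()
pkg_dir = os.path.join(workdir, "demo_pkg")
os.makedirs(pkg_dir)
with open(os.path.join(pkg_dir, "__init__.py"), "w") as f:
    f.write("def double(x):\n    return 2 * x\n")

archive = zip_package(pkg_dir, os.path.join(workdir, "demo_pkg.zip"))

# Python can import directly from a zip archive that is on sys.path
sys.path.insert(0, archive)
import demo_pkg

print(demo_pkg.double(21))

# On a cluster (assumes a connected `client`):
# client.upload_file(archive)
```

Packages with compiled extensions generally cannot be imported from a zip archive, so for those a proper install on the workers is the safer route.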