Scaling with Dask
medicaid-utils uses Dask for parallel and distributed computation. All DataFrames in the package are lazy Dask DataFrames: operations only build a task graph, and nothing is executed until .compute() is called.
On a single workstation with sufficient RAM (64 GB or more is recommended for state-level data), start a local cluster:
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(
    n_workers=8,
    threads_per_worker=1,  # 1 thread per worker avoids GIL contention with pandas
    memory_limit="8GB",
)
client = Client(cluster)
print(client.dashboard_link)  # Prints the dashboard URL, typically http://localhost:8787
For high-performance computing (HPC) environments, use dask-jobqueue. On SLURM:
from dask_jobqueue import SLURMCluster
from dask.distributed import Client
cluster = SLURMCluster(
    cores=4,
    memory="32GB",
    processes=1,
    walltime="04:00:00",
    queue="standard",
)
cluster.scale(jobs=10) # Request 10 SLURM jobs
client = Client(cluster)
On PBS systems, use PBSCluster instead:
from dask_jobqueue import PBSCluster
from dask.distributed import Client
cluster = PBSCluster(
    cores=4,
    memory="32GB",
    processes=1,
    walltime="04:00:00",
    queue="batch",
)
cluster.scale(jobs=10)
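client = Client(cluster)
If no distributed Client is created, Dask runs on its local single-machine schedulers, and for DataFrames the default is the multi-threaded scheduler. That is adequate for small datasets; switch to the single-threaded synchronous scheduler when debugging: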
import dask
# Set one of these globally (the last call wins):
dask.config.set(scheduler="threads")  # Multi-threaded (default for DataFrames)
dask.config.set(scheduler="synchronous")  # Single-threaded (debugging)
Use tmp_folder when loading claims to cache intermediate results on disk:
ip = max_ip.MAXIP(
    year=2012, state="WY", data_root="/data/cms",
    tmp_folder="/tmp/cache",
)
Caching intermediate results on disk and reading them back keeps the Dask task graph from growing too large when processing multiple claim files.
Aim for partitions of 50–200 MB each. The package handles partitioning automatically based on the input Parquet files, but you can repartition when exporting:
ip.export("/output/", output_format="parquet", repartition=True)
The Dask dashboard (typically at http://localhost:8787) shows:
- Task progress and scheduling
- Worker memory usage
- CPU utilization per worker
- Task stream (timeline view)
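The worker-memory panel is usually where oversized partitions show up first. A rough pre-flight check against the 50 to 200 MB partition guideline above can catch them earlier. Both helpers below are hypothetical (not part of medicaid-utils or Dask), and the overhead_factor of 3 is an assumed allowance for pandas temporaries during merges and groupbys, not a documented figure:

```python
import math


def plan_partitions(total_mb: float, target_mb: float = 100.0) -> int:
    """Hypothetical helper: partitions needed so each holds about target_mb."""
    return max(1, math.ceil(total_mb / target_mb))


def fits_worker(
    partition_mb: float,
    memory_limit_mb: float,
    threads_per_worker: int = 1,
    overhead_factor: float = 3.0,  # assumed allowance for pandas temporaries
) -> bool:
    """Hypothetical helper: do the partitions a worker handles at once fit its memory?"""
    # Each thread works on one partition at a time, so peak use scales with threads
    peak_mb = partition_mb * threads_per_worker * overhead_factor
    return peak_mb <= memory_limit_mb


# Example: ~12 GB of in-memory data at a 100 MB target
print(plan_partitions(12_000))  # 120
# 200 MB partitions on the local cluster above (1 thread, 8 GB per worker)
print(fits_worker(200, 8_000))  # True: 200 * 1 * 3 = 600 MB
```

For a plain Dask DataFrame, ddf.repartition(partition_size="100MB") rebalances partitions toward a target in-memory size; within medicaid-utils, the repartition=True export option shown earlier is the built-in route.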
For multi-state analyses, process states sequentially to control memory:
import gc
for state in ["AL", "IL", "CA"]:
    ip = taf_ip.TAFIP(year=2019, state=state, data_root="/data/cms")
    # ... process ...
    ip.export(f"/output/{state}/", output_format="parquet")
    del ip  # Drop the reference before the next state
    gc.collect()  # Reclaim memory held by the deleted objects
medicaid-utils | Documentation | PyPI | GitHub | MIT License | Research Computing Group, Biostatistics Laboratory, The University of Chicago