## Comparing distributed ways of processing HDF5 files

It would be extremely helpful if I could build a single dataframe instead of looping through every file each time. The obstacle is memory: the files are too large to hold in memory all at once. In this notebook, I will explore ways of doing this in a distributed, lazy manner. The open question is whether ``dask`` or ``polars`` is the better fit.

#### Useful imports

In [1]:
import os
import sys
import warnings
sys.path.append("..")  # add project root


import h5py

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

from tqdm import tqdm

from src.data_utils import *
from src.constants import *

In [2]:
import tables
import dask
import dask.array as da
import polars as pol

In [3]:
# Filter out NaturalNameWarning, which PyTables raises when a node name cannot be used for dot-style (natural naming) access.
warnings.filterwarnings('ignore', category=tables.NaturalNameWarning)

In [4]:
pd.options.display.float_format = '{:10,.2f}'.format

In [5]:
np.random.seed(420)

In [6]:
sns.set_theme(context="talk")

In [7]:
# path constants
data_dir = "/home/mr2238/project_pi_np442/mr2238/accelerate/data"
img_dir = "/home/mr2238/project_pi_np442/mr2238/accelerate/imgs/overview"
labels_path = os.path.join(data_dir, "labels")
raw_data_path = os.path.join(data_dir, "raw_data")

In [8]:
global_data_path = "/home/mr2238/project_pi_np442/mr2238/accelerate/data/processed/all_data.hdf5"

In [9]:
# list files
h5py_files = [f for f in os.listdir(raw_data_path) if f.endswith(".icmh5")]
print(f"Number of h5py files: {len(h5py_files)}")
print(f"Example file: {h5py_files[0]}")

Number of h5py files: 215
Example file: 1002.icmh5


In [10]:
example_file = os.path.join(raw_data_path, h5py_files[0])
print(example_file)

/home/mr2238/project_pi_np442/mr2238/accelerate/data/raw_data/1002.icmh5


#### Opening HDF files through ``tables``

I tried this and found .icmh5 files very awkward to handle through ``tables`` (opening them would crash the kernel). However, I realized I do not need ``tables`` for ``dask``.

In [11]:
# will open with h5py instead
global_f = h5py.File(global_data_path, mode="r")

#### Loading Large Dataframe through ``dask``

In [12]:
from dask.distributed import LocalCluster
cluster = LocalCluster()          # Fully-featured local Dask cluster
client = cluster.get_client()

2025-10-10 13:45:44,374 - tornado.application - ERROR - Uncaught exception GET /status/ws (10.18.9.118)
HTTPServerRequest(protocol='http', host='127.0.0.1:8787', method='GET', uri='/status/ws', version='HTTP/1.1', remote_ip='10.18.9.118')
Traceback (most recent call last):
  File "/home/mr2238/project_pi_np442/mr2238/conda_envs/cppopt-dl/lib/python3.10/site-packages/tornado/websocket.py", line 965, in _accept_connection
    open_result = handler.open(*handler.open_args, **handler.open_kwargs)
  File "/home/mr2238/project_pi_np442/mr2238/conda_envs/cppopt-dl/lib/python3.10/site-packages/tornado/web.py", line 3375, in wrapper
    return method(self, *args, **kwargs)
  File "/home/mr2238/project_pi_np442/mr2238/conda_envs/cppopt-dl/lib/python3.10/site-packages/bokeh/server/views/ws.py", line 149, in open
    raise ProtocolError("Token is expired. Configure the app with a larger value for --session-token-expiration if necessary")
bokeh.protocol.exceptions.ProtocolError: Token is expired. C

In [13]:
client

Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status

Workers: 3
Total threads: 3
Total memory: 16.00 GiB
Status: running
Using processes: True

Comm: tcp://127.0.0.1:37557
Dashboard: http://127.0.0.1:8787/status
Started: Just now
Workers: 0
Total threads: 0
Total memory: 0 B

Comm: tcp://127.0.0.1:36213
Dashboard: http://127.0.0.1:43797/status
Nanny: tcp://127.0.0.1:44971
Total threads: 1
Memory: 5.33 GiB
Local directory: /tmp/dask-scratch-space/worker-aqjd1xtm

Comm: tcp://127.0.0.1:45771
Dashboard: http://127.0.0.1:34039/status
Nanny: tcp://127.0.0.1:34843
Total threads: 1
Memory: 5.33 GiB
Local directory: /tmp/dask-scratch-space/worker-drgh3nn_

Comm: tcp://127.0.0.1:37249
Dashboard: http://127.0.0.1:35291/status
Nanny: tcp://127.0.0.1:33517
Total threads: 1
Memory: 5.33 GiB
Local directory: /tmp/dask-scratch-space/worker-qrz4v41f


In [14]:
global_f['1002']

<HDF5 group "/1002" (3 members)>

In [None]:
# see if I can make a single lazy array of all abp values
# chunks="auto" lets dask pick a memory-friendly chunk size for each dataset
abp = da.concatenate([da.from_array(global_f[f"{pt}/raw/waves/abp"], chunks="auto") for pt in global_f.keys()])

In [None]:
abp

In [None]:
mean = da.mean(abp)
mean.compute()
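
The cells above wrap each patient's dataset lazily, concatenate the pieces, and only then reduce. Below is a minimal, self-contained sketch of that pattern. It is not the notebook's actual data path: in-memory NumPy arrays stand in for the HDF5 datasets, and the names `fake_patients` and `abp_demo`, the array lengths, and the chunk length of 2,000 are all made up for illustration.

```python
import numpy as np
import dask.array as da

# Hypothetical stand-ins for per-patient HDF5 datasets of unequal length.
rng = np.random.default_rng(0)
fake_patients = [rng.normal(80.0, 10.0, size=n) for n in (5_000, 12_000, 7_500)]

# Wrap each array lazily. An explicit integer chunk length bounds how much
# data a single task has to hold at once.
lazy_parts = [da.from_array(a, chunks=2_000) for a in fake_patients]
abp_demo = da.concatenate(lazy_parts)

# Nothing is computed here; this only extends the task graph.
lazy_mean = abp_demo.mean()

# compute() evaluates the graph chunk by chunk and returns a scalar.
demo_mean = float(lazy_mean.compute())

# Eager reference value, only feasible because the demo data is tiny.
reference = float(np.concatenate(fake_patients).mean())
print(abp_demo.shape, abp_demo.numblocks, demo_mean)
```

The chunk length is the main knob: smaller chunks lower the peak memory per task but add scheduling overhead, so the right value for the real waveforms would need to be found by experiment.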

In [15]:
all_vars = ['hr', 'rso2l', 'rso2r', 'abp', 'spo2', 'icp', 'deoxhg_r',
       'sthg_index_r', 'sthg_index_l', 'oxhg_l', 'deoxhg_l', 'oxhg_r',
       'scthg_l', 'scthg_r']
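
A histogram over every patient needs one shared bin range first, and then per-array counts that can simply be summed. Here is a rough NumPy-only sketch of that two-pass idea, which keeps a single array in memory at a time. The helper `streaming_histogram` and the `fake_signals` data are hypothetical, and skipping NaNs is an assumption about how missing samples should be treated, not something the data is known to require.

```python
import numpy as np

def streaming_histogram(arrays, bins=50):
    """Histogram a sequence of 1-D float arrays without concatenating them."""
    # Pass 1: find the global range one array at a time, skipping NaNs.
    lo, hi = np.inf, -np.inf
    for a in arrays:
        a = np.asarray(a, dtype=float)  # for an h5py dataset, the read happens here
        lo = min(lo, float(np.nanmin(a)))
        hi = max(hi, float(np.nanmax(a)))
    edges = np.linspace(lo, hi, bins + 1)

    # Pass 2: count each array against the shared edges and sum the counts.
    counts = np.zeros(bins, dtype=np.int64)
    for a in arrays:
        a = np.asarray(a, dtype=float)
        part, _ = np.histogram(a[~np.isnan(a)], bins=edges)
        counts += part
    return counts, edges

# Made-up signals standing in for per-patient arrays; one sample is missing.
rng = np.random.default_rng(0)
fake_signals = [rng.normal(80.0, 10.0, size=n) for n in (1_000, 2_500, 400)]
fake_signals[1][10] = np.nan

counts, edges = streaming_histogram(fake_signals, bins=50)
print(counts.sum(), edges[0], edges[-1])
```

Reading every array twice costs extra I/O, but peak memory stays at the size of the largest single array. Estimating the range from only a subset of arrays, as the `dask` loop does, avoids the first full pass at the price of dropping values that fall outside that range.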

In [None]:
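from tqdm import tqdm
batch_n = 10  # number of patient arrays used to estimate the histogram range
for v in tqdm(all_vars):
    # numerics and waveforms live under different groups
    if v in ['hr', 'rso2l', 'rso2r', 'spo2']:
        key_string = f"numerics/{v}"
    else:
        key_string = f"waves/{v}"
    # keep only patients that have this signal and are not flagged as broken
    arr_list = [
        da.from_array(global_f[f"{pt}/raw/{key_string}"])
        for pt in global_f.keys()
        if key_string in global_f[f"{pt}/raw"]
        and not global_f[f"{pt}/processed"].attrs["broken_numeric"]
    ]
    v_arr = da.concatenate(arr_list)
    # estimate the range from the first batch_n arrays, computing min and max together
    sample = da.concatenate(arr_list[:batch_n])
    mini, maxi = dask.compute(sample.min(), sample.max())
    # values outside [mini, maxi] are not counted by the histogram
    hist, bins = da.histogram(v_arr, bins=50, range=[mini, maxi])
    hist = hist.compute()  # keep the computed counts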

Exception ignored in: <function WeakSet.__init__.<locals>._remove at 0x1531f0ed01f0>
Traceback (most recent call last):
  File "/home/mr2238/project_pi_np442/mr2238/conda_envs/cppopt-dl/lib/python3.10/_weakrefset.py", line 39, in _remove
    def _remove(item, selfref=ref(self)):
KeyboardInterrupt: 
Process Dask Worker process (from Nanny):
2025-10-10 13:45:29,357 - distributed.nanny - ERROR - Worker process died unexpectedly
Traceback (most recent call last):
  File "/home/mr2238/project_pi_np442/mr2238/conda_envs/cppopt-dl/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/home/mr2238/project_pi_np442/mr2238/conda_envs/cppopt-dl/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/mr2238/project_pi_np442/mr2238/conda_envs/cppopt-dl/lib/python3.10/site-packages/distributed/process.py", line 202, in _run
    target(*args, **kwargs)
  File "/home/mr2238/project_pi_np442/mr2238/con