In [None]:
# Requires zarr-python < 3: this notebook uses the zarr v2 API (e.g. zarr.NestedDirectoryStore).

In [19]:
import h5py
import zarr
import time
from datetime import timedelta
import dask
from dask import delayed
import dask.array as da
import numpy as np
from numcodecs import Blosc
from pathlib import Path
import dask_memusage
import os

from dask.distributed import Client, LocalCluster, as_completed, wait

# Fail fast on an incompatible zarr install: the notebook is written for
# zarr < 3 and uses zarr.NestedDirectoryStore in later cells.
_zarr_major = int(zarr.__version__.split(".")[0])
if _zarr_major >= 3:
    raise ImportError(
        f"This notebook requires zarr < 3, but zarr {zarr.__version__} is installed."
    )

print("All libraries imported succesfully!")
print(zarr.__version__)

All libraries imported succesfully!
2.18.7


In [20]:
# Conversion arguments

# --- Input / output locations ------------------------------------------------
# Alternative paths (commented out):
#input_path = "/dtu/3d-imaging-center/projects/2023_CoM-BraiN/analysis/pipelineTestData/wMB_4bin.h5"
#output_path = "/dtu/3d-imaging-center/projects/2023_CoM-BraiN/analysis/OME_Output/predict_test3.ome.zarr"
#log_path = Path("/dtu/3d-imaging-center/projects/2023_CoM-BraiN/analysis/OME_Output/") / "memusage.csv"  # Save in output folder

input_path = "/Users/tobiasschleiss/documents/dtu/thesis/input/small_wMB_4bin.h5"
output_path = "/Users/tobiasschleiss/Documents/DTU/Thesis/output/direct.ome.zarr"
log_path = Path("/Users/tobiasschleiss/Documents/DTU/Thesis/output") / "memusage.csv"  # Save in output folder

# --- Conversion settings -----------------------------------------------------
dataset_path = 'exchange/data'      # dataset key inside the HDF5 file
target_chunks = (64, 64, 64)        # chunk shape used for every Zarr array
downsample_factor = 8               # per-axis reduction between pyramid levels
compression_level = 3               # Blosc compression level (clevel)
target_top_level_mb = 100           # stop adding levels once below this size

# --- Memory budget -----------------------------------------------------------
max_mem_gb = 14
safety_factor = 0.80                # fraction of max_mem_gb handed to Dask
available_mem_bytes = max_mem_gb * 1e9 * safety_factor

# --- Dask cluster layout -----------------------------------------------------
n_workers = 1
threads_per_worker = 1
memory_limit = available_mem_bytes / n_workers   # budget per worker, in bytes

print(f"Mem per worker (bytes): {memory_limit}")
print(f"Mem per worker (GB): {(memory_limit/1e9)}")

Mem per worker (bytes): 11200000000.0
Mem per worker (GB): 11.2


In [21]:
# Inspect HDF5 file: read the dataset's shape, dtype, chunking and size.
with h5py.File(input_path, 'r') as f:
    if dataset_path not in f:
        # Stop with a descriptive error instead of printing a message and then
        # failing on the lookup below.
        raise KeyError(
            f"Dataset '{dataset_path}' not found in {input_path}. "
            f"Available paths: {list(f.keys())}"
        )

    dataset = f[dataset_path]
    shape = dataset.shape
    dtype = dataset.dtype
    h5_chunks = dataset.chunks          # None when the dataset is stored contiguously
    data_size_gb = dataset.nbytes / (1024**3)
    data_size_mb = dataset.nbytes / (1024**2)
    dtype_size = dtype.itemsize         # bytes per element

    print(f"  Shape: {shape}")
    print(f"  Dtype: {dtype}")
    print(f"  Size: {data_size_gb:.2f} GB")
    print(f"  HDF5 chunks: {h5_chunks if h5_chunks else 'Contiguous'}")

  Shape: (1651, 2200, 2200)
  Dtype: float32
  Size: 29.77 GB
  HDF5 chunks: Contiguous


In [22]:
# Calculate levels needed.
# Level 0 is the full-resolution data; every further level divides the data
# volume by downsample_factor**3 (one factor per axis). Levels are added until
# the estimated size drops to target_top_level_mb or below.
if downsample_factor <= 1:
    # A factor of 1 (or less) never shrinks the data, so the loop below would
    # not terminate.
    raise ValueError(
        f"downsample_factor must be greater than 1, got {downsample_factor}"
    )

levels = 1
current_size_mb = data_size_mb

while current_size_mb > target_top_level_mb:
    current_size_mb = current_size_mb / (downsample_factor ** 3)
    levels += 1

print(f"Target top level: {target_top_level_mb} MB")
print(f"Recommended levels: {levels}")
print(f"Actual top level: {current_size_mb:.1f} MB")

pyramid_levels = levels

Target top level: 100 MB
Recommended levels: 2
Actual top level: 59.5 MB


In [23]:
# Dynamic block calculation based on memory per worker.
#
# Chooses the shape of the blocks that are read from the HDF5 file. The Z
# extent is always chunk_z; Y and X start as the full dataset extent and are
# only reduced (Y first, then X) when a chunk_z-thick slab does not fit into
# available_bytes. The result is stored in `block_shape`.

z, y, x = shape
chunk_z, chunk_y, chunk_x = target_chunks


# Per-block byte budget: two thirds of the worker memory limit minus a fixed
# 400 MB. NOTE(review): presumably headroom for worker overhead and the copy
# made while writing — confirm the rationale for these constants.
available_bytes = memory_limit * (2/3) - 400_000_000


# Calculate maximum amount of Z-planes that fit in memory
bytes_per_z_plane = y * x * dtype_size
max_z_planes = int(available_bytes / bytes_per_z_plane)


# If chunk_z full (y, x) planes fit, y and x stay at the full dataset extent.
if max_z_planes < chunk_z:
    print(f"\nFull Target Z plane ({target_chunks[0]}) too large for memory")
    print("Reducing Y axis to fit block in memory")

    # Calculate max Y that fits with target Z and full X
    bytes_per_y_row = chunk_z * x * dtype_size
    max_y_rows = int(available_bytes / bytes_per_y_row)

    if max_y_rows < chunk_y:
        print(f"\nFull Target Y rows ({target_chunks[1]}) too large for memory")
        print("Reducing X axis to fit block in memory")
        # Bytes of one X column through a (chunk_z, chunk_y) slab.
        bytes_per_x_column = chunk_z * chunk_y * dtype_size
        max_x_columns = int(available_bytes / bytes_per_x_column)
        # Round down to a whole number of chunks along X.
        optimal_x = (max_x_columns // chunk_x) * chunk_x
        optimal_x = max(chunk_x, optimal_x)  # At least one chunk depth
        if max_x_columns >= x / 2 + chunk_x:
            optimal_x = int(min(optimal_x, ((x / 2) // chunk_x) * chunk_x + chunk_x))   # Cap to a multiple of chunk_x just above half of x
        # From here on `x` is the block extent, no longer the dataset extent.
        x = optimal_x

    # Round down to a whole number of chunks along Y. When the X branch above
    # ran, max_y_rows < chunk_y, so the max() below yields exactly chunk_y.
    optimal_y = (max_y_rows // chunk_y) * chunk_y
    optimal_y = max(chunk_y, optimal_y)  # At least one chunk depth
    if max_y_rows >= y / 2 + chunk_y:
        optimal_y = int(min(optimal_y, ((y / 2) // chunk_y) * chunk_y + chunk_y))   # Cap to a multiple of chunk_y just above half of y
    # From here on `y` is the block extent, no longer the dataset extent.
    y = optimal_y

# Read-block shape used by the level-0 copy cell.
block_shape = chunk_z, y, x

# Calculate actual memory usage
actual_gb = chunk_z * y * x * dtype_size / 1e9

# Total budget across all workers (before the 2/3 and 400 MB reduction).
max_gb = available_mem_bytes / 1e9

print(f"\n{'='*60}")
print("Optimal Block Size Calculation")
print(f"{'='*60}")
print(f"Memory budget: {max_gb:.2f} GB (using {int(safety_factor*100)}%)")
print(f"Available for block: {available_bytes/1e9:.2f} GB")
print(f"Actual block size: {actual_gb:.2f} GB")
print(f"{'='*60}")
print(f"Read chunks: {block_shape}")


Optimal Block Size Calculation
Memory budget: 11.20 GB (using 80%)
Available for block: 7.07 GB
Actual block size: 1.24 GB
Read chunks: (64, 2200, 2200)


In [24]:
# HDF5 to level 0 — setup: re-read the dataset metadata, start the Dask
# cluster and create the empty level-0 Zarr array.

with h5py.File(input_path, "r") as h5_file:
    dataset = h5_file[dataset_path]
    shape, dtype = dataset.shape, dataset.dtype
    dtype_size = dtype.itemsize
    data_size_mb = dataset.nbytes / (1024**2)

print(f"block shape: {block_shape}")

# Read-block extents and full dataset extents, per axis (z, y, x).
block_z, block_y, block_x = block_shape
z_total, y_total, x_total = shape

# Size in bytes of one full read block.
read_chunks_bytes = np.prod(block_shape) * dtype_size

print(f"Number of Workers: {n_workers} memory per worker {memory_limit}")

cluster = LocalCluster(
    n_workers=n_workers,
    threads_per_worker=threads_per_worker,
    processes=True,
    memory_limit=memory_limit,
)
client = Client(cluster)
print(f"Dask dashboard: {client.dashboard_link}")

# Log memory usage to log_path via dask_memusage.
dask_memusage.install(cluster.scheduler, str(log_path))
print(f"Memory logging to: {log_path}")

compressor = Blosc(
    cname="zstd",
    clevel=compression_level,
    shuffle=Blosc.BITSHUFFLE,
)

# mode="w" on the root group starts from an empty output store.
store = zarr.NestedDirectoryStore(output_path)
root = zarr.open_group(store, mode="w")
root.create_dataset(
    "0",
    shape=shape,
    chunks=target_chunks,
    dtype=dtype,
    compressor=compressor,
)

# The copy tasks reopen the store themselves; drop the handles held here.
del root, store

@dask.delayed
def copy_block(z_start, z_end, y_start, y_end, x_start, x_end):
    """Copy one rectangular region from the HDF5 dataset into Zarr array "0".

    The half-open ranges [z_start, z_end), [y_start, y_end) and
    [x_start, x_end) select the same region in the source and the destination.
    The function reads ``input_path``, ``dataset_path`` and ``output_path``
    from module scope and opens both files itself on every call.

    Returns:
        Tuple ``(nz, ny, nx)`` with the extent of the copied region.
    """
    region = (
        slice(z_start, z_end),
        slice(y_start, y_end),
        slice(x_start, x_end),
    )

    # Read the region fully into memory, then release the HDF5 file.
    with h5py.File(input_path, "r") as h5_file:
        voxels = h5_file[dataset_path][region]

    level0 = zarr.open_group(zarr.NestedDirectoryStore(output_path), mode="a")["0"]
    level0[region] = voxels

    return tuple(axis.stop - axis.start for axis in region)

# Build one delayed copy task per read block, tiling the volume in steps of
# (block_z, block_y, block_x); the last block on each axis is clipped.
tasks = []
for z_start in range(0, z_total, block_z):
    z_end = min(z_start + block_z, z_total)

    for y_start in range(0, y_total, block_y):
        y_end = min(y_start + block_y, y_total)

        for x_start in range(0, x_total, block_x):
            x_end = min(x_start + block_x, x_total)

            tasks.append(copy_block(z_start, z_end, y_start, y_end, x_start, x_end))

total_tasks = len(tasks)
print(f"\n✓ Submitting {total_tasks} tasks for parallel execution...")

start = time.time()

try:
    # Submit all tasks and get futures
    futures = client.compute(tasks)

    # Track progress
    completed = 0

    for future in as_completed(futures):
        # as_completed() also yields futures whose task failed; result()
        # re-raises the task's exception so a failed copy is not reported as
        # a completed block.
        future.result()
        completed += 1
        print(f"completed blocks: {completed}")

    elapsed = time.time() - start

    total_gb = np.prod(shape) * dtype_size / 1e9

    print(f"\n✓ Complete: {elapsed:.1f}s | {total_gb/elapsed:.2f} GB/s")
finally:
    # Always release the workers, also when a task raised.
    client.close()
    cluster.close()

block shape: (64, 2200, 2200)
Number of Workers: 1 memory per worker 11200000000.0


Perhaps you already have a cluster running?
Hosting the HTTP server on port 61097 instead


Dask dashboard: http://127.0.0.1:61097/status
Memory logging to: /Users/tobiasschleiss/Documents/DTU/Thesis/output/memusage.csv

✓ Submitting 26 tasks for parallel execution...


  store = zarr.NestedDirectoryStore(output_path)


completed blocks: 1




completed blocks: 2




completed blocks: 3




completed blocks: 4




completed blocks: 5




completed blocks: 6




completed blocks: 7




completed blocks: 8




completed blocks: 9




completed blocks: 10




completed blocks: 11




completed blocks: 12




completed blocks: 13




completed blocks: 14




completed blocks: 15




completed blocks: 16




completed blocks: 17




completed blocks: 18




completed blocks: 19




completed blocks: 20




completed blocks: 21




completed blocks: 22




completed blocks: 23




completed blocks: 24




completed blocks: 25




completed blocks: 26

✓ Complete: 116.6s | 0.27 GB/s


Exception in thread WorkerMemory:
ConnectionRefusedError: [Errno 61] Connection refused

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/homebrew/Caskroom/miniforge/base/envs/jlab/lib/python3.12/site-packages/distributed/comm/core.py", line 342, in connect
    comm = await wait_for(
           ^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/jlab/lib/python3.12/site-packages/distributed/utils.py", line 1930, in wait_for
    return await fut
           ^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/jlab/lib/python3.12/site-packages/distributed/comm/tcp.py", line 560, in connect
    convert_stream_closed_error(self, e)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/jlab/lib/python3.12/site-packages/distributed/comm/tcp.py", line 143, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
distributed.comm.core.CommClosedError: i

In [25]:
# Inspection level_0: open the written array lazily and report its layout.
source = da.from_zarr(url=output_path, component='0')

print(
    f"  Source chunks: {source.chunksize}",
    f"  Source shape: {source.shape}",
    f"  Source dtype: {source.dtype}",
    sep="\n",
)

  Source chunks: (64, 64, 64)
  Source shape: (1651, 2200, 2200)
  Source dtype: float32


In [26]:
# Close the Dask client first, then its cluster (the level-0 cell also closes
# them at its end).
for _dask_handle in (client, cluster):
    _dask_handle.close()

In [27]:
#Pyramid Write

def mean_downsample_block(
    source_path,
    destination_path,
    block_region,
    destination_coords,
    downsample_factor
):
    """Block-mean downsample one region of a Zarr array into another array.

    Reads ``block_region`` from the array at ``source_path``, averages it over
    non-overlapping bins of ``downsample_factor`` elements per axis and writes
    the result to ``destination_coords`` of the array at ``destination_path``.

    Args:
        source_path: Location of the source array, passed to ``zarr.open``.
        destination_path: Location of the existing destination array
            (opened with mode ``"r+"``).
        block_region: Index (tuple of slices) selecting the source block.
        destination_coords: Index (tuple of slices) selecting where the
            downsampled block is written.
        downsample_factor: Bin size per axis.

    Trailing elements that do not fill a complete bin are dropped. An axis
    that is shorter than ``downsample_factor`` (but not empty) is averaged as
    a single bin, so it yields one output element instead of none.
    """
    src = zarr.open(source_path, mode="r")
    dst = zarr.open(destination_path, mode="r+")

    block = src[block_region]

    d = downsample_factor

    # Per axis: (number of bins, elements per bin).
    bins = []
    for length in block.shape:
        if 0 < length < d:
            # Axis thinner than one bin: average what is there as one bin.
            bins.append((1, length))
        else:
            bins.append((length // d, d))

    # Trim every axis to a whole number of bins.
    block = block[tuple(slice(0, count * size) for count, size in bins)]

    # Reshape to compute block mean
    # Each axis is split into (num_blocks, block_size)
    reshaped = block.reshape([value for pair in bins for value in pair])

    # Compute mean over the within-bin axes (1, 3, 5, ...) (downsample the block)
    bin_axes = tuple(range(1, 2 * len(bins), 2))
    downsampled = reshaped.mean(axis=bin_axes).astype(block.dtype)

    # Write to pyramid array
    dst[destination_coords] = downsampled


def build_level(
    client,
    output_path,
    level,
    downsample_factor,
    target_chunks,
    compressor,
    max_in_flight=128
):
    """Create pyramid level ``level`` by block-mean downsampling level ``level - 1``.

    Args:
        client: Dask distributed client used to submit the block tasks.
        output_path: Root directory of the pyramid; levels are stored in
            sub-directories named after their level number.
        level: Index of the level to create (its source is ``level - 1``).
        downsample_factor: Per-axis reduction factor between the two levels.
        target_chunks: ``(z, y, x)`` chunk shape of the new array; also the
            output region handled by one task.
        compressor: Compressor passed to ``zarr.open`` for the new array.
        max_in_flight: Number of submitted tasks after which the function
            blocks until the whole batch has finished.

    Raises:
        Exception: Whatever a block task raised; ``client.gather`` re-raises
            it here, so a failed block aborts the level instead of being
            skipped silently.
    """

    print(f"\n{'='*60}")
    print(f"LEVEL {level}: Block-Mean Downsampling")
    print(f"{'='*60}")

    source_path = os.path.join(output_path, str(level - 1))
    destination_path = os.path.join(output_path, str(level))

    # Load previous level as source
    source = zarr.open(source_path, mode="r")
    current_shape = source.shape
    # Floor division drops incomplete trailing bins; every axis keeps at least one element.
    new_shape = tuple(max(1, s // downsample_factor) for s in current_shape)

    print(f"Previous shape: {current_shape}")
    print(f"New shape: {new_shape}")

    # Create destination array
    zarr.open(
        destination_path,
        mode="w",
        shape=new_shape,
        chunks=target_chunks,
        dtype=source.dtype,
        compressor=compressor,
        dimension_separator="/"
    )

    chunk_z, chunk_y, chunk_x = target_chunks

    # One task per destination chunk.
    current_total_tasks = (
        (int(np.ceil(new_shape[0] / chunk_z)))*
        (int(np.ceil(new_shape[1] / chunk_y)))*
        (int(np.ceil(new_shape[2] / chunk_x)))
    )
    print(f"Total tasks for current level: {current_total_tasks}")

    print(f"Total recurring in flights: {current_total_tasks // max_in_flight}")

    level_start = time.time()

    futures = []
    completed = 0  # number of full batches that have finished

    # Iterate over output blocks
    for z_start in range(0, new_shape[0], chunk_z):
        for y_start in range(0, new_shape[1], chunk_y):
            for x_start in range(0, new_shape[2], chunk_x):

                # Tuple holding python slice objects (block write coordinates)
                destination_coords = (
                    slice(z_start, min(z_start + chunk_z, new_shape[0])),
                    slice(y_start, min(y_start + chunk_y, new_shape[1])),
                    slice(x_start, min(x_start + chunk_x, new_shape[2])),
                )

                # Mapping block: the source region is the destination region
                # scaled by the downsample factor, clipped to the source shape.
                source_start = (
                    z_start * downsample_factor,
                    y_start * downsample_factor,
                    x_start * downsample_factor,
                )
                source_end = (
                    min((z_start + chunk_z) * downsample_factor, current_shape[0]),
                    min((y_start + chunk_y) * downsample_factor, current_shape[1]),
                    min((x_start + chunk_x) * downsample_factor, current_shape[2]),
                )

                # Tuple holding python slice objects (Block to be read from current array)
                block_region = (
                    slice(source_start[0], source_end[0]),
                    slice(source_start[1], source_end[1]),
                    slice(source_start[2], source_end[2])
                )

                # Submit the block task
                future = client.submit(
                    mean_downsample_block,
                    source_path,
                    destination_path,
                    block_region,
                    destination_coords,
                    downsample_factor
                )

                futures.append(future)

                if len(futures) >= max_in_flight:
                    # gather() blocks until the batch is done and re-raises
                    # the exception of any failed task.
                    client.gather(futures)
                    completed += 1
                    print(f"completed: {completed}")
                    futures = []

    if futures:
        client.gather(futures)

    print(f"Finished level {level} in {(time.time() - level_start):.1f}s")

# Build all pyramid levels above level 0 on a fresh local Dask cluster.
# Cluster and client are used as context managers so that the workers are
# released even when a level fails; the client is closed before the cluster.
with LocalCluster(
    n_workers=n_workers,
    threads_per_worker=threads_per_worker,
    processes=True,
    memory_limit=memory_limit
) as cluster, Client(cluster) as client:
    print(f"Dask dashboard: {client.dashboard_link}")

    dask_memusage.install(cluster.scheduler, str(log_path))
    print(f"Memory logging to: {log_path}")

    print("="*60)
    print("Building OME-Zarr Multi-Resolution Pyramid (Block-Mean)")
    print("="*60)

    compressor = Blosc(
        cname="zstd",
        clevel=compression_level,
        shuffle=Blosc.BITSHUFFLE
    )

    pyramid_start = time.time()
    # Level 0 already exists; each further level is derived from the one below it.
    for level in range(1, pyramid_levels):
        build_level(
            client,
            output_path,
            level,
            downsample_factor,
            target_chunks,
            compressor
        )

    print("\nTotal pyramid time: "
          f"{(time.time() - pyramid_start)/60:.2f} minutes")

Perhaps you already have a cluster running?
Hosting the HTTP server on port 61956 instead


Dask dashboard: http://127.0.0.1:61956/status
Memory logging to: /Users/tobiasschleiss/Documents/DTU/Thesis/output/memusage.csv
Building OME-Zarr Multi-Resolution Pyramid (Block-Mean)

LEVEL 1: Block-Mean Downsampling
Previous shape: (1651, 2200, 2200)
New shape: (206, 275, 275)
Total tasks for current level: 100
Total recurring in flights: 0
Finished level 1 in 74.6s

Total pyramid time: 1.24 minutes


Exception in thread WorkerMemory:
ConnectionRefusedError: [Errno 61] Connection refused

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/homebrew/Caskroom/miniforge/base/envs/jlab/lib/python3.12/site-packages/distributed/comm/core.py", line 342, in connect
    comm = await wait_for(
           ^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/jlab/lib/python3.12/site-packages/distributed/utils.py", line 1930, in wait_for
    return await fut
           ^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniforge/base/envs/jlab/lib/python3.12/site-packages/distributed/comm/tcp.py", line 560, in connect
    convert_stream_closed_error(self, e)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/jlab/lib/python3.12/site-packages/distributed/comm/tcp.py", line 143, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
distributed.comm.core.CommClosedError: i

In [28]:
# Add OME metadata: write the "multiscales" attribute on the root group and
# print a per-level summary.

print(f"{'='*60}")
print("Adding OME-Zarr Metadata")
print(f"{'='*60}")

store = zarr.NestedDirectoryStore(output_path)
root = zarr.open_group(store, mode="a")  # append mode

# Three spatial axes in z, y, x order.
axes = [
    {'name': axis_name, 'type': 'space', 'unit': 'micrometer'}
    for axis_name in ('z', 'y', 'x')
]

# One dataset entry per level; level n is scaled by downsample_factor**n on every axis.
datasets = []
for level in range(pyramid_levels):
    scale_factor = downsample_factor ** level
    scale_transform = {
        'type': 'scale',
        'scale': [float(scale_factor)] * 3,  # z, y, x
    }
    datasets.append({
        'path': str(level),
        'coordinateTransformations': [scale_transform],
    })

multiscale = {
    'version': '0.4',
    'name': 'pyramid',
    'axes': axes,
    'datasets': datasets,
    'type': 'mean',  # Downsampling method
    'metadata': {
        'description': 'Multi-resolution pyramid',
        'method': 'block mean downsampling'
    }
}
root.attrs['multiscales'] = [multiscale]

print("DONE")
print("\nPyramid Summary:")
print("-" * 60)

# Re-open read-only and report the uncompressed size of every level.
pyramid = zarr.open(store, mode='r')
for level in range(pyramid_levels):
    arr = pyramid[str(level)]
    size_gb = np.prod(arr.shape) * arr.dtype.itemsize / 1e9
    print(f"  Level {level}: shape={arr.shape}, chunks={arr.chunks}, size={size_gb:.2f} GB")

Adding OME-Zarr Metadata
DONE

Pyramid Summary:
------------------------------------------------------------
  Level 0: shape=(1651, 2200, 2200), chunks=(64, 64, 64), size=31.96 GB
  Level 1: shape=(206, 275, 275), chunks=(64, 64, 64), size=0.06 GB


  store = zarr.NestedDirectoryStore(output_path)
