In [7]:
import os
from pathlib import Path
import zarr
import dask.array as da
from tqdm.notebook import tqdm

import joblib
from contextlib import contextmanager

# --- Configuration ---------------------------------------------------------
# Pieces of the run identifier: together they form the "final_source08" folder
# name, while int(source_num) yields the "source_8" path component.
source = "source"
source_num = "08"

# Root directory that the source and target trees below are built from.
cluster_dir = Path("/ictstr01/groups/ml01/projects/2023_ttreis_segment_JUMP/snakemake/")

# Tree containing the Zarr stores that are to be rechunked.
directory_to_rechunk = cluster_dir.joinpath(
    f"final_{source}{source_num}/snakemake/results/aggregated/broad/cellpainting-gallery/cpg0016-jump/source_{int(source_num)}/workspace/segmentation/cellpose/objects"
)

# NOTE(review): this target lives under "broad_compressed", whereas the
# processing cells further down write to a "broad_aggregated" tree obtained by
# string replacement and never read `target_dir` -- confirm which is intended.
target_dir = cluster_dir.joinpath(
    f"final_{source}{source_num}/snakemake/results/aggregated/broad_compressed/cellpainting-gallery/cpg0016-jump/source_{int(source_num)}/workspace/segmentation/cellpose/objects"
)
target_dir.mkdir(parents=True, exist_ok=True)

In [2]:
# Sanity check: display the resolved source directory.
directory_to_rechunk

PosixPath('/ictstr01/groups/ml01/projects/2023_ttreis_segment_JUMP/snakemake/final_source08/snakemake/results/aggregated/broad/cellpainting-gallery/cpg0016-jump/source_8/workspace/segmentation/cellpose/objects')

In [4]:
# Every immediate sub-directory of the source tree is treated as one batch folder.
batch_folders = list(filter(Path.is_dir, directory_to_rechunk.glob("*")))
batch_folders

[PosixPath('/ictstr01/groups/ml01/projects/2023_ttreis_segment_JUMP/snakemake/final_source08/snakemake/results/aggregated/broad/cellpainting-gallery/cpg0016-jump/source_8/workspace/segmentation/cellpose/objects/J3'),
 PosixPath('/ictstr01/groups/ml01/projects/2023_ttreis_segment_JUMP/snakemake/final_source08/snakemake/results/aggregated/broad/cellpainting-gallery/cpg0016-jump/source_8/workspace/segmentation/cellpose/objects/J4'),
 PosixPath('/ictstr01/groups/ml01/projects/2023_ttreis_segment_JUMP/snakemake/final_source08/snakemake/results/aggregated/broad/cellpainting-gallery/cpg0016-jump/source_8/workspace/segmentation/cellpose/objects/J1'),
 PosixPath('/ictstr01/groups/ml01/projects/2023_ttreis_segment_JUMP/snakemake/final_source08/snakemake/results/aggregated/broad/cellpainting-gallery/cpg0016-jump/source_8/workspace/segmentation/cellpose/objects/J2')]

In [5]:
# Collect "<batch>/<plate>" relative paths for every directory found inside
# each batch folder.
plate_names = []
for batch_folder in batch_folders:
    batch_name = Path(batch_folder.name)
    plate_names.extend(
        batch_name / entry.name
        for entry in batch_folder.glob("*")
        if entry.is_dir()
    )

# The store path is built as "<plate_id>.zarr" inside the plate directory;
# existence of that store is not checked here.
plate_paths = [
    directory_to_rechunk / plate / f"{plate.name}.zarr"
    for plate in plate_names
]

plate_paths[:3]

[PosixPath('/ictstr01/groups/ml01/projects/2023_ttreis_segment_JUMP/snakemake/final_source08/snakemake/results/aggregated/broad/cellpainting-gallery/cpg0016-jump/source_8/workspace/segmentation/cellpose/objects/J3/A1170525/A1170525.zarr'),
 PosixPath('/ictstr01/groups/ml01/projects/2023_ttreis_segment_JUMP/snakemake/final_source08/snakemake/results/aggregated/broad/cellpainting-gallery/cpg0016-jump/source_8/workspace/segmentation/cellpose/objects/J3/A1170540/A1170540.zarr'),
 PosixPath('/ictstr01/groups/ml01/projects/2023_ttreis_segment_JUMP/snakemake/final_source08/snakemake/results/aggregated/broad/cellpainting-gallery/cpg0016-jump/source_8/workspace/segmentation/cellpose/objects/J3/A1170494/A1170494.zarr')]

### Proof of concept for reduction of number of chunks

In [8]:
# Original Zarr stores used for the proof of concept: three plates of batch J3.
# The literals match the three paths shown in the recorded `plate_paths[:3]`
# output; they are hard-coded here rather than taken from `plate_paths`.
stores = [
    Path('/ictstr01/groups/ml01/projects/2023_ttreis_segment_JUMP/snakemake/final_source08/snakemake/results/aggregated/broad/cellpainting-gallery/cpg0016-jump/source_8/workspace/segmentation/cellpose/objects/J3/A1170525/A1170525.zarr'),
    Path('/ictstr01/groups/ml01/projects/2023_ttreis_segment_JUMP/snakemake/final_source08/snakemake/results/aggregated/broad/cellpainting-gallery/cpg0016-jump/source_8/workspace/segmentation/cellpose/objects/J3/A1170540/A1170540.zarr'),
    Path('/ictstr01/groups/ml01/projects/2023_ttreis_segment_JUMP/snakemake/final_source08/snakemake/results/aggregated/broad/cellpainting-gallery/cpg0016-jump/source_8/workspace/segmentation/cellpose/objects/J3/A1170494/A1170494.zarr')
]

# Names looked up inside every group of a store; each one that is present is
# read as an array and rechunked by the processing code.
subgroups = ['label_image', 'single_cell_data', 'single_cell_index']

# Serial proof of concept: rewrite every selected array of every group as a
# single-chunk Zarr array under a mirrored "broad_aggregated" tree.
for store_path in stores:
    new_store_path = Path(str(store_path).replace('/broad/', '/broad_aggregated/'))
    print(f"Processing store: {store_path} -> {new_store_path}")

    src = zarr.open(store_path, mode='r')
    groups = list(src.group_keys())

    progress = tqdm(
        groups,
        desc=f"Processing groups in {store_path.name}",
        total=len(groups),
        bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, ETA: {eta}]',
    )
    for group_name in progress:
        src_group = src[group_name]

        for sub in subgroups:
            # Report and skip members that this group does not contain.
            if sub not in src_group:
                print(f"    Subgroup {sub} not found in group {group_name}")
                continue

            darray = da.from_zarr(src_group[sub])
            dst_ds_path = new_store_path / group_name / sub
            dst_ds_path.mkdir(parents=True, exist_ok=True)

            # Passing the full shape as the chunk size yields exactly one chunk.
            darray.rechunk(darray.shape).to_zarr(str(dst_ds_path), mode='w', overwrite=True)


Processing store: /ictstr01/groups/ml01/projects/2023_ttreis_segment_JUMP/snakemake/final_source08/snakemake/results/aggregated/broad/cellpainting-gallery/cpg0016-jump/source_8/workspace/segmentation/cellpose/objects/J3/A1170525/A1170525.zarr -> /ictstr01/groups/ml01/projects/2023_ttreis_segment_JUMP/snakemake/final_source08/snakemake/results/aggregated/broad_aggregated/cellpainting-gallery/cpg0016-jump/source_8/workspace/segmentation/cellpose/objects/J3/A1170525/A1170525.zarr


Processing groups in A1170525.zarr:   1%|          | 28/3456 [00:14<30:11, ETA: 2025-03-05 11:59:40.348326]


KeyboardInterrupt: 

In [9]:
# Bridge between joblib's batch-completion callback and a tqdm progress bar.
@contextmanager
def tqdm_joblib(tqdm_object):
    """Temporarily make joblib report finished batches to ``tqdm_object``.

    While the context is active, ``joblib.parallel.BatchCompletionCallBack`` is
    replaced by a subclass that advances ``tqdm_object`` by the batch size each
    time it is called and then delegates to the original callback. The original
    class is put back when the context exits, including on error.

    Yields
    ------
    The ``tqdm_object`` that was passed in.
    """
    parallel_module = joblib.parallel
    original_callback = parallel_module.BatchCompletionCallBack

    class _TqdmReportingCallback(original_callback):
        def __call__(self, *args, **kwargs):
            # Advance the bar first, then let joblib do its normal bookkeeping.
            tqdm_object.update(n=self.batch_size)
            return super().__call__(*args, **kwargs)

    parallel_module.BatchCompletionCallBack = _TqdmReportingCallback
    try:
        yield tqdm_object
    finally:
        parallel_module.BatchCompletionCallBack = original_callback

def process_group(store_path, new_store_path, group_name, subgroups):
    """
    Rechunk the selected members of one group into single-chunk Zarr arrays.

    For every name in ``subgroups`` that exists in the source group, the member
    is loaded lazily with dask, rechunked so that the whole array is one chunk,
    and written to ``new_store_path / group_name / <name>``. Names that are not
    present in the source group are skipped silently.

    Parameters
    ----------
    store_path : str or os.PathLike
        Location of the source Zarr store (opened read-only).
    new_store_path : str or os.PathLike
        Root directory under which the rechunked arrays are written.
    group_name : str
        Key of the group inside the source store to process.
    subgroups : iterable of str
        Names looked up inside the group.

    Raises
    ------
    ValueError
        If ``new_store_path`` is the same path as ``store_path``. The arrays are
        written with ``overwrite=True``, so writing onto the source would
        replace the very data that is being read.
    """
    dst_root = Path(new_store_path)
    if dst_root == Path(store_path):
        raise ValueError(
            f"Destination store equals source store ({dst_root}); "
            "refusing to overwrite the data being rechunked."
        )

    src = zarr.open(store_path, mode='r')
    src_group = src[group_name]
    dst_group_path = dst_root / group_name

    for sub in subgroups:
        if sub in src_group:
            src_ds = src_group[sub]
            darray = da.from_zarr(src_ds)
            # Rechunk so that the entire array is one chunk
            darray_single = darray.rechunk(darray.shape)
            dst_ds_path = dst_group_path / sub
            os.makedirs(dst_ds_path, exist_ok=True)
            darray_single.to_zarr(str(dst_ds_path), mode='w', overwrite=True)


# Names looked up inside every group; passed through to process_group.
subgroups = ['label_image', 'single_cell_data', 'single_cell_index']

# joblib.cpu_count() takes CPU affinity and scheduler quotas into account,
# whereas os.cpu_count() reports every core of the machine even when the
# current job may only use a subset of them.
n_jobs = joblib.cpu_count()

for store_path in stores:
    # Mirror the store under a sibling "broad_aggregated" tree.
    new_store_path = Path(str(store_path).replace('/broad/', '/broad_aggregated/'))

    # str.replace is a silent no-op when '/broad/' is absent; the workers write
    # with overwrite=True, so never let the destination coincide with the source.
    if new_store_path == Path(store_path):
        raise ValueError(
            f"Cannot derive a destination for {store_path}: "
            "path does not contain '/broad/'."
        )

    # Create the destination directory
    os.makedirs(new_store_path, exist_ok=True)

    src = zarr.open(store_path, mode='r')
    groups = list(src.group_keys())

    # One delayed task per group; each worker re-opens the store itself.
    tasks = [
        joblib.delayed(process_group)(store_path, new_store_path, group_name, subgroups)
        for group_name in groups
    ]

    with tqdm(total=len(tasks), desc=f"Processing {store_path.name}") as pbar, tqdm_joblib(pbar):
        joblib.Parallel(n_jobs=n_jobs, verbose=0)(tasks)


Processing A1170525.zarr:   0%|          | 0/3456 [00:00<?, ?it/s]

Processing A1170540.zarr:   0%|          | 0/3456 [00:00<?, ?it/s]

Processing A1170494.zarr:   0%|          | 0/3456 [00:00<?, ?it/s]

## Compare file counts before and after rechunking

In [19]:
# Original store of plate A1170494 in batch J3. Built from
# `directory_to_rechunk` so the source number is not hard-coded a second time.
old_plate = directory_to_rechunk / "J3" / "A1170494" / "A1170494.zarr"
old_plate

PosixPath('/ictstr01/groups/ml01/projects/2023_ttreis_segment_JUMP/snakemake/final_source08/snakemake/results/aggregated/broad/cellpainting-gallery/cpg0016-jump/source_8/workspace/segmentation/cellpose/objects/J3/A1170494/A1170494.zarr')

In [20]:
!find /ictstr01/groups/ml01/projects/2023_ttreis_segment_JUMP/snakemake/final_source08/snakemake/results/aggregated/broad/cellpainting-gallery/cpg0016-jump/source_8/workspace/segmentation/cellpose/objects/J3/A1170494/A1170494.zarr -type f | wc -l

1886.70s - pydevd: Sending message related to process being replaced timed-out after 5 seconds


240847


In [23]:
# Rechunked counterpart of `old_plate`, located with the same
# '/broad/' -> '/broad_aggregated/' mapping that the processing cells use.
new_plate = Path(str(old_plate).replace('/broad/', '/broad_aggregated/'))
new_plate

PosixPath('/ictstr01/groups/ml01/projects/2023_ttreis_segment_JUMP/snakemake/final_source08/snakemake/results/aggregated/broad_aggregated/cellpainting-gallery/cpg0016-jump/source_8/workspace/segmentation/cellpose/objects/J3/A1170494/A1170494.zarr')

In [24]:
!find /ictstr01/groups/ml01/projects/2023_ttreis_segment_JUMP/snakemake/final_source08/snakemake/results/aggregated/broad_aggregated/cellpainting-gallery/cpg0016-jump/source_8/workspace/segmentation/cellpose/objects/J3/A1170494/A1170494.zarr -type f | wc -l

1941.29s - pydevd: Sending message related to process being replaced timed-out after 5 seconds


20562
