# Workflow Overview: ND2 to OME-Zarr, Colony & Nucleus Segmentation, Feature Extraction

This notebook implements a reproducible workflow for high-content image analysis of stem cell experiments. The workflow is designed to process microscopy data, segment colonies and nuclei, track cells, and extract biologically relevant features such as ERK and Oct4 intensities.

### Main Steps:

1. **ND2 to OME-Zarr conversion**: Converts raw ND2 microscopy files to the OME-Zarr format for scalable, cloud-friendly storage and analysis.
2. **Colony segmentation using ConvPaint**: Identifies and segments colonies in the images using a convolutional neural network.
3. **Nucleus segmentation using StarDist**: Detects and segments individual nuclei using the StarDist algorithm.
4. **Cell Tracking**: Links nuclei across timepoints to track individual cells throughout the experiment.
5. **Distance to colony edge & extraction of ERK/Oct4 features**: Calculates the distance of each nucleus to the colony edge and extracts quantitative features such as ERK and Oct4 intensities for downstream analysis.

The following sections provide code and explanations for each step, focusing on feature extraction from segmented and tracked data.


## 5. Feature Extraction of ERK-KTR + Oct4

In [None]:
# --- Dask cluster setup and configuration imports ---
# Start a Dask cluster for parallel processing and import utility functions
from configuration.dask import start_dask_cluster
from dask.distributed import progress

# Both handles are kept at notebook scope: `client` submits the per-FOV work
# further down and both objects are closed in the final cell.
cluster, client = start_dask_cluster()  # Start Dask cluster and client

# Import project-specific settings and utility functions
from configuration.settings import get_output_path, get_fovs
import configuration.settings as settings

# --- Label extraction configuration ---
# Define which label to extract and how to save it

import dataclasses


@dataclasses.dataclass
class LabelExtractionConfig:
    """Settings for one feature that is rendered back into a label image.

    Fields, in positional order:
        feature_to_extract: Name of the label (feature) to extract.
        feature_name_in_omezarr: Name of the label in the OME-Zarr dataset.
        is_grayscale: Whether the label is grayscale (like ERK images) or not.
        tifffile_name: Name of the TIFF file to save the label image as.
            Defaults to None, meaning no TIFF name is configured.
    """

    feature_to_extract: str
    feature_name_in_omezarr: str
    is_grayscale: bool
    tifffile_name: str = None


# The "CNr" feature is always exported: flagged as grayscale and given the
# TIFF name "CNr".
MASKS_TO_EXTRACT = [
    LabelExtractionConfig(
        feature_to_extract="CNr",
        feature_name_in_omezarr="CNr",
        is_grayscale=True,
        tifffile_name="CNr",
    )
]

# Whether to compute cell density
COMPUTE_CELL_DENSITY = True

if COMPUTE_CELL_DENSITY:
    # The density label is not flagged as grayscale and has no TIFF name.
    MASKS_TO_EXTRACT.append(
        LabelExtractionConfig(
            feature_to_extract="cell_density",
            feature_name_in_omezarr="cell_density",
            is_grayscale=False,
            tifffile_name=None,
        )
    )
client  # Show Dask client status

Perhaps you already have a cluster running?
Hosting the HTTP server on port 61253 instead


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:61253/status,

0,1
Dashboard: http://127.0.0.1:61253/status,Workers: 4
Total threads: 16,Total memory: 127.65 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:61256,Workers: 0
Dashboard: http://127.0.0.1:61253/status,Total threads: 0
Started: Just now,Total memory: 0 B

0,1
Comm: tcp://127.0.0.1:61277,Total threads: 4
Dashboard: http://127.0.0.1:61280/status,Memory: 31.91 GiB
Nanny: tcp://127.0.0.1:61261,
Local directory: C:\Users\Niesen\AppData\Local\Temp\dask-scratch-space\worker-2hf89i9s,Local directory: C:\Users\Niesen\AppData\Local\Temp\dask-scratch-space\worker-2hf89i9s

0,1
Comm: tcp://127.0.0.1:61279,Total threads: 4
Dashboard: http://127.0.0.1:61284/status,Memory: 31.91 GiB
Nanny: tcp://127.0.0.1:61263,
Local directory: C:\Users\Niesen\AppData\Local\Temp\dask-scratch-space\worker-ntx0xw2l,Local directory: C:\Users\Niesen\AppData\Local\Temp\dask-scratch-space\worker-ntx0xw2l

0,1
Comm: tcp://127.0.0.1:61286,Total threads: 4
Dashboard: http://127.0.0.1:61287/status,Memory: 31.91 GiB
Nanny: tcp://127.0.0.1:61265,
Local directory: C:\Users\Niesen\AppData\Local\Temp\dask-scratch-space\worker-3hsjnt2_,Local directory: C:\Users\Niesen\AppData\Local\Temp\dask-scratch-space\worker-3hsjnt2_

0,1
Comm: tcp://127.0.0.1:61278,Total threads: 4
Dashboard: http://127.0.0.1:61281/status,Memory: 31.91 GiB
Nanny: tcp://127.0.0.1:61267,
Local directory: C:\Users\Niesen\AppData\Local\Temp\dask-scratch-space\worker-jpl8yr3p,Local directory: C:\Users\Niesen\AppData\Local\Temp\dask-scratch-space\worker-jpl8yr3p


In [6]:
# --- Required imports for feature extraction and image processing ---
import dask
import dask.delayed
import dask.array as da
import numpy as np
import pandas as pd
import skimage
import edt
from scipy import ndimage as ndi
from scipy.spatial import KDTree
import ome_zarr.reader as ozr
import ome_zarr.io as ozi
import ome_zarr.writer as ozw
import ome_zarr
import zarr
import os


# --- Utility function to save label images to OME-Zarr ---
def save_labels(label, label_name, root, greyscale=False):
    """Save a label image to the OME-Zarr group, replacing any label of the same name.

    An existing label stored under ``label_name`` is deleted and its name is
    removed from the ``"labels"`` attribute of the labels group before the new
    data is handed to ``ome_zarr.writer.write_labels``.

    Args:
        label: Label stack to write; it is declared to the writer with
            axes ``"tyx"``.
        label_name: Name of the label inside the ``labels`` group.
        root: Zarr group of the image. Its ``"0"`` array provides the Y/X
            extent used for the chunk shape (one full plane per chunk).
        greyscale: Stored as ``"is_grayscale_label"`` in the metadata dict
            passed to the writer.

    Returns:
        None. The writer's return value is discarded.
    """
    if "labels" in root:
        labels_group = root["labels"]
        # Delete the stored data only when it is really there. Errors raised by
        # the deletion itself are no longer swallowed by a bare ``except``.
        if label_name in labels_group:
            del labels_group[label_name]
        # Unregister the name even when the data was already missing, and
        # tolerate a labels group that has no "labels" attribute yet.
        registered = list(labels_group.attrs.get("labels", []))
        if label_name in registered:
            labels_group.attrs["labels"] = [
                lbl for lbl in registered if lbl != label_name
            ]
    Y_dim = root["0"].shape[-2]
    X_dim = root["0"].shape[-1]
    # NOTE(review): the result of write_labels is not returned, so callers that
    # collect the return value of save_labels receive None. Confirm against the
    # installed ome-zarr version how ``delayed=True`` is interpreted.
    ozw.write_labels(
        labels=label,
        group=root,
        name=label_name,
        axes="tyx",
        scaler=ome_zarr.scale.Scaler(max_layer=1),
        chunks=(1, Y_dim, X_dim),
        storage_options={
            "compressor": zarr.storage.Blosc(cname="zstd", clevel=5),
        },
        metadata={"is_grayscale_label": greyscale},
        delayed=True,
    )


# --- Map DataFrame values to label image (Dask, all frames) ---
def label_to_value_dask(tracks, labels_stack, what):
    """Map values from a DataFrame to a label image stack using Dask for parallelization.

    Args:
        tracks: DataFrame with the columns ``"t"``, ``"label"`` and ``what``.
        labels_stack: Label stack indexed as (t, y, x); either a Dask array or
            an array-like that ``dask.array.from_array`` accepts.
        what: Name of the column whose values are painted into the labels.

    Returns:
        A lazy Dask array in which every labelled pixel holds the value of
        ``what`` for that (frame, label) pair. Pixels whose label has no valid
        row stay 0.
    """
    # Rows with missing or infinite values are dropped, so the affected labels
    # keep the background value 0 in the output.
    tracks_df_norm = tracks[["t", "label", what]].copy()
    tracks_df_norm.replace([np.inf, -np.inf], np.nan, inplace=True)
    tracks_df_norm.dropna(inplace=True)

    # Pick the output dtype from the column dtype; anything not listed falls
    # back to uint16.
    dtype = np.uint16
    if tracks_df_norm[what].dtype in [np.float16, np.float32, np.float64]:
        dtype = np.float32
    elif tracks_df_norm[what].dtype in [np.uint32, np.uint64, np.int32, np.int64]:
        dtype = np.uint32
    elif tracks_df_norm[what].dtype in [np.uint8, np.int8]:
        dtype = np.uint8

    def block_func(labels_f, block_info=None):
        # With one frame per chunk along axis 0 the chunk index is the frame
        # number.
        frame = block_info[0]["chunk-location"][0]
        if labels_f.shape[0] == 1:
            labels_f = labels_f[0]
        result = label_to_value_frame(
            tracks_df_norm, labels_f, frame, what, out_dtype=dtype
        )
        return result[None, :, :]  # Output shape: (1, Y, X)

    if not isinstance(labels_stack, da.Array):
        labels_stack = da.from_array(
            labels_stack, chunks=(1, labels_stack.shape[1], labels_stack.shape[2])
        )
    elif any(size != 1 for size in labels_stack.chunks[0]):
        # block_func derives the frame number from the chunk index, which is
        # only valid for single-frame chunks. A stack stored with larger time
        # chunks would otherwise be mapped against the wrong frames.
        labels_stack = labels_stack.rechunk({0: 1})

    gen_image = labels_stack.map_blocks(
        block_func,
        dtype=dtype,
        drop_axis=[],
        new_axis=[],
    )
    return gen_image


def label_to_value_frame(tracks_df_norm, labels_f, frame, what, out_dtype=None):
    """Paint the values of one feature into the label image of a single frame.

    Args:
        tracks_df_norm: DataFrame with the columns ``"t"``, ``"label"`` and
            ``what``.
        labels_f: Integer label image of the frame.
        frame: Value of ``"t"`` selecting the rows that belong to this frame.
        what: Name of the column that provides the output values.
        out_dtype: dtype of the result; defaults to the dtype of the ``what``
            column.

    Returns:
        Array shaped like ``labels_f`` in which every pixel of a listed label
        holds that label's value. All other pixels are 0.
    """
    tracks_f = tracks_df_norm[tracks_df_norm["t"] == frame]
    # Keep the ids in the label image's own integer dtype. The former float16
    # cast represents integers exactly only up to 2048, so larger label/track
    # ids were rounded to a neighbouring id before the lookup.
    from_label = tracks_f["label"].to_numpy().astype(labels_f.dtype, copy=False)
    to_particle = tracks_f[what].to_numpy()
    if out_dtype is None:
        out_dtype = to_particle.dtype
    out = np.zeros_like(labels_f, dtype=out_dtype)
    skimage.util.map_array(labels_f, from_label, to_particle, out=out)
    return out


def compute_nearest_neighbor(df_t):
    """Add the distance to the closest other point as ``nearest_neighbor_dist``.

    The ``"x"`` and ``"y"`` columns of ``df_t`` are used as point coordinates.
    The column is written into ``df_t`` itself, and that same object is returned.
    """
    coords = df_t[["x", "y"]].to_numpy()
    # Ask for two neighbours: querying the tree with its own points returns a
    # zero-distance match first, so the second column is the real neighbour.
    neighbour_dists = KDTree(coords).query(coords, k=2)[0]
    df_t["nearest_neighbor_dist"] = neighbour_dists[:, 1]
    return df_t


# --- Extract features from labels and images for a single frame ---
@dask.delayed
def extract_from_labels_to_df(
    i,
    frame: np.ndarray,
    label: np.ndarray,
    edge: np.ndarray,
    distlabel: np.ndarray,
    channel_h2b: int,
    channel_erk: int,
    channel_oct4: int = None,
    colony: np.ndarray = None,
    compute_cell_density: bool = True,
):
    """Extract region properties and intensity features for every nucleus of one frame.

    Args:
        i: Frame number; stored in the ``"t"`` column as uint16.
        frame: Intensity image of the frame, indexed as ``frame[channel, :, :]``.
        label: Nucleus label image of the frame.
        edge: Colony edge image; the distance transform is taken on
            ``edge == 0``, i.e. distances are measured to non-zero pixels.
        distlabel: Image sampled at each centroid for ``"dist_to_edge_label"``.
        channel_h2b: Channel index of the H2B signal.
        channel_erk: Channel index of the ERK signal.
        channel_oct4: Channel index of the Oct4 signal, or None to skip all
            Oct4 measurements.
        colony: Optional colony label image sampled at each centroid.
        compute_cell_density: When True, add ``"nearest_neighbor_dist"`` and
            ``"cell_density"`` (its reciprocal).

    Returns:
        DataFrame indexed by nucleus label (the label is also kept as a column)
        with one row per nucleus.
    """
    h2b_image = frame[channel_h2b, :, :]
    erk_image = frame[channel_erk, :, :]

    # Measure nuclear properties
    df_class = pd.DataFrame(
        skimage.measure.regionprops_table(
            label,
            intensity_image=h2b_image,
            properties=("label", "centroid", "area", "mean_intensity"),
        )
    )
    df_class["t"] = i
    df_class["t"] = df_class["t"].astype(np.uint16)
    # Centroids are cast to integers because they are used as pixel indices
    # further down.
    df_class["centroid-0"] = df_class["centroid-0"].astype(np.uint16)
    df_class["centroid-1"] = df_class["centroid-1"].astype(np.uint16)
    df_class.rename(columns={"centroid-0": "y", "centroid-1": "x"}, inplace=True)
    df_class.rename(columns={"mean_intensity": "mean_intensity_H2B"}, inplace=True)

    # Cytoplasm ring: labels expanded by 4 px minus labels expanded by 1 px.
    cyto_label = skimage.segmentation.expand_labels(
        label, distance=4
    ) - skimage.segmentation.expand_labels(label, distance=1)
    # Shrunken nucleus: keep only pixels whose edt value reaches the threshold.
    distance = 1.5 * settings.SCALING_FACTOR
    distances = edt.edt(label)
    shrunknuc_label = label * (distances >= distance)

    # Nuclear envelope: expanded label minus the core beyond distance_env.
    distance_env = 4 * settings.SCALING_FACTOR
    nuc_env = skimage.segmentation.expand_labels(
        label, 2 * settings.SCALING_FACTOR
    ) - label * (distances >= distance_env)
    nuc_smaller = label * (distances >= 5 * settings.SCALING_FACTOR)

    # Measure mean intensities in the different compartments
    compartment_tables = [
        _mean_intensity_by_label(shrunknuc_label, erk_image, "mean_intensity_ERK_nuc"),
        _mean_intensity_by_label(cyto_label, erk_image, "mean_intensity_ERK_cyto"),
    ]
    if channel_oct4 is not None:
        oct4_image = frame[channel_oct4, :, :]
        compartment_tables += [
            _mean_intensity_by_label(nuc_env, oct4_image, "mean_intensity_OCT_nuc_env"),
            _mean_intensity_by_label(label, oct4_image, "mean_intensity_OCT_nuc_tot"),
            _mean_intensity_by_label(
                nuc_smaller, oct4_image, "mean_intensity_OCT_nuc_small"
            ),
        ]

    df_class = df_class.set_index("label", drop=False)
    df_class = df_class.join(compartment_tables)
    if channel_oct4 is not None:
        df_class["Oct4Tot"] = (
            df_class["mean_intensity_OCT_nuc_tot"]
            / df_class["mean_intensity_OCT_nuc_env"]
        )

    # Calculate cytoplasm-to-nucleus ERK ratio
    df_class["CNr"] = (
        df_class["mean_intensity_ERK_cyto"] / df_class["mean_intensity_ERK_nuc"]
    )

    # Add distance to colony edge and label, sampled at the integer centroid
    x = df_class["x"].to_numpy().astype(np.uint16)
    y = df_class["y"].to_numpy().astype(np.uint16)
    dist_t = ndi.distance_transform_edt(edge == 0)
    df_class["dist_to_edge"] = dist_t[y, x]
    df_class["dist_to_edge_label"] = distlabel[y, x]

    # Add colony label if available
    if colony is not None:
        df_class["colony"] = colony[y, x]

    # Compute cell density if requested
    if compute_cell_density:
        if df_class.empty:
            # A frame without nuclei has no groups, and pd.concat raises on an
            # empty list. Add the two columns empty so the schema stays the same.
            df_class["nearest_neighbor_dist"] = np.array([], dtype=np.float64)
            df_class["cell_density"] = np.array([], dtype=np.float64)
        else:
            df_class = pd.concat(
                [
                    compute_nearest_neighbor(df_t.copy())
                    for _, df_t in df_class.groupby("t")
                ]
            )
            df_class["cell_density"] = 1 / df_class["nearest_neighbor_dist"]
    return df_class


def _mean_intensity_by_label(label_image, intensity_image, column):
    """Return a label-indexed frame holding the mean intensity per region as ``column``."""
    table = skimage.measure.regionprops_table(
        label_image,
        intensity_image=intensity_image,
        properties=("label", "mean_intensity"),
    )
    return (
        pd.DataFrame(table)
        .rename(columns={"mean_intensity": column})
        .set_index("label")
    )


# --- Write a Dask array to a TIFF file ---
def write_tiff(path, arr):
    """Materialise ``arr`` via ``.compute()`` and store it at ``path`` as a TIFF.

    The file is written with zlib compression, OME output enabled and the axes
    declared as "TYX".
    """
    from tifffile import imwrite

    pixels = arr.compute()
    imwrite(
        path,
        pixels,
        compression="zlib",
        metadata={"axes": "TYX"},
        ome=True,
    )


# --- Process a single field of view (FOV): extract features, save DataFrame, and write label images ---
@dask.delayed
def process_fov(
    fov,
    masks_to_extract: list[LabelExtractionConfig] = MASKS_TO_EXTRACT,
):
    """Process a single FOV: extract features, save DataFrame, and write label images to OME-Zarr and TIFF.

    Args:
        fov: Name of the field of view; joined onto ``get_output_path()`` to
            locate the OME-Zarr image and used as prefix for the output files.
        masks_to_extract: Features to render back into label images.

    Returns:
        List with the return values of ``save_labels`` and ``write_tiff`` for
        each requested mask. Both helpers return None, so the list carries no
        data.

    Raises:
        ValueError: If the channel names, a required channel ("H2B", "ERK") or
            a required label ("tracked"/"nucleus", "edges", "distlabel") is
            missing.
    """
    dest = os.path.join(get_output_path(), fov)
    store = ozi.parse_url(dest, mode="a").store
    root = zarr.group(store=store)
    nodes = list(ozr.Reader(ozi.parse_url(dest, mode="r"))())
    image_node = nodes[0]
    raw = image_node.data[0]

    # Resolve the channel names once. The previous code tested for the key
    # "channel" but then read "name", and the Oct4 lookup indexed "name" before
    # checking "channel_names".
    channel_names = _channel_names(image_node.metadata)
    H2B_channel = channel_names.index("H2B")
    ERK_channel = channel_names.index("ERK")
    OCT4_channel = channel_names.index("Oct4") if "Oct4" in channel_names else None

    # Label images follow the image node and the labels node in the same order
    # as the names listed in the labels attribute, hence the offset in
    # _label_stack.
    label_names = nodes[1].zarr.root_attrs["labels"]
    if "tracked" in label_names:
        nucleus = _label_stack(nodes, label_names, "tracked")
    elif "nucleus" in label_names:
        nucleus = _label_stack(nodes, label_names, "nucleus")
    else:
        raise ValueError("Nucleus label not found in metadata.")

    edge = _label_stack(nodes, label_names, "edges")
    distlabel = _label_stack(nodes, label_names, "distlabel")
    colony = (
        _label_stack(nodes, label_names, "colony") if "colony" in label_names else None
    )

    # Extract features for each frame in the FOV
    df_delayed = [
        extract_from_labels_to_df(
            i,
            raw[i, :, :, :],
            nucleus[i],
            edge[i],
            distlabel[i],
            H2B_channel,
            ERK_channel,
            channel_oct4=OCT4_channel,
            colony=colony[i] if colony is not None else None,
            # Honour the notebook-level switch instead of always computing it.
            compute_cell_density=COMPUTE_CELL_DENSITY,
        )
        for i in range(0, raw.shape[0])
    ]
    dfs = dask.compute(*df_delayed)
    df = pd.concat(dfs)
    if colony is not None:  # add colony information if available
        colony_df = pd.read_parquet(
            os.path.join(get_output_path(), f"{fov}_colony_df.parquet")
        )
        df = df.merge(
            colony_df[["colony", "colony_x", "colony_y", "colony_area", "t"]],
            on=["colony", "t"],
            how="left",
        )

    df.to_parquet(os.path.join(get_output_path(), f"{fov}_df.parquet"))

    # Generate and save label images for each requested feature
    tasks = []
    for mask_to_extract in masks_to_extract:
        gen_image = label_to_value_dask(df, nucleus, mask_to_extract.feature_to_extract)
        tasks.append(
            save_labels(
                gen_image,
                mask_to_extract.feature_name_in_omezarr,
                root,
                greyscale=mask_to_extract.is_grayscale,
            )
        )
        if mask_to_extract.tifffile_name is not None:
            tiff_path = os.path.join(
                get_output_path(), f"{fov}_{mask_to_extract.tifffile_name}.tiff"
            )
            # write_tiff computes and writes immediately
            tasks.append(write_tiff(tiff_path, gen_image))
    return tasks


def _channel_names(metadata):
    """Return the channel-name list stored under "channel_names" or "name"."""
    for key in ("channel_names", "name"):
        names = metadata.get(key)
        # Only a list/tuple counts: a plain string or None stored under the key
        # is not a channel list.
        if isinstance(names, (list, tuple)):
            return list(names)
    raise ValueError("Channel names not found in metadata.")


def _label_stack(nodes, label_names, name):
    """Return the full-resolution data of the label called ``name``."""
    return nodes[label_names.index(name) + 2].data[0]


# --- Main processing loop: process all FOVs in parallel ---
# One delayed task per field of view; submitting them together lets the
# cluster work on the FOVs concurrently.
fov_tasks = [process_fov(fov, MASKS_TO_EXTRACT) for fov in get_fovs()]
futures = client.compute(fov_tasks)
progress(futures)

VBox()

In [None]:
# --- Shutdown Dask cluster and client after processing is complete ---
# Run this cell only after the progress display of the previous cell has
# finished.
# NOTE(review): the cluster is closed before the client that is connected to
# it; closing the client first may be the cleaner order — confirm against the
# dask.distributed documentation.
cluster.close()
client.close()