In [1]:
# Import necessary libraries
import pandas as pd
import numpy as np
import tifffile
from skimage.measure import regionprops_table

import os
import re
from pathlib import Path

In [None]:
def metrics_from_timeseries(spots_path, label_path, YFP_path, PI_path, output_folder):
    """Measure the labelled region under every tracked spot and save the table as CSV.

    NOTE: a later cell in this notebook defines another ``metrics_from_timeseries``
    with a different signature; once that cell has run, it replaces this function.

    Parameters
    ----------
    spots_path : str or Path
        CSV with the columns POSITION_X, POSITION_Y, TRACK_ID, FRAME and SOURCE.
        The path must contain ``xy<digits>``; the last such index names the output file.
    label_path : str or Path
        Label image stack, indexed as ``[frame][y, x]``; label 0 is treated as background.
    YFP_path, PI_path : str or Path
        Intensity image stacks indexed by frame, used for rows whose SOURCE is
        'YFP' or 'PI' respectively. Rows with any other SOURCE are skipped.
    output_folder : str or Path
        Directory in which ``xy<idx>_dual_channel_metrics.csv`` is written.

    Returns
    -------
    pandas.DataFrame
        One row per measured spot: the region properties plus TRACK_ID, FRAME,
        ID (the label under the spot) and SOURCE.

    Raises
    ------
    ValueError
        If ``spots_path`` contains no ``xy<digits>`` sample index.
    """
    # Resolve the output name first so a badly named input fails immediately
    # instead of with an IndexError after the whole measurement loop has run.
    sample_idx = re.findall(r'xy(\d+)', str(spots_path))  # for naming output files
    if not sample_idx:
        raise ValueError(
            f"Cannot find a sample index of the form 'xy<digits>' in spots path: {spots_path}"
        )

    # Load files
    spots_df = pd.read_csv(spots_path)
    label_img = tifffile.imread(label_path)
    YFP_img = tifffile.imread(YFP_path)
    PI_img = tifffile.imread(PI_path)

    # Cleanup data: non-numeric entries become NaN and are dropped below
    for column in ('POSITION_X', 'POSITION_Y', 'TRACK_ID', 'FRAME'):
        spots_df[column] = pd.to_numeric(spots_df[column], errors='coerce')

    spots_df.dropna(subset=['POSITION_X', 'POSITION_Y', 'TRACK_ID', 'FRAME'], inplace=True)

    # Collect measurements
    all_measurements = []

    for i, row in spots_df.iterrows():
        frame = int(row['FRAME'])
        # NOTE(review): positions are used directly as pixel indices - confirm the
        # spots table is in pixel units rather than calibrated units.
        x = int(round(row['POSITION_X']))
        y = int(round(row['POSITION_Y']))
        source = row['SOURCE']

        if source == 'YFP':
            intensity_frame = YFP_img[frame]
        elif source == 'PI':
            intensity_frame = PI_img[frame]
        else:
            continue  # Skip unknown sources

        label_frame = label_img[frame]

        # Look up the label under the spot; positions outside the image count as background
        if 0 <= y < label_frame.shape[0] and 0 <= x < label_frame.shape[1]:
            label_id = label_frame[y, x]
        else:
            label_id = 0

        if label_id == 0:
            continue  # Skip background or invalid

        # Binarise to this label only, so regionprops returns exactly one region
        mask = (label_frame == label_id).astype(np.uint8)
        props = regionprops_table(
            mask,
            intensity_image=intensity_frame,
            properties=[
                'area',
                'eccentricity',
                'solidity',
                'mean_intensity',
                'max_intensity',
                'min_intensity'
            ]
        )

        # Each property column holds a single value because the mask has one region
        prop_dict = {k: v[0] for k, v in props.items()}
        prop_dict.update({
            'TRACK_ID': row['TRACK_ID'],
            'FRAME': frame,
            'ID': label_id,
            'SOURCE': source
        })

        all_measurements.append(prop_dict)

    # Save results
    results_df = pd.DataFrame(all_measurements)
    results_df.to_csv(os.path.join(output_folder, f'xy{sample_idx[-1]}_dual_channel_metrics.csv'), index=False)

    return results_df

In [None]:
def _measure_region(label_frame, label_id, intensity_frame):
    """Return shape and intensity properties of one labelled region as a flat dict."""
    # Binarise to this label only, so regionprops returns exactly one region
    mask = (label_frame == label_id).astype(np.uint8)
    props = regionprops_table(
        mask,
        intensity_image=intensity_frame,
        properties=[
            'area',
            'eccentricity',
            'solidity',
            'mean_intensity',
            'max_intensity',
            'min_intensity'
        ]
    )
    # Each property column holds a single value because the mask has one region
    return {k: v[0] for k, v in props.items()}


def metrics_from_timeseries(spots_path, concat_label_path, YFP_label_path, YFP_path, PI_path, output_folder):
    """Measure live and dead cells under every tracked spot and save the table as CSV.

    For each spot the label under its position in the concatenated label image
    is looked up; spots on background (label 0) or outside the image are skipped.
    Rows with SOURCE 'Live' are measured on the YFP label region under the spot
    using the YFP intensity image; rows with SOURCE 'Dead' are measured on the
    concatenated label region using the PI intensity image. Any other SOURCE is
    skipped.

    Parameters
    ----------
    spots_path : str or Path
        CSV with the columns POSITION_X, POSITION_Y, TRACK_ID, FRAME and SOURCE.
        The path must contain ``xy<digits>``; the last such index names the output file.
    concat_label_path : str or Path
        Concatenated label image stack, indexed as ``[frame][y, x]``.
    YFP_label_path : str or Path
        YFP label image stack, indexed as ``[frame][y, x]``.
    YFP_path, PI_path : str or Path
        Intensity image stacks indexed by frame.
    output_folder : str or Path
        Directory in which ``xy<idx>_dual_channel_metrics.csv`` is written.

    Returns
    -------
    pandas.DataFrame
        One row per measured spot: the region properties plus TRACK_ID, FRAME,
        ID (the label from the concatenated label image) and SOURCE.

    Raises
    ------
    ValueError
        If ``spots_path`` contains no ``xy<digits>`` sample index.
    """
    # Resolve the output name first so a badly named input fails immediately
    # instead of with an IndexError after the whole measurement loop has run.
    sample_idx = re.findall(r'xy(\d+)', str(spots_path))  # For naming output files
    if not sample_idx:
        raise ValueError(
            f"Cannot find a sample index of the form 'xy<digits>' in spots path: {spots_path}"
        )

    # Load files
    spots_df = pd.read_csv(spots_path)
    concat_label_img = tifffile.imread(concat_label_path)
    YFP_label_img = tifffile.imread(YFP_label_path)
    YFP_img = tifffile.imread(YFP_path)
    PI_img = tifffile.imread(PI_path)

    # Cleanup data: non-numeric entries become NaN and are dropped below
    for column in ('POSITION_X', 'POSITION_Y', 'TRACK_ID', 'FRAME'):
        spots_df[column] = pd.to_numeric(spots_df[column], errors='coerce')

    spots_df.dropna(subset=['POSITION_X', 'POSITION_Y', 'TRACK_ID', 'FRAME'], inplace=True)

    # Collect measurements
    all_measurements = []

    for i, row in spots_df.iterrows():
        frame = int(row['FRAME'])
        # NOTE(review): positions are used directly as pixel indices - confirm the
        # spots table is in pixel units rather than calibrated units.
        x = int(round(row['POSITION_X']))
        y = int(round(row['POSITION_Y']))
        source = row['SOURCE']

        # Load the label frame for the current timepoint
        concat_label_frame = concat_label_img[frame]

        # Ensure valid positions; anything outside the image counts as background
        if 0 <= y < concat_label_frame.shape[0] and 0 <= x < concat_label_frame.shape[1]:
            nucleus_label_id = concat_label_frame[y, x]
        else:
            nucleus_label_id = 0

        if nucleus_label_id == 0:
            continue  # Skip background or invalid

        if source == 'Live':
            YFP_label_frame = YFP_label_img[frame]

            # The YFP label image gets its own bounds check in case its shape
            # differs from the concatenated label image.
            if y < YFP_label_frame.shape[0] and x < YFP_label_frame.shape[1]:
                cytoplasm_label_id = YFP_label_frame[y, x]
            else:
                cytoplasm_label_id = 0

            # No cytoplasm label under the spot: skip it. Previously the mask
            # variable was left unset (NameError) or still held the previous
            # cell's mask, so the wrong region was measured for this track.
            if cytoplasm_label_id <= 0:
                continue

            prop_dict = _measure_region(YFP_label_frame, cytoplasm_label_id, YFP_img[frame])

        elif source == 'Dead':
            # Dead cells are measured on their own label in the PI channel
            prop_dict = _measure_region(concat_label_frame, nucleus_label_id, PI_img[frame])

        else:
            continue  # Skip unknown sources

        prop_dict.update({
            'TRACK_ID': row['TRACK_ID'],
            'FRAME': frame,
            'ID': nucleus_label_id,
            'SOURCE': source
        })

        all_measurements.append(prop_dict)

    # Save results
    results_df = pd.DataFrame(all_measurements)
    results_df.to_csv(os.path.join(output_folder, f'xy{sample_idx[-1]}_dual_channel_metrics.csv'), index=False)

    return results_df

In [None]:
# Batch collect metrics
master_folder = '/home/vil945/live_cell_imaging/2025-06-18_live_cell_imaging'

# Create output folder to store measurements
output_folder = os.path.join(master_folder, 'measurements')
os.makedirs(output_folder, exist_ok=True)

# Look for experiment date (YYYY-MM-DD) in the master folder path; the YFP and PI
# folder names below are built from it.
date_match = re.search(r"\d{4}-\d{2}-\d{2}", str(master_folder))
if date_match is None:
    raise ValueError(f"No YYYY-MM-DD date found in master folder path: {master_folder}")
date_str = date_match.group(0)

# Initialise every file list up front: a missing folder then gives an empty list
# rather than a NameError or a stale value left over from an earlier kernel run.
spots_csv = []
label_tiffs = []
YFP_tiffs = []
PI_tiffs = []
YFP_fluor_images = []
YFP_segmented = []

# Recursively go through subfolders within master_folder
for root, dirs, files in os.walk(master_folder):
    path = Path(root)
    # Get list of spots.csv files, label images, YFP
    if "combined_spots_relabeled" in path.name:
        spots_csv.extend(path.glob("*.csv"))

    elif "concatenated_segmentation" in path.name:
        label_tiffs.extend(path.glob("*.tif"))

    elif path.name == f"{date_str}_YFP":
        # The YFP folder holds both fluorescence images and their segmentations
        # (possibly in subfolders); split them by file name.
        YFP_tiffs = list(path.rglob("*.tif"))
        for tif_path in YFP_tiffs:
            if "segmented_" in tif_path.name:
                YFP_segmented.append(tif_path)
            else:
                YFP_fluor_images.append(tif_path)

    elif path.name == f"{date_str}_PI":
        PI_tiffs.extend(path.glob("*.tif"))


def _index_by_sample(paths, pattern):
    """Map the sample index captured by group 1 of ``pattern`` to its file path."""
    indexed = {}
    for file_path in paths:
        match = re.search(pattern, file_path.name)
        if match:
            indexed[match.group(1)] = file_path
    return indexed


# Match groups of five: (1) spots csv, (2) concatenated label images, (3) YFP label images,
# (4) YFP fluorescent images, (5) PI fluorescent images,
# by matching index i within their file names containing "xy[i]"

# Dictionaries: key = index i, value = file path
spots_files = _index_by_sample(spots_csv, r'xy(\d+)_target_combined_spots')
concat_label_files = _index_by_sample(label_tiffs, r'xy(\d+)_target_concatenated')
YFP_label_files = _index_by_sample(YFP_segmented, r'xy(\d+)-C3')
YFP_files = _index_by_sample(YFP_fluor_images, r'xy(\d+)-C3')
PI_files = _index_by_sample(PI_tiffs, r'xy(\d+)-C2')

# Only indices present in all five groups can be processed; sort numerically so
# that e.g. xy2 is handled before xy10.
matched_indices = sorted(
    set(spots_files) & set(concat_label_files) & set(YFP_label_files) & set(YFP_files) & set(PI_files),
    key=int,
)

# Report what will not be processed instead of skipping it silently
incomplete_indices = sorted(set(spots_files) - set(matched_indices), key=int)
if incomplete_indices:
    print(f"Skipping xy indices with incomplete file sets: {incomplete_indices}")
if not matched_indices:
    print("No complete file sets found; nothing to process.")

for i in matched_indices:
    spots_file = spots_files[i]
    concat_label_file = concat_label_files[i]
    YFP_label_file = YFP_label_files[i]
    YFP_file = YFP_files[i]
    PI_file = PI_files[i]

    metrics_df = metrics_from_timeseries(spots_file, concat_label_file, YFP_label_file, YFP_file, PI_file, output_folder) # Call record metrics function