In [None]:
# ─── 1) Imports & Drive mount ──────────────────────────────────────────
import os, json, glob
from tqdm import tqdm

import numpy as np
from skimage import io, img_as_ubyte
from pycocotools import mask as mask_utils


# ─── 2) Paths & parameters ─────────────────────────────────────────────
BASE_DIR          = r'C:\Users\shiva\OneDrive\Desktop\MASK-R-CNN-Vision-Biology-Lab\image and seg data'
OUTPUT_PATCH_ROOT = r'C:\Users\shiva\OneDrive\Desktop\MASK-R-CNN-Vision-Biology-Lab\processed data\patches'
COCO_JSON_PATH    = r'C:\Users\shiva\OneDrive\Desktop\MASK-R-CNN-Vision-Biology-Lab\processed data\json'

PATCH_SIZE    = (256, 256)  # (height, width)
STRIDE        = 128         # overlap stride
MIN_MASK_AREA = 50          # drop tiny mask fragments

# Channel correspondence
# C1 -> dapi , C2 -> SCGN (CBCs) , C3 - > RBCs
CHANNEL2CAT = {'C1': 0, 'C2': 1, 'C3': 2}
CATEGORIES = [
    {"id": 0, "name": "other"},
    {"id": 1, "name": "rbc"},
    {"id": 2, "name": "cbc"},
]

In [9]:
IOU_THRESHOLD = 0.25


### Helper functions 

In [8]:
def save_patch(patch_img, out_dir, img_id):
    """
    Save the patch to disk and return its path relative to the JSON file.
    """
    filename = f"patch_{img_id:06d}.png"
    full_path = os.path.join(out_dir, filename)
    io.imsave(full_path, img_as_ubyte(patch_img))
    return os.path.relpath(full_path, start=os.path.dirname(COCO_JSON_PATH))

def make_patches_with_coords(img, masks, patch_size, stride):
    H, W = img.shape[:2]
    ph, pw = patch_size
    for y0 in range(0, H, stride):
        for x0 in range(0, W, stride):
            y1, x1 = min(y0+ph, H), min(x0+pw, W)
            patch_img = img[y0:y1, x0:x1]
            kept_masks, bboxes, original_mask_indices = [], [], [] # Store indices of original masks
            for i, m in enumerate(masks):
                m_patch = m[y0:y1, x0:x1]
                if m_patch.sum() < MIN_MASK_AREA:
                    continue
                ys, xs = np.where(m_patch)
                ymin, xmin, ymax, xmax = ys.min(), xs.min(), ys.max(), xs.max()
                kept_masks.append(m_patch)
                bboxes.append([
                    int(xmin),
                    int(ymin),
                    int(xmax - xmin + 1),
                    int(ymax - ymin + 1)
                ])
                original_mask_indices.append(i) # Store original index
            if len(kept_masks) > 2:
                  yield {
                      "patch_img": patch_img,
                      "masks": kept_masks,
                      "bboxes": bboxes,
                      "y0": y0, # Add original y0
                      "x0": x0, # Add original x0
                      "original_mask_indices": original_mask_indices # Add original indices
                  }

def iou(mask1, mask2):
    """
    Calculate the Intersection over Union (IOU) of two boolean masks.
    Args:
        mask1 (np.ndarray): First boolean mask.
        mask2 (np.ndarray): Second boolean mask.
    Returns:
        float: The IOU value.
    """
    # Ensure masks are boolean type for reliable logical operations
    mask1 = mask1.astype(bool)
    mask2 = mask2.astype(bool)

    # Check if shapes are compatible for logical operations
    if mask1.shape != mask2.shape:
        return 0.0

    intersection = np.logical_and(mask1, mask2)
    union = np.logical_or(mask1, mask2)
    intersection_sum = intersection.sum()
    union_sum = union.sum()

    if union_sum == 0:
        return 0.0
    return intersection_sum / union_sum


In [10]:
def extract_masks(seg_array):
    """
    Turn a (H,W,instances) {3D array} array or (H,W) label map into a list of boolean masks.
    arg: numpy seg aray
    returns: list of boolean numpy arrays. Is array is a binary mask for a single seg instance.
    """
    masks = []
    # this is not rlly needed
    if isinstance(seg_array, np.ndarray) and seg_array.ndim == 3:
        for i in range(seg_array.shape[2]):
            m = seg_array[..., i].astype(bool)
            if m.sum() > 0:
                masks.append(m)
    elif isinstance(seg_array, np.ndarray) and seg_array.ndim == 2:
        for lbl in np.unique(seg_array):
            if lbl == 0:
                continue
            m = (seg_array == lbl)
            if m.sum() > 0:
                masks.append(m)
    else:
        raise ValueError(f"Unexpected seg array shape: {getattr(seg_array,'shape',None)}")
    return masks


## Load image data into dict

In [None]:
image_files = glob.glob(os.path.join(BASE_DIR, "*.png"))

image_groups = {}
for img_path in image_files:
    basename = os.path.basename(img_path)
    # Assuming the format is like C1-C30000.png, C2-C30000.png, C3-C30000.png
    parts = basename.split('-') # Split by hyphen instead of underscore
    if len(parts) > 1:
        channel = parts[0] # Extract channel (e.g., C1)
        # Extract image ID (e.g., C30000), removing the extension
        image_id = parts[1].split('.')[0]
        if image_id not in image_groups:
            image_groups[image_id] = {}
        image_groups[image_id][channel] = img_path


# print(image_groups)
print(image_groups.keys())
print(image_groups['C30001'])

dict_keys(['C30000', 'C30001', 'C30002', 'C30003', 'C30004', 'C30005', 'C40000', 'C40001', 'C40002', 'C40003', 'C50000', 'C50001', 'C50002', 'C50003', 'D10000', 'D10001', 'D10002', 'D10003'])
{'C1': 'C:\\Users\\shiva\\OneDrive\\Desktop\\MASK-R-CNN-Vision-Biology-Lab\\image and seg data\\C1-C30001.png', 'C2': 'C:\\Users\\shiva\\OneDrive\\Desktop\\MASK-R-CNN-Vision-Biology-Lab\\image and seg data\\C2-C30001.png', 'C3': 'C:\\Users\\shiva\\OneDrive\\Desktop\\MASK-R-CNN-Vision-Biology-Lab\\image and seg data\\C3-C30001.png'}


In [13]:
print(image_groups.keys())


dict_keys(['C30000', 'C30001', 'C30002', 'C30003', 'C30004', 'C30005', 'C40000', 'C40001', 'C40002', 'C40003', 'C50000', 'C50001', 'C50002', 'C50003', 'D10000', 'D10001', 'D10002', 'D10003'])


In [None]:
# 1. initialise empty dict 
# 2. iterate through images and fnd corresponding seg file
# 3. Load C1 channel image and set the target shape 
# 4. load and process seg data- convert to boolenan mask 
# 5. list of (padded) masks is stored in loaded data dict with image-id as the key 

loaded_data = {}

for image_id, files_by_channel in tqdm(image_groups.items()):
    c1_img_path = files_by_channel.get('C1')
    if not c1_img_path:
        print(f"⚠️ Missing C1 channel image for image ID {image_id}; skipping group.")
        continue

    img = io.imread(c1_img_path)
    # Get the target shape from the C1 image or masks
    target_shape = img.shape[:2] # Assuming C1 image shape is the target shape

    seg_data = {}
    missing_seg = False
    for channel in ['C1', 'C2', 'C3']:
        # Construct the expected segmentation file path using the correct format: [Channel]-[ImageID]_seg.npy
        seg_stem = f"{channel}-{image_id}"
        seg_path = os.path.join(BASE_DIR, f"{seg_stem}_seg.npy")

        if not os.path.exists(seg_path):
            print(f"Missing segmentation file {seg_path} for image ID {image_id}; skipping group.")
            missing_seg = True
            break

        # Load segmentation data, handling different formats
        seg_raw = np.load(seg_path, allow_pickle=True)
        if isinstance(seg_raw, np.ndarray) and seg_raw.shape == ():
            seg_raw = seg_raw.item()
        if isinstance(seg_raw, dict) and "masks" in seg_raw:
            seg_array = seg_raw["masks"]
        else:
            seg_array = seg_raw

        masks = extract_masks(seg_array)

        # Pad masks if their shape is different from the target shape for all channels
        padded_masks = []
        for mask in masks:
            if mask.shape != target_shape:
                # Create a padded mask with zeros
                padded_mask = np.zeros(target_shape, dtype=bool)
                # Calculate the padding needed
                pad_h = target_shape[0] - mask.shape[0]
                pad_w = target_shape[1] - mask.shape[1]
                # Apply padding - assuming padding is needed only at the bottom and right
                # If padding is needed differently, this logic will need adjustment.
                padded_mask[:mask.shape[0], :mask.shape[1]] = mask
                padded_masks.append(padded_mask)
            else:
                padded_masks.append(mask)
        seg_data[channel] = padded_masks


    if missing_seg:
        continue

    loaded_data[image_id] = {
        "image": img,
        "c1_masks": seg_data.get('C1', []),
        "c2_masks": seg_data.get('C2', []),
        "c3_masks": seg_data.get('C3', [])
    }

# At this point, the 'loaded_data' dictionary contains the image and extracted masks for each image_id
# where all required channel segmentation files were found.

100%|██████████| 18/18 [00:04<00:00,  3.75it/s]


In [17]:
loaded_data.keys()
loaded_data['C30000'].keys()

dict_keys(['image', 'c1_masks', 'c2_masks', 'c3_masks'])

### Classify masks by stain overlap

In [18]:
# iterate through loaded_data and 
# initialise image specific key for each image id: matched_iou_results[image_id] = {}

# after the dust settles 
# matched_iou_results contains the best IOU scores for each C1 mask against C2 and C3 for every image
# classified_masks contains the assigned category ID (0, 1, or 2) for each C1 mask for every image
matched_iou_results = {}
classified_masks = {}

for image_id, data in tqdm(loaded_data.items(), desc="Matching masks and calculating IOUs"):
    c1_masks = data["c1_masks"]
    c2_masks = data["c2_masks"]
    c3_masks = data["c3_masks"]

    matched_iou_results[image_id] = {}
    classified_masks[image_id] = []

    for i, c1_mask in enumerate(c1_masks):
        best_c2_iou = 0.0
        best_c3_iou = 0.0

        # Find best matching C2 mask
        for c2_mask in c2_masks:
            iou_val = iou(c1_mask, c2_mask)
            best_c2_iou = max(best_c2_iou, iou_val)

        # Find best matching C3 mask
        for c3_mask in c3_masks:
            iou_val = iou(c1_mask, c3_mask)
            best_c3_iou = max(best_c3_iou, iou_val)

        # Store the best IOUs for this C1 mask
        matched_iou_results[image_id][f"c1_mask_{i}"] = {
            "best_c2_iou": best_c2_iou,
            "best_c3_iou": best_c3_iou
        }

        # Classify the C1 mask based on best IOU
        category_id = 0  # Default to 'other'

        if best_c2_iou > IOU_THRESHOLD and best_c3_iou > IOU_THRESHOLD:
            # If both are above threshold, choose the one with the higher IOU
            if best_c2_iou > best_c3_iou:
                category_id = 1 # rbc
            else:
                category_id = 2 # cbc
        elif best_c2_iou > IOU_THRESHOLD:
            category_id = 1 # rbc
        elif best_c3_iou > IOU_THRESHOLD:
            category_id = 2 # cbc


        classified_masks[image_id].append(category_id)

# matched_iou_results now contains the best IOU scores for each C1 mask against C2 and C3 per image ID.
# classified_masks contains the assigned category ID for each C1 mask per image ID.

Matching masks and calculating IOUs: 100%|██████████| 18/18 [14:39<00:00, 48.86s/it]


In [19]:
matched_iou_results.keys()
print(matched_iou_results['C30000'].keys())
print(len(matched_iou_results['C30000'].keys()))

# finally they are not all zero :))

print(matched_iou_results['C30000']['c1_mask_0'])
print(matched_iou_results)

dict_keys(['c1_mask_0', 'c1_mask_1', 'c1_mask_2', 'c1_mask_3', 'c1_mask_4', 'c1_mask_5', 'c1_mask_6', 'c1_mask_7', 'c1_mask_8', 'c1_mask_9', 'c1_mask_10', 'c1_mask_11', 'c1_mask_12', 'c1_mask_13', 'c1_mask_14', 'c1_mask_15', 'c1_mask_16', 'c1_mask_17', 'c1_mask_18', 'c1_mask_19', 'c1_mask_20', 'c1_mask_21', 'c1_mask_22', 'c1_mask_23', 'c1_mask_24', 'c1_mask_25', 'c1_mask_26', 'c1_mask_27', 'c1_mask_28', 'c1_mask_29', 'c1_mask_30', 'c1_mask_31', 'c1_mask_32', 'c1_mask_33', 'c1_mask_34', 'c1_mask_35', 'c1_mask_36', 'c1_mask_37', 'c1_mask_38', 'c1_mask_39', 'c1_mask_40', 'c1_mask_41', 'c1_mask_42', 'c1_mask_43', 'c1_mask_44', 'c1_mask_45', 'c1_mask_46', 'c1_mask_47', 'c1_mask_48', 'c1_mask_49', 'c1_mask_50', 'c1_mask_51', 'c1_mask_52', 'c1_mask_53', 'c1_mask_54', 'c1_mask_55', 'c1_mask_56', 'c1_mask_57', 'c1_mask_58', 'c1_mask_59', 'c1_mask_60', 'c1_mask_61', 'c1_mask_62', 'c1_mask_63', 'c1_mask_64', 'c1_mask_65', 'c1_mask_66', 'c1_mask_67', 'c1_mask_68', 'c1_mask_69', 'c1_mask_70', 'c1_m

In [20]:
count_0, count1, count2 = 0,0,0
for val in classified_masks['C30000']:
  if 0 == val:
    count_0 += 1
  elif 1 == val:
    count1 += 1
  elif 2 == val:
    count2 += 1

print(f"count_0: {count_0}") # 80 other cells
print(f"count1: {count1}") # 56 cbcs
print(f"count2: {count2}") # 25 rbcs

count_0: 87
count1: 50
count2: 24


### patching process

In [21]:
# initialise the coco output structure
# initiliase counter for each image and patch 
# iterate through the loaded data_dict
# generate patches with coordinates and original mask indiced
# iterate through the generated patches and extract patch data 
# Add patch info to the coco output: id, file_name, height, and width updated to the images list 
# retrieve the og classifications from classified_masks 
# Iterate through Masks within the Patch and get og mask index and category id 
# Create and Add Annotation to COCO Output
# save the generated patch_coco_output to a JSON file . IN NEXT CELL


# Ensure the output directory exists
os.makedirs(OUTPUT_PATCH_ROOT, exist_ok=True)

patch_coco_output = {
    "images": [],
    "annotations": [],
    "categories": CATEGORIES,
}

patch_ann_id_counter = 0
patch_img_id_counter = 0

# Consolidate patch generation and annotation into a single loop
for image_id, data in tqdm(loaded_data.items(), desc="Generating and annotating image patches"):
    img = data["image"]
    c1_masks = data["c1_masks"]
    c2_masks = data["c2_masks"] # Keep C2 and C3 masks for potential future use if needed
    c3_masks = data["c3_masks"]

    # Generate patches from the original image and C1 masks, including coordinates and original mask indices
    patch_generator_with_coords = make_patches_with_coords(img, c1_masks, PATCH_SIZE, STRIDE)

    for patch_data_with_coords in patch_generator_with_coords:
        patch_img = patch_data_with_coords["patch_img"]
        patch_masks = patch_data_with_coords["masks"]
        patch_bboxes = patch_data_with_coords["bboxes"]
        y0 = patch_data_with_coords["y0"]  # Top-left y-coordinate of the patch in the original image
        x0 = patch_data_with_coords["x0"]  # Top-left x-coordinate of the patch in the original image
        original_mask_indices = patch_data_with_coords["original_mask_indices"] # Indices of original masks in c1_masks

        # Assign a unique ID for the current patch image
        patch_img_id = patch_img_id_counter
        patch_img_id_counter += 1

        # Save the patch image and get its relative path
        patch_file_name = save_patch(patch_img, OUTPUT_PATCH_ROOT, patch_img_id)

        # Add image information to the patch coco_output
        ph, pw = patch_img.shape[:2]
        patch_coco_output["images"].append({
            "id": patch_img_id,
            "file_name": patch_file_name,
            "height": ph,
            "width": pw,
        })

        # Classify each patch mask using the pre-calculated classifications
        # of their corresponding original masks.
        if image_id in classified_masks and len(classified_masks[image_id]) == len(c1_masks):
             image_classified_masks_original = classified_masks[image_id]
        else:
             print(f"⚠️ Classification data missing or mismatched for image ID {image_id}; skipping annotations for this patch.")
             continue

        for i, patch_mask in enumerate(patch_masks):
            original_index = original_mask_indices[i]
            category_id = image_classified_masks_original[original_index]

            # The bbox and area are already calculated for the patch
            bbox = patch_bboxes[i]
            area = int(np.sum(patch_mask))

            # Encode segmentation using RLE
            patch_mask_fortran = np.asfortranarray(patch_mask.astype(np.uint8))
            segmentation = mask_utils.encode(patch_mask_fortran)
            segmentation['counts'] = segmentation['counts'].decode('utf-8')

            # Add annotation to the patch coco_output
            patch_coco_output["annotations"].append({
                "id": patch_ann_id_counter,
                "image_id": patch_img_id,
                "category_id": category_id,
                "bbox": bbox,
                "area": area,
                "segmentation": segmentation,
                "iscrowd": 0, # Assuming each annotation is a single object
            })
            patch_ann_id_counter += 1

# Moved the saving logic outside the loop to save the complete COCO JSON at the end

  return func(*args, **kwargs)
  return func(*args, **kwargs)
  return func(*args, **kwargs)
  return func(*args, **kwargs)
  return func(*args, **kwargs)
  return func(*args, **kwargs)
  return func(*args, **kwargs)
  return func(*args, **kwargs)
  return func(*args, **kwargs)
  return func(*args, **kwargs)
  return func(*args, **kwargs)
  return func(*args, **kwargs)
  return func(*args, **kwargs)
  return func(*args, **kwargs)
  return func(*args, **kwargs)
  return func(*args, **kwargs)
  return func(*args, **kwargs)
  return func(*args, **kwargs)
  return func(*args, **kwargs)
  return func(*args, **kwargs)
  return func(*args, **kwargs)
  return func(*args, **kwargs)
  return func(*args, **kwargs)
Generating and annotating image patches: 100%|██████████| 18/18 [00:21<00:00,  1.17s/it]


In [22]:
# create the annotations for the newly created patches 
'''
id: A unique ID for this specific annotation across the entire dataset.
image_id: The ID of the patch image that this annotation belongs to. This links the annotation to its corresponding image file.
category_id: The classification of the mask (0 for 'other', 1 for 'rbc', 2 for 'cbc'). This is retrieved from the classified_masks data based on the original mask's index.
bbox: The bounding box coordinates of the mask within the patch. This tells you the rectangular region containing the mask in the patch image.
area: The number of pixels covered by the mask within the patch.
segmentation: The RLE (Run-Length Encoding) representation of the mask's shape within the patch. This provides the precise pixel-level boundary of the object.
iscrowd: A flag indicating if the annotation is for a single object (0) or a group of objects. Here, it's set to 0 assuming each mask represents a single instance.
'''


for patch_data_with_coords in patch_generator_with_coords:
      patch_img = patch_data_with_coords["patch_img"]
      patch_masks = patch_data_with_coords["masks"]
      patch_bboxes = patch_data_with_coords["bboxes"]
      y0 = patch_data_with_coords["y0"]
      x0 = patch_data_with_coords["x0"]
      original_mask_indices = patch_data_with_coords["original_mask_indices"]

      patch_img_id = patch_img_id_counter
      patch_img_id_counter += 1

      patch_file_name = save_patch(patch_img, OUTPUT_PATCH_ROOT, patch_img_id)

      ph, pw = patch_img.shape[:2]
      patch_coco_output["images"].append({
          "id": patch_img_id,
          "file_name": patch_file_name,
          "height": ph,
          "width": pw,
      })

      # Now, classify each patch mask using the pre-calculated classifications
      # of their corresponding original masks.
      if image_id in classified_masks and len(classified_masks[image_id]) == len(c1_masks):
            image_classified_masks_original = classified_masks[image_id]
      else:
            print(f"Classification data missing or mismatched for image ID {image_id}; skipping annotations for this patch.")
            continue




In [23]:
'''
Iterates through the individual masks present in the current patch.
Retrieves the pre-calculated category ID for each mask.
Calculates the bounding box and area for the mask within the patch.
Encodes the mask's shape into the RLE format.
Constructs the complete COCO annotation dictionary for that specific mask.
Appends this annotation dictionary to the "annotations" list in the patch_coco_output.
'''

for i, patch_mask in enumerate(patch_masks):
    original_index = original_mask_indices[i]
    category_id = image_classified_masks_original[original_index]

    # include all the cell labels
    # if category_id in [0,1, 2]: # Removed the if condition
    # The bbox and area are already calculated for the patch
    bbox = patch_bboxes[i]
    area = int(np.sum(patch_mask))

    # Encode segmentation using RLE
    patch_mask_fortran = np.asfortranarray(patch_mask.astype(np.uint8))
    segmentation = mask_utils.encode(patch_mask_fortran)
    segmentation['counts'] = segmentation['counts'].decode('utf-8')

    # Add annotation to the patch coco_output
    patch_coco_output["annotations"].append({
        "id": patch_ann_id_counter,
        "image_id": patch_img_id,
        "category_id": category_id,
        "bbox": bbox,
        "area": area,
        "segmentation": segmentation,
        "iscrowd": 0,
    })
    patch_ann_id_counter += 1

### save json 

In [25]:
# Define the output path for the patch COCO JSON file with the new filename
PATCH_COCO_JSON_PATH = os.path.join(OUTPUT_PATCH_ROOT, '.json')

# Write the patch COCO output to a JSON file
with open(PATCH_COCO_JSON_PATH, 'w') as f:
    json.dump(patch_coco_output, f)

print(f"Patch COCO JSON file saved to {PATCH_COCO_JSON_PATH}")
print(f"Saved {patch_img_id_counter} patch images to {OUTPUT_PATCH_ROOT}")
print(f"Generated {patch_ann_id_counter} patch annotations.")

Patch COCO JSON file saved to C:\Users\shiva\OneDrive\Desktop\MASK-R-CNN-Vision-Biology-Lab\processed data\patches\.json
Saved 797 patch images to C:\Users\shiva\OneDrive\Desktop\MASK-R-CNN-Vision-Biology-Lab\processed data\patches
Generated 11762 patch annotations.
