# Custom Cellpose Model Training for Brown Nuclei

This notebook trains a custom Cellpose model using ImageJ ROI annotations.

## Workflow:
1. **Prepare Training Data**: Extract patches from ImageJ ROI positions
2. **Optional Manual Refinement**: Edit masks in napari
3. **Train Custom Model**: Fine-tune Cellpose on your data
4. **Apply to Full Image**: Use trained model for segmentation

## Prerequisites:
- Mark nuclei in ImageJ (Multi-point tool or ROI Manager)
- Save ROIs as .zip file: `Analyze > Tools > ROI Manager > More > Save`

## Setup and Imports

In [None]:
# Install required packages if needed
# !pip install read-roi cellpose napari scikit-learn scikit-image tifffile

In [None]:
import numpy as np
import tifffile
from skimage import io, color
from skimage.color import rgb2gray
from skimage.draw import disk
import os
import torch
from cellpose import models, io as cellpose_io, train
from read_roi import read_roi_file, read_roi_zip
import napari

# Probe CUDA availability once; `use_gpu` is passed to the Cellpose model
# constructors in later cells.
use_gpu = torch.cuda.is_available()
print(
    f"GPU Available: {use_gpu}",
    f"PyTorch Version: {torch.__version__}",
    f"CUDA Available: {torch.cuda.is_available()}",
    sep="\n",
)

## Step 1: Prepare Training Data from ImageJ ROIs

In [None]:
# --- CONFIGURATION ---
# Input/output locations (raw strings keep the Windows backslashes literal).
IMAGE_PATH = r"u:\zyu\Jupyter\xinjian\your_image.jpg"   # source HE image
ROI_PATH = r"u:\zyu\Jupyter\xinjian\RoiSet.zip"         # ImageJ ROI export (.zip or .roi)
OUTPUT_DIR = r"u:\zyu\Jupyter\xinjian\training_data"    # root folder for everything written below

# Patch geometry and dataset split.
CROP_SIZE = 64          # side length of each square training patch, in pixels
NUCLEUS_RADIUS = 8      # radius of the disk drawn for each annotated nucleus, in pixels
TRAIN_TEST_SPLIT = 0.8  # fraction of patches assigned to the training set

# Echo the settings so the notebook output records what was used.
print(
    f"Configuration:",
    f"  Image: {IMAGE_PATH}",
    f"  ROIs: {ROI_PATH}",
    f"  Output: {OUTPUT_DIR}",
    f"  Crop Size: {CROP_SIZE}x{CROP_SIZE}",
    f"  Nucleus Radius: {NUCLEUS_RADIUS} pixels",
    sep="\n",
)

In [None]:
# Make sure the output root and its train/ and test/ subfolders exist.
# exist_ok=True makes the cell safe to re-run.
for _needed_dir in (
    OUTPUT_DIR,
    os.path.join(OUTPUT_DIR, "train"),
    os.path.join(OUTPUT_DIR, "test"),
):
    os.makedirs(_needed_dir, exist_ok=True)

print(
    f"Created directories:",
    f"  {os.path.join(OUTPUT_DIR, 'train')}",
    f"  {os.path.join(OUTPUT_DIR, 'test')}",
    sep="\n",
)

In [None]:
# Load the source image and normalise it to an (H, W, 3) array. Later cells
# index it as img_rgb[r0:r1, c0:c1, :] and reshape it with reshape(-1, 3), so
# any other layout has to be converted or rejected here.
img_rgb = io.imread(IMAGE_PATH)

if img_rgb.ndim == 2:
    # Grayscale file: replicate the single plane into three identical channels.
    img_rgb = np.stack([img_rgb] * 3, axis=-1)
elif img_rgb.ndim == 3:
    # Channel-first (C, H, W) -> channel-last (H, W, C). The check on the last
    # axis keeps an already channel-last array from being transposed.
    if img_rgb.shape[0] < 5 and img_rgb.shape[-1] > 4:
        img_rgb = np.transpose(img_rgb, (1, 2, 0))
    # Drop the alpha channel of RGBA images.
    if img_rgb.shape[-1] == 4:
        img_rgb = img_rgb[..., :3]

if img_rgb.ndim != 3 or img_rgb.shape[-1] != 3:
    raise ValueError(
        f"Expected an RGB image, got array of shape {img_rgb.shape} from {IMAGE_PATH}"
    )

print(f"Loaded image: {img_rgb.shape} | dtype: {img_rgb.dtype}")
print(f"Value range: [{img_rgb.min()}, {img_rgb.max()}]")

In [None]:
# Load ROIs from ImageJ
# Raises FileNotFoundError early so a wrong path is reported clearly rather
# than as a parser error from read_roi.
if not os.path.exists(ROI_PATH):
    raise FileNotFoundError(f"ROI file not found: {ROI_PATH}")

# Choose the reader by extension, case-insensitively, so "RoiSet.ZIP" is
# still treated as an archive.
if ROI_PATH.lower().endswith('.zip'):
    rois = read_roi_zip(ROI_PATH)
else:
    rois = read_roi_file(ROI_PATH)

print(f"Loaded {len(rois)} ROIs from ImageJ")
print(f"ROI names: {list(rois.keys())[:5]}...")  # Show first 5

In [None]:
# Turn every supported ROI into one or more integer (row, col) nucleus positions.
nuclei_positions = []

for roi_name, roi_data in rois.items():
    roi_type = roi_data.get('type', 'unknown')

    if roi_type == 'point':
        # Point ROIs carry either coordinate lists or a single coordinate
        # pair; wrap the scalar case so both go through the same path.
        xs, ys = roi_data['x'], roi_data['y']
        if not isinstance(xs, list):
            xs, ys = [xs], [ys]
        for px, py in zip(xs, ys):
            nuclei_positions.append((int(py), int(px)))  # (row, col)
        continue

    if roi_type in ('oval', 'rectangle'):
        # Centre of the bounding box.
        center_x = roi_data['left'] + roi_data['width'] / 2
        center_y = roi_data['top'] + roi_data['height'] / 2
    elif roi_type in ('freehand', 'polygon', 'traced'):
        # Mean of the outline vertices.
        center_x = np.mean(roi_data['x'])
        center_y = np.mean(roi_data['y'])
    else:
        print(f"Warning: Unsupported ROI type '{roi_type}' for {roi_name}")
        continue

    nuclei_positions.append((int(center_y), int(center_x)))

print(f"Extracted {len(nuclei_positions)} nuclei positions")

# Collapse annotations that landed on the same pixel.
nuclei_positions = list(set(nuclei_positions))
print(f"After removing duplicates: {len(nuclei_positions)} unique positions")

In [None]:
# Crop patches around each nucleus and create masks
#
# One CROP_SIZE x CROP_SIZE patch is cut around every annotated position that
# is far enough from the image border. In the patch's label mask, EVERY
# annotated nucleus whose disk reaches into the patch gets its own label ID
# (the centre nucleus is always 1). Labelling only the centre nucleus would
# mark the other annotated nuclei in the patch as background.
half_size = CROP_SIZE // 2
cropped_images = []
cropped_masks = []
valid_positions = []

# (N, 2) array of (row, col) so the neighbours of a patch can be found with a
# single vectorised comparison.
all_positions = np.asarray(nuclei_positions, dtype=int).reshape(-1, 2)

for idx, (row, col) in enumerate(nuclei_positions):
    # Define crop boundaries. The far edge is derived from CROP_SIZE so the
    # crop and the mask have the same shape even for an odd CROP_SIZE.
    r_min = row - half_size
    r_max = r_min + CROP_SIZE
    c_min = col - half_size
    c_max = c_min + CROP_SIZE

    # Skip if too close to edge
    if r_min < 0 or r_max > img_rgb.shape[0] or c_min < 0 or c_max > img_rgb.shape[1]:
        continue

    # Crop image
    crop = img_rgb[r_min:r_max, c_min:c_max, :].copy()
    mask = np.zeros((CROP_SIZE, CROP_SIZE), dtype=np.uint16)

    # Annotation coordinates relative to this patch's top-left corner, and a
    # selector for those whose disk can overlap the patch.
    local = all_positions - np.array([r_min, c_min])
    reaches_patch = np.all(
        (local > -NUCLEUS_RADIUS) & (local < CROP_SIZE + NUCLEUS_RADIUS),
        axis=1,
    )

    # Neighbouring nuclei first, labelled 2, 3, ...; `shape=` clips disks that
    # extend past the patch border.
    next_label = 2
    for local_r, local_c in local[reaches_patch]:
        if local_r == half_size and local_c == half_size:
            continue  # the patch's own nucleus is drawn last, below
        rr, cc = disk((local_r, local_c), NUCLEUS_RADIUS, shape=mask.shape)
        if rr.size == 0:
            continue
        mask[rr, cc] = next_label
        next_label += 1

    # Centre nucleus last so it always keeps label ID 1 and is never overwritten.
    rr, cc = disk((half_size, half_size), NUCLEUS_RADIUS, shape=mask.shape)
    mask[rr, cc] = 1

    cropped_images.append(crop)
    cropped_masks.append(mask)
    valid_positions.append((row, col))

print(f"Generated {len(cropped_images)} valid training patches")
print(f"Skipped {len(nuclei_positions) - len(cropped_images)} patches (too close to edge)")

In [None]:
# Split into train/test sets
n_train = int(TRAIN_TEST_SPLIT * len(cropped_images))
n_test = len(cropped_images) - n_train

# Shuffle with a fixed seed so re-running the notebook reproduces the same
# split. The patches are written to disk under index-based names, so an
# unseeded shuffle would mix train and test patches between runs.
split_rng = np.random.default_rng(42)
indices = split_rng.permutation(len(cropped_images))
train_indices = indices[:n_train]
test_indices = indices[n_train:]

train_images = [cropped_images[i] for i in train_indices]
train_masks = [cropped_masks[i] for i in train_indices]
test_images = [cropped_images[i] for i in test_indices]
test_masks = [cropped_masks[i] for i in test_indices]

print(f"Training set: {len(train_images)} patches")
print(f"Test set: {len(test_images)} patches")
print(f"Split ratio: {TRAIN_TEST_SPLIT*100:.0f}% / {(1-TRAIN_TEST_SPLIT)*100:.0f}%")

In [None]:
# Write every patch and its mask to disk as a TIFF pair.
# Naming follows the image.tif / image_masks.tif pairing that the loader
# cell later selects with mask_filter='_masks'.
for _split_name, _split_images, _split_masks in (
    ("train", train_images, train_masks),
    ("test", test_images, test_masks),
):
    _split_dir = os.path.join(OUTPUT_DIR, _split_name)
    for idx, (img, mask) in enumerate(zip(_split_images, _split_masks)):
        tifffile.imwrite(os.path.join(_split_dir, f"nuclei_{idx:04d}.tif"), img)
        tifffile.imwrite(os.path.join(_split_dir, f"nuclei_{idx:04d}_masks.tif"), mask)

print(
    f"\nSaved training data to: {OUTPUT_DIR}",
    f"  Train: {len(train_images)} image/mask pairs",
    f"  Test: {len(test_images)} image/mask pairs",
    f"\nFile naming convention: nuclei_XXXX.tif and nuclei_XXXX_masks.tif",
    sep="\n",
)

In [None]:
# Open a napari window with the first few training patches and their masks
# for a visual sanity check.
viewer = napari.Viewer()

n_show = min(5, len(train_images))
for i, (_sample_img, _sample_mask) in enumerate(
    zip(train_images[:n_show], train_masks[:n_show])
):
    viewer.add_image(_sample_img, name=f"Train_{i}_Image")
    viewer.add_labels(_sample_mask, name=f"Train_{i}_Mask")

print(
    f"\nOpened napari with {n_show} training examples",
    "Review the masks to ensure they are correct",
    "Close napari window to continue",
    sep="\n",
)

## Step 2: Manual Mask Refinement (Optional)

If the circular masks are not accurate enough, you can manually refine them in napari.

In [None]:
# --- OPTIONAL: Manually refine individual masks ---
# Reloads one saved training patch and its mask from disk and opens both in
# napari so the mask can be repainted by hand. The following cell writes the
# edited mask back to the same file.

EDIT_INDEX = 0  # Change this to edit different images

edit_img = tifffile.imread(os.path.join(OUTPUT_DIR, "train", f"nuclei_{EDIT_INDEX:04d}.tif"))
edit_mask = tifffile.imread(os.path.join(OUTPUT_DIR, "train", f"nuclei_{EDIT_INDEX:04d}_masks.tif"))

viewer = napari.Viewer()
viewer.add_image(edit_img, name="Image")
labels_layer = viewer.add_labels(edit_mask, name="Mask (Editable)")

print(f"\nInstructions for manual editing:")
print(f"1. Select the 'Mask (Editable)' layer")
print(f"2. Use 'paint' tool to draw accurate nucleus boundaries")
print(f"3. Use 'erase' tool to remove incorrect regions")
# Instance masks need a distinct ID per object: nuclei sharing one ID would be
# treated as a single object wherever they touch.
print(f"4. Give each nucleus its own label ID (1, 2, 3, ...); 0 is background")
print(f"5. When done, execute the next cell to save")
print(f"\nEditing: nuclei_{EDIT_INDEX:04d}.tif")

In [None]:
# Save edited mask
# `viewer` is rebound by several cells in this notebook, so confirm that the
# current viewer really holds the editable layer before reading it. Otherwise
# a stale or unrelated viewer raises KeyError instead of reaching the hint below.
_editable_layer = None
if 'viewer' in locals():
    _editable_layer = next(
        (layer for layer in viewer.layers if layer.name == 'Mask (Editable)'),
        None,
    )

if _editable_layer is not None:
    edited_mask = _editable_layer.data
    # Overwrites the mask file that was loaded for editing.
    tifffile.imwrite(
        os.path.join(OUTPUT_DIR, "train", f"nuclei_{EDIT_INDEX:04d}_masks.tif"),
        edited_mask.astype(np.uint16)
    )
    print(f"Saved edited mask for nuclei_{EDIT_INDEX:04d}")
    viewer.close()
else:
    print("No viewer found - run previous cell first")

## Step 3: Train Custom Cellpose Model

In [None]:
# Training configuration
# Where the patch/mask pairs written in Step 1 live.
TRAIN_DIR = os.path.join(OUTPUT_DIR, "train")
TEST_DIR = os.path.join(OUTPUT_DIR, "test")

# Hyper-parameters handed to the training call in a later cell.
MODEL_NAME = "nuclei_brown_custom"   # file name of the saved weights
N_EPOCHS = 500                       # number of training epochs
LEARNING_RATE = 0.1                  # optimiser step size
WEIGHT_DECAY = 0.0001                # regularisation strength
BATCH_SIZE = 8                       # patches per step; lower it if GPU memory runs out

print(
    f"Training Configuration:",
    f"  Model Name: {MODEL_NAME}",
    f"  Epochs: {N_EPOCHS}",
    f"  Learning Rate: {LEARNING_RATE}",
    f"  Batch Size: {BATCH_SIZE}",
    f"  GPU: {use_gpu}",
    sep="\n",
)

In [None]:
# Read the saved patch/mask pairs back from disk with Cellpose's own loader.
# Note: this rebinds train_images / test_images from the earlier cells.
output = cellpose_io.load_train_test_data(
    TRAIN_DIR,
    test_dir=TEST_DIR,
    mask_filter='_masks',
)
(
    train_images,
    train_labels,
    train_files,
    test_images,
    test_labels,
    test_files,
) = output

print(
    f"\nLoaded training data:",
    f"  Training images: {len(train_images)}",
    f"  Test images: {len(test_images)}",
    sep="\n",
)
# Printed one by one so an empty training set fails at the same point as before.
print(f"  Image shape: {train_images[0].shape}")
print(f"  Mask shape: {train_labels[0].shape}")
print(f"  Unique labels in first mask: {np.unique(train_labels[0])}")

In [None]:
# Build the starting network from the pretrained 'nuclei' weights so that
# training fine-tunes rather than starts from scratch.
model = models.CellposeModel(model_type='nuclei', gpu=use_gpu)

print(
    f"Initialized Cellpose model: nuclei (pretrained)",
    f"Starting training...",
    f"This may take 30 minutes to 2 hours depending on data size and GPU",
    sep="\n",
)

In [None]:
# Train the model
# Note: Training will print progress during execution
#
# Uses the Cellpose 3.x training entry point, train.train_seg, which takes the
# underlying network (model.net) and returns (model_path, train_losses,
# test_losses). CellposeModel has no .train() method in the Cellpose version
# that provides the `cellpose.train` module imported at the top.

model_path, train_losses, test_losses = train.train_seg(
    model.net,
    train_data=train_images,
    train_labels=train_labels,
    test_data=test_images,
    test_labels=test_labels,
    channels=[0, 0],          # Grayscale: [cytoplasm=0, nucleus=0]
    save_path=OUTPUT_DIR,
    n_epochs=N_EPOCHS,
    learning_rate=LEARNING_RATE,
    weight_decay=WEIGHT_DECAY,
    SGD=True,                 # LEARNING_RATE=0.1 is an SGD-scale step size
    batch_size=BATCH_SIZE,
    model_name=MODEL_NAME,
    save_every=50,            # Save checkpoint every 50 epochs
    min_train_masks=1,        # default (5) would discard sparsely labelled patches
)

# NOTE(review): Cellpose appears to evaluate the test set only on some epochs
# and leave the other entries at 0, so report the last evaluated value rather
# than the last slot - confirm against the installed version.
_test_history = np.asarray(test_losses, dtype=float).ravel()
_evaluated = _test_history[_test_history != 0]
final_test_loss = float(_evaluated[-1]) if _evaluated.size else float('nan')

print(f"\n{'='*60}")
print(f"Training Complete!")
print(f"Model saved to: {model_path}")
print(f"Final train loss: {train_losses[-1]:.4f}")
print(f"Final test loss: {final_test_loss:.4f}")
print(f"{'='*60}")

In [None]:
# Save training curves data
# Writes the run's hyper-parameters and loss histories to training_info.json
# in OUTPUT_DIR.
import json

training_info = {
    'model_name': MODEL_NAME,
    'n_epochs': N_EPOCHS,
    'learning_rate': LEARNING_RATE,
    'batch_size': BATCH_SIZE,
    # The loss histories may be NumPy arrays and the model path a pathlib.Path;
    # json.dump cannot serialise either, so convert to plain Python types.
    'train_losses': np.asarray(train_losses, dtype=float).tolist(),
    'test_losses': np.asarray(test_losses, dtype=float).tolist(),
    'n_train_images': len(train_images),
    'n_test_images': len(test_images),
    'model_path': str(model_path)
}

with open(os.path.join(OUTPUT_DIR, 'training_info.json'), 'w') as f:
    json.dump(training_info, f, indent=2)

print(f"Saved training info to: {os.path.join(OUTPUT_DIR, 'training_info.json')}")

## Step 4: Apply Trained Model to Full Image

In [None]:
# Re-open the fine-tuned weights as a fresh model object for inference.
# MODEL_PATH defaults to the path returned by training; point it at a saved
# weights file to skip the training cells.
MODEL_PATH = model_path

custom_model = models.CellposeModel(
    pretrained_model=MODEL_PATH,
    gpu=use_gpu,
)

print(f"Loaded custom model from: {MODEL_PATH}")
# Falls back to the text 'auto' when the model object has no diam_labels attribute.
print(f"Model diameter: {custom_model.diam_labels if hasattr(custom_model, 'diam_labels') else 'auto'}")

In [None]:
# Partition the full image into three colour clusters and name them by
# brightness: brightest = background, darkest = nuclei, remaining = muscle.

from sklearn.cluster import KMeans

# One row per pixel, one column per colour channel.
pixels = img_rgb.reshape(-1, 3).astype(float)

print("Performing K-means clustering to separate tissue components...")
kmeans = KMeans(n_clusters=3, random_state=42, n_init=10)
labels = kmeans.fit_predict(pixels)
cluster_centers = kmeans.cluster_centers_

# Rank clusters by the mean of their centre's three channel values.
intensities = cluster_centers.mean(axis=1)
background_label = np.argmax(intensities)  # Brightest
nuclei_label = np.argmin(intensities)       # Darkest
muscle_label = next(
    i for i in range(3) if i not in (background_label, nuclei_label)
)

print(f"\nCluster Centers (RGB):")
print(f"  Background (label {background_label}): {cluster_centers[background_label]}")
print(f"  Muscle (label {muscle_label}): {cluster_centers[muscle_label]}")
print(f"  Nuclei (label {nuclei_label}): {cluster_centers[nuclei_label]}")

# Fold the per-pixel labels back into image shape and derive boolean masks.
label_image = labels.reshape(img_rgb.shape[:2])
background_mask = (label_image == background_label)
nuclei_mask = (label_image == nuclei_label)

print(f"\nPixel distribution:")
print(f"  Background: {background_mask.sum()} ({100*background_mask.sum()/background_mask.size:.1f}%)")
print(f"  Nuclei: {nuclei_mask.sum()} ({100*nuclei_mask.sum()/nuclei_mask.size:.1f}%)")

In [None]:
# Create nuclei channel for Cellpose
# Convert to grayscale and invert so nuclei are bright on dark background

nuclei_channel = rgb2gray(img_rgb)

# Push background pixels to the brightest gray level so that they end up at 0
# after the inversion below.
nuclei_channel[background_mask] = nuclei_channel.max()

# Enhance nuclei regions: flatten every nuclei-cluster pixel to the darkest
# value found in that cluster. Skipped when the cluster selected no pixels,
# because .min() of an empty selection raises.
nuclei_channel_enhanced = nuclei_channel.copy()
if nuclei_mask.any():
    nuclei_channel_enhanced[nuclei_mask] = nuclei_channel[nuclei_mask].min()

# Invert: dark nuclei -> bright nuclei, rescaled to [0, 1]. A constant image
# has zero range; map it to all zeros instead of dividing by zero.
_gray_low = nuclei_channel_enhanced.min()
_gray_span = nuclei_channel_enhanced.max() - _gray_low
if _gray_span > 0:
    nuclei_norm = 1.0 - (nuclei_channel_enhanced - _gray_low) / _gray_span
else:
    nuclei_norm = np.zeros_like(nuclei_channel_enhanced)

# Convert to uint8
nuclei_norm = (nuclei_norm * 255).astype(np.uint8)

print(f"Nuclei channel prepared:")
print(f"  Shape: {nuclei_norm.shape}")
print(f"  Range: [{nuclei_norm.min()}, {nuclei_norm.max()}]")
print(f"  Mean: {nuclei_norm.mean():.1f}")

In [None]:
# Show the original image, the cluster map and the derived nuclei channel
# as layers in napari so the preprocessing can be checked by eye.
viewer = napari.Viewer()
viewer.add_image(img_rgb, rgb=True, name="Original Image")
viewer.add_labels(label_image, name="K-means Clustering")
viewer.add_image(nuclei_norm, colormap='gray', name="Nuclei Channel (for Cellpose)")

print(
    "Opened napari to review preprocessing",
    "Check if nuclei channel looks good before running Cellpose",
    "Close napari window to continue",
    sep="\n",
)

In [None]:
# Segment the whole preprocessed nuclei channel with the fine-tuned model.
# NOTE(review): the training patches were cropped from the raw RGB image,
# while inference runs on the inverted/enhanced single-channel image built
# above - confirm the model sees comparable inputs in both phases.
print("Running custom Cellpose model on full image...")
print("This may take several minutes depending on image size...")

_eval_settings = {
    "diameter": None,            # no explicit diameter; left to the model
    "channels": [0, 0],          # grayscale input
    "flow_threshold": 0.4,
    "cellprob_threshold": 0.0,
    "min_size": 15,              # minimum nucleus size in pixels
}
masks_nuclei, flows, styles = custom_model.eval(nuclei_norm, **_eval_settings)

print(f"\nSegmentation complete!")
print(f"Detected {masks_nuclei.max()} nuclei")
print(f"Mask shape: {masks_nuclei.shape}")
print(f"Mask dtype: {masks_nuclei.dtype}")

In [None]:
# Save results
output_masks_path = os.path.join(OUTPUT_DIR, "full_image_nuclei_masks.tif")
# Path reserved for an overlay image; this cell does not write it.
output_overlay_path = os.path.join(OUTPUT_DIR, "full_image_nuclei_overlay.tif")

# Save masks. uint16 holds label IDs up to 65535; a larger count would wrap
# around and merge unrelated nuclei, so widen to uint32 when needed.
_mask_dtype = np.uint16 if masks_nuclei.max() <= np.iinfo(np.uint16).max else np.uint32
tifffile.imwrite(output_masks_path, masks_nuclei.astype(_mask_dtype))

print(f"\nSaved results:")
print(f"  Masks: {output_masks_path}")
print(f"  Total nuclei detected: {masks_nuclei.max()}")

In [None]:
# Open napari with the original image, the (initially hidden) nuclei channel
# and the predicted instance masks stacked as layers.
viewer = napari.Viewer()

viewer.add_image(img_rgb, rgb=True, name="Original Image")
viewer.add_image(nuclei_norm, visible=False, colormap='gray', name="Nuclei Channel")
viewer.add_labels(masks_nuclei, name="Nuclei Masks (Custom Model)")

print(
    f"\nOpened napari with final results",
    f"Detected {masks_nuclei.max()} nuclei using custom trained model",
    f"\nTips:",
    f"  - Toggle layer visibility to compare",
    f"  - Adjust opacity of masks layer for better overlay view",
    f"  - Use 'pick' tool to inspect individual nucleus labels",
    sep="\n",
)

## Step 5: Evaluate Model Performance (Optional)

In [None]:
# Calculate basic statistics
# Summarises the area and eccentricity of every segmented nucleus and writes
# the summary to nuclei_statistics.json in OUTPUT_DIR.
import json  # imported here so the cell also runs when the training cells were skipped
from skimage.measure import regionprops

props = regionprops(masks_nuclei)

areas = [p.area for p in props]
eccentricities = [p.eccentricity for p in props]

print(f"\nNuclei Statistics (n={len(props)}):")

if props:
    print(f"  Area (pixels):")
    print(f"    Mean: {np.mean(areas):.1f}")
    print(f"    Std: {np.std(areas):.1f}")
    print(f"    Min: {np.min(areas)}")
    print(f"    Max: {np.max(areas)}")
    print(f"  Eccentricity:")
    print(f"    Mean: {np.mean(eccentricities):.3f}")
    print(f"    Std: {np.std(eccentricities):.3f}")

    # Store statistics
    nuclei_stats = {
        'n_nuclei': len(props),
        'area_mean': float(np.mean(areas)),
        'area_std': float(np.std(areas)),
        'eccentricity_mean': float(np.mean(eccentricities)),
        'eccentricity_std': float(np.std(eccentricities))
    }
else:
    # With no detections np.min/np.max raise and the means are NaN, which is
    # not valid JSON; record explicit nulls instead.
    print("  No nuclei detected - nothing to summarise")
    nuclei_stats = {
        'n_nuclei': 0,
        'area_mean': None,
        'area_std': None,
        'eccentricity_mean': None,
        'eccentricity_std': None
    }

with open(os.path.join(OUTPUT_DIR, 'nuclei_statistics.json'), 'w') as f:
    json.dump(nuclei_stats, f, indent=2)

print(f"\nSaved statistics to: {os.path.join(OUTPUT_DIR, 'nuclei_statistics.json')}")

## Summary

Training complete! Key outputs:

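1. **Trained Model**: `{OUTPUT_DIR}/models/{MODEL_NAME}` (Cellpose saves weights in a `models/` subfolder of `save_path`; the exact path is printed after training)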
2. **Nuclei Masks**: `{OUTPUT_DIR}/full_image_nuclei_masks.tif`
3. **Training Info**: `{OUTPUT_DIR}/training_info.json`
4. **Statistics**: `{OUTPUT_DIR}/nuclei_statistics.json`

### Next Steps:
- If results are not satisfactory, add more annotations and retrain
- Adjust `flow_threshold` and `cellprob_threshold` in eval() for fine-tuning
- Use this model on other similar images
- Combine with muscle cell segmentation for quantification