In [2]:
%pip install -U -q transformers[torch] evaluate timm albumentations accelerate huggingface_hub

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.1/84.1 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m536.7/536.7 kB[0m [31m14.6 MB/s[0m eta [36m0:00:00[0m00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.1/10.1 MB[0m [31m87.3 MB/s[0m eta [36m0:00:00[0m:00:01[0m0:01[0m
[?25h

In [3]:
import json
import torch
import datasets
import requests
import evaluate
import numpy as np
import huggingface_hub
from PIL import Image
import albumentations as A
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
from typing import Tuple, Any
from dataclasses import dataclass
from datasets import load_dataset, load_from_disk
import os
import matplotlib.patches as mpatches
from huggingface_hub import hf_hub_download
from torch.utils.data import Dataset, DataLoader
from transformers import (
    MaskFormerImageProcessor,
    AutoImageProcessor,
    MaskFormerForInstanceSegmentation,
)
from transformers import MaskFormerConfig

torch.manual_seed(42)

<torch._C.Generator at 0x7ad32938a610>

In [4]:
# For Can RAT ontology

names= ["CC Clyr Can", "TT Temp Tar", "RAT"]

categories = {
    "CC Clyr Can": 0,  # No need for 'unlabeled' at index 0
    "TT Temp Tar": 1,
    "RAT": 2
}

id2label = {0: "CC Clyr Can", 1: "TT Temp Tar", 2: "RAT"}
label2id = {v: k for k, v in id2label.items()}

print(id2label)

{0: 'CC Clyr Can', 1: 'TT Temp Tar', 2: 'RAT'}


In [5]:
# Pull datasets from local directories for images and dataset
import os
local_image_dir = r'//content//data//Training_Data//Jan_2026//Can_and_RAT//images'
local_dataset_dir = r'//content//data//Training_Data//Jan_2026//Can_and_RAT'

hf_dataset = load_from_disk(os.path.join(local_dataset_dir, "hf_mask2former_can_rat_dataset"))
train_ds = hf_dataset['train']
val_ds = hf_dataset['validation']

FileNotFoundError: Directory //content//data//Training_Data//Jan_2026//Can_and_RAT/hf_mask2former_can_rat_dataset not found

In [None]:
# Pull data from google drive if stored there
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

# Copy dataset from Drive to local runtime
import shutil
import os

# Source (Google Drive)
drive_dataset_dir = '/content/drive/MyDrive/Training_Data/Jan_2026/Can_and_RAT'

# Destination (local runtime - fast!)
local_dataset_dir = '/content/Training_Data/Jan_2026/Can_and_RAT'

# Copy to local
print("Copying dataset from Google Drive to local runtime...")
shutil.copytree(drive_dataset_dir, local_dataset_dir)
print("✓ Copy complete!")

# Now load from LOCAL path (fast)
hf_dataset = load_from_disk(os.path.join(local_dataset_dir, "hf_mask2former_can_rat_dataset"))
train_ds = hf_dataset['train']
val_ds = hf_dataset['validation']

In [None]:
def show_samples(dataset: datasets.Dataset, n: int = 5):
    """
    Displays 'n' samples from the dataset.
    ----
    Args:
      - dataset: The dataset which should contain 'pixel_values' and 'label' in its items.
      - n (int): Number of samples to display.

    """
    if n > len(dataset):
        raise ValueError("n is larger than the dataset size")

    fig, axs = plt.subplots(n, 2, figsize=(10, 5 * n))

    for i in range(n):
        sample = dataset[i]
        image = np.array(sample["pixel_values"])
        label = np.array(sample["label"])

        axs[i, 0].imshow(image)
        axs[i, 0].set_title("Image")
        axs[i, 0].axis("off")

        axs[i, 1].imshow(image)
        axs[i, 1].imshow(label/len(names), cmap="nipy_spectral", alpha=0.5)
        axs[i, 1].set_title("Segmentation Map")
        axs[i, 1].axis("off")

    plt.tight_layout()
    plt.show()

In [None]:
show_samples(train_ds, n=5)

In [None]:
preprocessor = MaskFormerImageProcessor(
    ignore_index=255, #this was originally set to zero which meant background was ignored!
    do_reduce_labels=False,
    do_resize=False,
    do_rescale=False,
    do_normalize=False,
)
ade_mean = np.array([123.675, 116.280, 103.530]) / 255
ade_std = np.array([58.395, 57.120, 57.375]) / 255

train_transform = A.Compose(
    [
        A.SmallestMaxSize(max_size=512),  # Resize the image to have the smallest side be 512 while maintaining aspect ratio.
        A.RandomCrop(width=512, height=512),
        A.HorizontalFlip(p=0.5),
        A.Normalize(mean=ade_mean, std=ade_std),
    ]
)

test_transform = A.Compose(
    [
        A.Resize(width=512, height=512),
        A.Normalize(mean=ade_mean, std=ade_std),
    ]
)


@dataclass
class SegmentationDataInput:
    original_image: np.ndarray
    transformed_image: np.ndarray
    original_segmentation_map: np.ndarray
    transformed_segmentation_map: np.ndarray


class SemanticSegmentationDataset(Dataset):
    def __init__(self, dataset: datasets.Dataset, transform: Any) -> None:
        """
        Dataset for Semantic Segmentation.
        ----
        Args:
          - dataset: A dataset containing images and segmentation maps.
          - transform: A transformation function to apply to the images and segmentation maps.
        """
        self.dataset = dataset
        self.transform = transform

    def __len__(self) -> int:
        return len(self.dataset)

    def __getitem__(self, idx: int) -> Tuple[np.ndarray, np.ndarray]:
        sample = self.dataset[idx]
        original_image = np.array(sample["pixel_values"])
        original_segmentation_map = np.array(sample["label"])

        transformed = self.transform(
            image=original_image, mask=original_segmentation_map
        )
        transformed_image = transformed["image"].transpose(
            2, 0, 1
        )  # Transpose to channel-first format
        transformed_segmentation_map = transformed["mask"]

        return SegmentationDataInput(
            original_image=original_image,
            transformed_image=transformed_image,
            original_segmentation_map=original_segmentation_map,
            transformed_segmentation_map=transformed_segmentation_map,
        )


def collate_fn(batch: SegmentationDataInput) -> dict:
    original_images = [sample.original_image for sample in batch]
    transformed_images = [sample.transformed_image for sample in batch]
    original_segmentation_maps = [sample.original_segmentation_map for sample in batch]
    transformed_segmentation_maps = [
        sample.transformed_segmentation_map for sample in batch
    ]

    preprocessed_batch = preprocessor(
        transformed_images,
        segmentation_maps=transformed_segmentation_maps,
        return_tensors="pt",
    )

    preprocessed_batch["original_images"] = original_images
    preprocessed_batch["original_segmentation_maps"] = original_segmentation_maps

    return preprocessed_batch

In [None]:
train_dataset = SemanticSegmentationDataset(train_ds, transform=train_transform)
val_dataset = SemanticSegmentationDataset(val_ds, transform=train_transform)
test_dataset = SemanticSegmentationDataset(val_ds, transform=test_transform)#Need a separate test set eventually

# Prepare Dataloaders
train_dataloader = DataLoader(
    train_dataset, batch_size=2, shuffle=True, collate_fn=collate_fn
)
val_dataloader = DataLoader(
    val_dataset, batch_size=2, shuffle=True, collate_fn=collate_fn
)
test_dataloader = DataLoader(
    test_dataset, batch_size=2, shuffle=False, collate_fn=collate_fn
)

In [None]:
# Sanity check 1 - spatial dimensions
sample = next(iter(train_dataloader))
print(
    {
        key: value[0].shape if isinstance(value, list) else value.shape
        for key, value in sample.items()
    }
)

In [None]:
# Sanity check 2 - denormalization and visualization
def denormalize_image(image, mean, std):
    """
    Denormalizes a normalized image.
    ----
    Args:
     - image (numpy.ndarray): The normalized image.
     - mean (list or numpy.ndarray): The mean used for normalization.
     - std (list or numpy.ndarray): The standard deviation used for normalization.

    """
    unnormalized_image = (image * std[:, None, None]) + mean[:, None, None]
    unnormalized_image = (unnormalized_image * 255).numpy().astype(np.uint8)
    unnormalized_image = np.moveaxis(unnormalized_image, 0, -1)
    return unnormalized_image


denormalized_image = denormalize_image(sample["pixel_values"][0], ade_mean, ade_std)
pil_image = Image.fromarray(denormalized_image)
pil_image

In [None]:
# Sanity check 3 - mask labels consistent to class labels
labels = [id2label[label] for label in sample["class_labels"][0].tolist()]
print(labels)

In [None]:
# Sanity check 4 - mask visualization
def visualize_mask(sample, labels, label_name):
    print(f"Category: {label_name}")
    idx = labels.index(label_name)

    visual_mask = (sample["mask_labels"][0][idx].bool().numpy() * 255).astype(np.uint8)
    return Image.fromarray(visual_mask)


In [None]:
# Sanity check 5 - visualize_mask
visualize_mask(sample, labels, labels[0])

In [None]:
from transformers import AutoImageProcessor, Mask2FormerForUniversalSegmentation

#output_dir = r'/content/drive/My Drive/AI Models/Mask2Former'
#model_iteration = os.path.join(output_dir,'27March_3')
#processor = AutoImageProcessor.from_pretrained(model_iteration)
#model = MaskFormerForInstanceSegmentation.from_pretrained(
 #   model_iteration, id2label=id2label, label2id=label2id, ignore_mismatched_sizes=True
#)
#model = Mask2FormerForUniversalSegmentation.from_pretrained(
#    model_iteration, id2label=id2label, label2id=label2id, ignore_mismatched_sizes=True
#)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

#processor = AutoImageProcessor.from_pretrained("facebook/mask2former-swin-tiny-coco-instance")

processor = AutoImageProcessor.from_pretrained("facebook/mask2former-swin-base-ade-semantic")
model = Mask2FormerForUniversalSegmentation.from_pretrained(
   "facebook/mask2former-swin-base-ade-semantic", id2label=id2label, label2id=label2id, ignore_mismatched_sizes=True
)

In [None]:
processor = AutoImageProcessor.from_pretrained("facebook/maskformer-swin-base-coco")
model = MaskFormerForInstanceSegmentation.from_pretrained(
    "facebook/maskformer-swin-base-coco", id2label=id2label, label2id=label2id, ignore_mismatched_sizes=True
)

In [None]:
model.model.pixel_level_module.encoder.encoder.layers[4]

In [None]:
# freeze the backbone and pixel decoder
# pixel level module contains both the backbone and the pixel decoder
for param in model.model.pixel_level_module.parameters():
    param.requires_grad = False

# Confirm that the parameters are correctly frozen
for name, param in model.model.pixel_level_module.named_parameters():
    assert not param.requires_grad

In [None]:
def unfreeze_gradually(model, num_backbone_stages_to_unfreeze=2):
    """
    Unfreezes the last few blocks of the backbone and pixel decoder gradually.

    Args:
        model: The MaskFormerForInstanceSegmentation model.
        num_backbone_stages_to_unfreeze: Number of backbone stages to unfreeze (from the end).
        num_pixel_decoder_layers_to_unfreeze: Number of pixel decoder layers to unfreeze (from the end).
    """

    # 1. Access the Backbone and Pixel Decoder
    backbone = model.model.pixel_level_module.encoder.model.encoder  # Access encoder layers within backbone
    pixel_decoder = model.model.pixel_level_module.decoder  # Access decoder

    # 2. Unfreeze Backbone Stages
    for i in range(len(backbone.layers) - num_backbone_stages_to_unfreeze, len(backbone.layers)):
        for param in backbone.layers[i].parameters():
            param.requires_grad = True

    # 3. Unfreeze Pixel Decoder Layers
    # The pixel decoder in MaskFormer is a single module, not a stack of layers.
    # So, we directly unfreeze its parameters.
    for param in pixel_decoder.parameters():  # Corrected line
        param.requires_grad = True

    # 4. Verify Unfreezing (Optional)
    for name, param in model.named_parameters():
        if param.requires_grad:
            print(f"Unfrozen: {name}")

In [None]:
unfreeze_gradually(model, num_backbone_stages_to_unfreeze=4)

In [None]:
#unfreeze pixel decoder
for param in pixel_decoder.parameters():  # Corrected line
    param.requires_grad = True

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
metric = evaluate.load("mean_iou")


def evaluate_model(
    model: MaskFormerForInstanceSegmentation,
    dataloader: DataLoader,
    preprocessor: AutoImageProcessor,
    metric: Any,
    id2label: dict,
    max_batches=None,
):
    """
    Evaluates the given model using the specified dataloader and computes the mean Intersection over Union (IoU).
    ----
    Args:
      - model (MaskFormerForInstanceSegmentation): The trained model to be evaluated.
      - dataloader (DataLoader): DataLoader containing the dataset for evaluation.
      - preprocessor (AutoImageProcessor): The preprocessor used for post-processing the model outputs.
      - metric (Any): Metric instance used for calculating IoU.
      - id2label (dict): Dictionary mapping class ids to their corresponding labels.
      - max_batches (int, optional): Maximum number of batches to evaluate. If None, evaluates on the entire validation dataset.

    Returns:
    float: The mean IoU calculated over the specified number of batches.
    """
    model.eval()
    running_iou = 0
    num_batches = 0
    with torch.no_grad():
        for idx, batch in enumerate(tqdm(dataloader)):
            if max_batches and idx >= max_batches:
                break

            pixel_values = batch["pixel_values"].to(device)
            outputs = model(pixel_values=pixel_values)

            original_images = batch["original_images"]
            target_sizes = [
                (image.shape[0], image.shape[1]) for image in original_images
            ]

            predicted_segmentation_maps = (
                preprocessor.post_process_semantic_segmentation(
                    outputs, target_sizes=target_sizes
                )
            )

            ground_truth_segmentation_maps = batch["original_segmentation_maps"]
            metric.add_batch(
                references=ground_truth_segmentation_maps,
                predictions=predicted_segmentation_maps,
            )

            running_iou += metric.compute(num_labels=len(id2label), ignore_index=0)[
                "mean_iou"
            ]
            num_batches += 1

    mean_iou = running_iou / num_batches
    return mean_iou


def train_model(
    model: MaskFormerForInstanceSegmentation,
    train_dataloader: DataLoader,
    val_dataloader: DataLoader,
    preprocessor: AutoImageProcessor,
    metric: AutoImageProcessor,
    id2label: dict,
    num_epochs=20,
    learning_rate=5e-5,
    log_interval=100,
):
    """
    Trains the MaskFormer model for semantic segmentation over a specified number of epochs and evaluates it on a validation set.
    ----
    Args:
      - model (MaskFormerForInstanceSegmentation): The model to be trained.
      - train_dataloader (DataLoader): DataLoader for the training data.
      - val_dataloader (DataLoader): DataLoader for the validation data.
      - preprocessor (AutoImageProcessor): The preprocessor used for preparing the data.
      - metric (Any): Metric instance used for calculating performance metrics.
      - id2label (dict): Dictionary mapping class IDs to their corresponding labels.
      - num_epochs (int): Number of epochs to train the model.
      - learning_rate (float): Learning rate for the optimizer.
      - log_interval (int): Interval (in number of batches) at which to log training progress.

    """
    model.to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    for epoch in range(num_epochs):
        print(f"Current epoch: {epoch+1}/{num_epochs}")
        model.train()

        running_loss = 0.0
        num_samples = 0

        for idx, batch in enumerate(tqdm(train_dataloader)):
            optimizer.zero_grad()

            outputs = model(
                pixel_values=batch["pixel_values"].to(device),
                mask_labels=[labels.to(device) for labels in batch["mask_labels"]],
                class_labels=[labels.to(device) for labels in batch["class_labels"]],
            )

            loss = outputs.loss
            loss.backward()

            batch_size = batch["pixel_values"].size(0)
            running_loss += loss.item()
            num_samples += batch_size

            if idx % log_interval == 0 and idx > 0:
                print(f"Iteration {idx} - loss: {running_loss/num_samples}")

            optimizer.step()
        val_mean_iou = evaluate_model(
            model, val_dataloader, preprocessor, metric, id2label, max_batches=6
        )
        print(f"Validation Mean IoU: {val_mean_iou}")

In [None]:
train_model(
    model,
    train_dataloader,
    val_dataloader,
    preprocessor,
    metric,
    id2label,
    num_epochs=1,
    log_interval=100,
)

In [None]:
from tqdm.auto import tqdm

def train_model_with_schedule(model, train_dataloader, val_dataloader, preprocessor, metric, id2label, train_schedule):
    output_dir = r'/content/drive/My Drive/AI Models/Mask2Former_base'  # Or your preferred output directory
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    backbone = model.model.pixel_level_module.encoder.encoder  # Access encoder layers within backbone
    pixel_decoder = model.model.pixel_level_module.decoder  # Access decoder

    #starting point is frozen pixel_decoder and backbone
    for param in model.model.pixel_level_module.parameters():
        param.requires_grad = False

    # Confirm that the parameters are correctly frozen
    for name, param in model.model.pixel_level_module.named_parameters():
        assert not param.requires_grad

    #for phase_index, phase in enumerate(train_schedule):
    for phase_index, phase in enumerate(train_schedule[2:], start=2):
        learning_rate = phase["learning_rate"]
        epochs = phase["epochs"]

        # Freeze/Unfreeze Pixel Decoder
        for param in pixel_decoder.parameters():
            param.requires_grad = not phase["pixel_decoder_freeze"]

        # Freeze/Unfreeze Backbone Layers (if needed)
        num_backbone_unfreeze = phase["num_backbone_unfreeze"]

        if num_backbone_unfreeze > 0:  # Only unfreeze if num_backbone_freeze is greater than 0
            for i in range(len(backbone.layers) - num_backbone_unfreeze, len(backbone.layers)):
                for param in backbone.layers[i].parameters():
                    param.requires_grad = True

        # Train
        train_model(
            model,
            train_dataloader,
            val_dataloader,
            preprocessor,
            metric,
            id2label,
            num_epochs=epochs,
            learning_rate=learning_rate,
            log_interval=100,
        )

        # Save Model
        model_iteration = os.path.join(output_dir, f'phase_{phase_index + 1}')
        if not os.path.exists(model_iteration):
            os.makedirs(model_iteration)
        model.save_pretrained(model_iteration)
        processor.save_pretrained(model_iteration)

        # Test Mean IoU
        test_mean_iou = evaluate_model(model, test_dataloader, preprocessor, metric, id2label)
        print(f"Test Mean IoU (Phase {phase_index + 1}): {test_mean_iou}")

In [None]:
train_schedule = [
    {"learning_rate": 5e-5, "epochs": 15, "pixel_decoder_freeze": True, "num_backbone_unfreeze": 0},  # Phase 1
    {"learning_rate": 1e-5, "epochs": 10, "pixel_decoder_freeze": False, "num_backbone_unfreeze": 0},  # Phase 2
    {"learning_rate": 5e-6, "epochs": 5, "pixel_decoder_freeze": False, "num_backbone_unfreeze": 1},  # Phase 3
    {"learning_rate": 5e-6, "epochs": 5, "pixel_decoder_freeze": False, "num_backbone_unfreeze": 2},  # Phase 4
    {"learning_rate": 5e-6, "epochs": 5, "pixel_decoder_freeze": False, "num_backbone_unfreeze": 3},  # Phase 5
    {"learning_rate": 5e-6, "epochs": 5, "pixel_decoder_freeze": False, "num_backbone_unfreeze": 4},  # Phase 6
    # Add more phases as needed...
]
train_model_with_schedule(model, train_dataloader, val_dataloader, preprocessor, metric, id2label, train_schedule)