# **Train**
# Mask RCNN
Instance Segmentation with torchvision

---
### Imports

In [None]:
# general
import os
import cv2  # Use OpenCV instead of Pillow
import numpy as np

# Pytorch
import torch
import torchvision
from torchvision.models.detection import MaskRCNN
from torchvision.models.detection.rpn import AnchorGenerator
import torchvision.transforms as T
from torch.utils.data import DataLoader, Dataset

---
### Dataset

In [None]:
# Define a custom dataset
class CustomDataset(Dataset):
    def __init__(self, root, transforms=None):
        self.root = root
        self.transforms = transforms
        self.imgs = list(sorted(os.listdir(os.path.join(root, "images"))))
        self.masks = list(sorted(os.listdir(os.path.join(root, "masks"))))

    def __getitem__(self, idx):
        img_path = os.path.join(self.root, "images", self.imgs[idx])
        mask_path = os.path.join(self.root, "masks", self.masks[idx])

        # Use OpenCV to load the image
        img = cv2.imread(img_path)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # Convert BGR to RGB
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)

        # Convert the mask to a tensor
        mask = torch.as_tensor(mask, dtype=torch.uint8)
        obj_ids = torch.unique(mask)[1:]  # remove background class
        masks = mask == obj_ids[:, None, None]

        # Create bounding boxes
        boxes = []
        for i in range(len(obj_ids)):
            pos = torch.where(masks[i])
            xmin = torch.min(pos[1])
            xmax = torch.max(pos[1])
            ymin = torch.min(pos[0])
            ymax = torch.max(pos[0])
            boxes.append([xmin, ymin, xmax, ymax])

        boxes = torch.as_tensor(boxes, dtype=torch.float32)

        labels = torch.ones((len(obj_ids),), dtype=torch.int64)  # all instances are "class 1"
        
        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        target["masks"] = masks

        if self.transforms:
            img = self.transforms(img)

        return img, target

    def __len__(self):
        return len(self.imgs)

In [None]:
# Define the transformations
def get_transform(train):
    transforms = []
    transforms.append(T.ToTensor())
    if train:
        transforms.append(T.RandomHorizontalFlip(0.5))
    return T.Compose(transforms)

---
### Mask RCNN

In [None]:
# Load MaskRCNN model
def get_model_instance_segmentation(num_classes):
    model = torchvision.models.detection.maskrcnn_resnet50_fpn(pretrained=True)
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = torchvision.models.detection.faster_rcnn.FastRCNNPredictor(in_features, num_classes)

    in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels
    hidden_layer = 256
    model.roi_heads.mask_predictor = torchvision.models.detection.MaskRCNNPredictor(in_features_mask, hidden_layer, num_classes)
    
    return model

---
### Training

In [None]:
# Training function
def train_model():
    # Load dataset
    dataset = CustomDataset(root='path/to/dataset', transforms=get_transform(train=True))
    dataset_test = CustomDataset(root='path/to/dataset', transforms=get_transform(train=False))

    data_loader = DataLoader(dataset, batch_size=2, shuffle=True, num_workers=4)
    data_loader_test = DataLoader(dataset_test, batch_size=2, shuffle=False, num_workers=4)

    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

    # Create model
    model = get_model_instance_segmentation(num_classes=2)  # Background + 1 object class
    model.to(device)

    # Optimizer
    params = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)
    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

    # Training loop
    num_epochs = 10
    for epoch in range(num_epochs):
        model.train()
        i = 0
        for images, targets in data_loader:
            images = [image.to(device) for image in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            loss_dict = model(images, targets)
            losses = sum(loss for loss in loss_dict.values())

            optimizer.zero_grad()
            losses.backward()
            optimizer.step()

            if i % 10 == 0:
                print(f"Epoch [{epoch}/{num_epochs}], Step [{i}/{len(data_loader)}], Loss: {losses.item()}")
            i += 1

        lr_scheduler.step()

        # Evaluate after each epoch
        # evaluate(model, data_loader_test, device=device)

In [None]:
train_model()