In [1]:
# imports
from pathlib import Path

import fiftyone as fo
import fiftyone.zoo as foz
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
import torch

from segmentation_dataset import SegmentationDataset

In [33]:
# model properties
NUMBER_OF_CLASSES = 4
CLASSES = ['Cat', 'Horse', 'Van']
ROOT_DATA_DIRECTORY = Path("segmentation_data")
BATCH_SIZE = 16
NUMBER_OF_WORKERS = 4
IN_CHANNELS = 3
OUT_CHANNELS = 4
FEATURES = [32, 64, 128, 256]

LABELS = {
    "Cat": 1,
    "Horse": 2,
    "Van": 3,
}

COLOURS = {
    0: (0, 0, 0),
    1: (53, 94, 59),
    2: (150, 75, 0),
    3: (176,224,230),
}

REVERSE_COLOURS = {
    (0, 0, 0,) : 0,
    (53, 94, 59) : 1,
    (150, 75, 0) : 2,
    (176,224,230) : 3
} 

In [3]:
# train properties
NUMBER_OF_EPOCHS = 80
LEARNING_RATE = 0.001
NUMBER_OF_TRAIN_IMAGES_PER_CLASS = 1000
NUMBER_OF_VALIDATION_IMAGES_PER_CLASS = 125

In [4]:
# test properties
NUMBER_OF_TEST_IMAGES_PER_CLASS = 125

In [12]:
#load images
def load_images(root_directory:str, class_name: str, split: str, size: int) -> None:
    dataset = foz.load_zoo_dataset(
        "open-images-v6",
        split=split,
        label_types=["segmentations"],
        classes=[class_name],
        max_samples=size,
        dataset_name=f"open-images-{split}-{class_name.lower()}-segmentation",
    )
    for sample in dataset:
        if sample.ground_truth is None:
            continue

        cat_detections = [
            d for d in sample.ground_truth.detections if d.label == class_name
        ]

        sample.ground_truth.detections = cat_detections
        sample.save()

    dataset.export(
        export_dir=f"{root_directory}/{split}/{class_name}",
        dataset_type=fo.types.ImageSegmentationDirectory,
        label_field="ground_truth",
    )

In [None]:
# load images by class
for class_name in CLASSES:
    load_images(root_directory=ROOT_DATA_DIRECTORY, class_name=class_name, split="train", size=NUMBER_OF_TRAIN_IMAGES_PER_CLASS)
    load_images(root_directory=ROOT_DATA_DIRECTORY, class_name=class_name, split="validation", size=NUMBER_OF_VALIDATION_IMAGES_PER_CLASS)
    load_images(root_directory=ROOT_DATA_DIRECTORY, class_name=class_name, split="test", size=NUMBER_OF_TEST_IMAGES_PER_CLASS)

In [32]:
class_name = "Van"
load_images(root_directory=ROOT_DATA_DIRECTORY, class_name=class_name, split="train", size=NUMBER_OF_TRAIN_IMAGES_PER_CLASS)
load_images(root_directory=ROOT_DATA_DIRECTORY, class_name=class_name, split="validation", size=NUMBER_OF_VALIDATION_IMAGES_PER_CLASS)
load_images(root_directory=ROOT_DATA_DIRECTORY, class_name=class_name, split="test", size=NUMBER_OF_TEST_IMAGES_PER_CLASS)  

Downloading split 'train' to 'C:\Users\vesta\fiftyone\open-images-v6\train' if necessary
Found 1 images, downloading the remaining 999
 100% |███████████████████| 999/999 [5.9m elapsed, 0s remaining, 7.4 files/s]      
Dataset info written to 'C:\Users\vesta\fiftyone\open-images-v6\info.json'
Loading 'open-images-v6' split 'train'
 100% |███████████████| 1000/1000 [1.2m elapsed, 0s remaining, 14.7 samples/s]      
Dataset 'open-images-train-van-segmentation' created
 100% |███████████████| 1000/1000 [17.6s elapsed, 0s remaining, 58.6 samples/s]      
Downloading split 'validation' to 'C:\Users\vesta\fiftyone\open-images-v6\validation' if necessary
Found 18 images, downloading the remaining 107
 100% |███████████████████| 107/107 [32.7s elapsed, 0s remaining, 3.4 files/s]      
Dataset info written to 'C:\Users\vesta\fiftyone\open-images-v6\info.json'
Loading 'open-images-v6' split 'validation'
 100% |█████████████████| 125/125 [6.5s elapsed, 0s remaining, 16.4 samples/s]      
Dataset 

In [34]:
def create_coloured_masks_for_each_class(
    classes: list[str],
    label_map: dict[str, int],
    colour_map: dict[int, tuple[int, int, int]],
    root_directory: Path,
    split: str
) -> None:
    for class_name in classes:
        class_mask_dir = root_directory / split / class_name / "labels"
        class_coloured_dir = root_directory / split / class_name / "coloured_labels"
        class_coloured_dir.mkdir(parents=True, exist_ok=True)

        class_id = label_map[class_name]
        class_color = colour_map[class_id]

        for mask_path in class_mask_dir.glob("*.png"):
            binary_mask = Image.open(mask_path).convert("L")
            mask_array = np.array(binary_mask) > 127

            height, width = mask_array.shape
            coloured_mask = np.zeros((height, width, 3), dtype=np.uint8)
            coloured_mask[mask_array] = class_color

            out_path = class_coloured_dir / mask_path.name
            Image.fromarray(coloured_mask).save(out_path)

In [None]:
# create multiclass masks
create_coloured_masks_for_each_class(classes=CLASSES, label_map=LABELS, colour_map=COLOURS, root_directory=ROOT_DATA_DIRECTORY, split="train")
create_coloured_masks_for_each_class(classes=CLASSES, label_map=LABELS, colour_map=COLOURS, root_directory=ROOT_DATA_DIRECTORY, split="validation")
create_coloured_masks_for_each_class(classes=CLASSES, label_map=LABELS, colour_map=COLOURS, root_directory=ROOT_DATA_DIRECTORY, split="test")

In [None]:
# calculate normalization values for train dataset
normalization_image_directories = [ROOT_DATA_DIRECTORY / "train" / class_name / "data" for class_name in CLASSES]

channels_sum = torch.zeros(3)
channels_sq_sum = torch.zeros(3)
total_pixels = 0

for image_directory in normalization_image_directories:
    for img_path in image_directory.rglob("*"):
        with Image.open(img_path).convert("RGB") as img:
            img_tensor = torch.from_numpy(np.array(img)).permute(2, 0, 1).float() / 255.0

        channels_sum += img_tensor.sum(dim=[1, 2])
        channels_sq_sum += (img_tensor ** 2).sum(dim=[1, 2])

        height, width = img_tensor.shape[1], img_tensor.shape[2]
        total_pixels += height * width

mean = channels_sum / total_pixels
var = (channels_sq_sum / total_pixels) - mean**2
std = torch.sqrt(var)

print("Mean:", mean)
print("STD:", std)

KeyboardInterrupt: 

In [5]:
# calculated values are assigned manually to avoid recalculation
NORMALIZATION_MEAN = [0.4650, 0.4405, 0.4009]
NORMALIZATION_STD = [0.2812, 0.2759, 0.2809]

In [6]:
class SegmentationConvolutionalNetwork(torch.nn.Module):
    def __init__(self, in_channels: int, out_channels: int, features: list[int]) -> None:
        super().__init__()

        self.pool_layer = torch.nn.MaxPool2d(kernel_size=2, stride=2)

        self.down_layers = torch.nn.ModuleList()
        current_in_channels = in_channels
        for feature in features:
            self.down_layers.append(
                SegmentationConvolutionalNetwork.double_convolution(
                    in_channels=current_in_channels,
                    out_channels=feature
                )
            )
            current_in_channels = feature
        
        self.bottleneck_layer = SegmentationConvolutionalNetwork.double_convolution(in_channels=features[-1], out_channels=features[-1]*2)

        self.up_layers = torch.nn.ModuleList()
        current_in_channels = features[-1] * 2
        for feature in reversed(features):
            self.up_layers.append(
                torch.nn.ConvTranspose2d(
                    in_channels=current_in_channels,
                    out_channels=feature,
                    kernel_size=2,
                    stride=2
                )
            )
            self.up_layers.append(
                SegmentationConvolutionalNetwork.double_convolution(
                    in_channels=feature * 2,
                    out_channels=feature
                )
            )
            current_in_channels = feature

        self.final_convolution = torch.nn.Conv2d(features[0], out_channels, kernel_size=1)

    
    @classmethod
    def double_convolution(cls, in_channels: int, out_channels: int):
        return torch.nn.Sequential(
            torch.nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=3, padding=1),
            torch.nn.BatchNorm2d(num_features=out_channels),
            torch.nn.ReLU(),
            torch.nn.Conv2d(in_channels=out_channels, out_channels=out_channels, kernel_size=3, padding=1),
            torch.nn.BatchNorm2d(num_features=out_channels),
            torch.nn.ReLU()
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        skip_connections = []

        out = x

        for down_layer in self.down_layers:
            out = down_layer(out)
            skip_connections.append(out)
            out = self.pool_layer(out)
        
        out = self.bottleneck_layer(out)

        reversed_skip_connections = skip_connections[::-1]

        for layer_id in range(0, len(self.up_layers), 2):
            transposed_convolution = self.up_layers[layer_id]
            out = transposed_convolution(out)

            skip_out = reversed_skip_connections[layer_id // 2]

            if out.shape != skip_out.shape:
                out = torch.nn.functional.interpolate(out, size=skip_out.shape[2:], mode="bilinear", align_corners=False)

            out = torch.cat([skip_out, out], dim=1)

            out = self.up_layers[layer_id + 1](out)
        
        out = self.final_convolution(out)

        return out

In [8]:
# initialize train device
train_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(train_device)

cuda


In [None]:
# initialize train model
train_model = SegmentationConvolutionalNetwork(in_channels=IN_CHANNELS, out_channels=OUT_CHANNELS, features=FEATURES)
train_model.to(train_device)

SegmentationConvolutionalNetwork(
  (pool_layer): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (down_layers): ModuleList(
    (0): Sequential(
      (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
      (3): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (4): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (5): ReLU()
    )
    (1): Sequential(
      (0): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU()
      (3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (5): ReLU()
    )
    (2): Sequential(
      (0): Conv2d(64, 128, k

In [10]:
# train and validation datasets
train_dataset = SegmentationDataset(classes=CLASSES, root_directory=ROOT_DATA_DIRECTORY, mode="train", reverse_colours=REVERSE_COLOURS, normalization_mean=NORMALIZATION_MEAN, normalization_std=NORMALIZATION_STD)
validation_dataset = SegmentationDataset(classes=CLASSES, root_directory=ROOT_DATA_DIRECTORY, mode="validation", reverse_colours=REVERSE_COLOURS, normalization_mean=NORMALIZATION_MEAN, normalization_std=NORMALIZATION_STD)

train_dataloader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, num_workers=NUMBER_OF_WORKERS, shuffle=True)
validation_dataloader = torch.utils.data.DataLoader(dataset=validation_dataset, batch_size=1, num_workers=NUMBER_OF_WORKERS, shuffle=False)

segmentation_data
segmentation_data


In [11]:
def compute_mean_iou(preds:torch.Tensor, labels: torch.Tensor, number_of_classes:int) -> float:
    if preds.dim() == 4 and preds.size(1) > 1:
        preds = preds.argmax(dim=1) 
    
    preds_flat = preds.view(-1)
    labels_flat = labels.view(-1)

    ious = []
    for c in range(number_of_classes):
        pred_c = (preds_flat == c)
        label_c = (labels_flat == c)

        intersection = (pred_c & label_c).sum().item()
        union = (pred_c | label_c).sum().item()

        if union == 0:
            continue
        else:
            ious.append(intersection / union)

    if len(ious) == 0:
        return 0.0
    else:
        return np.mean(ious)

In [None]:
class DiceLoss(torch.nn.Module):
    def __init__(self, num_classes: int, softmax_dim: int = 1, smooth: float = 1e-6):
        super().__init__()
        self.num_classes = num_classes
        self.softmax_dim = softmax_dim
        self.smooth = smooth

    def forward(self, inputs: torch.Tensor, targets: torch.Tensor) -> torch.Tensor:
        probs = torch.nn.functional.softmax(inputs, dim=self.softmax_dim)
        targets_one_hot = torch.nn.functional.one_hot(targets, num_classes=self.num_classes)
        targets_one_hot = targets_one_hot.permute(0, 3, 1, 2).float()

        assert probs.shape == targets_one_hot.shape, \
            f"Input ({probs.shape}) and target ({targets_one_hot.shape}) shapes do not match."

        probs_flat = probs.view(probs.size(0), probs.size(1), -1)
        targets_flat = targets_one_hot.view(targets_one_hot.size(0), targets_one_hot.size(1), -1)

        intersection = (probs_flat * targets_flat).sum(dim=2)
        cardinality = (probs_flat + targets_flat).sum(dim=2)

        dice_score = ((2. * intersection + self.smooth) / (cardinality + self.smooth)).mean(dim=0)

        dice_score_mean = dice_score.mean()

        dice_loss = 1.0 - dice_score_mean

        return dice_loss

In [17]:
# training loop
train_losses = np.zeros(NUMBER_OF_EPOCHS)
validation_losses = np.zeros(NUMBER_OF_EPOCHS)

train_ious = np.zeros(NUMBER_OF_EPOCHS)
validation_ious = np.zeros(NUMBER_OF_EPOCHS)

loss_function = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=train_model.parameters(), lr=LEARNING_RATE)

loss_function_ce = torch.nn.CrossEntropyLoss()
loss_function_dice = DiceLoss(num_classes=NUMBER_OF_CLASSES)

ce_weight = 0.5
dice_weight = 0.5

scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',    
    factor=0.1,    
    patience=3,     
    min_lr=1e-6
)

for epoch in range(NUMBER_OF_EPOCHS):
    train_loss_acum = np.array([], dtype = np.float32)
    validation_loss_acum = np.array([], dtype = np.float32)

    train_iou_acum = np.array([], dtype = np.float32)
    validation_iou_acum = np.array([], dtype = np.float32)

    train_model.train()
    for images, labels in train_dataloader:
        images = images.to(train_device)
        labels = labels.to(train_device)

        predictions = train_model(images)
        ce_loss_train = loss_function_ce(predictions, labels)
        dice_loss_train = loss_function_dice(predictions, labels)
        total_loss_train = ce_weight * ce_loss_train + dice_weight * dice_loss_train
        train_loss_acum = np.append(train_loss_acum, total_loss_train.cpu().detach().numpy())

        total_loss_train.backward()
        optimizer.step()
        optimizer.zero_grad()

        iou_batch = compute_mean_iou(predictions.detach(), labels, number_of_classes=NUMBER_OF_CLASSES)
        train_iou_acum = np.append(train_iou_acum, iou_batch)

    train_model.eval()
    with torch.no_grad():
        for images, labels in validation_dataloader:
            images = images.to(train_device)
            labels = labels.to(train_device)

            predictions = train_model(images)

            ce_loss_validation = loss_function_ce(predictions, labels)
            dice_loss_validation = loss_function_dice(predictions, labels)
            total_loss_validation = ce_weight * ce_loss_validation + dice_weight * dice_loss_validation
            validation_loss_acum = np.append(validation_loss_acum, total_loss_validation.cpu().detach().numpy())

            iou_batch = compute_mean_iou(preds=predictions.detach(), labels=labels, number_of_classes=NUMBER_OF_CLASSES)
            validation_iou_acum = np.append(validation_iou_acum, iou_batch)
    
    train_losses[epoch] = np.mean(train_loss_acum)
    train_ious[epoch]   = np.mean(train_iou_acum)

    validation_losses[epoch] = np.mean(validation_loss_acum)
    validation_ious[epoch] = np.mean(validation_iou_acum)

    scheduler.step(validation_losses[epoch])

    print(f'Epoch: {epoch}, Train loss: {train_losses[epoch]} Validation loss: {validation_losses[epoch]}')
    print(f'Epoch: {epoch}, Train IoU: {train_ious[epoch]} Validation IoU: {validation_ious[epoch]}')

    torch.save(train_model.state_dict(), f"segmentation_models/{epoch}_segmentation_CN.pth")



0
Epoch: 0, Train loss: 0.7244798541069031 Validation loss: 0.7187759876251221
Epoch: 0, Train IoU: 0.19725064004431567 Validation IoU: 0.33556737738343073
1
Epoch: 1, Train loss: 0.7180646657943726 Validation loss: 0.7217940092086792
Epoch: 1, Train IoU: 0.20053024187998292 Validation IoU: 0.3793187879283872
2
Epoch: 2, Train loss: 0.710625171661377 Validation loss: 0.7191142439842224
Epoch: 2, Train IoU: 0.2022814317197331 Validation IoU: 0.3488809162275592
3
Epoch: 3, Train loss: 0.719700276851654 Validation loss: 0.7092809081077576
Epoch: 3, Train IoU: 0.20343012119787204 Validation IoU: 0.37498430945463496
4
Epoch: 4, Train loss: 0.7151015996932983 Validation loss: 0.7053115367889404
Epoch: 4, Train IoU: 0.20144179896210349 Validation IoU: 0.3745417185974053
5
Epoch: 5, Train loss: 0.7107389569282532 Validation loss: 0.7061546444892883
Epoch: 5, Train IoU: 0.20238922631178943 Validation IoU: 0.32562885560656074
6
Epoch: 6, Train loss: 0.7112095355987549 Validation loss: 0.70482128

KeyboardInterrupt: 

In [None]:
# plot train and validation graphs
epochs = np.arange(1, NUMBER_OF_EPOCHS + 1)

# loss graph
plt.figure(figsize=(10, 4))
plt.plot(epochs, train_losses, label='Train Loss')
plt.plot(epochs, validation_losses, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Loss over Epochs')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

# iou graph
plt.figure(figsize=(10, 4))
plt.plot(epochs, train_ious, label='Train Intersection Over Union')
plt.plot(epochs, validation_ious, label='Validation Intersection Over Union')
plt.xlabel('Epoch')
plt.ylabel('Intersection Over Union')
plt.title('Intersection Over Union over Epochs')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

In [9]:
# server application
import io

import fastapi
import torch
import torchvision
import uvicorn
from PIL import Image

class ModelController(object):
    def __init__(self) -> None:
        self.prod_device = torch.device(
            "cuda" if torch.cuda.is_available() else "cpu"
        )
        print(self.prod_device)

        self.prod_model = SegmentationConvolutionalNetwork(in_channels=IN_CHANNELS, out_channels=OUT_CHANNELS, features=FEATURES)
        self.prod_model.load_state_dict(
            torch.load("segmentation_models/first_segmentation_CN.pth", weights_only=True)
        )
        self.prod_model.to(device=self.prod_device)
        self.prod_model.eval()

        self.prod_transformations = torchvision.transforms.Compose(
            [
                torchvision.transforms.ToTensor(),
                torchvision.transforms.Normalize(
                    NORMALIZATION_MEAN, NORMALIZATION_STD
                ),
            ]
        )
    
    def model_endpoint(self, image_file: fastapi.UploadFile = fastapi.File(...), ) -> fastapi.Response:
        image_bytes = image_file.file.read()
        image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
        image_tensor = (
            self.prod_transformations(image).unsqueeze(0).to(self.prod_device)
        )
        with torch.no_grad():
            predictions = self.prod_model(image_tensor)

        predicted_classes = torch.argmax(predictions, dim=1)
        print(predicted_classes)
        prediction_map = predicted_classes.squeeze(0).cpu().numpy()
        height, width = prediction_map.shape
        mask_image_np = np.zeros((height, width, 3), dtype=np.uint8)

        for class_index, color in COLOURS.items():
            mask_image_np[prediction_map == class_index] = color

        mask_image_pil = Image.fromarray(mask_image_np)

        buffer = io.BytesIO()
        mask_image_pil.save(buffer, format="PNG")
        buffer.seek(0)

        return fastapi.Response(content=buffer.getvalue(), media_type="image/png")


async def main() -> None:
    model_controller = ModelController()
    app = fastapi.FastAPI()
    app.add_api_route(
        path="/model",
        endpoint=model_controller.model_endpoint,
        methods=["POST"],
    )
    config = uvicorn.Config(app=app)
    server = uvicorn.Server(config)
    await server.serve()

if __name__ == "__main__":
    await main()

cuda


INFO:     Started server process [16156]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)


tensor([[[0, 0, 0,  ..., 0, 0, 0],
         [0, 0, 0,  ..., 0, 0, 0],
         [0, 0, 0,  ..., 0, 0, 0],
         ...,
         [0, 0, 0,  ..., 0, 0, 0],
         [0, 0, 0,  ..., 0, 0, 0],
         [0, 0, 0,  ..., 0, 0, 0]]], device='cuda:0')
INFO:     127.0.0.1:62621 - "POST /model HTTP/1.1" 200 OK
tensor([[[0, 0, 0,  ..., 0, 0, 0],
         [0, 0, 0,  ..., 0, 0, 0],
         [0, 0, 0,  ..., 0, 0, 0],
         ...,
         [0, 0, 0,  ..., 0, 0, 0],
         [0, 0, 0,  ..., 0, 0, 0],
         [0, 0, 0,  ..., 0, 0, 0]]], device='cuda:0')
INFO:     127.0.0.1:62630 - "POST /model HTTP/1.1" 200 OK
tensor([[[0, 0, 0,  ..., 0, 0, 0],
         [0, 0, 0,  ..., 0, 0, 0],
         [0, 0, 0,  ..., 0, 0, 0],
         ...,
         [0, 0, 0,  ..., 0, 0, 0],
         [0, 0, 0,  ..., 0, 0, 0],
         [0, 0, 0,  ..., 0, 0, 0]]], device='cuda:0')
INFO:     127.0.0.1:62634 - "POST /model HTTP/1.1" 200 OK
tensor([[[0, 0, 0,  ..., 0, 0, 0],
         [0, 0, 0,  ..., 0, 0, 0],
         [0, 0, 0,  ..., 0,

INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [16156]
