# Practical Machine Learning and Deep Learning

# Lesson 7

# Semantic Segmentation

Semantic segmentation in machine learning is a process where each pixel in an image is classified into a predefined category. Unlike image classification, which assigns a single label to an entire image, semantic segmentation provides a pixel-level understanding, segmenting different objects and regions within the image.

### Material Segmentation Dataset

For the material segmentation dataset, which comprises 3817 images gathered from Virginia Department of Transportation (VDOT) Bridge Inspection Reports, semantic segmentation involves:

1. **Pixel-Level Annotation**:
   - Each pixel in the bridge inspection images would be labeled with a specific material category, such as concrete, steel, asphalt, or other materials found in bridge structures.

2. **Training a Model**:
   - A neural network, often a Convolutional Neural Network (CNN) architecture designed for segmentation tasks (e.g., U-Net, SegNet, or DeepLab), would be trained on this annotated dataset. The model learns to associate pixel patterns with specific material categories.

3. **Inference**:
   - After training, the model can take a new image as input and output a segmentation map where each pixel is assigned a material category label. This allows for detailed analysis of the materials present in the bridge structure.
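
The inference step can be illustrated with a tiny example. This is a minimal sketch, assuming the model outputs one score per class and per pixel; the `scores` values are invented for illustration and do not come from the dataset.

```python
import numpy as np

# Hypothetical scores for a 2x3 image and 3 classes, shape (classes, height, width).
scores = np.array([
    [[0.90, 0.20, 0.10], [0.80, 0.10, 0.30]],  # class 0
    [[0.05, 0.70, 0.20], [0.10, 0.60, 0.30]],  # class 1
    [[0.05, 0.10, 0.70], [0.10, 0.30, 0.40]],  # class 2
])

# The segmentation map keeps, for every pixel, the index of the highest-scoring class.
segmentation_map = scores.argmax(axis=0)
print(segmentation_map)
```

The result has the same height and width as the input image and holds one class index per pixel.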

### Key Steps in Semantic Segmentation

1. **Dataset Preparation**:
   - The dataset (in this case, the 3817 VDOT images) needs to be annotated at the pixel level, where each pixel is labeled according to the material it represents.

2. **Model Architecture**:
   - Choose a suitable segmentation model architecture. Popular choices include U-Net, which is effective for biomedical image segmentation but also applicable to other fields, and DeepLab, known for its ability to capture multi-scale contextual information.

3. **Training**:
   - Train the chosen model on the annotated dataset. This involves feeding the images and their corresponding pixel-level labels into the model, allowing it to learn the patterns associated with each material category.

4. **Evaluation**:
   - Evaluate the model’s performance using metrics such as Intersection over Union (IoU), pixel accuracy, and mean average precision to ensure it accurately segments materials in new images.

5. **Application**:
   - Once trained, the model can be used for automated inspection and analysis of bridge materials in new images, assisting in tasks like detecting material defects, monitoring wear and tear, and planning maintenance activities.
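
Two of the evaluation metrics named above, pixel accuracy and Intersection over Union, can be sketched in a few lines of NumPy. The helper names `pixel_accuracy` and `iou_per_class` are hypothetical and the masks are toy data; this is an illustration of the definitions, not the evaluation code used in this lesson.

```python
import numpy as np

def pixel_accuracy(pred, target):
    """Fraction of pixels whose predicted label equals the ground truth."""
    return float((pred == target).mean())

def iou_per_class(pred, target, num_classes):
    """IoU for each class; NaN when a class is absent from both masks."""
    ious = []
    for c in range(num_classes):
        pred_c, target_c = pred == c, target == c
        inter = np.logical_and(pred_c, target_c).sum()
        union = np.logical_or(pred_c, target_c).sum()
        ious.append(float(inter / union) if union > 0 else float("nan"))
    return ious

# Toy ground truth and prediction for a 2x4 image with two classes.
target = np.array([[0, 0, 1, 1],
                   [0, 0, 1, 1]])
pred = np.array([[0, 1, 1, 1],
                 [0, 0, 0, 1]])

acc = pixel_accuracy(pred, target)
ious = iou_per_class(pred, target, num_classes=2)
print(acc, ious)
```

Averaging the per-class values gives the mean IoU, which is less dominated by large background regions than pixel accuracy.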

### Importance in Real-World Applications

Semantic segmentation in the context of the VDOT Bridge Inspection Reports can significantly enhance the efficiency and accuracy of bridge maintenance and inspection processes. By automating the material classification task, engineers can quickly identify and analyze the materials used in bridge construction and their condition, leading to better-informed decisions regarding repairs and maintenance, ultimately improving infrastructure safety and longevity.


# Downloading the Dataset

In [None]:
!wget https://data.lib.vt.edu/ndownloader/articles/16624648/versions/1

In [2]:
import zipfile

# the context manager closes the archive even if extraction fails
with zipfile.ZipFile("1", "r") as zip_ref:
    zip_ref.extractall()

In [3]:
with zipfile.ZipFile("Material Detection.zip", "r") as zip_ref:
    zip_ref.extractall()

## Import Libraries

In [4]:
# necessary imports
import torch
import albumentations as A
from albumentations.pytorch import ToTensorV2
import numpy as np
from pathlib import Path
from torch.utils.data import random_split
import torch.nn as nn
import cv2
import torch.nn.functional as F
from tqdm import tqdm

## Create Mappings and Constants

In [5]:
# necessary constants
CLASS_MAPPING = {
    0: "background",
    1: "steel",
    2: "concrete",  # segment concrete
    3: "metal deck",
}
COLOR_MAPPING = {
    0: (0, 0, 0),
    1: (0, 0, 128),
    2: (0, 128, 0),
    3: (0, 128, 128),
}


color2label = {v: k for k, v in COLOR_MAPPING.items()}
IMG_SIZE = 256
MAX_PIXEL_VALUE = 255
NORMALIZATION_MEAN = [0.485, 0.456, 0.406]
NORMALIZATION_STD = [0.229, 0.224, 0.225]

## Dataset


In [6]:
train_dir = "Material Detection/original/Train"

### Preprocessing

For the following lab we will use [Albumentations](https://albumentations.ai/) for the data transforms. Albumentations allows image and mask transformation at the same time.


In [7]:
transforms = A.Compose(
    [
        A.HorizontalFlip(p=0.5),
        A.ToFloat(max_value=MAX_PIXEL_VALUE),
        A.Normalize(
            mean=NORMALIZATION_MEAN, std=NORMALIZATION_STD, max_pixel_value=1.0
        ),
        ToTensorV2(),
    ]
)
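
Transforming image and mask together matters because geometric changes must be applied to both identically, while intensity changes apply to the image only. The sketch below shows that idea in plain NumPy. It does not use the Albumentations API; `flip_and_normalize` is a hypothetical helper, and the mean and standard deviation are the constants defined earlier.

```python
import numpy as np

MEAN = np.array([0.485, 0.456, 0.406])
STD = np.array([0.229, 0.224, 0.225])

def flip_and_normalize(image, mask, flip):
    """Apply the same horizontal flip to image and mask, then normalize the image only."""
    if flip:
        image = image[:, ::-1]  # reverse the width axis of an (H, W, 3) image
        mask = mask[:, ::-1]    # flip the mask identically so labels stay aligned
    image = image.astype(np.float32) / 255.0  # scale pixel values to [0, 1]
    image = (image - MEAN) / STD              # per-channel standardization
    return image, mask

image = np.zeros((2, 3, 3), dtype=np.uint8)
image[0, 0] = 255                        # one bright pixel in the top-left corner
mask = np.zeros((2, 3), dtype=np.int64)
mask[0, 0] = 1                           # the label of that pixel

out_image, out_mask = flip_and_normalize(image, mask, flip=True)
print(out_mask)
```

After the flip, both the bright pixel and its label sit in the top-right corner, so the pair remains consistent.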

## Segmentation Dataset Class


The SegmentationDataset class handles loading and preprocessing of image and mask data from a specified directory structure for a material segmentation task. It ensures that both images and masks are correctly paired, loaded into memory, and transformed appropriately for training a machine learning model.

In [8]:
class SegmentationDataset(torch.utils.data.Dataset):
    def __init__(self, root_path, transform):
        """
        Material segmentation dataset

        :param root_path: path to train split, which contains images and masks
        :param transform: transforms for dataset

        """
        self.transform = transform
        if not root_path.exists():
            raise FileNotFoundError(f"No root path {root_path} was found")
        self.img_path = root_path / "images"
        self.mask_path = root_path / "masks"

        if not self.img_path.exists():
            raise FileNotFoundError(f"No images directory was found at {self.img_path}")

        if not self.mask_path.exists():
            raise FileNotFoundError(f"No masks directory was found at {self.mask_path}")

        # create list of images and masks
        self.img_list = sorted(self._get_filenames(self.img_path))
        self.mask_list = sorted(self._get_filenames(self.mask_path))
        missing_files = set([f.stem for f in self.img_list]).symmetric_difference(
            set([f.stem for f in self.mask_list])
        )
        if len(missing_files) != 0:
            raise FileNotFoundError(f"Missing files: {missing_files}")

        # load images and masks into memory
        self._read_imgs()
        self._read_masks()

    def __getitem__(self, idx):
        img = self.images[idx]
        mask = self.masks[idx]
        transformed = self.transform(image=img, mask=mask)
        return transformed["image"].float(), transformed["mask"].long()

    def __len__(self):
        return len(self.img_list)

    def _get_filenames(self, path):
        return [f for f in path.iterdir() if f.is_file()]

    def _read_imgs(self):
        """
        Load images into memory
        """
        self.images = []
        for f in tqdm(self.img_list):
            img = cv2.imread(f.as_posix())
            img = cv2.resize(img, (IMG_SIZE, IMG_SIZE)).astype(np.uint8)
            self.images.append(img)

    def _read_masks(self):
        """
        Load masks into memory and convert multiclass mask
        into binary masks for 'concrete' class
        """
        self.masks = []
        for f in tqdm(self.mask_list):
            mask = cv2.imread(f.as_posix())
            mask = (
                cv2.resize(mask, (IMG_SIZE, IMG_SIZE), interpolation=cv2.INTER_NEAREST)
                / 128
            )
            binary_mask = mask[..., 1] * 2 + mask[..., 0]
            binary_mask[binary_mask != 2] = 0
            binary_mask /= 2
            self.masks.append(binary_mask)
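
The arithmetic in `_read_masks` is easier to follow on a handful of pixels. The sketch below assumes every mask channel is either 0 or 128, which the division by 128 suggests but which is an assumption about the data. The four pixels are invented, with channels listed in the order they are loaded.

```python
import numpy as np

# Four pixels, channel values in loaded order; each channel is 0 or 128.
mask = np.array(
    [[[0, 0, 0], [128, 0, 0], [0, 128, 0], [128, 128, 0]]], dtype=np.uint8
)

scaled = mask / 128                          # channels become 0.0 or 1.0
label = scaled[..., 1] * 2 + scaled[..., 0]  # 0, 1, 2 or 3 from channels 0 and 1
binary = label.copy()
binary[binary != 2] = 0                      # keep only pixels whose label is 2
binary /= 2                                  # rescale the kept pixels from 2 to 1
print(label)
print(binary)
```

Only the pixel with channel 1 set and channel 0 clear ends up as foreground. The third channel does not enter the formula.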


In [None]:
dataset = SegmentationDataset(Path(train_dir), transform=transforms)

# splitting dataset into train and validation
split_proportion = 0.9
size = int(len(dataset) * split_proportion)
train_dataset, val_dataset = random_split(dataset, [size, len(dataset) - size])


## Create Dataloaders

We discussed DataLoaders briefly in the previous lesson.

In [10]:
# create dataloaders
batch_size = 8
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True, pin_memory=True
)
val_loader = torch.utils.data.DataLoader(
    val_dataset, batch_size=batch_size, shuffle=False, pin_memory=True
)


In [None]:
# check the sizes of train and validation splits
len(train_dataset), len(val_dataset)


## Model

U-Net is an architecture for semantic segmentation. It consists of a contracting path and an expansive path. The contracting path follows the typical architecture of a convolutional network. Every step in the expansive path consists of an upsampling of the feature map, a concatenation with the correspondingly cropped feature map from the contracting path, and convolutions.

### Architecture

![U-Net architecture](https://media.geeksforgeeks.org/wp-content/uploads/20220614121231/Group14.jpg)


In [12]:
class DoubleConv(nn.Module):
    """
    Block with two convolutional blocks
    """

    def __init__(self, in_channels, out_channels, mid_channels=None):
        """
        Double convolution

        :param in_channels: number of in channels for first conv layer
        :param out_channels: number of out channels for last conv layer
        :param mid_channels: number of out channels for first conv layer
        """
        super().__init__()
        if not mid_channels:
            mid_channels = out_channels

        # two 3x3 conv layers, each followed by batch normalization and ReLU
        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, mid_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(mid_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(mid_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        )

    def forward(self, x):
        return self.double_conv(x)


class Down(nn.Module):
    """
    Block for down path
    """

    def __init__(self, in_channels, out_channels):
        """
        Down block

        :param in_channels: number of in channels for double conv block
        :param out_channels: number of out channels for double conv block
        """
        super().__init__()

        # 2x2 max pooling followed by a double conv block
        self.maxpool_conv = nn.Sequential(
            nn.MaxPool2d(2), DoubleConv(in_channels, out_channels)
        )

    def forward(self, x):
        return self.maxpool_conv(x)


class Up(nn.Module):
    """
    Block for up path
    """

    def __init__(self, in_channels, out_channels):
        """
        Up block

        :param in_channels: number of in channels for transpose convolution
        :param out_channels: number of out channels for double conv block
        """
        super().__init__()

        self.up = nn.ConvTranspose2d(
            in_channels, in_channels // 2, kernel_size=2, stride=2
        )
        self.conv = DoubleConv(in_channels, out_channels)

    def forward(self, x1, x2):
        x1 = self.up(x1)
        diffY = x2.size()[2] - x1.size()[2]
        diffX = x2.size()[3] - x1.size()[3]

        x1 = F.pad(x1, [diffX // 2, diffX - diffX // 2, diffY // 2, diffY - diffY // 2])

        x = torch.cat([x2, x1], dim=1)
        return self.conv(x)


class OutConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        """
        Final convolution block

        :param in_channels: number of in channels for conv layer
        :param out_channels: number of out channels for conv layer
        """
        super(OutConv, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        return self.conv(x)


In [13]:
class UNet(nn.Module):
    """
    UNet model
    """

    def __init__(self, n_channels, n_classes):
        super(UNet, self).__init__()
        self.n_channels = n_channels
        self.n_classes = n_classes

        self.inc = DoubleConv(n_channels, 64)
        self.down1 = Down(64, 128)
        self.down2 = Down(128, 256)
        self.down3 = Down(256, 512)
        self.down4 = Down(512, 1024)
        self.up1 = Up(1024, 512)
        self.up2 = Up(512, 256)
        self.up3 = Up(256, 128)
        self.up4 = Up(128, 64)
        self.outc = OutConv(64, n_classes)

    def forward(self, x):
        x1 = self.inc(x)
        x2 = self.down1(x1)
        x3 = self.down2(x2)
        x4 = self.down3(x3)
        x5 = self.down4(x4)
        x = self.up1(x5, x4)
        x = self.up2(x, x3)
        x = self.up3(x, x2)
        x = self.up4(x, x1)
        logits = self.outc(x)
        return logits
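
To see how the contracting and expansive paths fit together, it helps to trace tensor shapes through the network. The function below is a hand-written sketch in plain Python that mirrors the channel counts in the `UNet` class above. `trace_unet_shapes` is a hypothetical helper, and it assumes the input height and width are divisible by 16.

```python
def trace_unet_shapes(height, width, in_channels=3, n_classes=2):
    """Return (name, channels, height, width) after each stage of the UNet above."""
    shapes = [("input", in_channels, height, width)]
    c, h, w = 64, height, width
    shapes.append(("inc", c, h, w))    # 3x3 convs with padding=1 keep H and W
    skips = [(c, h, w)]
    for i in range(1, 5):              # down1..down4: halve H and W, double channels
        c, h, w = c * 2, h // 2, w // 2
        shapes.append((f"down{i}", c, h, w))
        skips.append((c, h, w))
    skips.pop()                        # the deepest map is the bottleneck, not a skip
    for i in range(1, 5):              # up1..up4
        # The transpose conv halves the channels and doubles H and W. Concatenating
        # the skip map restores the channel count, and the double conv then reduces
        # it to the skip's channel count.
        c, h, w = skips.pop()
        shapes.append((f"up{i}", c, h, w))
    shapes.append(("outc", n_classes, h, w))  # 1x1 conv maps 64 channels to class logits
    return shapes

shapes = trace_unet_shapes(256, 256)
for name, c, h, w in shapes:
    print(f"{name:>6}: {c:>4} x {h} x {w}")
```

For a 256x256 input the bottleneck is 1024 channels at 16x16, and the output logits return to 256x256 with one channel per class.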

## Loss

As the loss, we will use a combination of Cross Entropy Loss and Dice Loss.

### Dice loss

Dice loss is based on [Sørensen-Dice coefficient](https://en.wikipedia.org/wiki/S%C3%B8rensen%E2%80%93Dice_coefficient). It measures the overlap between the predicted and target segmentation masks. Dice loss provides a differentiable and smooth measure of segmentation accuracy.

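$$
\mathrm{DiceLoss}\left( y, \overline{p} \right) = 1 - \dfrac{2y\overline{p} + 1}{y + \overline{p} + 1}
$$

Here $y$ is the target mask, $\overline{p}$ is the predicted probability, and the products and sums run over all pixels. The added 1 is a smoothing term that keeps the ratio defined when both masks are empty. The implementation below instead adds a small `eps` to the denominator only.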


In [14]:
class DiceLoss(nn.Module):
    """
    Dice loss
    """

    def __init__(self):
        super(DiceLoss, self).__init__()

    def forward(self, inputs, targets, eps=1e-6):
        """
        Calculation of dice loss

        :param inputs: model predictions
        :param targets: target values
        :param eps: stability factor, defaults to 1e-6
        :return: loss value
        """
        # convert logits to per-class probabilities
        inputs = F.softmax(inputs, dim=1)

        target_one_hot = F.one_hot(targets, num_classes=inputs.shape[1]).permute(
            0, 3, 1, 2
        )

        dims = (1, 2, 3)
        intersection = torch.sum(inputs * target_one_hot, dims)
        cardinality = torch.sum(inputs + target_one_hot, dims)

        dice_score = 2.0 * intersection / (cardinality + eps)

        return torch.mean(1.0 - dice_score)
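
A few hand-checkable cases make the behavior of this loss concrete. The sketch below repeats the same arithmetic in NumPy on already-normalized probabilities, so it skips the softmax step. `dice_loss_numpy` is a hypothetical helper and the 2x2 target is toy data.

```python
import numpy as np

def dice_loss_numpy(probs, target_one_hot, eps=1e-6):
    """Dice loss for arrays of shape (batch, classes, height, width)."""
    dims = (1, 2, 3)
    intersection = (probs * target_one_hot).sum(axis=dims)
    cardinality = (probs + target_one_hot).sum(axis=dims)
    dice_score = 2.0 * intersection / (cardinality + eps)
    return float(np.mean(1.0 - dice_score))

# One 2x2 image with two classes; the left column belongs to class 1.
labels = np.array([[[1, 0],
                    [1, 0]]])
one_hot = np.eye(2)[labels].transpose(0, 3, 1, 2)  # (batch, classes, height, width)

perfect = dice_loss_numpy(one_hot, one_hot)                      # prediction equals target
wrong = dice_loss_numpy(1.0 - one_hot, one_hot)                  # every pixel misclassified
uncertain = dice_loss_numpy(np.full_like(one_hot, 0.5), one_hot) # 50/50 everywhere
print(perfect, wrong, uncertain)
```

The loss is close to 0 for a perfect prediction, 1 for a completely wrong one, and about 0.5 when the model is maximally unsure.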


In [15]:
model = UNet(n_channels=3, n_classes=2)


In [16]:
optimizer = torch.optim.Adam(
    model.parameters(),
)
criterion1 = nn.CrossEntropyLoss(reduction="mean")
criterion2 = DiceLoss()


## Training
The function `train_model` will train our segmentation model and validate its performance over a specified number of epochs (default is 10). During each epoch, the model will be set to training mode, and it will process batches of images and corresponding masks from the training dataset.



In [17]:
def train_model(model, train_loader, val_loader, device, optimizer, epochs=10):
    """
    Train a segmentation model and validate its performance.

    :param model: The model to be trained.
    :param train_loader: DataLoader for the training dataset.
    :param val_loader: DataLoader for the validation dataset.
    :param device: Device to run the training on (CPU or GPU).
    :param optimizer: Optimizer to use for training.
    :param epochs: Number of epochs to train the model.
    """
    model.to(device)
    # note: criterion1 and criterion2 are the global loss objects defined above
    for epoch in range(1, epochs + 1):
        model.train()
        train_loss = 0.0
        with tqdm(
            total=len(train_loader.dataset), desc=f"Epoch {epoch}/{epochs}", unit="img"
        ) as pbar:
            for images, true_masks in train_loader:
                images, true_masks = images.to(device), true_masks.to(device)

                masks_pred = model(images)
                loss = criterion1(masks_pred, true_masks) + criterion2(
                    masks_pred, true_masks
                )
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                pbar.update(images.shape[0])
                train_loss += loss.item()
                pbar.set_postfix(**{"loss (batch)": loss.item()})

        model.eval()
        val_loss = 0.0  # tracked separately so it does not mix with the training loss
        with tqdm(total=len(val_loader.dataset), desc="Validation", unit="img") as pbar:
            with torch.no_grad():
                for images, true_masks in val_loader:
                    images, true_masks = images.to(device), true_masks.to(device)

                    masks_pred = model(images)
                    loss = criterion1(masks_pred, true_masks) + criterion2(
                        masks_pred, true_masks
                    )
                    pbar.update(images.shape[0])
                    val_loss += loss.item()
                    pbar.set_postfix(**{"loss (batch)": loss.item()})

        # report the mean batch loss for both splits
        print(
            f"Epoch {epoch}: train loss {train_loss / len(train_loader):.4f}, "
            f"val loss {val_loss / len(val_loader):.4f}"
        )


In [None]:
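# fall back to the CPU when no GPU is available
device = "cuda" if torch.cuda.is_available() else "cpu"
train_model(model, train_loader, val_loader, device, optimizer, epochs=10)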


## Let's save the model

In [20]:
torch.save(model.state_dict(), "best.pt")


## Predict

For prediction, we apply the same preprocessing used during training (scaling to [0, 1] and normalization), plus a resize to `IMG_SIZE`, but without the random horizontal flip.

In [21]:
import matplotlib.pyplot as plt


def plot_mask(mask, color_mapping=COLOR_MAPPING):
    # build an (H, W, 3) color image; indexing by label avoids transposing the mask
    color_mask = np.zeros((*mask.shape, 3), dtype=np.uint8)
    for label, color in color_mapping.items():
        color_mask[mask == label] = color
    # the colors are treated as BGR, so convert to RGB for matplotlib
    color_mask = cv2.cvtColor(color_mask, cv2.COLOR_BGR2RGB)
    plt.imshow(color_mask)


In [22]:
test_transforms = A.Compose(
    [
        A.Resize(IMG_SIZE, IMG_SIZE, interpolation=cv2.INTER_NEAREST),
        A.ToFloat(max_value=MAX_PIXEL_VALUE),
        A.Normalize(
            mean=NORMALIZATION_MEAN, std=NORMALIZATION_STD, max_pixel_value=1.0
        ),
        ToTensorV2(),
    ]
)


In [24]:
RESULTS_SHAPE = (64, 64)


def predict(model, img, device="cpu"):
    """
    Model inference on image

    :param model: model
    :param img: image
    :param device: device for computation, defaults to "cpu"
    :return: mask
    """
    model.to(device)
    model.eval()
    tensor_img = test_transforms(image=img)["image"]
    tensor_img = tensor_img.to(device=device, dtype=torch.float32).unsqueeze(0)

    with torch.no_grad():
        output = model(tensor_img)
        mask = output.argmax(dim=1)
    mask = mask.detach().cpu().numpy()[0].astype(np.uint8)
    mask = cv2.resize(mask, RESULTS_SHAPE, interpolation=cv2.INTER_NEAREST)
    return mask


In [None]:
img = cv2.imread(
    "Material Detection/original/Test/images/0.jpeg"
)
mask = predict(model, img)
plot_mask(mask)


## Results

### Run-Length Encoding (RLE)

The `rle_encode` function performs run-length encoding of a binary mask. It flattens the mask in column-major order (`x.T.flatten()`), finds the indices of the foreground pixels, and groups consecutive indices into runs. For each run it records the 1-indexed start position and the run length, so the result is a flat list of alternating start positions and lengths. This compresses the mask by storing only the runs of foreground pixels rather than every individual pixel. `list_to_string` then joins the list into a space-separated string, using `-` for an empty mask.

In [26]:
def rle_encode(x, fg_val=1):
    dots = np.where(x.T.flatten() == fg_val)[0]
    run_lengths = []
    prev = -2
    for b in dots:
        if b > prev + 1:
            run_lengths.extend((b + 1, 0))
        run_lengths[-1] += 1
        prev = b
    return run_lengths


def list_to_string(x):
    if x:  # non-empty list
        # join explicitly so NumPy integer types print as plain numbers
        s = " ".join(str(int(v)) for v in x)
    else:
        s = "-"
    return s
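
A worked example helps to check the encoding by hand. The sketch below decodes a run list back into a mask. `rle_decode` is a hypothetical helper that is not part of this lesson's code, and the 3x3 mask is toy data. The run list `[2, 2, 8, 1]` is what `rle_encode` produces for this mask.

```python
import numpy as np

def rle_decode(runs, shape, fg_val=1):
    """Rebuild a (height, width) mask from a flat [start, length, ...] list."""
    flat = np.zeros(shape[0] * shape[1], dtype=np.uint8)
    for start, length in zip(runs[0::2], runs[1::2]):
        flat[start - 1 : start - 1 + length] = fg_val  # starts are 1-indexed
    return flat.reshape(shape, order="F")  # column-major, matching x.T.flatten()

mask = np.array([[0, 0, 0],
                 [1, 0, 1],
                 [1, 0, 0]], dtype=np.uint8)

# Reading column by column, the foreground sits at positions 2-3 and at position 8.
runs = [2, 2, 8, 1]
decoded = rle_decode(runs, mask.shape)
print(decoded)
```

Decoding the runs reproduces the original mask, which confirms that positions are counted down each column first and start at 1.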

In [None]:
model = UNet(n_channels=3, n_classes=2)
# map_location lets the weights load on a machine without a GPU
model.load_state_dict(torch.load("best.pt", map_location="cpu"))


In [None]:
import os
import pandas as pd

test_dir = "/content/Material Detection/original/Test/images/"
rows = []
for f in tqdm(sorted(os.listdir(test_dir))):
    img = cv2.imread(os.path.join(test_dir, f))
    mask = predict(model, img, device="cuda")
    pred = list_to_string(rle_encode(mask))
    # use the file name without its extension as the id
    rows.append({"id": os.path.splitext(f)[0], "pixels": pred})
df = pd.DataFrame(rows, columns=["id", "pixels"])
df.to_csv("results.csv", index=False)
