In [None]:
from google.colab import drive

drive.mount('/content/drive')

In [None]:
#!unzip "/content/drive/MyDrive/ELE401/model2_DeepLab/Dataset_V2.zip" -d "/content/drive/MyDrive/ELE401/model2_DeepLab/Dataset_V2"


## HELPER CLASSES

### METRICS CLASSES

In [None]:
import re
import torch.nn as nn


class BaseObject(nn.Module):
    def __init__(self, name=None):
        super().__init__()
        self._name = name

    @property
    def __name__(self):
        if self._name is None:
            name = self.__class__.__name__
            s1 = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name)
            return re.sub("([a-z0-9])([A-Z])", r"\1_\2", s1).lower()
        else:
            return self._name


class Metric(BaseObject):
    pass


class Loss(BaseObject):
    def __add__(self, other):
        if isinstance(other, Loss):
            return SumOfLosses(self, other)
        else:
            raise ValueError("Loss should be inherited from `Loss` class")

    def __radd__(self, other):
        return self.__add__(other)

    def __mul__(self, value):
        if isinstance(value, (int, float)):
            return MultipliedLoss(self, value)
        else:
            raise ValueError("Loss should be inherited from `BaseLoss` class")

    def __rmul__(self, other):
        return self.__mul__(other)


class SumOfLosses(Loss):
    def __init__(self, l1, l2):
        name = "{} + {}".format(l1.__name__, l2.__name__)
        super().__init__(name=name)
        self.l1 = l1
        self.l2 = l2

    def __call__(self, *inputs):
        return self.l1.forward(*inputs) + self.l2.forward(*inputs)


class MultipliedLoss(Loss):
    def __init__(self, loss, multiplier):

        # resolve name
        if len(loss.__name__.split("+")) > 1:
            name = "{} * ({})".format(multiplier, loss.__name__)
        else:
            name = "{} * {}".format(multiplier, loss.__name__)
        super().__init__(name=name)
        self.loss = loss
        self.multiplier = multiplier

    def __call__(self, *inputs):
        return self.multiplier * self.loss.forward(*inputs)
class Activation(nn.Module):

    def __init__(self, name, **params):

        super().__init__()

        if name is None or name == 'identity':
            self.activation = nn.Identity(**params)
        elif name == 'sigmoid':
            self.activation = nn.Sigmoid()
        elif name == 'softmax2d':
            self.activation = nn.Softmax(dim=1, **params)
        elif name == 'softmax':
            self.activation = nn.Softmax(**params)
        elif name == 'logsoftmax':
            self.activation = nn.LogSoftmax(**params)
        elif name == 'argmax':
            self.activation = ArgMax(**params)
        elif name == 'argmax2d':
            self.activation = ArgMax(dim=1, **params)
        elif callable(name):
            self.activation = name(**params)
        else:
            raise ValueError('Activation should be callable/sigmoid/softmax/logsoftmax/None; got {}'.format(name))

    def forward(self, x):
        return self.activation(x)

import torch


def _take_channels(*xs, ignore_channels=None):
    if ignore_channels is None:
        return xs
    else:
        channels = [channel for channel in range(xs[0].shape[1]) if channel not in ignore_channels]
        xs = [torch.index_select(x, dim=1, index=torch.tensor(channels).to(x.device)) for x in xs]
        return xs


def _threshold(x, threshold=None):
    if threshold is not None:
        return (x > threshold).type(x.dtype)
    else:
        return x


def iou(pr, gt, eps=1e-7, threshold=None, ignore_channels=None):
    """Calculate Intersection over Union between ground truth and prediction
    Args:
        pr (torch.Tensor): predicted tensor
        gt (torch.Tensor):  ground truth tensor
        eps (float): epsilon to avoid zero division
        threshold: threshold for outputs binarization
    Returns:
        float: IoU (Jaccard) score
    """

    pr = _threshold(pr, threshold=threshold)
    pr, gt = _take_channels(pr, gt, ignore_channels=ignore_channels)

    intersection = torch.sum(gt * pr)
    union = torch.sum(gt) + torch.sum(pr) - intersection + eps
    return (intersection + eps) / union


jaccard = iou


def f_score(pr, gt, beta=1, eps=1e-7, threshold=None, ignore_channels=None):
    """Calculate F-score between ground truth and prediction
    Args:
        pr (torch.Tensor): predicted tensor
        gt (torch.Tensor):  ground truth tensor
        beta (float): positive constant
        eps (float): epsilon to avoid zero division
        threshold: threshold for outputs binarization
    Returns:
        float: F score
    """

    pr = _threshold(pr, threshold=threshold)
    pr, gt = _take_channels(pr, gt, ignore_channels=ignore_channels)

    tp = torch.sum(gt * pr)
    fp = torch.sum(pr) - tp
    fn = torch.sum(gt) - tp

    score = ((1 + beta ** 2) * tp + eps) / ((1 + beta ** 2) * tp + beta ** 2 * fn + fp + eps)

    return score


def accuracy(pr, gt, threshold=0.5, ignore_channels=None):
    """Calculate accuracy score between ground truth and prediction
    Args:
        pr (torch.Tensor): predicted tensor
        gt (torch.Tensor):  ground truth tensor
        eps (float): epsilon to avoid zero division
        threshold: threshold for outputs binarization
    Returns:
        float: precision score
    """
    pr = _threshold(pr, threshold=threshold)
    pr, gt = _take_channels(pr, gt, ignore_channels=ignore_channels)

    tp = torch.sum(gt == pr, dtype=pr.dtype)
    score = tp / gt.view(-1).shape[0]
    return score


def precision(pr, gt, eps=1e-7, threshold=None, ignore_channels=None):
    """Calculate precision score between ground truth and prediction
    Args:
        pr (torch.Tensor): predicted tensor
        gt (torch.Tensor):  ground truth tensor
        eps (float): epsilon to avoid zero division
        threshold: threshold for outputs binarization
    Returns:
        float: precision score
    """

    pr = _threshold(pr, threshold=threshold)
    pr, gt = _take_channels(pr, gt, ignore_channels=ignore_channels)

    tp = torch.sum(gt * pr)
    fp = torch.sum(pr) - tp

    score = (tp + eps) / (tp + fp + eps)

    return score


def recall(pr, gt, eps=1e-7, threshold=None, ignore_channels=None):
    """Calculate Recall between ground truth and prediction
    Args:
        pr (torch.Tensor): A list of predicted elements
        gt (torch.Tensor):  A list of elements that are to be predicted
        eps (float): epsilon to avoid zero division
        threshold: threshold for outputs binarization
    Returns:
        float: recall score
    """

    pr = _threshold(pr, threshold=threshold)
    pr, gt = _take_channels(pr, gt, ignore_channels=ignore_channels)

    tp = torch.sum(gt * pr)
    fn = torch.sum(gt) - tp

    score = (tp + eps) / (tp + fn + eps)

    return score

import torch.nn as nn

class JaccardLoss(Loss):
    def __init__(self, eps=1.0, activation=None, ignore_channels=None, **kwargs):
        super().__init__(**kwargs)
        self.eps = eps
        self.activation = Activation(activation)
        self.ignore_channels = ignore_channels

    def forward(self, y_pr, y_gt):
        y_pr = self.activation(y_pr)
        return 1 - jaccard(
            y_pr,
            y_gt,
            eps=self.eps,
            threshold=None,
            ignore_channels=self.ignore_channels,
        )


class DiceLoss(Loss):
    def __init__(self, eps=1.0, beta=1.0, activation=None, ignore_channels=None, **kwargs):
        super().__init__(**kwargs)
        self.eps = eps
        self.beta = beta
        self.activation = Activation(activation)
        self.ignore_channels = ignore_channels

    def forward(self, y_pr, y_gt):
        y_pr = self.activation(y_pr)
        return 1 - f_score(
            y_pr,
            y_gt,
            beta=self.beta,
            eps=self.eps,
            threshold=None,
            ignore_channels=self.ignore_channels,
        )


class L1Loss(nn.L1Loss, Loss):
    pass


class MSELoss(nn.MSELoss, Loss):
    pass


class CrossEntropyLoss(nn.CrossEntropyLoss, Loss):
    pass


class NLLLoss(nn.NLLLoss, Loss):
    pass


class BCELoss(nn.BCELoss, Loss):
    pass


class BCEWithLogitsLoss(nn.BCEWithLogitsLoss, Loss):
    pass

class IoU(Metric):
    __name__ = "iou_score"

    def __init__(self, eps=1e-7, threshold=0.5, activation=None, ignore_channels=None, **kwargs):
        super().__init__(**kwargs)
        self.eps = eps
        self.threshold = threshold
        self.activation = Activation(activation)
        self.ignore_channels = ignore_channels

    def forward(self, y_pr, y_gt):
        y_pr = self.activation(y_pr)
        return iou(
            y_pr,
            y_gt,
            eps=self.eps,
            threshold=self.threshold,
            ignore_channels=self.ignore_channels,
        )


class Fscore(Metric):
    def __init__(self, beta=1, eps=1e-7, threshold=0.5, activation=None, ignore_channels=None, **kwargs):
        super().__init__(**kwargs)
        self.eps = eps
        self.beta = beta
        self.threshold = threshold
        self.activation = Activation(activation)
        self.ignore_channels = ignore_channels

    def forward(self, y_pr, y_gt):
        y_pr = self.activation(y_pr)
        return f_score(
            y_pr,
            y_gt,
            eps=self.eps,
            beta=self.beta,
            threshold=self.threshold,
            ignore_channels=self.ignore_channels,
        )


class Accuracy(Metric):
    def __init__(self, threshold=0.5, activation=None, ignore_channels=None, **kwargs):
        super().__init__(**kwargs)
        self.threshold = threshold
        self.activation = Activation(activation)
        self.ignore_channels = ignore_channels

    def forward(self, y_pr, y_gt):
        y_pr = self.activation(y_pr)
        return accuracy(
            y_pr,
            y_gt,
            threshold=self.threshold,
            ignore_channels=self.ignore_channels,
        )


class Recall(Metric):
    def __init__(self, eps=1e-7, threshold=0.5, activation=None, ignore_channels=None, **kwargs):
        super().__init__(**kwargs)
        self.eps = eps
        self.threshold = threshold
        self.activation = Activation(activation)
        self.ignore_channels = ignore_channels

    def forward(self, y_pr, y_gt):
        y_pr = self.activation(y_pr)
        return recall(
            y_pr,
            y_gt,
            eps=self.eps,
            threshold=self.threshold,
            ignore_channels=self.ignore_channels,
        )


class Precision(Metric):
    def __init__(self, eps=1e-7, threshold=0.5, activation=None, ignore_channels=None, **kwargs):
        super().__init__(**kwargs)
        self.eps = eps
        self.threshold = threshold
        self.activation = Activation(activation)
        self.ignore_channels = ignore_channels

    def forward(self, y_pr, y_gt):
        y_pr = self.activation(y_pr)
        return precision(
            y_pr,
            y_gt,
            eps=self.eps,
            threshold=self.threshold,
            ignore_channels=self.ignore_channels,
        )

### EPOCH CLASS

In [None]:
import numpy as np


class Meter(object):
    """Meters provide a way to keep track of important statistics in an online manner.
    This class is abstract, but provides a standard interface for all meters to follow.
    """

    def reset(self):
        """Reset the meter to default settings."""
        pass

    def add(self, value):
        """Log a new value to the meter
        Args:
            value: Next result to include.
        """
        pass

    def value(self):
        """Get the value of the meter in the current state."""
        pass


class AverageValueMeter(Meter):
    def __init__(self):
        super(AverageValueMeter, self).__init__()
        self.reset()
        self.val = 0

    def add(self, value, n=1):
        self.val = value
        self.sum += value
        self.var += value * value
        self.n += n

        if self.n == 0:
            self.mean, self.std = np.nan, np.nan
        elif self.n == 1:
            self.mean = 0.0 + self.sum  # This is to force a copy in torch/numpy
            self.std = np.inf
            self.mean_old = self.mean
            self.m_s = 0.0
        else:
            self.mean = self.mean_old + (value - n * self.mean_old) / float(self.n)
            self.m_s += (value - self.mean_old) * (value - self.mean)
            self.mean_old = self.mean
            self.std = np.sqrt(self.m_s / (self.n - 1.0))

    def value(self):
        return self.mean, self.std

    def reset(self):
        self.n = 0
        self.sum = 0.0
        self.var = 0.0
        self.val = 0.0
        self.mean = np.nan
        self.mean_old = 0.0
        self.m_s = 0.0
        self.std = np.nan

import sys
import torch
from tqdm import tqdm as tqdm

class Epoch:
    def __init__(self, model, loss, metrics, stage_name, device="cpu", verbose=True):
        self.model = model
        self.loss = loss
        self.metrics = metrics
        self.stage_name = stage_name
        self.verbose = verbose
        self.device = device

        self._to_device()

    def _to_device(self):
        self.model.to(self.device)
        self.loss.to(self.device)
        for metric in self.metrics:
            metric.to(self.device)

    def _format_logs(self, logs):
        str_logs = ["{} - {:.4}".format(k, v) for k, v in logs.items()]
        s = ", ".join(str_logs)
        return s

    def batch_update(self, x, y):
        raise NotImplementedError

    def on_epoch_start(self):
        pass

    def run(self, dataloader):

        self.on_epoch_start()

        logs = {}
        loss_meter = AverageValueMeter()
        metrics_meters = {metric.__name__: AverageValueMeter() for metric in self.metrics}

        with tqdm(
            dataloader,
            desc=self.stage_name,
            file=sys.stdout,
            disable=not (self.verbose),
        ) as iterator:
            for x, y in iterator:
                x, y = x.to(self.device), y.to(self.device)
                loss, y_pred = self.batch_update(x, y)

                # update loss logs
                loss_value = loss.cpu().detach().numpy()
                loss_meter.add(loss_value)
                loss_logs = {self.loss.__name__: loss_meter.mean}
                logs.update(loss_logs)

                # update metrics logs
                for metric_fn in self.metrics:
                    metric_value = metric_fn(y_pred, y).cpu().detach().numpy()
                    metrics_meters[metric_fn.__name__].add(metric_value)
                metrics_logs = {k: v.mean for k, v in metrics_meters.items()}
                logs.update(metrics_logs)

                if self.verbose:
                    s = self._format_logs(logs)
                    iterator.set_postfix_str(s)

        return logs


class TrainEpoch(Epoch):
    def __init__(self, model, loss, metrics, optimizer, device="cpu", verbose=True):
        super().__init__(
            model=model,
            loss=loss,
            metrics=metrics,
            stage_name="train",
            device=device,
            verbose=verbose,
        )
        self.optimizer = optimizer

    def on_epoch_start(self):
        self.model.train()

    def batch_update(self, x, y):

        self.optimizer.zero_grad()
        prediction = self.model.forward(x)
        if isinstance(prediction, dict):
          prediction = prediction['out']
        loss = self.loss(prediction, y)
        loss.backward()
        self.optimizer.step()
        return loss, prediction


class ValidEpoch(Epoch):
    def __init__(self, model, loss, metrics, device="cpu", verbose=True):
        super().__init__(
            model=model,
            loss=loss,
            metrics=metrics,
            stage_name="valid",
            device=device,
            verbose=verbose,
        )

    def on_epoch_start(self):
        self.model.eval()

    def batch_update(self, x, y):

        with torch.no_grad():
            prediction = self.model.forward(x)
            if isinstance(prediction, dict):
              prediction = prediction['out']
            loss = self.loss(prediction, y)
        return loss, prediction

##DATASET

In [None]:
import os
#os.environ['CUDA_VISIBLE_DEVICES'] = '0'
import numpy as np
import cv2
import matplotlib.pyplot as plt

DATA_DIR = "/content/drive/MyDrive/ELE401/model2_DeepLab/Dataset_V2"

x_train_dir = os.path.join(DATA_DIR, 'X_train')
y_train_dir = os.path.join(DATA_DIR, 'y_train')


x_test_dir = os.path.join(DATA_DIR, 'X_test')
y_test_dir = os.path.join(DATA_DIR, 'y_test')


path, dirs, files = next(os.walk(x_train_dir))
file_count = len(files)
print(file_count)

In [None]:
CLASSES = ['background' ,'jp_drain', 'drain_bag', 'liquid']
height = width = 1024

In [None]:
from torch.utils.data import Dataset as BaseDataset

class Dataset(BaseDataset):
      def __init__(
        self,
        images_dir,
        masks_dir,
        classes = None,
        augmentation = None,
        preprocessing = None,
    ):
        #get images and masks
        self.ids_x = sorted(os.listdir(images_dir))
        self.ids_y = sorted(os.listdir(masks_dir))


        #fullt paths
        self.images_fps = [os.path.join(images_dir, image_id) for image_id in self.ids_x]
        self.masks_fps = [os.path.join(masks_dir, image_id) for image_id in self.ids_y]

        #convert str names to class values on masks
        self.class_values = [CLASSES.index(cls.lower()) for cls in classes]
        self.augmentation = augmentation
        self.preprocessing = preprocessing



      def __getitem__(self, i):

        #read data
        image = cv2.imread(self.images_fps[i])
        if image is None:
          raise ValueError(f"Image not found or could not be read: {self.images_fps[i]}")

        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        #image = cv2.resize(image, (width, height), interpolation=cv2.INTER_LINEAR)



        mask = cv2.imread(self.masks_fps[i], cv2.IMREAD_GRAYSCALE)
        if mask is None:
          raise ValueError(f"Mask not found or could not be read: {self.masks_fps[i]}")


        image = cv2.resize(image, (width, height), interpolation=cv2.INTER_LINEAR)
        mask = cv2.resize(mask, (width, height), interpolation=cv2.INTER_NEAREST)

        # extract certain classes
        masks = [(mask == v ) for v in self.class_values]
        mask = np.stack(masks, axis=-1).astype('float32')


        #apply augmentations
        if self.augmentation:
          sample = self.augmentation(image = image, mask = mask)
          image,mask = sample['image'], sample['mask']


        #apply preprocessing
        if self.preprocessing:
          sample = self.preprocessing(image= image, mask= mask)
          image, mask = sample['image'], sample['mask']


        return self.images_fps[i],image,mask # If filename information needed
        #return image,mask


      def __len__(self):
        return len(self.ids_x)



In [None]:
def visualize(image, mask, label=None, truth= None):
  if truth is None:
    plt.figure(figsize=(6,6))
    plt.subplot(1,2,1)
    plt.imshow(image)
    plt.subplot(1,2,2)
    plt.imshow(mask)
    if label is not None:
      plt.title(f"{label.capitalize()}")


In [None]:
  dataset = Dataset(images_dir= x_train_dir, masks_dir= y_train_dir, classes= CLASSES)
  #path, image,mask = dataset[301]
  image,mask = dataset[350]
  print(mask.shape)
  print(mask[550,500])
  #print(path)
  visualize(image= image, mask = mask.squeeze())

In [None]:
for label in CLASSES:
  dataset = Dataset(images_dir= x_train_dir, masks_dir= y_train_dir, classes=[label])
  #path,image,mask = dataset[501]
  image,mask = dataset[460]
  print(mask.shape)
  visualize(image= image, mask = mask.squeeze(), label= label)

In [None]:
import albumentations as albu

def to_tensor(x, **kwargs):
    return x.transpose(2, 0, 1).astype('float32')



def get_preprocessing():
    _transform = [
        albu.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        albu.Lambda(image=to_tensor, mask=to_tensor)
    ]
    return albu.Compose(_transform)

## Track Log Loss Function

In [None]:
import pandas as pd
import numpy as np

def save_training_logs(epoch, train_logs, valid_logs=None, excel_path="loss_history_new.xlsx", old_iteration = 0):

    adjusted_epoch = old_iteration + epoch

    data = {
        'Epoch': [adjusted_epoch]
    }


    if 'Cross_Entropy_Loss' in train_logs:
        data['Train_Loss'] = [train_logs['Cross_Entropy_Loss']]

    # training IoU metrics
    for metric in ['IoU_Background', 'IoU_JP', 'IoU_Bag', 'IoU_Liquid']:
        if metric in train_logs:
            data[f'Train_{metric}'] = [train_logs[metric]]

    # validation metrics
    if valid_logs:
        # validation loss
        if 'Cross_Entropy_Loss' in valid_logs:
            data['Val_Loss'] = [valid_logs['Cross_Entropy_Loss']]

        #validation IoU metrics
        for metric in ['Val_IoU_Background', 'Val_IoU_JP', 'Val_IoU_Bag', 'Val_IoU_Liquid']:
            if metric in valid_logs:
                data[metric] = [valid_logs[metric]]

    # DataFrame for current epoch
    current_df = pd.DataFrame(data)

    # Check if the Excel file exists
    if os.path.exists(excel_path):
        try:
            # Read existing data
            existing_df = pd.read_excel(excel_path, engine="openpyxl")

            # Check if we need to add columns that might be missing in the existing file
            for col in current_df.columns:
                if col not in existing_df.columns:
                    existing_df[col] = None

            # Append new data and save
            combined_df = pd.concat([existing_df, current_df], ignore_index=True)
            combined_df.to_excel(excel_path, engine="openpyxl", index=False)
            print(f"Successfully saved training logs to: {excel_path}")
        except Exception as e:
            # If there's an error, create a backup file
            backup_path = f"{os.path.splitext(excel_path)[0]}_backup.xlsx"
            current_df.to_excel(backup_path, engine="openpyxl", index=False)
            print(f"Error with existing file: {e}")
            print(f"Saved current epoch data to: {backup_path}")
    else:
        # Create a new file if it doesn't exist
        current_df.to_excel(excel_path, engine="openpyxl", index=False)

##HYPERPARAMETERS

In [None]:
#Batch size
BATCH_SIZE = 8
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
NUM_CLASSES = 4

#Excel
EXCEL_FILE_PATH = '/content/drive/MyDrive/ELE401/model2_DeepLab/excel_logs/loss_history_new.xlsx'

#Model
STATE_DICT_PATH = '/content/drive/MyDrive/ELE401/model2_DeepLab/state_dicts/modelWeights_00140.pth'
MODEL_SAVE_FOLDER = "/content/drive/MyDrive/ELE401/model2_DeepLab/state_dicts"

##DATASETS, DATALOADERS

In [None]:
from torch.utils.data import DataLoader

train_dataset = Dataset(images_dir= x_train_dir, masks_dir= y_train_dir, classes= CLASSES, preprocessing= get_preprocessing())
test_dataset = Dataset(images_dir= x_test_dir, masks_dir= y_test_dir, classes= CLASSES, preprocessing= get_preprocessing())



train_DataLoader = DataLoader(dataset= train_dataset, batch_size= BATCH_SIZE, shuffle= True)
test_DataLoader = DataLoader(dataset= test_dataset, batch_size= 1)

#path,image, mask = next(iter(train_DataLoader))
image, mask = next(iter(train_DataLoader))
#image, mask = next(iter(train_DataLoader))
print(f"Image Path: {path}")
print(f"Image shape: {image.shape}")
print(f"Image type: {type(image)}")
print(f"Image dtype: {image.dtype}")



#print(f"Mask Shape: {mask.shape}")
print(f"Mask type: {type(mask)}")
print(f"Mask shape: {mask.shape}")
print(f"Mask dtype: {mask.dtype}")

##MODEL, LOSS_FN, OPTIMIZER

In [None]:
#DEVICE AGNOSTIC CODE
import torch

#HYPER PARAMETERS

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
NUM_CLASSES = 4

if DEVICE != "cuda":
  raise RuntimeError("Convert to GPU use model.load_state_dict(torch.load('x.torch', map_location=device))")



from torch import nn #loss_fn
from torch import optim #optimizer
from torchvision.models import segmentation #segmentation models


#MODEL
model = segmentation.deeplabv3_resnet50(pretrained=True)
model.classifier[4] = nn.Conv2d(256, NUM_CLASSES, kernel_size=(1, 1), stride=(1, 1))
model = model.to(DEVICE)  # Move to device before loading

# Load state dict
loaded_state_dict = torch.load(STATE_DICT_PATH, map_location=DEVICE)
model.load_state_dict(loaded_state_dict)


#find old iteration
import re
import os

# Get old_iteration from model filename
try:
    match = re.search(r"modelWeights_(\d{5})\.pth$", STATE_DICT_PATH)
    if not match:
        raise ValueError(f"Could not extract iteration number from model path: {STATE_DICT_PATH}")

    old_iteration = int(match.group(1))
    print(f"Continuing training from iteration: {old_iteration}")
except Exception as e:
    print(f"ERROR: {e}")
    print("Provide a model path in the format 'model_XXXXX.pth' where XXXXX is a 5-digit number.")
    raise
print(f"Loaded model state_dict from {STATE_DICT_PATH} (Iteration: {old_iteration})")

#LOSS FUNCTION
class_weights = torch.tensor([
    1.0,   # Background
    20.0,  # JP Drain
    20.0,  # Bag
    100.0   # Liquid
], device=DEVICE)

loss_fn = torch.nn.CrossEntropyLoss(weight= class_weights)
loss_fn.__name__ = "Cross_Entropy_Loss"


#OPTIMIZER
LEARNING_RATE = 1e-5
optimizer = optim.Adam(params= model.parameters(), lr = LEARNING_RATE)

scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='max',  #IoU maximized
    factor=0.5,  # Multiply LR by this factor when reducing
    patience=3,  # Number of epochs with no improvement after which LR will be reduced
    verbose=True
)

old_iteration

##METRICS

In [None]:
#Train metrics
metrics_train = [
    IoU(threshold=0.5, ignore_channels=[1,2,3]), #0 Background
    IoU(threshold=0.5, ignore_channels=[0,2,3]), #1 JP
    IoU(threshold=0.5, ignore_channels=[0,1,3]), #2 Bag
    IoU(threshold=0.5, ignore_channels=[0,1,2]), #3 Liquid
   #Fscore(threshold=0.5),
]



metrics_train[0].__name__= "IoU_Background"
metrics_train[1].__name__= "IoU_JP"
metrics_train[2].__name__= "IoU_Bag"
metrics_train[3].__name__= "IoU_Liquid"



#Val metrics
metrics_test = [
    IoU(threshold=0.5, ignore_channels=[1,2,3]), #0 Background
    IoU(threshold=0.5, ignore_channels=[0,2,3]), #1 JP
    IoU(threshold=0.5, ignore_channels=[0,1,3]), #2 Bag
    IoU(threshold=0.5, ignore_channels=[0,1,2]), #3 Liquid
    #Fscore(threshold=0.5),
]



metrics_test[0].__name__= "Val_IoU_Background"
metrics_test[1].__name__= "Val_IoU_JP"
metrics_test[2].__name__= "Val_IoU_Bag"
metrics_test[3].__name__= "Val_IoU_Liquid"

##Train

In [None]:
import torch

DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("Using device:", DEVICE)

train_epoch = TrainEpoch(
    model,
    loss=loss_fn,
    metrics=metrics_train,
    optimizer=optimizer,
    device=DEVICE,
    verbose=True,
)

valid_epoch = ValidEpoch(
    model,
    loss=loss_fn,
    metrics=metrics_test,
    device=DEVICE,
    verbose=True,
)


max_score = 0

for epoch in range(1, 10000):

    print('\nEpoch: {}'.format(epoch))
    train_logs = train_epoch.run(train_DataLoader)
    valid_logs = valid_epoch.run(test_DataLoader)

    current_iou = train_logs['IoU_Liquid']

    scheduler.step(current_iou)

    current_lr = optimizer.param_groups[0]['lr']
    print(f'Current learning rate: {current_lr:.2e}')

    # Save the best model
    if max_score < train_logs['IoU_Liquid']:
      max_score = train_logs['IoU_Liquid']
      best_model_path = os.path.join(MODEL_SAVE_FOLDER, f"model_best_Weights.pth")
      torch.save(model.state_dict(), best_model_path)
      print(f'Best model saved with IOU: {max_score:.4f}')

    # Save checkpoint
    if epoch % 2 == 0:
      checkpoint_number = old_iteration + epoch
      checkpoint_path = os.path.join(MODEL_SAVE_FOLDER, f"modelWeights_{checkpoint_number:05d}.pth")
      torch.save(model.state_dict(), checkpoint_path)
      print(f'Checkpoint saved as: {checkpoint_path}')


    save_training_logs(epoch=epoch, train_logs=train_logs, valid_logs= valid_logs, excel_path=EXCEL_FILE_PATH, old_iteration= old_iteration)

##Testing

###Last Model

In [None]:
test_epoch = ValidEpoch(
    model,
    loss=loss_fn,
    metrics=metrics_test,
    device=DEVICE,
    verbose=True,
)

#MODEL
model = segmentation.deeplabv3_resnet50(pretrained=True)
model.classifier[4] = nn.Conv2d(256, NUM_CLASSES, kernel_size=(1, 1), stride=(1, 1))
model = model.to(DEVICE)  # Move to device before loading

# Load state dict
loaded_state_dict = torch.load(STATE_DICT_PATH, map_location=DEVICE)
model.load_state_dict(loaded_state_dict)


test_epoch.run(test_DataLoader)

###Best Model (Highest Liquid IoU Score)

In [None]:
#Best model STATE_DICT
BEST_STATE_DICT = "/content/drive/MyDrive/ELE401/model2_DeepLab/state_dicts/modelWeights_00148.pth"

#MODEL
model = segmentation.deeplabv3_resnet50(pretrained=True)
model.classifier[4] = nn.Conv2d(256, NUM_CLASSES, kernel_size=(1, 1), stride=(1, 1))
model = model.to(DEVICE)  # Move to device before loading

# Load state dict
loaded_state_dict = torch.load(BEST_STATE_DICT, map_location=DEVICE)
model.load_state_dict(loaded_state_dict)


test_epoch = ValidEpoch(
    model,
    loss=loss_fn,
    metrics=metrics_test,
    device=DEVICE,
    verbose=True,
)


test_epoch.run(test_DataLoader)

##RGB Output Demo

In [None]:
def visualizeData(**images):
    """PLot images in one row."""
    n = len(images)
    plt.figure(figsize=(8, 5))
    for i, (name, image) in enumerate(images.items()):
        plt.subplot(1, n, i + 1)
        plt.xticks([])
        plt.yticks([])
        plt.title(' '.join(name.split('_')).title())
        plt.imshow(image)
    plt.show()

In [None]:
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

#Best model STATE_DICT
BEFORE_BEST_STATE_DICT = "/content/drive/MyDrive/ELE401/model2_DeepLab/state_dicts/modelWeights_00148.pth"

#MODEL
trained_model = segmentation.deeplabv3_resnet50(pretrained=True)
trained_model.classifier[4] = nn.Conv2d(256, NUM_CLASSES, kernel_size=(1, 1), stride=(1, 1))
trained_model = trained_model.to(DEVICE)  # Move to device before loading

# Load state dict
loaded_state_dict = torch.load(BEFORE_BEST_STATE_DICT, map_location=DEVICE)
trained_model.load_state_dict(loaded_state_dict)



import torch


test_dataset = Dataset(
    x_test_dir,
    y_test_dir,
    augmentation=None,
    preprocessing=get_preprocessing(),
    classes=CLASSES,
)

train_dataset = Dataset(
    x_train_dir,
    y_train_dir,
    augmentation=None,
    preprocessing=get_preprocessing(),
    classes=CLASSES,
)


# train_dataloader = DataLoader(train_dataset)
# test_dataloader = DataLoader(test_dataset)



path, image, gt_mask = train_dataset[950]
#path, image, gt_mask = test_dataset[108]
#image, gt_mask = train_dataset[350]

trained_model.eval()
with torch.inference_mode():
  x_tensor = torch.from_numpy(image).to(DEVICE).unsqueeze(0)
  predicted_mask = trained_model(x_tensor)['out']

pr_mask = predicted_mask.squeeze().cpu().numpy()

#For all labels
pr_mask = np.argmax(pr_mask, axis=0)
gt_mask = np.argmax(gt_mask, axis=0)

#pr_mask = pr_mask[2,:,:]
#gt_mask = gt_mask[2,:,:]

image_t = image.transpose(1, 2, 0)
visualizeData(
        image=image_t,
        ground_truth_mask=gt_mask,
        predicted_mask=pr_mask
    )

#Convert the predicted mask to numpy and get the predicted class indices
predicted_output = torch.argmax(predicted_mask.squeeze(), dim=0).detach().cpu().numpy()
Indices = np.unique(predicted_output)
print(f"GT shape: {gt_mask.shape}")
print(f"PR shape: {pr_mask.shape}")

for i in Indices:
  print(CLASSES[i])

print(path)

In [None]:
def overlay_masks(gt_mask, pr_mask):

    plt.figure(figsize=(15, 5))

    # Display ground truth
    plt.subplot(1, 3, 1)
    gt_plot = plt.imshow(gt_mask, cmap='viridis')
    plt.title("Ground Truth")
    plt.axis("off")

    # Display prediction
    plt.subplot(1, 3, 2)
    pr_plot = plt.imshow(pr_mask, cmap='viridis')
    plt.title("Prediction")
    plt.axis("off")

    # Display overlay
    plt.subplot(1, 3, 3)

    # Create a combined visualization
    plt.imshow(gt_mask, cmap='Reds', alpha=0.5)
    plt.imshow(pr_mask, cmap='inferno', alpha=0.5)
    plt.title("Overlay (Blue=GT, Red=Prediction)")
    plt.axis("off")

    plt.tight_layout()
    plt.show()

overlay_masks(gt_mask, pr_mask)

In [None]:
def visualize_class_mask(class_mask, num_classes=4):

    # Define colors for each class BGR for OpenCV or RGB for matplotlip
    colors = [
        [0, 0, 0],  # background
        [0,  0, 255],  # JP
        [255, 255, 0],  #Bag
        [0, 255, 0]     # Liq
    ]

    # Create RGB image
    colored_mask = np.zeros((class_mask.shape[0], class_mask.shape[1], 3), dtype=np.uint8)

    # Fill in colors based on class indices
    for class_idx in range(num_classes):
        colored_mask[class_mask == class_idx] = colors[class_idx]

    return colored_mask

In [None]:
rgb_map = visualize_class_mask(predicted_output,4)
rgb_map = cv2.cvtColor(rgb_map, cv2.COLOR_RGB2BGR)
cv2.imwrite('/content/drive/MyDrive/ELE401/model2_DeepLab/rgb_predicted_map/temp.png', rgb_map)

plt.imshow(cv2.imread('/content/drive/MyDrive/ELE401/model2_DeepLab/rgb_predicted_map/temp.png'))

In [None]:
!pip install ultralytics

In [None]:
import numpy as np
import cv2
from matplotlib import pyplot as plt
from PIL import Image
import math

img = Image.open("/content/drive/MyDrive/ELE401/model2_DeepLab/rgb_predicted_map/temp.png")


drain = 0
blood = 0

from ultralytics import YOLO
def prepare_for_yolo(image):
    #transpose to (H, W, C)
    if image.shape[0] == 3 and len(image.shape) == 3:
        image = image.transpose(1, 2, 0)

    # Reverse normalization
    mean = np.array([0.485, 0.456, 0.406]).reshape(-1, 1, 1)
    std = np.array([0.229, 0.224, 0.225]).reshape(-1, 1, 1)

    if image.shape[-1] == 3:  # If channels last
        mean = mean.reshape(1, 1, 3)
        std = std.reshape(1, 1, 3)

    image = image * std + mean

    # Convert to uint8 range [0, 255]
    image = (image * 255).clip(0, 255).astype(np.uint8)

    return image



#path, image, gt_mask = test_dataset[24]
print(image.shape)

# Prepare image for YOLO
image = prepare_for_yolo(image)
plt.imshow(image)
print(image.shape)

model = YOLO("/content/drive/MyDrive/ELE401/model2_DeepLab/YOLO_weights/best.pt")

# Run YOLO detection
results = model(image, conf= 0.3)
boxes = results[0].boxes
confidences = boxes.conf.cpu().numpy()
best_idx = confidences.argmax()

class_id = int(boxes.cls[best_idx].item())
confidence = confidences[best_idx]

print(f"Confidence: {confidence}")
CLASS = ""

# Map class ID to name
class_names = results[0].names
class_name = class_names[class_id]

print(class_names)

# Return class type based on detection
if "jp_high" in class_name.lower():
  CLASS = "JP_high"

if "jp_low" in class_name.lower():
   CLASS = "JP_low"

elif "drain_bag" in class_name.lower():
   CLASS = "BAG"

if CLASS== "JP_high":
  mul_value = 350

if CLASS== "JP_low":
  mul_value = 267

if CLASS== "BAG":
  mul_value = 1950  #Not the actual value used, only for demonstration

for pixel in img.getdata():
    if pixel == (0, 0, 255): # (0, 0, 255) JP Drain, (255,255,0) Drain_Bag
        drain += 1
    elif pixel == (255, 255, 0): # Bag
        drain += 1
    elif pixel == (0, 255, 0): # Liquid
        blood += 1

print(f"Mul value: {mul_value}")

volume = math.floor(blood / (blood + drain) * mul_value)
print(f"Estimated volume of blood is {volume}ml")
print(blood)

## Save Overlay Masks

In [None]:
####  HYPERPARAMS ####
BEFORE_BEST_STATE_DICT = "/content/drive/MyDrive/ELE401/model2_DeepLab/state_dicts/modelWeights_00148.pth"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# classses
CLASS_IDX = 3  #1 JP, #2 BAG, #Liq


# save directories for overlayed masks
JP_overlay_path = "/content/drive/MyDrive/ELE401/model2_DeepLab/JP_overlayed_masks_V2"
BAG_overlay_dir = "/content/drive/MyDrive/ELE401/model2_DeepLab/BAG_overlayed_masks_V2"
LIQ_overlay_dir = "/content/drive/MyDrive/ELE401/model2_DeepLab/Liquid_overlayed_masks_V2"


print(BEFORE_BEST_STATE_DICT)

In [None]:
import os
import cv2
import torch
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader


#Best model STATE_DICT


#MODEL
trained_model = segmentation.deeplabv3_resnet50(pretrained=True)
trained_model.classifier[4] = nn.Conv2d(256, NUM_CLASSES, kernel_size=(1, 1), stride=(1, 1))
trained_model = trained_model.to(DEVICE)  # Move to device before loading

# Load state dict
loaded_state_dict = torch.load(BEFORE_BEST_STATE_DICT, map_location=DEVICE)
trained_model.load_state_dict(loaded_state_dict)


# Initialize  test dataset
valid_dataset = Dataset(
    x_test_dir,
    y_test_dir,
    augmentation=None,
    preprocessing=get_preprocessing(),
    classes=CLASSES,
)


# Initialize  train dataset
train_dataset = Dataset(
    x_train_dir,
    y_train_dir,
    augmentation=None,
    preprocessing=get_preprocessing(),
    classes=CLASSES,
)


# Function to overlay masks and save with better contrasting colors
def save_overlay(gt_mask, pr_mask, save_path):
    plt.figure(figsize=(5, 5))
    plt.imshow(gt_mask, cmap="Greens", alpha=0.5, label="Ground Truth")
    plt.imshow(pr_mask, cmap="inferno", alpha=0.5, label="Predicted Mask")

    plt.savefig(save_path, bbox_inches='tight', pad_inches=0, dpi=300)
    plt.close()


#CHANGE
for idx in range(len(train_dataset)):
    #CHANGE
    file_path,image, gt_mask = train_dataset[idx]
    filename = os.path.splitext(os.path.basename(file_path))[0]

    # To process Liquid comment out the lines below.

    # if filename.startswith('JP'):
    #     overlay_save_dir = JP_overlay_path
    #     CLASS_IDX = 1  # JP
    # elif filename.startswith('BAG'):
    #     overlay_save_dir = BAG_overlay_dir
    #     CLASS_IDX = 2  #  BAG
    if CLASS_IDX == 3:
        overlay_save_dir = LIQ_overlay_dir
        CLASS_IDX = 3  # Liquid


    x_tensor = torch.from_numpy(image).to(DEVICE).unsqueeze(0)

    trained_model.eval()
    with torch.inference_mode():
        prediction = trained_model(x_tensor)
        if isinstance(prediction, dict):
          prediction = prediction['out']



    pr_mask = prediction.squeeze().cpu().numpy()

    #For all labels
    pr_mask = np.argmax(pr_mask, axis=0)
    gt_mask = np.argmax(gt_mask, axis=0)





    # Process ground truth if needed
    if gt_mask.ndim > 2:
        gt_mask = np.argmax(gt_mask, axis=0)



    # Create binary masks
    gt_class_mask = (gt_mask == CLASS_IDX).astype(np.float32)
    pr_class_mask = (pr_mask == CLASS_IDX).astype(np.float32)

    # Overlay save path
    overlay_filename = os.path.join(overlay_save_dir, f"{filename}_overlay.png")

    # Save overlayed image
    save_overlay(gt_class_mask, pr_class_mask, overlay_filename)

    print(f"PR mask shape: {pr_mask.shape}, dtype: {pr_mask.dtype}, min: {pr_mask.min()}, max: {pr_mask.max()}")
    print(f"PR class mask shape: {pr_class_mask.shape}, sum: {pr_class_mask.sum()}")

    print(f"Saved overlay: {overlay_filename}")


In [None]:
#plt.imshow(gt_class_mask, cmap="Greens", alpha=0.5, label="Ground Truth")  # Blue for ground truth
plt.imshow(pr_class_mask, cmap="inferno", alpha=0.5, label="Predicted Mask")  # Red-yellow for prediction
np.unique(pr_mask)


##Predicted Masks

In [None]:
# Define save directory for overlayed masks
JP_PRE_dir = "/content/drive/MyDrive/ELE401/model2_DeepLab/JP_Pre_masks_V2"
BAG_PRE_dir = "/content/drive/MyDrive/ELE401/model2_DeepLab/BAG_Pre_masks_V2"

# Load model and weights
BEST_STATE_DICT = "/content/drive/MyDrive/ELE401/model2_DeepLab/state_dicts/modelWeights_00148.pth"
print(BEST_STATE_DICT)

In [None]:
import os
import cv2
import torch
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader


# Initialize datasets
train_dataset = Dataset(
    x_train_dir,
    y_train_dir,
    augmentation=None,
    preprocessing=get_preprocessing(),
    classes=CLASSES,
)

test_dataset = Dataset(
    x_test_dir,
    y_test_dir,
    augmentation=None,
    preprocessing=get_preprocessing(),
    classes=CLASSES,
)


import os
import cv2
import torch
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader

DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# # Paths for saving predictions
# BASE_SAVE_DIR = "/content/drive/MyDrive/ELE401/predictions"
# JP_SAVE_DIR = os.path.join(BASE_SAVE_DIR, "JP_predictions")
# BAG_SAVE_DIR = os.path.join(BASE_SAVE_DIR, "BAG_predictions")

# # Create directories
# os.makedirs(JP_SAVE_DIR, exist_ok=True)
# os.makedirs(BAG_SAVE_DIR, exist_ok=True)



# Initialize model
trained_model = segmentation.deeplabv3_resnet50(pretrained=True)
trained_model.classifier[4] = nn.Conv2d(256, NUM_CLASSES, kernel_size=(1, 1), stride=(1, 1))
trained_model = trained_model.to(DEVICE)

# Load state dict
loaded_state_dict = torch.load(BEST_STATE_DICT, map_location=DEVICE)
trained_model.load_state_dict(loaded_state_dict)
trained_model.eval()

# Initialize datasets
train_dataset = Dataset(
    x_train_dir,
    y_train_dir,
    augmentation=None,
    preprocessing=get_preprocessing(),
    classes=CLASSES,
)

train_dataset_org_img = Dataset(
    x_train_dir,
    y_train_dir,
    augmentation=None,
    preprocessing= None,
    classes=CLASSES,
)

test_dataset = Dataset(
    x_test_dir,
    y_test_dir,
    augmentation=None,
    preprocessing=get_preprocessing(),
    classes=CLASSES,
)


test_dataset_org_img = Dataset(
    x_test_dir,
    y_test_dir,
    augmentation=None,
    preprocessing=None,
    classes=CLASSES,
)


def visualizeData(image, ground_truth_mask, predicted_mask, save_path=None):
    # Create a figure with 3 subplots
    plt.figure(figsize=(15, 5))

    # Display original image
    plt.subplot(1, 3, 1)
    plt.imshow(image)
    plt.title('Original Image')
    plt.axis('off')

    # Display ground truth mask
    plt.subplot(1, 3, 2)
    plt.imshow(ground_truth_mask)
    plt.title('Ground Truth')
    plt.axis('off')

    # Display predicted mask
    plt.subplot(1, 3, 3)
    plt.imshow(predicted_mask)
    plt.title('Prediction')
    plt.axis('off')

    # Save or display
    if save_path:
        plt.savefig(save_path, bbox_inches='tight', dpi=300)
        plt.close()
    else:
        plt.tight_layout()
        plt.show()

# Function to process a dataset and save predictions
def process_dataset(dataset, dataset_name, img_dataset):
    print(f"Processing {dataset_name} dataset...")

    for idx in range(len(dataset)):
        try:
            # Get data
            path, image, gt_mask = dataset[idx]
            filename = os.path.splitext(os.path.basename(path))[0]

            # Determine save directory
            if filename.startswith('JP'):
                save_dir = JP_PRE_dir
            elif filename.startswith('BAG'):
                save_dir = BAG_PRE_dir
            else:
                continue

            # Make prediction
            with torch.inference_mode():
                x_tensor = torch.from_numpy(image).to(DEVICE).unsqueeze(0)
                predicted_mask = trained_model(x_tensor)['out']

            # Process the prediction
            pr_mask = predicted_mask.squeeze().cpu().numpy()
            pr_mask = np.argmax(pr_mask, axis=0)

            # Process ground truth
            if gt_mask.ndim > 2:
                gt_mask = np.argmax(gt_mask, axis=0)

            #Get org image
            path_org, org_image, org_gt_mask = img_dataset[idx]

            # Define save path
            save_path = os.path.join(save_dir, f"{filename}.png")

            # Visualize and save
            visualizeData(
                image=org_image,
                ground_truth_mask=gt_mask,
                predicted_mask=pr_mask,
                save_path=save_path
            )

            # Log progress
            print(f"Saved: {save_path}")

            # Get unique class indices in prediction
            classes_present = np.unique(pr_mask)
            class_names = [CLASSES[i] for i in classes_present]
            print(f"Classes in prediction: {', '.join(class_names)}")

        except Exception as e:
            print(f"Error processing sample {idx}: {e}")


process_dataset(train_dataset, "training", train_dataset_org_img)
process_dataset(test_dataset, "test", test_dataset_org_img)

print("All predictions saved successfully!")

##EXCEL file

In [None]:
!pip install ultralytics

In [None]:
YOLO_PATH = "/content/drive/MyDrive/ELE401/model2_DeepLab/YOLO_weights/best.pt"
BEST_STATE_DICT = "/content/drive/MyDrive/ELE401/model2_DeepLab/state_dicts/modelWeights_00148.pth"


print(BEST_STATE_DICT)


In [None]:
import os
import cv2
import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import re
import math
from torch.utils.data import DataLoader


def prepare_for_yolo(image):
    #Reverse preprocessing to prepare image for YOLO
    if image.shape[0] == 3 and len(image.shape) == 3:
        image = image.transpose(1, 2, 0)

    # Reverse normalization
    mean = np.array([0.485, 0.456, 0.406]).reshape(-1, 1, 1)
    std = np.array([0.229, 0.224, 0.225]).reshape(-1, 1, 1)

    if image.shape[-1] == 3:
        mean = mean.reshape(1, 1, 3)
        std = std.reshape(1, 1, 3)

    image = image * std + mean

    image = (image * 255).clip(0, 255).astype(np.uint8)

    return image


def extract_volume_from_filename(filename):

    match = re.search(r'(\d+)ml', filename)
    if match:
        return int(match.group(1))
    return None


def class_wise_iou(pr_mask, gt_mask, num_classes):

    iou_scores = {}

    # Convert masks to appropriate format if needed
    if isinstance(pr_mask, torch.Tensor):
        pr_mask = pr_mask.cpu().numpy()
    if isinstance(gt_mask, torch.Tensor):
        gt_mask = gt_mask.cpu().numpy()

    # Handle both one-hot and class index formats
    for class_idx in range(num_classes):
        if pr_mask.ndim > 2 and pr_mask.shape[0] == num_classes:
            # One-hot encoded
            pr_bin = pr_mask[class_idx] > 0.5
        else:
            # Class index map
            pr_bin = pr_mask == class_idx

        if gt_mask.ndim > 2 and gt_mask.shape[0] == num_classes:
            # One-hot encoded
            gt_bin = gt_mask[class_idx] > 0.5
        else:
            # Class index map
            gt_bin = gt_mask == class_idx

        # Calculate intersection and union
        intersection = np.sum(pr_bin & gt_bin)
        union = np.sum(pr_bin) + np.sum(gt_bin) - intersection

        eps = 1e-7
        iou = (intersection + eps) / (union + eps)

        iou_scores[f'Class_{class_idx}'] = round(float(iou), 4)

    return iou_scores

# Device configuration
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


def load_yolo_model(weights_path=YOLO_PATH):
    """Load YOLOv8 model with specified weights"""
    try:
        # Try to import Ultralytics
        from ultralytics import YOLO
        # Load the model
        model = YOLO(weights_path)
        return model
    except ImportError:
        print("Ultralytics not found. Installing...")
        os.system("pip install -q ultralytics")
        from ultralytics import YOLO
        model = YOLO(weights_path)
        return model

# Load YOLO model
yolo_model = load_yolo_model()

# Load DeepLabV3 model
trained_model = segmentation.deeplabv3_resnet50(pretrained=True)
trained_model.classifier[4] = nn.Conv2d(256, NUM_CLASSES, kernel_size=(1, 1), stride=(1, 1))
trained_model = trained_model.to(DEVICE)

# Load segmentation model weights
loaded_state_dict = torch.load(BEST_STATE_DICT, map_location=DEVICE)
trained_model.load_state_dict(loaded_state_dict)
trained_model.eval()


# Initialize datasets
train_dataset = Dataset(
    x_train_dir,
    y_train_dir,
    augmentation=None,
    preprocessing=get_preprocessing(),
    classes=CLASSES,
)

test_dataset = Dataset(
    x_test_dir,
    y_test_dir,
    augmentation=None,
    preprocessing=get_preprocessing(),
    classes=CLASSES,
)

def detect_class_with_yolo(image):

    # Prepare image for YOLO
    image = prepare_for_yolo(image)

    # Run YOLO detection
    results = yolo_model(image, conf=0.3)

    # Process results
    if len(results[0].boxes) == 0:
        return None, 0.0  # No detections

    # Get the class with highest confidence
    boxes = results[0].boxes
    confidences = boxes.conf.cpu().numpy()
    best_idx = confidences.argmax()

    class_id = int(boxes.cls[best_idx].item())
    confidence = confidences[best_idx]

    # Extract name
    class_names = results[0].names
    class_name = class_names[class_id]

    # Return class type based on detection
    if "jp_high" in class_name.lower():
        return "JP_high", confidence
    elif "jp_low" in class_name.lower():
        return "JP_low", confidence
    elif "drain_bag" in class_name.lower():
        return "Drain_Bag", confidence
    else:
        return class_name, confidence


# Function to visualize masks
def visualize_class_mask(class_mask, num_classes=4):

    colors = [
        [0, 0, 0],      # Background
        [0, 0, 255],    # JP
        [255, 255, 0],  # Bag
        [0, 255, 0]     # Liquid
    ]

    colored_mask = np.zeros((class_mask.shape[0], class_mask.shape[1], 3), dtype=np.uint8)

    for class_idx in range(min(num_classes, len(colors))):
        colored_mask[class_mask == class_idx] = colors[class_idx]

    return colored_mask



def apply_bag_correction(ratio):

    a, b, c, d = -0.639004, 1.223345, 0.570415, -0.015097  # E

    if 0.4 <= ratio <= 0.62:

        corrected_ratio = ratio
    else:

        corrected_ratio = a*(ratio**3) + b*(ratio**2) + c*ratio + d

    # Ensure ratio stays between 0 and 1
    corrected_ratio = max(0, min(corrected_ratio, 1))


    return corrected_ratio




# Function to calculate volume
def calculate_volume(mask, class_type, filename):
    """Calculate volume based on mask and class type"""

    # if class_type is None:
    #     # Try to infer from filename
    #     if "JP" in filename:
    #         class_type = "JP_high"
    #     elif "BAG" in filename:
    #         class_type = "Drain_Bag"
    #     else:
    #         return 0  # Can't determine class type

    # Count pixels for each class
    jp_pixels = np.sum(mask == 1)      # JP drain
    bag_pixels = np.sum(mask == 2)     # Bag
    liquid_pixels = np.sum(mask == 3)  # Liquid )

    # Determine which container is relevant based on class_type
    if "jp_low" in class_type.lower() or "jp_high" in class_type.lower() or "jp" in class_type.lower():
        container_pixels = jp_pixels
    elif "bag" in class_type.lower() or "drain_bag" in class_type.lower():
        container_pixels = bag_pixels
    else:
        return 0  # Unknown container type

    # Skip if no liquid or container detected
    if liquid_pixels == 0 or container_pixels == 0:
        return 0

    # Calculate the ratio of liquid to total volume
    ratio = liquid_pixels / (liquid_pixels + container_pixels)

    # Determine capacity based on class_type
    if class_type == "JP_high" or class_type == "JP":
        capacity = 350
        volume_ml = math.floor(ratio * capacity)
    elif class_type == "JP_low":
        capacity = 267
        volume_ml = math.floor(ratio * capacity)
    elif class_type == "Drain_Bag" or class_type == "BAG":
        capacity = 1950
        corrected_ratio = apply_bag_correction(ratio)
        volume_ml = math.floor(corrected_ratio * capacity)



    return volume_ml

# Lists to store results
results = []
undetected_images = []

# Process datasets
datasets = {
    "Train": train_dataset,
    "Test": test_dataset
}

for dataset_name, dataset in datasets.items():
    print(f"Processing {dataset_name} dataset...")

    for idx in range(len(dataset)):
        try:
            file_path, image, gt_mask = dataset[idx]
            filename = os.path.splitext(os.path.basename(file_path))[0]

            print(f"Processing {filename} ({idx+1}/{len(dataset)})")

            # Get ground truth volume if available
            gt_volume = extract_volume_from_filename(filename)

            # Detect class type using YOLO model
            yolo_class_type, confidence = detect_class_with_yolo(image)

            # Check if YOLO detection failed
            yolo_detection_failed = False
            if yolo_class_type is None:
                print(f"  YOLO detection failed for {filename}, trying filename...")
                yolo_detection_failed = True

                # Record the undetected image
                undetected_images.append({
                    "Dataset": dataset_name,
                    "Filename": filename,
                    "Path": file_path
                })

                # Try to determine class from filename
                if "JP" in filename:
                    yolo_class_type = "JP_high"
                    confidence = 0.0
                elif "BAG" in filename:
                    yolo_class_type = "Drain_Bag"
                    confidence = 0.0
                else:
                    # Use default
                    yolo_class_type = "Unknown"
                    confidence = 0.0

            # Predict segmentation mask


            x_tensor = torch.from_numpy(image).to(DEVICE).unsqueeze(0)

            with torch.no_grad():
                prediction = trained_model(x_tensor)
                if isinstance(prediction, dict):
                    prediction = prediction['out']

            # Convert to class index map
            pr_mask = torch.argmax(prediction, dim=1).squeeze().cpu().numpy()

            # Process ground truth
            if gt_mask.ndim > 2:
                gt_mask = np.argmax(gt_mask, axis=0)

            # Calculate IoU scores
            iou_scores = class_wise_iou(pr_mask, gt_mask, len(CLASSES))

            # Calculate volume using the class type
            pred_volume = calculate_volume(pr_mask, yolo_class_type, filename)


            # Store results
            result = {
                "Dataset": dataset_name,
                "Filename": filename,
                "YOLO_Detected": not yolo_detection_failed,
                "YOLO_Class": yolo_class_type,
                "YOLO_Confidence": round(float(confidence), 4) if confidence else 0.0,
                "IoU_Background": iou_scores.get("Class_0", 0),
                "IoU_JP": iou_scores.get("Class_1", 0),
                "IoU_Bag": iou_scores.get("Class_2", 0),
                "IoU_Liquid": iou_scores.get("Class_3", 0),
                "GT_Volume": gt_volume,
                "Predicted_Volume": pred_volume,
            }

            # Add volume error if ground truth is available
            if gt_volume and gt_volume > 0:
                result["Volume_Error_ml"] = pred_volume - gt_volume
                result["Volume_Error_Percent"] = round((pred_volume - gt_volume) / gt_volume * 100, 2)

            results.append(result)
            print(f"  Processed: Class={yolo_class_type}, Volume={pred_volume}ml")

        except Exception as e:
            print(f"Error processing {filename}: {e}")
            import traceback
            traceback.print_exc()

# Create directories if they don't exist
os.makedirs("/content/drive/MyDrive/ELE401/model2_DeepLab/excel_results_V3", exist_ok=True)

# Create DataFrame and save to Excel
df_results = pd.DataFrame(results)
excel_path = "/content/drive/MyDrive/ELE401/model2_DeepLab/excel_results_V3/yolo_integration_results_V3.xlsx"
df_results.to_excel(excel_path, index=False)

print(f"Results saved to: {excel_path}")

# Save undetected images to a separate Excel file
if undetected_images:
    df_undetected = pd.DataFrame(undetected_images)
    undetected_path = "/content/drive/MyDrive/ELE401/model2_DeepLab/excel_results_V3/undetected_images_V3.xlsx"
    df_undetected.to_excel(undetected_path, index=False)
    print(f"Undetected images saved to: {undetected_path}")
    print(f"Total undetected images: {len(undetected_images)}")
else:
    print("All images were successfully detected by YOLO.")

# Print summary statistics
print("\nSummary of Results:")
print(f"Total processed images: {len(results)}")
print(f"YOLO detection rate: {sum(df_results['YOLO_Detected']) / len(df_results) * 100:.2f}%")

# Print IoU statistics
print(f"Average IoU_JP: {df_results['IoU_JP'].mean():.4f}")
print(f"Average IoU_Bag: {df_results['IoU_Bag'].mean():.4f}")
print(f"Average IoU_Liquid: {df_results['IoU_Liquid'].mean():.4f}")

# Print volume error statistics
if 'Volume_Error_Percent' in df_results.columns:
    valid_errors = df_results['Volume_Error_Percent'].dropna()
    if not valid_errors.empty:
        print(f"Average Volume Error: {valid_errors.mean():.2f}%")
        print(f"Median Volume Error: {valid_errors.median():.2f}%")
        print(f"Volume Error Range: {valid_errors.min():.2f}% to {valid_errors.max():.2f}%")

In [None]:
import pandas as pd

# Path to your existing Excel file
excel_path = "/content/drive/MyDrive/ELE401/model2_DeepLab/excel_results_V3/yolo_integration_results_V3.xlsx"

# Read the Excel file
df = pd.read_excel(excel_path)

print(df['YOLO_Detected'].value_counts())

YOLO_Detected
True    1086
Name: count, dtype: int64


In [None]:
import pandas as pd

# Path to your existing Excel file
excel_path = "/content/drive/MyDrive/ELE401/model2_DeepLab/excel_results_V3/yolo_integration_results_V3.xlsx"

# Read the Excel file
df = pd.read_excel(excel_path)

# Remove unwanted columns if they exist
columns_to_remove = ["YOLO_Detected"]
existing_columns = [col for col in columns_to_remove if col in df.columns]

if existing_columns:
    df = df.drop(columns=existing_columns)
    print(f"Removed columns: {', '.join(existing_columns)}")
else:
    print("No matching columns found to remove")

# Sort by the 'Filename' column
if "Filename" in df.columns:
    df = df.sort_values(by="Filename")
    print("DataFrame sorted by 'Filename'")
else:
    print("'Filename' column not found in the DataFrame. Skipping sorting.")

# Save the modified DataFrame back to the Excel file
df.to_excel(excel_path, index=False)
print(f"Modified Excel file saved to: {excel_path}")


In [None]:
import pandas as pd
import os

# Path to your existing Excel file
excel_path = "/content/drive/MyDrive/ELE401/model2_DeepLab/excel_results_V3/yolo_integration_results_V3.xlsx"

# Output paths for the separated Excel files
bag_excel_path = "/content/drive/MyDrive/ELE401/model2_DeepLab/excel_results_V3/BAG_results_sorted_V3.xlsx"
jp_excel_path = "/content/drive/MyDrive/ELE401/model2_DeepLab/excel_results_V3/JP_results_sorted_V3.xlsx"
other_excel_path = "/content/drive/MyDrive/ELE401/model2_DeepLab/excel_results_V3/other_results_V3.xlsx"

# Read the Excel file
df = pd.read_excel(excel_path)
print(f"Read {len(df)} rows from original Excel file")

# Create masks for BAG and JP entries
bag_mask = df['Filename'].str.startswith('BAG')
jp_mask = df['Filename'].str.startswith('JP')

# Split the dataframe
bag_df = df[bag_mask].copy()
jp_df = df[jp_mask].copy()
other_df = df[~(bag_mask | jp_mask)].copy()

# Save to separate Excel files
bag_df.to_excel(bag_excel_path, index=False)
print(f"Saved {len(bag_df)} BAG entries to: {bag_excel_path}")

jp_df.to_excel(jp_excel_path, index=False)
print(f"Saved {len(jp_df)} JP entries to: {jp_excel_path}")

# If there are any other entries not starting with BAG or JP
if len(other_df) > 0:
    other_df.to_excel(other_excel_path, index=False)
    print(f"Saved {len(other_df)} other entries to: {other_excel_path}")
else:
    print("No other entries found")

print(f"Total entries processed: {len(bag_df) + len(jp_df) + len(other_df)}")
print(f"Original file had: {len(df)} entries")

In [None]:
import pandas as pd
import numpy as np

# Path to the JP Excel file
jp_excel_path = "/content/drive/MyDrive/ELE401/model2_DeepLab/excel_results_V3/JP_results_sorted_V3.xlsx"

# Load the JP data
jp_df = pd.read_excel(jp_excel_path)
print(f"Loaded {len(jp_df)} JP samples from Excel file")

# Check if Volume_Error_Percent column exists
if 'Volume_Error_Percent' in jp_df.columns:
    # Calculate mean percentage error (ignoring NaN values)
    mean_error = jp_df['Volume_Error_Percent'].mean()
    median_error = jp_df['Volume_Error_Percent'].median()
    std_error = jp_df['Volume_Error_Percent'].std()

    # Calculate absolute error statistics
    abs_errors = jp_df['Volume_Error_Percent'].abs()
    mean_abs_error = abs_errors.mean()
    median_abs_error = abs_errors.median()

    print(f"Mean percentage error: {mean_error:.2f}%")
    print(f"Median percentage error: {median_error:.2f}%")
    print(f"Standard deviation: {std_error:.2f}%")
    print(f"Mean absolute percentage error: {mean_abs_error:.2f}%")
    print(f"Median absolute percentage error: {median_abs_error:.2f}%")

    # Count samples under various threshold values
    thresholds = [5, 10, 15, 20, 25, 30]

    print("\nSamples under error thresholds:")
    for th in thresholds:
        count = (abs_errors <= th).sum()
        percentage = (count / len(jp_df)) * 100
        print(f"Error <= {th}%: {count} samples ({percentage:.2f}% of total)")

    # Distribution of errors
    print("\nError distribution:")
    bins = [-100, -50, -25, -10, -5, 0, 5, 10, 25, 50, 100, float('inf')]
    labels = [
        "< -50%", "-50% to -25%", "-25% to -10%", "-10% to -5%", "-5% to 0%",
        "0% to 5%", "5% to 10%", "10% to 25%", "25% to 50%", "50% to 100%", "> 100%"
    ]

    # Count samples in each bin
    error_distribution = pd.cut(jp_df['Volume_Error_Percent'], bins=bins, labels=labels)
    error_counts = error_distribution.value_counts().sort_index()

    for category, count in error_counts.items():
        percentage = (count / len(jp_df)) * 100
        print(f"{category}: {count} samples ({percentage:.2f}% of total)")

else:
    print("Volume_Error_Percent column not found in the JP Excel file")


#print(f"IoU Liquid:{np.mean(jp_df[jp_df['IoU_Liquid'] > 0]['IoU_Liquid'])} ")
#print(f"IoU Drain: {np.mean(jp_df[jp_df['IoU_JP'] > 0]['IoU_JP'])}")

print(f"IoU Liquid:{np.mean(jp_df['IoU_Liquid'])} ")
print(f"IoU Drain: {np.mean(jp_df['IoU_JP'])}")

jp_df_liq_iou_0 = jp_df[jp_df['IoU_Liquid'] == 0]
jp_df_jp_iou_0 = jp_df[jp_df['IoU_JP'] == 0]

print(f"LiqIoU=0 : {len(jp_df_liq_iou_0)} ")
print(f"JPIoU=0 : {len(jp_df_jp_iou_0)} ")

In [None]:
import pandas as pd
import numpy as np

# Path to the BAG Excel file
bag_excel_path = "/content/drive/MyDrive/ELE401/model2_DeepLab/excel_results_V3/BAG_results_sorted_V3.xlsx"

# Load the BAG datas
bag_df = pd.read_excel(bag_excel_path)
print(f"Loaded {len(bag_df)} BAG samples from Excel file")

# Check if Volume_Error_Percent column exists
if 'Volume_Error_Percent' in bag_df.columns:
    # Calculate mean percentage error (ignoring NaN values)
    mean_error = bag_df['Volume_Error_Percent'].mean()
    median_error = bag_df['Volume_Error_Percent'].median()
    std_error = bag_df['Volume_Error_Percent'].std()

    # Calculate absolute error statistics
    abs_errors = bag_df['Volume_Error_Percent'].abs()
    mean_abs_error = abs_errors.mean()
    median_abs_error = abs_errors.median()

    print(f"Mean percentage error: {mean_error:.2f}%")
    print(f"Median percentage error: {median_error:.2f}%")
    print(f"Standard deviation: {std_error:.2f}%")
    print(f"Mean absolute percentage error: {mean_abs_error:.2f}%")
    print(f"Median absolute percentage error: {median_abs_error:.2f}%")

    # Count samples under various threshold values
    thresholds = [5, 10, 15, 20, 25, 30]

    print("\nSamples under error thresholds:")
    for th in thresholds:
        count = (abs_errors <= th).sum()
        percentage = (count / len(bag_df)) * 100
        print(f"Error <= {th}%: {count} samples ({percentage:.2f}% of total)")

    # Distribution of errors
    print("\nError distribution:")
    bins = [-100, -50, -25, -10, -5, 0, 5, 10, 25, 50, 100, float('inf')]
    labels = [
        "< -50%", "-50% to -25%", "-25% to -10%", "-10% to -5%", "-5% to 0%",
        "0% to 5%", "5% to 10%", "10% to 25%", "25% to 50%", "50% to 100%", "> 100%"
    ]

    # Count samples in each bin
    error_distribution = pd.cut(bag_df['Volume_Error_Percent'], bins=bins, labels=labels)
    error_counts = error_distribution.value_counts().sort_index()

    for category, count in error_counts.items():
        percentage = (count / len(bag_df)) * 100
        print(f"{category}: {count} samples ({percentage:.2f}% of total)")

else:
    print("Volume_Error_Percent column not found in the BAG Excel file")

print(f"IoU Liquid:  {bag_df['IoU_Liquid'].mean()}")
print(f"IoU Bag:  {np.mean(bag_df['IoU_Bag']):.4f}")



bag_df_liq_iou_0 = bag_df[bag_df['IoU_Liquid'] == 0]
bag_df_bag_iou_0 = bag_df[bag_df['IoU_JP'] == 0]

print(f"LiqIoU=0 : {len(bag_df_liq_iou_0)} ")
print(f"BAGIoU=0 : {len(bag_df_bag_iou_0)} ")