In [2]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

# import os
# for dirname, _, filenames in os.walk('/kaggle/input'):
#     for filename in filenames:
#         print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [3]:
# Importing the required modules

import numpy as np
import cv2
import matplotlib.pyplot as plt
import os
import warnings
warnings.filterwarnings('ignore')

In [None]:
SOURCE_IMAGE_DIRECTORY = "../MSFD/1/face_crop"

In [5]:
def readAndPreProcessImage(img_path):
    img = cv2.imread(img_path)
    imgRGB = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    return img, imgRGB, gray

In [6]:
def applyOtsuThreshold(gray):
    _, otsuMask = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    return otsuMask

In [7]:
def detectEdges(gray, lowThreshold=50, highThreshold=150):
    edges = cv2.Canny(gray, lowThreshold, highThreshold)
    return edges

In [8]:
def applyMorphologicalClosing(mask, kernelSize=(5, 5)):
    kernel = np.ones(kernelSize, np.uint8)
    closed = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
    return closed

In [9]:
def combineMasks(mask1, mask2):
    combined = cv2.bitwise_and(mask1, mask1, mask=mask2)
    return combined

In [10]:
def visualize(imgRGB, otsuMask, edges, otsuClosed):
    plt.figure(figsize=(40, 10))

    titles = [
        "The Original RGB Image",
        "Otsu's Thresholding Applied",
        "Canny Edge Detection",
        "Otsu + Morphological Closing"
    ]
    images = [imgRGB, otsuMask, edges, otsuClosed]
    cmaps = [None, 'gray', 'gray', 'gray']

    for i in range(4):
        plt.subplot(1, 4, i + 1)
        plt.imshow(images[i], cmap=cmaps[i])
        plt.title(titles[i], fontsize=24)
        plt.axis('off')

    plt.suptitle("Traditional Facemask Segmentation using Otsu's Thresholding and Canny", fontsize=28, y=1.05)
    plt.tight_layout()
    plt.show()

In [None]:
imagePath = "../MSFD/1/face_crop/005100_1.jpg"
img, imgRGB, gray = readAndPreProcessImage(imagePath)
otsuMask = applyOtsuThreshold(gray)
edges = detectEdges(gray)
otsuClosed = applyMorphologicalClosing(otsuMask)
combined = combineMasks(otsuClosed, edges)
# visualize(imgRGB, otsuMask, edges, otsuClosed)

In [12]:
def computeMetrics(pred, gt):
    intersection = np.logical_and(pred, gt).sum()
    union = np.logical_or(pred, gt).sum()
    iou = intersection / (union + 1e-6)
    dice = 2 * intersection / (pred.sum() + gt.sum() + 1e-6)
    return iou, dice

In [13]:
def applyOtsuThreshold(gray, kernel):
    _, otsuMask = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    otsuClosed = cv2.morphologyEx(otsuMask, cv2.MORPH_CLOSE, kernel)
    predOtsu = (otsuClosed > 0).astype(np.uint8)
    return predOtsu

In [14]:
ious, dices = [], []

def evaluateSegmentation(imageDir, maskDir):
    
    kernel = np.ones((5, 5), np.uint8)


    imageFiles = sorted(os.listdir(imageDir))
    # imageFiles = getImagesFromDirectory(imageDir)

    for filename in imageFiles:
        imgPath = os.path.join(imageDir, filename)
        maskPath = os.path.join(maskDir, filename)

        img = cv2.imread(imgPath)
        gt = cv2.imread(maskPath, 0)

        if img is None or gt is None:
            continue

        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        gtMask = (gt > 127).astype(np.uint8)

        predOtsu = applyOtsuThreshold(gray, kernel)

        predOtsu = cv2.resize(predOtsu, (gtMask.shape[1], gtMask.shape[0]), interpolation=cv2.INTER_NEAREST)

        iou, dice = computeMetrics(predOtsu, gtMask)
        ious.append(iou)
        dices.append(dice)
        

    print(f"\033[96mResult of Otsu's Thresholding on {len(ious)} images\033[0m")      # Yellow
    print(f"\033[92mMean Intersection over Union(IoU): {np.mean(ious):.4f}\033[0m")  # Green
    print(f"\033[92mMean Dice: {np.mean(dices):.4f}\033[0m\n")  # Green

In [15]:
# Example usage
IMAGE_DIRECTORY = '/kaggle/input/masked-face-segmentation-dataset/MSFD/1/face_crop'
MASK_DIRECTORY = '/kaggle/input/masked-face-segmentation-dataset/MSFD/1/face_crop_segmentation'

evaluateSegmentation(IMAGE_DIRECTORY, MASK_DIRECTORY)

[96mResult of Otsu's Thresholding on 9382 images[0m
[92mMean Intersection over Union(IoU): 0.2583[0m
[92mMean Dice: 0.3603[0m



## UNet
### In the code cells following, we will be applying the UNet model.

In [16]:
import os
import numpy as np
import torch
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

IMG_HEIGHT, IMG_WIDTH = 256, 256
BATCH_SIZE = 16

BASE_DIR = "/kaggle/input/MSFD/1"
IMAGE_DIR = "/kaggle/input/masked-face-segmentation-dataset/MSFD/1/face_crop"
MASK_DIR = "/kaggle/input/masked-face-segmentation-dataset/MSFD/1/face_crop_segmentation"

In [17]:
imagePaths = sorted([os.path.join(IMAGE_DIR, f) for f in os.listdir(IMAGE_DIR)])
maskPaths = sorted([os.path.join(MASK_DIR, f) for f in os.listdir(MASK_DIR)])

# Train-test split
trainImgs, valtestImgs, trainMasks, valtestMasks = train_test_split(
    imagePaths, maskPaths, test_size=0.3, random_state=42
)
valImgs, testImgs, valMasks, testMasks = train_test_split(
    valtestImgs, valtestMasks, test_size=0.5, random_state=42
)


In [18]:
from PIL import Image
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms

# Image-mask loader
class ImgMaskDataset(Dataset):
    def __init__(self, imgPaths, maskPaths):
        self.imgPaths = imgPaths
        self.maskPaths = maskPaths
        self.transform_img = transforms.Compose([
            transforms.Resize((IMG_HEIGHT, IMG_WIDTH)),
            transforms.ToTensor(),  # Automatically scales to [0, 1]
        ])
        self.transform_mask = transforms.Compose([
            transforms.Resize((IMG_HEIGHT, IMG_WIDTH)),
            transforms.ToTensor(),  # Will keep mask values in [0, 1]
        ])

    def __len__(self):
        return len(self.imgPaths)

    def __getitem__(self, idx):
        img = Image.open(self.imgPaths[idx]).convert("RGB")
        img = self.transform_img(img)

        mask = Image.open(self.maskPaths[idx]).convert("L")
        mask = self.transform_mask(mask)
        mask = (mask > 0.5).float()  # Equivalent to thresholding > 127 in 8-bit

        return img, mask

# Dataset builder
def createDataset(imgs, masks):
    dataset = ImgMaskDataset(imgs, masks)
    dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0, pin_memory=True)
    return dataloader


In [19]:
trainDs = createDataset(trainImgs, trainMasks)
valDs = createDataset(valImgs, valMasks)
testDs = createDataset(testImgs, testMasks)

In [20]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Convolutional block with optional BatchNorm
class ConvBlock(nn.Module):
    def __init__(self, in_channels, out_channels, batch_norm=True):
        super(ConvBlock, self).__init__()
        layers = [
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1)
        ]
        if batch_norm:
            layers.append(nn.BatchNorm2d(out_channels))
        layers.append(nn.ReLU(inplace=True))

        layers.append(nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1))
        if batch_norm:
            layers.append(nn.BatchNorm2d(out_channels))
        layers.append(nn.ReLU(inplace=True))

        self.block = nn.Sequential(*layers)

    def forward(self, x):
        return self.block(x)

# Full U-Net for RGB input
class UNet(nn.Module):
    def __init__(self, in_channels=3, out_channels=1, batch_norm=True):  # Changed in_channels to 3
        super(UNet, self).__init__()

        self.enc1 = ConvBlock(in_channels, 64, batch_norm)
        self.enc2 = ConvBlock(64, 128, batch_norm)
        self.enc3 = ConvBlock(128, 256, batch_norm)
        self.enc4 = ConvBlock(256, 512, batch_norm)
        self.bottleneck = ConvBlock(512, 1024, batch_norm)

        self.up6 = nn.ConvTranspose2d(1024, 512, kernel_size=2, stride=2)
        self.dec6 = ConvBlock(1024, 512, batch_norm)

        self.up7 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)
        self.dec7 = ConvBlock(512, 256, batch_norm)

        self.up8 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.dec8 = ConvBlock(256, 128, batch_norm)

        self.up9 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.dec9 = ConvBlock(128, 64, batch_norm)

        self.final = nn.Conv2d(64, out_channels, kernel_size=1)

    def forward(self, x):
        c1 = self.enc1(x)
        p1 = F.max_pool2d(c1, kernel_size=2)

        c2 = self.enc2(p1)
        p2 = F.max_pool2d(c2, kernel_size=2)

        c3 = self.enc3(p2)
        p3 = F.max_pool2d(c3, kernel_size=2)

        c4 = self.enc4(p3)
        p4 = F.max_pool2d(c4, kernel_size=2)

        c5 = self.bottleneck(p4)

        u6 = self.up6(c5)
        u6 = torch.cat([u6, c4], dim=1)
        c6 = self.dec6(u6)

        u7 = self.up7(c6)
        u7 = torch.cat([u7, c3], dim=1)
        c7 = self.dec7(u7)

        u8 = self.up8(c7)
        u8 = torch.cat([u8, c2], dim=1)
        c8 = self.dec8(u8)

        u9 = self.up9(c8)
        u9 = torch.cat([u9, c1], dim=1)
        c9 = self.dec9(u9)

        return torch.sigmoid(self.final(c9))

In [21]:
def diceCoef(yTrue, yPred, smooth=1e-7):
    yPred = (yPred > 0.5).float()
    intersection = torch.sum(yTrue * yPred)
    return (2. * intersection + smooth) / (torch.sum(yTrue) + torch.sum(yPred) + smooth)

def iouMetric(yTrue, yPred, smooth=1e-7):
    yPred = (yPred > 0.5).float()
    intersection = torch.sum(yTrue * yPred)
    union = torch.sum(yTrue) + torch.sum(yPred) - intersection
    return (intersection + smooth) / (union + smooth)

In [22]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchinfo import summary

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# # if torch.backends.mps.is_available():
# #     device = torch.device('mps')

# print(device)

# model = UNet(in_channels=3, out_channels=1, batch_norm=True).to(device)
# summary(model, input_size=(1, 3, 256, 256))  # (batch_size, channels, height, width)

model = UNet(in_channels=3, out_channels=1, batch_norm=True)

# Wrap with DataParallel if multiple GPUs are available
if torch.cuda.device_count() > 1:
    model = nn.DataParallel(model)
    print(f"Using {torch.cuda.device_count()} GPUs!")

model = model.to(device)
print(device)
display(summary(model, input_size=(1, 3, 256, 256)))  # (batch_size, channels, height, width)

# Binary Cross Entropy loss (like Keras)
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)


Using 2 GPUs!
cuda


Layer (type:depth-idx)                   Output Shape              Param #
DataParallel                             [1, 1, 256, 256]          --
├─UNet: 1-1                              [1, 1, 256, 256]          31,043,521
├─UNet: 1-2                              --                        --
│    └─ConvBlock: 2-1                    [1, 64, 256, 256]         38,976
│    └─ConvBlock: 2-2                    --                        --
│    │    └─Sequential: 3-1              [1, 64, 256, 256]         38,976
│    │    └─Sequential: 3-2              --                        38,976
│    └─ConvBlock: 2-3                    [1, 128, 128, 128]        221,952
│    └─ConvBlock: 2-4                    --                        --
│    │    └─Sequential: 3-3              [1, 128, 128, 128]        221,952
│    │    └─Sequential: 3-4              --                        221,952
│    └─ConvBlock: 2-5                    [1, 256, 64, 64]          886,272
│    └─ConvBlock: 2-6                    --  

In [23]:
def train(model, trainLoader, valLoader, criterion, optimizer, epochs=15):
    for epoch in range(epochs):
        model.train()
        trainLoss, trainDice, trainIou = 0.0, 0.0, 0.0

        for imgs, masks in trainLoader:
            imgs, masks = imgs.to(device), masks.to(device)

            outputs = model(imgs)
            loss = criterion(outputs, masks)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            trainLoss += loss.item()
            trainDice += diceCoef(masks, outputs).item()
            trainIou += iouMetric(masks, outputs).item()

        # Validation
        model.eval()
        valLoss, valDice, valIou = 0.0, 0.0, 0.0
        with torch.no_grad():
            for imgs, masks in valLoader:
                imgs, masks = imgs.to(device), masks.to(device)
                outputs = model(imgs)
                loss = criterion(outputs, masks)

                valLoss += loss.item()
                valDice += diceCoef(masks, outputs).item()
                valIou += iouMetric(masks, outputs).item()

        # Average stats
        numTrain = len(trainLoader)
        numVal = len(valLoader)

        print(f"Epoch [{epoch+1}/{epochs}]")
        print(f"  Train Loss: {trainLoss/numTrain:.4f} | Dice: {trainDice/numTrain:.4f} | IoU: {trainIou/numTrain:.4f}")
        print(f"  Val   Loss: {valLoss/numVal:.4f} | Dice: {valDice/numVal:.4f} | IoU: {valIou/numVal:.4f}")

In [24]:
train(model, trainDs, valDs, criterion, optimizer, epochs=15)

Epoch [1/15]
  Train Loss: 0.3609 | Dice: 0.7427 | IoU: 0.5954
  Val   Loss: 0.3140 | Dice: 0.7908 | IoU: 0.6547
Epoch [2/15]
  Train Loss: 0.2922 | Dice: 0.7990 | IoU: 0.6659
  Val   Loss: 0.2857 | Dice: 0.8052 | IoU: 0.6746
Epoch [3/15]
  Train Loss: 0.2850 | Dice: 0.8043 | IoU: 0.6732
  Val   Loss: 0.2823 | Dice: 0.8068 | IoU: 0.6768
Epoch [4/15]
  Train Loss: 0.2818 | Dice: 0.8070 | IoU: 0.6771
  Val   Loss: 0.2793 | Dice: 0.8105 | IoU: 0.6820
Epoch [5/15]
  Train Loss: 0.2793 | Dice: 0.8089 | IoU: 0.6798
  Val   Loss: 0.2790 | Dice: 0.8131 | IoU: 0.6857
Epoch [6/15]
  Train Loss: 0.2773 | Dice: 0.8107 | IoU: 0.6823
  Val   Loss: 0.2792 | Dice: 0.8114 | IoU: 0.6833
Epoch [7/15]
  Train Loss: 0.2758 | Dice: 0.8120 | IoU: 0.6841
  Val   Loss: 0.2802 | Dice: 0.8127 | IoU: 0.6852
Epoch [8/15]
  Train Loss: 0.2740 | Dice: 0.8133 | IoU: 0.6859
  Val   Loss: 0.2813 | Dice: 0.8131 | IoU: 0.6857
Epoch [9/15]
  Train Loss: 0.2724 | Dice: 0.8145 | IoU: 0.6877
  Val   Loss: 0.2787 | Dice: 0.81

In [25]:
def evaluateModel(model, dataloader):
    model.eval()
    totalDice, totalIoU = 0.0, 0.0

    with torch.no_grad():
        for imgs, masks in dataloader:
            imgs, masks = imgs.to(device), masks.to(device)

            outputs = model(imgs)

            totalDice += diceCoef(masks, outputs).item()
            totalIoU  +=  iouMetric(masks, outputs).item()

    numBatches = len(dataloader)
    avgDice = totalDice / numBatches
    avgIoU  = totalIoU / numBatches

    return avgDice, avgIoU


In [26]:
print("\nTesting Metrics:")
diceScore, iouScore = evaluateModel(model, testDs)
print(f"Dice Score: {diceScore:.4f}")
print(f"IoU Score:  {iouScore:.4f}")


Testing Metrics:
Dice Score: 0.7760
IoU Score:  0.6346


In [27]:
import pandas as pd

# Construct the data
data = {
    "Method": ["Otsu Thresholding","U-Net"],
    "Average IoU": [
        np.mean(ious),
        iouScore  # from evaluate_model()
    ],
    "Average Dice": [
        np.mean(dices),
        diceScore  # from evaluate_model()
    ]
}

df = pd.DataFrame(data)
df


Unnamed: 0,Method,Average IoU,Average Dice
0,Otsu Thresholding,0.258274,0.360258
1,U-Net,0.634627,0.776041
