# Part D

In [12]:
from dataset import SegmentationDataset
from torch.nn import BCEWithLogitsLoss
from torch.optim import Adam
from torch.utils.data import DataLoader
from torch.nn import ConvTranspose2d
from torch.nn import Conv2d
from torch.nn import MaxPool2d
from torch.nn import Module
from torch.nn import ModuleList
from torch.nn import ReLU
from torchvision.transforms import CenterCrop
from torch.nn import functional as F
from sklearn.model_selection import train_test_split
from torchvision import transforms
from imutils import paths
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt
import torch
import time
import os
import cv2

In [13]:
# --- Dataset locations ------------------------------------------------------
DATASET_PATH = os.path.join("MSFD", "1")
IMAGE_DATASET_PATH = os.path.join(DATASET_PATH, "face_crop")
MASK_DATASET_PATH = os.path.join(DATASET_PATH, "face_crop_segmentation")

# Fraction of the image/mask pairs held out for testing.
TEST_SPLIT = 0.2

# --- Hardware ---------------------------------------------------------------
# Train on the GPU when one is visible to torch, otherwise on the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# Pinned host memory is only requested when batches are copied to a GPU.
PIN_MEMORY = DEVICE == "cuda"

# --- Model description ------------------------------------------------------
NUM_CHANNELS = 1
NUM_CLASSES = 1
NUM_LEVELS = 3

# --- Optimisation hyper-parameters ------------------------------------------
INIT_LR = 0.001
NUM_EPOCHS = 40
BATCH_SIZE = 32

# --- Network input resolution (pixels) --------------------------------------
INPUT_IMAGE_WIDTH = 128
INPUT_IMAGE_HEIGHT = 128

# Probability above which a pixel is assigned to the mask.
THRESHOLD = 0.5

# --- Output artefacts -------------------------------------------------------
BASE_OUTPUT = "output"
MODEL_PATH = os.path.join(BASE_OUTPUT, "unet_masked_faces.pth")
PLOT_PATH = os.path.join(BASE_OUTPUT, "plot.png")
TEST_PATHS = os.path.join(BASE_OUTPUT, "test_paths.txt")
PRED_IMAGES = os.path.join(BASE_OUTPUT, "predictions")

In [14]:
# Make sure the output root and the predictions sub-directory exist before
# anything is written to them; already-existing directories are left alone.
for outputDir in (BASE_OUTPUT, PRED_IMAGES):
    os.makedirs(outputDir, exist_ok=True)

In [15]:
class Block(Module):
    """CONV => RELU => CONV building block.

    Both convolutions use a 3x3 kernel with the ``Conv2d`` defaults, and no
    activation is applied after the second convolution.
    """

    def __init__(self, inChannels, outChannels):
        super().__init__()
        kernelSize = 3
        # Attribute names are kept stable: they are the state_dict keys.
        self.conv1 = Conv2d(inChannels, outChannels, kernelSize)
        self.relu = ReLU()
        self.conv2 = Conv2d(outChannels, outChannels, kernelSize)

    def forward(self, x):
        """Run ``x`` through conv1, the ReLU and conv2, in that order."""
        hidden = self.conv1(x)
        hidden = self.relu(hidden)
        return self.conv2(hidden)
	
class Encoder(Module):
    """Contracting path: a stack of ``Block`` stages, each followed by 2x2 max pooling."""

    def __init__(self, channels=(3, 16, 32, 64)):
        super().__init__()
        # One Block per consecutive (in, out) pair of the channel sequence.
        stages = [Block(channels[idx], channels[idx + 1])
                  for idx in range(len(channels) - 1)]
        self.encBlocks = ModuleList(stages)
        self.pool = MaxPool2d(2)

    def forward(self, x):
        """Return the un-pooled output of every stage, shallowest first."""
        skips = []
        current = x
        for stage in self.encBlocks:
            features = stage(current)
            # The pre-pooling features are what gets collected and returned.
            skips.append(features)
            # The pooled tensor feeds the next stage.
            current = self.pool(features)
        return skips
	
class Decoder(Module):
    """Expanding path: transposed-conv upsampling, skip concatenation, then a ``Block``."""

    def __init__(self, channels=(64, 32, 16)):
        super().__init__()
        self.channels = channels
        numStages = len(channels) - 1
        # All upsamplers are created before the decoder blocks so that the
        # parameter-initialisation order matches the module registration order.
        upsamplers = [ConvTranspose2d(channels[idx], channels[idx + 1], 2, 2)
                      for idx in range(numStages)]
        refiners = [Block(channels[idx], channels[idx + 1])
                    for idx in range(numStages)]
        self.upconvs = ModuleList(upsamplers)
        self.dec_blocks = ModuleList(refiners)

    def forward(self, x, encFeatures):
        """Decode ``x`` using ``encFeatures[i]`` as the skip input of stage ``i``."""
        stages = zip(self.upconvs, self.dec_blocks)
        for idx, (upsample, refine) in enumerate(stages):
            x = upsample(x)
            # Crop the skip features to the upsampled size before stacking
            # them along the channel dimension.
            skip = self.crop(encFeatures[idx], x)
            x = refine(torch.cat([x, skip], dim=1))
        return x

    def crop(self, encFeatures, x):
        """Centre-crop ``encFeatures`` to the height and width of the 4-D tensor ``x``."""
        _, _, height, width = x.shape
        cropper = CenterCrop([height, width])
        return cropper(encFeatures)
	
class UNet(Module):
    """Encoder/decoder segmentation network with a 1x1 convolution head.

    Args:
        encChannels: channel sequence handed to ``Encoder``.
        decChannels: channel sequence handed to ``Decoder``.
        nbClasses: number of output channels produced by the head.
        retainDim: when true, the head output is resized to ``outSize``.
        outSize: target size passed to ``F.interpolate``.
    """

    def __init__(self, encChannels=(3, 16, 32, 64),
                 decChannels=(64, 32, 16),
                 nbClasses=1, retainDim=True,
                 outSize=(INPUT_IMAGE_HEIGHT, INPUT_IMAGE_WIDTH)):
        super().__init__()
        self.encoder = Encoder(encChannels)
        self.decoder = Decoder(decChannels)
        # 1x1 convolution mapping the last decoder channels to class logits.
        self.head = Conv2d(decChannels[-1], nbClasses, 1)
        self.retainDim = retainDim
        self.outSize = outSize

    def forward(self, x):
        """Return the segmentation map for ``x`` (raw head output, no sigmoid)."""
        # Deepest encoder output first: it seeds the decoder, the remaining
        # ones act as skip connections.
        deepestFirst = self.encoder(x)[::-1]
        decoded = self.decoder(deepestFirst[0], deepestFirst[1:])
        segMap = self.head(decoded)
        if not self.retainDim:
            return segMap
        # Resize to the configured output size.
        return F.interpolate(segMap, self.outSize)

In [None]:
if __name__ == "__main__":
    # Images and masks are paired by position after sorting, so this relies on
    # both folders listing their files in the same relative order.
    imagePaths = sorted(paths.list_images(IMAGE_DATASET_PATH))
    maskPaths = sorted(paths.list_images(MASK_DATASET_PATH))

    # Fixed random_state keeps the train/test partition reproducible.
    (trainImages, testImages, trainMasks, testMasks) = train_test_split(
        imagePaths, maskPaths, test_size=TEST_SPLIT, random_state=42)

    # Persist the held-out image paths so the evaluation cells can reload them.
    print("[INFO] saving testing image paths...")
    with open(TEST_PATHS, "w") as f:
        f.write("\n".join(testImages))

    # NOTE: deliberately NOT named `transforms` -- rebinding that name would
    # replace the imported torchvision module and break a re-run of this cell.
    dataTransforms = transforms.Compose([transforms.ToPILImage(),
        transforms.Resize((INPUT_IMAGE_HEIGHT,
            INPUT_IMAGE_WIDTH)),
        transforms.ToTensor()])

    trainDS = SegmentationDataset(imagePaths=trainImages, maskPaths=trainMasks,
        transforms=dataTransforms)
    testDS = SegmentationDataset(imagePaths=testImages, maskPaths=testMasks,
        transforms=dataTransforms)
    print(f"[INFO] found {len(trainDS)} examples in the training set...")
    print(f"[INFO] found {len(testDS)} examples in the test set...")

    # os.cpu_count() may return None; fall back to loading in the main process.
    numWorkers = os.cpu_count() or 0
    trainLoader = DataLoader(trainDS, shuffle=True,
        batch_size=BATCH_SIZE, pin_memory=PIN_MEMORY,
        num_workers=numWorkers)
    testLoader = DataLoader(testDS, shuffle=False,
        batch_size=BATCH_SIZE, pin_memory=PIN_MEMORY,
        num_workers=numWorkers)

    unet = UNet().to(DEVICE)

    # The network outputs raw logits; BCEWithLogitsLoss applies the sigmoid.
    lossFunc = BCEWithLogitsLoss()
    opt = Adam(unet.parameters(), lr=INIT_LR)

    # Number of batches each loader actually yields (the last, partial batch
    # is included because drop_last is not set). max(..., 1) avoids a division
    # by zero when a split is empty.
    trainSteps = max(len(trainLoader), 1)
    testSteps = max(len(testLoader), 1)

    # Per-epoch average losses, stored as plain Python floats.
    H = {"train_loss": [], "test_loss": []}

    print("[INFO] training the network...")
    startTime = time.time()
    for e in tqdm(range(NUM_EPOCHS)):
        unet.train()

        totalTrainLoss = 0.0
        totalTestLoss = 0.0

        for (x, y) in trainLoader:
            (x, y) = (x.to(DEVICE), y.to(DEVICE))

            pred = unet(x)
            loss = lossFunc(pred, y)

            opt.zero_grad()
            loss.backward()
            opt.step()

            # .item() detaches the value; summing the tensor itself would keep
            # autograd history alive for the whole epoch.
            totalTrainLoss += loss.item()

        unet.eval()
        with torch.no_grad():
            for (x, y) in testLoader:
                (x, y) = (x.to(DEVICE), y.to(DEVICE))

                pred = unet(x)
                totalTestLoss += lossFunc(pred, y).item()

        avgTrainLoss = totalTrainLoss / trainSteps
        avgTestLoss = totalTestLoss / testSteps

        H["train_loss"].append(avgTrainLoss)
        H["test_loss"].append(avgTestLoss)

        print("[INFO] EPOCH: {}/{}".format(e + 1, NUM_EPOCHS))
        print("Train loss: {:.6f}, Test loss: {:.4f}".format(
            avgTrainLoss, avgTestLoss))

    endTime = time.time()
    print("[INFO] total time taken to train the model: {:.2f}s".format(
        endTime - startTime))

    # Plot the train and test loss curves and save them to disk.
    plt.style.use("ggplot")
    plt.figure()
    plt.plot(H["train_loss"], label="train_loss")
    plt.plot(H["test_loss"], label="test_loss")
    plt.title("Training Loss on Dataset")
    plt.xlabel("Epoch #")
    plt.ylabel("Loss")
    plt.legend(loc="lower left")
    plt.savefig(PLOT_PATH)

    # The whole module object is pickled (not just its state_dict); the
    # evaluation cell loads it back with torch.load(..., weights_only=False).
    torch.save(unet, MODEL_PATH)

[INFO] saving testing image paths...
[INFO] found 7506 examples in the training set...
[INFO] found 1877 examples in the test set...
[INFO] training the network...


  0%|                                                    | 0/40 [00:00<?, ?it/s]

In [None]:
def iou(img1, img2):
    """Return the Intersection over Union of two masks.

    Any non-zero element counts as foreground. When both masks are empty the
    union is zero; 0.0 is returned in that case instead of dividing by zero,
    which would yield NaN and poison any running average of the scores.
    """
    intersection = np.logical_and(img1, img2).sum()
    union = np.logical_or(img1, img2).sum()
    if union == 0:
        return 0.0
    return intersection / union

def prepare_plot(origImage, origMask, predMask, i):
    """Save a 1x3 figure (image, ground-truth mask, predicted mask).

    The figure is written to ``PRED_IMAGES/image_{i}.png``. It is not closed
    here, so it remains the current matplotlib figure after the call.
    """
    figure, ax = plt.subplots(nrows=1, ncols=3, figsize=(10, 10))

    panels = (
        ("Image", origImage),
        ("Original Mask", origMask),
        ("Predicted Mask", predMask),
    )
    for axis, (title, picture) in zip(ax, panels):
        axis.imshow(picture)
        axis.set_title(title)

    figure.tight_layout()
    outputPath = os.path.join(PRED_IMAGES, f"image_{i}.png")
    # plt.savefig writes the current figure, i.e. the one created above.
    plt.savefig(outputPath)
	
def make_predictions(model, imagePath, saveimg=False, i=None):
    """Predict the mask for one image and score it against its ground truth.

    The ground-truth mask is looked up in ``MASK_DATASET_PATH`` under the same
    file name as ``imagePath``.

    Args:
        model: segmentation network returning logits for a (1, 3, H, W) batch.
        imagePath: path of the input image.
        saveimg: when true, a comparison figure is saved via ``prepare_plot``.
        i: index forwarded to ``prepare_plot`` for the output file name.

    Returns:
        The IoU between the ground-truth mask and the thresholded prediction.

    Raises:
        FileNotFoundError: if the image or its mask cannot be read.
    """
    model.eval()

    # cv2.imread signals failure by returning None instead of raising.
    image = cv2.imread(imagePath)
    if image is None:
        raise FileNotFoundError(f"could not read image: {imagePath}")
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = image.astype("float32") / 255.0

    # cv2.resize expects (width, height); use the same configured size for the
    # image and the mask so the two always line up.
    targetSize = (INPUT_IMAGE_WIDTH, INPUT_IMAGE_HEIGHT)
    image = cv2.resize(image, targetSize)
    orig = image.copy()

    filename = os.path.basename(imagePath)
    groundTruthPath = os.path.join(MASK_DATASET_PATH, filename)

    gtMask = cv2.imread(groundTruthPath, cv2.IMREAD_GRAYSCALE)
    if gtMask is None:
        raise FileNotFoundError(
            f"could not read ground truth mask: {groundTruthPath}")
    gtMask = cv2.resize(gtMask, targetSize)

    # HWC -> CHW, then add the batch dimension.
    batch = np.expand_dims(np.transpose(image, (2, 0, 1)), 0)
    batch = torch.from_numpy(batch).to(DEVICE)

    with torch.no_grad():
        logits = model(batch).squeeze()
        predMask = torch.sigmoid(logits).cpu().numpy()

    # Binarise the probabilities into a 0/255 uint8 mask.
    predMask = ((predMask > THRESHOLD) * 255).astype(np.uint8)

    if saveimg:
        prepare_plot(orig, gtMask, predMask, i)

    return iou(gtMask, predMask)

In [None]:
# Reload the held-out image paths written by the training cell (one path per
# line). The context manager closes the file handle deterministically.
with open(TEST_PATHS) as testPathsFile:
    testImagePaths = testPathsFile.read().strip().split("\n")

In [None]:
# The checkpoint is a fully pickled module, hence weights_only=False.
# NOTE: unpickling executes arbitrary code -- only load checkpoints you trust.
# map_location lets a model saved on a GPU be loaded on a CPU-only machine.
unet = torch.load(MODEL_PATH, map_location=DEVICE, weights_only=False).to(DEVICE)

In [None]:
# Pick up to 10 distinct test images for the qualitative plots. Sampling
# without replacement avoids plotting the same image twice, and the size is
# capped so that a small test set does not raise.
numExamples = min(10, len(testImagePaths))
chosenTestImagePaths = np.random.choice(
    testImagePaths, size=numExamples, replace=False)
for i, path in enumerate(chosenTestImagePaths):
    make_predictions(unet, path, saveimg=True, i=i)

In [None]:
# Sum the IoU of every held-out image, reporting progress every 100 images,
# then print the mean over the whole test list.
total_iou = 0

for done, path in enumerate(testImagePaths, start=1):
    total_iou = total_iou + make_predictions(unet, path)
    if not done % 100:
        print(f"[INFO] {done} images done")

print(f"Average IOU: {total_iou/len(testImagePaths)}")

# Part C

In [None]:
import cv2 as cv
import numpy as np
import os
import matplotlib.pyplot as plt
import random

In [None]:
def calculate_iou(mask1, mask2):
    """Return the Intersection over Union of two masks (0.0 for an empty union).

    Any non-zero element of either mask is treated as foreground.
    """
    overlap = np.logical_and(mask1, mask2).sum()
    combined = np.logical_or(mask1, mask2).sum()
    # Guard clause: two empty masks would otherwise divide by zero.
    if combined == 0:
        return 0.0
    return overlap / combined

def process_image(image_path, gt_mask_path):
    """Segment an image by its largest Otsu contour and score it against the ground truth.

    Returns a tuple ``(img, gt_mask, mask, iou_score)``, or ``None`` (after
    printing a message) when the image cannot be loaded, no contour is found,
    or the ground-truth mask cannot be loaded.
    """
    img = cv.imread(image_path)
    if img is None:
        print(f"Error loading image {image_path}")
        return None

    # Grayscale -> light Gaussian smoothing -> inverted Otsu threshold.
    gray = cv.cvtColor(img, cv.COLOR_BGR2GRAY)
    smoothed = cv.GaussianBlur(gray, (3, 3), 5)
    threshold_mode = cv.THRESH_BINARY_INV + cv.THRESH_OTSU
    _, binary_mask = cv.threshold(smoothed, 0, 255, threshold_mode)

    # Morphological closing with a 5x5 kernel fills small gaps.
    closing_kernel = np.ones((5, 5), np.uint8)
    closed_mask = cv.morphologyEx(binary_mask, cv.MORPH_CLOSE, closing_kernel)

    contours, _ = cv.findContours(
        closed_mask, cv.RETR_EXTERNAL, cv.CHAIN_APPROX_SIMPLE)
    if not contours:
        print(f"No contours found in {image_path}")
        return None

    # Keep only the external contour with the largest area (the first such
    # contour if several share that area).
    best_contour = max(contours, key=cv.contourArea)

    # Rasterise the chosen contour as a filled 0/255 mask.
    mask = np.zeros_like(gray)
    cv.drawContours(mask, [best_contour], -1, 255, thickness=cv.FILLED)

    gt_mask = cv.imread(gt_mask_path, cv.IMREAD_GRAYSCALE)
    if gt_mask is None:
        print(f"Error loading ground truth mask {gt_mask_path}")
        return None

    # Binarise the ground truth at 128.
    _, gt_mask = cv.threshold(gt_mask, 128, 255, cv.THRESH_BINARY)

    # Bring the predicted mask to the ground-truth resolution; nearest
    # neighbour keeps it strictly binary.
    gt_height, gt_width = gt_mask.shape[0], gt_mask.shape[1]
    mask = cv.resize(mask, (gt_width, gt_height), interpolation=cv.INTER_NEAREST)

    iou_score = calculate_iou(mask, gt_mask)
    return img, gt_mask, mask, iou_score

def _show_segmentation(img_name, img, gt_mask, mask, iou_score):
    """Display the original image, ground-truth mask and segmented mask side by side."""
    plt.figure(figsize=(10, 4))

    plt.subplot(1, 3, 1)
    plt.imshow(cv.cvtColor(img, cv.COLOR_BGR2RGB))
    plt.axis("off")
    plt.title("Original")

    plt.subplot(1, 3, 2)
    plt.imshow(gt_mask, cmap="gray")
    plt.axis("off")
    plt.title("Ground Truth")

    plt.subplot(1, 3, 3)
    plt.imshow(mask, cmap="gray")
    plt.axis("off")
    plt.title(f"Segmented Mask\nIoU: {iou_score:.4f}")

    plt.suptitle(f"Segmentation Result - {img_name}")
    plt.tight_layout()
    plt.show()


def main(input_folder="MSFD/1/face_crop",
         gt_folder="MSFD/1/face_crop_segmentation",
         output_folder="saved_results",
         num_samples=10,
         seed=42):
    """Score the contour-based segmentation on a folder and show a few samples.

    Every image in ``input_folder`` that has a same-named file in
    ``gt_folder`` is processed with ``process_image``. Only the IoU scores are
    kept while scanning the dataset; the images of the randomly selected
    samples are recomputed afterwards, so memory use does not grow with the
    dataset size.

    Args:
        input_folder: folder containing the input images.
        gt_folder: folder containing the ground-truth masks.
        output_folder: folder where the sampled segmented masks are written.
        num_samples: maximum number of samples to save and plot.
        seed: seed of the private random generator used for sampling.
    """
    os.makedirs(output_folder, exist_ok=True)

    # os.listdir returns entries in arbitrary order; sorting makes the seeded
    # sample below reproducible across machines.
    image_files = sorted(
        name for name in os.listdir(input_folder)
        if os.path.exists(os.path.join(gt_folder, name))
    )

    if not image_files:
        print("No valid images found.")
        return

    # Pass 1: compute the IoU of every image, keeping only the scores.
    iou_by_name = {}
    for img_name in image_files:
        img_path = os.path.join(input_folder, img_name)
        gt_path = os.path.join(gt_folder, img_name)

        print(f"Processing {img_name}...")
        result = process_image(img_path, gt_path)
        if result is None:
            continue
        iou_score = result[3]
        print(f"IoU: {iou_score:.4f}\n")
        iou_by_name[img_name] = iou_score

    if not iou_by_name:
        print("No valid images with computed IoUs.")
        return

    # A private generator keeps the sampling reproducible without reseeding
    # the global `random` state.
    rng = random.Random(seed)
    sample_size = min(num_samples, len(iou_by_name))
    selected_names = rng.sample(list(iou_by_name), sample_size)

    # Pass 2: recompute only the selected samples to save and plot them.
    for img_name in selected_names:
        result = process_image(os.path.join(input_folder, img_name),
                               os.path.join(gt_folder, img_name))
        if result is None:
            # The file became unreadable between the two passes.
            continue
        img, gt_mask, mask, iou_score = result

        segmented_path = os.path.join(output_folder, f"segmented_{img_name}")
        cv.imwrite(segmented_path, mask)

        _show_segmentation(img_name, img, gt_mask, mask, iou_score)

        print(f"Saved segmented result: {segmented_path} (IoU: {iou_score:.4f})")

    # Mean IoU over every successfully processed image (not just the samples).
    mean_iou = np.mean(list(iou_by_name.values()))
    print(f"\nFinal Mean IoU: {mean_iou:.4f}")

In [None]:
# Run the Part C pipeline: score every image, then save and display the
# randomly selected samples.
main()