<a href="https://colab.research.google.com/github/123prashanth123/Fault-Detection-System/blob/Colabs/FineTuned%20Siamese%20ResNet50.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Notebook Setup

In [None]:
from IPython.display import clear_output
from google.colab import drive
drive.mount("/content/gdrive")

!pip install --upgrade imgaug
!pip install imagecorruptions

clear_output()

# Imports

In [None]:
import numpy as np
import matplotlib.pyplot as plt

import torch
from torch import nn, optim
from torchvision import models, transforms, ops
from torch.utils.data import DataLoader as DL
from torch.utils.data import Dataset

import imgaug
from imgaug import augmenters
from sklearn.model_selection import KFold

import os
import re
import cv2
import random as r
from time import time
from termcolor import colored

import warnings
warnings.filterwarnings("ignore")

seed = 0
STANDARD_SIZE = 224
DS_PATH = "/content/gdrive/My Drive/Videos/"


def breaker(num=50, char="*"):
    print(colored("\n" + num*char + "\n", color="yellow"))


def normalize(x=None):
    for i in range(x.shape[0]):
        x[i] = (x[i] - torch.min(x[i]))/(torch.max(x[i]) - torch.min(x[i]))
    return x


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Build Models

In [None]:
def build_roi_extractor():
    breaker()
    print("Building ROI Extractor ...")

    class RoIExtractor(nn.Module):
        def __init__(self):
            super(RoIExtractor, self).__init__()
            self.model = models.detection.fasterrcnn_mobilenet_v3_large_320_fpn(pretrained=True, progress=True)
        
        def forward(self, x):
            return self.model(x)
    
    roi_extractor = RoIExtractor().to(device)
    roi_extractor.eval()
    roi_transform = transforms.Compose([transforms.ToTensor(),])
    return roi_extractor, roi_transform


def build_model():
    breaker()
    print("Building Model ...")

    class SiameseResNet(nn.Module):
        def __init__(self):
            super(SiameseResNet, self).__init__()
            model = models.resnet50(pretrained=True, progress=True)
            in_f = model.fc.in_features

            for params in model.parameters():
                params.requires_grad = False

            for names, params in model.named_parameters():
                if re.match(r"layer4.2", names, re.IGNORECASE):
                    params.requires_grad = True
                
            self.extractor = nn.Sequential(*[*model.children()][:-1])
            self.classifier = nn.Sequential(nn.Linear(in_features=in_f, out_features=1))
        
        def getOptimizer(self, lr=1e-3, wd=0):
            p = [p for p in self.parameters() if p.requires_grad]
            return optim.Adam(self.parameters(), lr=lr, weight_decay=wd)
    
        def forward(self, x1, x2=None):
            if x2 is not None:
                x1 = self.extractor(x1)[:, :, 0, 0]
                x2 = self.extractor(x2)[:, :, 0, 0]
                x = torch.abs(x1 - x2)
                x = self.classifier(x)
            else:
                x = self.classifier(self.extractor(x1)[:, :, 0, 0])
            return x

    siamese_net = SiameseResNet().to(device)
    siamese_transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),])
    return siamese_net, siamese_transform

# Make Data

In [None]:
def build_dataloaders(idx, batch_size, num_samples):
    breaker()
    print("Building Dataloaders ...")
    roi_extractor, roi_transform = build_roi_extractor()
    _, siamese_transform = build_model()

    def get_image(idx, size=224):
        names = [name for name in os.listdir(DS_PATH) if (name[-4:] == ".jpg")]
        return cv2.cvtColor(cv2.imread(os.path.join(DS_PATH, names[idx]), cv2.IMREAD_COLOR), code=cv2.COLOR_BGR2RGB)

    def make_data(size=224, num_samples=1000, cls="Positive",
                  roi_extractor=None, roi_transform=None, 
                  device="cpu", seed=0):
        image = get_image(idx, size=size)
        image = cv2.resize(src=image, dsize=(size, size), interpolation=cv2.INTER_AREA)
        anchor = image.copy()

        num_layers = 3
        imgaug.seed(seed)
        roi_augment = augmenters.Sequential([augmenters.imgcorruptlike.GlassBlur(severity=(3, 5), seed=seed)] * num_layers)
        
        imgaug.seed(seed)
        dataset_augment = augmenters.Sequential([
            augmenters.HorizontalFlip(p=0.25),
            augmenters.VerticalFlip(p=0.25),
            augmenters.SomeOf(5, [
                    augmenters.blur.GaussianBlur(sigma=(0, 5), seed=seed),
                    augmenters.blur.MedianBlur(k=(1, 7), seed=seed),
                    augmenters.size.Crop(percent=(0, 0.1), seed=seed),
                    augmenters.geometric.Affine(rotate=(-45, 45), scale=(0.5, 1.2),translate_percent=(-0.2, 0.2), seed=seed),
                    augmenters.geometric.Rot90(k=(1, 3), seed=seed),
                    augmenters.arithmetic.Dropout(p=(0, 0.05), seed=seed),
                    augmenters.arithmetic.SaltAndPepper(p=(0, 0.05), seed=seed),
                    augmenters.color.MultiplyBrightness(mul=(0.5, 1.5)),
                    augmenters.color.MultiplySaturation(mul=(0, 5), seed=seed),
                    augmenters.iaa_convolutional.Sharpen(alpha=(0.75, 1), lightness=(0.75, 1.25), seed=seed),
                    augmenters.iaa_convolutional.Emboss(alpha=(0.75, 1), strength=(0.75, 1.25), seed=seed),
                    augmenters.contrast.CLAHE(seed=seed),
                    augmenters.contrast.GammaContrast(gamma=(0.2, 5), seed=seed), 
                    ])
                ])

        if re.match(r"Negative", cls, re.IGNORECASE):
            with torch.no_grad():
                output = roi_extractor(roi_transform(image).to(device).unsqueeze(dim=0))
            cnts, scrs = output[0]["boxes"], output[0]["scores"]
            if len(cnts) != 0:
                cnts = ops.clip_boxes_to_image(cnts, (image.shape[0], image.shape[1]))
                best_index = ops.nms(cnts, scrs, 0.1)[0]

                x1, y1, x2, y2 = int(cnts[best_index][0]), int(cnts[best_index][1]), int(cnts[best_index][2]), int(cnts[best_index][3])
                crp_img = image[y1:y2, x1:x2]
                crp_img = roi_augment(images=np.expand_dims(crp_img, axis=0))
                image[y1:y2, x1:x2] = crp_img.squeeze()

        images = np.array(dataset_augment(images=[image for _ in range(num_samples)]))
        return anchor, images
    
    _, p_images = make_data(size=STANDARD_SIZE, num_samples=num_samples, cls="Positive", roi_extractor=roi_extractor, roi_transform=roi_transform, device=device, seed=seed)
    anchor, n_images = make_data(size=STANDARD_SIZE, num_samples=num_samples, cls="Negative", roi_extractor=roi_extractor, roi_transform=roi_transform, device=device, seed=seed)

    class DS(Dataset):
        def __init__(self, anchor, positive, negative, transform):
            self.anchor = anchor
            self.positive = positive
            self.negative = negative
            self.transform = transform

            self.anchor = np.array([anchor for _ in range(self.positive.shape[0])])
            self.pX = np.concatenate((np.expand_dims(self.anchor, axis=1), np.expand_dims(self.positive, axis=1)), axis=1)
            self.nX = np.concatenate((np.expand_dims(self.anchor, axis=1), np.expand_dims(self.negative, axis=1)), axis=1)
            self.py = np.ones((self.pX.shape[0], 1))
            self.ny = np.zeros((self.nX.shape[0], 1))
            self.X = np.concatenate((self.pX, self.nX), axis=0)
            self.y = np.concatenate((self.py, self.ny), axis=0)
        
        def __len__(self):
            return self.X.shape[0]

        def __getitem__(self, idx): 
            return self.transform(self.X[idx, 0, :, :, :]), self.transform(self.X[idx, 1, :, :, :]), torch.FloatTensor(self.y[idx])
    
    for tr_idx, va_idx in KFold(n_splits=5, shuffle=True, random_state=seed).split(p_images):
        train_indices, valid_indices = tr_idx, va_idx
        break

    p_train, p_valid = p_images[train_indices], p_images[valid_indices]
    n_train, n_valid = n_images[train_indices], n_images[valid_indices]

    tr_data_setup = DS(anchor=anchor, positive=p_train, negative=n_train, transform=siamese_transform)
    va_data_setup = DS(anchor=anchor, positive=p_valid, negative=n_valid, transform=siamese_transform)

    dataloaders = {"train" : DL(tr_data_setup, batch_size=batch_size, shuffle=True, generator=torch.manual_seed(seed)),
                   "valid" : DL(va_data_setup, batch_size=batch_size, shuffle=False)
                   }

    return dataloaders

# Helpers

In [None]:
def train(model, tr_data, va_data, epochs, lr, wd):

    def fit_(model=None, optimizer=None, scheduler=None, epochs=None,
             trainloader=None, validloader=None, criterion=None, device=None,
             path=None, verbose=None):
        breaker()
        print("Training ...")
        breaker()

        model.to(device)
        Losses, Accuracies = [], []
        bestLoss, bestAccs = {"train" : np.inf, "valid" : np.inf}, {"train" : 0.0, "valid" : 0.0}
        DLS = {"train" : trainloader, "valid" : validloader}

        start_time = time()
        for e in range(epochs):
            e_st = time()
            epochLoss = {"train" : 0.0, "valid" : 0.0}
            epochAccs = {"train" : 0.0, "valid" : 0.0}

            for phase in ["train", "valid"]:
                if phase == "train":
                    model.train()
                else:
                    model.eval()
                
                lossPerPass, accsPerPass = [], []

                for X1, X2, y in DLS[phase]:
                    X1, X2, y = X1.to(device), X2.to(device), y.to(device)

                    optimizer.zero_grad()
                    with torch.set_grad_enabled(phase == "train"):
                        output = model(X1, X2)
                        loss = criterion(output, y)
                        if phase == "train":
                            loss.backward()
                            optimizer.step()
                    lossPerPass.append(loss.item())
                    accsPerPass.append(getAccuracy(output, y))
                epochLoss[phase] = np.mean(np.array(lossPerPass))
                epochAccs[phase] = np.mean(np.array(accsPerPass))
            Losses.append(epochLoss)
            Accuracies.append(epochAccs)
            
            if epochLoss["valid"] < bestLoss["valid"]:
                bestLoss = epochLoss
                BLE = e+1
                torch.save({"model_state_dict" : model.state_dict(),
                            "optim_state_dict" : optimizer.state_dict()},
                            os.path.join(path, "state.pt"))
            
            if epochAccs["valid"] > bestAccs["valid"]:
                bestAccs = epochAccs
                BAE = e+1
            
            if scheduler:
                scheduler.step(epochLoss["valid"])
            
            if verbose:
                print("Epoch: {} | Train Loss: {:.5f} | Valid Loss: {:.5f} | Train Accs : {:.5f} | \
Valid Accs : {:.5f} | Time: {:.2f} seconds".format(e + 1,
                                                    epochLoss["train"], epochLoss["valid"],
                                                    epochAccs["train"], epochAccs["valid"],
                                                    time() - e_st))



        breaker()
        print("Best Validation Loss at Epoch ---> {}".format(BLE))
        breaker()
        print("Best Validation Accs at Epoch ---> {}".format(BAE))
        breaker()
        print("Time Taken [{} Epochs] : {:.2f} minutes".format(epochs, (time() - start_time) / 60))
        breaker()
        print("Training Completed")
        breaker()

        return Losses, Accuracies, BLE, BAE


    def getAccuracy(y_pred=None, y_true=None):
        y_pred, y_true = torch.sigmoid(y_pred).detach(), y_true.detach()

        y_pred[y_pred > 0.5] = 1
        y_pred[y_pred <= 0.5] = 0

        return torch.count_nonzero(y_pred == y_true).item() / len(y_pred)
    
    optimizer = model.getOptimizer(lr=lr, wd=wd)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer, patience=25, eps=1e-12, verbose=True)
    checkpoint_path = os.path.join("/content/checkpoints")
    if not os.path.exists(checkpoint_path):
        os.makedirs(checkpoint_path)

    L, A, BLE, BAE = fit_(model=model, optimizer=optimizer, scheduler=scheduler, epochs=epochs,
                          trainloader=tr_data, validloader=va_data, device=device,
                          criterion=nn.BCEWithLogitsLoss(),
                          path=checkpoint_path, verbose=True)

    TL, VL, TA, VA = [], [], [], []

    for i in range(len(L)):
        TL.append(L[i]["train"])
        VL.append(L[i]["valid"])
        TA.append(A[i]["train"])
        VA.append(A[i]["valid"])

    x_Axis = np.arange(1, len(L)+1)
    plt.figure(figsize=(12, 6))
    plt.subplot(1, 2, 1)
    plt.plot(x_Axis, TL, "r", label="Training Loss")
    plt.plot(x_Axis, VL, "b--", label="Validation Loss")
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.legend()
    plt.grid()
    plt.subplot(1, 2, 2)
    plt.plot(x_Axis, TA, "r", label="Training Accuracy")
    plt.plot(x_Axis, VA, "b--", label="Validation Accuracy")
    plt.xlabel("Epochs")
    plt.ylabel("Accuracy")
    plt.legend()
    plt.grid()
    plt.show()

    return BLE, checkpoint_path

# Main

In [None]:
def main():
    ###
    num_samples = 1000
    batch_size = 64
    epochs = 100
    lr, wd = 1e-6, 1e-6
    ###

    dataloaders = build_dataloaders(1, batch_size=batch_size, num_samples=num_samples)
    model, _ = build_model()
    # summary(model, (3, 224, 224), 64)
    BLE, path = train(model, dataloaders["train"], dataloaders["valid"], epochs, lr, wd)

    return model, BLE, path

model, BLE, path = main()