# Real-Time Medical Mask Detection

In [None]:
import os
import random
import cv2 # OpenCV
import matplotlib.pyplot as plt
import xmltodict # PyPI library that treats XMLs as JSON files (containing key values pairs)
import torch # PyTorch
import torchvision # PyTorch library containing useful attributes and functionalities
import torchvision.models as models
import torchvision.transforms as transforms
import torchvision.datasets as datasets

In [None]:
def ImageNames():
    '''
    Collect the file names of every image in the dataset.

    Walks the images directory recursively and returns the bare file names
    (not the full paths) of all files whose path does not end in '.xml'.
    '''
    return [
        entry
        for folder, _, entries in os.walk('/IIT-K Project/Dataset/images/')
        for entry in entries
        if not os.path.join(folder, entry).endswith('.xml')
    ]
def Path(img):
    '''
    function to get the path of an image and of its XML annotation in the dataset

    img: file name of the image (e.g. 'abc.png'), relative to the images folder
    returns: tuple (image path, annotation path); the annotation has the same
             base name as the image with the extension replaced by '.xml'
    '''
    o = '/IIT-K Project/Dataset/'
    i = o + 'images/' + img
    # splitext removes whatever extension is present, so '.jpeg', '.JPG', '.tiff', ...
    # all map to '<name>.xml' instead of relying on fixed 4/5-character slices
    stem, _ = os.path.splitext(img)
    l = o + 'annotations/' + stem + '.xml'
    return i, l

In [None]:
def ParseXML(l):
    '''
    function to convert the labels of an XML annotation file to dictionary format

    l: path of the XML annotation file
    returns: (res, s) where
        res is a list of (class name, [(xmin, ymin), (xmax, ymax)]) tuples, one per annotated object
        s   is [width, height] of the whole image as recorded in the annotation
    '''
    # context manager so the file handle is closed even if parsing fails
    with open(l, 'rb') as f:
        x = xmltodict.parse(f)
    annotation = x['annotation']
    # an image without any annotated object has no 'object' key at all
    items = annotation.get('object', [])
    # when image has only one bounding box xmltodict yields a single mapping, not a list
    if not isinstance(items, list):
        items = [items]
    res = []
    for i in items:
        n = i['name']
        box = i['bndbox']
        bb = [(int(box['xmin']), int(box['ymin'])), (int(box['xmax']), int(box['ymax']))]
        res.append((n, bb))
    s = [int(annotation['size']['width']), int(annotation['size']['height'])]
    return res, s  # res contains classes & their bounding boxes' x,y coords; s contains width & height of the image
def Visualize(imgs, bb=True):
    '''
    function to visualize an image of the dataset along with bounding boxes coloured according to their classes

    imgs: file name of the image to display
    bb:   when True, the annotated bounding boxes are drawn on top of the image
    raises: FileNotFoundError if the image cannot be read
    '''
    i, l = Path(imgs)
    img = cv2.imread(i)  # reading the image using OpenCV's imread function
    # imread signals failure by returning None instead of raising
    if img is None:
        raise FileNotFoundError("Could not read image: " + i)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # converting image to RGB format as OpenCV reads image in BGR format
    if bb:
        lbls, s = ParseXML(l)
        # line thickness scales with the image size but never drops below one pixel
        t = max(1, int(sum(s) / 500))
        # RGB colours per class name (the image is already in RGB order here)
        red, green, blue = (255, 0, 0), (0, 255, 0), (0, 0, 255)
        colours = {
            'without_mask': red, 'bad': red,              # mask is not worn
            'with_mask': green, 'good': green,            # mask is worn correctly
            'mask_weared_incorrect': blue, 'none': blue,  # mask is worn incorrectly
        }
        # 'box' deliberately does not reuse the name of the 'bb' parameter
        for n, box in lbls:
            colour = colours.get(n)
            if colour is not None:  # unknown class names are left undrawn
                cv2.rectangle(img, box[0], box[1], colour, t)
    plt.figure(figsize=(20, 20))
    plt.subplot(2, 2, 2)
    plt.axis('off')
    plt.imshow(img)
    plt.show()

In [None]:
imgs = ImageNames() # file names (not full paths) of every non-XML file found under the images folder

In [None]:
# Testing the bounding boxes on a few randomly selected images in the dataset
# random.sample raises ValueError when asked for more items than exist,
# so the sample size is capped at the number of available images
r = random.sample(range(0, len(imgs)), min(3, len(imgs)))
for i in r:
    Visualize(imgs[i])

In [None]:
def CropImage(imgs):
    '''
    function to crop the part of the image inside every annotated bounding box;
    the crops are later used as the inputs of the mask classifier

    imgs: file name of the image to crop
    returns: list of [cropped image (RGB array), label number] pairs, where the label number is
             1 = mask worn, 2 = no mask, 3 = mask worn incorrectly, 0 = unknown class name.
             Bounding boxes that yield an empty crop are skipped.
    raises: FileNotFoundError if the image cannot be read
    '''
    i, l = Path(imgs)
    # image pre-processing
    img = cv2.imread(i)
    # imread signals failure by returning None instead of raising
    if img is None:
        raise FileNotFoundError("Could not read image: " + i)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    # getting the labels and bounding boxes' coords (the image size is not needed here)
    lbls, _ = ParseXML(l)
    # as labels are categorical variables, they are assigned numeric categories;
    # 'good' / 'bad' are accepted as aliases exactly like Visualize does
    lbl_nums = {
        "with_mask": 1, "good": 1,
        "without_mask": 2, "bad": 2,
        "mask_weared_incorrect": 3, "none": 3,
    }
    cropped_img_lbls = []
    for n, bb in lbls:
        (x1, y1), (x2, y2) = bb
        # a negative coordinate would be interpreted as "count from the end" by the slice
        x1, y1, x2, y2 = max(0, x1), max(0, y1), max(0, x2), max(0, y2)
        # cropping out the part of image inside bounding box (rows are y, columns are x)
        cropped_img = img[y1:y2, x1:x2]
        if cropped_img.size == 0:
            # degenerate box: nothing to classify and nothing that could be saved
            continue
        # collecting the cropped image together with its label number
        cropped_img_lbls.append([cropped_img, lbl_nums.get(n, 0)])
    return cropped_img_lbls

In [None]:
# Creating train directory with separate folders belonging to separate classes and also the model directory to save the models

trn = 'train/'
l1 = trn + "1/"
l2 = trn + "2/"
l3 = trn + "3/"
mdl = "model/"

# exist_ok=True lets this cell be re-run without failing on directories created by an earlier run
os.makedirs(trn, exist_ok=True)
os.makedirs(l1, exist_ok=True)
os.makedirs(l2, exist_ok=True)
os.makedirs(l3, exist_ok=True)
os.makedirs(mdl, exist_ok=True)

In [None]:
# number of crops written so far and the target folder, both keyed by the numeric class label
counts = {1: 0, 2: 0, 3: 0}
out_dirs = {1: l1, 2: l2, 3: l3}
for i in imgs:
    # iterating through the cropped images and their target labels
    for img, lbl in CropImage(i):
        # label 0 means "unknown class"; an empty crop cannot be encoded by imwrite
        if lbl not in out_dirs or img.size == 0:
            continue
        # CropImage returns RGB crops while cv2.imwrite expects BGR channel order;
        # converting back keeps the colours of the saved PNG correct
        bgr = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
        # the images are named after their running index inside their class folder
        cv2.imwrite(out_dirs[lbl] + str(counts[lbl]) + ".png", bgr)
        counts[lbl] += 1
# per-class totals under their original names, in case other cells read them
lbl1, lbl2, lbl3 = counts[1], counts[2], counts[3]

In [None]:
# Listing the regular files stored in each class folder (sub-directories are ignored)
lbl_1, lbl_2, lbl_3 = (
    [entry for entry in os.listdir(folder) if os.path.isfile(os.path.join(folder, entry))]
    for folder in (l1, l2, l3)
)
# Reporting the overall and the per-class image counts
print("Total no. of images = {}\n".format(sum(map(len, (lbl_1, lbl_2, lbl_3)))))
print("No. of images labeled 1 = {}".format(len(lbl_1)))
print("No. of images labeled 2 = {}".format(len(lbl_2)))
print("No. of images labeled 3 = {}".format(len(lbl_3)))

In [None]:
# Selecting the compute device: the CUDA GPU when one is available, otherwise the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
device

In [None]:
from torch import nn
import torch.nn.functional as F
class state(nn.Module):
    '''
    Small CNN classifier: two conv -> batch-norm -> ReLU -> 2x2 max-pool stages
    followed by four fully connected layers and a log-softmax output.

    num_classes: number of output classes; defaults to 3 because the training
                 data is organised in three class folders (1/, 2/, 3/)
    input_size:  side length of the square input images; the default of 224
                 gives the 16*56*56 flattened feature size
    '''
    def __init__(self, num_classes=3, input_size=224):
        super(state , self).__init__()

        self.cnn1 = nn.Conv2d(in_channels=3 , out_channels=8 , kernel_size = 3 , stride = 1 , padding  = 1)
        self.batchnorm1 = nn.BatchNorm2d(8)
        self.relu = nn.ReLU()
        self.maxpool1 = nn.MaxPool2d(kernel_size=2)
        self.cnn2 = nn.Conv2d(in_channels=8 , out_channels=16 , kernel_size=3 , stride = 1 , padding = 1)
        self.batchnorm2 = nn.BatchNorm2d(16)
        self.maxpool2 = nn.MaxPool2d(kernel_size=2)
        # the padded 3x3 convolutions keep the spatial size; each of the two
        # 2x2 max-pools halves it, leaving 16 channels of (input_size // 4)^2
        self.flat_features = 16 * (input_size // 4) ** 2
        self.fc1 = nn.Linear(in_features=self.flat_features , out_features=4000)
        # one dropout module per layer: re-assigning a single self.dropout
        # attribute would leave only the last rate (0.45) in effect everywhere
        self.dropout1 = nn.Dropout(0.55)
        self.fc2 = nn.Linear(in_features=4000 , out_features=2000)
        self.dropout2 = nn.Dropout(0.55)
        self.fc3 = nn.Linear(in_features=2000 , out_features=512)
        self.dropout3 = nn.Dropout(0.45)
        self.fc4 = nn.Linear(in_features=512 , out_features=num_classes)
        self.final_act = nn.LogSoftmax(dim=1)
    def forward(self , x):
        '''
        x: batch of images shaped (N, 3, input_size, input_size)
        returns: (N, num_classes) tensor of log-probabilities
        '''
        out = self.cnn1(x)
        out = self.batchnorm1(out)
        out = self.relu(out)
        out = self.maxpool1(out)
        out = self.cnn2(out)
        out = self.batchnorm2(out)
        out = self.relu(out)
        out = self.maxpool2(out)
        # flatten per sample; keeping the batch dimension explicit makes a wrong
        # input size fail at fc1 instead of silently changing the batch size
        out = out.view(out.size(0), -1)
        out = self.fc1(out)
        out = self.relu(out)
        out = self.dropout1(out)
        out = self.fc2(out)
        out = self.relu(out)
        out = self.dropout2(out)
        out = self.fc3(out)
        out = self.relu(out)
        out = self.dropout3(out)
        out = self.fc4(out)
        out = self.final_act(out)
        return out
    
# Instantiating the network with its default constructor arguments
model = state()

# Printing the module shows the registered layers of the network
print(model)

In [None]:
# transforms.Compose() chains the transformations below into a single pipeline applied to every image:
#   1. resize the image to 224x224
#   2. convert it to a PyTorch tensor with values scaled to [0,1]
#   3. normalize every channel with mean 0.5 and std 0.5, i.e. (x - 0.5) / 0.5, which maps [0,1] to [-1,1]
train_transforms = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ]
)

In [None]:
# Reading the images from the class folders under the train directory and applying the transformations defined above
dataset = datasets.ImageFolder(root=trn, transform=train_transforms)

# 60% / 20% split for training / validation; the test set receives whatever remains after integer truncation
dataset_size = len(dataset)
train_size, val_size = int(dataset_size * 0.6), int(dataset_size * 0.2)
test_size = dataset_size - (train_size + val_size)

# random_split assigns the samples to the three subsets at random
train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(
    dataset=dataset,
    lengths=[train_size, val_size, test_size],
)

print('Dataset size: ', len(dataset))
print('Train set size: ', len(train_dataset))
print('Validation set size: ', len(val_dataset))
print('Test set size: ', len(test_dataset))

In [None]:
BATCH_SIZE = 64
# Serving the data in shuffled mini-batches keeps the memory consumption low.
# The train, validation and test loaders are built the same way, in that order.
train_loader, val_loader, test_loader = (
    torch.utils.data.DataLoader(subset, batch_size=BATCH_SIZE, shuffle=True)
    for subset in (train_dataset, val_dataset, test_dataset)
)

In [None]:
LEARNING_RATE = 0.001
# Cross-entropy loss together with the Adam optimizer (gradient descent with adaptive per-parameter step sizes and momentum)
# NOTE(review): the model already ends in nn.LogSoftmax while CrossEntropyLoss applies log-softmax itself;
# re-normalizing log-probabilities leaves them unchanged, so the loss value is the same, but nn.NLLLoss
# is the conventional partner of a LogSoftmax output - confirm which pairing is intended
criterion = torch.nn.CrossEntropyLoss()
# Adam updates every parameter of the model, starting from the learning rate above
optimizer = torch.optim.Adam(model.parameters(), lr = LEARNING_RATE)

In [None]:
# Moving the model to the selected device (the GPU when available, otherwise the CPU)
model.to(device)

# Training and Validation

In [None]:
total_epoch = 50

best_epoch = 0
best_val_loss = float('inf')
training_losses = []
val_losses = []

# the best checkpoint is written into the model directory; make sure it exists
os.makedirs(mdl, exist_ok=True)

#TRAINING PROCESS
for epoch in range(total_epoch):
    # training mode: dropout is active and batch-norm uses the statistics of the current batch
    model.train()
    #keep track of training loss (sum of the batch losses of this epoch)
    epoch_train_loss = 0

    for X, y in train_loader:
        # moving the batch to the same device as the model (works on CPU-only machines too)
        X, y = X.to(device), y.to(device)
        #setting zero_grad, as in pytorch the optimizer keeps accumulating gradients during each backpropagation
        optimizer.zero_grad()
        #forward pass
        result = model(X)
        #calculating the loss
        loss = criterion(result, y)
        #adding the training loss for better tracking of the loss minimization process
        epoch_train_loss += loss.item()
        #executing backward propagation for calculation of gradients
        loss.backward()
        #updating the weights using the calculated gradients
        optimizer.step()

    training_losses.append(epoch_train_loss)

    epoch_val_loss = 0
    correct = 0
    total = 0

    # evaluation mode: dropout is disabled and batch-norm uses its running estimates,
    # otherwise the validation numbers are computed on a randomly perturbed network
    model.eval()
    # torch.no_grad() deactivates the autograd engine: less memory, faster computation,
    # and no backpropagation is possible (none is needed for validation)
    with torch.no_grad():
        for X, y in val_loader:
            X, y = X.to(device), y.to(device)
            #forward pass
            result = model(X)
            #calculating the loss
            loss = criterion(result, y)
            #accumulating the loss
            epoch_val_loss += loss.item()
            #index of the maximum value in each row of predictions = predicted class,
            #which is then compared with y, the correct label of each image
            _, maximum = torch.max(result.data, 1)
            total += y.size(0)
            #summing all the correct predictions for the accuracy calculation
            correct += (maximum == y).sum().item()

    val_losses.append(epoch_val_loss)
    #calculating the validation accuracy (0.0 when the validation set is empty)
    accuracy = correct / total if total else 0.0
    print("EPOCH:", epoch, ", Training Loss:", epoch_train_loss, ", Validation Loss:", epoch_val_loss, ", Accuracy: ", accuracy)

    # keeping the checkpoint of the epoch with the lowest validation loss; it is stored as
    # '<epoch>.pth' with the 'model' and 'state_dict' entries that load_checkpoint reads
    if epoch_val_loss < best_val_loss:
        previous_best = mdl + str(best_epoch) + ".pth"
        best_val_loss = epoch_val_loss
        best_epoch = epoch
        current_best = mdl + str(best_epoch) + ".pth"
        torch.save({'model': model, 'state_dict': model.state_dict()}, current_best)
        # only the best checkpoint is kept on disk
        if previous_best != current_best and os.path.isfile(previous_best):
            os.remove(previous_best)
    
    
 

In [None]:
#VISUALIZATION OF TRAINING AND VALIDATION LOSS CHANGES THROUGHOUT THE PROCESS
# the x-axis follows the number of recorded epochs, so the curves can still be
# plotted when the training was stopped before total_epoch was reached
plt.plot(range(len(training_losses)), training_losses, label='Training')
plt.plot(range(len(val_losses)), val_losses, label='Validation')
plt.xlabel('Epoch')
plt.ylabel('Loss summed over the batches of the epoch')
plt.legend()

In [None]:
#LOADING THE SAVED MODEL
def load_checkpoint(filepath, map_location=None):
    '''
    Load a checkpoint and return its model, frozen and in evaluation mode.

    filepath:     path of a checkpoint holding the entries 'model' (the module
                  object) and 'state_dict' (its weights)
    map_location: optional device onto which the stored tensors are mapped,
                  e.g. 'cpu' to open a checkpoint saved on a GPU machine
    returns: the model with requires_grad disabled on all parameters, in eval mode
    '''
    # SECURITY NOTE: the checkpoint contains a pickled module, so it has to be unpickled
    # in full (weights_only=False); only load checkpoint files that you created yourself.
    try:
        checkpoint = torch.load(filepath, map_location=map_location, weights_only=False)
    except TypeError:
        # older PyTorch releases do not know the weights_only argument
        checkpoint = torch.load(filepath, map_location=map_location)
    model = checkpoint['model']
    model.load_state_dict(checkpoint['state_dict'])
    # testing needs no backpropagation, so 'requires_grad' is switched off
    for parameter in model.parameters():
        parameter.requires_grad = False
    # .eval() does not change the gradient calculations; it switches layers such as dropout
    # and batchnorm to evaluation mode, i.e. dropout won't drop activations and batchnorm
    # uses its running estimates instead of the batch statistics
    return model.eval()


# the checkpoints live in the model directory created earlier ('mdl'), named '<epoch>.pth'
filepath = mdl + str(best_epoch) + ".pth"
#loading the model for testing and making sure it sits on the device that is used for the data
loaded_model = load_checkpoint(filepath).to(device)

# same pre-processing as during training: resize, tensor in [0,1], then (x - 0.5) / 0.5 per channel
train_transforms = transforms.Compose([
                                       transforms.Resize((224,224)),
                                       transforms.ToTensor(),
                                       transforms.Normalize((0.5,0.5,0.5), (0.5,0.5,0.5))
                                       ])

# Test

In [None]:
correct = 0
total = 0

# the model and the batches have to live on the same device
loaded_model.to(device)

with torch.no_grad():
    for X, y in test_loader:
        # using the selected device instead of a hard-coded .cuda(), so CPU-only machines work too
        X, y = X.to(device), y.to(device)

        #FORWARD PASS
        result = loaded_model(X)

        #ACCURACY CALCULATION: the index of the largest output is the predicted class
        _, maximum = torch.max(result.data, 1)
        total += y.size(0)
        correct += (maximum == y).sum().item()

# 0.0 when the test set is empty, instead of a division by zero
accuracy = correct / total if total else 0.0

print("\n")
print("---")
print("Accuracy: " + str(accuracy*100))
print("---")
print("\n")