## Import Libraries

In [12]:
## Importing libraries
import pandas as pd
import numpy as np
import cv2
import os
import glob
from PIL import Image
from scipy.spatial import distance
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.metrics import classification_report, confusion_matrix

import torch
import torch.nn as nn
from torchsummary import summary
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from torch.utils.data import random_split

import matplotlib.pyplot as plt
import seaborn as sns

## Loading Data

In [2]:
DIRECTORY = '../Data/'
CATEGORIES = ['Cloth mask','Mask worn incorrectly','N-95_Mask','No Face Mask','Surgical Mask']

data = []
labels = []
for category in CATEGORIES:
    path = os.path.join(DIRECTORY, category)
    i = 0
    for img in os.listdir(path):
        img_path = os.path.join(path, img)
        
        try:
            image = Image.open(img_path)
            data.append(image)
            labels.append(category)
            #image = train_transforms(image)
        except:
            pass        

In [3]:
total_images = len(data)
train_dataset_size = 1580
test_dataset_size = total_images - train_dataset_size
test_dataset_percentage = test_dataset_size/total_images
data_train, data_test, labels_train, labels_test = train_test_split(data, labels, test_size=test_dataset_percentage, random_state=42)

In [4]:
# convert_tensor=transforms.ToTensor()
# a = convert_tensor(data_train[3])
print(len(data_train))
print(len(data_test))

1580
519


In [5]:
# Imagenet standards
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

# Train uses data augmentation
train_transforms = transforms.Compose([
        transforms.Resize((224,224)),
#        transforms.RandomResizedCrop(size=256, scale=(0.8, 1.0)),
        transforms.RandomRotation(10),
#        transforms.ColorJitter(),
        transforms.RandomHorizontalFlip(),
#        transforms.CenterCrop(size=224),  # Image net standards
        transforms.ToTensor(),
        transforms.Normalize(torch.Tensor(mean), torch.Tensor(std))  
])
    
# Validation does not use augmentation
test_transforms = transforms.Compose([
        transforms.Resize((224,224)),
#        transforms.CenterCrop(size=224),
        transforms.ToTensor(),
        transforms.Normalize(torch.Tensor(mean), torch.Tensor(std))
    ])

In [62]:
from sklearn.preprocessing import LabelEncoder

lb_make = LabelEncoder()
labels_train = lb_make.fit_transform(labels_train)
labels_test = lb_make.transform(labels_test)

In [63]:
train_images = []
train_labels = []


for i in range(len(data_train)):
    try:
        train_images.append(train_transforms(data_train[i]))
        train_labels.append(labels_train[i])
    except:
        pass
    
test_images = []
test_labels = []
for i in range(len(data_test)):
    try:
        test_images.append(test_transforms(data_test[i]))
        test_labels.append(labels_test[i])
    except:
        pass

In [88]:
len(train_images), len(test_images), len(train_labels), len(test_labels)

(1506, 489, 1506, 489)

In [89]:
len(labels_train), len(labels_test)

(1580, 519)

In [90]:
labels_train.reshape(-1)

array([4, 1, 2, ..., 2, 3, 2], dtype=int64)

In [94]:
train_images[0].shape, len(train_images), len(train_labels)

(torch.Size([3, 224, 224]), 1506, 1506)

In [101]:
train_images_numpy = [t.numpy() for t in train_images]

In [106]:
test_images_numpy = [t.numpy() for t in test_images]

In [105]:
train_images_numpy[0].shape, len(train_images_numpy)

((3, 224, 224), 1506)

In [107]:
test_images_numpy[0].shape, len(test_images_numpy)

((3, 224, 224), 489)

In [131]:
class CreateDataset:
    def __init__(self, x, y):
        
        self.x = x
        self.y = y
    
    def __getitem__(self, index):
        
        sample = {
            'feature': torch.tensor([self.x[index]], dtype=torch.float32), 
            'label': torch.tensor([self.y[index]], dtype=torch.int16)}
        
        return sample
    
    def __len__(self):
        return len(self.x)

In [132]:
train_dataset = CreateDataset(x=train_images_numpy, y=train_labels)
test_dataset = CreateDataset(x=test_images_numpy, y=test_labels)

In [67]:
# train_dataset_size = 1500
# test_dataset_size = 601
# train_data, test_data = random_split(torch_dataset, [train_dataset_size, test_dataset_size])

In [133]:
batch_size_train = 100
batch_size_test = 100

train_loader = torch.utils.data.DataLoader(dataset=train_dataset, 
                                           batch_size=batch_size_train,
                                           shuffle=True,
                                           drop_last=True)

test_loader = torch.utils.data.DataLoader(dataset=test_dataset, 
                                         batch_size=batch_size_test,
                                         shuffle=False,
                                         drop_last=True)

In [37]:
# train_dataset = torchvision.datasets.ImageFolder(root=train_dataset_path, transform=train_transforms)
# test_dataset = torchvision.datasets.ImageFolder(root=test_dataset_path, transform=test_transforms)

In [98]:
def show_transformed_images(dataset, batch_size):
    #loader = torch.utils.data.DataLoader(dataset, batch_size, shuffle=True)
    batch = next(iter(dataset))
    images, labels = batch
    
    grid = trochvision.utils.make_grid(images, nrow = 3)
    plt.figure(figsize=(11,11))
    plt.imshow(np.transpose(grid, (1,2,0)))
    print('labels: ', labels)

In [119]:
#show_transformed_images(dataset=train_loader, batch_size=32)

In [122]:
# from torchvision.utils import make_grid
# import matplotlib.pyplot as plt

# def show_batch(dl):
#     """Plot images grid of single batch"""
#     for images, labels in dl:
#         fig,ax = plt.subplots(figsize = (16,12))
#         ax.set_xticks([])
#         ax.set_yticks([])
#         ax.imshow(make_grid(images,nrow=16).permute(1,2,0))
#         break
        
# show_batch(train_loader)

## Building Network

In [39]:
# class CNN(nn.Module):
#     def __init__(self):
        
#         super(CNN, self).__init__()
#         self.conv_layer = nn.Sequential(
#         nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1),
#         nn.BatchNorm2d(32),
#         nn.LeakyReLU(inplace=True),
#         nn.Conv2d(in_channels=32, out_channels=32, kernel_size=3, padding=1),
#         nn.BatchNorm2d(32),
#         nn.LeakyReLU(inplace=True),
#         nn.MaxPool2d(kernel_size=2, stride=2),
#         nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1),
#         nn.BatchNorm2d(64),
#         nn.LeakyReLU(inplace=True),
#         nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, padding=1),
#         nn.BatchNorm2d(64),
#         nn.LeakyReLU(inplace=True),
#         nn.MaxPool2d(kernel_size=2, stride=2),
#         )
        
#         self.fc_layer = nn.Sequential(
#         nn.Dropout(p=0.1),
#         nn.Linear(28 * 28 * 128, 1000),
#         nn.ReLU(inplace=True),
#         nn.Linear(1000, 512),
#         nn.ReLU(inplace=True),
#         nn.Dropout(p=0.1),
#         nn.Linear(512, 10)
#         )
        
#     def forward(self, x):
#         # conv layers
#         x = self.conv_layer(x)
#         # flatten
#         x = x.view(x.size(0), -1)
#         # fc layer
#         9
#         https://pytorch.org/docs/stable/generated/torch.nn.BatchNorm2d.html#torch.nn.BatchNorm2d
#         6
#         x = self.fc_layer(x)
#         return x

In [134]:
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1),
            nn.LeakyReLU(inplace=True),
            nn.BatchNorm2d(32),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        
        
        self.conv2 = nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1),
            nn.LeakyReLU(inplace=True),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        
        self.conv3 = nn.Sequential(
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size = 3, padding = 1),
            nn.LeakyReLU(inplace=True),
            nn.BatchNorm2d(128),
            nn.MaxPool2d(2),
        )
        
        self.fc_layer = nn.Sequential(
            nn.Dropout(p=0.1),
            nn.Linear(28 * 28 * 128, 1024),
            nn.ReLU(inplace=True),
            nn.Linear(1024, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.1),
            nn.Linear(512, 5)
        )
        
        
    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = x.view(x.size(0), -1)
        x = self.fc_layer(x)
        
        return x

In [135]:
learning_rate = 0.005
model = CNN()
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [79]:
        for epoch in range(num_epochs):

            train_loss = 0.0
            self.model.train()

            for i, data in enumerate(train_loader, start=1):
                # forward
                start_location = data['start_location']
                file_cycle = data['file_cycle'].reshape(-1,1)
                end_location = data['end_location']

                outputs = self.model(start_location, file_cycle)
                loss = self.criterion(outputs, end_location)

                # backward
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                train_loss += loss.item()

                if epoch == num_epochs-1:
                    pred_end_location_train.append(outputs)
                    real_end_location_train.append(end_location)

            train_loss_per_epoch.append(train_loss/len_train_loader)

            valid_loss = 0.0
            self.model.eval()

            for data in val_loader:
                start_location = data['start_location']
                file_cycle = data['file_cycle'].reshape(-1,1)
                end_location = data['end_location'] 

                outputs = self.model(start_location, file_cycle)
                loss = self.criterion(outputs, end_location)

                valid_loss += loss.item()

                # if epoch == num_epochs-1:
                #     pred_end_location_val.append(outputs)
                #     real_end_location_val.append(end_location)

            val_loss_per_epoch.append(valid_loss/len_val_loader)

            scheduler.step(valid_loss)

            print(f'epoch {epoch+1} / {num_epochs}, Training Loss = {(train_loss/len_train_loader):.6f}\
            , Validation Loss: {(valid_loss/len_val_loader):.6f}')    

        torch.save(self.model,'Saved/Models/'+self.flow_type+'/'+self.lag_type_train+'_'+str(self.num_seeds_train)+'_'+\
                   str(self.file_cycle_interval_train)+'_'+str(self.batch_size_train)+'_'+str(learning_rate)+'_Model.pth')
        

NameError: name 'self' is not defined

In [122]:
train_images[0].shape

torch.Size([3, 224, 224])

In [124]:
len(train_images)

1506

In [136]:
num_epochs = 2
total_step = len(train_loader)
loss_list = []
acc_list = []

for epoch in range(num_epochs):
    
    model.train()
    
    for i, data in enumerate(train_loader):
        
        images = data['feature']
        labels = data['label']
        print(images.shape, labels.shape)
        images = images.reshape(-1,3,224,224)
        #print(images.shape, labels.shape)
        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss_list.append(loss.item())
        
        # Backprop and optimisation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # Train accuracy
        total = labels.size(0)
        _, predicted = torch.max(outputs.data, 1)
        correct = (predicted == labels).sum().item()
        acc_list.append(correct / total)
        
        if (i + 1) % 100 == 0:
            print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Accuracy: {:.2f}%'
            .format(epoch + 1, num_epochs, i + 1, total_step, loss.item(),
            (correct / total) * 100))


torch.Size([100, 1, 3, 224, 224]) torch.Size([100, 1])


RuntimeError: 0D or 1D target tensor expected, multi-target not supported

In [None]:
dataset = {
    "image_path": [],
    "mask_status": [],
    "where": []
}

for where in os.listdir(path):
    for status in os.listdir(path+"/"+where):
        for image in glob.glob(path+where+"/"+status+"/"+"*.png"):
            dataset["image_path"].append(image)
            dataset["mask_status"].append(status)
            dataset["where"].append(where)
            
dataset = pd.DataFrame(dataset)
dataset.head()

In [23]:
class CNN(nn.Module):
    def __init__(self):
        
        super(CNN, self).__init__()
        self.conv_layer = nn.Sequential(
        nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1),
        nn.BatchNorm2d(32),
        nn.LeakyReLU(inplace=True),
        nn.Conv2d(in_channels=32, out_channels=32, kernel_size=3, padding=1),
        nn.BatchNorm2d(32),
        nn.LeakyReLU(inplace=True),
        nn.MaxPool2d(kernel_size=2, stride=2),
        nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1),
        nn.BatchNorm2d(64),
        nn.LeakyReLU(inplace=True),
        nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, padding=1),
        nn.BatchNorm2d(64),
        nn.LeakyReLU(inplace=True),
        nn.MaxPool2d(kernel_size=2, stride=2),
        )
        
        self.fc_layer = nn.Sequential(
        nn.Dropout(p=0.1),
        nn.Linear(8 * 8 * 64, 1000),
        nn.ReLU(inplace=True),
        nn.Linear(1000, 512),
        nn.ReLU(inplace=True),
        nn.Dropout(p=0.1),
        nn.Linear(512, 10)
        )
        
    def forward(self, x):
        # conv layers
        x = self.conv_layer(x)
        # flatten
        x = x.view(x.size(0), -1)
        # fc layer
        9
        https://pytorch.org/docs/stable/generated/torch.nn.BatchNorm2d.html#torch.nn.BatchNorm2d
        6
        x = self.fc_layer(x)
        return x

SyntaxError: invalid syntax (Temp/ipykernel_9344/3020880936.py, line 39)

In [25]:
total_step = len(train_loader)
loss_list = []
acc_list = []
for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss_list.append(loss.item())
        # Backprop and optimisation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Train accuracy
        total = labels.size(0)
        _, predicted = torch.max(outputs.data, 1)
        correct = (predicted == labels).sum().item()
        acc_list.append(correct / total)

        if (i + 1) % 100 == 0:
            print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Accuracy: {:.2f}%'.format(epoch + 1, num_epochs, i + 1, total_step, loss.item(),\
            (correct / total) * 100))

NameError: name 'train_loader' is not defined

In [None]:
# load the image
path = '../Data/'
cloth_mask_path = 'Cloth mask/'
Mask_worn_incorrectly_path = 'Mask worn incorrectly/'
N_95_Mask_path = 'N-95_Mask/'
No_Face_Mask_path = 'No Face Mask/'
Surgical_Mask_path = 'Surgical Mask/'