In [1]:
import copy
from os import listdir
from os.path import isfile, join

import cv2
from PIL import Image
import matplotlib.pyplot as plt
import dlib
from imutils import face_utils
from random import shuffle

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms

In [2]:
class Lambda(nn.Module):
    def __init__(self, func):
        super().__init__()
        self.func = func

    def forward(self, x):
        return self.func(x)

In [3]:
class ResidualBlock(nn.Module):
    """
    The residual block used by ResNet.
    
    Args:
        in_channels: The number of channels (feature maps) of the incoming embedding
        out_channels: The number of channels after the first convolution
        stride: Stride size of the first convolution, used for downsampling
    """
    
    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()        
        if stride > 1 or in_channels != out_channels:
            # Add strides in the skip connection and zeros for the new channels.
            self.skip = Lambda(lambda x: F.pad(x[:, :, ::stride, ::stride],
                                               (0, 0, 0, 0, 0, out_channels - in_channels),
                                               mode="constant", value=0))
        else:
            self.skip = nn.Sequential()
            
        # TODO: Initialize the required layers
        self.layer = nn.Sequential(
                nn.Conv2d(in_channels,out_channels,kernel_size=3,stride=stride,padding=1,bias=False),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace=True),
                nn.Conv2d(out_channels,out_channels,kernel_size=3,stride=1,padding=1,bias=False),
                nn.BatchNorm2d(out_channels)
            )
    def forward(self, inputs):
        # TODO: Execute the required layers and functions
        out = self.layer(inputs)
        out += self.skip(inputs)
        out = F.relu(out)
        return out

In [4]:
class ResidualStack(nn.Module):
    """
    A stack of residual blocks.
    
    Args:
        in_channels: The number of channels (feature maps) of the incoming embedding
        out_channels: The number of channels after the first layer
        stride: Stride size of the first layer, used for downsampling
        num_blocks: Number of residual blocks
    """
    
    def __init__(self, in_channels, out_channels, stride, num_blocks):
        super().__init__()
        
        # TODO: Initialize the required layers (blocks)
        layer = []
        layer.append(ResidualBlock(in_channels, out_channels, stride))
        for i in range(1,num_blocks):
            layer.append(ResidualBlock(out_channels, out_channels))
        self.layer = nn.Sequential(*layer)

    def forward(self, out):
        # TODO: Execute the layers (blocks)
        out = self.layer(out)
        return out

In [5]:
class ResNet(nn.Module):
    def __init__(self, n = 5, num_classes = 2):
        super().__init__()
        self.n = 5
        self.num_classes = 2

        # TODO: Implement ResNet via nn.Sequential
        self.layer = nn.Sequential(
        nn.Conv2d(3,16,3,1,1,bias=False),
        nn.BatchNorm2d(16),
        nn.ReLU(inplace=True),
        ResidualStack(16,16,1,n),
        ResidualStack(16,32,2,n),
        ResidualStack(32,64,2,n),
        ResidualStack(64,64,2,n),
        ResidualStack(64,64,2,n),
        nn.AvgPool2d(8), 
        Lambda(lambda x: torch.squeeze(x)),
        nn.Linear(64,num_classes), 
        )
    def forward(self, out):
        # TODO: Execute the layers (blocks)
        out = self.layer(out)
        return out

In [6]:
resnet = ResNet()

In [7]:
normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
transform_train = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomCrop(128, 4),
    transforms.ToTensor(),
    normalize,
])
transform_eval = transforms.Compose([
    transforms.ToTensor(),
    normalize
])

In [8]:
path1 = 'trainingData1/'
path0 = 'trainingData0/'
files1 = [image for image in listdir(path1) if isfile(join(path1, image))]
files0 = [image for image in listdir(path0) if isfile(join(path0, image))]
shuffle(files1)
shuffle(files0)

inputs = []

for i, file in enumerate(files1):
    image_path = path1 + files1[i]
    img = Image.open(image_path)
    inputs.append(img)

for i, file in enumerate(files0):
    image_path = path0 + files0[i]
    img = Image.open(image_path)
    inputs.append(img)

targets1 = np.ones(len(files1), dtype=np.long)
targets0 = np.zeros(len(files0), dtype=np.long)

In [9]:
ntrain = 182
nval = 26
ndata1 = len(files1)
train_set = []
val_set = []
test_set = []
for i in range(ntrain):
    train_set.append((transform_train(inputs[i]),targets1[i]))
    train_set.append((transform_train(inputs[i+ndata1]),targets0[i]))
for i in range(ntrain, ndata1):
    val_set.append((transform_eval(inputs[i]),targets1[i]))
    val_set.append((transform_eval(inputs[i+ndata1]),targets0[i]))

In [10]:
dataloaders = {}
dataloaders['train'] = torch.utils.data.DataLoader(train_set, batch_size=8,
                                                   shuffle=True, num_workers=0,
                                                   pin_memory=True)
dataloaders['val'] = torch.utils.data.DataLoader(val_set, batch_size=8,
                                                 shuffle=False, num_workers=0,
                                                 pin_memory=True)

In [11]:
def run_epoch(model, optimizer, dataloader, train):
    """
    Run one epoch of training or evaluation.
    
    Args:
        model: The model used for prediction
        optimizer: Optimization algorithm for the model
        dataloader: Dataloader providing the data to run our model on
        train: Whether this epoch is used for training or evaluation
        
    Returns:
        Loss and accuracy in this epoch.
    """
    # TODO: Change the necessary parts to work correctly during evaluation (train=False)
    device = next(model.parameters()).device

    epoch_loss = 0.0
    epoch_acc = 0.0

    # Set model to training mode (for e.g. batch normalization, dropout)
    if train:
        model.train()
    else:
        model.eval()

    for xb, yb in dataloader:
        xb, yb = xb.to(device), yb.to(device)
      # zero the parameter gradients
        if train:
            optimizer.zero_grad()

      # forward
        with torch.set_grad_enabled(True):
            pred = model(xb.float())
            loss = F.cross_entropy(pred, yb.long())
            top1 = torch.argmax(pred, dim=1)
            ncorrect = torch.sum(top1 == yb)

        if train:  
            loss.backward()
            optimizer.step()

      # statistics
        epoch_loss += loss.item()
        epoch_acc += ncorrect.item()

    epoch_loss /= len(dataloader.dataset)
    epoch_acc /= len(dataloader.dataset)
    return epoch_loss, epoch_acc

In [12]:
def fit(model, optimizer, lr_scheduler, dataloaders, max_epochs, patience):
    """
    Fit the given model on the dataset.
    
    Args:
        model: The model used for prediction
        optimizer: Optimization algorithm for the model
        lr_scheduler: Learning rate scheduler that improves training
                      in late epochs with learning rate decay
        dataloaders: Dataloaders for training and validation
        max_epochs: Maximum number of epochs for training
        patience: Number of epochs to wait with early stopping the
                  training if validation loss has decreased
                  
    Returns:
        Loss and accuracy in this epoch.
    """
    
    best_acc = 0
    curr_patience = 0
    PATH = './data'
    for epoch in range(max_epochs):
        train_loss, train_acc = run_epoch(model, optimizer, dataloaders['train'], train=True)
        lr_scheduler.step()
        print(f"Epoch {epoch + 1: >3}/{max_epochs}, train loss: {train_loss:.2e}, accuracy: {train_acc * 100:.2f}%")

        val_loss, val_acc = run_epoch(model, None, dataloaders['val'], train=False)
        print(f"Epoch {epoch + 1: >3}/{max_epochs}, val loss: {val_loss:.2e}, accuracy: {val_acc * 100:.2f}%")

        # TODO: Add early stopping and save the best weights (in best_model_weights)
        if val_acc > best_acc:
            curr_patience = 0
            best_acc = val_acc
            best_model_weights = copy.deepcopy(model.state_dict())
        else:
            curr_patience += 1
        if curr_patience >= patience:
            break
    
    model.load_state_dict(best_model_weights)

In [13]:
optimizer = torch.optim.SGD(resnet.parameters(), lr=0.1, momentum=0.9, weight_decay=1e-4)
lr_scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=[100, 150], gamma=0.1)

# Fit model
fit(resnet, optimizer, lr_scheduler, dataloaders, max_epochs=200, patience=10)

Epoch   1/200, train loss: 4.92e-01, accuracy: 61.81%
Epoch   1/200, val loss: 8.37e-01, accuracy: 71.25%
Epoch   2/200, train loss: 7.43e-02, accuracy: 75.00%
Epoch   2/200, val loss: 1.44e-01, accuracy: 50.00%
Epoch   3/200, train loss: 5.47e-02, accuracy: 85.44%
Epoch   3/200, val loss: 3.71e-02, accuracy: 83.75%
Epoch   4/200, train loss: 2.32e-02, accuracy: 93.68%
Epoch   4/200, val loss: 8.27e-03, accuracy: 97.50%
Epoch   5/200, train loss: 3.16e-02, accuracy: 93.13%
Epoch   5/200, val loss: 1.90e-01, accuracy: 50.00%
Epoch   6/200, train loss: 2.46e-02, accuracy: 93.41%
Epoch   6/200, val loss: 1.00e-02, accuracy: 96.25%
Epoch   7/200, train loss: 2.16e-02, accuracy: 94.78%
Epoch   7/200, val loss: 7.00e-03, accuracy: 98.75%
Epoch   8/200, train loss: 1.39e-02, accuracy: 96.43%
Epoch   8/200, val loss: 8.60e-03, accuracy: 98.75%
Epoch   9/200, train loss: 1.91e-02, accuracy: 95.33%
Epoch   9/200, val loss: 1.56e-02, accuracy: 97.50%
Epoch  10/200, train loss: 1.92e-02, accuracy:

In [14]:
torch.save(resnet.state_dict(), 'param_cpu.pkl')

In [15]:
# resnet.load_state_dict(torch.load('param_640.pkl'))

In [16]:
def hisEqulColor(img):  
    ycrcb = cv2.cvtColor(img, cv2.COLOR_BGR2YCR_CB)  
    channels = cv2.split(ycrcb)  
    cv2.equalizeHist(channels[0], channels[0]) #equalizeHist(in,out)  
    cv2.merge(channels, ycrcb)  
    img_eq=cv2.cvtColor(ycrcb, cv2.COLOR_YCR_CB2BGR)  
    return img_eq

In [None]:
face_detect = dlib.get_frontal_face_detector()
path = 'test_image/'
files = [image for image in listdir(path) if isfile(join(path, image))]

for i, file in enumerate(files):
    image_path = path + files[i]
    print(image_path)
    frame =  cv2.imread(image_path)
    img = hisEqulColor(frame)
    new_dimension = (128,128)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    rects = face_detect(gray, 1)
    for (i, rect) in enumerate(rects):
        (x, y, w, h) = face_utils.rect_to_bb(rect)
        cropped_face =  frame[y:y+h, x:x+w]
        # resizing found face to new dimensions
        resized_face = cv2.resize(cropped_face, new_dimension)
        resized_face = cv2.cvtColor(resized_face,cv2.COLOR_BGR2RGB)
        resized_face = Image.fromarray(resized_face, 'RGB')
        inp = transform_eval(resized_face)
        plt.imshow(resized_face, cmap='gray')
        plt.axis('off')
        plt.show()
        pred = resnet(torch.unsqueeze(inp,0))
        top1 = torch.argmax(pred)
        print(pred)

In [18]:
def classifier(frame):
    face_detect = dlib.get_frontal_face_detector()
    img = hisEqulColor(frame)
    new_dimension = (128,128)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) #img
    rects = face_detect(gray, 1)
    cropped_face = None
    for (i, rect) in enumerate(rects):
        (x, y, w, h) = face_utils.rect_to_bb(rect)
        cropped_face = frame[y:y+h, x:x+w]

    try:
        if cropped_face is not None:
        # resizing found face to new dimensions
            resized_face = cv2.resize(cropped_face, new_dimension)
            resized_face = cv2.cvtColor(resized_face,cv2.COLOR_BGR2RGB)
            resized_face = Image.fromarray(resized_face, 'RGB')
            inp = transform_eval(resized_face)
            pred = resnet(torch.unsqueeze(inp,0))
            top1 = torch.argmax(pred)
            return top1 
        else:
            return None
    except:
        return None

In [None]:
capture = cv2.VideoCapture('test_video/Blightt2.avi')
response, frame = capture.read()
fps = capture.get(cv2.CAP_PROP_FPS)
size = (int(capture.get(cv2.CAP_PROP_FRAME_WIDTH)),
        int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT)))
writer = cv2.VideoWriter('Face_recognition.avi', cv2.VideoWriter_fourcc(*'XVID'), fps, size)
face_detect = dlib.get_frontal_face_detector()
res = []
while response:
    result = classifier(frame)
    if result == 1:
        res.append(1)        
        message = 'You you are finally here'
        cv2.putText(frame, message, (50,80), cv2.FONT_ITALIC, 1, (0,255,0), 2)	

    elif result == 0:
        res.append(0)
        message = 'Sorry! You are not who you say'
        cv2.putText(frame, message, (50,80), cv2.FONT_ITALIC, 1, (0,0,255), 2)
        
    else:
        message = 'No face found!'
        cv2.putText(frame, message, (50,80), cv2.FONT_ITALIC, 1, (0,200,200), 2)


    writer.write(frame)
    response, frame = capture.read()

capture.release()
writer.release()
cv2.destroyAllWindows()

In [None]:
capture = cv2.VideoCapture('test_video/original3.mp4')
response, frame = capture.read()
fps = capture.get(cv2.CAP_PROP_FPS)
size = (int(capture.get(cv2.CAP_PROP_FRAME_WIDTH)),
        int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT)))
face_detect = dlib.get_frontal_face_detector()
res = []
count = 0
count_n = 0
while response:
    count += 1
    result = classifier(frame)
    if result == 1:
        res.append(1)        

    elif result == 0:
        res.append(0)
        
    else:
        count_n += 1


    response, frame = capture.read()
print(np.mean(res))
print(1-count_n/count)
capture.release()
cv2.destroyAllWindows()