In [6]:
import torch
#import torch.nn.functional as F
import os
import matplotlib.pyplot as plt
#import shutil
import splitfolders
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
from torch.optim import lr_scheduler
from torch.nn.init import *
from torchvision import transforms, utils, datasets, models
import time
import copy
import numpy as np
from PIL import Image
from facenet_pytorch import  InceptionResnetV1
from facenet_pytorch import MTCNN, InceptionResnetV1
import cv2

# Prefer the GPU when CUDA is available; otherwise everything runs on the CPU.
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
device

device(type='cuda')

# Get face dataset from VIDTIMIT

In [9]:
# Source tree is read as <vidtimit_dataset>/<label>/video/<sequence>/<frame>.
# Detected faces are written to <face_dataset>/<label>/<n>.jpeg.
vidtimit_dataset = 'VIDTIMIT'
face_dataset = "data"

# Set to False to skip the (slow) extraction when the crops already exist.
create_dataset = True
mtcnn = MTCNN()

if create_dataset:
    os.makedirs(face_dataset, exist_ok=True)

    # One class per sub-directory; sorted so the run is reproducible.
    labels = sorted(
        name for name in os.listdir(vidtimit_dataset)
        if os.path.isdir(os.path.join(vidtimit_dataset, name))
    )

    for label in labels:
        face_path = f"{vidtimit_dataset}/{label}/video"
        if not os.path.isdir(face_path):
            # Label folder without a "video" directory: nothing to extract.
            continue

        label_dir = f"{face_dataset}/{label}"
        os.makedirs(label_dir, exist_ok=True)

        # Running frame counter; it names the output files for this label.
        i = 0
        for sequence in sorted(os.listdir(face_path)):
            sequence_dir = os.path.join(face_path, sequence)
            if not os.path.isdir(sequence_dir):
                # Ignore stray files (e.g. OS metadata) next to the sequences.
                continue

            for frame_name in sorted(os.listdir(sequence_dir)):
                img_path = os.path.join(sequence_dir, frame_name)
                new_filename = f"{label_dir}/{i}.jpeg"
                # The context manager closes the file handle after each frame.
                with Image.open(img_path) as img:
                    # NOTE(review): presumably nothing is written when no face
                    # is detected, leaving gaps in the numbering - confirm.
                    mtcnn(img, save_path=new_filename)
                i = i + 1

## Split the dataset folder into train, validation and test sets

In [None]:
# Split the cropped faces found in `face_dataset` into the "face_dataset"
# output folder using 0.7 / 0.2 / 0.1 ratios; the fixed seed keeps the
# assignment reproducible.
splitfolders.ratio(
    face_dataset,
    output="face_dataset",
    seed=1337,
    ratio=(0.7, 0.2, 0.1),
)

In [48]:
# Both splits use the same deterministic preprocessing (no augmentation):
# convert to a tensor, resize to 512x384, then normalise each channel.
# NOTE(review): the mean/std below look like the usual ImageNet statistics -
# confirm they match what the pretrained backbone expects.
channel_mean = [0.485, 0.456, 0.406]
channel_std = [0.229, 0.224, 0.225]
splits = ['train', 'val']

data_transforms = {
    split: transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize((512, 384)),
        transforms.Normalize(channel_mean, channel_std),
    ])
    for split in splits
}

# Each split lives in its own sub-folder of data_dir, one directory per class.
data_dir = 'face_dataset/'
image_datasets = {}
dataloaders = {}
dataset_sizes = {}
for split in splits:
    image_datasets[split] = datasets.ImageFolder(
        os.path.join(data_dir, split),
        data_transforms[split],
    )
    dataloaders[split] = DataLoader(image_datasets[split], batch_size=8, shuffle=True)
    dataset_sizes[split] = len(image_datasets[split])

# Class names come from the training split's folder names.
class_names = image_datasets['train'].classes
class_names

In [49]:
def imshow(image_path, title=None):
    """Display an image with matplotlib.

    Args:
        image_path: either a ``torch.Tensor`` laid out as (channels, height,
            width) - e.g. the grid returned by ``utils.make_grid`` - or a file
            path relative to the notebook-level ``face_dataset`` folder.
        title: optional title drawn above the image.

    Raises:
        FileNotFoundError: if a path is given and OpenCV cannot read it.
    """
    if torch.is_tensor(image_path):
        # Tensor input: move channels last for matplotlib and undo the
        # normalisation. These statistics must match the Normalize transform
        # applied by the data loaders.
        mean = np.array([0.485, 0.456, 0.406])
        std = np.array([0.229, 0.224, 0.225])
        img = image_path.detach().cpu().numpy().transpose((1, 2, 0))
        img = np.clip(std * img + mean, 0, 1)
    else:
        full_path = f"{face_dataset}/{image_path}"
        img = cv2.imread(full_path)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image: {full_path}")
        # OpenCV loads images as BGR, matplotlib expects RGB.
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    plt.imshow(img)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)  # brief pause so the figure gets drawn
# Pull a single mini-batch from the training loader and show it as one grid,
# titled with the class name of every image in the batch.
train_batches = iter(dataloaders['train'])
inputs, classes = next(train_batches)
out = utils.make_grid(inputs)
batch_titles = [class_names[x] for x in classes]
imshow(out, title=batch_titles)

In [40]:
# Load the VGGFace2-pretrained network and split its child modules into the
# part that is kept (everything except the last five) and the original head.
# Inspect the tail with: list(pretrained_children)[-6:]
pretrained_children = list(
    InceptionResnetV1(
        pretrained='vggface2',
        classify=False,
        num_classes=len(class_names),
    ).children()
)
layer_list = pretrained_children[-5:]  # all final layers
model_ft = nn.Sequential(*pretrained_children[:-5])

# Freeze every retained parameter; modules attached later keep the default
# requires_grad=True and are therefore the only ones that train.
for param in model_ft.parameters():
    param.requires_grad_(False)
    
class Flatten(nn.Module):
    """Collapse every dimension after the first into a single one."""

    def forward(self, x):
        # Keep dimension 0 as-is and merge all remaining dimensions.
        n_rows = x.size(0)
        return x.view(n_rows, -1)

class normalize(nn.Module):
    """L2-normalise the input along dimension 1."""

    def forward(self, x):
        return F.normalize(x, p=2, dim=1)
    
# Attach a fresh, trainable head. Modules assigned as attributes of an
# nn.Sequential are registered in assignment order, so the forward pass runs
# the existing layers, then avgpool_1a -> last_linear -> logits.
embedding_dim = 512  # output width of the new embedding layer
model_ft.avgpool_1a = nn.AdaptiveAvgPool2d(output_size=1)
model_ft.last_linear = nn.Sequential(
    Flatten(),
    nn.Linear(in_features=1792, out_features=embedding_dim, bias=False),
    normalize()
)
# The classifier consumes the embedding produced just above, so its input
# width is tied to embedding_dim rather than to the discarded original layer.
model_ft.logits = nn.Linear(embedding_dim, len(class_names))
# No softmax layer: nn.CrossEntropyLoss expects raw logits and applies
# log-softmax internally. Use F.softmax(model_ft(x), dim=1) when class
# probabilities are needed at inference time.
model_ft = model_ft.to(device)
criterion = nn.CrossEntropyLoss()

# Only hand the optimizer the parameters that can actually receive gradients.
trainable_params = [p for p in model_ft.parameters() if p.requires_grad]
optimizer_ft = optim.SGD(trainable_params, lr=1e-2, momentum=0.9)
# Multiply the learning rate by 0.1 every 7 calls to exp_lr_scheduler.step().
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)


In [61]:
def train_model(model, criterion, optimizer, scheduler, num_epochs=10):
    """Train ``model`` and return it with its best validation weights loaded.

    Every epoch runs a ``'train'`` phase followed by a ``'val'`` phase over the
    notebook-level ``dataloaders``. Batches are moved to the notebook-level
    ``device`` and epoch averages are divided by ``dataset_sizes[phase]``.

    Args:
        model: network to optimise; it is updated in place.
        criterion: loss, called as ``criterion(outputs, labels)``.
        optimizer: optimizer stepped after every training batch.
        scheduler: learning-rate scheduler, stepped once per epoch after the
            training phase.
        num_epochs: number of train/val passes to run.

    Returns:
        tuple: ``(model, FT_losses)`` where ``model`` holds the weights of the
        epoch with the highest validation accuracy and ``FT_losses`` lists the
        per-batch loss values of both phases in execution order.
    """
    since = time.time()
    FT_losses = []
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Each epoch has a training and a validation phase.
        for phase in ['train', 'val']:
            is_training = phase == 'train'
            if is_training:
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # Zero the parameter gradients.
                optimizer.zero_grad()

                # Track gradient history only while training.
                with torch.set_grad_enabled(is_training):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)
                    if is_training:
                        loss.backward()
                        optimizer.step()

                batch_loss = loss.item()
                FT_losses.append(batch_loss)
                # Weight by batch size so the epoch average is per sample.
                running_loss += batch_loss * inputs.size(0)
                running_corrects += (preds == labels).sum().item()

            if is_training:
                # Step the schedule once per epoch, not once per batch:
                # epoch-based schedulers such as StepLR count step() calls.
                scheduler.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects / dataset_sizes[phase]
            print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                phase, epoch_loss, epoch_acc))

            # Snapshot the weights whenever validation accuracy improves.
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    # Load the best model weights before returning.
    model.load_state_dict(best_model_wts)
    return model, FT_losses

In [63]:
# Train for 20 epochs; keep the returned model and the per-batch loss history.
model_ft, FT_losses = train_model(
    model_ft,
    criterion,
    optimizer_ft,
    exp_lr_scheduler,
    num_epochs=20,
)

Epoch 0/19
----------




KeyboardInterrupt: 

  from .autonotebook import tqdm as notebook_tqdm
Downloading model.safetensors: 100%|██████████| 224M/224M [00:04<00:00, 53.7MB/s] 
