In [2]:
from facenet_pytorch import MTCNN, InceptionResnetV1, fixed_image_standardization, training
import torch
from torch.utils.data import DataLoader
from torch import optim as op
from torch.optim.lr_scheduler import MultiStepLR
import torch.nn as nn
from torch.utils.tensorboard import SummaryWriter
from torchvision import datasets, transforms
from torchvision.transforms import RandomHorizontalFlip, RandomRotation, ToPILImage, Resize, ColorJitter, Normalize, ToTensor
import os
from tqdm import tqdm

In [6]:
path = r'D:\Python\Projects\CV\Computer_Vision\FaceNet\face_test\train\AI1904'
batch_size = 16
epoch = 5
workers = 0 if os.name == 'nt' else 8

In [7]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [8]:
mtcnn = MTCNN(device=device, margin=10, keep_all=True, thresholds=[0.5, 0.6, 0.6])

In [9]:
dataset = datasets.ImageFolder(path, transform=transforms.Resize((512,512)))  # you can use `PIL.Image.resize` if you want

# After it reads the folder people, it will asign each folder from 0-inf this line of code save the new cropped image to the same folder. It is not nessasary to run the code
dataset.samples = [(p, p.replace(path, path + 'cropped')) for p, _ in dataset.samples]

loader = DataLoader(dataset, batch_size=batch_size, num_workers=workers, collate_fn=training.collate_pil)
for i, (x,y) in enumerate(loader):
    try:
        mtcnn(x, save_path=y)
        print('\rBatch {} of {}'.format(i + 1, len(loader)), end='')
    except ValueError:
        pass

# Remove mtcnn to reduce GPU memory usage
del mtcnn

Batch 3 of 3

In [10]:
# Remember so set classify=False if you want to add new people in the future, or do one-shot face matching
temp_dataset = datasets.ImageFolder(r'D:\Python\Projects\CV\Computer_Vision\FaceNet\face_test\train\AI1904')
resnet = InceptionResnetV1(pretrained='vggface2', classify=True, num_classes=len(temp_dataset.class_to_idx)).to(device)

In [11]:
optimizer = op.AdamW(resnet.parameters(), lr=0.001)
scl = MultiStepLR(optimizer, [5,10])
loss_fn = nn.CrossEntropyLoss()
metric = {'fps': training.BatchTimer(), 'acc': training.accuracy}

In [12]:
train_path = r'D:\Python\Projects\CV\Computer_Vision\FaceNet\face_test\train\AI1904'
val_path = r'D:\Python\Projects\CV\Computer_Vision\FaceNet\face_test\test\AI1904'

train_trans = transforms.Compose([
    transforms.Resize((160, 160)),
    transforms.RandomHorizontalFlip(0.3),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    transforms.RandomRotation(degrees=20),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.4792, 0.4191, 0.3912],
                         std=[0.2922, 0.2726, 0.2682]),
    fixed_image_standardization
])

val_trans = transforms.Compose([
    transforms.Resize((160, 160)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.4792, 0.4191, 0.3912],
                         std=[0.2922, 0.2726, 0.2682]),
    fixed_image_standardization
])

# Datasets
train_dataset = datasets.ImageFolder(train_path, transform=train_trans)
val_dataset   = datasets.ImageFolder(val_path, transform=val_trans)

# Loaders
train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    num_workers=workers,
    shuffle=True
)

val_loader = DataLoader(
    val_dataset,
    batch_size=batch_size,
    num_workers=workers,
    shuffle=False
)

In [15]:
from PIL import Image

def crop_and_save_faces(input_dir, output_dir, mtcnn):
    dataset = datasets.ImageFolder(input_dir)
    os.makedirs(output_dir, exist_ok=True)
    for label, class_name in enumerate(dataset.classes):
        os.makedirs(os.path.join(output_dir, class_name), exist_ok=True)

    for img_path, label in tqdm(dataset.imgs, desc=f"Processing {input_dir}"):
        img_name = os.path.basename(img_path)
        save_path = os.path.join(output_dir, dataset.classes[label], img_name)

        try:
            img = Image.open(img_path).convert('RGB')
            cropped_img = mtcnn(img)
            if cropped_img is not None:
                ToPILImage()(cropped_img).save(save_path)
        except Exception as e:
            print(f"Error processing {img_path}: {e}")

train_path_cropped = './data/train_cropped'
val_path_cropped = './data/val_cropped'

mtcnn = MTCNN(device=device, margin=10, keep_all=False, thresholds=[0.5, 0.6, 0.6])

crop_and_save_faces(train_path, train_path_cropped, mtcnn)
crop_and_save_faces(val_path, val_path_cropped, mtcnn)

Processing D:\Python\Projects\CV\Computer_Vision\FaceNet\face_test\train\AI1904: 100%|██████████| 18/18 [00:01<00:00, 10.45it/s]
Processing D:\Python\Projects\CV\Computer_Vision\FaceNet\face_test\test\AI1904: 100%|██████████| 46/46 [00:04<00:00, 10.00it/s]


In [16]:
writer = SummaryWriter()
writer.iteration, writer.interval = 0, 10

print('\n\nInitial')
print('-' * 10)
resnet.eval()
training.pass_epoch(
    resnet, loss_fn, val_loader,
    batch_metrics=metric, show_running=True, device=device,
    writer=writer
)

for i in range(epoch):
    print('\nEpoch {}/{}'.format(i + 1, epoch))
    print('-' * 10)

    resnet.train()
    training.pass_epoch(
        resnet, loss_fn, train_loader, optimizer, scl,
        batch_metrics=metric, show_running=True, device=device,
        writer=writer
    )

    resnet.eval()
    training.pass_epoch(
        resnet, loss_fn, val_loader,
        batch_metrics=metric, show_running=True, device=device,
        writer=writer
    )

writer.close()



Initial
----------
Valid |     3/3    | loss:    1.4810 | fps:   33.2061 | acc:    0.5536   

Epoch 1/5
----------
Train |     2/2    | loss:    1.1247 | fps:   13.2917 | acc:    0.6562   
Valid |     3/3    | loss:    1.9384 | fps:   53.2952 | acc:    0.0625   

Epoch 2/5
----------
Train |     2/2    | loss:    0.8857 | fps:   15.1764 | acc:    0.5938   
Valid |     3/3    | loss:    8.1314 | fps:   54.3871 | acc:    0.5536   

Epoch 3/5
----------
Train |     2/2    | loss:    1.8821 | fps:   15.2462 | acc:    0.4688   
Valid |     3/3    | loss:    8.4809 | fps:   54.6365 | acc:    0.5536   

Epoch 4/5
----------
Train |     2/2    | loss:    0.6300 | fps:   15.1277 | acc:    0.7500   
Valid |     3/3    | loss:    3.2010 | fps:   54.8970 | acc:    0.5536   

Epoch 5/5
----------
Train |     2/2    | loss:    0.2832 | fps:   15.0274 | acc:    0.9375   
Valid |     3/3    | loss:    4.8578 | fps:   55.4753 | acc:    0.0833   


In [12]:
# # This code does the same thing as above, not sure why I wrote it
# for epoch_num in range(epoch):
#     train_loss = 0
#     resnet.train()
#
#     for batch, (x_train, y_train) in enumerate(train_loader):
#         x_train, y_train = x_train.to(device), y_train.to(device)
#         y_pred = resnet(x_train)
#         loss = loss_fn(y_pred, y_train)
#         train_loss += loss.item()
#
#         optimizer.zero_grad()
#         loss.backward()
#         optimizer.step()
#     train_loss /= len(train_loader)
#
#     test_loss = 0
#     resnet.eval()
#     with torch.inference_mode():
#         for X_test, y_test in val_loader:
#             X_test, y_test = X_test.to(device), y_test.to(device)
#             test_pred = resnet(X_test)
#             test_loss += loss_fn(test_pred, y_test).item()
#
#     test_loss /= len(val_loader)
#     print(f'Epoch {epoch_num + 1}: Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}')

### Save model

In [13]:
torch.save({
    'model_state_dict': resnet.state_dict(),
    'class_to_idx': dataset.class_to_idx  # Save label-to-class mapping
}, '../model.pth')

### Load model

In [14]:
# Load the saved model
checkpoint = torch.load('../model.pth', map_location=device)  # Use map_location='cpu' if no GPU available
resnet.load_state_dict(checkpoint['model_state_dict'])
resnet.eval()  # Put the model in evaluation mode
print('Model loaded and ready for inference')

Model loaded and ready for inference


# Finding the mean and std of the dataset

In [15]:
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import numpy as np

# Define the dataset WITHOUT applying normalization
transform = transforms.Compose([
    transforms.Resize((160, 160)),  # Ensure dimensions match model input
    transforms.ToTensor()  # Convert to Tensor (values in range [0,1])
])
dataset = datasets.ImageFolder('./train', transform=transform)
loader = DataLoader(dataset, batch_size=64, shuffle=False, num_workers=4)

# Calculate mean and std
mean = 0.0
std = 0.0
n_samples = 0
for images, _ in loader:
    n_samples += images.size(0)  # Add the batch size
    images = images.view(images.size(0), images.size(1), -1)  # Flatten HxW to single dimension
    mean += images.mean(2).sum(0)  # Sum mean per channel
    std += images.std(2).sum(0)  # Sum std per channel

mean /= n_samples
std /= n_samples

print(f"Mean: {mean}")
print(f"Std: {std}")


FileNotFoundError: [WinError 3] The system cannot find the path specified: './train'

In [43]:
from PIL import Image
import os


def verify_images(folder_path):
    for filename in os.listdir(folder_path):
        file_path = os.path.join(folder_path, filename)
        try:
            # Try opening the image
            with Image.open(file_path) as img:
                img.verify()  # Verify that the image fits the PIL format
        except (IOError, SyntaxError) as e:
            print(f"Corrupted or unsupported file: {file_path}")
            # Optionally, remove the corrupted file
            # os.remove(file_path)


# Validate images in 'Me_val' and 'madonna' folders
verify_images('data/train/Me')
verify_images('data/train/madonna')


In [45]:
dataset_path = 'data/train'
for folder in os.listdir(dataset_path):
    folder_path = os.path.join(dataset_path, folder)
    if os.path.isdir(folder_path):  # Skip non-directory files
        verify_images(folder_path)
