In [5]:
import numpy as np
from sklearn.datasets import fetch_lfw_people
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, datasets
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.nn.functional as F
import math
import os

# Device used for training below: the GPU when CUDA is available, otherwise the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [2]:
class LFWDataset(Dataset):
    """Map-style dataset over the colour LFW images fetched through scikit-learn.

    Only people with at least 20 pictures are requested.  Indexing yields an
    ``(image, label)`` pair; the image's last axis is moved to the front
    (channels-first for colour images) and, when a transform was supplied,
    the result is passed through it.
    """

    def __init__(self, transform=None):
        bunch = fetch_lfw_people(min_faces_per_person=20, color=True)

        self.transform = transform
        self.target_names = bunch.target_names
        self.labels = bunch.target
        self.images = bunch.images

    def __len__(self):
        """Number of images in the dataset."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return the ``idx``-th image (as a tensor, optionally transformed) and its label."""
        # (..., ..., C) -> (C, ..., ...): torchvision transforms expect channels first.
        sample = torch.from_numpy(self.images[idx]).permute(2, 0, 1)

        if self.transform:
            sample = self.transform(sample)

        return sample, self.labels[idx]

In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Model(nn.Module):
    """Convolutional encoder producing L2-normalised face embeddings.

    Five conv -> batch-norm -> ReLU stages (64, 128, 256, 512, 512 channels)
    are each followed by a pooling layer; the last pooling is a global
    average, so the spatial input size is not fixed by the network.  A
    bias-free linear layer plus batch-norm then projects the 512 pooled
    features to ``embedding_size`` values, which are normalised to unit
    L2 length.

    Args:
        embedding_size: length of the returned embedding vectors.
    """

    def __init__(self, embedding_size=128):
        super().__init__()

        def conv_bn_relu(c_in, c_out, **conv_kwargs):
            # One bias-free convolution followed by batch-norm and in-place ReLU.
            return [
                nn.Conv2d(c_in, c_out, bias=False, **conv_kwargs),
                nn.BatchNorm2d(c_out),
                nn.ReLU(inplace=True),
            ]

        same_3x3 = dict(kernel_size=3, stride=1, padding=1)

        # Stem: strided 7x7 convolution and overlapping max-pool.
        layers = [
            *conv_bn_relu(3, 64, kernel_size=7, stride=2, padding=3),
            nn.MaxPool2d(3, stride=2, padding=1),
        ]
        # Three 3x3 stages, each halving the resolution.
        for c_in, c_out in ((64, 128), (128, 256), (256, 512)):
            layers += conv_bn_relu(c_in, c_out, **same_3x3)
            layers.append(nn.MaxPool2d(2))
        # Final stage collapses whatever spatial extent is left to 1x1.
        layers += conv_bn_relu(512, 512, **same_3x3)
        layers.append(nn.AdaptiveAvgPool2d((1, 1)))

        # The flat layer order fixes the state_dict keys (conv_blocks.0 ... conv_blocks.19).
        self.conv_blocks = nn.Sequential(*layers)

        self.embedding = nn.Linear(512, embedding_size, bias=False)
        self.bn_embedding = nn.BatchNorm1d(embedding_size)

    def forward(self, x):
        """Encode a batch of images into unit-length embeddings of shape (batch, embedding_size)."""
        pooled = self.conv_blocks(x)
        features = pooled.view(pooled.size(0), -1)
        projected = self.bn_embedding(self.embedding(features))
        return F.normalize(projected, p=2, dim=1)



class ArcFaceLoss(nn.Module):
    """Additive angular margin (ArcFace) classification head.

    Despite the name, ``forward`` returns scaled *logits*, not a loss value;
    feed them to ``F.cross_entropy`` together with the labels.

    Args:
        embedding_size: dimensionality of the incoming embeddings.
        num_classes: number of identities (rows of the class-centre matrix).
        scale: factor the cosine logits are multiplied by.
        margin: angular margin (radians) added to the target-class angle.
        eps: lower bound applied to ``1 - cos^2`` before the square root so
            that its gradient stays finite when an embedding is exactly
            aligned with a class centre.
    """

    def __init__(self, embedding_size, num_classes, scale=64.0, margin=0.5, eps=1e-7):
        super().__init__()
        # torch.empty replaces the legacy torch.Tensor(...) constructor; the
        # values are overwritten by the Xavier initialisation right below.
        self.weight = nn.Parameter(torch.empty(num_classes, embedding_size))
        nn.init.xavier_uniform_(self.weight)
        self.scale = scale
        self.margin = margin
        self.eps = eps
        self.cos_m = math.cos(margin)
        self.sin_m = math.sin(margin)
        # Beyond this cosine, theta + margin would exceed pi ...
        self.th = math.cos(math.pi - margin)
        # ... so a linear penalty is subtracted there instead.
        self.mm = math.sin(math.pi - margin) * margin

    def forward(self, embeddings, labels):
        """Return ``(batch, num_classes)`` logits with the margin applied to each sample's target class.

        ``labels`` must be an integer (int64) tensor of class indices, as
        required by ``F.one_hot``.
        """
        cosine = F.linear(F.normalize(embeddings), F.normalize(self.weight))
        # sqrt has an infinite derivative at 0; without the floor a cosine of
        # exactly +/-1 turns the backward pass into inf/NaN.
        sine = torch.sqrt(torch.clamp(1.0 - cosine ** 2, min=self.eps))
        # cos(theta + m) = cos(theta)cos(m) - sin(theta)sin(m)
        phi = cosine * self.cos_m - sine * self.sin_m
        phi = torch.where(cosine > self.th, phi, cosine - self.mm)
        is_target = F.one_hot(labels, num_classes=self.weight.size(0)).bool().to(cosine.device)
        # Margin-adjusted value for the target class, plain cosine elsewhere.
        logits = torch.where(is_target, phi, cosine)
        return logits * self.scale

In [None]:
# Training-time preprocessing: resize to 128x128, light augmentation
# (horizontal flip, colour jitter), then convert to a tensor and normalise
# each channel with mean 0.5 / std 0.5.
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((128, 128)),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
])

batch_size = 64

dataset = LFWDataset(transform=transform)
dataloader = DataLoader(
    dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=4,
)

# One output class per distinct identity label in the dataset.
num_classes = np.unique(dataset.labels).size

model = Model(embedding_size=128).to(device)
arcface = ArcFaceLoss(embedding_size=128, num_classes=num_classes).to(device)

# The encoder and the ArcFace class centres are optimised jointly;
# the learning rate is halved every 10 epochs.
optimizer = torch.optim.Adam(
    [*model.parameters(), *arcface.parameters()],
    lr=0.001,
)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5)

In [8]:
from tqdm import tqdm

epochs = 50
# Per-step loss values as plain Python floats.  Storing the loss *tensor*
# would keep every step's autograd graph (and its device memory) alive for
# the whole run.
loss_history = []

pbar = tqdm(range(epochs))

for epoch in pbar:
    model.train()
    total_loss = 0.0
    for images, labels in dataloader:
        images, labels = images.to(device), labels.to(device)
        embeddings = model(images)
        logits = arcface(embeddings, labels)
        loss = F.cross_entropy(logits, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        step_loss = loss.item()
        loss_history.append(step_loss)
        total_loss += step_loss

    scheduler.step()
    # Show the mean loss over the epoch rather than whatever the last batch produced.
    pbar.set_postfix({'loss': total_loss / max(len(dataloader), 1)})

100%|██████████| 50/50 [02:31<00:00,  3.04s/it, loss=0.00743] 


In [9]:
# Persist only the encoder's weights; the ArcFace parameters are not written to this file.
torch.save(model.state_dict(), "model.pth")

In [None]:
def recognize(query_embedding, database, threshold=0.7, device='cpu'):
    """Find the enrolled identity whose embeddings are most similar to the query.

    Args:
        query_embedding: embedding of a single face; singleton dimensions
            are squeezed away before comparison.
        database: mapping ``name -> sequence of embedding tensors``.
        threshold: minimum mean cosine similarity required for a match.
        device: device the comparison is carried out on.

    Returns:
        ``(name, score)`` of the best identity whose mean cosine similarity
        exceeds ``threshold``, or ``("Unknown", -1)`` when nobody qualifies.
    """
    best_match = None
    best_score = -1

    query = query_embedding.squeeze().to(device)

    for name, embs in database.items():
        # An identity without embeddings cannot match (and used to divide by zero).
        if len(embs) == 0:
            continue

        # One batched similarity instead of a Python loop with a .item() per embedding.
        stacked = torch.stack([emb.float().squeeze() for emb in embs]).to(device)
        avg = F.cosine_similarity(stacked, query.unsqueeze(0), dim=1).mean().item()

        if avg > threshold and avg > best_score:
            best_score = avg
            best_match = name

    return best_match or "Unknown", best_score

In [None]:
# Haar cascade used to locate frontal faces in each video frame.
face_cascade = cv2.CascadeClassifier('haarcascade_frontalface_default.xml')
if face_cascade.empty():
    # CascadeClassifier does not raise when the XML file cannot be read; it
    # just stays empty and detectMultiScale fails later.  Try the copy that
    # ships with the opencv-python wheels before giving up.
    _bundled_dir = getattr(getattr(cv2, 'data', None), 'haarcascades', None)
    if _bundled_dir:
        face_cascade = cv2.CascadeClassifier(
            os.path.join(_bundled_dir, 'haarcascade_frontalface_default.xml')
        )
if face_cascade.empty():
    raise FileNotFoundError(
        "could not load 'haarcascade_frontalface_default.xml' from the working "
        "directory or from OpenCV's bundled cascades"
    )

# Inference-time preprocessing: same resize/normalisation as training,
# without the random augmentations.
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

def preprocess_face(face_img):
    """Turn a BGR face crop into a normalised ``(1, 3, 128, 128)`` input tensor.

    Args:
        face_img: ``H x W x 3`` BGR image crop (assumed to be the 8-bit frame
            data returned by ``cv2.VideoCapture`` -- confirm for other sources).

    Returns:
        The crop after the module-level ``transform`` pipeline, with a
        leading batch dimension of size 1.
    """
    rgb = cv2.cvtColor(face_img, cv2.COLOR_BGR2RGB)
    chw = np.ascontiguousarray(np.transpose(rgb, (2, 0, 1)))
    # Keep the native integer dtype: ToPILImage multiplies *floating point*
    # tensors by 255 before casting to bytes, so a 0-255 float tensor would
    # overflow and corrupt the image.  Embeddings enrolled with the old
    # float conversion must be re-captured.
    face_tensor = transform(torch.from_numpy(chw)).unsqueeze(0)
    return face_tensor


def recognize_face(face, database, model, threshold=0.7, device='cpu'):
    """Embed a single face and match it against the enrolled identities.

    Each identity is scored by ``mean - std`` of the cosine similarities
    between the query embedding and that identity's stored embeddings; the
    identity with the highest such score above ``threshold`` wins.

    Args:
        face: preprocessed input batch holding one face.
        database: mapping ``name -> list of stored embeddings``; every
            embedding must have the same shape as ``model(face)``.
        model: callable producing the embedding for ``face``.
        threshold: minimum ``mean - std`` similarity needed for a match.
        device: device on which the similarities are computed.

    Returns:
        ``(name, mean_similarity)`` of the best match, or ``(None, 0)`` when
        no identity passes the threshold.
    """
    # BatchNorm1d cannot run on a single sample in training mode, and
    # inference must use the running statistics anyway.
    if isinstance(model, torch.nn.Module):
        model.eval()

    with torch.no_grad():
        embedding = model(face).to(device)

    best_match = None
    best_score = 0   # mean similarity of the current best match (returned)
    best_lower = 0   # its mean - std, the value candidates are ranked by

    for name, embs in database.items():
        # torch.stack rejects an empty list; an identity without samples cannot match.
        if len(embs) == 0:
            continue

        embs_tensor = torch.stack(embs).to(device)
        sims = F.cosine_similarity(embedding.unsqueeze(0), embs_tensor, dim=2)
        avg = sims.mean().item()
        # The sample std of a single value is NaN, which would make every
        # comparison below False; treat one sample as having no spread.
        std = sims.std().item() if sims.numel() > 1 else 0.0
        lower = avg - std

        # Rank on the same quantity that is tested against the threshold.
        if lower > threshold and lower > best_lower:
            best_lower = lower
            best_score = avg
            best_match = name

    return best_match, best_score

def process_stream(url, model, database_name = 'data.pkl', threshold=0.7, mode='test', user='user', device='cpu'):
    """Detect faces in a video stream and either recognise or enrol them.

    In ``mode='test'`` every detected face is compared with the identities
    stored in ``database_name`` and the frame is annotated with the result.
    In ``mode='train'`` the first detected face of each frame is embedded
    and collected under ``user`` while capture is switched on; the collected
    embeddings are merged into ``database_name`` when the loop ends normally.

    Keys: ``s`` toggles capture (train mode only), ``q`` quits.

    Args:
        url: source passed to ``cv2.VideoCapture``.
        model: embedding network; an ``nn.Module`` is moved to ``device``
            and put in eval mode.
        database_name: path of the pickled ``{name: [embedding, ...]}`` file.
        threshold: match threshold forwarded to ``recognize_face``.
        mode: ``'test'`` (recognise) or ``'train'`` (enrol ``user``).
        user: identity the captured embeddings are stored under.
        device: device used for inference.
    """
    database = {user: []}

    # Load the database before opening the stream so a failure here does not
    # leave an open capture behind.
    if mode == 'test':
        try:
            database = _load_face_database(database_name)
        except Exception as e:
            print('error opening database')
            return

    # Inference needs eval mode (BatchNorm1d rejects single-sample batches in
    # training mode) and the model must sit on the same device as its inputs.
    if isinstance(model, torch.nn.Module):
        model.to(device)
        model.eval()

    cap = cv2.VideoCapture(url)
    try:
        if not cap.isOpened():
            print("cannot open video stream")
            return

        recognized = set()
        training = False

        while True:
            ret, frame = cap.read()

            if not ret:
                print("failed to grab frame")
                break

            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(128, 128))

            for (x, y, w, h) in faces:
                face_img = frame[y:y+h, x:x+w]

                try:
                    face_tensor = preprocess_face(face_img).to(device)

                    if mode == 'train':
                        color = (0, 0, 255)
                        if training:
                            color = (0, 165, 220)
                            with torch.no_grad():
                                embedding = model(face_tensor)
                            # Store on the CPU so the pickle loads on machines without a GPU.
                            database[user].append(embedding.cpu())
                        cv2.rectangle(frame, (x, y), (x+w, y+h), color, 2)
                        # Enrolment only looks at the first detected face per frame.
                        break

                    name, confidence = recognize_face(face_tensor, database, model, threshold, device=device)

                    color = (0, 0, 255) if not name else (0, 255, 0)
                    cv2.rectangle(frame, (x, y), (x+w, y+h), color, 2)

                    if name:
                        recognized.add(name)
                        # Confidence bar to the left of the face box, then its outline and the name.
                        cv2.rectangle(frame, (x-12, y+h - int((h*confidence))), (x-6, y+h), color, -1)
                        cv2.rectangle(frame, (x-13, y+h), (x-5, y), (255, 255, 255))
                        cv2.putText(frame, name, (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)

                except Exception as e:
                    print(f"error processing face: {e}")
                    continue

            # Running list of everyone recognised so far, top-left corner.
            for i, name in enumerate(recognized):
                cv2.putText(frame, name, (30, i * 20 + 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (220, 220, 0), 2)

            cv2.imshow('Registering Face' if mode == 'train' else 'Recognition Mode', frame)

            pressed_key = cv2.waitKey(1) & 0xFF
            if pressed_key == ord('s'):
                training = not training and mode == 'train'
            if pressed_key == ord('q'):
                break

        # Only enrolment changes the database; test mode has nothing to write.
        if mode == 'train':
            _save_face_database(database_name, database)
    finally:
        # Always give the stream and the windows back, even after an error.
        cap.release()
        cv2.destroyAllWindows()


def _load_face_database(path):
    """Load the pickled ``{name: [embedding, ...]}`` mapping stored at ``path``."""
    import pickle  # local import: no visible notebook cell imports pickle

    # SECURITY: unpickling can execute arbitrary code -- only load database
    # files that were written by this notebook.
    with open(path, 'rb') as f:
        return pickle.load(f)


def _save_face_database(path, new_entries):
    """Append the non-empty lists in ``new_entries`` to the database at ``path`` and rewrite it."""
    import pickle  # local import: no visible notebook cell imports pickle

    merged = _load_face_database(path) if os.path.exists(path) else {}
    for name, embs in new_entries.items():
        # Skip identities without samples: an empty list would break torch.stack at recognition time.
        if embs:
            merged.setdefault(name, []).extend(embs)

    with open(path, 'wb') as f:
        pickle.dump(merged, f)

In [None]:
# Video stream to read from; adjust the address to your own camera.
url = 'http://192.168.1.102:4747/video'
# Run on the device the model was moved to above; hard-coding 'cpu' puts the
# inputs and the model on different devices whenever CUDA is available.
process_stream(url, model, database_name='data.pkl', threshold=0.7, mode='test', user='john', device=device)