In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models, transforms, datasets
from torch.utils.data import DataLoader
import math

In [2]:
class ArcFace(nn.Module):
    """Additive angular margin (ArcFace) classification head.

    The layer computes the cosine between the L2-normalised input embeddings
    and the L2-normalised class weight vectors, replaces the target-class
    cosine with ``cos(theta + m)`` and multiplies every logit by ``s``.
    The result is meant to be fed to a softmax cross-entropy loss.

    Args:
        in_features: Size of the input embedding.
        out_features: Number of classes (rows of the weight matrix).
        s: Scale factor applied to all logits.
        m: Angular margin in radians added to the target-class angle.
        easy_margin: If True, the margin is only applied where the cosine is
            positive; otherwise a linear penalty is used once
            ``theta + m`` would exceed pi.
    """

    # Lower bound for ``1 - cos^2`` before the square root. sqrt has an
    # unbounded derivative at 0, so without this bound a cosine of exactly
    # +/-1 yields inf/NaN gradients (0/0 even when the upstream grad is 0).
    _SINE_EPS = 1e-7

    def __init__(self, in_features, out_features, s=30.0, m=0.50, easy_margin=False):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.s = s
        self.m = m
        # torch.empty makes the "allocated, then initialised below" intent explicit.
        self.weight = nn.Parameter(torch.empty(out_features, in_features))
        nn.init.xavier_uniform_(self.weight)

        self.easy_margin = easy_margin
        self.cos_m = math.cos(m)
        self.sin_m = math.sin(m)
        # cos(pi - m): below this cosine, theta + m is larger than pi.
        self.th = math.cos(math.pi - m)
        # Linear penalty subtracted from the cosine in that region.
        self.mm = math.sin(math.pi - m) * m

    def forward(self, input, label):
        """Return the scaled margin logits.

        Args:
            input: Embeddings; normalised along dim 1 before use.
            label: Integer class index per sample (reshaped to a column).

        Returns:
            Tensor with one row per sample and one column per class.
        """
        cosine = F.linear(F.normalize(input), F.normalize(self.weight))
        # Clamp the radicand away from 0 so the backward pass stays finite.
        sine = torch.sqrt(torch.clamp(1.0 - cosine.pow(2), min=self._SINE_EPS, max=1.0))
        # cos(theta + m) = cos(theta) cos(m) - sin(theta) sin(m)
        phi = cosine * self.cos_m - sine * self.sin_m

        if self.easy_margin:
            phi = torch.where(cosine > 0, phi, cosine)
        else:
            phi = torch.where(cosine > self.th, phi, cosine - self.mm)

        # One-hot mask selecting the target class of every sample;
        # scatter_ requires an int64 index tensor.
        one_hot = torch.zeros_like(cosine)
        one_hot.scatter_(1, label.view(-1, 1).long(), 1.0)

        # Margin logit for the target class, plain cosine for the others.
        output = (one_hot * phi) + ((1.0 - one_hot) * cosine)
        return output * self.s


In [3]:
class ResNetArcModel(nn.Module):
    """torchvision backbone + linear embedding layer + ArcFace head.

    Args:
        num_classes: Number of identities handled by the ArcFace head.
        backbone: Name of the constructor looked up on ``torchvision.models``;
            the returned network must expose an ``fc`` layer.
        embedding_size: Dimension of the embedding produced by ``forward``.
    """

    def __init__(self, num_classes, backbone="resnet50", embedding_size=512):
        super().__init__()
        build_backbone = getattr(models, backbone)
        trunk = build_backbone(pretrained=True)

        # Remember the classifier's input width, then drop the classifier so
        # the trunk outputs its pooled features directly.
        feature_dim = trunk.fc.in_features
        trunk.fc = nn.Identity()

        self.backbone = trunk
        self.embedding = nn.Linear(feature_dim, embedding_size)
        self.arcface = ArcFace(embedding_size, num_classes)

    def forward(self, x, labels=None):
        """Return ArcFace logits when ``labels`` is given, else the raw embedding."""
        embedding = self.embedding(self.backbone(x))
        if labels is None:
            return embedding
        return self.arcface(embedding, labels)


In [6]:
# Preprocessing applied to every training image: resize to 112x112, convert
# to a tensor and normalise each of the 3 channels with mean 0.5 / std 0.5.
transform = transforms.Compose(
    [
        transforms.Resize((112, 112)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5] * 3, std=[0.5] * 3),
    ]
)

# ImageFolder derives the class list from the sub-directories of the root.
train_dataset = datasets.ImageFolder("../data/preprocessed/train", transform=transform)

train_loader = DataLoader(
    train_dataset,
    batch_size=32,
    shuffle=True,
    num_workers=4,
)

# The number of identities sizes the ArcFace head later on.
num_classes = len(train_dataset.classes)
print("Clases:", train_dataset.classes)


Clases: ['Abir Ahmed', 'Adriana Sanchez', 'Adriana Solanilla', 'Alejandro Tulipano', 'Amy Olivares', 'Blas de Leon', 'Carlos Beitia', 'Carlos Hernandez', 'Cesar Rodriguez', 'Javier Bustamante', 'Jeremy Sanchez', 'Jonathan Peralta', 'Kevin Rodriguez', 'Mahir Arcia', 'Michael Jordan']


In [None]:
# Build the network and move it to the GPU before the optimizer is created,
# so the optimizer is handed the GPU-resident parameters.
model = ResNetArcModel(num_classes=num_classes, embedding_size=512)
model = model.cuda()

# The model returns margin-adjusted, scaled logits when labels are passed,
# so a plain cross-entropy loss is applied on top of them.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)

In [None]:
import os

from tqdm import tqdm

num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    pbar = tqdm(train_loader)

    # batch_idx counts from 1 so it equals the number of batches seen so far.
    for batch_idx, (images, labels) in enumerate(pbar, start=1):
        images = images.cuda()
        labels = labels.cuda()

        # Labels are passed to the model so the ArcFace head can apply the
        # margin to the target class.
        logits = model(images, labels)
        loss = criterion(logits, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # Running mean over the batches processed so far. Dividing by the
        # total number of batches would understate the loss until the very
        # last batch of the epoch.
        avg_loss = running_loss / batch_idx
        pbar.set_description(f"Epoch {epoch+1}/{num_epochs} Loss: {avg_loss:.4f}")

# torch.save does not create missing directories; make sure the target exists
# so a finished training run is not lost at save time.
os.makedirs("../models", exist_ok=True)

# Save the weights (state_dict) of the whole model, ArcFace head included
torch.save(model.state_dict(), "../models/arcface_model_10.pth")
print("✅ Modelo completo guardado.")

# Save only backbone+embedding (without ArcFace head)
torch.save(
    {
        "backbone": model.backbone.state_dict(),
        "embedding": model.embedding.state_dict()
    },
    "../models/arcface_backbone.pth"
)
print("✅ Backbone+embedding guardado.")

Epoch 1/10 Loss: 0.0607: 100%|██████████| 75/75 [00:05<00:00, 13.45it/s]
Epoch 2/10 Loss: 0.0610: 100%|██████████| 75/75 [00:05<00:00, 13.35it/s]
Epoch 3/10 Loss: 0.0437: 100%|██████████| 75/75 [00:05<00:00, 13.36it/s]
Epoch 4/10 Loss: 0.0960: 100%|██████████| 75/75 [00:05<00:00, 13.31it/s]
Epoch 5/10 Loss: 0.0772: 100%|██████████| 75/75 [00:05<00:00, 13.29it/s]
Epoch 6/10 Loss: 0.0493: 100%|██████████| 75/75 [00:05<00:00, 13.19it/s]
Epoch 7/10 Loss: 0.0691: 100%|██████████| 75/75 [00:05<00:00, 13.25it/s]
Epoch 8/10 Loss: 0.0644: 100%|██████████| 75/75 [00:05<00:00, 13.22it/s]
Epoch 9/10 Loss: 0.0773: 100%|██████████| 75/75 [00:05<00:00, 13.16it/s]
Epoch 10/10 Loss: 0.0365: 100%|██████████| 75/75 [00:05<00:00, 13.20it/s]


✅ Modelo completo guardado.
✅ Backbone+embedding guardado.


In [None]:
import os
import torch
import torch.nn.functional as F
from torchvision import models, transforms
from PIL import Image
import numpy as np
from tqdm import tqdm

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
DATASET_DIR = "../data/preprocessed/test"

# Load the backbone: rebuild the ResNet-50 without its classifier, then
# restore the trained weights.
backbone = models.resnet50(pretrained=False)
in_features = backbone.fc.in_features
# torch.nn is referenced through this cell's own `torch` import so the cell
# does not depend on an alias imported in another cell.
backbone.fc = torch.nn.Identity()
# map_location lets a checkpoint saved from GPU tensors load on a CPU-only
# machine, matching the CPU fallback chosen for DEVICE above.
checkpoint = torch.load("../models/arcface_backbone.pth", map_location=DEVICE)
backbone.load_state_dict(checkpoint["backbone"])

embedding_layer = torch.nn.Linear(in_features, 512)
embedding_layer.load_state_dict(checkpoint["embedding"])

# eval() switches the modules to inference mode.
backbone = backbone.to(DEVICE).eval()
embedding_layer = embedding_layer.to(DEVICE).eval()

# Same preprocessing steps as the training cell.
transform = transforms.Compose([
    transforms.Resize((112,112)),
    transforms.ToTensor(),
    transforms.Normalize([0.5]*3, [0.5]*3)
])

# Maps class (folder) name -> unit-length mean embedding as a NumPy array.
class_embeddings = {}

# sorted() makes the processing order deterministic; os.listdir order is
# arbitrary.
for class_name in sorted(os.listdir(DATASET_DIR)):
    class_dir = os.path.join(DATASET_DIR, class_name)
    if not os.path.isdir(class_dir):
        continue

    image_files = sorted(
        f for f in os.listdir(class_dir) if f.lower().endswith((".jpg", ".png"))
    )
    if not image_files:
        # torch.stack raises on an empty list, so a folder without usable
        # images is skipped instead of aborting the whole gallery build.
        print(f"Sin imágenes en {class_name}, se omite.")
        continue

    print(f"Procesando {class_name}...")

    embeddings = []
    for img_file in tqdm(image_files):
        img_path = os.path.join(class_dir, img_file)
        # The context manager closes the file handle; convert() returns an
        # independent, fully loaded RGB copy.
        with Image.open(img_path) as img_handle:
            img = img_handle.convert("RGB")
        x = transform(img).unsqueeze(0).to(DEVICE)
        with torch.no_grad():
            features = backbone(x)
            emb = embedding_layer(features)
            emb = F.normalize(emb, dim=1)
        embeddings.append(emb.squeeze(0).cpu())

    # Average the unit embeddings of the class and re-normalise the mean so
    # that a dot product against it is a cosine similarity.
    mean_emb = torch.stack(embeddings).mean(0)
    mean_emb = F.normalize(mean_emb, dim=0)
    class_embeddings[class_name] = mean_emb.numpy()

# np.save stores the dict as a pickled object array; reading it back
# requires np.load(..., allow_pickle=True).item().
np.save("../models/gallery_embeddings.npy", class_embeddings)
print("✅ Galería guardada en ../models/gallery_embeddings.npy")



Procesando Abir Ahmed...


100%|██████████| 40/40 [00:00<00:00, 241.75it/s]


Procesando Adriana Sanchez...


100%|██████████| 40/40 [00:00<00:00, 258.39it/s]


Procesando Adriana Solanilla...


100%|██████████| 40/40 [00:00<00:00, 272.39it/s]


Procesando Alejandro Tulipano...


100%|██████████| 40/40 [00:00<00:00, 267.51it/s]


Procesando Amy Olivares...


100%|██████████| 40/40 [00:00<00:00, 268.06it/s]


Procesando Blas de Leon...


100%|██████████| 40/40 [00:00<00:00, 233.31it/s]


Procesando Carlos Beitia...


100%|██████████| 40/40 [00:00<00:00, 258.95it/s]


Procesando Carlos Hernandez...


100%|██████████| 39/39 [00:00<00:00, 258.82it/s]


Procesando Cesar Rodriguez...


100%|██████████| 40/40 [00:00<00:00, 152.42it/s]


Procesando Javier Bustamante...


100%|██████████| 40/40 [00:00<00:00, 136.33it/s]


Procesando Jeremy Sanchez...


100%|██████████| 40/40 [00:00<00:00, 144.35it/s]


Procesando Jonathan Peralta...


100%|██████████| 40/40 [00:00<00:00, 141.56it/s]


Procesando Kevin Rodriguez...


100%|██████████| 40/40 [00:00<00:00, 143.21it/s]


Procesando Mahir Arcia...


100%|██████████| 40/40 [00:00<00:00, 147.52it/s]


Procesando Michael Jordan...


100%|██████████| 40/40 [00:00<00:00, 144.00it/s]

✅ Galería guardada en ../models/gallery_embeddings.npy





## INFERENCIA

In [None]:
import cv2
import torch
import numpy as np
import mediapipe as mp
from torchvision import transforms
from torchvision import models
from torch.nn.functional import normalize

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the backbone: rebuild the ResNet-50 without its classifier, then
# restore the trained weights.
backbone = models.resnet50(pretrained=False)
in_features = backbone.fc.in_features
# torch.nn is referenced through this cell's own `torch` import so the cell
# does not depend on an alias imported in another cell.
backbone.fc = torch.nn.Identity()
# map_location lets a checkpoint saved from GPU tensors load on a CPU-only
# machine, matching the CPU fallback chosen for DEVICE above.
checkpoint = torch.load("../models/arcface_backbone.pth", map_location=DEVICE)
backbone.load_state_dict(checkpoint["backbone"])

embedding_layer = torch.nn.Linear(in_features, 512)
embedding_layer.load_state_dict(checkpoint["embedding"])

backbone = backbone.to(DEVICE).eval()
embedding_layer = embedding_layer.to(DEVICE).eval()

# Transform: the input here is a NumPy image array, hence the extra
# ToPILImage step before the resize/tensor/normalise chain.
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((112,112)),
    transforms.ToTensor(),
    transforms.Normalize([0.5]*3, [0.5]*3)
])

# Embedding gallery: a pickled dict of name -> reference embedding.
# NOTE(security): allow_pickle=True executes pickle on load; only use it on a
# gallery file produced by a trusted source.
reference_db = np.load("../models/gallery_embeddings.npy", allow_pickle=True).item()

# Mediapipe face detection
mp_face_detection = mp.solutions.face_detection
face_detector = mp_face_detection.FaceDetection(model_selection=1, min_detection_confidence=0.9)

def get_embedding(face_img):
    """Return the L2-normalised embedding of a single face crop.

    The crop is run through the module-level ``transform``, ``backbone`` and
    ``embedding_layer`` on ``DEVICE`` with gradients disabled.

    Args:
        face_img: Image accepted by ``transform``. The channel order is not
            changed here, so the caller decides which order the model sees.

    Returns:
        NumPy array with the batch dimension removed.
    """
    batch = transform(face_img).unsqueeze(0).to(DEVICE)
    with torch.no_grad():
        unit_emb = normalize(embedding_layer(backbone(batch)), dim=1)
    return unit_emb.squeeze(0).cpu().numpy()

def recognize_face(embedding, threshold=0.91):
    """Return the gallery name most similar to ``embedding``.

    Similarity is the dot product with each reference vector stored in the
    module-level ``reference_db``. The first entry with the highest score
    wins; if that score is below ``threshold`` the face is reported as
    "Desconocido".

    Args:
        embedding: Query vector.
        threshold: Minimum score required to accept the best match.

    Returns:
        The matched name, or "Desconocido" when no score reaches the threshold.
    """
    winner, top_score = None, -1
    for candidate, reference in reference_db.items():
        similarity = np.dot(embedding, reference)
        # Only a strictly larger score replaces the current best.
        if not (similarity > top_score):
            continue
        winner, top_score = candidate, similarity

    return winner if top_score >= threshold else "Desconocido"

# Change the video source here
# cap = cv2.VideoCapture(0)  # Webcam
cap = cv2.VideoCapture("../data/crudo/Abir1.mp4")  # Video

try:
    # Fail loudly on a bad path/device instead of silently showing nothing.
    if not cap.isOpened():
        raise RuntimeError("No se pudo abrir la fuente de video.")

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # OpenCV delivers BGR frames; the detector is fed the RGB version.
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = face_detector.process(rgb)
        ih, iw, _ = frame.shape

        # `detections` is falsy when no face was found.
        for det in results.detections or []:
            bbox = det.location_data.relative_bounding_box
            # Compute both corners from the relative box and clamp each one
            # to the frame, so a box that starts outside the image is cut
            # rather than shifted.
            x1 = max(int(bbox.xmin * iw), 0)
            y1 = max(int(bbox.ymin * ih), 0)
            x2 = min(int((bbox.xmin + bbox.width) * iw), iw)
            y2 = min(int((bbox.ymin + bbox.height) * ih), ih)

            # Crop from the RGB image: the gallery embeddings were computed
            # from RGB images, so a BGR crop would swap the colour channels.
            face_img = rgb[y1:y2, x1:x2]
            if face_img.size == 0:
                continue

            # Recognition
            emb = get_embedding(face_img)
            name = recognize_face(emb)

            # Bounding box and label (drawn on the BGR frame that is shown);
            # the label is kept inside the frame for faces at the top edge.
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0,255,0), 2)
            cv2.putText(frame, name, (x1, max(y1 - 10, 15)), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255,255,255), 2)

            # Draw keypoints
            for kp in det.location_data.relative_keypoints:
                kp_x = int(kp.x * iw)
                kp_y = int(kp.y * ih)
                cv2.circle(frame, (kp_x, kp_y), 2, (0, 250, 0), -1)

        cv2.imshow("Recognition", frame)
        # Press 'q' to stop early.
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Always free the capture device and close the window, even on errors.
    cap.release()
    cv2.destroyAllWindows()

W0000 00:00:1751394116.636541   17807 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
