In [8]:
# Cell 1: Imports & Configuration

import os
import sys
import cv2
import json
import torch
import numpy as np
import pandas as pd
from collections import Counter
from deepface import DeepFace
from tensorflow.keras.models import load_model
from tensorflow.keras.layers import InputLayer
from tensorflow.keras.mixed_precision import Policy as DTypePolicy
import albumentations as A
import joblib

# Append YOLO-FaceV2 code to path
sys.path.append("/home/ayombalima/YOLO-FaceV2-master")
from models.experimental import attempt_load
from utils.general       import non_max_suppression, scale_coords
from utils.torch_utils   import select_device

# ─── Paths ───────────────────────────────────────────────────────────
VIDEO_PATH         = "/home/ayombalima/video_uploads/test_video.mp4"
YOLO_WEIGHTS       = "/home/ayombalima/YOLO-FaceV2-master/yolov5s_v2.pt"
ML_MODEL_PATH      = "/home/ayombalima/ml_models/student_recognition_model.h5"
SCALER_PATH        = "/home/ayombalima/ml_models/scaler_v3.pkl"
LABEL_ENCODER_PATH = "/home/ayombalima/ml_models/label_encoder_v2.pkl"
CLUSTER_JSON       = "/home/ayombalima/ml_models/final_clustered_results.json"

# ─── Working directories ─────────────────────────────────────────────
EXTRACTED_DIR = "extracted_frames"
RESIZED_DIR   = "resized_frames"
DETECTED_DIR  = "detected_faces"
MATCHED_DIR   = "matched_faces"
for d in (EXTRACTED_DIR, RESIZED_DIR, DETECTED_DIR, MATCHED_DIR):
    os.makedirs(d, exist_ok=True)

# ─── Load models ──────────────────────────────────────────────────────
device      = select_device("cpu")
yolo_model  = attempt_load(YOLO_WEIGHTS, map_location=device).eval()

class CustomInputLayer(InputLayer):
    def __init__(self, *args, **kwargs):
        bs = kwargs.pop("batch_shape", None)
        if bs is not None:
            kwargs["batch_input_shape"] = bs
        super().__init__(*args, **kwargs)
        

ml_model      = load_model(
    ML_MODEL_PATH,
    compile=False,
    safe_mode=False,
    custom_objects={"InputLayer": CustomInputLayer, "DTypePolicy": DTypePolicy}
)
scaler        = joblib.load(SCALER_PATH)
label_encoder = joblib.load(LABEL_ENCODER_PATH)

with open(CLUSTER_JSON) as f:
    clusters_map = json.load(f).get("clusters", {})

# ─── Augmentation pipeline ───────────────────────────────────────────
aug_pipeline = A.Compose([
    A.HorizontalFlip(p=0.5),
    A.Rotate(limit=15, p=0.6),
    A.RandomBrightnessContrast(p=0.6),
    A.GaussNoise(p=0.2),
    A.HueSaturationValue(p=0.3),
    A.RandomShadow(p=0.2),
])


Fusing layers... 


In [9]:
# Cell 2: Frame extraction

def extract_and_resize(video_path: str, interval: int = 30):
    """
    Extract every `interval`th frame from `video_path`, rotate it,
    save raw in EXTRACTED_DIR and resized (640×640) in RESIZED_DIR.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise RuntimeError(f"Cannot open video: {video_path}")
    fid, count = 0, 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if fid % interval == 0:
            name = f"frame{fid}.jpg"
            raw  = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
            cv2.imwrite(os.path.join(EXTRACTED_DIR, name), raw)
            small = cv2.resize(raw, (640, 640))
            cv2.imwrite(os.path.join(RESIZED_DIR,   name), small)
            count += 1
        fid += 1
    cap.release()
    print(f"[extract] Extracted {count} frames.")


In [10]:
# Cell 3: ML pipeline: clustering embeddings & classification

def detect_faces(img, conf_thresh=0.25, iou_thresh=0.45):
    """
    Run YOLO on `img`, return list of face crops.
    """
    tensor = (torch.from_numpy(img)
              .permute(2,0,1).float().div(255.0)
              .unsqueeze(0).to(device))
    with torch.no_grad():
        det = non_max_suppression(yolo_model(tensor)[0],
                                  conf_thresh, iou_thresh)[0]
    if det is None:
        return []
    det[:, :4] = scale_coords(tensor.shape[2:], det[:, :4], img.shape).round()
    crops = []
    for *box, _, _ in det:
        x1,y1,x2,y2 = map(int, box)
        crop = img[y1:y2, x1:x2]
        if crop.size:
            crops.append(crop)
    return crops

In [11]:
def run_ml_notebook_pipeline(video_path: str):
    """
    1) extract frames
    2) detect & augment faces
    3) get embeddings
    4) cluster embeddings by cosine>0.85
    5) classify one rep embedding per cluster
    6) return results list
    """
    extract_and_resize(video_path)
    instances = []  # each is (embedding, crop)

    # a) detect & embed
    for fn in sorted(os.listdir(RESIZED_DIR)):
        img = cv2.imread(os.path.join(RESIZED_DIR, fn))
        for crop in detect_faces(img):
            aug = aug_pipeline(image=crop)["image"]
            face160 = cv2.resize(aug, (160, 160))
            emb = DeepFace.represent(
                cv2.cvtColor(face160, cv2.COLOR_BGR2RGB),
                model_name="Facenet",
                enforce_detection=False
            )[0]["embedding"]
            instances.append((emb, crop))

    print(f"[ml] Total crops → embeddings: {len(instances)}")

    if not instances:
        return []

    # b) cluster by cosine similarity
    clusters = []
    SIM_THRESH = 0.85

    for emb, crop in instances:
        matched = False
        for cl in clusters:
            ref = cl["embs"][0]
            cos = np.dot(emb, ref)/(np.linalg.norm(emb)*np.linalg.norm(ref))
            if cos >= SIM_THRESH:
                cl["embs"].append(emb)
                cl["crops"].append(crop)
                matched = True
                break
        if not matched:
            clusters.append({"embs": [emb], "crops": [crop]})

    print(f"[ml] Formed {len(clusters)} unique face clusters")

    # c) classify rep embedding per cluster
    results = []
    for idx, cl in enumerate(clusters):
        rep_emb = np.mean(cl["embs"], axis=0)
        scaled  = scaler.transform([rep_emb])
        probs   = ml_model.predict(scaled, verbose=0)[0]
        cid     = int(np.argmax(probs))
        sid     = label_encoder.inverse_transform([cid])[0]
        conf    = float(np.max(probs))

        results.append({
            "cluster":    idx,
            "student_id": sid,
            "confidence": conf,
            "crop":       cl["crops"][0]
        })

    return results


In [12]:
# Cell 4: Run pipeline, tabulate & show crops

results = run_ml_notebook_pipeline(VIDEO_PATH)

# 1) DataFrame summary
df = pd.DataFrame([
    {"Cluster": r["cluster"],
     "Student": r["student_id"],
     "Confidence": r["confidence"]}
    for r in results
])
df

# 2) Plot each crop with its label
import matplotlib.pyplot as plt

for r in results:
    plt.figure(figsize=(2,2))
    plt.imshow(cv2.cvtColor(r["crop"], cv2.COLOR_BGR2RGB))
    plt.title(f"{r['student_id']} ({r['confidence']:.2f})")
    plt.axis("off")
    plt.show()


[extract] Extracted 12 frames.
[ml] Total crops → embeddings: 10
[ml] Formed 2 unique face clusters


In [13]:
# Cell 5: Display all cropped faces for manual verification

import glob
import matplotlib.pyplot as plt

# Find every crop saved during detection & augmentation
crop_paths = sorted(glob.glob(os.path.join(DETECTED_DIR, "*.jpg")))

print(f"Displaying {len(crop_paths)} cropped faces...\n")
for path in crop_paths:
    img = cv2.imread(path)
    # convert BGR→RGB for matplotlib
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    plt.figure(figsize=(2,2))
    plt.imshow(img)
    plt.title(os.path.basename(path), fontsize=8)
    plt.axis("off")
    plt.show()


Displaying 0 cropped faces...

