In [2]:
import cv2
import numpy as np
import torch
import os
from ultralytics import YOLO
from torchvision import models, transforms
from PIL import Image
import insightface

print("Libraries imported successfully.")

Libraries imported successfully.


In [None]:
# --- CONFIGURATION ---
VIDEO_INPUT_PATH = "assets/input_test.mp4" # Replace with your video path
VIDEO_OUTPUT_PATH = "assets/output_test_demo.mp4"

# Model paths (ensure files exist)
YOLO_MODEL_PATH = "models/yolov8_best.pt"
EMOTION_MODEL_PATH = "models/best_resnet18_sgd.ckpt"

# Device (GPU if available)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cpu


In [None]:
print("Loading models...")

# 1. Load YOLOv8 (Behavior)
try:
    behavior_model = YOLO(YOLO_MODEL_PATH)
    print("✅ YOLOv8 Behavior model loaded.")
except Exception as e:
    print(f"❌ Error loading YOLO: {e}")
    behavior_model = None

# 2. Load InsightFace (Face Detection)
try:
    # providers=['CUDAExecutionProvider'] if using GPU and onnxruntime-gpu is installed
    # if error, switch to ['CPUExecutionProvider']
    identity_model = insightface.app.FaceAnalysis(providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])
    identity_model.prepare(ctx_id=0 if str(device) == 'cuda' else -1, det_size=(640, 640))
    print("✅ InsightFace loaded.")
except Exception as e:
    print(f"❌ Error loading InsightFace: {e}")
    identity_model = None

# 3. Load ResNet18 (Emotion)
emotion_classes = ["angry", "disgust", "fear", "happy", "neutral", "sad", "surprise"]
try:
    emotion_model = models.resnet18(weights=None)
    emotion_model.fc = torch.nn.Linear(emotion_model.fc.in_features, 7)
    
    ckpt = torch.load(EMOTION_MODEL_PATH, map_location=device)
    # Handle if checkpoint is a dict or direct state_dict
    state_dict = ckpt.get("model", ckpt) 
    emotion_model.load_state_dict(state_dict)
    
    emotion_model.to(device)
    emotion_model.eval()
    
    # Transform for ResNet
    emotion_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    print("✅ ResNet18 Emotion model loaded.")
except Exception as e:
    print(f"❌ Error loading Emotion model: {e}")
    emotion_model = None

Loading models...
✅ YOLOv8 Behavior model loaded.




Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\namlh/.insightface\models\buffalo_l\1k3d68.onnx landmark_3d_68 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\namlh/.insightface\models\buffalo_l\2d106det.onnx landmark_2d_106 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\namlh/.insightface\models\buffalo_l\det_10g.onnx detection [1, 3, '?', '?'] 127.5 128.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\namlh/.insightface\models\buffalo_l\genderage.onnx genderage ['None', 3, 96, 96] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\namlh/.insightface\models\buffalo_l\w600k_r50.onnx recognition ['None', 3, 112, 112] 127.

In [None]:
def predict_emotion(face_img_bgr):
    if emotion_model is None:
        return "N/A"
    try:
        # Convert BGR (OpenCV) -> RGB (PIL)
        img_rgb = cv2.cvtColor(face_img_bgr, cv2.COLOR_BGR2RGB)
        pil_img = Image.fromarray(img_rgb)
        
        # Transform & Inference
        input_tensor = emotion_transform(pil_img).unsqueeze(0).to(device)
        with torch.no_grad():
            outputs = emotion_model(input_tensor)
            _, predicted = torch.max(outputs, 1)
            idx = predicted.item()
            return emotion_classes[idx]
    except Exception as e:
        print(f"Emotion error: {e}")
        return "error"

In [None]:
# Open video
cap = cv2.VideoCapture(VIDEO_INPUT_PATH)
if not cap.isOpened():
    print(f"Cannot open video: {VIDEO_INPUT_PATH}")
else:
    # Get video properties for writer
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    
    # Video Writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(VIDEO_OUTPUT_PATH, fourcc, fps, (width, height))
    
    frame_count = 0
    print(f"Processing video... ({width}x{height} @ {fps}fps)")
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
            
        # Copy frame for drawing
        vis_frame = frame.copy()
        
        # --- STEP 1: DETECT BEHAVIOR (YOLO) ---
        if behavior_model:
            results = behavior_model(frame, verbose=False, conf=0.4) # conf threshold 0.4
            
            for result in results:
                for box in result.boxes:
                    # Behavior Box coordinates (Global)
                    bx1, by1, bx2, by2 = box.xyxy[0].cpu().numpy().astype(int)
                    label = behavior_model.names[int(box.cls[0])]
                    
                    # Draw Behavior box (Green)
                    cv2.rectangle(vis_frame, (bx1, by1), (bx2, by2), (0, 255, 0), 2)
                    cv2.putText(vis_frame, f"Behavior: {label}", (bx1, by1 - 10), 
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
                    
                    # --- STEP 2: CROP & DETECT FACE ---
                    # Safe crop (avoid border errors)
                    h_img, w_img = frame.shape[:2]
                    bx1, by1 = max(0, bx1), max(0, by1)
                    bx2, by2 = min(w_img, bx2), min(h_img, by2)
                    
                    if bx2 <= bx1 or by2 <= by1: continue
                    
                    behavior_crop = frame[by1:by2, bx1:bx2]
                    
                    if identity_model:
                        faces = identity_model.get(behavior_crop)
                        
                        for face in faces:
                            # Face coordinates (Local in behavior_crop)
                            fx1, fy1, fx2, fy2 = face.bbox.astype(int)
                            
                            # --- STEP 3: EMOTION ---
                            # Crop face for Emotion Model
                            # Clamp local coordinates
                            fh, fw = behavior_crop.shape[:2]
                            fx1_c, fy1_c = max(0, fx1), max(0, fy1)
                            fx2_c, fy2_c = min(fw, fx2), min(fh, fy2)
                            
                            emotion_label = "unknown"
                            if fx2_c > fx1_c and fy2_c > fy1_c:
                                face_img = behavior_crop[fy1_c:fy2_c, fx1_c:fx2_c]
                                emotion_label = predict_emotion(face_img)
                            
                            # --- STEP 4: DRAW RESULTS (GLOBAL COORDINATES) ---
                            # Convert: Global = Behavior_Box + Local_Face
                            g_fx1, g_fy1 = bx1 + fx1, by1 + fy1
                            g_fx2, g_fy2 = bx1 + fx2, by1 + fy2
                            
                            # Draw Face box (Red)
                            cv2.rectangle(vis_frame, (g_fx1, g_fy1), (g_fx2, g_fy2), (0, 0, 255), 2)
                            
                            # Display info: Emotion - Unknown Student
                            info_text = f"{emotion_label} | Student: Unknown"
                            cv2.putText(vis_frame, info_text, (g_fx1, g_fy2 + 20), 
                                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)

        # Write frame to output video
        out.write(vis_frame)
        
        frame_count += 1
        if frame_count % 30 == 0:
            print(f"Processed {frame_count} frames...")

    # Release resources
    cap.release()
    out.release()
    # Safe destroy windows (headless environments may lack GUI support)
    try:
        cv2.destroyAllWindows()
    except Exception as e:
        print(f"Skipping cv2.destroyAllWindows(): {e}")
    print(f"✅ DONE! Video saved to: {VIDEO_OUTPUT_PATH}")

Processing video... (1280x720 @ 24.0fps)
Processed 30 frames...
Processed 60 frames...
Processed 90 frames...
Processed 120 frames...
Processed 150 frames...
Processed 180 frames...


error: OpenCV(4.11.0) D:\a\opencv-python\opencv-python\opencv\modules\highgui\src\window.cpp:1295: error: (-2:Unspecified error) The function is not implemented. Rebuild the library with Windows, GTK+ 2.x or Cocoa support. If you are on Ubuntu or Debian, install libgtk2.0-dev and pkg-config, then re-run cmake or configure script in function 'cvDestroyAllWindows'
