In [5]:
import cv2
import torch
import numpy as np
import dlib
import torch.nn.functional as F
from collections import defaultdict, deque
from transformers import ViTForImageClassification
import torchvision.transforms as transforms
import subprocess

  _torch_pytree._register_pytree_node(


In [6]:
# === SETUP ===
# Input locations. These are absolute, machine-specific paths.
video_path = r"E:\UET\HMI\Pj\Trich doan 3 (ly truong - me mo).mp4"
datFile = r"E:\UET\HMI\Pj\shape_predictor_68_face_landmarks.dat"
face_rec_model_path = r"E:\UET\HMI\Pj\dlib_face_recognition_resnet_model_v1.dat"

# Run torch on the GPU when CUDA is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# dlib pipeline: face detector -> landmark predictor -> face descriptor model.
face_detector = dlib.get_frontal_face_detector()
landmark_predictor = dlib.shape_predictor(datFile)
face_encoder = dlib.face_recognition_model_v1(face_rec_model_path)

In [7]:
# Class names in classifier-index order: the predicted index (argmax of the
# logits) is used directly as an index into this list, so the order matters.
# Even indices are the "Genuine" variant, odd indices the "Posed" variant.
emotion_labels = [
    'Genuine Disgust',   'Posed Disgust',     # 0, 1
    'Genuine Happiness', 'Posed Happiness',   # 2, 3
    'Genuine Fear',      'Posed Fear',        # 4, 5
    'Genuine Anger',     'Posed Anger',       # 6, 7
    'Genuine Surprise',  'Posed Surprise',    # 8, 9
    'Genuine Sadness',   'Posed Sadness',     # 10, 11
]
# Size of the classification head, derived from the label list.
num_classes = len(emotion_labels)

In [8]:
# Build the ViT backbone from the pretrained checkpoint, replace its head with
# a `num_classes`-way linear layer, then load the fine-tuned weights on top.
model = ViTForImageClassification.from_pretrained("google/vit-base-patch16-224")
model.classifier = torch.nn.Linear(768, num_classes, bias=True)
model = model.to(device)  # Module.to() returns the module itself
model.load_state_dict(
    torch.load(r"E:\UET\HMI\Pj\vit_model.pth", map_location=device)
)
# Inference mode (disables dropout); as the last expression this also displays the model repr.
model.eval()



ViTForImageClassification(
  (vit): ViTModel(
    (embeddings): ViTEmbeddings(
      (patch_embeddings): ViTPatchEmbeddings(
        (projection): Conv2d(3, 768, kernel_size=(16, 16), stride=(16, 16))
      )
      (dropout): Dropout(p=0.0, inplace=False)
    )
    (encoder): ViTEncoder(
      (layer): ModuleList(
        (0-11): 12 x ViTLayer(
          (attention): ViTAttention(
            (attention): ViTSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.0, inplace=False)
            )
            (output): ViTSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.0, inplace=False)
            )
          )
          (intermediate): ViTIntermediate(
            (dense): Linear(in_features=7

In [9]:
# Preprocessing / model constants.
IMG_SIZE = 224                   # side length (pixels) used by the Resize step
mean = [0.485, 0.456, 0.406]     # per-channel mean, same values as the Normalize step
std = [0.229, 0.224, 0.225]      # per-channel std, same values as the Normalize step
num_classes = 12                 # number of emotion classes

In [10]:
# Preprocessing applied to a face crop (image array) before it is fed to the
# classifier: array -> PIL image -> resize -> float tensor -> normalize.
# The size and normalization statistics come from the constants cell
# (IMG_SIZE, mean, std) instead of being repeated as literals here, so the
# two cannot drift apart.
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean, std),
])

In [None]:
# === FUNCTIONS ===
def extract_faces(image):
    """Detect faces in a frame and return a crop, a box and an embedding for each.

    Args:
        image: Frame as a BGR image array (it is converted to grayscale with
            ``cv2.COLOR_BGR2GRAY`` for detection).

    Returns:
        A list of ``(face_224, (x_min, y_min, x_max, y_max), embedding)``
        tuples, one per usable detection. ``face_224`` is the face crop
        resized to 224x224, the box is clamped to the frame, and ``embedding``
        is the dlib face descriptor as a NumPy array. Detections smaller than
        20 px in either dimension, or for which any processing step fails,
        are skipped.
    """
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    # run() -> (rectangles, scores, sub-detector indices); only rectangles are used.
    faces, _scores, _ = face_detector.run(gray, 1, 0)
    img_h, img_w = image.shape[:2]

    results = []
    for face in faces:
        if face.width() < 20 or face.height() < 20:
            continue

        # A detection rectangle can extend past the frame border. A negative
        # left/top would wrap around in NumPy slicing and yield an empty or
        # wrong crop, so clamp the box to the frame before using it.
        x_min = max(face.left(), 0)
        y_min = max(face.top(), 0)
        x_max = min(face.right(), img_w)
        y_max = min(face.bottom(), img_h)
        if x_max <= x_min or y_max <= y_min:
            continue

        try:
            landmarks = landmark_predictor(gray, face)
            # NOTE(review): the chip is cut from the BGR frame; dlib's
            # recognition model is normally fed RGB images - confirm whether a
            # BGR->RGB conversion is wanted here.
            face_chip = dlib.get_face_chip(image, landmarks)
            embedding = np.array(face_encoder.compute_face_descriptor(face_chip))
            face_crop = image[y_min:y_max, x_min:x_max]
            resized = cv2.resize(face_crop, (224, 224))
        except Exception:
            # Best effort: skip a face that cannot be processed, but do not
            # swallow KeyboardInterrupt/SystemExit as a bare `except:` would.
            continue

        results.append((resized, (x_min, y_min, x_max, y_max), embedding))
    return results

def calculate_optical_flow(prev_frame, next_frame):
    """Compute dense Farneback optical flow between two BGR images.

    Both images are converted to grayscale first. Returns the flow array
    produced by ``cv2.calcOpticalFlowFarneback``; its last axis holds the
    x and y displacement components.
    """
    gray_prev, gray_next = (
        cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) for img in (prev_frame, next_frame)
    )
    return cv2.calcOpticalFlowFarneback(
        gray_prev, gray_next, None,
        pyr_scale=0.5,
        levels=3,
        winsize=15,
        iterations=3,
        poly_n=5,
        poly_sigma=1.2,
        flags=0,
    )

def calculate_motion_intensity(flow):
    """Return the total motion in a flow field: the sum of all flow-vector magnitudes.

    ``flow`` is indexed as ``flow[..., 0]`` (x component) and ``flow[..., 1]``
    (y component).
    """
    dx = flow[..., 0]
    dy = flow[..., 1]
    # cartToPolar returns (magnitude, angle); only the magnitude is needed.
    magnitude = cv2.cartToPolar(dx, dy)[0]
    return np.sum(magnitude)

def cosine_similarity(vec1, vec2):
    """Return the cosine similarity of two vectors.

    Args:
        vec1, vec2: Array-likes of equal length.

    Returns:
        ``dot(vec1, vec2) / (|vec1| * |vec2|)``, or ``0.0`` when either vector
        has zero norm (the similarity is undefined there; returning 0.0 avoids
        a division by zero and a NaN propagating into threshold comparisons).
    """
    vec1 = np.asarray(vec1)
    vec2 = np.asarray(vec2)
    denom = np.linalg.norm(vec1) * np.linalg.norm(vec2)
    if denom == 0:
        return 0.0
    return np.dot(vec1, vec2) / denom

In [None]:
# === PROCESSING ===
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    # Fail fast: with an unopened capture the tracking loop would exit
    # immediately and every later cell would silently work on empty results.
    raise IOError(f"Could not open video: {video_path}")
fps = cap.get(cv2.CAP_PROP_FPS)   # reused when writing the output video
frame_id = 0                      # 1-based index of the frame being processed
person_id_counter = 0             # next id to hand out to a new track

active_tracks = {}                # person id -> per-track state dict
all_segments = []                 # finished (person id, [(frame_id, face, box), ...]) segments

# Helper functions
def compute_iou(box1, box2):
    """Intersection-over-union of two ``(x_min, y_min, x_max, y_max)`` boxes.

    A small epsilon (1e-5) is added to the union so the division is safe even
    when both boxes have zero area.
    """
    left, top = max(box1[0], box2[0]), max(box1[1], box2[1])
    right, bottom = min(box1[2], box2[2]), min(box1[3], box2[3])
    # Width/height are floored at 0 so disjoint boxes give an empty overlap.
    overlap = max(0, right - left) * max(0, bottom - top)

    area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
    area2 = (box2[2] - box2[0]) * (box2[3] - box2[1])
    union = area1 + area2 - overlap + 1e-5
    return overlap / float(union)

def center_distance(box1, box2):
    """Euclidean distance between the centres of two ``(x_min, y_min, x_max, y_max)`` boxes."""
    # Offset between the two box centres along each axis.
    dx = (box1[0] + box1[2]) / 2 - (box2[0] + box2[2]) / 2
    dy = (box1[1] + box1[3]) / 2 - (box2[1] + box2[3]) / 2
    return np.sqrt(dx**2 + dy**2)

# Pass 1: track faces frame by frame and cut out candidate expression segments.
#
# Per track, the motion intensity between consecutive face crops is buffered.
# A segment opens when the last three intensities are strictly increasing and
# closes when the last four are strictly decreasing.
# NOTE(review): a segment that is still open when its track goes stale or the
# video ends is never appended to `all_segments` - confirm that is intended.
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        frame_id += 1
        faces = extract_faces(frame)
        # Tracks that already received a face in this frame. A track must not
        # absorb two different faces from the same frame.
        matched_ids = set()

        for face_img, box, embedding in faces:
            matched = False
            for pid in list(active_tracks):
                if pid in matched_ids:
                    # Already assigned (or created) in this frame: skip it.
                    continue

                data = active_tracks[pid]
                avg_embedding = np.mean(data['embeddings'], axis=0) if data['embeddings'] else data['embedding']
                sim = cosine_similarity(embedding, avg_embedding)
                iou = compute_iou(box, data['last_box'])
                dist = center_distance(box, data['last_box'])

                # NOTE(review): the three cues are OR-ed, so any single one is
                # enough to join a track - confirm the thresholds are as intended.
                if sim > 0.3 or iou > 0.2 or dist < 60:
                    flow = calculate_optical_flow(data['last_face'], face_img) if data['last_face'] is not None else None
                    intensity = calculate_motion_intensity(flow) if flow is not None else 0
                    data['flow_buffer'].append((frame_id, intensity))

                    buffer = data['flow_buffer']
                    if not data['active']:
                        # Onset: intensity rose over the last three samples.
                        if len(buffer) >= 3 and all(buffer[i][1] < buffer[i+1][1] for i in range(-3, -1)):
                            data['active'] = True
                            data['segment'] = [(frame_id, face_img, box)]
                    else:
                        data['segment'].append((frame_id, face_img, box))
                        # Offset: intensity fell over the last four samples.
                        if len(buffer) >= 4 and all(buffer[i][1] > buffer[i+1][1] for i in range(-4, -1)):
                            all_segments.append((pid, data['segment']))
                            data['segment'] = []
                            data['flow_buffer'].clear()
                            data['active'] = False

                    data['last_face'] = face_img
                    data['last_box'] = box
                    data['embedding'] = embedding
                    data['embeddings'].append(embedding)
                    data['last_seen'] = frame_id
                    matched_ids.add(pid)
                    matched = True
                    break

            if not matched:
                # No existing track fits: start a new one for this face.
                active_tracks[person_id_counter] = {
                    'last_seen': frame_id,
                    'last_box': box,
                    'last_face': face_img,
                    'embedding': embedding,
                    'embeddings': deque([embedding], maxlen=5),
                    'flow_buffer': deque(maxlen=10),
                    'segment': [],
                    'active': False
                }
                matched_ids.add(person_id_counter)
                person_id_counter += 1

        # Clean up stale tracks (not seen for more than 50 frames)
        for pid in list(active_tracks):
            if frame_id - active_tracks[pid]['last_seen'] > 50:
                del active_tracks[pid]
finally:
    # Release the capture even if processing a frame raised.
    cap.release()

In [18]:
# Label each segment and render the annotated video.

# frame number -> [(box, emotion), ...] overlays to draw on that frame
frame_map = defaultdict(list)
# frame number -> [person id, ...], index-parallel to frame_map[frame number]
frame_person_ids = defaultdict(list)

for pid, segment in all_segments:
    if len(segment) < 2:
        continue

    # Apex = the frame whose face moved the most relative to the segment's first frame.
    onset_face = segment[0][1]
    highest = -1
    apex_idx = 0
    for idx, (_, face_img, _) in enumerate(segment):
        try:
            flow = calculate_optical_flow(onset_face, face_img)
            intensity = calculate_motion_intensity(flow)
        except Exception:
            # Best effort: ignore a frame whose flow cannot be computed
            # (without swallowing KeyboardInterrupt like a bare `except:`).
            continue
        if intensity > highest:
            highest = intensity
            apex_idx = idx

    apex_face = segment[apex_idx][1]
    # NOTE(review): apex_face is a BGR crop (frames come from OpenCV) while
    # ToPILImage treats arrays as RGB - confirm the model was trained with the
    # same channel order.
    input_tensor = transform(apex_face).unsqueeze(0).to(device)
    with torch.no_grad():
        output = model(input_tensor)
        pred_idx = torch.argmax(output.logits, dim=1).item()
        emotion = emotion_labels[pred_idx]

    # The whole segment gets the emotion predicted at its apex frame.
    # (`seg_frame_id` avoids overwriting the notebook-level `frame_id`.)
    for seg_frame_id, _, box in segment:
        frame_map[seg_frame_id].append((box, emotion))
        frame_person_ids[seg_frame_id].append(pid)

cap = cv2.VideoCapture(video_path)
out = None
try:
    # Read the frame rate from this capture rather than relying on a value
    # left behind by an earlier cell.
    fps = cap.get(cv2.CAP_PROP_FPS)
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    out = cv2.VideoWriter("output_emotion_1.mp4",
                          cv2.VideoWriter_fourcc(*'mp4v'),
                          fps, (width, height))

    # Frame numbering is 1-based, matching the tracking pass.
    current_frame_id = 1
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        if current_frame_id in frame_map:
            overlays = zip(frame_map[current_frame_id], frame_person_ids[current_frame_id])
            for (box, emotion), person_id in overlays:
                x1, y1, x2, y2 = (int(v) for v in box)
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frame, emotion, (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.9, (36, 255, 12), 2)
                # Show the tracked person's id (previously the frame number was printed here).
                cv2.putText(frame, f"Person ID: {person_id}",
                            (x1, y1 - 30), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (36, 255, 12), 2)
        out.write(frame)
        current_frame_id += 1
finally:
    # Always release the reader and finalize the output file.
    cap.release()
    if out is not None:
        out.release()

In [19]:
# Completion message ("Hoàn tất" = "Done").
# NOTE(review): the message names final_output_with_audio.mp4, but the render
# cell writes output_emotion_1.mp4 and no visible cell produces the file named
# here - confirm that the audio-muxing step exists, or update the message.
print("✅ Hoàn tất: final_output_with_audio.mp4")

✅ Hoàn tất: final_output_with_audio.mp4
