In [6]:
!pip uninstall -y moviepy fer facenet-pytorch -q
!pip install moviepy==1.0.1 fer==22.4.0 decord==0.6.0 facenet-pytorch==2.5.3 transformers==4.42.3 torch torchvision torchaudio tqdm pandas --quiet

In [7]:
import torch, cv2, numpy as np, pandas as pd, os
from tqdm import tqdm
from fer import FER
from facenet_pytorch import MTCNN, InceptionResnetV1
from transformers import XCLIPProcessor, XCLIPModel
from decord import VideoReader, cpu
from google.colab import files

# Run on the GPU when torch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

print("üîπ Loading models ...")

# Face detector; keep_all=True is passed so detection is not limited to one face.
face_detector = MTCNN(keep_all=True, device=device)

# Face-embedding network with the 'vggface2' pretrained weights, in eval mode.
facenet = InceptionResnetV1(pretrained='vggface2')
facenet = facenet.eval().to(device)

# Emotion classifier from the `fer` package.
emotion_detector = FER(mtcnn=True)

# The processor and the model are loaded from the same X-CLIP checkpoint.
action_processor = XCLIPProcessor.from_pretrained("microsoft/xclip-base-patch32")
action_model = XCLIPModel.from_pretrained("microsoft/xclip-base-patch32")
action_model = action_model.to(device)

print("‚úÖ All models loaded successfully!")


ValueError: numpy.dtype size changed, may indicate binary incompatibility. Expected 96 from C header, got 88 from PyObject

In [None]:
# Ask the user for video files through the Colab upload widget.
print("üì§ Please upload one or more .mp4 videos...")
uploaded = files.upload()

# Persist every uploaded payload under a dedicated folder so the
# processing cell can enumerate it later.
video_dir = "videos"
os.makedirs(video_dir, exist_ok=True)
for filename in uploaded:
    destination = os.path.join(video_dir, filename)
    with open(destination, "wb") as handle:
        handle.write(uploaded[filename])

print(f"‚úÖ Uploaded {len(uploaded)} videos to '{video_dir}'")


In [None]:
def get_face_embeddings(frame):
    """Detect faces in a frame and compute one embedding per face.

    Args:
        frame: Image array indexed as ``frame[y, x]`` with a trailing channel
            axis. The processing loop in this notebook passes decord frames
            and treats them as RGB (it converts RGB->BGR before handing them
            to OpenCV writers), so the crop is fed to the embedder without
            any channel swap.

    Returns:
        ``(faces, embeddings)``: two parallel lists. ``faces`` holds the
        cropped face regions (views into ``frame``), ``embeddings`` holds the
        flattened embedding vector for each crop. Both are empty when the
        detector reports no boxes.
    """
    faces = []
    embeddings = []

    boxes, _ = face_detector.detect(frame)
    if boxes is None:
        return faces, embeddings

    height, width = frame.shape[:2]
    for box in boxes:
        x1, y1, x2, y2 = [int(b) for b in box]
        # Detector boxes may extend past the image border. A negative index
        # would be interpreted by NumPy as "count from the end" and yield an
        # empty or wrong crop, so clamp the box to the frame first.
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(width, x2), min(height, y2)
        if x2 <= x1 or y2 <= y1:
            continue

        face = frame[y1:y2, x1:x2]
        face_resized = cv2.resize(face, (160, 160))
        # NOTE(review): pixels are scaled to [0, 1] here; confirm this matches
        # the normalisation the pretrained embedding weights expect.
        face_tensor = torch.tensor(face_resized / 255.0).permute(2, 0, 1).unsqueeze(0).float().to(device)
        with torch.no_grad():
            emb = facenet(face_tensor).cpu().numpy().flatten()
        faces.append(face)
        embeddings.append(emb)
    return faces, embeddings


def get_top_actions(video_path, top_k=3):
    """Score a short clip against a fixed list of action prompts.

    The clip is decoded with decord, a fixed number of evenly spaced frames is
    sampled, and those frames (not the file path) are passed to the X-CLIP
    processor together with the text prompts.

    Args:
        video_path: Path of the video file to classify.
        top_k: Number of highest-scoring prompts to return.

    Returns:
        A list of ``"<prompt>:<probability>"`` strings ordered from most to
        least likely. On any failure a single-element list containing an
        error message is returned instead of raising, so a bad clip does not
        abort the surrounding processing loop.
    """
    try:
        actions = ["a person dancing", "a person speaking", "a person sitting", "a person walking", "a person smiling"]

        reader = VideoReader(video_path, ctx=cpu(0))
        total_frames = len(reader)
        if total_frames == 0:
            raise ValueError("video contains no frames")

        # The model is built for a fixed clip length; read it from the config
        # and fall back to 8 if the attribute is missing.
        num_frames = getattr(action_model.config.vision_config, "num_frames", 8)
        # Evenly spaced indices; short clips simply repeat some frames.
        indices = np.linspace(0, total_frames - 1, num=num_frames).round().astype(int)
        frames = list(reader.get_batch(indices.tolist()).asnumpy())

        # padding=True lets prompts of different token lengths share a tensor.
        inputs = action_processor(text=actions, videos=frames, return_tensors="pt", padding=True).to(device)
        with torch.no_grad():
            probs = action_model(**inputs).logits_per_video.softmax(dim=1)[0].cpu().numpy()

        # Descending order first, then truncate, so top_k=0 yields an empty list.
        top_idx = probs.argsort()[::-1][:top_k]
        return [f"{actions[i]}:{probs[i]:.2f}" for i in top_idx]
    except Exception as e:
        return [f"‚ö†Ô∏è Action error: {str(e)}"]


def get_emotion(frame):
    """Summarise the three strongest emotions for the first detected face.

    Args:
        frame: Image passed unchanged to ``emotion_detector.detect_emotions``.

    Returns:
        ``"<emotion>:<score>, ..."`` with up to three entries sorted by
        descending score, ``"No face"`` when the detector returns nothing, or
        ``"Error"`` when detection raises.
    """
    try:
        result = emotion_detector.detect_emotions(frame)
        if not result:
            return "No face"
        # Only the first entry of the detector output is reported.
        emotions = result[0]['emotions']
        ranked = sorted(emotions.items(), key=lambda item: item[1], reverse=True)
        return ", ".join(f"{label}:{score:.2f}" for label, score in ranked[:3])
    except Exception:
        # Best-effort: a failing frame must not stop the batch. Catching
        # Exception (not a bare except) lets Ctrl-C / kernel interrupts through.
        return "Error"


In [None]:
from sklearn.cluster import AgglomerativeClustering

# Sampling configuration: one frame is analysed every SAMPLE_INTERVAL_SEC
# seconds, and the action model is given the CLIP_LENGTH_SEC seconds of video
# that start at that frame.
SAMPLE_INTERVAL_SEC = 5
CLIP_LENGTH_SEC = 1
# Fixed column list so the DataFrame has the expected schema even when no
# face was found in any video.
RECORD_COLUMNS = ["Video", "Frame", "Actions", "Emotions", "FacePath", "PersonID"]

all_faces = []       # path of every saved face crop
all_embeddings = []  # embedding of each crop, same order as all_faces
all_records = []     # one dict per detected face; becomes a row of df

# sorted() makes the processing order (and hence row order) deterministic.
for video_name in sorted(os.listdir(video_dir)):
    if not video_name.lower().endswith(".mp4"):
        continue
    path = os.path.join(video_dir, video_name)
    vr = VideoReader(path, ctx=cpu(0))
    frame_count = len(vr)
    fps = vr.get_avg_fps()
    # A step of 0 would make range() raise, so never go below one frame.
    chunk_size = max(1, int(fps * SAMPLE_INTERVAL_SEC))
    clip_len = max(1, int(fps * CLIP_LENGTH_SEC))
    print(f"‚ñ∂Ô∏è Processing {video_name}: {frame_count} frames")

    for i in tqdm(range(0, frame_count, chunk_size)):
        frame = vr[i].asnumpy()
        faces, embeds = get_face_embeddings(frame)
        # get_emotion reports only the first detector result, so every face
        # found in this frame shares the same emotion string.
        emo = get_emotion(frame)
        temp_video = f"/tmp/clip_{video_name}_{i}.mp4"

        # Write the real frames that follow the sampled one, so the action
        # model sees motion rather than a single repeated still.
        out = cv2.VideoWriter(temp_video, cv2.VideoWriter_fourcc(*'mp4v'), fps, (frame.shape[1], frame.shape[0]))
        try:
            for j in range(i, min(i + clip_len, frame_count)):
                out.write(cv2.cvtColor(vr[j].asnumpy(), cv2.COLOR_RGB2BGR))
        finally:
            out.release()

        try:
            actions = get_top_actions(temp_video)
        finally:
            # The clip is only needed for this call; do not let /tmp fill up.
            if os.path.exists(temp_video):
                os.remove(temp_video)

        for face_idx, (face, emb) in enumerate(zip(faces, embeds)):
            # Include the per-frame face index: without it all faces from the
            # same frame would be written to one file and overwrite each other.
            face_id = f"{video_name}_f{i}_{face_idx}"
            face_path = f"/tmp/{face_id}.jpg"
            cv2.imwrite(face_path, cv2.cvtColor(face, cv2.COLOR_RGB2BGR))
            all_faces.append(face_path)
            all_embeddings.append(emb)
            all_records.append({
                "Video": video_name,
                "Frame": i,
                "Actions": ", ".join(actions),
                "Emotions": emo,
                "FacePath": face_path
            })

print("‚úÖ All faces, actions & emotions extracted!")

# Group face embeddings into identities. Clustering needs at least two
# samples; with zero or one face everything is assigned label 0.
if len(all_embeddings) > 1:
    cluster = AgglomerativeClustering(n_clusters=None, metric="euclidean", linkage="ward", distance_threshold=1.0)
    labels = cluster.fit_predict(np.array(all_embeddings))
else:
    labels = np.zeros(len(all_embeddings), dtype=int)

for rec, label in zip(all_records, labels):
    rec["PersonID"] = int(label)

df = pd.DataFrame(all_records, columns=RECORD_COLUMNS)
print(f"\n‚úÖ Processed {len(df)} faces, {len(set(labels))} unique persons detected.")
df.head()


In [None]:
from IPython.display import Image, display

# Overview line, then up to three sample crops for each clustered identity
# in ascending PersonID order.
print(f"Total faces: {len(df)}, Unique persons: {df['PersonID'].nunique()}\n")
person_ids = sorted(df['PersonID'].unique())
for pid in person_ids:
    print(f"üßç Person {pid}")
    belongs_to_person = df["PersonID"] == pid
    for face_file in df.loc[belongs_to_person, "FacePath"].iloc[:3]:
        display(Image(face_file))
    print()


In [None]:
# Interactive lookup: list every appearance of one clustered identity.
raw_person_id = input("Enter Person ID to view details: ")
try:
    person_id = int(raw_person_id)
except ValueError:
    # Non-numeric input cannot match any PersonID; treat it as "not found"
    # instead of letting the cell fail with a traceback.
    person_id = None

if person_id is None:
    person_df = df.iloc[0:0]
else:
    person_df = df[df["PersonID"] == person_id]

if person_df.empty:
    print("‚ùå No such person found.")
else:
    print(f"\nüé¨ Person {person_id} appeared in {person_df['Video'].nunique()} videos.")
    display(person_df[["Video", "Frame", "Actions", "Emotions"]])

    print("\nüñºÔ∏è Sample Faces:")
    for img in person_df["FacePath"].head(5):
        display(Image(img))
