In [19]:
!fusermount -u /content/drive 2>/dev/null || true
!rm -rf /content/drive
!mkdir -p /content/drive


In [20]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)


Mounted at /content/drive


In [21]:
DATA_ROOT = Path("/content/drive/MyDrive/synthetic_videos")
VIDEO_EXTS = {".mp4", ".mov", ".mkv", ".avi"}

video_paths = [p for p in DATA_ROOT.rglob("*") if p.is_file() and p.suffix.lower() in VIDEO_EXTS]

rows = []
for vp in sorted(video_paths):
    rows.append({
        "video_path": str(vp),
        "label_raw": vp.name,
        "label": extract_label_from_filename(vp.name)
    })

df = pd.DataFrame(rows)

print("Total videos:", len(df))

if len(df) == 0:
    print("No videos found under:", DATA_ROOT)
    print("Check whether videos are in another folder or Drive path is different.")
else:
    print("\nTop labels:\n", df["label"].value_counts().head(20))
    out_csv = DATA_ROOT / "labels.csv"
    df.to_csv(out_csv, index=False)
    print("\nSaved:", out_csv)


Total videos: 77

Top labels:
 label
mma_kick              4
pointing              3
jumping               2
jogging               2
reacting              2
push                  2
punching              2
crying                2
angry                 2
sitting               2
talking               2
running               2
standing_thumbs_up    2
shaking_hands         2
stand_up              2
sad_idle              2
leaning_on_a_wall     2
leaning               1
laughing              1
pointing_(1)          1
Name: count, dtype: int64

Saved: /content/drive/MyDrive/synthetic_videos/labels.csv


In [22]:
!ls -lah /content/drive/MyDrive/synthetic_videos | head -n 50
!find /content/drive/MyDrive/synthetic_videos -type f \( -iname "*.mp4" -o -iname "*.mov" -o -iname "*.mkv" -o -iname "*.avi" \) | wc -l


total 32M
-rw------- 1 root root 178K Sep 27 14:21 air_squat0001-0112.mp4
-rw------- 1 root root 1.1M Sep 27 15:21 Angry0001-0899.mp4
-rw------- 1 root root 1.4M Sep 27 14:12 Angry0001-1151.mp4
-rw------- 1 root root 197K Sep 27 15:22 Boxing0001-0126.mp4
-rw------- 1 root root 8.5K Sep 27 14:13 Ch08_nonPBR0001-0002.mp4
-rw------- 1 root root 203K Sep 27 15:24 Cheering0001-0088.mp4
-rw------- 1 root root  84K Sep 27 15:24 Clapping0001-0070.mp4
-rw------- 1 root root 442K Sep 27 14:16 Crying0001-0189.mp4
-rw------- 1 root root 504K Sep 27 15:30 Crying0001-0376.mp4
-rw------- 1 root root 1.6M Sep 27 15:38 dancing_twerk0001-0912.mp4
-rw------- 1 root root 226K Sep 27 16:17 edge_slip0001-0148.mp4
-rw------- 1 root root 440K Sep 27 16:11 Excited0001-0198.mp4
-rw------- 1 root root 310K Sep 27 16:20 falling_from_losing_balance0001-0401.mp4
drwx------ 5 root root 4.0K Jan 14 17:49 frames
-rw------- 1 root root 411K Sep 27 16:22 hip_hop_dancing0001-0202.mp4
-rw------- 1 root root 277K Sep 27 14

In [23]:
import re
from pathlib import Path
import pandas as pd
from sklearn.model_selection import train_test_split

DATA_ROOT = Path("/content/drive/MyDrive/synthetic_videos")
labels_path = DATA_ROOT / "labels.csv"

df = pd.read_csv(labels_path)

def clean_label(lbl: str) -> str:
    lbl = str(lbl).lower().strip()
    lbl = lbl.replace("-", "_").replace(" ", "_")
    # remove windows copy suffix like _(1), (1), _1
    lbl = re.sub(r"\(\d+\)$", "", lbl)
    lbl = re.sub(r"_\(\d+\)$", "", lbl)
    lbl = re.sub(r"_\d+$", "", lbl)
    lbl = re.sub(r"__+", "_", lbl)
    lbl = re.sub(r"_+$", "", lbl)
    return lbl

df["label"] = df["label"].apply(clean_label)

# Show final class counts
counts = df["label"].value_counts()
print("Total videos:", len(df))
print("Total classes:", df["label"].nunique())
print("\nClass counts:\n", counts)

# If any class has only 1 sample, stratified split may fail.
min_count = counts.min()
print("\nMin samples in a class:", min_count)

# Create split
if min_count >= 2:
    # First split train vs temp
    train_df, temp_df = train_test_split(
        df, test_size=0.30, random_state=42, stratify=df["label"]
    )
    # Split temp into val and test
    val_df, test_df = train_test_split(
        temp_df, test_size=0.50, random_state=42, stratify=temp_df["label"]
    )
else:
    # Fallback: no stratify if some class has only 1 sample
    train_df, temp_df = train_test_split(df, test_size=0.30, random_state=42)
    val_df, test_df = train_test_split(temp_df, test_size=0.50, random_state=42)

df["split"] = "train"
df.loc[val_df.index, "split"] = "val"
df.loc[test_df.index, "split"] = "test"

# Save
clean_path = DATA_ROOT / "labels_clean.csv"
split_path = DATA_ROOT / "split.csv"
df.to_csv(clean_path, index=False)
df.to_csv(split_path, index=False)

print("\nSaved:", clean_path)
print("Saved:", split_path)

print("\nSplit counts:\n", df["split"].value_counts())
print("\nExample rows:\n", df.head(5))


Total videos: 77
Total classes: 54

Class counts:
 label
mma_kick                       4
pointing                       4
sitting                        3
jogging                        2
angry                          2
crying                         2
talking                        2
reacting                       2
push                           2
jumping                        2
punching                       2
running                        2
leaning_on_a_wall              2
waving                         2
shaking_hands                  2
stand_up                       2
sad_idle                       2
standing_thumbs_up             2
clapping                       1
cheering                       1
ch08_nonpbr                    1
boxing                         1
t_pose                         1
surprised                      1
remy                           1
leaning                        1
laughing                       1
excited                        1
yelling            

In [24]:
import pandas as pd
from pathlib import Path

DATA_ROOT = Path("/content/drive/MyDrive/synthetic_videos")
df = pd.read_csv(DATA_ROOT / "split.csv")

# Map raw labels -> grouped labels
GROUP_MAP = {
    # Emotions
    "angry": "emotion",
    "crying": "emotion",
    "laughing": "emotion",
    "excited": "emotion",
    "yelling": "emotion",
    "surprised": "emotion",
    "sad_idle": "emotion",

    # Communication / social
    "talking": "social",
    "sitting_talking": "social",
    "talking_on_phone": "social",
    "shaking_hands": "social",
    "pointing": "social",
    "clapping": "social",
    "cheering": "social",
    "waving": "social",
    "waving(1_hand)": "social",
    "waving_(2_hands)": "social",

    # Locomotion
    "walking": "locomotion",
    "start_walking": "locomotion",
    "walking_backwards": "locomotion",
    "running": "locomotion",
    "jogging": "locomotion",

    # Physical / sport actions
    "punching": "physical",
    "boxing": "physical",
    "mma_kick": "physical",
    "push": "physical",
    "pulling_a_rope": "physical",
    "pull_heavy_object": "physical",

    # Pose / idle / misc
    "sitting": "pose_idle",
    "standing_thumbs_up": "pose_idle",
    "standing_w_briefcase_idle": "pose_idle",
    "stand_up": "pose_idle",
    "leaning": "pose_idle",
    "leaning_on_a_wall": "pose_idle",
    "look_around": "pose_idle",
    "nervously_look_around": "pose_idle",
    "t_pose": "pose_idle",
    "air_squat": "pose_idle",
    "falling_from_losing_balance": "pose_idle",
    "edge_slip": "pose_idle",
    "ch08_nonpbr": "pose_idle",
    "remy": "pose_idle",
    "hip_hop_dancing": "pose_idle",
    "tut_hip_hop_dance": "pose_idle",
    "dancing_twerk": "pose_idle",
    "northern_soul_spin": "pose_idle",
    "quick_formal_bow": "pose_idle",
}

def assign_group(lbl: str) -> str:
    lbl = str(lbl)
    # remove male_batch2_ prefix to map those into same group
    if lbl.startswith("male_batch2_"):
        lbl2 = lbl.replace("male_batch2_", "")
    else:
        lbl2 = lbl
    return GROUP_MAP.get(lbl2, "pose_idle")  # default

df["group_label"] = df["label"].apply(assign_group)

print("Grouped class counts:\n", df["group_label"].value_counts())
print("\nGrouped class counts by split:\n", pd.crosstab(df["split"], df["group_label"]))

out_path = DATA_ROOT / "split_grouped.csv"
df.to_csv(out_path, index=False)
print("\nSaved:", out_path)


Grouped class counts:
 group_label
pose_idle     29
social        17
physical      14
emotion       10
locomotion     7
Name: count, dtype: int64

Grouped class counts by split:
 group_label  emotion  locomotion  physical  pose_idle  social
split                                                        
test               1           4         0          3       4
train              7           3        11         21      11
val                2           0         3          5       2

Saved: /content/drive/MyDrive/synthetic_videos/split_grouped.csv


In [25]:
import pandas as pd
from pathlib import Path

DATA_ROOT = Path("/content/drive/MyDrive/synthetic_videos")
df = pd.read_csv(DATA_ROOT / "split_grouped.csv")

# Merge locomotion into physical
df["group_label"] = df["group_label"].replace({"locomotion": "physical"})

print("New group counts:\n", df["group_label"].value_counts())
print("\nNew grouped counts by split:\n", pd.crosstab(df["split"], df["group_label"]))

out_path = DATA_ROOT / "split_grouped_final.csv"
df.to_csv(out_path, index=False)
print("\nSaved:", out_path)


New group counts:
 group_label
pose_idle    29
physical     21
social       17
emotion      10
Name: count, dtype: int64

New grouped counts by split:
 group_label  emotion  physical  pose_idle  social
split                                            
test               1         4          3       4
train              7        14         21      11
val                2         3          5       2

Saved: /content/drive/MyDrive/synthetic_videos/split_grouped_final.csv


In [18]:
import cv2
import pandas as pd
from pathlib import Path
import numpy as np

DATA_ROOT = Path("/content/drive/MyDrive/synthetic_videos")
FRAMES_ROOT = DATA_ROOT / "frames"
CSV_PATH = DATA_ROOT / "split_grouped_final.csv"

NUM_FRAMES = 16
IMG_SIZE = 224

df = pd.read_csv(CSV_PATH)

def extract_frames(video_path, out_dir, num_frames=16):
    cap = cv2.VideoCapture(str(video_path))
    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    if total <= 0:
        cap.release()
        return False

    idxs = np.linspace(0, total - 1, num_frames).astype(int)
    saved = 0

    for i, idx in enumerate(idxs):
        cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
        ret, frame = cap.read()
        if not ret:
            continue
        frame = cv2.resize(frame, (IMG_SIZE, IMG_SIZE))
        cv2.imwrite(str(out_dir / f"frame_{i:02d}.jpg"), frame)
        saved += 1

    cap.release()
    return saved == num_frames

# Create folders and extract
for _, row in df.iterrows():
    split = row["split"]
    label = row["group_label"]
    video_path = Path(row["video_path"])

    out_dir = FRAMES_ROOT / split / label / video_path.stem
    out_dir.mkdir(parents=True, exist_ok=True)

    ok = extract_frames(video_path, out_dir, NUM_FRAMES)
    if not ok:
        print("Warning: incomplete frames for", video_path)

print("Frame extraction complete.")


Frame extraction complete.


In [26]:
!find /content/drive/MyDrive/synthetic_videos/frames -type f | wc -l


1232


In [27]:
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
from pathlib import Path
import pandas as pd

DATA_ROOT = Path("/content/drive/MyDrive/synthetic_videos")
FRAMES_ROOT = DATA_ROOT / "frames"
CSV_PATH = DATA_ROOT / "split_grouped_final.csv"

df = pd.read_csv(CSV_PATH)

# Fixed class order (important for consistency)
CLASS_NAMES = ["emotion", "social", "physical", "pose_idle"]
class_to_idx = {c: i for i, c in enumerate(CLASS_NAMES)}
idx_to_class = {i: c for c, i in class_to_idx.items()}

print("Class mapping:", class_to_idx)

# Basic transform (keep simple for baseline)
img_tf = transforms.Compose([
    transforms.ToTensor(),  # [0,1]
])

class VideoFramesDataset(Dataset):
    def __init__(self, df, split: str):
        self.df = df[df["split"] == split].reset_index(drop=True)
        self.split = split

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        video_path = Path(row["video_path"])
        label = row["group_label"]
        y = class_to_idx[label]

        # Folder where frames were saved
        frames_dir = FRAMES_ROOT / self.split / label / video_path.stem

        # Load 16 frames in sorted order
        frame_files = sorted(frames_dir.glob("frame_*.jpg"))
        if len(frame_files) != 16:
            raise RuntimeError(f"Expected 16 frames, got {len(frame_files)} for {frames_dir}")

        frames = []
        for ff in frame_files:
            img = Image.open(ff).convert("RGB")
            frames.append(img_tf(img))

        # (T, C, H, W)
        x = torch.stack(frames, dim=0)

        return x, y

train_ds = VideoFramesDataset(df, "train")
val_ds   = VideoFramesDataset(df, "val")
test_ds  = VideoFramesDataset(df, "test")

train_loader = DataLoader(train_ds, batch_size=4, shuffle=True, num_workers=2, pin_memory=True)
val_loader   = DataLoader(val_ds, batch_size=4, shuffle=False, num_workers=2, pin_memory=True)
test_loader  = DataLoader(test_ds, batch_size=4, shuffle=False, num_workers=2, pin_memory=True)

# Quick sanity check
xb, yb = next(iter(train_loader))
print("Batch x shape:", xb.shape)   # expected: (B, 16, 3, 224, 224)
print("Batch y shape:", yb.shape)
print("Sample labels:", [idx_to_class[int(i)] for i in yb])


Class mapping: {'emotion': 0, 'social': 1, 'physical': 2, 'pose_idle': 3}
Batch x shape: torch.Size([4, 16, 3, 224, 224])
Batch y shape: torch.Size([4])
Sample labels: ['social', 'pose_idle', 'physical', 'social']


In [28]:
!ls /content/drive/MyDrive/synthetic_videos


 air_squat0001-0112.mp4
 Angry0001-0899.mp4
 Angry0001-1151.mp4
 Boxing0001-0126.mp4
 Ch08_nonPBR0001-0002.mp4
 Cheering0001-0088.mp4
 Clapping0001-0070.mp4
 Crying0001-0189.mp4
 Crying0001-0376.mp4
 dancing_twerk0001-0912.mp4
 edge_slip0001-0148.mp4
 Excited0001-0198.mp4
 falling_from_losing_balance0001-0401.mp4
 frames
 hip_hop_dancing0001-0202.mp4
 Jogging0001-0078.mp4
 Jogging0001-0154.mp4
 Jumping0001-0114.mp4
 Jumping0001-0158.mp4
 labels_clean.csv
 labels.csv
 Laughing0001-0589.mp4
 Leaning0001-0201.mp4
 leaning_on_a_wall0001-0097.mp4
 leaning_on_a_wall0001-0116.mp4
 look_around0001-0801.mp4
'male_batch2__Mma Kick0001-0267.mp4'
 male_batch2__Pointing0001-0267.mp4
 male_batch2__Punching0001-0267.mp4
 male_batch2__Push0001-0267.mp4
 male_batch2__Reacting0001-0267.mp4
 mma_kick0001-0097.mp4
'Mma Kick0001-0097.mp4'
 mma_kick0001-0101.mp4
'Mma Kick0001-0267.mp4'
 nervously_look_around0001-0377.mp4
 northern_soul_spin0001-0243.mp4
 Pointing0001-0097.mp4
 Pointing0001-0216.mp4
 Pointin

In [29]:
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
from pathlib import Path
import pandas as pd

DATA_ROOT = Path("/content/drive/MyDrive/synthetic_videos")
FRAMES_ROOT = DATA_ROOT / "frames"
CSV_PATH = DATA_ROOT / "split_grouped_final.csv"

df = pd.read_csv(CSV_PATH)

CLASS_NAMES = ["emotion", "social", "physical", "pose_idle"]
class_to_idx = {c: i for i, c in enumerate(CLASS_NAMES)}
idx_to_class = {i: c for c, i in class_to_idx.items()}

print("Class mapping:", class_to_idx)

img_tf = transforms.Compose([
    transforms.ToTensor(),
])

class VideoFramesDataset(Dataset):
    def __init__(self, df, split: str):
        self.df = df[df["split"] == split].reset_index(drop=True)
        self.split = split

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        video_path = Path(row["video_path"])
        label = row["group_label"]
        y = class_to_idx[label]

        frames_dir = FRAMES_ROOT / self.split / label / video_path.stem
        frame_files = sorted(frames_dir.glob("frame_*.jpg"))

        if len(frame_files) != 16:
            raise RuntimeError(f"Expected 16 frames, got {len(frame_files)} for {frames_dir}")

        frames = []
        for ff in frame_files:
            img = Image.open(ff).convert("RGB")
            frames.append(img_tf(img))

        x = torch.stack(frames, dim=0)  # (T, C, H, W)
        return x, y

train_ds = VideoFramesDataset(df, "train")
val_ds   = VideoFramesDataset(df, "val")
test_ds  = VideoFramesDataset(df, "test")

train_loader = DataLoader(train_ds, batch_size=4, shuffle=True, num_workers=2, pin_memory=True)
val_loader   = DataLoader(val_ds, batch_size=4, shuffle=False, num_workers=2, pin_memory=True)
test_loader  = DataLoader(test_ds, batch_size=4, shuffle=False, num_workers=2, pin_memory=True)

xb, yb = next(iter(train_loader))
print("Batch x shape:", xb.shape)
print("Batch y shape:", yb.shape)
print("Sample labels:", [idx_to_class[int(i)] for i in yb])


Class mapping: {'emotion': 0, 'social': 1, 'physical': 2, 'pose_idle': 3}
Batch x shape: torch.Size([4, 16, 3, 224, 224])
Batch y shape: torch.Size([4])
Sample labels: ['emotion', 'pose_idle', 'pose_idle', 'social']


In [30]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models
from tqdm import tqdm

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)


Device: cuda


In [31]:
class ResNetLSTM(nn.Module):
    def __init__(self, num_classes=4, hidden_size=256, num_layers=1, dropout=0.2):
        super().__init__()

        # Pretrained ResNet18 backbone
        base = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
        self.feature_extractor = nn.Sequential(*list(base.children())[:-1])  # remove FC

        feat_dim = 512  # resnet18 final feature size

        self.lstm = nn.LSTM(
            input_size=feat_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0.0
        )

        self.classifier = nn.Sequential(
            nn.Dropout(dropout),
            nn.Linear(hidden_size, num_classes)
        )

    def forward(self, x):
        # x: (B, T, C, H, W)
        B, T, C, H, W = x.shape
        x = x.view(B*T, C, H, W)

        with torch.no_grad():  # start stable: freeze resnet initially
            feats = self.feature_extractor(x).view(B, T, -1)  # (B, T, 512)

        out, _ = self.lstm(feats)  # (B, T, hidden)
        last = out[:, -1, :]       # (B, hidden)
        logits = self.classifier(last)
        return logits

model = ResNetLSTM(num_classes=4).to(device)
print(model)


Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth


100%|██████████| 44.7M/44.7M [00:00<00:00, 185MB/s]


ResNetLSTM(
  (feature_extractor): Sequential(
    (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (4): Sequential(
      (0): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (1): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_runnin

In [32]:
def accuracy(logits, y):
    preds = torch.argmax(logits, dim=1)
    return (preds == y).float().mean().item()

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

EPOCHS = 5

for epoch in range(1, EPOCHS + 1):
    # ---- Train ----
    model.train()
    train_loss = 0.0
    train_acc = 0.0
    n_train = 0

    for xb, yb in tqdm(train_loader, desc=f"Epoch {epoch} Train"):
        xb, yb = xb.to(device), yb.to(device)

        optimizer.zero_grad()
        logits = model(xb)
        loss = criterion(logits, yb)
        loss.backward()
        optimizer.step()

        bs = xb.size(0)
        train_loss += loss.item() * bs
        train_acc += accuracy(logits, yb) * bs
        n_train += bs

    train_loss /= n_train
    train_acc /= n_train

    # ---- Val ----
    model.eval()
    val_loss = 0.0
    val_acc = 0.0
    n_val = 0

    with torch.no_grad():
        for xb, yb in tqdm(val_loader, desc=f"Epoch {epoch} Val"):
            xb, yb = xb.to(device), yb.to(device)
            logits = model(xb)
            loss = criterion(logits, yb)

            bs = xb.size(0)
            val_loss += loss.item() * bs
            val_acc += accuracy(logits, yb) * bs
            n_val += bs

    val_loss /= n_val
    val_acc /= n_val

    print(f"\nEpoch {epoch}: Train loss={train_loss:.4f}, acc={train_acc:.3f} | Val loss={val_loss:.4f}, acc={val_acc:.3f}\n")


Epoch 1 Train: 100%|██████████| 14/14 [00:04<00:00,  2.85it/s]
Epoch 1 Val: 100%|██████████| 3/3 [00:01<00:00,  2.26it/s]



Epoch 1: Train loss=1.4279, acc=0.340 | Val loss=1.3165, acc=0.417



Epoch 2 Train: 100%|██████████| 14/14 [00:03<00:00,  3.55it/s]
Epoch 2 Val: 100%|██████████| 3/3 [00:00<00:00,  3.10it/s]



Epoch 2: Train loss=1.2983, acc=0.434 | Val loss=1.3454, acc=0.333



Epoch 3 Train: 100%|██████████| 14/14 [00:03<00:00,  4.03it/s]
Epoch 3 Val: 100%|██████████| 3/3 [00:00<00:00,  3.08it/s]



Epoch 3: Train loss=1.2469, acc=0.509 | Val loss=1.3351, acc=0.583



Epoch 4 Train: 100%|██████████| 14/14 [00:04<00:00,  3.23it/s]
Epoch 4 Val: 100%|██████████| 3/3 [00:01<00:00,  2.28it/s]



Epoch 4: Train loss=1.2242, acc=0.453 | Val loss=1.3236, acc=0.417



Epoch 5 Train: 100%|██████████| 14/14 [00:03<00:00,  3.79it/s]
Epoch 5 Val: 100%|██████████| 3/3 [00:00<00:00,  3.08it/s]


Epoch 5: Train loss=1.1538, acc=0.547 | Val loss=1.3087, acc=0.500






In [33]:
from sklearn.metrics import confusion_matrix, classification_report
import numpy as np

model.eval()
all_preds = []
all_targets = []

with torch.no_grad():
    for xb, yb in test_loader:
        xb, yb = xb.to(device), yb.to(device)
        logits = model(xb)
        preds = torch.argmax(logits, dim=1)

        all_preds.extend(preds.cpu().numpy())
        all_targets.extend(yb.cpu().numpy())

acc = np.mean(np.array(all_preds) == np.array(all_targets))
print("Test accuracy:", acc)

cm = confusion_matrix(all_targets, all_preds)
print("\nConfusion matrix:\n", cm)

print("\nClassification report:\n")
print(classification_report(all_targets, all_preds, target_names=CLASS_NAMES))


Test accuracy: 0.3333333333333333

Confusion matrix:
 [[0 0 0 1]
 [0 0 0 4]
 [0 0 1 3]
 [0 0 0 3]]

Classification report:

              precision    recall  f1-score   support

     emotion       0.00      0.00      0.00         1
      social       0.00      0.00      0.00         4
    physical       1.00      0.25      0.40         4
   pose_idle       0.27      1.00      0.43         3

    accuracy                           0.33        12
   macro avg       0.32      0.31      0.21        12
weighted avg       0.40      0.33      0.24        12



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [34]:
for name, param in model.feature_extractor.named_parameters():
    if "7" in name:   # layer4 is index 7 in our Sequential
        param.requires_grad = True

optimizer = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=1e-4)

print("Unfroze ResNet layer4 and reduced LR.")


Unfroze ResNet layer4 and reduced LR.


In [35]:
EPOCHS = 15

for epoch in range(1, EPOCHS + 1):
    model.train()
    train_loss = train_acc = n_train = 0

    for xb, yb in tqdm(train_loader, desc=f"FT Epoch {epoch} Train"):
        xb, yb = xb.to(device), yb.to(device)
        optimizer.zero_grad()
        logits = model(xb)
        loss = criterion(logits, yb)
        loss.backward()
        optimizer.step()

        bs = xb.size(0)
        train_loss += loss.item() * bs
        train_acc += accuracy(logits, yb) * bs
        n_train += bs

    train_loss /= n_train
    train_acc /= n_train

    model.eval()
    val_loss = val_acc = n_val = 0

    with torch.no_grad():
        for xb, yb in val_loader:
            xb, yb = xb.to(device), yb.to(device)
            logits = model(xb)
            loss = criterion(logits, yb)

            bs = xb.size(0)
            val_loss += loss.item() * bs
            val_acc += accuracy(logits, yb) * bs
            n_val += bs

    val_loss /= n_val
    val_acc /= n_val

    print(f"\nFT Epoch {epoch}: Train acc={train_acc:.3f} | Val acc={val_acc:.3f}")


FT Epoch 1 Train: 100%|██████████| 14/14 [00:03<00:00,  3.95it/s]



FT Epoch 1: Train acc=0.566 | Val acc=0.583


FT Epoch 2 Train: 100%|██████████| 14/14 [00:04<00:00,  3.11it/s]



FT Epoch 2: Train acc=0.660 | Val acc=0.583


FT Epoch 3 Train: 100%|██████████| 14/14 [00:03<00:00,  4.01it/s]



FT Epoch 3: Train acc=0.623 | Val acc=0.583


FT Epoch 4 Train: 100%|██████████| 14/14 [00:03<00:00,  4.08it/s]



FT Epoch 4: Train acc=0.717 | Val acc=0.583


FT Epoch 5 Train: 100%|██████████| 14/14 [00:04<00:00,  3.19it/s]



FT Epoch 5: Train acc=0.604 | Val acc=0.500


FT Epoch 6 Train: 100%|██████████| 14/14 [00:03<00:00,  4.10it/s]



FT Epoch 6: Train acc=0.679 | Val acc=0.583


FT Epoch 7 Train: 100%|██████████| 14/14 [00:03<00:00,  3.81it/s]



FT Epoch 7: Train acc=0.642 | Val acc=0.583


FT Epoch 8 Train: 100%|██████████| 14/14 [00:03<00:00,  3.58it/s]



FT Epoch 8: Train acc=0.736 | Val acc=0.583


FT Epoch 9 Train: 100%|██████████| 14/14 [00:03<00:00,  4.02it/s]



FT Epoch 9: Train acc=0.717 | Val acc=0.583


FT Epoch 10 Train: 100%|██████████| 14/14 [00:03<00:00,  3.81it/s]



FT Epoch 10: Train acc=0.679 | Val acc=0.417


FT Epoch 11 Train: 100%|██████████| 14/14 [00:03<00:00,  3.61it/s]



FT Epoch 11: Train acc=0.811 | Val acc=0.417


FT Epoch 12 Train: 100%|██████████| 14/14 [00:03<00:00,  3.92it/s]



FT Epoch 12: Train acc=0.642 | Val acc=0.500


FT Epoch 13 Train: 100%|██████████| 14/14 [00:03<00:00,  3.69it/s]



FT Epoch 13: Train acc=0.755 | Val acc=0.500


FT Epoch 14 Train: 100%|██████████| 14/14 [00:03<00:00,  3.96it/s]



FT Epoch 14: Train acc=0.774 | Val acc=0.500


FT Epoch 15 Train: 100%|██████████| 14/14 [00:03<00:00,  4.14it/s]



FT Epoch 15: Train acc=0.811 | Val acc=0.500


In [36]:
from sklearn.metrics import confusion_matrix, classification_report
import numpy as np

model.eval()
all_preds, all_targets = [], []

with torch.no_grad():
    for xb, yb in test_loader:
        xb, yb = xb.to(device), yb.to(device)
        logits = model(xb)
        preds = torch.argmax(logits, dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_targets.extend(yb.cpu().numpy())

acc = np.mean(np.array(all_preds) == np.array(all_targets))
print("Test accuracy:", acc)

cm = confusion_matrix(all_targets, all_preds)
print("\nConfusion matrix:\n", cm)

print("\nClassification report:\n")
print(classification_report(all_targets, all_preds, target_names=CLASS_NAMES))


Test accuracy: 0.4166666666666667

Confusion matrix:
 [[1 0 0 0]
 [0 0 1 3]
 [0 0 2 2]
 [0 0 1 2]]

Classification report:

              precision    recall  f1-score   support

     emotion       1.00      1.00      1.00         1
      social       0.00      0.00      0.00         4
    physical       0.50      0.50      0.50         4
   pose_idle       0.29      0.67      0.40         3

    accuracy                           0.42        12
   macro avg       0.45      0.54      0.47        12
weighted avg       0.32      0.42      0.35        12



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [37]:
save_path = "/content/drive/MyDrive/synthetic_videos/resnet_lstm_grouped_ft.pt"
torch.save({
    "model_state": model.state_dict(),
    "class_names": CLASS_NAMES,
    "class_to_idx": class_to_idx
}, save_path)
print("Saved:", save_path)


Saved: /content/drive/MyDrive/synthetic_videos/resnet_lstm_grouped_ft.pt


In [38]:
import numpy as np
import torch
import torch.nn as nn

train_df = df[df["split"] == "train"]
counts = train_df["group_label"].value_counts()

weights = []
for c in CLASS_NAMES:
    weights.append(1.0 / counts.get(c, 1))

weights = np.array(weights, dtype=np.float32)
weights = weights / weights.sum() * len(CLASS_NAMES)   # normalize

weights_t = torch.tensor(weights, dtype=torch.float32).to(device)

print("Train class counts:\n", counts)
print("Class weights:", weights_t)

criterion = nn.CrossEntropyLoss(weight=weights_t)
print("Updated criterion to weighted CrossEntropyLoss.")


Train class counts:
 group_label
pose_idle    21
physical     14
social       11
emotion       7
Name: count, dtype: int64
Class weights: tensor([1.6196, 1.0307, 0.8098, 0.5399], device='cuda:0')
Updated criterion to weighted CrossEntropyLoss.


In [39]:
import torch.optim as optim
from tqdm import tqdm

optimizer = optim.Adam(model.parameters(), lr=5e-5)

def accuracy(logits, y):
    preds = torch.argmax(logits, dim=1)
    return (preds == y).float().mean().item()

EPOCHS = 5

for epoch in range(1, EPOCHS + 1):
    model.train()
    train_loss = train_acc = n_train = 0

    for xb, yb in tqdm(train_loader, desc=f"WFT Epoch {epoch} Train"):
        xb, yb = xb.to(device), yb.to(device)

        optimizer.zero_grad()
        logits = model(xb)
        loss = criterion(logits, yb)
        loss.backward()
        optimizer.step()

        bs = xb.size(0)
        train_loss += loss.item() * bs
        train_acc += accuracy(logits, yb) * bs
        n_train += bs

    train_loss /= n_train
    train_acc /= n_train

    model.eval()
    val_loss = val_acc = n_val = 0
    with torch.no_grad():
        for xb, yb in val_loader:
            xb, yb = xb.to(device), yb.to(device)
            logits = model(xb)
            loss = criterion(logits, yb)

            bs = xb.size(0)
            val_loss += loss.item() * bs
            val_acc += accuracy(logits, yb) * bs
            n_val += bs

    val_loss /= n_val
    val_acc /= n_val

    print(f"WFT Epoch {epoch}: Train acc={train_acc:.3f} | Val acc={val_acc:.3f}")


WFT Epoch 1 Train: 100%|██████████| 14/14 [00:04<00:00,  3.14it/s]


WFT Epoch 1: Train acc=0.792 | Val acc=0.500


WFT Epoch 2 Train: 100%|██████████| 14/14 [00:03<00:00,  3.90it/s]


WFT Epoch 2: Train acc=0.811 | Val acc=0.333


WFT Epoch 3 Train: 100%|██████████| 14/14 [00:03<00:00,  3.97it/s]


WFT Epoch 3: Train acc=0.830 | Val acc=0.333


WFT Epoch 4 Train: 100%|██████████| 14/14 [00:04<00:00,  3.24it/s]


WFT Epoch 4: Train acc=0.811 | Val acc=0.333


WFT Epoch 5 Train: 100%|██████████| 14/14 [00:03<00:00,  4.09it/s]


WFT Epoch 5: Train acc=0.887 | Val acc=0.250


In [40]:
!pip -q install ultralytics
from ultralytics import YOLO
yolo = YOLO("yolov8n.pt")  # downloads weights first time
print("YOLO ready")


[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.2 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m81.5 MB/s[0m eta [36m0:00:00[0m
[?25hCreating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.
[KDownloading https://github.com/ultralytics/assets/releases/download/v8.4.0/yolov8n.pt to 'yolov8n.pt': 100% ━━━━━━━━━━━━ 6.2MB 217.2MB/s 0.0s
YOLO ready


In [41]:
!find /content/drive/MyDrive/synthetic_videos/frames -type f | wc -l
!ls /content/drive/MyDrive/synthetic_videos/frames


1232
test  train  val


In [42]:
!pip -q install ultralytics

from ultralytics import YOLO
from pathlib import Path
from PIL import Image
import numpy as np

DATA_ROOT = Path("/content/drive/MyDrive/synthetic_videos")
FRAMES_ROOT = DATA_ROOT / "frames"
PERSON_ROOT = DATA_ROOT / "frames_person"

yolo = YOLO("yolov8n.pt")
print("YOLO loaded")

def crop_person_with_yolo(pil_img, conf=0.25):
    img = np.array(pil_img)
    results = yolo.predict(img, conf=conf, verbose=False)

    h, w = img.shape[:2]
    best = None
    best_area = -1

    for r in results:
        if r.boxes is None:
            continue
        for b in r.boxes:
            if int(b.cls.item()) == 0:  # person
                x1, y1, x2, y2 = b.xyxy[0].cpu().numpy().astype(int)
                x1, y1 = max(0, x1), max(0, y1)
                x2, y2 = min(w, x2), min(h, y2)
                area = (x2 - x1) * (y2 - y1)
                if area > best_area:
                    best_area = area
                    best = (x1, y1, x2, y2)

    if best is None:
        return pil_img  # fallback

    return pil_img.crop(best)

splits = ["train", "val", "test"]
total_done = 0

for split in splits:
    split_dir = FRAMES_ROOT / split
    if not split_dir.exists():
        raise FileNotFoundError(f"Missing: {split_dir}")

    for group_dir in split_dir.iterdir():
        if not group_dir.is_dir():
            continue

        for video_dir in group_dir.iterdir():
            if not video_dir.is_dir():
                continue

            out_dir = PERSON_ROOT / split / group_dir.name / video_dir.name
            out_dir.mkdir(parents=True, exist_ok=True)

            frame_files = sorted(video_dir.glob("frame_*.jpg"))
            for ff in frame_files:
                out_path = out_dir / ff.name
                if out_path.exists():
                    continue

                img = Image.open(ff).convert("RGB")
                crop = crop_person_with_yolo(img, conf=0.25)
                crop = crop.resize((224, 224))
                crop.save(out_path, quality=95)
                total_done += 1

print("Done. Newly saved crops:", total_done)
print("Person crops folder:", PERSON_ROOT)


YOLO loaded
Done. Newly saved crops: 1232
Person crops folder: /content/drive/MyDrive/synthetic_videos/frames_person


In [43]:
!find /content/drive/MyDrive/synthetic_videos/frames_person -type f | wc -l


1232


In [44]:
from pathlib import Path
from PIL import Image
import torch
from torch.utils.data import Dataset
import torchvision.transforms as T

DATA_ROOT = Path("/content/drive/MyDrive/synthetic_videos")
FRAMES_ROOT = DATA_ROOT / "frames"
PERSON_ROOT = DATA_ROOT / "frames_person"

IMG_SIZE = 224
NUM_FRAMES = 16

transform = T.Compose([
    T.Resize((IMG_SIZE, IMG_SIZE)),
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225]),
])

class TwoStreamFramesDataset(Dataset):
    def __init__(self, df, split, class_to_idx, num_frames=16):
        self.df = df[df["split"] == split].reset_index(drop=True)
        self.split = split
        self.class_to_idx = class_to_idx
        self.num_frames = num_frames

    def _load_clip(self, base_root, group_label, video_stem):
        clip_dir = base_root / self.split / group_label / video_stem
        frames = sorted(clip_dir.glob("frame_*.jpg"))
        if len(frames) < self.num_frames:
            raise FileNotFoundError(f"Not enough frames in {clip_dir} (found {len(frames)})")

        # take first num_frames (consistent)
        frames = frames[:self.num_frames]

        imgs = []
        for f in frames:
            img = Image.open(f).convert("RGB")
            img = transform(img)
            imgs.append(img)
        # [T, C, H, W]
        return torch.stack(imgs, dim=0)

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        group_label = row["group_label"]
        video_path = Path(row["video_path"])
        video_stem = video_path.stem

        x_global = self._load_clip(FRAMES_ROOT, group_label, video_stem)
        x_person = self._load_clip(PERSON_ROOT, group_label, video_stem)

        y = self.class_to_idx[group_label]
        return x_global, x_person, torch.tensor(y, dtype=torch.long)


In [45]:
import torch.nn as nn
import torchvision.models as models

class TwoStreamResNetLSTM(nn.Module):
    def __init__(self, num_classes, hidden=256, dropout=0.2):
        super().__init__()
        backbone = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
        backbone.fc = nn.Identity()  # output 512

        self.backbone = backbone
        self.lstm = nn.LSTM(input_size=1024, hidden_size=hidden, batch_first=True)
        self.classifier = nn.Sequential(
            nn.Dropout(dropout),
            nn.Linear(hidden, num_classes)
        )

    def extract_feat_seq(self, x):
        # x: [B, T, C, H, W]
        B, T, C, H, W = x.shape
        x = x.view(B*T, C, H, W)
        feats = self.backbone(x)      # [B*T, 512]
        feats = feats.view(B, T, 512) # [B, T, 512]
        return feats

    def forward(self, x_global, x_person):
        f1 = self.extract_feat_seq(x_global)
        f2 = self.extract_feat_seq(x_person)
        f = torch.cat([f1, f2], dim=-1)   # [B, T, 1024]
        out, _ = self.lstm(f)
        last = out[:, -1, :]              # [B, hidden]
        return self.classifier(last)


In [46]:
from torch.utils.data import DataLoader

train_ds = TwoStreamFramesDataset(df, "train", class_to_idx, num_frames=NUM_FRAMES)
val_ds   = TwoStreamFramesDataset(df, "val",   class_to_idx, num_frames=NUM_FRAMES)
test_ds  = TwoStreamFramesDataset(df, "test",  class_to_idx, num_frames=NUM_FRAMES)

train_loader = DataLoader(train_ds, batch_size=4, shuffle=True, num_workers=0)
val_loader   = DataLoader(val_ds,   batch_size=4, shuffle=False, num_workers=0)
test_loader  = DataLoader(test_ds,  batch_size=4, shuffle=False, num_workers=0)

xb1, xb2, yb = next(iter(train_loader))
print("Global:", xb1.shape, "Person:", xb2.shape, "y:", yb.shape)


Global: torch.Size([4, 16, 3, 224, 224]) Person: torch.Size([4, 16, 3, 224, 224]) y: torch.Size([4])


In [47]:
import torch
import torch.optim as optim
from tqdm import tqdm
import numpy as np
from sklearn.metrics import confusion_matrix, classification_report

device = "cuda" if torch.cuda.is_available() else "cpu"

model2 = TwoStreamResNetLSTM(num_classes=len(CLASS_NAMES)).to(device)

# weighted loss (keep it)
train_df = df[df["split"] == "train"]
counts = train_df["group_label"].value_counts()
weights = np.array([1.0 / counts.get(c, 1) for c in CLASS_NAMES], dtype=np.float32)
weights = weights / weights.sum() * len(CLASS_NAMES)
weights_t = torch.tensor(weights, dtype=torch.float32).to(device)

criterion = nn.CrossEntropyLoss(weight=weights_t)
optimizer = optim.Adam(model2.parameters(), lr=5e-5)

def acc_from_logits(logits, y):
    preds = torch.argmax(logits, dim=1)
    return (preds == y).float().mean().item()

EPOCHS = 10
best_val = -1
best_path = "/content/drive/MyDrive/synthetic_videos/twostream_best.pt"

for epoch in range(1, EPOCHS + 1):
    model2.train()
    tr_loss = tr_acc = n = 0

    for xg, xp, y in tqdm(train_loader, desc=f"Epoch {epoch} Train"):
        xg, xp, y = xg.to(device), xp.to(device), y.to(device)

        optimizer.zero_grad()
        logits = model2(xg, xp)
        loss = criterion(logits, y)
        loss.backward()
        optimizer.step()

        bs = y.size(0)
        tr_loss += loss.item() * bs
        tr_acc += acc_from_logits(logits, y) * bs
        n += bs

    tr_loss /= n
    tr_acc /= n

    model2.eval()
    va_loss = va_acc = n2 = 0
    with torch.no_grad():
        for xg, xp, y in val_loader:
            xg, xp, y = xg.to(device), xp.to(device), y.to(device)
            logits = model2(xg, xp)
            loss = criterion(logits, y)
            bs = y.size(0)
            va_loss += loss.item() * bs
            va_acc += acc_from_logits(logits, y) * bs
            n2 += bs

    va_loss /= n2
    va_acc /= n2

    print(f"Epoch {epoch}: Train acc={tr_acc:.3f} | Val acc={va_acc:.3f}")

    if va_acc > best_val:
        best_val = va_acc
        torch.save({
            "model_state": model2.state_dict(),
            "class_names": CLASS_NAMES,
            "class_to_idx": class_to_idx
        }, best_path)
        print("Saved best to:", best_path)

print("Best val:", best_val)


Epoch 1 Train: 100%|██████████| 14/14 [00:21<00:00,  1.54s/it]


Epoch 1: Train acc=0.396 | Val acc=0.333
Saved best to: /content/drive/MyDrive/synthetic_videos/twostream_best.pt


Epoch 2 Train: 100%|██████████| 14/14 [00:12<00:00,  1.08it/s]


Epoch 2: Train acc=0.717 | Val acc=0.333


Epoch 3 Train: 100%|██████████| 14/14 [00:12<00:00,  1.11it/s]


Epoch 3: Train acc=0.906 | Val acc=0.417
Saved best to: /content/drive/MyDrive/synthetic_videos/twostream_best.pt


Epoch 4 Train: 100%|██████████| 14/14 [00:12<00:00,  1.10it/s]


Epoch 4: Train acc=0.868 | Val acc=0.417


Epoch 5 Train: 100%|██████████| 14/14 [00:12<00:00,  1.10it/s]


Epoch 5: Train acc=1.000 | Val acc=0.333


Epoch 6 Train: 100%|██████████| 14/14 [00:12<00:00,  1.12it/s]


Epoch 6: Train acc=1.000 | Val acc=0.333


Epoch 7 Train: 100%|██████████| 14/14 [00:12<00:00,  1.13it/s]


Epoch 7: Train acc=0.962 | Val acc=0.250


Epoch 8 Train: 100%|██████████| 14/14 [00:13<00:00,  1.04it/s]


Epoch 8: Train acc=1.000 | Val acc=0.417


Epoch 9 Train: 100%|██████████| 14/14 [00:17<00:00,  1.23s/it]


Epoch 9: Train acc=1.000 | Val acc=0.250


Epoch 10 Train: 100%|██████████| 14/14 [00:12<00:00,  1.13it/s]


Epoch 10: Train acc=1.000 | Val acc=0.167
Best val: 0.4166666666666667


In [48]:
# load best
ckpt = torch.load(best_path, map_location=device)
model2.load_state_dict(ckpt["model_state"])
model2.eval()

all_preds, all_targets = [], []
with torch.no_grad():
    for xg, xp, y in test_loader:
        xg, xp, y = xg.to(device), xp.to(device), y.to(device)
        logits = model2(xg, xp)
        preds = torch.argmax(logits, dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_targets.extend(y.cpu().numpy())

acc = np.mean(np.array(all_preds) == np.array(all_targets))
print("Two-stream Test accuracy:", acc)

cm = confusion_matrix(all_targets, all_preds)
print("\nConfusion matrix:\n", cm)

print("\nClassification report:\n")
print(classification_report(all_targets, all_preds, target_names=CLASS_NAMES))


Two-stream Test accuracy: 0.3333333333333333

Confusion matrix:
 [[1 0 0 0]
 [1 0 1 2]
 [0 0 2 2]
 [2 0 0 1]]

Classification report:

              precision    recall  f1-score   support

     emotion       0.25      1.00      0.40         1
      social       0.00      0.00      0.00         4
    physical       0.67      0.50      0.57         4
   pose_idle       0.20      0.33      0.25         3

    accuracy                           0.33        12
   macro avg       0.28      0.46      0.31        12
weighted avg       0.29      0.33      0.29        12



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [49]:
!pip -q install ultralytics

from ultralytics import YOLO
from pathlib import Path
import numpy as np
from PIL import Image

DATA_ROOT = Path("/content/drive/MyDrive/synthetic_videos")
PERSON_ROOT = DATA_ROOT / "frames_person"
POSE_ROOT = DATA_ROOT / "pose_keypoints"

pose_model = YOLO("yolov8n-pose.pt")   # light + fast
print("Pose model loaded ")
print("PERSON_ROOT:", PERSON_ROOT)
print("POSE_ROOT:", POSE_ROOT)


[KDownloading https://github.com/ultralytics/assets/releases/download/v8.4.0/yolov8n-pose.pt to 'yolov8n-pose.pt': 100% ━━━━━━━━━━━━ 6.5MB 314.3MB/s 0.0s
Pose model loaded 
PERSON_ROOT: /content/drive/MyDrive/synthetic_videos/frames_person
POSE_ROOT: /content/drive/MyDrive/synthetic_videos/pose_keypoints


In [50]:
def extract_pose_from_image(img_path, max_people=2, conf=0.25):
    img = np.array(Image.open(img_path).convert("RGB"))
    res = pose_model.predict(img, conf=conf, verbose=False)[0]

    # If no detections
    if res.keypoints is None or len(res.keypoints) == 0:
        # persons, kpts, (x,y,conf)
        return np.zeros((0, 17, 3), dtype=np.float32)

    kpts = res.keypoints.data.cpu().numpy()  # [N,17,3]
    # Sort persons by bbox area (largest first) so consistent selection
    if res.boxes is not None and len(res.boxes) == len(kpts):
        xyxy = res.boxes.xyxy.cpu().numpy()
        areas = (xyxy[:,2]-xyxy[:,0])*(xyxy[:,3]-xyxy[:,1])
        order = np.argsort(-areas)
        kpts = kpts[order]

    return kpts[:max_people].astype(np.float32)

processed = 0
skipped = 0

splits = ["train", "val", "test"]
for split in splits:
    split_dir = PERSON_ROOT / split
    if not split_dir.exists():
        raise FileNotFoundError(f"Missing: {split_dir}")

    for group_dir in split_dir.iterdir():
        if not group_dir.is_dir():
            continue

        for video_dir in group_dir.iterdir():
            if not video_dir.is_dir():
                continue

            out_dir = POSE_ROOT / split / group_dir.name / video_dir.name
            out_dir.mkdir(parents=True, exist_ok=True)

            for frame_path in sorted(video_dir.glob("frame_*.jpg")):
                out_path = out_dir / (frame_path.stem + ".npz")
                if out_path.exists():
                    skipped += 1
                    continue

                kpts = extract_pose_from_image(frame_path, max_people=2, conf=0.25)
                np.savez_compressed(out_path, kpts=kpts)
                processed += 1

print("Pose extraction done ")
print("Newly processed frames:", processed)
print("Skipped existing:", skipped)
print("Saved at:", POSE_ROOT)


Pose extraction done 
Newly processed frames: 1232
Skipped existing: 0
Saved at: /content/drive/MyDrive/synthetic_videos/pose_keypoints


In [51]:
# Count pose files
!find /content/drive/MyDrive/synthetic_videos/pose_keypoints -type f | wc -l

# Inspect one file shape
import numpy as np
from pathlib import Path

any_file = next(Path("/content/drive/MyDrive/synthetic_videos/pose_keypoints").rglob("*.npz"))
data = np.load(any_file)
print("Example:", any_file)
print("kpts shape:", data["kpts"].shape)   # (0..2, 17, 3)


1232
Example: /content/drive/MyDrive/synthetic_videos/pose_keypoints/train/emotion/Angry0001-1151/frame_00.npz
kpts shape: (0, 17, 3)


In [52]:
from ultralytics import YOLO
from pathlib import Path
import numpy as np
from PIL import Image

DATA_ROOT = Path("/content/drive/MyDrive/synthetic_videos")
FRAMES_ROOT = DATA_ROOT / "frames"
POSE_FULL_ROOT = DATA_ROOT / "pose_keypoints_full"

pose_model = YOLO("yolov8n-pose.pt")
print("Pose model loaded ✅")

def extract_pose(img_path, max_people=2, conf=0.25):
    img = np.array(Image.open(img_path).convert("RGB"))
    res = pose_model.predict(img, conf=conf, verbose=False)[0]

    if res.keypoints is None or len(res.keypoints) == 0:
        return np.zeros((0, 17, 3), dtype=np.float32)

    kpts = res.keypoints.data.cpu().numpy()  # [N,17,3]

    # sort by bbox area, keep top2
    if res.boxes is not None and len(res.boxes) == len(kpts):
        xyxy = res.boxes.xyxy.cpu().numpy()
        areas = (xyxy[:,2]-xyxy[:,0])*(xyxy[:,3]-xyxy[:,1])
        order = np.argsort(-areas)
        kpts = kpts[order]

    return kpts[:max_people].astype(np.float32)

processed = 0
splits = ["train", "val", "test"]

for split in splits:
    for group_dir in (FRAMES_ROOT / split).iterdir():
        if not group_dir.is_dir():
            continue
        for video_dir in group_dir.iterdir():
            if not video_dir.is_dir():
                continue

            out_dir = POSE_FULL_ROOT / split / group_dir.name / video_dir.name
            out_dir.mkdir(parents=True, exist_ok=True)

            for frame_path in sorted(video_dir.glob("frame_*.jpg")):
                out_path = out_dir / (frame_path.stem + ".npz")
                if out_path.exists():
                    continue

                kpts = extract_pose(frame_path, max_people=2, conf=0.25)
                np.savez_compressed(out_path, kpts=kpts)
                processed += 1

print("Done  newly processed:", processed)
print("Saved at:", POSE_FULL_ROOT)


Pose model loaded ✅
Done  newly processed: 1232
Saved at: /content/drive/MyDrive/synthetic_videos/pose_keypoints_full


In [53]:
import numpy as np
from pathlib import Path
import random

ROOT = Path("/content/drive/MyDrive/synthetic_videos/pose_keypoints_full")
files = list(ROOT.rglob("*.npz"))
print("Total pose files:", len(files))

sample = random.sample(files, 20)
zeros = 0
for f in sample:
    k = np.load(f)["kpts"]
    if k.shape[0] == 0:
        zeros += 1

print("Zero-detection in sample:", zeros, "/ 20")
print("Example file:", sample[0], "shape:", np.load(sample[0])["kpts"].shape)


Total pose files: 1232
Zero-detection in sample: 6 / 20
Example file: /content/drive/MyDrive/synthetic_videos/pose_keypoints_full/test/pose_idle/leaning_on_a_wall0001-0116/frame_01.npz shape: (1, 17, 3)


In [54]:
import numpy as np
import pandas as pd
from pathlib import Path

DATA_ROOT = Path("/content/drive/MyDrive/synthetic_videos")
CSV_PATH  = DATA_ROOT / "split_grouped_final.csv"
POSE_ROOT = DATA_ROOT / "pose_keypoints_full"   # IMPORTANT: full poses
OUT_FEATS = DATA_ROOT / "pose_features_16f.npz"

df = pd.read_csv(CSV_PATH)

# Ensure consistent
df["video_stem"] = df["video_path"].apply(lambda p: Path(p).stem)

CLASS_NAMES = ["emotion", "social", "physical", "pose_idle"]
class_to_id = {c:i for i,c in enumerate(CLASS_NAMES)}

# COCO keypoint indices (YOLOv8-pose uses COCO order)
# 0 nose, 5 left_shoulder, 6 right_shoulder, 9 left_wrist, 10 right_wrist, 11 left_hip, 12 right_hip
L_SH, R_SH = 5, 6
L_WR, R_WR = 9, 10

def kpts_to_bbox_xyxy(kpts_xy):
    # kpts_xy: (17,2)
    xs = kpts_xy[:,0]; ys = kpts_xy[:,1]
    # if all zero -> empty bbox
    if np.all(xs == 0) and np.all(ys == 0):
        return np.array([0,0,0,0], dtype=np.float32)
    x1, y1 = xs.min(), ys.min()
    x2, y2 = xs.max(), ys.max()
    return np.array([x1,y1,x2,y2], dtype=np.float32)

def safe_unit(v, eps=1e-6):
    n = np.linalg.norm(v)
    if n < eps:
        return np.zeros_like(v)
    return v / n

def frame_features(kpts):
    """
    kpts: (N,17,3) where N in {0,1,2}
    output: (73,)
    """
    # Default empty persons
    A = np.zeros((17,2), dtype=np.float32)
    B = np.zeros((17,2), dtype=np.float32)

    if kpts.shape[0] >= 1:
        A = kpts[0,:,:2].astype(np.float32)
    if kpts.shape[0] >= 2:
        B = kpts[1,:,:2].astype(np.float32)

    # Relation features
    rel = np.zeros((5,), dtype=np.float32)

    # centres from bbox
    bboxA = kpts_to_bbox_xyxy(A)
    bboxB = kpts_to_bbox_xyxy(B)

    cA = np.array([(bboxA[0]+bboxA[2])/2, (bboxA[1]+bboxA[3])/2], dtype=np.float32)
    cB = np.array([(bboxB[0]+bboxB[2])/2, (bboxB[1]+bboxB[3])/2], dtype=np.float32)

    # centre distance (if B exists)
    if not np.all(B == 0):
        rel[0] = np.linalg.norm(cA - cB)

        # wrist distances
        rel[1] = np.linalg.norm(A[L_WR] - B[L_WR])
        rel[2] = np.linalg.norm(A[R_WR] - B[R_WR])

        # facing proxy: compare shoulder direction vectors
        vA = safe_unit(A[R_SH] - A[L_SH])
        vB = safe_unit(B[R_SH] - B[L_SH])
        rel[3] = float(np.dot(vA, vB))  # -1..1

        # overlap proxy: bbox IoU-ish (cheap)
        xA1,yA1,xA2,yA2 = bboxA
        xB1,yB1,xB2,yB2 = bboxB
        ix1, iy1 = max(xA1,xB1), max(yA1,yB1)
        ix2, iy2 = min(xA2,xB2), min(yA2,yB2)
        iw, ih = max(0, ix2-ix1), max(0, iy2-iy1)
        inter = iw*ih
        areaA = max(0, (xA2-xA1))*max(0, (yA2-yA1))
        areaB = max(0, (xB2-xB1))*max(0, (yB2-yB1))
        union = areaA + areaB - inter + 1e-6
        rel[4] = inter / union

    feat = np.concatenate([A.reshape(-1), B.reshape(-1), rel], axis=0)
    return feat.astype(np.float32)

X = []
y = []
splits = []

missing_pose_files = 0
zero_pose_frames = 0

for _, row in df.iterrows():
    split = row["split"]
    group = row["group_label"]
    stem  = row["video_stem"]

    # pose folder should match: split/group/video_stem
    pose_dir = POSE_ROOT / split / group / stem
    if not pose_dir.exists():
        missing_pose_files += 1
        continue

    frames = []
    for i in range(16):
        f = pose_dir / f"frame_{i:02d}.npz"
        if not f.exists():
            missing_pose_files += 1
            kpts = np.zeros((0,17,3), dtype=np.float32)
        else:
            kpts = np.load(f)["kpts"]
        if kpts.shape[0] == 0:
            zero_pose_frames += 1
        frames.append(frame_features(kpts))

    X.append(np.stack(frames, axis=0))  # (16,73)
    y.append(class_to_id[row["group_label"]])
    splits.append(split)

X = np.stack(X, axis=0)  # (N,16,73)
y = np.array(y, dtype=np.int64)
splits = np.array(splits)

print("X shape:", X.shape, " y shape:", y.shape)
print("Missing pose files count:", missing_pose_files)
print("Total zero-pose frames:", zero_pose_frames, "out of", X.shape[0]*16)

np.savez_compressed(OUT_FEATS, X=X, y=y, splits=splits, class_names=np.array(CLASS_NAMES))
print("Saved features:", OUT_FEATS)


X shape: (77, 16, 73)  y shape: (77,)
Missing pose files count: 0
Total zero-pose frames: 237 out of 1232
Saved features: /content/drive/MyDrive/synthetic_videos/pose_features_16f.npz


In [56]:
!ls -lah /content/drive/MyDrive/synthetic_videos | grep -i ".npz" || true


-rw------- 1 root root 114K Jan 14 18:00 pose_features_16f.npz


In [57]:
import numpy as np

DATA_PATH = "/content/drive/MyDrive/synthetic_videos/pose_features_16f.npz"
data = np.load(DATA_PATH, allow_pickle=True)

print("Keys:", data.files)
for k in data.files:
    arr = data[k]
    print(k, type(arr), getattr(arr, "shape", None), getattr(arr, "dtype", None))


Keys: ['X', 'y', 'splits', 'class_names']
X <class 'numpy.ndarray'> (77, 16, 73) float32
y <class 'numpy.ndarray'> (77,) int64
splits <class 'numpy.ndarray'> (77,) <U5
class_names <class 'numpy.ndarray'> (4,) <U9


In [58]:
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

# -----------------------------
# Load masked pose features
# -----------------------------
DATA_PATH = "/content/drive/MyDrive/synthetic_videos/pose_features_16f.npz"
OUT_BEST  = "/content/drive/MyDrive/synthetic_videos/pose_transformer_best_weighted.pt"

DATA = np.load(DATA_PATH, allow_pickle=True)
X = DATA["X"]          # (N,16,74)
y = DATA["y"]          # (N,)
spl = DATA["splits"]   # (N,)
CLASS_NAMES = list(DATA["class_names"])

def idxs(split_name):
    return np.where(spl == split_name)[0]

train_idx = idxs("train")
val_idx   = idxs("val")
test_idx  = idxs("test")

# -----------------------------
# Dataset / Dataloader
# -----------------------------
class PoseSeqDS(Dataset):
    def __init__(self, X, y, idx):
        self.X = torch.tensor(X[idx], dtype=torch.float32)
        self.y = torch.tensor(y[idx], dtype=torch.long)
    def __len__(self): return len(self.y)
    def __getitem__(self, i): return self.X[i], self.y[i]

train_ds = PoseSeqDS(X, y, train_idx)
val_ds   = PoseSeqDS(X, y, val_idx)
test_ds  = PoseSeqDS(X, y, test_idx)

train_loader = DataLoader(train_ds, batch_size=8, shuffle=True)
val_loader   = DataLoader(val_ds, batch_size=8, shuffle=False)
test_loader  = DataLoader(test_ds, batch_size=8, shuffle=False)

# -----------------------------
# Model
# -----------------------------
class TinyTransformer(nn.Module):
    def __init__(self, d_in=74, d_model=128, nhead=4, num_layers=2, num_classes=4):
        super().__init__()
        self.proj = nn.Linear(d_in, d_model)
        enc_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead, batch_first=True, dropout=0.2
        )
        self.enc = nn.TransformerEncoder(enc_layer, num_layers=num_layers)
        self.cls = nn.Sequential(
            nn.LayerNorm(d_model),
            nn.Dropout(0.3),
            nn.Linear(d_model, num_classes)
        )

    def forward(self, x):
        x = self.proj(x)     # (B,T,d_model)
        h = self.enc(x)      # (B,T,d_model)
        h = h.mean(dim=1)    # (B,d_model)
        return self.cls(h)   # (B,num_classes)

device = "cuda" if torch.cuda.is_available() else "cpu"
model = TinyTransformer(d_in=X.shape[-1], num_classes=len(CLASS_NAMES)).to(device)

# -----------------------------
# Class-weighted loss (IMPORTANT)
# -----------------------------
train_labels = y[train_idx]
counts = np.bincount(train_labels, minlength=len(CLASS_NAMES))  # counts per class
weights = 1.0 / (counts + 1e-6)                                 # inverse frequency
weights = (weights / weights.sum()) * len(CLASS_NAMES)          # normalized around 1

class_weights = torch.tensor(weights, dtype=torch.float32).to(device)
print("Train counts:", counts)
print("Class weights:", weights)

criterion = nn.CrossEntropyLoss(weight=class_weights)

# -----------------------------
# Optimizer
# -----------------------------
opt = torch.optim.AdamW(model.parameters(), lr=3e-4, weight_decay=1e-3)

# -----------------------------
# Train / Eval loop
# -----------------------------
def run_epoch(loader, train=True):
    model.train(train)
    total, correct, loss_sum = 0, 0, 0.0

    for xb, yb in loader:
        xb, yb = xb.to(device), yb.to(device)

        logits = model(xb)
        loss = criterion(logits, yb)

        if train:
            opt.zero_grad()
            loss.backward()
            opt.step()

        loss_sum += loss.item() * yb.size(0)
        pred = logits.argmax(1)
        correct += (pred == yb).sum().item()
        total += yb.size(0)

    return loss_sum / total, correct / total

best_val = 0.0

for epoch in range(1, 41):
    tr_loss, tr_acc = run_epoch(train_loader, train=True)
    va_loss, va_acc = run_epoch(val_loader, train=False)

    if va_acc > best_val:
        best_val = va_acc
        torch.save(model.state_dict(), OUT_BEST)

    if epoch % 5 == 0 or epoch == 1:
        print(f"Epoch {epoch:02d} | train acc={tr_acc:.3f} val acc={va_acc:.3f} (best {best_val:.3f})")

print("Saved best:", OUT_BEST)
print("Best val:", best_val)


Train counts: [ 7 11 14 21]
Class weights: [     1.6196      1.0307     0.80982     0.53988]
Epoch 01 | train acc=0.245 val acc=0.417 (best 0.417)
Epoch 05 | train acc=0.283 val acc=0.250 (best 0.417)
Epoch 10 | train acc=0.264 val acc=0.500 (best 0.500)
Epoch 15 | train acc=0.358 val acc=0.083 (best 0.500)
Epoch 20 | train acc=0.358 val acc=0.583 (best 0.583)
Epoch 25 | train acc=0.509 val acc=0.250 (best 0.583)
Epoch 30 | train acc=0.453 val acc=0.417 (best 0.583)
Epoch 35 | train acc=0.396 val acc=0.417 (best 0.583)
Epoch 40 | train acc=0.396 val acc=0.083 (best 0.583)
Saved best: /content/drive/MyDrive/synthetic_videos/pose_transformer_best_weighted.pt
Best val: 0.5833333333333334


In [59]:
!ls /content/drive/MyDrive/synthetic_videos | grep pose


pose_features_16f.npz
pose_keypoints
pose_keypoints_full
pose_transformer_best_weighted.pt


In [60]:
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import confusion_matrix, classification_report

# -----------------------------
# Paths
# -----------------------------
DATA_PATH = "/content/drive/MyDrive/synthetic_videos/pose_features_16f.npz"
OUT_BEST  = "/content/drive/MyDrive/synthetic_videos/pose_transformer_best_focal_jitter.pt"

# -----------------------------
# Load data
# -----------------------------
DATA = np.load(DATA_PATH, allow_pickle=True)
X = DATA["X"]                  # (N, 16, 73)
y = DATA["y"]                  # (N,)
splits = DATA["splits"]        # (N,) 'train'/'val'/'test'
CLASS_NAMES = [str(x) for x in list(DATA["class_names"])]

print("X:", X.shape, "y:", y.shape, "classes:", CLASS_NAMES)

def idxs(split_name):
    return np.where(splits == split_name)[0]

train_idx = idxs("train")
val_idx   = idxs("val")
test_idx  = idxs("test")

# -----------------------------
# Dataset with Temporal Jitter
# -----------------------------
class PoseSeqDS(Dataset):
    def __init__(self, X, y, idx, train_mode=False):
        self.X = torch.tensor(X[idx], dtype=torch.float32)
        self.y = torch.tensor(y[idx], dtype=torch.long)
        self.train_mode = train_mode

    def __len__(self):
        return len(self.y)

    def __getitem__(self, i):
        x = self.X[i].clone()
        y = self.y[i]

        # Temporal jitter only during training
        if self.train_mode:
            shift = torch.randint(low=-2, high=3, size=(1,)).item()  # -2..+2
            x = torch.roll(x, shifts=shift, dims=0)

        return x, y

train_ds = PoseSeqDS(X, y, train_idx, train_mode=True)
val_ds   = PoseSeqDS(X, y, val_idx, train_mode=False)
test_ds  = PoseSeqDS(X, y, test_idx, train_mode=False)

train_loader = DataLoader(train_ds, batch_size=8, shuffle=True)
val_loader   = DataLoader(val_ds, batch_size=8, shuffle=False)
test_loader  = DataLoader(test_ds, batch_size=8, shuffle=False)

# -----------------------------
# Transformer model
# -----------------------------
class TinyTransformer(nn.Module):
    def __init__(self, d_in, d_model=128, nhead=4, num_layers=2, num_classes=4):
        super().__init__()
        self.proj = nn.Linear(d_in, d_model)
        enc_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead, batch_first=True, dropout=0.2
        )
        self.enc = nn.TransformerEncoder(enc_layer, num_layers=num_layers)
        self.cls = nn.Sequential(
            nn.LayerNorm(d_model),
            nn.Dropout(0.3),
            nn.Linear(d_model, num_classes)
        )

    def forward(self, x):
        x = self.proj(x)      # (B,T,d_model)
        h = self.enc(x)       # (B,T,d_model)
        h = h.mean(dim=1)     # (B,d_model)
        return self.cls(h)    # (B,C)

# -----------------------------
# Weighted Focal Loss
# -----------------------------
class WeightedFocalLoss(nn.Module):
    def __init__(self, alpha=None, gamma=2.0):
        super().__init__()
        self.alpha = alpha  # tensor [C] or None
        self.gamma = gamma

    def forward(self, logits, targets):
        ce = nn.functional.cross_entropy(logits, targets, weight=self.alpha, reduction="none")
        pt = torch.exp(-ce)
        loss = ((1 - pt) ** self.gamma) * ce
        return loss.mean()

# -----------------------------
# Setup
# -----------------------------
device = "cuda" if torch.cuda.is_available() else "cpu"
model = TinyTransformer(d_in=X.shape[-1], num_classes=len(CLASS_NAMES)).to(device)

# class weights from train split
train_labels = y[train_idx]
counts = np.bincount(train_labels, minlength=len(CLASS_NAMES))
weights = 1.0 / (counts + 1e-6)
weights = (weights / weights.sum()) * len(CLASS_NAMES)
class_weights = torch.tensor(weights, dtype=torch.float32).to(device)

print("Train class counts:", counts)
print("Class weights:", weights)

criterion = WeightedFocalLoss(alpha=class_weights, gamma=2.0)
opt = torch.optim.AdamW(model.parameters(), lr=3e-4, weight_decay=1e-3)

# -----------------------------
# Train/Eval loops
# -----------------------------
def run_epoch(loader, train=True):
    model.train(train)
    total, correct, loss_sum = 0, 0, 0.0

    for xb, yb in loader:
        xb, yb = xb.to(device), yb.to(device)

        logits = model(xb)
        loss = criterion(logits, yb)

        if train:
            opt.zero_grad()
            loss.backward()
            opt.step()

        loss_sum += loss.item() * yb.size(0)
        pred = logits.argmax(1)
        correct += (pred == yb).sum().item()
        total += yb.size(0)

    return loss_sum / total, correct / total

best_val = 0.0

# -----------------------------
# Training
# -----------------------------
for epoch in range(1, 41):
    tr_loss, tr_acc = run_epoch(train_loader, train=True)
    va_loss, va_acc = run_epoch(val_loader, train=False)

    if va_acc > best_val:
        best_val = va_acc
        torch.save(model.state_dict(), OUT_BEST)

    if epoch == 1 or epoch % 5 == 0:
        print(f"Epoch {epoch:02d} | train acc={tr_acc:.3f} val acc={va_acc:.3f} (best {best_val:.3f})")

print("Saved best:", OUT_BEST)
print("Best val:", best_val)

model.load_state_dict(torch.load(OUT_BEST, map_location=device))
model.eval()

all_preds, all_targets = [], []

with torch.no_grad():
    for xb, yb in test_loader:
        xb = xb.to(device)
        logits = model(xb)
        preds = logits.argmax(1).cpu().numpy()
        all_preds.extend(list(preds))
        all_targets.extend(list(yb.numpy()))

acc = (np.array(all_preds) == np.array(all_targets)).mean()
print("\nTest accuracy:", acc)

cm = confusion_matrix(all_targets, all_preds, labels=list(range(len(CLASS_NAMES))))
print("\nConfusion matrix:\n", cm)

print("\nClassification report:\n")
print(classification_report(all_targets, all_preds, target_names=CLASS_NAMES, zero_division=0))


X: (77, 16, 73) y: (77,) classes: ['emotion', 'social', 'physical', 'pose_idle']
Train class counts: [ 7 11 14 21]
Class weights: [     1.6196      1.0307     0.80982     0.53988]
Epoch 01 | train acc=0.208 val acc=0.167 (best 0.167)
Epoch 05 | train acc=0.170 val acc=0.167 (best 0.250)
Epoch 10 | train acc=0.283 val acc=0.250 (best 0.250)
Epoch 15 | train acc=0.208 val acc=0.250 (best 0.250)
Epoch 20 | train acc=0.302 val acc=0.250 (best 0.250)
Epoch 25 | train acc=0.321 val acc=0.250 (best 0.250)
Epoch 30 | train acc=0.358 val acc=0.250 (best 0.250)
Epoch 35 | train acc=0.377 val acc=0.250 (best 0.250)
Epoch 40 | train acc=0.358 val acc=0.250 (best 0.250)
Saved best: /content/drive/MyDrive/synthetic_videos/pose_transformer_best_focal_jitter.pt
Best val: 0.25

Test accuracy: 0.3333333333333333

Confusion matrix:
 [[0 1 0 0]
 [3 0 0 1]
 [0 1 3 0]
 [1 1 0 1]]

Classification report:

              precision    recall  f1-score   support

     emotion       0.00      0.00      0.00      

In [61]:

import os, math, random
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report

# --------- Reproducibility ----------
def seed_everything(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

seed_everything(42)
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Device:", device)

# --------- Paths ----------
DATA_PATH = "/content/drive/MyDrive/synthetic_videos/pose_features_16f.npz"

assert os.path.exists(DATA_PATH), f"File not found: {DATA_PATH}"
data = np.load(DATA_PATH, allow_pickle=True)

X = data["X"]          # (N, T=16, F=73)
y = data["y"]          # (N,)
classes = data.get("classes", None)

# y may be str labels or ints depending on how you saved; handle both.
if y.dtype.kind in ("U", "S", "O"):
    # map string labels to ids in sorted order (or use 'classes' if present)
    if classes is None:
        uniq = sorted(list(set([str(v) for v in y])))
    else:
        uniq = [str(c) for c in classes.tolist()] if hasattr(classes, "tolist") else [str(c) for c in classes]
    label2id = {lab:i for i, lab in enumerate(uniq)}
    y_ids = np.array([label2id[str(v)] for v in y], dtype=np.int64)
    CLASS_NAMES = uniq
else:
    y_ids = y.astype(np.int64)
    # if classes stored, use that; else build generic names
    if classes is not None:
        CLASS_NAMES = [str(c) for c in (classes.tolist() if hasattr(classes, "tolist") else classes)]
    else:
        K = int(y_ids.max()) + 1
        CLASS_NAMES = [f"class_{i}" for i in range(K)]

N, T, F = X.shape
K = int(y_ids.max()) + 1

print("X:", X.shape, "y:", y_ids.shape, "num_classes:", K)
print("Class names:", CLASS_NAMES)
print("Class counts:", np.bincount(y_ids, minlength=K))

# --------- Dataset ----------
class PoseSeqDS(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.long)
    def __len__(self): return len(self.y)
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# --------- Model (small Transformer) ----------
class PoseTransformer(nn.Module):
    def __init__(self, feat_dim=73, d_model=128, nhead=4, num_layers=2, dropout=0.2, num_classes=4):
        super().__init__()
        self.in_proj = nn.Linear(feat_dim, d_model)
        enc_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead,
            dim_feedforward=4*d_model,
            dropout=dropout, batch_first=True
        )
        self.encoder = nn.TransformerEncoder(enc_layer, num_layers=num_layers)
        self.cls = nn.Sequential(
            nn.LayerNorm(d_model),
            nn.Dropout(dropout),
            nn.Linear(d_model, num_classes)
        )
    def forward(self, x):
        # x: (B,T,F)
        x = self.in_proj(x)        # (B,T,D)
        x = self.encoder(x)        # (B,T,D)
        x = x.mean(dim=1)          # global average pooling over time
        return self.cls(x)

# --------- Training / Eval ----------
def make_class_weights(y_train, num_classes):
    counts = np.bincount(y_train, minlength=num_classes).astype(np.float32)
    # inverse freq weights (safe)
    weights = counts.sum() / (num_classes * np.maximum(counts, 1.0))
    return torch.tensor(weights, dtype=torch.float32, device=device)

def train_one_fold(X, y, tr_idx, va_idx, fold_id,
                   epochs=60, batch_size=8, lr=3e-4, wd=1e-4, patience=10):
    train_ds = PoseSeqDS(X[tr_idx], y[tr_idx])
    val_ds   = PoseSeqDS(X[va_idx], y[va_idx])

    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, num_workers=0)
    val_loader   = DataLoader(val_ds, batch_size=batch_size, shuffle=False, num_workers=0)

    model = PoseTransformer(feat_dim=F, num_classes=K).to(device)

    class_w = make_class_weights(y[tr_idx], K)
    criterion = nn.CrossEntropyLoss(weight=class_w)

    optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=wd)

    best_val = -1.0
    best_state = None
    bad = 0

    for ep in range(1, epochs+1):
        model.train()
        tr_preds, tr_tgts = [], []
        for xb, yb in train_loader:
            xb, yb = xb.to(device), yb.to(device)
            optimizer.zero_grad()
            logits = model(xb)
            loss = criterion(logits, yb)
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            tr_preds.append(logits.argmax(1).detach().cpu().numpy())
            tr_tgts.append(yb.detach().cpu().numpy())

        tr_preds = np.concatenate(tr_preds)
        tr_tgts  = np.concatenate(tr_tgts)
        tr_acc = accuracy_score(tr_tgts, tr_preds)

        model.eval()
        va_preds, va_tgts = [], []
        with torch.no_grad():
            for xb, yb in val_loader:
                xb = xb.to(device)
                logits = model(xb)
                va_preds.append(logits.argmax(1).cpu().numpy())
                va_tgts.append(yb.numpy())
        va_preds = np.concatenate(va_preds)
        va_tgts  = np.concatenate(va_tgts)
        va_acc = accuracy_score(va_tgts, va_preds)

        if va_acc > best_val:
            best_val = va_acc
            best_state = {k: v.detach().cpu().clone() for k, v in model.state_dict().items()}
            bad = 0
        else:
            bad += 1

        if ep == 1 or ep % 10 == 0:
            print(f"[Fold {fold_id}] Epoch {ep:02d} | train acc={tr_acc:.3f} | val acc={va_acc:.3f} | best={best_val:.3f}")

        if bad >= patience:
            break

    # Load best and return fold metrics + preds
    model.load_state_dict(best_state)
    model.to(device).eval()

    va_preds, va_tgts = [], []
    with torch.no_grad():
        for xb, yb in val_loader:
            xb = xb.to(device)
            logits = model(xb)
            va_preds.append(logits.argmax(1).cpu().numpy())
            va_tgts.append(yb.numpy())
    va_preds = np.concatenate(va_preds)
    va_tgts  = np.concatenate(va_tgts)

    return best_val, va_tgts, va_preds


skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

fold_accs = []
all_true = np.zeros(N, dtype=np.int64)
all_pred = np.zeros(N, dtype=np.int64)

for fold, (tr_idx, va_idx) in enumerate(skf.split(X, y_ids), start=1):
    print("\n" + "="*60)
    print(f"Fold {fold} | train={len(tr_idx)} val={len(va_idx)}")
    print("Train counts:", np.bincount(y_ids[tr_idx], minlength=K))
    print("Val   counts:", np.bincount(y_ids[va_idx], minlength=K))

    best_val, va_tgts, va_preds = train_one_fold(X, y_ids, tr_idx, va_idx, fold_id=fold)
    fold_accs.append(best_val)

    all_true[va_idx] = va_tgts
    all_pred[va_idx] = va_preds

print("\n" + "="*60)
print("Fold accuracies:", [round(a, 4) for a in fold_accs])
print(f"Mean CV acc: {np.mean(fold_accs):.4f}  | Std: {np.std(fold_accs):.4f}")

print("\nOverall (OOF) confusion matrix:")
cm = confusion_matrix(all_true, all_pred)
print(cm)

print("\nOverall (OOF) classification report:")
print(classification_report(all_true, all_pred, target_names=CLASS_NAMES, zero_division=0))


Device: cuda
X: (77, 16, 73) y: (77,) num_classes: 4
Class names: ['class_0', 'class_1', 'class_2', 'class_3']
Class counts: [10 17 21 29]

Fold 1 | train=61 val=16
Train counts: [ 8 13 16 24]
Val   counts: [2 4 5 5]
[Fold 1] Epoch 01 | train acc=0.311 | val acc=0.250 | best=0.250
[Fold 1] Epoch 10 | train acc=0.393 | val acc=0.438 | best=0.438
[Fold 1] Epoch 20 | train acc=0.541 | val acc=0.312 | best=0.500

Fold 2 | train=61 val=16
Train counts: [ 8 13 17 23]
Val   counts: [2 4 4 6]
[Fold 2] Epoch 01 | train acc=0.180 | val acc=0.312 | best=0.312
[Fold 2] Epoch 10 | train acc=0.393 | val acc=0.312 | best=0.500

Fold 3 | train=62 val=15
Train counts: [ 8 14 17 23]
Val   counts: [2 3 4 6]
[Fold 3] Epoch 01 | train acc=0.194 | val acc=0.200 | best=0.200
[Fold 3] Epoch 10 | train acc=0.355 | val acc=0.267 | best=0.267
[Fold 3] Epoch 20 | train acc=0.468 | val acc=0.200 | best=0.333

Fold 4 | train=62 val=15
Train counts: [ 8 14 17 23]
Val   counts: [2 3 4 6]
[Fold 4] Epoch 01 | train acc

In [71]:

import os, random, math
from pathlib import Path
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import confusion_matrix, classification_report

import torchvision
from torchvision import transforms
from PIL import Image

# -------------------------
# Repro
# -------------------------
def seed_all(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

seed_all(42)

device = "cuda" if torch.cuda.is_available() else "cpu"
print("Device:", device)


POSE_NPZ = Path("/content/drive/MyDrive/synthetic_videos/pose_features_16f.npz")
FRAMES_ROOT = Path("/content/drive/MyDrive/synthetic_videos/frames_person")
SPLIT_CSV = Path("/content/drive/MyDrive/synthetic_videos/split_grouped_final.csv")  # if you want mapping, optional
SAVE_DIR = Path("/content/drive/MyDrive/synthetic_videos")
SAVE_DIR.mkdir(parents=True, exist_ok=True)

OUT_BEST = SAVE_DIR / "fusion_best_cv.pt"


data = np.load(POSE_NPZ, allow_pickle=True)
X_pose = data["X"]      # (N, 16, 73)
y = data["y"]           # (N,)
classes = data.get("classes", None)

N, T, D = X_pose.shape
y = y.astype(int)

print("Pose X:", X_pose.shape, "y:", y.shape)
print("Num classes:", len(np.unique(y)), "Class counts:", np.bincount(y))

num_classes = len(np.unique(y))

def collect_video_dirs(frames_root: Path):

    dirs = []
    for d in frames_root.rglob("*"):
        if d.is_dir():
            jpgs = list(d.glob("*.jpg"))
            if len(jpgs) > 0:
                dirs.append(d)
    return dirs

video_dirs = collect_video_dirs(FRAMES_ROOT)

stem_to_dir = {}
for d in video_dirs:
    stem = d.name
    if stem not in stem_to_dir:
        stem_to_dir[stem] = d

stems = None
try:
    import pandas as pd
    df = pd.read_csv(SPLIT_CSV)
    if "video_path" in df.columns:
        stems = [Path(p).stem for p in df["video_path"].tolist()]
        print("Loaded stems from split CSV:", len(stems))
except Exception as e:
    stems = None

if stems is None:
    # fallback (may mismatch if order differs)
    all_stems = sorted(stem_to_dir.keys())
    if len(all_stems) < N:
        raise RuntimeError(f"Not enough stems in frames_person. Found {len(all_stems)} but need {N}.")
    stems = all_stems[:N]
    print("WARNING: Using fallback stems list (sorted). This may mismatch with pose rows.")

# sanity: ensure each stem exists in frames_person
missing = [s for s in stems if s not in stem_to_dir]
print("Missing stems in frames_person:", len(missing))
if len(missing) > 0:
    print("Example missing:", missing[:5])

# -------------------------
# Appearance feature extractor (ResNet18 -> 512D)
# -------------------------
resnet = torchvision.models.resnet18(weights=torchvision.models.ResNet18_Weights.DEFAULT)
resnet.fc = nn.Identity()
resnet = resnet.to(device).eval()

img_tfm = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485,0.456,0.406], std=[0.229,0.224,0.225]),
])

@torch.no_grad()
def extract_app_feature_for_stem(stem: str, num_frames=16):
    """
    returns (512,) float32
    """
    if stem not in stem_to_dir:
        return np.zeros((512,), dtype=np.float32)

    d = stem_to_dir[stem]
    imgs = sorted(d.glob("*.jpg"))
    if len(imgs) == 0:
        return np.zeros((512,), dtype=np.float32)

    # pick evenly spaced frames up to num_frames
    if len(imgs) >= num_frames:
        idxs = np.linspace(0, len(imgs)-1, num_frames).astype(int).tolist()
        imgs = [imgs[i] for i in idxs]
    else:
        # pad by repeating last
        while len(imgs) < num_frames:
            imgs.append(imgs[-1])

    feats = []
    for p in imgs:
        try:
            im = Image.open(p).convert("RGB")
            x = img_tfm(im).unsqueeze(0).to(device)
            f = resnet(x)          # (1, 512)
            feats.append(f.squeeze(0).detach().cpu().numpy())
        except:
            # if image fails
            feats.append(np.zeros((512,), dtype=np.float32))

    feat = np.mean(np.stack(feats, axis=0), axis=0)
    return feat.astype(np.float32)

print("Extracting appearance features for all videos (may take some time)...")
X_app = np.stack([extract_app_feature_for_stem(s, num_frames=16) for s in stems], axis=0)  # (N, 512)
print("Appearance X_app:", X_app.shape)

# -------------------------
# Dataset
# -------------------------
class FusionDS(Dataset):
    def __init__(self, X_pose, X_app, y, idxs):
        self.X_pose = torch.tensor(X_pose[idxs], dtype=torch.float32)
        self.X_app  = torch.tensor(X_app[idxs], dtype=torch.float32)
        self.y      = torch.tensor(y[idxs], dtype=torch.long)
    def __len__(self):
        return len(self.y)
    def __getitem__(self, i):
        return self.X_pose[i], self.X_app[i], self.y[i]

class PoseTransformer(nn.Module):
    def __init__(self, in_dim=73, d_model=128, nhead=4, num_layers=2, dropout=0.1):
        super().__init__()
        self.proj = nn.Linear(in_dim, d_model)
        enc_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead, dim_feedforward=256,
            dropout=dropout, batch_first=True, activation="gelu"
        )
        self.enc = nn.TransformerEncoder(enc_layer, num_layers=num_layers)
        self.pool = nn.AdaptiveAvgPool1d(1)

    def forward(self, x):  # x: (B,T,D)
        x = self.proj(x)    # (B,T,dm)
        x = self.enc(x)     # (B,T,dm)
        # pool over T
        x = x.transpose(1,2)        # (B,dm,T)
        x = self.pool(x).squeeze(-1) # (B,dm)
        return x

class AppMLP(nn.Module):
    def __init__(self, in_dim=512, hid=256, dropout=0.2):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(in_dim, hid),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hid, hid),
            nn.ReLU(),
        )
    def forward(self, x):
        return self.net(x)

class FusionModel(nn.Module):
    def __init__(self, num_classes=4):
        super().__init__()
        self.pose = PoseTransformer(in_dim=73, d_model=128, nhead=4, num_layers=2, dropout=0.1)
        self.app  = AppMLP(in_dim=512, hid=256, dropout=0.2)
        self.cls  = nn.Sequential(
            nn.Dropout(0.2),
            nn.Linear(128+256, 256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, num_classes)
        )

    def forward(self, x_pose, x_app):
        p = self.pose(x_pose)
        a = self.app(x_app)
        z = torch.cat([p,a], dim=1)
        return self.cls(z)


def accuracy_from_logits(logits, y):
    return (logits.argmax(1) == y).float().mean().item()

def train_one_fold(model, train_loader, val_loader, class_weights, epochs=25, lr=3e-4, wd=1e-4):
    model = model.to(device)
    opt = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=wd)
    sched = torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=epochs)

    ce = nn.CrossEntropyLoss(weight=class_weights.to(device))

    best_val = -1
    best_state = None

    for ep in range(1, epochs+1):
        model.train()
        tr_correct, tr_total = 0, 0
        for xb_pose, xb_app, yb in train_loader:
            xb_pose, xb_app, yb = xb_pose.to(device), xb_app.to(device), yb.to(device)
            opt.zero_grad()
            logits = model(xb_pose, xb_app)
            loss = ce(logits, yb)
            loss.backward()
            opt.step()

            tr_correct += (logits.argmax(1) == yb).sum().item()
            tr_total += yb.size(0)

        sched.step()

        # val
        model.eval()
        va_correct, va_total = 0, 0
        with torch.no_grad():
            for xb_pose, xb_app, yb in val_loader:
                xb_pose, xb_app, yb = xb_pose.to(device), xb_app.to(device), yb.to(device)
                logits = model(xb_pose, xb_app)
                va_correct += (logits.argmax(1) == yb).sum().item()
                va_total += yb.size(0)

        tr_acc = tr_correct / max(1, tr_total)
        va_acc = va_correct / max(1, va_total)

        if ep == 1 or ep % 5 == 0:
            print(f"Epoch {ep:02d} | train acc={tr_acc:.3f} | val acc={va_acc:.3f} | best={best_val if best_val>=0 else va_acc:.3f}")

        if va_acc > best_val:
            best_val = va_acc
            best_state = {k: v.detach().cpu().clone() for k,v in model.state_dict().items()}

    model.load_state_dict(best_state)
    return model, best_val

@torch.no_grad()
def predict_loader(model, loader):
    model.eval()
    all_p, all_y = [], []
    for xb_pose, xb_app, yb in loader:
        xb_pose, xb_app = xb_pose.to(device), xb_app.to(device)
        logits = model(xb_pose, xb_app)
        all_p.append(logits.argmax(1).cpu().numpy())
        all_y.append(yb.numpy())
    return np.concatenate(all_p), np.concatenate(all_y)


skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

fold_accs = []
oof_preds = np.zeros((N,), dtype=int)

for fold, (tr_idx, va_idx) in enumerate(skf.split(np.zeros(N), y), 1):
    print("\n" + "="*60)
    print(f"Fold {fold} | train={len(tr_idx)} val={len(va_idx)}")

    # class weights from TRAIN only
    tr_counts = np.bincount(y[tr_idx], minlength=num_classes).astype(np.float32)
    tr_counts[tr_counts == 0] = 1.0
    weights = (tr_counts.sum() / tr_counts)  # inverse freq
    weights = weights / weights.mean()
    class_weights = torch.tensor(weights, dtype=torch.float32)

    print("Train counts:", tr_counts.astype(int).tolist())
    print("Class weights:", np.round(weights, 3).tolist())

    train_ds = FusionDS(X_pose, X_app, y, tr_idx)
    val_ds   = FusionDS(X_pose, X_app, y, va_idx)

    train_loader = DataLoader(train_ds, batch_size=16, shuffle=True, num_workers=0)
    val_loader   = DataLoader(val_ds, batch_size=32, shuffle=False, num_workers=0)

    model = FusionModel(num_classes=num_classes)
    model, best_val = train_one_fold(model, train_loader, val_loader, class_weights, epochs=25, lr=3e-4, wd=1e-4)

    # val predictions
    preds, targets = predict_loader(model, val_loader)
    acc = (preds == targets).mean()
    fold_accs.append(acc)
    oof_preds[va_idx] = preds

    print(f"[Fold {fold}] Best val acc (during training): {best_val:.4f}")
    print(f"[Fold {fold}] Final val acc (after best reload): {acc:.4f}")

print("\n" + "="*60)
print("Fold accuracies:", [round(a,4) for a in fold_accs])
print(f"Mean CV acc: {np.mean(fold_accs):.4f} | Std: {np.std(fold_accs):.4f}")

cm = confusion_matrix(y, oof_preds)
print("\nOverall (OOF) confusion matrix:\n", cm)

print("\nOverall (OOF) classification report:\n")
print(classification_report(y, oof_preds))

full_counts = np.bincount(y, minlength=num_classes).astype(np.float32)
full_counts[full_counts == 0] = 1.0
full_w = (full_counts.sum() / full_counts)
full_w = full_w / full_w.mean()
full_w = torch.tensor(full_w, dtype=torch.float32)

full_idx = np.arange(N)
full_ds = FusionDS(X_pose, X_app, y, full_idx)
full_loader = DataLoader(full_ds, batch_size=16, shuffle=True, num_workers=0)

val_size = max(8, N//5)
val_idx = np.random.choice(full_idx, size=val_size, replace=False)
tr_idx = np.array([i for i in full_idx if i not in set(val_idx)])
BS = 16
train_ds = FusionDS(X_pose, X_app, y, tr_idx)
val_ds   = FusionDS(X_pose, X_app, y, val_idx)
train_loader = DataLoader(train_ds, batch_size=16, shuffle=True, num_workers=0)
val_loader = DataLoader(val_ds, batch_size=BS, shuffle=False, drop_last=False)

final_model = FusionModel(num_classes=num_classes)
final_model, best_val = train_one_fold(final_model, train_loader, val_loader, full_w, epochs=35, lr=3e-4, wd=1e-4)

torch.save(final_model.state_dict(), OUT_BEST)
print("\nSaved final fusion model to:", OUT_BEST)
print("Best val (monitor split):", best_val)


Device: cuda
Pose X: (77, 16, 73) y: (77,)
Num classes: 4 Class counts: [10 17 21 29]
Loaded stems from split CSV: 77
Missing stems in frames_person: 0
Extracting appearance features for all videos (may take some time)...
Appearance X_app: (77, 512)

Fold 1 | train=61 val=16
Train counts: [8, 13, 16, 24]
Class weights: [1.6339999437332153, 1.0049999952316284, 0.8169999718666077, 0.5450000166893005]
Epoch 01 | train acc=0.262 | val acc=0.375 | best=0.375
Epoch 05 | train acc=0.361 | val acc=0.250 | best=0.375
Epoch 10 | train acc=0.344 | val acc=0.312 | best=0.375
Epoch 15 | train acc=0.459 | val acc=0.250 | best=0.375
Epoch 20 | train acc=0.443 | val acc=0.375 | best=0.375
Epoch 25 | train acc=0.475 | val acc=0.375 | best=0.375
[Fold 1] Best val acc (during training): 0.3750
[Fold 1] Final val acc (after best reload): 0.3750

Fold 2 | train=61 val=16
Train counts: [8, 13, 17, 23]
Class weights: [1.6440000534057617, 1.0110000371932983, 0.7730000019073486, 0.5720000267028809]
Epoch 01 | 

In [66]:
import numpy as np

DATA_PATH = "/content/drive/MyDrive/synthetic_videos/pose_features_16f.npz"

pose = np.load(DATA_PATH, allow_pickle=True)
print("Keys:", pose.files)

X_pose = pose["X"] if "X" in pose.files else pose["pose"]
y = pose["y"] if "y" in pose.files else pose["labels"]

print("X_pose:", X_pose.shape, X_pose.dtype)
print("y:", y.shape, y.dtype)
print("Classes:", np.unique(y), "count:", len(np.unique(y)))
print("class_names:", pose["class_names"] if "class_names" in pose.files else None)


Keys: ['X', 'y', 'splits', 'class_names']
X_pose: (77, 16, 73) float32
y: (77,) int64
Classes: [0 1 2 3] count: 4
class_names: ['emotion' 'social' 'physical' 'pose_idle']


In [74]:
import numpy as np
import torch
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix, classification_report

DATA_PATH = "/content/drive/MyDrive/synthetic_videos/pose_features_16f.npz"
data = np.load(DATA_PATH, allow_pickle=True)

X = data["X"]          # (77,16,73)
y = data["y"]          # (77,)
class_names = list(data["class_names"])

# split
sss = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
train_idx, test_idx = next(sss.split(X, y))

Xtr, ytr = X[train_idx], y[train_idx]
Xte, yte = X[test_idx], y[test_idx]

print("Train:", Xtr.shape, "Test:", Xte.shape)
print("Class names:", class_names)
print("Test class counts:", np.unique(yte, return_counts=True))


Train: (61, 16, 73) Test: (16, 16, 73)
Class names: [np.str_('emotion'), np.str_('social'), np.str_('physical'), np.str_('pose_idle')]
Test class counts: (array([0, 1, 2, 3]), array([2, 4, 4, 6]))


In [73]:
print("len(yte)  =", len(yte))
print("len(preds)=", len(preds))


len(yte)  = 16
len(preds)= 15


In [79]:
import numpy as np
import torch
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix, classification_report

model.eval()
all_preds, all_true = [], []

with torch.no_grad():
    for x_pose, x_app, yb in val_loader:
        x_pose = x_pose.to(device)
        x_app  = x_app.to(device)

        logits = model(x_pose, x_app)          # FusionModel
        pred = logits.argmax(dim=1).cpu().numpy()

        all_preds.append(pred)
        all_true.append(yb.cpu().numpy())

preds = np.concatenate(all_preds)
y_eval = np.concatenate(all_true)

print("len(y_eval) =", len(y_eval), "len(preds) =", len(preds))

print("Accuracy:", accuracy_score(y_eval, preds))
print("Macro-F1:", f1_score(y_eval, preds, average="macro"))
print("Confusion matrix:\n", confusion_matrix(y_eval, preds))
print("\nReport:\n", classification_report(y_eval, preds, target_names=class_names))


len(y_eval) = 15 len(preds) = 15
Accuracy: 0.4
Macro-F1: 0.31060606060606055
Confusion matrix:
 [[0 0 0 1]
 [1 1 1 1]
 [0 0 3 1]
 [0 1 3 2]]

Report:
               precision    recall  f1-score   support

     emotion       0.00      0.00      0.00         1
      social       0.50      0.25      0.33         4
    physical       0.43      0.75      0.55         4
   pose_idle       0.40      0.33      0.36         6

    accuracy                           0.40        15
   macro avg       0.33      0.33      0.31        15
weighted avg       0.41      0.40      0.38        15



In [80]:
import os, torch
os.makedirs("/content/drive/MyDrive/synthetic_videos", exist_ok=True)
torch.save(model.state_dict(), "/content/drive/MyDrive/synthetic_videos/fusion_best.pt")
print("Saved:", "/content/drive/MyDrive/synthetic_videos/fusion_best.pt")


Saved: /content/drive/MyDrive/synthetic_videos/fusion_best.pt


In [82]:
summary = """\
Synthetic Video Project — Final Summary (Fusion Model)

Dataset:
- Source folder: /content/drive/MyDrive/synthetic_videos
- pose_features_16f.npz: X (77, 16, 73), y (77), classes: ['emotion','social','physical','pose_idle']
- Appearance features: 512-d per sample (used in fusion)

Model:
- FusionModel = Pose Transformer stream + Appearance MLP stream → fused classifier
- Num classes: 4

Final Validation:
- N_val: 15
- Accuracy: 0.40
- Macro-F1: 0.3106

Confusion Matrix:
[[0 0 0 1]
 [1 1 1 1]
 [0 0 3 1]
 [0 1 3 2]]

Saved model:
- /content/drive/MyDrive/synthetic_videos/fusion_best.pt
"""

out_path = "/content/drive/MyDrive/synthetic_videos/RESULTS_SUMMARY.txt"

with open(out_path, "w") as f:
    f.write(summary)

print("Saved:", out_path)


Saved: /content/drive/MyDrive/synthetic_videos/RESULTS_SUMMARY.txt


In [83]:
!ls -lah /content/drive/MyDrive/synthetic_videos/RESULTS_SUMMARY.txt


-rw------- 1 root root 595 Jan 14 18:44 /content/drive/MyDrive/synthetic_videos/RESULTS_SUMMARY.txt
