# TimeSformer Exercise Prediction

This notebook trains a TimeSformer model to predict exercises from video data.
Data is located in `data/`, with videos in `data/films/` and labels in CSV files.
We use `split.csv` to divide data into training and testing sets.

In [5]:
import os
import random
from collections import Counter

import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import TimesformerForVideoClassification, VideoMAEImageProcessor, TrainingArguments, Trainer
from transformers import AutoImageProcessor, AutoModelForVideoClassification
import decord
from decord import VideoReader, cpu
from PIL import Image
from sklearn.metrics import accuracy_score
from sklearn.metrics import classification_report, confusion_matrix

# Ensure decord works correctly
decord.bridge.set_bridge('torch')


## Configuration

In [6]:
# --- Model / batching settings ---
MODEL_CKPT = "facebook/timesformer-base-finetuned-k400"  # pretrained checkpoint that gets fine-tuned
BATCH_SIZE = 4  # per-device batch size for both training and evaluation
NUM_FRAMES = 8  # Number of frames to sample per clip
RESIZE_TO = 224  # presumably the frame side length in pixels; not referenced in the visible cells

# --- Dataset locations ---
DATA_DIR = "data"
SPLIT_FILE = "split.csv"  # handed to prepare_dataset to build the train/test split
FILMS_DIR = os.path.join(DATA_DIR, "films")

## Data Preprocessing

We parse the CSV files to identify labeled segments (clips).
Each row in a CSV corresponds to one frame. We look for contiguous segments of frames with valid labels.

In [7]:
# NOTE(review): `prepare_dataset` is not defined in any visible cell; it has to be
# defined or imported above this cell for Restart & Run All to succeed.
train_clips, test_clips, unique_labels = prepare_dataset(SPLIT_FILE)
print(f"Train clips: {len(train_clips)}, Test clips: {len(test_clips)}")
print(f"Unique labels: {unique_labels}")

# Class Counting (Original)
# `Counter` comes from `collections` (imported in the notebook's import cell).
train_labels = [c['label'] for c in train_clips]
print("\nClass Distribution (Train - Original):")
print(Counter(train_labels))

# Balancing - undersample the over-represented class so that it is roughly the
# same size as the remaining classes (~160 clips each in the training split).
MAJORITY_LABEL = 1   # label that gets undersampled
TARGET_COUNT = 160   # number of clips to keep for MAJORITY_LABEL
random.seed(42)      # fixed seed so the sampled subset and the shuffle are reproducible

# Split the training clips into the class to shrink and everything else.
class_1_clips = [c for c in train_clips if c['label'] == MAJORITY_LABEL]
other_clips = [c for c in train_clips if c['label'] != MAJORITY_LABEL]

if len(class_1_clips) > TARGET_COUNT:
    print(f"\nUndersampling Class {MAJORITY_LABEL} from {len(class_1_clips)} to {TARGET_COUNT}...")
    class_1_clips = random.sample(class_1_clips, TARGET_COUNT)

# Only the training split is rebalanced; the test split keeps its original distribution.
train_clips = other_clips + class_1_clips
random.shuffle(train_clips)

# Verify New Distribution
train_labels_balanced = [c['label'] for c in train_clips]
print("\nClass Distribution (Train - Balanced):")
print(Counter(train_labels_balanced))

test_labels = [c['label'] for c in test_clips]
print("\nClass Distribution (Test):")
print(Counter(test_labels))


Processing 60 files...
Missing file for ID: 027_e84uoke1
Train clips: 3165, Test clips: 3179
Unique labels: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]

Class Distribution (Train - Original):
Counter({1: 892, 12: 166, 11: 158, 2: 156, 4: 155, 7: 154, 14: 153, 5: 152, 10: 150, 13: 149, 16: 149, 3: 147, 6: 147, 15: 147, 8: 146, 9: 144})

Undersampling Class 1 from 892 to 160...

Class Distribution (Train - Balanced):
Counter({12: 166, 1: 160, 11: 158, 2: 156, 4: 155, 7: 154, 14: 153, 5: 152, 10: 150, 13: 149, 16: 149, 6: 147, 15: 147, 3: 147, 8: 146, 9: 144})

Class Distribution (Test):
Counter({1: 906, 7: 163, 5: 157, 12: 157, 2: 155, 3: 152, 4: 152, 8: 152, 9: 152, 13: 152, 10: 150, 11: 150, 14: 147, 15: 146, 16: 145, 6: 143})


In [8]:
# Build the two lookup tables between dataset labels and contiguous class ids.
# Everything is cast to a plain Python int so the mappings stay JSON-serializable
# even when the labels arrive as numpy integer types.
label2id = {}
for class_index, raw_label in enumerate(unique_labels):
    label2id[int(raw_label)] = int(class_index)

# Inverse table, derived from label2id so both always agree.
id2label = {}
for raw_label, class_index in label2id.items():
    id2label[int(class_index)] = int(raw_label)

print("Label Mappings:", label2id)

Label Mappings: {1: 0, 2: 1, 3: 2, 4: 3, 5: 4, 6: 5, 7: 6, 8: 7, 9: 8, 10: 9, 11: 10, 12: 11, 13: 12, 14: 13, 15: 14, 16: 15}


## Dataset Class

In [33]:
class ExerciseDataset(Dataset):
    """Torch dataset that turns labelled clip descriptors into model inputs.

    Each entry of ``clips`` is a dict with the keys ``video_path``,
    ``start_frame``, ``end_frame`` and ``label``. For every item,
    ``num_frames`` frames are sampled evenly between the clip's start and end
    frame, run through ``processor`` and returned together with the class id
    looked up in the module-level ``label2id`` mapping.

    Args:
        clips: Sequence of clip descriptor dicts (see above).
        processor: Callable invoked as ``processor(frames, return_tensors="pt")``
            whose result exposes a ``pixel_values`` tensor.
        num_frames: Number of frames sampled from each clip.
    """

    def __init__(self, clips, processor, num_frames=8):
        self.clips = clips
        self.processor = processor
        self.num_frames = num_frames

    def __len__(self):
        return len(self.clips)

    def __getitem__(self, idx):
        """Return ``{"pixel_values": ..., "labels": ...}`` for clip ``idx``.

        If the clip's video cannot be opened, the following clips are tried in
        order (wrapping around) and the first readable one is returned instead.

        Raises:
            IndexError: If ``idx`` is out of range.
            RuntimeError: If none of the clips' videos can be opened.
        """
        # Index directly first so an out-of-range idx still raises IndexError.
        clip = self.clips[idx]
        n_clips = len(self)

        # Bounded fallback: try each clip at most once instead of recursing
        # without limit when videos are unreadable.
        vr = None
        for _ in range(n_clips):
            video_path = clip['video_path']
            try:
                vr = VideoReader(video_path, ctx=cpu(0))
                break
            except Exception as e:
                print(f"Error reading {video_path}: {e}")
                idx = (idx + 1) % n_clips
                clip = self.clips[idx]
        if vr is None:
            raise RuntimeError("None of the clips' videos could be opened")

        start_f = clip['start_frame']
        end_f = clip['end_frame']
        label = clip['label']

        total_frames = len(vr)

        # Clamp the annotated range to the frames that actually exist in the video.
        start_f = max(0, min(start_f, total_frames - 1))
        end_f = max(0, min(end_f, total_frames - 1))

        if start_f >= end_f:
            # Degenerate (empty or single-frame) clip: repeat the one frame.
            indices = [start_f] * self.num_frames
        else:
            # Sample indices evenly
            indices = np.linspace(start_f, end_f, self.num_frames).astype(int)

        video = vr.get_batch(indices)
        # The batch type depends on the active decord bridge; normalize to numpy.
        if hasattr(video, 'asnumpy'):
            video = video.asnumpy()
        else:
            video = video.numpy()

        # Normalize and process
        inputs = self.processor(list(video), return_tensors="pt")

        return {
            # Drop only the leading batch axis. A bare squeeze() would also
            # remove the frame axis when num_frames == 1.
            "pixel_values": inputs.pixel_values.squeeze(0),  # (num_frames, 3, H, W)
            "labels": torch.tensor(label2id[label])
        }

## Training Setup

In [None]:
def train_and_evaluate(num_frames):
    """Fine-tune a fresh TimeSformer on clips sampled with ``num_frames`` frames.

    The processor and model are re-created from ``MODEL_CKPT`` on every call so
    that each run starts from the same pretrained weights. Training uses
    ``train_clips``; ``test_clips`` serves as the evaluation set.

    NOTE(review): because ``test_clips`` is also the evaluation set used for
    ``load_best_model_at_end``, the checkpoint is selected on the same data the
    final accuracy is reported on.

    Args:
        num_frames: Number of frames sampled from every clip.

    Returns:
        Tuple ``(accuracy, trainer, model)`` where ``accuracy`` is the
        ``eval_accuracy`` reported by a final ``trainer.evaluate()`` call.
    """
    print(f"\n=== Training with NUM_FRAMES={num_frames} ===")

    # Fresh processor + model for every run (the classifier head is re-initialised
    # for our label set, hence ignore_mismatched_sizes).
    processor = AutoImageProcessor.from_pretrained(MODEL_CKPT)
    model = TimesformerForVideoClassification.from_pretrained(
        MODEL_CKPT,
        num_labels=len(unique_labels),
        id2label=id2label,
        label2id=label2id,
        ignore_mismatched_sizes=True,
    )

    # Datasets that sample exactly `num_frames` frames per clip.
    train_ds = ExerciseDataset(train_clips, processor, num_frames=num_frames)
    test_ds = ExerciseDataset(test_clips, processor, num_frames=num_frames)

    # Each frame count gets its own output directory for checkpoints.
    training_options = dict(
        output_dir=f"timesformer_results_frames_{num_frames}",
        remove_unused_columns=False,
        eval_strategy="epoch",
        save_strategy="epoch",
        learning_rate=5e-5,
        per_device_train_batch_size=BATCH_SIZE,
        per_device_eval_batch_size=BATCH_SIZE,
        gradient_accumulation_steps=4,
        warmup_ratio=0.1,
        logging_steps=10,
        load_best_model_at_end=True,
        metric_for_best_model="accuracy",
        num_train_epochs=3,
        fp16=torch.cuda.is_available(),  # mixed precision only when a GPU is present
        report_to="none",
        disable_tqdm=False,
    )
    args = TrainingArguments(**training_options)

    def compute_metrics(eval_pred):
        """Accuracy over the arg-max class of each prediction row."""
        scores, references = eval_pred
        predicted_ids = np.argmax(scores, axis=1)
        # Accuracy drives checkpoint selection; F1 is reported separately later.
        return {"accuracy": accuracy_score(references, predicted_ids)}

    trainer = Trainer(
        model=model,
        args=args,
        train_dataset=train_ds,
        eval_dataset=test_ds,
        compute_metrics=compute_metrics,
    )

    trainer.train()

    # One more evaluation pass to obtain the final accuracy.
    final_metrics = trainer.evaluate()
    return final_metrics['eval_accuracy'], trainer, model

# Bisection-style heuristic search over the number of frames per clip
def search_optimal_frames(low, high):
    """Search ``[low, high]`` for the frame count with the best accuracy.

    Both end points are trained first. The interval is then halved repeatedly:
    the midpoint is trained and replaces whichever end point currently has the
    lower (or equal, for the upper end) score. This is a heuristic that works
    best when accuracy is roughly unimodal in the frame count; it does not
    evaluate every value in the range.

    Only the best trainer seen so far is kept alive. Trainers and models of
    worse configurations are released as soon as they have been scored, so
    memory does not grow with the number of configurations tried.

    Args:
        low: Smallest number of frames to consider.
        high: Largest number of frames to consider.

    Returns:
        Tuple ``(best_n, trainer)`` with the best frame count that was
        evaluated and the trainer produced for it.
    """
    import gc  # local import: only needed here to reclaim discarded trainers

    print(f"Starting Search in range [{low}, {high}]")
    scores = {}  # frame count -> accuracy, so no configuration is trained twice

    best_n = low
    best_val = -1.0
    best_trainer = None

    def get_score(n):
        """Train with ``n`` frames (once) and track the best result so far."""
        nonlocal best_n, best_val, best_trainer
        if n in scores:
            return scores[n]
        acc, candidate, candidate_model = train_and_evaluate(n)
        scores[n] = acc
        # The very first run is always adopted; later ones must be strictly better.
        if best_trainer is None or acc > best_val:
            best_n, best_val, best_trainer = n, acc, candidate
        # Drop our references to non-winning runs and collect any reference cycles.
        del candidate, candidate_model
        gc.collect()
        return acc

    l, h = low, high

    # Initial bounds
    score_l = get_score(l)
    score_h = get_score(h)

    # We iterate until the range cannot be halved any further.
    while (h - l) > 1:
        mid = (l + h) // 2  # strictly between l and h because h - l > 1
        score_m = get_score(mid)

        # Decision logic: move towards the end point with the higher score.
        if score_l < score_h:
            # Right side seems promising
            l, score_l = mid, score_m
        else:
            # Left side seems promising
            h, score_h = mid, score_m

    return best_n, best_trainer

# Run the search
# Searches the frame-count range [4, 12], the window spanned by the user's 4,8,12 suggestion
best_frames, trainer = search_optimal_frames(4, 12)
print(f"\nSearch Complete. Best Frames: {best_frames}")


Some weights of TimesformerForVideoClassification were not initialized from the model checkpoint at facebook/timesformer-base-finetuned-k400 and are newly initialized because the shapes did not match:
- classifier.weight: found shape torch.Size([400, 768]) in the checkpoint and torch.Size([16, 768]) in the model instantiated
- classifier.bias: found shape torch.Size([400]) in the checkpoint and torch.Size([16]) in the model instantiated
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Starting training...


Epoch,Training Loss,Validation Loss,Accuracy
1,0.0351,0.230442,0.944322
2,0.0164,0.183605,0.958792
3,0.0011,0.177067,0.960994


TrainOutput(global_step=594, training_loss=0.20587469969785174, metrics={'train_runtime': 1995.768, 'train_samples_per_second': 4.758, 'train_steps_per_second': 0.298, 'total_flos': 8.319757294526792e+18, 'train_loss': 0.20587469969785174, 'epoch': 3.0})

In [37]:
# Persist the model of the best configuration found by the search.
trainer.save_model(f"./models/best_model_frames_{best_frames}")

# Final Evaluation Metrics
# The processor created inside train_and_evaluate is local to that function,
# so build one here from the same checkpoint instead of relying on leftover
# kernel state.
processor = AutoImageProcessor.from_pretrained(MODEL_CKPT)
test_ds = ExerciseDataset(test_clips, processor, num_frames=best_frames)
preds_output = trainer.predict(test_ds)
y_preds = np.argmax(preds_output.predictions, axis=1)
y_true = preds_output.label_ids

# Class ids follow the enumeration order of unique_labels (see label2id).
# Passing them explicitly keeps target_names aligned and the matrix square
# even if some class never shows up in y_true / y_preds.
class_ids = list(range(len(unique_labels)))

print("\nClassification Report:")
print(classification_report(
    y_true,
    y_preds,
    labels=class_ids,
    target_names=[str(l) for l in unique_labels],
))

print("Confusion Matrix:")
print(confusion_matrix(y_true, y_preds, labels=class_ids))


In [38]:
# Reload the checkpoint written by the save cell to confirm it round-trips from disk.
# The path mirrors the one passed to trainer.save_model, so the check does not depend
# on a model directory that no cell in this notebook creates.
test_model = AutoModelForVideoClassification.from_pretrained(f"./models/best_model_frames_{best_frames}")
print("Model wczytany pomyślnie!")  # Polish: "Model loaded successfully!"

Model wczytany pomyślnie!
