# Video Classification with VideoMAE Tiny Model
This notebook demonstrates how to fine-tune a much smaller VideoMAE model ("tiny" configuration) for the Violence-XD dataset. The model is initialized from scratch with a reduced number of layers and parameters, making it more suitable for resource-constrained environments.

In [24]:
# Install required libraries
!pip install transformers pytorchvideo datasets evaluate



In [25]:
import os
import torch
from huggingface_hub import HfFolder

# Read token from environment variable (more secure)
token = os.getenv("HUGGINGFACE_TOKEN")
if token:
    HfFolder.save_token(token)
    print("Hugging Face token successfully loaded from HUGGINGFACE_TOKEN environment variable.")
else:
    print("HUGGINGFACE_TOKEN environment variable not set. If you want to push models to the Hub, please set this variable before starting Jupyter Lab.")

Hugging Face token successfully loaded from HUGGINGFACE_TOKEN environment variable.


## Load Violence XD dataset

In [26]:
# Set the path to the local processed dataset folder
dataset_root_path = "processed_dataset"
all_video_file_paths = []

with open(os.path.join(dataset_root_path, "train.csv"), "r") as f:
    train_paths = [line.strip().split()[0] for line in f.readlines()]
    all_video_file_paths.extend([os.path.join(dataset_root_path, path) for path in train_paths])
with open(os.path.join(dataset_root_path, "val.csv"), "r") as f:
    val_paths = [line.strip().split()[0] for line in f.readlines()]
    all_video_file_paths.extend([os.path.join(dataset_root_path, path) for path in val_paths])
with open(os.path.join(dataset_root_path, "test.csv"), "r") as f:
    test_paths = [line.strip().split()[0] for line in f.readlines()]
    all_video_file_paths.extend([os.path.join(dataset_root_path, path) for path in test_paths])
print(f"Total video files: {len(all_video_file_paths)}")

Total video files: 4227


In [27]:
# Get labels from CSV files
labels = []
for split in ["train.csv", "val.csv", "test.csv"]:
    with open(os.path.join(dataset_root_path, split), "r") as f:
        for line in f.readlines():
            parts = line.strip().split()
            if len(parts) > 1:
                labels.append(parts[1])
class_labels = sorted(set(labels))
label2id = {label: i for i, label in enumerate(class_labels)}
id2label = {i: label for label, i in label2id.items()}
print(f"Unique classes: {len(label2id)}.")
print(f"Class labels: {class_labels}")

Unique classes: 7.
Class labels: ['A', 'B1', 'B2', 'B4', 'B5', 'B6', 'G']


In [28]:
import torch

# Main category counts for the training set (as provided by the user)
main_category_counts_train = {
    'A': 1632,
    'B1': 389,
    'B2': 353,
    'B6': 352,
    'G': 309,
    'B4': 297,
    'B5': 40
}

# Calculate total number of main category occurrences and number of unique main categories
total_main_category_occurrences = sum(main_category_counts_train.values())
num_main_categories = len(main_category_counts_train)

# Calculate weights for each main category
# Formula: total_samples / (num_classes * class_count)
# This gives higher weight to less frequent classes.
weights_main_cat = {}
for category, count in main_category_counts_train.items():
    weights_main_cat[category] = total_main_category_occurrences / (num_main_categories * count)
print(f"Calculated main category weights: {weights_main_cat}")

# id2label is available from the previous cell
# Create a tensor for class weights, one weight for each granular label
# The weight for a granular label will be the weight of its main category
num_granular_labels = len(id2label)
granular_class_weights_list = [0.0] * num_granular_labels

for i in range(num_granular_labels):
    granular_label_str = id2label[i]
    main_category_of_granular = granular_label_str.split('-')[0]
    if main_category_of_granular in weights_main_cat:
        granular_class_weights_list[i] = weights_main_cat[main_category_of_granular]
    else:
        print(f"Warning: Main category '{main_category_of_granular}' for label '{granular_label_str}' (ID: {i}) not found in provided main category counts. Assigning weight 1.0.")
        granular_class_weights_list[i] = 1.0 

# Convert to a PyTorch tensor
granular_class_weights_tensor = torch.tensor(granular_class_weights_list, dtype=torch.float)
print(f"Shape of granular_class_weights_tensor: {granular_class_weights_tensor.shape}")
# print(f"First few granular weights: {granular_class_weights_tensor[:10]}")

# Ensure the tensor has the correct number of elements
assert len(granular_class_weights_tensor) == num_granular_labels, "Mismatch in granular_class_weights_tensor length"


Calculated main category weights: {'A': 0.29516806722689076, 'B1': 1.2383400661035622, 'B2': 1.3646297045730473, 'B6': 1.3685064935064934, 'G': 1.5589459084604715, 'B4': 1.621933621933622, 'B5': 12.042857142857143}
Shape of granular_class_weights_tensor: torch.Size([7])


## Define and initialize a tiny VideoMAE model

In [29]:
from transformers import VideoMAEConfig, VideoMAEForVideoClassification, VideoMAEImageProcessor

compact_config = VideoMAEConfig(
    num_hidden_layers=4,  # Reduced from 6 to 4
    hidden_size=384,
    intermediate_size=1536,
    num_attention_heads=6,
    image_size=224,
    num_frames=16,
    num_labels=len(label2id),
    label2id=label2id,
    id2label=id2label,
    mask_ratio=0.0,
)

model = VideoMAEForVideoClassification(compact_config)
image_processor = VideoMAEImageProcessor.from_pretrained("MCG-NJU/videomae-base")

## Prepare the datasets for training

In [30]:
import pytorchvideo.data
from pytorchvideo.transforms import (
    ApplyTransformToKey, Normalize, RandomShortSideScale, RemoveKey, ShortSideScale, UniformTemporalSubsample,
)
from torchvision.transforms import (
    Compose, Lambda, RandomCrop, RandomHorizontalFlip, Resize,
)

mean = image_processor.image_mean
std = image_processor.image_std
if "shortest_edge" in image_processor.size:
    height = width = image_processor.size["shortest_edge"]
else:
    height = image_processor.size["height"]
    width = image_processor.size["width"]
resize_to = (height, width)

num_frames_to_sample = model.config.num_frames
sample_rate = 4
fps = 30
clip_duration = num_frames_to_sample * sample_rate / fps

batch_size = 8

train_transform = Compose([
    ApplyTransformToKey(
        key="video",
        transform=Compose([
            UniformTemporalSubsample(num_frames_to_sample),
            Lambda(lambda x: x / 255.0),
            Normalize(mean, std),
            RandomShortSideScale(min_size=256, max_size=320),
            RandomCrop(resize_to),
            RandomHorizontalFlip(p=0.5),
        ]),
    ),
])

def load_labeled_video_paths(csv_filename, root_dir_for_csv_paths, label_to_id_map):
    labeled_paths = []
    csv_path = os.path.join(root_dir_for_csv_paths, csv_filename)
    with open(csv_path, "r") as f:
        for line in f.readlines():
            parts = line.strip().split()
            if len(parts) >= 2:
                video_path_in_csv = parts[0]
                label_str = parts[1]
                full_video_path = os.path.join(root_dir_for_csv_paths, video_path_in_csv)
                if label_str in label_to_id_map:
                    label_id = label_to_id_map[label_str]
                    labeled_paths.append((full_video_path, {"label": label_id}))
                else:
                    print(f"Warning: Label '{label_str}' not in label2id map for video {full_video_path}. Skipping.")
            elif line.strip():
                print(f"Warning: Malformed line in {csv_path}: '{line.strip()}'")
    return labeled_paths

labeled_video_paths_train = load_labeled_video_paths("train.csv", dataset_root_path, label2id)
train_dataset = pytorchvideo.data.LabeledVideoDataset(
    labeled_video_paths=labeled_video_paths_train,
    clip_sampler=pytorchvideo.data.make_clip_sampler("random", clip_duration),
    decode_audio=False,
    transform=train_transform,
)

In [31]:
val_transform = Compose([
    ApplyTransformToKey(
        key="video",
        transform=Compose([
            UniformTemporalSubsample(num_frames_to_sample),
            Lambda(lambda x: x / 255.0),
            Normalize(mean, std),
            Resize(resize_to),
        ]),
    ),
])

labeled_video_paths_val = load_labeled_video_paths("val.csv", dataset_root_path, label2id)
val_dataset = pytorchvideo.data.LabeledVideoDataset(
    labeled_video_paths=labeled_video_paths_val,
    clip_sampler=pytorchvideo.data.make_clip_sampler("uniform", clip_duration),
    decode_audio=False,
    transform=val_transform,
)

labeled_video_paths_test = load_labeled_video_paths("test.csv", dataset_root_path, label2id)
test_dataset = pytorchvideo.data.LabeledVideoDataset(
    labeled_video_paths=labeled_video_paths_test,
    clip_sampler=pytorchvideo.data.make_clip_sampler("uniform", clip_duration),
    decode_audio=False,
    transform=val_transform,
)

In [32]:
print(train_dataset.num_videos, val_dataset.num_videos, test_dataset.num_videos)

3372 430 425


## Training and evaluation setup

In [33]:
from transformers import TrainingArguments, Trainer
import numpy as np
import evaluate
import torch # Ensure torch is imported for nn.CrossEntropyLoss

model_name = "videomae-tiny"
new_model_name = f"{model_name}-finetuned-xd-violence"
num_epochs = 4

args = TrainingArguments(
    new_model_name,
    remove_unused_columns=False,
    eval_strategy="epoch",
    save_strategy="epoch",
    learning_rate=5e-4,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    warmup_ratio=0.1,
    logging_steps=10,
    load_best_model_at_end=True,
    metric_for_best_model="accuracy",
    push_to_hub=True,
    max_steps=(train_dataset.num_videos // batch_size) * num_epochs,
)

metric = evaluate.load("accuracy")
def compute_metrics(eval_pred):
    predictions = np.argmax(eval_pred.predictions, axis=1)
    return metric.compute(predictions=predictions, references=eval_pred.label_ids)

def collate_fn(examples):
    # Permute to (num_frames, num_channels, height, width)
    pixel_values = torch.stack([
        example["video"].permute(1, 0, 2, 3) for example in examples
    ])
    labels = torch.tensor([example["label"] for example in examples])
    return {"pixel_values": pixel_values, "labels": labels}

# Define the custom Trainer with weighted loss
class WeightedLossTrainer(Trainer):
    def __init__(self, *args, class_weights=None, **kwargs):
        super().__init__(*args, **kwargs)
        # class_weights should be a tensor
        self.class_weights = class_weights

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs): # MODIFIED SIGNATURE
        labels = inputs.pop("labels")
        # Forward pass
        outputs = model(**inputs)
        logits = outputs.logits
        
        # Move class_weights to the device of the logits
        weights_on_device = self.class_weights.to(logits.device)
        
        # Define the loss function with weights
        loss_fct = torch.nn.CrossEntropyLoss(weight=weights_on_device)
        
        # Compute loss
        loss = loss_fct(logits.view(-1, self.model.config.num_labels), labels.view(-1))
        
        return (loss, outputs) if return_outputs else loss

# Instantiate the custom trainer
# granular_class_weights_tensor is computed in the cell inserted before this block
trainer = WeightedLossTrainer(
    model=model,
    args=args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    tokenizer=image_processor, # Though not strictly a tokenizer, it's used for processor/config by Trainer
    compute_metrics=compute_metrics,
    data_collator=collate_fn,
    class_weights=granular_class_weights_tensor # Pass the calculated weights here
)

  super().__init__(*args, **kwargs)


In [34]:
from transformers import TrainingArguments, Trainer
import numpy as np
import evaluate
import torch # Ensure torch is imported for nn.CrossEntropyLoss

model_name = "videomae-tiny"
new_model_name = f"{model_name}-finetuned-xd-violence"
num_epochs = 4

args = TrainingArguments(
    new_model_name,
    remove_unused_columns=False,
    eval_strategy="epoch",
    save_strategy="epoch",
    learning_rate=5e-4,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    warmup_ratio=0.1,
    logging_steps=10,
    load_best_model_at_end=True,
    metric_for_best_model="accuracy",
    push_to_hub=True,
    max_steps=(train_dataset.num_videos // batch_size) * num_epochs,
)

metric = evaluate.load("accuracy")
def compute_metrics(eval_pred):
    predictions = np.argmax(eval_pred.predictions, axis=1)
    return metric.compute(predictions=predictions, references=eval_pred.label_ids)

def collate_fn(examples):
    # Permute to (num_frames, num_channels, height, width)
    pixel_values = torch.stack([
        example["video"].permute(1, 0, 2, 3) for example in examples
    ])
    labels = torch.tensor([example["label"] for example in examples])
    return {"pixel_values": pixel_values, "labels": labels}

# Define the custom Trainer with weighted loss
class WeightedLossTrainer(Trainer):
    def __init__(self, *args, class_weights=None, **kwargs):
        super().__init__(*args, **kwargs)
        # class_weights should be a tensor
        self.class_weights = class_weights

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs): # MODIFIED SIGNATURE to include **kwargs
        labels = inputs.pop("labels")
        # Forward pass
        outputs = model(**inputs)
        logits = outputs.logits
        
        # Move class_weights to the device of the logits
        weights_on_device = self.class_weights.to(logits.device)
        
        # Define the loss function with weights
        loss_fct = torch.nn.CrossEntropyLoss(weight=weights_on_device)
        
        # Compute loss
        loss = loss_fct(logits.view(-1, self.model.config.num_labels), labels.view(-1))
        
        return (loss, outputs) if return_outputs else loss

# Instantiate the custom trainer
# granular_class_weights_tensor is computed in the cell inserted before this block
trainer = WeightedLossTrainer(
    model=model,
    args=args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    tokenizer=image_processor, # Though not strictly a tokenizer, it's used for processor/config by Trainer
    compute_metrics=compute_metrics,
    data_collator=collate_fn,
    class_weights=granular_class_weights_tensor # Pass the calculated weights here
)

train_results = trainer.train()

  super().__init__(*args, **kwargs)


Epoch,Training Loss,Validation Loss,Accuracy
0,1.9347,1.521377,0.560329
1,1.7075,1.616424,0.324401
2,1.7025,1.70062,0.311433
3,1.8044,1.52143,0.414484




In [26]:
trainer.push_to_hub()

events.out.tfevents.1749831415.DESKTOP-JCNIME4:   0%|          | 0.00/41.8k [00:00<?, ?B/s]

CommitInfo(commit_url='https://huggingface.co/mitegvg/videomae-tiny-finetuned-xd-violence/commit/30f8443baa75990db8d6bfd7a34d573ae956141d', commit_message='End of training', commit_description='', oid='30f8443baa75990db8d6bfd7a34d573ae956141d', pr_url=None, repo_url=RepoUrl('https://huggingface.co/mitegvg/videomae-tiny-finetuned-xd-violence', endpoint='https://huggingface.co', repo_type='model', repo_id='mitegvg/videomae-tiny-finetuned-xd-violence'), pr_revision=None, pr_num=None)

## Inference

In [27]:
from transformers import pipeline
import os

local_model_directory = new_model_name
absolute_model_path = os.path.abspath(local_model_directory)

video_cls = pipeline(task="video-classification", model=absolute_model_path)

test_video = next(iter(test_dataset))["video"]
inputs = {"pixel_values": test_video.permute(1, 0, 2, 3).unsqueeze(0)}
with torch.no_grad():
    outputs = model(**inputs)
    logits = outputs.logits
predicted_class_idx = logits.argmax(-1).item()
print("Predicted class:", model.config.id2label[predicted_class_idx])

Device set to use cuda:0


RuntimeError: Input type (torch.FloatTensor) and weight type (torch.cuda.FloatTensor) should be the same or input should be a MKLDNN tensor and weight is a dense tensor

## Evaluate

In [1]:
import os
from transformers import pipeline
import torch # For checking device
import time # For timing inference

print("Starting evaluation on the full test set...")

# Define paths (relative to the notebook location)
dataset_root_path = "processed_dataset"
test_csv_filename = "test.csv"
test_csv_path = os.path.join(dataset_root_path, test_csv_filename)

local_model_directory = "videomae-tiny-finetuned-xd-violence"
absolute_model_path = os.path.abspath(local_model_directory)

# Function to load test data (video paths and true labels)
def load_test_data_from_csv(csv_file_path, data_root_path):
    test_samples = []
    if not os.path.exists(csv_file_path):
        print(f"ERROR: Test CSV file not found at {csv_file_path}")
        return test_samples
        
    with open(csv_file_path, "r") as f:
        for line in f.readlines():
            parts = line.strip().split()
            if len(parts) >= 2:
                relative_video_path = parts[0]  # e.g., "videos/video_000000.mp4"
                true_label_str = parts[1]       # e.g., "A", "B1"
                
                # Construct the full path to the video file
                # data_root_path is dataset_root_path (e.g., "processed_dataset")
                full_video_path = os.path.normpath(os.path.join(data_root_path, relative_video_path))
                test_samples.append((full_video_path, true_label_str))
            elif line.strip(): # Avoid warning for empty lines if any
                print(f"Warning: Malformed line in {csv_file_path}: '{line.strip()}'") # Corrected escaping here
    print(f"Loaded {len(test_samples)} samples from {csv_file_path}")
    return test_samples

# Initialize the video classification pipeline
video_cls = None
print(f"Attempting to load model from: {absolute_model_path}")
if not os.path.isdir(absolute_model_path):
    print(f"ERROR: Model directory not found at {absolute_model_path}")
else:
    print(f"Model directory found. Initializing pipeline...")
    try:
        video_cls = pipeline(
            task="video-classification",
            model=absolute_model_path,
            device=0 if torch.cuda.is_available() else -1 # Use GPU if available
        )
        print(f"Pipeline initialized. Using device: {'cuda:0' if torch.cuda.is_available() else 'cpu'}")
    except Exception as e:
        print(f"Error initializing pipeline: {e}")

if video_cls:
    # Load test data
    test_data = load_test_data_from_csv(test_csv_path, dataset_root_path)

    if test_data:
        top1_correct_predictions = 0
        top5_correct_predictions = 0
        total_videos_processed = 0
        inference_times = []  # Store inference times

        print(f"\nStarting inference on {len(test_data)} test videos...") # Corrected escaping for newline
        for i, (video_path, true_label) in enumerate(test_data):
            if not os.path.exists(video_path):
                print(f"Warning: Video file not found at {video_path}. Skipping.")
                continue

            try:
                # Measure inference time
                start_time = time.time()
                raw_results = video_cls(video_path)
                end_time = time.time()
                inference_times.append(end_time - start_time)
                total_videos_processed += 1

                if not raw_results:
                    print(f"Warning: No results returned for video {video_path}. Skipping.")
                    continue
                
                # Extract top 5 predicted labels (main part, e.g., "B4" from "B4-0-0")
                # Corrected line:
                predicted_labels_top5 = [res['label'].split('-')[0] for res in raw_results[:5]]

                if not predicted_labels_top5:
                    print(f"Warning: Could not extract top 5 labels for {video_path}. Skipping.")
                    continue
                    
                predicted_label_top1 = predicted_labels_top5[0]

                # Check Top-1 accuracy
                if predicted_label_top1 == true_label:
                    top1_correct_predictions += 1
                
                # Check Top-5 accuracy
                if true_label in predicted_labels_top5:
                    top5_correct_predictions += 1
                
                if (i + 1) % 10 == 0 or (i + 1) == len(test_data): # Print progress
                    print(f"  Processed {i + 1}/{len(test_data)} videos...")

            except Exception as e:
                print(f"An error occurred during processing of {video_path}: {e}")

        # Calculate accuracies
        if total_videos_processed > 0:
            top1_accuracy = (top1_correct_predictions / total_videos_processed) * 100
            top5_accuracy = (top5_correct_predictions / total_videos_processed) * 100
            avg_inference_time = sum(inference_times) / len(inference_times)
            fps = 1.0 / avg_inference_time if avg_inference_time > 0 else float('inf')
            
            print("\n--- Evaluation Complete ---") # Corrected escaping for newline
            print(f"Total videos processed: {total_videos_processed}")
            print(f"Top-1 Correct Predictions: {top1_correct_predictions}")
            print(f"Top-5 Correct Predictions: {top5_correct_predictions}")
            print(f"Top-1 Accuracy: {top1_accuracy:.2f}%")
            print(f"Top-5 Accuracy: {top5_accuracy:.2f}%")
            print(f"Average inference time per video: {avg_inference_time:.3f} seconds ({fps:.2f} videos/sec)")
        else:
            print("\n--- Evaluation Complete ---") # Corrected escaping for newline
            print("No videos were processed successfully.")
    else:
        print("No test data loaded. Cannot perform evaluation.")
else:
    print("Video classification pipeline not initialized. Cannot perform evaluation.")

Starting evaluation on the full test set...
Attempting to load model from: D:\BIRKBECK\REPOS\videomae-base-finetuned-xd-violence\videomae-tiny-finetuned-xd-violence
Model directory found. Initializing pipeline...


Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.
Device set to use cuda:0


Pipeline initialized. Using device: cuda:0
Loaded 425 samples from processed_dataset\test.csv

Starting inference on 425 test videos...


You seem to be using the pipelines sequentially on GPU. In order to maximize efficiency please use a dataset


  Processed 10/425 videos...
  Processed 20/425 videos...
  Processed 30/425 videos...
  Processed 40/425 videos...
  Processed 50/425 videos...
  Processed 60/425 videos...
  Processed 70/425 videos...
  Processed 80/425 videos...
  Processed 90/425 videos...
  Processed 100/425 videos...
  Processed 110/425 videos...
  Processed 120/425 videos...
  Processed 130/425 videos...
  Processed 140/425 videos...
  Processed 150/425 videos...
  Processed 160/425 videos...
  Processed 170/425 videos...
  Processed 180/425 videos...
  Processed 190/425 videos...
  Processed 200/425 videos...
  Processed 210/425 videos...
  Processed 220/425 videos...
  Processed 230/425 videos...
  Processed 240/425 videos...
  Processed 250/425 videos...
  Processed 260/425 videos...
  Processed 270/425 videos...
  Processed 280/425 videos...
  Processed 290/425 videos...
  Processed 300/425 videos...
  Processed 310/425 videos...
  Processed 320/425 videos...
  Processed 330/425 videos...
  Processed 340/425

moov atom not found


An error occurred during processing of processed_dataset\videos\video_000844.mp4: [Errno 1094995529] Invalid data found when processing input: 'processed_dataset\\videos\\video_000844.mp4'; last error log: [mov,mp4,m4a,3gp,3g2,mj2] moov atom not found

--- Evaluation Complete ---
Total videos processed: 357
Top-1 Correct Predictions: 141
Top-5 Correct Predictions: 321
Top-1 Accuracy: 39.50%
Top-5 Accuracy: 89.92%
Average inference time per video: 0.069 seconds (14.54 videos/sec)


## FPS Benchmark: VideoMAE Base (HuggingFace)
This cell benchmarks the inference speed (FPS) of the original VideoMAE Base model from HuggingFace on all valid test videos.

In [14]:
from transformers import pipeline
import torch
import time
import os

# Use the official VideoMAE Base model from HuggingFace
base_model_id = "MCG-NJU/videomae-base"

# Initialize pipeline for video classification
video_cls_base = pipeline(
    task="video-classification",
    model=base_model_id,
    device=0 if torch.cuda.is_available() else -1
)

# Load all test videos for benchmarking
dataset_root_path = "processed_dataset"
test_csv_path = os.path.join(dataset_root_path, "test.csv")

# Helper to load all video paths
video_paths = []
with open(test_csv_path, "r") as f:
    for line in f:
        parts = line.strip().split()
        if len(parts) >= 1:
            rel_path = parts[0]
            full_path = os.path.normpath(os.path.join(dataset_root_path, rel_path))
            if os.path.exists(full_path):
                video_paths.append(full_path)

# Benchmark FPS on all test videos
inference_times = []
valid_videos = 0
for i, video_path in enumerate(video_paths):
    try:
        # Try opening with PyAV first to check for corruption before pipeline
        import av
        try:
            container = av.open(video_path)
            container.close()
        except Exception as av_err:
            print(f"Skipping {video_path}: PyAV error: {av_err}")
            continue
        start = time.time()
        _ = video_cls_base(video_path)
        end = time.time()
        inference_times.append(end - start)
        valid_videos += 1
    except Exception as e:
        print(f"Skipping {video_path}: {e}")
    if (i + 1) % 10 == 0 or (i + 1) == len(video_paths):
        print(f"  Attempted {i + 1}/{len(video_paths)} videos...")

if inference_times:
    avg_time = sum(inference_times) / len(inference_times)
    fps = 1.0 / avg_time if avg_time > 0 else float('inf')
    print(f"VideoMAE Base - Average inference time per video: {avg_time:.3f} seconds (FPS: {fps:.2f}) on {valid_videos} valid videos.")
else:
    print("No valid test videos found for benchmarking.")

Some weights of VideoMAEForVideoClassification were not initialized from the model checkpoint at MCG-NJU/videomae-base and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Device set to use cuda:0


  Attempted 10/330 videos...
  Attempted 20/330 videos...
  Attempted 30/330 videos...
  Attempted 40/330 videos...
  Attempted 50/330 videos...
  Attempted 60/330 videos...
  Attempted 70/330 videos...
  Attempted 80/330 videos...
  Attempted 90/330 videos...
  Attempted 100/330 videos...
  Attempted 110/330 videos...
  Attempted 120/330 videos...
  Attempted 130/330 videos...
  Attempted 140/330 videos...
  Attempted 150/330 videos...
  Attempted 160/330 videos...
  Attempted 170/330 videos...
  Attempted 180/330 videos...
  Attempted 190/330 videos...
  Attempted 200/330 videos...
  Attempted 210/330 videos...
  Attempted 220/330 videos...
  Attempted 230/330 videos...
  Attempted 240/330 videos...
  Attempted 250/330 videos...
  Attempted 260/330 videos...
  Attempted 270/330 videos...
  Attempted 280/330 videos...
  Attempted 290/330 videos...
  Attempted 300/330 videos...
  Attempted 310/330 videos...
  Attempted 320/330 videos...
Skipping processed_dataset\videos\video_000844.mp

## Prune and Quantize the Fine-tuned Model
This cell demonstrates how to prune and quantize the trained VideoMAE model to further reduce its size and improve inference speed. Pruning removes less important weights, and quantization reduces the precision of weights from float32 to int8.

In [2]:
import torch
from torch.nn.utils import prune
import os

# Load the fine-tuned model
from transformers import VideoMAEForVideoClassification
model_dir = "videomae-tiny-finetuned-xd-violence"
model = VideoMAEForVideoClassification.from_pretrained(model_dir)

# PRUNING: Prune 30% of the weights in all Linear layers
parameters_to_prune = []
for name, module in model.named_modules():
    if isinstance(module, torch.nn.Linear):
        parameters_to_prune.append((module, 'weight'))
if parameters_to_prune:
    prune.global_unstructured(
        parameters_to_prune,
        pruning_method=prune.L1Unstructured,
        amount=0.3,  # Prune 30% of weights globally
    )
    # Remove pruning re-parametrization so the model can be saved normally
    for module, _ in parameters_to_prune:
        prune.remove(module, 'weight')
    print(f"Pruned {len(parameters_to_prune)} Linear layers.")
else:
    print("No Linear layers found for pruning.")

# QUANTIZATION: Convert model to dynamic quantized version (int8 for Linear layers)
quantized_model = torch.quantization.quantize_dynamic(
    model, {torch.nn.Linear}, dtype=torch.qint8
)

# Save the quantized and pruned model using torch.save
quantized_dir = model_dir + "-pruned-quantized"
os.makedirs(quantized_dir, exist_ok=True)
torch.save(quantized_model.state_dict(), os.path.join(quantized_dir, "pytorch_model.bin"))
model.config.save_pretrained(quantized_dir)
print(f"Quantized and pruned model weights saved to {quantized_dir}/pytorch_model.bin")
print(f"Model config saved to {quantized_dir}/config.json")

Pruned 25 Linear layers.
Quantized and pruned model weights saved to videomae-tiny-finetuned-xd-violence-pruned-quantized/pytorch_model.bin
Model config saved to videomae-tiny-finetuned-xd-violence-pruned-quantized/config.json


You can now use the pruned and quantized model for inference as usual, or compare its file size and speed to the original.

## Evaluate the Pruned and Quantized Model
This cell loads the pruned and quantized model and evaluates its top-1 and top-5 accuracy on the test set.

In [36]:
import torch
from transformers import VideoMAEForVideoClassification, VideoMAEConfig
import os

# Load config and original (float) model
quantized_dir = "videomae-tiny-finetuned-xd-violence-pruned-quantized"
config = VideoMAEConfig.from_pretrained(quantized_dir)
model = VideoMAEForVideoClassification.from_pretrained("videomae-tiny-finetuned-xd-violence", config=config)

# Quantize in memory
model = torch.quantization.quantize_dynamic(model, {torch.nn.Linear}, dtype=torch.qint8)

model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Evaluate on test set
top1_correct = 0
top5_correct = 0
total = 0

for sample in test_dataset:
    video = sample["video"].to(device)  # (C, T, H, W)
    label = sample["label"]
    # Permute and unsqueeze to (1, T, C, H, W)
    video = video.permute(1, 0, 2, 3).unsqueeze(0)
    with torch.no_grad():
        outputs = model(pixel_values=video)
        logits = outputs.logits
        top5 = torch.topk(logits, k=5, dim=-1).indices[0].cpu().numpy()
        pred = logits.argmax(-1).item()
        if pred == label:
            top1_correct += 1
        if label in top5:
            top5_correct += 1
        total += 1

print(f"Pruned+Quantized Model - Top-1 Accuracy: {100*top1_correct/total:.2f}%")
print(f"Pruned+Quantized Model - Top-5 Accuracy: {100*top5_correct/total:.2f}%")

TypeError: linear(): argument 'weight' must be Tensor, not method