## Clips generation (sliding window)

In [None]:
import os
import json
from moviepy import VideoFileClip
import random
import warnings
warnings.filterwarnings('ignore')

def extract_event_clips_soccernet():
    """
    Extract clips for multiple events from SoccerNet dataset with different extraction strategies per event type.
    """
    # Base directory containing league folders
    base_dir = "F:/AIM Lab/Project/SoccerNet"
    clips_base_dir = "F:/AIM Lab/Experiment/sliding-window/Clips"
    
    # Create base clips directory if it doesn't exist
    os.makedirs(clips_base_dir, exist_ok=True)
    
    # Event types to process with their extraction strategies
    event_types = ["Goal", "Red card", "Yellow card", "Direct free-kick", "Penalty", "Indirect free-kick", "Corner", "Substitution", "Shots on target"]
    
    # Events that need only first occurrence
    first_occurrence_events = ["Yellow card", "Direct free-kick", "Indirect free-kick", "Corner", "Substitution", "Shots on target"]
    
    # Events that need all occurrences with sliding window
    sliding_window_events = ["Penalty", "Red card"]
    
    # Events that need all occurrences (normal extraction)
    all_occurrences_events = ["Goal"]
    
    event_dirs = {event: os.path.join(clips_base_dir, event.replace(" ", "_")) for event in event_types}
    random_clips_dir = os.path.join(clips_base_dir, "no_event")
    
    # Create directories for each event and random clips
    for event_dir in event_dirs.values():
        os.makedirs(event_dir, exist_ok=True)
    os.makedirs(random_clips_dir, exist_ok=True)
    
    # Counters for clips
    event_counters = {event: 1 for event in event_types}
    random_counter = 1
    
    # Iterate through league folders
    for league_folder in os.listdir(base_dir):
        league_path = os.path.join(base_dir, league_folder)
        if os.path.isdir(league_path):
            print(f"Processing league: {league_folder}")
            
            # Iterate through season folders
            for season_folder in os.listdir(league_path):
                season_path = os.path.join(league_path, season_folder)
                if os.path.isdir(season_path):
                    print(f"  Processing season: {season_folder}")
                    
                    # Iterate through match folders
                    for match_folder in os.listdir(season_path):
                        match_path = os.path.join(season_path, match_folder)
                        if os.path.isdir(match_path):
                            print(f"    Processing match: {match_folder}")
                            
                            # Look for Labels-v2.json and 2_720p.mkv files
                            labels_file = os.path.join(match_path, "Labels-v2.json")
                            video_file = os.path.join(match_path, "2_720p.mkv")
                            
                            if os.path.exists(labels_file) and os.path.exists(video_file):
                                # Extract events from second half
                                event_timestamps = extract_event_timestamps_soccernet(labels_file, event_types)
                                
                                # Load video once for all clips from this match
                                try:
                                    video = VideoFileClip(video_file)
                                    video_duration = video.duration
                                    
                                    # Create clips for each event based on extraction strategy
                                    for event, timestamps in event_timestamps.items():
                                        if not timestamps:
                                            continue
                                            
                                        if event in first_occurrence_events:
                                            # Extract only first occurrence
                                            timestamp = timestamps[0]
                                            match_identifier = f"{league_folder}_{season_folder}_{match_folder.replace(' ', '_').replace('-', '_')}"
                                            create_event_clip_moviepy(
                                                video, 
                                                timestamp, 
                                                event_dirs[event], 
                                                f"{event_counters[event]}_{match_identifier}",
                                                event
                                            )
                                            event_counters[event] += 1
                                            
                                        elif event in sliding_window_events:
                                            # Extract all occurrences with sliding window
                                            for i, timestamp in enumerate(timestamps):
                                                match_identifier = f"{league_folder}_{season_folder}_{match_folder.replace(' ', '_').replace('-', '_')}"
                                                
                                                # Normal clip
                                                create_event_clip_moviepy(
                                                    video, 
                                                    timestamp, 
                                                    event_dirs[event], 
                                                    f"{event_counters[event]}_{match_identifier}_normal",
                                                    event
                                                )
                                                
                                                # 2 seconds before
                                                create_event_clip_moviepy(
                                                    video, 
                                                    timestamp - 2000,  # 2 seconds before
                                                    event_dirs[event], 
                                                    f"{event_counters[event]}_{match_identifier}_before",
                                                    event
                                                )
                                                
                                                # 2 seconds after
                                                create_event_clip_moviepy(
                                                    video, 
                                                    timestamp + 2000,  # 2 seconds after
                                                    event_dirs[event], 
                                                    f"{event_counters[event]}_{match_identifier}_after",
                                                    event
                                                )
                                                
                                                event_counters[event] += 1
                                                
                                        elif event in all_occurrences_events:
                                            # Extract all occurrences (normal)
                                            for i, timestamp in enumerate(timestamps):
                                                match_identifier = f"{league_folder}_{season_folder}_{match_folder.replace(' ', '_').replace('-', '_')}"
                                                create_event_clip_moviepy(
                                                    video, 
                                                    timestamp, 
                                                    event_dirs[event], 
                                                    f"{event_counters[event]}_{match_identifier}",
                                                    event
                                                )
                                                event_counters[event] += 1
                                    
                                    # Generate 5 random non-event clips
                                    avoided_intervals = generate_avoided_intervals(event_timestamps)
                                    num_random_clips = 5
                                    random_start_times = generate_random_segments(video_duration, avoided_intervals, num_random_clips, 7.0)
                                    
                                    for start_time in random_start_times:
                                        match_identifier = f"{league_folder}_{season_folder}_{match_folder.replace(' ', '_').replace('-', '_')}"
                                        create_random_clip_moviepy(
                                            video, 
                                            start_time, 
                                            random_clips_dir, 
                                            f"{random_counter}_{match_identifier}"
                                        )
                                        random_counter += 1
                                    
                                    # Close video to free memory
                                    video.close()
                                    
                                except Exception as e:
                                    print(f"      Error loading video: {e}")
                            else:
                                if not os.path.exists(labels_file):
                                    print(f"      Labels-v2.json not found")
                                if not os.path.exists(video_file):
                                    print(f"      2_720p.mkv not found")

def extract_event_timestamps_soccernet(labels_file, event_types):
    """
    Extract timestamps for specified events from the second half of the match.
    
    Args:
        labels_file (str): Path to the Labels-v2.json file
        event_types (list): List of event types to extract
        
    Returns:
        dict: Dictionary with event types as keys and lists of timestamps as values
    """
    try:
        with open(labels_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
        
        event_timestamps = {event: [] for event in event_types}
        
        for annotation in data.get('annotations', []):
            label = annotation.get('label')
            game_time = annotation.get('gameTime', '')
            
            # Extract events from second half only
            if label in event_types and game_time.startswith('2 -'):
                position = int(annotation.get('position', 0))
                event_timestamps[label].append(position)
        
        # Sort timestamps for each event
        for event in event_timestamps:
            event_timestamps[event].sort()
        
        return event_timestamps
        
    except Exception as e:
        print(f"      Error reading labels file: {e}")
        return {event: [] for event in event_types}

def create_event_clip_moviepy(video_clip, position_ms, output_dir, clip_name, event):
    """
    Create a 7-second video clip around the event time using MoviePy.
    
    Args:
        video_clip (VideoFileClip): Loaded video clip
        position_ms (int): Event position in milliseconds
        output_dir (str): Directory to save the clip
        clip_name (str): Name for the output clip
        event (str): Event type for logging
    """
    try:
        # Calculate start and end times (3 seconds before and 4 seconds after)
        start_seconds = max(0, (position_ms - 3000) / 1000.0)  # 3 seconds before, but not negative
        end_seconds = (position_ms + 4000) / 1000.0  # 4 seconds after (total 7 seconds)
        
        # Make sure we don't exceed video duration
        end_seconds = min(end_seconds, video_clip.duration)
        
        # Ensure we have at least some duration
        if end_seconds <= start_seconds:
            end_seconds = min(start_seconds + 7.0, video_clip.duration)
        
        # Output file path
        output_file = os.path.join(output_dir, f"{clip_name}.mp4")
        
        print(f"      Creating clip {clip_name} for {event} at {start_seconds:.2f}s")
        
        # Extract the clip
        event_clip = video_clip.subclipped(start_seconds, end_seconds)
        
        # Write the clip to file
        event_clip.write_videofile(
            output_file, 
            audio_codec='aac',
            codec='libx264',
            logger=None  # Suppress moviepy logs
        )
        
        # Close the clip to free memory
        event_clip.close()
        
        print(f"      Successfully created {output_file}")
            
    except Exception as e:
        print(f"      Error creating clip {clip_name}: {e}")

def create_random_clip_moviepy(video_clip, start_time, output_dir, clip_name):
    """
    Create a random 7-second video clip.
    
    Args:
        video_clip (VideoFileClip): Loaded video clip
        start_time (float): Start time in seconds
        output_dir (str): Directory to save the clip
        clip_name (str): Name for the output clip
    """
    try:
        # Calculate end time
        end_time = start_time + 7.0  # 7 seconds duration
        
        # Make sure we don't exceed video duration
        end_time = min(end_time, video_clip.duration)
        
        # Output file path
        output_file = os.path.join(output_dir, f"{clip_name}.mp4")
        
        print(f"      Creating random clip {clip_name}")
        
        # Extract the clip
        random_clip = video_clip.subclipped(start_time, end_time)
        
        # Write the clip to file
        random_clip.write_videofile(
            output_file, 
            audio_codec='aac',
            codec='libx264',
            logger=None  # Suppress moviepy logs
        )
        
        # Close the clip to free memory
        random_clip.close()
        
        print(f"      Successfully created {output_file}")
            
    except Exception as e:
        print(f"      Error creating random clip {clip_name}: {e}")

def generate_avoided_intervals(event_timestamps):
    """
    Generate avoided intervals for random clip generation based on event timestamps.
    
    Args:
        event_timestamps (dict): Dictionary of event timestamps
        
    Returns:
        list: List of avoided intervals (start, end) in seconds
    """
    avoided_intervals = []
    for timestamps in event_timestamps.values():
        for position_ms in timestamps:
            start_seconds = max(0, (position_ms - 3000) / 1000.0)  # 3 seconds before
            end_seconds = (position_ms + 4000) / 1000.0  # 4 seconds after
            avoided_intervals.append((start_seconds, end_seconds))
    return avoided_intervals

def generate_random_segments(video_duration, avoided_intervals, num_segments, segment_duration):
    """
    Generate random segments avoiding specified intervals.
    
    Args:
        video_duration (float): Total duration of the video
        avoided_intervals (list): List of (start, end) intervals to avoid
        num_segments (int): Number of segments to generate
        segment_duration (float): Duration of each segment
        
    Returns:
        list: List of start times for random segments
    """
    random_segments = []
    max_attempts = 1000
    
    for _ in range(num_segments):
        attempts = 0
        while attempts < max_attempts:
            max_start = video_duration - segment_duration
            if max_start <= 0:
                break
            
            start_time = random.uniform(0, max_start)
            end_time = start_time + segment_duration
            
            # Check if this segment overlaps with any avoided interval
            overlap = False
            for avoid_start, avoid_end in avoided_intervals:
                if not (end_time <= avoid_start or start_time >= avoid_end):
                    overlap = True
                    break
            
            if not overlap:
                random_segments.append(start_time)
                break
            
            attempts += 1
        
        if attempts >= max_attempts:
            print(f"        Warning: Could not find non-overlapping segment after {max_attempts} attempts")
    
    return random_segments

# Run the extraction
print("SoccerNet Multiclass Clip Extractor (MoviePy)")
print("=============================================")

try:
    extract_event_clips_soccernet()
    print("\nProcessing complete!")
except ImportError:
    print("Error: MoviePy not installed. Install with: pip install moviepy")
except Exception as e:
    print(f"Error: {e}")

SoccerNet Multiclass Clip Extractor (MoviePy)
Processing league: england_epl
  Processing season: 2014-2015
    Processing match: 2015-02-21 - 18-00 Chelsea 1 - 1 Burnley
      Creating clip 1_england_epl_2014-2015_2015_02_21___18_00_Chelsea_1___1_Burnley for Goal at 2118.49s
      Successfully created F:/AIM Lab/Experiment/sliding-window/Clips\Goal\1_england_epl_2014-2015_2015_02_21___18_00_Chelsea_1___1_Burnley.mp4
      Creating clip 1_england_epl_2014-2015_2015_02_21___18_00_Chelsea_1___1_Burnley_normal for Red card at 1462.20s
Proc not detected
      Successfully created F:/AIM Lab/Experiment/sliding-window/Clips\Red_card\1_england_epl_2014-2015_2015_02_21___18_00_Chelsea_1___1_Burnley_normal.mp4
      Creating clip 1_england_epl_2014-2015_2015_02_21___18_00_Chelsea_1___1_Burnley_before for Red card at 1460.20s
Proc not detected
      Successfully created F:/AIM Lab/Experiment/sliding-window/Clips\Red_card\1_england_epl_2014-2015_2015_02_21___18_00_Chelsea_1___1_Burnley_before.mp4

KeyboardInterrupt: 

### Bring that from 3rd cell to 2nd cell

In [3]:
import os
import shutil
import random
from collections import defaultdict

def split_videos(base_path, event_types, train_ratio=0.8, test_ratio=0.2):
    """
    Split videos into train and test subfolders for each event type based on unequal sample sizes.
    For "Red card" and "Penalty", ensure all three samples of an event (normal, before, after) remain together.
    
    Args:
        base_path (str): Path to the base folder containing event folders.
        event_types (list): List of event types to process.
        train_ratio (float): Proportion of videos to place in the train folder.
        test_ratio (float): Proportion of videos to place in the test folder.
    """
    for event in event_types:
        event_folder = os.path.join(base_path, event.replace(" ", "_"))
        train_folder = os.path.join(event_folder, "train")
        test_folder = os.path.join(event_folder, "test")
        
        # Create train and test subfolders if they don't exist
        os.makedirs(train_folder, exist_ok=True)
        os.makedirs(test_folder, exist_ok=True)
        
        # Get all video files in the event folder
        video_files = [f for f in os.listdir(event_folder) if f.endswith(".mp4")]
        
        if event in ["Red card", "Penalty"]:
            # Group files by event (normal, before, after)
            grouped_files = defaultdict(list)
            for filename in video_files:
                # Extract the base event identifier (e.g., "1_match_normal", "1_match_before", "1_match_after")
                base_event = "_".join(filename.split("_")[:2])  # First two parts of the filename
                grouped_files[base_event].append(filename)
            
            # Shuffle the grouped events to ensure randomness
            grouped_events = list(grouped_files.items())
            random.shuffle(grouped_events)
            
            # Calculate the number of train and test events
            total_events = len(grouped_events)
            num_train_events = int(total_events * train_ratio)
            num_test_events = total_events - num_train_events
            
            # Split events into train and test sets
            train_events = grouped_events[:num_train_events]
            test_events = grouped_events[num_train_events:]
            
            # Move grouped files to train folder
            for base_event, files in train_events:
                for file in files:
                    src_path = os.path.join(event_folder, file)
                    dest_path = os.path.join(train_folder, file)
                    shutil.move(src_path, dest_path)
            
            # Move grouped files to test folder
            for base_event, files in test_events:
                for file in files:
                    src_path = os.path.join(event_folder, file)
                    dest_path = os.path.join(test_folder, file)
                    shutil.move(src_path, dest_path)
            
            print(f"Event: {event}")
            print(f"  Total events: {total_events}")
            print(f"  Train events: {len(train_events)}")
            print(f"  Test events: {len(test_events)}")
        else:
            # Shuffle the video files to ensure randomness
            random.shuffle(video_files)
            
            # Calculate the number of train and test videos
            total_videos = len(video_files)
            num_train_videos = int(total_videos * train_ratio)
            num_test_videos = total_videos - num_train_videos
            
            # Split videos into train and test sets
            train_videos = video_files[:num_train_videos]
            test_videos = video_files[num_train_videos:]
            
            # Move videos to train folder
            for video in train_videos:
                src_path = os.path.join(event_folder, video)
                dest_path = os.path.join(train_folder, video)
                shutil.move(src_path, dest_path)
            
            # Move videos to test folder
            for video in test_videos:
                src_path = os.path.join(event_folder, video)
                dest_path = os.path.join(test_folder, video)
                shutil.move(src_path, dest_path)
            
            print(f"Event: {event}")
            print(f"  Total videos: {total_videos}")
            print(f"  Train videos: {len(train_videos)}")
            print(f"  Test videos: {len(test_videos)}")

if __name__ == "__main__":
    # Define base paths for Clips and Features-processed
    base_paths = [
        "F:/AIM Lab/Experiment/sliding-window/Clips"
    ]
    
    # List of event types to process
    event_types = ["Goal", "Red_card", "Yellow_card", "Direct_free-kick", "Penalty", "Indirect_free-kick", "Corner", "Substitution", "Shots_on_target", "no_event"]
    
    # Split videos in both base paths
    for base_path in base_paths:
        split_videos(base_path, event_types)

Event: Goal
  Total videos: 71
  Train videos: 56
  Test videos: 15
Event: Red_card
  Total videos: 9
  Train videos: 7
  Test videos: 2
Event: Yellow_card
  Total videos: 40
  Train videos: 32
  Test videos: 8
Event: Direct_free-kick
  Total videos: 37
  Train videos: 29
  Test videos: 8
Event: Penalty
  Total events: 10
  Train events: 8
  Test events: 2
Event: Indirect_free-kick
  Total videos: 42
  Train videos: 33
  Test videos: 9
Event: Corner
  Total videos: 43
  Train videos: 34
  Test videos: 9
Event: Substitution
  Total videos: 43
  Train videos: 34
  Test videos: 9
Event: Shots_on_target
  Total videos: 43
  Train videos: 34
  Test videos: 9
Event: no_event
  Total videos: 214
  Train videos: 171
  Test videos: 43


## Feature reduction for video clips

In [4]:
import os
import cv2
import numpy as np

def process_video(input_path, output_path):
    """
    Process the video to:
    1. Convert to grayscale.
    2. Remove grass and audience areas while preserving the ball.
    """
    # Load video
    cap = cv2.VideoCapture(input_path)
    
    # Get video properties
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    
    # Initialize video writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Changed to mp4v for better compatibility
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height), isColor=False)
    
    print(f"Processing {total_frames} frames from {input_path}...")
    
    frame_count = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        frame_count += 1
        if frame_count % 100 == 0:
            print(f"Processed {frame_count}/{total_frames} frames")

        # Convert to grayscale
        gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # Convert original frame to HSV for better color detection
        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

        # Define green grass color range in HSV (more specific range)
        lower_green = np.array([35, 40, 40])
        upper_green = np.array([85, 255, 255])
        grass_mask = cv2.inRange(hsv, lower_green, upper_green)

        # Define ball detection (white/light colored ball)
        lower_ball_hsv = np.array([0, 0, 200])
        upper_ball_hsv = np.array([180, 30, 255])
        ball_mask_hsv = cv2.inRange(hsv, lower_ball_hsv, upper_ball_hsv)
        
        _, ball_mask_gray = cv2.threshold(gray_frame, 200, 255, cv2.THRESH_BINARY)
        ball_mask = cv2.bitwise_or(ball_mask_hsv, ball_mask_gray)
        
        kernel = np.ones((3, 3), np.uint8)
        ball_mask = cv2.morphologyEx(ball_mask, cv2.MORPH_CLOSE, kernel)
        ball_mask = cv2.morphologyEx(ball_mask, cv2.MORPH_OPEN, kernel)

        lower_audience = np.array([0, 100, 150])
        upper_audience = np.array([180, 255, 255])
        audience_mask = cv2.inRange(hsv, lower_audience, upper_audience)
        audience_mask = cv2.bitwise_and(audience_mask, cv2.bitwise_not(ball_mask))

        combined_mask = cv2.bitwise_or(grass_mask, audience_mask)
        keep_mask = cv2.bitwise_not(combined_mask)
        keep_mask = cv2.bitwise_or(keep_mask, ball_mask)

        processed_frame = cv2.bitwise_and(gray_frame, gray_frame, mask=keep_mask)
        out.write(processed_frame)
    
    cap.release()
    out.release()
    cv2.destroyAllWindows()
    print(f"Processed video saved as: {output_path}")

def process_all_videos_recursive(input_folder, output_folder):
    """
    Recursively process all videos in input_folder (including subfolders)
    and save processed videos to output_folder, preserving the directory structure.
    """
    for root, dirs, files in os.walk(input_folder):
        # Compute the relative path from input_folder
        rel_path = os.path.relpath(root, input_folder)
        # Compute the corresponding output directory
        out_dir = os.path.join(output_folder, rel_path)
        os.makedirs(out_dir, exist_ok=True)
        for file in files:
            if file.lower().endswith('.mp4'):
                input_path = os.path.join(root, file)
                output_path = os.path.join(out_dir, file)
                print(f"Processing: {input_path} -> {output_path}")
                process_video(input_path, output_path)

def process_all_events(base_input_path, base_output_path, event_types):
    """
    For each event, process all videos in both train and test subfolders,
    preserving the directory structure and filenames.
    """
    for event in event_types:
        event_folder = event.replace(" ", "_")
        input_event_dir = os.path.join(base_input_path, event_folder)
        output_event_dir = os.path.join(base_output_path, event_folder)
        if os.path.exists(input_event_dir):
            print(f"Processing event: {event}")
            process_all_videos_recursive(input_event_dir, output_event_dir)
        else:
            print(f"Input folder not found for event: {event}")


if __name__ == "__main__":
    # Define input and output base paths
    base_input_path = "F:/AIM Lab/Experiment/sliding-window/Clips"
    base_output_path = "F:/AIM Lab/Experiment/sliding-window/Features-processed"
    
    # List of event types to process
    event_types = ["Goal", "Red_card", "Yellow_card", "Direct_free-kick", "Penalty", "Indirect_free-kick", "Corner", "Substitution", "Shots_on_target", "no_event"]
    
    # Process all events
    process_all_events(base_input_path, base_output_path, event_types)



Processing event: Goal
Processing: F:/AIM Lab/Experiment/sliding-window/Clips\Goal\test\15_england_epl_2015-2016_2015_09_26___17_00_Liverpool_3___2_Aston_Villa.mp4 -> F:/AIM Lab/Experiment/sliding-window/Features-processed\Goal\test\15_england_epl_2015-2016_2015_09_26___17_00_Liverpool_3___2_Aston_Villa.mp4
Processing 175 frames from F:/AIM Lab/Experiment/sliding-window/Clips\Goal\test\15_england_epl_2015-2016_2015_09_26___17_00_Liverpool_3___2_Aston_Villa.mp4...
Processed 100/175 frames
Processed video saved as: F:/AIM Lab/Experiment/sliding-window/Features-processed\Goal\test\15_england_epl_2015-2016_2015_09_26___17_00_Liverpool_3___2_Aston_Villa.mp4
Processing: F:/AIM Lab/Experiment/sliding-window/Clips\Goal\test\19_england_epl_2015-2016_2015_10_31___15_45_Chelsea_1___3_Liverpool.mp4 -> F:/AIM Lab/Experiment/sliding-window/Features-processed\Goal\test\19_england_epl_2015-2016_2015_10_31___15_45_Chelsea_1___3_Liverpool.mp4
Processing 175 frames from F:/AIM Lab/Experiment/sliding-wind

### Generating ResNet50 embeddings

In [None]:
import os
import cv2
import numpy as np
import torch
import torchvision
from typing import List

# Load ResNet-50 model
model = torchvision.models.resnet50(pretrained=True)
model.fc = torch.nn.Identity()  # Remove the final classifier to get 2048-d features
model.eval()  # Set the model to evaluation mode

# Preprocessing pipeline for frames
preprocess = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor(),                    # H×W×C → C×H×W, [0,1]
    torchvision.transforms.Normalize(mean=[.485, .456, .406], std=[.229, .224, .225]),
    torchvision.transforms.Resize((224, 224)),
])

def embed_clip(frames: List[np.ndarray]):
    """
    Generate ResNet-50 embeddings for a list of frames.
    
    Args:
        frames (List[np.ndarray]): List of frames (H×W×C, BGR format).
    
    Returns:
        np.ndarray: N×2048 array of frame embeddings.
    """
    feats = []
    for frame in frames:
        # Convert BGR (OpenCV) to RGB
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        x = preprocess(frame_rgb).unsqueeze(0)  # 1×3×224×224
        with torch.no_grad():
            f = model(x)  # 1×2048
        feats.append(f.squeeze(0).cpu().numpy())
    return np.stack(feats, axis=0)  # N×2048

def sample_frames_from_video(video_path, num_samples=16):
    """
    Uniformly sample num_samples frames from the video at video_path.
    Returns a list of BGR frames (as numpy arrays).
    """
    print(f"Opening video: {video_path}")
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Cannot open video {video_path}")
        return []

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print(f"Total frames in video: {total_frames}")
    if total_frames == 0:
        print(f"Error: No frames found in video {video_path}")
        return []

    if total_frames < num_samples:
        # If fewer frames than samples, just read them all
        indices = list(range(total_frames))
    else:
        # Uniformly spaced frame indices
        indices = np.linspace(0, total_frames - 1, num=num_samples, dtype=int)

    frames = []
    for idx in indices:
        cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
        ret, frame = cap.read()
        if not ret:
            print(f"Warning: Failed to read frame {idx} from {video_path}")
            continue
        frames.append(frame)
    cap.release()

    if not frames:
        print(f"Error: No frames sampled from video {video_path}")
    else:
        print(f"Successfully sampled {len(frames)} frames from {video_path}")
    return frames

def process_folder(input_folder, output_folder, num_samples=16):
    """
    Process videos in a folder, generate mean-pooled ResNet-50 embeddings for sampled frames,
    and save the embeddings to the output folder.
    
    Args:
        input_folder (str): Path to the folder containing input videos.
        output_folder (str): Path to save embeddings.
        num_samples (int): Number of frames to sample per video.
    """
    print(f"Processing folder: {input_folder}")
    if not os.path.exists(input_folder):
        print(f"Error: Folder does not exist: {input_folder}")
        return

    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    for fname in os.listdir(input_folder):
        if not fname.lower().endswith('.mp4'):
            print(f"Skipping non-video file: {fname}")
            continue
        video_path = os.path.join(input_folder, fname)
        print(f"Processing video: {video_path}")
        frames = sample_frames_from_video(video_path, num_samples=num_samples)
        if frames:
            # Generate embeddings and mean-pool them
            embeddings = embed_clip(frames)
            clip_vec = np.mean(embeddings, axis=0)  # Mean-pool → (2048,)
            
            # Save the embedding as a .npy file
            output_path = os.path.join(output_folder, f"{os.path.splitext(fname)[0]}.npy")
            np.save(output_path, clip_vec)
            print(f"Saved embedding to {output_path}")
        else:
            print(f"Error: No frames sampled from {fname}")

# Example usage
if __name__ == "__main__":
    input_folder = "path_to_input_videos"
    output_folder = "path_to_save_embeddings"
    process_folder(input_folder, output_folder, num_samples=16)

In [None]:
import os
import cv2
import numpy as np
import torch
import torchvision
from typing import List, Dict
import random
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, precision_score, recall_score, f1_score

# Load ResNet-50 model
model = torchvision.models.resnet50(pretrained=True)
model.fc = torch.nn.Identity()  # Remove the final classifier to get 2048-d features
model.eval()  # Set the model to evaluation mode

# Preprocessing pipeline for frames
preprocess = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor(),                    # H×W×C → C×H×W, [0,1]
    torchvision.transforms.Normalize(mean=[.485, .456, .406], std=[.229, .224, .225]),
    torchvision.transforms.Resize((224, 224)),
])

def embed_clip(frames: List[np.ndarray]):
    """
    Generate ResNet-50 embeddings for a list of frames.
    
    Args:
        frames (List[np.ndarray]): List of frames (H×W×C, BGR format).
    
    Returns:
        np.ndarray: N×2048 array of frame embeddings.
    """
    feats = []
    for frame in frames:
        # Convert BGR (OpenCV) to RGB
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        x = preprocess(frame_rgb).unsqueeze(0)  # 1×3×224×224
        with torch.no_grad():
            f = model(x)  # 1×2048
        feats.append(f.squeeze(0).cpu().numpy())
    return np.stack(feats, axis=0)  # N×2048

def sample_frames_from_video(video_path, num_samples=16):
    """
    Uniformly sample num_samples frames from the video at video_path.
    Returns a list of BGR frames (as numpy arrays).
    """
    print(f"Opening video: {video_path}")
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Cannot open video {video_path}")
        return []

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print(f"Total frames in video: {total_frames}")
    if total_frames == 0:
        print(f"Error: No frames found in video {video_path}")
        return []

    if total_frames < num_samples:
        # If fewer frames than samples, just read them all
        indices = list(range(total_frames))
    else:
        # Uniformly spaced frame indices
        indices = np.linspace(0, total_frames - 1, num=num_samples, dtype=int)

    frames = []
    for idx in indices:
        cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
        ret, frame = cap.read()
        if not ret:
            print(f"Warning: Failed to read frame {idx} from {video_path}")
            continue
        frames.append(frame)
    cap.release()

    if not frames:
        print(f"Error: No frames sampled from video {video_path}")
    else:
        print(f"Successfully sampled {len(frames)} frames from {video_path}")
    return frames

def process_folder(input_folder, output_folder, num_samples=16):
    """
    Process videos in a folder, generate mean-pooled ResNet-50 embeddings for sampled frames,
    and save the embeddings to the output folder.
    
    Args:
        input_folder (str): Path to the folder containing input videos.
        output_folder (str): Path to save embeddings.
        num_samples (int): Number of frames to sample per video.
    """
    print(f"Processing folder: {input_folder}")
    if not os.path.exists(input_folder):
        print(f"Error: Folder does not exist: {input_folder}")
        return

    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    for fname in os.listdir(input_folder):
        if not fname.lower().endswith('.mp4'):
            print(f"Skipping non-video file: {fname}")
            continue
        video_path = os.path.join(input_folder, fname)
        print(f"Processing video: {video_path}")
        frames = sample_frames_from_video(video_path, num_samples=num_samples)
        if frames:
            # Generate embeddings and mean-pool them
            embeddings = embed_clip(frames)
            clip_vec = np.mean(embeddings, axis=0)  # Mean-pool → (2048,)
            
            # Save the embedding as a .npy file
            output_path = os.path.join(output_folder, f"{os.path.splitext(fname)[0]}.npy")
            np.save(output_path, clip_vec)
            print(f"Saved embedding to {output_path}")
        else:
            print(f"Error: No frames sampled from {fname}")

def l2_normalize(vec):
    """
    Perform L2 normalization on a vector.
    
    Args:
        vec (np.ndarray): Input vector.
    
    Returns:
        np.ndarray: L2-normalized vector.
    """
    norm = np.linalg.norm(vec)
    if norm == 0:
        return vec
    return vec / norm

def build_prototypes(embeddings_base_path, event_classes):
    """
    Build prototypes for each event class from training embeddings.
    
    Args:
        embeddings_base_path (str): Base path to embeddings folders.
        event_classes (list): List of event class names.
    
    Returns:
        dict: Dictionary of prototypes for each event class.
    """
    prototypes = {}
    
    for event in event_classes:
        event_train_folder = os.path.join(embeddings_base_path, event, "Train")
        
        if not os.path.exists(event_train_folder):
            print(f"Warning: Training folder not found for {event}")
            continue
            
        event_vecs = []
        print(f"Loading training embeddings for {event}...")
        
        # Load embeddings for the event
        for fname in os.listdir(event_train_folder):
            if fname.endswith(".npy"):
                vec = np.load(os.path.join(event_train_folder, fname))
                event_vecs.append(vec)
                print(f"  Loaded {fname} (shape: {vec.shape})")

        if event_vecs:
            # Compute mean vector for the event and normalize
            proto_event = np.mean(event_vecs, axis=0)
            proto_event = l2_normalize(proto_event)
            prototypes[event] = proto_event
            print(f"Built prototype for {event} from {len(event_vecs)} samples")
        else:
            print(f"Warning: No embeddings found for {event}")

    return prototypes

def classify_clip(clip_embedding, prototypes):
    """
    Classify a clip based on cosine similarity to prototypes.
    
    Args:
        clip_embedding (np.ndarray): Embedding vector of the clip.
        prototypes (dict): Dictionary of prototypes for each event class.
    
    Returns:
        tuple: (predicted_label, similarity_scores)
    """
    # Normalize the clip embedding
    clip_norm = l2_normalize(clip_embedding)
    
    # Compute cosine similarities
    similarities = {}
    for event, proto in prototypes.items():
        similarity = np.dot(clip_norm, proto)
        similarities[event] = similarity
    
    # Predict the class with highest similarity
    predicted_label = max(similarities, key=similarities.get)
    
    return predicted_label, similarities

def evaluate_multiclass_classification(embeddings_base_path, prototypes, event_classes):
    """
    Evaluate multiclass classification performance on test data.
    
    Args:
        embeddings_base_path (str): Base path to embeddings folders.
        prototypes (dict): Dictionary of prototypes for each event class.
        event_classes (list): List of event class names.
    
    Returns:
        dict: Dictionary containing evaluation results.
    """
    y_true = []
    y_pred = []
    results = {
        'predictions': [],
        'class_counts': {event: {'correct': 0, 'total': 0} for event in event_classes}
    }
    
    print("\n" + "="*60)
    print("EVALUATING MULTICLASS CLASSIFICATION")
    print("="*60)
    
    for event in event_classes:
        event_test_folder = os.path.join(embeddings_base_path, event, "Test")
        
        if not os.path.exists(event_test_folder):
            print(f"Warning: Test folder not found for {event}")
            continue
            
        print(f"\nTesting {event} clips...")
        
        for fname in os.listdir(event_test_folder):
            if fname.endswith(".npy"):
                test_embedding = np.load(os.path.join(event_test_folder, fname))
                predicted_label, similarities = classify_clip(test_embedding, prototypes)
                
                is_correct = predicted_label == event
                results['class_counts'][event]['total'] += 1
                
                if is_correct:
                    results['class_counts'][event]['correct'] += 1
                
                # Store for sklearn metrics
                y_true.append(event)
                y_pred.append(predicted_label)
                
                # Store detailed results
                result_entry = {
                    'file': fname,
                    'true_label': event,
                    'predicted_label': predicted_label,
                    'correct': is_correct,
                    'similarities': similarities
                }
                results['predictions'].append(result_entry)
                
                # Print results
                print(f"  File: {fname}")
                print(f"    True: {event}, Predicted: {predicted_label}, Correct: {is_correct}")
                print(f"    Similarities: {similarities}")
    
    # Calculate overall metrics
    overall_accuracy = accuracy_score(y_true, y_pred)
    cm = confusion_matrix(y_true, y_pred, labels=event_classes)
    classification_rep = classification_report(y_true, y_pred, labels=event_classes, zero_division=0)
    
    # Calculate sklearn metrics for each class (treating as one-vs-rest)
    precision = precision_score(y_true, y_pred, labels=event_classes, average='weighted', zero_division=0)
    recall = recall_score(y_true, y_pred, labels=event_classes, average='weighted', zero_division=0)
    f1 = f1_score(y_true, y_pred, labels=event_classes, average='weighted', zero_division=0)
    
    # Calculate per-class accuracy
    class_accuracies = {}
    for event in event_classes:
        if results['class_counts'][event]['total'] > 0:
            class_accuracies[event] = results['class_counts'][event]['correct'] / results['class_counts'][event]['total']
        else:
            class_accuracies[event] = 0.0
    
    # Print results
    print(f"\n{'='*60}")
    print("EVALUATION RESULTS")
    print(f"{'='*60}")
    print(f"Overall Accuracy: {overall_accuracy:.4f}")
    print(f"Weighted Precision: {precision:.4f}")
    print(f"Weighted Recall: {recall:.4f}")
    print(f"Weighted F₁ Score: {f1:.4f}")
    
    print(f"\nPer-class Accuracy:")
    for event, acc in class_accuracies.items():
        correct = results['class_counts'][event]['correct']
        total = results['class_counts'][event]['total']
        print(f"  {event}: {acc:.4f} ({correct}/{total})")
    
    print(f"\nConfusion Matrix:")
    print("Rows: True labels, Columns: Predicted labels")
    print(f"Classes: {event_classes}")
    print(cm)
    
    print(f"\nDetailed Classification Report:")
    print(classification_rep)
    
    results.update({
        'overall_accuracy': overall_accuracy,
        'precision': precision,
        'recall': recall,
        'f1_score': f1,
        'class_accuracies': class_accuracies,
        'confusion_matrix': cm,
        'classification_report': classification_rep,
        'y_true': y_true,
        'y_pred': y_pred
    })
    
    return results

def main():
    """
    Main function to run multiclass few-shot learning classification.
    """
    print("Multiclass Few-Shot Learning with ResNet-50")
    print("="*50)
    
    # Configuration
    base_input_path = "F:/AIM Lab/Experiment/sliding-window/Features-processed"
    base_output_path = "F:/AIM Lab/Experiment/sliding-window/Resnet-50 embeddings"
    
    # Event classes
    event_classes = ["Goal", "Red_card", "Yellow_card", "Direct_free-kick", "Penalty", "no_event"]
    
    # Step 1: Generate embeddings for all videos
    print("\nStep 1: Generating ResNet-50 embeddings...")
    for event in event_classes:
        for split in ["Train", "Test"]:
            input_folder = os.path.join(base_input_path, event, split)
            output_folder = os.path.join(base_output_path, event, split)
            
            print(f"\nProcessing {event} - {split}...")
            process_folder(input_folder, output_folder, num_samples=16)
    
    # Step 2: Build prototypes from training embeddings
    print("\nStep 2: Building prototypes from training data...")
    prototypes = build_prototypes(base_output_path, event_classes)
    
    if not prototypes:
        print("Error: No prototypes could be built!")
        return
    
    print(f"Successfully built prototypes for {len(prototypes)} classes:")
    for event, proto in prototypes.items():
        print(f"  {event}: shape {proto.shape}")
    
    # Step 3: Evaluate on test data
    print("\nStep 3: Evaluating classification performance...")
    results = evaluate_multiclass_classification(base_output_path, prototypes, event_classes)
    
    # Step 4: Additional analysis
    print(f"\nStep 4: Additional Analysis...")
    print(f"Total test samples: {len(results['y_true'])}")
    print(f"Number of classes: {len(event_classes)}")
    
    # Find most confused classes
    cm = results['confusion_matrix']
    print(f"\nMost confused class pairs:")
    for i, true_class in enumerate(event_classes):
        for j, pred_class in enumerate(event_classes):
            if i != j and cm[i, j] > 0:
                print(f"  {true_class} → {pred_class}: {cm[i, j]} times")
    
        print(f"\n{'='*50}")
        print("FINAL RESULTS")
        print(f"{'='*50}")
        print(f"Overall Accuracy: {results['overall_accuracy']:.4f}")
        print(f"Weighted Precision: {results['precision']:.4f}")
        print(f"Weighted Recall: {results['recall']:.4f}")
        print(f"Weighted F₁ Score: {results['f1_score']:.4f}")
        
        print(f"\n{'='*50}")
        print("MULTICLASS FEW-SHOT LEARNING COMPLETE!")
        print(f"{'='*50}")
    print(f"\n{'='*50}")
    print("MULTICLASS FEW-SHOT LEARNING COMPLETE!")
    print(f"{'='*50}")

if __name__ == "__main__":
    main()



Multiclass Few-Shot Learning with ResNet-50

Step 2: Building prototypes from training data...
Loading training embeddings for Goal...
  Loaded 10_england_epl_2015-2016_2015_08_29___17_00_Manchester_City_2___0_Watford.npy (shape: (2048,))
  Loaded 11_england_epl_2015-2016_2015_09_12___14_45_Everton_3___1_Chelsea.npy (shape: (2048,))
  Loaded 12_england_epl_2015-2016_2015_09_12___17_00_Crystal_Palace_0___1_Manchester_City.npy (shape: (2048,))
  Loaded 13_england_epl_2015-2016_2015_09_26___17_00_Liverpool_3___2_Aston_Villa.npy (shape: (2048,))
  Loaded 14_england_epl_2015-2016_2015_09_26___17_00_Liverpool_3___2_Aston_Villa.npy (shape: (2048,))
  Loaded 16_england_epl_2015-2016_2015_09_26___17_00_Liverpool_3___2_Aston_Villa.npy (shape: (2048,))
  Loaded 17_england_epl_2015-2016_2015_10_17___17_00_Chelsea_2___0_Aston_Villa.npy (shape: (2048,))
  Loaded 18_england_epl_2015-2016_2015_10_31___15_45_Chelsea_1___3_Liverpool.npy (shape: (2048,))
  Loaded 1_england_epl_2014-2015_2015_02_21___18_0

In [3]:
import os
import cv2
import numpy as np
import torch
import torch.nn.functional as F
from torchvision import transforms
from torchvision.models.video import r2plus1d_18
from typing import List, Dict
import random
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, precision_score, recall_score, f1_score

class R2Plus1DFeatureExtractor:
    def __init__(self, device='cuda' if torch.cuda.is_available() else 'cpu'):
        """
        Initialize R(2+1)D model for feature extraction.
        
        Args:
            device: Device to run the model on
        """
        self.device = device
        
        # Initialize R(2+1)D model
        self.model = r2plus1d_18(pretrained=True)
        
        # Remove the final classification layer to get features
        self.model.fc = torch.nn.Identity()
        self.model.to(device)
        self.model.eval()
        
        # Preprocessing transforms
        self.transform = transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize((112, 112)),  # R(2+1)D typically uses 112x112
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.43216, 0.394666, 0.37645], 
                               std=[0.22803, 0.22145, 0.216989])  # Kinetics normalization
        ])
    
    def preprocess_frames(self, frames: List[np.ndarray], num_frames: int = 16) -> torch.Tensor:
        """
        Preprocess frames for R(2+1)D input.
        
        Args:
            frames: List of frames (H×W×C, BGR format)
            num_frames: Number of frames to use for R(2+1)D
            
        Returns:
            Preprocessed tensor of shape (1, 3, num_frames, H, W)
        """
        # Sample frames uniformly if we have more than needed
        if len(frames) > num_frames:
            indices = np.linspace(0, len(frames) - 1, num_frames, dtype=int)
            frames = [frames[i] for i in indices]
        elif len(frames) < num_frames:
            # Repeat last frame if we don't have enough
            while len(frames) < num_frames:
                frames.append(frames[-1])
        
        # Convert frames and apply transforms
        processed_frames = []
        for frame in frames:
            # Convert BGR to RGB
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            # Apply transforms
            frame_tensor = self.transform(frame_rgb)
            processed_frames.append(frame_tensor)
        
        # Stack frames: (num_frames, 3, H, W) -> (3, num_frames, H, W)
        video_tensor = torch.stack(processed_frames, dim=1)  # (3, num_frames, H, W)
        video_tensor = video_tensor.unsqueeze(0)  # Add batch dimension
        
        return video_tensor
    
    def extract_features(self, frames: List[np.ndarray]) -> np.ndarray:
        """
        Extract R(2+1)D features from video frames.
        
        Args:
            frames: List of video frames
            
        Returns:
            Feature vector of shape (512,) for R(2+1)D-18
        """
        # Preprocess frames
        video_tensor = self.preprocess_frames(frames)
        video_tensor = video_tensor.to(self.device)
        
        # Extract features
        with torch.no_grad():
            features = self.model(video_tensor)
            # Global average pooling if needed
            if len(features.shape) > 2:
                features = F.adaptive_avg_pool3d(features, 1).squeeze()
            else:
                features = features.squeeze()
        
        return features.cpu().numpy()

def sample_frames_from_video(video_path: str, num_samples: int = 32) -> List[np.ndarray]:
    """
    Uniformly sample frames from a video.
    
    Args:
        video_path: Path to video file
        num_samples: Number of frames to sample
        
    Returns:
        List of sampled frames
    """
    print(f"Opening video: {video_path}")
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Cannot open video {video_path}")
        return []

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print(f"Total frames in video: {total_frames}")
    
    if total_frames == 0:
        print(f"Error: No frames found in video {video_path}")
        return []

    if total_frames < num_samples:
        indices = list(range(total_frames))
    else:
        indices = np.linspace(0, total_frames - 1, num=num_samples, dtype=int)

    frames = []
    for idx in indices:
        cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
        ret, frame = cap.read()
        if not ret:
            print(f"Warning: Failed to read frame {idx} from {video_path}")
            continue
        frames.append(frame)
    
    cap.release()
    
    if not frames:
        print(f"Error: No frames sampled from video {video_path}")
    else:
        print(f"Successfully sampled {len(frames)} frames from {video_path}")
    
    return frames

def process_folder(input_folder: str, output_folder: str, 
                  feature_extractor: R2Plus1DFeatureExtractor,
                  num_frames: int = 32):
    """
    Process all videos in a folder and generate R(2+1)D embeddings.
    
    Args:
        input_folder: Folder containing input videos
        output_folder: Folder to save embeddings
        feature_extractor: R(2+1)D feature extractor instance
        num_frames: Number of frames to use for each video
    """
    print(f"Processing folder: {input_folder}")
    if not os.path.exists(input_folder):
        print(f"Error: Folder does not exist: {input_folder}")
        return

    os.makedirs(output_folder, exist_ok=True)

    for fname in os.listdir(input_folder):
        if not fname.lower().endswith('.mp4'):
            print(f"Skipping non-video file: {fname}")
            continue
        
        video_path = os.path.join(input_folder, fname)
        print(f"Processing video: {video_path}")
        
        # Sample frames from video
        frames = sample_frames_from_video(video_path, num_samples=num_frames)
        
        if frames:
            # Extract R(2+1)D features
            try:
                features = feature_extractor.extract_features(frames)
                
                # Save the embedding as a .npy file
                output_path = os.path.join(output_folder, f"{os.path.splitext(fname)[0]}.npy")
                np.save(output_path, features)
                print(f"Saved R(2+1)D embedding to {output_path} (shape: {features.shape})")
                
            except Exception as e:
                print(f"Error processing {fname}: {e}")
        else:
            print(f"Error: No frames sampled from {fname}")

def l2_normalize(vec: np.ndarray) -> np.ndarray:
    """
    Perform L2 normalization on a vector.
    
    Args:
        vec: Input vector
        
    Returns:
        L2-normalized vector
    """
    norm = np.linalg.norm(vec)
    if norm == 0:
        return vec
    return vec / norm

def cosine_similarity(vec1: np.ndarray, vec2: np.ndarray) -> float:
    """
    Compute cosine similarity between two vectors.
    
    Args:
        vec1: First vector
        vec2: Second vector
        
    Returns:
        Cosine similarity value
    """
    # Normalize vectors
    vec1_norm = l2_normalize(vec1)
    vec2_norm = l2_normalize(vec2)
    
    # Compute cosine similarity
    return np.dot(vec1_norm, vec2_norm)

def build_prototypes(embeddings_base_path: str, event_classes: List[str]) -> Dict[str, np.ndarray]:
    """
    Build prototypes for each event class from training embeddings.
    
    Args:
        embeddings_base_path: Base path to embeddings folders
        event_classes: List of event class names
        
    Returns:
        Dictionary of prototypes for each event class
    """
    prototypes = {}
    
    for event in event_classes:
        event_train_folder = os.path.join(embeddings_base_path, event, "Train")
        
        if not os.path.exists(event_train_folder):
            print(f"Warning: Training folder not found for {event}")
            continue
            
        event_vecs = []
        print(f"Loading training embeddings for {event}...")
        
        # Load embeddings for the event
        for fname in os.listdir(event_train_folder):
            if fname.endswith(".npy"):
                vec = np.load(os.path.join(event_train_folder, fname))
                event_vecs.append(vec)
                print(f"  Loaded {fname} (shape: {vec.shape})")

        if event_vecs:
            # Compute mean vector for the event and normalize
            proto_event = np.mean(event_vecs, axis=0)
            proto_event = l2_normalize(proto_event)
            prototypes[event] = proto_event
            print(f"Built prototype for {event} from {len(event_vecs)} samples")
        else:
            print(f"Warning: No embeddings found for {event}")

    return prototypes

def classify_clip(clip_embedding: np.ndarray, prototypes: Dict[str, np.ndarray]) -> tuple:
    """
    Classify a clip based on cosine similarity to prototypes.
    
    Args:
        clip_embedding: Embedding vector of the clip
        prototypes: Dictionary of prototypes for each event class
    
    Returns:
        tuple: (predicted_label, similarity_scores)
    """
    # Normalize the clip embedding
    clip_norm = l2_normalize(clip_embedding)
    
    # Compute cosine similarities
    similarities = {}
    for event, proto in prototypes.items():
        similarity = cosine_similarity(clip_norm, proto)
        similarities[event] = similarity
    
    # Predict the class with highest similarity
    predicted_label = max(similarities, key=similarities.get)
    
    return predicted_label, similarities

def evaluate_multiclass_classification(embeddings_base_path: str, 
                                     prototypes: Dict[str, np.ndarray], 
                                     event_classes: List[str],
                                     threshold: float = 0.0) -> dict:
    """
    Evaluate multiclass classification performance on test data.
    
    Args:
        embeddings_base_path: Base path to embeddings folders
        prototypes: Dictionary of prototypes for each event class
        event_classes: List of event class names
        threshold: Decision threshold (not used in multiclass, kept for compatibility)
    
    Returns:
        Dictionary containing evaluation results
    """
    y_true = []
    y_pred = []
    results = {
        'predictions': [],
        'class_counts': {event: {'correct': 0, 'total': 0} for event in event_classes}
    }
    
    print("\n" + "="*60)
    print("EVALUATING MULTICLASS CLASSIFICATION")
    print("="*60)
    
    for event in event_classes:
        event_test_folder = os.path.join(embeddings_base_path, event, "Test")
        
        if not os.path.exists(event_test_folder):
            print(f"Warning: Test folder not found for {event}")
            continue
            
        print(f"\nTesting {event} clips...")
        
        for fname in os.listdir(event_test_folder):
            if fname.endswith(".npy"):
                test_embedding = np.load(os.path.join(event_test_folder, fname))
                predicted_label, similarities = classify_clip(test_embedding, prototypes)
                
                is_correct = predicted_label == event
                results['class_counts'][event]['total'] += 1
                
                if is_correct:
                    results['class_counts'][event]['correct'] += 1
                
                # Store for sklearn metrics
                y_true.append(event)
                y_pred.append(predicted_label)
                
                # Store detailed results
                result_entry = {
                    'file': fname,
                    'true_label': event,
                    'predicted_label': predicted_label,
                    'correct': is_correct,
                    'similarities': similarities
                }
                results['predictions'].append(result_entry)
                
                # Print results
                print(f"  File: {fname}")
                print(f"    True: {event}, Predicted: {predicted_label}, Correct: {is_correct}")
                print(f"    Similarities: {similarities}")
    
    # Calculate overall metrics
    overall_accuracy = accuracy_score(y_true, y_pred)
    cm = confusion_matrix(y_true, y_pred, labels=event_classes)
    classification_rep = classification_report(y_true, y_pred, labels=event_classes, zero_division=0)
    
    # Calculate sklearn metrics for each class (treating as one-vs-rest)
    precision = precision_score(y_true, y_pred, labels=event_classes, average='weighted', zero_division=0)
    recall = recall_score(y_true, y_pred, labels=event_classes, average='weighted', zero_division=0)
    f1 = f1_score(y_true, y_pred, labels=event_classes, average='weighted', zero_division=0)
    
    # Calculate per-class accuracy
    class_accuracies = {}
    for event in event_classes:
        if results['class_counts'][event]['total'] > 0:
            class_accuracies[event] = results['class_counts'][event]['correct'] / results['class_counts'][event]['total']
        else:
            class_accuracies[event] = 0.0
    
    # Print results
    print(f"\n{'='*60}")
    print("EVALUATION RESULTS")
    print(f"{'='*60}")
    print(f"Overall Accuracy: {overall_accuracy:.4f}")
    print(f"Weighted Precision: {precision:.4f}")
    print(f"Weighted Recall: {recall:.4f}")
    print(f"Weighted F₁ Score: {f1:.4f}")
    
    print(f"\nPer-class Accuracy:")
    for event, acc in class_accuracies.items():
        correct = results['class_counts'][event]['correct']
        total = results['class_counts'][event]['total']
        print(f"  {event}: {acc:.4f} ({correct}/{total})")
    
    print(f"\nConfusion Matrix:")
    print("Rows: True labels, Columns: Predicted labels")
    print(f"Classes: {event_classes}")
    print(cm)
    
    print(f"\nDetailed Classification Report:")
    print(classification_rep)
    
    results.update({
        'overall_accuracy': overall_accuracy,
        'precision': precision,
        'recall': recall,
        'f1_score': f1,
        'class_accuracies': class_accuracies,
        'confusion_matrix': cm,
        'classification_report': classification_rep,
        'y_true': y_true,
        'y_pred': y_pred
    })
    
    return results

def main():
    """
    Main function to run R(2+1)D-based multiclass few-shot learning classification.
    """
    print("Multiclass Few-Shot Learning with R(2+1)D")
    print("="*50)
    
    # Configuration
    base_input_path = "F:/AIM Lab/Experiment/sliding-window/Features-processed"
    base_output_path = "F:/AIM Lab/Experiment/sliding-window/R(2+1)D embeddings"
    
    # Event classes (using underscores to match folder names)
    event_classes = ["Goal", "Red_card", "Yellow_card", "Direct_free-kick", "Penalty", "no_event"]
    
    # Initialize R(2+1)D feature extractor
    print("Initializing R(2+1)D feature extractor...")
    feature_extractor = R2Plus1DFeatureExtractor()
    
    # # Step 1: Generate R(2+1)D embeddings for all videos
    # print("\nStep 1: Generating R(2+1)D embeddings...")
    # for event in event_classes:
    #     for split in ["Train", "Test"]:
    #         input_folder = os.path.join(base_input_path, event, split)
    #         output_folder = os.path.join(base_output_path, event, split)
            
    #         print(f"\nProcessing {event} - {split}...")
    #         process_folder(input_folder, output_folder, feature_extractor, num_frames=32)
    
    # Step 2: Build prototypes from training embeddings
    print("\nStep 2: Building prototypes from training data...")
    prototypes = build_prototypes(base_output_path, event_classes)
    
    if not prototypes:
        print("Error: No prototypes could be built!")
        return
    
    print(f"Successfully built prototypes for {len(prototypes)} classes:")
    for event, proto in prototypes.items():
        print(f"  {event}: shape {proto.shape}")
    
    # Step 3: Evaluate on test data with threshold 0.0
    print("\nStep 3: Evaluating classification performance with threshold 0.0...")
    results = evaluate_multiclass_classification(base_output_path, prototypes, event_classes, threshold=0.0)
    
    # Step 4: Additional analysis
    print(f"\nStep 4: Additional Analysis...")
    print(f"Total test samples: {len(results['y_true'])}")
    print(f"Number of classes: {len(event_classes)}")
    
    # Find most confused classes
    cm = results['confusion_matrix']
    print(f"\nMost confused class pairs:")
    for i, true_class in enumerate(event_classes):
        for j, pred_class in enumerate(event_classes):
            if i != j and cm[i, j] > 0:
                print(f"  {true_class} → {pred_class}: {cm[i, j]} times")
    
    print(f"\n{'='*50}")
    print("FINAL RESULTS")
    print(f"{'='*50}")
    print(f"Overall Accuracy: {results['overall_accuracy']:.4f}")
    print(f"Weighted Precision: {results['precision']:.4f}")
    print(f"Weighted Recall: {results['recall']:.4f}")
    print(f"Weighted F₁ Score: {results['f1_score']:.4f}")
    
    print(f"\n{'='*50}")
    print("MULTICLASS FEW-SHOT LEARNING COMPLETE!")
    print(f"{'='*50}")

if __name__ == "__main__":
    main()

Multiclass Few-Shot Learning with R(2+1)D
Initializing R(2+1)D feature extractor...





Step 2: Building prototypes from training data...
Loading training embeddings for Goal...
  Loaded 10_england_epl_2015-2016_2015_08_29___17_00_Manchester_City_2___0_Watford.npy (shape: (512,))
  Loaded 11_england_epl_2015-2016_2015_09_12___14_45_Everton_3___1_Chelsea.npy (shape: (512,))
  Loaded 12_england_epl_2015-2016_2015_09_12___17_00_Crystal_Palace_0___1_Manchester_City.npy (shape: (512,))
  Loaded 13_england_epl_2015-2016_2015_09_26___17_00_Liverpool_3___2_Aston_Villa.npy (shape: (512,))
  Loaded 14_england_epl_2015-2016_2015_09_26___17_00_Liverpool_3___2_Aston_Villa.npy (shape: (512,))
  Loaded 16_england_epl_2015-2016_2015_09_26___17_00_Liverpool_3___2_Aston_Villa.npy (shape: (512,))
  Loaded 17_england_epl_2015-2016_2015_10_17___17_00_Chelsea_2___0_Aston_Villa.npy (shape: (512,))
  Loaded 18_england_epl_2015-2016_2015_10_31___15_45_Chelsea_1___3_Liverpool.npy (shape: (512,))
  Loaded 1_england_epl_2014-2015_2015_02_21___18_00_Chelsea_1___1_Burnley.npy (shape: (512,))
  Loaded

## 7(maximum) random samples for prototype and 12(maximum) random samples for evaluation

In [5]:
import os
import numpy as np
import random
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, precision_score, recall_score, f1_score

def l2_normalize(vec):
    """
    Perform L2 normalization on a vector.
    
    Args:
        vec (np.ndarray): Input vector.
    
    Returns:
        np.ndarray: L2-normalized vector.
    """
    norm = np.linalg.norm(vec)
    if norm == 0:
        return vec
    return vec / norm

def build_prototypes_from_test(embeddings_base_path, event_classes, max_prototype_samples=7):
    """
    Build prototypes for each event class from test embeddings.
    
    Args:
        embeddings_base_path (str): Base path to embeddings folders.
        event_classes (list): List of event class names.
        max_prototype_samples (int): Maximum number of samples to use for prototype creation.
    
    Returns:
        tuple: (prototypes dict, selected_files dict for tracking which files were used)
    """
    prototypes = {}
    selected_files = {}
    
    for event in event_classes:
        event_test_folder = os.path.join(embeddings_base_path, event, "Test")
        
        if not os.path.exists(event_test_folder):
            print(f"Warning: Test folder not found for {event}")
            continue
            
        # Get all embedding files for this event
        all_files = [f for f in os.listdir(event_test_folder) if f.endswith(".npy")]
        
        if len(all_files) <= max_prototype_samples:
            selected_files[event] = all_files
        else:
            # Randomly select files for prototype creation
            selected_files[event] = random.sample(all_files, max_prototype_samples)
        
        event_vecs = []
        print(f"Loading {len(selected_files[event])} test embeddings for {event} prototype...")
        
        # Load selected embeddings for the event
        for fname in selected_files[event]:
            vec = np.load(os.path.join(event_test_folder, fname))
            event_vecs.append(vec)
            print(f"  Selected {fname} for prototype (shape: {vec.shape})")

        if event_vecs:
            # Compute mean vector for the event and normalize
            proto_event = np.mean(event_vecs, axis=0)
            proto_event = l2_normalize(proto_event)
            prototypes[event] = proto_event
            print(f"Built prototype for {event} from {len(event_vecs)} test samples")
        else:
            print(f"Warning: No embeddings found for {event}")

    return prototypes, selected_files

def evaluate_on_train_set(embeddings_base_path, prototypes, event_classes, selected_prototype_files, max_eval_samples=12):
    """
    Evaluate classification performance on training data.
    
    Args:
        embeddings_base_path (str): Base path to embeddings folders.
        prototypes (dict): Dictionary of prototypes for each event class.
        event_classes (list): List of event class names.
        selected_prototype_files (dict): Files that were used for prototype creation (for reference).
        max_eval_samples (int): Maximum number of samples to evaluate per class.
    
    Returns:
        dict: Dictionary containing evaluation results.
    """
    y_true = []
    y_pred = []
    results = {
        'predictions': [],
        'class_counts': {event: {'correct': 0, 'total': 0} for event in event_classes},
        'train_results': []
    }
    
    print("\n" + "="*60)
    print(f"EVALUATING ON UP TO {max_eval_samples} TRAINING SAMPLES PER CLASS")
    print("="*60)
    
    for event in event_classes:
        event_train_folder = os.path.join(embeddings_base_path, event, "Train")
        
        if not os.path.exists(event_train_folder):
            print(f"Warning: Train folder not found for {event}")
            continue
            
        print(f"\nTesting {event} training clips...")
        
        # Get all embedding files for this event
        all_files = [f for f in os.listdir(event_train_folder) if f.endswith(".npy")]
        
        if len(all_files) <= max_eval_samples:
            selected_files = all_files
        else:
            # Randomly select files for evaluation
            selected_files = random.sample(all_files, max_eval_samples)
        
        for fname in selected_files:
            test_embedding = np.load(os.path.join(event_train_folder, fname))
            predicted_label, similarities = classify_clip(test_embedding, prototypes)
            
            is_correct = predicted_label == event
            results['class_counts'][event]['total'] += 1
            
            if is_correct:
                results['class_counts'][event]['correct'] += 1
            
            # Store for sklearn metrics
            y_true.append(event)
            y_pred.append(predicted_label)
            
            # Store detailed results
            result_entry = {
                'file': fname,
                'true_label': event,
                'predicted_label': predicted_label,
                'correct': is_correct,
                'similarities': similarities,
                'split': 'train'
            }
            results['predictions'].append(result_entry)
            results['train_results'].append(result_entry)
            
            # Print results
            print(f"  File: {fname}")
            print(f"    True: {event}, Predicted: {predicted_label}, Correct: {is_correct}")
            print(f"    Similarities: {similarities}")
    
    # Calculate overall metrics
    overall_accuracy = accuracy_score(y_true, y_pred)
    cm = confusion_matrix(y_true, y_pred, labels=event_classes)
    classification_rep = classification_report(y_true, y_pred, labels=event_classes, zero_division=0)
    
    # Calculate sklearn metrics for each class (treating as one-vs-rest)
    precision = precision_score(y_true, y_pred, labels=event_classes, average='weighted', zero_division=0)
    recall = recall_score(y_true, y_pred, labels=event_classes, average='weighted', zero_division=0)
    f1 = f1_score(y_true, y_pred, labels=event_classes, average='weighted', zero_division=0)
    
    # Calculate per-class accuracy
    class_accuracies = {}
    for event in event_classes:
        if results['class_counts'][event]['total'] > 0:
            class_accuracies[event] = results['class_counts'][event]['correct'] / results['class_counts'][event]['total']
        else:
            class_accuracies[event] = 0.0
    
    # Print results
    print(f"\n{'='*60}")
    print("EVALUATION RESULTS")
    print(f"{'='*60}")
    print(f"Overall Accuracy: {overall_accuracy:.4f}")
    print(f"Weighted Precision: {precision:.4f}")
    print(f"Weighted Recall: {recall:.4f}")
    print(f"Weighted F₁ Score: {f1:.4f}")
    
    print(f"\nPer-class Accuracy:")
    for event, acc in class_accuracies.items():
        correct = results['class_counts'][event]['correct']
        total = results['class_counts'][event]['total']
        print(f"  {event}: {acc:.4f} ({correct}/{total})")
    
    print(f"\nConfusion Matrix:")
    print("Rows: True labels, Columns: Predicted labels")
    print(f"Classes: {event_classes}")
    print(cm)
    
    print(f"\nDetailed Classification Report:")
    print(classification_rep)
    
    results.update({
        'overall_accuracy': overall_accuracy,
        'precision': precision,
        'recall': recall,
        'f1_score': f1,
        'class_accuracies': class_accuracies,
        'confusion_matrix': cm,
        'classification_report': classification_rep,
        'y_true': y_true,
        'y_pred': y_pred,
        'train_accuracy': overall_accuracy
    })
    
    return results

def main():
    """
    Main function to run modified multiclass few-shot learning classification.
    Uses existing mean-pooled embeddings, builds prototypes from up to 7 test samples,
    and evaluates on up to 12 training samples per class.
    """
    print("Modified Multiclass Few-Shot Learning with Existing Embeddings")
    print("Using Test Data for Prototypes, Evaluating on Training Samples")
    print("="*70)
    
    # Configuration - Path to existing embeddings
    embeddings_base_path = "F:/AIM Lab/Experiment/sliding-window/Resnet-50 embeddings"
    
    # Event classes
    event_classes = ["Goal", "Red_card", "Yellow_card", "Direct_free-kick", "Penalty", "no_event"]
    
    # Maximum number of test samples to use for prototype creation
    max_prototype_samples = 7
    
    # Maximum number of training samples to evaluate per class
    max_eval_samples = 12
    
    # Check if embeddings exist
    print("\nChecking for existing embeddings...")
    embeddings_exist = True
    for event in event_classes:
        for split in ["Train", "Test"]:
            folder_path = os.path.join(embeddings_base_path, event, split)
            if not os.path.exists(folder_path):
                print(f"Warning: Embeddings folder not found: {folder_path}")
                embeddings_exist = False
            else:
                files = [f for f in os.listdir(folder_path) if f.endswith(".npy")]
                print(f"  Found {len(files)} embeddings in {event}/{split}")
    
    if not embeddings_exist:
        print("Error: Some embedding folders are missing. Please generate embeddings first.")
        return
    
    # Step 1: Build prototypes from test embeddings
    print(f"\nStep 1: Building prototypes from up to {max_prototype_samples} test samples per class...")
    prototypes, selected_prototype_files = build_prototypes_from_test(
        embeddings_base_path, event_classes, max_prototype_samples
    )
    
    if not prototypes:
        print("Error: No prototypes could be built!")
        return
    
    print(f"\nSuccessfully built prototypes for {len(prototypes)} classes:")
    for event, proto in prototypes.items():
        print(f"  {event}: shape {proto.shape}")
    
    print(f"\nFiles used for prototype creation:")
    for event, files in selected_prototype_files.items():
        print(f"  {event}: {files}")
    
    # Step 2: Evaluate on training samples
    print(f"\nStep 2: Evaluating classification performance on up to {max_eval_samples} training samples per class...")
    results = evaluate_on_train_set(embeddings_base_path, prototypes, event_classes, selected_prototype_files, max_eval_samples)
    
    # Step 3: Additional analysis
    print(f"\nStep 3: Additional Analysis...")
    print(f"Total evaluation samples: {len(results['y_true'])}")
    print(f"Training samples: {len(results['train_results'])}")
    print(f"Number of classes: {len(event_classes)}")
    
    # Find most confused classes
    cm = results['confusion_matrix']
    print(f"\nMost confused class pairs:")
    for i, true_class in enumerate(event_classes):
        for j, pred_class in enumerate(event_classes):
            if i != j and cm[i, j] > 0:
                print(f"  {true_class} → {pred_class}: {cm[i, j]} times")
    
    print(f"\n{'='*70}")
    print("FINAL RESULTS")
    print(f"{'='*70}")
    print(f"Overall Accuracy: {results['overall_accuracy']:.4f}")
    print(f"Training Set Accuracy: {results['train_accuracy']:.4f}")
    print(f"Weighted Precision: {results['precision']:.4f}")
    print(f"Weighted Recall: {results['recall']:.4f}")
    print(f"Weighted F₁ Score: {results['f1_score']:.4f}")
    
    print(f"\n{'='*70}")
    print("MODIFIED MULTICLASS FEW-SHOT LEARNING COMPLETE!")
    print(f"{'='*70}")

if __name__ == "__main__":
    main()

Modified Multiclass Few-Shot Learning with Existing Embeddings
Using Test Data for Prototypes, Evaluating on Training Samples

Checking for existing embeddings...
  Found 56 embeddings in Goal/Train
  Found 15 embeddings in Goal/Test
  Found 7 embeddings in Red_card/Train
  Found 2 embeddings in Red_card/Test
  Found 57 embeddings in Yellow_card/Train
  Found 15 embeddings in Yellow_card/Test
  Found 52 embeddings in Direct_free-kick/Train
  Found 14 embeddings in Direct_free-kick/Test
  Found 24 embeddings in Penalty/Train
  Found 6 embeddings in Penalty/Test
  Found 171 embeddings in no_event/Train
  Found 43 embeddings in no_event/Test

Step 1: Building prototypes from up to 7 test samples per class...
Loading 7 test embeddings for Goal prototype...
  Selected 55_england_epl_2016-2017_2016_10_22___19_30_Liverpool_2___1_West_Brom.npy for prototype (shape: (2048,))
  Selected 61_england_epl_2016-2017_2016_11_06___17_15_Liverpool_6___1_Watford.npy for prototype (shape: (2048,))
  Selec

In [6]:
import os
import numpy as np
import random
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, precision_score, recall_score, f1_score

def l2_normalize(vec):
    """
    Perform L2 normalization on a vector.
    
    Args:
        vec (np.ndarray): Input vector.
    
    Returns:
        np.ndarray: L2-normalized vector.
    """
    norm = np.linalg.norm(vec)
    if norm == 0:
        return vec
    return vec / norm

def build_prototypes_from_test(embeddings_base_path, event_classes, num_prototype_samples=5):
    """
    Build prototypes for each event class from randomly selected test embeddings.
    
    Args:
        embeddings_base_path (str): Base path to embeddings folders.
        event_classes (list): List of event class names.
        num_prototype_samples (int): Number of random samples to use for prototype creation.
    
    Returns:
        tuple: (prototypes dict, selected_files dict for tracking which files were used)
    """
    prototypes = {}
    selected_files = {}
    
    
    for event in event_classes:
        event_test_folder = os.path.join(embeddings_base_path, event, "Test")
        
        if not os.path.exists(event_test_folder):
            print(f"Warning: Test folder not found for {event}")
            continue
            
        # Get all embedding files for this event
        all_files = [f for f in os.listdir(event_test_folder) if f.endswith(".npy")]
        
        if len(all_files) < num_prototype_samples:
            print(f"Warning: Only {len(all_files)} files available for {event}, using all of them for prototype")
            selected_files[event] = all_files
        else:
            # Randomly select files for prototype creation
            selected_files[event] = random.sample(all_files, num_prototype_samples)
        
        event_vecs = []
        print(f"Loading {len(selected_files[event])} random test embeddings for {event} prototype...")
        
        # Load selected embeddings for the event
        for fname in selected_files[event]:
            vec = np.load(os.path.join(event_test_folder, fname))
            event_vecs.append(vec)
            print(f"  Selected {fname} for prototype (shape: {vec.shape})")

        if event_vecs:
            # Compute mean vector for the event and normalize
            proto_event = np.mean(event_vecs, axis=0)
            proto_event = l2_normalize(proto_event)
            prototypes[event] = proto_event
            print(f"Built prototype for {event} from {len(event_vecs)} test samples")
        else:
            print(f"Warning: No embeddings found for {event}")

    return prototypes, selected_files

def classify_clip(clip_embedding, prototypes):
    """
    Classify a clip based on cosine similarity to prototypes.
    
    Args:
        clip_embedding (np.ndarray): Embedding vector of the clip.
        prototypes (dict): Dictionary of prototypes for each event class.
    
    Returns:
        tuple: (predicted_label, similarity_scores)
    """
    # Normalize the clip embedding
    clip_norm = l2_normalize(clip_embedding)
    
    # Compute cosine similarities
    similarities = {}
    for event, proto in prototypes.items():
        similarity = np.dot(clip_norm, proto)
        similarities[event] = similarity
    
    # Predict the class with highest similarity
    predicted_label = max(similarities, key=similarities.get)
    
    return predicted_label, similarities

def evaluate_on_train_set(embeddings_base_path, prototypes, event_classes, selected_prototype_files, num_eval_samples=12):
    """
    Evaluate classification performance on a random subset of training data.
    
    Args:
        embeddings_base_path (str): Base path to embeddings folders.
        prototypes (dict): Dictionary of prototypes for each event class.
        event_classes (list): List of event class names.
        selected_prototype_files (dict): Files that were used for prototype creation (for reference).
        num_eval_samples (int): Number of random samples to evaluate per class.
    
    Returns:
        dict: Dictionary containing evaluation results.
    """
    y_true = []
    y_pred = []
    results = {
        'predictions': [],
        'class_counts': {event: {'correct': 0, 'total': 0} for event in event_classes},
        'train_results': []
    }
    
    print("\n" + "="*60)
    print(f"EVALUATING ON {num_eval_samples} RANDOM TRAINING SAMPLES PER CLASS")
    print("="*60)
    
    for event in event_classes:
        event_train_folder = os.path.join(embeddings_base_path, event, "Train")
        
        if not os.path.exists(event_train_folder):
            print(f"Warning: Train folder not found for {event}")
            continue
            
        print(f"\nTesting {event} training clips...")
        
        # Get all embedding files for this event
        all_files = [f for f in os.listdir(event_train_folder) if f.endswith(".npy")]
        
        if len(all_files) < num_eval_samples:
            print(f"Warning: Only {len(all_files)} files available for {event}, using all of them for evaluation")
            selected_files = all_files
        else:
            # Randomly select files for evaluation
            selected_files = random.sample(all_files, num_eval_samples)
        
        for fname in selected_files:
            test_embedding = np.load(os.path.join(event_train_folder, fname))
            predicted_label, similarities = classify_clip(test_embedding, prototypes)
            
            is_correct = predicted_label == event
            results['class_counts'][event]['total'] += 1
            
            if is_correct:
                results['class_counts'][event]['correct'] += 1
            
            # Store for sklearn metrics
            y_true.append(event)
            y_pred.append(predicted_label)
            
            # Store detailed results
            result_entry = {
                'file': fname,
                'true_label': event,
                'predicted_label': predicted_label,
                'correct': is_correct,
                'similarities': similarities,
                'split': 'train'
            }
            results['predictions'].append(result_entry)
            results['train_results'].append(result_entry)
            
            # Print results
            print(f"  File: {fname}")
            print(f"    True: {event}, Predicted: {predicted_label}, Correct: {is_correct}")
            print(f"    Similarities: {similarities}")
    
    # Calculate overall metrics
    overall_accuracy = accuracy_score(y_true, y_pred)
    cm = confusion_matrix(y_true, y_pred, labels=event_classes)
    classification_rep = classification_report(y_true, y_pred, labels=event_classes, zero_division=0)
    
    # Calculate sklearn metrics for each class (treating as one-vs-rest)
    precision = precision_score(y_true, y_pred, labels=event_classes, average='weighted', zero_division=0)
    recall = recall_score(y_true, y_pred, labels=event_classes, average='weighted', zero_division=0)
    f1 = f1_score(y_true, y_pred, labels=event_classes, average='weighted', zero_division=0)
    
    # Calculate per-class accuracy
    class_accuracies = {}
    for event in event_classes:
        if results['class_counts'][event]['total'] > 0:
            class_accuracies[event] = results['class_counts'][event]['correct'] / results['class_counts'][event]['total']
        else:
            class_accuracies[event] = 0.0
    
    # Print results
    print(f"\n{'='*60}")
    print("EVALUATION RESULTS")
    print(f"{'='*60}")
    print(f"Overall Accuracy: {overall_accuracy:.4f}")
    print(f"Weighted Precision: {precision:.4f}")
    print(f"Weighted Recall: {recall:.4f}")
    print(f"Weighted F₁ Score: {f1:.4f}")
    
    print(f"\nPer-class Accuracy:")
    for event, acc in class_accuracies.items():
        correct = results['class_counts'][event]['correct']
        total = results['class_counts'][event]['total']
        print(f"  {event}: {acc:.4f} ({correct}/{total})")
    
    print(f"\nConfusion Matrix:")
    print("Rows: True labels, Columns: Predicted labels")
    print(f"Classes: {event_classes}")
    print(cm)
    
    print(f"\nDetailed Classification Report:")
    print(classification_rep)
    
    results.update({
        'overall_accuracy': overall_accuracy,
        'precision': precision,
        'recall': recall,
        'f1_score': f1,
        'class_accuracies': class_accuracies,
        'confusion_matrix': cm,
        'classification_report': classification_rep,
        'y_true': y_true,
        'y_pred': y_pred,
        'train_accuracy': overall_accuracy
    })
    
    return results

def main():
    """
    Main function to run modified multiclass few-shot learning classification.
    Uses existing mean-pooled embeddings, builds prototypes from random 5 test samples,
    and evaluates on 12 random training samples per class.
    """
    print("Modified Multiclass Few-Shot Learning with Existing Embeddings")
    print("Using Test Data for Prototypes, Evaluating on Random Training Samples")
    print("="*70)
    
    # Configuration - Path to existing embeddings
    embeddings_base_path = "F:/AIM Lab/Experiment/sliding-window/R(2+1)D embeddings"
    
    # Event classes
    event_classes =["Goal", "Red_card", "Yellow_card", "Direct_free-kick", "Penalty", "no_event"]
    
    # Number of test samples to use for prototype creation
    num_prototype_samples = 7
    
    # Number of training samples to evaluate per class
    num_eval_samples = 12
    
    # Check if embeddings exist
    print("\nChecking for existing embeddings...")
    embeddings_exist = True
    for event in event_classes:
        for split in ["Train", "Test"]:
            folder_path = os.path.join(embeddings_base_path, event, split)
            if not os.path.exists(folder_path):
                print(f"Warning: Embeddings folder not found: {folder_path}")
                embeddings_exist = False
            else:
                files = [f for f in os.listdir(folder_path) if f.endswith(".npy")]
                print(f"  Found {len(files)} embeddings in {event}/{split}")
    
    if not embeddings_exist:
        print("Error: Some embedding folders are missing. Please generate embeddings first.")
        return
    
    # Step 1: Build prototypes from random subset of test embeddings
    print(f"\nStep 1: Building prototypes from {num_prototype_samples} random test samples per class...")
    prototypes, selected_prototype_files = build_prototypes_from_test(
        embeddings_base_path, event_classes, num_prototype_samples
    )
    
    if not prototypes:
        print("Error: No prototypes could be built!")
        return
    
    print(f"\nSuccessfully built prototypes for {len(prototypes)} classes:")
    for event, proto in prototypes.items():
        print(f"  {event}: shape {proto.shape}")
    
    print(f"\nFiles used for prototype creation:")
    for event, files in selected_prototype_files.items():
        print(f"  {event}: {files}")
    
    # Step 2: Evaluate on random training samples
    print(f"\nStep 2: Evaluating classification performance on {num_eval_samples} random training samples per class...")
    results = evaluate_on_train_set(embeddings_base_path, prototypes, event_classes, selected_prototype_files, num_eval_samples)
    
    # Step 3: Additional analysis
    print(f"\nStep 3: Additional Analysis...")
    print(f"Total evaluation samples: {len(results['y_true'])}")
    print(f"Training samples: {len(results['train_results'])}")
    print(f"Number of classes: {len(event_classes)}")
    
    # Find most confused classes
    cm = results['confusion_matrix']
    print(f"\nMost confused class pairs:")
    for i, true_class in enumerate(event_classes):
        for j, pred_class in enumerate(event_classes):
            if i != j and cm[i, j] > 0:
                print(f"  {true_class} → {pred_class}: {cm[i, j]} times")
    
    print(f"\n{'='*70}")
    print("FINAL RESULTS")
    print(f"{'='*70}")
    print(f"Overall Accuracy: {results['overall_accuracy']:.4f}")
    print(f"Training Set Accuracy: {results['train_accuracy']:.4f}")
    print(f"Weighted Precision: {results['precision']:.4f}")
    print(f"Weighted Recall: {results['recall']:.4f}")
    print(f"Weighted F₁ Score: {results['f1_score']:.4f}")
    
    print(f"\n{'='*70}")
    print("MODIFIED MULTICLASS FEW-SHOT LEARNING COMPLETE!")
    print(f"{'='*70}")

if __name__ == "__main__":
    main()

Modified Multiclass Few-Shot Learning with Existing Embeddings
Using Test Data for Prototypes, Evaluating on Random Training Samples

Checking for existing embeddings...
  Found 56 embeddings in Goal/Train
  Found 15 embeddings in Goal/Test
  Found 7 embeddings in Red_card/Train
  Found 2 embeddings in Red_card/Test
  Found 57 embeddings in Yellow_card/Train
  Found 15 embeddings in Yellow_card/Test
  Found 52 embeddings in Direct_free-kick/Train
  Found 14 embeddings in Direct_free-kick/Test
  Found 24 embeddings in Penalty/Train
  Found 6 embeddings in Penalty/Test
  Found 171 embeddings in no_event/Train
  Found 43 embeddings in no_event/Test

Step 1: Building prototypes from 7 random test samples per class...
Loading 7 random test embeddings for Goal prototype...
  Selected 23_england_epl_2015-2016_2015_12_05___20_30_Chelsea_0___1_Bournemouth.npy for prototype (shape: (512,))
  Selected 55_england_epl_2016-2017_2016_10_22___19_30_Liverpool_2___1_West_Brom.npy for prototype (shape: 

# Few-shot Using audio modality

In [19]:
import os
from moviepy import VideoFileClip

def extract_audio_from_video(video_path, audio_path, duration=7.0):
    try:
        video = VideoFileClip(video_path)
        audio = video.audio
        if audio is not None:
            audio = audio.subclipped(0, min(duration, video.duration))
            audio.write_audiofile(audio_path, fps=16000, codec='pcm_s16le', logger=None)
            print(f"Extracted audio: {audio_path}")
        else:
            print(f"No audio found in {video_path}")
        video.close()
    except Exception as e:
        print(f"Error extracting audio: {e}")

def batch_extract_audio(clips_dir, audio_dir):
    os.makedirs(audio_dir, exist_ok=True)
    for fname in os.listdir(clips_dir):
        if fname.endswith('.mp4'):
            video_path = os.path.join(clips_dir, fname)
            audio_path = os.path.join(audio_dir, os.path.splitext(fname)[0] + '.wav')
            extract_audio_from_video(video_path, audio_path)

# Example usage for all events and splits
base_clip_dir = "F:/AIM Lab/Experiment/sliding-window/Clips"
base_audio_dir = "F:/AIM Lab/Experiment/sliding-window/Audio"
event_classes =["Goal", "Red_card", "Yellow_card", "Direct_free-kick", "Penalty", "Indirect_free-kick", "Corner", "Substitution", "Shots_on_target", "no_event"]
splits = ["train", "test"]

for event in event_classes:
    for split in splits:
        clips_dir = os.path.join(base_clip_dir, event, split)
        audio_dir = os.path.join(base_audio_dir, event, split)
        batch_extract_audio(clips_dir, audio_dir)

Extracted audio: F:/AIM Lab/Experiment/sliding-window/Audio\Goal\train\10_england_epl_2015-2016_2015_08_29___17_00_Manchester_City_2___0_Watford.wav
Extracted audio: F:/AIM Lab/Experiment/sliding-window/Audio\Goal\train\11_england_epl_2015-2016_2015_09_12___14_45_Everton_3___1_Chelsea.wav
Extracted audio: F:/AIM Lab/Experiment/sliding-window/Audio\Goal\train\12_england_epl_2015-2016_2015_09_12___17_00_Crystal_Palace_0___1_Manchester_City.wav
Extracted audio: F:/AIM Lab/Experiment/sliding-window/Audio\Goal\train\13_england_epl_2015-2016_2015_09_26___17_00_Liverpool_3___2_Aston_Villa.wav
Extracted audio: F:/AIM Lab/Experiment/sliding-window/Audio\Goal\train\14_england_epl_2015-2016_2015_09_26___17_00_Liverpool_3___2_Aston_Villa.wav
Extracted audio: F:/AIM Lab/Experiment/sliding-window/Audio\Goal\train\16_england_epl_2015-2016_2015_09_26___17_00_Liverpool_3___2_Aston_Villa.wav
Extracted audio: F:/AIM Lab/Experiment/sliding-window/Audio\Goal\train\17_england_epl_2015-2016_2015_10_17___17_0

In [20]:
import librosa
import numpy as np

def compute_logmelspec(audio_path, n_mels=64, sr=16000, duration=7.0):
    """Enhanced log-mel spectrogram computation with quality checks."""
    try:
        y, original_sr = librosa.load(audio_path, sr=sr, duration=duration)
        
        # Quality check
        if len(y) == 0:
            print(f"Warning: Empty audio file {audio_path}")
            return None
            
        # Normalize audio amplitude
        y = librosa.util.normalize(y)
        
        # Ensure exact duration
        target_len = int(sr * duration)
        if len(y) < target_len:
            y = np.pad(y, (0, target_len - len(y)))
        else:
            y = y[:target_len]
        
        # Compute mel spectrogram with better parameters
        mel = librosa.feature.melspectrogram(
            y=y, 
            sr=sr, 
            n_mels=n_mels,
            hop_length=512,
            win_length=2048,
            n_fft=2048
        )
        logmel = librosa.power_to_db(mel, ref=np.max)
        
        return logmel
        
    except Exception as e:
        print(f"Error processing {audio_path}: {e}")
        return None

def batch_logmelspec(audio_dir, out_dir, n_mels=64):
    os.makedirs(out_dir, exist_ok=True)
    for fname in os.listdir(audio_dir):
        if fname.endswith('.wav'):
            audio_path = os.path.join(audio_dir, fname)
            logmel = compute_logmelspec(audio_path, n_mels=n_mels)
            np.save(os.path.join(out_dir, os.path.splitext(fname)[0] + '.npy'), logmel)

# Example usage for all events and splits
base_audio_dir = "F:/AIM Lab/Experiment/sliding-window/Audio"
base_logmel_dir = "F:/AIM Lab/Experiment/sliding-window/Logmelspec"
for event in event_classes:
    for split in splits:
        audio_dir = os.path.join(base_audio_dir, event, split)
        logmel_dir = os.path.join(base_logmel_dir, event, split)
        batch_logmelspec(audio_dir, logmel_dir)

In [21]:
import tensorflow as tf
import numpy as np
import tensorflow_hub as hub

yamnet_model_handle = 'https://tfhub.dev/google/yamnet/1'
yamnet_model = hub.load(yamnet_model_handle)

def yamnet_embed(logmel):
    # YAMNet expects mono waveform, but we have logmel, so use the waveform directly if possible
    # If you have to use logmel, you need to adapt the model, but with TFHub YAMNet, use waveform:
    # Instead, load .wav and pass waveform to yamnet_model
    # For this example, let's assume you have waveform available
    # If you must use logmel, you need to use the original YAMNet code from TF Models repo
    raise NotImplementedError("For TFHub YAMNet, pass waveform not logmel. Use .wav files directly.")
def batch_yamnet_embed(audio_dir, out_dir):
    import soundfile as sf
    import librosa
    
    os.makedirs(out_dir, exist_ok=True)
    for fname in os.listdir(audio_dir):
        if fname.endswith('.wav'):
            try:
                # Load audio file
                wav, sr = sf.read(os.path.join(audio_dir, fname))
                
                # Convert stereo to mono if needed
                if len(wav.shape) == 2:
                    wav = np.mean(wav, axis=1)
                
                # Resample to 16kHz if needed
                if sr != 16000:
                    wav = librosa.resample(wav, orig_sr=sr, target_sr=16000)
                
                # Ensure correct duration (7 seconds)
                target_length = 16000 * 7
                if len(wav) > target_length:
                    wav = wav[:target_length]
                elif len(wav) < target_length:
                    wav = np.pad(wav, (0, target_length - len(wav)))
                
                # YAMNet expects float32 mono waveform
                scores, embeddings, spectrogram = yamnet_model(wav.astype(np.float32))
                
                # Mean-pool embeddings (same as video approach)
                emb = np.mean(embeddings.numpy(), axis=0)  # (1024,)
                
                # Save embedding
                output_path = os.path.join(out_dir, os.path.splitext(fname)[0] + '.npy')
                np.save(output_path, emb)
                print(f"Saved audio embedding: {output_path}")
                
            except Exception as e:
                print(f"Error processing {fname}: {e}")

# Example usage for all events and splits
base_audio_dir = "F:/AIM Lab/Experiment/sliding-window/Audio"
base_yamnet_dir = "F:/AIM Lab/Experiment/sliding-window/Yamnet-embeddings"
event_classes =["Goal", "Red_card", "Yellow_card", "Direct_free-kick", "Penalty", "Indirect_free-kick", "Corner", "Substitution", "Shots_on_target", "no_event"]
for event in event_classes:
    for split in splits:
        audio_dir = os.path.join(base_audio_dir, event, split)
        yamnet_dir = os.path.join(base_yamnet_dir, event, split)
        batch_yamnet_embed(audio_dir, yamnet_dir)

Saved audio embedding: F:/AIM Lab/Experiment/sliding-window/Yamnet-embeddings\Goal\train\10_england_epl_2015-2016_2015_08_29___17_00_Manchester_City_2___0_Watford.npy
Saved audio embedding: F:/AIM Lab/Experiment/sliding-window/Yamnet-embeddings\Goal\train\11_england_epl_2015-2016_2015_09_12___14_45_Everton_3___1_Chelsea.npy
Saved audio embedding: F:/AIM Lab/Experiment/sliding-window/Yamnet-embeddings\Goal\train\12_england_epl_2015-2016_2015_09_12___17_00_Crystal_Palace_0___1_Manchester_City.npy
Saved audio embedding: F:/AIM Lab/Experiment/sliding-window/Yamnet-embeddings\Goal\train\13_england_epl_2015-2016_2015_09_26___17_00_Liverpool_3___2_Aston_Villa.npy
Saved audio embedding: F:/AIM Lab/Experiment/sliding-window/Yamnet-embeddings\Goal\train\14_england_epl_2015-2016_2015_09_26___17_00_Liverpool_3___2_Aston_Villa.npy
Saved audio embedding: F:/AIM Lab/Experiment/sliding-window/Yamnet-embeddings\Goal\train\16_england_epl_2015-2016_2015_09_26___17_00_Liverpool_3___2_Aston_Villa.npy
Saved

In [11]:
import numpy as np
import os
import random
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, precision_score, recall_score, f1_score

def l2_normalize(vec):
    norm = np.linalg.norm(vec)
    return vec if norm == 0 else vec / norm

def build_audio_prototypes(embeddings_base_path, event_classes, max_samples=7):
    prototypes = {}
    for event in event_classes:
        folder = os.path.join(embeddings_base_path, event, "Test")
        files = [f for f in os.listdir(folder) if f.endswith('.npy')]
        if len(files) > max_samples:
            files = random.sample(files, max_samples)
        vecs = [np.load(os.path.join(folder, f)) for f in files]
        proto = l2_normalize(np.mean(vecs, axis=0))
        prototypes[event] = proto
    return prototypes

def classify_audio_clip(clip_embedding, prototypes):
    clip_norm = l2_normalize(clip_embedding)
    similarities = {event: np.dot(clip_norm, proto) for event, proto in prototypes.items()}
    predicted = max(similarities, key=similarities.get)
    return predicted, similarities

def evaluate_audio_fewshot(embeddings_base_path, prototypes, event_classes, max_eval=12):
    y_true, y_pred = [], []
    detailed_results = []
    for event in event_classes:
        folder = os.path.join(embeddings_base_path, event, "Train")
        files = [f for f in os.listdir(folder) if f.endswith('.npy')]
        if len(files) > max_eval:
            files = random.sample(files, max_eval)
        for f in files:
            emb = np.load(os.path.join(folder, f))
            pred, similarities = classify_audio_clip(emb, prototypes)
            y_true.append(event)
            y_pred.append(pred)
            detailed_results.append({
                "file": f,
                "true_label": event,
                "predicted_label": pred,
                "correct": pred == event,
                "similarities": similarities
            })
            print(f"  File: {f}")
            print(f"    True: {event}, Predicted: {pred}, Correct: {pred == event}")
            print(f"    Similarities: {similarities}")
    acc = accuracy_score(y_true, y_pred)
    prec = precision_score(y_true, y_pred, labels=event_classes, average='weighted', zero_division=0)
    rec = recall_score(y_true, y_pred, labels=event_classes, average='weighted', zero_division=0)
    f1 = f1_score(y_true, y_pred, labels=event_classes, average='weighted', zero_division=0)
    cm = confusion_matrix(y_true, y_pred, labels=event_classes)
    report = classification_report(y_true, y_pred, labels=event_classes, zero_division=0)
    print(f"\n{'='*60}")
    print("AUDIO FEW-SHOT EVALUATION RESULTS")
    print(f"{'='*60}")
    print(f"Overall Accuracy: {acc:.4f}")
    print(f"Weighted Precision: {prec:.4f}")
    print(f"Weighted Recall: {rec:.4f}")
    print(f"Weighted F1 Score: {f1:.4f}")
    print("\nConfusion Matrix:")
    print(cm)
    print("\nClassification Report:")
    print(report)
    return {
        'accuracy': acc,
        'precision': prec,
        'recall': rec,
        'f1': f1,
        'confusion_matrix': cm,
        'classification_report': report,
        'y_true': y_true,
        'y_pred': y_pred
    }

# Example usage
base_audio_emb_path = "F:/AIM Lab/Experiment/sliding-window/Yamnet-embeddings"
event_classes =["Goal", "Red_card", "Yellow_card", "Direct_free-kick", "Penalty", "no_event"]
max_proto = 7
max_eval = 12
splits = ["train", "test"]
prototypes = build_audio_prototypes(base_audio_emb_path, event_classes, max_samples=max_proto)
results = evaluate_audio_fewshot(base_audio_emb_path, prototypes, event_classes, max_eval=max_eval)

  File: 71_england_epl_2016-2017_2016_12_10___20_30_Leicester_4___2_Manchester_City.npy
    True: Goal, Predicted: Direct_free-kick, Correct: False
    Similarities: {'Goal': 0.2353709, 'Red_card': 0.9098073, 'Yellow_card': 0.8756726, 'Direct_free-kick': 0.9158697, 'Penalty': 0.8664119, 'no_event': 0.87579685}
  File: 36_england_epl_2015-2016_2016_02_14___19_15_Manchester_City_1___2_Tottenham.npy
    True: Goal, Predicted: Red_card, Correct: False
    Similarities: {'Goal': 0.24759236, 'Red_card': 0.9283346, 'Yellow_card': 0.91241753, 'Direct_free-kick': 0.88368946, 'Penalty': 0.91185695, 'no_event': 0.91512424}
  File: 39_england_epl_2015-2016_2016_03_19___18_00_Chelsea_2___2_West_Ham.npy
    True: Goal, Predicted: Penalty, Correct: False
    Similarities: {'Goal': 0.25911853, 'Red_card': 0.8814073, 'Yellow_card': 0.8869864, 'Direct_free-kick': 0.86317515, 'Penalty': 0.9244723, 'no_event': 0.90881664}
  File: 67_england_epl_2016-2017_2016_11_26___18_00_Liverpool_2___0_Sunderland.npy
 

In [9]:
import torch
import torchaudio
from torchvggish import vggish, vggish_input

# Load VGGish model
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = vggish()
model.eval()
model.to(device)

def vggish_embed(audio_path):
    # Load audio (mono, 16kHz)
    waveform, sr = torchaudio.load(audio_path)
    if sr != 16000:
        waveform = torchaudio.functional.resample(waveform, sr, 16000)
    if waveform.shape[0] > 1:
        waveform = waveform.mean(dim=0, keepdim=True)
    # Convert waveform to numpy for vggish_input
    waveform_np = waveform.squeeze().cpu().numpy()
    # Get log-mel patches as VGGish expects
    input_batch = vggish_input.waveform_to_examples(waveform_np, 16000)
    input_tensor = torch.from_numpy(input_batch).to(device).float()
    # Get embeddings
    with torch.no_grad():
        emb = model(input_tensor)
    # Mean-pool over all patches
    return emb.cpu().numpy().mean(axis=0)  # shape (128,)

def batch_vggish_embed(audio_dir, out_dir):
    os.makedirs(out_dir, exist_ok=True)
    for fname in os.listdir(audio_dir):
        if fname.endswith('.wav'):
            try:
                emb = vggish_embed(os.path.join(audio_dir, fname))
                np.save(os.path.join(out_dir, os.path.splitext(fname)[0] + '.npy'), emb)
                print(f"Saved VGGish embedding: {fname}")
            except Exception as e:
                print(f"Error processing {fname}: {e}")

# Example usage for all events and splits
base_audio_dir = "F:/AIM Lab/Experiment/sliding-window/Audio"
base_yamnet_dir = "F:/AIM Lab/Experiment/sliding-window/VGGish-Embeddings"
event_classes =["Goal", "Red_card", "Yellow_card", "Direct_free-kick", "Penalty","no_event"]
for event in event_classes:
    for split in splits:
        audio_dir = os.path.join(base_audio_dir, event, split)
        yamnet_dir = os.path.join(base_yamnet_dir, event, split)
        batch_yamnet_embed(audio_dir, yamnet_dir)

NameError: name 'batch_yamnet_embed' is not defined

In [10]:
def build_audio_prototypes(embeddings_base_path, event_classes):
    """
    Build prototypes for each event class from ALL training embeddings.
    """
    prototypes = {}
    for event in event_classes:
        folder = os.path.join(embeddings_base_path, event, "Train")
        files = [f for f in os.listdir(folder) if f.endswith('.npy')]
        if not files:
            print(f"Warning: No embeddings found for {event} in Train folder.")
            continue
        vecs = [np.load(os.path.join(folder, f)) for f in files]
        proto = l2_normalize(np.mean(vecs, axis=0))
        prototypes[event] = proto
    return prototypes

def evaluate_audio_fewshot(embeddings_base_path, prototypes, event_classes):
    """
    Evaluate on ALL test embeddings for each class.
    """
    y_true, y_pred = [], []
    detailed_results = []
    for event in event_classes:
        folder = os.path.join(embeddings_base_path, event, "Test")
        files = [f for f in os.listdir(folder) if f.endswith('.npy')]
        if not files:
            print(f"Warning: No embeddings found for {event} in Test folder.")
            continue
        for f in files:
            emb = np.load(os.path.join(folder, f))
            pred, similarities = classify_audio_clip(emb, prototypes)
            y_true.append(event)
            y_pred.append(pred)
            detailed_results.append({
                "file": f,
                "true_label": event,
                "predicted_label": pred,
                "correct": pred == event,
                "similarities": similarities
            })
            print(f"  File: {f}")
            print(f"    True: {event}, Predicted: {pred}, Correct: {pred == event}")
            print(f"    Similarities: {similarities}")
    acc = accuracy_score(y_true, y_pred)
    prec = precision_score(y_true, y_pred, labels=event_classes, average='weighted', zero_division=0)
    rec = recall_score(y_true, y_pred, labels=event_classes, average='weighted', zero_division=0)
    f1 = f1_score(y_true, y_pred, labels=event_classes, average='weighted', zero_division=0)
    cm = confusion_matrix(y_true, y_pred, labels=event_classes)
    report = classification_report(y_true, y_pred, labels=event_classes, zero_division=0)
    print(f"\n{'='*60}")
    print("AUDIO FEW-SHOT EVALUATION RESULTS")
    print(f"{'='*60}")
    print(f"Overall Accuracy: {acc:.4f}")
    print(f"Weighted Precision: {prec:.4f}")
    print(f"Weighted Recall: {rec:.4f}")
    print(f"Weighted F1 Score: {f1:.4f}")
    print("\nConfusion Matrix:")
    print(cm)
    print("\nClassification Report:")
    print(report)
    return {
        'accuracy': acc,
        'precision': prec,
        'recall': rec,
        'f1': f1,
        'confusion_matrix': cm,
        'classification_report': report,
        'y_true': y_true,
        'y_pred': y_pred
    }

# Example usage
base_audio_emb_path = "F:/AIM Lab/Experiment/sliding-window/VGGish-Embeddings"
event_classes =["Goal", "Red_card", "Yellow_card", "Direct_free-kick", "Penalty", "no_event"]

prototypes = build_audio_prototypes(base_audio_emb_path, event_classes)
results = evaluate_audio_fewshot(base_audio_emb_path, prototypes, event_classes)

  File: 15_england_epl_2015-2016_2015_09_26___17_00_Liverpool_3___2_Aston_Villa.npy
    True: Goal, Predicted: Goal, Correct: True
    Similarities: {'Goal': 0.9446552, 'Red_card': 0.9091912, 'Yellow_card': 0.6700227, 'Direct_free-kick': 0.52644676, 'Penalty': 0.44148427, 'no_event': 0.7208377}
  File: 19_england_epl_2015-2016_2015_10_31___15_45_Chelsea_1___3_Liverpool.npy
    True: Goal, Predicted: Goal, Correct: True
    Similarities: {'Goal': 0.9552772, 'Red_card': 0.9257498, 'Yellow_card': 0.6630518, 'Direct_free-kick': 0.5165465, 'Penalty': 0.4367041, 'no_event': 0.7132679}
  File: 23_england_epl_2015-2016_2015_12_05___20_30_Chelsea_0___1_Bournemouth.npy
    True: Goal, Predicted: Goal, Correct: True
    Similarities: {'Goal': 0.96031415, 'Red_card': 0.9420057, 'Yellow_card': 0.7002819, 'Direct_free-kick': 0.549065, 'Penalty': 0.4597901, 'no_event': 0.7525028}
  File: 26_england_epl_2015-2016_2015_12_26___18_00_Manchester_City_4___1_Sunderland.npy
    True: Goal, Predicted: Penalt

## Multimodel few-shot

In [12]:
import numpy as np
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, precision_score, recall_score, f1_score

def l2_normalize(vec):
    norm = np.linalg.norm(vec)
    return vec if norm == 0 else vec / norm

def combine_audio_video_features(audio_embeddings_path, video_embeddings_path, event_classes):
    """
    Combine audio and video embeddings for multimodal classification.
    """
    combined_features = {}
    
    for event in event_classes:
        for split in ["Train", "Test"]:
            audio_folder = os.path.join(audio_embeddings_path, event, split)
            video_folder = os.path.join(video_embeddings_path, event, split)
            
            combined_features[f"{event}_{split}"] = []
            
            # Check if both folders exist
            if not os.path.exists(audio_folder) or not os.path.exists(video_folder):
                print(f"Warning: Missing folder for {event}/{split}")
                continue
            
            # Get common files between audio and video
            audio_files = {f.replace('.npy', '') for f in os.listdir(audio_folder) if f.endswith('.npy')}
            video_files = {f.replace('.npy', '') for f in os.listdir(video_folder) if f.endswith('.npy')}
            common_files = audio_files.intersection(video_files)
            
            print(f"{event}/{split}: {len(common_files)} common files")
            
            for file_base in common_files:
                # Load audio embedding
                audio_emb = np.load(os.path.join(audio_folder, f"{file_base}.npy"))
                # Load video embedding  
                video_emb = np.load(os.path.join(video_folder, f"{file_base}.npy"))
                
                # Concatenate features
                combined_emb = np.concatenate([audio_emb, video_emb])
                combined_features[f"{event}_{split}"].append(combined_emb)
    
    return combined_features


def build_multimodal_prototypes(combined_features, event_classes, max_proto=7):
    """
    Build prototypes for each class from up to max_proto training samples.
    """
    prototypes = {}
    used_indices = {}
    for event in event_classes:
        feats = combined_features.get(f"{event}_Train", [])
        if len(feats) == 0:
            continue
        idxs = np.arange(len(feats))
        if len(feats) > max_proto:
            idxs = np.random.choice(idxs, max_proto, replace=False)
        used_indices[event] = idxs
        vecs = np.array([feats[i] for i in idxs])
        proto = l2_normalize(np.mean(vecs, axis=0))
        prototypes[event] = proto
    return prototypes, used_indices

def classify_multimodal_clip(clip_embedding, prototypes):
    clip_norm = l2_normalize(clip_embedding)
    similarities = {event: np.dot(clip_norm, proto) for event, proto in prototypes.items()}
    predicted = max(similarities, key=similarities.get)
    return predicted, similarities

def evaluate_multimodal_fewshot(combined_features, prototypes, event_classes, max_eval=12):
    y_true, y_pred = [], []
    detailed_results = []
    for event in event_classes:
        feats = combined_features.get(f"{event}_Test", [])
        idxs = np.arange(len(feats))
        if len(feats) > max_eval:
            idxs = np.random.choice(idxs, max_eval, replace=False)
        for i in idxs:
            emb = feats[i]
            pred, similarities = classify_multimodal_clip(emb, prototypes)
            y_true.append(event)
            y_pred.append(pred)
            detailed_results.append({
                "true_label": event,
                "predicted_label": pred,
                "correct": pred == event,
                "similarities": similarities
            })
            print(f"  Test sample: True={event}, Predicted={pred}, Correct={pred == event}")
    acc = accuracy_score(y_true, y_pred)
    prec = precision_score(y_true, y_pred, labels=event_classes, average='weighted', zero_division=0)
    rec = recall_score(y_true, y_pred, labels=event_classes, average='weighted', zero_division=0)
    f1 = f1_score(y_true, y_pred, labels=event_classes, average='weighted', zero_division=0)
    cm = confusion_matrix(y_true, y_pred, labels=event_classes)
    report = classification_report(y_true, y_pred, labels=event_classes, zero_division=0)
    print(f"\n{'='*60}")
    print("MULTIMODAL FEW-SHOT PROTOTYPE EVALUATION RESULTS")
    print(f"{'='*60}")
    print(f"Overall Accuracy: {acc:.4f}")
    print(f"Weighted Precision: {prec:.4f}")
    print(f"Weighted Recall: {rec:.4f}")
    print(f"Weighted F1 Score: {f1:.4f}")
    print("\nConfusion Matrix:")
    print(cm)
    print("\nClassification Report:")
    print(report)
    # Per-class correct prediction summary
    print("\nPer-class Correct Prediction Counts:")
    for cls in event_classes:
        cls_indices = [i for i, label in enumerate(y_true) if label == cls]
        if not cls_indices:
            print(f"{cls:15s}: No test samples.")
            continue
        cls_true = [y_true[i] for i in cls_indices]
        cls_pred = [y_pred[i] for i in cls_indices]
        correct_count = sum([t == p for t, p in zip(cls_true, cls_pred)])
        total = len(cls_indices)
        print(f"{cls:15s}: {correct_count} correct out of {total} ({correct_count/total:.2%})")
    return {
        'accuracy': acc,
        'precision': prec,
        'recall': rec,
        'f1': f1,
        'confusion_matrix': cm,
        'classification_report': report,
        'y_true': y_true,
        'y_pred': y_pred,
        'detailed_results': detailed_results
    }

# --- USAGE EXAMPLE ---
# event_classes = [ ... ]  # Already defined above
# combined_features = ...  # Already created above

audio_path = "F:/AIM Lab/Experiment/sliding-window/Yamnet-embeddings"
video_path = "F:/AIM Lab/Experiment/sliding-window/Resnet-50 embeddings"
# Combine features
print("Combining audio and video features...")
combined_features = combine_audio_video_features(audio_path, video_path, event_classes)

print("\nBuilding multimodal prototypes (few-shot, cosine similarity)...")
prototypes, used_indices = build_multimodal_prototypes(combined_features, event_classes, max_proto=7)

print("\nEvaluating multimodal prototype-based few-shot classification...")
results_proto = evaluate_multimodal_fewshot(combined_features, prototypes, event_classes, max_eval=12)

Combining audio and video features...
Goal/Train: 56 common files
Goal/Test: 15 common files
Red_card/Train: 7 common files
Red_card/Test: 2 common files
Yellow_card/Train: 57 common files
Yellow_card/Test: 15 common files
Direct_free-kick/Train: 52 common files
Direct_free-kick/Test: 14 common files
Penalty/Train: 24 common files
Penalty/Test: 6 common files
no_event/Train: 171 common files
no_event/Test: 43 common files

Building multimodal prototypes (few-shot, cosine similarity)...

Evaluating multimodal prototype-based few-shot classification...
  Test sample: True=Goal, Predicted=Goal, Correct=True
  Test sample: True=Goal, Predicted=no_event, Correct=False
  Test sample: True=Goal, Predicted=Goal, Correct=True
  Test sample: True=Goal, Predicted=no_event, Correct=False
  Test sample: True=Goal, Predicted=Penalty, Correct=False
  Test sample: True=Goal, Predicted=no_event, Correct=False
  Test sample: True=Goal, Predicted=no_event, Correct=False
  Test sample: True=Goal, Predicte