**MODEL PIPELINE** This is a CRNN-based model with an attention layer. Its input is a clip represented as log-mel spectrograms, with the game audio and the streamer audio stacked as two channels.

**1. Downloading the custom Dataset from Kaggle:**

---


To get the dataset, go to your Kaggle settings and download the API key file (JSON) under the legacy API credentials, then upload it to Colab local storage. Make sure the folder it is uploaded to matches the folder path set in the first cell.

In [None]:
import os
import kagglehub

# Tell the Kaggle client which folder holds the uploaded API key file.
os.environ.update(KAGGLE_CONFIG_DIR="/content")

# Download the dataset; `path` is reused by the configuration cell as base_path.
path = kagglehub.dataset_download("aadityahh/dataset-highlight-sense")

print("Dataset path:", path)

**2. CONFIGURATION** `output_dir` stores the path where the trained model is saved once training is done; all the hyperparameters are listed here.

In [None]:
# Dataset root; currently the kagglehub download path from the first cell.
base_path = path
# Where the trained model is saved after training (the value names a .pt file).
output_dir = '/content/best_model.pt'

# Log-mel spectrogram settings
n_mels = 80
hop_length = 512
sample_rate = 22050

# Model / training hyperparameters
max_duration_sec = 60.0
# 2584 == 1 + (60 * 22050) // 512; int(60 * 22050 / 512) on its own gives 2583.
max_time_frames = 2584
rnn_dropout = 0.3
dropout = 0.5
batch_size = 64
lr = 1e-4
weight_decay = 1e-5
epochs = 10
val_split = 0.15
grad_clip: float = 1.0  # maximum gradient norm (clipping limit)

**3. Import required dependencies**

In [None]:
import os, zipfile, random, gc, time, io
from pathlib import Path
from typing import List, Tuple, Optional, Dict
from dataclasses import dataclass, field
from collections import defaultdict
import warnings; warnings.filterwarnings('ignore')

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from torch.cuda.amp import GradScaler, autocast
import numpy as np
from tqdm.auto import tqdm

# Prefer the GPU when PyTorch can see one; otherwise run on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f'Device: {device}')

**4. Indexing Dataset** Building an index of the clip file paths, grouped by label (positive / negative).

In [None]:
from pathlib import Path

def get_data_index(base_path_str):
    """Collect the `.pt` clip files under the `Positive` and `Negative` folders.

    Args:
        base_path_str: Dataset root (string or path-like) expected to contain
            `Positive/` and `Negative/` sub-folders.

    Returns:
        dict with the keys 'positive' and 'negative', each mapping to a sorted
        list of file paths (as strings) found recursively under the matching
        folder.
    """
    root = Path(base_path_str)
    # rglob yields entries in filesystem order, which can differ between
    # machines; sorting keeps the later seeded train/val split reproducible.
    return {
        key: sorted(str(p) for p in (root / folder).rglob('*.pt'))
        for key, folder in (('positive', 'Positive'), ('negative', 'Negative'))
    }

# Build the path index from the dataset root and report how many clips
# were found for each label.
print("getting data index")
data_index_final = get_data_index(base_path)
print(f"Found {len(data_index_final['positive'])} positive and {len(data_index_final['negative'])} negative files.")

**5. Train Val Split**

In [None]:
from sklearn.model_selection import train_test_split
# Positives first, then negatives, with a parallel list of 1/0 labels.
pos_files = data_index_final['positive']
neg_files = data_index_final['negative']
all_files = pos_files + neg_files
all_labels = [1] * len(pos_files) + [0] * len(neg_files)

# Stratified split (class ratio kept in both subsets), seeded with 42.
train_files, val_files, train_labels, val_labels = train_test_split(
    all_files,
    all_labels,
    test_size=val_split,
    stratify=all_labels,
    random_state=42,
)
print(f"no. of training files {len(train_files)}, no. of validation files {len(val_files)}")

**6. Dataset and Augmentations** Since this model ends with an attention layer, this phase involves a few steps. First, each positive clip is padded to the full 60-second length, using the value of its lowest-energy point as the pad value, and a mask is generated that marks the padded frames. The negative clips are all 30 seconds long, so to stop the model from cheating on the basis of mask length, I took the probability distribution of the positive clip lengths and augmented the negative clips accordingly — either by cropping a clip or by appending a randomly chosen chunk of it to the end. The negative clips are then padded with the same logic as the positive clips. In addition, a volume augmentation is applied so that the model does not simply learn that high volume = highlight.

In [None]:
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset
from typing import List
import random

class HighlightSenseDataset(Dataset):
    """Dataset of pre-computed spectrogram clips stored as `.pt` files.

    Each file is read with `torch.load` and must hold a dict with a 'tensor'
    entry whose third dimension is the time-frame axis. Every item is padded
    or truncated to `max_frames` along that axis and returned together with a
    validity mask.

    Args:
        file_paths: Paths of the `.pt` files.
        labels: One label per file; 1 marks a positive clip, any other value
            is handled as a negative clip.
        max_frames: Number of time frames of every returned tensor.
        training: When True, apply a random amplitude scaling to every clip
            and a random length augmentation to negative clips.
    """

    def __init__(self, file_paths: List[str], labels: List[int], max_frames: int = 2584, training=True):
        self.file_paths = file_paths
        self.labels = labels
        self.max_frames = max_frames
        self.training = training
        # Hard-coded length distribution of the positive clips as
        # (first_frame_count, last_frame_count, probability) buckets.
        # Bucket names are the author's; with the notebook settings
        # (22050 Hz, hop 512, ~43 frames per second) the first bucket
        # starts at roughly 5 s rather than 0 s.
        self.pos_distribution = [
            (215, 430, 0.114),    # up to ~10s
            (431, 861, 0.227),    # 10-20s
            (862, 1292, 0.279),   # 20-30s
            (1293, 1722, 0.184),  # 30-40s
            (1723, 2153, 0.072),  # 40-50s
            (2154, 2584, 0.124),  # 50-60s+
        ]

    def sample_frame_count(self):
        """Draw a target frame count that follows `pos_distribution`.

        A bucket is chosen by walking the cumulative bucket probabilities,
        then a frame count is drawn uniformly (inclusive) from its range.
        """
        rand_val = random.random()
        cumulative = 0.0
        for start, end, prob in self.pos_distribution:
            cumulative += prob
            if rand_val <= cumulative:
                return random.randint(start, end)
        # Only reached if rounding leaves the cumulative sum below rand_val.
        return self.pos_distribution[-1][1]

    def pad_or_truncate(self, input_tensor):
        """Force the time axis (dim 2) of `input_tensor` to `max_frames`.

        Returns:
            (tensor, mask): longer inputs are cut to the first `max_frames`
            frames with an all-ones mask; shorter inputs are right-padded
            with the tensor's minimum value and the mask is 0 on the padding.
        """
        num_frames = input_tensor.shape[2]
        mask = torch.ones(self.max_frames)

        if num_frames >= self.max_frames:
            return input_tensor[:, :, :self.max_frames], mask

        # Pad with the quietest value in the clip rather than 0 so that the
        # padding does not stand out from the signal.
        silence_val = input_tensor.min().item()
        padded = F.pad(input_tensor, (0, self.max_frames - num_frames), value=silence_val)
        mask[num_frames:] = 0
        return padded, mask

    def augment_negative_tensor(self, input_tensor):
        """Resize a negative clip to a length drawn from the positive distribution.

        Shorter targets take a random crop. Longer targets append randomly
        positioned chunks of the clip until the target length is reached, so
        the result always has exactly the sampled number of frames (an empty
        clip is returned unchanged).
        """
        num_frames = input_tensor.shape[2]
        final_frames = self.sample_frame_count()

        if final_frames <= num_frames:
            # Crop a random window of the target length.
            start = random.randint(0, num_frames - final_frames)
            return input_tensor[:, :, start:start + final_frames]

        if num_frames == 0:
            # Nothing to repeat; leave the empty clip as it is.
            return input_tensor

        # Keep appending chunks: a single chunk is not enough when the target
        # is more than twice the clip length.
        pieces = [input_tensor]
        need = final_frames - num_frames
        while need > 0:
            chunk_size = min(need, num_frames)
            start = random.randint(0, num_frames - chunk_size)
            pieces.append(input_tensor[:, :, start:start + chunk_size])
            need -= chunk_size
        return torch.cat(pieces, dim=2)

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        """Return `(tensor, mask, label)` for the clip at position `idx`."""
        file_path = self.file_paths[idx]
        label = self.labels[idx]
        # NOTE: torch.load unpickles the file; only use datasets from a
        # trusted source.
        data = torch.load(file_path)
        input_tensor = data['tensor'].float()

        if self.training:
            # Random amplitude scaling of the stored values in [0.5, 1.5].
            input_tensor = input_tensor * random.uniform(0.5, 1.5)
            # Only negatives get the length augmentation, and only in
            # training mode; validation negatives keep their stored length.
            if label != 1:
                input_tensor = self.augment_negative_tensor(input_tensor)

        input_tensor, mask = self.pad_or_truncate(input_tensor)
        return input_tensor, mask, label

**7. Dataloader** Weighted random sampling is used because the dataset is slightly imbalanced: there are more negative clips than positive ones (roughly a 1:1.2 positive-to-negative ratio).

In [None]:
from torch.utils.data import DataLoader

# Training split: random augmentations enabled (the class default).
train_ds = HighlightSenseDataset(
    file_paths=train_files,
    labels=train_labels,
    max_frames=max_time_frames,
)
# Validation split: augmentations switched off.
val_ds = HighlightSenseDataset(
    file_paths=val_files,
    labels=val_labels,
    max_frames=max_time_frames,
    training=False,
)

In [None]:
import torch
from torch.utils.data import WeightedRandomSampler

# Each training sample is weighted by the inverse of its class frequency, so
# every class carries the same total sampling weight.
labels = torch.tensor(train_ds.labels)
class_counts = torch.bincount(labels)
class_weights = class_counts.float().reciprocal()
sample_weights = class_weights[labels]

# Draw as many samples per epoch as there are training items, with replacement.
sampler = WeightedRandomSampler(sample_weights, len(sample_weights), replacement=True)



In [None]:
# NOTE(review): both loaders use a fixed batch size of 32; the `batch_size`
# value from the configuration cell is not read here.
# The training loader draws through the weighted sampler; validation keeps
# the dataset order.
train_loader = DataLoader(train_ds, batch_size=32, sampler=sampler)
val_loader = DataLoader(val_ds, batch_size=32, shuffle=False)


**8. Model Architecture** The model consists of CNN + RNN + Attention + Classifier: 4 convolutional (CNN) blocks, a 2-layer RNN, an attention layer, and a classifier with 2 linear layers. To further prevent the model from simply learning volume, instance normalization is used in the first convolutional block for per-clip normalization, to remove any volume bias.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F


#convblock
class ConvBlock(nn.Module):
    """3x3 convolution -> normalisation -> ReLU -> max-pooling.

    Args:
        inc: Number of input channels.
        outc: Number of output channels.
        pool: Kernel size handed to `nn.MaxPool2d`.
        instance_norm: Use an affine `nn.InstanceNorm2d` when True, otherwise
            `nn.BatchNorm2d`.
    """

    def __init__(self, inc, outc, pool=(2,2), instance_norm=False):
        super().__init__()
        # padding=1 with a 3x3 kernel keeps the spatial size unchanged.
        self.conv = nn.Conv2d(inc, outc, kernel_size=3, padding=1)
        self.norm = (
            nn.InstanceNorm2d(outc, affine=True)
            if instance_norm
            else nn.BatchNorm2d(outc)
        )
        self.pool = nn.MaxPool2d(pool)

    def forward(self, x):
        """Apply conv, norm, ReLU and pooling to `x` in that order."""
        activated = F.relu(self.norm(self.conv(x)))
        return self.pool(activated)


# Attention
class Attention(nn.Module):
    """Additive attention pooling over the step axis of a sequence.

    Args:
        dim: Feature size of each step of the input sequence.
        att_dim: Size of the hidden projection used to score each step.
    """

    def __init__(self, dim, att_dim):
        super().__init__()
        self.fc1 = nn.Linear(dim, att_dim)
        self.fc2 = nn.Linear(att_dim, 1, bias=False)

    def forward(self, x, mask=None):
        """Pool `x` into one vector per batch item.

        Args:
            x: Tensor of shape (batch, steps, dim).
            mask: Optional (batch, steps) tensor; steps where it equals 0 are
                excluded from the softmax.

        Returns:
            (ctx, w): the weighted sum of the steps, shape (batch, dim), and
            the attention weights, shape (batch, steps).
        """
        e = torch.tanh(self.fc1(x))
        s = self.fc2(e).squeeze(-1)
        if mask is not None:
            # Use the most negative value of the score dtype instead of a
            # fixed -1e9, which is not representable in float16 (e.g. under
            # mixed precision). A fully masked row still yields uniform weights.
            s = s.masked_fill(mask == 0, torch.finfo(s.dtype).min)

        w = torch.softmax(s, dim=1)
        ctx = torch.bmm(w.unsqueeze(1), x).squeeze(1)
        return ctx, w


# MAIN MODEL
class HighlightCRNN(nn.Module):
    """CNN -> bidirectional GRU -> attention pooling -> binary classifier.

    Args:
        n_mels: Height (mel bins) of the 2-channel input spectrogram.
        cnn_channels: Output channels of each conv block; one block is built
            per entry. The first three blocks pool both axes by 2, later
            blocks pool only the time axis.
        rnn_hidden: Hidden size of each GRU direction.
        rnn_layers: Number of stacked GRU layers.
        rnn_dropout: Dropout between GRU layers (ignored for a single layer).
        att_dim: Hidden size of the attention scorer.
        dropout: Dropout before the first classifier layer; half of it is
            used before the second.
    """

    def __init__(
        self,
        n_mels=80,
        cnn_channels=(32, 64, 128, 256),
        rnn_hidden=256,
        rnn_layers=2,
        rnn_dropout=0.3,
        att_dim=128,
        dropout=0.5
    ):
        super().__init__()

        cnn_channels = list(cnn_channels)
        if not cnn_channels:
            raise ValueError("cnn_channels must contain at least one entry")

        #CNN
        layers = []
        in_ch = 2
        # Track the total down-sampling so the GRU input size and the mask
        # stride stay correct for any number of conv blocks.
        freq_factor, time_factor = 1, 1

        for i, out_ch in enumerate(cnn_channels):
            pool = (2, 2) if i < 3 else (1, 2)
            # Instance norm only in the first block.
            layers.append(ConvBlock(in_ch, out_ch, pool, instance_norm=(i == 0)))
            in_ch = out_ch
            freq_factor *= pool[0]
            time_factor *= pool[1]

        self.cnn = nn.Sequential(*layers)
        # Number of input frames represented by one CNN output step
        # (16 for the default four blocks).
        self.time_stride = time_factor

        # Frequency bins left after pooling (n_mels // 8 for the default).
        freq_out = n_mels // freq_factor
        cnn_feat_dim = cnn_channels[-1] * freq_out

        #RNN
        self.rnn = nn.GRU(
            input_size=cnn_feat_dim,
            hidden_size=rnn_hidden,
            num_layers=rnn_layers,
            batch_first=True,
            bidirectional=True,
            dropout=rnn_dropout if rnn_layers > 1 else 0
        )

        #Attention
        self.attn = Attention(rnn_hidden * 2, att_dim)

        #Classifier
        self.classifier = nn.Sequential(
            nn.Dropout(dropout),
            nn.Linear(rnn_hidden * 2, 128),
            nn.ReLU(),
            nn.Dropout(dropout / 2),
            nn.Linear(128, 1)
        )

    def forward(self, x, mask=None):
        """Classify a batch of clips.

        Args:
            x: Input of shape (batch, 2, n_mels, frames).
            mask: Optional (batch, frames) tensor, 0 on padded frames.

        Returns:
            (out, att): logits of shape (batch, 1) and the attention weights
            over the down-sampled time steps.
        """
        B = x.size(0)

        #CNN
        c = self.cnn(x)                    # (B, C, F', T')
        c = c.permute(0, 3, 1, 2)          # (B, T', C, F')
        c = c.reshape(B, c.size(1), -1)    # (B, T', C * F')

        #Mask: keep one entry per CNN output step
        m = None
        if mask is not None:
            steps = c.size(1)
            m = mask[:, ::self.time_stride]
            if m.size(1) < steps:
                # Missing trailing entries count as padding (0).
                m = F.pad(m, (0, steps - m.size(1)))
            m = m[:, :steps]

        #RNN
        rnn_out, _ = self.rnn(c)

        #Attention
        ctx, att = self.attn(rnn_out, m)

        #Classifier
        out = self.classifier(ctx)

        return out, att


A simple check of whether the model works: it should output a tensor of shape (4, 1).

In [None]:
# Smoke test: build the model and push one dummy batch through it.
# These assignments rebind the notebook-level n_mels / rnn_dropout / dropout
# names (to the same values as the configuration cell).
n_mels, cnn_channels = 80, [32, 64, 128, 256]
rnn_hidden, rnn_layers, rnn_dropout = 256, 2, 0.3
attention_dim, dropout = 128, 0.5

model = HighlightCRNN(
    n_mels=n_mels,
    cnn_channels=cnn_channels,
    rnn_hidden=rnn_hidden,
    rnn_layers=rnn_layers,
    rnn_dropout=rnn_dropout,
    att_dim=attention_dim,
    dropout=dropout,
).to(device)

# Dummy batch: 4 clips, 2 channels, 80 mel bins, 2584 frames, nothing masked.
x = torch.randn(4, 2, 80, 2584).to(device)
m = torch.ones(4, 2584).to(device)

model.eval()
with torch.no_grad():
    out, att = model(x, m)
print(out.shape)


**9. Training and evaluation** Training uses binary cross-entropy loss on logits (`BCEWithLogitsLoss`) and the AdamW optimizer, which also applies weight-decay regularization. Gradient clipping, with the limit set at 1.0, is used as a further regularizer. The model is saved as a dictionary containing the number of epochs and the model weights. The validation set is evaluated right after training finishes, reporting the loss along with the F1 and precision scores at a 0.5 threshold.

In [None]:
import torch
import torch.nn as nn
from tqdm.auto import tqdm
def train_and_eval(
    model,
    train_loader,
    val_loader,
    device,
    lr=1e-4,
    weight_decay=1e-5,
    epochs=epochs,
    grad_clip=1,
    pos_weight=None,
    save_path = None

):
    """Train `model`, optionally save it, then evaluate it once on `val_loader`.

    Args:
        model: Module called as `model(specs, masks)` and returning
            `(logits, attention)`.
        train_loader: Iterable of `(specs, masks, labels)` training batches.
        val_loader: Iterable of `(specs, masks, labels)` validation batches.
        device: Device the model and the batches are moved to.
        lr: AdamW learning rate.
        weight_decay: AdamW weight decay.
        epochs: Number of passes over `train_loader`.
        grad_clip: Maximum gradient norm passed to `clip_grad_norm_`.
        pos_weight: Optional weight for the positive class in the loss.
        save_path: File the checkpoint dict (`'model'` state dict and
            `'epochs'`) is written to; nothing is saved when None.

    Returns:
        (val_loss, precision, f1): mean validation loss per batch, and
        precision / F1 of the positive class at a 0.5 probability threshold.
    """
    import os  # local import keeps this function self-contained

    model = model.to(device)

    #weighted class since data imbalanced
    if pos_weight is not None:
        # Explicit float dtype so an integer pos_weight also works.
        pw = torch.tensor([pos_weight], dtype=torch.float32, device=device)
        criterion = nn.BCEWithLogitsLoss(pos_weight=pw)
    else:
        criterion = nn.BCEWithLogitsLoss()

    optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)

    #TRAIN
    print("started training")

    for epoch in range(epochs):
        model.train()
        train_loss = 0.0

        with tqdm(total=len(train_loader), desc=f"Epoch {epoch+1}/{epochs}") as progress_bar:
            for specs, masks, labels in train_loader:
                specs = specs.to(device)
                masks = masks.to(device)
                labels = labels.unsqueeze(1).to(device).float()

                optimizer.zero_grad()

                logits, _ = model(specs, masks)
                loss = criterion(logits, labels)
                loss.backward()
                torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
                optimizer.step()

                # Read the scalar once; it is used for both the running sum
                # and the progress bar.
                current_loss = loss.item()
                train_loss += current_loss
                progress_bar.set_postfix(loss=f"{current_loss:.4f}")
                progress_bar.update(1)

        # max(..., 1) avoids a division by zero on an empty loader.
        train_loss /= max(len(train_loader), 1)
        print(f"Epoch {epoch+1} | Train Loss = {train_loss:.4f}")

    #Save Model (skipped when no path was given)
    if save_path is not None:
        parent = os.path.dirname(os.fspath(save_path))
        if parent:
            os.makedirs(parent, exist_ok=True)
        checkpoint = {
            'model': model.state_dict(),
            'epochs': epochs
        }
        torch.save(checkpoint, save_path)
        print(f"Model saved to {save_path}")

    # ValSet
    model.eval()
    total_val_loss = 0.0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for specs, masks, labels in val_loader:
            specs = specs.to(device)
            masks = masks.to(device)
            labels = labels.to(device).unsqueeze(1).float()

            logits, _ = model(specs, masks)
            loss = criterion(logits, labels)
            total_val_loss += loss.item()

            probs = torch.sigmoid(logits)
            preds = (probs > 0.5).float()

            all_preds.append(preds.cpu())
            all_labels.append(labels.cpu())

    val_loss = total_val_loss / max(len(val_loader), 1)

    if all_preds:
        preds = torch.cat(all_preds).numpy().flatten()
        labels = torch.cat(all_labels).numpy().flatten()
        tp = ((preds == 1) & (labels == 1)).sum()
        fp = ((preds == 1) & (labels == 0)).sum()
        fn = ((preds == 0) & (labels == 1)).sum()
    else:
        # Empty validation loader: no predictions to count.
        tp = fp = fn = 0

    # The small epsilon keeps the ratios defined when a count is zero.
    precision = tp / (tp + fp + 1e-8)
    recall = tp / (tp + fn + 1e-8)
    f1 = 2 * precision * recall / (precision + recall + 1e-8)

    print(f"Val Loss : {val_loss:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"F1 Score : {f1:.4f}")

    return val_loss, precision, f1


# `output_dir` already names the checkpoint file ('/content/best_model.pt'),
# so it is passed as the save path directly; joining "best_model.pt" onto it
# would point inside a folder that does not exist.
loss_final, precision, f1 = train_and_eval(
    model,
    train_loader,
    val_loader,
    device,
    lr=1e-4,
    weight_decay=1e-5,
    epochs=epochs,
    grad_clip=1.0,
    pos_weight=None,
    save_path=output_dir,
)
print(loss_final)
print(precision)
print(f1)


**10. Cleanup** After training and evaluation the model still occupies RAM and GPU memory; run this cell to free it.


In [None]:
#deleting memory for rerun
import gc
# Drop the notebook-level references (when present) so the objects can be
# collected, then run the garbage collector and empty PyTorch's CUDA cache.
globals().pop('model', None)
globals().pop('optimizer', None)
gc.collect()
torch.cuda.empty_cache()

# Fresh, untrained model for the next run (not moved to `device` here).
model = HighlightCRNN(
    n_mels, cnn_channels, rnn_hidden, rnn_layers, rnn_dropout, attention_dim, dropout
)