# Import Section

In [None]:
# Run this cell first. After the install finishes, restart the session; then run the remaining cells.
!python3 -m pip install --upgrade --user ortools

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import cv2
import h5py
import yaml
import math
import torch
import random
import numpy as np
from torch import nn
from os import PathLike
from pathlib import Path
import torch.nn.init as init
from typing import Any, List, Dict
from torch.nn import functional as F
from ortools.algorithms.python import knapsack_solver


# Helper Logic-less Functions
### Functions to make the code look readable and better

In [None]:
#helper logic-less functions (read/write)

def load_yaml(path: PathLike) -> Any:
    """Parse the YAML file at ``path`` with ``yaml.safe_load`` and return the result."""
    with open(path) as stream:
        return yaml.safe_load(stream)

def dump_yaml(obj: Any, path: PathLike) -> None:
    """Write ``obj`` as YAML to the file at ``path``, replacing any existing content."""
    with open(path, 'w') as stream:
        yaml.dump(obj, stream)

# Helper Classes

### `VideoDataset` : used for reading files from the dataset into the memory.
### `DataLoader`: used for shuffling and loading the items of a `VideoDataset` (one video per step) during training and evaluation.


In [None]:
# Helper Classes for Data Loading and proccessing

class VideoDataset(object):
    """Index-based access to videos stored in one or more HDF5 dataset files.

    Every key is a path-like string whose parent part is the path of the HDF5
    file and whose final component is the name of the video group inside that
    file (``<dataset file>/<video name>``).
    """

    def __init__(self, keys: List[str]):
        """Remember ``keys`` and open every dataset file they refer to."""
        self.keys = keys
        self.datasets = self.get_datasets(keys)

    def __getitem__(self, index):
        """Load the video at ``index`` into memory.

        Returns:
            Tuple ``(key, seq, gtscore, cps, n_frames, nfps, picks, user_summary)``.
            ``gtscore`` is min-max normalised to ``[0, 1]``; ``user_summary`` is
            ``None`` when the video entry has no ``'user_summary'`` field.
        """
        key = self.keys[index]
        video_path = Path(key)
        dataset_name = str(video_path.parent)
        video_name = video_path.name
        video_file = self.datasets[dataset_name][video_name]

        seq = video_file['features'][...].astype(np.float32)
        gtscore = video_file['gtscore'][...].astype(np.float32)
        cps = video_file['change_points'][...].astype(np.int32)
        n_frames = video_file['n_frames'][...].astype(np.int32)
        nfps = video_file['n_frame_per_seg'][...].astype(np.int32)
        picks = video_file['picks'][...].astype(np.int32)
        user_summary = None
        if 'user_summary' in video_file:
            user_summary = video_file['user_summary'][...].astype(np.float32)

        # Min-max normalise the ground-truth scores. When every score is equal
        # (or the array is empty) the range is zero; dividing would fill the
        # target with NaN, so the shifted (all-zero) scores are kept instead.
        if gtscore.size:
            gtscore -= gtscore.min()
            peak = gtscore.max()
            if peak > 0:
                gtscore /= peak

        return key, seq, gtscore, cps, n_frames, nfps, picks, user_summary

    def __len__(self):
        """Number of video keys in this dataset."""
        return len(self.keys)

    @staticmethod
    def get_datasets(keys: List[str]) -> "Dict[str, h5py.File]":
        """Open, read-only, each distinct HDF5 file referenced by ``keys``.

        Returns a dict mapping the file path (as a string) to its open handle.
        """
        dataset_paths = {str(Path(key).parent) for key in keys}
        datasets = {path: h5py.File(path, 'r') for path in dataset_paths}
        return datasets


class DataLoader(object):
    """Iterator that yields the items of a dataset one at a time.

    When ``shuffle`` is true the visiting order is re-shuffled every time a new
    iteration starts.
    """

    def __init__(self, dataset: VideoDataset, shuffle: bool):
        """Store the dataset and build the initial (sequential) visiting order."""
        self.dataset = dataset
        self.shuffle = shuffle
        self.data_idx = [position for position in range(len(self.dataset))]

    def __iter__(self):
        """Restart iteration, reshuffling the visiting order if requested."""
        self.iter_idx = 0
        if self.shuffle:
            random.shuffle(self.data_idx)
        return self

    def __next__(self):
        """Return the next dataset item, or stop once every item was visited."""
        cursor = self.iter_idx
        if cursor == len(self.dataset):
            raise StopIteration
        item = self.dataset[self.data_idx[cursor]]
        # Advance only after the item was fetched successfully.
        self.iter_idx = cursor + 1
        return item

# Helper Logic Functions
### `knapsack`: finds the best segments in the video within a specific proportion of its length
### `generate_summary`: converts key-fragment scores into shot-level summaries
### `evaluate_summary`: takes both the user summaries and the machine summary and returns the performance using the F-score metric


In [None]:
# Module-level OR-tools knapsack solver using the dynamic-programming solver
# type. `knapsack_ortools` below re-initialises and reuses this single
# instance on every call; 'test' is the name string passed to the solver.
osolver = knapsack_solver.KnapsackSolver(
    knapsack_solver.SolverType.KNAPSACK_DYNAMIC_PROGRAMMING_SOLVER, 'test'
)

def knapsack_ortools(values, weights, capacity):
    """Solve a 0/1 knapsack problem with the module-level OR-tools solver.

    Args:
        values: Per-item values (floats). They are multiplied by 1000 and
            truncated to integers before being handed to the solver.
        weights: Per-item weights; truncated to integers.
        capacity: Maximum total weight of the selected items; coerced to ``int``.

    Returns:
        List of the indices of the items in the best solution, ascending.

    Raises:
        ValueError: If ``values`` and ``weights`` differ in length.
    """
    scale = 1000
    values = (np.asarray(values) * scale).astype(np.int_)
    weights = np.asarray(weights).astype(np.int_)

    if len(values) != len(weights):
        raise ValueError(
            'values and weights must have the same length, got '
            f'{len(values)} and {len(weights)}'
        )
    # Nothing to pack: skip the solver entirely.
    if len(weights) == 0:
        return []

    # Plain Python ints keep the native binding independent of NumPy scalars.
    osolver.init(values.tolist(), [weights.tolist()], [int(capacity)])
    osolver.solve()

    # Collect the items the solver picked.
    return [item for item in range(len(weights))
            if osolver.best_solution_contains(item)]


In [None]:
# Convert from keyframes to keyshot summaries

def generate_summary(ypred, cps, n_frames, nfps, positions, proportion=0.2):
    """Turn per-sample importance scores into a binary per-frame summary.

    Args:
        ypred: Predicted scores, one per sampled position.
        cps: Array of shape ``(n_segs, 2)`` holding the inclusive
            ``[first, last]`` frame index of every segment.
        n_frames: Total number of frames in the video (scalar or 0-d array).
        nfps: Number of frames of every segment; used as knapsack weights.
        positions: Frame indices at which ``ypred`` was sampled.
        proportion: Maximum fraction of ``n_frames`` the summary may cover.

    Returns:
        Boolean array of length ``n_frames``; ``True`` marks selected frames.
    """
    # Dataset entries deliver n_frames as a NumPy value; use a plain int for
    # shapes, comparisons and the knapsack capacity.
    n_frames = int(n_frames)

    # Upscale the downsampled scores to one score per frame.
    n_segs = cps.shape[0]
    frame_scores = np.zeros(n_frames, dtype=np.float32)
    if positions.dtype != int:
        positions = positions.astype(np.int32)
    if positions[-1] != n_frames:
        positions = np.concatenate([positions, [n_frames]])
    for i in range(len(positions) - 1):
        pos_left, pos_right = positions[i], positions[i + 1]
        # Intervals without a matching prediction keep their zero score.
        if i < len(ypred):
            frame_scores[pos_left:pos_right] = ypred[i]

    # Average the frame scores inside every segment.
    seg_score = []
    for seg_idx in range(n_segs):
        start, end = int(cps[seg_idx, 0]), int(cps[seg_idx, 1] + 1)
        scores = frame_scores[start:end]
        # An empty slice would average to NaN and corrupt the knapsack values.
        seg_score.append(float(scores.mean()) if scores.size else 0.0)

    # Pick the segments with the knapsack solver; the capacity is the frame
    # budget. knapsack_ortools takes exactly (values, weights, capacity).
    limits = int(math.floor(n_frames * proportion))
    packed = knapsack_ortools(seg_score, nfps, limits)

    # Mark the frames of every chosen segment.
    summary = np.zeros(n_frames, dtype=np.bool_)
    for seg_idx in packed:
        first, last = cps[seg_idx]
        summary[first:last + 1] = True

    return summary


In [None]:
def evaluate_summary(machine_summary, user_summaries):
    """Score a machine summary against several user summaries.

    Args:
        machine_summary: 1-D array; entries greater than zero mark selected frames.
        user_summaries: 2-D array of shape ``(n_users, n_frames)``; entries
            greater than zero mark frames selected by that user.

    Returns:
        Tuple ``(avg_f_score, avg_precision, avg_recall)`` averaged over users.
    """
    # Work on float32 copies so the caller's arrays are left untouched.
    machine_summary = machine_summary.astype(np.float32)
    user_summaries = user_summaries.astype(np.float32)
    n_users, n_frames = user_summaries.shape

    # Binarize the summaries (1 if > 0, else unchanged).
    machine_summary[machine_summary > 0] = 1
    user_summaries[user_summaries > 0] = 1

    # Match the machine summary length to the user summaries.
    if len(machine_summary) > n_frames:
        machine_summary = machine_summary[:n_frames]  # truncate if too long
    elif len(machine_summary) < n_frames:
        zero_padding = np.zeros(n_frames - len(machine_summary),
                                dtype=machine_summary.dtype)
        machine_summary = np.concatenate([machine_summary, zero_padding])  # pad if too short

    # Three independent accumulators. The former single-line chained
    # assignment tried to unpack an empty list and raised ValueError.
    f_scores, precisions, recalls = [], [], []
    for user_idx in range(n_users):
        user_summary = user_summaries[user_idx, :]
        overlap_duration = (machine_summary * user_summary).sum()
        # The small epsilon keeps the ratios finite for empty summaries.
        precision = overlap_duration / (machine_summary.sum() + 1e-8)
        recall = overlap_duration / (user_summary.sum() + 1e-8)

        # No overlap at all: define the F-score as 0 instead of dividing 0 by 0.
        if precision == 0 and recall == 0:
            f_score = 0.
        else:
            f_score = (2 * precision * recall) / (precision + recall)

        f_scores.append(f_score)
        precisions.append(precision)
        recalls.append(recall)

    # Average across all users.
    avg_f_score = np.mean(f_scores)
    avg_precision = np.mean(precisions)
    avg_recall = np.mean(recalls)

    return avg_f_score, avg_precision, avg_recall


# VideoSummarizerNetwork

In [None]:
class SelfAttention(nn.Module):
    """Single-head self-attention with an output projection back to the input size.

    ``forward`` swaps dimensions 0 and 1 of the key matrix, so the input is
    expected to be a 2-D tensor whose last dimension equals ``input_size``.
    """

    def __init__(self, input_size=1024, output_size=1024):
        super().__init__()

        # Attention dimensions.
        self.m = input_size
        self.output_size = output_size

        # Bias-free key / query / value projections.
        self.K = nn.Linear(self.m, self.output_size, bias=False)
        self.Q = nn.Linear(self.m, self.output_size, bias=False)
        self.V = nn.Linear(self.m, self.output_size, bias=False)

        # Projects the attended values back to the input dimensionality.
        self.output_linear = nn.Linear(self.output_size, self.m, bias=False)
        self.drop50 = nn.Dropout(0.5)

    def forward(self, x):
        """Apply self-attention to ``x`` and return a tensor shaped like ``x``."""
        keys = self.K(x)
        values = self.V(x)
        # Queries are scaled by a fixed factor before the dot product.
        queries = self.Q(x) * 0.06

        # Every row attends over all rows; softmax over the last dimension.
        scores = torch.matmul(queries, keys.transpose(0, 1))
        attention = self.drop50(torch.softmax(scores, dim=-1))

        # Weighted sum of the values, then the output projection.
        return self.output_linear(torch.matmul(attention, values))


class VideoSummarizerNetwork(nn.Module):
    """Self-attention followed by a two-layer regressor producing one score per frame."""

    def __init__(self):
        super().__init__()
        self.input_size = 1024
        self.hidden_size = 1024

        # Sub-modules are registered in this exact order; it determines the
        # order in which `Module.apply` visits them.
        self.attention_layer = SelfAttention(input_size=self.input_size, output_size=self.input_size)
        self.fc1 = nn.Linear(self.input_size, self.hidden_size)
        self.fc2 = nn.Linear(self.hidden_size, 1)
        self.sigmoid = nn.Sigmoid()
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)
        self.softmax = nn.Softmax(dim=0)
        self.norm_input = nn.LayerNorm(self.input_size)
        self.norm_hidden = nn.LayerNorm(self.hidden_size)

    def forward(self, video_features):
        """Score every frame of ``video_features``.

        The input must have at least three dimensions (its third dimension is
        read as the feature size). The result has shape ``(1, n)``, with one
        sigmoid score per flattened frame.
        """
        # Collapse all leading dimensions into a single "frames" dimension.
        feature_size = video_features.shape[2]
        frames = video_features.view(-1, feature_size)

        # Attention with a residual connection, then dropout and layer norm.
        attended = self.attention_layer(frames)
        x = self.norm_input(self.dropout(attended + frames))

        # Hidden layer: linear -> ReLU -> dropout -> layer norm.
        x = self.norm_hidden(self.dropout(self.relu(self.fc1(x))))

        # One sigmoid-squashed score per frame, returned as a single row.
        return self.sigmoid(self.fc2(x)).view(1, -1)

    def predict(self, sequence):
        """Run the network on ``sequence`` and return the scores as a NumPy array."""
        return self(sequence)[0].detach().cpu().numpy()


## `Weights initialization` helps in getting reproducible results each time we train

In [None]:
def weights_init(m):
    """Initialise modules whose class is named ``Linear``; leave all others untouched.

    Weights get Xavier-uniform values with gain ``sqrt(2)``; a bias, when
    present, is set to the constant 0.1. Intended for ``Module.apply``.
    """
    if type(m).__name__ != 'Linear':
        return
    init.xavier_uniform_(m.weight, gain=np.sqrt(2.0))
    if m.bias is not None:
        init.constant_(m.bias, 0.1)

# Model Training

In [None]:
def train(split, split_idx, num_epochs=120, lr=0.00005, weight_decay=0.00001):
    """Train the module-level ``model`` on one split and track the best test F-score.

    NOTE: this function reads the global variable ``model``; it must be assigned
    (and moved to the GPU) before calling. A CUDA device is required.

    Args:
        split: Mapping with the dataset keys under ``'train_keys'`` and ``'test_keys'``.
        split_idx: Index of the split; only referenced by the disabled
            checkpoint-saving line below.
        num_epochs: Number of passes over the training keys.
        lr: Learning rate of the Adam optimizer.
        weight_decay: Weight decay of the Adam optimizer.

    Returns:
        Tuple ``(max_val_fscore, max_val_fscore_epoch)``: the highest test
        F-score seen and the epoch at which it occurred.
    """
    print("Initializing VideoSummarizerNetwork model and optimizer...")
    # Enable train mode.
    model.train()
    criterion = nn.MSELoss().cuda()
    # Optimise only the parameters that require gradients.
    parameters = filter(lambda p: p.requires_grad, model.parameters())
    optimizer = torch.optim.Adam(parameters, lr=lr, weight_decay=weight_decay)

    print("Starting training...")
    max_val_fscore = 0
    max_val_fscore_epoch = 0
    # Training videos are visited in a fresh random order every epoch.
    train_set = VideoDataset(split['train_keys'])
    train_loader = DataLoader(train_set, shuffle=True)
    # Test videos are visited in a fixed order.
    val_set = VideoDataset(split['test_keys'])
    val_loader = DataLoader(val_set, shuffle=False)

    for epoch in range(num_epochs):

        print("Epoch: {0:6}".format(str(epoch)+"/"+str(num_epochs)), end='')
        # `eval` below switches the model to eval mode; switch back each epoch.
        model.train()

        set_loss = []

        for _, seq, gtscore, change_points, n_frames, nfps, picks, user_summary in train_loader:
            # Add a leading batch dimension to the features and the target scores.
            seq = torch.from_numpy(seq).unsqueeze(0).cuda()
            target = torch.from_numpy(gtscore).unsqueeze(0).cuda()
            # Predict the scores from the features.
            y = model(seq)
            # Compare the prediction with the ground-truth labels.
            loss = criterion(y, target)
            # Record the loss of every video in this epoch.
            set_loss.append(float(loss))
            # Backpropagation and parameter update.
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Evaluate on the test keys and remember the best F-score.
        # (`eval` is this notebook's evaluation function, not the builtin.)
        val_fscore = eval(model,val_loader)
        if max_val_fscore < val_fscore:
            max_val_fscore = val_fscore
            max_val_fscore_epoch = epoch
            # Checkpoint saving is disabled; `saved_model_split` is not defined
            # in this notebook and must be set before re-enabling this line.
            #torch.save(model.state_dict(), str(f'{saved_model_split}{split_idx}.pt'))


        print("   Train loss: {0:.05f}".format(np.mean(set_loss)), end='')
        print('   Test F-score avg/max: {0:0.5}/{1:0.5}'.format(val_fscore, max_val_fscore))


    return max_val_fscore, max_val_fscore_epoch

# Model Evaluation

In [None]:
def eval(model, val_loader):
    """Return the mean F-score of ``model`` over every video in ``val_loader``.

    Puts ``model`` into evaluation mode and runs on the CUDA device. Note that
    this name shadows Python's builtin ``eval`` within the notebook.
    """
    model.eval()
    fscores = []

    # No gradients are needed while evaluating.
    with torch.no_grad():
        for _key, seq, _gtscore, cps, n_frames, nfps, picks, user_summary in val_loader:
            # Add a batch dimension and move the features to the GPU.
            features = torch.from_numpy(seq).unsqueeze(0).float().cuda()
            # Predicted scores of this video as a NumPy array.
            scores = model(features)[0].detach().cpu().numpy()
            # Convert the scores into a keyshot summary and score it.
            pred_summary = generate_summary(scores, cps, n_frames, nfps, picks)
            fscore, _, _ = evaluate_summary(pred_summary, user_summary)
            fscores.append(fscore)

    # Average over all videos of the set.
    return np.mean(fscores)

# Start Model Training and Evaluation
`initializeModel`: this code initializes the model for video summarization, ensures reproducibility by setting random seeds, and sets up the GPU (CUDA) device



In [None]:
def initializeModel():
  """Build a freshly initialised ``VideoSummarizerNetwork`` on CUDA device 0.

  Seeds ``random``, NumPy and PyTorch (CPU and CUDA) with a fixed value before
  the weights are created, then applies ``weights_init`` to every sub-module.
  """
  seed = 12345
  for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
    seed_fn(seed)

  # Create the network and overwrite the default weights.
  net = VideoSummarizerNetwork()
  net.eval()
  net.apply(weights_init)

  # Select the GPU, seed its generator and move the network there.
  torch.cuda.set_device(0)
  torch.cuda.manual_seed(seed)
  net.cuda()
  return net


In [None]:
# YAML file that lists the dataset splits.
dataset_path = "/content/drive/MyDrive/datasets/tvsum - Copy.yml"
split_path = Path(dataset_path)

# Read all splits into memory.
splits = load_yaml(split_path)

# Train a freshly initialised model on every split and collect its best F-score.
# `model` is assigned at module level because `train` reads it as a global.
split_fscores = []
for split_idx, split in enumerate(splits):
    model = initializeModel()
    print(f'Start training on {split_path.stem}: split {split_idx}')
    fscore, fscore_epoch = train(split,split_idx)
    split_fscores.append(fscore)

# Average of the best F-scores over all splits.
avg_eval_scores = sum(split_fscores) / len(splits)

print(f'Training done on {split_path.stem}. F-score: {avg_eval_scores:.4f}')

# Model Inference
### 1- we navigate into cloud directory to load the code of KTS that is used to preprocess videos before inference process

### 2- then we load the model and preprocess the data and build video summary from the model prediction

In [None]:
# Put the Drive notebooks folder at the front of the import search path so the
# `kts` module imported in the next cell can be found there.
import sys
sys.path.insert(0, '/content/drive/MyDrive/Colab Notebooks/')
# Change the working directory to the same folder and display it.
%cd /content/drive/MyDrive/Colab Notebooks
%pwd

/content/drive/MyDrive/Colab Notebooks


'/content/drive/MyDrive/Colab Notebooks'

In [None]:
from kts import VideoPreprocessor

In [None]:
video_path = "/content/drive/MyDrive/datasets/video/sTEELN-vY30.mp4"
save_path = "/content/drive/MyDrive/datasets/output/sTEELN-vY30.mp4"
model_path = "/content/drive/MyDrive/datasets/model-trained/our-attention-vas0.pt"


# Build the network on the CPU and restore the trained weights. Without
# `load_state_dict` the network would predict with its random initial weights.
model = VideoSummarizerNetwork()
model = model.eval().to("cpu")
state_dict = torch.load(model_path,map_location=lambda storage, loc: storage)
model.load_state_dict(state_dict)


# Preprocess the video.
# NOTE(review): the meaning of the argument 15 is defined by `kts.VideoPreprocessor`
# (presumably a frame sampling rate) — confirm against that module.
video_proc = VideoPreprocessor(15)
n_frames, seq, cps, nfps, picks = video_proc.run(video_path)
seq_len = len(seq)

print('Predicting Scores ...')

with torch.no_grad():
    # Cast to float32, like `eval` does, so the features match the model weights.
    seq_torch = torch.from_numpy(seq).unsqueeze(0).float().to("cpu")
    pred_cls = model.predict(seq_torch)
    pred_summ = generate_summary(pred_cls, cps, n_frames, nfps, picks)
print('Writing Video ...')

# Load the original video.
cap = cv2.VideoCapture(video_path)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)

# Create the summary video writer.
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(save_path, fourcc, fps, (width, height))

frame_idx = 0
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # The decoder may deliver more frames than the summary covers; frames
        # beyond the end of the summary are treated as not selected.
        if frame_idx < len(pred_summ) and pred_summ[frame_idx]:
            out.write(frame)

        frame_idx += 1
finally:
    # Always release the writer and the capture, even if copying fails.
    out.release()
    cap.release()

Predicting summary ...
Writing summary video ...
