# Prepare the Resnet

In [1]:
# Install and import necessary packages
# ! pip install transformers datasets
# To install from source instead of the last release, comment the command above and uncomment the following one.
# ! pip install git+https://github.com/huggingface/transformers.git

from transformers import DetrImageProcessor
import torch
import torchvision
import torchvision.transforms.functional as F
from PIL import Image

Defaulting to user installation because normal site-packages is not writeable


In [2]:
class XCLIPAttention(torch.nn.Module):
    """Multi-headed attention from 'Attention Is All You Need' paper"""

    def __init__(self, embed_dim, num_heads):
        super().__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = self.embed_dim // self.num_heads
        if self.head_dim * self.num_heads != self.embed_dim:
            raise ValueError(
                f"embed_dim must be divisible by num_heads (got `embed_dim`: {self.embed_dim} and `num_heads`:"
                f" {self.num_heads})."
            )
        self.scale = self.head_dim**-0.5

        self.k_proj = torch.nn.Linear(self.embed_dim, self.embed_dim)
        self.v_proj = torch.nn.Linear(self.embed_dim, self.embed_dim)
        self.q_proj = torch.nn.Linear(self.embed_dim, self.embed_dim)
        self.out_proj = torch.nn.Linear(self.embed_dim, self.embed_dim)

    def _shape(self, tensor: torch.Tensor, seq_len: int, bsz: int):
        return tensor.view(bsz, seq_len, self.num_heads, self.head_dim).transpose(1, 2).contiguous()

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: torch.Tensor = None,
        causal_attention_mask: torch.Tensor = None,
        output_attentions: bool = False,
    ):
        """Input shape: Batch x Time x Channel"""

        bsz, tgt_len, embed_dim = hidden_states.size()

        # get query proj
        query_states = self.q_proj(hidden_states) * self.scale
        key_states = self._shape(self.k_proj(hidden_states), -1, bsz)
        value_states = self._shape(self.v_proj(hidden_states), -1, bsz)

        proj_shape = (bsz * self.num_heads, -1, self.head_dim)
        query_states = self._shape(query_states, tgt_len, bsz).view(*proj_shape)
        key_states = key_states.view(*proj_shape)
        value_states = value_states.view(*proj_shape)

        src_len = key_states.size(1)
        attn_weights = torch.bmm(query_states, key_states.transpose(1, 2))

        if attn_weights.size() != (bsz * self.num_heads, tgt_len, src_len):
            raise ValueError(
                f"Attention weights should be of size {(bsz * self.num_heads, tgt_len, src_len)}, but is"
                f" {attn_weights.size()}"
            )

        # apply the causal_attention_mask first
        if causal_attention_mask is not None:
            if causal_attention_mask.size() != (bsz, 1, tgt_len, src_len):
                raise ValueError(
                    f"Attention mask should be of size {(bsz, 1, tgt_len, src_len)}, but is"
                    f" {causal_attention_mask.size()}"
                )
            attn_weights = attn_weights.view(bsz, self.num_heads, tgt_len, src_len) + causal_attention_mask
            attn_weights = attn_weights.view(bsz * self.num_heads, tgt_len, src_len)

        if attention_mask is not None:
            if attention_mask.size() != (bsz, 1, tgt_len, src_len):
                raise ValueError(
                    f"Attention mask should be of size {(bsz, 1, tgt_len, src_len)}, but is {attention_mask.size()}"
                )
            attn_weights = attn_weights.view(bsz, self.num_heads, tgt_len, src_len) + attention_mask
            attn_weights = attn_weights.view(bsz * self.num_heads, tgt_len, src_len)

        attn_weights = torch.nn.functional.softmax(attn_weights, dim=-1)

        if output_attentions:
            # this operation is a bit akward, but it's required to
            # make sure that attn_weights keeps its gradient.
            # In order to do so, attn_weights have to reshaped
            # twice and have to be reused in the following
            attn_weights_reshaped = attn_weights.view(bsz, self.num_heads, tgt_len, src_len)
            attn_weights = attn_weights_reshaped.view(bsz * self.num_heads, tgt_len, src_len)
        else:
            attn_weights_reshaped = None

        attn_output = torch.bmm(attn_weights, value_states)

        if attn_output.size() != (bsz * self.num_heads, tgt_len, self.head_dim):
            raise ValueError(
                f"`attn_output` should be of size {(bsz, self.num_heads, tgt_len, self.head_dim)}, but is"
                f" {attn_output.size()}"
            )

        attn_output = attn_output.view(bsz, self.num_heads, tgt_len, self.head_dim)
        attn_output = attn_output.transpose(1, 2)
        attn_output = attn_output.reshape(bsz, tgt_len, embed_dim)

        attn_output = self.out_proj(attn_output)

        return attn_output, attn_weights_reshaped

# Copied from transformers.models.clip.modeling_clip.CLIPMLP with CLIP->XCLIP
class XCLIPMLP(torch.nn.Module):
    def __init__(self, hidden_size, intermediate_size):
        super().__init__()
        self.activation_fn = torch.nn.ReLU()
        self.fc1 = torch.nn.Linear(hidden_size, intermediate_size)
        self.fc2 = torch.nn.Linear(intermediate_size, hidden_size)

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        hidden_states = self.fc1(hidden_states)
        hidden_states = self.activation_fn(hidden_states)
        hidden_states = self.fc2(hidden_states)
        return hidden_states

# Copied from transformers.models.clip.modeling_clip.CLIPEncoderLayer with CLIP->XCLIP
class XCLIPEncoderLayer(torch.nn.Module):
    def __init__(self, hidden_size, num_heads, layer_norm_eps, intermediate_size):
        super().__init__()
        self.embed_dim = hidden_size
        self.self_attn = XCLIPAttention(hidden_size, num_heads)
        self.layer_norm1 = torch.nn.LayerNorm(self.embed_dim, eps=layer_norm_eps)
        self.mlp = XCLIPMLP(hidden_size, intermediate_size)
        self.layer_norm2 = torch.nn.LayerNorm(self.embed_dim, eps=layer_norm_eps)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: torch.Tensor = None,
        causal_attention_mask: torch.Tensor = None,
        output_attentions: bool = False,
    ):
        """
        Args:
            hidden_states (`torch.FloatTensor`): input to the layer of shape `(batch, seq_len, embed_dim)`
            attention_mask (`torch.FloatTensor`): attention mask of size
                `(batch, 1, tgt_len, src_len)` where padding elements are indicated by very large negative values.
                `(config.encoder_attention_heads,)`.
            output_attentions (`bool`, *optional*):
                Whether or not to return the attentions tensors of all attention layers. See `attentions` under
                returned tensors for more detail.
        """
        residual = hidden_states

        hidden_states = self.layer_norm1(hidden_states)
        hidden_states, attn_weights = self.self_attn(
            hidden_states=hidden_states,
            attention_mask=attention_mask,
            causal_attention_mask=causal_attention_mask,
            output_attentions=output_attentions,
        )
        hidden_states = residual + hidden_states

        residual = hidden_states
        hidden_states = self.layer_norm2(hidden_states)
        hidden_states = self.mlp(hidden_states)
        hidden_states = residual + hidden_states

        return hidden_states


# Define the new X-Clip class with Resnet

In [3]:
# Import necessary modules first
import torch
num_layers = 5
batch_size = 10
height_width, hidden_size, num_heads, layer_norm_eps, intermediate_size = 24*42, 2048, 32, 1, 768


class X_Clip_mod(torch.nn.Module):

    def __init__(self, device, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

        original_model = torch.load("X-CLIP.bin", map_location="cpu")
        original_model = original_model.to(device)

        # original_model = torch.load("X-CLIP.bin", map_location=device)
        
        self.resnet = torch.load("Resnet 101.bin")
        self.preprocessor = DetrImageProcessor.from_json_file("preprocessor_config.json")

        # Freeze the Resnet model parameters
        for param in self.resnet.parameters():
            param.requires_grad = False

        # Set up the encoding layer
        self.vision = torch.nn.Sequential(*[XCLIPEncoderLayer(hidden_size, num_heads, layer_norm_eps, intermediate_size) for _ in range(num_layers)])

        # And finally the final score prediction layer
        #self.scoring = torch.nn.Sequential(torch.nn.Flatten(-2), torch.nn.Linear(1009*768, 1), torch.nn.Sigmoid())
        self.scoring = torch.nn.Sequential(torch.nn.Linear(hidden_size, 1), torch.nn.ReLU(), torch.nn.Flatten(1), torch.nn.Linear(height_width, 1), torch.nn.Sigmoid())
        


    def forward(self, frames) -> torch.Tensor:
        '''
        Frames shape: (num_batch, num_frames, height, width, RGB_channels)
        Output shape: (num_batch, num_frames, 1)
        '''
        num_batch, num_frames, height, width, RGB_channels = frames.shape
        
        inputs = self.preprocessor(frames.view((num_batch*num_frames, height, width, RGB_channels)), return_tensors="pt")
        
        inputs = inputs.to(device)
        
        features, _ = self.resnet(**inputs)
        features, _ = features[-1]
        vision_output = self.vision(features.flatten(2).transpose(1, 2)) # Shape: (num_batch*num_frames, height*width, num_channels])
        del features

        scores = self.scoring(vision_output).view(num_batch, num_frames, -1)
        scores = 4*scores + 1
        return scores

In [4]:
# '''
# This is the old modified X-Clip model, the one above is new
# '''


# # Import necessary modules first
# import torch
# batch_size = 5

# # Define parameter reset function

# # Reset the model parameters
# def reset_all_weights(model: torch.nn.Module) -> None:
#     """
#     refs:
#         - https://discuss.pytorch.org/t/how-to-re-set-alll-parameters-in-a-network/20819/6
#         - https://stackoverflow.com/questions/63627997/reset-parameters-of-a-neural-network-in-pytorch
#         - https://pytorch.org/docs/stable/generated/torch.nn.Module.html
#     """

#     @torch.no_grad()
#     def weight_reset(m: torch.nn.Module):
#         # - check if the current module has reset_parameters & if it's callabed called it on m
#         reset_parameters = getattr(m, "reset_parameters", None)
#         if callable(reset_parameters):
#             m.reset_parameters()

#     # Applies fn recursively to every submodule see: https://pytorch.org/docs/stable/generated/torch.nn.Module.html
#     model.apply(fn=weight_reset)


# class X_Clip_mod(torch.nn.Module):

#     def __init__(self, device, *args, **kwargs) -> None:
#         super().__init__(*args, **kwargs)

#         original_model = torch.load("/content/drive/MyDrive/EECS545 Group/Architecture Components/X-CLIP.bin", map_location=device)
#         reset_all_weights(original_model)
#         self.resnet = torch.load("/content/drive/MyDrive/EECS545 Group/Architecture Components/Resnet 101.bin")
#         self.preprocessor = DetrImageProcessor.from_json_file("/content/drive/MyDrive/EECS545 Group/Architecture Components/preprocessor_config.json")

#         # Freeze the Resnet model parameters
#         for param in self.resnet.parameters():
#             param.requires_grad = False

#         # Extract the vision model and make change to its parameters
#         vision = original_model.base_model.vision_model
#         vision.embeddings.patch_embedding = torch.nn.Conv2d(2048, 768, kernel_size=(1, 1), stride=(1, 1), bias=False)
#         vision.embeddings.position_embedding = torch.nn.Embedding(1009, 768)
#         vision.embeddings.position_ids = torch.arange(1009).expand((1, -1))

#         # Shrink the vision encoder to 2 layers as they only need to digest the ResNet feautures, not extracting features
#         # Also, change their number of frames to batch_size for processed data.
#         vision.encoder.layers = vision.encoder.layers[0:2]
#         for layer in vision.encoder.layers:
#             layer.num_frames = batch_size

#         self.vision = vision
#         del vision

#         # The visual projection module can be used as is
#         self.visual_projection = original_model.base_model.visual_projection

#         # Extract and change the position embedding size of the mit
#         mit = original_model.base_model.mit
#         mit.position_embedding = torch.nn.Parameter(torch.rand(1, batch_size, 512))

#         self.mit = mit
#         del mit

#         # And finally the final score prediction layer
#         #self.scoring = torch.nn.Sequential(torch.nn.Flatten(-2), torch.nn.Linear(1009*768, 1), torch.nn.Sigmoid())
#         self.scoring = torch.nn.Sequential(torch.nn.Linear(512, 1), torch.nn.Sigmoid())

#     def forward(self, frames) -> torch.Tensor:
#         '''
#         Frames shape: (num_batch, num_frames, height, width, RGB_channels)
#         Output shape: (num_batch, num_frames, 1)
#         '''
#         num_batch, num_frames, height, width, RGB_channels = frames.shape
#         inputs = self.preprocessor(frames.view((num_batch*num_frames, height, width, RGB_channels)), return_tensors="pt")
#         features, _ = self.resnet(**inputs)
#         features, _ = features[-1]

#         vision_output = self.vision(features)
#         del features

#         video_embeds = vision_output[1]
#         video_embeds = self.visual_projection(video_embeds)

#         cls_features = video_embeds.view(num_batch, num_frames, -1)

#         mit_output = self.mit(cls_features)

#         video_embeds = mit_output[0]

#         scores = self.scoring(video_embeds)
#         scores = 4*scores + 1
#         return scores

# Define the dataloader to be used for training

In [10]:
import json

class X_Clip_Dataset(torch.utils.data.Dataset):

    def __init__(self, device, mode="test") -> None:
        super().__init__()

        if mode == "train":
            with open("dataset/TVSum/down-sampled frames/train.json", 'r') as j:
                self.frames = json.load(j)
        else:
            with open("dataset/TVSum/down-sampled frames/test.json", 'r') as j:
                self.frames = json.load(j)

        self.device = device

    def __getitem__(self, i):
        # Read the images
        frames_locs = self.frames[i]
        frame_set = torch.stack([F.pil_to_tensor(Image.open(item).resize((640, 360))).permute(1, 2, 0) for item in frames_locs], dim=0)

        # Read the score tensor
        path_items = frames_locs[0].split('/')
        target = path_items[-2]

        index = int(path_items[-1].rstrip('.jpg'))

        # score = torch.load(f"dataset/TVSum/ground truth/{target}.pt", map_location=self.device).mean(axis=0)
        score = torch.load(f"dataset/TVSum/ground truth/{target}.pt").mean(axis=0)
        
        if index+batch_size > len(score):
            score = score[-batch_size:]
        else:
            score = score[index:index+batch_size]
        score = score.reshape((batch_size, 1))
        return frame_set, score.float()

    def __len__(self):
        return len(self.frames)

# Start training

In [6]:
# Some libraries and model parameters
import os

checkpoint = 'X_Clip_mod.bin'
lr = 0.00005
weight_decay = 1
workers = 1
iterations = 100
start_epoch = 0

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Running on {device}")

Running on cuda


In [8]:
def train(train_loader, model, criterion, optimizer, epoch):
    """
    One epoch's training.

    :param train_loader: DataLoader for training data
    :param model: model
    :param criterion: content loss function (Mean Squared-Error loss)
    :param optimizer: optimizer
    :param epoch: epoch number
    """
    model.train()  # training mode enables batch normalization

    # Batches
    for i, (features, scores) in enumerate(train_loader):
        print(f'iteration: {i}')
        # Move to default device
        features = features.to(device)  # (batch_size=1, 10, 2048, 24, 42)
        scores = scores.to(device)  # (batch_size=1, 10, 1)
        # print(scores.shape)
        # Forward prop.
        predict_scores = model(features)  # (batch_size=1, 10, 1)
        # print(predict_scores.shape)
        print(predict_scores, scores)

        # Loss
        loss = criterion(predict_scores, scores)  # scalar

        # Backward prop.
        optimizer.zero_grad()
        loss.backward()

        # Update model
        optimizer.step()

        # Print status
        if i % batch_size == 0:
            print('Epoch: [{0}][{1}/{2}]----'
                  'Loss {loss:.4f}'.format(epoch, i, len(train_loader), loss=loss))

            # Save checkpoint
            torch.save({'epoch': epoch,
                        'model': model,
                        'optimizer': optimizer},
                        'X_Clip_mod.bin')

    del features, scores, predict_scores  # free some memory since their histories may be stored



# Initialize model or load checkpoint
if not os.path.exists(checkpoint):
    model = X_Clip_mod(device)

    # Initialize the optimizer
    optimizer = torch.optim.Adam(params=filter(lambda p: p.requires_grad, model.parameters()), lr=lr)

else:
    checkpoint = torch.load(checkpoint, map_location=device)
    start_epoch = checkpoint['epoch'] + 1
    model = checkpoint['model']
    optimizer = checkpoint['optimizer']

# Move to default device
model = model.to(device)
criterion = torch.nn.MSELoss().to(device)

# Custom dataloaders
train_dataset = X_Clip_Dataset(device)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=2, shuffle=True, num_workers=workers, pin_memory=True)  # note that we're passing the collate function here

# Total number of epochs to train for
epochs = int(iterations // len(train_loader) + 1)

In [None]:
# Epochs
epochs = 20
for epoch in range(start_epoch, epochs):
    # One epoch's training
    train(train_loader=train_loader,
            model=model,
            criterion=criterion,
            optimizer=optimizer,
            epoch=epoch)

iteration: 0
tensor([[[2.5372],
         [2.3980],
         [2.5168],
         [2.4412],
         [2.2844],
         [2.5313],
         [2.4081],
         [2.4538],
         [2.4876],
         [2.4563]],

        [[1.2907],
         [1.3546],
         [1.2968],
         [1.3147],
         [1.2986],
         [1.2523],
         [1.2259],
         [1.2760],
         [1.2874],
         [1.2996]]], device='cuda:0', grad_fn=<AddBackward0>) tensor([[[2.3500],
         [2.3500],
         [2.3500],
         [2.3500],
         [2.3500],
         [2.3500],
         [2.3500],
         [2.3500],
         [2.3500],
         [2.3500]],

        [[1.4000],
         [1.4000],
         [1.4000],
         [1.4000],
         [1.4000],
         [1.4000],
         [1.4000],
         [1.4000],
         [1.4000],
         [1.4000]]], device='cuda:0')
Epoch: [19][0/1040]----Loss 0.0144
iteration: 1
tensor([[[1.6299],
         [1.6694],
         [1.7865],
         [1.6101],
         [1.6756],
         [1.5873],

# Evaluation

In [17]:
def evaluate(test_loader, model, criterion, threshold=0.1):
    """
    Evaluate the model.

    :param test_loader: DataLoader for test data
    :param model: model
    :param criterion: loss function
    """
    model.eval()  # evaluation mode disables dropout
    total_loss = 0.0
    total_corrects = 0

    # No need to track gradients for validation, we're not optimizing.
    with torch.no_grad():
        for i, (features, scores) in enumerate(test_loader):
            features = features.to(device)
            scores = scores.to(device)

            # Forward prop.
            predict_scores = model(features)
            
            print(predict_scores, scores)

            # Loss
            loss = criterion(predict_scores, scores)

            total_loss += loss.item()

            # Calculate accuracy            
            binary_predictions = (loss < threshold).float()
            total_corrects += (binary_predictions == 1)

        avg_loss = total_loss / len(test_loader)
        avg_acc = total_corrects.double() / len(test_loader.dataset)

    return avg_loss, avg_acc

test_dataset = X_Clip_Dataset(device)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=2, shuffle=True, num_workers=workers, pin_memory=True)
print(evaluate(test_loader, model, criterion))

tensor([[[1.3698],
         [1.2462],
         [1.2699],
         [1.2974],
         [1.3571],
         [1.3464],
         [1.2763],
         [1.2683],
         [1.3109],
         [1.3334]],

        [[2.3401],
         [2.0405],
         [2.0268],
         [2.1332],
         [2.1846],
         [1.6895],
         [2.1687],
         [2.0670],
         [2.0475],
         [1.9312]]], device='cuda:0') tensor([[[1.1500],
         [1.1500],
         [1.1500],
         [1.1500],
         [1.1500],
         [1.1500],
         [1.1500],
         [1.1500],
         [1.1500],
         [1.1500]],

        [[2.6500],
         [2.6500],
         [2.6500],
         [2.6500],
         [2.6500],
         [2.6500],
         [2.6500],
         [2.6500],
         [2.6500],
         [2.6500]]], device='cuda:0')
tensor([[[1.3280],
         [1.5803],
         [1.2704],
         [1.3084],
         [1.2841],
         [1.2758],
         [1.2269],
         [1.3304],
         [1.2189],
         [1.2178]],

      

KeyboardInterrupt: 

# Calculate F1 score

In [78]:
import numpy as np
from sklearn.metrics import f1_score

# Define thresholds for class labels
thresholds = [1.5, 2.5, 3.5, 4.5]

def classify(scores):
    # Convert scores to class labels
    class_labels = np.digitize(scores.cpu().numpy(), bins=thresholds)
    return class_labels

def evaluate(model, criterion, data_loader):
    model.eval()
    all_predictions = []
    all_targets = []
    with torch.no_grad():
        for features, scores in data_loader:
            features = features.to(device)
            scores = scores.to(device)
            predict_scores = model(features)
            # Convert scores to class labels
            predicted_labels = classify(predict_scores)
            scores_labels = classify(scores)
            # append labels
            all_predictions.extend(predicted_labels)
            all_targets.extend(scores_labels)
            # print(all_predictions)
            # print(all_targets)
    # convert to 1-d array
    all_predictions = np.array(all_predictions).flatten()
    all_targets = np.array(all_targets).flatten()
    f1 = f1_score(all_targets, all_predictions, average='micro')  # Calculate F1 score
    return f1

test_dataset = X_Clip_Dataset(device)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=1, shuffle=True, num_workers=workers, pin_memory=True)
f1_score = evaluate(model, criterion, test_loader)
print("F1 Score:", f1_score) 
# F1 Score: 0.6026666666666667
# F1 Score: 0.6071666666666666
# F1 Score: 0.6093333333333333

F1 Score: 0.6093333333333333
