In [1]:
from pytorchvideo.data import LabeledVideoDataset, make_clip_sampler,labeled_video_dataset

import torch.nn as nn
import torch
from pytorch_lightning import LightningModule,seed_everything,Trainer
from pytorch_lightning.callbacks import ModelCheckpoint, LearningRateMonitor
from torch.optim.lr_scheduler import CosineAnnealingLR

import torchmetrics

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
from pytorchvideo.transforms import (
    ApplyTransformToKey,
    Normalize,
    RandomShortSideScale,
    UniformTemporalSubsample,
    Permute
)

from torchvision.transforms import (
    Compose,
    Lambda,
    RandomCrop,
    RandomHorizontalFlip,
    Resize
)

from torchvision.transforms._transforms_video import (
    CenterCropVideo,
    NormalizeVideo
)




In [3]:
video_transform = Compose([
    ApplyTransformToKey(key='video',
    transform = Compose([
        UniformTemporalSubsample(20),
        Lambda(lambda x:x/255),
        Normalize((0.45,0.45,0.45),(0.225,0.225,0.225)),
        RandomShortSideScale(min_size=248,max_size=256),
        CenterCropVideo(224),
        RandomHorizontalFlip(p=0.5)
    ]),
    ),
])


In [4]:
class OurModel(LightningModule):
    def __init__(self):
        super(OurModel, self).__init__()
        self.video_model = torch.hub.load('facebookresearch/pytorchvideo', 'efficient_x3d_xs', pretrained=True)
        self.relu = nn.ReLU()  # Fixed typo
        self.linear = nn.Linear(400, 1)  # Confirm input and output dimensions
        
        self.lr = 1e-3
        self.batch_size = 8
        self.numworker = 3
        
        self.metric = torchmetrics.Accuracy(task="binary")
        self.criterion = nn.BCEWithLogitsLoss()
        
        # Initialize validation step outputs list
        self.validation_step_outputs = []
        self.train_step_outputs = []
        self.test_step_outputs = []
    
    def forward(self, x):
        x = self.video_model(x)
        x = self.relu(x)
        x = self.linear(x)
        return x
    
    def configure_optimizers(self):
        opt = torch.optim.AdamW(params=self.parameters(), lr=self.lr)  # Fixed typo
        scheduler = CosineAnnealingLR(opt, T_max=10, eta_min=1e-6, last_epoch=-1)  # Define T_0
        return {"optimizer": opt, "lr_scheduler": scheduler}
    def train_dataloader(self):
        dataset = labeled_video_dataset(train_df,
                                      clip_sampler=make_clip_sampler('random',2),
                                      transform=video_transform,decode_audio=False,)
        loader = DataLoader(dataset,batch_size=self.batch_size,num_workers=self.numworker,pin_memory=True)
        return loader
    def training_step(self, batch, batch_idx):
        video, label = batch['video'], batch['label']
        out = self(video)
        loss = self.criterion(out, label)
        metric = self.metric(out, label.to(torch.int64))
        
        # Append training loss to outputs list
        self.train_step_outputs.append(loss.item())
        return {"loss": loss, "metric": metric.detach()}
    def on_train_epoch_end(self):
        # Compute the average training loss
        epoch_average = torch.tensor(self.train_step_outputs).mean()
        
        # Log the average training loss
        self.log("train_loss", epoch_average.item())
        
        # Clear the training step outputs list
        self.train_step_outputs.clear()


    def val_dataloader(self):
        dataset = labeled_video_dataset(val_df,
                                      clip_sampler=make_clip_sampler('random',2),
                                      transform=video_transform,decode_audio=False,)
        loader = DataLoader(dataset,batch_size=self.batch_size,num_workers=self.numworker,pin_memory=True)
        return loader
    
    def validation_step(self, batch, batch_idx):
        video, label = batch['video'], batch['label']
        out = self(video)
        loss = self.criterion(out, label)
        metric = self.metric(out, label.to(torch.int64))
        
        # Append validation loss to outputs list
        self.validation_step_outputs.append(loss.item())
        return {"loss": loss, "metric": metric.detach()}
    
    def on_validation_epoch_end(self):
        # Compute the average validation loss
        epoch_average = torch.tensor(self.validation_step_outputs).mean()
        
        # Log the average validation loss
        self.log("val_loss", epoch_average.item())
        
        # Clear the validation step outputs list
        self.validation_step_outputs.clear()
        
    def test_dataloader(self):
        dataset = labeled_video_dataset(val_df,
                                      clip_sampler=make_clip_sampler('random',2),
                                      transform=video_transform,decode_audio=False,)
        loader = DataLoader(dataset,batch_size=self.batch_size,num_workers=self.numworker,pin_memory=True)
        return loader
    def test_step(self, batch, batch_idx):
        video, label = batch['video'], batch['label']
        out = self(video)
        loss = self.criterion(out, label)
        
        # Append test loss to outputs list
        self.test_step_outputs.append(loss.item())
        return {"loss": loss}
    def on_test_epoch_end(self):
        # Compute the average test loss
        epoch_average = torch.tensor(self.test_step_outputs).mean()
        
        # Log the average test loss
        self.log("test_loss", epoch_average.item())
        
        # Clear the test step outputs list
        self.test_step_outputs.clear()



In [5]:
# Path to the checkpoint
checkpoint_path = 'last.ckpt'

# Load the model
model = OurModel.load_from_checkpoint(checkpoint_path)

Using cache found in C:\Users\natis/.cache\torch\hub\facebookresearch_pytorchvideo_main


In [6]:
from pytorchvideo.data.encoded_video import EncodedVideo
video = EncodedVideo.from_path('test_video.mp4')

In [7]:
video_data = video.get_clip(0,2)
print(video_data['video'].shape)
video_data = video_transform(video_data)
print(video_data['video'].shape)

torch.Size([3, 60, 1080, 1920])
torch.Size([3, 20, 224, 224])


In [8]:

inputs = video_data['video']
inputs = torch.unsqueeze(inputs,0)
print(inputs.shape)

torch.Size([1, 3, 20, 224, 224])


In [9]:
pred = model(inputs)
pred = pred.detach().cpu().numpy()

import numpy as np
# Apply sigmoid to convert logit to probability
probability = 1 / (1 + np.exp(-(pred[0][0])))
print(probability)  # This will give a probability between 0 and 1

0.9010360734173022


In [10]:
pred = np.where(pred >= 0.5,1,0)
pred

array([[1]])