<a href="https://colab.research.google.com/github/ponakilan/vid-anomaly/blob/main/RC_IPAD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# ! 7z x '/content/drive/MyDrive/Research Credit/IPAD_dataset.zip'

! unzip -q '/content/drive/MyDrive/Research Credit/IPAD_dataset.zip' 'IPAD_dataset/R01'

In [1]:
import math

import torch
from PIL import Image
from torchvision.datasets import DatasetFolder
from torchvision.transforms import ToTensor, Resize, Compose
from torch.utils.data import Dataset, DataLoader
from transformers import ViTImageProcessor, ViTModel

SEQ_LEN = 10

def load_image(path):
    image = Image.open(path)
    return image


img_label_dataset = DatasetFolder(
    root="IPAD_dataset/R01/training/frames",
    loader=load_image,
    transform=Compose([
        Resize((224, 224)),
        ToTensor()
    ]),
    is_valid_file=lambda x: True
)

class ImgDataset(Dataset):
    def __init__(self, dataset, seq_len=SEQ_LEN):
        self.dataset = dataset
        self.seq_len = seq_len

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.processor = ViTImageProcessor.from_pretrained('google/vit-base-patch16-224-in21k')
        self.model = ViTModel.from_pretrained('google/vit-base-patch16-224-in21k').to(self.device)

    def __len__(self):
        return math.ceil(len(self.dataset) / self.seq_len)

    def __getitem__(self, idx):
        start_idx = idx * self.seq_len
        end_idx = min(start_idx + self.seq_len, len(self.dataset))
        sequence = [self.dataset[i] for i in range(start_idx, end_idx)]
        labels = [data[1] for data in sequence]
        is_valid = all(label == labels[0] for label in labels)
        if is_valid:
            images = torch.stack([data[0] for data in sequence])
            if len(images) < self.seq_len:
                images = torch.cat([images, torch.zeros(self.seq_len - len(images), 3, 224, 224)])
        else:
            images = torch.zeros(self.seq_len, 3, 224, 224)

        inputs = self.processor(
            images=images,
            return_tensors="pt"
        )
        inputs = {k: v.to(self.device) for k, v in inputs.items()}
        outputs = self.model(**inputs)
        last_hidden_states = outputs.last_hidden_state
        encoding = last_hidden_states[:, 0, :]

        return encoding, images

dataset = ImgDataset(img_label_dataset)

print(f"Number of samples: {len(dataset)}")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Number of samples: 781


In [2]:
from torch import nn


class MultiScaleTemporalAttention(nn.Module):
    def __init__(self, embed_dim, num_heads, device, scales=[2, 4, 8, 10]):
        super().__init__()
        self.scales = scales
        self.attention_layers = nn.ModuleList([
            nn.MultiheadAttention(embed_dim, num_heads).to(device) for _ in scales
        ])

    def forward(self, x):
        B, T, D = x.shape
        outputs = []

        for i, window_size in enumerate(self.scales):
            attn_device = next(self.attention_layers[i].parameters()).device
            if window_size >= T:
                q = k = v = x.transpose(0, 1).to(attn_device)
                attn_output, _ = self.attention_layers[i](q, k, v)
                outputs.append(attn_output.transpose(0, 1))
            else:
                local_outputs = []
                for start in range(T - window_size + 1):
                    chunk = x[:, start:start + window_size, :].to(attn_device)
                    q = k = v = chunk.transpose(0, 1)
                    attn_output, _ = self.attention_layers[i](q, k, v)
                    local_outputs.append(attn_output.mean(0))
                local_output = torch.stack(local_outputs, dim=1)
                local_output = F.interpolate(local_output.transpose(1, 2), size=T, mode='linear').transpose(1, 2)
                outputs.append(local_output)

        final_output = torch.stack(outputs, dim=0).mean(0)
        return final_output  # (B, T, D)

class CNNFrameReconstructor(nn.Module):
    def __init__(self, embed_dim=768, feature_dim=512, out_channels=3, img_size=224):
        super(CNNFrameReconstructor, self).__init__()
        self.img_size = img_size
        self.feature_dim = feature_dim
        self.out_channels = out_channels

        self.fc = nn.Linear(embed_dim, feature_dim * (img_size // 16) * (img_size // 16))

        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(feature_dim, feature_dim // 2, kernel_size=4, stride=2, padding=1),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(feature_dim // 2, feature_dim // 4, kernel_size=4, stride=2, padding=1),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(feature_dim // 4, feature_dim // 8, kernel_size=4, stride=2, padding=1),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(feature_dim // 8, out_channels, kernel_size=4, stride=2, padding=1),
            nn.Sigmoid()
        )

    def forward(self, x):
        """
        x: (B, 50, embed_dim)
        Output: (B, 50, C, H, W)
        """
        B, T, D = x.shape
        x = x.view(B * T, D)
        x = self.fc(x)
        x = x.view(B * T, self.feature_dim, self.img_size // 16, self.img_size // 16)
        x = self.decoder(x)
        x = x.view(B, T, self.out_channels, self.img_size, self.img_size)
        return x

class FrameReconstructionModel(nn.Module):
    def __init__(self, device):
        super(FrameReconstructionModel, self).__init__()
        self.attn = MultiScaleTemporalAttention(
            embed_dim=768,
            num_heads=4,
            device=device,
            scales=[5, 10, 20]
        ).to(device)
        self.reconstructor = CNNFrameReconstructor().to(device)

    def forward(self, x):
        x = self.attn(x)
        x = self.reconstructor(x)
        return x.float()

In [3]:
from fastai.vision.learner import Learner
from torch.utils.data import random_split
from fastai.vision.all import *

EPOCHS = 5
BATCH_SIZE = 6

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = FrameReconstructionModel(device=device).to(device)
loss_func = MSELossFlat()

train_size = int(0.85 * len(dataset))
valid_size = len(dataset) - train_size
train_ds, valid_ds = random_split(dataset, [train_size, valid_size])

train_dl = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)
valid_dl = DataLoader(valid_ds, batch_size=BATCH_SIZE, shuffle=False)
dls = DataLoaders(train_dl, valid_dl)

learn = Learner(dls=dls, model=model, loss_func=loss_func, opt_func=Adam, lr=0.001)

# learn.lr_find()

In [None]:
learn.fine_tune(EPOCHS)

SAVE_PATH = f"/content/drive/MyDrive/model_{SEQ_LEN}_{EPOCHS}_{learn.loss}.pth"
learn.save(SAVE_PATH)
print(f"Model saved to {SAVE_PATH}")

epoch,train_loss,valid_loss,time
0,0.037117,0.024451,04:35


It looks like you are trying to rescale already rescaled images. If the input images have pixel values between 0 and 1, set `do_rescale=False` to avoid rescaling them again.


epoch,train_loss,valid_loss,time


In [None]:
drive.flush_and_unmount()