In [None]:
from typing import Callable

import numpy as np
import pandas as pd
from tqdm import tqdm
from PIL import Image

from sklearn.metrics import roc_auc_score, roc_curve

import torch
from torch.utils.data import Dataset, DataLoader

from torchvision import transforms
from torchvision import models

import cv2

In [None]:
# Feature-extraction switch: when True the backbone features are recomputed and
# the cached .npy files are overwritten; the cached files are loaded either way.
rerun_feats = True      # False if you want to use the cached features

In [3]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [4]:
# Build the pretrained Inception v3 backbone, move it onto `device`, and switch
# it to inference mode. The final expression echoes the module structure.
backbone_model: models.inception.Inception3 = models.inception_v3(pretrained=True)
backbone_model.to(device)  # nn.Module.to moves the parameters in place
backbone_model.eval()



Inception3(
  (Conv2d_1a_3x3): BasicConv2d(
    (conv): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), bias=False)
    (bn): BatchNorm2d(32, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
  )
  (Conv2d_2a_3x3): BasicConv2d(
    (conv): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), bias=False)
    (bn): BatchNorm2d(32, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
  )
  (Conv2d_2b_3x3): BasicConv2d(
    (conv): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (bn): BatchNorm2d(64, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
  )
  (maxpool1): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
  (Conv2d_3b_1x1): BasicConv2d(
    (conv): Conv2d(64, 80, kernel_size=(1, 1), stride=(1, 1), bias=False)
    (bn): BatchNorm2d(80, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
  )
  (Conv2d_4a_3x3): BasicConv2d(
    (conv): Conv2d(80, 192, kernel_size=(3, 3), stri

In [5]:
# Per-frame feature size: the output width of the backbone's final `fc` layer.
feat_dim = backbone_model.fc.out_features

In [6]:
# Read the train / validation metadata tables.
train_df, val_df = (pd.read_csv(csv_name) for csv_name in ('train.csv', 'val.csv'))

In [7]:
# Display the training metadata table.
train_df

Unnamed: 0,id,time_of_event,time_of_alert,label,vid_path,event_frame,tensor_path
0,0,20.760,19.136,1,data/00000.mp4,7,train_tensor/00000.pt
1,4,19.367,19.167,1,data/00004.mp4,7,train_tensor/00004.pt
2,5,20.874,20.809,1,data/00005.mp4,7,train_tensor/00005.pt
3,6,19.233,17.133,1,data/00006.mp4,7,train_tensor/00006.pt
4,7,21.200,19.500,1,data/00007.mp4,7,train_tensor/00007.pt
...,...,...,...,...,...,...,...
1195,2132,,,0,data/02132.mp4,-1,train_tensor/02132.pt
1196,2133,,,0,data/02133.mp4,-1,train_tensor/02133.pt
1197,2134,,,0,data/02134.mp4,-1,train_tensor/02134.pt
1198,2136,,,0,data/02136.mp4,-1,train_tensor/02136.pt


In [8]:
class CustomDataset(Dataset):
    """Dataset pairing each saved tensor file with its label.

    The ``tensor_path`` and ``label`` columns of ``df`` are copied into plain
    lists; a tensor is read from disk only when its item is requested.
    """

    def __init__(self, df: pd.DataFrame):
        self.X, self.y = df['tensor_path'].tolist(), df['label'].tolist()

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx: int):
        # NOTE(review): torch.load unpickles the file, so only point this at
        # trusted tensor files.
        sample: torch.Tensor = torch.load(self.X[idx])
        return sample, self.y[idx]


In [9]:
# Wrap both metadata tables in tensor-loading datasets.
train_ds, val_ds = (CustomDataset(frame) for frame in (train_df, val_df))

In [10]:
def get_features(model: torch.nn.Module, ds: Dataset, device: str = 'cuda') -> np.ndarray:
    """Run ``model`` on every sample of ``ds`` and stack the outputs.

    Args:
        model: Network called once per sample; its mode (train/eval) is not
            changed here.
        ds: Iterable of ``(X, y)`` pairs; only ``X`` is used.
        device: Device each sample is moved to before the forward pass.

    Returns:
        One array holding the per-sample outputs stacked along a new axis 0.

    Raises:
        ValueError: If ``ds`` yields no samples (``np.stack`` needs at least
            one array).
    """
    features = []

    # Pure inference: without no_grad every forward pass would build an
    # autograd graph that is thrown away immediately, wasting memory.
    with torch.no_grad():
        for X, _ in tqdm(ds):
            feat: torch.Tensor = model(X.to(device))
            features.append(feat.detach().cpu().numpy())

    return np.stack(features, axis=0)

In [11]:
# Recompute the backbone features and refresh the on-disk cache when requested.
if rerun_feats:
    # Pass the resolved `device` explicitly: get_features defaults to 'cuda',
    # which fails on machines where the backbone was placed on the CPU.
    train_feats = get_features(backbone_model, train_ds, device=device)
    np.save('train_feats.npy', train_feats)

    val_feats = get_features(backbone_model, val_ds, device=device)
    np.save('val_feats.npy', val_feats)



In [12]:
# Load the cached feature arrays from disk (used whether or not they were just recomputed).
train_feats: np.ndarray
val_feats: np.ndarray
train_feats, val_feats = np.load('train_feats.npy'), np.load('val_feats.npy')

In [13]:
# Pull the classification labels and event-frame indices out of both tables.
y_train: np.ndarray
y_val: np.ndarray
frame_train: np.ndarray
frame_val: np.ndarray
y_train, frame_train = (train_df[column].values for column in ('label', 'event_frame'))
y_val, frame_val = (val_df[column].values for column in ('label', 'event_frame'))

In [14]:
# Sanity check: shapes of the cached train / validation feature arrays.
train_feats.shape, val_feats.shape

((1200, 16, 1000), (300, 16, 1000))

In [15]:
def reshape_processer(feats: np.ndarray) -> np.ndarray:
    """Flatten every sample: keep axis 0 and merge all remaining axes into one."""
    n_samples = feats.shape[0]
    return np.reshape(feats, (n_samples, -1))

In [16]:
# Feature post-processing step used when building the datasets below.
processer = reshape_processer

In [17]:
class CustomDataset(Dataset):
    """In-memory dataset over a feature array and a parallel label array.

    NOTE(review): this reuses the name of the tensor-path dataset defined
    earlier in the notebook and shadows it from this cell onwards.
    """

    def __init__(self, X: np.ndarray, y: np.ndarray):
        self.X, self.y = X, y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx: int):
        # Return the (features, label) pair stored at position `idx`.
        return self.X[idx], self.y[idx]

# MLP

In [None]:
class BinaryClassifier(torch.nn.Module):
    """Single linear layer followed by a sigmoid, trained with binary cross-entropy.

    ``self.device`` records where input batches are sent; it is updated by
    ``to`` so that batches follow the parameters.
    """

    def __init__(self, input_dim: int, device: str = 'cuda'):
        super().__init__()

        self.fc = torch.nn.Linear(input_dim, 1)
        self.sigmoid = torch.nn.Sigmoid()
        self.device = device

    def to(self, device: str):
        """Move the parameters to ``device`` and remember it for later batches."""
        self.device = device
        return super().to(device)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return sigmoid probabilities with a trailing dimension of size 1."""
        return self.sigmoid(self.fc(x))

    def fit(self, ds: Dataset, epochs: int = 10, batch_size: int = 32, lr = 0.001):
        """Train with Adam and ``BCELoss`` on shuffled mini-batches of ``ds``.

        Args:
            ds: Dataset yielding ``(features, label)`` pairs.
            epochs: Number of passes over ``ds``.
            batch_size: Mini-batch size.
            lr: Adam learning rate.
        """
        self.train()
        dl = DataLoader(ds, batch_size=batch_size, shuffle=True)

        optimizer = torch.optim.Adam(self.parameters(), lr=lr)
        criterion = torch.nn.BCELoss()

        for _ in tqdm(range(epochs)):
            for X_batch, y_batch in dl:
                X_batch = X_batch.to(self.device)
                y_batch = y_batch.to(self.device).float()
                optimizer.zero_grad()
                # squeeze(-1) drops only the output dimension. A bare squeeze()
                # would also drop the batch dimension of a size-1 final batch
                # and make BCELoss fail on the shape mismatch.
                y_pred = self(X_batch).squeeze(-1)
                loss = criterion(y_pred, y_batch)
                loss.backward()
                optimizer.step()

    @torch.no_grad()
    def predict_proba(self, ds: Dataset, batch_size: int = 32) -> np.ndarray:
        """Return the predicted probability for every sample of ``ds``, in order.

        Returns:
            1-D array with one probability per sample (empty if ``ds`` is empty).
        """
        self.eval()
        dl = DataLoader(ds, batch_size=batch_size, shuffle=False)

        preds = []

        for X_batch, _ in dl:
            X_batch = X_batch.to(self.device)
            # Keep the batch dimension so a size-1 batch is still 1-D and can
            # be concatenated.
            preds.append(self(X_batch).squeeze(-1).cpu().numpy())

        if not preds:
            return np.empty((0,), dtype=np.float32)

        return np.concatenate(preds, axis=0)

    @torch.no_grad()
    def evaluation(self, ds: Dataset, threshold=0.5, batch_size: int = 32) -> float:
        """Return the accuracy on ``ds`` when predicting 1 for probabilities above ``threshold``.

        Raises:
            ZeroDivisionError: If ``ds`` is empty.
        """
        self.eval()
        dl = DataLoader(ds, batch_size=batch_size, shuffle=False)

        correct = 0
        n = len(ds)

        for X_batch, y_batch in dl:
            X_batch = X_batch.to(self.device)
            y_batch = y_batch.to(self.device)

            y_pred = (self(X_batch).squeeze(-1) > threshold).int()
            correct += (y_pred == y_batch).sum().item()

        return correct / n

In [19]:
# Rebuild the datasets on the flattened cached features with the binary labels.
train_ds = CustomDataset(X=processer(train_feats), y=y_train)
val_ds = CustomDataset(X=processer(val_feats), y=y_val)

In [20]:
# Input width for the heads: length of one flattened feature vector.
# This overwrites the backbone-derived `feat_dim` set earlier.
feat_dim = train_ds[0][0].shape[0]

In [21]:
# Collision / no-collision classifier on the flattened features, placed on `device`.
model: BinaryClassifier = BinaryClassifier(input_dim=feat_dim).to(device)

In [22]:
# Train the classifier for 100 epochs with mini-batches of 32.
model.fit(train_ds, epochs=100, batch_size=32)

100%|██████████| 100/100 [00:05<00:00, 19.57it/s]


In [23]:
# Score the validation set and report the ROC-AUC of the predicted probabilities.
val_score = model.predict_proba(val_ds)
val_auc = roc_auc_score(y_val, val_score)
print("Validation ROC-AUC:", val_auc)        # 0.9: Excellent, 0.8: Good, 0.7: Fair, 0.6: Poor, 0.5: Fail

Validation ROC-AUC: 0.7261333333333333


In [24]:
# Choose the decision threshold that maximises TPR - FPR (Youden's J) on the
# validation ROC curve.
fpr, tpr, thresh = roc_curve(y_val, val_score)
j_scores = tpr - fpr
best_idx = j_scores.argmax()
best_threshold = thresh[best_idx]

In [25]:
# Validation accuracy at the default 0.5 threshold vs. at the ROC-selected threshold.
# NOTE(review): best_threshold was chosen on this same validation set, so the
# second number is likely optimistic.
model.evaluation(val_ds), model.evaluation(val_ds, best_threshold)

(0.6433333333333333, 0.67)

# TIMESTAMP PREDICTION

In [None]:
class TimeStampModel(torch.nn.Module):
    """Single linear layer producing one logit per class, trained with cross-entropy.

    ``self.device`` records where input batches are sent; it is updated by
    ``to`` so that batches follow the parameters.
    """

    def __init__(self, input_dim: int, output_dim: int, device: str = 'cuda'):
        super().__init__()

        self.fc = torch.nn.Linear(input_dim, output_dim)

        self.device = device

    def to(self, device: str):
        """Move the parameters to ``device`` and remember it for later batches."""
        self.device = device
        return super().to(device)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return raw (unnormalised) class logits."""
        return self.fc(x)

    def fit(self, ds: Dataset, epochs: int = 10, batch_size: int = 32, lr = 0.001):
        """Train with Adam and ``CrossEntropyLoss`` on shuffled mini-batches of ``ds``.

        Args:
            ds: Dataset yielding ``(features, class_index)`` pairs.
            epochs: Number of passes over ``ds``.
            batch_size: Mini-batch size.
            lr: Adam learning rate.
        """
        self.train()
        dl = DataLoader(ds, batch_size=batch_size, shuffle=True)

        optimizer = torch.optim.Adam(self.parameters(), lr=lr)
        criterion = torch.nn.CrossEntropyLoss()

        for _ in tqdm(range(epochs)):
            for X_batch, y_batch in dl:
                X_batch = X_batch.to(self.device)
                # CrossEntropyLoss only accepts int64 class indices; cast so
                # labels stored as int32 (or other integer types) also work.
                y_batch = y_batch.to(self.device).long()
                optimizer.zero_grad()
                y_pred = self(X_batch)
                loss = criterion(y_pred, y_batch)
                loss.backward()
                optimizer.step()

    @torch.no_grad()
    def evaluation(self, ds: Dataset, batch_size: int = 32) -> float:
        """Return the top-1 accuracy on ``ds``.

        Raises:
            ZeroDivisionError: If ``ds`` is empty.
        """
        self.eval()
        dl = DataLoader(ds, batch_size=batch_size, shuffle=False)

        correct = 0
        n = len(ds)

        for X_batch, y_batch in dl:
            X_batch = X_batch.to(self.device)
            y_batch = y_batch.to(self.device)

            y_pred = torch.argmax(self(X_batch), dim=1)
            correct += (y_pred == y_batch).sum().item()

        return correct / n

In [27]:
# Keep only the rows that have an annotated event frame (-1 marks "no event").
# np.where on a 1-D mask returns a 1-tuple, unpacked here.
(train_indices,) = np.where(frame_train != -1)
(val_indices,) = np.where(frame_val != -1)

In [28]:
# Datasets for the timestamp head: flattened features of the event rows, with
# the event-frame index as the class label.
train_ds = CustomDataset(X=processer(train_feats[train_indices]), y=frame_train[train_indices])
val_ds = CustomDataset(X=processer(val_feats[val_indices]), y=frame_val[val_indices])

In [29]:
# 16 output classes, matching the 16 entries along axis 1 of the cached feature arrays.
# NOTE(review): presumably one class per sampled frame — confirm against how the tensors were built.
timestamp_model = TimeStampModel(input_dim=feat_dim, output_dim=16).to(device)

In [30]:
# Train the timestamp head for 100 epochs with mini-batches of 32.
timestamp_model.fit(train_ds, epochs=100, batch_size=32)

100%|██████████| 100/100 [00:02<00:00, 40.45it/s]


In [31]:
# Top-1 accuracy of the predicted event frame on the validation event rows.
timestamp_model.evaluation(val_ds)

0.84

# Final Model

In [None]:
class CollisionDetection:
    """Inference pipeline from a video file to a predicted event time window.

    Samples ``n_frames`` frames from the video, runs them through the backbone,
    flattens the features and asks the detector for a probability. When the
    probability exceeds ``threshold`` the timestamp model picks one of the
    sampled frames, which is converted to a ``(start, end)`` window in seconds.

    Args:
        backbone_model: Feature extractor applied to the stacked frames.
        detector_model: Model returning a single probability for the
            flattened features.
        timestamp_model: Model returning one score per sampled frame.
        transform: Callable applied to each PIL frame; must return a tensor.
        threshold: Probability above which an event is reported.
        n_frames: Number of frames sampled per video.
            NOTE(review): presumably must match the frame count the two heads
            were trained on — confirm before changing.
    """

    def __init__(self, backbone_model: torch.nn.Module, detector_model: "BinaryClassifier", timestamp_model: "TimeStampModel", transform: Callable, threshold: float = 0.5, n_frames: int = 16):
        self.backbone_model = backbone_model
        self.detector_model = detector_model
        self.timestamp_model = timestamp_model
        self.transform = transform
        self.threshold = threshold

        self.n_frames = n_frames

    def extract_frames(self, path: str):
        """Sample ``n_frames`` evenly spaced frames from the video at ``path``.

        Returns:
            Tuple ``(frames, duration)``: the transformed frames stacked into
            one tensor, and the video duration in seconds.

        Raises:
            IOError: If the video cannot be opened.
            ValueError: If the video reports a non-positive fps or frame count.
            RuntimeError: If a sampled frame cannot be decoded.
        """
        cap = cv2.VideoCapture(path)

        try:
            if not cap.isOpened():
                raise IOError(f"Could not open video: {path}")

            # Keep fps as a float: truncating e.g. 29.97 to 29 skews the duration.
            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            if fps <= 0 or frame_count <= 0:
                raise ValueError(
                    f"Invalid video metadata for {path}: fps={fps}, frame_count={frame_count}"
                )

            duration = frame_count / fps

            step = max(frame_count // self.n_frames, 1)
            frames = []

            for i in range(self.n_frames):
                # Clamp so a video with fewer than n_frames frames repeats its
                # last frame instead of seeking past the end.
                cap.set(cv2.CAP_PROP_POS_FRAMES, min(i * step, frame_count - 1))
                ret, frame = cap.read()

                if not ret or frame is None:
                    raise RuntimeError(f"Could not read frame {i * step} from {path}")

                # OpenCV decodes to BGR; convert to RGB before building the PIL image.
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append(self.transform(Image.fromarray(frame)))
        finally:
            # Always free the capture handle, including on the error paths above.
            cap.release()

        return torch.stack(frames), duration

    @torch.no_grad()
    def infer_one_video(self, video_path: str):
        """Predict the event time window for one video.

        Returns:
            ``(start, end)`` in seconds when the detector probability exceeds
            ``threshold``, otherwise ``(-1, -1)``.
        """
        frames, duration = self.extract_frames(video_path)

        # Follow the backbone's own device rather than a notebook-level global.
        first_param = next(self.backbone_model.parameters(), None)
        if first_param is not None:
            frames = frames.to(first_param.device)

        feats: torch.Tensor = self.backbone_model(frames).reshape(-1)

        proba = float(self.detector_model(feats))

        if proba > self.threshold:
            scores = self.timestamp_model(feats)
            frame_idx = int(torch.argmax(scores, dim=0).item())

            # Each sampled frame stands for an equal slice of the duration.
            start = frame_idx * duration / self.n_frames
            end = (frame_idx + 1) * duration / self.n_frames

            return start, end

        return -1, -1

    

In [66]:
# Per-frame preprocessing: resize to 299x299, convert to a tensor, then
# normalise each channel with the statistics below.
channel_mean = [0.485, 0.456, 0.406]
channel_std = [0.229, 0.224, 0.225]

tfm = transforms.Compose([
    transforms.Resize((299, 299)),
    transforms.ToTensor(),  # ToTensor : [0, 255] -> [0, 1]
    transforms.Normalize(mean=channel_mean, std=channel_std),
])

In [67]:
# Assemble the full pipeline from the backbone, the two trained heads, the frame
# transform, and the ROC-selected decision threshold.
final_model = CollisionDetection(
    backbone_model=backbone_model,
    detector_model=model,
    timestamp_model=timestamp_model,
    transform=tfm,
    threshold=best_threshold
)

In [68]:
# Smoke test on one video: returns a (start, end) window in seconds, or (-1, -1)
# when the detector stays below the threshold.
final_model.infer_one_video("data/00003.mp4")

(17.54375, 20.05)

In [None]:
qsort = 