In [1]:
import pandas
import numpy as np
import matplotlib.pyplot as plt
import pickle   

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from tqdm import tqdm
import plotly.express as px
import plotly.graph_objects as go
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split

In [2]:
BATCH_SIZE = 16
NUM_OF_ITERATIONS = 500
LEARNING_RATE = 0.0003
EMBEDDING_DIM = 8

# BATCH_SIZE = 16
# NUM_OF_ITERATIONS = 1000
# LEARNING_RATE = 0.0001
# EMBEDDING_DIM = 8


device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cuda:0


In [3]:
all_data1 =  pickle.load(open("all_latents.pkl", "rb"))
all_data2 = pickle.load(open("test_latents.pkl", "rb"))
all_data = all_data1 + all_data2
type1_data = [x for x in all_data if x['video_type'] == 0]
type2_data = [x for x in all_data if x['video_type'] == 1]
type3_data = [x for x in all_data if x['video_type'] == 2]

print("type1_data:", len(type1_data))
print("type2_data:", len(type2_data))
print("type3_data:", len(type3_data))

type1_data: 134
type2_data: 52
type3_data: 25


In [4]:
t = all_data1[0]['latent_straight']
t.shape

torch.Size([1, 4, 38, 36, 64])

In [5]:
t = t.mean(dim=1, keepdim=False)
t.shape

torch.Size([1, 38, 36, 64])

In [6]:
1024+256


1280

In [7]:
t = t.squeeze(0)
t.shape

torch.Size([38, 36, 64])

In [8]:
np.random.seed(42)
type1_data, test_data1 = train_test_split(type1_data, test_size=0.2)
type2_data, test_data2 = train_test_split(type2_data, test_size=0.2)
type3_data, test_data3 = train_test_split(type3_data, test_size=0.2)

In [9]:
class customDataset(Dataset):
    def __init__(self, data_list):
        self.data_list = data_list
    def __len__(self):
        return len(self.data_list)
    def __getitem__(self, idx):
        sample = self.data_list[idx]
        latent_forward = torch.squeeze(sample['latent_straight']).float().to(device)
        latent_reverse = torch.squeeze(sample['latent_reverse']).float().to(device)
        label = torch.tensor(sample['video_type'], dtype=torch.int8).to(device)
        return latent_forward, latent_reverse, label

class InfiniteDataLoader:
    def __init__(self, dataloader):
        self.dataloader = dataloader
        self.data_iter = iter(dataloader)
    def get_next(self):
        try:
            data = next(self.data_iter)
        except StopIteration:
            self.data_iter = iter(self.dataloader)
            data = next(self.data_iter)
        return data
    

dataset1 = customDataset(type1_data)
dataset2 = customDataset(type2_data)
dataset3 = customDataset(type3_data)
dataloader1 = DataLoader(dataset1, batch_size=BATCH_SIZE, shuffle=True)
dataloader2 = DataLoader(dataset2, batch_size=BATCH_SIZE, shuffle=True)
dataloader3 = DataLoader(dataset3, batch_size=BATCH_SIZE, shuffle=True)
infinite_dataloader1 = InfiniteDataLoader(dataloader1)
infinite_dataloader2 = InfiniteDataLoader(dataloader2)
infinite_dataloader3 = InfiniteDataLoader(dataloader3)

In [10]:
latent_forward, latent_reverse, labels = infinite_dataloader3.get_next()
print(f"{latent_forward.shape=}")
print(f"{latent_reverse.shape=}")
print(f"{labels.shape=}")  

latent_forward.shape=torch.Size([16, 4, 38, 36, 64])
latent_reverse.shape=torch.Size([16, 4, 38, 36, 64])
labels.shape=torch.Size([16])


In [11]:
class VideoEmbedder(nn.Module):
    def __init__(self, out_dim=32):
        super().__init__()
        self.conv3d = nn.Sequential(
            nn.Conv3d(in_channels=1, out_channels=8, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool3d((1,2,2)),
            nn.Conv3d(8,16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.AdaptiveAvgPool3d((None,1,1))  # keep T, pool over H,W
        )
        self.time_pool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Linear(16, out_dim)

    def forward(self, x):
        x = x.mean(dim=1, keepdim=True)
        x = self.conv3d(x)       # => [B,16,T,1,1]
        x = x.squeeze(-1).squeeze(-1)  # => [B,16,T]
        x = self.time_pool(x)          # => [B,16,1]
        x = x.squeeze(-1)              # => [B,16]
        x = self.fc(x)  
        return x

    def total_params(self):
        return sum(p.numel() for p in self.parameters() if p.requires_grad) 

VE_model = VideoEmbedder(out_dim=EMBEDDING_DIM)
VE_model.total_params()

3832

In [12]:
# custom IngoNCE loss function
def info_nce_loss(anchor_emb, pos_emb, neg_embs, temperature=0.07):
    """
    anchor_emb: [B, D]    (embeddings for anchor samples)
    pos_emb:    [B, D]    (embeddings for positive samples)
    neg_embs:   [B, D] or [N, D] (embeddings for negative samples)
                Could be a single batch of negative examples
                or a bigger pool. 
    temperature: scalar (softmax temperature)

    Returns: scalar (the average InfoNCE loss over the batch)
    """
    sim = nn.functional.cosine_similarity
    pos_sim = sim(anchor_emb, pos_emb)
    # For each anchor in [B], we want to compare to all negative embeddings.
    bsize = anchor_emb.size(0)
    anchor_exp = anchor_emb.unsqueeze(1)        # => [B,1,D]
    neg_exp = neg_embs.unsqueeze(0)             # => [1,N,D]
    neg_sims = sim(anchor_exp.expand(-1, neg_embs.size(0), -1),
                   neg_exp.expand(bsize, -1, -1))
    pos_sim = pos_sim.unsqueeze(1)              # => [B,1]
    logits = torch.cat([pos_sim, neg_sims], dim=1)  # => [B, 1+N]
    logits = logits / temperature
    labels = torch.zeros(bsize, dtype=torch.long, device=anchor_emb.device)
    loss = F.cross_entropy(logits, labels)
    return loss

In [13]:
VE_model = VE_model.to(device)
VE_model.train()
optimizer = optim.Adam(VE_model.parameters(), lr=LEARNING_RATE)
losses = []

# training loop 
for i in range(NUM_OF_ITERATIONS):

    latent1_forward, latent1_reverse, labels = infinite_dataloader1.get_next()
    latent2_forward, latent2_reverse, labels = infinite_dataloader2.get_next()
    latent3_forward, latent3_reverse, labels = infinite_dataloader3.get_next()

    emb1_frw = VE_model(latent1_forward) 
    emb1_rev = VE_model(latent1_reverse)
    emb2_frw = VE_model(latent2_forward)
    emb2_rev = VE_model(latent2_reverse)
    emb3_frw = VE_model(latent3_forward)
    emb3_rev = VE_model(latent3_reverse)

    loss1 = info_nce_loss(
        anchor_emb=emb1_frw,
        pos_emb=emb1_frw,
        neg_embs=torch.cat([emb1_rev, emb2_frw, emb3_frw, emb3_rev], dim=0)
    )
    loss3 = info_nce_loss(
        anchor_emb=emb2_frw,
        pos_emb=emb2_frw,
        neg_embs=torch.cat([emb1_frw, emb2_rev, emb3_frw, emb3_rev], dim=0)
    )
    loss5 = info_nce_loss(
        anchor_emb=emb3_frw,
        pos_emb=emb3_frw,
        neg_embs=torch.cat([emb1_frw, emb2_frw, emb1_rev, emb2_rev], dim=0)
    )


    loss = (loss1 + loss3 + loss5) / 3.0

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    losses.append(loss.item())  
    if i % 25 == 0:
        print(f"Iter {i} | Loss: {loss.item():.4f}")

Iter 0 | Loss: 2.1509
Iter 25 | Loss: 2.0436
Iter 50 | Loss: 1.8727
Iter 75 | Loss: 1.0607
Iter 100 | Loss: 0.4837
Iter 125 | Loss: 0.0229
Iter 150 | Loss: 0.0001
Iter 175 | Loss: 0.0001
Iter 200 | Loss: 0.0006
Iter 225 | Loss: 0.0000
Iter 250 | Loss: 0.0007
Iter 275 | Loss: 0.0001
Iter 300 | Loss: 0.0002
Iter 325 | Loss: 0.0000
Iter 350 | Loss: 0.0004
Iter 375 | Loss: 0.0002
Iter 400 | Loss: 0.0015
Iter 425 | Loss: 0.0001
Iter 450 | Loss: 0.0000
Iter 475 | Loss: 0.0000


In [14]:
VE_model.eval()
embeddings = []
labels = []
files = []
# Extract embeddings and labels
for d in tqdm(all_data):
    latent = torch.squeeze(d['latent_straight']).float().unsqueeze(0).to(device)
    label = d['video_type']
    files.append(d['file'])
    labels.append(int(label))
    emb = VE_model(latent)
    emb = emb.cpu().detach().numpy()
    embeddings.append(emb)

# Flatten embeddings and apply PCA
embeddings = np.vstack(embeddings)

100%|██████████| 211/211 [00:00<00:00, 3493.36it/s]


In [15]:
pca = PCA(n_components=2)
emb_2d = pca.fit_transform(embeddings)
# normalize embeddings
emb_2d = (emb_2d - emb_2d.mean(axis=0)) / emb_2d.std(axis=0)
X = emb_2d[:, 0]
Y = emb_2d[:, 1]

In [16]:
X.shape, Y.shape, len(labels)

((211,), (211,), 211)

In [19]:
labels_str = np.array(labels).astype(str)
# change labels description
labels_str[labels_str == '0'] = 'No violation'
labels_str[labels_str == '1'] = 'Violation'
labels_str[labels_str == '2'] = 'Other'

fig = px.scatter(
    x=X,
    y=Y,
    color=labels_str,  # Assign colors based on labels
    color_discrete_sequence=px.colors.qualitative.Set1,  # Use discrete color palette
    labels={'x': 'PCA Component 1', 'y': 'PCA Component 2', 'color': 'Label'},
    title='PCA Visualization of Embeddings',
    hover_data={'Label': labels, 'File': files}
)

# Customize marker properties
fig.update_traces(marker=dict(size=8, opacity=0.6), showlegend=True) 

# Adjust layout for a squared plot
fig.update_layout(
    xaxis=dict(scaleanchor="y", title="PCA Component 1"),
    yaxis=dict(title="PCA Component 2"),
    height=600,  # Set height
    width=800    # Set width
)

# Show the plot
fig.show()

In [18]:
# kNN for classification on test data
from sklearn.neighbors import KNeighborsClassifier

# Extract embeddings and labels
test_data = test_data3
classifier = KNeighborsClassifier(n_neighbors=3)
classifier.fit(embeddings, labels)

acc_scores = []
for d in tqdm(test_data):
    latent = torch.squeeze(d['latent_straight']).float().unsqueeze(0).to(device)
    label = d['video_type']
    emb = VE_model(latent)
    emb = emb.cpu().detach().numpy()
    pred = classifier.predict(emb)
    acc = (pred == label)
    acc_scores.append(acc)

print(f"Accuracy: {np.mean(acc_scores):.4f}")   



100%|██████████| 5/5 [00:00<00:00, 717.59it/s]

Accuracy: 0.6000



