<a href="https://colab.research.google.com/github/khanakshah27/GuardianAI/blob/model/vidcr.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:

!pip install torch torchvision opencv-python-headless





In [None]:

import cv2
import numpy as np
import torch
import torch.nn as nn
from torchvision.models.video import r3d_18, R3D_18_Weights

# Prefer the GPU when PyTorch can see one; otherwise run on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")




In [None]:
def video_to_embeddings(video_path, model, mean, std, clip_len=16, stride=8, size=112):
    """Encode a video as a sequence of clip-level embeddings.

    Each frame is read with OpenCV, converted from BGR to RGB, resized to
    ``size`` x ``size``, scaled to [0, 1] and normalised with ``mean``/``std``.
    Windows of ``clip_len`` consecutive frames, starting every ``stride``
    frames, are passed through ``model.stem`` and ``model.layer1`` ..
    ``model.layer4``; the resulting feature map is averaged over its last
    three dimensions, giving one vector per clip.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        model: Network exposing ``stem`` and ``layer1`` .. ``layer4``.
        mean: Per-channel mean, broadcast against an (H, W, 3) frame.
        std: Per-channel standard deviation, broadcast the same way.
        clip_len: Number of frames per clip.
        stride: Step, in frames, between the starts of consecutive clips.
        size: Side length frames are resized to.

    Returns:
        A float32 ``np.ndarray`` with one row per clip, or ``None`` when fewer
        than ``clip_len`` frames could be read (which includes a video that
        cannot be opened).
    """
    # Build clips on the model's own device and in its own dtype so that the
    # forward pass cannot fail on a device/dtype mismatch (e.g. a float64 model).
    try:
        ref_param = next(model.parameters())
        model_device, model_dtype = ref_param.device, ref_param.dtype
    except StopIteration:
        # Parameter-free model: fall back to the notebook-wide device.
        model_device, model_dtype = device, torch.float32

    cap = cv2.VideoCapture(video_path)
    frames = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame = cv2.resize(frame, (size, size))
            frame = frame.astype(np.float32) / 255.0
            # If mean/std are float64 (NumPy's default for Python floats) the
            # normalisation promotes the frame to float64. Storing float32
            # halves the memory held for the video and yields the same values
            # a later float32 tensor conversion would produce.
            frames.append(((frame - mean) / std).astype(np.float32))
    finally:
        # Release the capture handle even if decoding raises.
        cap.release()

    embeddings = []
    with torch.no_grad():
        for start in range(0, len(frames) - clip_len + 1, stride):
            clip = np.stack(frames[start:start + clip_len], axis=0)  # (T, H, W, C)
            # (T, H, W, C) -> (C, T, H, W) -> (1, C, T, H, W)
            clip = torch.from_numpy(clip).permute(3, 0, 1, 2).unsqueeze(0)
            clip = clip.to(device=model_device, dtype=model_dtype)
            feats = model.stem(clip)
            feats = model.layer1(feats)
            feats = model.layer2(feats)
            feats = model.layer3(feats)
            feats = model.layer4(feats)
            # Average over the last three dimensions -> one vector per clip.
            emb = feats.mean([-3, -2, -1]).squeeze(0)
            embeddings.append(emb.float().cpu().numpy())
    return np.stack(embeddings) if embeddings else None

# Per-channel normalisation constants applied to RGB frames scaled to [0, 1].
# NOTE(review): presumably the statistics the pretrained R3D-18 weights were
# trained with - confirm against the torchvision weights documentation.
mean = np.array([0.43216, 0.394666, 0.37645])
std = np.array([0.22803, 0.22145, 0.216989])

# Pretrained R3D-18 used purely as a feature extractor: the classification
# head is replaced by an identity.
r3d = r3d_18(weights=R3D_18_Weights.DEFAULT)
r3d.fc = nn.Identity()
# Keep the backbone in float32. Clips are built as float32 tensors, and a
# float64 model rejects float32 input with a dtype-mismatch error.
r3d = r3d.eval().to(device).float()

Downloading: "https://download.pytorch.org/models/r3d_18-b3b3357e.pth" to /root/.cache/torch/hub/checkpoints/r3d_18-b3b3357e.pth


100%|██████████| 127M/127M [00:01<00:00, 131MB/s]


In [None]:
class LSTMAutoEncoder(nn.Module):
    """Sequence autoencoder built from two batch-first LSTMs.

    The encoder summarises a sequence in its final hidden state. That state is
    repeated once per time step and fed to the decoder, whose outputs carry
    ``input_dim`` features per step.
    """

    def __init__(self, input_dim, hidden_dim=128):
        super().__init__()
        self.encoder = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.decoder = nn.LSTM(hidden_dim, input_dim, batch_first=True)
        self.hidden_dim = hidden_dim

    def forward(self, x):
        """Reconstruct ``x`` (batch, seq_len, input_dim) from the encoder's last hidden state."""
        seq_len = x.size(1)
        _, (hidden, _) = self.encoder(x)
        latent = hidden[-1]  # (batch, hidden_dim), taken from the last layer
        # One copy of the latent vector per time step: (batch, seq_len, hidden_dim).
        repeated = latent.unsqueeze(1).repeat(1, seq_len, 1)
        reconstruction, _ = self.decoder(repeated)
        return reconstruction

# Autoencoder over 512-dimensional clip embeddings, in inference mode and float32 on `device`.
# NOTE(review): no trained weights are loaded for this model in the visible notebook, so its
# reconstructions come from a randomly initialised network - confirm whether it is trained elsewhere.
lstmae = LSTMAutoEncoder(input_dim=512, hidden_dim=128).to(device).eval().float()

In [None]:
# Pretrained R3D-18 whose final layer is swapped for a freshly initialised 2-way linear head,
# placed on `device` in float32 and switched to inference mode.
r3dclassifier = r3d_18(weights=R3D_18_Weights.DEFAULT)
r3dclassifier.fc = nn.Linear(in_features=r3dclassifier.fc.in_features, out_features=2)
r3dclassifier = r3dclassifier.to(device).float().eval()

In [None]:

def fusion_decision(r3d_logits, ae_x, ae_xhat, anomaly_threshold=0.7, crime_conf_threshold=0.75, normal_conf_threshold=0.8):
    """Fuse classifier logits and autoencoder reconstruction error into one label.

    Args:
        r3d_logits: Classifier logits with column 0 = "no crime" and column 1 =
            "crime". Accepts a tensor, array or nested list, shaped
            (rows, 2) or a single 1-D row of length 2.
        ae_x: Autoencoder input (tensor or array-like).
        ae_xhat: Autoencoder reconstruction, same shape as ``ae_x``.
        anomaly_threshold: Reconstruction error above which an otherwise
            undecided video is labelled "crime".
        crime_conf_threshold: Crime probability above which the classifier
            alone decides "crime".
        normal_conf_threshold: No-crime probability above which the classifier
            alone decides "no crime".

    Returns:
        ``(label, confidence, anomaly_score)`` where ``label`` is ``"crime"``
        or ``"no crime"``, ``confidence`` is the probability of the returned
        class and ``anomaly_score`` is the mean squared reconstruction error.
    """
    # as_tensor accepts tensors, arrays and lists without the copy/warning that
    # torch.tensor() produces when it is handed an existing tensor.
    logits = torch.as_tensor(r3d_logits).detach()
    if not logits.is_floating_point():
        logits = logits.float()  # softmax is not defined for integer dtypes
    if logits.dim() == 1:
        logits = logits.unsqueeze(0)  # a single row of logits
    probs = torch.softmax(logits, dim=1)
    # Highest per-class probability over all rows.
    crime_conf = probs[:, 1].max().item()
    normal_conf = probs[:, 0].max().item()

    ae_x = torch.as_tensor(ae_x, dtype=torch.float32).detach()
    ae_xhat = torch.as_tensor(ae_xhat, dtype=torch.float32).detach()
    mse = torch.mean((ae_x - ae_xhat) ** 2, dim=1)
    anomaly_score = mse.mean().item()

    print(f"  Classifier Conf (Crime): {crime_conf:.2f}")
    print(f"  Classifier Conf (No Crime): {normal_conf:.2f}")
    print(f"  Anomaly Score: {anomaly_score:.2f}")

    # A confident classifier wins; the anomaly score only breaks ties when the
    # classifier is undecided.
    if crime_conf > crime_conf_threshold:
        return "crime", crime_conf, anomaly_score
    elif normal_conf > normal_conf_threshold:
        return "no crime", normal_conf, anomaly_score
    elif anomaly_score > anomaly_threshold:
        return "crime", crime_conf, anomaly_score
    else:
        return "no crime", normal_conf, anomaly_score

In [None]:

# Rebuild the linear embedding classifier and restore its saved weights.
# NOTE: `EmbeddingClassifier` and 'embedding_classifier.pth' are both produced by cells further
# down this notebook, so a training cell must have been run before this one.
embedding_classifier = EmbeddingClassifier(input_dim=512, num_classes=2).to(device)
# map_location lets a checkpoint saved on a GPU load on a CPU-only runtime;
# weights_only=True restricts unpickling to tensors and plain containers.
embedding_classifier.load_state_dict(
    torch.load('embedding_classifier.pth', map_location=device, weights_only=True)
)
embedding_classifier.eval()
embedding_classifier = embedding_classifier.float()

def analyze_video(video_path):
    """Run the whole pipeline on one video and print the decision.

    The video is turned into clip embeddings with ``video_to_embeddings`` and
    the ``r3d`` backbone. The embeddings are reconstructed by ``lstmae`` and
    classified by ``embedding_classifier`` (logits averaged over clips), and
    both signals are combined by ``fusion_decision``.

    Args:
        video_path: Path of the video file to analyse.

    Returns:
        ``(label, confidence, anomaly_score)`` as produced by
        ``fusion_decision``, or ``None`` when no clip could be extracted. The
        same decision is also printed on a line starting with ``RESULT:``.
    """
    emb_seq = video_to_embeddings(video_path, r3d, mean, std)
    if emb_seq is None:
        print("No clips found.")
        return None

    # One (num_clips, dim) tensor shared by the autoencoder and the classifier.
    clip_embeddings = torch.tensor(emb_seq, dtype=torch.float32).to(device)

    with torch.no_grad():
        # The autoencoder takes a batch dimension; the classifier scores each clip.
        xhat = lstmae(clip_embeddings.unsqueeze(0)).cpu().squeeze(0).numpy()
        logits = embedding_classifier(clip_embeddings)
        # Average the per-clip logits into a single video-level row.
        avg_logits = torch.mean(logits, dim=0).unsqueeze(0).cpu().numpy()

    label, r3d_conf, anom = fusion_decision(avg_logits, emb_seq, xhat)
    print(f"RESULT: {label.upper()} (Classifier conf={r3d_conf:.2f}, Anomaly score={anom:.2f})")
    return label, r3d_conf, anom

In [None]:
# Re-assert device placement and float32 precision for both models.
lstmae, r3dclassifier = (net.to(device).float() for net in (lstmae, r3dclassifier))

In [None]:
# Make sure every model runs in float32.
# Module.float() converts floating-point parameters and buffers only, so integer buffers
# (such as BatchNorm's num_batches_tracked counter) keep their integer dtype instead of
# being turned into floats.
for net in (r3d, lstmae, r3dclassifier):
    net.float()

In [None]:
# Run the pipeline on a single sample clip; the decision is printed by analyze_video.
analyze_video("/content/crime5.mp4")
#analyze_video("/content/good2.mp4")


  Classifier Conf (Crime): 0.98
  Classifier Conf (No Crime): 0.02
  Anomaly Score: 1.05
RESULT: CRIME (Classifier conf=0.98, Anomaly score=1.05)


In [None]:

import contextlib
import io
import random

# Every labelled video available for the train/test split, as (path, label) pairs.
full_dataset = [
    ("/content/crime4.mp4", "crime"),
    ("/content/crime3.mp4", "crime"),
    ("/content/good1.mp4", "no crime"),
    ("/content/crime5.mp4", "crime"),
    ("/content/good2.mp4", "no crime"),
    ("/content/good3.mp4", "no crime"),
    ("/content/good4.mp4", "no crime"),
    ("/content/crime6.mp4", "crime"),

]

# Shuffle with a fixed seed so the 80/20 split is the same on every run.
SPLIT_SEED = 42
random.Random(SPLIT_SEED).shuffle(full_dataset)

train_size = int(0.8 * len(full_dataset))
training_dataset = full_dataset[:train_size]
testing_dataset = full_dataset[train_size:]
# NOTE(review): `training_dataset` is reassigned with a hard-coded list in a later cell;
# check that no video in `testing_dataset` was used to fit the classifier, otherwise the
# accuracy reported below is optimistic.


correct_predictions = 0
total_videos = len(testing_dataset)

print("Evaluating pipeline accuracy on the testing dataset...")

for video_path, ground_truth in testing_dataset:
    print(f"\nAnalyzing {video_path} (Ground Truth: {ground_truth})")

    # analyze_video reports its decision on stdout. Capture that output so the
    # RESULT line can be parsed; redirect_stdout restores stdout even on error.
    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        analyze_video(video_path)

    output = captured.getvalue().strip()
    print(output)

    # A video that yields no RESULT line stays "unknown" and counts as wrong.
    predicted_label = "unknown"
    if "RESULT: CRIME" in output:
        predicted_label = "crime"
    elif "RESULT: NO CRIME" in output:
        predicted_label = "no crime"

    print(f"Predicted: {predicted_label}")

    if predicted_label == ground_truth:
        correct_predictions += 1

accuracy = (correct_predictions / total_videos) * 100 if total_videos > 0 else 0
print(f"\nOverall Accuracy on Testing Dataset: {accuracy:.2f}%")

Evaluating pipeline accuracy on the testing dataset...

Analyzing /content/crime4.mp4 (Ground Truth: crime)
Classifier Conf (Crime): 0.95
  Classifier Conf (No Crime): 0.05
  Anomaly Score: 0.59
RESULT: CRIME (Classifier conf=0.95, Anomaly score=0.59)
Predicted: crime

Analyzing /content/good1.mp4 (Ground Truth: no crime)
Classifier Conf (Crime): 0.02
  Classifier Conf (No Crime): 0.98
  Anomaly Score: 0.87
RESULT: NO CRIME (Classifier conf=0.98, Anomaly score=0.87)
Predicted: no crime

Overall Accuracy on Testing Dataset: 100.00%


In [None]:
# Videos used to fit the embedding classifier, as (path, label) pairs.
# NOTE(review): this hard-coded list replaces the `training_dataset` produced by the shuffled
# split in an earlier cell - verify that none of these videos is also used for evaluation.
training_dataset = [
    (f"/content/{stem}.mp4", label)
    for stem, label in (
        ("crime4", "crime"),
        ("crime3", "crime"),
        ("good1", "no crime"),
        ("crime5", "crime"),
        ("good2", "no crime"),
        ("good3", "no crime"),
    )
]

In [None]:
import torch.optim as optim

class EmbeddingClassifier(nn.Module):
    """A single linear layer mapping an embedding to per-class logits."""

    def __init__(self, input_dim, num_classes):
        super().__init__()
        # Kept under the attribute name `fc` so saved state_dict keys stay "fc.weight"/"fc.bias".
        self.fc = nn.Linear(in_features=input_dim, out_features=num_classes)

    def forward(self, x):
        """Return the raw (unnormalised) logits for ``x``."""
        logits = self.fc(x)
        return logits

# Fresh, untrained linear head over 512-dimensional embeddings, in float32 on `device`.
embedding_classifier = EmbeddingClassifier(input_dim=512, num_classes=2).to(device).float()

def train_embedding_classifier(model, feature_extractor, dataset, criterion, optimizer, device, mean, std, epochs=10):
    """Train ``model`` on video-level labels using clip embeddings.

    Each video is embedded once with ``video_to_embeddings``. In every epoch
    the model scores all clips of a video, the logits are averaged over the
    clips, and one optimisation step is taken per video against its label.

    Args:
        model: Classifier applied to a (num_clips, dim) embedding tensor.
        feature_extractor: Network handed to ``video_to_embeddings``; it is put
            in eval mode and is not optimised by this function.
        dataset: Iterable of ``(video_path, label)`` pairs; the label
            ``"crime"`` maps to class 1, anything else to class 0.
        criterion: Loss taking ``(logits, target)``.
        optimizer: Optimiser stepped once per video per epoch.
        device: Device the embeddings and targets are moved to.
        mean: Normalisation mean forwarded to ``video_to_embeddings``.
        std: Normalisation std forwarded to ``video_to_embeddings``.
        epochs: Number of passes over the dataset.

    Returns:
        None. The model is updated in place and left in training mode.
    """
    model.train()
    feature_extractor.eval()

    # The embeddings do not depend on the model being trained, so they are the
    # same in every epoch: extract them once instead of re-decoding each video
    # and re-running the feature extractor per epoch.
    samples = []
    for video_path, label in dataset:
        emb_seq = video_to_embeddings(video_path, feature_extractor, mean, std)
        if emb_seq is None:
            print(f"Skipping {video_path}: No clips found.")
            continue
        clip_embeddings = torch.tensor(emb_seq, dtype=torch.float32).to(device)
        target = torch.tensor([1 if label == "crime" else 0], dtype=torch.long).to(device)
        samples.append((clip_embeddings, target))

    if not samples:
        # Nothing usable: avoid dividing by zero when reporting the loss.
        print("No usable videos in dataset; nothing to train on.")
        return

    for epoch in range(epochs):
        running_loss = 0.0
        for clip_embeddings, target in samples:
            logits = model(clip_embeddings)
            # Video-level prediction = mean of the per-clip logits.
            avg_logits = torch.mean(logits, dim=0).unsqueeze(0)
            loss = criterion(avg_logits, target)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        # Average over the videos actually trained on (skipped ones excluded).
        print(f"Epoch {epoch+1}/{epochs}, Loss: {running_loss/len(samples):.4f}")

# Cross-entropy over the two classes; Adam updates only the linear head.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(embedding_classifier.parameters(), lr=0.001)

train_embedding_classifier(
    model=embedding_classifier,
    feature_extractor=r3d,
    dataset=training_dataset,
    criterion=criterion,
    optimizer=optimizer,
    device=device,
    mean=mean,
    std=std,
    epochs=50,
)

# Persist the learned weights (state_dict only).
torch.save(embedding_classifier.state_dict(), 'embedding_classifier.pth')

Epoch 1/50, Loss: 0.9745
Epoch 2/50, Loss: 0.5871
Epoch 3/50, Loss: 0.4393
Epoch 4/50, Loss: 0.3849


## Classifier training

### Subtask:
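Train the linear embedding classifier on R3D embeddings pre-extracted from the prepared dataset.

**Reasoning**:
Define the embedding pre-extraction step and the training function for the classifier that runs on top of the R3D features.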

In [None]:
import torch.optim as optim
import torch.nn as nn
from torchvision.models.video import r3d_18, R3D_18_Weights
import os

class EmbeddingClassifier(nn.Module):
    """Linear classification head over fixed-size embeddings.

    NOTE(review): a class with this name is also defined in an earlier cell of
    this notebook; this definition shadows it once the cell runs.
    """

    def __init__(self, input_dim, num_classes):
        super().__init__()
        # The attribute name `fc` determines the state_dict keys ("fc.weight", "fc.bias").
        self.fc = nn.Linear(in_features=input_dim, out_features=num_classes)

    def forward(self, x):
        """Map embeddings ``x`` to raw class logits."""
        logits = self.fc(x)
        return logits

# Start from a fresh, untrained linear head (float32, on `device`).
embedding_classifier = EmbeddingClassifier(input_dim=512, num_classes=2).to(device).float()
def preextract_embeddings(dataset, feature_extractor, device, mean, std, output_dir="preextracted_embeddings"):
    """Compute clip embeddings for every video once and cache them on disk.

    For each ``(video_path, label)`` pair a file
    ``<output_dir>/<video name without extension>_embeddings.pt`` is written
    with ``torch.save``; it holds a dict with keys ``'embeddings'`` (the value
    returned by ``video_to_embeddings``) and ``'label'``. Videos whose cache
    file already exists are skipped.

    Args:
        dataset: Iterable of ``(video_path, label)`` pairs.
        feature_extractor: Network handed to ``video_to_embeddings``; put in
            eval mode here.
        device: Accepted for signature compatibility; not used by this function.
        mean: Normalisation mean forwarded to ``video_to_embeddings``.
        std: Normalisation std forwarded to ``video_to_embeddings``.
        output_dir: Directory for the cache files; created if missing.
    """
    os.makedirs(output_dir, exist_ok=True)
    feature_extractor.eval()
    for video_path, label in dataset:
        # splitext removes only the final extension, so names that contain dots
        # (e.g. "clip.v1.mp4" and "clip.v2.mp4") map to distinct cache files
        # instead of colliding on the text before the first dot.
        video_name = os.path.splitext(os.path.basename(video_path))[0]
        output_path = os.path.join(output_dir, f"{video_name}_embeddings.pt")
        if os.path.exists(output_path):
            print(f"Embeddings for {video_name} already exist. Skipping extraction.")
            continue

        print(f"Extracting embeddings for {video_path}...")
        emb_seq = video_to_embeddings(video_path, feature_extractor, mean, std)
        if emb_seq is not None:
            torch.save({'embeddings': emb_seq, 'label': label}, output_path)
            print(f"Saved embeddings for {video_name}")
        else:
            print(f"Could not extract embeddings for {video_path}")


# Cache embeddings for every training video (videos already cached are skipped).
preextract_embeddings(
    dataset=training_dataset,
    feature_extractor=r3d,
    device=device,
    mean=mean,
    std=std,
)

def train_embedding_classifier_from_preextracted(model, preextracted_dir, criterion, optimizer, device, epochs=10):
    """Train ``model`` from embedding files cached in ``preextracted_dir``.

    Every ``*.pt`` file in the directory is expected to hold a dict with keys
    ``'embeddings'`` and ``'label'``. Files are loaded once, in sorted order.
    In each epoch the model scores all clips of a video, the logits are
    averaged over clips, and one optimisation step is taken per file.

    Args:
        model: Classifier applied to a (num_clips, dim) embedding tensor.
        preextracted_dir: Directory containing the cached ``.pt`` files.
        criterion: Loss taking ``(logits, target)``.
        optimizer: Optimiser stepped once per file per epoch.
        device: Device the embeddings and targets are moved to.
        epochs: Number of passes over the cached files.

    Returns:
        None. The model is updated in place and left in training mode.
    """
    model.train()

    # os.listdir returns entries in arbitrary order; sorting keeps the training
    # order (and therefore the result) stable between runs and machines.
    preextracted_files = sorted(
        os.path.join(preextracted_dir, f)
        for f in os.listdir(preextracted_dir)
        if f.endswith(".pt")
    )
    print(f"Found {len(preextracted_files)} pre-extracted embedding files.")
    if not preextracted_files:
        # Avoid dividing by zero when reporting the loss.
        print("No pre-extracted embeddings found; nothing to train on.")
        return

    # The cached files do not change during training: read them once instead
    # of once per epoch.
    samples = []
    for file_path in preextracted_files:
        # SECURITY: weights_only=False unpickles arbitrary objects - only load
        # cache files that were written by this notebook.
        data = torch.load(file_path, weights_only=False)
        clip_embeddings = torch.as_tensor(data['embeddings'], dtype=torch.float32).to(device)
        target = torch.tensor([1 if data['label'] == "crime" else 0], dtype=torch.long).to(device)
        samples.append((clip_embeddings, target))

    for epoch in range(epochs):
        running_loss = 0.0
        for clip_embeddings, target in samples:
            logits = model(clip_embeddings)
            # Video-level prediction = mean of the per-clip logits.
            avg_logits = torch.mean(logits, dim=0).unsqueeze(0)
            loss = criterion(avg_logits, target)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        print(f"Epoch {epoch+1}/{epochs}, Loss: {running_loss/len(preextracted_files):.4f}")


# Cross-entropy over the two classes; Adam updates only the linear head.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(embedding_classifier.parameters(), lr=0.001)

# 20 passes over the cached embeddings.
train_embedding_classifier_from_preextracted(
    model=embedding_classifier,
    preextracted_dir="preextracted_embeddings",
    criterion=criterion,
    optimizer=optimizer,
    device=device,
    epochs=20,
)

# Persist the learned weights (state_dict only).
torch.save(embedding_classifier.state_dict(), 'embedding_classifier.pth')

Embeddings for crime4 already exist. Skipping extraction.
Embeddings for crime3 already exist. Skipping extraction.
Embeddings for good1 already exist. Skipping extraction.
Embeddings for crime5 already exist. Skipping extraction.
Embeddings for good2 already exist. Skipping extraction.
Embeddings for good3 already exist. Skipping extraction.
Found 6 pre-extracted embedding files.
Epoch 1/20, Loss: 0.7808
Epoch 2/20, Loss: 0.4020
Epoch 3/20, Loss: 0.3517
Epoch 4/20, Loss: 0.3213
Epoch 5/20, Loss: 0.2554
Epoch 6/20, Loss: 0.2019
Epoch 7/20, Loss: 0.1628
Epoch 8/20, Loss: 0.1355
Epoch 9/20, Loss: 0.1178
Epoch 10/20, Loss: 0.1048
Epoch 11/20, Loss: 0.0936
Epoch 12/20, Loss: 0.0836
Epoch 13/20, Loss: 0.0750
Epoch 14/20, Loss: 0.0677
Epoch 15/20, Loss: 0.0617
Epoch 16/20, Loss: 0.0566
Epoch 17/20, Loss: 0.0523
Epoch 18/20, Loss: 0.0485
Epoch 19/20, Loss: 0.0452
Epoch 20/20, Loss: 0.0422
