In [1]:
import os

import torch
from torch import nn
from torch.utils.data import DataLoader

import torchvision
from torchvision import datasets, transforms

from pathlib import Path
from tqdm.auto import tqdm

In [2]:
# Device-agnostic setup: use the GPU when PyTorch can see one, otherwise the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

print(f"Device is running on {device}")

Device is running on cuda


# **Data setup**

In [None]:
# Downloading dataset
!kaggle datasets download ckay16/accident-detection-from-cctv-footage

# Unzip dataset
!unzip accident-detection-from-cctv-footage

In [3]:
# Folders holding the train/test image splits, relative to the working directory.
train_dir = Path("data") / "train"
test_dir = Path("data") / "test"

In [4]:
# Image preprocessing pipeline: resize to 224x224, flip horizontally with
# probability 0.5, then convert the PIL image to a tensor.
data_transform = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.RandomHorizontalFlip(0.5),
        transforms.ToTensor(),
    ]
)

In [5]:
# Training images go through the augmenting pipeline (`data_transform`).
train_data = datasets.ImageFolder(root=train_dir,
                                  transform=data_transform,
                                  target_transform=None)

# Evaluation should be deterministic, so the test split gets the same resize and
# tensor conversion but without the random horizontal flip used for training.
test_transform = transforms.Compose([
    transforms.Resize(size=(224, 224)),
    transforms.ToTensor()
])

test_data = datasets.ImageFolder(root=test_dir,
                                 transform=test_transform)

In [6]:
# Class names exposed by the training ImageFolder. Its length sizes the classifier
# head, and `predict` indexes it with the predicted label to get a readable name.
class_names = train_data.classes

In [7]:
# Hyper-parameters
BATCH_SIZE = 32
# os.cpu_count() returns None when the CPU count cannot be determined, and
# DataLoader requires an integer, so fall back to 0 (load in the main process).
NUM_WORKERS = os.cpu_count() or 0

# Training batches are reshuffled on every pass over the data.
train_dataloader = DataLoader(dataset=train_data,
                              batch_size=BATCH_SIZE,
                              num_workers=NUM_WORKERS,
                              shuffle=True)

# Test batches keep a fixed order.
test_dataloader = DataLoader(dataset=test_data,
                             batch_size=BATCH_SIZE,
                             num_workers=NUM_WORKERS,
                             shuffle=False)

# **Model building**

In [8]:
# Transfer learning: build EfficientNet-B0 with torchvision's default pretrained
# weights, then move it onto the selected device.
weights = torchvision.models.EfficientNet_B0_Weights.DEFAULT
model = torchvision.models.efficientnet_b0(weights=weights)
model = model.to(device)

In [9]:
# Freeze the pretrained feature extractor so its weights receive no gradients.
# The attribute name is `requires_grad`; assigning to a misspelled attribute only
# attaches an unused Python attribute to the tensor and freezes nothing.
for param in model.features.parameters():
    param.requires_grad = False

In [10]:
# Swap in a new classification head: in-place dropout followed by a linear layer
# mapping the 1280 input features to one logit per dataset class.
head = nn.Sequential(
    nn.Dropout(0.2, inplace=True),
    nn.Linear(1280, len(class_names), bias=True),
)
model.classifier = head.to(device)

In [11]:
# Load the fine-tuned weights from disk (the same file the "Saving model" section writes).
# map_location remaps the stored tensors onto the current device, so a checkpoint
# written during a CUDA run also loads on a CPU-only machine.
model_path = Path("models/accident_detection_model.pth")

model.load_state_dict(torch.load(f=model_path, map_location=device, weights_only=True))

<All keys matched successfully>

# **Inference**

In [12]:
import cv2
from PIL import Image
import gradio as gr

In [13]:
def predict(video_path: str) -> str:
    """Classify a video by averaging the model's per-frame class probabilities.

    Every frame is converted from BGR to RGB, resized to 224x224, turned into a
    tensor and passed through the notebook-level `model`. The softmax outputs of
    all frames are accumulated, and the class with the highest total (equivalently,
    highest mean) probability is returned, so the result reflects the whole clip
    rather than only its final frame.

    Args:
        video_path: Path to a video file that OpenCV can open.

    Returns:
        The entry of `class_names` with the highest mean probability.

    Raises:
        ValueError: If no path is given, the file cannot be opened, or no frame
            could be read from it.
    """
    if not video_path:
        raise ValueError("No video was provided.")

    cap = cv2.VideoCapture(str(video_path))

    if not cap.isOpened():
        cap.release()
        # Raise instead of calling exit(): exit() would tear down the kernel / UI server.
        raise ValueError(f"Error: Could not open video file: {video_path}")

    # Built once: the preprocessing is identical for every frame.
    image_transform = transforms.Compose([
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor()
    ])

    model.eval()
    prob_sum = None  # running sum of per-frame class probabilities

    try:
        with torch.inference_mode():
            while True:
                ret, frame = cap.read()

                if not ret:
                    print("End of video.")
                    break

                # OpenCV decodes frames as BGR; PIL / torchvision expect RGB.
                image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                batch = image_transform(image).unsqueeze(dim=0).to(device)

                probs = torch.softmax(model(batch), dim=1).squeeze(dim=0)
                prob_sum = probs if prob_sum is None else prob_sum + probs
    finally:
        # Always give the capture handle back, even if inference raises.
        cap.release()

    if prob_sum is None:
        raise ValueError(f"No frames could be read from video: {video_path}")

    return class_names[int(prob_sum.argmax())]

In [14]:
# Demo UI: run this cell and upload a video; the predicted class is shown as a label.
demo = gr.Interface(fn=predict, inputs=gr.Video(), outputs=gr.Label())

demo.launch()

* Running on local URL:  http://127.0.0.1:7860

To create a public link, set `share=True` in `launch()`.




End of video.
End of video.


# **Training and Testing**

In [37]:
# Cross-entropy loss on the class logits, optimised with plain SGD (learning rate 0.01).
loss_func = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

In [48]:
def train_step(model: nn.Module,
               dataloader: torch.utils.data.DataLoader,
               loss_func: nn.Module,
               optimizer: torch.optim.Optimizer,
               device: "torch.device | str | None" = None):
    """Run one training pass over `dataloader` and update `model` in place.

    Args:
        model: Network to train; it is switched to training mode.
        dataloader: Iterable yielding `(inputs, targets)` batches.
        loss_func: Callable computing the loss from `(predictions, targets)`.
        optimizer: Optimizer stepping the model's parameters.
        device: Device the batches are moved to. Defaults to the device of the
            model's first parameter (CPU if the model has no parameters), so the
            function does not depend on a notebook-level global.

    Returns:
        A `(train_loss, train_acc)` tuple of floats: the mean of the per-batch
        losses, and the fraction of all samples classified correctly.

    Raises:
        ValueError: If `dataloader` yields no batches.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.train()

    loss_total, num_batches = 0.0, 0
    correct, seen = 0, 0

    for X, y in dataloader:
        X, y = X.to(device), y.to(device)

        y_pred = model(X)
        loss = loss_func(y_pred, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        loss_total += loss.item()
        num_batches += 1

        # argmax over logits equals argmax over softmax, so softmax is not needed.
        correct += (y_pred.argmax(dim=1) == y).sum().item()
        # Count samples rather than batches so a smaller final batch is not over-weighted.
        seen += len(y)

    if num_batches == 0:
        raise ValueError("dataloader yielded no batches")

    train_loss = loss_total / num_batches
    train_acc = correct / seen

    return train_loss, train_acc




def test_step(model: nn.Module,
             dataloader: torch.utils.data.DataLoader,
             loss_func: nn.Module):
    model.eval()

    test_loss, test_acc = 0, 0

    with torch.inference_mode():
        for batch, (X, y) in enumerate(dataloader):
            X, y = X.to(device), y.to(device)

            test_pred = model(X)
            loss = loss_func(test_pred, y)
            test_loss += loss.item()

            test_pred_labels = test_pred.argmax(dim=1)
            test_acc += ((test_pred_labels == y).sum().item()/len(test_pred_labels))

    test_loss = test_loss / len(dataloader)
    test_acc = test_acc / len(dataloader)
    
    return test_loss, test_acc



def train(epochs: int,
          model: nn.Module,
          train_dataloader: torch.utils.data.DataLoader,
          test_dataloader: torch.utils.data.DataLoader,
          loss_func: nn.Module,
          optimizer: torch.optim.Optimizer):
    """Alternate training and evaluation passes for `epochs` epochs.

    Each epoch calls `train_step` on `train_dataloader` and then `test_step` on
    `test_dataloader`, prints a one-line summary, and records the four metrics.

    Returns:
        A dict with keys "train_loss", "train_acc", "test_loss" and "test_acc",
        each mapping to a list holding one value per epoch.
    """
    metric_names = ("train_loss", "train_acc", "test_loss", "test_acc")
    results = {name: [] for name in metric_names}

    for epoch in tqdm(range(epochs)):
        train_loss, train_acc = train_step(
            model=model,
            dataloader=train_dataloader,
            loss_func=loss_func,
            optimizer=optimizer,
        )
        test_loss, test_acc = test_step(
            model=model,
            dataloader=test_dataloader,
            loss_func=loss_func,
        )

        summary = (
            f"Epoch: {epoch+1} | "
            f"train_loss: {train_loss:.4f} | "
            f"train_acc: {train_acc:.4f} | "
            f"test_loss: {test_loss:.4f} | "
            f"test_acc: {test_acc:.4f}"
        )
        print(summary)

        # Store plain Python numbers; unwrap a value if it arrives as a tensor.
        epoch_values = (train_loss, train_acc, test_loss, test_acc)
        for name, value in zip(metric_names, epoch_values):
            if isinstance(value, torch.Tensor):
                value = value.item()
            results[name].append(value)

    return results


In [None]:
# Seed the default generator as well as CUDA: seeding CUDA alone leaves CPU-side
# randomness (and any CPU-only run) unseeded.
torch.manual_seed(42)
torch.cuda.manual_seed(42)
model_results = train(epochs=5,
                      model=model,
                      train_dataloader=train_dataloader,
                      test_dataloader=test_dataloader,
                      loss_func=loss_func,
                      optimizer=optimizer)

# **Saving model**

In [51]:
MODEL_PATH = Path("models")
MODEL_NAME = "accident_detection_model.pth"
MODEL_SAVE_PATH = MODEL_PATH / MODEL_NAME

# torch.save does not create missing directories, so make sure the folder exists.
MODEL_PATH.mkdir(parents=True, exist_ok=True)

# Save only the state_dict (the learned parameters), not the whole model object.
torch.save(obj=model.state_dict(),
           f=MODEL_SAVE_PATH)