<a href="https://colab.research.google.com/github/mohitkr6483/AI-Powered-Road-Accident-Detection-Project/blob/main/Road_Accident_Detection_Using_YOLO_v8%2C_Optical_Flow_%26_ResNet_18.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [11]:
# ============================================================
# ROAD ACCIDENT DETECTION IN PRE-RECORDED VIDEOS
# ============================================================
#
# BASE PRETRAINED MODELS USED:
#
# 1) YOLOv8n (Ultralytics)
#    - Pretrained on COCO dataset
#    - Used for per-frame object detection (cars, motorcycles, bicycles, people, trucks)
#
# 2) ResNet-18 (TorchVision)
#    - Pretrained on ImageNet
#    - Fine-tuned on Kaggle accident image dataset
#    - Used as image-level accident classifier (optional)
#
# ============================================================

!pip install -q opencv-python-headless ultralytics yt-dlp pandas numpy torch torchvision scikit-learn

import cv2, os, uuid, subprocess, gc, sys
import numpy as np, pandas as pd, torch
import torch.nn as nn
from torchvision import models, datasets, transforms
from ultralytics import YOLO
from google.colab import files, drive
from datetime import datetime
from IPython.display import display, HTML

# ============================================================
# CONFIGURABLE PARAMETERS
# ============================================================
# Event threshold = mean(score) + GLOBAL_STD_FACTOR * std(score).
GLOBAL_STD_FACTOR = 2.0
# NOTE(review): not referenced anywhere in the visible code — confirm whether
# frame skipping was meant to be applied during analysis.
FRAME_STRIDE = 2
# Above-threshold frames at most this many seconds apart share one event.
MERGE_GAP_SECONDS = 4.0

# Everything persistent (dataset, trained weights) lives under this folder.
DRIVE_ROOT = "/content/drive/MyDrive/accident_detection_cache"
KAGGLE_DATASET_DIR = DRIVE_ROOT + "/kaggle_dataset"
MODEL_PATH = DRIVE_ROOT + "/image_accident_classifier.pth"

# ============================================================
# SETUP
# ============================================================
# Mount Google Drive first: DRIVE_ROOT is located under this mount point.
drive.mount("/content/drive")
os.makedirs(DRIVE_ROOT, exist_ok=True)

# Run on the GPU when one is visible to torch, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print("Device:", device)

# ============================================================
# YOLO MODEL (ALWAYS USED)
# ============================================================
# Loaded once at module level and shared by every analyze() call.
yolo = YOLO("yolov8n.pt")

# ============================================================
# IMAGE MODEL SELECTION (ALWAYS ASK)
# ============================================================
# Ask which image-model mode to use. The validation below and the skip branch
# later in this cell both handle "3", so the menu and prompt advertise it too
# (previously the option was accepted but hidden from the user).
print("\n================ IMAGE MODEL OPTION ================")
print("1 → Use SAVED pretrained image model (Kaggle-trained)")
print("2 → TRAIN / RETRAIN image model using Kaggle dataset")
print("3 → SKIP image model completely")
print("====================================================")

# Keep asking until one of the advertised options is entered.
while True:
    img_choice = input("Enter choice (1 / 2 / 3): ").strip()
    if img_choice in ("1", "2", "3"):
        break
    print("Invalid input. Please choose 1, 2, or 3.\n")

# State describing the optional image classifier.
# NOTE(review): neither name is read by analyze() in the visible code — the
# classifier is prepared here but not yet consulted during video analysis.
use_image_model = False
img_model = None

# ============================================================
# IMAGE MODEL SETUP
# ============================================================
if img_choice != "3":
    # ResNet-18 with its default pretrained weights; the classification head
    # is replaced by a single-logit layer (binary output).
    img_model = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
    img_model.fc = nn.Linear(img_model.fc.in_features, 1)
    img_model = img_model.to(device)

    # Preprocessing applied to every dataset image: resize to 224x224, convert
    # to a tensor, then normalise each channel with the given mean / std.
    img_tf = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            [0.485, 0.456, 0.406],
            [0.229, 0.224, 0.225]
        )
    ])

    # --------------------------------------------------------
    # OPTION 1: USE SAVED MODEL
    # --------------------------------------------------------
    if img_choice == "1":
        if not os.path.exists(MODEL_PATH):
            print("\n Saved model not found.")
            print("Please train the model at least once.\n")
            sys.exit(1)

        # The checkpoint is a plain state_dict (see torch.save below), so
        # weights_only=True is sufficient and avoids unpickling arbitrary
        # objects from the file.
        state_dict = torch.load(
            MODEL_PATH, map_location=device, weights_only=True
        )
        img_model.load_state_dict(state_dict)
        img_model.eval()
        use_image_model = True

        print("\n Loaded saved pretrained image model.")
        print(f" Model path: {MODEL_PATH}\n")

    # --------------------------------------------------------
    # OPTION 2: TRAIN / RETRAIN (ASK EPOCHS)
    # --------------------------------------------------------
    elif img_choice == "2":
        # Fail early with a readable message (mirrors the saved-model check
        # above) instead of a raw traceback from ImageFolder after the user
        # has already answered the epochs prompt.
        if not os.path.isdir(KAGGLE_DATASET_DIR):
            print("\n Kaggle dataset not found.")
            print(f" Expected dataset directory: {KAGGLE_DATASET_DIR}\n")
            sys.exit(1)

        # Keep asking until a positive integer is entered.
        while True:
            try:
                epochs = int(input("Enter number of training epochs: ").strip())
                if epochs > 0:
                    break
                print("Epochs must be a positive integer.")
            except ValueError:
                print("Please enter a valid integer.")

        # NOTE(review): the single-logit head + BCEWithLogitsLoss assume the
        # dataset has exactly two class folders (labels 0 and 1) — confirm.
        dataset = datasets.ImageFolder(KAGGLE_DATASET_DIR, transform=img_tf)

        # 80/20 random split; the 20% part is discarded, i.e. no validation
        # metrics are computed during training.
        train_size = int(0.8 * len(dataset))
        val_size = len(dataset) - train_size
        train_ds, _ = torch.utils.data.random_split(
            dataset, [train_size, val_size]
        )

        train_dl = torch.utils.data.DataLoader(
            train_ds, batch_size=32, shuffle=True
        )

        loss_fn = nn.BCEWithLogitsLoss()
        opt = torch.optim.Adam(img_model.parameters(), lr=1e-4)

        print("\n Training image classifier...\n")
        for ep in range(epochs):
            img_model.train()
            for x, y in train_dl:
                x = x.to(device)
                # Class indices -> float column vector matching the logit shape.
                y = y.float().unsqueeze(1).to(device)

                loss = loss_fn(img_model(x), y)
                opt.zero_grad()
                loss.backward()
                opt.step()

            print(f"Epoch {ep+1}/{epochs} completed")

        # Persist only the weights; option 1 reloads them into the same
        # architecture on a later run.
        torch.save(img_model.state_dict(), MODEL_PATH)
        img_model.eval()
        use_image_model = True

        print("\n Training complete.")
        print(f" Model saved at: {MODEL_PATH}\n")

else:
    print("\n Image model skipped. Using YOLO + motion only.\n")

# ============================================================
# HELPER FUNCTIONS
# ============================================================
def entropy(gray):
    """Return the base-2 entropy of a 64-bin intensity histogram of `gray`.

    The histogram covers the value range [0, 256) of channel 0 and is
    normalised to sum to 1 before the entropy is computed.
    """
    hist = cv2.calcHist([gray], [0], None, [64], [0, 256])
    probs = hist / hist.sum()
    # The small epsilon keeps log2 finite for empty bins.
    weighted_logs = probs * np.log2(probs + 1e-6)
    return -weighted_logs.sum()

def optical_flow(prev, curr):
    """Return the mean Farneback optical-flow magnitude between two frames.

    `prev` and `curr` are passed straight to cv2.calcOpticalFlowFarneback
    (callers in this file pass grayscale frames).
    """
    # Farneback parameters, in the positional order OpenCV expects.
    pyr_scale, levels, winsize = 0.5, 2, 9
    iterations, poly_n, poly_sigma, flags = 2, 5, 1.1, 0

    field = cv2.calcOpticalFlowFarneback(
        prev, curr, None,
        pyr_scale, levels, winsize, iterations, poly_n, poly_sigma, flags,
    )
    # cartToPolar returns (magnitude, angle); only the magnitude is used.
    magnitude = cv2.cartToPolar(field[..., 0], field[..., 1])[0]
    return magnitude.mean()

def format_mm_ss(sec):
    """Format a duration in seconds as a zero-padded "MM:SS" string.

    The value is rounded to the nearest whole second first; minutes are not
    capped at 59, so an hour is rendered as "60:00".
    """
    whole_seconds = int(round(sec))
    minutes, seconds = divmod(whole_seconds, 60)
    return f"{minutes:02d}:{seconds:02d}"

def infer_accident_description(labels):
    """Map the object labels seen during an event to a short description.

    Args:
        labels: iterable of label strings (duplicates are ignored).

    Returns:
        The description of the first matching rule below, or
        "Traffic accident detected" when no rule matches.
    """
    seen = set(labels)

    # Ordered from most to least specific; the first rule whose required
    # labels are all present wins. The former "Car hit truck" rule tested the
    # same car+truck combination as "Truck hit car" and could never fire, so
    # it has been dropped.
    rules = (
        (("car", "motorcycle"), "Car hit bike"),
        (("car", "person"), "Car hit pedestrian"),
        (("truck", "car"), "Truck hit car"),
        (("car",), "Car crashed"),
        (("truck",), "Truck crashed"),
    )
    for required, description in rules:
        if seen.issuperset(required):
            return description
    return "Traffic accident detected"

# ============================================================
# EVENT DETECTION (NO DUPLICATES)
# ============================================================
def detect_events(frame_scores, fps, std_factor=None, merge_gap_seconds=None):
    """Group frames with outlier scores into merged events.

    A frame is "hot" when its score exceeds mean + std_factor * std of all
    scores. Hot frames no more than `merge_gap_seconds` apart (frame distance
    divided by `fps`) are merged into one event.

    Args:
        frame_scores: sequence of (frame_index, score, labels) tuples.
        fps: frames per second, used to convert frame distance to seconds.
        std_factor: threshold multiplier; defaults to GLOBAL_STD_FACTOR.
        merge_gap_seconds: maximum gap inside one event; defaults to
            MERGE_GAP_SECONDS.

    Returns:
        A list of {"start": frame, "end": frame} dicts, without duplicate
        (start, end) pairs. Empty input yields an empty list.
    """
    # Nothing to analyse: avoid taking mean/std of an empty array, which
    # yields NaN and a NumPy RuntimeWarning.
    if len(frame_scores) == 0:
        return []

    if std_factor is None:
        std_factor = GLOBAL_STD_FACTOR
    if merge_gap_seconds is None:
        merge_gap_seconds = MERGE_GAP_SECONDS

    frames = np.array([f for f, _, _ in frame_scores])
    scores = np.array([s for _, s, _ in frame_scores])

    threshold = scores.mean() + std_factor * scores.std()

    events, current, last = [], None, None
    for f, s in zip(frames, scores):
        # Close the open event once the last hot frame is too far behind,
        # whether or not the current frame is itself hot.
        if current is not None and (f - last) / fps > merge_gap_seconds:
            events.append(current)
            current = None

        if s > threshold:
            if current is None:
                current = {"start": f, "end": f}
            else:
                current["end"] = f
            last = f

    # An event still open at the end of the video is kept as well.
    if current is not None:
        events.append(current)

    # Drop events sharing the same (start, end) pair, keeping first-seen order.
    return list({(e["start"], e["end"]): e for e in events}.values())

# ============================================================
# VIDEO ANALYSIS
# ============================================================
def analyze(video):
    """Score every frame of a video and report the detected events.

    Each frame after the first gets the score
    3.0 * mean optical-flow magnitude + 2.2 * histogram entropy
    + number of YOLO detections, and detect_events() groups the outliers.

    Args:
        video: path (or other source) accepted by cv2.VideoCapture.

    Returns:
        pandas.DataFrame with columns Event_ID, Start_Time, End_Time,
        Start_Frame, End_Frame — one row per event. If no event is found, a
        single placeholder row of "-" is returned; if the first frame cannot
        be read, an empty DataFrame is returned.
    """
    cap = cv2.VideoCapture(video)
    frame_scores = []
    # try/finally guarantees the capture is released on the early return and
    # when any step in the loop raises.
    try:
        total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Fall back to 30 when the reported FPS is 0 / falsy.
        fps = cap.get(cv2.CAP_PROP_FPS) or 30

        ret, prev = cap.read()
        if not ret:
            return pd.DataFrame()

        # The first frame only serves as the optical-flow reference; scored
        # frames are numbered from 1.
        prev_gray = cv2.cvtColor(prev, cv2.COLOR_BGR2GRAY)
        idx = 0

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            idx += 1
            print(f"\rAnalyzing frame {idx}/{total}", end="", flush=True)

            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            flow = optical_flow(prev_gray, gray)
            ent = entropy(gray)

            res = yolo(frame, conf=0.25, verbose=False)[0]
            labels = [yolo.model.names[int(c)] for c in res.boxes.cls] if res.boxes else []

            score = 3.0 * flow + 2.2 * ent + len(labels)
            frame_scores.append((idx, score, labels))

            prev_gray = gray
    finally:
        cap.release()

    print("\nAnalysis completed.")

    events = detect_events(frame_scores, fps)

    if not events:
        return pd.DataFrame([{
            "Event_ID": "-",
            "Start_Time": "-",
            "End_Time": "-",
            "Start_Frame": "-",
            "End_Frame": "-"
        }])

    # NOTE: an "Accident_Description" column (infer_accident_description over
    # an event's labels) is currently disabled; the per-frame labels it would
    # need remain available as the third item of each frame_scores entry.
    rows = []
    for i, e in enumerate(events, 1):
        sf, ef = e["start"], e["end"]
        rows.append({
            "Event_ID": i,
            "Start_Time": format_mm_ss(sf / fps),
            "End_Time": format_mm_ss(ef / fps),
            "Start_Frame": sf,
            "End_Frame": ef
        })

    return pd.DataFrame(rows)

# ============================================================
# MAIN MENU
# ============================================================
# Interactive loop: obtain a video (upload or URL), analyse it, show the
# resulting table, and repeat until the user chooses to exit.
while True:
    print("\n1 → Upload video")
    print("2 → Enter video URL")
    print("3 → Exit")

    choice = input("Choose option (1/2/3): ").strip()
    if choice not in ("1","2","3"):
        print("Invalid input.")
        continue
    if choice == "3":
        print("Exiting.")
        break

    if choice == "1":
        up = files.upload()
        # A cancelled upload leaves nothing to analyse; return to the menu
        # instead of failing on the empty result.
        if not up:
            print("No file uploaded.")
            continue
        video = "/content/" + next(iter(up))
    else:
        url = input("Enter video URL: ").strip()
        # Timestamp + short random id give each download a unique file prefix.
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        sid = uuid.uuid4().hex[:6]
        prefix = f"video_{ts}_{sid}"
        out = f"/content/{prefix}.%(ext)s"
        result = subprocess.run(["yt-dlp","-f","best","-o",out,url])

        # yt-dlp chooses the extension, so locate the file by its prefix.
        downloaded = [f"/content/{f}" for f in os.listdir("/content")
                      if f.startswith(prefix)]
        # A failed download used to surface as an IndexError below; report it
        # and go back to the menu instead.
        if result.returncode != 0 or not downloaded:
            print("Download failed. Please check the URL and try again.")
            continue
        video = downloaded[0]

    df = analyze(video)

    print("\n===== ACCIDENT ANALYSIS RESULT =====\n")
    display(HTML(df.to_html(index=False)))

    # Free cached GPU memory and collect garbage between runs.
    torch.cuda.empty_cache()
    gc.collect()


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Device: cpu

1 → Use SAVED pretrained image model (Kaggle-trained)
2 → TRAIN / RETRAIN image model using Kaggle dataset
Enter choice (1 / 2 ): 1

 Loaded saved pretrained image model.
 Model path: /content/drive/MyDrive/accident_detection_cache/image_accident_classifier.pth


1 → Upload video
2 → Enter video URL
3 → Exit
Choose option (1/2/3): 2
Enter video URL: https://www.youtube.com/shorts/Lp1UrlcAw2A
Analyzing frame 150/151
Analysis completed.

===== ACCIDENT ANALYSIS RESULT =====



Event_ID,Start_Time,End_Time,Start_Frame,End_Frame
1,00:02,00:03,72,76



1 → Upload video
2 → Enter video URL
3 → Exit
Choose option (1/2/3): 2
Enter video URL: https://www.youtube.com/shorts/SlP5KfKzTpw
Analyzing frame 475/476
Analysis completed.

===== ACCIDENT ANALYSIS RESULT =====



Event_ID,Start_Time,End_Time,Start_Frame,End_Frame
1,00:11,00:15,323,439



1 → Upload video
2 → Enter video URL
3 → Exit
Choose option (1/2/3): 2
Enter video URL: https://www.youtube.com/watch?v=YF_DzoTDO-0
Analyzing frame 887/888
Analysis completed.

===== ACCIDENT ANALYSIS RESULT =====



Event_ID,Start_Time,End_Time,Start_Frame,End_Frame
1,00:01,00:02,28,55
2,00:25,00:26,759,771



1 → Upload video
2 → Enter video URL
3 → Exit
Choose option (1/2/3): 3
Exiting.
