# System Versions

In [None]:
import torch
import torchvision

# Report the installed PyTorch and torchvision versions, then whether
# PyTorch can see a CUDA device.
for environment_fact in (
    torch.__version__,
    torchvision.__version__,
    torch.cuda.is_available(),
):
    print(environment_fact)


In [None]:
import tensorflow as tf

# Ask TensorFlow for the visible GPUs once and reuse the list for both the
# count and the details.
gpus = tf.config.list_physical_devices('GPU')
print("Num GPUs Available:", len(gpus))

if gpus:
    print("GPU Details:", gpus)
    print("TensorFlow is using the GPU")
else:
    print("No GPU detected")


# Data Augmentation

In [None]:
import albumentations as A
import cv2
import os
import numpy as np
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm

In [None]:
# Augmentation pipeline applied to every training image; the augmentation run
# further down calls it from several ThreadPoolExecutor workers in parallel.
# Boxes are given in YOLO format and their classes travel in "class_labels".
augmentation_steps = [
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(p=0.2),
    A.Rotate(limit=30, p=0.5),
    A.GaussianBlur(blur_limit=3, p=0.2),
]
yolo_bbox_params = A.BboxParams(format="yolo", label_fields=["class_labels"])

transform = A.Compose(augmentation_steps, bbox_params=yolo_bbox_params)


In [None]:
def load_yolo_labels(label_path):
    """Read a YOLO-format label file into a list of numeric rows.

    Args:
        label_path: Path to a text file holding one whitespace-separated
            annotation per line.

    Returns:
        One list of floats per non-blank line, in file order. An empty list
        is returned when the file does not exist.

    Raises:
        ValueError: If a token on a non-blank line cannot be parsed as float.
    """
    if not os.path.exists(label_path):
        return []
    with open(label_path, "r") as f:
        # Blank lines (for example a trailing empty line) carry no annotation.
        # Skipping them avoids returning empty rows that callers cannot unpack.
        return [
            [float(token) for token in line.split()]
            for line in f
            if line.strip()
        ]

In [None]:
def save_yolo_labels(label_path, labels):
    """Write annotation rows to ``label_path``, one row per line.

    Every value of a row is converted with ``str`` and the values are joined
    by single spaces. An existing file at ``label_path`` is overwritten.
    """
    with open(label_path, "w") as out:
        out.writelines(
            " ".join(str(value) for value in row) + "\n" for row in labels
        )

In [None]:
def augment_image(image_path, label_path, output_dir):
    # NOTE(review): leftover partial draft. A second `augment_image` with the
    # same signature is defined in a later cell and replaces this one when the
    # cells run in order; this version only reads the image and returns.
    # Consider deleting this cell.
    image = cv2.imread(image_path)
    if image is None:
        return


In [None]:
def augment_image(image_path, label_path, output_dir, num_augmentations=3):
    """Write augmented copies of one image together with its YOLO labels.

    The image is read with OpenCV, its boxes are loaded with
    ``load_yolo_labels`` and the module-level ``transform`` pipeline is
    applied ``num_augmentations`` times. Each result is stored as
    ``<output_dir>/images/aug_<i>_<image name>`` and
    ``<output_dir>/labels/aug_<i>_<label name>``.

    Args:
        image_path: Path of the source image.
        label_path: Path of the YOLO label file that belongs to the image.
        output_dir: Directory holding the ``images`` and ``labels`` output
            sub-directories (created when missing).
        num_augmentations: Number of augmented variants written per image.

    Returns:
        None. Nothing is written when the image cannot be read or when the
        label file yields no annotations.
    """
    image = cv2.imread(image_path)
    if image is None:
        return

    # Ignore empty rows (blank lines in the label file) so the unpacking
    # below cannot fail on them.
    labels = [row for row in load_yolo_labels(label_path) if row]
    if not labels:
        return

    bboxes = [[x, y, bw, bh] for _, x, y, bw, bh in labels]
    class_labels = [row[0] for row in labels]

    images_dir = os.path.join(output_dir, "images")
    labels_dir = os.path.join(output_dir, "labels")
    os.makedirs(images_dir, exist_ok=True)
    os.makedirs(labels_dir, exist_ok=True)

    image_name = os.path.basename(image_path)
    label_name = os.path.basename(label_path)

    for i in range(num_augmentations):
        augmented = transform(image=image, bboxes=bboxes, class_labels=class_labels)

        # Pair each surviving box with the label returned by the pipeline.
        # The pipeline can drop boxes (e.g. rotated out of the frame) and
        # filters "class_labels" to match, so zipping against the input list
        # would assign wrong classes after a drop.
        # Class ids are written as integers, as the YOLO label format expects.
        new_labels = [
            [int(cls)] + list(bbox)
            for bbox, cls in zip(augmented["bboxes"], augmented["class_labels"])
        ]

        output_image_path = os.path.join(images_dir, f"aug_{i}_{image_name}")
        output_label_path = os.path.join(labels_dir, f"aug_{i}_{label_name}")

        # cv2.imwrite reports failure through its return value; skip the label
        # file in that case so no label is left without an image.
        if not cv2.imwrite(output_image_path, augmented["image"]):
            continue
        save_yolo_labels(output_label_path, new_labels)


In [None]:
def process_image(
    image_file,
    images_dir=r"F:\camera\accident detection\dataset\train\images",
    labels_dir=r"F:\camera\accident detection\dataset\train\labels",
    output_dir=r"F:\camera\accident detection\delete",
):
    """Augment a single training image identified by its file name.

    Args:
        image_file: File name (not a full path) of an image in ``images_dir``.
        images_dir: Directory that contains the source images.
        labels_dir: Directory that contains the matching YOLO ``.txt`` labels.
        output_dir: Directory passed on to ``augment_image`` for the results.
    """
    image_path = os.path.join(images_dir, image_file)
    # Swap only the final extension for ".txt". A chained str.replace would
    # also rewrite ".jpg"/".png" occurring earlier in the file name.
    label_file = os.path.splitext(image_file)[0] + ".txt"
    label_path = os.path.join(labels_dir, label_file)
    augment_image(image_path, label_path, output_dir)


In [None]:
if __name__ == "__main__":
    # Make sure both output folders exist before any worker writes to them.
    for output_subdir in (
        r"F:\camera\accident detection\delete\images",
        r"F:\camera\accident detection\delete\labels",
    ):
        os.makedirs(output_subdir, exist_ok=True)

    # Collect every .jpg / .png file name from the training image folder.
    image_files = []
    for entry in os.listdir(r"F:\camera\accident detection\dataset\train\images"):
        if entry.endswith((".jpg", ".png")):
            image_files.append(entry)

    # Spread the per-image work over a thread pool. Draining the map iterator
    # through tqdm drives the progress bar and re-raises worker exceptions.
    with ThreadPoolExecutor() as executor:
        progress = tqdm(executor.map(process_image, image_files), total=len(image_files))
        for _ in progress:
            pass

    print("Fast Albumentations augmentation completed!")


# Model Training

# Phase I (*0 - 10 Epochs*)

In [None]:
from ultralytics import YOLO

In [None]:
# Phase I starts from the 'yolov8s.pt' weights file.
model = YOLO('yolov8s.pt')

In [None]:
# Dataset description file passed to every model.train(...) call in this
# notebook. NOTE(review): absolute local path; it must be adjusted before
# the notebook can run on another machine.
data_yaml = r'F:\camera\accident detection\dataset\data.yaml'

In [None]:
# Train for 10 epochs at 640 px with batch size 16.
# NOTE(review): Ultralytics lists `augment` among its prediction settings;
# confirm that `augment=False` has the intended effect on training here.
model.train(data=data_yaml, epochs=10, batch=16, augment=False, imgsz=640)

# Phase II (*10 - 20 Epochs*)

In [None]:
from ultralytics import YOLO

In [None]:
# NOTE(review): every later phase and the demo load this same `train30`
# checkpoint. Confirm whether this phase should instead load the best
# weights produced by the previous phase's run directory.
model = YOLO(r"F:\camera\accident detection\runs\detect\train30\weights\best.pt")

In [None]:
# Another 10 epochs with the same settings. Relies on `data_yaml` assigned
# in the Phase I section, so that cell must have been run first.
model.train(data=data_yaml, epochs=10, batch=16, augment=False, imgsz=640)

# Phase III (*20 - 30 Epochs*)

In [None]:
from ultralytics import YOLO

In [None]:
# NOTE(review): same `train30` checkpoint as the other phases. Confirm
# whether the previous phase's best weights were meant to be loaded here.
model = YOLO(r"F:\camera\accident detection\runs\detect\train30\weights\best.pt")

In [None]:
# Another 10 epochs with the same settings. Relies on `data_yaml` assigned
# in the Phase I section, so that cell must have been run first.
model.train(data=data_yaml, epochs=10, batch=16, augment=False, imgsz=640)

# Phase IV (*30 - 40 Epochs*)

In [None]:
from ultralytics import YOLO

In [None]:
# NOTE(review): same `train30` checkpoint as the other phases. Confirm
# whether the previous phase's best weights were meant to be loaded here.
model = YOLO(r"F:\camera\accident detection\runs\detect\train30\weights\best.pt")

In [None]:
# Another 10 epochs with the same settings. Relies on `data_yaml` assigned
# in the Phase I section, so that cell must have been run first.
model.train(data=data_yaml, epochs=10, batch=16, augment=False, imgsz=640)

# Phase V (*40 - 50 Epochs*)

In [None]:
from ultralytics import YOLO

In [None]:
# NOTE(review): same `train30` checkpoint as the other phases. Confirm
# whether the previous phase's best weights were meant to be loaded here.
model = YOLO(r"F:\camera\accident detection\runs\detect\train30\weights\best.pt")

In [None]:
# Another 10 epochs with the same settings. Relies on `data_yaml` assigned
# in the Phase I section, so that cell must have been run first.
model.train(data=data_yaml, epochs=10, batch=16, augment=False, imgsz=640)

# Demo

In [None]:
import cv2
from ultralytics import YOLO

# Load the trained detector used for the demo.
model = YOLO(r"F:\camera\accident detection\runs\detect\train30\weights\best.pt")

# Display names for the class ids drawn on the frames.
# NOTE(review): presumably mirrors the class order of the training data.yaml;
# confirm against `model.names`.
class_names = {
    0: "Accident",
    1: "NoAccident",
    2: "Car",
    3: "Mild",
    4: "Moderate",
    5: "Motor Cycle",
    6: "Severe"
}

# Box colours in OpenCV's BGR order: red for class 0, green for class 1,
# blue for every other class.
box_colors = {0: (0, 0, 255), 1: (0, 255, 0)}
default_box_color = (255, 0, 0)

video_path = "demo_accident.mp4"
cap = cv2.VideoCapture(video_path)

if not cap.isOpened():
    # Raise instead of calling exit(): inside a Jupyter kernel exit() shuts
    # the kernel down, whereas an exception only stops this cell.
    cap.release()
    raise RuntimeError(f"Error: Could not open video file: {video_path}")

display_width = 800  # Adjust as needed

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # End of the video (or a read failure).
            break

        # Scale the frame to the display width while keeping its aspect ratio.
        height, width = frame.shape[:2]
        aspect_ratio = width / height
        new_height = int(display_width / aspect_ratio)
        frame = cv2.resize(frame, (display_width, new_height))

        # verbose=False keeps the per-frame inference log out of the cell output.
        results = model(frame, verbose=False)

        for result in results:
            for box in result.boxes:
                x1, y1, x2, y2 = map(int, box.xyxy[0])  # Box corners
                conf = float(box.conf[0])  # Confidence score
                cls = int(box.cls[0])  # Class ID
                label = f"{class_names.get(cls, 'Unknown')} ({conf:.2f})"

                color = box_colors.get(cls, default_box_color)

                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                # Keep the caption inside the frame for boxes at the top edge.
                label_y = max(y1 - 10, 15)
                cv2.putText(frame, label, (x1, label_y),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

        cv2.imshow("Accident Detection", frame)

        # Press "q" to stop the playback early.
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
finally:
    # Always free the capture and close the window, even when inference
    # raises or the cell is interrupted.
    cap.release()
    cv2.destroyAllWindows()


In [None]:
import cv2
video_path = "demo_accident.mp4"
cap = cv2.VideoCapture(video_path)

if not cap.isOpened():
    # Raise instead of calling exit(): inside a Jupyter kernel exit() shuts
    # the kernel down, whereas an exception only stops this cell.
    cap.release()
    raise RuntimeError(f"Error: Could not open video file: {video_path}")
# NOTE(review): nothing in the visible part of this cell releases `cap`
# after a successful open; call cap.release() once it is no longer needed.
