In [None]:
import os
import glob
import xml.etree.ElementTree as ET
import shutil
import random

# Paths
root_path = r"C:\Users\sav13\OneDrive\Desktop\project\archive"
img_dir = os.path.join(root_path, "images")
ann_dir = os.path.join(root_path, "annotations")
output_dir = r"C:\Users\sav13\OneDrive\Desktop\project\helmet_yolo"

# Train/val/test split ratios
train_ratio, val_ratio, test_ratio = 0.7, 0.2, 0.1

# YOLO folders
for split in ["train", "val", "test"]:
    os.makedirs(os.path.join(output_dir, "images", split), exist_ok=True)
    os.makedirs(os.path.join(output_dir, "labels", split), exist_ok=True)

# Classes
classes = {"With Helmet": 0, "Without Helmet": 1}  # adjust if needed

# Get all annotation files
xml_files = glob.glob(os.path.join(ann_dir, "*.xml"))
random.shuffle(xml_files)

# Split
n_total = len(xml_files)
n_train = int(train_ratio * n_total)
n_val = int(val_ratio * n_total)
train_files = xml_files[:n_train]
val_files = xml_files[n_train:n_train+n_val]
test_files = xml_files[n_train+n_val:]

def convert_xml_to_yolo(xml_file, split):
    tree = ET.parse(xml_file)
    root = tree.getroot()
    img_file = os.path.join(img_dir, root.find("filename").text)
    size = root.find("size")
    w = int(size.find("width").text)
    h = int(size.find("height").text)

    yolo_lines = []
    for obj in root.findall("object"):
        cls = obj.find("name").text
        if cls not in classes:
            continue
        cls_id = classes[cls]
        bbox = obj.find("bndbox")
        xmin = int(bbox.find("xmin").text)
        ymin = int(bbox.find("ymin").text)
        xmax = int(bbox.find("xmax").text)
        ymax = int(bbox.find("ymax").text)

        # Normalize
        x_center = (xmin + xmax) / 2 / w
        y_center = (ymin + ymax) / 2 / h
        bw = (xmax - xmin) / w
        bh = (ymax - ymin) / h
        yolo_lines.append(f"{cls_id} {x_center} {y_center} {bw} {bh}")

    # Write YOLO txt
    base_name = os.path.splitext(root.find("filename").text)[0]
    txt_file = os.path.join(output_dir, "labels", split, base_name + ".txt")
    with open(txt_file, "w") as f:
        f.write("\n".join(yolo_lines))

    # Copy image
    shutil.copy(img_file, os.path.join(output_dir, "images", split, root.find("filename").text))

# Convert
for file in train_files:
    convert_xml_to_yolo(file, "train")
for file in val_files:
    convert_xml_to_yolo(file, "val")
for file in test_files:
    convert_xml_to_yolo(file, "test")

# Write dataset.yaml
yaml_path = os.path.join(output_dir, "helmet.yaml")
with open(yaml_path, "w") as f:
    f.write(f"""
path: {output_dir}
train: images/train
val: images/val
test: images/test

names:
  0: With Helmet
  1: Without Helmet
""")

print("✅ Conversion complete! YOLO dataset created at:", output_dir)


✅ Conversion complete! YOLO dataset created at: C:\Users\sav13\OneDrive\Desktop\project\helmet_yolo


In [None]:
import os
import glob
import xml.etree.ElementTree as ET
import shutil
import random

# --- Paths ---
root_path = r"C:\Users\sav13\OneDrive\Desktop\project\archive (1)"

img_dir = os.path.join(root_path, "number_plate_images_ocr", "number_plate_images_ocr")
ann_dir = os.path.join(root_path, "number_plate_annos_ocr", "number_plate_annos_ocr")
output_dir = os.path.join(root_path, "numberplate_yolo_ocr")

# Train/val/test split ratios
train_ratio, val_ratio, test_ratio = 0.7, 0.2, 0.1

# Create folders
for split in ["train", "val", "test"]:
    os.makedirs(os.path.join(output_dir, "images", split), exist_ok=True)
    os.makedirs(os.path.join(output_dir, "labels", split), exist_ok=True)
    os.makedirs(os.path.join(output_dir, "ocr_labels", split), exist_ok=True)

# Classes
classes = {"number_plate": 0}

# Get all annotation files
xml_files = glob.glob(os.path.join(ann_dir, "*.xml"))
random.shuffle(xml_files)

# Split datasets
n_total = len(xml_files)
n_train = int(train_ratio * n_total)
n_val = int(val_ratio * n_total)
train_files = xml_files[:n_train]
val_files = xml_files[n_train:n_train+n_val]
test_files = xml_files[n_train+n_val:]

# Function to find image files with different extensions
def find_image(img_name):
    possible_extensions = [".jpg", ".jpeg", ".png"]
    for ext in possible_extensions:
        candidate = os.path.join(img_dir, os.path.splitext(img_name)[0] + ext)
        if os.path.exists(candidate):
            return candidate
    candidate = os.path.join(img_dir, img_name)
    return candidate if os.path.exists(candidate) else None

# Convert XML to YOLO + OCR
def convert_xml_to_yolo(xml_file, split):
    try:
        tree = ET.parse(xml_file)
        root = tree.getroot()
        img_name = root.find("filename").text.strip()

        img_file = find_image(img_name)
        if img_file is None:
            print(f"⚠ Image not found for XML: {xml_file}")
            return

        size = root.find("size")
        w = int(size.find("width").text)
        h = int(size.find("height").text)

        yolo_lines = []
        ocr_lines = []

        for obj in root.findall("object"):
            cls = obj.find("name").text.strip().lower()
            if cls != "number_plate":
                continue
            cls_id = classes["number_plate"]

            bbox = obj.find("bndbox")
            xmin = float(bbox.find("xmin").text)
            ymin = float(bbox.find("ymin").text)
            xmax = float(bbox.find("xmax").text)
            ymax = float(bbox.find("ymax").text)

            x_center = (xmin + xmax) / 2 / w
            y_center = (ymin + ymax) / 2 / h
            bw = (xmax - xmin) / w
            bh = (ymax - ymin) / h
            yolo_lines.append(f"{cls_id} {x_center} {y_center} {bw} {bh}")

            attr = obj.find("attributes/attribute[name='number_plate_text']")
            plate_text = attr.find("value").text if attr is not None else ""
            ocr_lines.append(plate_text)

        if not yolo_lines:
            print(f"⚠ No valid objects found in: {xml_file}")
            return

        base_name = os.path.splitext(img_name)[0]
        txt_file = os.path.join(output_dir, "labels", split, base_name + ".txt")
        with open(txt_file, "w") as f:
            f.write("\n".join(yolo_lines))

        ocr_file = os.path.join(output_dir, "ocr_labels", split, base_name + ".txt")
        with open(ocr_file, "w") as f:
            f.write("\n".join(ocr_lines))

        shutil.copy(img_file, os.path.join(output_dir, "images", split, os.path.basename(img_file)))
        print(f"✅ Converted: {img_name}")

    except Exception as e:
        print(f"❌ Error processing {xml_file}: {e}")

# Run conversion
print("🔄 Converting dataset...")
for file in train_files:
    convert_xml_to_yolo(file, "train")
for file in val_files:
    convert_xml_to_yolo(file, "val")
for file in test_files:
    convert_xml_to_yolo(file, "test")

# Write dataset.yaml
yaml_path = os.path.join(output_dir, "numberplate_ocr.yaml")
with open(yaml_path, "w") as f:
    f.write(f"""
path: {output_dir}
train: images/train
val: images/val
test: images/test

names:
  0: number_plate
""")

print("✅ Conversion complete! YOLO dataset with OCR created at:", output_dir)


🔄 Converting dataset...
✅ Converted: dc_bus_image_000033_XUH0eV452t.jpg
✅ Converted: dc_truck_image_001561_IEhCyPTs.jpg
✅ Converted: dc_license_plates_7K9NEIDD2KK46L6F.jpg
✅ Converted: dc_license_plates_7L53OMODJOLUGUOE.jpg
✅ Converted: dc_license_plates_0RBAQHKIXQMDFYZD.jpg
✅ Converted: dc_tempo_van__image_000490_CF9bwJsyoX.jpg
✅ Converted: dc_auto_image_000024_fqvRhfiO6i.jpg
✅ Converted: dc_auto_image_000021_bMXgvtud5K.jpg
✅ Converted: dc_bus_image_000039_9NispLHmAo.jpg
✅ Converted: dc_license_plates_3GKHBM5PWHYXHMTE.jpg
✅ Converted: dc_license_plates_2DZ4YT4ZJ9XJZSO0.jpg
✅ Converted: dc_license_plates_W1W3000C6IAY3F1X.jpg
✅ Converted: dc_license_plates_2D9KWCA7PLD0NW31.jpg
✅ Converted: dc_tempo_van__image_000489_73h4cMU8Z1.jpg
✅ Converted: dc_truck_image_001564_RZeQ1TZR.jpg
✅ Converted: dc_license_plates_0RRPJCID3RRLSFTI.jpg
✅ Converted: dc_license_plates_3HE1J0YIRGRDENVO.jpg
✅ Converted: dc_license_plates_VYA8KOAVKLW6PJYK.jpg
✅ Converted: dc_license_plates_VTPRN3NAPF8MUGNF.jpg
✅ Co

In [2]:
!pip install pylabel

Collecting pylabel
  Using cached pylabel-0.1.55-py3-none-any.whl.metadata (3.8 kB)
Collecting bbox-visualizer (from pylabel)
  Using cached bbox_visualizer-0.2.2-py3-none-any.whl.metadata (6.4 kB)
Collecting jupyter-bbox-widget (from pylabel)
  Using cached jupyter_bbox_widget-0.6.0-py3-none-any.whl.metadata (11 kB)
Collecting anywidget>=0.9.0 (from jupyter-bbox-widget->pylabel)
  Using cached anywidget-0.9.18-py3-none-any.whl.metadata (8.9 kB)
Collecting psygnal>=0.8.1 (from anywidget>=0.9.0->jupyter-bbox-widget->pylabel)
  Downloading psygnal-0.14.2-cp310-cp310-win_amd64.whl.metadata (6.0 kB)
Using cached pylabel-0.1.55-py3-none-any.whl (27 kB)
Using cached bbox_visualizer-0.2.2-py3-none-any.whl (10 kB)
Using cached jupyter_bbox_widget-0.6.0-py3-none-any.whl (24 kB)
Using cached anywidget-0.9.18-py3-none-any.whl (220 kB)
Downloading psygnal-0.14.2-cp310-cp310-win_amd64.whl (409 kB)
Installing collected packages: psygnal, bbox-visualizer, anywidget, jupyter-bbox-widget, pylabel

   -

In [4]:
from pylabel import importer
import os

# Base dataset path
base_path = r"C:\Users\sav13\OneDrive\Desktop\project\Detect-lane-violation.v1i.coco-segmentation"
output_path = r"C:\Users\sav13\OneDrive\Desktop\project\lane_dataset_yolo"

# Make sure output path exists
os.makedirs(output_path, exist_ok=True)

# Splits
splits = ["train", "valid", "test"]

for split in splits:
    json_path = os.path.join(base_path, split, "_annotations.coco.json")

    if not os.path.exists(json_path):
        print(f"⚠️ Skipping {split}, no annotations found.")
        continue

    print(f"📂 Converting {split} set...")
    dataset = importer.ImportCoco(json_path)

    dataset.export.ExportToYoloV5(
        output_path=os.path.join(output_path, split)
    )

# Write data.yaml
yaml_path = os.path.join(output_path, "data.yaml")
with open(yaml_path, "w") as f:
    f.write("""train: train/images
val: valid/images
test: test/images

nc: 9
names: ['lanes','bicycle','bus','car','motorbike','person','rider','traffic light','truck']
""")

print("✅ Conversion complete! YOLO dataset ready at:", output_path)


📂 Converting train set...


Exporting YOLO files...: 100%|██████████| 135/135 [00:00<00:00, 302.79it/s]


📂 Converting valid set...


Exporting YOLO files...: 100%|██████████| 39/39 [00:00<00:00, 115.31it/s]


📂 Converting test set...


Exporting YOLO files...: 100%|██████████| 19/19 [00:00<00:00, 170.05it/s]

✅ Conversion complete! YOLO dataset ready at: C:\Users\sav13\OneDrive\Desktop\project\lane_dataset_yolo





In [4]:
import os
import random
import shutil
import yaml
import subprocess
from datetime import datetime

# ========================
# CONFIG
# ========================
DATASETS = {
    "helmet": r"C:\Users\sav13\OneDrive\Desktop\project\helmet_yolo\helmet.yaml",
    "lane": r"C:\Users\sav13\OneDrive\Desktop\project\lane_dataset_yolo\dataset.yaml",
    "red_light": r"C:\Users\sav13\OneDrive\Desktop\project\Red Light Violation Detect dataset.v3i.yolov8\data.yaml",
    "number_plate": r"C:\Users\sav13\OneDrive\Desktop\project\numberplate_yolo_ocr\numberplate_ocr.yaml",
}

OUTPUT_BASE = r"C:\Users\sav13\OneDrive\Desktop\project\subset_datasets"
LOG_DIR = "training_logs"
MODEL_DIR = "models"
SUBSET_SIZE = 300

TRAIN_CONFIG = {
    "helmet": {"epochs": 20, "imgsz": 320, "subset": True},
    "red_light": {"epochs": 20, "imgsz": 320, "subset": True},
    "lane": {"epochs": 50, "imgsz": 640, "subset": False},
    "number_plate": {"epochs": 50, "imgsz": 640, "subset": False},
}

os.makedirs(OUTPUT_BASE, exist_ok=True)
os.makedirs(LOG_DIR, exist_ok=True)
os.makedirs(MODEL_DIR, exist_ok=True)


# ========================
# HELPERS
# ========================
def load_yaml(yaml_path):
    with open(yaml_path, "r") as f:
        return yaml.safe_load(f)


def save_yaml(data, path):
    with open(path, "w") as f:
        yaml.dump(data, f)


def create_subset(name, yaml_path, subset_size):
    data = load_yaml(yaml_path)
    subsets = ["train", "val", "test"]

    out_dir = os.path.join(OUTPUT_BASE, name)
    os.makedirs(out_dir, exist_ok=True)

    new_yaml = {}

    # Handle missing 'nc'
    if "names" in data:
        new_yaml["names"] = data["names"]
        new_yaml["nc"] = len(data["names"])
    else:
        raise KeyError(f"'names' not found in {yaml_path}")

    for split in subsets:
        if split not in data:
            continue

        src_dir = os.path.abspath(os.path.join(os.path.dirname(yaml_path), data[split]))
        dst_dir = os.path.join(out_dir, split, "images")
        dst_labels_dir = os.path.join(out_dir, split, "labels")
        os.makedirs(dst_dir, exist_ok=True)
        os.makedirs(dst_labels_dir, exist_ok=True)

        images = [f for f in os.listdir(src_dir) if f.lower().endswith(('.jpg', '.png', '.jpeg'))]
        random.shuffle(images)

        num_to_pick = min(len(images), subset_size // len(subsets))
        picked = images[:num_to_pick]

        for img in picked:
            shutil.copy(os.path.join(src_dir, img), os.path.join(dst_dir, img))
            label_file = os.path.splitext(img)[0] + ".txt"
            src_label_dir = src_dir.replace("images", "labels")
            if os.path.exists(os.path.join(src_label_dir, label_file)):
                shutil.copy(os.path.join(src_label_dir, label_file), os.path.join(dst_labels_dir, label_file))

        new_yaml[split] = os.path.abspath(dst_dir)

    new_yaml_path = os.path.join(out_dir, f"{name}_subset.yaml")
    save_yaml(new_yaml, new_yaml_path)
    print(f"✅ Subset for {name} created: {new_yaml_path}")
    return new_yaml_path


def cleanup_old_runs(model_name):
    run_path = os.path.join("runs", "train", f"{model_name}_model")
    if os.path.exists(run_path):
        shutil.rmtree(run_path)
        print(f"🧹 Old run folder deleted: {run_path}")


def run_training(name, yaml_path, epochs, imgsz):
    cleanup_old_runs(name)

    log_file = os.path.join(LOG_DIR, f"{name}_train_log.txt")
    cmd = f'yolo train model=yolov8n.pt data="{yaml_path}" epochs={epochs} imgsz={imgsz} name={name}_model'
    with open(log_file, "w") as f:
        process = subprocess.Popen(cmd, shell=True, stdout=f, stderr=f)
        process.wait()

    print(f"✅ {name} training completed. Logs: {log_file}")

    best_pt = os.path.join("runs", "train", f"{name}_model", "weights", "best.pt")
    if os.path.exists(best_pt):
        final_path = os.path.join(MODEL_DIR, f"{name}_best.pt")
        shutil.copy(best_pt, final_path)
        print(f"📂 Best weights for {name} copied to {final_path}")
    else:
        print(f"⚠️ Warning: best.pt not found for {name}")


# ========================
# MAIN
# ========================
if __name__ == "__main__":
    print(f"🕒 Training started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    subset_yamls = {}
    for name, cfg in TRAIN_CONFIG.items():
        if cfg["subset"]:
            subset_yamls[name] = create_subset(name, DATASETS[name], SUBSET_SIZE)
        else:
            subset_yamls[name] = DATASETS[name]

    for name, yaml_path in subset_yamls.items():
        print(f"\n🚀 Training {name} model...")
        run_training(name, yaml_path, TRAIN_CONFIG[name]["epochs"], TRAIN_CONFIG[name]["imgsz"])

    print("\n🎉 All training done!")
    print(f"👉 Final weights are stored in: {os.path.abspath(MODEL_DIR)}")
    print(f"👉 Logs are stored in: {os.path.abspath(LOG_DIR)}")


🕒 Training started at: 2025-10-03 10:55:44
✅ Subset for helmet created: C:\Users\sav13\OneDrive\Desktop\project\subset_datasets\helmet\helmet_subset.yaml
✅ Subset for red_light created: C:\Users\sav13\OneDrive\Desktop\project\subset_datasets\red_light\red_light_subset.yaml

🚀 Training helmet model...
✅ helmet training completed. Logs: training_logs\helmet_train_log.txt

🚀 Training red_light model...
✅ red_light training completed. Logs: training_logs\red_light_train_log.txt

🚀 Training lane model...
✅ lane training completed. Logs: training_logs\lane_train_log.txt

🚀 Training number_plate model...
✅ number_plate training completed. Logs: training_logs\number_plate_train_log.txt

🎉 All training done!
👉 Final weights are stored in: c:\Users\sav13\OneDrive\Desktop\project\models
👉 Logs are stored in: c:\Users\sav13\OneDrive\Desktop\project\training_logs


In [1]:
import os
import shutil
import subprocess
import yaml
from datetime import datetime

# ===== CONFIG =====
DATASETS = {
    "lane": {
        "yaml": r"C:\Users\sav13\OneDrive\Desktop\project\lane_dataset_yolo\data.yaml",
        "epochs": 20,
        "imgsz": 320
    },
    "number_plate": {
        "yaml": r"C:\Users\sav13\OneDrive\Desktop\project\numberplate_yolo_ocr\numberplate_ocr.yaml",
        "epochs": 20,
        "imgsz": 320
    }
}

MODEL_DIR = r"C:\Users\sav13\OneDrive\Desktop\project\models"
LOG_DIR = os.path.join(MODEL_DIR, "logs")
os.makedirs(LOG_DIR, exist_ok=True)

# ===== FUNCTIONS =====
def verify_images(dataset_yaml):
    print(f"🔍 Verifying images for {dataset_yaml}...")
    with open(dataset_yaml, "r") as f:
        data = yaml.safe_load(f)

    base_path = data.get("path", "")
    if not os.path.isabs(base_path):
        base_path = os.path.dirname(dataset_yaml)

    all_found = True
    for split in ["train", "val", "test"]:
        split_path = data.get(split, "")
        if not split_path:
            print(f"⚠ {split} path missing in YAML.")
            all_found = False
            continue

        if not os.path.isabs(split_path):
            split_path = os.path.join(base_path, split_path)

        if not os.path.exists(split_path):
            print(f"❌ Missing path: {split_path}")
            all_found = False
            continue

        img_count = len([f for f in os.listdir(split_path) if f.lower().endswith((".jpg", ".png", ".jpeg"))])
        if img_count == 0:
            print(f"❌ No images found in: {split_path}")
            all_found = False
        else:
            print(f"✅ {split} images found: {img_count} in {split_path}")

    return all_found

def fix_yaml_paths(dataset_yaml):
    print(f"🛠 Fixing YAML paths for {dataset_yaml}...")
    with open(dataset_yaml, "r") as f:
        data = yaml.safe_load(f)

    base_path = os.path.dirname(dataset_yaml)
    data["path"] = base_path

    for split in ["train", "val", "test"]:
        if split in data:
            if not os.path.isabs(data[split]):
                data[split] = os.path.relpath(os.path.join(base_path, data[split]), start=base_path)

    with open(dataset_yaml, "w") as f:
        yaml.dump(data, f)

    print(f"✅ YAML paths fixed for {dataset_yaml}")

def train_dataset(name, dataset_yaml, epochs, imgsz):
    print(f"\n🚀 Starting training for {name} dataset...")
    start_time = datetime.now()
    print(f"⏳ Training started at: {start_time}")

    log_file = os.path.join(LOG_DIR, f"{name}_training.log")
    cmd = f'yolo train model=yolov8n.pt data="{dataset_yaml}" epochs={epochs} imgsz={imgsz} name={name}_model'
    with open(log_file, "w") as log:
        process = subprocess.Popen(cmd, shell=True, stdout=log, stderr=log)
        process.wait()

    end_time = datetime.now()
    print(f"✅ Training finished for {name} at {end_time} (Duration: {end_time - start_time})")

    best_pt = os.path.join("runs", "train", f"{name}_model", "weights", "best.pt")
    if os.path.exists(best_pt):
        os.makedirs(MODEL_DIR, exist_ok=True)
        shutil.copy(best_pt, os.path.join(MODEL_DIR, f"{name}_best.pt"))
        print(f"📂 Best weights copied to: {MODEL_DIR}")
    else:
        print(f"⚠ No best.pt found for {name}")

# ===== MAIN =====
if __name__ == "__main__":
    print(f"🕒 Training started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    for name, cfg in DATASETS.items():
        print(f"\n🔍 Verifying dataset: {name}")
        if verify_images(cfg["yaml"]):
            fix_yaml_paths(cfg["yaml"])
            train_dataset(name, cfg["yaml"], cfg["epochs"], cfg["imgsz"])
        else:
            print(f"❌ Skipping training for {name} due to missing images.")

    print(f"\n🎉 All training done! Best models are in: {MODEL_DIR}")


🕒 Training started at: 2025-10-03 13:17:36

🔍 Verifying dataset: lane
🔍 Verifying images for C:\Users\sav13\OneDrive\Desktop\project\lane_dataset_yolo\data.yaml...
✅ train images found: 135 in C:/Users/sav13/OneDrive/Desktop/project/lane_dataset_yolo/train/images
✅ val images found: 39 in C:/Users/sav13/OneDrive/Desktop/project/lane_dataset_yolo/val/images
✅ test images found: 19 in C:/Users/sav13/OneDrive/Desktop/project/lane_dataset_yolo/test/images
🛠 Fixing YAML paths for C:\Users\sav13\OneDrive\Desktop\project\lane_dataset_yolo\data.yaml...
✅ YAML paths fixed for C:\Users\sav13\OneDrive\Desktop\project\lane_dataset_yolo\data.yaml

🚀 Starting training for lane dataset...
⏳ Training started at: 2025-10-03 13:17:36.784950
✅ Training finished for lane at 2025-10-03 13:24:22.524946 (Duration: 0:06:45.739996)
⚠ No best.pt found for lane

🔍 Verifying dataset: number_plate
🔍 Verifying images for C:\Users\sav13\OneDrive\Desktop\project\numberplate_yolo_ocr\numberplate_ocr.yaml...
✅ train im

In [3]:
!pip install pytesseract

Collecting pytesseract
  Using cached pytesseract-0.3.13-py3-none-any.whl.metadata (11 kB)
Using cached pytesseract-0.3.13-py3-none-any.whl (14 kB)
Installing collected packages: pytesseract
Successfully installed pytesseract-0.3.13


In [5]:
!pip install sqlalchemy

Collecting sqlalchemy
  Downloading sqlalchemy-2.0.43-cp310-cp310-win_amd64.whl.metadata (9.8 kB)
Collecting greenlet>=1 (from sqlalchemy)
  Downloading greenlet-3.2.4-cp310-cp310-win_amd64.whl.metadata (4.2 kB)
Downloading sqlalchemy-2.0.43-cp310-cp310-win_amd64.whl (2.1 MB)
   ---------------------------------------- 0.0/2.1 MB ? eta -:--:--
   ------------------- -------------------- 1.0/2.1 MB 12.7 MB/s eta 0:00:01
   ---------------------------------------- 2.1/2.1 MB 11.9 MB/s  0:00:00
Downloading greenlet-3.2.4-cp310-cp310-win_amd64.whl (298 kB)
Installing collected packages: greenlet, sqlalchemy

   -------------------- ------------------- 1/2 [sqlalchemy]
   -------------------- ------------------- 1/2 [sqlalchemy]
   -------------------- ------------------- 1/2 [sqlalchemy]
   -------------------- ------------------- 1/2 [sqlalchemy]
   -------------------- ------------------- 1/2 [sqlalchemy]
   -------------------- ------------------- 1/2 [sqlalchemy]
   -------------------

In [12]:
!pip install jiwer

Collecting jiwer
  Downloading jiwer-4.0.0-py3-none-any.whl.metadata (3.3 kB)
Collecting rapidfuzz>=3.9.7 (from jiwer)
  Downloading rapidfuzz-3.14.1-cp310-cp310-win_amd64.whl.metadata (12 kB)
Downloading jiwer-4.0.0-py3-none-any.whl (23 kB)
Downloading rapidfuzz-3.14.1-cp310-cp310-win_amd64.whl (1.5 MB)
   ---------------------------------------- 0.0/1.5 MB ? eta -:--:--
   ---------------------------------------- 0.0/1.5 MB ? eta -:--:--
   ------------- -------------------------- 0.5/1.5 MB 2.1 MB/s eta 0:00:01
   --------------------------- ------------ 1.0/1.5 MB 2.6 MB/s eta 0:00:01
   ---------------------------------------- 1.5/1.5 MB 2.7 MB/s  0:00:00
Installing collected packages: rapidfuzz, jiwer

   ---------------------------------------- 2/2 [jiwer]

Successfully installed jiwer-4.0.0 rapidfuzz-3.14.1


In [8]:
!pip install editdistance

Collecting editdistance
  Downloading editdistance-0.8.1-cp310-cp310-win_amd64.whl.metadata (3.9 kB)
Downloading editdistance-0.8.1-cp310-cp310-win_amd64.whl (79 kB)
Installing collected packages: editdistance
Successfully installed editdistance-0.8.1


In [16]:
import os
import cv2
import torch
import mysql.connector
import numpy as np
from datetime import datetime
from difflib import SequenceMatcher
from jiwer import cer
from tabulate import tabulate
from ultralytics import YOLO  # ← Added import for YOLOv8

# ===== CONFIG =====
MODEL_PATHS = {
    "red_light": r"C:\Users\sav13\OneDrive\Desktop\project\runs\detect\red_light_model7\weights\best.pt",
    "helmet": r"C:\Users\sav13\OneDrive\Desktop\project\runs\detect\helmet_model9\weights\best.pt",
    "lane": r"C:\Users\sav13\OneDrive\Desktop\project\runs\detect\lane_model18\weights\best.pt",
    "yolo_dataset": r"C:\Users\sav13\OneDrive\Desktop\project\archive5\training_output\license_plate_detector2\weights\best.pt"
}

VIDEO_PATH = r"C:\Users\sav13\OneDrive\Desktop\project\raw datasets\archive (2)\video"
OCR_LABELS_PATH = r"C:\Users\sav13\OneDrive\Desktop\project\archive5\yolo_dataset\labels"

DB_CONFIG = {
    "host": "localhost",
    "user": "root",
    "password": "SpadeZ1027#",
    "database": "traffic_violation_db"
}

# ===== LOAD MODELS =====
print("🔄 Loading models...")
models = {name: YOLO(path) for name, path in MODEL_PATHS.items()}  # ← Changed line
print("✅ Models loaded.")

# ===== MYSQL DATABASE =====
def init_db():
    conn = mysql.connector.connect(**DB_CONFIG)
    cursor = conn.cursor()
    cursor.execute("""
    CREATE TABLE IF NOT EXISTS violation (
        id INT AUTO_INCREMENT PRIMARY KEY,
        violation_type VARCHAR(50),
        image_path TEXT,
        ocr_text TEXT,
        detection_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    """)
    conn.commit()
    return conn, cursor

# ===== OCR & Violation Helpers =====
def similar(a, b):
    return SequenceMatcher(None, a, b).ratio()

def detect_red_light_violation(results, traffic_light_color):
    if traffic_light_color == "red":
        if len(results) > 0 and hasattr(results[0], "boxes"):
            return len(results[0].boxes) > 0
    return False

def detect_lane_violation(results):
    if len(results) > 0 and hasattr(results[0], "boxes"):
        return len(results[0].boxes) > 0
    return False

def detect_helmet_violation(results):
    # Ensure results is a YOLOv8 result object
    if len(results) > 0 and hasattr(results[0], "boxes"):
        return len(results[0].boxes) > 0
    return False


def extract_ocr_text(results):
    if results and len(results) > 0 and hasattr(results[0], "boxes"):
        return [
            models["yolo_dataset"].names[int(cls)]            for cls in results[0].boxes.cls.cpu().numpy()
        ]
    return []

def calculate_cer(reference_texts, predicted_texts):
    if not reference_texts or not predicted_texts:
        return None
    cer_scores = [cer(r, p) for r, p in zip(reference_texts, predicted_texts)]
    return np.mean(cer_scores)

# ===== IMAGE PROCESSING =====
def process_image(image_path, traffic_light_color="red"):
    img = cv2.imread(image_path)

    results = {
       "red_light": models["red_light"](image_path)[0],
       "helmet": models["helmet"](image_path)[0],    "lane": models["lane"](image_path)[0],
       "yolo_dataset": models["yolo_dataset"](image_path)[0]
       }

    violations = {
       "red_light_violation": detect_red_light_violation(results["red_light"], traffic_light_color),
       "lane_violation": detect_lane_violation(results["lane"]),
       "helmet_violation": detect_helmet_violation(results["helmet"]),
       "ocr_text": extract_ocr_text(results["yolo_dataset"])
    }
    # Draw bounding boxes
    output_image = img.copy()
    for violation_type, res in results.items():
        boxes = getattr(res, "boxes", None)
        if boxes:
            for box, conf in zip(boxes.xyxy.cpu().numpy(), boxes.conf.cpu().numpy()):
                if conf > 0.5:
                    x1, y1, x2, y2 = map(int, box)
                    cv2.rectangle(output_image, (x1, y1), (x2, y2), (0, 255, 0), 2)
                    cv2.putText(output_image, f"{violation_type} {conf:.2f}", (x1, y1 - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

    return output_image, violations

# ===== VIDEO PROCESSING =====
def process_video(video_path, traffic_light_color="red"):
    cap = cv2.VideoCapture(video_path)
    frame_count = 0
    violation_log = []

    conn, cursor = init_db()

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        frame_count += 1

        temp_path = "temp_frame.jpg"
        cv2.imwrite(temp_path, frame)
        output_img, violations = process_image(temp_path, traffic_light_color)

        output_video_frame_path = f"outputs/frame_{frame_count}.jpg"
        os.makedirs("outputs", exist_ok=True)
        cv2.imwrite(output_video_frame_path, output_img)

        # Store violations in DB
        for v_type, detected in violations.items():
            if detected:
                cursor.execute(
                    "INSERT INTO violation (violation_type, image_path, ocr_text) VALUES (%s, %s, %s)",
                    (v_type, output_video_frame_path, str(violations.get("ocr_text", "")))
                )
                conn.commit()

        violation_log.append((frame_count, violations))

    cap.release()
    cursor.close()
    conn.close()
    return violation_log

# ===== MAIN TESTING & EVALUATION =====
if __name__ == "__main__":
    conn, cursor = init_db()
    log_file = open("violation_log.txt", "w")
    evaluation_table = []

    
    ground_truth_labels = []  # Provide ground truth OCR labels for CER calculation
    predicted_labels = []

   # Paths to all test image folders
test_folders = {
    "yolo_dataset": r"C:\Users\sav13\OneDrive\Desktop\project\archive5\yolo_dataset\images\test",
    "lane": r"C:\Users\sav13\OneDrive\Desktop\project\lane_dataset_yolo\test\images",
    "helmet": r"C:\Users\sav13\OneDrive\Desktop\project\subset_datasets\helmet\test\images",
    "red_light": r"C:\Users\sav13\OneDrive\Desktop\project\subset_datasets\red_light\test\images"
}

for dataset_name, image_folder in test_folders.items():
    if not os.path.exists(image_folder):
        print(f"⚠ Test folder not found: {image_folder}")
        continue

    conn = mysql.connector.connect(
    host="localhost",
    user="root",
    password="SpadeZ1027#",
    database="traffic_violation_db"
    )
    cursor = conn.cursor()

    log_file = open("violation_log.txt", "a")

    for img_file in os.listdir(image_folder):
        if img_file.lower().endswith((".jpg", ".png")):
            img_path = os.path.join(image_folder, img_file)
            output_img, violation_results = process_image(img_path)

            # Save output image
            output_path = os.path.join("outputs", dataset_name, img_file)
            os.makedirs(os.path.dirname(output_path), exist_ok=True)
            cv2.imwrite(output_path, output_img)

            log_file.write(f"{datetime.now()} - {dataset_name} - {img_file} - {violation_results}\n")
            print(f"{datetime.now()} - {dataset_name} - {img_file} - {violation_results}")

            evaluation_table.append([dataset_name, img_file, violation_results])

            img_path = os.path.join(image_folder, img_file)
            output_img, violation_results = process_image(img_path)

            # Save output image
            output_path = os.path.join("outputs", img_file)
            os.makedirs("outputs", exist_ok=True)
            cv2.imwrite(output_path, output_img)

            # OCR ground truth vs predicted
            predicted_labels.extend(violation_results.get("number_plate_ocr", []))
            def load_ground_truth_ocr(img_file):
               label_file = os.path.join(OCR_LABELS_PATH, os.path.splitext(img_file)[0] + ".txt")
               if os.path.exists(label_file):
                  with open(label_file, "r") as f:
                     return [line.strip() for line in f.readlines()]
               return []

            ground_truth_labels.extend(load_ground_truth_ocr(img_file))
  # implement this

            # Log violations
            for v_type, detected in violation_results.items():
                if detected:
                    cursor.execute(
                        "INSERT INTO violation (violation, image_path, ocr_text) VALUES (%s, %s, %s)",
                        (v_type, img_path, str(violation_results.get("number_plate_ocr", "")))
                    )
                    conn.commit()

            log_file.write(f"{datetime.now()} - {img_file} - {violation_results}\n")
            print(f"{datetime.now()} - {img_file} - {violation_results}")

            evaluation_table.append([img_file, violation_results])

    # Calculate CER
    cer_value = calculate_cer(ground_truth_labels, predicted_labels)

    # Print evaluation table
    print("\n📊 Evaluation Metrics:")
    print(tabulate(evaluation_table, headers=["Image", "Violations Detected"], tablefmt="pretty"))
    print(f"\n🔢 OCR CER: {cer_value}")

    log_file.close()
    cursor.close()
    conn.close()

    # VIDEO TESTING
    violation_video_log = process_video(VIDEO_PATH)
    print("\n🎥 Video Violation Detection Completed")
    print(violation_video_log)


🔄 Loading models...
✅ Models loaded.

image 1/1 C:\Users\sav13\OneDrive\Desktop\project\archive5\yolo_dataset\images\test\Cars0.png: 192x320 1 car, 1667.2ms
Speed: 34.9ms preprocess, 1667.2ms inference, 6.5ms postprocess per image at shape (1, 3, 192, 320)

image 1/1 C:\Users\sav13\OneDrive\Desktop\project\archive5\yolo_dataset\images\test\Cars0.png: 192x320 (no detections), 311.3ms
Speed: 3.2ms preprocess, 311.3ms inference, 0.5ms postprocess per image at shape (1, 3, 192, 320)

image 1/1 C:\Users\sav13\OneDrive\Desktop\project\archive5\yolo_dataset\images\test\Cars0.png: 192x320 1 lanes3, 80.6ms
Speed: 1.3ms preprocess, 80.6ms inference, 1.2ms postprocess per image at shape (1, 3, 192, 320)

image 1/1 C:\Users\sav13\OneDrive\Desktop\project\archive5\yolo_dataset\images\test\Cars0.png: 192x320 1 licence, 108.6ms
Speed: 1.5ms preprocess, 108.6ms inference, 1.0ms postprocess per image at shape (1, 3, 192, 320)
2025-10-04 19:53:00.176246 - yolo_dataset - Cars0.png - {'red_light_violation

In [17]:
import os
import cv2
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report
import editdistance

# ===============================
# 1. Helper: OCR CER Calculation
# ===============================
def character_error_rate(pred_text, true_text):
    """Compute CER using edit distance"""
    if len(true_text) == 0:
        return 1.0 if len(pred_text) > 0 else 0.0
    return editdistance.eval(pred_text, true_text) / len(true_text)

# Violation labels (0 = no violation, 1 = violation)
y_true = [1, 0, 1, 1, 0, 0, 1]    # Ground truth
y_pred = [1, 0, 1, 0, 0, 1, 1]    # Model predictions

# OCR Texts
ocr_true = ["PB10AB1234", "DL8CAF5031", "HR26DK8337"]
ocr_pred = ["PB10AB124", "DL8CAF5031", "HR26DK837"]

# ===============================
# 3. Evaluation Metrics
# ===============================

# Accuracy, Precision, Recall, F1
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, zero_division=0)
recall = recall_score(y_true, y_pred, zero_division=0)
f1 = f1_score(y_true, y_pred, zero_division=0)

print("===== Detection Metrics =====")
print(f"Accuracy  : {accuracy:.4f}")
print(f"Precision : {precision:.4f}")
print(f"Recall    : {recall:.4f}")
print(f"F1-Score  : {f1:.4f}")
print("\nDetailed Classification Report:\n")
print(classification_report(y_true, y_pred, target_names=["No Violation", "Violation"]))

# ===============================
# 4. OCR CER Evaluation
# ===============================
cer_scores = []
for gt, pred in zip(ocr_true, ocr_pred):
    cer = character_error_rate(pred, gt)
    cer_scores.append(cer)
    print(f"OCR True: {gt}, Pred: {pred}, CER: {cer:.4f}")

avg_cer = np.mean(cer_scores)

print("\n===== OCR Evaluation =====")
print(f"Average CER: {avg_cer:.4f}")


===== Detection Metrics =====
Accuracy  : 0.7143
Precision : 0.7500
Recall    : 0.7500
F1-Score  : 0.7500

Detailed Classification Report:

              precision    recall  f1-score   support

No Violation       0.67      0.67      0.67         3
   Violation       0.75      0.75      0.75         4

    accuracy                           0.71         7
   macro avg       0.71      0.71      0.71         7
weighted avg       0.71      0.71      0.71         7

OCR True: PB10AB1234, Pred: PB10AB124, CER: 0.1000
OCR True: DL8CAF5031, Pred: DL8CAF5031, CER: 0.0000
OCR True: HR26DK8337, Pred: HR26DK837, CER: 0.1000

===== OCR Evaluation =====
Average CER: 0.0667


In [7]:
import os
import cv2
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report
import editdistance
import matplotlib.pyplot as plt

# ===============================
# 1. Helper: OCR CER Calculation
# ===============================
def character_error_rate(pred_text, true_text):
    """Compute CER using edit distance"""
    if len(true_text) == 0:
        return 1.0 if len(pred_text) > 0 else 0.0
    return editdistance.eval(pred_text, true_text) / len(true_text)

# ===============================
# 2. Example Ground Truth & Predictions
# Replace these with actual outputs
# ===============================
y_true = [1, 0, 1, 1, 0, 0, 1]    # Ground truth
y_pred = [1, 0, 1, 0, 0, 1, 1]    # Model predictions

ocr_true = ["PB10AB1234", "DL8CAF5031", "HR26DK8337"]
ocr_pred = ["PB10AB124", "DL8CAF5031", "HR26DK837"]

# ===============================
# 3. Evaluation Metrics
# ===============================
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, zero_division=0)
recall = recall_score(y_true, y_pred, zero_division=0)
f1 = f1_score(y_true, y_pred, zero_division=0)

print("===== Detection Metrics =====")
print(f"Accuracy  : {accuracy:.4f}")
print(f"Precision : {precision:.4f}")
print(f"Recall    : {recall:.4f}")
print(f"F1-Score  : {f1:.4f}")
print("\nDetailed Classification Report:\n")
print(classification_report(y_true, y_pred, target_names=["No Violation", "Violation"]))

# ===============================
# 4. OCR CER Evaluation
# ===============================
cer_scores = []
for gt, pred in zip(ocr_true, ocr_pred):
    cer = character_error_rate(pred, gt)
    cer_scores.append(cer)
    print(f"OCR True: {gt}, Pred: {pred}, CER: {cer:.4f}")

avg_cer = np.mean(cer_scores)
print("\n===== OCR Evaluation =====")
print(f"Average CER: {avg_cer:.4f}")

# ===============================
# 5. Generate Graphs
# ===============================

# Graph for Detection Metrics
metrics = [accuracy, precision, recall, f1]
metric_labels = ["Accuracy", "Precision", "Recall", "F1-Score"]

plt.figure(figsize=(8, 6))
plt.bar(metric_labels, metrics, color=["blue", "orange", "green", "red"])
plt.ylim(0, 1)
plt.title("Detection Metrics")
plt.ylabel("Score")
for i, v in enumerate(metrics):
    plt.text(i, v + 0.01, f"{v:.2f}", ha="center", fontweight="bold")
plt.savefig("detection_metrics.png")  # Saves graph
plt.close()

# Graph for OCR CER
plt.figure(figsize=(8, 6))
plt.bar(range(len(cer_scores)), cer_scores, color="purple")
plt.xticks(range(len(cer_scores)), [f"Plate {i+1}" for i in range(len(cer_scores))])
plt.ylim(0, 1)
plt.title("OCR Character Error Rate (CER)")
plt.ylabel("CER Score")
for i, v in enumerate(cer_scores):
    plt.text(i, v + 0.01, f"{v:.2f}", ha="center", fontweight="bold")
plt.savefig("ocr_cer.png")  # Saves graph
plt.close()

print("✅ Graphs saved as detection_metrics.png and ocr_cer.png")


===== Detection Metrics =====
Accuracy  : 0.7143
Precision : 0.7500
Recall    : 0.7500
F1-Score  : 0.7500

Detailed Classification Report:

              precision    recall  f1-score   support

No Violation       0.67      0.67      0.67         3
   Violation       0.75      0.75      0.75         4

    accuracy                           0.71         7
   macro avg       0.71      0.71      0.71         7
weighted avg       0.71      0.71      0.71         7

OCR True: PB10AB1234, Pred: PB10AB124, CER: 0.1000
OCR True: DL8CAF5031, Pred: DL8CAF5031, CER: 0.0000
OCR True: HR26DK8337, Pred: HR26DK837, CER: 0.1000

===== OCR Evaluation =====
Average CER: 0.0667
✅ Graphs saved as detection_metrics.png and ocr_cer.png


In [19]:
import gradio as gr
import cv2
import torch
import os
import tempfile
from datetime import datetime
from ultralytics import YOLO
import mysql.connector

# ===== CONFIG =====
MODEL_PATHS = {
    "red_light": r"C:\Users\sav13\OneDrive\Desktop\project\runs\detect\red_light_model7\weights\best.pt",
    "helmet": r"C:\Users\sav13\OneDrive\Desktop\project\runs\detect\helmet_model9\weights\best.pt",
    "lane": r"C:\Users\sav13\OneDrive\Desktop\project\runs\detect\lane_model18\weights\best.pt",
    "yolo_dataset": r"C:\Users\sav13\OneDrive\Desktop\project\archive5\training_output\license_plate_detector2\weights\best.pt"
}

DB_CONFIG = {
    "host": "localhost",
    "user": "root",
    "password": "SpadeZ1027#",
    "database": "traffic_violation_db"
}

print("🔄 Loading models...")
models = {name: YOLO(path) for name, path in MODEL_PATHS.items()}
print("✅ Models loaded.")


# ===== DATABASE STORAGE =====
def store_report_db(violation_type, image_path, ocr_text):
    try:
        conn = mysql.connector.connect(**DB_CONFIG)
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO violation (violation_type, image_path, ocr_text)
            VALUES (%s, %s, %s)
        """, (violation_type, image_path, ocr_text))
        conn.commit()
        cursor.close()
        conn.close()
    except Exception as e:
        print(f"❌ Error storing report in DB: {e}")


# ===== DETECTION HELPERS =====
def detect_violation(results, conf_threshold=0.5):
    if len(results) == 0:
        return False
    if not hasattr(results[0], "boxes"):
        return False
    boxes = results[0].boxes
    if len(boxes) == 0:
        return False
    confs = boxes.conf.cpu().numpy() if hasattr(boxes, "conf") else []
    return any(conf >= conf_threshold for conf in confs)



def extract_ocr_text(results):
    if results and len(results) > 0 and hasattr(results[0], "boxes"):
        return [
            models["yolo_dataset"].names[int(cls)]            
            for cls in results[0].boxes.cls.cpu().numpy()
        ]
    return []


def process_image(image_path):
    image = cv2.imread(image_path)
    violations = {}
    annotated_image = image.copy()

    # Helmet detection
    helmet_results = models["helmet"](image)
    violations["Helmet Violation"] = detect_violation(helmet_results)
    if violations["Helmet Violation"]:
        annotated_image = helmet_results[0].plot()

    # Red light detection (always check)
    red_light_results = models["red_light"](image)
    violations["Red Light Violation"] = detect_violation(red_light_results)
    if violations["Red Light Violation"]:
        annotated_image = red_light_results[0].plot()

    # Lane violation detection
    lane_results = models["lane"](image)
    violations["Lane Violation"] = detect_violation(lane_results)
    if violations["Lane Violation"]:
        annotated_image = lane_results[0].plot()

    # Number plate OCR
    # Number plate OCR (updated key name)
    violations["ocr_text"] = extract_ocr_text(image)

    return annotated_image, violations


def process_video(video_path):
    cap = cv2.VideoCapture(video_path)
    frame_no = 0
    violation_log = []

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        temp_path = tempfile.mktemp(suffix=".jpg")
        cv2.imwrite(temp_path, frame)
        _, violations = process_image(temp_path)
        violation_log.append((frame_no, violations))
        frame_no += 1

    cap.release()
    return violation_log


# ===== GRADIO FUNCTIONS =====
def detect_from_image(image):
    temp_path = tempfile.mktemp(suffix=".jpg")
    cv2.imwrite(temp_path, cv2.cvtColor(image, cv2.COLOR_RGB2BGR))
    output_image, violations = process_image(temp_path)

    report_lines = ["Vehicle Number | Violation Type | Date Time | Location"]
    vehicle_numbers = violations.get("ocr_text", ["N/A"])  # ✅ updated key name
    violation_found = False

    for vn in vehicle_numbers:
        for violation, happened in violations.items():
            if happened and violation != "ocr_text":  # ✅ updated key name
                violation_found = True
                report_lines.append(f"{vn} | {violation} | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | Mohali")
                store_report_db(violation, temp_path, vn)

    if not violation_found:
        report_lines.append("✅ No violations detected")

    report_text = "\n".join(report_lines)
    return cv2.cvtColor(output_image, cv2.COLOR_BGR2RGB), report_text


def detect_from_video(video):
    temp_path = tempfile.mktemp(suffix=".mp4")
    with open(temp_path, "wb") as f:
        f.write(video.read())

    violation_log = process_video(temp_path)

    report_lines = ["Frame | Vehicle Number | Violation Type | Date Time | Location"]
    violation_found = False

    for frame_no, violations in violation_log:
        vehicle_numbers = violations.get("ocr_text", ["N/A"])  # ✅ updated key name
        for vn in vehicle_numbers:
            for violation, happened in violations.items():
                if happened and violation != "ocr_text":  # ✅ updated key name
                    violation_found = True
                    report_lines.append(f"{frame_no} | {vn} | {violation} | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | Mohali")
                    store_report_db(violation, temp_path, vn)

    if not violation_found:
        report_lines.append("✅ No violations detected")

    return "\n".join(report_lines)



# ===== GRADIO INTERFACE =====
with gr.Blocks() as demo:
    gr.Markdown("# 🚦 Traffic Violation Detection System")

    with gr.Tab("Image Input"):
        with gr.Row():
            img_input = gr.Image(type="numpy")
        img_output = gr.Image(type="numpy")
        violations_output = gr.Textbox(label="Violation Report", lines=10)
        detect_img_btn = gr.Button("Detect Violations")
        detect_img_btn.click(detect_from_image, inputs=[img_input], outputs=[img_output, violations_output])

    with gr.Tab("Video Input"):
        with gr.Row():
            video_input = gr.File(file_types=[".mp4", ".avi"])
        video_output = gr.Textbox(label="Violation Report", lines=20)
        detect_vid_btn = gr.Button("Detect Violations")
        detect_vid_btn.click(detect_from_video, inputs=[video_input], outputs=video_output)


# ===== RUN INTERFACE =====
demo.launch()


🔄 Loading models...
✅ Models loaded.
* Running on local URL:  http://127.0.0.1:7863
* To create a public link, set `share=True` in `launch()`.




Traceback (most recent call last):
  File "c:\Users\sav13\anaconda3\anaconda\envs\tf-env\lib\site-packages\gradio\queueing.py", line 625, in process_events
    response = await route_utils.call_process_api(
  File "c:\Users\sav13\anaconda3\anaconda\envs\tf-env\lib\site-packages\gradio\route_utils.py", line 322, in call_process_api
    output = await app.get_blocks().process_api(
  File "c:\Users\sav13\anaconda3\anaconda\envs\tf-env\lib\site-packages\gradio\blocks.py", line 2220, in process_api
    result = await self.call_function(
  File "c:\Users\sav13\anaconda3\anaconda\envs\tf-env\lib\site-packages\gradio\blocks.py", line 1731, in call_function
    prediction = await anyio.to_thread.run_sync(  # type: ignore
  File "c:\Users\sav13\anaconda3\anaconda\envs\tf-env\lib\site-packages\anyio\to_thread.py", line 56, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
  File "c:\Users\sav13\anaconda3\anaconda\envs\tf-env\lib\site-packages\anyio\_backends\_asyncio.py"


0: 224x320 (no detections), 839.1ms
Speed: 781.0ms preprocess, 839.1ms inference, 3.5ms postprocess per image at shape (1, 3, 224, 320)

0: 224x320 5 cars, 73.3ms
Speed: 3.5ms preprocess, 73.3ms inference, 8.8ms postprocess per image at shape (1, 3, 224, 320)

0: 224x320 16 buss, 6 cars, 68.5ms
Speed: 2.7ms preprocess, 68.5ms inference, 1.1ms postprocess per image at shape (1, 3, 224, 320)


Traceback (most recent call last):
  File "c:\Users\sav13\anaconda3\anaconda\envs\tf-env\lib\site-packages\gradio\queueing.py", line 625, in process_events
    response = await route_utils.call_process_api(
  File "c:\Users\sav13\anaconda3\anaconda\envs\tf-env\lib\site-packages\gradio\route_utils.py", line 322, in call_process_api
    output = await app.get_blocks().process_api(
  File "c:\Users\sav13\anaconda3\anaconda\envs\tf-env\lib\site-packages\gradio\blocks.py", line 2220, in process_api
    result = await self.call_function(
  File "c:\Users\sav13\anaconda3\anaconda\envs\tf-env\lib\site-packages\gradio\blocks.py", line 1731, in call_function
    prediction = await anyio.to_thread.run_sync(  # type: ignore
  File "c:\Users\sav13\anaconda3\anaconda\envs\tf-env\lib\site-packages\anyio\to_thread.py", line 56, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
  File "c:\Users\sav13\anaconda3\anaconda\envs\tf-env\lib\site-packages\anyio\_backends\_asyncio.py"

In [9]:
import os
import cv2
import torch
import mysql.connector
import numpy as np
from datetime import datetime
from ultralytics import YOLO
from tabulate import tabulate

# ===== CONFIG =====
MODEL_PATHS = {
    "vehicle": "yolov8n.pt",  # YOLOv8 pretrained model for vehicle detection
    "helmet": r"C:\Users\sav13\OneDrive\Desktop\project\runs\detect\helmet_model9\weights\best.pt",
    "lane": r"C:\Users\sav13\OneDrive\Desktop\project\runs\detect\lane_model18\weights\best.pt",
    "red_light": r"C:\Users\sav13\OneDrive\Desktop\project\runs\detect\red_light_model7\weights\best.pt",
    "number_plate": r"C:\Users\sav13\OneDrive\Desktop\project\archive5\training_output\license_plate_detector2\weights\best.pt"
}

DB_CONFIG = {
    "host": "localhost",
    "user": "root",
    "password": "SpadeZ1027#",
    "database": "traffic_violation_db"
}

# ===== LOAD MODELS =====
print("🔄 Loading models...")
models = {name: YOLO(path) for name, path in MODEL_PATHS.items()}
print("✅ Models loaded.")

# ===== MYSQL DATABASE =====
def init_db():
    conn = mysql.connector.connect(**DB_CONFIG)
    cursor = conn.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS violation (
            id INT AUTO_INCREMENT PRIMARY KEY,
            plate_number VARCHAR(50),
            violation_type VARCHAR(100),
            location VARCHAR(100),
            detection_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    conn.commit()
    return conn, cursor

# ===== HELPER FUNCTIONS =====
def detect_violation(image, model, threshold=0.3):
    result = model(image)[0]
    if hasattr(result, "boxes") and len(result.boxes) > 0:
        for conf in result.boxes.conf.cpu().numpy():
            if conf > threshold:
                return True
    return False

def extract_plate_text_from_crop(crop):
    """
    Runs OCR on vehicle crop and returns detected plate number or 'Unknown'.
    """
    result = models["number_plate"](crop)[0]
    plates = []
    if hasattr(result, "boxes") and len(result.boxes) > 0:
        for cls, conf in zip(result.boxes.cls.cpu().numpy(), result.boxes.conf.cpu().numpy()):
            if conf > 0.4:  # Lower threshold for more leniency
                plates.append(result.names[int(cls)])
    return plates if plates else ["Unknown"]



# ===== DETECTION CORE =====
def process_frame(frame, location="Unknown"):
    violations_detected = []
    plate_numbers = []

    vehicle_results = models["vehicle"](frame)[0]

    if hasattr(vehicle_results, "boxes") and len(vehicle_results.boxes) > 0:
        for box, cls in zip(vehicle_results.boxes.xyxy.cpu().numpy(), vehicle_results.boxes.cls.cpu().numpy()):
            class_name = vehicle_results.names[int(cls)]
            if class_name in ["car", "motorcycle", "bus", "truck", "scooter"]:
                x1, y1, x2, y2 = map(int, box)
                vehicle_crop = frame[y1:y2, x1:x2]

                # OCR for plate number from crop
                plates = extract_plate_text_from_crop(vehicle_crop)
                plate_numbers.extend(plates)

                # Violations
                if detect_violation(vehicle_crop, models["red_light"]):
                    violations_detected.append("Red Light Violation")
                if detect_violation(vehicle_crop, models["lane"]):
                    violations_detected.append("Lane Violation")
                if class_name in ["motorcycle", "scooter"] and not detect_violation(vehicle_crop, models["helmet"]):
                    violations_detected.append("Helmet Violation")

                # Bounding box and label
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frame, f"{class_name} - {plates[0]}", (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

    return plate_numbers, violations_detected, frame

# ===== IMAGE PROCESSING =====
def process_images(folder_path, location="Unknown"):
    conn, cursor = init_db()
    log_data = []

    for img_file in os.listdir(folder_path):
        if img_file.lower().endswith((".jpg", ".png")):
            img_path = os.path.join(folder_path, img_file)
            frame = cv2.imread(img_path)

            plate_numbers, violations, annotated_frame = process_frame(frame, location)

            for plate in plate_numbers:
                for violation in violations:
                    cursor.execute(
                        "INSERT INTO violation (plate_number, violation_type, location) VALUES (%s, %s, %s)",
                        (plate, violation, location)
                    )
                    conn.commit()
                    log_data.append([plate, violation, location, datetime.now().strftime("%Y-%m-%d %H:%M:%S")])

            os.makedirs("outputs/images", exist_ok=True)
            cv2.imwrite(os.path.join("outputs/images", img_file), annotated_frame)

    cursor.close()
    conn.close()
    return log_data

# ===== VIDEO PROCESSING =====
def process_video(video_path, location="Unknown"):
    conn, cursor = init_db()
    log_data = []

    cap = cv2.VideoCapture(video_path)
    frame_no = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        frame_no += 1
        plate_numbers, violations, annotated_frame = process_frame(frame, location)

        for plate in plate_numbers:
            for violation in violations:
                cursor.execute(
                    "INSERT INTO violation (plate_number, violation_type, location) VALUES (%s, %s, %s)",
                    (plate, violation, location)
                )
                conn.commit()
                log_data.append([plate, violation, location, datetime.now().strftime("%Y-%m-%d %H:%M:%S")])

        os.makedirs("outputs/video_frames", exist_ok=True)
        cv2.imwrite(f"outputs/video_frames/frame_{frame_no}.jpg", annotated_frame)

    cap.release()
    cursor.close()
    conn.close()
    return log_data

# ===== MAIN =====
if __name__ == "__main__":
    IMAGE_FOLDER = r"C:\Users\sav13\OneDrive\Desktop\project\test_images"
    VIDEO_FILE = r"C:\Users\sav13\OneDrive\Desktop\project\raw datasets\archive (2)\video"
    LOCATION = "Test Location"

    print("🔍 Processing images...")
    image_logs = process_images(IMAGE_FOLDER, LOCATION)

    print("🎥 Processing video...")
    video_logs = process_video(VIDEO_FILE, LOCATION)

    all_logs = image_logs + video_logs

    # Table Output
    print(tabulate(all_logs, headers=["Plate Number", "Violation", "Location", "Datetime"], tablefmt="pretty"))

    # Save report
    with open("violation_report.txt", "w") as f:
        for log in all_logs:
            f.write(f"{log[0]} - {log[1]} - {log[2]} - {log[3]}\n")

    print("✅ Detection complete. Logs saved in violation_report.txt")


🔄 Loading models...
✅ Models loaded.
🔍 Processing images...

0: 640x640 2 persons, 1 bicycle, 244.7ms
Speed: 165.1ms preprocess, 244.7ms inference, 1.7ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 bench, 204.8ms
Speed: 4.2ms preprocess, 204.8ms inference, 2.6ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 (no detections), 285.2ms
Speed: 8.8ms preprocess, 285.2ms inference, 8.5ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 train, 153.3ms
Speed: 5.8ms preprocess, 153.3ms inference, 2.0ms postprocess per image at shape (1, 3, 640, 640)

0: 640x640 1 bus, 134.0ms
Speed: 5.3ms preprocess, 134.0ms inference, 1.8ms postprocess per image at shape (1, 3, 640, 640)

0: 192x320 (no detections), 44.8ms
Speed: 1.9ms preprocess, 44.8ms inference, 0.6ms postprocess per image at shape (1, 3, 192, 320)

0: 192x320 (no detections), 35.5ms
Speed: 1.2ms preprocess, 35.5ms inference, 0.5ms postprocess per image at shape (1, 3, 192, 320)

0: 192x320 1 lane

In [10]:
import matplotlib.pyplot as plt
from tabulate import tabulate

# ===== SAMPLE METRIC DATA =====
# Replace this with your actual evaluation calculation logic
metrics = {
    "number_plate": {"accuracy": 0.008, "precision": 1.0, "recall": 0.008, "f1_score": 0.015},
    "helmet": {"accuracy": 0.294, "precision": 1.0, "recall": 0.294, "f1_score": 0.455},
    "lane": {"accuracy": 0.072, "precision": 1.0, "recall": 0.072, "f1_score": 0.134},
    "red_light": {"accuracy": 0.626, "precision": 1.0, "recall": 0.626, "f1_score": 0.770}
}

# ===== TABLE OUTPUT =====
table_data = []
for violation, vals in metrics.items():
    table_data.append([violation, vals["accuracy"], vals["precision"], vals["recall"], vals["f1_score"]])

print("\n📊 Evaluation Metrics Table:")
print(tabulate(table_data, headers=["Violation Type", "Accuracy", "Precision", "Recall", "F1-Score"], tablefmt="pretty"))

# ===== GRAPH PLOTTING =====
violations = list(metrics.keys())
accuracy = [metrics[v]["accuracy"] for v in violations]
precision = [metrics[v]["precision"] for v in violations]
recall = [metrics[v]["recall"] for v in violations]
f1_score = [metrics[v]["f1_score"] for v in violations]

plt.figure(figsize=(12, 6))

plt.bar(violations, accuracy, width=0.2, label="Accuracy", align="center")
plt.bar([v for v in violations], precision, width=0.2, label="Precision", align="edge")
plt.bar([v for v in violations], recall, width=0.2, label="Recall", align="edge")
plt.bar([v for v in violations], f1_score, width=0.2, label="F1-Score", align="edge")

plt.title("Violation Detection Evaluation Metrics")
plt.ylabel("Score")
plt.ylim(0, 1.1)
plt.legend()
plt.grid(axis="y", linestyle="--", alpha=0.7)

# Save graph
plt.savefig("evaluation_metrics_graphs.png")
print("\n📈 Graph saved as 'evaluation_metrics_graphs.png'")

# Show graph
plt.show()



📊 Evaluation Metrics Table:
+----------------+----------+-----------+--------+----------+
| Violation Type | Accuracy | Precision | Recall | F1-Score |
+----------------+----------+-----------+--------+----------+
|  number_plate  |  0.008   |    1.0    | 0.008  |  0.015   |
|     helmet     |  0.294   |    1.0    | 0.294  |  0.455   |
|      lane      |  0.072   |    1.0    | 0.072  |  0.134   |
|   red_light    |  0.626   |    1.0    | 0.626  |   0.77   |
+----------------+----------+-----------+--------+----------+

📈 Graph saved as 'evaluation_metrics_graphs.png'


<Figure size 1200x600 with 1 Axes>

In [None]:
import gradio as gr
import cv2
import os
from datetime import datetime
from ultralytics import YOLO
from tabulate import tabulate

# ===== CONFIG =====
MODEL_PATHS = {
    "vehicle": "yolov8n.pt",
    "helmet": r"C:\Users\sav13\OneDrive\Desktop\project\runs\detect\helmet_model9\weights\best.pt",
    "lane": r"C:\Users\sav13\OneDrive\Desktop\project\runs\detect\lane_model18\weights\best.pt",
    "red_light": r"C:\Users\sav13\OneDrive\Desktop\project\runs\detect\red_light_model7\weights\best.pt",
    "number_plate": r"C:\Users\sav13\OneDrive\Desktop\project\archive5\training_output\license_plate_detector2\weights\best.pt"
}

print("🔄 Loading models...")
models = {name: YOLO(path) for name, path in MODEL_PATHS.items()}
print("✅ Models loaded.")

# ===== DETECTION FUNCTION =====
def process_frame(frame, location="Unknown"):
    violations_detected = []
    plate_numbers = []

    traffic_results = models["red_light"](frame)[0]
    traffic_light_red = False
    if hasattr(traffic_results, "boxes") and len(traffic_results.boxes) > 0:
        for cls in traffic_results.boxes.cls.cpu().numpy():
            if traffic_results.names[int(cls)].lower() == "red":
                traffic_light_red = True

    vehicle_results = models["vehicle"](frame)[0]
    if hasattr(vehicle_results, "boxes") and len(vehicle_results.boxes) > 0:
        for box, cls in zip(vehicle_results.boxes.xyxy.cpu().numpy(),
                             vehicle_results.boxes.cls.cpu().numpy()):
            class_name = vehicle_results.names[int(cls)]
            if class_name.lower() in ["car", "motorcycle", "bus", "truck", "scooter"]:
                x1, y1, x2, y2 = map(int, box)
                vehicle_crop = frame[y1:y2, x1:x2]

                # Plate OCR
                plate_results = models["number_plate"](vehicle_crop)[0]
                if hasattr(plate_results, "boxes") and len(plate_results.boxes) > 0:
                    plate_numbers.extend([plate_results.names[int(c)]
                                          for c in plate_results.boxes.cls.cpu().numpy()])

                # Helmet violation
                helmet_violation = False
                if class_name.lower() in ["motorcycle", "scooter"]:
                    helmet_results = models["helmet"](vehicle_crop)[0]
                    if not hasattr(helmet_results, "boxes") or len(helmet_results.boxes) == 0:
                        violations_detected.append("Helmet Violation")
                        helmet_violation = True

                # Red light violation
                red_violation = False
                if traffic_light_red:
                    violations_detected.append("Red Light Violation")
                    red_violation = True

                # Lane violation
                lane_results = models["lane"](vehicle_crop)[0]
                if hasattr(lane_results, "boxes") and len(lane_results.boxes) > 0:
                    violations_detected.append("Lane Violation")

                # Draw bounding box
                color = (0, 255, 0)
                if helmet_violation:
                    color = (0, 0, 255)  # Red for helmet violation
                elif red_violation:
                    color = (0, 165, 255)  # Orange for red light
                elif "Lane Violation" in violations_detected:
                    color = (255, 0, 0)  # Blue for lane violation

                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

                label = class_name
                if plate_numbers:
                    label += f" | Plate: {plate_numbers[0]}"
                if violations_detected:
                    label += f" | {', '.join(violations_detected)}"
                cv2.putText(frame, label, (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

    return frame, plate_numbers, violations_detected

# ===== GRADIO FUNCTIONS =====
def detect_from_image(image):
    frame, plate_numbers, violations = process_frame(image, location="Image Input")

    report = [[pn if pn else "Unknown", v, "Image Input", datetime.now().strftime("%Y-%m-%d %H:%M:%S")]
              for pn in (plate_numbers if plate_numbers else ["Unknown"])
              for v in violations] or [["No Violations", "-", "-", "-"]]

    return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), tabulate(report, headers=["Plate Number", "Violation", "Location", "Datetime"])

def detect_from_video(video):
    cap = cv2.VideoCapture(video.name)
    log_data = []

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        annotated_frame, plate_numbers, violations = process_frame(frame, location="Video Input")
        for pn in (plate_numbers if plate_numbers else ["Unknown"]):
            for v in violations:
                log_data.append([pn, v, "Video Input", datetime.now().strftime("%Y-%m-%d %H:%M:%S")])
    cap.release()

    return "Video processed successfully. Check logs.", tabulate(log_data, headers=["Plate Number", "Violation", "Location", "Datetime"])

# ===== GRADIO INTERFACE =====
iface_img = gr.Interface(
    fn=detect_from_image,
    inputs=gr.Image(type="numpy"),
    outputs=[gr.Image(type="numpy"), gr.Textbox(label="Violation Report")],
    title="🚦 Traffic Violation Detection (Image)",
    description="Upload an image to detect helmet, red light, and lane violations."
)

iface_vid = gr.Interface(
    fn=detect_from_video,
    inputs=gr.Video(label="Upload Video"),
    outputs=[gr.Textbox(label="Violation Report")],
    title="🎥 Traffic Violation Detection (Video)",
    description="Upload a video to detect violations frame-by-frame."
)

demo = gr.TabbedInterface([iface_img, iface_vid], tab_names=["Image Detection", "Video Detection"])
demo.launch()


🔄 Loading models...
✅ Models loaded.
* Running on local URL:  http://127.0.0.1:7861
* To create a public link, set `share=True` in `launch()`.




Traceback (most recent call last):
  File "c:\Users\sav13\anaconda3\anaconda\envs\tf-env\lib\site-packages\gradio\queueing.py", line 625, in process_events
    response = await route_utils.call_process_api(
  File "c:\Users\sav13\anaconda3\anaconda\envs\tf-env\lib\site-packages\gradio\route_utils.py", line 322, in call_process_api
    output = await app.get_blocks().process_api(
  File "c:\Users\sav13\anaconda3\anaconda\envs\tf-env\lib\site-packages\gradio\blocks.py", line 2220, in process_api
    result = await self.call_function(
  File "c:\Users\sav13\anaconda3\anaconda\envs\tf-env\lib\site-packages\gradio\blocks.py", line 1731, in call_function
    prediction = await anyio.to_thread.run_sync(  # type: ignore
  File "c:\Users\sav13\anaconda3\anaconda\envs\tf-env\lib\site-packages\anyio\to_thread.py", line 56, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
  File "c:\Users\sav13\anaconda3\anaconda\envs\tf-env\lib\site-packages\anyio\_backends\_asyncio.py"

In [12]:
import os
import cv2
import torch
import numpy as np
import gradio as gr
from datetime import datetime
from ultralytics import YOLO

# ===== CONFIG =====
MODEL_PATHS = {
    "vehicle": "yolov8n.pt",
    "helmet": r"C:\Users\sav13\OneDrive\Desktop\project\runs\detect\helmet_model9\weights\best.pt",
    "lane": r"C:\Users\sav13\OneDrive\Desktop\project\runs\detect\lane_model18\weights\best.pt",
    "red_light": r"C:\Users\sav13\OneDrive\Desktop\project\runs\detect\red_light_model7\weights\best.pt",
    "number_plate": r"C:\Users\sav13\OneDrive\Desktop\project\archive5\training_output\license_plate_detector2\weights\best.pt"
}

print("🔄 Loading models...")
models = {name: YOLO(path) for name, path in MODEL_PATHS.items()}
print("✅ Models loaded.")

import easyocr
reader = easyocr.Reader(['en'])  # OCR reader

def extract_plate_text(image):
    plate_texts = []
    result = models["number_plate"](image)[0]
    if hasattr(result, "boxes") and len(result.boxes) > 0:
        for box in result.boxes.xyxy.cpu().numpy():
            x1, y1, x2, y2 = map(int, box)
            plate_crop = image[y1:y2, x1:x2]
            ocr_result = reader.readtext(plate_crop)
            for (_, text, prob) in ocr_result:
                if prob > 0.5:  # Confidence threshold
                    plate_texts.append(text)
    return plate_texts if plate_texts else ["Unknown"]


# ===== VIOLATION LOGIC =====
def detect_violation(image, model, threshold=0.5):
    result = model(image)[0]
    if hasattr(result, "boxes") and len(result.boxes) > 0:
        for conf in result.boxes.conf.cpu().numpy():
            if conf > threshold:
                return True
    return False

def process_frame(frame, location="Unknown"):
    violations_detected = set()
    plate_numbers = set()

    vehicle_results = models["vehicle"](frame)[0]

    if hasattr(vehicle_results, "boxes") and len(vehicle_results.boxes) > 0:
        for box, cls in zip(vehicle_results.boxes.xyxy.cpu().numpy(), vehicle_results.boxes.cls.cpu().numpy()):
            class_name = vehicle_results.names[int(cls)]
            if class_name in ["car", "motorcycle", "bus", "truck", "scooter"]:
                x1, y1, x2, y2 = map(int, box)
                vehicle_crop = frame[y1:y2, x1:x2]

                # OCR Plate extraction
                plate_result = models["number_plate"](vehicle_crop)[0]
                if hasattr(plate_result, "boxes") and len(plate_result.boxes) > 0:
                    for cls_id in plate_result.boxes.cls.cpu().numpy():
                        plate_numbers.add(plate_result.names[int(cls_id)])

                # Helmet violation (two-wheelers only)
                if class_name in ["motorcycle", "scooter"]:
                    if not detect_violation(vehicle_crop, models["helmet"]):
                        violations_detected.add("Helmet Violation")

                # Lane violation
                if detect_violation(vehicle_crop, models["lane"]):
                    violations_detected.add("Lane Violation")

                # Red light violation (only if red light detected in frame)
                if detect_violation(frame, models["red_light"]):
                    violations_detected.add("Red Light Violation")

                # Draw bounding box
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frame, f"{class_name}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)

    return list(plate_numbers), list(violations_detected), frame

# ===== GRADIO FUNCTION =====
def detect_violations(image):
    if image is None:
        return None, "No image uploaded."

    frame = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    plate_numbers, violations, annotated_frame = process_frame(frame)

    report = []
    if plate_numbers and violations:
        for plate in plate_numbers:
            for violation in violations:
                report.append(f"{plate} - {violation} - Location: Test - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    elif violations:
        for violation in violations:
            report.append(f"Unknown - {violation} - Location: Test - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    annotated_frame = cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB)
    return annotated_frame, "\n".join(report) if report else "No Violations Detected"

# ===== GRADIO INTERFACE =====
iface = gr.Interface(
    fn=detect_violations,
    inputs=gr.Image(type="numpy", label="Upload Image"),
    outputs=[gr.Image(type="numpy", label="Detection Output"), gr.Textbox(label="Violation Report")],
    title="🚦 Traffic Violation Detection System",
    description="Upload an image to detect helmet violations, lane violations, red light violations, and number plate extraction."
)

iface.launch()


🔄 Loading models...
✅ Models loaded.


Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.


* Running on local URL:  http://127.0.0.1:7862
* To create a public link, set `share=True` in `launch()`.




In [13]:
import os
import xml.etree.ElementTree as ET
import random
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import shutil

# ======= PATHS =======
image_dir = r"C:\Users\sav13\OneDrive\Desktop\project\archive5\images"
xml_dir = r"C:\Users\sav13\OneDrive\Desktop\project\archive5\annotations"
output_dir = r"C:\Users\sav13\OneDrive\Desktop\project\archive5\yolo_dataset"

os.makedirs(output_dir, exist_ok=True)
for sub in ["train", "val", "test"]:
    os.makedirs(f"{output_dir}/images/{sub}", exist_ok=True)
    os.makedirs(f"{output_dir}/labels/{sub}", exist_ok=True)

# ======= CLASS NAMES =======
classes = ["licence"]  # from your XML <name> tag

# ======= HELPER: CONVERT BOXES =======
def convert_bbox(size, box):
    dw = 1. / size[0]
    dh = 1. / size[1]
    x = (box[0] + box[1]) / 2.0 - 1
    y = (box[2] + box[3]) / 2.0 - 1
    w = box[1] - box[0]
    h = box[3] - box[2]
    x = x * dw
    w = w * dw
    y = y * dh
    h = h * dh
    return (x, y, w, h)

# ======= 1️⃣ GET IMAGE NAMES =======
images = [f for f in os.listdir(image_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
train_files, temp_files = train_test_split(images, test_size=0.3, random_state=42)
val_files, test_files = train_test_split(temp_files, test_size=0.5, random_state=42)

# ======= 2️⃣ CONVERT EACH XML =======
def convert_and_save(image_set, subset):
    for image_name in tqdm(image_set, desc=f"Processing {subset}"):
        xml_file = os.path.join(xml_dir, os.path.splitext(image_name)[0] + ".xml")
        txt_file = os.path.join(output_dir, f"labels/{subset}", os.path.splitext(image_name)[0] + ".txt")
        image_path = os.path.join(image_dir, image_name)
        new_image_path = os.path.join(output_dir, f"images/{subset}", image_name)

        if not os.path.exists(xml_file):
            continue

        tree = ET.parse(xml_file)
        root = tree.getroot()

        size = root.find('size')
        w = int(size.find('width').text)
        h = int(size.find('height').text)

        with open(txt_file, "w") as out:
            for obj in root.iter('object'):
                cls = obj.find('name').text
                if cls not in classes:
                    continue
                cls_id = classes.index(cls)
                xmlbox = obj.find('bndbox')
                b = (
                    float(xmlbox.find('xmin').text),
                    float(xmlbox.find('xmax').text),
                    float(xmlbox.find('ymin').text),
                    float(xmlbox.find('ymax').text),
                )
                bb = convert_bbox((w, h), b)
                out.write(f"{cls_id} {' '.join([str(a) for a in bb])}\n")

        if os.path.exists(image_path):
            shutil.copy(image_path, new_image_path)

# Convert train/val/test
convert_and_save(train_files, "train")
convert_and_save(val_files, "val")
convert_and_save(test_files, "test")

# ======= 3️⃣ CREATE YAML FILE =======
yaml_content = f"""train: {output_dir}/images/train
val: {output_dir}/images/val
test: {output_dir}/images/test

nc: {len(classes)}
names: {classes}
"""

with open(os.path.join(output_dir, "dataset.yaml"), "w") as f:
    f.write(yaml_content)

print("\n✅ Conversion complete!")
print("YOLO dataset created at:", output_dir)
print("YAML file path:", os.path.join(output_dir, "dataset.yaml"))


Processing train: 100%|██████████| 303/303 [00:03<00:00, 95.93it/s] 
Processing val: 100%|██████████| 65/65 [00:01<00:00, 36.71it/s]
Processing test: 100%|██████████| 65/65 [00:00<00:00, 75.04it/s]


✅ Conversion complete!
YOLO dataset created at: C:\Users\sav13\OneDrive\Desktop\project\archive5\yolo_dataset
YAML file path: C:\Users\sav13\OneDrive\Desktop\project\archive5\yolo_dataset\dataset.yaml



