In [3]:
# ===============================================================
# AutoSafeDrive ADAS ‚Äî FINAL SUBMISSION (CLEAN VERSION)
# Real-Time Object Detection + Risk Estimation + Gradio Demo
# ===============================================================

!pip install -q ultralytics gradio kagglehub opencv-python-headless tqdm pandas matplotlib

# ===============================
# IMPORTS
# ===============================
import os, random, shutil, traceback
from pathlib import Path
from datetime import datetime
import cv2, numpy as np, pandas as pd, matplotlib.pyplot as plt
from tqdm import tqdm
from ultralytics import YOLO
import gradio as gr

try:
    import kagglehub
    _KH = True
except:
    _KH = False

# ===============================
# PATHS
# ===============================
ROOT = "/content"
DATA_ROOT = f"{ROOT}/data/idd"
TEST_DIR = f"{DATA_ROOT}/test/images"
OUT_DIR = f"{ROOT}/autosafedrive_outputs"

Path(TEST_DIR).mkdir(parents=True, exist_ok=True)
Path(OUT_DIR).mkdir(parents=True, exist_ok=True)

# ===============================
# LOAD YOLOv8 MODEL
# ===============================
model = YOLO("yolov8n.pt")
names = model.model.names
print("Model Loaded:", names)

# ===============================
# ADAS CONFIG
# ===============================
INTEREST = {"car","bus","truck","motorcycle","bicycle","person","traffic light"}
VULNERABLE = {"person","bicycle","motorcycle"}

CONF_THRESH = 0.25
MIN_BOX_H = 10

RISK_SAFE = 35
RISK_CAUTION = 65

# ===============================
# HELPERS
# ===============================
def risk_color(r):
    return (0,200,0) if r < RISK_SAFE else (0,215,255) if r < RISK_CAUTION else (0,0,230)

def status_from_risk(r):
    return ("SAFE",(0,200,0)) if r < RISK_SAFE else ("CAUTION",(0,215,255)) if r < RISK_CAUTION else ("DANGER",(0,0,230))

def draw_banner(img, text, color):
    (tw,th),_ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.9, 2)
    cv2.rectangle(img,(10,10),(tw+30,th+30),color,-1)
    cv2.putText(img,text,(18,10+th+10),cv2.FONT_HERSHEY_SIMPLEX,0.9,(0,0,0),2)

def compute_risk(box, img_h, cls):
    x1,y1,x2,y2 = box
    proximity = (y2 - y1) / img_h * 100
    vuln = 15 if cls in VULNERABLE else 0
    risk = proximity * 0.6 + vuln
    return float(np.clip(risk, 0, 100))

# ===============================
# DOWNLOAD DATASET (KaggleHub)
# ===============================
def download_idd():
    if not _KH:
        print("‚ö†Ô∏è KaggleHub unavailable ‚Äî please upload dataset manually.")
        return None

    print("üì¶ Downloading IDD via KaggleHub‚Ä¶")
    try:
        path = kagglehub.dataset_download("mitanshuchakrawarty/new-idd-dataset")
        print("‚úî Dataset downloaded to:", path)
        return path
    except:
        print("‚ö†Ô∏è KaggleHub download failed.")
        return None

# ===============================
# FIX ‚Äî COPY DATASET TO WORKING DIR
# ===============================
def copy_idd_to_working():
    SRC = "/kaggle/input/new-idd-dataset"
    if not os.path.exists(SRC):
        print("‚ö†Ô∏è Dataset path not found:", SRC)
        return False

    print("üìÅ Copying IDD dataset to working directory...")
    for root, _, files in os.walk(SRC):
        for f in files:
            if f.lower().endswith(("jpg","jpeg","png")):
                src = os.path.join(root, f)
                dst = os.path.join(DATA_ROOT, f)
                shutil.copy(src, dst)
    print("‚úî Dataset copied successfully.")
    return True

# ===============================
# PREPARE TEST SUBSET
# ===============================
def prepare_test_subset(take_n=30):
    all_imgs = [
        os.path.join(DATA_ROOT, f)
        for f in os.listdir(DATA_ROOT)
        if f.lower().endswith(("jpg","jpeg","png"))
    ]

    if len(all_imgs) == 0:
        raise RuntimeError("‚ùå No images found after dataset copy.")

    random.shuffle(all_imgs)
    picks = all_imgs[:take_n]

    for p in picks:
        dst = os.path.join(TEST_DIR, os.path.basename(p))
        shutil.copy(p, dst)

    return sorted(os.listdir(TEST_DIR))

# ===============================
# ANNOTATION FUNCTION
# ===============================
def annotate_image(img):
    res = model(img, conf=CONF_THRESH, verbose=False)[0]
    h,w = img.shape[:2]
    vis = img.copy()
    risks = []

    if res.boxes is not None:
        bx = res.boxes.xyxy.cpu().numpy()
        cl = res.boxes.cls.cpu().numpy().astype(int)
        cf = res.boxes.conf.cpu().numpy()

        for (x1,y1,x2,y2), cid, conf in zip(bx, cl, cf):
            cls = names[cid]
            if cls not in INTEREST: continue
            if (y2-y1) < MIN_BOX_H: continue

            risk = compute_risk([x1,y1,x2,y2], h, cls)
            risks.append(risk)

            color = risk_color(risk)
            cv2.rectangle(vis,(int(x1),int(y1)),(int(x2),int(y2)),color,2)
            cv2.putText(vis,f"{cls} {int(conf*100)}% R{int(risk)}%",
                        (int(x1),max(20,int(y1)-5)),
                        cv2.FONT_HERSHEY_SIMPLEX,0.6,color,2)

    avg = float(np.mean(risks)) if risks else 0.0
    status, color = status_from_risk(avg)
    draw_banner(vis, f"AVG {avg:.1f}%  {status}", color)

    return vis, avg, status

# ===============================
# BATCH EVALUATION
# ===============================
def batch_eval():
    files = sorted(os.listdir(TEST_DIR))
    results = []

    for fn in tqdm(files, desc="Batch Evaluating"):
        img = cv2.imread(os.path.join(TEST_DIR, fn))
        vis, avg, status = annotate_image(img)

        cv2.imwrite(os.path.join(OUT_DIR, f"annot_{fn}"), vis)
        results.append({"image":fn,"avg_risk":round(avg,2),"status":status})

    df = pd.DataFrame(results)
    csv_path = os.path.join(OUT_DIR, f"eval_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv")
    df.to_csv(csv_path, index=False)

    plt.figure(figsize=(6,4))
    plt.hist(df["avg_risk"], bins=10, color="skyblue", edgecolor="black")
    plt.title("Risk Distribution")
    plt.savefig(os.path.join(OUT_DIR,"risk_hist.png"))
    plt.close()

    return df, csv_path

# ===============================
# GRADIO DEMO
# ===============================
def analyze_demo(pil_img):
    bgr = cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR)
    vis, avg, status = annotate_image(bgr)
    return cv2.cvtColor(vis, cv2.COLOR_BGR2RGB), f"Avg Risk: {avg:.1f}% | {status}"

def launch_demo():
    with gr.Blocks() as app:
        gr.Markdown("## üöó AutoSafeDrive ADAS Demo")
        img_in = gr.Image(type="pil")
        img_out = gr.Image()
        status_out = gr.Textbox()
        btn = gr.Button("Analyze")
        btn.click(analyze_demo, inputs=[img_in], outputs=[img_out,status_out])
    app.launch(share=True)

# ===============================
# MAIN EXECUTION
# ===============================
try:
    download_idd()             # Download dataset
    copy_idd_to_working()      # Copy dataset to /content/data/idd
    prepare_test_subset(20)    # Prepare subset
    df, csv_path = batch_eval()
    print("‚úî Batch Evaluation Complete:", csv_path)
    launch_demo()              # Start Gradio demo

except Exception as e:
    print("‚ùå ERROR OCCURRED:")
    traceback.print_exc()
    print("Please upload images manually to:", TEST_DIR)


Model Loaded: {0: 'person', 1: 'bicycle', 2: 'car', 3: 'motorcycle', 4: 'airplane', 5: 'bus', 6: 'train', 7: 'truck', 8: 'boat', 9: 'traffic light', 10: 'fire hydrant', 11: 'stop sign', 12: 'parking meter', 13: 'bench', 14: 'bird', 15: 'cat', 16: 'dog', 17: 'horse', 18: 'sheep', 19: 'cow', 20: 'elephant', 21: 'bear', 22: 'zebra', 23: 'giraffe', 24: 'backpack', 25: 'umbrella', 26: 'handbag', 27: 'tie', 28: 'suitcase', 29: 'frisbee', 30: 'skis', 31: 'snowboard', 32: 'sports ball', 33: 'kite', 34: 'baseball bat', 35: 'baseball glove', 36: 'skateboard', 37: 'surfboard', 38: 'tennis racket', 39: 'bottle', 40: 'wine glass', 41: 'cup', 42: 'fork', 43: 'knife', 44: 'spoon', 45: 'bowl', 46: 'banana', 47: 'apple', 48: 'sandwich', 49: 'orange', 50: 'broccoli', 51: 'carrot', 52: 'hot dog', 53: 'pizza', 54: 'donut', 55: 'cake', 56: 'chair', 57: 'couch', 58: 'potted plant', 59: 'bed', 60: 'dining table', 61: 'toilet', 62: 'tv', 63: 'laptop', 64: 'mouse', 65: 'remote', 66: 'keyboard', 67: 'cell phone

Batch Evaluating: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 20/20 [00:08<00:00,  2.47it/s]


‚úî Batch Evaluation Complete: /content/autosafedrive_outputs/eval_20251116_153704.csv
Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://76519c2cd375d615c7.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
