In [None]:
import cv2
import numpy as np

def process_frame(frame):
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    blur = cv2.GaussianBlur(gray, (5, 5), 0)
    edges = cv2.Canny(blur, 50, 150)

    height, width = edges.shape
    mask = np.zeros_like(edges)

    polygon = np.array([[
        (0, height),
        (width, height),
        (int(0.55 * width), int(0.6 * height)),
        (int(0.45 * width), int(0.6 * height))
    ]], np.int32)

    cv2.fillPoly(mask, polygon, 255)
    cropped = cv2.bitwise_and(edges, mask)

    lines = cv2.HoughLinesP(cropped, 1, np.pi/180, 50, minLineLength=50, maxLineGap=150)

    line_image = np.zeros_like(frame)
    if lines is not None:
        for line in lines:
            x1, y1, x2, y2 = line[0]
            cv2.line(line_image, (x1, y1), (x2, y2), (0, 255, 0), 5)

    combined = cv2.addWeighted(frame, 0.8, line_image, 1, 1)
    return combined, lines


In [None]:
input_path = "solidWhiteRight.mp4"  # Replace if your filename differs
output_path = "lane_detected_output.avi"

cap = cv2.VideoCapture(input_path)
width = int(cap.get(3))
height = int(cap.get(4))
fps = cap.get(cv2.CAP_PROP_FPS)

out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"XVID"), fps, (width, height))

frame_count = 0

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    processed_frame, lines = process_frame(frame)

    # VAC-style check: No lines => possible violation
    if lines is None:
        print(f"⚠️ Possible lane violation at frame {frame_count}")

    out.write(processed_frame)
    frame_count += 1

cap.release()
out.release()


In [None]:
!pip install ultralytics




In [None]:
from ultralytics import YOLO

# Load pretrained model
model = YOLO('yolov8n.pt')  # You can also use yolov8s.pt or custom


In [None]:
results = model.predict(source='solidWhiteRight.mp4', save=True, conf=0.5)



inference results will accumulate in RAM unless `stream=True` is passed, causing potential out-of-memory
errors for large sources or long-running streams and videos. See https://docs.ultralytics.com/modes/predict/ for help.

Example:
    results = model(source=..., stream=True)  # generator of Results objects
    for r in results:
        boxes = r.boxes  # Boxes object for bbox outputs
        masks = r.masks  # Masks object for segment masks outputs
        probs = r.probs  # Class probabilities for classification outputs

video 1/1 (frame 1/221) /content/solidWhiteRight.mp4: 384x640 1 car, 156.9ms
video 1/1 (frame 2/221) /content/solidWhiteRight.mp4: 384x640 1 car, 157.5ms
video 1/1 (frame 3/221) /content/solidWhiteRight.mp4: 384x640 1 car, 143.6ms
video 1/1 (frame 4/221) /content/solidWhiteRight.mp4: 384x640 1 truck, 139.4ms
video 1/1 (frame 5/221) /content/solidWhiteRight.mp4: 384x640 1 car, 142.8ms
video 1/1 (frame 6/221) /content/solidWhiteRight.mp4: 384x640 1 car, 143.4ms
vide

In [None]:
for result in results:
    objects = result.boxes.cls.cpu().numpy()
    if 0 in objects:  # class 0 = person
        print("🛑 Action: Slow down - pedestrian ahead")
    elif 2 in objects:  # class 2 = car
        print("⚠️ Action: Maintain distance - vehicle ahead")


⚠️ Action: Maintain distance - vehicle ahead
⚠️ Action: Maintain distance - vehicle ahead
⚠️ Action: Maintain distance - vehicle ahead
⚠️ Action: Maintain distance - vehicle ahead
⚠️ Action: Maintain distance - vehicle ahead
⚠️ Action: Maintain distance - vehicle ahead
⚠️ Action: Maintain distance - vehicle ahead
⚠️ Action: Maintain distance - vehicle ahead
⚠️ Action: Maintain distance - vehicle ahead
⚠️ Action: Maintain distance - vehicle ahead
⚠️ Action: Maintain distance - vehicle ahead
⚠️ Action: Maintain distance - vehicle ahead
⚠️ Action: Maintain distance - vehicle ahead
⚠️ Action: Maintain distance - vehicle ahead
⚠️ Action: Maintain distance - vehicle ahead
⚠️ Action: Maintain distance - vehicle ahead
⚠️ Action: Maintain distance - vehicle ahead
⚠️ Action: Maintain distance - vehicle ahead
⚠️ Action: Maintain distance - vehicle ahead
⚠️ Action: Maintain distance - vehicle ahead
⚠️ Action: Maintain distance - vehicle ahead
⚠️ Action: Maintain distance - vehicle ahead
⚠️ Action:

In [None]:
log = []

for i, result in enumerate(results):
    classes = result.boxes.cls.cpu().numpy()
    log.append({
        "frame": i,
        "objects": classes.tolist(),
        "action": "brake" if 0 in classes else "normal"
    })

import json
with open("trip_log.json", "w") as f:
    json.dump(log, f, indent=2)


In [None]:
from google.colab import files
files.download("runs/detect/predict/solidWhiteRight.mp4")


FileNotFoundError: Cannot find file: runs/detect/predict/solidWhiteRight.mp4

In [None]:
import os
os.listdir("runs/detect")



['predict3', 'predict', 'predict2']

In [None]:
os.listdir("runs/detect/predict2")


['solidWhiteRight.avi']

In [None]:
from google.colab import files
files.download("runs/detect/predict2/solidWhiteRight.avi")  # replace with actual folder


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
from ultralytics import YOLO
import cv2

model = YOLO("yolov8n.pt")
video_path = "solidWhiteRight.mp4"

cap = cv2.VideoCapture(video_path)

while True:
    ret, frame = cap.read()
    if not ret:
        break

    results = model(frame)

    for r in results:
        for box in r.boxes:
            cls = int(box.cls[0])  # class ID
            label = model.names[cls]
            conf = float(box.conf[0])
            x1, y1, x2, y2 = map(int, box.xyxy[0])

            # Example rule
            if label == "person" and conf > 0.5:
                print("⚠️ Person ahead! BRAKE.")

            if label == "car" and conf > 0.5:
                print("🚗 Car detected. SLOW DOWN.")

            # You can add more rules here



0: 384x640 4 cars, 1 truck, 158.7ms
Speed: 5.1ms preprocess, 158.7ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)
🚗 Car detected. SLOW DOWN.

0: 384x640 3 cars, 1 truck, 144.1ms
Speed: 3.9ms preprocess, 144.1ms inference, 1.9ms postprocess per image at shape (1, 3, 384, 640)
🚗 Car detected. SLOW DOWN.

0: 384x640 3 cars, 1 truck, 142.5ms
Speed: 4.1ms preprocess, 142.5ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)
🚗 Car detected. SLOW DOWN.

0: 384x640 4 cars, 1 truck, 144.1ms
Speed: 4.6ms preprocess, 144.1ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 4 cars, 1 truck, 154.1ms
Speed: 3.8ms preprocess, 154.1ms inference, 1.4ms postprocess per image at shape (1, 3, 384, 640)
🚗 Car detected. SLOW DOWN.

0: 384x640 3 cars, 1 truck, 140.5ms
Speed: 4.4ms preprocess, 140.5ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)
🚗 Car detected. SLOW DOWN.

0: 384x640 4 cars, 1 truck, 1 bench, 151.4ms
Speed: 3.

In [None]:
log = []

# inside loop
if label == "person":
    action = "BRAKE"
elif label == "car":
    action = "SLOW DOWN"
else:
    action = "NONE"

log.append({"frame": cap.get(1), "label": label, "action": action})


In [None]:
import pandas as pd

df = pd.DataFrame(log)
df.to_csv("trip_log.csv", index=False)


In [None]:
!pip install -q langchain chromadb tiktoken


In [None]:
with open("traffic_rules.txt", "w") as f:
    f.write("""
- Brake immediately if pedestrian appears on road.
- Slow down near other vehicles.
- Stop at red traffic lights.
- Stay in lane unless overtaking.
    """)


In [None]:
!pip install -q langchain chromadb langchain-community sentence-transformers


In [None]:
from langchain_community.vectorstores import Chroma
from langchain_community.document_loaders import TextLoader, CSVLoader
from langchain.text_splitter import CharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.chains import RetrievalQA
from langchain.llms import HuggingFaceHub

# Load traffic rules and logs
text_loader = TextLoader("traffic_rules.txt")
csv_loader = CSVLoader(file_path="trip_log.csv")
docs = text_loader.load() + csv_loader.load()

# Split docs
splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=50)
split_docs = splitter.split_documents(docs)

# Embed using sentence-transformers
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
db = Chroma.from_documents(split_docs, embeddings)


In [None]:
from langchain.llms import HuggingFaceHub

# This uses the free FLAN-T5 model
llm = HuggingFaceHub(repo_id="google/flan-t5-large", model_kwargs={"temperature":0, "max_length":256})


In [None]:
!pip install -U langchain-community


In [None]:
from langchain_community.vectorstores import Chroma
from langchain_community.document_loaders import TextLoader, CSVLoader
from langchain.text_splitter import CharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings

# Load documents
text_loader = TextLoader("traffic_rules.txt")
csv_loader = CSVLoader(file_path="trip_log.csv")
docs = text_loader.load() + csv_loader.load()

# Split text
splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=50)
split_docs = splitter.split_documents(docs)

# Generate embeddings
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
db = Chroma.from_documents(split_docs, embedding=embeddings)

print("✅ Embedding & document DB ready!")


✅ Embedding & document DB ready!


In [None]:
from langchain.llms import HuggingFacePipeline
from transformers import pipeline

pipe = pipeline("text2text-generation", model="google/flan-t5-base", max_new_tokens=256)
llm = HuggingFacePipeline(pipeline=pipe)

from langchain.chains import RetrievalQA
qa = RetrievalQA.from_chain_type(llm=llm, retriever=db.as_retriever())


Device set to use cpu


In [None]:
response = qa.invoke("Why did the car slow down between frame 30 and 60?")
print(response)


In [None]:
response = qa.invoke("Why did the car slow down between frame 30 and 60?")
print(response)

response = qa.invoke("What objects were detected during the trip?")
print(response)

response = qa.invoke("Was any lane lost or signal missed?")
print(response)


In [None]:
import cv2
import os
import pandas as pd

# Paths
video_path = "solidWhiteRight.mp4"
yolo_labels_dir = "runs/detect/predict/labels"  # path to YOLO label files
output_csv = "trip_log.csv"

# Lane detection helper
def is_lane_detected(frame):
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    blur = cv2.GaussianBlur(gray, (5, 5), 0)
    edges = cv2.Canny(blur, 50, 150)
    height, width = edges.shape
    mask = np.zeros_like(edges)
    polygon = np.array([[
        (0, height),
        (width, height),
        (int(0.55 * width), int(0.6 * height)),
        (int(0.45 * width), int(0.6 * height))
    ]], np.int32)
    cv2.fillPoly(mask, polygon, 255)
    cropped = cv2.bitwise_and(edges, mask)
    lines = cv2.HoughLinesP(cropped, 1, np.pi/180, 50, minLineLength=50, maxLineGap=150)
    return lines is not None

# Frame-wise scan
cap = cv2.VideoCapture(video_path)
frame_idx = 0
log = []

while True:
    ret, frame = cap.read()
    if not ret:
        break

    label_file = os.path.join(yolo_labels_dir, f"{frame_idx}.txt")
    detected = []
    if os.path.exists(label_file):
        with open(label_file, 'r') as f:
            lines = f.readlines()
            for line in lines:
                cls_id = int(line.strip().split()[0])
                if cls_id == 0:
                    detected.append("person")
                elif cls_id == 1:
                    detected.append("bicycle")
                elif cls_id == 2:
                    detected.append("car")
                elif cls_id == 9:
                    detected.append("traffic light")

    lane_status = is_lane_detected(frame)
    action = "Continue normally"
    if "person" in detected:
        action = "Brake"
    elif not lane_status:
        action = "Lane lost"

    log.append({
        "frame": frame_idx,
        "action": action,
        "object_detected": ", ".join(detected) if detected else "None"
    })

    frame_idx += 1

cap.release()

# Save CSV
df = pd.DataFrame(log)
df.to_csv(output_csv, index=False)
print(f"✅ Trip log saved to: {output_csv}")


✅ Trip log saved to: trip_log.csv


In [None]:
!pip install streamlit streamlit-jupyter




In [None]:
# Step 1: Save your script as a .py file
code = """
print("🚗 Hello from VAC dashboard!")

import streamlit as st
import pandas as pd
from langchain.chains import RetrievalQA
from langchain.vectorstores import Chroma
from langchain.text_splitter import CharacterTextSplitter
from langchain.document_loaders import TextLoader, CSVLoader
from langchain.llms import HuggingFacePipeline
from transformers import pipeline

st.set_page_config(layout="wide")
st.title("🚗 Virtual Autonomous Car (VAC) Dashboard")

# Load logs
trip_log = pd.read_csv("trip_log.csv")
st.sidebar.header("Trip Summary")
st.sidebar.dataframe(trip_log, height=400)

# Load LLM & RAG chain
@st.cache_resource
def setup_rag():
    text_loader = TextLoader("traffic_rules.txt")
    csv_loader = CSVLoader(file_path="trip_log.csv")
    docs = text_loader.load() + csv_loader.load()

    splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=50)
    split_docs = splitter.split_documents(docs)
    db = Chroma.from_documents(split_docs, embedding=None)

    pipe = pipeline("text2text-generation", model="google/flan-t5-base", max_new_tokens=256)
    llm = HuggingFacePipeline(pipeline=pipe)

    qa = RetrievalQA.from_chain_type(llm=llm, retriever=db.as_retriever())
    return qa

qa = setup_rag()

# Ask a question
st.header("🧠 Ask VAC (Explainable AI)")
query = st.text_input("Ask a question about the trip (e.g., Why did the car brake?)")
if query:
    result = qa.run(query)
    st.success(result)

# Optionally display video or snapshots
st.header("🎥 Annotated Video Output")
st.video("runs/detect/predict/solidWhiteRight.mp4")

"""

with open("vac_dashboard.py", "w") as f:
    f.write(code)

print("✅ File saved as vac_dashboard.py")


✅ File saved as vac_dashboard.py


In [None]:
from streamlit_jupyter import StreamlitPatcher, tqdm
StreamlitPatcher().jupyter()
!streamlit run vac_dashboard.py


AttributeError: module 'streamlit' has no attribute 'experimental_data_editor'

In [None]:
from google.colab import files
files.download("vac_dashboard.py")


In [None]:
!pip install gradio




In [93]:
import gradio as gr
from PIL import Image
import cv2
import os

# Load a preview frame
def get_output_frame(video_path='solidWhiteRight.mp4'):
    cap = cv2.VideoCapture(video_path)
    ret, frame = cap.read()
    cap.release()
    if ret:
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        return Image.fromarray(frame)
    return None

# Dummy process_frame function (replace with your real one)
def process_frame(frame):
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    blur = cv2.GaussianBlur(gray, (5, 5), 0)
    edges = cv2.Canny(blur, 50, 150)
    frame[:, :, 0] = edges  # Visual tweak
    return frame

# RAG QA interface (replace `qa.invoke()` with your real logic)
def query_rag(question):
    if question.strip() == "":
        return "Please enter a question."
    try:
        response = qa.invoke(question)  # You must have qa defined before
        return response["result"]
    except Exception as e:
        return f"Error: {e}"

# Process uploaded video and apply lane detection
def process_uploaded_video(video_file):
    input_path = video_file.name
    output_path = "uploaded_processed.avi"

    cap = cv2.VideoCapture(input_path)
    width = int(cap.get(3))
    height = int(cap.get(4))
    fps = cap.get(cv2.CAP_PROP_FPS)

    out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"XVID"), fps, (width, height))

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        processed_frame = process_frame(frame)
        out.write(processed_frame)

    cap.release()
    out.release()
    return output_path

# Initialize Gradio interface
video_preview = get_output_frame()

with gr.Blocks() as dashboard:
    gr.Markdown("## 🚗 VAC Driving Assistant")
    with gr.Row():
        with gr.Column():
            gr.Image(value=video_preview, label="Detected Frame Preview")
            gr.Interface(
                fn=query_rag,
                inputs=gr.Textbox(lines=2, placeholder="Ask a driving-related question..."),
                outputs="text",
                title="Ask Me Why...",
                description="Ask why the car took certain actions",
                examples=["Why did the car slow down?", "Was a pedestrian detected?", "Was any signal missed?"]
            ).render()
        with gr.Column():
            video_input = gr.Video(label="📤 Upload a custom video")
            process_button = gr.Button("Run Lane Detection")
            video_output = gr.Video(label="🔁 Processed Output")

    process_button.click(fn=process_uploaded_video, inputs=video_input, outputs=video_output)

dashboard.launch()


It looks like you are running Gradio on a hosted a Jupyter notebook. For the Gradio app to work, sharing must be enabled. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://6f11abb9b06d4b08c7.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


KeyboardInterrupt: 

In [None]:
def run_pipeline_and_log(input_video_path, output_video_path):
    # 1. Run lane detection
    # 2. Run YOLO object detection
    # 3. Write logs to trip_log.csv
    # You already have most of this in lane_detect.py or vac_dashboard.py
    pass


In [None]:
!pip install yt-dlp
!yt-dlp -f mp4 -o "dashcam.mp4" "https://www.youtube.com/watch?v=WM8bTdBs-cw"


Collecting yt-dlp
  Downloading yt_dlp-2025.6.30-py3-none-any.whl.metadata (174 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m174.3/174.3 kB[0m [31m6.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading yt_dlp-2025.6.30-py3-none-any.whl (3.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.3/3.3 MB[0m [31m50.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: yt-dlp
Successfully installed yt-dlp-2025.6.30
[youtube] Extracting URL: https://www.youtube.com/watch?v=WM8bTdBs-cw
[youtube] WM8bTdBs-cw: Downloading webpage
[youtube] WM8bTdBs-cw: Downloading tv client config
[youtube] WM8bTdBs-cw: Downloading player 69b31e11-main
[youtube] WM8bTdBs-cw: Downloading tv player API JSON
[youtube] WM8bTdBs-cw: Downloading ios player API JSON
[info] WM8bTdBs-cw: Downloading 1 format(s): 18
[download] Destination: dashcam.mp4
[K[download] 100% of   17.15MiB in [1;37m00:00:00[0m at [0;32m22.96MiB/s[0m


In [None]:
import gradio as gr
from PIL import Image
import cv2
import os

# Load a preview frame
def get_output_frame(video_path='solidWhiteRight.mp4'):
    cap = cv2.VideoCapture(video_path)
    ret, frame = cap.read()
    cap.release()
    if ret:
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        return Image.fromarray(frame)
    return None

# Dummy process_frame function (replace with your real one)
def process_frame(frame):
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    blur = cv2.GaussianBlur(gray, (5, 5), 0)
    edges = cv2.Canny(blur, 50, 150)
    frame[:, :, 0] = edges  # Visual tweak
    return frame

# RAG QA interface (replace `qa.invoke()` with your real logic)
def query_rag(question):
    if question.strip() == "":
        return "Please enter a question."
    try:
        response = qa.invoke(question)  # You must have qa defined before
        return response["result"]
    except Exception as e:
        return f"Error: {e}"

# Process uploaded video and apply lane detection
def process_uploaded_video(video_file):
    input_path = video_file.name
    output_path = "uploaded_processed.avi"

    cap = cv2.VideoCapture(input_path)
    width = int(cap.get(3))
    height = int(cap.get(4))
    fps = cap.get(cv2.CAP_PROP_FPS)

    out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"XVID"), fps, (width, height))

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        processed_frame = process_frame(frame)
        out.write(processed_frame)

    cap.release()
    out.release()
    return output_path

# Initialize Gradio interface
video_preview = get_output_frame()

with gr.Blocks() as dashboard:
    gr.Markdown("## 🚗 VAC Driving Assistant")
    with gr.Row():
        with gr.Column():
            gr.Image(value=video_preview, label="Detected Frame Preview")
            gr.Interface(
                fn=query_rag,
                inputs=gr.Textbox(lines=2, placeholder="Ask a driving-related question..."),
                outputs="text",
                title="Ask Me Why...",
                description="Ask why the car took certain actions",
                examples=["Why did the car slow down?", "Was a pedestrian detected?", "Was any signal missed?"]
            ).render()
        with gr.Column():
            video_input = gr.Video(label="📤 Upload a custom video")
            process_button = gr.Button("Run Lane Detection")
            video_output = gr.Video(label="🔁 Processed Output")

    process_button.click(fn=process_uploaded_video, inputs=video_input, outputs=video_output)

dashboard.launch()


It looks like you are running Gradio on a hosted a Jupyter notebook. For the Gradio app to work, sharing must be enabled. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://299db4341203f42bff.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
