# Cloning the Fine Tuned Model From my GitHub


In [1]:
!git clone https://github.com/karthiKN-sk/grootan_ai_task.git

fatal: destination path 'grootan_ai_task' already exists and is not an empty directory.


# Download the Library/Packages

In [2]:
!pip install opencv-python ultralytics supervision numpy matplotlib rich.progress transformers gradio pillow huggingface_hub



# Import Libraries

In [None]:
from ultralytics import YOLO
import supervision as sv
import cv2
import numpy as np
import matplotlib.pyplot as plt
from rich.progress import Progress
from typing import Dict, Tuple, Iterable, List, Optional, Set, Any
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import os
from huggingface_hub import login
from datetime import timedelta

# Setting HF Token in Environment variable

In [None]:
token = None
token_file = "/content/grootan_ai_task/variables.py"

with open(token_file, "r") as f:
    for line in f:
        if line.startswith("HF_TOKEN="):
            token = line.strip().split("=", 1)[1].strip('"').strip("'")
            break

if token:
    os.environ["HF_TOKEN"] = token
    login(token=token)
else:
    raise ValueError("HF_TOKEN not found in variables.py")

# Configuration for Zone Definitions, Color Palette, and Vehicle Turn Classification.

This code snippet sets up essential constants and configurations for a vehicle tracking and turn analysis system, including color settings, polygonal zone definitions, naming schemes, turn mapping logic, and a utility function for turn-based color annotation.

1. Color Palette Definition
    * Defines a reusable color palette used for drawing zones, bounding boxes, and labels.
    * To show time in hours:minutes:seconds format instead of just seconds
    
This setup provides a foundational configuration layer for visual annotation, spatial zone management, and logical turn analysis in a computer vision-based traffic monitoring system.

In [None]:
COLORS = sv.ColorPalette.from_hex(["#E6194B", "#3CB44B", "#FFE119", "#3C76D1"])

def format_time(seconds):
    return str(timedelta(seconds=round(seconds)))

# Global Detection State Initialization for Vehicle Turn Tracking.

This snippet initializes a global dictionary named detections_state that is used to persist and manage tracking data across video frames in a vehicle monitoring pipeline.

Dictionary Keys:

* "**tracker_id_to_zone_id**":
   Maps each unique vehicle (tracker_id) to the zone ID it first appeared in.
   Used to group and identify vehicles during processing.

* "**vehicle_paths**":
Tracks both the entry (in) and exit (out) zones for each vehicle.
Format: **{tracker_id: {"in": zone_name, "out": zone_name}}**.

* "**vehicle_turns**":
Stores the classified type of turn for each vehicle, such as "left_turn", "right_turn", "u_turn", or "straight".

This stateful object is referenced and updated throughout the processing pipeline to enable accurate vehicle path tracking and turn classification across video frames.

In [None]:
detections_state = {
    "tracker_id_to_zone_id": {},
    "vehicle_paths": {},
    "vehicle_turns": {},
}

# Vehicle Entry-Exit Tracking and Turn Classification Logic

This function, update_detections_state, manages and updates the internal state used to track vehicle movement through predefined zones, and classifies the type of turn each vehicle makes (left, right, U-turn, or straight).

Key Responsibilities:

1. Track Entry Zones:

    * Maps each vehicle (via its tracker ID) to the in zone where it first appeared.

    * Ensures the entry point is recorded only once per vehicle.

    * Detects and records the out zone where the vehicle exits.

2. Turn Detection:

    * Uses predefined TURN_MAPPING to determine the turn type based on zone pairs (in → out).

    * Updates the vehicle_turns dictionary only when both zones are known.

3. Class ID Assignment:

    * Associates each detection with a zone ID for visual annotation by mapping tracker_id to its zone_id.

4. Filtering Valid Detections:

    * Returns only detections that have been successfully associated with zones (i.e., not class ID -1).

This function plays a central role in transforming low-level detection data into high-level vehicle movement understanding, essential for turn analysis in traffic videos.

In [None]:
def update_detections_state(
    detections_all: sv.Detections,
    detections_in_zones: List[sv.Detections],
    detections_out_zones: List[sv.Detections],
    config: Dict[str, Any],
    frame_idx: int,
    state: Dict[str, Any] = detections_state
) -> sv.Detections:
    tracker_id_to_zone_id = state["tracker_id_to_zone_id"]
    tracker_id_to_exit_zone_id = state.setdefault("tracker_id_to_exit_zone_id", {})
    vehicle_paths = state["vehicle_paths"]
    vehicle_turns = state.setdefault("vehicle_turns", {})
    fps = config["video_info"].fps

    # --- Assign entry zones and track vehicle IN zone names ---
    zone_in_names = list(config["zones_in"].keys())
    for zone_in_id, detections_in_zone in enumerate(detections_in_zones):
        zone_name = zone_in_names[zone_in_id]
        for tracker_id in detections_in_zone.tracker_id:
            tracker_id_to_zone_id.setdefault(tracker_id, zone_in_id)

            # Initialize vehicle path if not already present
            vehicle_paths.setdefault(tracker_id, {"in": None, "out": None,"in_time": None,"out_time": None})
            if vehicle_paths[tracker_id]["in"] is None:
                vehicle_paths[tracker_id]["in"] = zone_name
                if frame_idx is not None:
                   vehicle_paths[tracker_id]["in_time"] = frame_idx / fps

    # --- Count exits grouped by entry zone and track vehicle OUT zone names ---
    zone_out_names = list(config["zones_out"].keys())
    for zone_out_id, detections_out_zone in enumerate(detections_out_zones):
        zone_name = zone_out_names[zone_out_id]
        for tracker_id in detections_out_zone.tracker_id:
            if tracker_id in tracker_id_to_zone_id:
                if vehicle_paths[tracker_id]["out"] is None:
                    vehicle_paths[tracker_id]["out"] = zone_name

                    if frame_idx is not None:
                        timestamp_sec = frame_idx / fps
                        vehicle_paths[tracker_id]["out_time"] = timestamp_sec
                tracker_id_to_exit_zone_id[tracker_id] = zone_out_id

    # --- Detect turns ---
    for tracker_id, path in vehicle_paths.items():
        in_zone = path["in"]
        out_zone = path["out"]
        in_time = path["in_time"]
        out_time = path["out_time"]
        if in_zone and out_zone and tracker_id not in vehicle_turns:
            turn_type = config["turn_mapping"].get((in_zone, out_zone))
            if turn_type:
                vehicle_turns[tracker_id] = {
                    "turn_type": turn_type,
                    "from_zone": in_zone,
                    "to_zone": out_zone,
                    "in_time": in_time,
                    "out_time": out_time
                }
                print(f"[TURN] Vehicle {tracker_id}: {in_zone} → {out_zone} → {turn_type},Time: {format_time(in_time)} → {format_time(out_time)}")

    # Assign class_id based on EXIT zone
    if len(detections_all) > 0:
        detections_all.class_id = np.vectorize(
            lambda x: tracker_id_to_exit_zone_id.get(x, tracker_id_to_zone_id.get(x, -1))
        )(detections_all.tracker_id)
    else:
        detections_all.class_id = np.array([], dtype=int)

    return detections_all[detections_all.class_id != -1]

# Zone Initialization and PolygonZone dictionary Utilities.

This code provides two utility functions used in the vehicle turn detection system:
1. **initiate_polygon_zones**:

    * Takes a list of zone names and corresponding polygon coordinates.
    * Initializes and returns a dictionary mapping each name to a PolygonZone object.


These functions are key to defining spatial zones for detecting vehicle movement and labeling them appropriately in the video annotation process.

In [None]:
def initiate_polygon_zones(
    names: List[str],
    polygons: List[np.ndarray],
    triggering_anchors: Iterable[sv.Position] = [sv.Position.CENTER],
) -> Dict[str, sv.PolygonZone]:
    return {
        name: sv.PolygonZone(polygon=polygon, triggering_anchors=triggering_anchors)
        for name, polygon in zip(names, polygons)
    }

# Setup Configuration for Vehicle Turn Detection Pipeline.

The **setup_video_processor** function initializes and returns a configuration dictionary containing all essential components and parameters needed to process a video for vehicle turn detection. Here's what it sets up:

1. Video Input/Output Paths:

      * **source_video_path**: Path to the input video.

      * **target_video_path**: Optional path to save the processed output.

2. Detection Parameters:

      * **confidence_threshold**: Minimum confidence level for YOLO model detections.

      * **iou_threshold**: IOU threshold used during object tracking.

3. Detection and Tracking Tools:

      * **model**: A fine-tuned YOLO model for vehicle detection.

      * **tracker**: ByteTrack tracker for maintaining vehicle identities.

4. Video Metadata:

      * **video_info**: Extracts frame rate, resolution, and frame count from the input video.

5. Zone Definitions:

      * **zones**: Get polygon areas from User to detect Zones for turn classification.

6. Annotation Tools:

      * **box_annotator**: Draws bounding boxes on detected vehicles.

      * **label_annotator**: Displays vehicle IDs.

      * **trace_annotator**: Adds trajectory traces to show vehicle movement paths.

7. Detection State Handler:

      * **detections_manager**: A function to manage turn state updates.

8. Zone Definitions (Get From User Polygons )

    * ZONE_POLYGONS: List of polygonal coordinates marking entry zones for vehicles.
    * Each polygon is an array of 2D points (x, y).
    * Generate Zone Names Using Polygon

This setup function centralizes the configuration, making it easy to pass all necessary components to the video processing pipeline.

In [None]:
def setup_video_processor(
    source_video_path: str,
    target_video_path: Optional[str] = None,
    zones: Dict[str, list] = {},
    turn_mapping: Optional[Dict[Tuple[str, str], str]] = None,
    confidence_threshold: float = 0.4,
    iou_threshold: float = 0.7,
) -> Dict[str, Any]:

    if not zones or not zones.get("entry") or not zones.get("exit"):
        raise ValueError("'zones' must contain both non-empty 'entry' and 'exit' lists.")

    # Convert polygon lists to NumPy arrays
    ZONE_IN_POLYGONS = [np.array(zone["points"], dtype=np.int32) for zone in zones["entry"]]
    ZONE_OUT_POLYGONS = [np.array(zone["points"], dtype=np.int32) for zone in zones["exit"]]

    ZONE_IN_NAMES = [zone['id'] for zone in zones["entry"]]
    ZONE_OUT_NAMES = [zone['id'] for zone in zones["exit"]]

    TURN_MAPPING = turn_mapping

    return {
        "conf_threshold": confidence_threshold,
        "iou_threshold": iou_threshold,
        "source_video_path": source_video_path,
        "target_video_path": target_video_path,
        "model": YOLO("/content/grootan_ai_task/models/YoloFineTunedV3.pt"),
        "tracker": sv.ByteTrack(),
        "video_info": sv.VideoInfo.from_video_path(source_video_path),
        "zones_in": initiate_polygon_zones(ZONE_IN_NAMES,ZONE_IN_POLYGONS),
        "zones_out": initiate_polygon_zones(ZONE_OUT_NAMES,ZONE_OUT_POLYGONS),
        "box_annotator": sv.BoxAnnotator(color=COLORS),
        "label_annotator": sv.LabelAnnotator(color=COLORS, text_color=sv.Color.BLACK),
        "trace_annotator": sv.TraceAnnotator(
            color=COLORS, position=sv.Position.CENTER, trace_length=100, thickness=2
        ),
        "detections_manager": update_detections_state,
        "turn_mapping": TURN_MAPPING
    }

# Analyze and Visualize Vehicle Turn Statistics

The **analyze_turns** function evaluates vehicle turn data to generate a summary of turn behavior and visual insights. Here's what it does:

1. Summary Computation:
    *  Counts total tracked vehicles.
    *  Computes how many made right turns, left turns, U-turns, or went straight.

2. Console Output:
    *   Prints a summary report of the turn counts to the terminal.

3. Data Packaging:
    *   Creates a JSON-style dictionary (**turn_message**) containing:
        *   A message,
        *   Overall turn statistics (**turn_counts**),
        *   Individual vehicle turn details by tracker ID (**turn_details**).


4. Visualization:

      *   Plots a bar chart using **matplotlib** to visually represent the count of each turn type.
      *   Saves the chart as **turn_analysis.png** for later use (e.g., appending to video).

5. Return:
      *   Outputs the structured **turn_message**, suitable for downstream use in reports or QA systems.

This function bridges raw detection data with user-friendly output, supporting both analysis and visualization of vehicle behavior.


In [None]:
def analyze_turns(vehicle_turns):
    """Analyze the turns and create summary statistics"""
    total_vehicles = len(vehicle_turns)
    if total_vehicles == 0:
        raise ValueError("No vehicles were detected or tracked.")
    # Count the different types of turns
    right_turns = sum(1 for turns in vehicle_turns.values() if turns["turn_type"] == "right_turn" )
    left_turns = sum(1 for turns in vehicle_turns.values() if turns["turn_type"] == "left_turn" )
    u_turns = sum(1 for turns in vehicle_turns.values() if turns["turn_type"] == "u_turn" )
    no_turns = sum(1 for turns in vehicle_turns.values() if turns["turn_type"] == "straight")

    print("\n--- Turn Analysis Results ---")
    print(f"Total unique vehicles tracked: {total_vehicles}")
    print(f"Vehicles making right turns: {right_turns} ")
    print(f"Vehicles making left turns: {left_turns}")
    print(f"Vehicles making U-turns: {u_turns}")
    print(f"Vehicles with no detected turns: {no_turns}")


    # Create a visualization
    turn_counts = {
        'Right Turn': right_turns,
        'Left Turn': left_turns,
        'U-Turn': u_turns,
        'No Turn': no_turns
    }

    turn_message = {
    "message": "Turn Analysis Results completed.",
    "total_vehicles": total_vehicles,
    "turn_counts": {
        "Vehicles making right turns" : right_turns,
        "Vehicles making left turns": left_turns,
        "Vehicles making U-turns": u_turns,
        "Vehicles with no detected turns (Straight)": no_turns
    },
    "turn_details": [
            {
                "tracker_id": int(tracker_id),
                "turn": data["turn_type"],
                "from": data["from_zone"],
                "to": data["to_zone"],
                "in_time": data['in_time'],
                "out_time": data['out_time']
            }
            for tracker_id, data in vehicle_turns.items()
        ]
    }

    plt.figure(figsize=(10, 6))
    colors = ['red', 'green', 'black', 'blue']
    plt.bar(turn_counts.keys(), turn_counts.values(), color=colors)
    plt.title('Vehicle Turn Analysis')
    plt.ylabel('Number of Vehicles')
    plt.grid(axis='y', linestyle='--', alpha=0.7)

    # Add count labels on top of each bar
    for i, (key, value) in enumerate(turn_counts.items()):
        plt.text(i, value + 0.3, str(value), ha='center')

    plt.savefig('turn_analysis.png')
    plt.show()
    plt.close()
    return turn_message

# Annotate Video Frames with Zone Information and Vehicle Turn Statistics


The **annotate_frame** function overlays comprehensive visual annotations on each video frame to aid in understanding vehicle movement and behavior. Here's what it does:

1. Draws zones on the frame using polygons and labels with distinct colors.

2. Generates labels for each detected vehicle using their tracker IDs (e.g., **"Car #12"**).

3. Applies multiple annotation layers:
    *   Trajectory traces via **trace_annotator**
    *   Bounding boxes via **box_annotator**
    *   Vehicle ID labels via **label_annotator**

4. Computes turn statistics from the global detections_state:
    *   Total vehicles tracked
    *   Counts of right turns, left turns, U-turns, and straight movements

5. Displays summary metrics visually on the frame, including:
    *   A detection count badge
    *   Fixed-position statistics on turn types, color-coded for clarity (e.g., red for right turns, green for left turns, etc.)

This function is central to making the video output interpretable by overlaying both spatial (zones) and behavioral (turn types) information for each detected vehicle.

In [None]:
def annotate_frame(frame: np.ndarray, detections: sv.Detections, config: Dict[str, Any]) -> np.ndarray:
    frame_ = frame.copy()

    # Draw zones
    for i, ((zin_name, zin), (zout_name, zout)) in enumerate(zip(config["zones_in"].items(), config["zones_out"].items())):
        in_color = sv.Color(b=0, g=255, r=0)
        out_color = sv.Color(b=0, g=0, r=255)
        zin_anchor = sv.get_polygon_center(zin.polygon)
        zout_anchor = sv.get_polygon_center(zout.polygon)
        frame_ = sv.draw_polygon(frame_, zin.polygon, in_color)
        frame_ = sv.draw_text(frame_, text=zin_name, text_anchor=zin_anchor, text_color=in_color)
        frame_ = sv.draw_polygon(frame_, zout.polygon, out_color)
        frame_ = sv.draw_text(frame_, text=zout_name, text_anchor=zout_anchor, text_color=out_color)

    vehicle_turns= detections_state["vehicle_turns"]

    labels = [f"Car #{id_}" for id_ in detections.tracker_id]
    frame_ = config["trace_annotator"].annotate(frame_, detections)
    frame_ = config["box_annotator"].annotate(frame_, detections)
    frame_ = config["label_annotator"].annotate(frame_, detections, labels)

    # Count the different types of turns
    total_vehicles = len(vehicle_turns)
    right_turns = sum(1 for turns in vehicle_turns.values() if turns["turn_type"] == "right_turn" )
    left_turns = sum(1 for turns in vehicle_turns.values() if turns["turn_type"] == "left_turn" )
    u_turns = sum(1 for turns in vehicle_turns.values() if turns["turn_type"] == "u_turn" )
    no_turns = sum(1 for turns in vehicle_turns.values() if turns["turn_type"] == "straight")

    # Add detection count info
    total_count = len(detections)
    frame_ = sv.draw_text(
        frame_,
        f"Detected: {total_count}",
        sv.Point(50, 50),
        background_color=sv.Color.from_hex("#FF7F50")
    )
    # Draw fixed turn statistics on the center-left of the frame
    start_x = 80
    start_y = 350
    line_spacing = 40
    text_color = sv.Color(r=255, g=255, b=255)

    # Line 1: Total vehicles tracked
    frame_ = sv.draw_text(
        frame_,
        text=f"Total vehicles tracked: {total_vehicles}",
        text_anchor=sv.Point(start_x + 30, start_y),
        background_color=sv.Color.from_hex("#DDDDDD"),
    )

    # Line 2: Right turns (Red)
    frame_ = sv.draw_text(
        frame_,
        text=f"Right turns: {right_turns}",
        text_anchor=sv.Point(start_x + 10, start_y + line_spacing),
        background_color=sv.Color(r=255, g=0, b=0),
        text_color=text_color
    )

    # Line 3: Left turns (Green)
    frame_ = sv.draw_text(
        frame_,
        text=f"Left turns: {left_turns}",
        text_anchor=sv.Point(start_x + 10, start_y + 2 * line_spacing),
        background_color=sv.Color(r=0, g=255, b=0),

    )

    # Line 4: U-turns (Black)
    frame_ = sv.draw_text(
        frame_,
        text=f"U-turns: {u_turns}",
        text_anchor=sv.Point(start_x + 10, start_y + 3 * line_spacing),
        background_color=sv.Color(r=0, g=0, b=0),
        text_color=text_color
    )

    # Line 5: No turns (Blue)
    frame_ = sv.draw_text(
        frame_,
        text=f"No turns: {no_turns}",
        text_anchor=sv.Point(start_x + 10, start_y + 4 * line_spacing),
        background_color=sv.Color(r=0, g=0, b=255),
        text_color=text_color
    )


    return frame_

# Process and Annotate Video Frame for Vehicle Turn Detection

The function **process_frame** handles a single video frame in the vehicle turn detection pipeline. Here's a breakdown of its functionality:

1. Runs object detection on the input frame using a **YOLO model (from config["model"])**, with specified confidence and IoU thresholds.

2. Converts detection results into a standardized format (**sv.Detections**) and forces all detected class IDs to zero (indicating a single-class tracking scenario, like vehicles).

3. Updates object tracks using a tracking algorithm (**config["tracker"]**).

4. Checks zone: it filters detections currently inside these zones.

5. Filters detections further using a custom **detections_manager** function that processes zone-based transitions to determine vehicle turns.

6. Annotates the frame (e.g., drawing bounding boxes and turn labels) using the **annotate_frame** function.

In [None]:
def process_frame(frame: np.ndarray, frame_idx: int, config: Dict[str, Any]) -> np.ndarray:
    result = config["model"](frame, verbose=False, conf=config["conf_threshold"], iou=config["iou_threshold"])[0]
    detections = sv.Detections.from_ultralytics(result)
    detections.class_id = np.zeros(len(detections))
    detections = config["tracker"].update_with_detections(detections)

    detections_in_zones, detections_out_zones = [], []
    for zone_in, zone_out in zip(config["zones_in"].values(), config["zones_out"].values()):
        in_zone = detections[zone_in.trigger(detections)]
        out_zone = detections[zone_out.trigger(detections)]
        detections_in_zones.append(in_zone)
        detections_out_zones.append(out_zone)

    filtered = config["detections_manager"](detections, detections_in_zones, detections_out_zones, config, frame_idx)
    return annotate_frame(frame, filtered, config)

# Process and Annotate Video Frames for Vehicle Turn Detection

The **process_video** function reads a video frame-by-frame, processes each frame to detect and annotate vehicles, and outputs the results either to a video file or a live display window. Here's what it does:

1. Frame Extraction: Uses a frame generator to read frames from the source video.

2. Progress Tracking: Displays a live progress bar using the rich library to monitor video processing.

3. Annotation Pipeline:
    *   For each frame, it calls **process_frame**() to detect vehicles, determine turn behavior, and apply annotations.

4. Output Handling:

    *   If output path is specified: Saves the annotated video to disk using VideoSink.
    *   Otherwise: Displays annotated frames live using OpenCV (**cv2.imshow**).

5. Sample Frame Export: Saves a single annotated frame as an image (annotated_output.png) for preview or debugging.

6. Returns the dictionary of vehicle turn states (vehicle_turns) tracked during the video processing.

This function is the core executor of the turn detection pipeline, enabling both real-time display and file output of the analyzed results.

In [None]:
def process_video(config: Dict[str, Any]) -> None:
    frame_generator = sv.get_video_frames_generator(config["source_video_path"])
    total = config["video_info"].total_frames

    with Progress() as progress:
        task = progress.add_task("[green]Processing video...", total=total)
        if config["target_video_path"]:
            with sv.VideoSink(config["target_video_path"], config["video_info"]) as sink:
                saved_sample = False
                for frame_idx, frame in enumerate(frame_generator):
                    annotated = process_frame(frame, frame_idx, config)
                    sink.write_frame(annotated)
                    if not saved_sample:
                        cv2.imwrite("annotated_output.png", annotated)
                        saved_sample = True
                    progress.advance(task)
        else:
            for frame in frame_generator:
                annotated = process_frame(frame, config)
                cv2.imshow("Processed Video", annotated)
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break
                progress.advance(task)
            cv2.destroyAllWindows()

    return detections_state["vehicle_turns"]

# Append Turn Analysis Summary Chart to Video Output

The function add_final_summary_to_video enhances a processed video by appending a visual summary of vehicle turn statistics at the end. Here's what it does:

1. Analyzes turn data using the provided vehicle turn state.

2. Loads a bar chart image (turn_analysis.png) that visually represents turn statistics.

3. Reads and copies all frames from the original processed video.

4. Appends the chart image as static frames for 5 seconds at the end of the video.

5. Saves the new video with the summary chart to the specified output path.

6. Returns a structured JSON summary of the vehicle turn data.


In [None]:
def add_final_summary_to_video(video_path, vehicle_turns, output_path="final_output.mp4"):
    """Add a final summary frame to the end of the video"""

    # First analyze the turns
    vehicle_turn_json = analyze_turns(vehicle_turns)

    # Load the bar chart image
    chart_img = cv2.imread("turn_analysis.png")
    if chart_img is None:
        raise FileNotFoundError("turn_analysis.png not found.")

    # Read the original video
    cap = cv2.VideoCapture(video_path)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    # Resize chart to match video resolution
    chart_img = cv2.resize(chart_img, (width, height))

    # Create the output video
    fourcc = cv2.VideoWriter_fourcc(*'mp4v') #mp4v h264
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    # Copy all frames from the original video
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        out.write(frame)

    # Append chart image as 5 seconds of frames
    for _ in range(int(fps * 5)):
        out.write(chart_img)

    # Release resources
    cap.release()
    out.release()
    print(f"Final video with chart saved as '{output_path}'")

    return vehicle_turn_json

# Init Video Processing (Full Video-Based Vehicle Turn Detection and Summary Pipeline)

This function **run_full_vehicle_turn_pipeline** performs the complete pipeline for analyzing vehicle movements in a video. It:

1. Processes the input video using a configured video processor to detect and trace vehicle movements.

2. Analyzes vehicle turns (left, right, U-turn, straight) and records them.

3. Generates a summary chart of the turn statistics and appends it to the output video.

4. Returns a JSON summary of vehicle turn analytics for further use (e.g., visualization or question answering).


In [None]:
def run_full_vehicle_turn_pipeline(
    source_video_path: str,
    final_output_path: str = "final_output.mp4",
    zones: Dict[str, list] = {},
    turn_mappings: List[Dict[str, str]] = []
):
    """
    Runs the full pipeline: processes video, tracks turns, and appends summary.
    """
    if not zones or not zones.get("entry") or not zones.get("exit"):
        raise ValueError("'zones' must contain both non-empty 'entry' and 'exit' lists.")

    # Build the mapping dict
    def build_turn_mapping_dict(mapping_list):
        TURN_TYPE_MAPPING = {
            "left": "left_turn",
            "right": "right_turn",
            "straight": "straight",
            "u-turn": "u_turn"
        }

        return {
            (m["from_zone"], m["to_zone"]): TURN_TYPE_MAPPING.get(m["turn_type"], m["turn_type"])
            for m in mapping_list
        }

    mapping_dict = build_turn_mapping_dict(turn_mappings)

    # Step 1: Setup and process the video
    config = setup_video_processor(
        source_video_path=source_video_path,
        target_video_path="output_traced.mp4",
        zones=zones,
        turn_mapping=mapping_dict
    )
    vehicle_turns_state = process_video(config)

    # Step 2: Append summary chart to the traced video
    vehicle_turn_json = add_final_summary_to_video(
        video_path="output_traced.mp4",
        vehicle_turns=vehicle_turns_state,
        output_path=final_output_path
    )

    return vehicle_turn_json

# Vehicle Turn Detection Summary & AI-Powered Question Answering.
**convert_turn_stats_to_text(analysis_result)**:
Converts the vehicle turn detection results (a JSON dictionary) into a readable text summary, including:

1. Total vehicle count

2. Turn type counts (right, left, U-turn, straight)

3. Per-vehicle turn information.

**Use:**
This summary is later passed to a language model for answering questions.

**create_pipeline(text_data)**:
Creates a custom question-answering function qa_pipeline(question) that:

1. Takes a natural language question

2. Feeds it to Qwen along with the vehicle turn summary

3. Returns only the assistant's reply from the model output

**Purpose:**
This abstracts the model usage so the user can ask follow-up questions based on video analytics.

In [None]:
def convert_turn_stats_to_text(analysis_result):
    turn_counts = analysis_result.get("turn_counts", {})
    turn_details = analysis_result.get("turn_details", [])

    total = analysis_result.get("total_vehicles", 0)
    right = turn_counts.get("Vehicles making right turns", 0)
    left = turn_counts.get("Vehicles making left turns", 0)
    u_turn = turn_counts.get("Vehicles making U-turns", 0)
    straight = turn_counts.get("Vehicles with no detected turns (Straight)", 0)

    summary_text = (
    f"A total of {total} cars were tracked during the analysis. "
    f"Among them, {right} made right turns, {left} made left turns, "
    f"{u_turn} performed U-turns (also referred to as 'uturns' or 'reverse turns'), and {straight} continued straight without making any turns. (also referred to as 'no turns')"
    )

    if turn_details:
        detail_sentences = detail_sentences = [
            f"Vehicle ID {item['tracker_id']} (also referred to as 'car' or 'car with tracker ID') "
            f"made a {item['turn'].replace('_', ' ').lower()} from {item['from']} at (Time: {format_time(item['in_time'])}) "
            f"to {item['to']} at (Time: {format_time(item['out_time'])})."
            for item in turn_details
        ]
        details_text = "\n".join(detail_sentences)
        return f"{summary_text}{details_text}"
    else:
        return f"{summary_text}. No individual vehicle turn details were recorded."



# Load Qwen model and tokenizer (only once globally)
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-1.5B-Instruct")
model = AutoModelForCausalLM.from_pretrained("Qwen/Qwen2.5-1.5B-Instruct")
generation_pipe = pipeline("text-generation", model=model, tokenizer=tokenizer)



def create_pipeline(text_data):
    """
    Create a simple function to handle QA using Qwen with the full text_data
    """
    def qa_pipeline(question):
        text = f"""
        You are an expert in analyzing traffic video data, specifically vehicle turn behavior.

        Answer the question as thoroughly as possible using only the provided context. If the answer is not present in the context, respond with: "Answer is not available in the context." Do not provide fabricated or assumed information.

        Context:
        {text_data}

        Based on the above context, answer the following question clearly and concisely:

        Question:
        {question}
        """

        messages = [
            {"role": "system", "content": "You are an expert in vehicle turn analysis. Respond clearly and accurately using only the provided context."},
            {"role": "user", "content": text},
        ]
        response = generation_pipe(messages, max_new_tokens=1000)[0]
        print(response)
        assistant_response = ""
        for msg in response['generated_text']:
            if msg.get("role") == "assistant":
                assistant_response = msg.get("content", "")
                break
        return assistant_response

    return qa_pipeline

# Vehicle Turn Detection with Interactive Zone Drawing Using Gradio
Interactive Vehicle Turn Detection with Zone-Based Video Analysis

**gradio** is used to create an interactive web UI for uploading a video, processing it, and asking questions.

**tempfile** is used to handle temporary storage of the uploaded video.

This Python application provides an interactive web interface to detect vehicle turns in traffic videos by allowing users to manually draw polygonal zones on the first video frame. Built using Gradio for the UI and OpenCV for video processing, the tool enables the following workflow:

1. Upload a traffic video in common formats like MP4.

2. Extract and display the first frame of the video for zone drawing.

3. Draw multiple polygonal zones In/Out(Z1, Z2, Z3, Z4, etc.) on the frame to define regions of interest in an intersection pattern.

4. Visualize the drawn zones with distinct colors and labels.

5. Analyze the video based on the defined zones to detect vehicle turns and movements.

6. View the processed video highlighting vehicle turn events.

7. Interact with the analysis by asking questions about vehicle turns through a natural language interface powered by a custom pipeline.


(**encode_to_browser_safe_mp4**): This function converts a video file to a browser-safe MP4 format using **ffmpeg**.To transcode a video (any format) into an MP4 file that's optimized for web playback in browsers (like Chrome, Firefox, Safari, etc.).



In [None]:
import gradio as gr
import tempfile
import subprocess
import cv2
import numpy as np
import json
from PIL import Image
import os

# Persistent state for document store and pipeline
global_turn_json = None
global_pipeline = None

class ZoneDrawer:
    def __init__(self):
        # Simplified structure for Step 1: Only direction, no turn types yet
        self.zones = []  # Each zone: {'id': 'Z1', 'direction': 'in', 'points': [[x,y],...]}
        self.turn_mappings = []  # Each mapping: {'from_zone': 'Z1', 'to_zone': 'Z2', 'turn_type': 'left'}
        self.zone_counter = 1
        self.current_frame = None
        self.current_direction = 'in'  # 'in' or 'out'
        self.video_path = None

    def process_video(self, video_file):
        """Extract first frame from uploaded video"""
        if video_file is None:
            return None, "Please upload a video file first."

        try:
            self.video_path = video_file
            cap = cv2.VideoCapture(video_file)
            ret, frame = cap.read()
            cap.release()

            if not ret:
                return None, "❌ Could not extract frame from video."

            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            self.current_frame = frame_rgb

            return frame_rgb, f"✅ Video loaded! Frame size: {frame_rgb.shape[1]}x{frame_rgb.shape[0]}. Draw IN and OUT zones."

        except Exception as e:
            return None, f"❌ Error processing video: {str(e)}"

    def set_zone_direction(self, direction):
        """Set current zone direction (in/out)"""
        self.current_direction = direction
        color = "🟢 GREEN" if direction == 'in' else "🔴 RED"
        return f"Drawing mode: {direction.upper()} zones ({color})"

    def process_drawing(self, image_data):
        """Process the drawn image and extract polygon points - Step 1: Only direction"""
        if image_data is None:
            return None, "No drawing data received."

        try:
            # Convert the drawing to numpy array
            if isinstance(image_data, dict) and 'layers' in image_data:
                drawing = image_data['layers'][0] if image_data['layers'] else image_data['background']
            else:
                drawing = image_data

            if isinstance(drawing, Image.Image):
                drawing_array = np.array(drawing)
            else:
                drawing_array = drawing

            points = self.extract_polygon_points(drawing_array)

            if len(points) < 3:
                return self.show_current_zones(), "⚠️ Please draw a polygon with at least 3 points."

            # Create new zone with only direction (no turn type yet)
            zone_id = f"Z{self.zone_counter}"
            new_zone = {
                'id': zone_id,
                'direction': self.current_direction,  # 'in' or 'out'
                'points': points
            }

            self.zones.append(new_zone)
            self.zone_counter += 1

            result_image = self.create_zone_visualization()

            total_zones = len(self.zones)
            in_count = len([z for z in self.zones if z['direction'] == 'in'])
            out_count = len([z for z in self.zones if z['direction'] == 'out'])

            return result_image, f"✅ Added {zone_id} ({self.current_direction.upper()})! Total: {total_zones} (In={in_count}, Out={out_count})"

        except Exception as e:
            return self.show_current_zones(), f"❌ Error processing drawing: {str(e)}"

    def extract_polygon_points(self, drawing_array):
        """Extract polygon points from drawn image"""
        if len(drawing_array.shape) == 3:
            gray = cv2.cvtColor(drawing_array, cv2.COLOR_RGB2GRAY)
        else:
            gray = drawing_array

        contours, _ = cv2.findContours(gray, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        if not contours:
            return []

        largest_contour = max(contours, key=cv2.contourArea)
        epsilon = 0.02 * cv2.arcLength(largest_contour, True)
        approx = cv2.approxPolyDP(largest_contour, epsilon, True)
        points = [[int(point[0][0]), int(point[0][1])] for point in approx]

        return points

    def create_zone_visualization(self):
        """Create visualization with all zones - simplified for Step 1"""
        if self.current_frame is None:
            return None

        vis_frame = self.current_frame.copy()

        # Draw all zones (simplified - no turn types shown yet)
        for zone in self.zones:
            direction = zone['direction']
            zone_points = zone['points']
            zone_id = zone['id']

            if len(zone_points) >= 3:
                pts = np.array(zone_points, np.int32)

                # Color based on direction: Green for 'in', Red for 'out'
                color = (0, 255, 0) if direction == 'in' else (255, 0, 0)  # Green or Red

                # Draw filled polygon with transparency
                overlay = vis_frame.copy()
                cv2.fillPoly(overlay, [pts], color)
                cv2.addWeighted(vis_frame, 0.7, overlay, 0.3, 0, vis_frame)

                # Draw border
                cv2.polylines(vis_frame, [pts], isClosed=True, color=color, thickness=3)

                # Add simple zone label (no turn type yet)
                center = np.mean(pts, axis=0).astype(int)
                direction_abbr = "IN" if direction == 'in' else "OUT"
                zone_text = f"{zone_id}-{direction_abbr}"

                # Draw background and text
                font_scale = 0.7
                thickness = 2
                (text_width, text_height), baseline = cv2.getTextSize(zone_text, cv2.FONT_HERSHEY_SIMPLEX, font_scale, thickness)

                text_x = center[0] - text_width // 2
                text_y = center[1] + text_height // 2

                cv2.rectangle(vis_frame,
                            (text_x - 5, text_y - text_height - 5),
                            (text_x + text_width + 5, text_y + 5),
                            (0, 0, 0), -1)

                cv2.putText(vis_frame, zone_text, (text_x, text_y),
                          cv2.FONT_HERSHEY_SIMPLEX, font_scale, (255, 255, 255), thickness)

        # Draw turn mappings as arrows (with turn type labels)
        for mapping in self.turn_mappings:
            from_zone = next((z for z in self.zones if z['id'] == mapping['from_zone']), None)
            to_zone = next((z for z in self.zones if z['id'] == mapping['to_zone']), None)

            if from_zone and to_zone:
                from_center = np.mean(np.array(from_zone['points']), axis=0).astype(int)
                to_center = np.mean(np.array(to_zone['points']), axis=0).astype(int)

                # Draw arrow between zones
                cv2.arrowedLine(vis_frame, tuple(from_center), tuple(to_center),
                              (255, 255, 0), 4, tipLength=0.1)  # Yellow arrows

                # Add turn type label on the arrow
                arrow_center = ((from_center[0] + to_center[0]) // 2, (from_center[1] + to_center[1]) // 2)
                turn_text = mapping['turn_type']

                cv2.circle(vis_frame, arrow_center, 15, (255, 255, 0), -1)
                cv2.putText(vis_frame, turn_text, (arrow_center[0] - 8, arrow_center[1] + 5),
                          cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 0, 0), 1)

        return vis_frame

    def show_current_zones(self):
        """Show current zones without adding new ones"""
        return self.create_zone_visualization()

    def create_turn_mapping(self, from_zone_id, to_zone_id, turn_type):
        """Create a turn mapping between two zones with specified turn type"""
        if not from_zone_id or not to_zone_id:
            return self.show_current_zones(), "⚠️ Please select both FROM and TO zones."

        if from_zone_id == to_zone_id:
            return self.show_current_zones(), "⚠️ FROM and TO zones must be different."

        if not turn_type:
            return self.show_current_zones(), "⚠️ Please select a turn type."

        # Check if zones exist
        from_zone = next((z for z in self.zones if z['id'] == from_zone_id), None)
        to_zone = next((z for z in self.zones if z['id'] == to_zone_id), None)

        if not from_zone or not to_zone:
            return self.show_current_zones(), "⚠️ Selected zones not found."

        # Check if mapping already exists
        existing = next((m for m in self.turn_mappings
                        if m['from_zone'] == from_zone_id and m['to_zone'] == to_zone_id), None)

        if existing:
            return self.show_current_zones(), f"⚠️ Mapping {from_zone_id} → {to_zone_id} already exists."

        # Create mapping with user-specified turn type
        new_mapping = {
            'from_zone': from_zone_id,
            'to_zone': to_zone_id,
            'turn_type': turn_type
        }

        self.turn_mappings.append(new_mapping)

        result_image = self.create_zone_visualization()

        icons = {"left": "⬅️", "right": "➡️", "straight": "⬆️", "u-turn": "🔄"}
        icon = icons.get(turn_type, "📍")

        return result_image, f"✅ Added mapping: {from_zone_id} → {to_zone_id} ({turn_type} {icon}). Total mappings: {len(self.turn_mappings)}"

    def clear_last_zone(self):
        """Remove the most recently drawn zone"""
        if self.zones:
            removed_zone = self.zones.pop()
            # Remove any mappings involving this zone
            self.turn_mappings = [m for m in self.turn_mappings
                                if m['from_zone'] != removed_zone['id'] and m['to_zone'] != removed_zone['id']]
            result_image = self.create_zone_visualization()
            return result_image, f"🗑️ Removed {removed_zone['id']} and related mappings. Remaining: {len(self.zones)} zones"
        else:
            result_image = self.show_current_zones()
            return result_image, f"⚠️ No zones to remove."

    def clear_all_zones(self):
        """Clear all zones and mappings"""
        total_zones = len(self.zones)
        total_mappings = len(self.turn_mappings)
        self.zones = []
        self.turn_mappings = []
        self.zone_counter = 1
        result_image = self.show_current_zones()
        return result_image, f"🔄 Cleared {total_zones} zones and {total_mappings} mappings."

    def clear_last_mapping(self):
        """Remove the most recently created mapping"""
        if self.turn_mappings:
            removed_mapping = self.turn_mappings.pop()
            result_image = self.create_zone_visualization()
            return result_image, f"🗑️ Removed mapping: {removed_mapping['from_zone']} → {removed_mapping['to_zone']}. Remaining: {len(self.turn_mappings)} mappings"
        else:
            result_image = self.show_current_zones()
            return result_image, f"⚠️ No mappings to remove."

    def get_zone_choices(self):
        """Get list of zone IDs for dropdowns"""
        return [zone['id'] for zone in self.zones]

    def get_in_zone_choices(self):
        """Get list of IN zone IDs for FROM dropdown"""
        return [zone['id'] for zone in self.zones if zone['direction'] == 'in']

    def get_out_zone_choices(self):
        """Get list of OUT zone IDs for TO dropdown"""
        return [zone['id'] for zone in self.zones if zone['direction'] == 'out']

    def get_zones_as_named_dict(self):
        entry = [z for z in self.zones if z["direction"] == "in"]
        exit = [z for z in self.zones if z["direction"] == "out"]
        return {"entry": entry, "exit": exit}

    def get_turn_mappings(self):
        return self.turn_mappings

    def get_zone_info_without_mappings(self):
        return self.get_zone_info(show_mappings=False)

    def get_zone_info_with_mappings(self):
        return self.get_zone_info(show_mappings=True)

    def get_zone_info(self,show_mappings=False):
        """Get current zone and mapping information"""
        if not self.zones:
            return "Upload a video and draw some zones to see information here."

        # Count by direction
        in_count = len([z for z in self.zones if z['direction'] == 'in'])
        out_count = len([z for z in self.zones if z['direction'] == 'out'])

        # Create summary
        info = f"📊 **Zone Summary:**\n\n"
        info += f"🎯 **Total Zones:** {len(self.zones)}\n"
        info += f"🟢 **IN Zones:** {in_count}\n"
        info += f"🔴 **OUT Zones:** {out_count}\n\n"

        info += f"📋 **Zone Details:**\n"
        for zone in self.zones:
            dir_color = "🟢" if zone['direction'] == 'in' else "🔴"
            info += f"{dir_color} **{zone['id']}** - {zone['direction'].upper()}\n"

        if show_mappings:
            info += f"\n🔗 **Turn Mappings ({len(self.turn_mappings)}):**\n"
            if self.turn_mappings:
                for mapping in self.turn_mappings:
                    info += f"➡️ **{mapping['from_zone']} → {mapping['to_zone']}** ({mapping['turn_type']})\n"
            else:
                info += "No mappings created yet. Go to Step 2 to create turn mappings.\n"

        if show_mappings:
            ready = len(self.zones) > 0 and len(self.turn_mappings) > 0
            info += f"\n🎯 **Ready for Analysis:** {'✅ Yes' if ready else '❌ Need zones and mappings'}"

        return info


    def get_zones_for_pipeline(self):
        """Get zones and mappings for enhanced pipeline analysis"""
        return {
            'zones': self.zones,
            'mappings': self.turn_mappings,
            'legacy_zones': self._convert_to_legacy_format()
        }

    def _convert_to_legacy_format(self):
        """Convert new format to legacy format for backward compatibility"""
        legacy_zones = {'entry': [], 'exit': []}

        for zone in self.zones:
            if zone['direction'] == 'in':
                legacy_zones['entry'].append(zone['points'])
            else:
                legacy_zones['exit'].append(zone['points'])

        return legacy_zones

# Initialize the enhanced zone drawer
zone_drawer = ZoneDrawer()

def analyze_video_with_zones(video_file_path):
    """Analyze video using enhanced zones and mappings"""
    global global_pipeline, global_turn_json

    if not video_file_path:
        return None, "Please upload a video file."

    # Check if zones and mappings are created
    enhanced_data = zone_drawer.get_zones_for_pipeline()

    if not enhanced_data['zones']:
        return None, "❌ Please draw some zones before analyzing."

    if not enhanced_data['mappings']:
        return None, "❌ Please create turn mappings before analyzing."

    try:
        # Create temporary file
        with open(video_file_path, "rb") as source_file:
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_file:
                tmp_file.write(source_file.read())
                tmp_video_path = tmp_file.name

        raw_output_path = "raw_output.mp4"
        browser_safe_path = "final_output_with_summary.mp4"

        # Save enhanced zone data
        zones_file = "zones.json"
        with open(zones_file, 'w') as f:
            json.dump(enhanced_data, f, indent=2)

        zones_dict = zone_drawer.get_zones_as_named_dict()
        turns = zone_drawer.get_turn_mappings()
        # Run enhanced pipeline (you would need to modify your existing pipeline)
        global_turn_json = run_full_vehicle_turn_pipeline(
            source_video_path=tmp_video_path,
            final_output_path=raw_output_path,
            zones=zones_dict,
            turn_mappings=turns
        )

        # Re-encode video
        encode_to_browser_safe_mp4(raw_output_path, browser_safe_path)

        # Create enhanced document store
        text_data = convert_turn_stats_to_text(global_turn_json, enhanced_data)
        global_pipeline = create_pipeline(text_data)

        # Clean up
        if os.path.exists(zones_file):
            os.remove(zones_file)
        if os.path.exists(tmp_video_path):
            os.remove(tmp_video_path)

        # Create summary
        zone_count = len(enhanced_data['zones'])
        mapping_count = len(enhanced_data['mappings'])

        return browser_safe_path, f"✅ Video analyzed with {zone_count} zones and {mapping_count} turn mappings! Enhanced analysis ready.", "/content/turn_analysis.png"

    except Exception as e:
        return None, f"❌ Error during analysis: {str(e)}", None

def answer_question(user_question):
    """Answer questions about the analyzed video"""
    if not global_pipeline or not global_turn_json:
        return "Please analyze a video first."
    return global_pipeline(user_question)

def encode_to_browser_safe_mp4(input_path: str, output_path: str):
    """Convert video to browser-safe format"""
    cmd = [
        "ffmpeg", "-y", "-i", input_path,
        "-vcodec", "libx264", "-preset", "ultrafast",
        "-acodec", "aac", "-movflags", "+faststart",
        output_path
    ]
    try:
        subprocess.run(cmd, check=True)
    except subprocess.CalledProcessError:
        print("Error: ffmpeg failed to convert video to browser-safe format.")


def update_zone_dropdowns():
    """Update dropdown choices when zones change"""
    in_choices = zone_drawer.get_in_zone_choices()
    out_choices = zone_drawer.get_out_zone_choices()
    return gr.Dropdown(choices=in_choices), gr.Dropdown(choices=out_choices)

# Create Enhanced Gradio interface
def create_interface():
    with gr.Blocks(title="🚗 Vehicle Turn Detection with Zone Mapping", theme=gr.themes.Soft()) as interface:
        gr.HTML("""
        <h1 style="text-align: center;">🚗 Vehicle Turn Detection with Zone Mapping</h1>
        <p style="text-align: center;">Step 1: Draw IN/OUT zones → Step 2: Create turn mappings → Step 3: Analyze flows → Step 4: Ask questions</p>
        """)

        with gr.Tab("🎨 Step 1: Draw IN/OUT Zones"):
            with gr.Row():
                with gr.Column(scale=2):
                    # Video upload
                    video_input = gr.File(
                        label="📹 Upload Video",
                        file_types=[".mp4", ".avi", ".mov"],
                        type="filepath"
                    )

                    # Frame display and drawing area
                    frame_display = gr.ImageEditor(
                        label="🎨 Draw Zones (Click and drag to draw polygons)",
                        type="pil",
                        brush=gr.Brush(default_size=12),
                        height=600
                    )

                    # Simplified control panel - only direction
                    with gr.Row():
                        zone_direction = gr.Radio(
                            choices=["in", "out"],
                            value="in",
                            label="🎯 Zone Direction",
                            info="IN (Green) for vehicle entry, OUT (Red) for vehicle exit"
                        )

                        direction_status = gr.Textbox(
                            label="📍 Current Mode",
                            value="Drawing mode: IN zones (🟢 GREEN)",
                            interactive=False
                        )

                    # Zone control buttons
                    with gr.Row():
                        save_zone_btn = gr.Button("✅ Save Zone", variant="primary", size="lg")
                        clear_last_btn = gr.Button("🗑️ Remove Last Zone")
                        clear_all_btn = gr.Button("🔄 Reset All")

                    # Zone status
                    zone_status = gr.Textbox(
                        label="📢 Zone Status",
                        interactive=False,
                        max_lines=2
                    )

                with gr.Column(scale=1):
                    # Zone information
                    zone_info = gr.Markdown(
                        value="Upload a video to start drawing zones.",
                        label="📊 Zone Information"
                    )

                    # Instructions
                    gr.Markdown("""
                    ### 📖 Step 1 Instructions:

                    1. **Upload** a video file
                    2. **Select** zone direction (IN/OUT only)
                    3. **Draw** polygons on the frame
                    4. **Save** each zone after drawing
                    5. Go to **Step 2** to define turn mappings

                    ### 🎨 Simple Zone System:
                    - **IN Zones** 🟢: Where vehicles enter the intersection
                    - **OUT Zones** 🔴: Where vehicles exit the intersection
                    - Each zone gets an ID (Z1, Z2, Z3...)
                    - Turn types will be defined in Step 2

                    ### 🔄 **Recommended Layout:**

                    Draw zones at key entry and exit points:
                    - **IN zones**: At each road entrance to intersection
                    - **OUT zones**: At each road exit from intersection
                    """)

        with gr.Tab("🔗 Step 2: Create Turn Mappings"):
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Define Traffic Flow Patterns")
                    gr.Markdown("Connect IN zones to OUT zones and specify the turn type:")

                    with gr.Row():
                        from_zone = gr.Dropdown(
                            label="🟢 FROM Zone (IN)",
                            info="Select source IN zone",
                            interactive=True
                        )
                        to_zone = gr.Dropdown(
                            label="🔴 TO Zone (OUT)",
                            info="Select destination OUT zone",
                            interactive=True
                        )

                    # Turn type selector - NOW IN STEP 2
                    turn_type_mapping = gr.Radio(
                        choices=["left", "right", "straight", "u-turn"],
                        label="🚗 Turn Type",
                        info="What type of turn is this mapping?",
                        interactive=True
                    )

                    with gr.Row():
                        create_mapping_btn = gr.Button("🔗 Create Mapping", variant="primary")
                        clear_last_mapping_btn = gr.Button("🗑️ Remove Last Mapping")

                    mapping_status = gr.Textbox(
                        label="🔗 Mapping Status",
                        interactive=False
                    )

                    zone_info_step2 = gr.Markdown(
                        value="Draw zones and create mappings to see summary here.",
                        label="📊 Zone + Mapping Info"
                    )

                with gr.Column():
                    # Updated frame display with mappings
                    mapping_display = gr.Image(
                        label="🗺️ Zone Layout with Turn Mappings",
                        height=500
                    )

                    gr.Markdown("""
                    ### 🔗 **Step 2: Define Turn Types**

                    Now you can specify exactly what type of turn each flow represents:

                    **Turn Type Options:**
                    - ⬅️ **Left Turn**: Vehicle turns left
                    - ➡️ **Right Turn**: Vehicle turns right
                    - ⬆️ **Straight**: Vehicle goes straight through
                    - 🔄 **U-Turn**: Vehicle makes U-turn

                    **Example Mappings:**
                    - Z1 → Z4 (Straight)
                    - Z1 → Z5 (Left)
                    - Z2 → Z6 (Right)
                    - Z3 → Z4 (U-Turn)

                    **Yellow arrows** with turn type show your mappings.
                    """)

        with gr.Tab("🔍 Step 3: Analyze Video"):
            with gr.Row():
                with gr.Column():
                    analyze_btn = gr.Button("🚀 Analyze Video with Turn Mappings", variant="primary", size="lg")
                    analysis_status = gr.Textbox(label="📊 Analysis Status", interactive=False)
                    turn_analysis_image = gr.Image(label="🖼️ Turn Analysis Results")

                with gr.Column():
                    video_output = gr.Video(label="📹 Processed Video with Flow Analysis")

        with gr.Tab("❓ Step 4: Ask Questions"):
            gr.Markdown("### Ask detailed questions about traffic flows:")
            with gr.Row():
                with gr.Column():
                    question_input = gr.Textbox(
                        label="💬 Your Question",
                        placeholder="e.g., How many vehicles turned left from Z1 to Z5? Which turn type had the most traffic?",
                        lines=3
                    )
                with gr.Column():
                    answer_output = gr.Textbox(
                        label="🤖 Analysis Answer",
                        lines=6,
                        interactive=False
                    )

            gr.Markdown("""
            ### 💡 **Example Questions:**
            - "How many left turns were made from Z1 to Z5?"
            - "Which turn mapping had the highest traffic volume?"
            - "Compare straight-through vs turning traffic"
            - "What percentage of vehicles from Z1 went straight vs turned?"
            - "Show me all U-turn statistics"
            """)

        # Event handlers for Step 1 - zone drawing only
        video_input.change(
            fn=zone_drawer.process_video,
            inputs=[video_input],
            outputs=[frame_display, zone_status]
        ).then(
            fn=update_zone_dropdowns,
            outputs=[from_zone, to_zone]
        )

        zone_direction.change(
            fn=zone_drawer.set_zone_direction,
            inputs=[zone_direction],
            outputs=[direction_status]
        )

        # Event handlers for zone drawing
        save_zone_btn.click(
            fn=zone_drawer.process_drawing,
            inputs=[frame_display],
            outputs=[frame_display, zone_status]
        ).then(
            fn=zone_drawer.get_zone_info_without_mappings,
            outputs=[zone_info]
        ).then(
            fn=update_zone_dropdowns,
            outputs=[from_zone, to_zone]
        ).then(
            fn=zone_drawer.show_current_zones,
            outputs=[mapping_display]
        )

        clear_last_btn.click(
            fn=zone_drawer.clear_last_zone,
            outputs=[frame_display, zone_status]
        ).then(
            fn=zone_drawer.get_zone_info_without_mappings,
            outputs=[zone_info]
        ).then(
            fn=update_zone_dropdowns,
            outputs=[from_zone, to_zone]
        ).then(
            fn=zone_drawer.show_current_zones,
            outputs=[mapping_display]
        )

        clear_all_btn.click(
            fn=zone_drawer.clear_all_zones,
            outputs=[frame_display, zone_status]
        ).then(
            fn=zone_drawer.get_zone_info_without_mappings,
            outputs=[zone_info]
        ).then(
            fn=update_zone_dropdowns,
            outputs=[from_zone, to_zone]
        ).then(
            fn=zone_drawer.show_current_zones,
            outputs=[mapping_display]
        )

        # Event handlers for turn mappings
        create_mapping_btn.click(
            fn=zone_drawer.create_turn_mapping,
            inputs=[from_zone, to_zone, turn_type_mapping],
            outputs=[mapping_display, mapping_status]
        ).then(
            fn=zone_drawer.get_zone_info_with_mappings,
            outputs=[zone_info_step2]
        )

        clear_last_mapping_btn.click(
            fn=zone_drawer.clear_last_mapping,
            outputs=[mapping_display, mapping_status]
        ).then(
            fn=zone_drawer.get_zone_info_with_mappings,
            outputs=[zone_info_step2]
        )

        # Update mapping display when zones change
        from_zone.change(
            fn=zone_drawer.show_current_zones,
            outputs=[mapping_display]
        )

        to_zone.change(
            fn=zone_drawer.show_current_zones,
            outputs=[mapping_display]
        )

        # Event handler for enhanced video analysis
        analyze_btn.click(
            fn=lambda: analyze_video_with_zones(zone_drawer.video_path),
            outputs=[video_output, analysis_status, turn_analysis_image]
        )

        # Event handler for Q&A
        question_input.submit(
            fn=answer_question,
            inputs=[question_input],
            outputs=[answer_output]
        )

        question_input.change(
            fn=answer_question,
            inputs=[question_input],
            outputs=[answer_output]
        )

    return interface


if __name__ == "__main__":
    interface = create_interface()
    interface.launch(
        share=True,
        debug=True,
        show_error=True
    )

In [None]:
# zones = [
#     {
#       "id": "Z1",
#       "direction": "in",
#       "points": [
#         [
#           1167,
#           379
#         ],
#         [
#           1496,
#           585
#         ],
#         [
#           1598,
#           418
#         ],
#         [
#           1337,
#           232
#         ]
#       ]
#     },
#     {
#       "id": "Z2",
#       "direction": "in",
#       "points": [
#         [
#           1330,
#           822
#         ],
#         [
#           1231,
#           749
#         ],
#         [
#           1035,
#           968
#         ],
#         [
#           1017,
#           1031
#         ],
#         [
#           1107,
#           1045
#         ]
#       ]
#     },
#     {
#       "id": "Z3",
#       "direction": "in",
#       "points": [
#         [
#           1064,
#           118
#         ],
#         [
#           921,
#           54
#         ],
#         [
#           667,
#           265
#         ],
#         [
#           790,
#           389
#         ]
#       ]
#     },
#     {
#       "id": "Z4",
#       "direction": "in",
#       "points": [
#         [
#           548,
#           586
#         ],
#         [
#           504,
#           689
#         ],
#         [
#           715,
#           882
#         ],
#         [
#           837,
#           791
#         ],
#         [
#           788,
#           732
#         ]
#       ]
#     },
#     {
#       "id": "Z5",
#       "direction": "out",
#       "points": [
#         [
#           668,
#           266
#         ],
#         [
#           450,
#           414
#         ],
#         [
#           528,
#           569
#         ],
#         [
#           779,
#           393
#         ]
#       ]
#     },
#     {
#       "id": "Z6",
#       "direction": "out",
#       "points": [
#         [
#           1157,
#           28
#         ],
#         [
#           1089,
#           83
#         ],
#         [
#           1087,
#           131
#         ],
#         [
#           1174,
#           362
#         ],
#         [
#           1331,
#           209
#         ]
#       ]
#     },
#     {
#       "id": "Z7",
#       "direction": "out",
#       "points": [
#         [
#           1597,
#           724
#         ],
#         [
#           1522,
#           626
#         ],
#         [
#           1285,
#           774
#         ],
#         [
#           1402,
#           870
#         ]
#       ]
#     },
#     {
#       "id": "Z8",
#       "direction": "out",
#       "points": [
#         [
#           700,
#           880
#         ],
#         [
#           838,
#           1046
#         ],
#         [
#           993,
#           1013
#         ],
#         [
#           808,
#           800
#         ]
#       ]
#     }
#   ]

# mappings = [
#     {
#       "from_zone": "Z1",
#       "to_zone": "Z8",
#       "turn_type": "straight"
#     },
#     {
#       "from_zone": "Z1",
#       "to_zone": "Z5",
#       "turn_type": "right"
#     },
#     {
#       "from_zone": "Z1",
#       "to_zone": "Z7",
#       "turn_type": "left"
#     },
#     {
#       "from_zone": "Z1",
#       "to_zone": "Z6",
#       "turn_type": "u-turn"
#     },
#     {
#       "from_zone": "Z2",
#       "to_zone": "Z6",
#       "turn_type": "right"
#     },
#     {
#       "from_zone": "Z2",
#       "to_zone": "Z5",
#       "turn_type": "straight"
#     },
#     {
#       "from_zone": "Z2",
#       "to_zone": "Z8",
#       "turn_type": "left"
#     },
#     {
#       "from_zone": "Z2",
#       "to_zone": "Z7",
#       "turn_type": "u-turn"
#     },
#     {
#       "from_zone": "Z3",
#       "to_zone": "Z7",
#       "turn_type": "straight"
#     },
#     {
#       "from_zone": "Z3",
#       "to_zone": "Z8",
#       "turn_type": "right"
#     },
#     {
#       "from_zone": "Z3",
#       "to_zone": "Z6",
#       "turn_type": "left"
#     },
#     {
#       "from_zone": "Z3",
#       "to_zone": "Z5",
#       "turn_type": "u-turn"
#     },
#     {
#       "from_zone": "Z4",
#       "to_zone": "Z5",
#       "turn_type": "left"
#     },
#     {
#       "from_zone": "Z4",
#       "to_zone": "Z6",
#       "turn_type": "straight"
#     },
#     {
#       "from_zone": "Z4",
#       "to_zone": "Z8",
#       "turn_type": "u-turn"
#     },
#     {
#       "from_zone": "Z4",
#       "to_zone": "Z7",
#       "turn_type": "right"
#     }
#   ]


# def get_zones_as_named_dict(zones):
#     entry = [z for z in zones if z["direction"] == "in"]
#     exit = [z for z in zones if z["direction"] == "out"]
#     return {"entry": entry, "exit": exit}

In [None]:
# run_full_vehicle_turn_pipeline(
#             source_video_path="/content/traffic_chaos.mp4",
#             final_output_path="outwithturns.mp4",
#             zones=get_zones_as_named_dict(zones),
#             turn_mappings=mappings
#         )