# Cloning the Fine Tuned Model From my GitHub


In [None]:
!git clone https://github.com/karthiKN-sk/grootan_ai_task.git

# Download the Library/Packages

In [None]:
!pip3 install opencv-python ultralytics supervision numpy matplotlib rich.progress transformers gradio pillow huggingface_hub

# Import Libraries

In [None]:
from ultralytics import YOLO
import supervision as sv
import cv2
import numpy as np
import matplotlib.pyplot as plt
from rich.progress import Progress
from typing import Dict, Iterable, List, Optional, Set, Any
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import os
from huggingface_hub import login

# Setting HF Token in Environment variable

In [None]:
token = None
token_file = "/content/grootan_ai_task/variables.py"

# variables.py is a Python module, so the assignment may be written either as
# HF_TOKEN="..." or, PEP 8 style, as HF_TOKEN = "...". Split on the first "="
# and compare the stripped name so both spellings (and any "=" inside the
# token itself) are handled; commented-out lines do not match.
with open(token_file, "r") as f:
    for line in f:
        key, sep, value = line.partition("=")
        if sep and key.strip() == "HF_TOKEN":
            # Drop surrounding whitespace first, then the optional quotes.
            token = value.strip().strip('"').strip("'")
            break

if token:
    # Export the token for libraries that read it from the environment and
    # authenticate this session with the Hugging Face Hub.
    os.environ["HF_TOKEN"] = token
    login(token=token)
else:
    raise ValueError("HF_TOKEN not found in variables.py")

# Configuration for Zone Definitions, Color Palette, and Vehicle Turn Classification.

This code snippet sets up essential constants and configurations for a vehicle tracking and turn analysis system, including color settings, polygonal zone definitions, naming schemes, turn mapping logic, and a utility function for turn-based color annotation.

1. Color Palette Definition
    * Defines a reusable color palette used for drawing zones, bounding boxes, and labels.

2. Turn Mapping Logic

      * Maps each (entry zone, exit zone) pair, using the zone names Z1–Z4, to a specific turn type (right_turn, left_turn, u_turn, straight).

      * Used to classify vehicle movement patterns across zones.

      * **generate_turn_mapping**: Generates a flexible turn mapping for 1–3 way intersections.
        It assigns right_turn, left_turn, straight, or u_turn based on available exits for each entry zone.
        The logic is order-based and adapts automatically to the number of zones.
    
This setup provides a foundational configuration layer for visual annotation, spatial zone management, and logical turn analysis in a computer vision-based traffic monitoring system.

In [None]:
# Shared four-colour palette: zone outlines/labels in annotate_frame and the
# box, label and trace annotators built in setup_video_processor all use it.
COLORS = sv.ColorPalette.from_hex(["#E6194B", "#3CB44B", "#FFE119", "#3C76D1"])

# Turn classification for a four-zone intersection.
# Keys are (entry zone name, exit zone name) tuples; values are the turn label.
# Pattern per entry zone: same zone -> u_turn, next zone in numbering order ->
# right_turn, two zones ahead -> straight, three zones ahead -> left_turn
# (numbering wraps around from Z4 back to Z1).
FOUR_WAY_TURN_MAPPING =  {
    # Entry from Z1
    ("Z1", "Z1"): "u_turn",
    ("Z1", "Z2"): "right_turn",
    ("Z1", "Z3"): "straight",
    ("Z1", "Z4"): "left_turn",

    # Entry from Z2
    ("Z2", "Z2"): "u_turn",
    ("Z2", "Z3"): "right_turn",
    ("Z2", "Z4"): "straight",
    ("Z2", "Z1"): "left_turn",

    # Entry from Z3
    ("Z3", "Z3"): "u_turn",
    ("Z3", "Z4"): "right_turn",
    ("Z3", "Z1"): "straight",
    ("Z3", "Z2"): "left_turn",

    # Entry from Z4
    ("Z4", "Z4"): "u_turn",
    ("Z4", "Z1"): "right_turn",
    ("Z4", "Z2"): "straight",
    ("Z4", "Z3"): "left_turn",
}

# Global Detection State Initialization for Vehicle Turn Tracking.

This snippet initializes a global dictionary named detections_state that is used to persist and manage tracking data across video frames in a vehicle monitoring pipeline.

Dictionary Keys:

* "**tracker_id_to_zone_id**":
   Maps each unique vehicle (tracker_id) to the zone ID it first appeared in.
   Used to group and identify vehicles during processing.

* "**vehicle_paths**":
Tracks both the entry (in) and exit (out) zones for each vehicle.
Format: **{tracker_id: {"in": zone_name, "out": zone_name}}**.

* "**vehicle_turns**":
Stores the classified type of turn for each vehicle, such as "left_turn", "right_turn", "u_turn", or "straight".

This stateful object is referenced and updated throughout the processing pipeline to enable accurate vehicle path tracking and turn classification across video frames.

In [None]:
# Module-level tracking state shared by update_detections_state (its default
# `state` argument), annotate_frame and process_video.
detections_state = {
    # tracker_id -> index of the first zone the vehicle was seen in
    "tracker_id_to_zone_id": {},
    # tracker_id -> {"in": zone name or None, "out": zone name or None}
    "vehicle_paths": {},
    # tracker_id -> turn label ("right_turn", "left_turn", "u_turn", "straight")
    "vehicle_turns": {},
}

# Vehicle Entry-Exit Tracking and Turn Classification Logic

This function, update_detections_state, manages and updates the internal state used to track vehicle movement through predefined zones, and classifies the type of turn each vehicle makes (left, right, U-turn, or straight).

Key Responsibilities:

1. Track Entry Zones:

    * Maps each vehicle (via its tracker ID) to the in zone where it first appeared.

    * Ensures the entry point is recorded only once per vehicle.

2. Track Exit Zones:

    * Detects and records the out zone where the vehicle exits.

3. Turn Detection:

    * Uses predefined TURN_MAPPING to determine the turn type based on zone pairs (in → out).

    * Updates the vehicle_turns dictionary only when both zones are known.

4. Class ID Assignment:

    * Associates each detection with a zone ID for visual annotation by mapping tracker_id to its zone_id.

5. Filtering Valid Detections:

    * Returns only detections that have been successfully associated with zones (i.e., not class ID -1).

This function plays a central role in transforming low-level detection data into high-level vehicle movement understanding, essential for turn analysis in traffic videos.

In [None]:
def update_detections_state(
    detections_all: sv.Detections,
    detections_in_zones: List[sv.Detections],
    config: Dict[str, Any],
    state: Dict[str, Any] = detections_state
) -> sv.Detections:
    """Record zone entries/exits per vehicle and classify completed turns.

    Args:
        detections_all: Tracked detections for the current frame.
        detections_in_zones: One detections subset per configured zone, in the
            same order as ``config["zones"]``.
        config: Pipeline configuration; reads ``config["zones"]`` (for zone
            names) and ``config["turn_mapping"]``.
        state: Mutable tracking state with the keys
            ``"tracker_id_to_zone_id"``, ``"vehicle_paths"`` and
            ``"vehicle_turns"``. Defaults to the module-level
            ``detections_state`` and is updated in place.

    Returns:
        ``detections_all`` restricted to vehicles that have been seen in a
        zone, with ``class_id`` set to the index of their first zone.
    """
    tracker_id_to_zone_id = state["tracker_id_to_zone_id"]
    vehicle_paths = state["vehicle_paths"]
    vehicle_turns = state.setdefault("vehicle_turns", {})

    # --- Record the first zone (entry) and the first different zone (exit) ---
    zone_names = list(config["zones"].keys())
    for zone_id, detections_in_zone in enumerate(detections_in_zones):
        if detections_in_zone.tracker_id is None:
            # Nothing tracked in this zone on this frame.
            continue
        zone_name = zone_names[zone_id]
        for tracker_id in detections_in_zone.tracker_id:
            tracker_id_to_zone_id.setdefault(tracker_id, zone_id)

            path = vehicle_paths.setdefault(tracker_id, {"in": None, "out": None})
            if path["in"] is None:
                path["in"] = zone_name
            elif path["out"] is None and path["in"] != zone_name:
                # Only the first zone that differs from the entry counts as exit.
                path["out"] = zone_name

    # --- Classify turns for vehicles whose entry and exit are both known ---
    turn_mapping = config["turn_mapping"]
    for tracker_id, path in vehicle_paths.items():
        in_zone, out_zone = path["in"], path["out"]
        if not (in_zone and out_zone) or tracker_id in vehicle_turns:
            continue
        # The mapping is keyed by (entry, exit) tuples; a nested
        # {entry: {exit: turn}} mapping is also accepted as a fallback.
        turn_type = turn_mapping.get((in_zone, out_zone))
        if turn_type is None:
            nested = turn_mapping.get(in_zone)
            if isinstance(nested, dict):
                turn_type = nested.get(out_zone)
        if turn_type:
            vehicle_turns[tracker_id] = turn_type

    # --- Assign class_id (entry zone index) for drawing/annotation ---
    if len(detections_all) > 0:
        detections_all.class_id = np.array(
            [tracker_id_to_zone_id.get(tid, -1) for tid in detections_all.tracker_id],
            dtype=int,
        )
    else:
        detections_all.class_id = np.array([], dtype=int)

    # Vehicles never seen inside a zone keep class_id -1 and are dropped.
    return detections_all[detections_all.class_id != -1]


# Zone Initialization and PolygonZone dictionary Utilities.

This code provides a utility function used in the vehicle turn detection system:
1. **initiate_polygon_zones**:

    * Takes a list of zone names and corresponding polygon coordinates.
    * Initializes and returns a dictionary mapping each name to a PolygonZone object.


This function is key to defining spatial zones for detecting vehicle movement and labeling them appropriately in the video annotation process.

In [None]:
def initiate_polygon_zones(
    names: List[str],
    polygons: List[np.ndarray],
    triggering_anchors: Iterable[sv.Position] = [sv.Position.CENTER],
) -> Dict[str, sv.PolygonZone]:
    """Build one ``sv.PolygonZone`` per (name, polygon) pair.

    Args:
        names: Zone names, paired positionally with ``polygons``.
        polygons: Polygon vertex arrays, one per zone.
        triggering_anchors: Anchors passed through to every ``sv.PolygonZone``.

    Returns:
        Mapping of zone name to its ``sv.PolygonZone``; pairing stops at the
        shorter of the two input lists.
    """
    zones_by_name = {}
    for zone_name, zone_polygon in zip(names, polygons):
        zones_by_name[zone_name] = sv.PolygonZone(
            polygon=zone_polygon,
            triggering_anchors=triggering_anchors,
        )
    return zones_by_name

# Setup Configuration for Vehicle Turn Detection Pipeline.

The **setup_video_processor** function initializes and returns a configuration dictionary containing all essential components and parameters needed to process a video for vehicle turn detection. Here's what it sets up:

1. Video Input/Output Paths:

      * **source_video_path**: Path to the input video.

      * **target_video_path**: Optional path to save the processed output.

2. Detection Parameters:

      * **confidence_threshold**: Minimum confidence level for YOLO model detections.

      * **iou_threshold**: IoU threshold passed to the YOLO model when it runs detection on each frame.

3. Detection and Tracking Tools:

      * **model**: A fine-tuned YOLO model for vehicle detection.

      * **tracker**: ByteTrack tracker for maintaining vehicle identities.

4. Video Metadata:

      * **video_info**: Extracts frame rate, resolution, and frame count from the input video.

5. Zone Definitions:

      * **zones**: Get polygon areas from User to detect entry and exit for turn classification.

6. Annotation Tools:

      * **box_annotator**: Draws bounding boxes on detected vehicles.

      * **label_annotator**: Displays vehicle IDs.

      * **trace_annotator**: Adds trajectory traces to show vehicle movement paths.

7. Detection State Handler:

      * **detections_manager**: A function to manage turn state updates.

8. Zone Definitions (from user-supplied polygons)

    * ZONE_POLYGONS: List of polygons built from the user-supplied `zones` argument; the same zones are used to detect both entry and exit.

    * Each polygon is an array of 2D points (x, y).
    * ZONE_NAMES: Zone names generated from the polygon order (Z1, Z2, ...).

This setup function centralizes the configuration, making it easy to pass all necessary components to the video processing pipeline.

In [None]:
def setup_video_processor(
    source_video_path: str,
    target_video_path: Optional[str] = None,
    zones: List[list] = [],
    confidence_threshold: float = 0.4,
    iou_threshold: float = 0.7,
) -> Dict[str, Any]:
    """Assemble the configuration dictionary used by the video pipeline.

    Args:
        source_video_path: Path of the video to analyse.
        target_video_path: Optional path for the annotated output video.
        zones: Polygons (lists of ``[x, y]`` points); zone ``i`` is named
            ``Z{i+1}``.
        confidence_threshold: Stored as ``"conf_threshold"``.
        iou_threshold: Stored as ``"iou_threshold"``.

    Returns:
        Dictionary holding thresholds, paths, the YOLO model, the ByteTrack
        tracker, video info, polygon zones, annotators, the detections manager
        and the turn mapping.

    Raises:
        ValueError: If ``zones`` is empty.
    """
    if not zones:
        raise ValueError("'zones' must contain at least one polygon.")

    # Convert every polygon to an int32 array, then name the zones Z1, Z2, ...
    polygon_arrays = [np.array(points, dtype=np.int32) for points in zones]
    zone_labels = [f"Z{number}" for number in range(1, len(polygon_arrays) + 1)]

    processor_config: Dict[str, Any] = {
        "conf_threshold": confidence_threshold,
        "iou_threshold": iou_threshold,
        "source_video_path": source_video_path,
        "target_video_path": target_video_path,
    }
    # Detection and tracking components.
    processor_config["model"] = YOLO("/content/grootan_ai_task/models/YoloFineTunedV1.pt")
    processor_config["tracker"] = sv.ByteTrack()
    processor_config["video_info"] = sv.VideoInfo.from_video_path(source_video_path)
    processor_config["zones"] = initiate_polygon_zones(zone_labels, polygon_arrays)
    # Annotators all share the module-level colour palette.
    processor_config["box_annotator"] = sv.BoxAnnotator(color=COLORS)
    processor_config["label_annotator"] = sv.LabelAnnotator(
        color=COLORS, text_color=sv.Color.BLACK
    )
    processor_config["trace_annotator"] = sv.TraceAnnotator(
        color=COLORS, position=sv.Position.CENTER, trace_length=100, thickness=2
    )
    processor_config["detections_manager"] = update_detections_state
    processor_config["turn_mapping"] = FOUR_WAY_TURN_MAPPING
    return processor_config

# Analyze and Visualize Vehicle Turn Statistics

The **analyze_turns** function evaluates vehicle turn data to generate a summary of turn behavior and visual insights. Here's what it does:

1. Summary Computation:
    *  Counts total tracked vehicles.
    *  Computes how many made right turns, left turns, U-turns, or went straight.

2. Console Output:
    *   Prints a summary report of the turn counts to the terminal.

3. Data Packaging:
    *   Creates a JSON-style dictionary (**turn_message**) containing:
        *   A message,
        *   Overall turn statistics (**turn_counts**),
        *   Individual vehicle turn details by tracker ID (**turn_details**).


4. Visualization:

      *   Plots a bar chart using **matplotlib** to visually represent the count of each turn type.
      *   Saves the chart as **turn_analysis.png** for later use (e.g., appending to video).

5. Return:
      *   Outputs the structured **turn_message**, suitable for downstream use in reports or QA systems.

This function bridges raw detection data with user-friendly output, supporting both analysis and visualization of vehicle behavior.


In [None]:
def analyze_turns(vehicle_turns):
    """Summarise turn labels, print a report and save a bar chart.

    Args:
        vehicle_turns: Mapping of tracker id to turn label.

    Returns:
        A dictionary with a message, the vehicle total, per-turn counts and
        per-vehicle details, or ``None`` (after printing a notice) when
        ``vehicle_turns`` is empty. The chart is written to
        ``turn_analysis.png``.
    """
    total_vehicles = len(vehicle_turns)
    if not total_vehicles:
        print("No vehicles were detected or tracked.")
        return

    # Tally each turn label once from a snapshot of the recorded values.
    recorded_turns = list(vehicle_turns.values())
    right_turns = recorded_turns.count("right_turn")
    left_turns = recorded_turns.count("left_turn")
    u_turns = recorded_turns.count("u_turn")
    no_turns = recorded_turns.count("straight")

    print("\n--- Turn Analysis Results ---")
    print(f"Total unique vehicles tracked: {total_vehicles}")
    print(f"Vehicles making right turns: {right_turns} ")
    print(f"Vehicles making left turns: {left_turns}")
    print(f"Vehicles making U-turns: {u_turns}")
    print(f"Vehicles with no detected turns: {no_turns}")

    # Structured result handed back to the caller.
    per_vehicle = []
    for tracker_id, turn in vehicle_turns.items():
        per_vehicle.append({"tracker_id": tracker_id, "turn": turn})

    turn_message = {
        "message": "Turn Analysis Results completed.",
        "total_vehicles": total_vehicles,
        "turn_counts": {
            "Vehicles making right turns": right_turns,
            "Vehicles making left turns": left_turns,
            "Vehicles making U-turns": u_turns,
            "Vehicles with no detected turns (Straight)": no_turns,
        },
        "turn_details": per_vehicle,
    }

    # Bar chart: one bar per turn type, coloured in a fixed order.
    turn_counts = {
        'Right Turn': right_turns,
        'Left Turn': left_turns,
        'U-Turn': u_turns,
        'No Turn': no_turns,
    }
    bar_colors = ['red', 'green', 'black', 'blue']

    plt.figure(figsize=(10, 6))
    plt.bar(turn_counts.keys(), turn_counts.values(), color=bar_colors)
    plt.title('Vehicle Turn Analysis')
    plt.ylabel('Number of Vehicles')
    plt.grid(axis='y', linestyle='--', alpha=0.7)

    # Write each count just above its bar.
    for bar_index, bar_value in enumerate(turn_counts.values()):
        plt.text(bar_index, bar_value + 0.3, str(bar_value), ha='center')

    plt.savefig('turn_analysis.png')
    plt.show()
    plt.close()
    return turn_message

# Annotate Video Frames with Zone Information and Vehicle Turn Statistics


The **annotate_frame** function overlays comprehensive visual annotations on each video frame to aid in understanding vehicle movement and behavior. Here's what it does:

1. Draws zones on the frame using polygons and labels with distinct colors.

2. Generates labels for each detected vehicle using their tracker IDs (e.g., **"Car #12"**).

3. Applies multiple annotation layers:
    *   Trajectory traces via **trace_annotator**
    *   Bounding boxes via **box_annotator**
    *   Vehicle ID labels via **label_annotator**

4. Computes turn statistics from the global detections_state:
    *   Total vehicles tracked
    *   Counts of right turns, left turns, U-turns, and straight movements

5. Displays summary metrics visually on the frame, including:
    *   A detection count badge
    *   Fixed-position statistics on turn types, color-coded for clarity (e.g., red for right turns, green for left turns, etc.)

This function is central to making the video output interpretable by overlaying both spatial (zones) and behavioral (turn types) information for each detected vehicle.

In [None]:
def annotate_frame(frame: np.ndarray, detections: sv.Detections, config: Dict[str, Any]) -> np.ndarray:
    """Draw zones, vehicle annotations and turn statistics on a frame copy.

    Args:
        frame: Source frame; it is copied, not modified.
        detections: Detections to annotate; ``tracker_id`` supplies the labels.
        config: Pipeline configuration; reads ``"zones"`` and the trace, box
            and label annotators.

    Returns:
        The annotated copy of ``frame``. Turn counts are read from the
        module-level ``detections_state["vehicle_turns"]``.
    """
    canvas = frame.copy()

    # Outline and label every zone, cycling through the shared palette.
    palette = COLORS.colors
    palette_size = len(palette)
    for index, (zone_name, zone) in enumerate(config["zones"].items()):
        zone_color = palette[index % palette_size]
        label_anchor = sv.get_polygon_center(zone.polygon)
        canvas = sv.draw_polygon(canvas, zone.polygon, zone_color)
        canvas = sv.draw_text(canvas, text=zone_name, text_anchor=label_anchor, text_color=zone_color)

    labels = [f"Car #{id_}" for id_ in detections.tracker_id]

    # Traces first, then boxes, then labels on top.
    canvas = config["trace_annotator"].annotate(canvas, detections)
    canvas = config["box_annotator"].annotate(canvas, detections)
    canvas = config["label_annotator"].annotate(canvas, detections, labels)

    # Tally the turns recorded so far.
    recorded_turns = list(detections_state["vehicle_turns"].values())
    total_vehicles = len(recorded_turns)
    right_turns = recorded_turns.count("right_turn")
    left_turns = recorded_turns.count("left_turn")
    u_turns = recorded_turns.count("u_turn")
    no_turns = recorded_turns.count("straight")

    # Badge with the number of detections in this frame.
    canvas = sv.draw_text(
        canvas,
        f"Detected: {len(detections)}",
        sv.Point(50, 50),
        background_color=sv.Color.from_hex("#FF7F50")
    )

    # Fixed statistics panel: (text, x offset, background, text colour or None
    # to keep draw_text's default text colour).
    start_x = 80
    start_y = 350
    line_spacing = 40
    white = sv.Color(r=255, g=255, b=255)
    stat_lines = (
        (f"Total vehicles tracked: {total_vehicles}", 30, sv.Color.from_hex("#DDDDDD"), None),
        (f"Right turns: {right_turns}", 10, sv.Color(r=255, g=0, b=0), white),
        (f"Left turns: {left_turns}", 10, sv.Color(r=0, g=255, b=0), None),
        (f"U-turns: {u_turns}", 10, sv.Color(r=0, g=0, b=0), white),
        (f"No turns: {no_turns}", 10, sv.Color(r=0, g=0, b=255), white),
    )
    for row, (line_text, x_offset, background, foreground) in enumerate(stat_lines):
        draw_kwargs = {
            "text": line_text,
            "text_anchor": sv.Point(start_x + x_offset, start_y + row * line_spacing),
            "background_color": background,
        }
        if foreground is not None:
            draw_kwargs["text_color"] = foreground
        canvas = sv.draw_text(canvas, **draw_kwargs)

    return canvas

# Process and Annotate Video Frame for Vehicle Turn Detection

The function **process_frame** handles a single video frame in the vehicle turn detection pipeline. Here's a breakdown of its functionality:

1. Runs object detection on the input frame using a **YOLO model (from config["model"])**, with specified confidence and IoU thresholds.

2. Converts detection results into a standardized format (**sv.Detections**) and forces all detected class IDs to zero (indicating a single-class tracking scenario, like vehicles).

3. Updates object tracks using a tracking algorithm (**config["tracker"]**).

4. Checks zone occupancy: For each configured zone, it selects the detections that are currently inside that zone; the first zone a vehicle appears in is treated as its entry and the first different zone as its exit.

5. Filters detections further using a custom **detections_manager** function that processes zone-based transitions to determine vehicle turns.

6. Annotates the frame (e.g., drawing bounding boxes and turn labels) using the **annotate_frame** function.

In [None]:
def process_frame(frame: np.ndarray, config: Dict[str, Any]) -> np.ndarray:
    """Detect, track and annotate the vehicles in a single frame.

    Args:
        frame: Frame to process.
        config: Pipeline configuration; reads the model, thresholds, tracker,
            zones and detections manager.

    Returns:
        The annotated frame produced by ``annotate_frame``.
    """
    predictions = config["model"](
        frame,
        verbose=False,
        conf=config["conf_threshold"],
        iou=config["iou_threshold"],
    )
    tracked = sv.Detections.from_ultralytics(predictions[0])
    # Collapse every detection onto class 0 before tracking.
    tracked.class_id = np.zeros(len(tracked))
    tracked = config["tracker"].update_with_detections(tracked)

    # One detections subset per zone, in zone order.
    per_zone = [tracked[zone.trigger(tracked)] for zone in config["zones"].values()]

    manage_detections = config["detections_manager"]
    kept = manage_detections(tracked, per_zone, config)
    return annotate_frame(frame, kept, config)

# Process and Annotate Video Frames for Vehicle Turn Detection

The **process_video** function reads a video frame-by-frame, processes each frame to detect and annotate vehicles, and outputs the results either to a video file or a live display window. Here's what it does:

1. Frame Extraction: Uses a frame generator to read frames from the source video.

2. Progress Tracking: Displays a live progress bar using the rich library to monitor video processing.

3. Annotation Pipeline:
    *   For each frame, it calls **process_frame**() to detect vehicles, determine turn behavior, and apply annotations.

4. Output Handling:

    *   If output path is specified: Saves the annotated video to disk using VideoSink.
    *   Otherwise: Displays annotated frames live using OpenCV (**cv2.imshow**).

5. Sample Frame Export: Saves a single annotated frame as an image (annotated_output.png) for preview or debugging.

6. Returns the dictionary of vehicle turn states (vehicle_turns) tracked during the video processing.

This function is the core executor of the turn detection pipeline, enabling both real-time display and file output of the analyzed results.

In [None]:
def process_video(config: Dict[str, Any]) -> Dict[Any, str]:
    """Run the frame pipeline over a whole video.

    When ``config["target_video_path"]`` is set, annotated frames are written
    to that file and the first annotated frame is also saved as
    ``annotated_output.png``. Otherwise frames are shown in an OpenCV window
    until the video ends or ``q`` is pressed.

    Args:
        config: Pipeline configuration; reads ``"source_video_path"``,
            ``"target_video_path"`` and ``"video_info"``.

    Returns:
        The module-level ``detections_state["vehicle_turns"]`` mapping of
        tracker id to turn label collected while processing this video.
    """
    # The tracking state is module-level; clear it in place so results from a
    # previously processed video do not leak into this run. Clearing (rather
    # than rebinding) keeps every existing reference to these dicts valid.
    for tracked_values in detections_state.values():
        tracked_values.clear()

    frame_generator = sv.get_video_frames_generator(config["source_video_path"])
    total = config["video_info"].total_frames

    with Progress() as progress:
        task = progress.add_task("[green]Processing video...", total=total)
        if config["target_video_path"]:
            with sv.VideoSink(config["target_video_path"], config["video_info"]) as sink:
                saved_sample = False
                for frame in frame_generator:
                    annotated = process_frame(frame, config)
                    sink.write_frame(annotated)
                    if not saved_sample:
                        # Keep the first annotated frame as a preview image.
                        cv2.imwrite("annotated_output.png", annotated)
                        saved_sample = True
                    progress.advance(task)
        else:
            try:
                for frame in frame_generator:
                    annotated = process_frame(frame, config)
                    cv2.imshow("Processed Video", annotated)
                    if cv2.waitKey(1) & 0xFF == ord("q"):
                        break
                    progress.advance(task)
            finally:
                # Close the preview window even if a frame fails to process.
                cv2.destroyAllWindows()

    return detections_state["vehicle_turns"]

# Append Turn Analysis Summary Chart to Video Output

The function add_final_summary_to_video enhances a processed video by appending a visual summary of vehicle turn statistics at the end. Here's what it does:

1. Analyzes turn data using the provided vehicle turn state.

2. Loads a bar chart image (turn_analysis.png) that visually represents turn statistics.

3. Reads and copies all frames from the original processed video.

4. Appends the chart image as static frames for 5 seconds at the end of the video.

5. Saves the new video with the summary chart to the specified output path.

6. Returns a structured JSON summary of the vehicle turn data.


In [None]:
def add_final_summary_to_video(video_path, vehicle_turns, output_path="final_output.mp4"):
    """Copy a video and append five seconds of the turn-analysis chart.

    Args:
        video_path: Path of the already-processed video to copy.
        vehicle_turns: Mapping of tracker id to turn label, passed to
            ``analyze_turns``.
        output_path: Path of the video to write.

    Returns:
        Whatever ``analyze_turns`` returned for ``vehicle_turns``.

    Raises:
        FileNotFoundError: If the chart image is expected but missing, or the
            input video cannot be opened.
        RuntimeError: If the output video cannot be opened for writing.
    """
    # First analyze the turns (this also writes turn_analysis.png).
    vehicle_turn_json = analyze_turns(vehicle_turns)

    # analyze_turns returns None without writing a chart when no turns were
    # recorded; in that case skip the chart instead of appending a stale image
    # left over from an earlier run.
    chart_img = None
    if vehicle_turn_json is not None:
        chart_img = cv2.imread("turn_analysis.png")
        if chart_img is None:
            raise FileNotFoundError("turn_analysis.png not found.")

    cap = cv2.VideoCapture(video_path)
    out = None
    try:
        if not cap.isOpened():
            raise FileNotFoundError(f"Could not open video '{video_path}'.")

        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Keep the fractional frame rate (e.g. 29.97) so playback speed of the
        # copy matches the source.
        fps = cap.get(cv2.CAP_PROP_FPS)

        fourcc = cv2.VideoWriter_fourcc(*'mp4v') #mp4v h264
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            raise RuntimeError(f"Could not open '{output_path}' for writing.")

        # Copy all frames from the original video.
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            out.write(frame)

        # Append the chart, resized to the video resolution, for 5 seconds.
        if chart_img is not None:
            chart_frame = cv2.resize(chart_img, (width, height))
            for _ in range(int(round(fps * 5))):
                out.write(chart_frame)
    finally:
        # Always release the reader and writer, even on failure.
        cap.release()
        if out is not None:
            out.release()

    print(f"Final video with chart saved as '{output_path}'")

    return vehicle_turn_json

# Init Video Processing (Full Video-Based Vehicle Turn Detection and Summary Pipeline)

This function **run_full_vehicle_turn_pipeline** performs the complete pipeline for analyzing vehicle movements in a video. It:

1. Processes the input video using a configured video processor to detect and trace vehicle movements.

2. Analyzes vehicle turns (left, right, U-turn, straight) and records them.

3. Generates a summary chart of the turn statistics and appends it to the output video.

4. Returns a JSON summary of vehicle turn analytics for further use (e.g., visualization or question answering).


In [None]:
def run_full_vehicle_turn_pipeline(
    source_video_path: str,
    final_output_path: str = "final_output.mp4",
    zones: List[list] = [],
):
    """Process a video end to end and return the turn summary.

    The traced video is written to ``output_traced.mp4``; the summary chart is
    then appended and the result saved to ``final_output_path``.

    Args:
        source_video_path: Path of the input video.
        final_output_path: Path of the final video with the chart appended.
        zones: Polygons defining the zones.

    Returns:
        The summary returned by ``add_final_summary_to_video``.

    Raises:
        ValueError: If ``zones`` is empty.
    """
    if not zones:
        raise ValueError("'zones' must contain at least one polygon.")

    traced_video_path = "output_traced.mp4"

    # Stage 1: detect, track and annotate every frame.
    processor_config = setup_video_processor(
        source_video_path=source_video_path,
        target_video_path=traced_video_path,
        zones=zones,
    )
    recorded_turns = process_video(processor_config)

    # Stage 2: append the chart and hand back the summary.
    return add_final_summary_to_video(
        video_path=traced_video_path,
        vehicle_turns=recorded_turns,
        output_path=final_output_path,
    )

# Vehicle Turn Detection Summary & AI-Powered Question Answering.
**convert_turn_stats_to_text(analysis_result)**:
Converts the vehicle turn detection results (a JSON dictionary) into a readable text summary, including:

1. Total vehicle count

2. Turn type counts (right, left, U-turn, straight)

3. Per-vehicle turn information.

**Use:**
This summary is later passed to a language model for answering questions.

**create_pipeline(text_data)**:
Creates a custom question-answering function qa_pipeline(question) that:

1. Takes a natural language question

2. Feeds it to Qwen along with the vehicle turn summary

3. Returns only the assistant's reply from the model output

**Purpose:**
This abstracts the model usage so the user can ask follow-up questions based on video analytics.

In [None]:
def convert_turn_stats_to_text(analysis_result):
    """Render a turn-analysis result as plain text for the language model.

    Args:
        analysis_result: Dictionary with optional ``"total_vehicles"``,
            ``"turn_counts"`` and ``"turn_details"`` entries. ``None`` (what
            ``analyze_turns`` returns when nothing was tracked) is treated as
            an empty result.

    Returns:
        A summary sentence followed either by one sentence per vehicle or by a
        note that no per-vehicle details were recorded.
    """
    # Treat a missing result like an empty one instead of failing on .get().
    analysis_result = analysis_result or {}
    turn_counts = analysis_result.get("turn_counts", {})
    turn_details = analysis_result.get("turn_details", [])

    total = analysis_result.get("total_vehicles", 0)
    right = turn_counts.get("Vehicles making right turns", 0)
    left = turn_counts.get("Vehicles making left turns", 0)
    u_turn = turn_counts.get("Vehicles making U-turns", 0)
    straight = turn_counts.get("Vehicles with no detected turns (Straight)", 0)

    summary_text = (
    f"A total of {total} cars were tracked during the analysis. "
    f"Among them, {right} made right turns, {left} made left turns, "
    f"{u_turn} performed U-turns (also referred to as 'uturns' or 'reverse turns'), and {straight} continued straight without making any turns. (also referred to as 'no turns')"
    )

    if not turn_details:
        return f"{summary_text}. No individual vehicle turn details were recorded."

    detail_sentences = [
        f"Vehicle ID {item['tracker_id']} made a {item['turn'].replace('_', ' ').lower()}."
        for item in turn_details
    ]
    details_text = " ".join(detail_sentences)
    # Separate the summary from the details the same way the no-details branch
    # does; previously the two were glued together with no punctuation or space.
    return f"{summary_text}. {details_text}"



# Load the Qwen2.5-1.5B-Instruct tokenizer and model once at module level and
# wrap them in a text-generation pipeline; the QA function built by
# create_pipeline reuses this single `generation_pipe` for every question.
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-1.5B-Instruct")
model = AutoModelForCausalLM.from_pretrained("Qwen/Qwen2.5-1.5B-Instruct")
generation_pipe = pipeline("text-generation", model=model, tokenizer=tokenizer)



def create_pipeline(text_data):
    """Return a question-answering function bound to ``text_data``.

    The returned ``qa_pipeline(question)`` embeds ``text_data`` as context in
    a prompt, sends it to the module-level ``generation_pipe`` and returns the
    content of the first assistant message in the output (``""`` if none).
    """
    system_message = {
        "role": "system",
        "content": "You are an expert in vehicle turn analysis. Respond clearly and accurately using only the provided context.",
    }

    def qa_pipeline(question):
        prompt = f"""
        You are an expert in analyzing traffic video data, specifically vehicle turn behavior.

        Answer the question as thoroughly as possible using only the provided context. If the answer is not present in the context, respond with: "Answer is not available in the context." Do not provide fabricated or assumed information.

        Context:
        {text_data}

        Based on the above context, answer the following question clearly and concisely:

        Question:
        {question}
        """

        conversation = [system_message, {"role": "user", "content": prompt}]
        response = generation_pipe(conversation, max_new_tokens=1000)[0]
        print(response)

        # The pipeline echoes the whole chat; pick out the assistant's turn.
        return next(
            (
                turn.get("content", "")
                for turn in response['generated_text']
                if turn.get("role") == "assistant"
            ),
            "",
        )

    return qa_pipeline


# Vehicle Turn Detection with Interactive Zone Drawing Using Gradio
Interactive Vehicle Turn Detection with Zone-Based Video Analysis

**gradio** is used to create an interactive web UI for uploading a video, processing it, and asking questions.

**tempfile** is used to handle temporary storage of the uploaded video.

This Python application provides an interactive web interface to detect vehicle turns in traffic videos by allowing users to manually draw polygonal zones on the first video frame. Built using Gradio for the UI and OpenCV for video processing, the tool enables the following workflow:

1. Upload a traffic video in common formats like MP4 or AVI.

2. Extract and display the first frame of the video for zone drawing.

3. Draw multiple polygonal zones (Z1, Z2, Z3, Z4, etc.) on the frame to define regions of interest in an intersection pattern.

4. Visualize the drawn zones with distinct colors and labels.

5. Analyze the video based on the defined zones to detect vehicle turns and movements.

6. View the processed video highlighting vehicle turn events.

7. Interact with the analysis by asking questions about vehicle turns through a natural language interface powered by a custom pipeline.


(**encode_to_browser_safe_mp4**): This function uses **ffmpeg** to transcode a video (in any format) into a browser-safe MP4 file that is optimized for web playback in browsers such as Chrome, Firefox, and Safari.



In [None]:
import gradio as gr
import tempfile
import subprocess
import cv2
import numpy as np
import json
from PIL import Image
import os

# Module-level state shared across UI callbacks; both start unset (None).
# NOTE(review): presumably these hold the QA function from create_pipeline and
# the latest turn-analysis result — confirm against the callbacks that set them.
global_pipeline = None
global_turn_json = None

class ZoneDrawer:
    """Collect user-drawn polygon zones on the first frame of an uploaded video.

    The instance keeps the first frame (RGB), the path of the uploaded video
    and the list of zones saved so far. Each zone is a list of ``[x, y]``
    integer points in frame pixel coordinates.
    """

    # RGB colors cycled through when drawing zones (the stored frame is RGB).
    _ZONE_COLORS = (
        (0, 255, 0),    # Green
        (255, 0, 0),    # Red
        (0, 0, 255),    # Blue
        (255, 255, 0),  # Yellow
        (255, 0, 255),  # Magenta
        (0, 255, 255),  # Cyan
        (255, 165, 0),  # Orange
        (128, 0, 128),  # Purple
    )

    def __init__(self):
        self.zones = []            # Saved zones, in drawing order (Z1, Z2, ...)
        self.current_frame = None  # First video frame as an RGB numpy array
        self.video_path = None     # Path of the most recently uploaded video

    def process_video(self, video_file):
        """Extract the first frame of the uploaded video.

        Args:
            video_file: Path to the uploaded video, or None.

        Returns:
            Tuple ``(frame_rgb_or_None, status_message)``.
        """
        if video_file is None:
            return None, "Please upload a video file first."

        try:
            # Store video path for later use by the analysis step
            self.video_path = video_file

            cap = cv2.VideoCapture(video_file)
            try:
                ret, frame = cap.read()
            finally:
                # Release the capture even if reading the frame raises
                cap.release()

            if not ret:
                return None, "❌ Could not extract frame from video."

            # OpenCV decodes to BGR; the UI and the zone colors expect RGB
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            self.current_frame = frame_rgb

            # Zones drawn on a previous video do not apply to the new frame
            self.zones = []

            return frame_rgb, f"✅ Video loaded! Frame size: {frame_rgb.shape[1]}x{frame_rgb.shape[0]}. Draw zones then analyze."

        except Exception as e:
            return None, f"❌ Error processing video: {str(e)}"

    @staticmethod
    def _select_drawing(image_data):
        """Return the drawn layer from an editor value, or None if nothing was drawn."""
        if isinstance(image_data, dict) and 'layers' in image_data:
            layers = image_data['layers']
            # Never fall back to the background: contouring the video frame
            # itself would register a bogus zone covering the picture.
            return layers[0] if layers else None
        return image_data

    def process_drawing(self, image_data):
        """Turn the current drawing into a polygon and store it as a new zone.

        Args:
            image_data: Either an editor value dict containing a ``'layers'``
                list, or the drawing itself (PIL image or numpy array).

        Returns:
            Tuple ``(visualization_or_None, status_message)``.
        """
        if image_data is None:
            return None, "No drawing data received."

        try:
            drawing = self._select_drawing(image_data)

            if drawing is None:
                points = []
            else:
                # Convert PIL Image to numpy array
                if isinstance(drawing, Image.Image):
                    drawing_array = np.array(drawing)
                else:
                    drawing_array = drawing
                points = self.extract_polygon_points(drawing_array)

            if len(points) < 3:
                return self.show_current_zones(), "⚠️ Please draw a polygon with at least 3 points."

            self.zones.append(points)
            result_image = self.create_zone_visualization()

            zone_count = len(self.zones)
            return result_image, f"✅ Added zone Z{zone_count}! Total zones: {zone_count}"

        except Exception as e:
            return self.show_current_zones(), f"❌ Error processing drawing: {str(e)}"

    def extract_polygon_points(self, drawing_array):
        """Approximate the largest drawn shape as a polygon.

        Args:
            drawing_array: Numpy image of the drawing (2-D, or 3-D with color
                channels).

        Returns:
            List of ``[x, y]`` integer points; empty if no contour is found.
        """
        if len(drawing_array.shape) == 3:
            has_transparency = (
                drawing_array.shape[2] == 4
                and bool((drawing_array[:, :, 3] < 255).any())
            )
            if has_transparency:
                # On a transparent layer the stroke is wherever alpha > 0,
                # regardless of brush color (dark strokes gray-convert to 0).
                gray = np.ascontiguousarray(drawing_array[:, :, 3])
            else:
                gray = cv2.cvtColor(drawing_array, cv2.COLOR_RGB2GRAY)
        else:
            gray = drawing_array

        # Find contours in the drawing (non-zero pixels are foreground)
        contours, _ = cv2.findContours(gray, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        if not contours:
            return []

        largest_contour = max(contours, key=cv2.contourArea)

        # Simplify the contour; tolerance is 2% of its perimeter
        epsilon = 0.02 * cv2.arcLength(largest_contour, True)
        approx = cv2.approxPolyDP(largest_contour, epsilon, True)

        # approxPolyDP yields points wrapped as [[x, y]]; flatten to [x, y]
        return [[int(point[0][0]), int(point[0][1])] for point in approx]

    def create_zone_visualization(self):
        """Draw every saved zone on a copy of the current frame.

        Returns:
            The annotated RGB frame, or None if no frame has been loaded.
        """
        if self.current_frame is None:
            return None

        vis_frame = self.current_frame.copy()
        colors = self._ZONE_COLORS

        for i, zone_points in enumerate(self.zones):
            if len(zone_points) >= 3:
                # Cycle through the palette when there are more zones than colors
                color = colors[i % len(colors)]

                pts = np.array(zone_points, np.int32)

                # Blend a filled polygon over the frame (30% fill opacity)
                overlay = vis_frame.copy()
                cv2.fillPoly(overlay, [pts], color)
                cv2.addWeighted(vis_frame, 0.7, overlay, 0.3, 0, vis_frame)

                # Draw border
                cv2.polylines(vis_frame, [pts], isClosed=True, color=color, thickness=3)

                # Label (Z1, Z2, ...) at the mean of the vertices: white text
                # first, then a thinner black pass on top for contrast
                center = np.mean(pts, axis=0).astype(int)
                zone_label = f"Z{i+1}"
                cv2.putText(vis_frame, zone_label, (center[0]-15, center[1]),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
                cv2.putText(vis_frame, zone_label, (center[0]-15, center[1]),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 1)

        return vis_frame

    def show_current_zones(self):
        """Return the visualization of the saved zones without adding a zone."""
        return self.create_zone_visualization()

    def clear_last_zone(self):
        """Remove the most recently saved zone, if any.

        Returns:
            Tuple ``(visualization_or_None, status_message)``.
        """
        if self.zones:
            removed_zone = len(self.zones)
            self.zones.pop()
            result_image = self.create_zone_visualization()
            return result_image, f"🗑️ Removed zone Z{removed_zone}. Remaining zones: {len(self.zones)}"
        else:
            result_image = self.show_current_zones()
            return result_image, "⚠️ No zones to remove."

    def clear_all_zones(self):
        """Remove every saved zone.

        Returns:
            Tuple ``(visualization_or_None, status_message)``.
        """
        total = len(self.zones)
        self.zones = []
        result_image = self.show_current_zones()
        return result_image, f"🔄 Cleared all {total} zones."

    def get_zone_info(self):
        """Return a Markdown summary of the saved zones and their coordinates."""
        zone_count = len(self.zones)
        zone_labels = ', '.join([f"Z{i+1}" for i in range(zone_count)]) if zone_count > 0 else "None"

        info = (
            f"📊 **Zone Summary:**\n\n"
            f"📍 **Total Zones:** {zone_count}\n"
            f"🏷️ **Zone Labels:** {zone_labels}\n\n"
            f"📋 **Zone Coordinates:**\n"
            f"```json\n"
            f"{json.dumps(self.zones, indent=2)}\n"
            f"```\n\n"
            f"🎯 **Ready for Analysis:** {'✅ Yes' if zone_count > 0 else '❌ Draw zones first'}"
        )

        return info

    def get_zones_for_pipeline(self):
        """Return the zones wrapped as ``{'zones': [...]}`` for the analysis step."""
        return {'zones': self.zones}

# Single shared ZoneDrawer instance: the Gradio callbacks and
# analyze_video_with_zones all read and mutate this one object.
zone_drawer = ZoneDrawer()

def analyze_video_with_zones(video_file_path):
    """Run the turn-detection pipeline on a video using the drawn zones.

    On success the module-level ``global_turn_json`` and ``global_pipeline``
    are updated so that questions can be answered afterwards.

    Args:
        video_file_path: Path of the uploaded video; falsy if none was uploaded.

    Returns:
        Tuple ``(output_video_path_or_None, status_message)``.
    """
    global global_pipeline, global_turn_json

    if not video_file_path:
        return None, "Please upload a video file."

    # Check if zones are drawn
    zones_data = zone_drawer.get_zones_for_pipeline()
    if not zones_data['zones']:
        return None, "❌ Please draw at least one zone before analyzing."

    import shutil  # local import: only needed for the streamed copy below

    raw_output_path = "raw_output.mp4"
    browser_safe_path = "final_output_with_summary.mp4"
    zones_file = "temp_zones.json"
    tmp_video_path = None

    try:
        # Work on a temporary copy of the upload; stream it instead of
        # loading the whole video into memory.
        with open(video_file_path, "rb") as source_file:
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_file:
                tmp_video_path = tmp_file.name
                shutil.copyfileobj(source_file, tmp_file)

        # Save zones to temporary file for pipeline
        with open(zones_file, 'w') as f:
            json.dump(zones_data, f, indent=2)

        # Run the full vehicle turn detection pipeline with zones
        global_turn_json = run_full_vehicle_turn_pipeline(
            source_video_path=tmp_video_path,
            final_output_path=raw_output_path,
            zones=zones_data["zones"]
        )

        # Drop any output left over from a previous run so that a failed
        # re-encode cannot be mistaken for a fresh result.
        if os.path.exists(browser_safe_path):
            os.remove(browser_safe_path)

        # Re-encode video
        encode_to_browser_safe_mp4(raw_output_path, browser_safe_path)

        # Create document store and pipeline for QA
        text_data = convert_turn_stats_to_text(global_turn_json)
        global_pipeline = create_pipeline(text_data)

        zone_summary = f"{len(zones_data['zones'])} zones (Z1-Z{len(zones_data['zones'])})"

        if not os.path.exists(browser_safe_path):
            # The encoder only prints on failure, so detect it via the missing file.
            return raw_output_path, (
                f"⚠️ Video analyzed with {zone_summary}, but re-encoding for the browser failed; "
                f"showing the raw output, which may not play in every browser. You can now ask questions."
            )

        return browser_safe_path, f"✅ Video analyzed successfully with {zone_summary}! You can now ask questions."

    except Exception as e:
        return None, f"❌ Error during analysis: {str(e)}\nZones data: {zones_data}"

    finally:
        # Clean up temporary files on both the success and the error path
        if os.path.exists(zones_file):
            os.remove(zones_file)
        if tmp_video_path and os.path.exists(tmp_video_path):
            os.remove(tmp_video_path)

def answer_question(user_question):
    """Answer a question about the most recently analyzed video.

    Args:
        user_question: The question text typed by the user.

    Returns:
        The pipeline's answer, or a short hint when the question is blank
        or no video has been analyzed yet.
    """
    # Do not spend a model call on an empty or whitespace-only question
    if not user_question or not user_question.strip():
        return "Please enter a question."
    if not global_pipeline or not global_turn_json:
        return "Please analyze a video first."
    return global_pipeline(user_question)

def encode_to_browser_safe_mp4(input_path: str, output_path: str) -> bool:
    """Re-encode a video with ffmpeg as H.264/AAC MP4 for browser playback.

    Args:
        input_path: Path of the video to convert.
        output_path: Path of the MP4 to write (overwritten if it exists).

    Returns:
        True if ffmpeg exited successfully, False if it reported an error.
        A non-zero ffmpeg exit is reported on stdout rather than raised.
    """
    cmd = [
        "ffmpeg", "-y", "-i", input_path,
        "-vcodec", "libx264", "-preset", "ultrafast",
        # +faststart moves the index to the front so playback can begin
        # before the whole file has been downloaded
        "-acodec", "aac", "-movflags", "+faststart",
        output_path
    ]
    try:
        subprocess.run(cmd, check=True)
    except subprocess.CalledProcessError:
        print("Error: ffmpeg failed to convert video to browser-safe format.")
        return False
    return True

# Create Gradio interface
def create_interface():
    """Build the three-step Gradio UI (draw zones, analyze, ask questions).

    Returns:
        The assembled ``gr.Blocks`` interface; call ``.launch()`` on it to serve.
    """
    with gr.Blocks(title="Vehicle Turn Detection with Zone Drawing", theme=gr.themes.Soft()) as interface:
        gr.HTML("""
        <h1 style="text-align: center;">🚗 Vehicle Turn Detection with Zone Drawing</h1>
        <p style="text-align: center;">Upload video → Draw zones in intersection pattern (Z1-Z4) → Analyze turns → Ask questions</p>
        <div style="text-align: center; border: 1px solid #ffeaa7; border-radius: 5px; padding: 10px; margin: 10px 0;">
            <strong>⚠️ Important:</strong> Draw zones in intersection pattern (Z1-top, Z2-left, Z3-bottom, Z4-right) for accurate turn detection!
        </div>
        """)

        with gr.Tab("📹 Step 1: Upload & Draw Zones"):
            with gr.Row():
                with gr.Column(scale=2):
                    # Video upload
                    video_input = gr.File(
                        label="📹 Upload Video",
                        file_types=[".mp4", ".avi", ".mov"],
                        type="filepath"
                    )

                    # Frame display and drawing area
                    frame_display = gr.ImageEditor(
                        label="🎨 Draw Zones in Intersection Pattern: Z1(top)→Z2(left)→Z3(bottom)→Z4(right)",
                        type="pil",
                        height=400
                    )

                    # Control buttons
                    with gr.Row():
                        save_zone_btn = gr.Button("✅ Save Zone", variant="primary")
                        clear_last_btn = gr.Button("🗑️ Remove Last Zone")
                        clear_all_btn = gr.Button("🔄 Clear All Zones")

                    # Status message
                    zone_status = gr.Textbox(
                        label="📢 Zone Status",
                        interactive=False,
                        max_lines=2
                    )

                with gr.Column(scale=1):
                    # Zone information
                    zone_info = gr.Markdown(
                        value="Upload a video to start drawing zones.",
                        label="📊 Zone Information"
                    )

                    # Instructions (the diagram matches the anticlockwise
                    # order top → left → bottom → right used everywhere else)
                    gr.Markdown("""
                    ### 📖 Instructions:

                    1. **Upload** a video file
                    2. **Draw** polygons on the frame (labeled as Z1, Z2, Z3, Z4...)
                    3. **Save** each zone after drawing
                    4. Go to **Step 2** to analyze

                    ### 🎨 Drawing Tips:
                    - Use sketch tool to draw polygons
                    - Zones are labeled as Z1, Z2, Z3, Z4...
                    - Each zone gets a different color
                    - Draw closed shapes for best results

                    ### 🔄 **IMPORTANT - Zone Layout for Accurate Turn Detection:**

                    **Recommended Zone Pattern:**
                    ```
                        Z1
                    Z2      Z4
                        Z3
                    ```

                    ### 📍 **Zone Positioning Guidelines:**
                    - **Z1 (Top)**: North/Entry zone
                    - **Z2 (Left)**: West/Left zone
                    - **Z3 (Bottom)**: South/Exit zone
                    - **Z4 (Right)**: East/Right zone
                    - **Draw Anticlockwise Zones patterns**

                    ### 🚗 Turn Detection Accuracy:
                    - Zone drawing direction affects turn classification
                    - Match drawing direction with vehicle flow for best results
                    - Incorrect direction may cause turn misclassification
                    """)

        with gr.Tab("🔍 Step 2: Analyze Video"):
            with gr.Row():
                with gr.Column():
                    analyze_btn = gr.Button("🚀 Analyze Video with Zones", variant="primary", size="lg")
                    analysis_status = gr.Textbox(label="📊 Analysis Status", interactive=False)

                with gr.Column():
                    video_output = gr.Video(label="📹 Processed Video")

        with gr.Tab("❓ Step 3: Ask Questions"):
            gr.Markdown("### Ask questions about the analyzed video:")
            with gr.Row():
                with gr.Column():
                    question_input = gr.Textbox(
                        label="💬 Your Question",
                        placeholder="e.g., How many U-turns were made? What was the most common turn type?",
                        lines=2
                    )
                    # Explicit trigger so the model is asked once per question
                    ask_btn = gr.Button("💬 Ask", variant="primary")
                with gr.Column():
                    answer_output = gr.Textbox(
                        label="🤖 Answer",
                        lines=4,
                        interactive=False
                    )

        # Event handlers for zone drawing; every action that can change the
        # zone list also refreshes the zone summary panel
        video_input.change(
            fn=zone_drawer.process_video,
            inputs=[video_input],
            outputs=[frame_display, zone_status]
        ).then(
            fn=zone_drawer.get_zone_info,
            outputs=[zone_info]
        )

        save_zone_btn.click(
            fn=zone_drawer.process_drawing,
            inputs=[frame_display],
            outputs=[frame_display, zone_status]
        ).then(
            fn=zone_drawer.get_zone_info,
            outputs=[zone_info]
        )

        clear_last_btn.click(
            fn=zone_drawer.clear_last_zone,
            outputs=[frame_display, zone_status]
        ).then(
            fn=zone_drawer.get_zone_info,
            outputs=[zone_info]
        )

        clear_all_btn.click(
            fn=zone_drawer.clear_all_zones,
            outputs=[frame_display, zone_status]
        ).then(
            fn=zone_drawer.get_zone_info,
            outputs=[zone_info]
        )

        # Event handler for video analysis; the path is read at click time
        # from the shared zone drawer rather than from a component
        analyze_btn.click(
            fn=lambda: analyze_video_with_zones(zone_drawer.video_path),
            outputs=[video_output, analysis_status]
        )

        # Event handlers for Q&A. Deliberately no `.change` listener: it
        # would invoke the language model on every keystroke.
        question_input.submit(
            fn=answer_question,
            inputs=[question_input],
            outputs=[answer_output]
        )

        ask_btn.click(
            fn=answer_question,
            inputs=[question_input],
            outputs=[answer_output]
        )

    return interface


if __name__ == "__main__":
    # Build the UI, then serve it with a public share link, blocking debug
    # output and in-browser error reporting enabled.
    interface = create_interface()
    interface.launch(share=True, debug=True, show_error=True)