In [11]:
!pip install ultralytics



In [12]:
!pip install cvzone



In [13]:
!pip install mysql-connector-python



In [14]:
!pip install langchain_google_genai



In [15]:
import cv2
import numpy as np
from ultralytics import YOLO
import cvzone
import base64
import os
import time
import threading
import mysql.connector
from mysql.connector import Error
from langchain_core.messages import HumanMessage
from langchain_google_genai import ChatGoogleGenerativeAI

In [16]:
# Never hardcode the API key in the notebook (it would be committed together
# with the code). Read it from the environment, and fall back to a hidden
# interactive prompt when it is not set.
import getpass

GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "").strip()
if not GOOGLE_API_KEY:
    try:
        GOOGLE_API_KEY = getpass.getpass("Enter your Google API key: ").strip()
    except (EOFError, OSError):
        # No interactive input available (e.g. a non-interactive run).
        GOOGLE_API_KEY = ""

if GOOGLE_API_KEY:
    os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
else:
    print("⚠️ GOOGLE_API_KEY is not set; set it before creating the Gemini client.")

In [17]:
# sqlite3 is used by VehicleDetectionProcessor's database methods defined in a later cell.
import sqlite3
# Module-level handle to the same database file the processor writes to.
# NOTE(review): VehicleDetectionProcessor opens its own connection in
# connect_to_database() and does not reference this one; it is never closed in
# the visible cells — confirm whether another cell still needs it.
connection = sqlite3.connect("vehicle_detection.db")

In [18]:
class VehicleDetectionProcessor:
    """Tracks vehicles in a video, flags speeding / wrong-way motion and logs vehicle details.

    Each frame is resized to ``FRAME_SIZE`` and passed to a YOLO tracker. For every
    tracked car, truck or motorcycle the pixel velocity is estimated from the
    movement of its bounding-box centre, the frame is annotated, and (once per
    track id) a crop of the vehicle is sent to Gemini on a worker thread; the
    parsed answer is stored in the ``vehicles`` table of a SQLite database.

    Attributes set by ``__init__``:
        cap: ``cv2.VideoCapture`` for the input video.
        fps: frames per second used for speed estimation and for the output video.
        area: polygon (int32 array of corner points) drawn on every output frame.
        vehicle_data: per-track-id dict with ``last_position``, ``last_frame``,
            ``velocity`` and ``speed``.
        processed_track_ids: track ids whose crop was already sent for analysis.
        speed_limit: speed threshold in pixels per second.
        expected_direction: ``'right'`` or ``'left'``; horizontal motion against it
            is flagged as "Wrong Way".
        db_connection / db_lock: SQLite connection shared with the worker threads
            and the lock that serializes writes to it.
    """

    # (width, height) every frame is resized to; the output video uses the same size.
    FRAME_SIZE = (1020, 600)
    # YOLO class names that are treated as vehicles.
    ALLOWED_CLASSES = ("car", "truck", "motorcycle")
    # Placeholder stored when Gemini's answer lacks a field.
    MISSING_VALUE = "---"
    # Used when the video container reports no usable frame rate.
    FALLBACK_FPS = 30

    def __init__(self, video_path="/content/my.mp4", database_path="vehicle_detection.db", area_coordinates=None):
        """Opens the video, loads the models and prepares the SQLite database.

        Args:
            video_path: path of the video file to process.
            database_path: path of the SQLite database file.
            area_coordinates: int32 array of polygon corners drawn on each frame;
                defaults to a rectangle covering the whole resized frame.

        Raises:
            FileNotFoundError: if the video cannot be opened.
        """
        self.video_path = video_path
        self.cap = cv2.VideoCapture(video_path)
        if not self.cap.isOpened():
            raise FileNotFoundError("Error: Could not open video file.")
        reported_fps = int(self.cap.get(cv2.CAP_PROP_FPS))
        # A reported rate of 0 would cause a division by zero in the speed estimate.
        self.fps = reported_fps if reported_fps > 0 else self.FALLBACK_FPS
        self.yolo_model = YOLO("yolov8n.pt")
        self.names = self.yolo_model.names
        self.gemini_model = ChatGoogleGenerativeAI(model="gemini-1.5-flash")
        self.db_lock = threading.Lock()
        self.db_connection = self.connect_to_database(database_path)
        self.create_table_if_not_exists()
        if area_coordinates is None:
            # Build a fresh array per instance instead of sharing one default object.
            width, height = self.FRAME_SIZE
            area_coordinates = np.array([(0, 0), (0, height), (width, height), (width, 0)], np.int32)
        self.area = area_coordinates
        self.vehicle_data = {}
        self.processed_track_ids = set()
        self._workers = []
        self.speed_limit = 111.11  # px/s, approx. 40 km/h assuming 1 px = 0.1 m
        self.expected_direction = 'right'

    def connect_to_database(self, database_path):
        """Connects to the SQLite database (creates it if not exists).

        The connection is opened with ``check_same_thread=False`` because rows are
        inserted from worker threads; writes are serialized with ``self.db_lock``.

        Returns:
            The open connection, or ``None`` if connecting failed.
        """
        try:
            conn = sqlite3.connect(database_path, check_same_thread=False)
            print("✅ Connected to SQLite database.")
            return conn
        except sqlite3.Error as e:
            print(f"❌ Error connecting to SQLite: {e}")
            return None

    def create_table_if_not_exists(self):
        """Creates the vehicles table if it doesn't exist."""
        if self.db_connection is None:
            print("❌ Error creating table: no database connection.")
            return
        try:
            cursor = self.db_connection.cursor()
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS vehicles (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT,
                    track_id INTEGER,
                    vehicle_type TEXT,
                    vehicle_color TEXT,
                    vehicle_company TEXT
                )
            """)
            self.db_connection.commit()
            print("✅ Table `vehicles` ready.")
        except sqlite3.Error as e:
            print(f"❌ Error creating table: {e}")

    def calculate_center(self, box):
        """Returns the (x, y) centre of an ``(x1, y1, x2, y2)`` bounding box."""
        x1, y1, x2, y2 = box
        center_x = (x1 + x2) / 2
        center_y = (y1 + y2) / 2
        return (center_x, center_y)

    @staticmethod
    def _parse_vehicle_details(response_text):
        """Extracts (type, color, company) from ``Label: value`` lines.

        Lines without a colon or without a value (blank lines, headings) are
        skipped; markdown emphasis around the value is stripped. The first three
        values found are returned in order, padded with ``"---"`` when fewer
        than three are present.
        """
        if not isinstance(response_text, str):
            response_text = str(response_text)
        values = []
        for line in response_text.splitlines():
            _, separator, value = line.partition(":")
            value = value.strip(" \t*`")
            if separator and value:
                values.append(value)
        padded = values + [VehicleDetectionProcessor.MISSING_VALUE] * 3
        return tuple(padded[:3])

    def process_crop_image(self, crop_img, track_id):
        """Sends a cropped vehicle image to Gemini and saves the parsed answer to SQLite.

        Runs on a worker thread; every failure is reported with a message and
        never propagates.
        """
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        if crop_img is None or crop_img.size == 0:
            print(f"🚫 Skipping empty crop for track ID {track_id}.")
            return
        # Ask for the exact layout that _parse_vehicle_details expects.
        prompt = (
            "Analyze this vehicle image and provide the vehicle type, color, and company name. "
            "Answer with exactly three lines in this format:\n"
            "Type: <vehicle type>\nColor: <vehicle color>\nCompany: <company name>"
        )
        try:
            encoded_ok, buffer = cv2.imencode(".jpg", crop_img)
            if not encoded_ok:
                raise ValueError("could not encode crop as JPEG")
            img_base64 = base64.b64encode(buffer).decode("utf-8")
            message = HumanMessage(
                content=[{"type": "text", "text": prompt}, {"type": "image_url", "image_url": f"data:image/jpeg;base64,{img_base64}"}]
            )
            response = self.gemini_model.invoke([message])
            vehicle_type, vehicle_color, vehicle_company = self._parse_vehicle_details(response.content)
            self.save_data_to_database(timestamp, track_id, vehicle_type, vehicle_color, vehicle_company)
        except Exception as e:
            print(f"❌ Error processing image with Gemini: {e}")

    def crop_and_process(self, frame, box, track_id):
        """Crops the vehicle from the frame and sends it for analysis once per track id.

        Untracked detections (negative id) and ids that were already submitted
        are ignored, so each vehicle triggers at most one Gemini request.
        """
        if track_id < 0 or track_id in self.processed_track_ids:
            return
        frame_height, frame_width = frame.shape[:2]
        x1, y1, x2, y2 = map(int, box)
        # Clamp to the frame so negative coordinates cannot wrap around in the slice.
        x1, x2 = max(0, x1), min(frame_width, x2)
        y1, y2 = max(0, y1), min(frame_height, y2)
        if x2 <= x1 or y2 <= y1:
            return
        # Copy: the worker must not share memory with a frame that is still being drawn on.
        crop_img = frame[y1:y2, x1:x2].copy()
        self.processed_track_ids.add(track_id)
        worker = threading.Thread(target=self.process_crop_image, args=(crop_img, track_id))
        worker.start()
        self._workers = [w for w in self._workers if w.is_alive()]
        self._workers.append(worker)

    def save_data_to_database(self, timestamp, track_id, vehicle_type, vehicle_color, vehicle_company):
        """Inserts detected vehicle data into the SQLite database.

        Rows whose type or color is a placeholder (``"---"``) or an echoed
        column header are skipped.
        """
        if vehicle_type.lower() in ["vehicle type", "---"] or vehicle_color.lower() in ["vehicle color", "---"]:
            print(f"🚫 Skipping invalid data: {vehicle_type}, {vehicle_color}, {vehicle_company}")
            return
        if self.db_connection is None:
            print("❌ Error saving data to SQLite: no database connection.")
            return
        try:
            with self.db_lock:
                cursor = self.db_connection.cursor()
                query = "INSERT INTO vehicles (timestamp, track_id, vehicle_type, vehicle_color, vehicle_company) VALUES (?, ?, ?, ?, ?)"
                values = (timestamp, track_id, vehicle_type, vehicle_color, vehicle_company)
                cursor.execute(query, values)
                self.db_connection.commit()
            print(f"✅ Data saved for track ID {track_id} in SQLite.")
        except sqlite3.Error as e:
            print(f"❌ Error saving data to SQLite: {e}")

    def _update_motion(self, track_id, position, frame_num):
        """Updates the motion state of a track and returns ``(velocity, speed)``.

        Velocity is ``(vx, vy)`` in pixels per second, computed from the
        displacement since the track was last seen; both values are ``None``
        until a track has been seen twice. Untracked detections (negative id)
        are not recorded, because they would all share one entry.
        """
        if track_id < 0:
            return None, None
        state = self.vehicle_data.get(track_id)
        if state is None:
            self.vehicle_data[track_id] = {'last_position': position, 'last_frame': frame_num, 'velocity': None, 'speed': None}
            return None, None
        elapsed = (frame_num - state['last_frame']) / self.fps
        if elapsed > 0:
            vx = (position[0] - state['last_position'][0]) / elapsed
            vy = (position[1] - state['last_position'][1]) / elapsed
            state['velocity'] = (vx, vy)
            state['speed'] = np.sqrt(vx**2 + vy**2)
        state['last_position'] = position
        state['last_frame'] = frame_num
        return state['velocity'], state['speed']

    def process_video_frame(self, frame, frame_num):
        """Detects and tracks vehicles in one frame and returns the annotated, resized frame."""
        frame = cv2.resize(frame, self.FRAME_SIZE)
        # Crops for Gemini come from this untouched copy, not from the annotated frame.
        clean_frame = frame.copy()
        results = self.yolo_model.track(frame, persist=True)
        if not results or results[0].boxes is None:
            return frame
        detections = results[0].boxes
        boxes = detections.xyxy.int().cpu().tolist()
        class_ids = detections.cls.int().cpu().tolist()
        if detections.id is not None:
            track_ids = detections.id.int().cpu().tolist()
        else:
            # The tracker assigned no ids in this frame.
            track_ids = [-1] * len(boxes)
        for box, track_id, class_id in zip(boxes, track_ids, class_ids):
            class_name = self.names[class_id]
            if class_name not in self.ALLOWED_CLASSES:
                continue
            x1, y1, x2, y2 = map(int, box)
            velocity, speed = self._update_motion(track_id, self.calculate_center(box), frame_num)
            speeding = False
            wrong_way = False
            if speed is not None:
                vx = velocity[0]
                speeding = speed > self.speed_limit
                wrong_way = (self.expected_direction == 'right' and vx < 0) or (self.expected_direction == 'left' and vx > 0)
            if speeding and wrong_way:
                color = (0, 255, 255)  # Yellow for both violations
            elif speeding:
                color = (0, 0, 255)  # Red for speeding
            elif wrong_way:
                color = (255, 0, 0)  # Blue for wrong way
            else:
                color = (0, 255, 0)  # Green for normal / speed not yet known
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            cvzone.putTextRect(frame, f"ID: {track_id}", (x2, y2), 1, 1)
            cvzone.putTextRect(frame, class_name, (x1, y1), 1, 1)
            if speed is not None:
                speed_text = f"Speed: {speed:.2f} px/s"
                cvzone.putTextRect(frame, speed_text, (x1, y1 - 20), 1, 1)
                if speeding:
                    cvzone.putTextRect(frame, "Speeding", (x1, y1 - 40), 1, 1)
                if wrong_way:
                    cvzone.putTextRect(frame, "Wrong Way", (x1, y1 - 60), 1, 1)
            self.crop_and_process(clean_frame, box, track_id)
        return frame

    def start_processing(self):
        """Processes the whole video and writes the annotated frames to ``output.mp4``.

        The capture and the writer are released even if processing fails. On
        success the method waits for the pending Gemini worker threads, so all
        database rows are written when it returns.

        Raises:
            RuntimeError: if the output video writer cannot be opened.
        """
        output_path = "output.mp4"
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        out = cv2.VideoWriter(output_path, fourcc, self.fps, self.FRAME_SIZE)
        frame_num = 0
        try:
            if not out.isOpened():
                raise RuntimeError(f"Error: Could not open video writer for {output_path}.")
            while self.cap.isOpened():
                ret, frame = self.cap.read()
                if not ret:
                    break
                frame = self.process_video_frame(frame, frame_num)
                cv2.polylines(frame, [self.area], True, (0, 255, 0), 2)
                out.write(frame)
                frame_num += 1
        finally:
            self.cap.release()
            out.release()
        for worker in self._workers:
            worker.join()
        self._workers = []
        print(f"✅ Processing completed. Output saved to {output_path}.")

if __name__ == "__main__":
    # Polygon drawn on every output frame: the four corners of the full
    # 1020x600 frame that the processor resizes the video to.
    area_coords = np.array(
        [(0, 0), (0, 600), (1020, 600), (1020, 0)],
        dtype=np.int32,
    )
    # Arguments in signature order: video path, database path, area polygon.
    processor = VehicleDetectionProcessor(
        "/content/my.mp4",
        "vehicle_detection.db",
        area_coords,
    )
    processor.start_processing()

Output hidden; open in https://colab.research.google.com to view.

In [None]:
# class VehicleDetectionProcessor:
#     def __init__(self, video_file="/content/my.mp4", yolo_model_path="yolo12n.pt"):
#         """Initializes the vehicle detection processor with MySQL database connection."""
#         self.gemini_model = ChatGoogleGenerativeAI(model="gemini-1.5-flash")

#         try:
#             self.yolo_model = YOLO(yolo_model_path)
#             self.names = self.yolo_model.names
#         except Exception as e:
#             raise RuntimeError(f"Error loading YOLO model: {e}")

#         self.cap = cv2.VideoCapture(video_file)
#         if not self.cap.isOpened():
#             raise FileNotFoundError("Error: Could not open video file.")

#         self.area = np.array([(392, 407), (382, 448), (740, 456), (734, 419)], np.int32)
#         self.processed_track_ids = set()
#         self.cropped_images_folder = "cropped_vehicles"
#         os.makedirs(self.cropped_images_folder, exist_ok=True)

#         # MySQL Database Connection
#         self.db_connection = self.connect_to_database()
#         self.create_table_if_not_exists()

#     def connect_to_database(self):
#       """Connects to the SQLite database (creates it if not exists)."""
#       try:
#           connection = sqlite3.connect("vehicle_detection.db")
#           print("✅ Connected to SQLite database.")
#           return connection
#       except sqlite3.Error as e:
#           print(f"❌ Error connecting to SQLite: {e}")
#           return None

#     def create_table_if_not_exists(self):
#         """Creates the vehicles table if it doesn't exist."""
#         try:
#             cursor = self.db_connection.cursor()
#             cursor.execute("""
#                 CREATE TABLE IF NOT EXISTS vehicles (
#                     id INT AUTO_INCREMENT PRIMARY KEY,
#                     timestamp VARCHAR(50),
#                     track_id INT,
#                     vehicle_type VARCHAR(50),
#                     vehicle_color VARCHAR(50),
#                     vehicle_company VARCHAR(50)
#                 )
#             """)
#             self.db_connection.commit()
#             print("✅ Table `vehicles` ready.")
#         except Error as e:
#             print(f"❌ Error creating table: {e}")

#     def save_data_to_mysql(self, timestamp, track_id, vehicle_type, vehicle_color, vehicle_company):
#         """Inserts detected vehicle data into the MySQL database."""
#         if vehicle_type.lower() in ["vehicle type", "---"] or vehicle_color.lower() in ["vehicle color", "---"]:
#             print(f"🚫 Skipping invalid data: {vehicle_type}, {vehicle_color}, {vehicle_company}")
#             return

#         try:
#             cursor = self.db_connection.cursor()
#             query = "INSERT INTO vehicles (timestamp, track_id, vehicle_type, vehicle_color, vehicle_company) VALUES (%s, %s, %s, %s, %s)"
#             values = (timestamp, track_id, vehicle_type, vehicle_color, vehicle_company)
#             cursor.execute(query, values)
#             self.db_connection.commit()
#             print(f"✅ Data saved for track ID {track_id} in MySQL.")
#         except Error as e:
#             print(f"❌ Error saving data to MySQL: {e}")

#     def analyze_image_with_gemini(self, image_path):
#         """Sends an image to Gemini AI for analysis and extracts vehicle details."""
#         try:
#             with open(image_path, "rb") as img_file:
#                 base64_image = base64.b64encode(img_file.read()).decode("utf-8")

#             message = HumanMessage(
#                 content=[
#                     {"type": "text", "text": "Extract ONLY these details:\n"
#                      "| Vehicle Model Name(e.g,truck)|Vehicle Color|Vehicle Company|\n"
#                      "|------------|------------|--------------|"},
#                     {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}, "description": "Detected vehicle"}
#                 ]
#             )

#             response = self.gemini_model.invoke([message])
#             response_text = response.content.strip()

#             # Extract valid rows
#             valid_rows = [
#                 row.split("|")[1:-1]
#                 for row in response_text.split("\n")
#                 if "|" in row and "Vehicle Type" not in row and "---" not in row
#             ]

#             return valid_rows  # Returns a list of [vehicle_type, vehicle_color, vehicle_company]
#         except Exception as e:
#             print(f"❌ Error invoking Gemini model: {e}")
#             return []

#     def process_crop_image(self, image, track_id):
#         """Processes a cropped vehicle image and saves data to MySQL."""
#         timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
#         image_filename = os.path.join(self.cropped_images_folder, f"vehicle_{track_id}_{timestamp.replace(':', '-')}.jpg")
#         cv2.imwrite(image_filename, image)

#         extracted_data = self.analyze_image_with_gemini(image_filename)

#         for row in extracted_data:
#             if len(row) == 3:
#                 Vehicle_Model_Name, vehicle_color, vehicle_company = row
#                 self.save_data_to_mysql(timestamp, track_id, Vehicle_Model_Name.strip(), vehicle_color.strip(), vehicle_company.strip())

#     def crop_and_process(self, frame, box, track_id):
#         """Crops the vehicle from the frame and sends it for analysis."""
#         if track_id in self.processed_track_ids:
#             print(f"Track ID {track_id} already processed. Skipping.")
#             return

#         x1, y1, x2, y2 = map(int, box)
#         cropped_image = frame[y1:y2, x1:x2]
#         self.processed_track_ids.add(track_id)
#         threading.Thread(target=self.process_crop_image, args=(cropped_image, track_id), daemon=True).start()

#     def process_video_frame(self, frame):
#         """Processes each video frame and detects vehicles using YOLO."""
#         frame = cv2.resize(frame, (1020, 600))
#         results = self.yolo_model.track(frame, persist=True)

#         if results and results[0].boxes is not None:
#             boxes = results[0].boxes.xyxy.int().cpu().tolist()
#             class_ids = results[0].boxes.cls.int().cpu().tolist()
#             track_ids = results[0].boxes.id.int().cpu().tolist() if results[0].boxes.id is not None else [-1] * len(boxes)
#             allowed_classes = ["car", "truck"]

#             for box, track_id, class_id in zip(boxes, track_ids, class_ids):
#                 class_name = self.names[class_id]
#                 if class_name not in allowed_classes:
#                     continue
#                 x1, y1, x2, y2 = map(int, box)
#                 if cv2.pointPolygonTest(self.area, (x2, y2), False) >= 0:
#                     cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 255), 2)
#                     cvzone.putTextRect(frame, f"ID: {track_id}", (x2, y2), 1, 1)
#                     cvzone.putTextRect(frame, class_name, (x1, y1), 1, 1)
#                     self.crop_and_process(frame, box, track_id)
#         return frame

#     def start_processing(self):
#       """Starts the video processing loop and saves output to a video file."""
#       # Initialize video writer
#       output_path = "output.mp4"
#       fourcc = cv2.VideoWriter_fourcc(*"mp4v")  # Codec for MP4
#       fps = int(self.cap.get(cv2.CAP_PROP_FPS))  # Get video FPS
#       frame_size = (1020, 600)  # Match the resized frame size
#       out = cv2.VideoWriter(output_path, fourcc, fps, frame_size)

#       while self.cap.isOpened():
#           ret, frame = self.cap.read()
#           if not ret:
#               break
#           frame = self.process_video_frame(frame)
#           cv2.polylines(frame, [self.area], True, (0, 255, 0), 2)
#           out.write(frame)  # Write frame to output video

#       # Release resources
#       self.cap.release()
#       out.release()
#       cv2.destroyAllWindows()
#       print(f"✅ Processing completed. Output saved to {output_path}.")


In [None]:
# import cv2
# import numpy as np
# import time
# import os
# import threading
# import sqlite3
# from ultralytics import YOLO
# from langchain_google_genai import ChatGoogleGenerativeAI
# from langchain_core.messages import HumanMessage
# import base64
# import cvzone


# class VehicleDetectionProcessor:
#     def __init__(self, video_file="/content/my.mp4", yolo_model_path="yolo12n.pt"):
#         """Initializes the vehicle detection processor with SQLite database connection."""
#         self.gemini_model = ChatGoogleGenerativeAI(model="gemini-1.5-flash")

#         try:
#             self.yolo_model = YOLO(yolo_model_path)
#             self.names = self.yolo_model.names
#         except Exception as e:
#             raise RuntimeError(f"Error loading YOLO model: {e}")

#         self.cap = cv2.VideoCapture(video_file)
#         if not self.cap.isOpened():
#             raise FileNotFoundError("Error: Could not open video file.")

#         self.fps = int(self.cap.get(cv2.CAP_PROP_FPS))
#         self.area = np.array([(392, 407), (382, 448), (740, 456), (734, 419)], np.int32)
#         self.processed_track_ids = set()
#         self.cropped_images_folder = "cropped_vehicles"
#         os.makedirs(self.cropped_images_folder, exist_ok=True)

#         self.vehicle_data = {}
#         self.speed_limit = 100  # px/s, arbitrary value
#         self.expected_direction = 'right'

#         self.db_connection = self.connect_to_database()
#         self.create_table_if_not_exists()

#     def connect_to_database(self):
#         """Connects to the SQLite database (creates it if not exists)."""
#         try:
#             connection = sqlite3.connect("vehicle_detection.db")
#             print("✅ Connected to SQLite database.")
#             return connection
#         except sqlite3.Error as e:
#             print(f"❌ Error connecting to SQLite: {e}")
#             return None

#     def create_table_if_not_exists(self):
#         """Creates the vehicles table if it doesn't exist."""
#         try:
#             cursor = self.db_connection.cursor()
#             cursor.execute("""
#                 CREATE TABLE IF NOT EXISTS vehicles (
#                     id INTEGER PRIMARY KEY AUTOINCREMENT,
#                     timestamp TEXT,
#                     track_id INTEGER,
#                     vehicle_type TEXT,
#                     vehicle_color TEXT,
#                     vehicle_company TEXT
#                 )
#             """)
#             self.db_connection.commit()
#             print("✅ Table `vehicles` ready.")
#         except sqlite3.Error as e:
#             print(f"❌ Error creating table: {e}")

#     def save_data_to_database(self, timestamp, track_id, vehicle_type, vehicle_color, vehicle_company):
#         """Inserts detected vehicle data into the SQLite database."""
#         if vehicle_type.lower() in ["vehicle type", "---"] or vehicle_color.lower() in ["vehicle color", "---"]:
#             print(f"🚫 Skipping invalid data: {vehicle_type}, {vehicle_color}, {vehicle_company}")
#             return

#         try:
#             cursor = self.db_connection.cursor()
#             query = "INSERT INTO vehicles (timestamp, track_id, vehicle_type, vehicle_color, vehicle_company) VALUES (?, ?, ?, ?, ?)"
#             values = (timestamp, track_id, vehicle_type, vehicle_color, vehicle_company)
#             cursor.execute(query, values)
#             self.db_connection.commit()
#             print(f"✅ Data saved for track ID {track_id} in SQLite.")
#         except sqlite3.Error as e:
#             print(f"❌ Error saving data to SQLite: {e}")

#     def analyze_image_with_gemini(self, image_path):
#         """Sends an image to Gemini AI for analysis and extracts vehicle details."""
#         try:
#             with open(image_path, "rb") as img_file:
#                 base64_image = base64.b64encode(img_file.read()).decode("utf-8")

#             message = HumanMessage(
#                 content=[
#                     {"type": "text", "text": "Extract ONLY these details:\n"
#                      "| Vehicle Model Name(e.g,truck)|Vehicle Color|Vehicle Company|\n"
#                      "|------------|------------|--------------|"},
#                     {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}, "description": "Detected vehicle"}
#                 ]
#             )

#             response = self.gemini_model.invoke([message])
#             response_text = response.content.strip()

#             valid_rows = [
#                 row.split("|")[1:-1]
#                 for row in response_text.split("\n")
#                 if "|" in row and "Vehicle Type" not in row and "---" not in row
#             ]

#             return valid_rows
#         except Exception as e:
#             print(f"❌ Error invoking Gemini model: {e}")
#             return []

#     def process_crop_image(self, image, track_id):
#         """Processes a cropped vehicle image and saves data to SQLite."""
#         timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
#         image_filename = os.path.join(self.cropped_images_folder, f"vehicle_{track_id}_{timestamp.replace(':', '-')}.jpg")
#         cv2.imwrite(image_filename, image)

#         extracted_data = self.analyze_image_with_gemini(image_filename)

#         for row in extracted_data:
#             if len(row) == 3:
#                 Vehicle_Model_Name, vehicle_color, vehicle_company = row
#                 self.save_data_to_database(timestamp, track_id, Vehicle_Model_Name.strip(), vehicle_color.strip(), vehicle_company.strip())

#     def crop_and_process(self, frame, box, track_id):
#         """Crops the vehicle from the frame and sends it for analysis."""
#         if track_id in self.processed_track_ids:
#             print(f"Track ID {track_id} already processed. Skipping.")
#             return

#         x1, y1, x2, y2 = map(int, box)
#         cropped_image = frame[y1:y2, x1:x2]
#         self.processed_track_ids.add(track_id)
#         threading.Thread(target=self.process_crop_image, args=(cropped_image, track_id), daemon=True).start()

#     def calculate_center(self, box):
#         """Calculates the center of the bounding box."""
#         x1, y1, x2, y2 = box
#         center_x = (x1 + x2) / 2
#         center_y = (y1 + y2) / 2
#         return (center_x, center_y)

#     def process_video_frame(self, frame):
#         """Processes each video frame and detects vehicles using YOLO."""
#         frame = cv2.resize(frame, (1020, 600))
#         results = self.yolo_model.track(frame, persist=True)

#         if results and results[0].boxes is not None:
#             boxes = results[0].boxes.xyxy.int().cpu().tolist()
#             class_ids = results[0].boxes.cls.int().cpu().tolist()
#             track_ids = results[0].boxes.id.int().cpu().tolist() if results[0].boxes.id is not None else [-1] * len(boxes)
#             allowed_classes = ["car", "truck", "motorcycle"]

#             for box, track_id, class_id in zip(boxes, track_ids, class_ids):
#                 class_name = self.names[class_id]
#                 if class_name not in allowed_classes:
#                     continue
#                 x1, y1, x2, y2 = map(int, box)
#                 if cv2.pointPolygonTest(self.area, (x2, y2), False) >= 0:
#                     current_position = self.calculate_center(box)
#                     if track_id in self.vehicle_data:
#                         last_position = self.vehicle_data[track_id]['last_position']
#                         dx = current_position[0] - last_position[0]
#                         dy = current_position[1] - last_position[1]
#                         vx = dx * self.fps
#                         vy = dy * self.fps
#                         speed = np.sqrt(vx**2 + vy**2)
#                         self.vehicle_data[track_id]['velocity'] = (vx, vy)
#                         self.vehicle_data[track_id]['speed'] = speed
#                         self.vehicle_data[track_id]['last_position'] = current_position
#                     else:
#                         self.vehicle_data[track_id] = {'last_position': current_position, 'velocity': None, 'speed': None}

#                     color = (0, 255, 0)
#                     if self.vehicle_data[track_id]['speed'] is not None:
#                         speed = self.vehicle_data[track_id]['speed']
#                         vx = self.vehicle_data[track_id]['velocity'][0]
#                         if speed > self.speed_limit:
#                             color = (0, 0, 255)
#                         if self.expected_direction == 'right' and vx < 0:
#                             color = (255, 0, 0)

#                     cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
#                     cvzone.putTextRect(frame, f"ID: {track_id}", (x2, y2), 1, 1)
#                     cvzone.putTextRect(frame, class_name, (x1, y1), 1, 1)
#                     if self.vehicle_data[track_id]['speed'] is not None:
#                         speed_text = f"Speed: {speed:.2f} px/s"
#                         cvzone.putTextRect(frame, speed_text, (x1, y1 - 20), 1, 1)
#                         if speed > self.speed_limit:
#                             cvzone.putTextRect(frame, "Speeding", (x1, y1 - 40), 1, 1)
#                         if self.expected_direction == 'right' and vx < 0:
#                             cvzone.putTextRect(frame, "Wrong Way", (x1, y1 - 60), 1, 1)

#                     self.crop_and_process(frame, box, track_id)
#         return frame

#     def start_processing(self):
#         """Starts the video processing loop and saves output to a video file."""
#         output_path = "output.mp4"
#         fourcc = cv2.VideoWriter_fourcc(*"mp4v")
#         frame_size = (1020, 600)
#         out = cv2.VideoWriter(output_path, fourcc, self.fps, frame_size)

#         while self.cap.isOpened():
#             ret, frame = self.cap.read()
#             if not ret:
#                 break
#             frame = self.process_video_frame(frame)
#             cv2.polylines(frame, [self.area], True, (0, 255, 0), 2)
#             out.write(frame)

#         self.cap.release()
#         out.release()
#         cv2.destroyAllWindows()
#         print(f"✅ Processing completed. Output saved to {output_path}.")

In [None]:
if __name__ == "__main__":
    # Relative path: resolved against the notebook's current working directory.
    video_file = "my.mp4"
    # Passed positionally on purpose: the first constructor parameter is named
    # `video_path` in the active class and `video_file` in the commented-out drafts.
    # NOTE(review): the "Track ID ... already processed. Skipping." lines in the
    # saved output are printed only by the commented-out drafts, so that output
    # appears to come from an earlier class definition — re-run to refresh it.
    processor = VehicleDetectionProcessor(video_file)
    processor.start_processing()

✅ Connected to SQLite database.
✅ Table `vehicles` ready.

0: 384x640 6 persons, 13 cars, 1 motorcycle, 1 bus, 4 trucks, 4 potted plants, 14.8ms
Speed: 2.3ms preprocess, 14.8ms inference, 1.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 6 persons, 12 cars, 1 motorcycle, 1 bus, 5 trucks, 4 potted plants, 17.2ms
Speed: 6.4ms preprocess, 17.2ms inference, 1.4ms postprocess per image at shape (1, 3, 384, 640)
Track ID 1 already processed. Skipping.

0: 384x640 6 persons, 12 cars, 1 motorcycle, 1 bus, 5 trucks, 4 potted plants, 22.9ms
Speed: 3.1ms preprocess, 22.9ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)
Track ID 1 already processed. Skipping.

0: 384x640 6 persons, 12 cars, 1 motorcycle, 1 bus, 5 trucks, 4 potted plants, 16.0ms
Speed: 3.1ms preprocess, 16.0ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)
Track ID 1 already processed. Skipping.

0: 384x640 7 persons, 12 cars, 1 motorcycle, 1 bus, 4 trucks, 4 potted plants, 17.5ms

  quota_metric: "generativelanguage.googleapis.com/generate_content_free_tier_requests"
  quota_id: "GenerateRequestsPerMinutePerProjectPerModel-FreeTier"
  quota_dimensions {
    key: "model"
    value: "gemini-1.5-flash"
  }
  quota_dimensions {
    key: "location"
    value: "global"
  }
  quota_value: 15
}
, links {
  description: "Learn more about Gemini API quotas"
  url: "https://ai.google.dev/gemini-api/docs/rate-limits"
}
, retry_delay {
  seconds: 41
}
].


0: 384x640 8 persons, 1 bicycle, 13 cars, 1 motorcycle, 1 bus, 5 trucks, 4 potted plants, 40.8ms
Speed: 3.1ms preprocess, 40.8ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)
Track ID 726 already processed. Skipping.

0: 384x640 8 persons, 1 bicycle, 13 cars, 1 motorcycle, 1 bus, 5 trucks, 4 potted plants, 26.3ms
Speed: 8.2ms preprocess, 26.3ms inference, 1.7ms postprocess per image at shape (1, 3, 384, 640)
Track ID 726 already processed. Skipping.

0: 384x640 9 persons, 1 bicycle, 14 cars, 1 motorcycle, 1 bus, 4 trucks, 4 potted plants, 25.1ms
Speed: 4.0ms preprocess, 25.1ms inference, 1.7ms postprocess per image at shape (1, 3, 384, 640)
Track ID 726 already processed. Skipping.

0: 384x640 9 persons, 2 bicycles, 14 cars, 1 bus, 4 trucks, 4 potted plants, 31.9ms
Speed: 3.8ms preprocess, 31.9ms inference, 2.2ms postprocess per image at shape (1, 3, 384, 640)
Track ID 726 already processed. Skipping.

0: 384x640 9 persons, 2 bicycles, 14 cars, 1 bus, 4 trucks, 4 pot

  quota_metric: "generativelanguage.googleapis.com/generate_content_free_tier_requests"
  quota_id: "GenerateRequestsPerMinutePerProjectPerModel-FreeTier"
  quota_dimensions {
    key: "model"
    value: "gemini-1.5-flash"
  }
  quota_dimensions {
    key: "location"
    value: "global"
  }
  quota_value: 15
}
, links {
  description: "Learn more about Gemini API quotas"
  url: "https://ai.google.dev/gemini-api/docs/rate-limits"
}
, retry_delay {
  seconds: 36
}
].


Track ID 1119 already processed. Skipping.

0: 384x640 8 persons, 15 cars, 1 motorcycle, 2 buss, 3 trucks, 5 potted plants, 14.7ms
Speed: 3.0ms preprocess, 14.7ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)
Track ID 1119 already processed. Skipping.

0: 384x640 8 persons, 14 cars, 1 motorcycle, 2 buss, 4 trucks, 6 potted plants, 17.6ms
Speed: 2.9ms preprocess, 17.6ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)
Track ID 1119 already processed. Skipping.

0: 384x640 8 persons, 14 cars, 2 buss, 4 trucks, 6 potted plants, 20.2ms
Speed: 3.4ms preprocess, 20.2ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)
Track ID 861 already processed. Skipping.
Track ID 1119 already processed. Skipping.



  quota_metric: "generativelanguage.googleapis.com/generate_content_free_tier_requests"
  quota_id: "GenerateRequestsPerMinutePerProjectPerModel-FreeTier"
  quota_dimensions {
    key: "model"
    value: "gemini-1.5-flash"
  }
  quota_dimensions {
    key: "location"
    value: "global"
  }
  quota_value: 15
}
, links {
  description: "Learn more about Gemini API quotas"
  url: "https://ai.google.dev/gemini-api/docs/rate-limits"
}
, retry_delay {
  seconds: 36
}
].


0: 384x640 9 persons, 14 cars, 2 buss, 4 trucks, 6 potted plants, 21.2ms
Speed: 3.0ms preprocess, 21.2ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)
Track ID 861 already processed. Skipping.
Track ID 1119 already processed. Skipping.

0: 384x640 9 persons, 14 cars, 2 buss, 4 trucks, 6 potted plants, 16.2ms
Speed: 3.1ms preprocess, 16.2ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)
Track ID 861 already processed. Skipping.
Track ID 1119 already processed. Skipping.

0: 384x640 9 persons, 14 cars, 1 bus, 4 trucks, 6 potted plants, 15.5ms
Speed: 3.0ms preprocess, 15.5ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)
Track ID 861 already processed. Skipping.
Track ID 1119 already processed. Skipping.

0: 384x640 9 persons, 14 cars, 1 bus, 4 trucks, 6 potted plants, 17.3ms
Speed: 3.0ms preprocess, 17.3ms inference, 1.3ms postprocess per image at shape (1, 3, 384, 640)
Track ID 861 already processed. Skipping.
Track ID 1119 already 

## Pushing the repository to GitHub

In [51]:
# Step 1: Initialize repo
!git init

# Step 2: Set your Git identity
!git config --global user.email "anshajshukla03@gmail.com"
!git config --global user.name "anshajshukla"

# Step 3: Add files
!git add .

# Step 4: Commit
!git commit -m "First Commit"

# Step 5: Add remote (Correct HTTPS link)
!git remote add origin https://github.com/anshajshukla/Traffic-Management-System.git

# Step 6: Push to main branch using token authentication
# REPLACE below: <USERNAME> and <TOKEN> with your actual GitHub username and PAT (Personal Access Token)
!git push https://anshajshukla:<TOKEN>@github.com/anshajshukla/Traffic-Management-System.git main


Reinitialized existing Git repository in /content/.git/
On branch master
nothing to commit, working tree clean
error: src refspec main does not match any
[31merror: failed to push some refs to 'git@github.com:anshajshukla/https://github.com/anshajshukla/Traffic-Management-System.git.git'
[m

For example, here we download and display a PNG image of the Colab logo:

On branch master
nothing to commit, working tree clean


In [47]:
# The provided input is not code and therefore cannot be corrected.  It appears to be an instruction.

fatal: protocol 'git@github.com:anshajshukla/https' is not supported
