# Traffic Monitoring System

This notebook allows you to run the Traffic Monitoring System on platforms like Google Colab, Kaggle, or any Jupyter environment. The system can:

- **Detect Vehicles in Real-Time**: Spot cars, trucks, and other vehicles in video feeds
- **Read License Plates**: Automatically identify and record license plate numbers
- **Track Moving Vehicles**: Follow each vehicle as it moves through the video
- **Count Traffic**: Count vehicles as they cross a line you define on the screen
- **Store Results**: Save all detection data for later analysis

## How It Works

1. **Video Ingestion**: Gets frames from your video source
2. **Detection**: Uses AI to find vehicles and license plates
3. **Tracking**: Keeps track of each vehicle as it moves
4. **Counting**: Counts vehicles when they cross your defined line
5. **OCR**: Reads license plate text when detected
6. **Storage**: Saves results to a database
7. **Main Application**: Coordinates all the components and shows results

## 1. Setup Environment

First, let's set up the environment and install all necessary dependencies.

In [None]:
# Check if we're running in Colab or Kaggle and set up environment
import sys, os
IN_COLAB = 'google.colab' in sys.modules
IN_KAGGLE = 'kaggle' in sys.modules

print(f"Running in Google Colab: {IN_COLAB}")
print(f"Running in Kaggle: {IN_KAGGLE}")

# Check available resources
import psutil
import platform
import subprocess
import importlib.metadata  # Using importlib.metadata instead of deprecated pkg_resources
try:
    import torch
    has_torch = True
except ImportError:
    has_torch = False

print(f"\nSystem Information:")
print(f"OS: {platform.platform()}")
print(f"Python: {platform.python_version()}")
print(f"CPU: {psutil.cpu_count(logical=True)} logical cores")
ram_gb = psutil.virtual_memory().total / (1024 ** 3)
print(f"RAM: {ram_gb:.2f} GB")

# Check for GPU
if has_torch:
    has_gpu = torch.cuda.is_available()
    gpu_name = torch.cuda.get_device_name(0) if has_gpu else "None"
    print(f"GPU: {gpu_name} (CUDA Available: {has_gpu})")
elif IN_COLAB:
    # Check for GPU in Colab using system commands
    gpu_info = !nvidia-smi -L 2>/dev/null || echo "No GPU found"
    has_gpu = "No GPU found" not in gpu_info[0]
    print(f"GPU: {gpu_info[0] if has_gpu else 'None'}")
else:
    print("GPU: Unknown (torch not installed)")
    has_gpu = False

print("\nThis information will help optimize the configuration.")

In [None]:
# Install necessary packages from requirements.txt
if IN_COLAB or IN_KAGGLE:
    # Clone the repository first
    !git clone https://github.com/leakless21/codespaces-blank.git
    # Install dependencies from requirements.txt
    if os.path.exists('codespaces-blank/traffic_monitoring/requirements.txt'):
        !pip install -r codespaces-blank/traffic_monitoring/requirements.txt
    else:
        # Fallback to essential packages if requirements.txt not found
        print("Requirements file not found, installing essential packages directly...")
        !pip install opencv-python-headless numpy ultralytics onnxruntime boxmot easyocr paho-mqtt PyYAML tqdm fastapi uvicorn pydantic sqlalchemy python-dotenv
else:
    # If running locally and requirements.txt exists
    if os.path.exists('requirements.txt'):
        !pip install -r requirements.txt
    elif os.path.exists('../requirements.txt'):
        !pip install -r ../requirements.txt
    else:
        print("requirements.txt not found, installing essential packages...")
        !pip install opencv-python-headless numpy ultralytics onnxruntime boxmot easyocr paho-mqtt PyYAML tqdm fastapi uvicorn pydantic sqlalchemy python-dotenv

In [None]:
# Set up the repository access
import os

if IN_COLAB or IN_KAGGLE:
    print("Setting up repository access...")
    # Check if we already cloned the repository
    if os.path.exists('codespaces-blank'):
        # Set up path to traffic_monitoring
        repo_dir = 'codespaces-blank/traffic_monitoring'
        if not os.path.exists(repo_dir):
            print(f"Error: Expected directory {repo_dir} not found")
        else:
            print(f"Using existing repository at {repo_dir}")
            # Change to traffic_monitoring directory
            os.chdir(repo_dir)
    else:
        print("Repository not found. Please run the installation cell first.")
else:
    # Check if we're already in the traffic_monitoring directory
    current_dir = os.path.basename(os.getcwd())
    if current_dir != 'traffic_monitoring':
        # Try to locate traffic_monitoring directory
        if os.path.exists('traffic_monitoring'):
            os.chdir('traffic_monitoring')
            print(f"Changed to traffic_monitoring directory")
        else:
            print("Running in local environment, assuming correct directory structure")
    else:
        print("Already in traffic_monitoring directory")

# Print current directory structure to verify setup
print(f"\nCurrent working directory: {os.getcwd()}")
print("Available files and directories:")
!ls -la

## 2. Download and Prepare Models

Download the AI models needed for vehicle detection and license plate recognition if they don't already exist.

In [None]:
import os
from pathlib import Path

# Create models directory if it doesn't exist
os.makedirs('models', exist_ok=True)

# Check if models already exist
vehicle_model_path = Path('./models/yolo11s.onnx')
plate_model_path = Path('./models/plate_v8n.onnx')

if not vehicle_model_path.exists() or not plate_model_path.exists():
    print("Downloading models...")
    !python utils/download_model.py
    print("Models downloaded successfully")
else:
    print("Models already exist, skipping download")

## 3. Upload a Video or Use Sample Data

Now you can either upload your own video or use one of the sample videos.

In [None]:
# Create a data directory if it doesn't exist
os.makedirs('data', exist_ok=True)

# For Colab: provide an upload widget
if IN_COLAB:
    from google.colab import files
    
    print("Please upload a video file:")
    uploaded = files.upload()
    
    for filename in uploaded.keys():
        video_path = os.path.join('data', filename)
        with open(video_path, 'wb') as f:
            f.write(uploaded[filename])
        print(f"Uploaded {filename} to {video_path}")
elif IN_KAGGLE:
    # For Kaggle: list available input files
    print("Available input files:")
    !ls ../input
    
    # Symlink input directory to make files accessible
    !ln -s ../input input
else:
    print("Available sample videos:")
    !ls data/*.mp4 data/*.MOV data/*.avi 2>/dev/null || echo "No sample videos found in data/ directory"

## 4. View and Edit Configuration

First, let's examine the current configuration. Then we'll set up the counting line to match your specific video.

In [None]:
import yaml

# Display the current configuration
config_path = 'config/settings/config.yaml'

# Ensure the config directory exists
os.makedirs(os.path.dirname(config_path), exist_ok=True)

# Read and display current config if it exists
if os.path.exists(config_path):
    with open(config_path, 'r') as file:
        config = yaml.safe_load(file)
        print("Current configuration:")
        print(yaml.dump(config, default_flow_style=False))
else:
    print("No configuration file found. Creating a default configuration.")
    # Create a default configuration
    config = {
        'video': {
            'source': 'data/sample.mp4',  # Will be updated with available video
            'frame_skip': 1,
            'process_resolution': [640, 480],
            'output_fps': 30.0
        },
        'detection': {
            'vehicle_model': 'models/yolo11s.onnx',
            'plate_model': 'models/plate_v8n.onnx',
            'vehicle_model_version': 'yolo11',
            'plate_model_version': 'yolov8',
            'confidence': 0.25,
            'iou_threshold': 0.45
        },
        'tracking': {
            'tracker_type': 'bytetrack',
            'confidence': 0.3
        },
        'counting': {
            'use_relative_coordinates': True,
            'line': {
                'start': [0.25, 0.6],
                'end': [0.75, 0.6]
            },
            'raw_coordinates': {
                'start': [320, 360],
                'end': [960, 360]
            }
        },
        'ocr': {
            'languages': ['en'],
            'use_gpu': False
        },
        'hardware': {
            'use_gpu': False,
            'provider': 'auto',
            'precision': 'fp32'
        },
        'storage': {
            'database_path': 'data/traffic_data.db',
            'save_images': False
        },
        'mqtt': {
            'enabled': False,
            'broker': 'localhost',
            'port': 1883,
            'topic_prefix': 'traffic'
        }
    }

    with open(config_path, 'w') as file:
        yaml.dump(config, file, default_flow_style=False)
    print("Default configuration created.")

## 5. Select Video Source

Choose which video to process and update the configuration.

In [None]:
# List available videos
import glob

# Search for videos in different locations
all_videos = []
for ext in ['*.mp4', '*.avi', '*.MOV', '*.mkv']:
    # Look in data directory
    all_videos.extend(glob.glob(os.path.join('data', ext)))
    # For Kaggle, look in input directory if exists
    if IN_KAGGLE and os.path.exists('input'):
        all_videos.extend(glob.glob(os.path.join('input', ext)))
        all_videos.extend(glob.glob(os.path.join('input', '*', ext)))

print("Found videos:")
for i, video in enumerate(all_videos):
    print(f"{i+1}. {video}")

# Allow selection
if all_videos:
    selection = input(f"Enter the number of the video to use (1-{len(all_videos)}): ")
    try:
        idx = int(selection) - 1
        if 0 <= idx < len(all_videos):
            selected_video = all_videos[idx]
            # Update configuration with selected video
            with open(config_path, 'r') as file:
                config = yaml.safe_load(file)
            config['video']['source'] = selected_video
            with open(config_path, 'w') as file:
                yaml.dump(config, file, default_flow_style=False)
            print(f"Configuration updated to use {selected_video}")
        else:
            print(f"Invalid selection. Please run the cell again.")
    except ValueError:
        print(f"Invalid input. Please run the cell again.")
else:
    print("No videos found. Please upload a video first.")

## 6. Set Counting Line for Your Video

**Important**: For accurate traffic counting, you need to set a counting line that matches the specific road position in your video. This cannot be done automatically as each video has roads in different positions.

You should set the counting line to cross the road perpendicularly to the direction of traffic. Vehicles will be counted when they cross this line.

In [None]:
import cv2
import numpy as np
from IPython.display import display, Image
import matplotlib.pyplot as plt
import time

def get_frame_from_video(video_path):
    """Extract a representative frame from video for counting line setup"""
    if not os.path.exists(video_path):
        print(f"Error: Video file {video_path} not found")
        return None
        
    # Open the video file
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Could not open video {video_path}")
        return None
        
    # Get video properties
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    
    print(f"Video dimensions: {width}x{height} pixels")
    print(f"Total frames: {frame_count}, FPS: {fps:.2f}")
    
    # Jump to ~20% into the video for a better representative frame
    target_frame = int(frame_count * 0.2)
    cap.set(cv2.CAP_PROP_POS_FRAMES, target_frame)
    
    # Read the frame
    ret, frame = cap.read()
    cap.release()
    
    if not ret:
        print("Error: Could not read frame from video")
        return None
        
    return frame, width, height

# Load current config to get video source
with open(config_path, 'r') as file:
    config = yaml.safe_load(file)

video_path = config['video']['source']
print(f"Getting sample frame from {video_path} to help set counting line...")

# Get a frame from the video
result = get_frame_from_video(video_path)
if result is None:
    print("Cannot help set counting line without a valid video frame")
else:
    frame, width, height = result
    
    # Display the frame
    plt.figure(figsize=(12, 8))
    plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    plt.title('Sample frame from video - Use this to plan your counting line')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()
    
    # Show current counting line settings if they exist
    use_raw = config.get('counting', {}).get('use_raw_coordinates', True)  # Default to True for raw coordinates
    print(f"Current setting - Use raw coordinates: {use_raw}")
    
    if use_raw:
        if 'raw_coordinates' in config.get('counting', {}):
            start = config['counting']['raw_coordinates']['start']
            end = config['counting']['raw_coordinates']['end']
            print(f"Current raw coordinates: start={start}, end={end}")
    else:
        if 'normalized_coordinates' in config.get('counting', {}):
            start = config['counting']['normalized_coordinates']['start']
            end = config['counting']['normalized_coordinates']['end']
            print(f"Current normalized coordinates: start={start}, end={end}")
            # Convert to pixel positions for visualization
            raw_start = [int(start[0] * width), int(start[1] * height)]
            raw_end = [int(end[0] * width), int(end[1] * height)]
            print(f"Which is equivalent to pixel coordinates: start={raw_start}, end={raw_end}")
    
    # Prompt for new counting line coordinates
    print("\nNow let's set a counting line that crosses the road in your video")
    print("You have two options:")
    print("1. Use raw pixel coordinates (exact positions on screen)")
    print("2. Use normalized coordinates (0-1 values that work with any video size)\n")
    
    use_raw_input = input("Do you want to use raw pixel coordinates? (y/n): ").lower().strip()
    use_raw_coordinates = use_raw_input.startswith('y')
    
    if use_raw_coordinates:
        print(f"\nEnter raw pixel coordinates (values from 0 to {width} for X, 0 to {height} for Y)")
        start_x = int(input(f"Enter starting X position (0-{width}): ").strip())
        start_y = int(input(f"Enter starting Y position (0-{height}): ").strip())
        end_x = int(input(f"Enter ending X position (0-{width}): ").strip())
        end_y = int(input(f"Enter ending Y position (0-{height}): ").strip())
        
        # Save raw coordinates
        if 'counting' not in config:
            config['counting'] = {}
        config['counting']['use_raw_coordinates'] = True
        config['counting']['raw_coordinates'] = {
            'start': [start_x, start_y],
            'end': [end_x, end_y]
        }
        
        # Also calculate and save normalized coordinates for flexibility
        norm_start_x = start_x / width
        norm_start_y = start_y / height
        norm_end_x = end_x / width
        norm_end_y = end_y / height
        config['counting']['normalized_coordinates'] = {
            'start': [norm_start_x, norm_start_y],
            'end': [norm_end_x, norm_end_y]
        }
    else:
        print("\nEnter normalized coordinates (values from 0 to 1, where 0.5 is the middle)")
        norm_start_x = float(input("Enter starting X position (0-1): ").strip())
        norm_start_y = float(input("Enter starting Y position (0-1): ").strip())
        norm_end_x = float(input("Enter ending X position (0-1): ").strip())
        norm_end_y = float(input("Enter ending Y position (0-1): ").strip())
        
        # Save normalized coordinates
        if 'counting' not in config:
            config['counting'] = {}
        config['counting']['use_raw_coordinates'] = False
        config['counting']['normalized_coordinates'] = {
            'start': [norm_start_x, norm_start_y],
            'end': [norm_end_x, norm_end_y]
        }
        
        # Also calculate and save raw coordinates for visualization
        start_x = int(norm_start_x * width)
        start_y = int(norm_start_y * height)
        end_x = int(norm_end_x * width)
        end_y = int(norm_end_y * height)
        config['counting']['raw_coordinates'] = {
            'start': [start_x, start_y],
            'end': [end_x, end_y]
        }
    
    # Save the updated config
    with open(config_path, 'w') as file:
        yaml.dump(config, file, default_flow_style=False)
        
    # Display the frame with the counting line
    line_frame = frame.copy()
    cv2.line(line_frame, 
             (start_x, start_y), 
             (end_x, end_y), 
             (0, 0, 255), 2)
             
    # Add text labels  
    cv2.putText(line_frame, "Counting Line", (start_x + 10, start_y - 10),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
    
    plt.figure(figsize=(12, 8))
    plt.imshow(cv2.cvtColor(line_frame, cv2.COLOR_BGR2RGB))
    plt.title('Video frame with your counting line')
    plt.tight_layout()
    plt.show()
    
    print("\nCounting line has been set successfully!")
    if use_raw_coordinates:
        print(f"Raw coordinates: start=[{start_x}, {start_y}], end=[{end_x}, {end_y}]")
        print(f"Equivalent normalized: start=[{norm_start_x:.2f}, {norm_start_y:.2f}], end=[{norm_end_x:.2f}, {norm_end_y:.2f}]")
    else:
        print(f"Normalized coordinates: start=[{norm_start_x:.2f}, {norm_start_y:.2f}], end=[{norm_end_x:.2f}, {norm_end_y:.2f}]")
        print(f"Equivalent raw: start=[{start_x}, {start_y}], end=[{end_x}, {end_y}]")
    print("\nVehicles will be counted when they cross this line.")

## 6. Edit Configuration (Optional)

You can manually adjust various settings to customize how the system works. Add, remove, or modify entries as needed.

In [None]:
# Load current config
with open(config_path, 'r') as file:
    config = yaml.safe_load(file)

# Configure for NVIDIA GPU usage if available
# Update GPU configuration based on system detection
if has_gpu:
    print("NVIDIA GPU detected! Enabling GPU acceleration for all components.")
    # Enable GPU for all components
    config['hardware']['use_gpu'] = True
    config['hardware']['provider'] = 'CUDAExecutionProvider'
    config['hardware']['precision'] = 'fp16'  # Use half precision for better performance
    # Enable GPU for OCR as well
    config['ocr']['use_gpu'] = True
else:
    print("No GPU detected. Using CPU for processing.")
    config['hardware']['use_gpu'] = False
    config['hardware']['provider'] = 'CPUExecutionProvider'
    config['ocr']['use_gpu'] = False

# Set video processing parameters
config['video']['frame_skip'] = 1  # Process every frame (2 would process every other frame)
config['detection']['confidence'] = 0.25  # Lower = more detections but more false positives

# Save the updated config
with open(config_path, 'w') as file:
    yaml.dump(config, file, default_flow_style=False)

print("Configuration updated successfully.")

## 7. Run Traffic Monitoring System

Now let's run the traffic monitoring system using the main.py script.

In [None]:
import subprocess
import time
import os
from IPython.display import HTML, display
from pathlib import Path

# Get output directory path for recordings
output_dir = Path('data/recordings')
output_dir.mkdir(parents=True, exist_ok=True)

# Generate a timestamp for the output file
timestamp = time.strftime("%Y%m%d_%H%M%S")
output_file = str(output_dir / f"traffic_monitoring_{timestamp}.mp4")

print(f"Will save output to: {output_file}")

# Run main.py with arguments
if IN_COLAB or IN_KAGGLE:
    # In Colab/Kaggle, we need to run in headless mode (no UI) and just record
    cmd = [sys.executable, 'main.py', '--no-ui', '--record', f'--output={output_file}']
else:
    # If running locally, show UI and also record
    cmd = [sys.executable, 'main.py', '--record', f'--output={output_file}']

print(f"Running command: {' '.join(cmd)}")

# Execute the command
try:
    # Run the process and capture output
    process = subprocess.Popen(
        cmd, 
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True,
        bufsize=1
    )
    
    # Print output in real-time
    print("Process started. Output:")
    for line in process.stdout:
        print(line, end='')
        
    # Wait for process to complete
    process.wait()
    print(f"Process completed with return code: {process.returncode}")
except KeyboardInterrupt:
    print("\nProcess interrupted by user.")
    # Try to terminate the process gracefully
    process.terminate()
    try:
        process.wait(timeout=5)
    except subprocess.TimeoutExpired:
        process.kill()
except Exception as e:
    print(f"Error running traffic monitoring: {e}")

## 8. Display Results

Let's display the output video and provide download options.

In [None]:
from IPython.display import HTML, display
from base64 import b64encode

# Function to create HTML video player for notebook
def display_video(video_path):
    if not os.path.exists(video_path):
        print(f"Video file not found: {video_path}")
        return
    
    # Get video data
    video_data = open(video_path, 'rb').read()
    
    # Encode as base64
    video_base64 = b64encode(video_data).decode('utf-8')
    
    # Create HTML with video player
    html = f'''
    <div style="display: flex; flex-direction: column; align-items: center;">
        <h3>Processed Video: {os.path.basename(video_path)}</h3>
        <video width="640" height="480" controls>
            <source src="data:video/mp4;base64,{video_base64}" type="video/mp4">
            Your browser does not support the video tag.
        </video>
    </div>
    '''
    
    # Display the video
    display(HTML(html))

# Find and display the most recent output video
output_videos = glob.glob('data/recordings/*.mp4')

if output_videos:
    # Sort by modification time to get the most recent
    latest_video = max(output_videos, key=os.path.getmtime)
    print(f"Displaying most recent output video: {latest_video}")
    display_video(latest_video)
    
    # Provide download link for Colab users
    if IN_COLAB:
        from google.colab import files
        print("\nDownload the processed video:")
        files.download(latest_video)
else:
    print("No output videos found. The processing may have failed or been interrupted.")

## 9. Analyze Results from Database

Optionally, let's explore the data collected in the SQLite database.

In [None]:
import sqlite3
import pandas as pd
import matplotlib.pyplot as plt

# Connect to the database
db_path = 'data/traffic_data.db'

if os.path.exists(db_path):
    try:
        # Connect to the database
        conn = sqlite3.connect(db_path)
        
        # Get table names
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
        tables = cursor.fetchall()
        
        print("Database tables:")
        for table in tables:
            print(f"- {table[0]}")
            
        # Query vehicle counts
        df_counts = pd.read_sql("SELECT * FROM vehicle_counts ORDER BY timestamp DESC LIMIT 100", conn)
        if not df_counts.empty:
            print(f"\nVehicle counts (most recent {len(df_counts)} records):")
            display(df_counts.head())
            
            # Plot counts over time if data exists
            if len(df_counts) > 1:
                plt.figure(figsize=(10, 6))
                plt.plot(df_counts['timestamp'], df_counts['count'], marker='o')
                plt.title('Vehicle Counts Over Time')
                plt.xlabel('Timestamp')
                plt.ylabel('Count')
                plt.xticks(rotation=45)
                plt.tight_layout()
                plt.show()
        
        # Query license plates
        df_plates = pd.read_sql("SELECT * FROM license_plates ORDER BY timestamp DESC LIMIT 100", conn)
        if not df_plates.empty:
            print(f"\nDetected license plates (most recent {len(df_plates)} records):")
            display(df_plates.head())
            
        # Close connection
        conn.close()
        
    except Exception as e:
        print(f"Error accessing database: {e}")
else:
    print(f"Database file not found: {db_path}")

## 10. Conclusion

You've successfully run the Traffic Monitoring System! Here's what we accomplished:

1. Set up the necessary environment
2. Downloaded the required AI models
3. Configured the system by directly updating the config.yaml file
4. Processed a video to detect, track, count vehicles and read license plates
5. Saved and analyzed the results

This approach is more efficient as it leverages the existing main.py infrastructure rather than reimplementing functionality in the notebook.

### Next Steps

- Try different videos and configurations
- Explore the database in more detail for advanced analytics
- If you're interested in the technical details, explore the source code in the repository