# Traffic Flow Analysis Using CCTV Footage

## Comprehensive Demo: Object Detection, Vehicle Classification, and Congestion Analysis

This notebook demonstrates a complete traffic flow analysis system that:
- **Detects vehicles** in CCTV footage using YOLO
- **Classifies vehicle types** (car, bike, bus, truck)
- **Tracks vehicle movement** across frames
- **Estimates congestion levels** and traffic patterns
- **Provides analytics** for smart city applications

**Key Technologies:**
- Computer Vision: OpenCV, YOLO, Deep Learning
- Data Science: Pandas, NumPy, Statistical Modeling
- Visualization: Matplotlib, Seaborn, Plotly

## 1. Import Required Libraries

Import OpenCV, Ultralytics YOLO, PyTorch, NumPy, pandas, matplotlib, and the other libraries needed for computer vision and data analysis.

In [None]:
# Core libraries for computer vision and data processing
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Deep learning and object detection
from ultralytics import YOLO
import torch
import torchvision.transforms as transforms

# Additional utilities
import os
import sys
import yaml
import json
import time
from datetime import datetime, timedelta
from collections import defaultdict, deque
from typing import List, Dict, Tuple, Optional
import warnings
warnings.filterwarnings('ignore')

# Add project path to system path
sys.path.append('../')

# Import our custom modules
from src.detection.vehicle_detector import VehicleDetector
from src.tracking.vehicle_tracker import VehicleTracker
from src.classification.vehicle_classifier import VehicleClassifier
from src.analytics.traffic_analyzer import TrafficAnalyzer
from src.visualization.traffic_visualizer import TrafficVisualizer
from src.utils.helpers import *

# Set up plotting style
# These two calls change global matplotlib/seaborn state, so they affect every
# figure drawn by later cells.
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

# Print the library versions so the environment used for a run is visible.
print("‚úÖ All libraries imported successfully!")
print(f"OpenCV version: {cv2.__version__}")
print(f"PyTorch version: {torch.__version__}")

## 2. Load and Preprocess Video Data

Load CCTV video files, extract frames, and apply preprocessing techniques like resizing and noise reduction.

In [None]:
# Configuration for video processing
# VIDEO_PATH is a relative path, so it is resolved against the current working
# directory. When no file exists there, the cells below fall back to a
# synthetic frame instead of failing.
VIDEO_PATH = "../data/videos/sample_traffic.mp4"  # Update with your video path
SAMPLE_FRAMES = 5  # Number of frames to display for preview

def load_and_preview_video(video_path: str, max_frames: int = 5):
    """Load a video, print its properties, and preview evenly spaced frames.

    Args:
        video_path: Path to the video file.
        max_frames: Number of evenly spaced frame positions to sample.

    Returns:
        A list of frames as returned by ``cv2.VideoCapture.read`` (possibly
        empty when no sampled frame could be decoded), or ``None`` when the
        file does not exist or cannot be opened.
    """

    # Check if video file exists (for demo, we'll create a placeholder)
    if not os.path.exists(video_path):
        print(f"‚ö†Ô∏è Video file not found at {video_path}")
        print("üìù For this demo, we'll simulate video processing")
        print("   In a real scenario, place your CCTV footage in the data/videos/ directory")
        return None

    cap = cv2.VideoCapture(video_path)

    # try/finally guarantees the capture handle is released even if a
    # property query, seek, or read raises part-way through.
    try:
        if not cap.isOpened():
            print("‚ùå Error opening video file")
            return None

        # Get video properties
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Some containers report 0 FPS; avoid a ZeroDivisionError in that case.
        duration = frame_count / fps if fps > 0 else 0.0

        print(f"üìπ Video Properties:")
        print(f"   Resolution: {width}x{height}")
        print(f"   FPS: {fps:.2f}")
        print(f"   Total Frames: {frame_count}")
        print(f"   Duration: {duration:.2f} seconds")

        # Extract sample frames, remembering which index each decoded frame
        # came from so the titles stay correct when some reads fail.
        frames = []
        decoded_indices = []
        frame_indices = np.linspace(0, frame_count - 1, max_frames, dtype=int)

        for frame_idx in frame_indices:
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(frame_idx))
            ret, frame = cap.read()
            if ret:
                frames.append(frame)
                decoded_indices.append(int(frame_idx))
    finally:
        cap.release()

    # Display sample frames
    if frames:
        fig, axes = plt.subplots(1, len(frames), figsize=(15, 3))
        if len(frames) == 1:
            # With a single column, subplots returns a bare Axes, not an array.
            axes = [axes]

        for frame, frame_idx, ax in zip(frames, decoded_indices, axes):
            # Convert BGR to RGB for matplotlib
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            ax.imshow(frame_rgb)
            ax.set_title(f'Frame {frame_idx}')
            ax.axis('off')

        plt.suptitle('Sample Frames from Traffic Video')
        plt.tight_layout()
        plt.show()

    return frames

def preprocess_frame(frame: np.ndarray, target_size: Tuple[int, int] = None) -> np.ndarray:
    """Return a preprocessed copy of ``frame``; the input is not modified.

    Steps, in order: optional resize to ``target_size`` (passed straight to
    ``cv2.resize``), a bilateral filter for mild noise reduction, then a
    linear contrast/brightness adjustment.
    """
    result = frame.copy()

    # Resizing is skipped when target_size is None (or otherwise falsy).
    if target_size:
        result = cv2.resize(result, target_size)

    # Positional arguments after the image: d=9, sigmaColor=75, sigmaSpace=75.
    smoothed = cv2.bilateralFilter(result, 9, 75, 75)

    # Scale pixel values by 1.1 and add 10 for a slight contrast boost.
    return cv2.convertScaleAbs(smoothed, alpha=1.1, beta=10)

# Load and preview video
# load_and_preview_video returns None when the file is missing or cannot be
# opened, so sample_frames may be None (or an empty list) after this call.
print("üé¨ Loading and previewing video data...")
sample_frames = load_and_preview_video(VIDEO_PATH, SAMPLE_FRAMES)

# Demonstrate preprocessing
if sample_frames:
    print("\nüîÑ Demonstrating frame preprocessing...")

    # Show the first sampled frame next to its preprocessed version.
    original = sample_frames[0]
    processed = preprocess_frame(original, target_size=(640, 480))

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

    # Frames are converted BGR -> RGB before being handed to matplotlib.
    ax1.imshow(cv2.cvtColor(original, cv2.COLOR_BGR2RGB))
    ax1.set_title('Original Frame')
    ax1.axis('off')

    ax2.imshow(cv2.cvtColor(processed, cv2.COLOR_BGR2RGB))
    ax2.set_title('Preprocessed Frame')
    ax2.axis('off')

    plt.tight_layout()
    plt.show()
else:
    # Create synthetic demo data for visualization
    print("\nüé® Creating synthetic demo frame...")

    # Create a synthetic traffic scene
    # 480 rows x 640 columns x 3 channels, 8-bit.
    demo_frame = np.zeros((480, 640, 3), dtype=np.uint8)
    demo_frame[:] = [50, 50, 50]  # Dark gray background

    # Draw road
    cv2.rectangle(demo_frame, (0, 200), (640, 280), (60, 60, 60), -1)

    # Draw lane markings
    # A 20 px wide white dash every 40 px along the middle of the road band.
    for x in range(0, 640, 40):
        cv2.rectangle(demo_frame, (x, 235), (x+20, 245), (255, 255, 255), -1)

    # Draw some "vehicles" as rectangles
    # Each entry is (x1, y1, x2, y2, colour). The coordinates are the same
    # boxes that detect_vehicles_demo returns as mock detections.
    vehicles = [
        (100, 210, 160, 270, (0, 0, 255)),   # Red car
        (250, 215, 300, 265, (255, 0, 0)),   # Blue car
        (400, 205, 480, 275, (0, 255, 255)), # Yellow bus
    ]

    for x1, y1, x2, y2, color in vehicles:
        # Filled body first (thickness -1), then a 2 px white outline.
        cv2.rectangle(demo_frame, (x1, y1), (x2, y2), color, -1)
        cv2.rectangle(demo_frame, (x1, y1), (x2, y2), (255, 255, 255), 2)

    plt.figure(figsize=(10, 6))
    plt.imshow(cv2.cvtColor(demo_frame, cv2.COLOR_BGR2RGB))
    plt.title('Synthetic Demo Traffic Scene')
    plt.axis('off')
    plt.show()

    # Later cells only check `if sample_frames:`, so expose the synthetic
    # frame under the same name.
    sample_frames = [demo_frame]

print("‚úÖ Video data loading and preprocessing complete!")

## 3. Setup Object Detection Model

Initialize a pre-trained object detection model (YOLO or similar) and configure it for vehicle detection.

In [None]:
# Initialize YOLO model for vehicle detection
print("ü§ñ Setting up object detection model...")

try:
    # 'yolov8n.pt' is the nano YOLOv8 checkpoint, chosen for faster inference.
    model = YOLO('yolov8n.pt')
    print("‚úÖ YOLOv8 model loaded successfully!")

    # Summarise what the loaded model can recognise.
    print(f"   Model classes: {len(model.names)} classes")
    vehicle_labels = [
        label
        for label in model.names.values()
        if label in ['car', 'motorcycle', 'bus', 'truck', 'bicycle']
    ]
    print(f"   Vehicle-related classes: {vehicle_labels}")

except Exception as e:
    # Any failure while loading or describing the model switches the notebook
    # to mock detections (model is None) instead of aborting.
    print(f"‚ö†Ô∏è Could not load YOLOv8 model: {e}")
    print("   Using mock detection for demo purposes")
    model = None

# Vehicle class mapping from COCO dataset (label -> class id)
VEHICLE_CLASSES = {
    'car': 2,
    'motorcycle': 3,
    'bus': 5,
    'truck': 7,
    'bicycle': 1,
}

# Detection settings. detect_vehicles_demo reads the two thresholds;
# 'input_size' is not referenced by the code in this part of the notebook.
DETECTION_CONFIG = {
    'confidence_threshold': 0.5,
    'nms_threshold': 0.4,
    'input_size': (640, 640),
}

def detect_vehicles_demo(frame, model=None, show_all_detections=False):
    """Detect vehicles in ``frame``, or return fixed mock detections.

    Args:
        frame: Image passed directly to ``model``.
        model: Callable detector whose results expose ``boxes`` and which has
            a ``names`` mapping. When ``None``, three hard-coded detections
            are returned.
        show_all_detections: When true, keep non-vehicle classes as well.

    Returns:
        A list of dicts with keys ``'bbox'`` ([x1, y1, x2, y2] ints),
        ``'confidence'``, ``'class'``, ``'class_id'`` and ``'is_vehicle'``.
    """
    if model is None:
        # Mock detection for demo
        return [
            {'bbox': [100, 210, 160, 270], 'confidence': 0.85, 'class': 'car', 'class_id': 2, 'is_vehicle': True},
            {'bbox': [250, 215, 300, 265], 'confidence': 0.92, 'class': 'car', 'class_id': 2, 'is_vehicle': True},
            {'bbox': [400, 205, 480, 275], 'confidence': 0.78, 'class': 'bus', 'class_id': 5, 'is_vehicle': True},
        ]

    # Real YOLO detection
    results = model(
        frame,
        conf=DETECTION_CONFIG['confidence_threshold'],
        iou=DETECTION_CONFIG['nms_threshold'],
    )

    found = []
    for result in results:
        boxes = result.boxes
        if boxes is None:
            continue

        for box in boxes:
            x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
            score = box.conf[0].cpu().numpy()
            class_id = int(box.cls[0].cpu().numpy())
            label = model.names[class_id]
            is_vehicle = class_id in VEHICLE_CLASSES.values()

            # Filter for vehicles or show all if requested
            if not (show_all_detections or is_vehicle):
                continue

            found.append({
                'bbox': [int(x1), int(y1), int(x2), int(y2)],
                'confidence': float(score),
                'class': label,
                'class_id': class_id,
                'is_vehicle': is_vehicle,
            })

    return found

def visualize_detections(frame, detections):
    """Return a copy of ``frame`` with a box and label drawn per detection.

    Each detection is a dict with ``'bbox'`` ([x1, y1, x2, y2]), ``'class'``
    and ``'confidence'``. The input frame is left untouched.
    """
    annotated = frame.copy()

    # Per-class drawing colours, given in OpenCV's BGR channel order.
    palette = {
        'car': (0, 255, 0),         # Green
        'motorcycle': (255, 0, 0),  # Blue
        'bus': (0, 0, 255),         # Red
        'truck': (255, 255, 0),     # Cyan
        'bicycle': (255, 0, 255),   # Magenta
    }

    for det in detections:
        left, top, right, bottom = det['bbox']
        label_name = det['class']
        score = det['confidence']
        # Classes without a palette entry are drawn in white.
        box_color = palette.get(label_name, (255, 255, 255))

        # Bounding box outline
        cv2.rectangle(annotated, (left, top), (right, bottom), box_color, 2)

        # Filled label background sized to the text, sitting on top of the box
        caption = f"{label_name}: {score:.2f}"
        (text_w, text_h), _baseline = cv2.getTextSize(
            caption, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2
        )
        cv2.rectangle(
            annotated,
            (left, top - text_h - 10),
            (left + text_w, top),
            box_color,
            -1,
        )
        # Black text over the coloured background
        cv2.putText(
            annotated, caption, (left, top - 5),
            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 2,
        )

    return annotated

# Test detection on sample frame
if sample_frames:
    print("\nüîç Testing vehicle detection...")

    # Run the detector (or the mock path when model is None) on the first frame.
    test_frame = sample_frames[0]
    detections = detect_vehicles_demo(test_frame, model)

    print(f"   Detected {len(detections)} vehicles:")
    for i, det in enumerate(detections):
        print(f"     {i+1}. {det['class']} (confidence: {det['confidence']:.2f})")

    # Visualize results
    vis_frame = visualize_detections(test_frame, detections)

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))

    # Left panel: untouched frame. Right panel: frame with boxes and labels.
    ax1.imshow(cv2.cvtColor(test_frame, cv2.COLOR_BGR2RGB))
    ax1.set_title('Original Frame')
    ax1.axis('off')

    ax2.imshow(cv2.cvtColor(vis_frame, cv2.COLOR_BGR2RGB))
    ax2.set_title(f'Vehicle Detection Results ({len(detections)} vehicles)')
    ax2.axis('off')

    plt.tight_layout()
    plt.show()

    # Detection statistics
    # Tally detections per class label.
    vehicle_counts = {}
    for det in detections:
        vehicle_type = det['class']
        vehicle_counts[vehicle_type] = vehicle_counts.get(vehicle_type, 0) + 1

    # The bar chart is skipped when nothing was detected.
    if vehicle_counts:
        plt.figure(figsize=(8, 5))
        plt.bar(vehicle_counts.keys(), vehicle_counts.values(), color=['skyblue', 'orange', 'lightgreen', 'pink', 'yellow'])
        plt.title('Vehicle Detection Summary')
        plt.xlabel('Vehicle Type')
        plt.ylabel('Count')
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()

print("‚úÖ Object detection model setup complete!")

## 4. Implement Vehicle Detection and Classification

Detect vehicles in video frames and classify them into categories (car, bike, bus, truck) using the trained model.

In [None]:
# Enhanced vehicle detection and classification system
print("üéØ Implementing enhanced vehicle detection and classification...")

class EnhancedVehicleClassifier:
    """Rule-based refinement of a detector's vehicle class using crop features."""

    def __init__(self):
        # Initialised empty; not read or written by the methods of this class.
        self.vehicle_features = {}

    def extract_features(self, image_crop, bbox):
        """Return a 10-element feature vector for ``image_crop``.

        Layout: width, height, aspect ratio, area, three per-channel means,
        Laplacian variance, grayscale std, grayscale mean. An empty crop
        yields all zeros. ``bbox`` is accepted but not used.
        """
        if image_crop.size == 0:
            return np.zeros(10)  # Nothing to measure on an empty crop

        dims = image_crop.shape
        has_channels = len(dims) == 3

        # Geometry
        if len(dims) >= 2:
            height, width = dims[:2]
        else:
            height, width = 1, 1
        aspect_ratio = width / height if height > 0 else 0
        area = width * height

        # Mean colour per channel (zeros when the crop has no channel axis)
        if has_channels:
            mean_colors = np.mean(image_crop.reshape(-1, 3), axis=0)
            gray = cv2.cvtColor(image_crop, cv2.COLOR_BGR2GRAY)
        else:
            mean_colors = [0, 0, 0]
            gray = image_crop

        # Texture: variance of the Laplacian, plus grayscale spread and level
        if gray.size > 0:
            texture_measure = np.var(cv2.Laplacian(gray, cv2.CV_64F))
            gray_std = np.std(gray)
            gray_mean = np.mean(gray)
        else:
            texture_measure = 0
            gray_std = 0
            gray_mean = 0

        return np.array([
            width, height, aspect_ratio, area,
            mean_colors[0], mean_colors[1], mean_colors[2],
            texture_measure,
            gray_std,
            gray_mean,
        ])

    def classify_vehicle_enhanced(self, image_crop, bbox, base_class):
        """Return ``(class_name, confidence)`` after applying size/shape rules.

        Only ``'car'`` and ``'motorcycle'`` base classes can be reassigned;
        everything else is returned unchanged with confidence 0.9.
        """
        features = self.extract_features(image_crop, bbox)
        _width, _height, aspect_ratio, area = features[:4]

        if base_class == 'car':
            # Very wide and large -> bus; near-square and large -> truck.
            if aspect_ratio > 2.0 and area > 8000:
                return 'bus', 0.8
            if aspect_ratio < 1.2 and area > 6000:
                return 'truck', 0.75
        elif base_class == 'motorcycle' and area > 5000:
            # Too large for a motorcycle
            return 'car', 0.7

        return base_class, 0.9  # Keep the detector's class

# Initialize enhanced classifier
enhanced_classifier = EnhancedVehicleClassifier()

def process_frame_with_classification(frame, model=None):
    """Detect vehicles in ``frame`` and attach the refined classification.

    Returns one dict per detection: a shallow copy of the detection with
    ``'enhanced_class'`` and ``'enhanced_confidence'`` added.
    """
    results = []
    for det in detect_vehicles_demo(frame, model):
        box = det['bbox']
        left, top, right, bottom = box

        # Crop the detection region (rows are y, columns are x).
        crop = frame[top:bottom, left:right]

        refined_class, refined_conf = enhanced_classifier.classify_vehicle_enhanced(
            crop, box, det['class']
        )

        results.append({
            **det,
            'enhanced_class': refined_class,
            'enhanced_confidence': refined_conf,
        })

    return results

# Demonstrate enhanced classification
if sample_frames:
    print("\nüß† Demonstrating enhanced vehicle classification...")

    enhanced_detections = process_frame_with_classification(sample_frames[0], model)

    if enhanced_detections:
        # Create visualization comparing original vs enhanced classification.
        # squeeze=False keeps `axes` two-dimensional even with one detection,
        # so axes[row, col] indexing always works.
        fig, axes = plt.subplots(2, len(enhanced_detections), figsize=(15, 8), squeeze=False)

        for i, detection in enumerate(enhanced_detections):
            bbox = detection['bbox']
            x1, y1, x2, y2 = bbox

            # Hide the axis frame even when the crop turns out to be empty.
            axes[0, i].axis('off')
            axes[1, i].axis('off')

            # Extract vehicle crop
            vehicle_crop = sample_frames[0][y1:y2, x1:x2]
            if vehicle_crop.size == 0:
                continue

            vehicle_crop_rgb = cv2.cvtColor(vehicle_crop, cv2.COLOR_BGR2RGB)

            # Original classification
            axes[0, i].imshow(vehicle_crop_rgb)
            axes[0, i].set_title(f"Original: {detection['class']}\n({detection['confidence']:.2f})")

            # Enhanced classification
            axes[1, i].imshow(vehicle_crop_rgb)
            axes[1, i].set_title(f"Enhanced: {detection['enhanced_class']}\n({detection['enhanced_confidence']:.2f})")

        plt.suptitle('Vehicle Classification Comparison')
        plt.tight_layout()
        plt.show()
    else:
        # plt.subplots(2, 0) would raise, so skip the figure entirely.
        print("   No vehicles detected; skipping classification comparison plot")

    # Classification summary
    print("\nüìä Classification Results:")
    for i, detection in enumerate(enhanced_detections):
        print(f"   Vehicle {i+1}:")
        print(f"     Original: {detection['class']} ({detection['confidence']:.2f})")
        print(f"     Enhanced: {detection['enhanced_class']} ({detection['enhanced_confidence']:.2f})")

        # Feature analysis: recompute the features for the same crop to show
        # the geometry the rules were applied to.
        bbox = detection['bbox']
        x1, y1, x2, y2 = bbox
        vehicle_crop = sample_frames[0][y1:y2, x1:x2]
        features = enhanced_classifier.extract_features(vehicle_crop, bbox)

        print(f"     Features: W={features[0]:.0f}, H={features[1]:.0f}, AR={features[2]:.2f}")

print("\n‚úÖ Enhanced vehicle detection and classification complete!")

## 5. Track Vehicle Movement

Implement object tracking algorithms to follow vehicles across frames and maintain consistent IDs.

In [None]:
# Vehicle tracking implementation
print("üéØ Implementing vehicle tracking system...")

class SimpleVehicleTracker:
    """Simple centroid-based vehicle tracker for demo.

    ``self.objects`` maps an integer track ID to a dict with keys
    ``'centroid'``, ``'detection'``, ``'trajectory'`` (list of centroids) and
    ``'frame_count'``. ``self.disappeared`` maps the same IDs to the number
    of consecutive updates in which the track found no matching detection.
    """

    def __init__(self, max_disappeared=10, max_distance=100):
        """Create an empty tracker.

        Args:
            max_disappeared: A track is dropped once it has gone unmatched
                for more than this many consecutive updates.
            max_distance: Maximum centroid distance (in pixels of the bbox
                coordinates) for a detection to be matched to a track.
        """
        self.next_object_id = 0
        self.objects = {}
        self.disappeared = {}
        self.max_disappeared = max_disappeared
        self.max_distance = max_distance

    def register(self, centroid, detection):
        """Start a new track for ``detection`` and return its ID."""
        object_id = self.next_object_id
        self.objects[object_id] = {
            'centroid': centroid,
            'detection': detection,
            'trajectory': [centroid],
            'frame_count': 1
        }
        self.disappeared[object_id] = 0
        self.next_object_id += 1
        return object_id

    def deregister(self, object_id):
        """Remove the track ``object_id``; raises ``KeyError`` if unknown."""
        del self.objects[object_id]
        del self.disappeared[object_id]

    def _mark_disappeared(self, object_id):
        """Age an unmatched track by one update and drop it if too old."""
        self.disappeared[object_id] += 1
        if self.disappeared[object_id] > self.max_disappeared:
            self.deregister(object_id)

    def update(self, detections):
        """Update tracker with new detections.

        Args:
            detections: Sequence of dicts, each with a ``'bbox'`` entry of
                the form [x1, y1, x2, y2].

        Returns:
            ``self.objects`` — the tracker's live state, not a copy. Callers
            that need a per-frame snapshot must copy it themselves.
        """
        if len(detections) == 0:
            # Nothing seen this frame: every existing track ages by one.
            for object_id in list(self.disappeared.keys()):
                self._mark_disappeared(object_id)
            return self.objects

        # Integer centroid of every detection box.
        input_centroids = [
            ((det['bbox'][0] + det['bbox'][2]) // 2,
             (det['bbox'][1] + det['bbox'][3]) // 2)
            for det in detections
        ]

        if len(self.objects) == 0:
            # No tracks yet: every detection starts one.
            for centroid, detection in zip(input_centroids, detections):
                self.register(centroid, detection)
            return self.objects

        object_ids = list(self.objects.keys())
        object_centroids = [self.objects[oid]['centroid'] for oid in object_ids]

        # distances[r, c] = Euclidean distance from track r to detection c.
        distances = np.linalg.norm(
            np.array(object_centroids)[:, np.newaxis] - np.array(input_centroids),
            axis=2
        )

        # Greedy association: visit tracks in order of their closest
        # detection. Each track only considers its single nearest detection.
        rows = distances.min(axis=1).argsort()
        cols = distances.argmin(axis=1)[rows]

        used_row_indices = set()
        used_col_indices = set()

        for row, col in zip(rows, cols):
            row, col = int(row), int(col)
            if row in used_row_indices or col in used_col_indices:
                continue

            # Too far apart to be the same vehicle.
            if distances[row, col] > self.max_distance:
                continue

            object_id = object_ids[row]
            track = self.objects[object_id]
            track['centroid'] = input_centroids[col]
            track['detection'] = detections[col]
            track['trajectory'].append(input_centroids[col])
            track['frame_count'] += 1
            self.disappeared[object_id] = 0

            used_row_indices.add(row)
            used_col_indices.add(col)

        unused_rows = set(range(distances.shape[0])) - used_row_indices
        unused_cols = set(range(distances.shape[1])) - used_col_indices

        # Both leftovers are handled regardless of which side is larger.
        # Because of the max_distance gate, a frame can leave an unmatched
        # track AND an unmatched detection at the same time; handling only
        # one side would lose new vehicles or keep stale tracks alive.
        for row in sorted(unused_rows):
            self._mark_disappeared(object_ids[row])

        for col in sorted(unused_cols):
            self.register(input_centroids[col], detections[col])

        return self.objects

def visualize_tracking(frame, tracked_objects):
    """Return a copy of ``frame`` annotated with every tracked object.

    For each entry of ``tracked_objects`` (ID -> dict with ``'centroid'``,
    ``'trajectory'`` and ``'detection'``) this draws the bounding box, the
    trajectory polyline, the centroid, the ID and the vehicle type.
    """
    canvas = frame.copy()

    palette = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0),
               (255, 0, 255), (0, 255, 255), (128, 0, 128), (255, 165, 0)]

    for position, (object_id, info) in enumerate(tracked_objects.items()):
        center = info['centroid']
        path = info['trajectory']
        det = info['detection']
        # Colour is chosen by position in the dict, cycling through the palette.
        tint = palette[position % len(palette)]

        # Bounding box
        box = det['bbox']
        cv2.rectangle(canvas, (box[0], box[1]), (box[2], box[3]), tint, 2)

        # Trajectory: one segment per consecutive pair of points
        for start, end in zip(path, path[1:]):
            cv2.line(canvas, start, end, tint, 2)

        # Centroid marker
        cv2.circle(canvas, center, 5, tint, -1)

        # Track ID, offset up and to the left of the centroid
        cv2.putText(canvas, f"ID: {object_id}",
                    (center[0] - 20, center[1] - 20),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, tint, 2)

        # Vehicle type: refined class when present, else the detector's class
        type_label = det.get('enhanced_class', det.get('class', 'unknown'))
        cv2.putText(canvas, type_label,
                    (box[0], box[1] - 5),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, tint, 1)

    return canvas

# Initialize tracker
tracker = SimpleVehicleTracker(max_disappeared=5, max_distance=80)

# Simulate tracking across multiple frames
print("\nüé¨ Simulating vehicle tracking across frames...")

if sample_frames:
    # Create multiple frames with slight variations to simulate movement
    tracking_frames = []
    tracking_results = []

    base_detections = process_frame_with_classification(sample_frames[0], model)

    # Simulate movement across 5 frames
    for frame_idx in range(5):
        # Simulate vehicle movement
        simulated_detections = []
        for det in base_detections:
            bbox = det['bbox'].copy()

            # Shift each box from its base position: a rightward drift that
            # grows with the frame index plus a small random jitter.
            movement_x = frame_idx * 10 + np.random.randint(-5, 5)
            movement_y = np.random.randint(-2, 2)

            bbox[0] += movement_x
            bbox[2] += movement_x
            bbox[1] += movement_y
            bbox[3] += movement_y

            # Keep bbox within frame
            bbox[0] = max(0, bbox[0])
            bbox[2] = min(sample_frames[0].shape[1], bbox[2])
            bbox[1] = max(0, bbox[1])
            bbox[3] = min(sample_frames[0].shape[0], bbox[3])

            sim_det = det.copy()
            sim_det['bbox'] = bbox
            simulated_detections.append(sim_det)

        # Update tracker
        tracked_objects = tracker.update(simulated_detections)

        # tracker.update returns its live state, and a plain dict.copy() would
        # still share the per-object dicts and trajectory lists. Snapshot each
        # object and its trajectory so every entry of tracking_results keeps
        # the state as of this frame.
        tracking_results.append({
            obj_id: {**obj_info, 'trajectory': list(obj_info['trajectory'])}
            for obj_id, obj_info in tracked_objects.items()
        })

        # Create visualization frame
        vis_frame = visualize_tracking(sample_frames[0], tracked_objects)
        tracking_frames.append(vis_frame)

    # Display tracking results
    fig, axes = plt.subplots(1, len(tracking_frames), figsize=(20, 4))
    if len(tracking_frames) == 1:
        axes = [axes]

    for i, (frame, ax) in enumerate(zip(tracking_frames, axes)):
        ax.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        ax.set_title(f'Frame {i+1}\nTracked Objects: {len(tracking_results[i])}')
        ax.axis('off')

    plt.suptitle('Vehicle Tracking Across Frames')
    plt.tight_layout()
    plt.show()

    # Tracking statistics
    # next_object_id equals the number of IDs ever handed out.
    print(f"\nüìä Tracking Statistics:")
    print(f"   Total objects tracked: {tracker.next_object_id}")
    print(f"   Currently active objects: {len(tracker.objects)}")

    # Trajectory analysis: path length as the sum of per-step distances
    for obj_id, obj_info in tracker.objects.items():
        trajectory = obj_info['trajectory']
        if len(trajectory) > 1:
            distance_traveled = sum([
                np.sqrt((trajectory[i][0] - trajectory[i-1][0])**2 +
                       (trajectory[i][1] - trajectory[i-1][1])**2)
                for i in range(1, len(trajectory))
            ])
            print(f"   Object {obj_id}: {len(trajectory)} points, {distance_traveled:.1f}px traveled")

print("\n‚úÖ Vehicle tracking implementation complete!")

## 6. Count Vehicles by Type

Count the number of each vehicle type passing through defined zones or crossing lines in the video.

In [None]:
# Vehicle counting system
print("üî¢ Implementing vehicle counting system...")

class VehicleCounter:
    """Count tracked vehicles, by type, as they cross a horizontal line."""

    def __init__(self):
        # Per-type tallies plus a running 'total' key.
        self.total_counts = defaultdict(int)
        # IDs already counted, so each object is counted at most once.
        self.crossed_objects = set()
        # (x, y) pair; only the y component is used for crossing checks.
        self.counting_line = None

    def set_counting_line(self, line_points):
        """Store the counting line as an ``(x, y)`` pair."""
        self.counting_line = line_points

    def check_line_crossing(self, object_id, current_pos, previous_pos):
        """Return True if the move previous_pos -> current_pos crosses the line.

        A crossing means starting strictly on one side of the line's y value
        and ending on or beyond it. ``object_id`` is not used.
        """
        if self.counting_line is None or previous_pos is None:
            return False

        _line_x, line_y = self.counting_line
        _cur_x, cur_y = current_pos
        _prev_x, prev_y = previous_pos

        # Moving towards larger y across the line
        if prev_y < line_y and cur_y >= line_y:
            return True
        # Moving towards smaller y across the line
        if prev_y > line_y and cur_y <= line_y:
            return True
        return False

    def update(self, tracked_objects):
        """Count any not-yet-counted object whose latest step crossed the line.

        Returns ``self.total_counts`` (the live mapping, not a copy).
        """
        for obj_id, obj_info in tracked_objects.items():
            path = obj_info['trajectory']
            det = obj_info['detection']
            vehicle_type = det.get('enhanced_class', det.get('class'))

            # Need at least one step, and never count the same ID twice.
            if len(path) < 2 or obj_id in self.crossed_objects:
                continue
            # Only the most recent step (last two points) is examined.
            if not self.check_line_crossing(obj_id, path[-1], path[-2]):
                continue

            self.total_counts[vehicle_type] += 1
            self.total_counts['total'] += 1
            self.crossed_objects.add(obj_id)

        return self.total_counts

def visualize_counts(frame, counts, counting_line=None):
    """Return a copy of ``frame`` with the counting line and a counts panel.

    Args:
        frame: Image to annotate; it is copied, not modified.
        counts: Mapping of vehicle type (string) to count.
        counting_line: Optional ``(x, y)`` pair; a full-width horizontal line
            is drawn at its y value.
    """
    canvas = frame.copy()

    # Counting line spans the full frame width at the configured y.
    if counting_line:
        line_y = counting_line[1]
        cv2.line(canvas, (0, line_y), (canvas.shape[1], line_y), (0, 255, 255), 3)
        cv2.putText(canvas, "Counting Line", (10, line_y - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)

    # Panel: black fill with a white border, height grows with the entry count.
    panel_top_left = (10, 10)
    panel_bottom_right = (300, 30 + len(counts) * 30)
    cv2.rectangle(canvas, panel_top_left, panel_bottom_right, (0, 0, 0), -1)
    cv2.rectangle(canvas, panel_top_left, panel_bottom_right, (255, 255, 255), 2)

    # Header row
    text_y = 30
    cv2.putText(canvas, "Vehicle Counts", (20, text_y),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

    # One row per vehicle type, in sorted key order
    text_y += 30
    for vehicle_type, count in sorted(counts.items()):
        row_text = f"{vehicle_type.capitalize()}: {count}"
        cv2.putText(canvas, row_text, (20, text_y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
        text_y += 25

    return canvas

# Initialize counter
counter = VehicleCounter()

# Set counting line (middle of frame)
if sample_frames:
    counting_line_y = sample_frames[0].shape[0] // 2
    counter.set_counting_line((0, counting_line_y))

    print(f"\nüìè Counting line set at y={counting_line_y}")

    # Simulate counting across tracking results
    print("\nüî¢ Counting vehicles...")

    # tracking_results and tracking_frames come from the tracking cell; each
    # per-frame result is fed to the counter in order.
    count_frames = []
    for i, tracked_obj in enumerate(tracking_results):
        counts = counter.update(tracked_obj)
        vis_frame = visualize_counts(tracking_frames[i], counts, (0, counting_line_y))
        count_frames.append(vis_frame)

    # Display counting results
    fig, axes = plt.subplots(1, len(count_frames), figsize=(20, 4))
    if len(count_frames) == 1:
        axes = [axes]

    for i, (frame, ax) in enumerate(zip(count_frames, axes)):
        ax.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        ax.set_title(f'Frame {i+1}')
        ax.axis('off')

    plt.suptitle('Vehicle Counting Across Frames')
    plt.tight_layout()
    plt.show()

    # Final count summary
    print(f"\nüìä Final Vehicle Counts:")
    final_counts = counter.total_counts
    for vehicle_type, count in sorted(final_counts.items()):
        print(f"   {vehicle_type.capitalize()}: {count}")

    # Visualize count distribution
    # More than one key means at least one vehicle type besides 'total'.
    if len(final_counts) > 1:
        # Remove 'total' for pie chart
        count_dict = {k: v for k, v in final_counts.items() if k != 'total'}

        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

        # Pie chart
        ax1.pie(count_dict.values(), labels=count_dict.keys(), autopct='%1.1f%%',
               startangle=90, colors=['skyblue', 'lightcoral', 'lightgreen', 'gold'])
        ax1.set_title('Vehicle Type Distribution')

        # Bar chart
        ax2.bar(count_dict.keys(), count_dict.values(), color=['skyblue', 'lightcoral', 'lightgreen', 'gold'])
        ax2.set_title('Vehicle Counts by Type')
        ax2.set_xlabel('Vehicle Type')
        ax2.set_ylabel('Count')
        ax2.tick_params(axis='x', rotation=45)

        plt.tight_layout()
        plt.show()

print("\n‚úÖ Vehicle counting system complete!")

## 7. Calculate Traffic Flow Metrics & Estimate Congestion Levels

Compute traffic flow statistics and classify traffic conditions as low, medium, or high congestion.

In [None]:
# Traffic flow metrics and congestion analysis
print("üìà Analyzing traffic flow and estimating congestion levels...")

class CongestionAnalyzer:
    """Estimate congestion from per-frame tracking output.

    Density is vehicles per 10,000 pixels of frame area. It is compared with
    the ``'low'`` and ``'medium'`` entries of ``thresholds``; anything above
    ``'medium'`` is reported as high congestion (the ``'high'`` entry is
    stored but not consulted by the classification).

    Attributes:
        thresholds: Mapping with at least the keys ``'low'`` and ``'medium'``.
        flow_history: Every result returned by ``analyze_traffic``, in call
            order.
    """

    # Fallback density limits. Copied per instance so that mutating one
    # analyzer's thresholds can never leak into another.
    DEFAULT_THRESHOLDS = {'low': 5, 'medium': 10, 'high': 15}

    def __init__(self, thresholds=None):
        """Create an analyzer.

        Args:
            thresholds: Optional density limits keyed by ``'low'``,
                ``'medium'`` and ``'high'``. When omitted, a fresh copy of
                ``DEFAULT_THRESHOLDS`` is used.
        """
        if thresholds is None:
            thresholds = dict(self.DEFAULT_THRESHOLDS)
        self.thresholds = thresholds
        self.flow_history = []

    def calculate_congestion_level(self, vehicle_count, frame_area):
        """Classify congestion from vehicle density.

        Args:
            vehicle_count: Number of vehicles in the frame.
            frame_area: Frame area in pixels (height * width).

        Returns:
            Tuple ``(level, color_name, density)`` where ``level`` is
            ``'Low'``, ``'Medium'`` or ``'High'`` and ``color_name`` is
            ``'green'``, ``'yellow'`` or ``'red'`` respectively.

        Raises:
            ZeroDivisionError: If ``frame_area`` is zero.
        """
        # Scale to "vehicles per 10,000 px" so thresholds are small numbers.
        density = (vehicle_count * 10000) / frame_area

        if density <= self.thresholds['low']:
            return 'Low', 'green', density
        if density <= self.thresholds['medium']:
            return 'Medium', 'yellow', density
        return 'High', 'red', density

    def calculate_flow_rate(self, vehicles_per_time_window, time_window_seconds):
        """Convert a count over a time window into vehicles per minute.

        Returns 0 when the window is not positive.
        """
        if time_window_seconds > 0:
            return (vehicles_per_time_window / time_window_seconds) * 60
        return 0

    def calculate_average_speed(self, tracked_objects):
        """Return the mean displacement between each object's last two points.

        Args:
            tracked_objects: Mapping of object id to a dict holding a
                ``'trajectory'`` sequence of ``(x, y)`` points.

        Returns:
            Mean Euclidean distance (in the trajectory's coordinate units)
            between the final two trajectory points, over all objects that
            have at least two points; ``0.0`` when no object qualifies.
        """
        speeds = []
        for obj_info in tracked_objects.values():
            trajectory = obj_info['trajectory']
            if len(trajectory) >= 2:
                p1, p2 = trajectory[-2], trajectory[-1]
                speeds.append(np.sqrt((p2[0] - p1[0])**2 + (p2[1] - p1[1])**2))

        # Always hand back a plain float so callers get one consistent type.
        return float(np.mean(speeds)) if speeds else 0.0

    def analyze_traffic(self, tracked_objects, frame_shape):
        """Build a full analysis record for one frame and store it.

        Args:
            tracked_objects: Mapping of object id to a dict with
                ``'trajectory'`` and ``'detection'`` entries; ``'detection'``
                is a dict read for ``'enhanced_class'`` and, failing that,
                ``'class'``.
            frame_shape: Sequence whose first two items are height and width.

        Returns:
            Dict with keys ``vehicle_count``, ``congestion_level``,
            ``congestion_color``, ``traffic_density``, ``average_speed``,
            ``vehicle_breakdown`` and ``timestamp``. The same dict is appended
            to ``flow_history``.
        """
        frame_area = frame_shape[0] * frame_shape[1]
        vehicle_count = len(tracked_objects)

        # Congestion level
        congestion_level, congestion_color, density = self.calculate_congestion_level(
            vehicle_count, frame_area
        )

        # Average speed
        avg_speed = self.calculate_average_speed(tracked_objects)

        # Vehicle type breakdown: prefer the refined class when present.
        type_counts = defaultdict(int)
        for obj_info in tracked_objects.values():
            detection = obj_info['detection']
            vehicle_type = detection.get('enhanced_class', detection.get('class'))
            type_counts[vehicle_type] += 1

        analysis = {
            'vehicle_count': vehicle_count,
            'congestion_level': congestion_level,
            'congestion_color': congestion_color,
            'traffic_density': density,
            'average_speed': avg_speed,
            'vehicle_breakdown': dict(type_counts),
            'timestamp': datetime.now()
        }

        self.flow_history.append(analysis)
        return analysis

def visualize_congestion(frame, analysis):
    """Return a copy of ``frame`` annotated with the congestion summary.

    Draws a filled status circle near the top-right corner, coloured from
    ``analysis['congestion_color']`` (white when the name is not green, yellow
    or red), and a black info panel at the top-left listing the congestion
    level, vehicle count, density and average speed. The input frame is not
    modified.
    """
    white = (255, 255, 255)
    canvas = frame.copy()

    level = analysis['congestion_level']
    # BGR triples, as expected by OpenCV drawing calls.
    bgr_by_name = {
        'green': (0, 255, 0),
        'yellow': (0, 255, 255),
        'red': (0, 0, 255)
    }
    indicator_bgr = bgr_by_name.get(analysis['congestion_color'], white)

    # Status light: filled disc plus a white outline.
    light_center = (canvas.shape[1] - 50, 50)
    cv2.circle(canvas, light_center, 30, indicator_bgr, -1)
    cv2.circle(canvas, light_center, 30, white, 2)

    # Info panel: black background with a white border.
    panel_tl, panel_br = (10, 10), (350, 150)
    cv2.rectangle(canvas, panel_tl, panel_br, (0, 0, 0), -1)
    cv2.rectangle(canvas, panel_tl, panel_br, white, 2)

    panel_lines = [
        f"Congestion Level: {level}",
        f"Vehicles: {analysis['vehicle_count']}",
        f"Density: {analysis['traffic_density']:.2f}",
        f"Avg Speed: {analysis['average_speed']:.1f} px/frame",
    ]

    # First baseline at y=35, then one line every 30 px.
    for row, line in enumerate(panel_lines):
        cv2.putText(canvas, line, (20, 35 + 30 * row),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, white, 1)

    return canvas

# Initialize analyzer
analyzer = CongestionAnalyzer(thresholds={'low': 2, 'medium': 4, 'high': 6})

# Bind the results up front: later cells test `congestion_analyses` and
# `df_flow`, and would raise NameError if these names only existed inside the
# branch below.
congestion_frames = []
congestion_analyses = []
df_flow = None

if sample_frames and tracking_results:
    print("\n🚦 Analyzing traffic congestion...")

    # Analyze each frame. The first frame's shape is used for every frame,
    # i.e. all frames are treated as having the same size.
    for i, tracked_obj in enumerate(tracking_results):
        analysis = analyzer.analyze_traffic(tracked_obj, sample_frames[0].shape)
        congestion_analyses.append(analysis)

        vis_frame = visualize_congestion(tracking_frames[i], analysis)
        congestion_frames.append(vis_frame)

    # Display congestion analysis
    fig, axes = plt.subplots(1, len(congestion_frames), figsize=(20, 4))
    if len(congestion_frames) == 1:
        axes = [axes]  # a single subplot comes back as a bare Axes

    for i, (frame, ax, analysis) in enumerate(zip(congestion_frames, axes, congestion_analyses)):
        ax.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        ax.set_title(f'Frame {i+1}\nCongestion: {analysis["congestion_level"]}')
        ax.axis('off')

    plt.suptitle('Traffic Congestion Analysis')
    plt.tight_layout()
    plt.show()

    # Traffic flow metrics over time
    print("\n📊 Traffic Flow Metrics:")

    df_flow = pd.DataFrame([{
        'Frame': i+1,
        'Vehicles': analysis['vehicle_count'],
        'Congestion': analysis['congestion_level'],
        'Density': analysis['traffic_density'],
        'Speed': analysis['average_speed']
    } for i, analysis in enumerate(congestion_analyses)])

    print(df_flow.to_string(index=False))

    # Visualize metrics
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))

    # Vehicle count over time
    axes[0, 0].plot(df_flow['Frame'], df_flow['Vehicles'], marker='o', linewidth=2, color='blue')
    axes[0, 0].set_title('Vehicle Count Over Time')
    axes[0, 0].set_xlabel('Frame')
    axes[0, 0].set_ylabel('Number of Vehicles')
    axes[0, 0].grid(True, alpha=0.3)

    # Traffic density
    axes[0, 1].plot(df_flow['Frame'], df_flow['Density'], marker='s', linewidth=2, color='orange')
    axes[0, 1].set_title('Traffic Density Over Time')
    axes[0, 1].set_xlabel('Frame')
    axes[0, 1].set_ylabel('Density')
    axes[0, 1].grid(True, alpha=0.3)

    # Average speed
    axes[1, 0].plot(df_flow['Frame'], df_flow['Speed'], marker='^', linewidth=2, color='green')
    axes[1, 0].set_title('Average Speed Over Time')
    axes[1, 0].set_xlabel('Frame')
    axes[1, 0].set_ylabel('Speed (px/frame)')
    axes[1, 0].grid(True, alpha=0.3)

    # Congestion level distribution (any level other than Low/Medium is red)
    congestion_dist = df_flow['Congestion'].value_counts()
    level_colors = {'Low': 'green', 'Medium': 'yellow'}
    colors = [level_colors.get(level, 'red') for level in congestion_dist.index]
    axes[1, 1].bar(congestion_dist.index, congestion_dist.values, color=colors, alpha=0.7)
    axes[1, 1].set_title('Congestion Level Distribution')
    axes[1, 1].set_xlabel('Congestion Level')
    axes[1, 1].set_ylabel('Number of Frames')
    axes[1, 1].grid(True, alpha=0.3, axis='y')

    plt.tight_layout()
    plt.show()

    # Summary statistics
    print("\n📈 Summary Statistics:")
    print(f"   Average vehicles per frame: {df_flow['Vehicles'].mean():.1f}")
    print(f"   Peak vehicle count: {df_flow['Vehicles'].max()}")
    print(f"   Average traffic density: {df_flow['Density'].mean():.2f}")
    print(f"   Average speed: {df_flow['Speed'].mean():.2f} px/frame")
    print(f"   Most common congestion level: {df_flow['Congestion'].mode()[0]}")

print("\n✅ Traffic flow and congestion analysis complete!")

## 8. Advanced Visualizations and Dashboard

Create comprehensive visualizations including heatmaps, interactive dashboards, and analytics reports for smart city applications.

In [None]:
# Advanced visualizations and interactive dashboard
# (banner text restored from a mis-decoded emoji byte sequence)
print("📊 Creating advanced visualizations and interactive dashboard...")

# Create trajectory heatmap
def create_trajectory_heatmap(tracked_objects, frame_shape):
    """Build an 8-bit density map from vehicle trajectories.

    Every in-frame trajectory point adds 1 to a square patch reaching 10 px
    before and up to (not including) 10 px after the point on each axis,
    clipped to the frame. The accumulated map is Gaussian-blurred and scaled
    so that its peak is 255.

    Args:
        tracked_objects: Mapping of object id to a dict holding a
            ``'trajectory'`` iterable of ``(x, y)`` points.
        frame_shape: Sequence whose first two items are height and width.

    Returns:
        ``np.uint8`` array of shape ``(height, width)``. It is all zeros when
        no trajectory point fell inside the frame, so the result can always
        be passed to ``cv2.applyColorMap``.
    """
    height, width = frame_shape[0], frame_shape[1]
    heatmap = np.zeros((height, width), dtype=np.float32)

    for obj_info in tracked_objects.values():
        for x, y in obj_info['trajectory']:
            if not (0 <= x < width and 0 <= y < height):
                continue
            # Slice bounds must be integers; trajectory points may be floats.
            col, row = int(x), int(y)
            heatmap[max(0, row - 10):min(height, row + 10),
                    max(0, col - 10):min(width, col + 10)] += 1

    # Apply Gaussian blur to soften the square patches
    heatmap = cv2.GaussianBlur(heatmap, (15, 15), 0)

    # Normalize to 0..255; an empty map is left at zero
    peak = heatmap.max()
    if peak > 0:
        heatmap = heatmap / peak * 255

    # Always convert: a float32 map is not accepted by cv2.applyColorMap.
    return heatmap.astype(np.uint8)

if sample_frames and tracking_results:
    # Combine all tracked objects for heatmap
    all_tracked = {}
    for tracked in tracking_results:
        for obj_id, obj_info in tracked.items():
            if obj_id not in all_tracked:
                # Store a copy with its own trajectory list so that the
                # extend() below never mutates the data held by
                # tracking_results.
                merged = dict(obj_info)
                merged['trajectory'] = list(obj_info['trajectory'])
                all_tracked[obj_id] = merged
            else:
                # Extend trajectory
                all_tracked[obj_id]['trajectory'].extend(obj_info['trajectory'])

    # Create heatmap
    print("\n🗺️ Creating traffic density heatmap...")
    heatmap = create_trajectory_heatmap(all_tracked, sample_frames[0].shape)
    heatmap_colored = cv2.applyColorMap(heatmap, cv2.COLORMAP_JET)

    # Overlay on frame (60% original, 40% heatmap)
    overlay = cv2.addWeighted(sample_frames[0], 0.6, heatmap_colored, 0.4, 0)

    fig, axes = plt.subplots(1, 3, figsize=(18, 5))

    axes[0].imshow(cv2.cvtColor(sample_frames[0], cv2.COLOR_BGR2RGB))
    axes[0].set_title('Original Frame')
    axes[0].axis('off')

    axes[1].imshow(heatmap, cmap='hot')
    axes[1].set_title('Traffic Density Heatmap')
    axes[1].axis('off')

    axes[2].imshow(cv2.cvtColor(overlay, cv2.COLOR_BGR2RGB))
    axes[2].set_title('Heatmap Overlay')
    axes[2].axis('off')

    plt.suptitle('Traffic Density Visualization')
    plt.tight_layout()
    plt.show()

# Create interactive dashboard with Plotly
if congestion_analyses:
    print("\n📈 Creating interactive dashboard...")

    # Prepare data: one row per analyzed frame
    df_dashboard = pd.DataFrame([{
        'Frame': i+1,
        'Vehicles': analysis['vehicle_count'],
        'Congestion_Level': analysis['congestion_level'],
        'Density': analysis['traffic_density'],
        'Speed': analysis['average_speed'],
        'Time': i * 1  # seconds
    } for i, analysis in enumerate(congestion_analyses)])

    # Create interactive subplots (2x2 grid; the last cell hosts a pie trace)
    fig = make_subplots(
        rows=2, cols=2,
        subplot_titles=('Vehicle Count Timeline', 'Congestion Distribution',
                       'Traffic Density vs Speed', 'Vehicle Type Breakdown'),
        specs=[[{'type': 'scatter'}, {'type': 'bar'}],
               [{'type': 'scatter'}, {'type': 'pie'}]]
    )

    # Vehicle count timeline
    fig.add_trace(
        go.Scatter(x=df_dashboard['Frame'], y=df_dashboard['Vehicles'],
                  mode='lines+markers', name='Vehicle Count',
                  line=dict(color='blue', width=3),
                  marker=dict(size=8)),
        row=1, col=1
    )

    # Congestion distribution (any level other than Low/Medium is red)
    congestion_counts = df_dashboard['Congestion_Level'].value_counts()
    level_colors = {'Low': 'green', 'Medium': 'yellow'}
    colors_cong = [level_colors.get(level, 'red') for level in congestion_counts.index]
    fig.add_trace(
        go.Bar(x=congestion_counts.index, y=congestion_counts.values,
               marker_color=colors_cong, name='Congestion'),
        row=1, col=2
    )

    # Density vs Speed scatter, coloured by vehicle count
    fig.add_trace(
        go.Scatter(x=df_dashboard['Density'], y=df_dashboard['Speed'],
                  mode='markers', name='Density vs Speed',
                  marker=dict(size=12, color=df_dashboard['Vehicles'],
                            colorscale='Viridis', showscale=True)),
        row=2, col=1
    )

    # Vehicle type breakdown (if available)
    if counter.total_counts:
        # The aggregate 'total' entry is not a vehicle type.
        count_dict = {k: v for k, v in counter.total_counts.items() if k != 'total'}
        # Skip the trace when only the aggregate was present.
        if count_dict:
            fig.add_trace(
                go.Pie(labels=list(count_dict.keys()), values=list(count_dict.values()),
                      name='Vehicle Types'),
                row=2, col=2
            )

    fig.update_layout(
        height=800,
        showlegend=True,
        title_text="Traffic Flow Analysis Dashboard",
        title_font_size=20
    )

    fig.show()

# Generate comprehensive report
print("\n📝 Generating Comprehensive Analysis Report...")
print("="*70)

# Computed once so both overview lines agree and neither calls len() on an
# empty/None value.
frames_analyzed = len(congestion_analyses) if congestion_analyses else 0

report = f"""
TRAFFIC FLOW ANALYSIS REPORT
{'='*70}

ANALYSIS OVERVIEW:
- Total Frames Analyzed: {frames_analyzed}
- Analysis Duration: {frames_analyzed} frames
- Frame Size: {sample_frames[0].shape if sample_frames else 'N/A'}

VEHICLE DETECTION & CLASSIFICATION:
- Total Objects Tracked: {tracker.next_object_id if tracker else 0}
- Currently Active Objects: {len(tracker.objects) if tracker else 0}

VEHICLE COUNT SUMMARY:
"""

# One "- Type: count" line per entry, each ended by a real newline.
if counter.total_counts:
    report += "".join(
        f"- {vehicle_type.capitalize()}: {count}\n"
        for vehicle_type, count in sorted(counter.total_counts.items())
    )

# The metrics section needs at least one row: mode()[0] fails on an empty frame.
if df_flow is not None and not df_flow.empty:
    report += f"""
TRAFFIC METRICS:
- Average Vehicles per Frame: {df_flow['Vehicles'].mean():.2f}
- Peak Vehicle Count: {df_flow['Vehicles'].max()}
- Minimum Vehicle Count: {df_flow['Vehicles'].min()}
- Average Traffic Density: {df_flow['Density'].mean():.2f}
- Average Vehicle Speed: {df_flow['Speed'].mean():.2f} pixels/frame

CONGESTION ANALYSIS:
- Low Congestion Frames: {(df_flow['Congestion'] == 'Low').sum()}
- Medium Congestion Frames: {(df_flow['Congestion'] == 'Medium').sum()}
- High Congestion Frames: {(df_flow['Congestion'] == 'High').sum()}
- Predominant Congestion Level: {df_flow['Congestion'].mode()[0]}

SMART CITY INSIGHTS:
- Traffic Pattern: {'Stable' if df_flow['Vehicles'].std() < 2 else 'Variable'}
- Recommended Action: {'Monitor' if (df_flow['Congestion'] == 'High').sum() == 0 else 'Intervention Needed'}
- Flow Efficiency: {'Good' if df_flow['Speed'].mean() > 5 else 'Poor'}
"""

report += f"""
{'='*70}
Report Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
System: Traffic Flow Analysis Using CCTV Footage
Version: 1.0
{'='*70}
"""

print(report)

# Save report to file (optional)
# with open('../output/analytics/traffic_report.txt', 'w') as f:
#     f.write(report)

# Closing summary. Newline escapes and the emoji/bullet characters are
# restored; the previous literals printed a backslash-n and mis-decoded bytes.
print("\n✅ All analyses and visualizations complete!")
print("\n🎉 Traffic Flow Analysis Demo Successfully Completed!")
print("\nThis system can be applied to:")
print("  • Smart city traffic management")
print("  • Congestion monitoring and prediction")
print("  • Traffic pattern analysis")
print("  • Urban planning and infrastructure optimization")
print("  • Incident detection and response")
print("  • Environmental impact assessment")