# Traffic Analytics Pipeline

**Vehicle Detection, Tracking & Counting with YOLOv8**

Демо-проект для позиции CV Engineer в проекте «Безопасный город»

---

## Возможности:
- Детекция транспорта (car, bus, truck, motorcycle)
- Multi-object tracking (ByteTrack)
- Подсчёт по линии (IN/OUT)
- Визуализация траекторий

## 1. Установка зависимостей

In [None]:
!pip install -q ultralytics supervision

In [None]:
import cv2
import numpy as np
from ultralytics import YOLO
from collections import defaultdict
from dataclasses import dataclass, field
from typing import List, Dict, Tuple, Optional
from datetime import datetime
from IPython.display import Video, display, clear_output
import matplotlib.pyplot as plt

print("Imports OK")
print(f"OpenCV: {cv2.__version__}")

## 2. Проверка GPU

In [None]:
import torch

print(f"PyTorch: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
    DEVICE = 'cuda'
else:
    DEVICE = 'cpu'
print(f"\nUsing device: {DEVICE}")

## 3. Загрузка тестового видео

Используем публичное traffic видео для демонстрации.

In [None]:
# Скачиваем тестовое видео с трафиком
# Вариант 1: Публичное видео
!wget -q -O traffic_video.mp4 "https://github.com/ultralytics/assets/releases/download/v0.0.0/traffic.mp4"

# Проверяем
import os
if os.path.exists('traffic_video.mp4'):
    cap = cv2.VideoCapture('traffic_video.mp4')
    fps = cap.get(cv2.CAP_PROP_FPS)
    frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    cap.release()
    print(f"Video: {w}x{h} @ {fps:.1f} FPS, {frames} frames ({frames/fps:.1f} sec)")
else:
    print("Failed to download video")

## 4. Классы для пайплайна

In [None]:
# Vehicle classes (COCO)
VEHICLE_CLASSES = {
    2: "car",
    3: "motorcycle", 
    5: "bus",
    7: "truck",
}

CLASS_COLORS = {
    "car": (0, 255, 0),        # Green
    "motorcycle": (255, 0, 0),  # Blue
    "bus": (0, 165, 255),       # Orange
    "truck": (255, 0, 255),     # Magenta
}

@dataclass
class Track:
    """Tracked object."""
    track_id: int
    bbox: np.ndarray
    confidence: float
    class_id: int
    class_name: str
    trajectory: List[Tuple[float, float]] = field(default_factory=list)
    
    @property
    def center(self) -> Tuple[float, float]:
        x1, y1, x2, y2 = self.bbox
        return ((x1 + x2) / 2, (y1 + y2) / 2)


@dataclass 
class CrossingEvent:
    """Line crossing event."""
    track_id: int
    direction: str  # "in" or "out"
    class_name: str
    frame_id: int
    timestamp: str

print("Classes defined")

In [None]:
class VehicleTracker:
    """YOLO + ByteTrack tracker."""
    
    def __init__(self, model_name="yolov8s", confidence=0.5, device="cuda"):
        self.model = YOLO(f"{model_name}.pt")
        self.confidence = confidence
        self.device = device
        self.classes = list(VEHICLE_CLASSES.keys())
        self.trajectories = defaultdict(list)
        self.trajectory_length = 30
        
    def track(self, frame, frame_id=0):
        """Run tracking on frame."""
        results = self.model.track(
            frame,
            conf=self.confidence,
            classes=self.classes,
            device=self.device,
            tracker="bytetrack.yaml",
            persist=True,
            verbose=False
        )[0]
        
        tracks = []
        if results.boxes is not None and results.boxes.id is not None:
            boxes = results.boxes.xyxy.cpu().numpy()
            track_ids = results.boxes.id.cpu().numpy().astype(int)
            confidences = results.boxes.conf.cpu().numpy()
            class_ids = results.boxes.cls.cpu().numpy().astype(int)
            
            for bbox, tid, conf, cid in zip(boxes, track_ids, confidences, class_ids):
                class_name = VEHICLE_CLASSES.get(cid, "unknown")
                center = ((bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2)
                
                self.trajectories[tid].append(center)
                if len(self.trajectories[tid]) > self.trajectory_length:
                    self.trajectories[tid] = self.trajectories[tid][-self.trajectory_length:]
                
                tracks.append(Track(
                    track_id=int(tid),
                    bbox=bbox,
                    confidence=float(conf),
                    class_id=int(cid),
                    class_name=class_name,
                    trajectory=list(self.trajectories[tid])
                ))
        
        return tracks

print("VehicleTracker defined")

In [None]:
class LineCrossingCounter:
    """Counts vehicles crossing a line."""
    
    def __init__(self, line_start, line_end, in_direction=(0, 1)):
        self.line_start = np.array(line_start)
        self.line_end = np.array(line_end)
        self.in_direction = np.array(in_direction)
        self.line_vector = self.line_end - self.line_start
        
        self.crossed_ids = set()
        self.count_in = 0
        self.count_out = 0
        self.events = []
        
    def _point_side(self, point):
        """Which side of line is point on."""
        point_vec = np.array(point) - self.line_start
        return np.cross(self.line_vector, point_vec)
    
    def _check_crossing(self, prev_pos, curr_pos):
        """Check if movement crosses line."""
        prev_side = self._point_side(prev_pos)
        curr_side = self._point_side(curr_pos)
        return (prev_side * curr_side) < 0
    
    def _get_direction(self, prev_pos, curr_pos):
        """Get crossing direction."""
        movement = np.array(curr_pos) - np.array(prev_pos)
        dot = np.dot(movement, self.in_direction)
        return "in" if dot > 0 else "out"
    
    def update(self, tracks, frame_id=0):
        """Update counter with new tracks."""
        new_events = []
        
        for track in tracks:
            if len(track.trajectory) < 2:
                continue
            if track.track_id in self.crossed_ids:
                continue
                
            prev_pos = track.trajectory[-2]
            curr_pos = track.trajectory[-1]
            
            if self._check_crossing(prev_pos, curr_pos):
                direction = self._get_direction(prev_pos, curr_pos)
                self.crossed_ids.add(track.track_id)
                
                if direction == "in":
                    self.count_in += 1
                else:
                    self.count_out += 1
                
                event = CrossingEvent(
                    track_id=track.track_id,
                    direction=direction,
                    class_name=track.class_name,
                    frame_id=frame_id,
                    timestamp=datetime.now().isoformat()
                )
                self.events.append(event)
                new_events.append(event)
        
        return new_events
    
    @property
    def total(self):
        return self.count_in + self.count_out

print("LineCrossingCounter defined")

In [None]:
class Visualizer:
    """Draw annotations on frames."""
    
    @staticmethod
    def draw_tracks(frame, tracks):
        """Draw bounding boxes and trajectories."""
        for track in tracks:
            color = CLASS_COLORS.get(track.class_name, (255, 255, 255))
            x1, y1, x2, y2 = map(int, track.bbox)
            
            # Bounding box
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            
            # Trajectory
            if len(track.trajectory) > 1:
                points = np.array(track.trajectory, dtype=np.int32)
                for i in range(1, len(points)):
                    thickness = max(1, int(2 * i / len(points)))
                    cv2.line(frame, tuple(points[i-1]), tuple(points[i]), color, thickness)
            
            # Label
            label = f"ID:{track.track_id} {track.class_name}"
            (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
            cv2.rectangle(frame, (x1, y1-th-8), (x1+tw+4, y1), color, -1)
            cv2.putText(frame, label, (x1+2, y1-4), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1)
        
        return frame
    
    @staticmethod
    def draw_counting_line(frame, line_start, line_end, count_in, count_out):
        """Draw counting line with stats."""
        cv2.line(frame, tuple(line_start), tuple(line_end), (0, 255, 255), 3)
        
        mid_x = (line_start[0] + line_end[0]) // 2
        mid_y = (line_start[1] + line_end[1]) // 2
        
        text = f"IN:{count_in} OUT:{count_out}"
        (tw, th), _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2)
        cv2.rectangle(frame, (mid_x-tw//2-5, mid_y-th-10), (mid_x+tw//2+5, mid_y+5), (0,0,0), -1)
        cv2.putText(frame, text, (mid_x-tw//2, mid_y), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,255,255), 2)
        
        return frame
    
    @staticmethod
    def draw_stats(frame, total_count, fps):
        """Draw stats panel."""
        cv2.rectangle(frame, (10, 10), (200, 70), (0,0,0), -1)
        cv2.putText(frame, f"Total: {total_count}", (20, 35), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)
        cv2.putText(frame, f"FPS: {fps:.1f}", (20, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,255,0), 2)
        return frame

print("Visualizer defined")

## 5. Запуск пайплайна

In [None]:
def run_pipeline(video_path, output_path="output.mp4", max_frames=None):
    """Run the full traffic analytics pipeline."""
    
    # Open video
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    
    if max_frames:
        total_frames = min(total_frames, max_frames)
    
    print(f"Video: {width}x{height} @ {fps:.1f} FPS")
    print(f"Processing {total_frames} frames...")
    
    # Output video
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    
    # Initialize components
    tracker = VehicleTracker(model_name="yolov8s", confidence=0.5, device=DEVICE)
    
    # Counting line (horizontal, middle of frame)
    line_y = int(height * 0.6)
    line_start = (50, line_y)
    line_end = (width - 50, line_y)
    counter = LineCrossingCounter(line_start, line_end, in_direction=(0, 1))
    
    # Process frames
    import time
    frame_count = 0
    start_time = time.time()
    
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret or (max_frames and frame_count >= max_frames):
            break
        
        frame_count += 1
        
        # Track vehicles
        tracks = tracker.track(frame, frame_count)
        
        # Update counter
        events = counter.update(tracks, frame_count)
        for e in events:
            print(f"  [{e.direction.upper()}] Vehicle {e.track_id} ({e.class_name})")
        
        # Calculate FPS
        elapsed = time.time() - start_time
        current_fps = frame_count / elapsed if elapsed > 0 else 0
        
        # Visualize
        frame = Visualizer.draw_tracks(frame, tracks)
        frame = Visualizer.draw_counting_line(frame, line_start, line_end, 
                                               counter.count_in, counter.count_out)
        frame = Visualizer.draw_stats(frame, counter.total, current_fps)
        
        # Write frame
        out.write(frame)
        
        # Progress
        if frame_count % 50 == 0:
            print(f"  Frame {frame_count}/{total_frames} | FPS: {current_fps:.1f} | Count: {counter.total}")
    
    cap.release()
    out.release()
    
    # Summary
    print(f"\n{'='*50}")
    print(f"DONE! Processed {frame_count} frames in {elapsed:.1f}s")
    print(f"Average FPS: {frame_count/elapsed:.1f}")
    print(f"Total vehicles: {counter.total} (IN: {counter.count_in}, OUT: {counter.count_out})")
    print(f"Output saved: {output_path}")
    print(f"{'='*50}")
    
    return counter.events, output_path

In [None]:
# Run pipeline!
events, output_path = run_pipeline(
    video_path="traffic_video.mp4",
    output_path="traffic_output.mp4",
    max_frames=500  # Limit for demo, remove for full video
)

## 6. Результаты

In [None]:
# Show output video
Video("traffic_output.mp4", embed=True, width=800)

In [None]:
# Events summary
import pandas as pd

if events:
    df = pd.DataFrame([{
        'Track ID': e.track_id,
        'Direction': e.direction,
        'Class': e.class_name,
        'Frame': e.frame_id
    } for e in events])
    
    print("Crossing Events:")
    display(df)
    
    print("\nBy Class:")
    display(df.groupby(['class_name', 'direction']).size().unstack(fill_value=0))
else:
    print("No crossing events recorded")

In [None]:
# Show sample frames
cap = cv2.VideoCapture("traffic_output.mp4")
frames_to_show = [50, 150, 300, 450]

fig, axes = plt.subplots(2, 2, figsize=(14, 10))
axes = axes.flatten()

for idx, frame_num in enumerate(frames_to_show):
    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
    ret, frame = cap.read()
    if ret:
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        axes[idx].imshow(frame_rgb)
        axes[idx].set_title(f"Frame {frame_num}")
        axes[idx].axis('off')

cap.release()
plt.tight_layout()
plt.savefig('sample_frames.png', dpi=150)
plt.show()

## 7. Скачать результат

In [None]:
# Save events to JSON
import json

output_data = {
    "total_count": len(events),
    "count_in": sum(1 for e in events if e.direction == "in"),
    "count_out": sum(1 for e in events if e.direction == "out"),
    "events": [
        {
            "track_id": e.track_id,
            "direction": e.direction,
            "class": e.class_name,
            "frame": e.frame_id
        } for e in events
    ]
}

with open('events.json', 'w') as f:
    json.dump(output_data, f, indent=2)

print("Saved: events.json")
print("\nFiles ready for download:")
!ls -lh *.mp4 *.json *.png 2>/dev/null

---

## Summary

This notebook demonstrates a complete traffic analytics pipeline:

| Component | Implementation |
|-----------|---------------|
| Detection | YOLOv8s (car, bus, truck, motorcycle) |
| Tracking | ByteTrack with persistent IDs |
| Counting | Line crossing with direction |
| Output | Annotated video + JSON events |

**Author:** [Your Name]  
**GitHub:** [your-repo-link]  
**Position:** CV Engineer - Safe City Project