In [3]:
import re
import cv2
import numpy as np
from ultralytics import YOLO
from collections import defaultdict
from math import radians, cos, sin, sqrt, atan2
import os

# =====================================================
# CONFIGURATION
# =====================================================
VIDEO_PATH = "video1.MP4"
SRT_PATH = VIDEO_PATH.replace(".mp4", ".srt")
OUT_PATH = "output_speed_corrected.mp4"

# Camera parameters (update if known)
SENSOR_WIDTH_MM = 13.2      # sensor width (e.g., 1" DJI)
IMAGE_WIDTH_PX = 5472       # image width in pixels
MODEL_NAME = "yolov8n.pt"   # can be yolov8s.pt for better accuracy

# =====================================================
# HELPER FUNCTIONS
# =====================================================

def haversine(lat1, lon1, lat2, lon2):
    """Compute ground distance in meters between two GPS coordinates."""
    R = 6371000  # Earth radius in m
    d_lat = radians(lat2 - lat1)
    d_lon = radians(lon2 - lon1)
    a = sin(d_lat/2)**2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(d_lon/2)**2
    c = 2 * atan2(sqrt(a), sqrt(1-a))
    return R * c


import re
import chardet

def parse_srt(srt_path):
    """Extract per-frame data from .srt telemetry with auto encoding detection."""
    with open(srt_path, "rb") as f:
        raw = f.read()
    enc = chardet.detect(raw)["encoding"] or "utf-8"
    text = raw.decode(enc, errors="ignore")

    frame_data = []
    blocks = text.strip().split("\n\n")
    for block in blocks:
        diff_match = re.search(r"DiffTime:\s*(\d+)ms", block)
        alt_match = re.search(r"rel_alt:\s*([\d.]+)", block)
        focal_match = re.search(r"focal_len:\s*([\d.]+)", block)
        lat_match = re.search(r"latitude:\s*([\d.]+)", block)
        lon_match = re.search(r"longitude:\s*([\d.]+)", block)

        diff_time = float(diff_match.group(1)) / 1000.0 if diff_match else 0.04
        rel_alt = float(alt_match.group(1)) if alt_match else 200.0
        focal_len = float(focal_match.group(1)) if focal_match else 24.0
        latitude = float(lat_match.group(1)) if lat_match else 0.0
        longitude = float(lon_match.group(1)) if lon_match else 0.0

        frame_data.append({
            "diff_time": diff_time,
            "altitude": rel_alt,
            "focal_len": focal_len,
            "lat": latitude,
            "lon": longitude
        })
    return frame_data



def compute_gsd(sensor_width_mm, focal_len_mm, altitude_m, img_width_px):
    """Ground Sampling Distance: meters per pixel."""
    return (sensor_width_mm * altitude_m) / (focal_len_mm * img_width_px)


# =====================================================
# MAIN PIPELINE
# =====================================================
def main():
    # --- Load telemetry ---
    frame_meta = parse_srt(SRT_PATH)
    print(f"✅ Loaded telemetry for {len(frame_meta)} frames")

    # --- Setup video IO ---
    cap = cv2.VideoCapture(VIDEO_PATH)
    fps = cap.get(cv2.CAP_PROP_FPS)
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    out = cv2.VideoWriter(OUT_PATH, cv2.VideoWriter_fourcc(*"mp4v"), fps, (w, h))

    # --- Load YOLO model ---
    model = YOLO(MODEL_NAME)

    # --- Data holders ---
    last_positions = {}
    speeds = defaultdict(list)
    last_gps = None

    frame_idx = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # --- Extract frame metadata ---
        if frame_idx < len(frame_meta):
            meta = frame_meta[frame_idx]
            dt = meta["diff_time"]
            alt = meta["altitude"]
            focal = meta["focal_len"]
            lat, lon = meta["lat"], meta["lon"]
        else:
            dt, alt, focal, lat, lon = 0.04, 226.5, 24.0, 0.0, 0.0

        # --- Ground Sampling Distance ---
        gsd = compute_gsd(SENSOR_WIDTH_MM, focal, alt, IMAGE_WIDTH_PX)

        # --- Drone motion compensation ---
        drone_shift_m = 0.0
        if last_gps is not None and (lat != 0.0 and lon != 0.0):
            drone_shift_m = haversine(last_gps[0], last_gps[1], lat, lon)
        last_gps = (lat, lon)

        # --- Detection + Tracking ---
        results = model.track(frame, persist=True, classes=[2, 3, 5, 7])  # car, bike, bus, truck
        if results[0].boxes.id is None:
            out.write(frame)
            frame_idx += 1
            continue

        ids = results[0].boxes.id.cpu().numpy().astype(int)
        boxes = results[0].boxes.xyxy.cpu().numpy()

        for box, obj_id in zip(boxes, ids):
            x1, y1, x2, y2 = box
            cx, cy = int((x1 + x2) / 2), int((y1 + y2) / 2)

            # Speed calc
            if obj_id in last_positions:
                px, py = last_positions[obj_id]
                pixel_dist = np.sqrt((cx - px)**2 + (cy - py)**2)
                real_dist_m = max(pixel_dist * gsd - drone_shift_m, 0)  # subtract drone motion
                speed_mps = real_dist_m / dt
                speed_kph = speed_mps * 3.6
                speeds[obj_id].append(speed_kph)
            else:
                speed_kph = 0.0
                speeds[obj_id].append(speed_kph)

            last_positions[obj_id] = (cx, cy)
            avg_speed = np.mean(speeds[obj_id])

            # --- Overlay info ---
            label = f"ID:{obj_id} | {speed_kph:.1f} km/h | Avg:{avg_speed:.1f}"
            cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
            cv2.putText(frame, label, (int(x1), int(y1) - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

        out.write(frame)
        frame_idx += 1

    cap.release()
    out.release()
    print(f"✅ Done! Output saved as: {OUT_PATH}")


if __name__ == "__main__":
    main()


KeyboardInterrupt: 

In [2]:
!pip install chardet

Collecting chardet
  Downloading chardet-5.2.0-py3-none-any.whl.metadata (3.4 kB)
Downloading chardet-5.2.0-py3-none-any.whl (199 kB)
Installing collected packages: chardet
Successfully installed chardet-5.2.0
