<a href="https://colab.research.google.com/github/varun0010/cyclist-detection-and-trajectory-prediction/blob/main/notebooks/cyclist_detection_and_trajectory_prediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%pip install ultralytics
%pip install deep\_sort\_realtime

Collecting ultralytics
  Downloading ultralytics-8.3.188-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.16-py3-none-any.whl.metadata (14 kB)
Downloading ultralytics-8.3.188-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m20.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ultralytics_thop-2.0.16-py3-none-any.whl (28 kB)
Installing collected packages: ultralytics-thop, ultralytics
Successfully installed ultralytics-8.3.188 ultralytics-thop-2.0.16
Collecting deep_sort_realtime
  Downloading deep_sort_realtime-1.3.2-py3-none-any.whl.metadata (12 kB)
Downloading deep_sort_realtime-1.3.2-py3-none-any.whl (8.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.4/8.4 MB[0m [31m75.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: deep_sort_realtime
Successfully installed deep_sort_realtime-1.3.2


In [None]:
import os
import cv2
import time
import math
import numpy as np
import torch
import torch.nn as nn
from collections import defaultdict, deque
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort

Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


In [None]:
YOLO_WEIGHTS    = "/content/drive/MyDrive/best1.pt"
TRAJ_MODEL_PTH  = "/content/drive/MyDrive/normal_lstm_model (1).pth"
INPUT_VIDEO     = "/content/drive/MyDrive/Test video.mp4"
OUTPUT_VIDEO    = "/content/drive/MyDrive/cyclist_tracking_output_vo_homo_FINAL_FINAL_Video5.mp4"
LOG_TXT_PATH    = "/content/drive/MyDrive/trajectory_log.txt"

In [None]:
WRITE_LOG_FILE  = True
K = np.array([[1000.0, 0.0, 640.0],
              [0.0, 1000.0, 360.0],
              [0.0, 0.0, 1.0]], dtype=np.float64)
DIST_COEFFS = np.zeros((1, 5), dtype=np.float64)
APPLY_UNDISTORT = False
CAMERA_HEIGHT_M = 1.5
GROUND_NORMAL   = np.array([0.0, 1.0, 0.0], dtype=np.float64)
VO_SCALE_M_PER_UNIT = 1.0
OBS_LEN  = 8
PRED_LEN = 12
CYCLIST_CLASS_INDEX = 0
safety_distance = 1.0  # meters, collision threshold
MIN_HEIGHT = 50
MIN_WIDTH = 20
MIN_MOVE_THRESHOLD = 5.0  # minimum magnitude of movement to change direction
CONFIDENCE_THRESHOLD = 0.5

# Initialize DeepSort tracker
tracker = DeepSort(max_age=30, n_init=3)

class NormalLSTM(nn.Module):
    def __init__(self, input_dim=2, hidden_dim=64, output_dim=2):
        super().__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.fc   = nn.Linear(hidden_dim, output_dim)

    def forward(self, obs):
        _, (h, c) = self.lstm(obs)
        decoder_input = obs[:, -1, :]
        outputs = []
        hidden = (h, c)
        for _ in range(PRED_LEN):
            out, hidden = self.lstm(decoder_input.unsqueeze(1), hidden)
            pred_pos = self.fc(out.squeeze(1))
            outputs.append(pred_pos)
            decoder_input = pred_pos
        return torch.stack(outputs, dim=1)

def preprocess_for_lstm(seq_xy):
    arr  = np.asarray(seq_xy, dtype=np.float32)
    mean = arr.mean(axis=0)
    std  = arr.std(axis=0) + 1e-6
    norm = (arr - mean) / std
    return norm, mean, std

def denormalize_lstm_output(preds, mean, std):
    return preds * std + mean

class ORBVO:
    def __init__(self, K):
        self.K = K
        self.orb = cv2.ORB_create(2000)
        self.bf  = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
        self.prev_gray = None
        self.prev_kp   = None
        self.prev_des  = None
        self.R_wc = np.eye(3, dtype=np.float64)
        self.t_wc = np.zeros((3, 1), dtype=np.float64)

    def _compose(self, R, t):
        self.t_wc = self.t_wc + self.R_wc @ (t * VO_SCALE_M_PER_UNIT)
        self.R_wc = R @ self.R_wc

    def step(self, frame_bgr):
        gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
        kp, des = self.orb.detectAndCompute(gray, None)
        vo_ok = False
        if self.prev_gray is None or self.prev_des is None or des is None:
            self.prev_gray, self.prev_kp, self.prev_des = gray, kp, des
            return self.R_wc, self.t_wc, vo_ok
        if len(self.prev_des) < 8 or len(des) < 8:
            self.prev_gray, self.prev_kp, self.prev_des = gray, kp, des
            return self.R_wc, self.t_wc, vo_ok
        matches = self.bf.match(self.prev_des, des)
        if len(matches) < 20:
            self.prev_gray, self.prev_kp, self.prev_des = gray, kp, des
            return self.R_wc, self.t_wc, vo_ok
        matches = sorted(matches, key=lambda m: m.distance)[:500]
        pts1 = np.float32([self.prev_kp[m.queryIdx].pt for m in matches]).reshape(-1, 1, 2)
        pts2 = np.float32([kp[m.trainIdx].pt for m in matches]).reshape(-1, 1, 2)
        E, mask = cv2.findEssentialMat(pts2, pts1, self.K, method=cv2.RANSAC, prob=0.999, threshold=1.0)
        if E is not None:
            _, R, t, mask_pose = cv2.recoverPose(E, pts2, pts1, self.K)
            self._compose(R, t)
            vo_ok = True
        self.prev_gray, self.prev_kp, self.prev_des = gray, kp, des
        return self.R_wc, self.t_wc, vo_ok

def homography_from_pose(K, R_wc, t_wc, ground_normal=GROUND_NORMAL, camera_height=CAMERA_HEIGHT_M):
    n = ground_normal.reshape(3, 1)
    d = float(camera_height)
    R_cw = R_wc.T
    t_cw = -R_wc.T @ t_wc
    H_cam = R_cw - (t_cw @ n.T) / d
    H = K @ H_cam @ np.linalg.inv(K)
    return H

def warp_points(H, pts_uv):
    if len(pts_uv) == 0:
        return np.zeros((0, 2), dtype=np.float32)
    pts = np.asarray(pts_uv, dtype=np.float32).reshape(-1, 1, 2)
    out = cv2.perspectiveTransform(pts, H)
    return out.reshape(-1, 2)

def discretize_direction(vec):
    angle = math.degrees(math.atan2(vec[1], vec[0]))  # angle in degrees [-180, 180]
    if -45 <= angle < 45:
        return np.array([1, 0])   # Right
    elif 45 <= angle < 135:
        return np.array([0, 1])   # Down (image coord y positive downward)
    elif angle >= 135 or angle < -135:
        return np.array([-1, 0])  # Left
    else:
        return np.array([0, -1])  # Up

In [None]:
def main():
    detector = YOLO(YOLO_WEIGHTS)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    traj_model = NormalLSTM().to(device)
    traj_model.load_state_dict(torch.load(TRAJ_MODEL_PTH, map_location=device))
    traj_model.eval()

    cap = cv2.VideoCapture(INPUT_VIDEO)
    if not cap.isOpened():
        raise RuntimeError(f"Failed to open video: {INPUT_VIDEO}")

    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(OUTPUT_VIDEO, fourcc, fps, (width, height))

    if APPLY_UNDISTORT:
        newK, _ = cv2.getOptimalNewCameraMatrix(K, DIST_COEFFS, (width, height), 1, (width, height))
        map1, map2 = cv2.initUndistortRectifyMap(K, DIST_COEFFS, None, newK, (width, height), cv2.CV_16SC2)
        K_use = newK
    else:
        map1 = map2 = None
        K_use = K

    vo = ORBVO(K_use)

    traj_buffer = defaultdict(lambda: deque(maxlen=OBS_LEN))
    smoothed_pos = defaultdict(lambda: deque(maxlen=OBS_LEN))
    arrow_dir_buffer = defaultdict(lambda: deque(maxlen=5))
    last_dir = defaultdict(lambda: np.array([1, 0]))
    log_f = open(LOG_TXT_PATH, "w") if WRITE_LOG_FILE else None

    frame_idx, frames_written = 0, 0
    t0 = time.time()

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if APPLY_UNDISTORT:
            frame = cv2.remap(frame, map1, map2, interpolation=cv2.INTER_LINEAR)

        R_wc, t_wc, vo_ok = vo.step(frame)
        H_img2ground = homography_from_pose(K_use, R_wc, t_wc, GROUND_NORMAL, CAMERA_HEIGHT_M)
        H_ground2img = np.linalg.inv(H_img2ground)

        results = detector(frame, verbose=False)[0]
        bboxes, confs = [], []
        for box in results.boxes:
            cls = int(box.cls.cpu().item())
            conf = float(box.conf.cpu().item())
            if cls == CYCLIST_CLASS_INDEX and conf >= CONFIDENCE_THRESHOLD:
                # Access the first row of the numpy array to get the coordinates
                x1, y1, x2, y2 = map(int, box.xyxy.cpu().numpy()[0])
                width, height = x2 - x1, y2 - y1
                if height >= MIN_HEIGHT and width >= MIN_WIDTH:
                    bboxes.append([x1, y1, width, height])
                    confs.append(conf)

        tracks = tracker.update_tracks([(bb, cf, 'cyclist') for bb, cf in zip(bboxes, confs)], frame=frame)

        header = f"[Frame {frame_idx}] Tracked: {sum(t.is_confirmed() for t in tracks)} | VO: {'OK' if vo_ok else '…'}"
        print(header)
        if log_f:
            log_f.write(header + "\n")

        predictions = {}

        for tr in tracks:
            if not tr.is_confirmed():
                continue
            l, t_, r, b = map(int, tr.to_ltrb())
            cx, cy = (l + r) // 2, (t_ + b) // 2
            smoothed_pos[tr.track_id].append(np.array([cx, cy]))
            avg_pos = np.mean(smoothed_pos[tr.track_id], axis=0).astype(int)
            smooth_cx, smooth_cy = avg_pos[0], avg_pos[1]

            cv2.rectangle(frame, (l, t_), (r, b), (0, 255, 0), 2)
            cv2.putText(frame, f"ID:{tr.track_id}", (l, max(15, t_ - 8)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.55, (255, 255, 0), 2)

            gpt = warp_points(H_img2ground, [(cx, cy)])[0]
            traj_buffer[tr.track_id].append(gpt)
            line = f"  ID {tr.track_id} | Img({cx},{cy}) -> Ground({gpt[0]:.2f},{gpt[1]:.2f})"
            print(line)
            if log_f:
                log_f.write(line + "\n")

            if len(traj_buffer[tr.track_id]) >= OBS_LEN:
                past = list(traj_buffer[tr.track_id])[-OBS_LEN:]
                normed, mean, std = preprocess_for_lstm(past)
                inp = torch.from_numpy(normed).float().unsqueeze(0).to(device)
                with torch.no_grad():
                    pred = traj_model(inp).cpu().squeeze(0).numpy()
                pred = denormalize_lstm_output(pred, mean, std)
                pred_rounded = np.round(pred, 2).tolist()
                pline = f"    → Future (ground): {pred_rounded}"
                print(pline)
                if log_f:
                    log_f.write(pline + "\n")

                predictions[tr.track_id] = pred

                pred_pts_img = warp_points(H_ground2img, pred)
                for i, (px, py) in enumerate(pred_pts_img):
                    # Removed the line drawing the small dots on the prediction trajectory
                    # cv2.circle(frame, (int(px), int(py)), 3, (0, 0, 255), -1)

                    if i == 0: # Draw arrow from the first predicted point
                        current_vec = np.array([int(px) - smooth_cx, int(py) - smooth_cy])
                        arrow_dir_buffer[tr.track_id].append(current_vec)
                        avg_vec = np.mean(arrow_dir_buffer[tr.track_id], axis=0)
                        mag = np.linalg.norm(avg_vec)
                        if mag > 1e-3:
                            if mag < MIN_MOVE_THRESHOLD:
                                discrete_vec = last_dir[tr.track_id]
                            else:
                                discrete_vec = discretize_direction(avg_vec)
                                last_dir[tr.track_id] = discrete_vec
                            desired_arrow_length = 40.0
                            end_point = (int(smooth_cx + discrete_vec[0] * desired_arrow_length),
                                         int(smooth_cy + discrete_vec[1] * desired_arrow_length))
                            cv2.arrowedLine(frame, (smooth_cx, smooth_cy), end_point,
                                            (0, 0, 255), 2, tipLength=0.15)


        # Collision detection and warnings
        ids = list(predictions.keys())
        collision_pairs = []
        for i in range(len(ids)):
            id1 = ids[i]
            traj1 = predictions[id1]
            for j in range(i + 1, len(ids)):
                id2 = ids[j]
                traj2 = predictions[id2]
                distances = np.linalg.norm(traj1 - traj2, axis=1)
                if np.any(distances < safety_distance):
                    collision_pairs.append((id1, id2))

        for id1, id2 in collision_pairs:
            pos1 = smoothed_pos[id1][-1]
            pos2 = smoothed_pos[id2][-1]
            cv2.circle(frame, tuple(pos1.astype(int)), 15, (0, 0, 255), 3) # Reduced circle size from 30 to 15
            cv2.circle(frame, tuple(pos2.astype(int)), 15, (0, 0, 255), 3) # Reduced circle size from 30 to 15
            cv2.putText(frame, f"Collision Risk: {id1} & {id2}", (50, 50),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
            if log_f:
                log_f.write(f"[Frame {frame_idx}] Collision predicted between ID {id1} and ID {id2}\n")


        out.write(frame)
        frames_written += 1
        frame_idx += 1

    cap.release()
    out.release()
    if log_f:
        log_f.close()
    dt = time.time() - t0
    print(f"Done. Wrote {OUTPUT_VIDEO}. Frames: {frames_written}, avg FPS (incl. I/O): {frames_written / max(dt, 1e-3):.2f}")
    if WRITE_LOG_FILE:
        print(f"Prediction log saved to: {LOG_TXT_PATH}")

if __name__ == "__main__":
    main()

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  ID 271 | Img(257,139) -> Ground(2125.46,299.29)
    → Future (ground): [[230.1199951171875, -515.219970703125], [897.4099731445312, -67.41999816894531], [441.7699890136719, -292.5799865722656], [588.4400024414062, -254.8300018310547], [160.75999450683594, -542.5399780273438], [394.6000061035156, -598.3099975585938], [362.8599853515625, -647.7100219726562], [353.2200012207031, -678.6400146484375], [307.6600036621094, -699.530029296875], [263.2799987792969, -709.2100219726562], [240.64999389648438, -720.239990234375], [242.50999450683594, -739.469970703125]]
  ID 272 | Img(309,138) -> Ground(2110.62,292.97)
    → Future (ground): [[230.55999755859375, -515.1400146484375], [918.6300048828125, -75.58999633789062], [427.6199951171875, -288.19000244140625], [623.7000122070312, -270.1499938964844], [134.08999633789062, -538.1799926757812], [403.19000244140625, -603.6599731445312], [364.0899963378906, -649.8499755859375], [349.