<a href="https://colab.research.google.com/github/moashraf7474/Smart-Crowd-Tracker-Real-Time-AI-Person-Detection-Tracking/blob/main/finalleee.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install ultralytics supervision opencv-python-headless -q



[notice] A new release of pip is available: 25.0.1 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [None]:
#rcnn upload
# Optimized: Faster R-CNN (MobileNet) + DeepSORT + Tkinter (Video upload)
# - Uses smaller detector: fasterrcnn_mobilenet_v3_large_fpn
# - Uses GPU + mixed precision if available
# - Resizes frames before inference (scales boxes back)
# - Frame skipping option to improve throughput
# - Better filtering of tracks for stable counting

import cv2
import torch
import numpy as np
import threading
import tkinter as tk
from tkinter import ttk, filedialog
from deep_sort_realtime.deepsort_tracker import DeepSort
import os
from datetime import datetime
from torchvision.models.detection import fasterrcnn_mobilenet_v3_large_fpn
from torchvision.ops import boxes as box_ops

# -----------------------
# Config / Tunables
# -----------------------
TARGET_WIDTH = 640            # frames wider than this are downscaled to this width before inference (lower -> faster)
FRAME_SKIP = 1                # run detection on every FRAME_SKIP-th frame (1 = every frame)
CONF_THRESH = 0.6             # detections scoring below this are discarded (higher reduces false positives)
TRACK_TIME_SINCE_UPDATE = 2   # confirmed tracks are drawn/counted only while time_since_update <= this
MODEL_TO_USE = "mobilenet"    # label only; not read anywhere in the visible code

# -----------------------
# Device & Model
# -----------------------
# Use CUDA when torch reports it as available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# Smaller, faster detector (Faster R-CNN with a MobileNetV3-Large FPN backbone).
# NOTE(review): recent torchvision releases deprecate `pretrained=True` in
# favour of a `weights=` argument - confirm against the installed version.
model = fasterrcnn_mobilenet_v3_large_fpn(pretrained=True)
model.to(device)
model.eval()  # evaluation mode: the model is only used for inference here

# -----------------------
# Tracker
# -----------------------
# DeepSORT tracker shared by the whole cell; max_age/n_init/nn_budget are the
# tunables passed explicitly here.
tracker = DeepSort(max_age=30, n_init=3, nn_budget=70)

# -----------------------
# Globals
# -----------------------
# State shared between the Tk button callbacks and the tracking thread.
running = False          # the tracking loop keeps going while this is True
recording = False        # annotated frames are written to disk while this is True
out = None               # cv2.VideoWriter for the current recording, created lazily
video_save_path = None   # path of the file the current/last recording is written to
video_path = None        # video chosen through the "Upload Video" dialog

# Recordings are saved into the user's Downloads folder (created if missing).
download_path = os.path.join(os.path.expanduser("~"), "Downloads")
os.makedirs(download_path, exist_ok=True)

# Drawing colours, passed to OpenCV drawing calls (BGR channel order).
GREEN = (0, 255, 0)
RED = (0, 0, 255)
YELLOW = (0, 255, 255)  # not used in the visible code
PERSON_CLASS_ID = 1  # COCO person

# -----------------------
# Helpers
# -----------------------
def resize_and_scale(frame, target_w=TARGET_WIDTH):
    """Downscale ``frame`` to ``target_w`` pixels wide, keeping its aspect ratio.

    Frames that are already ``target_w`` wide or narrower are returned as-is.

    Returns:
        ``(image, scale)`` where ``scale`` is ``1.0`` for an untouched frame
        and ``target_w / original_width`` for a resized one.
    """
    src_h, src_w = frame.shape[:2]
    if src_w > target_w:
        ratio = target_w / float(src_w)
        shrunk = cv2.resize(frame, (target_w, int(src_h * ratio)))
        return shrunk, ratio
    return frame, 1.0  # already small enough: no scaling

def scale_boxes(boxes, scale):
    """Map box coordinates from the resized image back to the original one.

    ``boxes`` holds x1,y1,x2,y2 coordinates measured on the resized image and
    ``scale`` is the factor that was applied when resizing. With a scale of
    exactly ``1.0`` the input object itself is returned; otherwise every
    coordinate is multiplied by ``1 / scale``.
    """
    return boxes if scale == 1.0 else boxes * (1.0 / scale)

# -----------------------
# Main processing
# -----------------------
def detect_and_track_video():
    """Detect and track people in the uploaded video until it ends or is stopped.

    Entry point of the worker thread started by ``start_tracking``. Every
    processed frame is downscaled, run through the Faster R-CNN detector,
    filtered down to confident person detections and handed to DeepSORT.
    Confirmed, recently updated tracks are drawn and counted; the annotated
    frame is shown in an OpenCV window and written to disk while
    ``recording`` is set.

    The loop ends when ``running`` is cleared, the video has no more frames,
    or ``q`` is pressed in the preview window. On every exit path (including
    an exception) the capture, any open writer and the preview window are
    released and ``running`` is reset, so tracking can be started again.
    """
    global running, recording, out, video_save_path, video_path, tracker
    if not video_path:
        print("⚠ Please upload a video first.")
        running = False
        return

    # Reset tracker state for the new video through DeepSort's public reset.
    # NOTE(review): delete_all_tracks() is assumed to exist in the installed
    # deep_sort_realtime version; if it does not, the tracker is rebuilt.
    try:
        tracker.delete_all_tracks()
    except Exception:
        # safe fallback if the tracker API differs
        tracker = DeepSort(max_age=30, n_init=3, nn_budget=70)

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(" Cannot open video.")
        running = False  # otherwise the Start button would stay disabled
        return

    # One window for both processed and skipped frames.
    window_title = "Tracking (optimized)"
    frame_idx = 0

    try:
        # Record at the source frame rate so playback speed matches the input.
        # Skipped frames are never written, hence the division by FRAME_SKIP.
        src_fps = cap.get(cv2.CAP_PROP_FPS)
        if not (src_fps and src_fps > 0):
            src_fps = 20.0  # backend could not report a usable frame rate
        record_fps = src_fps / max(FRAME_SKIP, 1)

        # Use autocast for mixed precision on CUDA, a no-op context otherwise.
        if device.type == "cuda":
            scaler_ctx = torch.cuda.amp.autocast
        else:
            from contextlib import nullcontext
            scaler_ctx = nullcontext

        while running:
            ret, frame = cap.read()
            if not ret:
                break

            frame_idx += 1
            # Optionally skip frames to boost throughput: show them unannotated.
            if FRAME_SKIP > 1 and (frame_idx % FRAME_SKIP) != 0:
                cv2.imshow(window_title, frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
                continue

            # Resize for faster inference, keep scale to map boxes back.
            resized, scale = resize_and_scale(frame, TARGET_WIDTH)
            img_rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
            img_tensor = torch.from_numpy(img_rgb / 255.0).permute(2, 0, 1).float().to(device).unsqueeze(0)

            with torch.no_grad(), scaler_ctx():
                preds = model(img_tensor)[0]

            boxes = preds['boxes'].cpu().numpy()    # x1,y1,x2,y2 on resized frame
            scores = preds['scores'].cpu().numpy()
            labels = preds['labels'].cpu().numpy()

            # Keep confident person detections, mapped back to original size.
            detections = []
            for box, score, label in zip(boxes, scores, labels):
                if score < CONF_THRESH or int(label) != PERSON_CLASS_ID:
                    continue
                x1, y1, x2, y2 = [int(v / scale) for v in box]
                # deep_sort_realtime expects [x, y, w, h]
                detections.append(
                    ([float(x1), float(y1), float(x2 - x1), float(y2 - y1)],
                     float(score), "person"))

            # Update tracker with detections on the original-sized frame.
            tracks = tracker.update_tracks(detections, frame=frame)

            # Count only confirmed tracks that were updated recently.
            current_count = 0
            for track in tracks:
                if not track.is_confirmed():
                    continue
                if track.time_since_update > TRACK_TIME_SINCE_UPDATE:
                    continue
                current_count += 1
                x1, y1, x2, y2 = map(int, track.to_ltrb())
                cv2.rectangle(frame, (x1, y1), (x2, y2), GREEN, 2)
                cv2.putText(frame, f"ID:{track.track_id}", (x1, max(y1 - 8, 8)),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, GREEN, 2)

            cv2.putText(frame, f"Current: {current_count}", (20, 40),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, RED, 2)

            # Recording: the writer is created lazily on the first frame.
            if recording:
                # Work on a local reference: the UI thread may clear `out`
                # between the check and the write.
                writer = out
                if writer is None:
                    filename = f"tracked_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4"
                    video_save_path = os.path.join(download_path, filename)
                    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                    h, w = frame.shape[:2]
                    writer = out = cv2.VideoWriter(video_save_path, fourcc, record_fps, (w, h))
                    print("🎥 Recording ->", video_save_path)
                writer.write(frame)

            cv2.imshow(window_title, frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        cap.release()
        # Forget the writer after releasing it so a later run opens a new
        # file instead of writing into a closed one.
        writer, out = out, None
        if writer is not None:
            writer.release()
        cv2.destroyAllWindows()
        running = False  # allow start_tracking() to launch a new run

# -----------------------
# Basic controls + tkinter UI (same pattern)
# -----------------------
def start_tracking():
    """Start the tracking worker thread for the uploaded video.

    Prints a warning and does nothing when no video has been uploaded; does
    nothing when tracking is already running. Otherwise sets ``running`` and
    launches ``detect_and_track_video`` on a daemon thread.
    """
    global running
    if not video_path:
        print("⚠ Upload a video before starting.")
        return
    if running:
        return  # a worker is already active
    running = True
    threading.Thread(target=detect_and_track_video, daemon=True).start()

def stop_tracking():
    """Ask the tracking loop to stop.

    Only clears the ``running`` flag; the worker loop tests that flag at the
    top of each iteration and does its own cleanup when it exits.
    """
    global running
    running = False

def start_recording():
    """Arm recording; the tracking loop opens the output file lazily.

    Does nothing when recording is already on. Otherwise sets ``recording``
    and clears ``out`` / ``video_save_path`` so the tracking loop creates a
    fresh writer on the next processed frame.
    """
    global recording, out, video_save_path
    if recording:
        return
    # Close a writer left over from an earlier run instead of dropping it.
    if out is not None:
        out.release()
    recording = True
    video_save_path = None
    out = None
    print("🎬 Recording started")

def stop_recording():
    """Stop recording, close the output file and report where it was saved.

    Does nothing when recording is not active. When no frame was written
    since recording started (``video_save_path`` is still unset) it reports
    that no file was produced.
    """
    global recording, out, video_save_path
    if not recording:
        return
    recording = False
    # Take the writer out of the shared global before releasing it, so the
    # tracking thread cannot clear it between the check and the call.
    writer, out = out, None
    if writer is not None:
        writer.release()
    if video_save_path:
        print("💾 Recording saved to:", video_save_path)
    else:
        print("Recording stopped (no file)")

def upload_video():
    """Let the user pick a video file and make it the tracking source.

    Opens a file dialog; when a file is chosen, stores its path in
    ``video_path``, resets the tracker so IDs from a previous video do not
    carry over, and shows the file name in the UI label. Cancelling the
    dialog changes nothing.
    """
    global video_path, tracker
    file_path = filedialog.askopenfilename(
        title="Select a video file",
        filetypes=[("Video files", "*.mp4 *.avi *.mov *.mkv")]
    )
    if not file_path:
        return
    video_path = file_path
    # Reset tracker on new video to avoid leftover IDs.
    # NOTE(review): delete_all_tracks() is assumed to be DeepSort's public
    # reset in the installed deep_sort_realtime; if it is missing the tracker
    # is rebuilt, as before.
    try:
        tracker.delete_all_tracks()
    except Exception:
        tracker = DeepSort(max_age=30, n_init=3, nn_budget=70)
    print("✅ Video loaded:", video_path)
    video_label.config(text=f"Video: {os.path.basename(video_path)}", fg="lightgreen")

# Tkinter UI
# Main window
_PANEL_BG = "#202020"
root = tk.Tk()
root.title("Optimized Tracker")
root.geometry("460x420")
root.configure(bg=_PANEL_BG)

# Shared look for every ttk button
style = ttk.Style()
style.configure("TButton", font=("Arial", 12), padding=10)

title_label = tk.Label(
    root,
    text="Optimized: Detector + DeepSORT",
    bg=_PANEL_BG,
    fg="white",
    font=("Arial", 14),
)
title_label.pack(pady=15)

# The upload button sits directly above the label naming the chosen file.
ttk.Button(root, text=" Upload Video", command=upload_video).pack(pady=5)
video_label = tk.Label(
    root,
    text="No video selected",
    bg=_PANEL_BG,
    fg="gray",
    font=("Arial", 10),
)
video_label.pack()

# Remaining controls, packed top to bottom in this order.
for _caption, _handler in (
    (" Start Tracking", start_tracking),
    (" Stop Tracking", stop_tracking),
    (" Start Recording", start_recording),
    (" Stop Recording", stop_recording),
    (" Exit", root.destroy),
):
    ttk.Button(root, text=_caption, command=_handler).pack(pady=5)

# Blocks until the window is closed.
root.mainloop()


Using device: cpu


  import pkg_resources
  self.model.load_state_dict(torch.load(model_wts_path))


In [None]:
# ============================================================== edit
# YOLOv8 + ByteTrack + Tkinter GUI (Webcam + Video Tracking)
# - Detects and tracks people in webcam or uploaded video
# - GPU accelerated (if available)
# - Frame skipping, resize optimization, and recording
# - Shows ONLY current people count (no unique counter)
# ==============================================================

import cv2
import threading
import torch
import os
import tkinter as tk
from tkinter import ttk, filedialog
from ultralytics import YOLO
from datetime import datetime

# -----------------------
# Config
# -----------------------
MODEL_NAME = "yolov8n.pt"   # weights file passed to YOLO(); lightweight model
TARGET_WIDTH = 640          # frames wider than this are downscaled to this width (lower = faster)
FRAME_SKIP = 1              # 1 = every frame, 2 = every other frame
CONF_THRESH = 0.5           # YOLO confidence threshold passed to model.track()
PERSON_CLASS_ID = 0         # YOLO person ID

# -----------------------
# Globals
# -----------------------
# State shared between the Tk button callbacks and the tracking thread.
running = False          # the tracking loop keeps going while this is True
recording = False        # annotated frames are written to disk while this is True
using_webcam = False     # set by the start callbacks; not read in the visible code
out = None               # cv2.VideoWriter for the current recording, created lazily
video_save_path = None   # path of the file the current/last recording is written to
video_path = None        # video chosen through the "Upload Video" dialog
# Recordings are saved into the user's Downloads folder (created if missing).
download_path = os.path.join(os.path.expanduser("~"), "Downloads")
os.makedirs(download_path, exist_ok=True)

# Drawing colours, passed to OpenCV drawing calls (BGR channel order).
GREEN = (0, 255, 0)
RED = (0, 0, 255)

# -----------------------
# Device & Model
# -----------------------
# Use CUDA when torch reports it as available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(" Using device:", device)

print(" Loading YOLOv8 model...")
model = YOLO(MODEL_NAME)
model.to(device)
print(" Model loaded successfully.")

# -----------------------
# Helper functions
# -----------------------
def resize_frame(frame, target_w=TARGET_WIDTH):
    """Shrink ``frame`` to ``target_w`` pixels wide while keeping aspect ratio.

    A frame that is already ``target_w`` wide or narrower is returned
    unchanged together with a scale of ``1.0``; otherwise the resized image
    and the applied scale factor (``target_w / original_width``) are returned.
    """
    src_h, src_w = frame.shape[:2]
    if src_w > target_w:
        ratio = target_w / float(src_w)
        return cv2.resize(frame, (target_w, int(src_h * ratio))), ratio
    return frame, 1.0


# -----------------------
# Main Detection & Tracking Loop
# -----------------------
def detect_and_track(source):
    """Track people from ``source`` with YOLOv8 + ByteTrack until stopped.

    Args:
        source: anything ``cv2.VideoCapture`` accepts; the callers in this
            cell pass a video file path or ``0`` for the default webcam.

    Each processed frame is downscaled and passed to ``model.track``
    restricted to the person class. Tracked boxes are mapped back to the
    original frame, drawn with their track ID, and the number of distinct
    person IDs in the frame is shown as the current count. Annotated frames
    are displayed and written to disk while ``recording`` is set.

    The loop ends when ``running`` is cleared, the source has no more frames,
    or ``q`` is pressed. On every exit path (including an exception) the
    capture, any open writer and the preview window are released and
    ``running`` is reset, so tracking can be started again.
    """
    global running, recording, out, video_save_path

    cap = cv2.VideoCapture(source)
    if not cap.isOpened():
        print("Cannot open source.")
        running = False  # otherwise the start buttons would stay disabled
        return

    window_title = "YOLOv8 + ByteTrack"
    frame_idx = 0
    out = None

    try:
        while running:
            ret, frame = cap.read()
            if not ret:
                break

            frame_idx += 1
            if FRAME_SKIP > 1 and (frame_idx % FRAME_SKIP) != 0:
                # Skipped frames are shown unannotated.
                cv2.imshow(window_title, frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
                continue

            resized, scale = resize_frame(frame)

            # YOLOv8 + ByteTrack tracking. `classes` limits detection and
            # tracking to persons so other objects never get track IDs.
            results = model.track(
                resized,
                persist=True,
                conf=CONF_THRESH,
                classes=[PERSON_CLASS_ID],
                tracker="bytetrack.yaml",
                verbose=False
            )

            # Distinct person track IDs seen in this frame.
            person_ids = set()
            if len(results) > 0 and results[0].boxes is not None:
                found = results[0].boxes
                ids = found.id  # None when nothing is being tracked
                if ids is not None:
                    boxes = found.xyxy.cpu().numpy()
                    cls = found.cls.cpu().numpy()
                    ids = ids.int().cpu().numpy()
                    inv = 1.0 / scale
                    for box, track_id, c in zip(boxes, ids, cls):
                        # Belt and braces: count and draw persons only.
                        if int(c) != PERSON_CLASS_ID:
                            continue
                        person_ids.add(int(track_id))
                        x1, y1, x2, y2 = box
                        if scale != 1.0:
                            # Map coordinates back to the original frame.
                            x1, y1, x2, y2 = [int(v * inv) for v in (x1, y1, x2, y2)]
                        cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), GREEN, 2)
                        # Clamp so labels of boxes touching the top edge stay visible.
                        cv2.putText(frame, f"ID:{int(track_id)}", (int(x1), max(int(y1) - 5, 12)),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, GREEN, 2)
            current_count = len(person_ids)

            # Display count
            cv2.putText(frame, f"People: {current_count}", (20, 40),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, RED, 2)

            # Recording: the writer is created lazily on the first frame.
            if recording:
                # Work on a local reference: the UI thread may clear `out`
                # between the check and the write.
                writer = out
                if writer is None:
                    filename = f"yolo_track_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4"
                    video_save_path = os.path.join(download_path, filename)
                    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                    h, w = frame.shape[:2]
                    writer = out = cv2.VideoWriter(video_save_path, fourcc, 20.0, (w, h))
                    print("🎥 Recording ->", video_save_path)
                writer.write(frame)

            cv2.imshow(window_title, frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        cap.release()
        # Forget the writer after releasing it so nothing writes into a
        # closed file later.
        writer, out = out, None
        if writer is not None:
            writer.release()
        cv2.destroyAllWindows()
        running = False  # allow the start buttons to launch a new run


# -----------------------
# Tkinter UI Functions
# -----------------------
def start_tracking_video():
    """Start tracking on the uploaded video in a background thread.

    Prints a warning and returns when no video has been uploaded. Does
    nothing when tracking is already running. Otherwise sets ``running``,
    clears ``using_webcam`` and launches ``detect_and_track`` on a daemon
    thread with the uploaded path as its source.
    """
    global running, using_webcam
    if not video_path:
        print("⚠ Upload a video first.")
        return
    if running:
        return  # a worker is already active
    running = True
    using_webcam = False
    threading.Thread(target=detect_and_track, args=(video_path,), daemon=True).start()

def start_tracking_webcam():
    """Start tracking on the default webcam in a background thread.

    Does nothing when tracking is already running; otherwise sets ``running``
    and ``using_webcam`` and launches ``detect_and_track`` on a daemon thread
    with source ``0``.
    """
    global running, using_webcam
    if running:
        return
    running = True
    using_webcam = True
    worker = threading.Thread(target=detect_and_track, args=(0,), daemon=True)
    worker.start()

def stop_tracking():
    """Ask the tracking loop to stop.

    Only clears the ``running`` flag; the worker loop tests that flag at the
    top of each iteration and does its own cleanup when it exits.
    """
    global running
    running = False

def start_recording():
    """Arm recording; the tracking loop opens the output file lazily.

    Does nothing when recording is already on. Otherwise sets ``recording``
    and clears ``out`` and ``video_save_path`` so the tracking loop creates a
    fresh writer on the next processed frame.
    """
    global recording, out, video_save_path
    if recording:
        return
    recording = True
    # Forget the previous recording's path; otherwise stop_recording() would
    # report that old file as saved when no frame is written this time.
    video_save_path = None
    out = None
    print(" Recording started")

def stop_recording():
    """Stop recording, close the writer and report the saved file.

    No-op when recording is not active. Prints the output path when one was
    set, otherwise reports that no file was produced.
    """
    global recording, out, video_save_path
    if not recording:
        return
    recording = False
    if out is not None:
        out.release()
        out = None
    if not video_save_path:
        print("Recording stopped (no file)")
        return
    print("Saved to:", video_save_path)

def upload_video():
    """Let the user pick a video file and make it the tracking source.

    Opens a file dialog; when a file is chosen its path is stored in
    ``video_path`` and its name is shown in the UI label. Cancelling the
    dialog changes nothing.
    """
    global video_path
    chosen = filedialog.askopenfilename(
        filetypes=[("Video files", "*.mp4 *.avi *.mov *.mkv")],
        title="Select a video file",
    )
    if not chosen:
        return
    video_path = chosen
    print(" Video loaded:", video_path)
    shown_name = os.path.basename(video_path)
    video_label.config(text=f"Video: {shown_name}", fg="lightgreen")


# -----------------------
# GUI setup
# -----------------------
# Main window
_PANEL_BG = "#202020"
root = tk.Tk()
root.title("YOLOv8 + ByteTrack Tracker")
root.geometry("460x480")
root.configure(bg=_PANEL_BG)

# Shared look for every ttk button
style = ttk.Style()
style.configure("TButton", font=("Arial", 12), padding=10)

tk.Label(
    root,
    text="YOLOv8 + ByteTrack Tracker",
    bg=_PANEL_BG,
    fg="white",
    font=("Arial", 14),
).pack(pady=15)

# Source selection: webcam, or an uploaded file whose name the label shows.
for _caption, _handler in (
    (" Start Webcam Tracking", start_tracking_webcam),
    (" Upload Video", upload_video),
):
    ttk.Button(root, text=_caption, command=_handler).pack(pady=5)
video_label = tk.Label(
    root,
    text="No video selected",
    bg=_PANEL_BG,
    fg="gray",
    font=("Arial", 10),
)
video_label.pack()

# Remaining controls as (caption, callback, vertical padding), top to bottom.
for _caption, _handler, _gap in (
    (" Start Video Tracking", start_tracking_video, 5),
    (" Stop Tracking", stop_tracking, 5),
    (" Start Recording", start_recording, 5),
    (" Stop Recording", stop_recording, 5),
    (" Exit", root.destroy, 10),
):
    ttk.Button(root, text=_caption, command=_handler).pack(pady=_gap)

# Blocks until the window is closed.
root.mainloop()


✅ Using device: cpu
⏳ Loading YOLOv8 model...
✅ Model loaded successfully.


In [None]:
# ===============================
# Faster R-CNN + DeepSORT + Tkinter Live Tracking (fixed counting + person-only)
# ===============================
import cv2
import torch
import numpy as np
import threading
import tkinter as tk
from tkinter import ttk
from deep_sort_realtime.deepsort_tracker import DeepSort
import os
from datetime import datetime
from torchvision.models.detection import fasterrcnn_resnet50_fpn

# Load Faster R-CNN model (ResNet-50 FPN backbone). It is never moved to
# another device in this cell, and the input tensors are built without a
# device argument as well.
# NOTE(review): recent torchvision releases deprecate `pretrained=True` in
# favour of a `weights=` argument - confirm against the installed version.
model = fasterrcnn_resnet50_fpn(pretrained=True)
model.eval()  # evaluation mode: the model is only used for inference here

# Initialize DeepSORT; max_age/n_init/nn_budget are the tunables passed here.
tracker = DeepSort(max_age=30, n_init=3, nn_budget=70)

# Global flags: state shared between the Tk callbacks and the tracking thread.
running = False          # the tracking loop keeps going while this is True
recording = False        # annotated frames are written to disk while this is True
out = None               # cv2.VideoWriter for the current recording, created lazily
video_save_path = None   # path of the file the current/last recording is written to

# Output folder (Downloads), created if missing
download_path = os.path.join(os.path.expanduser("~"), "Downloads")
os.makedirs(download_path, exist_ok=True)

# Colors, passed to OpenCV drawing calls (BGR channel order)
GREEN = (0, 255, 0)
RED = (0, 0, 255)

# COCO person class id
PERSON_CLASS_ID = 1

def detect_and_track():
    """Track people from the default webcam with Faster R-CNN + DeepSORT.

    Entry point of the worker thread started by ``start_tracking``. Every
    frame is run through the detector at full resolution, filtered down to
    confident person detections and handed to DeepSORT. Confirmed tracks
    updated within the last frame are drawn and counted; the annotated frame
    is displayed and written to disk while ``recording`` is set.

    The loop ends when ``running`` is cleared, the camera stops delivering
    frames, or ``q`` is pressed. On every exit path (including an exception)
    the camera, any open writer and the preview window are released and
    ``running`` is reset, so tracking can be started again.
    """
    global running, recording, out, video_save_path
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("Cannot open camera")
        running = False  # otherwise the Start button would stay disabled
        return

    conf_thresh = 0.5  # minimum detection score; constant for the whole run

    try:
        while running:
            ret, frame = cap.read()
            if not ret:
                break

            # Prepare image tensor: BGR -> RGB, scale to [0, 1], HWC -> 1xCxHxW
            img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            img_tensor = torch.from_numpy(img / 255.).permute(2, 0, 1).float().unsqueeze(0)

            # Forward pass
            with torch.no_grad():
                preds = model(img_tensor)[0]

            boxes = preds['boxes'].cpu().numpy()          # (N,4) x1,y1,x2,y2
            scores = preds['scores'].cpu().numpy()        # (N,)
            labels = preds['labels'].cpu().numpy()        # (N,)

            # Build detections list for DeepSORT: confident persons only.
            detections = []
            for box, score, label in zip(boxes, scores, labels):
                if score < conf_thresh or int(label) != PERSON_CLASS_ID:
                    continue
                x1, y1, x2, y2 = box
                # DeepSort expects [x, y, w, h]
                detections.append(
                    ([float(x1), float(y1), float(x2 - x1), float(y2 - y1)],
                     float(score), "person"))

            tracks = tracker.update_tracks(detections, frame=frame)

            # Draw tracks and count confirmed, non-stale ones.
            current_count = 0
            for track in tracks:
                if not track.is_confirmed():
                    continue
                if track.time_since_update > 1:  # skip stale
                    continue
                current_count += 1
                x1, y1, x2, y2 = map(int, track.to_ltrb())
                cv2.rectangle(frame, (x1, y1), (x2, y2), GREEN, 2)
                cv2.putText(frame, f"ID: {track.track_id}", (x1, max(y1 - 10, 10)),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, GREEN, 2)

            cv2.putText(frame, f"Count: {current_count}", (20, 40),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.0, RED, 2)

            # Recording: the writer is created lazily on the first frame.
            if recording:
                # Work on a local reference: the UI thread may clear `out`
                # between the check and the write.
                writer = out
                if writer is None:
                    filename = f"tracked_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4"
                    video_save_path = os.path.join(download_path, filename)
                    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                    h, w = frame.shape[:2]
                    writer = out = cv2.VideoWriter(video_save_path, fourcc, 20.0, (w, h))
                    print("Recording ->", video_save_path)
                writer.write(frame)
            else:
                # if not recording but writer exists, release it (safety)
                if out is not None and video_save_path is None:
                    out.release()

            cv2.imshow("Faster R-CNN + DeepSORT Tracking", frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        cap.release()
        # Forget the writer after releasing it so a later run opens a new
        # file instead of writing into a closed one.
        writer, out = out, None
        if writer is not None:
            writer.release()
        cv2.destroyAllWindows()
        running = False  # allow start_tracking() to launch a new run

def start_tracking():
    """Start the webcam tracking worker unless one is already running.

    Sets ``running`` and launches ``detect_and_track`` on a daemon thread.
    """
    global running
    if running:
        return
    running = True
    worker = threading.Thread(target=detect_and_track, daemon=True)
    worker.start()

def stop_tracking():
    """Ask the tracking loop to stop.

    Only clears the ``running`` flag; the worker loop tests that flag at the
    top of each iteration and does its own cleanup when it exits.
    """
    global running
    running = False

def start_recording():
    """Arm recording; the tracking loop opens the output file lazily.

    No-op when recording is already on. Otherwise sets ``recording`` and
    clears ``video_save_path`` and ``out`` so a new writer is created on the
    next frame.
    """
    global recording, out, video_save_path
    if recording:
        return
    recording = True
    video_save_path = out = None
    print("Recording started")

def stop_recording():
    """Stop recording, close the writer and report the saved file.

    No-op when recording is not active. Prints the output path when one was
    set, otherwise reports that no file was produced.
    """
    global recording, out, video_save_path
    if not recording:
        return
    recording = False
    if out is not None:
        out.release()
        out = None
    if not video_save_path:
        print("Recording stopped (no file)")
        return
    print("Recording saved to:", video_save_path)

# ===============================
# Tkinter UI
# ===============================
# Main window
_PANEL_BG = "#202020"
root = tk.Tk()
root.title("Faster R-CNN + DeepSORT Tracker")
root.geometry("400x300")
root.configure(bg=_PANEL_BG)

# Shared look for every ttk button
style = ttk.Style()
style.configure("TButton", font=("Arial", 12), padding=10)

title_label = tk.Label(
    root,
    text=" Faster R-CNN + DeepSORT Tracker",
    bg=_PANEL_BG,
    fg="white",
    font=("Arial", 14),
)
title_label.pack(pady=15)

# Control buttons, packed top to bottom in this order.
for _caption, _handler in (
    ("Start Tracking", start_tracking),
    ("Stop Tracking", stop_tracking),
    ("Start Recording", start_recording),
    ("Stop Recording", stop_recording),
    ("Exit", root.destroy),
):
    ttk.Button(root, text=_caption, command=_handler).pack(pady=5)

# Blocks until the window is closed.
root.mainloop()