In [1]:
!pip install ultralytics opencv-python numpy scipy



In [2]:
import cv2
import numpy as np
from ultralytics import YOLO
from collections import deque
from scipy.spatial import distance as dist
from collections import OrderedDict

In [3]:
import cv2
import numpy as np
from collections import OrderedDict, deque
import time, os
from ultralytics import YOLO

# ==== User Configurations ====
VIDEO_PATH = r"C:\Users\harsh\Downloads\samplevideo.mp4"   # <-- Change to your video path or 0 for webcam"
OUTPUT_PATH = "output.avi"
DISPLAY = True                   # Set False if you donâ€™t want to open video window
COUNT_LINE_POSITION = 0.5        # Line position (as fraction of frame height)
LINE_THICKNESS = 2
MAX_DISAPPEARED = 40
MAX_DISTANCE = 80
HISTORY_LEN = 64

# Ensure old output is removed
if os.path.exists(OUTPUT_PATH):
    os.remove(OUTPUT_PATH)

In [4]:
class CentroidTracker:
    def _init_(self, maxDisappeared=MAX_DISAPPEARED, maxDistance=MAX_DISTANCE):
        self.nextObjectID = 0
        self.objects = OrderedDict()
        self.disappeared = OrderedDict()
        self.maxDisappeared = maxDisappeared
        self.maxDistance = maxDistance
        self.tracks = {}

    def register(self, centroid):
        self.objects[self.nextObjectID] = centroid
        self.disappeared[self.nextObjectID] = 0
        self.tracks[self.nextObjectID] = deque(maxlen=HISTORY_LEN)
        self.tracks[self.nextObjectID].append(centroid)
        self.nextObjectID += 1

    def deregister(self, objectID):
        if objectID in self.objects: del self.objects[objectID]
        if objectID in self.disappeared: del self.disappeared[objectID]
        if objectID in self.tracks: del self.tracks[objectID]

    def update(self, rects):
        if len(rects) == 0:
            for oid in list(self.disappeared.keys()):
                self.disappeared[oid] += 1
                if self.disappeared[oid] > self.maxDisappeared:
                    self.deregister(oid)
            return self.objects

        inputCentroids = np.zeros((len(rects), 2), dtype="int")
        for (i, (startX, startY, endX, endY)) in enumerate(rects):
            cX, cY = int((startX + endX) / 2.0), int((startY + endY) / 2.0)
            inputCentroids[i] = (cX, cY)

        if len(self.objects) == 0:
            for i in range(0, len(inputCentroids)):
                self.register(tuple(inputCentroids[i]))
        else:
            objectIDs = list(self.objects.keys())
            objectCentroids = list(self.objects.values())
            D = np.linalg.norm(np.array(objectCentroids)[:, np.newaxis] - inputCentroids[np.newaxis, :], axis=2)

            rows = D.min(axis=1).argsort()
            cols = D.argmin(axis=1)[rows]
            usedRows, usedCols = set(), set()

            for (row, col) in zip(rows, cols):
                if row in usedRows or col in usedCols: continue
                if D[row, col] > self.maxDistance: continue
                objectID = objectIDs[row]
                self.objects[objectID] = tuple(inputCentroids[col])
                self.tracks[objectID].append(tuple(inputCentroids[col]))
                self.disappeared[objectID] = 0
                usedRows.add(row)
                usedCols.add(col)

            unusedRows = set(range(0, D.shape[0])).difference(usedRows)
            unusedCols = set(range(0, D.shape[1])).difference(usedCols)

            for row in unusedRows:
                objectID = objectIDs[row]
                self.disappeared[objectID] += 1
                if self.disappeared[objectID] > self.maxDisappeared:
                    self.deregister(objectID)
            for col in unusedCols:
                self.register(tuple(inputCentroids[col]))

        return self.objects

In [5]:
def intersects_line(prev, cur, line_y):
    return (prev[1] < line_y and cur[1] >= line_y) or (prev[1] > line_y and cur[1] <= line_y)

def draw_info(frame, total_in, total_out):
    cv2.putText(frame, f"IN: {total_in}  OUT: {total_out}", (20, 40),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

In [6]:
# -----------------------------
# Centroid Tracker Class (Fixed)
# -----------------------------
from scipy.spatial import distance as dist
from collections import OrderedDict
import numpy as np

class CentroidTracker:
    def __init__(self, maxDisappeared=50, maxDistance=80):  # âœ… fixed constructor
        self.nextObjectID = 0
        self.objects = OrderedDict()
        self.disappeared = OrderedDict()  # âœ… now properly initialized
        self.maxDisappeared = maxDisappeared
        self.maxDistance = maxDistance  # âœ… optional: distance threshold for better tracking

    def register(self, centroid):
        self.objects[self.nextObjectID] = centroid
        self.disappeared[self.nextObjectID] = 0
        self.nextObjectID += 1

    def deregister(self, objectID):
        del self.objects[objectID]
        del self.disappeared[objectID]

    def update(self, rects):
        # if no detections, mark all objects as disappeared
        if len(rects) == 0:
            for objectID in list(self.disappeared.keys()):
                self.disappeared[objectID] += 1
                if self.disappeared[objectID] > self.maxDisappeared:
                    self.deregister(objectID)
            return self.objects

        # compute centroids for current frame
        inputCentroids = np.zeros((len(rects), 2), dtype="int")
        for (i, (x1, y1, x2, y2)) in enumerate(rects):
            cX = int((x1 + x2) / 2.0)
            cY = int((y1 + y2) / 2.0)
            inputCentroids[i] = (cX, cY)

        # if no existing objects, register all
        if len(self.objects) == 0:
            for i in range(0, len(inputCentroids)):
                self.register(inputCentroids[i])
        else:
            objectIDs = list(self.objects.keys())
            objectCentroids = list(self.objects.values())

            D = dist.cdist(np.array(objectCentroids), inputCentroids)
            rows = D.min(axis=1).argsort()
            cols = D.argmin(axis=1)[rows]

            usedRows, usedCols = set(), set()

            for (row, col) in zip(rows, cols):
                if row in usedRows or col in usedCols:
                    continue
                if D[row, col] > self.maxDistance:  # âœ… optional distance filter
                    continue
                objectID = objectIDs[row]
                self.objects[objectID] = inputCentroids[col]
                self.disappeared[objectID] = 0
                usedRows.add(row)
                usedCols.add(col)

            unusedRows = set(range(0, D.shape[0])).difference(usedRows)
            unusedCols = set(range(0, D.shape[1])).difference(usedCols)

            if D.shape[0] >= D.shape[1]:
                for row in unusedRows:
                    objectID = objectIDs[row]
                    self.disappeared[objectID] += 1
                    if self.disappeared[objectID] > self.maxDisappeared:
                        self.deregister(objectID)
            else:
                for col in unusedCols:
                    self.register(inputCentroids[col])

        return self.objects

In [7]:
def process_video(video_path=VIDEO_PATH, output_path="output.avi"):
    from collections import deque

    model = YOLO("yolov8n.pt")
    ct = CentroidTracker(maxDisappeared=40)
    trackable = {}
    total_in, total_out = 0, 0

    cap = cv2.VideoCapture(video_path)
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    writer = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"XVID"), fps, (w, h))

    line_y = h // 2  # middle line
    pts = [deque(maxlen=30) for _ in range(9999)]

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        results = model(frame, stream=True)
        rects = []

        for r in results:
            for box in r.boxes:
                x1, y1, x2, y2 = map(int, box.xyxy[0])
                rects.append((x1, y1, x2, y2))
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

        objects = ct.update(rects)

        for (objectID, centroid) in objects.items():
            to = trackable.get(objectID, None)
            if to is None:
                trackable[objectID] = {"centroids": [centroid], "counted": False}
            else:
                trackable[objectID]["centroids"].append(centroid)
                if not to["counted"]:
                    if centroid[1] < line_y and np.mean([c[1] for c in to["centroids"][-5:]]) >= line_y:
                        total_out += 1
                        to["counted"] = True
                    elif centroid[1] > line_y and np.mean([c[1] for c in to["centroids"][-5:]]) <= line_y:
                        total_in += 1
                        to["counted"] = True

            text = f"ID {objectID}"
            cv2.putText(frame, text, (centroid[0] - 10, centroid[1] - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
            cv2.circle(frame, (centroid[0], centroid[1]), 4, (0, 255, 0), -1)

        cv2.line(frame, (0, line_y), (w, line_y), (0, 0, 255), 2)
        cv2.putText(frame, f"IN: {total_in}  OUT: {total_out}",
                    (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

        writer.write(frame)

    cap.release()
    writer.release()
    print("âœ… Processing complete. Video saved as:", output_path)
    return trackable, {"in": total_in, "out": total_out}

In [8]:
trackable, counts = process_video()


0: 384x640 (no detections), 627.7ms
Speed: 45.8ms preprocess, 627.7ms inference, 46.2ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 313.3ms
Speed: 6.6ms preprocess, 313.3ms inference, 3.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 332.1ms
Speed: 9.5ms preprocess, 332.1ms inference, 3.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 364.8ms
Speed: 12.9ms preprocess, 364.8ms inference, 4.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 363.4ms
Speed: 10.4ms preprocess, 363.4ms inference, 2.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 302.4ms
Speed: 11.5ms preprocess, 302.4ms inference, 2.4ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 301.1ms
Speed: 9.8ms preprocess, 301.1ms inference, 2.6ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 113.2ms
Speed: 10.4ms

In [9]:
# Generate Heatmap from tracked centroids
def create_heatmap(trackable, w, h):
    heatmap = np.zeros((h, w), np.float32)
    for obj in trackable.values():
        for (x, y) in obj["centroids"]:
            if 0 <= y < h and 0 <= x < w:
                heatmap[y, x] += 1
    heatmap = cv2.GaussianBlur(heatmap, (0, 0), sigmaX=25, sigmaY=25)
    heatmap = cv2.normalize(heatmap, None, 0, 255, cv2.NORM_MINMAX)
    heatmap = np.uint8(heatmap)
    heatmap = cv2.applyColorMap(heatmap, cv2.COLORMAP_JET)
    return heatmap


# ðŸ§  Use the trackable data returned from process_video()
if "trackable" in locals() and trackable:
    cap = cv2.VideoCapture(VIDEO_PATH)
    ret, frame = cap.read()
    if ret:
        h, w = frame.shape[:2]
        heatmap = create_heatmap(trackable, w, h)
        cv2.imwrite("heatmap.png", heatmap)
        print("ðŸ”¥ Heatmap saved as heatmap.png")
    else:
        print("âš  Could not read the first frame from video.")
    cap.release()
else:
    print("âš  Run the cell with 'trackable, counts = process_video()' first.")

ðŸ”¥ Heatmap saved as heatmap.png


In [13]:
import cv2
import numpy as np
import threading
from ultralytics import YOLO
from scipy.spatial import distance as dist
from collections import OrderedDict

# ==========================
# Centroid Tracker Class
# ==========================
class CentroidTracker:
    def __init__(self, maxDisappeared=50, maxDistance=50):
        self.nextObjectID = 0
        self.objects = OrderedDict()
        self.disappeared = OrderedDict()
        self.maxDisappeared = maxDisappeared
        self.maxDistance = maxDistance

    def register(self, centroid):
        self.objects[self.nextObjectID] = centroid
        self.disappeared[self.nextObjectID] = 0
        self.nextObjectID += 1

    def deregister(self, objectID):
        del self.objects[objectID]
        del self.disappeared[objectID]

    def update(self, rects):
        if len(rects) == 0:
            for objectID in list(self.disappeared.keys()):
                self.disappeared[objectID] += 1
                if self.disappeared[objectID] > self.maxDisappeared:
                    self.deregister(objectID)
            return self.objects

        inputCentroids = np.zeros((len(rects), 2), dtype="int")

        for (i, (x1, y1, x2, y2)) in enumerate(rects):
            cX = int((x1 + x2) / 2.0)
            cY = int((y1 + y2) / 2.0)
            inputCentroids[i] = (cX, cY)

        if len(self.objects) == 0:
            for i in range(len(inputCentroids)):
                self.register(inputCentroids[i])

        else:
            objectIDs = list(self.objects.keys())
            objectCentroids = list(self.objects.values())

            D = dist.cdist(np.array(objectCentroids), inputCentroids)

            rows = D.min(axis=1).argsort()
            cols = D.argmin(axis=1)[rows]

            usedRows = set()
            usedCols = set()

            for (row, col) in zip(rows, cols):
                if row in usedRows or col in usedCols:
                    continue
                if D[row, col] > self.maxDistance:
                    continue
                objectID = objectIDs[row]
                self.objects[objectID] = inputCentroids[col]
                self.disappeared[objectID] = 0
                usedRows.add(row)
                usedCols.add(col)

            unusedCols = set(range(0, D.shape[1])).difference(usedCols)

            for col in unusedCols:
                self.register(inputCentroids[col])

        return self.objects


# ==========================
# Real-time Footfall Tracker
# ==========================
model = YOLO("yolov8n.pt")
ct_live = CentroidTracker(maxDisappeared=40, maxDistance=80)
trackable_live = {}
total_in_live, total_out_live = 0, 0

cap = cv2.VideoCapture(0)
heat_accum = None

print("ðŸŽ¥ Starting real-time Footfall Counter...")

# ==========================
# Enter-Key Stop Thread
# ==========================
stop_flag = False

def stop_on_enter():
    global stop_flag
    input("ðŸ”´ Press ENTER anytime to stop the webcam...\n")
    stop_flag = True

threading.Thread(target=stop_on_enter, daemon=True).start()


while True:
    ret, frame = cap.read()
    if not ret:
        break

    h, w = frame.shape[:2]
    line_y = int(h * 0.5)

    results = model(frame, imgsz=640, verbose=False)[0]

    boxes = []
    for box in results.boxes:
        if int(box.cls[0]) == 0:
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            boxes.append((x1, y1, x2, y2))

    objects = ct_live.update(boxes)

    for (objectID, centroid) in objects.items():
        to = trackable_live.get(objectID, None)
        if to is None:
            trackable_live[objectID] = {'counted': False, 'centroids': [centroid]}
            to = trackable_live[objectID]
        else:
            to['centroids'].append(centroid)

        if len(to['centroids']) >= 2 and not to['counted']:
            prev, cur = to['centroids'][-2], to['centroids'][-1]
            if prev[1] < line_y <= cur[1]:
                total_in_live += 1
                to['counted'] = True
            elif prev[1] > line_y >= cur[1]:
                total_out_live += 1
                to['counted'] = True

        for i in range(1, len(to['centroids'])):
            cv2.line(frame, to['centroids'][i-1], to['centroids'][i], (255, 0, 0), 2)

        cv2.circle(frame, centroid, 4, (0, 255, 0), -1)
        cv2.putText(frame, f"ID {objectID}", (centroid[0]-10, centroid[1]-10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,0), 2)

    if heat_accum is None:
        heat_accum = np.zeros((h, w), dtype=np.float32)

    for obj in trackable_live.values():
        for (x, y) in obj['centroids']:
            if 0 <= x < w and 0 <= y < h:
                heat_accum[y, x] += 1

    cv2.line(frame, (0, line_y), (w, line_y), (0, 0, 255), 2)
    cv2.putText(frame, f"IN: {total_in_live} | OUT: {total_out_live}",
                (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255,0), 2)

    cv2.imshow("ðŸŽ¦ Real-time Footfall Counter", frame)

    # Stop conditions
    if stop_flag:
        print("ðŸ›‘ Stopping webcam feed...")
        break

    if cv2.waitKey(1) == 27:
        break

cap.release()
cv2.destroyAllWindows()

if heat_accum is not None:
    heatmap = cv2.applyColorMap(cv2.normalize(heat_accum, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8),
                                cv2.COLORMAP_JET)
    cv2.imwrite("real_time_heatmap.png", heatmap)
    print("ðŸ”¥ Saved Real-time Heatmap: real_time_heatmap.png")

print(f"âœ… Final Counts â€” People IN: {total_in_live}, OUT: {total_out_live}")


ðŸŽ¥ Starting real-time Footfall Counter...


ðŸ”´ Press ENTER anytime to stop the webcam...
 


ðŸ›‘ Stopping webcam feed...
ðŸ”¥ Saved Real-time Heatmap: real_time_heatmap.png
âœ… Final Counts â€” People IN: 6, OUT: 3
