# Person Detection and Movement Tracking System

This system uses OpenCV to detect people via webcam and tracks their movements to identify:
- People walking back and forth (pacing)
- Anyone staying too close to your computer

When one of these patterns is detected, a real-time alert is shown on screen and logged to the console.


In [2]:
import cv2
import numpy as np
import time
import pygame
from collections import deque
import matplotlib.pyplot as plt
from datetime import datetime
import os
from pathlib import Path

# Initialize pygame for sound alerts.
# A missing or busy audio device must not stop the detector from loading:
# play_alert_sound() falls back to a console message when audio is unavailable.
try:
    pygame.mixer.init()
except pygame.error as exc:
    print(f"Warning: audio unavailable, sound alerts disabled ({exc})")

class MultiModelPersonDetector:
    def __init__(self):
        # Available detection models
        self.models = {
            'hog': 'HOG + SVM (OpenCV)',
            'yolo': 'YOLOv8 (Ultralytics)',
            'mobilenet': 'MobileNet SSD (OpenCV DNN)',
            'cascade': 'Haar Cascade (OpenCV)',
            'background_subtraction': 'Background Subtraction + Contours'
        }

        self.current_model = 'yolo'
        self.model_objects = {}

        # Initialize all models
        self._initialize_models()

        # Movement tracking
        self.person_tracks = {}
        self.track_id = 0
        self.approach_threshold = 0.3
        self.oscillation_threshold = 3
        self.pacing_alert_window = 10  # seconds
        self.pacing_tracks = {}  # track_id: {'start_time': float, 'last_seen': float, 'direction_changes': int}

        # Alert settings
        self.last_alert_time = 0
        self.alert_cooldown = 5

        # Statistics
        self.detection_count = 0
        self.approach_alerts = 0
        self.pacing_alerts = 0

        # Performance tracking
        self.fps_history = deque(maxlen=30)
        self.detection_confidence = deque(maxlen=30)

        # Notification settings
        self.notification_message = None
        self.notification_start_time = 0
        self.notification_duration = 3  # seconds
        self.notification_flash = False
        self.notification_flash_interval = 0.3  # seconds
        self.last_flash_time = 0

        # Close distance alert settings
        self.close_alert_distance = 0.45
        self.close_alert_time = 2  # seconds
        self.close_tracks = {}  # track_id: {'start_time': float, 'last_seen': float, 'alerted': bool}

    def _initialize_models(self):
        """Initialize all detection models"""
        print("Initializing detection models...")

        # 1. HOG + SVM
        try:
            hog = cv2.HOGDescriptor()
            hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())
            self.model_objects['hog'] = hog
            print("✓ HOG + SVM initialized")
        except Exception as e:
            print(f"✗ HOG + SVM failed: {e}")

        # 2. YOLOv8
        try:
            from ultralytics import YOLO
            import torch
            device = 'mps' if hasattr(torch, 'backends') and torch.backends.mps.is_available() else 'cpu'
            yolo_model = YOLO('yolov8n.pt')
            yolo_model.to(device)
            self.model_objects['yolo'] = yolo_model
            print(f"✓ YOLOv8 initialized on device: {device}")
        except Exception as e:
            print(f"✗ YOLOv8 failed: {e}")

        # 3. MobileNet SSD
        try:
            # Download MobileNet SSD model if not exists
            model_dir = Path("models")
            model_dir.mkdir(exist_ok=True)

            config_path = model_dir / "MobileNetSSD_deploy.prototxt"
            weights_path = model_dir / "MobileNetSSD_deploy.caffemodel"

            # For demo purposes, we'll use a simpler approach
            # You can manually download these files if needed
            try:
                net = cv2.dnn.readNetFromCaffe(str(config_path), str(weights_path))
                self.model_objects['mobilenet'] = net
                print("✓ MobileNet SSD initialized")
            except:
                print("✗ MobileNet SSD files not found (download required)")
        except Exception as e:
            print(f"✗ MobileNet SSD failed: {e}")

        # 4. Haar Cascade
        try:
            # Use correct path to cascade file
            cascade_path = os.path.join(cv2.data.haarcascades, 'haarcascade_fullbody.xml')
            if os.path.exists(cascade_path):
                cascade = cv2.CascadeClassifier(cascade_path)
                self.model_objects['cascade'] = cascade
                print("✓ Haar Cascade initialized")
            else:
                print("✗ Haar Cascade file not found")
        except Exception as e:
            print(f"✗ Haar Cascade failed: {e}")

        # 5. Background Subtraction
        try:
            bg_subtractor = cv2.createBackgroundSubtractorMOG2(detectShadows=True)
            self.model_objects['background_subtraction'] = bg_subtractor
            print("✓ Background Subtraction initialized")
        except Exception as e:
            print(f"✗ Background Subtraction failed: {e}")

    def switch_model(self, model_name):
        """Switch to a different detection model"""
        if model_name in self.models and model_name in self.model_objects:
            self.current_model = model_name
            print(f"Switched to: {self.models[model_name]}")
            return True
        else:
            print(f"Model '{model_name}' not available")
            return False

    def detect_people_hog(self, frame):
        """HOG + SVM detection"""
        if 'hog' not in self.model_objects:
            return [], []

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        boxes, weights = self.model_objects['hog'].detectMultiScale(
            gray,
            winStride=(8, 8),
            padding=(32, 32),
            scale=1.05
        )

        # Convert to [x1, y1, x2, y2] format
        boxes = np.array([[x, y, x + w, y + h] for (x, y, w, h) in boxes])
        return boxes, weights

    def detect_people_yolo(self, frame):
        """YOLOv8 detection"""
        if 'yolo' not in self.model_objects:
            return [], []

        results = self.model_objects['yolo'](frame, verbose=False)
        boxes = []
        confidences = []

        for result in results:
            for box in result.boxes:
                if box.cls == 0:  # Person class
                    x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                    boxes.append([int(x1), int(y1), int(x2), int(y2)])
                    confidences.append(float(box.conf))

        return np.array(boxes), np.array(confidences)

    def detect_people_mobilenet(self, frame):
        """MobileNet SSD detection"""
        if 'mobilenet' not in self.model_objects:
            return [], []

        h, w = frame.shape[:2]
        blob = cv2.dnn.blobFromImage(frame, 0.007843, (300, 300), (127.5, 127.5, 127.5))

        net = self.model_objects['mobilenet']
        net.setInput(blob)
        detections = net.forward()

        boxes = []
        confidences = []

        for i in range(detections.shape[2]):
            confidence = detections[0, 0, i, 2]
            class_id = int(detections[0, 0, i, 1])

            if class_id == 15 and confidence > 0.5:  # Person class
                x1 = int(detections[0, 0, i, 3] * w)
                y1 = int(detections[0, 0, i, 4] * h)
                x2 = int(detections[0, 0, i, 5] * w)
                y2 = int(detections[0, 0, i, 6] * h)

                boxes.append([x1, y1, x2, y2])
                confidences.append(confidence)

        return np.array(boxes), np.array(confidences)

    def detect_people_cascade(self, frame):
        """Haar Cascade detection"""
        if 'cascade' not in self.model_objects:
            return [], []

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        bodies = self.model_objects['cascade'].detectMultiScale(
            gray,
            scaleFactor=1.1,
            minNeighbors=3,
            minSize=(30, 30)
        )

        # Convert to [x1, y1, x2, y2] format
        boxes = np.array([[x, y, x + w, y + h] for (x, y, w, h) in bodies])
        weights = np.ones(len(boxes))  # No confidence scores from cascade

        return boxes, weights

    def detect_people_background_subtraction(self, frame):
        """Background subtraction with contour detection"""
        if 'background_subtraction' not in self.model_objects:
            return [], []

        # Apply background subtraction
        fg_mask = self.model_objects['background_subtraction'].apply(frame)

        # Morphological operations to clean up the mask
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (3, 3))
        fg_mask = cv2.morphologyEx(fg_mask, cv2.MORPH_OPEN, kernel)

        # Find contours
        contours, _ = cv2.findContours(fg_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        boxes = []
        confidences = []

        for contour in contours:
            area = cv2.contourArea(contour)
            if area > 500:  # Filter small contours
                x, y, w, h = cv2.boundingRect(contour)
                # Filter by aspect ratio (rough person shape)
                aspect_ratio = h / w
                if 1.2 < aspect_ratio < 4.0:
                    boxes.append([x, y, x + w, y + h])
                    confidences.append(area / 10000)  # Use area as confidence

        return np.array(boxes), np.array(confidences)

    def detect_people(self, frame):
        """Detect people using the current model"""
        start_time = time.time()

        if self.current_model == 'hog':
            boxes, weights = self.detect_people_hog(frame)
        elif self.current_model == 'yolo':
            boxes, weights = self.detect_people_yolo(frame)
        elif self.current_model == 'mobilenet':
            boxes, weights = self.detect_people_mobilenet(frame)
        elif self.current_model == 'cascade':
            boxes, weights = self.detect_people_cascade(frame)
        elif self.current_model == 'background_subtraction':
            boxes, weights = self.detect_people_background_subtraction(frame)
        else:
            boxes, weights = [], []

        # Calculate FPS
        detection_time = time.time() - start_time
        fps = 1.0 / detection_time if detection_time > 0 else 0
        self.fps_history.append(fps)

        return boxes, weights

    def track_movement(self, boxes, frame_center):
        """Track person movement and detect patterns"""
        current_time = time.time()
        frame_height, frame_width = frame_center

        alerts = []
        active_track_ids = set()

        for box in boxes:
            x1, y1, x2, y2 = box
            center_x = (x1 + x2) // 2
            center_y = (y1 + y2) // 2

            # Find closest existing track or create new one
            min_distance = float('inf')
            closest_track = None

            for track_id, track_data in self.person_tracks.items():
                if track_data['positions']:
                    last_pos = track_data['positions'][-1]
                    dist = np.sqrt((center_x - last_pos[0])**2 + (center_y - last_pos[1])**2)
                    if dist < min_distance and dist < 100:  # Maximum movement threshold
                        min_distance = dist
                        closest_track = track_id

            if closest_track is None:
                # Create new track
                self.track_id += 1
                self.person_tracks[self.track_id] = {
                    'positions': deque(maxlen=30),  # Keep last 30 positions
                    'timestamps': deque(maxlen=30),
                    'direction_changes': 0,
                    'last_direction': None,
                }
                closest_track = self.track_id

            # Update track
            track = self.person_tracks[closest_track]
            track['positions'].append((center_x, center_y))
            track['timestamps'].append(current_time)

            # Pacing logic
            if closest_track not in self.pacing_tracks:
                self.pacing_tracks[closest_track] = {
                    'start_time': current_time,
                    'last_seen': current_time,
                    'direction_changes': 0,
                    'last_direction': None,
                    'alerted': False
                }
            pacing_track = self.pacing_tracks[closest_track]
            pacing_track['last_seen'] = current_time

            # Detect pacing/oscillation
            if len(track['positions']) >= 3:
                positions = list(track['positions'])
                recent_direction = "left" if positions[-1][0] < positions[-3][0] else "right"

                if pacing_track['last_direction'] and pacing_track['last_direction'] != recent_direction:
                    pacing_track['direction_changes'] += 1

                pacing_track['last_direction'] = recent_direction

            active_track_ids.add(closest_track)

            # --- New: Close distance alert logic ---
            # Use the distance calculation from run_distance_demo
            distance = self.calculate_distance_to_camera(box)
            if closest_track not in self.close_tracks:
                self.close_tracks[closest_track] = {
                    'start_time': None,
                    'last_seen': current_time,
                    'alerted': False
                }
            close_track = self.close_tracks[closest_track]
            close_track['last_seen'] = current_time
            if distance < self.close_alert_distance:
                if close_track['start_time'] is None:
                    close_track['start_time'] = current_time
                elif not close_track['alerted'] and (current_time - close_track['start_time'] >= self.close_alert_time):
                    alerts.append(f"Person {closest_track} is too close to the camera! (Dist: {distance:.2f})")
                    close_track['alerted'] = True
            else:
                close_track['start_time'] = None
                close_track['alerted'] = False

        # Clean up old tracks and handle pacing window
        tracks_to_remove = []
        for track_id, pacing_track in self.pacing_tracks.items():
            if track_id not in active_track_ids:
                # If not seen for >1s, keep pacing window open
                if current_time - pacing_track['last_seen'] > 1:
                    continue  # Don't remove yet
            # If pacing for at least 10s and enough direction changes, alert
            if not pacing_track['alerted'] and current_time - pacing_track['start_time'] >= self.pacing_alert_window and pacing_track['direction_changes'] >= self.oscillation_threshold:
                alerts.append(f"Person {track_id} is pacing back and forth!")
                pacing_track['alerted'] = True
            # Remove tracks not seen for a long time
            if current_time - pacing_track['last_seen'] > 5:
                tracks_to_remove.append(track_id)

        for track_id, close_track in self.close_tracks.items():
            if track_id not in active_track_ids and current_time - close_track['last_seen'] > 5:
                tracks_to_remove.append(track_id)

        for track_id in tracks_to_remove:
            if track_id in self.pacing_tracks:
                del self.pacing_tracks[track_id]
            if track_id in self.person_tracks:
                del self.person_tracks[track_id]
            if track_id in self.close_tracks:
                del self.close_tracks[track_id]

        return alerts

    def play_alert_sound(self):
        """Play alert sound (beep)"""
        try:
            # Create a simple beep sound
            sample_rate = 22050
            duration = 0.5
            frequency = 800

            frames = int(duration * sample_rate)
            arr = np.sin(2 * np.pi * frequency * np.linspace(0, duration, frames))
            arr = (arr * 32767).astype(np.int16)

            sound = pygame.sndarray.make_sound(arr)
            sound.play()
        except:
            print("ALERT!")  # Fallback if sound fails

    def trigger_notification(self, message):
        """Trigger an on-screen notification"""
        self.notification_message = message
        self.notification_start_time = time.time()
        self.notification_flash = True
        self.last_flash_time = time.time()

    def draw_detections(self, frame, boxes, alerts):
        """Render boxes, status text, alerts and the notification onto ``frame``.

        Draws (in order): a green box with its estimated distance per
        detection, the active model name, the average of the recorded FPS
        values, the alert messages, the running statistics and - while a
        notification is active - a flashing red overlay with its message.
        The frame is modified in place and also returned.
        """
        font = cv2.FONT_HERSHEY_SIMPLEX
        green = (0, 255, 0)
        cyan = (255, 255, 0)
        red = (0, 0, 255)
        white = (255, 255, 255)

        # Bounding boxes with the distance estimate above each one
        for box in boxes:
            left, top, right, bottom = box
            cv2.rectangle(frame, (left, top), (right, bottom), green, 2)
            distance = self.calculate_distance_to_camera(box)
            cv2.putText(frame, f"Dist: {distance:.2f}", (left, top-10),
                        font, 0.7, green, 2)

        # Active model
        model_label = f"Model: {self.models[self.current_model]}"
        cv2.putText(frame, model_label, (10, 30), font, 0.6, cyan, 2)

        # Average FPS over the recorded history
        if self.fps_history:
            mean_fps = np.mean(list(self.fps_history))
            cv2.putText(frame, f"FPS: {mean_fps:.1f}", (10, 60), font, 0.6, cyan, 2)

        # Alerts, one line each
        for row, alert in enumerate(alerts):
            cv2.putText(frame, alert, (10, 90 + row*30), font, 0.7, red, 2)

        # Statistics block near the bottom of the frame
        stats_top = frame.shape[0] - 120
        stat_lines = (
            f"People detected: {len(boxes)}",
            f"Total detections: {self.detection_count}",
            f"Approach alerts: {self.approach_alerts}",
            f"Pacing alerts: {self.pacing_alerts}",
        )
        for row, line in enumerate(stat_lines):
            cv2.putText(frame, line, (10, stats_top + row*20), font, 0.5, white, 1)

        # On-screen notification overlay (flashing)
        if not self.notification_message:
            return frame

        now = time.time()
        if not now - self.notification_start_time < self.notification_duration:
            # Notification expired
            self.notification_message = None
            return frame

        # Toggle the flash phase once per flash interval
        if now - self.last_flash_time > self.notification_flash_interval:
            self.notification_flash = not self.notification_flash
            self.last_flash_time = now

        if self.notification_flash:
            height, width = frame.shape[:2]
            tint = frame.copy()
            cv2.rectangle(tint, (0, 0), (width, height), red, -1)
            alpha = 0.4
            cv2.addWeighted(tint, alpha, frame, 1 - alpha, 0, frame)
            cv2.putText(frame, self.notification_message, (int(width*0.1), int(height*0.5)),
                        font, 2, white, 6, cv2.LINE_AA)

        return frame

    def calculate_distance_to_camera(self, box):
        """Estimate distance from camera to detected person"""
        # Assuming box is in [x1, y1, x2, y2] format
        # Calculate the width of the detected person in the image
        person_width_pixels = box[2] - box[0]

        # Assuming a constant real-world width for a person (e.g., 0.5 meters)
        REAL_PERSON_WIDTH = 0.5  # meters

        # Focal length estimation (this should be calibrated for your camera)
        FOCAL_LENGTH = 800  # pixels (example value, needs calibration)

        # Distance calculation using the formula:
        # distance = (REAL_PERSON_WIDTH * FOCAL_LENGTH) / person_width_pixels
        distance = (REAL_PERSON_WIDTH * FOCAL_LENGTH) / person_width_pixels if person_width_pixels > 0 else 0

        return distance


pygame 2.6.1 (SDL 2.28.4, Python 3.12.4)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [3]:
def run_person_detection():
    """Run the live webcam monitor until 'q' is pressed or the camera fails.

    Keys: 'q' quits, 's' saves a screenshot under ``data/``, 'r' resets the
    statistics and all tracking state.

    Every batch of alerts is logged to the console. The flashing on-screen
    notification is rate-limited by ``detector.alert_cooldown``; the log is
    not, because alerts are raised only once and would otherwise be lost
    when they occur during the cooldown.
    """
    detector = MultiModelPersonDetector()

    # Initialize webcam
    cap = cv2.VideoCapture(0)

    if not cap.isOpened():
        print("Error: Could not open webcam")
        cap.release()
        return

    print("Person Detection System Started")
    print("Press 'q' to quit")
    print("Press 's' to save screenshot")
    print("Press 'r' to reset statistics")

    screenshot_count = 0

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("Error: Could not read frame")
                break

            # Flip the camera feed horizontally
            frame = cv2.flip(frame, 1)

            # Detect people
            boxes, weights = detector.detect_people(frame)
            detector.detection_count += len(boxes)

            # Track movement and get alerts
            frame_size = (frame.shape[0], frame.shape[1])  # (height, width)
            alerts = detector.track_movement(boxes, frame_size)

            # Handle alerts
            if alerts:
                current_time = time.time()
                print(f"[{datetime.now().strftime('%H:%M:%S')}] ALERTS: {', '.join(alerts)}")
                if current_time - detector.last_alert_time > detector.alert_cooldown:
                    # Sound is disabled for now; call detector.play_alert_sound()
                    # here to re-enable it.
                    detector.last_alert_time = current_time
                    detector.trigger_notification("Suspicious behavior detected!")

            # Draw detections and information
            frame = detector.draw_detections(frame, boxes, alerts)

            # Display frame
            cv2.imshow('Person Detection - Security Monitor', frame)

            # Handle key presses
            key = cv2.waitKey(1) & 0xFF
            if key == ord('q'):
                break
            elif key == ord('s'):
                screenshot_count += 1
                # imwrite does not create missing directories and signals
                # failure only through its return value.
                os.makedirs("data", exist_ok=True)
                filename = f"data/screenshot_{screenshot_count}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg"
                if cv2.imwrite(filename, frame):
                    print(f"Screenshot saved: {filename}")
                else:
                    print(f"Error: Could not save screenshot: {filename}")
            elif key == ord('r'):
                detector.detection_count = 0
                detector.approach_alerts = 0
                detector.pacing_alerts = 0
                # Clear all per-track state together so no orphaned pacing or
                # proximity entries outlive their tracks.
                detector.person_tracks.clear()
                detector.pacing_tracks.clear()
                detector.close_tracks.clear()
                print("Statistics reset")
    except KeyboardInterrupt:
        print("\nStopping detection system...")
    finally:
        cap.release()
        cv2.destroyAllWindows()
        print("Person detection system stopped")


In [4]:
# Model comparison tool
def compare_models():
    """Benchmark every loaded detection model on one webcam frame.

    Grabs a single frame, runs each available model on it once and prints
    the number of detections, the resulting FPS and the mean confidence,
    followed by the fastest model and the one with the most detections.
    """
    detector = MultiModelPersonDetector()

    # Grab one frame to test every model on
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("Error: Could not open webcam")
        return

    ret, test_frame = cap.read()
    cap.release()

    if not ret:
        print("Error: Could not capture test frame")
        return

    print("\nModel Performance Comparison:")
    print("-" * 50)

    results = {}

    for model_name, label in detector.models.items():
        if model_name not in detector.model_objects:
            continue

        started = time.time()

        # The timing deliberately includes the model switch
        detector.switch_model(model_name)
        boxes, weights = detector.detect_people(test_frame)

        elapsed = time.time() - started
        fps = 1.0 / elapsed if elapsed > 0 else 0
        mean_confidence = np.mean(weights) if len(weights) > 0 else 0

        results[model_name] = {
            'detections': len(boxes),
            'fps': fps,
            'avg_confidence': mean_confidence,
        }

        print(f"{label}:")
        print(f"  Detections: {len(boxes)}")
        print(f"  FPS: {fps:.1f}")
        print(f"  Avg Confidence: {mean_confidence:.2f}")
        print()

    if not results:
        return

    # Recommend best model
    fastest_name, fastest = max(results.items(), key=lambda item: item[1]['fps'])
    busiest_name, busiest = max(results.items(), key=lambda item: item[1]['detections'])

    print("Recommendations:")
    print(f"  Best Speed: {detector.models[fastest_name]} ({fastest['fps']:.1f} FPS)")
    print(f"  Most Detections: {detector.models[busiest_name]} ({busiest['detections']} people)")

# Uncomment to run model comparison
# compare_models()


# Advanced Features

You can extend this system with:
- Face recognition to identify specific individuals
- Motion history tracking
- Email/SMS alerts for security breaches
- Integration with home automation systems
- Multiple camera support


In [5]:
# Optional: Analyze detection patterns
def analyze_detection_patterns():
    """Plot an example histogram of detection frequency by hour of day.

    No stored detection data is analyzed yet: the plot uses 50 synthetic
    samples scattered within two hours of the current hour.
    """
    # Synthetic sample data. The modulo keeps every value a valid hour
    # (0-23) when the current hour is close to midnight.
    current_hour = datetime.now().hour
    times = [(current_hour + np.random.randint(-2, 3)) % 24 for _ in range(50)]

    plt.figure(figsize=(10, 6))
    # Explicit edges give one bin per hour of the day; a plain bin count
    # would split only the range covered by the samples.
    plt.hist(times, bins=np.arange(25), alpha=0.7, color='blue')
    plt.xlim(0, 24)
    plt.xlabel('Hour of Day')
    plt.ylabel('Detection Frequency')
    plt.title('Person Detection Frequency by Hour')
    plt.grid(True, alpha=0.3)
    plt.show()

# Uncomment to run analysis
# analyze_detection_patterns()


# Run the Detection System

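Execute the cells below to start the person detection system. The first cell defines an optional distance-estimation demo (`run_distance_demo`); the second one starts the live monitor by calling `run_person_detection()`. The detection model is not chosen interactively: it defaults to YOLOv8 and can be changed in code with `detector.switch_model(...)`.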


In [6]:
def run_distance_demo():
    """Show the live webcam feed with a distance estimate over each person.

    Runs until 'q' is pressed or a frame cannot be read; the camera and all
    windows are released on exit.
    """
    detector = MultiModelPersonDetector()
    camera = cv2.VideoCapture(0)
    if not camera.isOpened():
        print("Error: Could not open webcam")
        return

    print("Distance Demo Started. Press 'q' to quit.")
    green = (0, 255, 0)
    try:
        while True:
            ok, frame = camera.read()
            if not ok:
                print("Error: Could not read frame")
                break

            # Mirror the image
            frame = cv2.flip(frame, 1)
            detections, _ = detector.detect_people(frame)

            for box in detections:
                left, top, right, bottom = box
                cv2.rectangle(frame, (left, top), (right, bottom), green, 2)
                # Calculate and display distance
                label = f"Dist: {detector.calculate_distance_to_camera(box):.2f}"
                cv2.putText(frame, label, (left, top-10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, green, 2)

            cv2.imshow('Distance Demo', frame)

            pressed = cv2.waitKey(1) & 0xFF
            if pressed == ord('q'):
                break
    finally:
        camera.release()
        cv2.destroyAllWindows()
        print("Distance demo stopped.")


In [7]:
# Start the live webcam monitor; this call blocks until 'q' is pressed,
# a frame cannot be read, or the cell is interrupted.
run_person_detection()

Initializing detection models...
✓ HOG + SVM initialized
✓ YOLOv8 initialized on device: mps
✗ MobileNet SSD files not found (download required)
✓ Haar Cascade initialized
✓ Background Subtraction initialized




Person Detection System Started
Press 'q' to quit
Press 's' to save screenshot
Press 'r' to reset statistics
[22:53:43] ALERTS: Person 1 is too close to the camera! (Dist: 0.39)
Person detection system stopped
