In [None]:
# Improved Face Recognition and Security Monitoring System
# Enhanced for readability, maintainability, performance, and robustness

import cv2
import numpy as np
import face_recognition
import mediapipe as mp
from ultralytics import YOLO
import pygame
import os
import time
import logging
from datetime import datetime
from typing import List, Tuple, Optional, Dict, Any
from dataclasses import dataclass, field


# Constants for better maintainability
@dataclass
class Config:
    """Configuration settings for the security system."""
    MODEL_PATH: str = "yolo11l.pt"
    KNOWN_FACES_DIR: str = "family_members"
    ALARM_FILE: str = "pols-aagyi-pols.mp3"
    LOG_DIR: str = "security_logs"
    VIDEO_SOURCE: str = "/media_files/WIN_20251103_14_11_20_Pro.mp4"
    # VIDEO_SOURCE: str = 0
    FACE_RECOGNITION_INTERVAL: int = 5
    ALERT_COOLDOWN: int = 10
    YOLO_CONFIDENCE: float = 0.5
    FACE_CONFIDENCE: float = 0.5
    RESIZE_FACTOR: float = 0.25
    WINDOW_NAME: str = "Security Monitoring"
    
    OBJECTS_OF_INTEREST: List[str] = field(default_factory=lambda: [
        "person", "bicycle", "car", "motorcycle", "bus", "truck", "backpack", 
        "umbrella", "handbag", "tie", "suitcase", "cell phone", "laptop", 
        "book", "scissors", "knife"
    ])
    # New stricter recognition controls
    RECOGNITION_MIN_VOTES: int = 2
    RECOGNITION_DISTANCE_THRESHOLD: float = 0.45
    RECOGNITION_CONSECUTIVE_FRAMES: int = 2
    RECOGNITION_TIME_WINDOW: float = 3.0


class SecuritySystem:
    """
    Enhanced security monitoring system with face recognition and object detection.
    
    Features:
    - Modular design with separate concerns
    - Robust error handling
    - Performance optimizations
    - Comprehensive logging
    - Configurable parameters
    """
    
    def __init__(self, config: Config):
        """Initialize the security system with given configuration."""
        self.config = config
        self.logger = self._setup_logging()
        
        # Initialize models and resources
        self.yolo_model = None
        self.mp_face_detection = None
        self.known_face_encodings = []
        self.known_face_names = []
        self.alarm_loaded = False
        
        # State variables
        self.frame_count = 0
        self.last_alert_time = 0
        # detection history to reduce false positives per coarse person id
        # { person_id: { 'name_counts': defaultdict(int), 'last_name': str, 'consecutive': int, 'last_update': float } }
        self.detection_history: Dict[str, Dict[str, Any]] = {}
        
        self._initialize_resources()
    
    def _setup_logging(self) -> logging.Logger:
        """Setup logging configuration."""
        logger = logging.getLogger('SecuritySystem')
        logger.setLevel(logging.INFO)
        
        # Create logs directory
        os.makedirs(self.config.LOG_DIR, exist_ok=True)
        
        # File handler
        log_file = os.path.join(self.config.LOG_DIR, f"security_log_{datetime.now().strftime('%Y-%m-%d')}.txt")
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s: %(message)s'))
        
        logger.addHandler(file_handler)
        return logger
    
    def _initialize_resources(self) -> None:
        """Initialize all required models and resources."""
        try:
            # Initialize YOLO model
            self.logger.info("Loading YOLO model...")
            self.yolo_model = YOLO(self.config.MODEL_PATH)
            
            # Initialize MediaPipe
            self.logger.info("Initializing MediaPipe face detection...")
            mp_face = mp.solutions.face_detection
            self.mp_face_detection = mp_face.FaceDetection(
                model_selection=0, 
                min_detection_confidence=self.config.FACE_CONFIDENCE
            )
            
            # Load known faces
            self._load_known_faces()
            
            # Setup alarm
            self._setup_alarm()
            
            self.logger.info("System initialization completed successfully")
            
        except Exception as e:
            self.logger.error(f"Failed to initialize resources: {e}")
            raise
    
    def _load_known_faces(self) -> None:
        """Load known faces from directory with error handling."""
        if not os.path.exists(self.config.KNOWN_FACES_DIR):
            self.logger.warning(f"Known faces directory {self.config.KNOWN_FACES_DIR} not found")
            return
        
        for person_name in os.listdir(self.config.KNOWN_FACES_DIR):
            person_dir = os.path.join(self.config.KNOWN_FACES_DIR, person_name)
            if not os.path.isdir(person_dir):
                continue
                
            for image_name in os.listdir(person_dir):
                image_path = os.path.join(self.config.KNOWN_FACES_DIR, person_name, image_name)
                try:
                    image = face_recognition.load_image_file(image_path)
                    encodings = face_recognition.face_encodings(image)
                    
                    if encodings:
                        self.known_face_encodings.append(encodings[0])
                        self.known_face_names.append(person_name)
                        self.logger.info(f"Loaded face: {person_name} from {image_name}")
                    else:
                        self.logger.warning(f"No faces found in {image_path}")
                        
                except Exception as e:
                    self.logger.error(f"Error loading {image_path}: {e}")
        
        unique_people = len(set(self.known_face_names))
        self.logger.info(f"Loaded {len(self.known_face_encodings)} face encodings for {unique_people} people")
    
    def _setup_alarm(self) -> None:
        """Setup alarm sound system."""
        try:
            pygame.mixer.init()
            if os.path.exists(self.config.ALARM_FILE):
                pygame.mixer.music.load(self.config.ALARM_FILE)
                self.alarm_loaded = True
                self.logger.info("Alarm system initialized")
            else:
                self.logger.warning(f"Alarm file {self.config.ALARM_FILE} not found")
        except Exception as e:
            self.logger.error(f"Failed to setup alarm: {e}")
    
    def detect_objects(self, frame: np.ndarray) -> List[Dict[str, Any]]:
        """Detect objects using YOLO model."""
        try:
            results = self.yolo_model(frame, imgsz=640, verbose=False)
            detections = []
            
            for result in results:
                if result.boxes is None:
                    continue
                    
                for box in result.boxes:
                    x1, y1, x2, y2 = map(int, box.xyxy[0])
                    cls = int(box.cls[0])
                    conf = float(box.conf[0])
                    class_name = result.names[cls]
                    
                    # Validate bounding box
                    if x2 <= x1 or y2 <= y1:
                        continue
                        
                    detections.append({
                        'bbox': (x1, y1, x2, y2),
                        'class_name': class_name,
                        'confidence': conf,
                        'class_id': cls
                    })
            
            return detections
            
        except Exception as e:
            self.logger.error(f"Object detection failed: {e}")
            return []
    
    def detect_faces_mediapipe(self, roi: np.ndarray) -> List[Tuple[int, int, int, int]]:
        """Detect faces in a region of interest using MediaPipe."""
        try:
            rgb_roi = cv2.cvtColor(roi, cv2.COLOR_BGR2RGB)
            results = self.mp_face_detection.process(rgb_roi)
            
            face_boxes = []
            if results.detections:
                h, w = roi.shape[:2]
                for detection in results.detections:
                    bbox = detection.location_data.relative_bounding_box
                    x = int(bbox.xmin * w)
                    y = int(bbox.ymin * h)
                    width = int(bbox.width * w)
                    height = int(bbox.height * h)
                    face_boxes.append((x, y, width, height))
            
            return face_boxes
            
        except Exception as e:
            self.logger.error(f"Face detection failed: {e}")
            return []
    
    def recognize_face(self, face_roi: np.ndarray) -> Optional[str]:
        """Recognize face in given region of interest."""
        try:
            if not self.known_face_encodings:
                return None
            
            # Resize for performance
            small_roi = cv2.resize(face_roi, (0, 0), fx=self.config.RESIZE_FACTOR, fy=self.config.RESIZE_FACTOR)
            rgb_small_roi = cv2.cvtColor(small_roi, cv2.COLOR_BGR2RGB)
            
            face_locations = face_recognition.face_locations(rgb_small_roi)
            if not face_locations:
                return None
            
            face_encodings = face_recognition.face_encodings(rgb_small_roi, face_locations)
            if not face_encodings:
                return None
            
            for face_encoding in face_encodings:
                matches = face_recognition.compare_faces(self.known_face_encodings, face_encoding)
                if any(matches):
                    distances = face_recognition.face_distance(self.known_face_encodings, face_encoding)
                    best_match_index = np.argmin(distances)
                    if matches[best_match_index]:
                        return self.known_face_names[best_match_index]
            
            return "Unknown"
            
        except Exception as e:
            self.logger.error(f"Face recognition failed: {e}")
            return None
    
    def draw_detections(self, frame: np.ndarray, detections: List[Dict[str, Any]], 
                       person_results: List[Dict[str, Any]]) -> np.ndarray:
        """Draw detection results on frame."""
        display_frame = frame.copy()
        
        # Draw object detections
        for det in detections:
            if det['class_name'] in self.config.OBJECTS_OF_INTEREST and det['class_name'] != "person":
                x1, y1, x2, y2 = det['bbox']
                cv2.rectangle(display_frame, (x1, y1), (x2, y2), (0, 255, 255), 2)
                label = f"{det['class_name']}: {det['confidence']:.2f}"
                cv2.putText(display_frame, label, (x1, y1 - 10), 
                          cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 2)
        
        # Draw person detections with recognition results
        for person in person_results:
            x1, y1, x2, y2 = person['bbox']
            color = (0, 255, 0) if person.get('recognized') else (0, 0, 255)
            
            cv2.rectangle(display_frame, (x1, y1), (x2, y2), color, 2)
            
            label = person.get('name', 'Person') if person.get('recognized') else "UNKNOWN"
            cv2.putText(display_frame, label, (x1, y1 - 10), 
                      cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
            
            # Draw face bounding boxes if available
            for face_bbox in person.get('face_boxes', []):
                fx1, fy1, fx2, fy2 = face_bbox
                cv2.rectangle(display_frame, (fx1 + x1, fy1 + y1), (fx2 + x1, fy2 + y1), (255, 0, 0), 2)
                cv2.putText(display_frame, "Face", (fx1 + x1 + 5, fy1 + y1 - 5), 
                          cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
        
        return display_frame
    
    def process_alert(self, person_name: str, detected_objects: List[str]) -> None:
        """Handle alert logic with cooldown."""
        current_time = time.time()
        if current_time - self.last_alert_time > self.config.ALERT_COOLDOWN:
            self._trigger_alert(person_name, detected_objects)
            self.last_alert_time = current_time
    
    def _trigger_alert(self, person_name: str, detected_objects: List[str]) -> None:
        """Trigger alert mechanisms."""
        objects_str = ", ".join(set(detected_objects)) if detected_objects else "None"
        
        self.logger.info(f"ALERT: Known person {person_name} detected with objects: {objects_str}")
        print(f"✅ Known Person Detected: {person_name}")
        
        # Play alarm if available
        if self.alarm_loaded and not pygame.mixer.music.get_busy():
            try:
                pygame.mixer.music.play()
            except Exception as e:
                self.logger.error(f"Failed to play alarm: {e}")
        
        # Email alert could be implemented here
        # send_email_alert(person_name, detected_objects)
    
    # --- New helper methods for robust recognition ---
    def _get_person_id(self, bbox: Tuple[int, int, int, int]) -> str:
        """Compute a coarse person id from bbox center quantized to reduce jitter."""
        x1, y1, x2, y2 = bbox
        cx = (x1 + x2) // 2
        cy = (y1 + y2) // 2
        return f"{cx//50}_{cy//50}"

    def _update_detection_history(self, person_id: str, name: str) -> None:
        """Update per-person recent votes and consecutive counts."""
        now = time.time()
        entry = self.detection_history.get(person_id)
        if entry is None:
            from collections import defaultdict
            entry = {
                'name_counts': defaultdict(int),
                'last_name': None,
                'consecutive': 0,
                'last_update': now
            }
            self.detection_history[person_id] = entry

        # Reset history if stale
        if now - entry['last_update'] > self.config.RECOGNITION_TIME_WINDOW:
            from collections import defaultdict
            entry['name_counts'] = defaultdict(int)
            entry['last_name'] = None
            entry['consecutive'] = 0

        # Tally vote and consecutive
        entry['name_counts'][name] += 1
        if entry['last_name'] == name:
            entry['consecutive'] += 1
        else:
            entry['last_name'] = name
            entry['consecutive'] = 1
        entry['last_update'] = now

    def _confirm_recognition(self, person_id: str, name: str, distance: float) -> bool:
        """Decide whether the name is confirmed for the person_id using votes / consecutive frames / strict distance."""
        now = time.time()
        entry = self.detection_history.get(person_id)
        if not entry:
            return False
        if now - entry['last_update'] > self.config.RECOGNITION_TIME_WINDOW:
            return False
        # Quick accept if distance is very small (very confident)
        if name != "UNKNOWN" and distance <= self.config.RECOGNITION_DISTANCE_THRESHOLD:
            return True
        # Accept if enough votes in time window
        votes = entry['name_counts'].get(name, 0)
        if name != "UNKNOWN" and votes >= self.config.RECOGNITION_MIN_VOTES:
            return True
        # Accept if same name seen consecutively sufficient frames
        if name != "UNKNOWN" and entry['consecutive'] >= self.config.RECOGNITION_CONSECUTIVE_FRAMES:
            return True
        return False

    def confirm_face_from_roi(self, face_roi: np.ndarray, bbox: Tuple[int, int, int, int]) -> Optional[Tuple[str, float]]:
        """Attempt recognition for the face_roi. Returns (name, distance) if confirmed, else None."""
        try:
            if not self.known_face_encodings:
                return None
            # resize & convert
            small = cv2.resize(face_roi, (0, 0), fx=self.config.RESIZE_FACTOR, fy=self.config.RESIZE_FACTOR)
            rgb = cv2.cvtColor(small, cv2.COLOR_BGR2RGB)
            face_locations = face_recognition.face_locations(rgb)
            if not face_locations:
                return None
            encodings = face_recognition.face_encodings(rgb, face_locations)
            if not encodings:
                return None
            # Use first face encoding for person ROI (speed)
            enc = encodings[0]
            distances = face_recognition.face_distance(self.known_face_encodings, enc)
            if distances is None or len(distances) == 0:
                return None
            best_idx = int(np.argmin(distances))
            best_dist = float(distances[best_idx])
            candidate_name = self.known_face_names[best_idx] if best_dist <= max(self.config.FACE_CONFIDENCE, self.config.RECOGNITION_DISTANCE_THRESHOLD) else "UNKNOWN"
            person_id = self._get_person_id(bbox)
            self._update_detection_history(person_id, candidate_name)
            confirmed = self._confirm_recognition(person_id, candidate_name, best_dist)
            if confirmed and candidate_name != "UNKNOWN":
                return candidate_name, best_dist
            return None
        except Exception as e:
            self.logger.error(f"confirm_face_from_roi failed: {e}")
            return None
    # --- end helpers ---

    def process_frame(self, frame: np.ndarray) -> np.ndarray:
        """Process a single frame and return annotated frame."""
        self.frame_count += 1
        current_time = time.time()
        
        # Detect objects
        detections = self.detect_objects(frame)
        
        # Separate persons and objects
        person_detections = [d for d in detections if d['class_name'] == 'person']
        detected_objects = [d['class_name'] for d in detections 
                          if d['class_name'] in self.config.OBJECTS_OF_INTEREST 
                          and d['class_name'] != 'person']
        
        person_results = []
        
        # Process each person
        for person_det in person_detections:
            x1, y1, x2, y2 = person_det['bbox']
            person_roi = frame[y1:y2, x1:x2]
            
            result = {
                'bbox': (x1, y1, x2, y2),
                'recognized': False,
                'name': None,
                'face_boxes': []
            }
            
            if person_roi.size > 0:
                # Detect faces
                face_boxes = self.detect_faces_mediapipe(person_roi)
                result['face_boxes'] = face_boxes
                
                # Face recognition (interval-based)
                if self.frame_count % self.config.FACE_RECOGNITION_INTERVAL == 0 and face_boxes:
                    # pick first detected face for speed
                    fx, fy, fw, fh = face_boxes[0]
                    face_crop = person_roi[fy:fy+fh, fx:fx+fw]
                    confirmed = self.confirm_face_from_roi(face_crop, (x1, y1, x2, y2))
                    if confirmed:
                        name, dist = confirmed
                        result['recognized'] = True
                        result['name'] = name
                        
                        self.process_alert(name, detected_objects)
                    else:
                        # not confirmed -> treat as unknown (do not trigger alert)
                        self.logger.debug("Face not confirmed or unknown; skipping alert.")
            
            person_results.append(result)  # ensure results are collected
        
        # Annotate frame
        annotated_frame = self.draw_detections(frame, detections, person_results)
        
        # Add overlay information
        fps = cv2.getTickFrequency() / (cv2.getTickCount() - cv2.getTickCount())
        cv2.putText(annotated_frame, f"FPS: {int(fps)}", (20, 30), 
                  cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        
        if detected_objects:
            objects_text = f"Objects: {', '.join(set(detected_objects))}"
            cv2.putText(annotated_frame, objects_text, (20, 60), 
                      cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
        
        return annotated_frame
    
    def run(self) -> None:
        """Main execution loop."""
        cap = cv2.VideoCapture(self.config.VIDEO_SOURCE)
        if not cap.isOpened():
            self.logger.error("Could not open video capture device")
            raise RuntimeError("Video capture failed")
        
        self.logger.info("Security monitoring started. Press 'q' to quit.")
        
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    self.logger.warning("Failed to grab frame")
                    break
                
                # Process frame
                annotated_frame = self.process_frame(frame)
                
                # Display
                cv2.imshow(self.config.WINDOW_NAME, annotated_frame)
                
                # Exit on 'q' press
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
                    
        except KeyboardInterrupt:
            self.logger.info("Monitoring interrupted by user")
        except Exception as e:
            self.logger.error(f"Runtime error: {e}")
            raise
        finally:
            self._cleanup(cap)
    
    def _cleanup(self, cap) -> None:
        """Clean up resources."""
        try:
            cap.release()
            cv2.destroyAllWindows()
            if self.mp_face_detection:
                self.mp_face_detection.close()
            pygame.mixer.quit()
            self.logger.info("System shutdown completed")
        except Exception as e:
            self.logger.error(f"Cleanup error: {e}")

# Main execution
if __name__ == "__main__":
    config = Config()
    system = SecuritySystem(config)
    system.run()

pygame 2.6.1 (SDL 2.28.4, Python 3.12.8)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [None]:
# Improved known faces loader: multiple samples, CNN fallback, jittering and per-person averaging
import os
import numpy as np
import face_recognition

# Globals expected by other cells
known_face_encodings = []
known_face_names = []

known_faces_dir = "family_members"  # change if your folder is different

def load_known_faces(known_faces_dir: str = "family_members", num_jitters: int = 5, use_cnn: bool = True):
    """Load known faces with higher accuracy:
    - prefer CNN detector (more accurate) with fallback to HOG
    - compute encodings with multiple jitters
    - average multiple encodings per person into one template
    """
    global known_face_encodings, known_face_names
    encodings_map = {}  # person_name -> list of encodings

    if not os.path.exists(known_faces_dir):
        print(f"Known faces directory '{known_faces_dir}' not found. Skipping load.")
        return

    for person_name in sorted(os.listdir(known_faces_dir)):
        person_dir = os.path.join(known_faces_dir, person_name)
        if not os.path.isdir(person_dir):
            continue
        encodings_map.setdefault(person_name, [])

        for image_name in sorted(os.listdir(person_dir)):
            image_path = os.path.join(person_dir, image_name)
            try:
                image = face_recognition.load_image_file(image_path)
                face_locations = []

                # Try CNN detector first for better accuracy, fallback to HOG on failure
                if use_cnn:
                    try:
                        face_locations = face_recognition.face_locations(image, model="cnn")
                        if not face_locations:
                            # fallback to hog if cnn finds nothing
                            face_locations = face_recognition.face_locations(image, model="hog")
                    except Exception:
                        face_locations = face_recognition.face_locations(image, model="hog")
                else:
                    face_locations = face_recognition.face_locations(image, model="hog")

                if not face_locations:
                    print(f"No faces found in {image_path}, skipping.")
                    continue

                # Use multiple jitters for more stable encodings (slower but better)
                encs = face_recognition.face_encodings(image, face_locations, num_jitters=num_jitters)
                if not encs:
                    print(f"Could not compute encodings for {image_path}, skipping.")
                    continue

                for enc in encs:
                    encodings_map[person_name].append(enc)

                print(f"Loaded {len(encs)} face(s) for '{person_name}' from {image_name}")
            except Exception as e:
                print(f"Error loading {image_path}: {e}")

    # Average encodings per person to create a single robust template
    known_face_encodings = []
    known_face_names = []
    for person_name, enc_list in encodings_map.items():
        if not enc_list:
            print(f"Warning: No valid encodings collected for {person_name}")
            continue
        enc_array = np.stack(enc_list)
        if enc_array.shape[0] == 1:
            averaged = enc_array[0]
        else:
            averaged = np.mean(enc_array, axis=0)
        known_face_encodings.append(averaged)
        known_face_names.append(person_name)

    print(f"Finished loading known faces. People loaded: {len(known_face_names)}. Total templates: {len(known_face_encodings)}")

# Run loader now so downstream cells can use the variables
load_known_faces(known_faces_dir, num_jitters=5, use_cnn=True)

## Edited Nominal copy of face_recognition.ipynb

In [None]:
import cv2
import numpy as np
import face_recognition
import mediapipe as mp
from ultralytics import YOLO
import smtplib
import pygame
import os
import time
from datetime import datetime

# Initialize MediaPipe Face Detection
mp_face_detection = mp.solutions.face_detection
mp_drawing = mp.solutions.drawing_utils

def detect_faces_mediapipe(frame):
    """
    Detect faces using MediaPipe
    Returns list of face bounding boxes in format [x, y, w, h]
    """
    face_boxes = []
    
    with mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5) as face_detection:
        # Convert BGR to RGB for MediaPipe
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = face_detection.process(rgb_frame)
        
        if results.detections:
            h, w, _ = frame.shape
            for detection in results.detections:
                bbox = detection.location_data.relative_bounding_box
                x = int(bbox.xmin * w)
                y = int(bbox.ymin * h)
                width = int(bbox.width * w)
                height = int(bbox.height * h)
                face_boxes.append([x, y, width, height])
    
    return face_boxes

# Initialize YOLO model for object detection
# model = YOLO("runs/detect/train2/weights/best.pt")
model = YOLO("yolo11n.pt")

# Initialize MediaPipe for pose detection (not directly used for the requested features but kept for completeness)
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5)

# Load known faces
known_face_encodings = []
known_face_names = []

known_faces_dir = "family_members" 
if os.path.exists(known_faces_dir):
    for person_name in os.listdir(known_faces_dir):
        person_dir = os.path.join(known_faces_dir, person_name)
        if os.path.isdir(person_dir):
            for image_name in os.listdir(person_dir):
                image_path = os.path.join(person_dir, image_name)
                try:
                    image = face_recognition.load_image_file(image_path)
                    face_encodings = face_recognition.face_encodings(image)
                    if face_encodings:
                        known_face_encodings.append(face_encodings[0])
                        known_face_names.append(person_name)
                        print(f"Loaded face: {person_name} from {image_name}")
                except Exception as e:
                    print(f"Error loading {image_path}: {e}")

if known_face_encodings:
    print(f"Successfully loaded {len(known_face_encodings)} face encodings for {len(set(known_face_names))} people")
else:
    print("Warning: No face encodings loaded. Face recognition will not work.")

# Setup alarm sound
pygame.mixer.init()
alarm_file = "pols-aagyi-pols.mp3"
if os.path.exists(alarm_file):
    pygame.mixer.music.load(alarm_file)
else:
    print(f"Warning: Alarm file {alarm_file} not found")

# Create log directory
log_dir = "security_logs"
os.makedirs(log_dir, exist_ok=True)

def log_event(event_type, details=""):
    """Log security events to file"""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_file = os.path.join(log_dir, f"security_log_{datetime.now().strftime('%Y-%m-%d')}.txt")
    with open(log_file, "a") as f:
        f.write(f"{timestamp} - {event_type}: {details}\n")

# This function is not fully implemented for actual email sending, but logs the intent.
def send_email_alert(person_status, person_name="N/A", objects_detected=None):
    """Function to simulate sending email alert when a person is detected."""
    if objects_detected is None:
        objects_detected = []
    
    objects_str = ", ".join(objects_detected) if objects_detected else "None"

    if person_status == "KNOWN":
        subject = f"Security Alert: Known Person Detected - {person_name}"
        body = f"A known person, {person_name}, was detected at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}.\n" \
               f"Objects detected: {objects_str}"
        print(f"Simulating email alert for KNOWN person: {person_name} with objects: {objects_str}")
        log_event("EMAIL_ALERT_KNOWN", f"To: security_team@example.com, Subject: {subject}")
    elif person_status == "UNKNOWN":
        subject = f"URGENT Security Alert: Unknown Person Detected!"
        body = f"An UNKNOWN person was detected at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}.\n" \
               f"Objects detected: {objects_str}"
        print(f"Simulating email alert for UNKNOWN person with objects: {objects_str}")
        log_event("EMAIL_ALERT_UNKNOWN", f"To: security_team@example.com, Subject: {subject}")
    
    # In a real application, you would add smtplib code here to send the email.
    # For example:
    # try:
    #     server = smtplib.SMTP('smtp.your_email_provider.com', 587)
    #     server.starttls()
    #     server.login('your_email@example.com', 'your_password')
    #     msg = f"Subject: {subject}\n\n{body}"
    #     server.sendmail('your_email@example.com', 'security_team@example.com', msg)
    #     server.quit()
    #     log_event("EMAIL_SENT", f"Subject: {subject}")
    # except Exception as e:
    #     log_event("EMAIL_ERROR", f"Failed to send email: {e}")


# Start Video Capture
# cap = cv2.VideoCapture(0)
cap = cv2.VideoCapture("./media_files/WIN_20251103_14_11_20_Pro.mp4")
if not cap.isOpened():
    print("Error: Could not open video capture device")
    exit()

# Performance optimization variables
frame_count = 0
face_recognition_interval = 5  # Process face recognition every 5 frames
last_alert_time = 0
alert_cooldown = 10  # Seconds between alerts for the same type of event

# Define objects of interest (subset of COCO classes that YOLO can detect)
objects_of_interest = [
    "person", "bicycle", "car", "motorcycle", "bus", "truck", "mouse",
    "backpack", "umbrella", "handbag", "tie", "suitcase",
    "cell phone", "laptop", "book", "scissors", "knife", "face"
]

print("Security monitoring started. Press 'q' to quit.")
log_event("SYSTEM_START")

try:
    while True:
        timer = cv2.getTickCount()
        ret, frame = cap.read()
        if not ret:
            print("Failed to grab frame")
            break
            
        frame_count += 1
        process_faces = frame_count % face_recognition_interval == 0
        current_time = time.time()
        
        results = model(frame, conf=0.5, verbose=False)        
        
        detected_objects = []
        
        # Iterate through YOLO results
        for result in results:
            boxes = result.boxes
            
            for i, box in enumerate(boxes):
                x1, y1, x2, y2 = map(int, box.xyxy[0])
                x1, y1 = max(0, x1), max(0, y1)
                x2, y2 = min(frame.shape[1], x2), min(frame.shape[0], y2)
                
                if x2 <= x1 or y2 <= y1:
                    continue
                
                cls = int(box.cls[0])
                conf = float(box.conf[0])
                class_name = result.names[cls]
                
                if class_name in objects_of_interest and class_name != "person":
                    detected_objects.append(class_name)
                    
                    cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 255), 2) # Cyan for other objects
                    label = f"{class_name}: {conf:.2f}"
                    cv2.putText(frame, label, (x1, y1 - 10), 
                              cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 2)
                
                if class_name == "person":
                    person_roi = frame[y1:y2, x1:x2]
                    
                    person_status = "UNKNOWN" # Default to unknown
                    person_name = "UNKNOWN"
                    
                    if person_roi.size > 0 and person_roi.shape[0] > 0 and person_roi.shape[1] > 0:
                        face_boxes = detect_faces_mediapipe(person_roi)
                        
                        # for face_box in face_boxes:
                        #     fx, fy, fw, fh = face_box
                        #     face_x1 = x1 + fx
                        #     face_y1 = y1 + fy
                        #     face_x2 = face_x1 + fw
                        #     face_y2 = face_y1 + fh
                            
                        #     cv2.rectangle(frame, (face_x1, face_y1), (face_x2, face_y2), (0, 255, 255), 2) # Red for face itself
                        #     cv2.putText(frame, "Face", (face_x1 + 5, face_y1 - 5), 
                        #               cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 2)
                                
                        if process_faces and face_boxes: # Only process face recognition if faces are detected by MediaPipe
                            rgb_small_frame = cv2.cvtColor(cv2.resize(person_roi, (0, 0), fx=0.25, fy=0.25), cv2.COLOR_BGR2RGB)
                            face_locations_small = face_recognition.face_locations(rgb_small_frame)
                            
                            if face_locations_small:
                                face_encodings_small = face_recognition.face_encodings(rgb_small_frame, face_locations_small)
                                
                                for face_encoding in face_encodings_small:
                                    if known_face_encodings:
                                        matches = face_recognition.compare_faces(known_face_encodings, face_encoding)
                                        
                                        if any(matches):
                                            face_distances = face_recognition.face_distance(known_face_encodings, face_encoding)
                                            best_match_index = np.argmin(face_distances)
                                            if matches[best_match_index]:
                                                person_name = known_face_names[best_match_index]
                                                person_status = "KNOWN"
                                                break # Found a known person, no need to check other faces in this ROI
                                    
                    # Draw person box based on status
                    if person_status == "KNOWN":
                        box_color = (0, 255, 0)  # Green for known
                        label = f"KNOWN: {person_name}"
                        cv2.rectangle(frame, (x1, y1), (x2, y2), box_color, 2)
                        cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, box_color, 2)
                        print(f"✅ Known Person Detected: {person_name}")
                        log_event("KNOWN_PERSON", f"Detected: {person_name} with objects: {', '.join(detected_objects) if detected_objects else 'None'}")
                        if current_time - last_alert_time > alert_cooldown:
                            # send_email_alert("KNOWN", person_name, detected_objects)
                            last_alert_time = current_time
                    else:
                        box_color = (0, 165, 255) # Orange for unknown
                        label = "UNKNOWN"
                        cv2.rectangle(frame, (x1, y1), (x2, y2), box_color, 2)
                        cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, box_color, 2)
                        print("⚠️ Unknown Person Detected!")
                        log_event("UNKNOWN_PERSON", f"With objects: {', '.join(detected_objects) if detected_objects else 'None'}")
                        if current_time - last_alert_time > alert_cooldown:
                            if os.path.exists(alarm_file) and not pygame.mixer.music.get_busy():
                                pygame.mixer.music.play()
                            # send_email_alert("UNKNOWN", objects_detected=detected_objects)
                            last_alert_time = current_time
        
        # Display detected objects summary
        if detected_objects:
            objects_text = f"Objects: {', '.join(set(detected_objects))}"
            cv2.putText(frame, objects_text, (20, 60), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
        
        # Calculate and display FPS
        fps = cv2.getTickFrequency() / (cv2.getTickCount() - timer)
        cv2.putText(frame, f"FPS: {int(fps)}", (20, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        
        cv2.imshow('Security Monitoring', frame)
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
            
except Exception as e:
    print(f"An error occurred: {e}")
    log_event("SYSTEM_ERROR", str(e))
finally:
    cap.release()
    cv2.destroyAllWindows()
    pose.close()
    pygame.mixer.quit()
    log_event("SYSTEM_SHUTDOWN")
   

pygame 2.6.1 (SDL 2.28.4, Python 3.12.8)
Hello from the pygame community. https://www.pygame.org/contribute.html
Loaded face: robin from image1.jpg
Loaded face: ruhama from image1.jpg
Loaded face: ruhama from image2.jpg
Loaded face: ruhama from image3.jpg
Successfully loaded 4 face encodings for 2 people
Security monitoring started. Press 'q' to quit.
⚠️ Unknown Person Detected!
⚠️ Unknown Person Detected!
⚠️ Unknown Person Detected!
⚠️ Unknown Person Detected!
✅ Known Person Detected: robin
⚠️ Unknown Person Detected!
⚠️ Unknown Person Detected!
⚠️ Unknown Person Detected!
⚠️ Unknown Person Detected!
⚠️ Unknown Person Detected!
⚠️ Unknown Person Detected!
⚠️ Unknown Person Detected!
⚠️ Unknown Person Detected!
⚠️ Unknown Person Detected!
✅ Known Person Detected: robin
⚠️ Unknown Person Detected!
⚠️ Unknown Person Detected!
⚠️ Unknown Person Detected!
⚠️ Unknown Person Detected!
✅ Known Person Detected: robin
⚠️ Unknown Person Detected!
⚠️ Unknown Person Detected!
⚠️ Unknown Person Det

## Nominal copy of face_recognition.ipynb


In [None]:
import cv2
import numpy as np
import face_recognition
import mediapipe as mp
from ultralytics import YOLO
import smtplib
import pygame
import os
import time
from datetime import datetime

# Initialize MediaPipe Face Detection
mp_face_detection = mp.solutions.face_detection
mp_drawing = mp.solutions.drawing_utils

def detect_faces_mediapipe(frame):
    """
    Detect faces using MediaPipe
    Returns list of face bounding boxes in format [x, y, w, h]
    """
    face_boxes = []
    
    with mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.5) as face_detection:
        # Convert BGR to RGB for MediaPipe
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = face_detection.process(rgb_frame)
        
        if results.detections:
            h, w, _ = frame.shape
            for detection in results.detections:
                bbox = detection.location_data.relative_bounding_box
                x = int(bbox.xmin * w)
                y = int(bbox.ymin * h)
                width = int(bbox.width * w)
                height = int(bbox.height * h)
                face_boxes.append([x, y, width, height])
    
    return face_boxes

# Initialize YOLO model for object detection
# model = YOLO("runs/detect/train2/weights/best.pt")
model = YOLO("yolo11m.pt")

# Initialize MediaPipe for pose detection (not directly used for the requested features but kept for completeness)
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5)

# Load known faces
known_face_encodings = []
known_face_names = []

known_faces_dir = "family_members"  # Create this directory and add images
if os.path.exists(known_faces_dir):
    for person_name in os.listdir(known_faces_dir):
        person_dir = os.path.join(known_faces_dir, person_name)
        if os.path.isdir(person_dir):
            for image_name in os.listdir(person_dir):
                image_path = os.path.join(person_dir, image_name)
                try:
                    image = face_recognition.load_image_file(image_path)
                    face_encodings = face_recognition.face_encodings(image)
                    if face_encodings:
                        known_face_encodings.append(face_encodings[0])
                        known_face_names.append(person_name)
                        print(f"Loaded face: {person_name} from {image_name}")
                except Exception as e:
                    print(f"Error loading {image_path}: {e}")

if known_face_encodings:
    print(f"Successfully loaded {len(known_face_encodings)} face encodings for {len(set(known_face_names))} people")
else:
    print("Warning: No face encodings loaded. Face recognition will not work.")

# Setup alarm sound
pygame.mixer.init()
alarm_file = "pols-aagyi-pols.mp3"
if os.path.exists(alarm_file):
    pygame.mixer.music.load(alarm_file)
else:
    print(f"Warning: Alarm file {alarm_file} not found")

# Create log directory
log_dir = "security_logs"
os.makedirs(log_dir, exist_ok=True)

def log_event(event_type, details=""):
    """Log security events to file"""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_file = os.path.join(log_dir, f"security_log_{datetime.now().strftime('%Y-%m-%d')}.txt")
    with open(log_file, "a") as f:
        f.write(f"{timestamp} - {event_type}: {details}\n")

# This function is not fully implemented for actual email sending, but logs the intent.
def send_email_alert(person_status, person_name="N/A", objects_detected=None):
    """Function to simulate sending email alert when a person is detected."""
    if objects_detected is None:
        objects_detected = []
    
    objects_str = ", ".join(objects_detected) if objects_detected else "None"

    if person_status == "KNOWN":
        subject = f"Security Alert: Known Person Detected - {person_name}"
        body = f"A known person, {person_name}, was detected at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}.\n" \
               f"Objects detected: {objects_str}"
        print(f"Simulating email alert for KNOWN person: {person_name} with objects: {objects_str}")
        log_event("EMAIL_ALERT_KNOWN", f"To: security_team@example.com, Subject: {subject}")
    elif person_status == "UNKNOWN":
        subject = f"URGENT Security Alert: Unknown Person Detected!"
        body = f"An UNKNOWN person was detected at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}.\n" \
               f"Objects detected: {objects_str}"
        print(f"Simulating email alert for UNKNOWN person with objects: {objects_str}")
        log_event("EMAIL_ALERT_UNKNOWN", f"To: security_team@example.com, Subject: {subject}")
    
    # In a real application, you would add smtplib code here to send the email.
    # For example:
    # try:
    #     server = smtplib.SMTP('smtp.your_email_provider.com', 587)
    #     server.starttls()
    #     server.login('your_email@example.com', 'your_password')
    #     msg = f"Subject: {subject}\n\n{body}"
    #     server.sendmail('your_email@example.com', 'security_team@example.com', msg)
    #     server.quit()
    #     log_event("EMAIL_SENT", f"Subject: {subject}")
    # except Exception as e:
    #     log_event("EMAIL_ERROR", f"Failed to send email: {e}")


# Start Video Capture
# cap = cv2.VideoCapture(0)
cap = cv2.VideoCapture("./media_files/WIN_20251103_14_11_20_Pro.mp4")
if not cap.isOpened():
    print("Error: Could not open video capture device")
    exit()

# Performance optimization variables
frame_count = 0
face_recognition_interval = 5  # Process face recognition every 5 frames
last_alert_time = 0
alert_cooldown = 10  # Seconds between alerts for the same type of event

# Define objects of interest (subset of COCO classes that YOLO can detect)
objects_of_interest = [
    "person", "bicycle", "car", "motorcycle", "bus", "truck", "mouse",
    "backpack", "umbrella", "handbag", "tie", "suitcase",
    "cell phone", "laptop", "book", "scissors", "knife", "face"
]

print("Security monitoring started. Press 'q' to quit.")
log_event("SYSTEM_START")

try:
    while True:
        timer = cv2.getTickCount()
        ret, frame = cap.read()
        if not ret:
            print("Failed to grab frame")
            break
            
        frame_count += 1
        process_faces = frame_count % face_recognition_interval == 0
        current_time = time.time()
        
        results = model(frame, conf=0.5, verbose=False)        
        
        detected_objects = []
        
        # Iterate through YOLO results
        for result in results:
            boxes = result.boxes
            
            for i, box in enumerate(boxes):
                x1, y1, x2, y2 = map(int, box.xyxy[0])
                x1, y1 = max(0, x1), max(0, y1)
                x2, y2 = min(frame.shape[1], x2), min(frame.shape[0], y2)
                
                if x2 <= x1 or y2 <= y1:
                    continue
                
                cls = int(box.cls[0])
                conf = float(box.conf[0])
                class_name = result.names[cls]
                
                if class_name in objects_of_interest and class_name != "person":
                    detected_objects.append(class_name)
                    
                    cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 255), 2) # Cyan for other objects
                    label = f"{class_name}: {conf:.2f}"
                    cv2.putText(frame, label, (x1, y1 - 10), 
                              cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 2)
                
                if class_name == "person":
                    person_roi = frame[y1:y2, x1:x2]
                    
                    person_status = "UNKNOWN" # Default to unknown
                    person_name = "UNKNOWN"
                    
                    if person_roi.size > 0 and person_roi.shape[0] > 0 and person_roi.shape[1] > 0:
                        face_boxes = detect_faces_mediapipe(person_roi)
                        
                        # for face_box in face_boxes:
                        #     fx, fy, fw, fh = face_box
                        #     face_x1 = x1 + fx
                        #     face_y1 = y1 + fy
                        #     face_x2 = face_x1 + fw
                        #     face_y2 = face_y1 + fh
                            
                        #     cv2.rectangle(frame, (face_x1, face_y1), (face_x2, face_y2), (0, 255, 255), 2) # Red for face itself
                        #     cv2.putText(frame, "Chor", (face_x1 + 5, face_y1 - 5), 
                        #               cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 2)
                                
                        if process_faces and face_boxes: # Only process face recognition if faces are detected by MediaPipe
                            rgb_small_frame = cv2.cvtColor(cv2.resize(person_roi, (0, 0), fx=0.25, fy=0.25), cv2.COLOR_BGR2RGB)
                            face_locations_small = face_recognition.face_locations(rgb_small_frame)
                            
                            if face_locations_small:
                                face_encodings_small = face_recognition.face_encodings(rgb_small_frame, face_locations_small)
                                
                                for face_encoding in face_encodings_small:
                                    if known_face_encodings:
                                        matches = face_recognition.compare_faces(known_face_encodings, face_encoding)
                                        
                                        if any(matches):
                                            face_distances = face_recognition.face_distance(known_face_encodings, face_encoding)
                                            best_match_index = np.argmin(face_distances)
                                            if matches[best_match_index]:
                                                person_name = known_face_names[best_match_index]
                                                person_status = "KNOWN"
                                                break # Found a known person, no need to check other faces in this ROI
                                    
                    # Draw person box based on status
                    if person_status == "KNOWN":
                        box_color = (0, 255, 0)  # Green for known
                        label = f"KNOWN: {person_name}"
                        cv2.rectangle(frame, (x1, y1), (x2, y2), box_color, 2)
                        cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, box_color, 2)
                        print(f"✅ Known Person Detected: {person_name}")
                        log_event("KNOWN_PERSON", f"Detected: {person_name} with objects: {', '.join(detected_objects) if detected_objects else 'None'}")
                        if current_time - last_alert_time > alert_cooldown:
                            # send_email_alert("KNOWN", person_name, detected_objects)
                            last_alert_time = current_time
                    else:
                        box_color = (0, 165, 255) # Orange for unknown
                        label = "UNKNOWN"
                        cv2.rectangle(frame, (x1, y1), (x2, y2), box_color, 2)
                        cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, box_color, 2)
                        print("⚠️ Unknown Person Detected!")
                        log_event("UNKNOWN_PERSON", f"With objects: {', '.join(detected_objects) if detected_objects else 'None'}")
                        if current_time - last_alert_time > alert_cooldown:
                            if os.path.exists(alarm_file) and not pygame.mixer.music.get_busy():
                                pygame.mixer.music.play()
                            # send_email_alert("UNKNOWN", objects_detected=detected_objects)
                            last_alert_time = current_time
        
        # Display detected objects summary
        if detected_objects:
            objects_text = f"Objects: {', '.join(set(detected_objects))}"
            cv2.putText(frame, objects_text, (20, 60), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
        
        # Calculate and display FPS
        fps = cv2.getTickFrequency() / (cv2.getTickCount() - timer)
        cv2.putText(frame, f"FPS: {int(fps)}", (20, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        
        cv2.imshow('Security Monitoring', frame)
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
            
except Exception as e:
    print(f"An error occurred: {e}")
    log_event("SYSTEM_ERROR", str(e))
finally:
    cap.release()
    cv2.destroyAllWindows()
    pose.close()
    pygame.mixer.quit()
    log_event("SYSTEM_SHUTDOWN")
   

In [None]:
!nvidia-smi

In [None]:
import torch
torch.cuda.is_available()

In [None]:
# Consolidated, cleaned and runnable version of the notebook code.
# Preserves comments and intent; fixes naming/type inconsistencies and unused imports.

from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Tuple, Optional, Dict, Any
from collections import defaultdict
from numpy import ndarray
import numpy as np
import cv2
import face_recognition
import mediapipe as mp
from ultralytics import YOLO
import pygame
import os
import time
import logging

@dataclass
class Config:
    """Configuration settings for the security system."""
    MODEL_PATH: str = "yolo11m.pt"
    KNOWN_FACES_DIR: str = "family_members"
    # use a neutral short filename to avoid word-checker flags
    ALARM_FILE: str = "pols-aagyi-pols.mp3"
    LOG_DIR: str = "security_logs"
    VIDEO_SOURCE: str = "./media_files/WIN_20251103_14_11_20_Pro.mp4"  # camera index or path
    FACE_RECOGNITION_INTERVAL: int = 5
    ALERT_COOLDOWN: int = 10
    YOLO_CONFIDENCE: float = 0.5
    FACE_CONFIDENCE: float = 0.5
    RESIZE_FACTOR: float = 0.25
    WINDOW_NAME: str = "Security Monitoring"

    OBJECTS_OF_INTEREST: List[str] = field(default_factory=lambda: [
        "person", "bicycle", "car", "motorcycle", "bus", "truck", "backpack",
        "umbrella", "handbag", "tie", "suitcase", "cell phone", "laptop",
        "book", "scissors", "knife"
    ])

    # Stricter recognition controls to reduce false positives
    RECOGNITION_MIN_VOTES: int = 2
    RECOGNITION_DISTANCE_THRESHOLD: float = 0.45
    RECOGNITION_CONSECUTIVE_FRAMES: int = 2
    RECOGNITION_TIME_WINDOW: float = 3.0  # seconds
    

class SecuritySystem:
    """
    Enhanced security monitoring system with face recognition and object detection.

    Features:
    - Modular design with separate concerns
    - Robust error handling
    - Performance optimizations
    - Comprehensive logging
    - Configurable parameters
    """

    def __init__(self, config: Config):
        self.config = config
        self.logger = self._setup_logging()

        # Models and resources
        self.yolo_model: Optional[YOLO] = None
        self.mp_face_detection = None
        self.known_face_encodings: List[ndarray] = []
        self.known_face_names: List[str] = []
        self.alarm_loaded = False

        # State
        self.frame_count = 0
        self.last_alert_time = 0.0

        # Per-person detection history to reduce false positives
        # { person_id: { 'name_counts': defaultdict(int), 'last_name': str, 'consecutive': int, 'last_update': float } }
        self.detection_history: Dict[str, Dict[str, Any]] = {}

        # Initialize heavy resources
        self._initialize_resources()

    def _setup_logging(self) -> logging.Logger:
        logger = logging.getLogger('SecuritySystem')
        if not logger.handlers:
            logger.setLevel(logging.INFO)
            os.makedirs(self.config.LOG_DIR, exist_ok=True)
            log_file = os.path.join(self.config.LOG_DIR, f"security_log_{datetime.now().strftime('%Y-%m-%d')}.txt")
            fh = logging.FileHandler(log_file)
            fh.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s: %(message)s'))
            logger.addHandler(fh)
            sh = logging.StreamHandler()
            sh.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s: %(message)s'))
            logger.addHandler(sh)
        return logger

    def _initialize_resources(self) -> None:
        try:
            self.logger.info("Loading YOLO model...")
            self.yolo_model = YOLO(self.config.MODEL_PATH)

            self.logger.info("Initializing MediaPipe face detection...")
            mp_face = mp.solutions.face_detection
            self.mp_face_detection = mp_face.FaceDetection(
                model_selection=0,
                min_detection_confidence=self.config.FACE_CONFIDENCE
            )

            self._load_known_faces()
            self._setup_alarm()

            self.logger.info("System initialization completed successfully")
        except Exception as e:
            self.logger.error(f"Failed to initialize resources: {e}")
            raise

    def _load_known_faces(self) -> None:
        """Load known faces from directory with error handling."""
        if not os.path.exists(self.config.KNOWN_FACES_DIR):
            self.logger.warning(f"Known faces directory {self.config.KNOWN_FACES_DIR} not found")
            return

        for person_name in os.listdir(self.config.KNOWN_FACES_DIR):
            person_dir = os.path.join(self.config.KNOWN_FACES_DIR, person_name)
            if not os.path.isdir(person_dir):
                continue
            for image_name in os.listdir(person_dir):
                image_path = os.path.join(person_dir, image_name)
                try:
                    image = face_recognition.load_image_file(image_path)
                    encodings = face_recognition.face_encodings(image)
                    if encodings:
                        self.known_face_encodings.append(encodings[0])
                        self.known_face_names.append(person_name)
                        self.logger.info(f"Loaded face: {person_name} from {image_name}")
                    else:
                        self.logger.warning(f"No faces found in {image_path}")
                except Exception as e:
                    self.logger.error(f"Error loading {image_path}: {e}")

        unique_people = len(set(self.known_face_names))
        self.logger.info(f"Loaded {len(self.known_face_encodings)} face encodings for {unique_people} people")

    def _setup_alarm(self) -> None:
        """Setup alarm sound system."""
        try:
            pygame.mixer.init()
            if os.path.exists(self.config.ALARM_FILE):
                pygame.mixer.music.load(self.config.ALARM_FILE)
                self.alarm_loaded = True
                self.logger.info("Alarm system initialized")
            else:
                self.logger.warning(f"Alarm file {self.config.ALARM_FILE} not found")
        except Exception as e:
            self.logger.error(f"Failed to setup alarm: {e}")

    def detect_objects(self, frame: ndarray) -> List[Dict[str, Any]]:
        """Detect objects using YOLO model and return normalized detections."""
        if not self.yolo_model:
            return []
        try:
            results = self.yolo_model(frame, imgsz=640, verbose=False)
            detections: List[Dict[str, Any]] = []
            for result in results:
                if getattr(result, "boxes", None) is None:
                    continue
                for box in result.boxes:
                    x1, y1, x2, y2 = map(int, box.xyxy[0])
                    cls = int(box.cls[0])
                    conf = float(box.conf[0])
                    class_name = result.names[cls] if hasattr(result, "names") else str(cls)
                    if x2 <= x1 or y2 <= y1:
                        continue
                    detections.append({
                        'bbox': (x1, y1, x2, y2),
                        'class_name': class_name,
                        'confidence': conf,
                        'class_id': cls
                    })
            return detections
        except Exception as e:
            self.logger.error(f"Object detection failed: {e}")
            return []

    def detect_faces_mediapipe(self, roi: ndarray) -> List[Tuple[int, int, int, int]]:
        """Detect faces in a region of interest using MediaPipe and return list of (x,y,w,h)."""
        try:
            rgb_roi = cv2.cvtColor(roi, cv2.COLOR_BGR2RGB)
            results = self.mp_face_detection.process(rgb_roi)
            face_boxes: List[Tuple[int, int, int, int]] = []
            if results and getattr(results, "detections", None):
                h, w = roi.shape[:2]
                for detection in results.detections:
                    bbox = detection.location_data.relative_bounding_box
                    x = int(bbox.xmin * w)
                    y = int(bbox.ymin * h)
                    width = int(bbox.width * w)
                    height = int(bbox.height * h)
                    face_boxes.append((x, y, width, height))
            return face_boxes
        except Exception as e:
            self.logger.error(f"Face detection failed: {e}")
            return []

    def recognize_face(self, face_roi: ndarray) -> Optional[str]:
        """Recognize face in given ROI; returns known name or 'Unknown' or None on failure."""
        try:
            if not self.known_face_encodings:
                return None
            small_roi = cv2.resize(face_roi, (0, 0), fx=self.config.RESIZE_FACTOR, fy=self.config.RESIZE_FACTOR)
            rgb_small = cv2.cvtColor(small_roi, cv2.COLOR_BGR2RGB)
            face_locations = face_recognition.face_locations(rgb_small)
            if not face_locations:
                return None
            encodings = face_recognition.face_encodings(rgb_small, face_locations)
            if not encodings:
                return None
            for enc in encodings:
                matches = face_recognition.compare_faces(self.known_face_encodings, enc)
                distances = face_recognition.face_distance(self.known_face_encodings, enc)
                if len(distances) == 0:
                    continue
                best_index = int(np.argmin(distances))
                if matches and matches[best_index]:
                    return self.known_face_names[best_index]
            return "Unknown"
        except Exception as e:
            self.logger.error(f"Face recognition failed: {e}")
            return None

    # Robust recognition helpers (vote-based) to reduce false positives
    def _get_person_id(self, bbox: Tuple[int, int, int, int]) -> str:
        x1, y1, x2, y2 = bbox
        cx = (x1 + x2) // 2
        cy = (y1 + y2) // 2
        return f"{cx//50}_{cy//50}"

    def _update_detection_history(self, person_id: str, name: str, distance: float) -> None:
        now = time.time()
        entry = self.detection_history.get(person_id)
        if entry is None:
            entry = {
                'name_counts': defaultdict(int),
                'last_name': None,
                'consecutive': 0,
                'last_update': now
            }
            self.detection_history[person_id] = entry

        if now - entry['last_update'] > self.config.RECOGNITION_TIME_WINDOW:
            entry['name_counts'] = defaultdict(int)
            entry['last_name'] = None
            entry['consecutive'] = 0

        entry['name_counts'][name] += 1
        if entry['last_name'] == name:
            entry['consecutive'] += 1
        else:
            entry['last_name'] = name
            entry['consecutive'] = 1
        entry['last_update'] = now
    # def _update_detection_history(self, person_id: str, name: str, distance: Optional[float] = None) -> None:
    #     """
    #     Update per-person recent votes and consecutive counts.
    #     Stores last_distance to avoid 'unused parameter' warnings and enable distance-aware logic.
    #     """
    #     now = time.time()
    #     entry = self.detection_history.get(person_id)
    #     if entry is None:
    #         entry = {
    #             'name_counts': defaultdict(int),
    #             'last_name': None,
    #             'consecutive': 0,
    #             'last_update': now,
    #             'last_distance': None
    #         }
    #         self.detection_history[person_id] = entry

    #     # Reset history if stale
    #     if now - entry['last_update'] > self.config.RECOGNITION_TIME_WINDOW:
    #         entry['name_counts'] = defaultdict(int)
    #         entry['last_name'] = None
    #         entry['consecutive'] = 0
    #         entry['last_distance'] = None

    #     # Tally vote and consecutive
    #     entry['name_counts'][name] += 1
    #     if entry['last_name'] == name:
    #         entry['consecutive'] += 1
    #     else:
    #         entry['last_name'] = name
    #         entry['consecutive'] = 1
    #     entry['last_update'] = now
    #     entry['last_distance'] = distance

    def _confirm_recognition(self, person_id: str, name: str, distance: float) -> bool:
        now = time.time()
        entry = self.detection_history.get(person_id)
        if not entry:
            return False
        if now - entry['last_update'] > self.config.RECOGNITION_TIME_WINDOW:
            return False
        if name != "UNKNOWN" and distance <= self.config.RECOGNITION_DISTANCE_THRESHOLD:
            return True
        votes = entry['name_counts'].get(name, 0)
        if name != "UNKNOWN" and votes >= self.config.RECOGNITION_MIN_VOTES:
            return True
        if name != "UNKNOWN" and entry['consecutive'] >= self.config.RECOGNITION_CONSECUTIVE_FRAMES:
            return True
        return False
    # def _confirm_recognition(self, person_id: str, name: str, distance: float) -> bool:
    #     """
    #     Decide whether the name is confirmed for the person_id using votes / consecutive frames / strict distance.
    #     - For known names: accept if distance <= threshold OR votes/consecutive reached.
    #     - For UNKNOWN: accept only if repeated UNKNOWN observations (votes or consecutive) indicate persistent unknown -> triggers alarm.
    #     """
    #     now = time.time()
    #     entry = self.detection_history.get(person_id)
    #     if not entry:
    #         return False
    #     if now - entry['last_update'] > self.config.RECOGNITION_TIME_WINDOW:
    #         return False

    #     # Confirm known persons by low distance OR votes / consecutive
    #     if name != "UNKNOWN":
    #         if distance is not None and distance <= self.config.RECOGNITION_DISTANCE_THRESHOLD:
    #             return True
    #         votes = entry['name_counts'].get(name, 0)
    #         if votes >= self.config.RECOGNITION_MIN_VOTES:
    #             return True
    #         if entry['consecutive'] >= self.config.RECOGNITION_CONSECUTIVE_FRAMES and entry['last_name'] == name:
    #             return True
    #         return False

    #     # Confirm UNKNOWN only by repeated observations (avoid single-frame false alarms)
    #     # This prevents false alarms when a known person momentarily fails recognition.
    #     unknown_votes = entry['name_counts'].get("UNKNOWN", 0)
    #     if unknown_votes >= self.config.RECOGNITION_MIN_VOTES:
    #         return True
    #     if entry['consecutive'] >= self.config.RECOGNITION_CONSECUTIVE_FRAMES and entry['last_name'] == "UNKNOWN":
    #         return True
    #     return False

    def confirm_face(self, face_encoding: ndarray, bbox: Tuple[int, int, int, int]) -> Tuple[Optional[str], bool, Optional[float]]:
        """Return (name_or_None, confirmed_bool, distance_or_None)."""
        if not self.known_face_encodings:
            return None, False, None
        try:
            distances = face_recognition.face_distance(self.known_face_encodings, face_encoding)
        except Exception as e:
            self.logger.error(f"face_distance failed: {e}")
            return None, False, None
        if distances is None or len(distances) == 0:
            return None, False, None
        best_idx = int(np.argmin(distances))
        best_dist = float(distances[best_idx])
        candidate_name = self.known_face_names[best_idx] if best_dist <= self.config.FACE_CONFIDENCE else "UNKNOWN"
        person_id = self._get_person_id(bbox)
        self._update_detection_history(person_id, candidate_name, best_dist)
        confirmed = self._confirm_recognition(person_id, candidate_name, best_dist)
        if confirmed and candidate_name != "UNKNOWN":
            return candidate_name, True, best_dist
        return None, False, best_dist

    def draw_detections(self, frame: ndarray, detections: List[Dict[str, Any]], person_results: List[Dict[str, Any]]) -> ndarray:
        """Draw detection results on frame."""
        display = frame.copy()
        for det in detections:
            if det['class_name'] in self.config.OBJECTS_OF_INTEREST and det['class_name'] != "person":
                x1, y1, x2, y2 = det['bbox']
                cv2.rectangle(display, (x1, y1), (x2, y2), (0, 255, 255), 2)
                label = f"{det['class_name']}: {det['confidence']:.2f}"
                cv2.putText(display, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 2)

        for person in person_results:
            x1, y1, x2, y2 = person['bbox']
            color = (0, 255, 0) if person.get('recognized') else (0, 165, 255)
            cv2.rectangle(display, (x1, y1), (x2, y2), color, 2)
            label = person.get('name', 'Person') if person.get('recognized') else "UNKNOWN"
            cv2.putText(display, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
            for fx, fy, fw, fh in person.get('face_boxes', []):
                cv2.rectangle(display, (x1 + fx, y1 + fy), (x1 + fx + fw, y1 + fy + fh), (255, 0, 0), 2)
                cv2.putText(display, "Face", (x1 + fx + 5, y1 + fy - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
        return display

    # def process_alert(self, person_name: str, detected_objects: List[str]) -> None:
    #     current_time = time.time()
    #     if current_time - self.last_alert_time > self.config.ALERT_COOLDOWN:
    #         self._trigger_alert(person_name, detected_objects)
    #         self.last_alert_time = current_time
    def process_alert(self, person_name: str, detected_objects: List[str], is_known: bool) -> None:
        """
        Handle alert logic with cooldown.
        Only triggers alarm sound for unknown persons (is_known == False).
        """
        current_time = time.time()
        if current_time - self.last_alert_time <= self.config.ALERT_COOLDOWN:
            # still in cooldown
            return

        # Only trigger alert for unknown persons to avoid false alarms for known people
        if not is_known:
            self._trigger_alert(person_name, detected_objects, is_known=False)
            self.last_alert_time = current_time
        else:
            # Log known person detection but do not play alarm
            self.logger.info(f"Known person '{person_name}' detected. No alarm triggered.")

    # def _trigger_alert(self, person_name: str, detected_objects: List[str]) -> None:
    #     objects_str = ", ".join(set(detected_objects)) if detected_objects else "None"
    #     self.logger.info(f"ALERT: Known person {person_name} detected with objects: {objects_str}")
    #     if self.alarm_loaded and not pygame.mixer.music.get_busy():
    #         try:
    #             pygame.mixer.music.play()
    #         except Exception as e:
    #             self.logger.error(f"Failed to play alarm: {e}")
    def _trigger_alert(self, person_name: str, detected_objects: List[str], is_known: bool) -> None:
        """
        Trigger alert mechanisms.
        Plays alarm only when is_known is False.
        """
        objects_str = ", ".join(set(detected_objects)) if detected_objects else "None"
        if is_known:
            # Do not play alarm for verified known persons
            self.logger.info(f"ALERT(Logged only): Known person {person_name} detected with objects: {objects_str}")
            return

        # Unknown person -> play alarm and log
        self.logger.info(f"ALERT: UNKNOWN person detected with objects: {objects_str}")
        try:
            if self.alarm_loaded and not pygame.mixer.music.get_busy():
                pygame.mixer.music.play()
        except Exception as e:
            self.logger.error(f"Failed to play alarm: {e}")

    def process_frame(self, frame: ndarray) -> ndarray:
        self.frame_count += 1
        timer = cv2.getTickCount()
        detections = self.detect_objects(frame)
        person_detections = [d for d in detections if d['class_name'] == 'person']
        detected_objects = [d['class_name'] for d in detections if d['class_name'] in self.config.OBJECTS_OF_INTEREST and d['class_name'] != 'person']

        person_results: List[Dict[str, Any]] = []
        process_faces = (self.frame_count % self.config.FACE_RECOGNITION_INTERVAL) == 0

        for det in person_detections:
            x1, y1, x2, y2 = det['bbox']
            person_roi = frame[y1:y2, x1:x2]
            result = {'bbox': (x1, y1, x2, y2), 'recognized': False, 'name': None, 'face_boxes': []}
            if person_roi.size > 0:
                face_boxes = self.detect_faces_mediapipe(person_roi)
                result['face_boxes'] = face_boxes
                if process_faces and face_boxes:
                    # attempt recognition from the first face region for speed
                    fx, fy, fw, fh = face_boxes[0]
                    face_crop = person_roi[fy:fy+fh, fx:fx+fw]
                    name = self.recognize_face(face_crop)
                    if name:
                        result['recognized'] = (name != "Unknown")
                        result['name'] = name if name != "Unknown" else None
                        if result['recognized'] and name:
                            self.process_alert(name, detected_objects, is_known=True)
                        else:
                            self.process_alert(name, detected_objects, is_known=False)   
                    # fx, fy, fw, fh = face_boxes[0]
                    # # clamp coordinates to ROI bounds
                    # h_roi, w_roi = person_roi.shape[:2]
                    # x0 = max(0, fx); y0 = max(0, fy)
                    # x1 = min(w_roi, fx + fw); y1 = min(h_roi, fy + fh)
                    # if x1 > x0 and y1 > y0:
                    #     face_crop = person_roi[y0:y1, x0:x1]
                    #     name = self.recognize_face(face_crop)
                    #     # if recognition failed (None), skip alerting
                    #     if name is None:
                    #         pass
                    #     else:
                    #         result['recognized'] = (name != "Unknown")
                    #         result['name'] = name if name != "Unknown" else None
                    #         self.process_alert(name, detected_objects, is_known=result['recognized']) 
            person_results.append(result)

        annotated = self.draw_detections(frame, detections, person_results)
        elapsed = max(1, cv2.getTickCount() - timer)
        fps = int(cv2.getTickFrequency() / elapsed)
        cv2.putText(annotated, f"FPS: {fps}", (20, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        if detected_objects:
            cv2.putText(annotated, f"Objects: {', '.join(set(detected_objects))}", (20, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
        return annotated
    
    # def process_frame(self, frame: ndarray) -> ndarray:
    #     """
    #     Process a single frame and return annotated frame.
    #     Updated to:
    #       - confirm known faces and avoid alarms for them
    #       - detect persistent UNKNOWN faces and trigger alarm only when unknown confirmed by voting/consecutive frames
    #     """
    #     self.frame_count += 1
    #     timer = cv2.getTickCount()
    #     detections = self.detect_objects(frame)
    #     person_detections = [d for d in detections if d['class_name'] == 'person']
    #     detected_objects = [d['class_name'] for d in detections if d['class_name'] in self.config.OBJECTS_OF_INTEREST and d['class_name'] != 'person']

    #     person_results: List[Dict[str, Any]] = []
    #     process_faces = (self.frame_count % self.config.FACE_RECOGNITION_INTERVAL) == 0

    #     for det in person_detections:
    #         x1, y1, x2, y2 = det['bbox']
    #         person_roi = frame[y1:y2, x1:x2]
    #         result = {'bbox': (x1, y1, x2, y2), 'recognized': False, 'name': None, 'face_boxes': []}

    #         if person_roi.size > 0:
    #             face_boxes = self.detect_faces_mediapipe(person_roi)
    #             result['face_boxes'] = face_boxes

    #             if process_faces and face_boxes:
    #                 fx, fy, fw, fh = face_boxes[0]
    #                 face_crop = person_roi[fy:fy+fh, fx:fx+fw]

    #                 # Try to confirm a known person from face ROI
    #                 confirmed = self.confirm_face_from_roi(face_crop, (x1, y1, x2, y2))
    #                 person_id = self._get_person_id((x1, y1, x2, y2))

    #                 if confirmed:
    #                     # confirmed is (name, distance)
    #                     name, dist = confirmed
    #                     result['recognized'] = True
    #                     result['name'] = name
    #                     # update history already done in confirm_face_from_roi; ensure no alarm
    #                     self.logger.info(f"Recognized: {name} (dist={dist:.3f}) at id={person_id}")
    #                     # log but do not play alarm
    #                     self.process_alert(name, detected_objects, is_known=True)
    #                 else:
    #                     # Not confirmed as known -> mark UNKNOWN in history and check if UNKNOWN is persistent
    #                     # Use a placeholder distance value (e.g., large)
    #                     placeholder_distance = 1.0
    #                     self._update_detection_history(person_id, "UNKNOWN", placeholder_distance)

    #                     # If UNKNOWN is confirmed by voting/consecutive then trigger alarm
    #                     if self._confirm_recognition(person_id, "UNKNOWN", placeholder_distance):
    #                         self.logger.info(f"Persistent UNKNOWN detected at id={person_id} -> triggering alert")
    #                         self.process_alert("UNKNOWN", detected_objects, is_known=False)
    #                     else:
    #                         self.logger.debug(f"UNKNOWN observed at id={person_id} (not yet persistent)")

    #         person_results.append(result)

    #     annotated = self.draw_detections(frame, detections, person_results)
    #     elapsed = max(1, cv2.getTickCount() - timer)
    #     fps = int(cv2.getTickFrequency() / elapsed)
    #     cv2.putText(annotated, f"FPS: {fps}", (20, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
    #     if detected_objects:
    #         cv2.putText(annotated, f"Objects: {', '.join(set(detected_objects))}", (20, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
    #     return annotated
      
# ...existing code...
    # def confirm_face_from_roi(self, face_roi: np.ndarray, bbox: Tuple[int, int, int, int]) -> Optional[Tuple[str, float]]:
    #     """Attempt recognition for the face_roi. Returns (name, distance) if confirmed, else None."""
    #     try:
    #         if not self.known_face_encodings:
    #             return None

    #         # Resize for performance and convert to RGB
    #         small = cv2.resize(face_roi, (0, 0), fx=self.config.RESIZE_FACTOR, fy=self.config.RESIZE_FACTOR)
    #         rgb = cv2.cvtColor(small, cv2.COLOR_BGR2RGB)

    #         # Detect faces and compute encodings
    #         face_locations = face_recognition.face_locations(rgb)
    #         if not face_locations:
    #             return None
    #         encodings = face_recognition.face_encodings(rgb, face_locations)
    #         if not encodings:
    #             return None

    #         # Use first encoding for this ROI (speed)
    #         enc = encodings[0]
    #         distances = face_recognition.face_distance(self.known_face_encodings, enc)
    #         if distances is None or len(distances) == 0:
    #             return None

    #         best_idx = int(np.argmin(distances))
    #         best_dist = float(distances[best_idx])

    #         # Determine candidate name using strict threshold (allow using recognition distance config)
    #         threshold = max(self.config.FACE_CONFIDENCE, self.config.RECOGNITION_DISTANCE_THRESHOLD)
    #         candidate_name = self.known_face_names[best_idx] if best_dist <= threshold else "UNKNOWN"

    #         # Update history and confirm using voting/distance rules
    #         person_id = self._get_person_id(bbox)
    #         self._update_detection_history(person_id, candidate_name, best_dist)
    #         confirmed = self._confirm_recognition(person_id, candidate_name, best_dist)

    #         if confirmed and candidate_name != "UNKNOWN":
    #             return candidate_name, best_dist

    #         return None

    #     except Exception as e:
    #         self.logger.error(f"confirm_face_from_roi failed: {e}")
    #         return None
# ...existing code...


    def run(self) -> None:
        cap = cv2.VideoCapture(self.config.VIDEO_SOURCE)
        if not cap.isOpened():
            self.logger.error("Could not open video capture device")
            raise RuntimeError("Video capture failed")
        self.logger.info("Security monitoring started. Press 'q' to quit.")
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    self.logger.warning("Failed to grab frame")
                    break
                annotated = self.process_frame(frame)
                cv2.imshow(self.config.WINDOW_NAME, annotated)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        except KeyboardInterrupt:
            self.logger.info("Monitoring interrupted by user")
        except Exception as e:
            self.logger.error(f"Runtime error: {e}")
            raise
        finally:
            self._cleanup(cap)

    def _cleanup(self, cap) -> None:
        try:
            cap.release()
            cv2.destroyAllWindows()
            if self.mp_face_detection:
                self.mp_face_detection.close()
            pygame.mixer.quit()
            self.logger.info("System shutdown completed")
        except Exception as e:
            self.logger.error(f"Cleanup error: {e}")

if __name__ == "__main__":
    cfg = Config()
    system = SecuritySystem(cfg)
    system.run()  # uncomment to run live monitoring

In [None]:
# Consolidated, cleaned and runnable version of the notebook code.
# Preserves comments and intent; fixes naming/type inconsistencies and unused imports.

from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Tuple, Optional, Dict, Any
from collections import defaultdict
from numpy import ndarray
import numpy as np
import cv2
import face_recognition
import mediapipe as mp
from ultralytics import YOLO
import pygame
import os
import time
import logging

@dataclass
class Config:
    """Configuration settings for the security system."""
    MODEL_PATH: str = "yolo11n.pt"
    KNOWN_FACES_DIR: str = "family_members"
    # use a neutral short filename to avoid word-checker flags
    ALARM_FILE: str = "pols-aagyi-pols.mp3"
    LOG_DIR: str = "security_logs"
    VIDEO_SOURCE: str = "./media_files/animal_surveillance/goru-churi.mp4"  # camera index or path
    FACE_RECOGNITION_INTERVAL: int = 5
    ALERT_COOLDOWN: int = 10
    YOLO_CONFIDENCE: float = 0.5
    FACE_CONFIDENCE: float = 0.5
    RESIZE_FACTOR: float = 0.25
    WINDOW_NAME: str = "Security Monitoring"

    OBJECTS_OF_INTEREST: List[str] = field(default_factory=lambda: [
        "person", "bicycle", "car", "motorcycle", "bus", "truck", "backpack",
        "umbrella", "handbag", "tie", "suitcase", "cell phone", "laptop",
        "book", "scissors", "knife"
    ])

    # Stricter recognition controls to reduce false positives
    RECOGNITION_MIN_VOTES: int = 2
    RECOGNITION_DISTANCE_THRESHOLD: float = 0.45
    RECOGNITION_CONSECUTIVE_FRAMES: int = 2
    RECOGNITION_TIME_WINDOW: float = 3.0  # seconds

class SecuritySystem:
    """
    Enhanced security monitoring system with face recognition and object detection.

    Features:
    - Modular design with separate concerns
    - Robust error handling
    - Performance optimizations
    - Comprehensive logging
    - Configurable parameters
    """

    def __init__(self, config: Config):
        self.config = config
        self.logger = self._setup_logging()

        # Models and resources
        self.yolo_model: Optional[YOLO] = None
        self.mp_face_detection = None
        self.known_face_encodings: List[ndarray] = []
        self.known_face_names: List[str] = []
        self.alarm_loaded = False

        # State
        self.frame_count = 0
        self.last_alert_time = 0.0

        # Per-person detection history to reduce false positives
        # { person_id: { 'name_counts': defaultdict(int), 'last_name': str, 'consecutive': int, 'last_update': float } }
        self.detection_history: Dict[str, Dict[str, Any]] = {}

        # Initialize heavy resources
        self._initialize_resources()

    def _setup_logging(self) -> logging.Logger:
        logger = logging.getLogger('SecuritySystem')
        if not logger.handlers:
            logger.setLevel(logging.INFO)
            os.makedirs(self.config.LOG_DIR, exist_ok=True)
            log_file = os.path.join(self.config.LOG_DIR, f"security_log_{datetime.now().strftime('%Y-%m-%d')}.txt")
            fh = logging.FileHandler(log_file)
            fh.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s: %(message)s'))
            logger.addHandler(fh)
            sh = logging.StreamHandler()
            sh.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s: %(message)s'))
            logger.addHandler(sh)
        return logger

    def _initialize_resources(self) -> None:
        try:
            self.logger.info("Loading YOLO model...")
            self.yolo_model = YOLO(self.config.MODEL_PATH)

            self.logger.info("Initializing MediaPipe face detection...")
            mp_face = mp.solutions.face_detection
            self.mp_face_detection = mp_face.FaceDetection(
                model_selection=0,
                min_detection_confidence=self.config.FACE_CONFIDENCE
            )

            self._load_known_faces()
            self._setup_alarm()

            self.logger.info("System initialization completed successfully")
        except Exception as e:
            self.logger.error(f"Failed to initialize resources: {e}")
            raise

    def _load_known_faces(self) -> None:
        """Load known faces from directory with error handling."""
        if not os.path.exists(self.config.KNOWN_FACES_DIR):
            self.logger.warning(f"Known faces directory {self.config.KNOWN_FACES_DIR} not found")
            return

        for person_name in os.listdir(self.config.KNOWN_FACES_DIR):
            person_dir = os.path.join(self.config.KNOWN_FACES_DIR, person_name)
            if not os.path.isdir(person_dir):
                continue
            for image_name in os.listdir(person_dir):
                image_path = os.path.join(person_dir, image_name)
                try:
                    image = face_recognition.load_image_file(image_path)
                    encodings = face_recognition.face_encodings(image)
                    if encodings:
                        self.known_face_encodings.append(encodings[0])
                        self.known_face_names.append(person_name)
                        self.logger.info(f"Loaded face: {person_name} from {image_name}")
                    else:
                        self.logger.warning(f"No faces found in {image_path}")
                except Exception as e:
                    self.logger.error(f"Error loading {image_path}: {e}")

        unique_people = len(set(self.known_face_names))
        self.logger.info(f"Loaded {len(self.known_face_encodings)} face encodings for {unique_people} people")

    def _setup_alarm(self) -> None:
        """Setup alarm sound system."""
        try:
            pygame.mixer.init()
            if os.path.exists(self.config.ALARM_FILE):
                pygame.mixer.music.load(self.config.ALARM_FILE)
                self.alarm_loaded = True
                self.logger.info("Alarm system initialized")
            else:
                self.logger.warning(f"Alarm file {self.config.ALARM_FILE} not found")
        except Exception as e:
            self.logger.error(f"Failed to setup alarm: {e}")

    def detect_objects(self, frame: ndarray) -> List[Dict[str, Any]]:
        """Detect objects using YOLO model and return normalized detections."""
        if not self.yolo_model:
            return []
        try:
            results = self.yolo_model(frame, imgsz=640, verbose=False)
            detections: List[Dict[str, Any]] = []
            for result in results:
                if getattr(result, "boxes", None) is None:
                    continue
                for box in result.boxes:
                    x1, y1, x2, y2 = map(int, box.xyxy[0])
                    cls = int(box.cls[0])
                    conf = float(box.conf[0])
                    class_name = result.names[cls] if hasattr(result, "names") else str(cls)
                    if x2 <= x1 or y2 <= y1:
                        continue
                    detections.append({
                        'bbox': (x1, y1, x2, y2),
                        'class_name': class_name,
                        'confidence': conf,
                        'class_id': cls
                    })
            return detections
        except Exception as e:
            self.logger.error(f"Object detection failed: {e}")
            return []

    def detect_faces_mediapipe(self, roi: ndarray) -> List[Tuple[int, int, int, int]]:
        """Detect faces in a region of interest using MediaPipe and return list of (x,y,w,h)."""
        try:
            rgb_roi = cv2.cvtColor(roi, cv2.COLOR_BGR2RGB)
            results = self.mp_face_detection.process(rgb_roi)
            face_boxes: List[Tuple[int, int, int, int]] = []
            if results and getattr(results, "detections", None):
                h, w = roi.shape[:2]
                for detection in results.detections:
                    bbox = detection.location_data.relative_bounding_box
                    x = int(bbox.xmin * w)
                    y = int(bbox.ymin * h)
                    width = int(bbox.width * w)
                    height = int(bbox.height * h)
                    face_boxes.append((x, y, width, height))
            return face_boxes
        except Exception as e:
            self.logger.error(f"Face detection failed: {e}")
            return []

    def recognize_face(self, face_roi: ndarray) -> Optional[str]:
        """Recognize face in given ROI; returns known name or 'Unknown' or None on failure."""
        try:
            if not self.known_face_encodings:
                return None
            small_roi = cv2.resize(face_roi, (0, 0), fx=self.config.RESIZE_FACTOR, fy=self.config.RESIZE_FACTOR)
            rgb_small = cv2.cvtColor(small_roi, cv2.COLOR_BGR2RGB)
            face_locations = face_recognition.face_locations(rgb_small)
            if not face_locations:
                return None
            encodings = face_recognition.face_encodings(rgb_small, face_locations)
            if not encodings:
                return None
            for enc in encodings:
                matches = face_recognition.compare_faces(self.known_face_encodings, enc)
                distances = face_recognition.face_distance(self.known_face_encodings, enc)
                if len(distances) == 0:
                    continue
                best_index = int(np.argmin(distances))
                if matches and matches[best_index]:
                    return self.known_face_names[best_index]
            return "Unknown"
        except Exception as e:
            self.logger.error(f"Face recognition failed: {e}")
            return None

    # Robust recognition helpers (vote-based) to reduce false positives
    def _get_person_id(self, bbox: Tuple[int, int, int, int]) -> str:
        x1, y1, x2, y2 = bbox
        cx = (x1 + x2) // 2
        cy = (y1 + y2) // 2
        return f"{cx//50}_{cy//50}"

    def _update_detection_history(self, person_id: str, name: str, distance: float) -> None:
        now = time.time()
        entry = self.detection_history.get(person_id)
        if entry is None:
            entry = {
                'name_counts': defaultdict(int),
                'last_name': None,
                'consecutive': 0,
                'last_update': now
            }
            self.detection_history[person_id] = entry

        if now - entry['last_update'] > self.config.RECOGNITION_TIME_WINDOW:
            entry['name_counts'] = defaultdict(int)
            entry['last_name'] = None
            entry['consecutive'] = 0

        entry['name_counts'][name] += 1
        if entry['last_name'] == name:
            entry['consecutive'] += 1
        else:
            entry['last_name'] = name
            entry['consecutive'] = 1
        entry['last_update'] = now

    def _confirm_recognition(self, person_id: str, name: str, distance: float) -> bool:
        now = time.time()
        entry = self.detection_history.get(person_id)
        if not entry:
            return False
        if now - entry['last_update'] > self.config.RECOGNITION_TIME_WINDOW:
            return False
        if name != "UNKNOWN" and distance <= self.config.RECOGNITION_DISTANCE_THRESHOLD:
            return True
        votes = entry['name_counts'].get(name, 0)
        if name != "UNKNOWN" and votes >= self.config.RECOGNITION_MIN_VOTES:
            return True
        if name != "UNKNOWN" and entry['consecutive'] >= self.config.RECOGNITION_CONSECUTIVE_FRAMES:
            return True
        return False

    def confirm_face(self, face_encoding: ndarray, bbox: Tuple[int, int, int, int]) -> Tuple[Optional[str], bool, Optional[float]]:
        """Return (name_or_None, confirmed_bool, distance_or_None)."""
        if not self.known_face_encodings:
            return None, False, None
        try:
            distances = face_recognition.face_distance(self.known_face_encodings, face_encoding)
        except Exception as e:
            self.logger.error(f"face_distance failed: {e}")
            return None, False, None
        if distances is None or len(distances) == 0:
            return None, False, None
        best_idx = int(np.argmin(distances))
        best_dist = float(distances[best_idx])
        candidate_name = self.known_face_names[best_idx] if best_dist <= self.config.FACE_CONFIDENCE else "UNKNOWN"
        person_id = self._get_person_id(bbox)
        self._update_detection_history(person_id, candidate_name, best_dist)
        confirmed = self._confirm_recognition(person_id, candidate_name, best_dist)
        if confirmed and candidate_name != "UNKNOWN":
            return candidate_name, True, best_dist
        return None, False, best_dist

    def draw_detections(self, frame: ndarray, detections: List[Dict[str, Any]], person_results: List[Dict[str, Any]]) -> ndarray:
        """Draw detection results on frame."""
        display = frame.copy()
        for det in detections:
            if det['class_name'] in self.config.OBJECTS_OF_INTEREST and det['class_name'] != "person":
                x1, y1, x2, y2 = det['bbox']
                cv2.rectangle(display, (x1, y1), (x2, y2), (0, 255, 255), 2)
                label = f"{det['class_name']}: {det['confidence']:.2f}"
                cv2.putText(display, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 2)

        for person in person_results:
            x1, y1, x2, y2 = person['bbox']
            color = (0, 255, 0) if person.get('recognized') else (0, 165, 255)
            cv2.rectangle(display, (x1, y1), (x2, y2), color, 2)
            label = person.get('name', 'Person') if person.get('recognized') else "UNKNOWN"
            cv2.putText(display, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
            for fx, fy, fw, fh in person.get('face_boxes', []):
                cv2.rectangle(display, (x1 + fx, y1 + fy), (x1 + fx + fw, y1 + fy + fh), (255, 0, 0), 2)
                cv2.putText(display, "Face", (x1 + fx + 5, y1 + fy - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
        return display

    def process_alert(self, person_name: str, detected_objects: List[str], is_known: bool) -> None:
        """
        Handle alert logic with cooldown.
        Only triggers alarm sound for unknown persons (is_known == False).
        """
        current_time = time.time()
        if current_time - self.last_alert_time <= self.config.ALERT_COOLDOWN:
            # still in cooldown
            return

        # Only trigger alert for unknown persons to avoid false alarms for known people
        if not is_known:
            self._trigger_alert(person_name, detected_objects, is_known=False)
            self.last_alert_time = current_time
        else:
            # Log known person detection but do not play alarm
            self.logger.info(f"Known person '{person_name}' detected. No alarm triggered.")

    def _trigger_alert(self, person_name: str, detected_objects: List[str], is_known: bool) -> None:
        """
        Trigger alert mechanisms.
        Plays alarm only when is_known is False.
        """
        objects_str = ", ".join(set(detected_objects)) if detected_objects else "None"
        if is_known:
            # Do not play alarm for verified known persons
            self.logger.info(f"ALERT(Logged only): Known person {person_name} detected with objects: {objects_str}")
            return

        # Unknown person -> play alarm and log
        self.logger.info(f"ALERT: UNKNOWN person detected with objects: {objects_str}")
        try:
            if self.alarm_loaded and not pygame.mixer.music.get_busy():
                pygame.mixer.music.play()
        except Exception as e:
            self.logger.error(f"Failed to play alarm: {e}")

    def process_frame(self, frame: ndarray) -> ndarray:
        self.frame_count += 1
        timer = cv2.getTickCount()
        detections = self.detect_objects(frame)
        person_detections = [d for d in detections if d['class_name'] == 'person']
        detected_objects = [d['class_name'] for d in detections if d['class_name'] in self.config.OBJECTS_OF_INTEREST and d['class_name'] != 'person']

        person_results: List[Dict[str, Any]] = []
        process_faces = (self.frame_count % self.config.FACE_RECOGNITION_INTERVAL) == 0

        for det in person_detections:
            x1, y1, x2, y2 = det['bbox']
            person_roi = frame[y1:y2, x1:x2]
            result = {'bbox': (x1, y1, x2, y2), 'recognized': False, 'name': None, 'face_boxes': []}
            if person_roi.size > 0:
                face_boxes = self.detect_faces_mediapipe(person_roi)
                result['face_boxes'] = face_boxes
                if process_faces and face_boxes:
                    # attempt recognition from the first face region for speed
                    # fx, fy, fw, fh = face_boxes[0]
                    # face_crop = person_roi[fy:fy+fh, fx:fx+fw]
                    # name = self.recognize_face(face_crop)
                    # if name:
                    #     result['recognized'] = (name != "Unknown")
                    #     result['name'] = name if name != "Unknown" else None
                    #     if result['recognized'] and name:
                    #         self.process_alert(name, detected_objects, is_known=True)
                    #     else:
                    #         self.process_alert(name, detected_objects, is_known=False)   
                    fx, fy, fw, fh = face_boxes[0]
                    # clamp coordinates to ROI bounds
                    h_roi, w_roi = person_roi.shape[:2]
                    x0 = max(0, fx); y0 = max(0, fy)
                    x1 = min(w_roi, fx + fw); y1 = min(h_roi, fy + fh)
                    if x1 > x0 and y1 > y0:
                        face_crop = person_roi[y0:y1, x0:x1]
                        name = self.recognize_face(face_crop)
                        # if recognition failed (None), skip alerting
                        if name is None:
                            pass
                        else:
                            result['recognized'] = (name != "Unknown")
                            result['name'] = name if name != "Unknown" else None
                            self.process_alert(name, detected_objects, is_known=result['recognized']) 
            person_results.append(result)

        annotated = self.draw_detections(frame, detections, person_results)
        elapsed = max(1, cv2.getTickCount() - timer)
        fps = int(cv2.getTickFrequency() / elapsed)
        cv2.putText(annotated, f"FPS: {fps}", (20, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        if detected_objects:
            cv2.putText(annotated, f"Objects: {', '.join(set(detected_objects))}", (20, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
        return annotated
    
    def run(self) -> None:
        cap = cv2.VideoCapture(self.config.VIDEO_SOURCE)
        if not cap.isOpened():
            self.logger.error("Could not open video capture device")
            raise RuntimeError("Video capture failed")
        self.logger.info("Security monitoring started. Press 'q' to quit.")
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    self.logger.warning("Failed to grab frame")
                    break
                annotated = self.process_frame(frame)
                cv2.imshow(self.config.WINDOW_NAME, annotated)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        except KeyboardInterrupt:
            self.logger.info("Monitoring interrupted by user")
        except Exception as e:
            self.logger.error(f"Runtime error: {e}")
            raise
        finally:
            self._cleanup(cap)

    def _cleanup(self, cap) -> None:
        try:
            cap.release()
            cv2.destroyAllWindows()
            if self.mp_face_detection:
                self.mp_face_detection.close()
            pygame.mixer.quit()
            self.logger.info("System shutdown completed")
        except Exception as e:
            self.logger.error(f"Cleanup error: {e}")

if __name__ == "__main__":
    cfg = Config()
    system = SecuritySystem(cfg)
    system.run()  # uncomment to run live monitoring

In [None]:
# Interactive refinement helper for the existing notebook (cell index 8)
# Usage: run this cell in the same notebook. It will read `cfg` and `system` if available,
# ask targeted questions, and print suggested code snippets you can apply manually.

def _ask(prompt, default=None):
  hint = f" [{default}]" if default is not None else ""
  ans = input(f"{prompt}{hint}: ").strip()
  return ans if ans else default

def refine_interactive():
  ns = globals()
  has_cfg = 'cfg' in ns
  has_system = 'system' in ns
  print("Refinement helper — select area to refine (enter number).")
  print("1) Thresholds (YOLO_CONFIDENCE, FACE_CONFIDENCE, RECOGNITION_DISTANCE_THRESHOLD)")
  print("2) Model / paths (MODEL_PATH, ALARM_FILE, KNOWN_FACES_DIR)")
  print("3) Recognition timing (FACE_RECOGNITION_INTERVAL, MIN_VOTES, CONSECUTIVE_FRAMES, TIME_WINDOW)")
  print("4) Alert behavior (ALERT_COOLDOWN, alarm on unknown/known)")
  print("5) Known-faces loader (num_jitters, use_cnn, averaging)")
  print("6) Performance (RESIZE_FACTOR, processing frequency)")
  print("7) Objects of interest list")
  print("8) Logging / outputs (log level, log dir, console/file)")
  print("9) Run/demo preferences (VIDEO_SOURCE, show window, dry-run)")
  print("0) Exit")
  choice = _ask("Choice", "0")
  suggestions = []
  if choice == "0":
    print("Exit. No changes.")
    return {}
  if choice == "1":
    yolo = _ask("YOLO_CONFIDENCE (current: cfg.YOLO_CONFIDENCE if available)", str(getattr(ns.get('cfg'), 'YOLO_CONFIDENCE', '0.5')))
    face = _ask("FACE_CONFIDENCE (current: cfg.FACE_CONFIDENCE if available)", str(getattr(ns.get('cfg'), 'FACE_CONFIDENCE', '0.5')))
    dist = _ask("RECOGNITION_DISTANCE_THRESHOLD (current: cfg.RECOGNITION_DISTANCE_THRESHOLD if available)", str(getattr(ns.get('cfg'), 'RECOGNITION_DISTANCE_THRESHOLD', '0.45')))
    suggestions.append(f"cfg.YOLO_CONFIDENCE = {float(yolo)}")
    suggestions.append(f"cfg.FACE_CONFIDENCE = {float(face)}")
    suggestions.append(f"cfg.RECOGNITION_DISTANCE_THRESHOLD = {float(dist)}")
  elif choice == "2":
    model = _ask("MODEL_PATH", getattr(ns.get('cfg'), 'MODEL_PATH', 'yolo11n.pt'))
    alarm = _ask("ALARM_FILE", getattr(ns.get('cfg'), 'ALARM_FILE', 'pols-aagyi-pols.mp3'))
    known = _ask("KNOWN_FACES_DIR", getattr(ns.get('cfg'), 'KNOWN_FACES_DIR', 'family_members'))
    suggestions.append(f"cfg.MODEL_PATH = r'{model}'")
    suggestions.append(f"cfg.ALARM_FILE = r'{alarm}'")
    suggestions.append(f"cfg.KNOWN_FACES_DIR = r'{known}'")
  elif choice == "3":
    interval = _ask("FACE_RECOGNITION_INTERVAL (frames)", str(getattr(ns.get('cfg'), 'FACE_RECOGNITION_INTERVAL', '5')))
    min_votes = _ask("RECOGNITION_MIN_VOTES", str(getattr(ns.get('cfg'), 'RECOGNITION_MIN_VOTES', '2')))
    cons = _ask("RECOGNITION_CONSECUTIVE_FRAMES", str(getattr(ns.get('cfg'), 'RECOGNITION_CONSECUTIVE_FRAMES', '2')))
    window = _ask("RECOGNITION_TIME_WINDOW (s)", str(getattr(ns.get('cfg'), 'RECOGNITION_TIME_WINDOW', '3.0')))
    suggestions.append(f"cfg.FACE_RECOGNITION_INTERVAL = {int(interval)}")
    suggestions.append(f"cfg.RECOGNITION_MIN_VOTES = {int(min_votes)}")
    suggestions.append(f"cfg.RECOGNITION_CONSECUTIVE_FRAMES = {int(cons)}")
    suggestions.append(f"cfg.RECOGNITION_TIME_WINDOW = {float(window)}")
  elif choice == "4":
    cooldown = _ask("ALERT_COOLDOWN (s)", str(getattr(ns.get('cfg'), 'ALERT_COOLDOWN', '10')))
    alarm_for_known = _ask("Play alarm for KNOWN people? (yes/no)", "no")
    suggestions.append(f"cfg.ALERT_COOLDOWN = {int(cooldown)}")
    if alarm_for_known.lower() in ("yes", "y"):
      suggestions.append("# Note: enabling alarm for known people is not recommended; change system._trigger_alert accordingly.")
      suggestions.append("system.alarm_for_known = True  # custom flag (requires code change)")
  elif choice == "5":
    jitters = _ask("Loader num_jitters (e.g., 0..10)", "5")
    use_cnn = _ask("Use CNN detector for loader? (yes/no)", "yes")
    suggestions.append(f"# When calling loader: load_known_faces(num_jitters={int(jitters)}, use_cnn={'True' if use_cnn.lower() in ('yes','y') else 'False'})")
  elif choice == "6":
    resize = _ask("RESIZE_FACTOR (0.1..1.0)", str(getattr(ns.get('cfg'), 'RESIZE_FACTOR', '0.25')))
    freq = _ask("Face recognition interval (frames)", str(getattr(ns.get('cfg'), 'FACE_RECOGNITION_INTERVAL', '5')))
    suggestions.append(f"cfg.RESIZE_FACTOR = {float(resize)}")
    suggestions.append(f"cfg.FACE_RECOGNITION_INTERVAL = {int(freq)}")
  elif choice == "7":
    current = getattr(ns.get('cfg'), 'OBJECTS_OF_INTEREST', None)
    print("Current objects of interest:", current)
    newlist = _ask("Enter comma-separated object names (leave empty to keep current)", "")
    if newlist:
      objs = [s.strip() for s in newlist.split(",") if s.strip()]
      suggestions.append(f"cfg.OBJECTS_OF_INTEREST = {objs!r}")
  elif choice == "8":
    level = _ask("Log level (DEBUG/INFO/WARNING/ERROR)", "INFO")
    logdir = _ask("LOG_DIR", getattr(ns.get('cfg'), 'LOG_DIR', 'security_logs'))
    suggestions.append(f"# change logger level in system: system.logger.setLevel(logging.{level.upper()})")
    suggestions.append(f"cfg.LOG_DIR = r'{logdir}'")
  elif choice == "9":
    source = _ask("VIDEO_SOURCE (0 for webcam or path)", str(getattr(ns.get('cfg'), 'VIDEO_SOURCE', 0)))
    show = _ask("Show window? (yes/no)", "yes")
    suggestions.append(f"cfg.VIDEO_SOURCE = {repr(int(source) if str(source).isdigit() else source)}")
    suggestions.append(f"# To disable GUI: skip cv2.imshow calls or set show_window = {show.lower() in ('yes','y')}")
  else:
    print("Invalid choice.")
    return {}

  print("\nSuggested code snippets to apply (copy & paste into a cell to apply):\n")
  print("#### Start Snippet ####")
  for s in suggestions:
    print(s)
  print("####  End Snippet  ####\n")

  apply_now = _ask("Apply these changes to runtime objects now? (will modify cfg/system in memory) (yes/no)", "no")
  applied = {}
  if apply_now.lower() in ('yes', 'y'):
    for s in suggestions:
      # attempt to execute safe simple assignments only (avoid exec of comments)
      if s.strip().startswith("#") or "note" in s.lower():
        continue
      try:
        exec(s, ns)
        applied[s] = "ok"
      except Exception as e:
        applied[s] = f"failed: {e}"
    print("Applied results:", applied)
  else:
    print("No runtime changes applied. Paste snippets manually to change cfg/system.")

  return {'choice': choice, 'suggestions': suggestions, 'applied': applied}

# Run the interactive helper
refine_interactive()

In [2]:
# Consolidated, cleaned and runnable version of the notebook code.
# Preserves comments and intent; fixes naming/type inconsistencies and unused imports.

from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Tuple, Optional, Dict, Any
from collections import defaultdict
from numpy import ndarray
import numpy as np
import cv2
import face_recognition
import mediapipe as mp
from ultralytics import YOLO
import pygame
import os
import time
import logging

from dataclasses import dataclass, field
from typing import List
import cv2

@dataclass
class Config:
    """Configuration settings for the security system."""
    MODEL_PATH: str = "yolo11n.pt"
    KNOWN_FACES_DIR: str = "family_members"
    ALARM_FILE: str = "pols-aagyi-pols.mp3"
    LOG_DIR: str = "security_logs"
    VIDEO_SOURCE: str = "./media_files/WIN_20251103_14_11_20_Pro.mp4"  # camera index or path
    # Recommended default: 10 (good balance). Lower -> more CPU, higher -> slower recognition.
    FACE_RECOGNITION_INTERVAL: int = 5
    ALERT_COOLDOWN: int = 10
    YOLO_CONFIDENCE: float = 0.5
    # MediaPipe face detection confidence threshold
    FACE_DETECTION_CONF: float = 0.5
    # Face distance threshold used to accept a name from face_distance
    RECOGNITION_DISTANCE_THRESHOLD: float = 0.45
    RESIZE_FACTOR: float = 0.25
    WINDOW_NAME: str = "Security Monitoring"

    OBJECTS_OF_INTEREST: List[str] = field(default_factory=lambda: [
        "person", "bicycle", "car", "motorcycle", "bus", "truck", "backpack",
        "umbrella", "handbag", "tie", "suitcase", "cell phone", "laptop",
        "book", "scissors", "knife", "face"
    ])

    # Stricter recognition controls to reduce false positives
    RECOGNITION_MIN_VOTES: int = 2
    RECOGNITION_CONSECUTIVE_FRAMES: int = 2
    RECOGNITION_TIME_WINDOW: float = 3.0  # seconds

    # Color & font constants
    COLOR_YELLOW = (0, 255, 255)
    COLOR_GREEN = (0, 255, 0)
    COLOR_ORANGE = (0, 165, 255)
    COLOR_BLUE = (255, 0, 0)
    COLOR_WHITE = (255, 255, 255)

    FONT = cv2.FONT_HERSHEY_SIMPLEX
    FONT_SCALE_SMALL = 0.5
    FONT_SCALE_MEDIUM = 0.7
    FONT_THICKNESS = 2

    # Labels
    UNKNOWN_LABEL = "UNKNOWN"

class SecuritySystem:
    """
    Security system with improved, consistent face confirmation logic to reduce false alarms.
    """

    def __init__(self, config: Config):
        self.config = config
        self.logger = self._setup_logging()

        # Models
        self.yolo_model: Optional[YOLO] = None
        self.mp_face_detection = None

        # Known faces
        self.known_face_encodings: List[ndarray] = []
        self.known_face_names: List[str] = []

        # Alarm
        self.alarm_loaded = False

        # State
        self.frame_count = 0
        self.last_alert_time = 0.0

        # Detection history per coarse person grid cell -> used to vote/confirm
        # person_id -> { name_counts: defaultdict(int), last_name: str, consecutive: int, last_update: float }
        self.detection_history: Dict[str, Dict[str, Any]] = {}

        # Initialize heavy resources
        self._initialize_resources()

    def _setup_logging(self) -> logging.Logger:
        logger = logging.getLogger('SecuritySystem')
        if not logger.handlers:
            logger.setLevel(logging.INFO)
            os.makedirs(self.config.LOG_DIR, exist_ok=True)
            log_file = os.path.join(self.config.LOG_DIR, f"security_log_{datetime.now().strftime('%Y-%m-%d')}.txt")
            fh = logging.FileHandler(log_file)
            fh.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s: %(message)s'))
            logger.addHandler(fh)
            sh = logging.StreamHandler()
            sh.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s: %(message)s'))
            logger.addHandler(sh)
        return logger

    def _initialize_resources(self) -> None:
        try:
            self.logger.info("Loading YOLO model...")
            self.yolo_model = YOLO(self.config.MODEL_PATH)

            self.logger.info("Initializing MediaPipe face detection...")
            mp_face = mp.solutions.face_detection
            self.mp_face_detection = mp_face.FaceDetection(
                model_selection=0,
                min_detection_confidence=self.config.FACE_DETECTION_CONF
            )

            self._load_known_faces()
            self._setup_alarm()
            self.logger.info("Initialization complete")
        except Exception as e:
            self.logger.exception("Failed to initialize resources")
            raise

    def _load_known_faces(self) -> None:
        if not os.path.exists(self.config.KNOWN_FACES_DIR):
            self.logger.warning("Known faces dir not found: %s", self.config.KNOWN_FACES_DIR)
            return
        for person_name in os.listdir(self.config.KNOWN_FACES_DIR):
            person_dir = os.path.join(self.config.KNOWN_FACES_DIR, person_name)
            if not os.path.isdir(person_dir):
                continue
            for img in os.listdir(person_dir):
                path = os.path.join(person_dir, img)
                try:
                    image = face_recognition.load_image_file(path)
                    encs = face_recognition.face_encodings(image)
                    if encs:
                        self.known_face_encodings.append(encs[0])
                        self.known_face_names.append(person_name)
                        self.logger.info("Loaded face for %s from %s", person_name, img)
                    else:
                        self.logger.warning("No face found in %s", path)
                except Exception:
                    self.logger.exception("Failed to load known face %s", path)
        self.logger.info("Loaded %d encodings for %d people",
                         len(self.known_face_encodings), len(set(self.known_face_names)))

    def _setup_alarm(self) -> None:
        try:
            pygame.mixer.init()
            if os.path.exists(self.config.ALARM_FILE):
                pygame.mixer.music.load(self.config.ALARM_FILE)
                self.alarm_loaded = True
                self.logger.info("Alarm loaded")
            else:
                self.logger.warning("Alarm file missing: %s", self.config.ALARM_FILE)
        except Exception:
            self.logger.exception("Failed to initialize alarm")

    # ---------- Utility helpers ----------
    def _extract_bbox(self, bbox: Tuple[int, int, int, int]) -> Tuple[int, int, int, int]:
        x1, y1, x2, y2 = bbox
        return int(x1), int(y1), int(x2), int(y2)

    def _clamp(self, v, lo, hi):
        return max(lo, min(hi, v))

    def _get_person_id(self, bbox: Tuple[int, int, int, int]) -> str:
        x1, y1, x2, y2 = bbox
        cx = (x1 + x2) // 2
        cy = (y1 + y2) // 2
        return f"{cx//50}_{cy//50}"

    # ---------- Detection ----------
    def detect_objects(self, frame: ndarray) -> List[Dict[str, Any]]:
        if not self.yolo_model:
            return []
        try:
            results = self.yolo_model(frame, imgsz=640, verbose=False)
            detections: List[Dict[str, Any]] = []
            for result in results:
                if getattr(result, "boxes", None) is None:
                    continue
                for box in result.boxes:
                    x1, y1, x2, y2 = map(int, box.xyxy[0])
                    cls = int(box.cls[0])
                    conf = float(box.conf[0])
                    class_name = result.names[cls] if hasattr(result, "names") else str(cls)
                    if x2 <= x1 or y2 <= y1:
                        continue
                    detections.append({'bbox': (x1, y1, x2, y2), 'class_name': class_name, 'confidence': conf, 'class_id': cls})
            return detections
        except Exception:
            self.logger.exception("Object detection failed")
            return []

    def detect_faces_mediapipe(self, roi: ndarray) -> List[Tuple[int, int, int, int]]:
        try:
            rgb = cv2.cvtColor(roi, cv2.COLOR_BGR2RGB)
            results = self.mp_face_detection.process(rgb)
            boxes = []
            if results and getattr(results, "detections", None):
                h, w = roi.shape[:2]
                for det in results.detections:
                    r = det.location_data.relative_bounding_box
                    x = int(r.xmin * w)
                    y = int(r.ymin * h)
                    width = int(r.width * w)
                    height = int(r.height * h)
                    boxes.append((x, y, width, height))
            return boxes
        except Exception:
            self.logger.exception("Face detection failed")
            return []

    # Return a single face encoding (or None). Uses RESIZE_FACTOR to speed up encoding.
    def get_face_encoding(self, face_crop: ndarray) -> Optional[ndarray]:
        try:
            if face_crop.size == 0:
                return None
            # resize for speed but keep enough detail; resizing can affect distances: tune RESIZE_FACTOR
            if self.config.RESIZE_FACTOR and self.config.RESIZE_FACTOR != 1.0:
                small = cv2.resize(face_crop, (0, 0), fx=self.config.RESIZE_FACTOR, fy=self.config.RESIZE_FACTOR)
            else:
                small = face_crop
            rgb = cv2.cvtColor(small, cv2.COLOR_BGR2RGB)
            encs = face_recognition.face_encodings(rgb)
            if not encs:
                return None
            return encs[0]
        except Exception:
            self.logger.exception("Failed to get face encoding")
            return None

    # ---------- Confirmation logic ----------
    def _update_detection_history(self, person_id: str, name: str, distance: float) -> None:
        now = time.time()
        entry = self.detection_history.get(person_id)
        if entry is None:
            entry = {'name_counts': defaultdict(int), 'last_name': None, 'consecutive': 0, 'last_update': now}
            self.detection_history[person_id] = entry

        if now - entry['last_update'] > self.config.RECOGNITION_TIME_WINDOW:
            entry['name_counts'] = defaultdict(int)
            entry['last_name'] = None
            entry['consecutive'] = 0

        entry['name_counts'][name] += 1
        if entry['last_name'] == name:
            entry['consecutive'] += 1
        else:
            entry['last_name'] = name
            entry['consecutive'] = 1
        entry['last_update'] = now

    def _confirm_recognition(self, person_id: str, name: str, distance: float) -> bool:
        """Confirm recognition using vote / consecutive / distance rules."""
        now = time.time()
        entry = self.detection_history.get(person_id)
        if not entry:
            return False
        if now - entry['last_update'] > self.config.RECOGNITION_TIME_WINDOW:
            return False
        # Immediate acceptance if distance is confidently low
        if name != self.config.UNKNOWN_LABEL and distance <= self.config.RECOGNITION_DISTANCE_THRESHOLD:
            return True
        # votes
        votes = entry['name_counts'].get(name, 0)
        if name != self.config.UNKNOWN_LABEL and votes >= self.config.RECOGNITION_MIN_VOTES:
            return True
        if name != self.config.UNKNOWN_LABEL and entry['consecutive'] >= self.config.RECOGNITION_CONSECUTIVE_FRAMES:
            return True
        return False

    def confirm_face(self, face_encoding: ndarray, bbox: Tuple[int, int, int, int]) -> Tuple[Optional[str], bool, Optional[float]]:
        """
        Given a face encoding, find best match and decide whether the match is confirmed.
        Returns: (confirmed_name_or_None, confirmed_bool, best_distance)
        """
        if not self.known_face_encodings:
            return None, False, None
        try:
            distances = face_recognition.face_distance(self.known_face_encodings, face_encoding)
        except Exception:
            self.logger.exception("face_distance failed")
            return None, False, None
        if len(distances) == 0:
            return None, False, None
        best_idx = int(np.argmin(distances))
        best_dist = float(distances[best_idx])
        # determine candidate name by using the recognition distance threshold (consistent)
        candidate_name = self.known_face_names[best_idx] if best_dist <= self.config.RECOGNITION_DISTANCE_THRESHOLD else self.config.UNKNOWN_LABEL
        person_id = self._get_person_id(bbox)
        self._update_detection_history(person_id, candidate_name, best_dist)
        confirmed = self._confirm_recognition(person_id, candidate_name, best_dist)
        if confirmed and candidate_name != self.config.UNKNOWN_LABEL:
            return candidate_name, True, best_dist
        # If candidate_name is UNKNOWN and it's confirmed (multiple unknown votes) consider it an unknown confirmed
        if confirmed and candidate_name == self.config.UNKNOWN_LABEL:
            return self.config.UNKNOWN_LABEL, True, best_dist
        return None, False, best_dist

    # ---------- Alerting ----------
    def _trigger_alert(self, person_name: str, detected_objects: List[str], is_known: bool) -> None:
        objects_str = ", ".join(set(detected_objects)) if detected_objects else "None"
        if is_known:
            self.logger.info("Known person confirmed: %s; objects: %s (no alarm)", person_name, objects_str)
            return
        self.logger.warning("ALERT: UNKNOWN person confirmed; objects: %s", objects_str)
        if self.alarm_loaded:
            try:
                if not pygame.mixer.music.get_busy():
                    pygame.mixer.music.play()
            except Exception:
                self.logger.exception("Failed to start alarm")

    def process_alert(self, candidate_name: Optional[str], confirmed: bool, detected_objects: List[str]) -> None:
        """
        Only triggers alarm for a confirmed unknown person (confirmed True and candidate_name == UNKNOWN_LABEL).
        Observed cooldown prevents repeated alarms.
        """
        if not confirmed:
            return
        now = time.time()
        if now - self.last_alert_time <= self.config.ALERT_COOLDOWN:
            return
        if candidate_name == self.config.UNKNOWN_LABEL:
            self._trigger_alert(candidate_name, detected_objects, is_known=False)
            self.last_alert_time = now
        else:
            # confirmed known person -> log but do not alarm
            self._trigger_alert(candidate_name, detected_objects, is_known=True)

    # ---------- Drawing ----------
    def draw_detections(self, frame: ndarray, detections: List[Dict[str, Any]], person_results: List[Dict[str, Any]]) -> ndarray:
        display = frame.copy()
        for det in detections:
            if det['class_name'] in self.config.OBJECTS_OF_INTEREST and det['class_name'] != 'person':
                x1, y1, x2, y2 = det['bbox']
                cv2.rectangle(display, (x1, y1), (x2, y2), self.config.COLOR_YELLOW, 2)
                label = f"{det['class_name']}: {det['confidence']:.2f}"
                cv2.putText(display, label, (x1, y1 - 10), self.config.FONT, self.config.FONT_SCALE_SMALL, self.config.COLOR_YELLOW, self.config.FONT_THICKNESS)

        for p in person_results:
            x1, y1, x2, y2 = p['bbox']
            color = self.config.COLOR_GREEN if p.get('confirmed_known') else (self.config.COLOR_ORANGE if p.get('recognized') else self.config.COLOR_ORANGE)
            cv2.rectangle(display, (x1, y1), (x2, y2), color, 2)
            label = p.get('name') if p.get('name') else ("Person" if p.get('recognized') else "UNKNOWN")
            cv2.putText(display, label, (x1, y1 - 10), self.config.FONT, self.config.FONT_SCALE_MEDIUM, color, self.config.FONT_THICKNESS)
            for fx, fy, fw, fh in p.get('face_boxes', []):
                cv2.rectangle(display, (x1 + fx, y1 + fy), (x1 + fx + fw, y1 + fy + fh), self.config.COLOR_BLUE, 2)
        return display

    # ---------- Main frame processing ----------
    def process_frame(self, frame: ndarray) -> ndarray:
        self.frame_count += 1
        timer = cv2.getTickCount()
        detections = self.detect_objects(frame)
        person_detections = [d for d in detections if d['class_name'] == 'person']
        detected_objects = [d['class_name'] for d in detections if d['class_name'] in self.config.OBJECTS_OF_INTEREST and d['class_name'] != 'person']

        person_results: List[Dict[str, Any]] = []
        process_faces = (self.frame_count % self.config.FACE_RECOGNITION_INTERVAL) == 0

        for det in person_detections:
            x1, y1, x2, y2 = det['bbox']
            x1, y1, x2, y2 = map(int, (x1, y1, x2, y2))
            person_roi = frame[y1:y2, x1:x2]
            result = {'bbox': (x1, y1, x2, y2), 'recognized': False, 'confirmed_known': False, 'name': None, 'face_boxes': []}
            if person_roi.size == 0:
                person_results.append(result)
                continue
            face_boxes = self.detect_faces_mediapipe(person_roi)
            result['face_boxes'] = face_boxes
            if process_faces and face_boxes:
                fx, fy, fw, fh = face_boxes[0]
                h_roi, w_roi = person_roi.shape[:2]
                x0 = self._clamp(fx, 0, w_roi-1); y0 = self._clamp(fy, 0, h_roi-1)
                x1f = self._clamp(fx+fw, 0, w_roi); y1f = self._clamp(fy+fh, 0, h_roi)
                if x1f > x0 and y1f > y0:
                    face_crop = person_roi[y0:y1f, x0:x1f]
                    face_enc = self.get_face_encoding(face_crop)
                    if face_enc is not None:
                        name, confirmed, dist = self.confirm_face(face_enc, (x1, y1, x2, y2))
                        # Only update UI / alerts if confirmed (reduces false positives)
                        if confirmed:
                            result['recognized'] = (name != self.config.UNKNOWN_LABEL)
                            result['confirmed_known'] = (name != self.config.UNKNOWN_LABEL)
                            result['name'] = name if name != self.config.UNKNOWN_LABEL else None
                        else:
                            # tentative recognized (not yet confirmed) - set recognized True if distance indicates a candidate
                            # this is optional: here we leave recognized False until confirmed
                            result['recognized'] = False
                            result['name'] = None
                        # process alert only for confirmed outcomes
                        self.process_alert(name if confirmed else None, confirmed, detected_objects)
            person_results.append(result)

        annotated = self.draw_detections(frame, detections, person_results)
        elapsed = max(1, cv2.getTickCount() - timer)
        fps = int(cv2.getTickFrequency() / elapsed)
        cv2.putText(annotated, f"FPS: {fps}", (20, 30), self.config.FONT, self.config.FONT_SCALE_MEDIUM, self.config.COLOR_GREEN, self.config.FONT_THICKNESS)
        if detected_objects:
            cv2.putText(annotated, f"Objects: {', '.join(set(detected_objects))}", (20, 60), self.config.FONT, self.config.FONT_SCALE_SMALL, self.config.COLOR_YELLOW, self.config.FONT_THICKNESS)
        return annotated

    def run(self) -> None:
        cap = cv2.VideoCapture(self.config.VIDEO_SOURCE)
        if not cap.isOpened():
            self.logger.error("Could not open video capture")
            raise RuntimeError("Video capture failed")
        self.logger.info("Monitoring started. Press 'q' to quit.")
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    self.logger.warning("Failed to grab frame")
                    break
                annotated = self.process_frame(frame)
                cv2.imshow(self.config.WINDOW_NAME, annotated)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        except KeyboardInterrupt:
            self.logger.info("Interrupted")
        finally:
            self._cleanup(cap)

    def _cleanup(self, cap) -> None:
        try:
            cap.release()
            cv2.destroyAllWindows()
            if self.mp_face_detection:
                self.mp_face_detection.close()
            try:
                pygame.mixer.quit()
            except Exception:
                pass
            self.logger.info("Shutdown complete")
        except Exception:
            self.logger.exception("Cleanup error")

if __name__ == "__main__":
    cfg = Config()
    system = SecuritySystem(cfg)
    system.run()  # uncomment to run live monitoring

2025-11-22 01:29:37,235 - INFO: Loading YOLO model...
2025-11-22 01:29:37,351 - INFO: Initializing MediaPipe face detection...
2025-11-22 01:29:41,433 - INFO: Loaded face for robin from robin_01.jpg
2025-11-22 01:29:42,405 - INFO: Loaded face for robin from robin_02.jpg
2025-11-22 01:29:43,454 - INFO: Loaded face for robin from robin_03.jpg
2025-11-22 01:29:48,179 - INFO: Loaded face for robin from WIN_20251008_18_56_08_Pro.jpg
2025-11-22 01:29:48,180 - INFO: Loaded 4 encodings for 1 people
2025-11-22 01:29:48,457 - INFO: Alarm loaded
2025-11-22 01:29:48,458 - INFO: Initialization complete
2025-11-22 01:29:48,520 - INFO: Monitoring started. Press 'q' to quit.
2025-11-22 01:29:55,686 - INFO: Known person confirmed: robin; objects: None (no alarm)
2025-11-22 01:29:56,220 - INFO: Known person confirmed: robin; objects: None (no alarm)
2025-11-22 01:29:57,269 - INFO: Known person confirmed: robin; objects: None (no alarm)
2025-11-22 01:29:58,331 - INFO: Known person confirmed: robin; objec

# Deepseek opt

In [None]:
# Consolidated, cleaned and runnable version of the notebook code.
# Preserves comments and intent; fixes naming/type inconsistencies and unused imports.

from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Tuple, Optional, Dict, Any
from collections import defaultdict
from numpy import ndarray
import numpy as np
import cv2
import face_recognition
import mediapipe as mp
from ultralytics import YOLO
import pygame
import os
import time
import logging

from dataclasses import dataclass, field
from typing import List
import cv2

@dataclass
class Config:
    """Configuration settings for the security system."""
    MODEL_PATH: str = "yolo11n.pt"
    KNOWN_FACES_DIR: str = "family_members"
    ALARM_FILE: str = "pols-aagyi-pols.mp3"
    LOG_DIR: str = "security_logs"
    VIDEO_SOURCE: str = "./media_files/animal_surveillance/goru-churi.mp4"  # camera index or path
    # VIDEO_SOURCE: str = 0  # camera index or path
    # Recommended default: 10 (good balance). Lower -> more CPU, higher -> slower recognition.
    FACE_RECOGNITION_INTERVAL: int = 5
    ALERT_COOLDOWN: int = 10
    YOLO_CONFIDENCE: float = 0.5
    # MediaPipe face detection confidence threshold
    FACE_DETECTION_CONF: float = 0.5
    # Face distance threshold used to accept a name from face_distance
    RECOGNITION_DISTANCE_THRESHOLD: float = 0.45
    RESIZE_FACTOR: float = 0.25
    WINDOW_NAME: str = "Security Monitoring"

    OBJECTS_OF_INTEREST: List[str] = field(default_factory=lambda: [
        "person", "bicycle", "car", "motorcycle", "bus", "truck", "backpack",
        "umbrella", "handbag", "tie", "suitcase", "cell phone", "laptop",
        "book", "scissors", "knife", "face"
    ])

    # Stricter recognition controls to reduce false positives
    RECOGNITION_MIN_VOTES: int = 2
    RECOGNITION_CONSECUTIVE_FRAMES: int = 2
    RECOGNITION_TIME_WINDOW: float = 3.0  # seconds

    # Color & font constants
    COLOR_YELLOW = (0, 255, 255)
    COLOR_GREEN = (0, 255, 0)
    COLOR_ORANGE = (0, 165, 255)
    COLOR_BLUE = (255, 0, 0)
    COLOR_RED = (0, 0, 255)
    COLOR_WHITE = (255, 255, 255)
    COLOR_CYAN = (255, 255, 0)

    FONT = cv2.FONT_HERSHEY_SIMPLEX
    FONT_SCALE_SMALL = 0.5
    FONT_SCALE_MEDIUM = 0.7
    FONT_THICKNESS = 2

    # Labels
    UNKNOWN_LABEL = "UNKNOWN"
    PERSON_LABEL = "Person"

class SecuritySystem:
    """
    Security system with improved, consistent face confirmation logic to reduce false alarms.
    Enhanced to update person labels dynamically based on detection and recognition status.
    """

    def __init__(self, config: Config):
        self.config = config
        self.logger = self._setup_logging()

        # Models
        self.yolo_model: Optional[YOLO] = None
        self.mp_face_detection = None

        # Known faces
        self.known_face_encodings: List[ndarray] = []
        self.known_face_names: List[str] = []

        # Alarm
        self.alarm_loaded = False

        # State
        self.frame_count = 0
        self.last_alert_time = 0.0

        # Detection history per coarse person grid cell -> used to vote/confirm
        # person_id -> { name_counts: defaultdict(int), last_name: str, consecutive: int, last_update: float }
        self.detection_history: Dict[str, Dict[str, Any]] = {}

        # Track current person states for consistent labeling
        self.current_person_states: Dict[str, Dict[str, Any]] = {}

        # Initialize heavy resources
        self._initialize_resources()

    def _setup_logging(self) -> logging.Logger:
        logger = logging.getLogger('SecuritySystem')
        if not logger.handlers:
            logger.setLevel(logging.INFO)
            os.makedirs(self.config.LOG_DIR, exist_ok=True)
            log_file = os.path.join(self.config.LOG_DIR, f"security_log_{datetime.now().strftime('%Y-%m-%d')}.txt")
            fh = logging.FileHandler(log_file)
            fh.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s: %(message)s'))
            logger.addHandler(fh)
            sh = logging.StreamHandler()
            sh.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s: %(message)s'))
            logger.addHandler(sh)
        return logger

    def _initialize_resources(self) -> None:
        try:
            self.logger.info("Loading YOLO model...")
            self.yolo_model = YOLO(self.config.MODEL_PATH)

            self.logger.info("Initializing MediaPipe face detection...")
            mp_face = mp.solutions.face_detection
            self.mp_face_detection = mp_face.FaceDetection(
                model_selection=0,
                min_detection_confidence=self.config.FACE_DETECTION_CONF
            )

            self._load_known_faces()
            self._setup_alarm()
            self.logger.info("Initialization complete")
        except Exception as e:
            self.logger.exception("Failed to initialize resources")
            raise

    def _load_known_faces(self) -> None:
        if not os.path.exists(self.config.KNOWN_FACES_DIR):
            self.logger.warning("Known faces dir not found: %s", self.config.KNOWN_FACES_DIR)
            return
        for person_name in os.listdir(self.config.KNOWN_FACES_DIR):
            person_dir = os.path.join(self.config.KNOWN_FACES_DIR, person_name)
            if not os.path.isdir(person_dir):
                continue
            for img in os.listdir(person_dir):
                path = os.path.join(person_dir, img)
                try:
                    image = face_recognition.load_image_file(path)
                    encs = face_recognition.face_encodings(image)
                    if encs:
                        self.known_face_encodings.append(encs[0])
                        self.known_face_names.append(person_name)
                        self.logger.info("Loaded face for %s from %s", person_name, img)
                    else:
                        self.logger.warning("No face found in %s", path)
                except Exception:
                    self.logger.exception("Failed to load known face %s", path)
        self.logger.info("Loaded %d encodings for %d people",
                         len(self.known_face_encodings), len(set(self.known_face_names)))

    def _setup_alarm(self) -> None:
        try:
            pygame.mixer.init()
            if os.path.exists(self.config.ALARM_FILE):
                pygame.mixer.music.load(self.config.ALARM_FILE)
                self.alarm_loaded = True
                self.logger.info("Alarm loaded")
            else:
                self.logger.warning("Alarm file missing: %s", self.config.ALARM_FILE)
        except Exception:
            self.logger.exception("Failed to initialize alarm")

    # ---------- Utility helpers ----------
    def _extract_bbox(self, bbox: Tuple[int, int, int, int]) -> Tuple[int, int, int, int]:
        x1, y1, x2, y2 = bbox
        return int(x1), int(y1), int(x2), int(y2)

    def _clamp(self, v, lo, hi):
        return max(lo, min(hi, v))

    def _get_person_id(self, bbox: Tuple[int, int, int, int]) -> str:
        x1, y1, x2, y2 = bbox
        cx = (x1 + x2) // 2
        cy = (y1 + y2) // 2
        return f"{cx//50}_{cy//50}"

    # ---------- Detection ----------
    def detect_objects(self, frame: ndarray) -> List[Dict[str, Any]]:
        if not self.yolo_model:
            return []
        try:
            results = self.yolo_model(frame, conf=self.config.YOLO_CONFIDENCE)
            detections: List[Dict[str, Any]] = []
            for result in results:
                if getattr(result, "boxes", None) is None:
                    continue
                for box in result.boxes:
                    x1, y1, x2, y2 = map(int, box.xyxy[0])
                    cls = int(box.cls[0])
                    conf = float(box.conf[0])
                    class_name = result.names[cls] if hasattr(result, "names") else str(cls)
                    if x2 <= x1 or y2 <= y1:
                        continue
                    detections.append({'bbox': (x1, y1, x2, y2), 'class_name': class_name, 'confidence': conf, 'class_id': cls})
            return detections
        except Exception:
            self.logger.exception("Object detection failed")
            return []

    def detect_faces_mediapipe(self, roi: ndarray) -> List[Tuple[int, int, int, int]]:
        try:
            rgb = cv2.cvtColor(roi, cv2.COLOR_BGR2RGB)
            results = self.mp_face_detection.process(rgb)
            boxes = []
            if results and getattr(results, "detections", None):
                h, w = roi.shape[:2]
                for det in results.detections:
                    r = det.location_data.relative_bounding_box
                    x = int(r.xmin * w)
                    y = int(r.ymin * h)
                    width = int(r.width * w)
                    height = int(r.height * h)
                    boxes.append((x, y, width, height))
            return boxes
        except Exception:
            self.logger.exception("Face detection failed")
            return []

    # Return a single face encoding (or None). Uses RESIZE_FACTOR to speed up encoding.
    def get_face_encoding(self, face_crop: ndarray) -> Optional[ndarray]:
        try:
            if face_crop.size == 0:
                return None
            # resize for speed but keep enough detail; resizing can affect distances: tune RESIZE_FACTOR
            if self.config.RESIZE_FACTOR and self.config.RESIZE_FACTOR != 1.0:
                small = cv2.resize(face_crop, (0, 0), fx=self.config.RESIZE_FACTOR, fy=self.config.RESIZE_FACTOR)
            else:
                small = face_crop
            rgb = cv2.cvtColor(small, cv2.COLOR_BGR2RGB)
            encs = face_recognition.face_encodings(rgb)
            if not encs:
                return None
            return encs[0]
        except Exception:
            self.logger.exception("Failed to get face encoding")
            return None

    # ---------- Enhanced recognition with dynamic labeling ----------
    def _update_detection_history(self, person_id: str, name: str, distance: Optional[float] = None) -> None:
        """
        Update per-person recent votes and consecutive counts.
        Stores last_distance to avoid 'unused parameter' warnings and enable distance-aware logic.
        """
        now = time.time()
        entry = self.detection_history.get(person_id)
        if entry is None:
            entry = {
                'name_counts': defaultdict(int),
                'last_name': None,
                'consecutive': 0,
                'last_update': now,
                'last_distance': None
            }
            self.detection_history[person_id] = entry

        # Reset history if stale
        if now - entry['last_update'] > self.config.RECOGNITION_TIME_WINDOW:
            entry['name_counts'] = defaultdict(int)
            entry['last_name'] = None
            entry['consecutive'] = 0
            entry['last_distance'] = None

        # Tally vote and consecutive
        entry['name_counts'][name] += 1
        if entry['last_name'] == name:
            entry['consecutive'] += 1
        else:
            entry['last_name'] = name
            entry['consecutive'] = 1
        entry['last_update'] = now
        entry['last_distance'] = distance

    def _confirm_recognition(self, person_id: str, name: str, distance: float) -> bool:
        """Confirm recognition using vote / consecutive / distance rules."""
        now = time.time()
        entry = self.detection_history.get(person_id)
        if not entry:
            return False
        if now - entry['last_update'] > self.config.RECOGNITION_TIME_WINDOW:
            return False
        # Immediate acceptance if distance is confidently low
        if name != self.config.UNKNOWN_LABEL and distance <= self.config.RECOGNITION_DISTANCE_THRESHOLD:
            return True
        # votes
        votes = entry['name_counts'].get(name, 0)
        if name != self.config.UNKNOWN_LABEL and votes >= self.config.RECOGNITION_MIN_VOTES:
            return True
        if name != self.config.UNKNOWN_LABEL and entry['consecutive'] >= self.config.RECOGNITION_CONSECUTIVE_FRAMES:
            return True
        return False

    def confirm_face(self, face_encoding: ndarray, bbox: Tuple[int, int, int, int]) -> Tuple[Optional[str], bool, Optional[float]]:
        """
        Given a face encoding, find best match and decide whether the match is confirmed.
        Returns: (confirmed_name_or_None, confirmed_bool, best_distance)
        """
        if not self.known_face_encodings:
            return None, False, None
        try:
            distances = face_recognition.face_distance(self.known_face_encodings, face_encoding)
        except Exception:
            self.logger.exception("face_distance failed")
            return None, False, None
        if len(distances) == 0:
            return None, False, None
        best_idx = int(np.argmin(distances))
        best_dist = float(distances[best_idx])
        # determine candidate name by using the recognition distance threshold (consistent)
        candidate_name = self.known_face_names[best_idx] if best_dist <= self.config.RECOGNITION_DISTANCE_THRESHOLD else self.config.UNKNOWN_LABEL
        person_id = self._get_person_id(bbox)
        self._update_detection_history(person_id, candidate_name, best_dist)
        confirmed = self._confirm_recognition(person_id, candidate_name, best_dist)
        if confirmed and candidate_name != self.config.UNKNOWN_LABEL:
            return candidate_name, True, best_dist
        # If candidate_name is UNKNOWN and it's confirmed (multiple unknown votes) consider it an unknown confirmed
        if confirmed and candidate_name == self.config.UNKNOWN_LABEL:
            return self.config.UNKNOWN_LABEL, True, best_dist
        return None, False, best_dist

    def _get_person_label(self, person_result: Dict[str, Any]) -> str:
        """Generate appropriate label for person based on detection and recognition status"""
        if person_result.get('confirmed_known'):
            return f"{person_result['name']} ✓"
        elif person_result.get('recognized'):
            if person_result.get('name'):
                return f"{person_result['name']} ?"
            else:
                return "Recognizing..."
        elif person_result.get('face_boxes'):
            return "Face Detected"
        else:
            return self.config.PERSON_LABEL

    def _get_person_color(self, person_result: Dict[str, Any]) -> Tuple[int, int, int]:
        """Get color for person bounding box based on status"""
        if person_result.get('confirmed_known'):
            return self.config.COLOR_GREEN  # Confirmed known person
        elif person_result.get('recognized'):
            return self.config.COLOR_CYAN  # Recognition in progress
        elif person_result.get('face_boxes'):
            return self.config.COLOR_BLUE  # Face detected but not recognized
        else:
            return self.config.COLOR_ORANGE  # Just person detection

    # ---------- Alerting ----------
    def _trigger_alert(self, person_name: str, detected_objects: List[str], is_known: bool) -> None:
        objects_str = ", ".join(set(detected_objects)) if detected_objects else "None"
        if is_known:
            self.logger.info("Known person confirmed: %s; objects: %s (no alarm)", person_name, objects_str)
            return
        self.logger.warning("ALERT: UNKNOWN person confirmed; objects: %s", objects_str)
        if self.alarm_loaded:
            try:
                if not pygame.mixer.music.get_busy():
                    pygame.mixer.music.play()
            except Exception:
                self.logger.exception("Failed to start alarm")

    def process_alert(self, candidate_name: Optional[str], confirmed: bool, detected_objects: List[str]) -> None:
        """
        Only triggers alarm for a confirmed unknown person (confirmed True and candidate_name == UNKNOWN_LABEL).
        Observed cooldown prevents repeated alarms.
        """
        if not confirmed:
            return
        now = time.time()
        if now - self.last_alert_time <= self.config.ALERT_COOLDOWN:
            return
        if candidate_name == self.config.UNKNOWN_LABEL:
            self._trigger_alert(candidate_name, detected_objects, is_known=True)
            self.last_alert_time = now
        else:
            # confirmed known person -> log but do not alarm
            self._trigger_alert(candidate_name, detected_objects, is_known=False)

    # ---------- Enhanced Drawing ----------
    def draw_detections(self, frame: ndarray, detections: List[Dict[str, Any]], person_results: List[Dict[str, Any]]) -> ndarray:
        display = frame.copy()
        
        # Draw object detections first
        for det in detections:
            if det['class_name'] in self.config.OBJECTS_OF_INTEREST and det['class_name'] != 'person':
                x1, y1, x2, y2 = det['bbox']
                cv2.rectangle(display, (x1, y1), (x2, y2), self.config.COLOR_YELLOW, 2)
                label = f"{det['class_name']}: {det['confidence']:.2f}"
                cv2.putText(display, label, (x1, y1 - 10), self.config.FONT, self.config.FONT_SCALE_SMALL, self.config.COLOR_YELLOW, self.config.FONT_THICKNESS)

        # Draw person detections with dynamic labels
        for p in person_results:
            x1, y1, x2, y2 = p['bbox']
            
            # Get appropriate color and label based on detection status
            color = self._get_person_color(p)
            label = self._get_person_label(p)
            
            # Draw bounding box
            cv2.rectangle(display, (x1, y1), (x2, y2), color, 2)
            
            # Draw label with background for better visibility
            label_bg_size = cv2.getTextSize(label, self.config.FONT, self.config.FONT_SCALE_MEDIUM, self.config.FONT_THICKNESS)[0]
            cv2.rectangle(display, (x1, y1 - label_bg_size[1] - 10), (x1 + label_bg_size[0] + 10, y1), color, -1)
            cv2.putText(display, label, (x1 + 5, y1 - 5), self.config.FONT, self.config.FONT_SCALE_MEDIUM, self.config.COLOR_WHITE, self.config.FONT_THICKNESS)
            
            # Draw face boxes if any
            for fx, fy, fw, fh in p.get('face_boxes', []):
                cv2.rectangle(display, (x1 + fx, y1 + fy), (x1 + fx + fw, y1 + fy + fh), self.config.COLOR_BLUE, 1)
                
            # Add confidence if available
            if p.get('confidence'):
                conf_text = f"Conf: {p['confidence']:.2f}"
                cv2.putText(display, conf_text, (x1, y2 + 20), self.config.FONT, self.config.FONT_SCALE_SMALL, color, 1)
                
        return display

    # ---------- Enhanced Main frame processing ----------
    def process_frame(self, frame: ndarray) -> ndarray:
        self.frame_count += 1
        timer = cv2.getTickCount()
        detections = self.detect_objects(frame)
        person_detections = [d for d in detections if d['class_name'] == 'person']
        detected_objects = [d['class_name'] for d in detections if d['class_name'] in self.config.OBJECTS_OF_INTEREST and d['class_name'] != 'person']

        person_results: List[Dict[str, Any]] = []
        process_faces = (self.frame_count % self.config.FACE_RECOGNITION_INTERVAL) == 0

        for det in person_detections:
            x1, y1, x2, y2 = det['bbox']
            x1, y1, x2, y2 = map(int, (x1, y1, x2, y2))
            person_roi = frame[y1:y2, x1:x2]
            
            # Initialize result with basic detection info
            result = {
                'bbox': (x1, y1, x2, y2), 
                'recognized': False, 
                'confirmed_known': False, 
                'name': None, 
                'face_boxes': [],
                'confidence': det['confidence']
            }
            
            if person_roi.size == 0:
                person_results.append(result)
                continue
                
            # Always detect faces for visual feedback
            face_boxes = self.detect_faces_mediapipe(person_roi)
            result['face_boxes'] = face_boxes
            
            # Process face recognition on specified intervals
            if process_faces and face_boxes:
                # Try all detected faces, not just the first one
                for i, (fx, fy, fw, fh) in enumerate(face_boxes):
                    h_roi, w_roi = person_roi.shape[:2]
                    x0 = self._clamp(fx, 0, w_roi-1)
                    y0 = self._clamp(fy, 0, h_roi-1)
                    x1f = self._clamp(fx+fw, 0, w_roi)
                    y1f = self._clamp(fy+fh, 0, h_roi)
                    
                    if x1f > x0 and y1f > y0:
                        face_crop = person_roi[y0:y1f, x0:x1f]
                        face_enc = self.get_face_encoding(face_crop)
                        
                        if face_enc is not None:
                            name, confirmed, dist = self.confirm_face(face_enc, (x1, y1, x2, y2))
                            
                            # Update result based on recognition outcome
                            if confirmed:
                                result['recognized'] = (name != self.config.UNKNOWN_LABEL)
                                result['confirmed_known'] = (name != self.config.UNKNOWN_LABEL)
                                result['name'] = name if name != self.config.UNKNOWN_LABEL else None
                            else:
                                # Show tentative recognition for immediate feedback
                                result['recognized'] = True
                                result['name'] = name if name and name != self.config.UNKNOWN_LABEL else None
                            
                            # Process alert only for confirmed outcomes
                            self.process_alert(name if confirmed else None, confirmed, detected_objects)
                            
                            # Break after first successful face encoding to save processing
                            break
            
            person_results.append(result)

        annotated = self.draw_detections(frame, detections, person_results)
        elapsed = max(1, cv2.getTickCount() - timer)
        fps = int(cv2.getTickFrequency() / elapsed)
        
        # Enhanced status display
        status_y = 30
        cv2.putText(annotated, f"FPS: {fps}", (20, status_y), self.config.FONT, self.config.FONT_SCALE_MEDIUM, self.config.COLOR_GREEN, self.config.FONT_THICKNESS)
        
        # Show recognition status
        recognized_count = sum(1 for p in person_results if p.get('confirmed_known'))
        detecting_count = sum(1 for p in person_results if p.get('face_boxes'))
        
        status_y += 30
        cv2.putText(annotated, f"Recognized: {recognized_count}", (20, status_y), self.config.FONT, self.config.FONT_SCALE_SMALL, self.config.COLOR_GREEN, 1)
        
        status_y += 25
        cv2.putText(annotated, f"Detecting: {detecting_count}", (20, status_y), self.config.FONT, self.config.FONT_SCALE_SMALL, self.config.COLOR_BLUE, 1)
        
        if detected_objects:
            status_y += 25
            cv2.putText(annotated, f"Objects: {', '.join(set(detected_objects))}", (20, status_y), self.config.FONT, self.config.FONT_SCALE_SMALL, self.config.COLOR_YELLOW, 1)
            
        return annotated

    def run(self) -> None:
        cap = cv2.VideoCapture(self.config.VIDEO_SOURCE)
        if not cap.isOpened():
            self.logger.error("Could not open video capture")
            raise RuntimeError("Video capture failed")
        self.logger.info("Monitoring started. Press 'q' to quit.")
        try:
            while True:
                ret, im0 = cap.read()
                if not ret:
                    self.logger.warning("Failed to grab frame")
                    break
                annotated = self.process_frame(im0)
                cv2.imshow(self.config.WINDOW_NAME, annotated)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        except KeyboardInterrupt:
            self.logger.info("Interrupted")
        finally:
            self._cleanup(cap)

    def _cleanup(self, cap) -> None:
        try:
            cap.release()
            cv2.destroyAllWindows()
            if self.mp_face_detection:
                self.mp_face_detection.close()
            try:
                pygame.mixer.quit()
            except Exception:
                pass
            self.logger.info("Shutdown complete")
        except Exception:
            self.logger.exception("Cleanup error")

if __name__ == "__main__":
    cfg = Config()
    system = SecuritySystem(cfg)
    system.run()  # uncomment to run live monitoring

# fixed by deepseek.ai

In [1]:
# Consolidated, cleaned and runnable version of the notebook code.
# Preserves comments and intent; fixes naming/type inconsistencies and unused imports.

from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Tuple, Optional, Dict, Any
from collections import defaultdict
from numpy import ndarray
import numpy as np
import cv2
import face_recognition
import mediapipe as mp
from ultralytics import YOLO
import pygame
import os
import time
from ultralytics.utils import LOGGER

@dataclass
class Config:
    """Configuration settings for the security system."""
    MODEL_PATH: str = "yolo11m.pt"
    KNOWN_FACES_DIR: str = "family_members"
    ALARM_FILE: str = "pols-aagyi-pols.mp3"
    LOG_DIR: str = "security_logs"
    VIDEO_SOURCE: str = "/media_files/WIN_20251103_14_11_20_Pro.mp4"  # camera index or path
    # VIDEO_SOURCE: str = "./media_files/people walking/computer_vision_object_and_detection_tracking_people_walking_video_20250819_173636_1.mp4"  # camera index or path
    # VIDEO_SOURCE: str = 0  # camera index or path
    # Recommended default: 10 (good balance). Lower -> more CPU, higher -> slower recognition.
    FACE_RECOGNITION_INTERVAL: int = 5
    ALERT_COOLDOWN: int = 10
    YOLO_CONFIDENCE: float = 0.5
    # MediaPipe face detection confidence threshold
    FACE_DETECTION_CONF: float = 0.25
    # Face distance threshold used to accept a name from face_distance
    RECOGNITION_DISTANCE_THRESHOLD: float = 0.45
    RESIZE_FACTOR: float = 0.25
    WINDOW_NAME: str = "Security Monitoring"

    OBJECTS_OF_INTEREST: List[str] = field(default_factory=lambda: [
        "person", "bicycle", "car", "motorcycle", "bus", "truck", "backpack",
        "umbrella", "handbag", "tie", "suitcase", "cell phone", "laptop",
        "book", "scissors", "knife", "face"
    ])

    # Stricter recognition controls to reduce false positives
    RECOGNITION_MIN_VOTES: int = 2
    RECOGNITION_CONSECUTIVE_FRAMES: int = 2
    RECOGNITION_TIME_WINDOW: float = 3.0  # seconds

    # Color & font constants
    COLOR_YELLOW = (0, 255, 255)
    COLOR_GREEN = (0, 255, 0)
    COLOR_ORANGE = (0, 165, 255)
    COLOR_BLUE = (255, 0, 0)
    COLOR_RED = (0, 0, 255)
    COLOR_WHITE = (255, 255, 255)
    COLOR_CYAN = (255, 255, 0)

    FONT = cv2.FONT_HERSHEY_SIMPLEX
    FONT_SCALE_SMALL = 0.5
    FONT_SCALE_MEDIUM = 0.7
    FONT_THICKNESS = 2

    # Labels
    UNKNOWN_LABEL = "UNKNOWN"
    PERSON_LABEL = "Person"

class SecuritySystem:
    """
    Security system with improved, consistent face confirmation logic to reduce false alarms.
    Enhanced to update person labels dynamically based on detection and recognition status.
    """

    def __init__(self, config: Config):
        self.config = config
        self.logger = self._setup_logging()

        # Models
        self.yolo_model: Optional[YOLO] = None
        self.mp_face_detection = None

        # Known faces
        self.known_face_encodings: List[ndarray] = []
        self.known_face_names: List[str] = []

        # Alarm
        self.alarm_loaded = False

        # State
        self.frame_count = 0
        self.last_alert_time = 0.0

        # Detection history per coarse person grid cell -> used to vote/confirm
        # person_id -> { name_counts: defaultdict(int), last_name: str, consecutive: int, last_update: float }
        self.detection_history: Dict[str, Dict[str, Any]] = {}

        # Track current person states for consistent labeling
        self.current_person_states: Dict[str, Dict[str, Any]] = {}

        # Track alerted persons to avoid repeated alarms
        self.alerted_persons: Dict[str, float] = {}  # person_id -> last_alert_time

        # Initialize heavy resources
        self._initialize_resources()

    def _setup_logging(self) -> logging.Logger:
        logger = logging.getLogger('SecuritySystem')
        if not logger.handlers:
            logger.setLevel(logging.INFO)
            os.makedirs(self.config.LOG_DIR, exist_ok=True)
            log_file = os.path.join(self.config.LOG_DIR, f"security_log_{datetime.now().strftime('%Y-%m-%d')}.txt")
            fh = logging.FileHandler(log_file)
            fh.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s: %(message)s'))
            logger.addHandler(fh)
            sh = logging.StreamHandler()
            sh.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s: %(message)s'))
            logger.addHandler(sh)
        return logger

    def _initialize_resources(self) -> None:
        try:
            self.logger.info("Loading YOLO model...")
            self.yolo_model = YOLO(self.config.MODEL_PATH)

            self.logger.info("Initializing MediaPipe face detection...")
            mp_face = mp.solutions.face_detection
            self.mp_face_detection = mp_face.FaceDetection(
                model_selection=0,
                min_detection_confidence=self.config.FACE_DETECTION_CONF
            )

            self._load_known_faces()
            self._setup_alarm()
            self.logger.info("Initialization complete")
        except Exception as e:
            self.logger.exception("Failed to initialize resources")
            raise

    def _load_known_faces(self) -> None:
        if not os.path.exists(self.config.KNOWN_FACES_DIR):
            self.logger.warning("Known faces dir not found: %s", self.config.KNOWN_FACES_DIR)
            return
        for person_name in os.listdir(self.config.KNOWN_FACES_DIR):
            person_dir = os.path.join(self.config.KNOWN_FACES_DIR, person_name)
            if not os.path.isdir(person_dir):
                continue
            for img in os.listdir(person_dir):
                path = os.path.join(person_dir, img)
                try:
                    image = face_recognition.load_image_file(path)
                    encs = face_recognition.face_encodings(image)
                    if encs:
                        self.known_face_encodings.append(encs[0])
                        self.known_face_names.append(person_name)
                        self.logger.info("Loaded face for %s from %s", person_name, img)
                    else:
                        self.logger.warning("No face found in %s", path)
                except Exception:
                    self.logger.exception("Failed to load known face %s", path)
        self.logger.info("Loaded %d encodings for %d people",
                         len(self.known_face_encodings), len(set(self.known_face_names)))

    def _setup_alarm(self) -> None:
        try:
            pygame.mixer.init()
            if os.path.exists(self.config.ALARM_FILE):
                pygame.mixer.music.load(self.config.ALARM_FILE)
                self.alarm_loaded = True
                self.logger.info("Alarm loaded")
            else:
                self.logger.warning("Alarm file missing: %s", self.config.ALARM_FILE)
        except Exception:
            self.logger.exception("Failed to initialize alarm")

    # ---------- Utility helpers ----------
    def _extract_bbox(self, bbox: Tuple[int, int, int, int]) -> Tuple[int, int, int, int]:
        x1, y1, x2, y2 = bbox
        return int(x1), int(y1), int(x2), int(y2)

    def _clamp(self, v, lo, hi):
        return max(lo, min(hi, v))

    def _get_person_id(self, bbox: Tuple[int, int, int, int]) -> str:
        x1, y1, x2, y2 = bbox
        cx = (x1 + x2) // 2
        cy = (y1 + y2) // 2
        return f"{cx//50}_{cy//50}"

    # ---------- Detection ----------
    def detect_objects(self, frame: ndarray) -> List[Dict[str, Any]]:
        if not self.yolo_model:
            return []
        try:
            results = self.yolo_model(frame, conf=self.config.YOLO_CONFIDENCE)
            detections: List[Dict[str, Any]] = []
            for result in results:
                if getattr(result, "boxes", None) is None:
                    continue
                for box in result.boxes:
                    x1, y1, x2, y2 = map(int, box.xyxy[0])
                    cls = int(box.cls[0])
                    conf = float(box.conf[0])
                    class_name = result.names[cls] if hasattr(result, "names") else str(cls)
                    if x2 <= x1 or y2 <= y1:
                        continue
                    detections.append({'bbox': (x1, y1, x2, y2), 'class_name': class_name, 'confidence': conf, 'class_id': cls})
            return detections
        except Exception:
            self.logger.exception("Object detection failed")
            return []

    def detect_faces_mediapipe(self, roi: ndarray) -> List[Tuple[int, int, int, int]]:
        try:
            rgb = cv2.cvtColor(roi, cv2.COLOR_BGR2RGB)
            results = self.mp_face_detection.process(rgb)
            boxes = []
            if results and getattr(results, "detections", None):
                h, w = roi.shape[:2]
                for det in results.detections:
                    r = det.location_data.relative_bounding_box
                    x = int(r.xmin * w)
                    y = int(r.ymin * h)
                    width = int(r.width * w)
                    height = int(r.height * h)
                    boxes.append((x, y, width, height))
            return boxes
        except Exception:
            self.logger.exception("Face detection failed")
            return []

    # Return a single face encoding (or None). Uses RESIZE_FACTOR to speed up encoding.
    def get_face_encoding(self, face_crop: ndarray) -> Optional[ndarray]:
        try:
            if face_crop.size == 0:
                return None
            # resize for speed but keep enough detail; resizing can affect distances: tune RESIZE_FACTOR
            if self.config.RESIZE_FACTOR and self.config.RESIZE_FACTOR != 1.0:
                small = cv2.resize(face_crop, (0, 0), fx=self.config.RESIZE_FACTOR, fy=self.config.RESIZE_FACTOR)
            else:
                small = face_crop
            rgb = cv2.cvtColor(small, cv2.COLOR_BGR2RGB)
            encs = face_recognition.face_encodings(rgb)
            if not encs:
                return None
            return encs[0]
        except Exception:
            self.logger.exception("Failed to get face encoding")
            return None

    # ---------- Enhanced recognition with dynamic labeling ----------
    def _update_detection_history(self, person_id: str, name: str, distance: Optional[float] = None) -> None:
        """
        Update per-person recent votes and consecutive counts.
        Stores last_distance to avoid 'unused parameter' warnings and enable distance-aware logic.
        """
        now = time.time()
        entry = self.detection_history.get(person_id)
        if entry is None:
            entry = {
                'name_counts': defaultdict(int),
                'last_name': None,
                'consecutive': 0,
                'last_update': now,
                'last_distance': None
            }
            self.detection_history[person_id] = entry

        # Reset history if stale
        if now - entry['last_update'] > self.config.RECOGNITION_TIME_WINDOW:
            entry['name_counts'] = defaultdict(int)
            entry['last_name'] = None
            entry['consecutive'] = 0
            entry['last_distance'] = None

        # Tally vote and consecutive
        entry['name_counts'][name] += 1
        if entry['last_name'] == name:
            entry['consecutive'] += 1
        else:
            entry['last_name'] = name
            entry['consecutive'] = 1
        entry['last_update'] = now
        entry['last_distance'] = distance

    def _confirm_recognition(self, person_id: str, name: str, distance: float) -> bool:
        """Confirm recognition using vote / consecutive / distance rules."""
        now = time.time()
        entry = self.detection_history.get(person_id)
        if not entry:
            return False
        if now - entry['last_update'] > self.config.RECOGNITION_TIME_WINDOW:
            return False
        # Immediate acceptance if distance is confidently low
        if name != self.config.UNKNOWN_LABEL and distance <= self.config.RECOGNITION_DISTANCE_THRESHOLD:
            return True
        # votes
        votes = entry['name_counts'].get(name, 0)
        if name != self.config.UNKNOWN_LABEL and votes >= self.config.RECOGNITION_MIN_VOTES:
            return True
        if name != self.config.UNKNOWN_LABEL and entry['consecutive'] >= self.config.RECOGNITION_CONSECUTIVE_FRAMES:
            return True
        return False

    def confirm_face(self, face_encoding: ndarray, bbox: Tuple[int, int, int, int]) -> Tuple[Optional[str], bool, Optional[float]]:
        """
        Given a face encoding, find best match and decide whether the match is confirmed.
        Returns: (confirmed_name_or_None, confirmed_bool, best_distance)
        """
        if not self.known_face_encodings:
            return None, False, None
        try:
            distances = face_recognition.face_distance(self.known_face_encodings, face_encoding)
        except Exception:
            self.logger.exception("face_distance failed")
            return None, False, None
        if len(distances) == 0:
            return None, False, None
        best_idx = int(np.argmin(distances))
        best_dist = float(distances[best_idx])
        # determine candidate name by using the recognition distance threshold (consistent)
        candidate_name = self.known_face_names[best_idx] if best_dist <= self.config.RECOGNITION_DISTANCE_THRESHOLD else self.config.UNKNOWN_LABEL
        person_id = self._get_person_id(bbox)
        self._update_detection_history(person_id, candidate_name, best_dist)
        confirmed = self._confirm_recognition(person_id, candidate_name, best_dist)
        if confirmed and candidate_name != self.config.UNKNOWN_LABEL:
            return candidate_name, True, best_dist
        # If candidate_name is UNKNOWN and it's confirmed (multiple unknown votes) consider it an unknown confirmed
        if confirmed and candidate_name == self.config.UNKNOWN_LABEL:
            return self.config.UNKNOWN_LABEL, True, best_dist
        return None, False, best_dist

    def _get_person_label(self, person_result: Dict[str, Any]) -> str:
        """Generate appropriate label for person based on detection and recognition status"""
        if person_result.get('confirmed_known'):
            return f"{person_result['name']} ✓"
        elif person_result.get('recognized'):
            if person_result.get('name'):
                return f"{person_result['name']} ?"
            else:
                return "Recognizing..."
        elif person_result.get('face_boxes'):
            return "Face Detected"
        else:
            return self.config.PERSON_LABEL

    def _get_person_color(self, person_result: Dict[str, Any]) -> Tuple[int, int, int]:
        """Get color for person bounding box based on status"""
        if person_result.get('confirmed_known'):
            return self.config.COLOR_GREEN  # Confirmed known person
        elif person_result.get('recognized'):
            return self.config.COLOR_CYAN  # Recognition in progress
        elif person_result.get('face_boxes'):
            return self.config.COLOR_BLUE  # Face detected but not recognized
        else:
            return self.config.COLOR_ORANGE  # Just person detection

    # ---------- FIXED Alerting Logic ----------
    def _trigger_alert(self, person_name: str, detected_objects: List[str], is_known: bool) -> None:
        objects_str = ", ".join(set(detected_objects)) if detected_objects else "None"
        if is_known:
            self.logger.info("Known person confirmed: %s; objects: %s (no alarm)", person_name, objects_str)
            return
        
        # Only trigger alarm for unknown persons
        self.logger.warning("ALERT: UNKNOWN person confirmed; objects: %s", objects_str)
        if self.alarm_loaded:
            try:
                if not pygame.mixer.music.get_busy():
                    pygame.mixer.music.play()
                    self.logger.info("Alarm triggered for unknown person")
            except Exception:
                self.logger.exception("Failed to start alarm")

    def process_alert(self, candidate_name: Optional[str], confirmed: bool, detected_objects: List[str], person_id: str) -> None:
        """
        Only triggers alarm for a confirmed unknown person (confirmed True and candidate_name == UNKNOWN_LABEL).
        Uses person-specific cooldown to prevent repeated alarms for the same person.
        """
        if not confirmed:
            return
            
        now = time.time()
        
        # Check global cooldown
        if now - self.last_alert_time <= self.config.ALERT_COOLDOWN:
            return
            
        # Check person-specific cooldown
        if person_id in self.alerted_persons:
            if now - self.alerted_persons[person_id] <= self.config.ALERT_COOLDOWN:
                return
        
        if candidate_name == self.config.UNKNOWN_LABEL:
            # Unknown person - trigger alarm
            self._trigger_alert(candidate_name, detected_objects, is_known=False)
            self.last_alert_time = now
            self.alerted_persons[person_id] = now
        else:
            # Known person - log but do not alarm
            self._trigger_alert(candidate_name, detected_objects, is_known=True)
    #     # ---------- FIXED Alerting Logic ----------
    # def _trigger_alert(self, person_name: str, detected_objects: List[str], is_known: bool) -> None:
    #     objects_str = ", ".join(set(detected_objects)) if detected_objects else "None"
    #     if is_known:
    #         self.logger.info("Known person confirmed: %s; objects: %s (no alarm)", person_name, objects_str)
    #         return

    #     # Only trigger alarm for unknown persons
    #     self.logger.warning("⚠️ ALERT: UNKNOWN or unrecognized person detected; objects: %s", objects_str)
    #     if self.alarm_loaded:
    #         try:
    #             if not pygame.mixer.music.get_busy():
    #                 pygame.mixer.music.play()
    #                 self.logger.info("Alarm triggered for unknown/unrecognized person")
    #         except Exception:
    #             self.logger.exception("Failed to start alarm")

    # def process_alert(self, candidate_name: Optional[str], confirmed: bool, detected_objects: List[str], person_id: str) -> None:
    #     """
    #     Triggers alarm when an unknown or unrecognized person is confirmed.
    #     Uses cooldown per person and global cooldown to prevent alarm spam.
    #     """
    #     if not confirmed:
    #         return

    #     now = time.time()
    #     if now - self.last_alert_time <= self.config.ALERT_COOLDOWN:
    #         return

    #     if person_id in self.alerted_persons and now - self.alerted_persons[person_id] <= self.config.ALERT_COOLDOWN:
    #         return

    #     # If name is missing or explicitly unknown
    #     if not candidate_name or candidate_name == self.config.UNKNOWN_LABEL:
    #         self._trigger_alert(self.config.UNKNOWN_LABEL, detected_objects, is_known=False)
    #         self.last_alert_time = now
    #         self.alerted_persons[person_id] = now
    #     else:
    #         self._trigger_alert(candidate_name, detected_objects, is_known=True)

    # ---------- Enhanced Drawing ----------
    def draw_detections(self, frame: ndarray, detections: List[Dict[str, Any]], person_results: List[Dict[str, Any]]) -> ndarray:
        display = frame.copy()
        
        # Draw object detections first
        for det in detections:
            if det['class_name'] in self.config.OBJECTS_OF_INTEREST and det['class_name'] != 'person':
                x1, y1, x2, y2 = det['bbox']
                cv2.rectangle(display, (x1, y1), (x2, y2), self.config.COLOR_YELLOW, 2)
                label = f"{det['class_name']}: {det['confidence']:.2f}"
                cv2.putText(display, label, (x1, y1 - 10), self.config.FONT, self.config.FONT_SCALE_SMALL, self.config.COLOR_YELLOW, self.config.FONT_THICKNESS)

        # Draw person detections with dynamic labels
        for p in person_results:
            x1, y1, x2, y2 = p['bbox']
            
            # Get appropriate color and label based on detection status
            color = self._get_person_color(p)
            label = self._get_person_label(p)
            
            # Draw bounding box
            cv2.rectangle(display, (x1, y1), (x2, y2), color, 2)
            
            # Draw label with background for better visibility
            label_bg_size = cv2.getTextSize(label, self.config.FONT, self.config.FONT_SCALE_MEDIUM, self.config.FONT_THICKNESS)[0]
            cv2.rectangle(display, (x1, y1 - label_bg_size[1] - 10), (x1 + label_bg_size[0] + 10, y1), color, -1)
            cv2.putText(display, label, (x1 + 5, y1 - 5), self.config.FONT, self.config.FONT_SCALE_MEDIUM, self.config.COLOR_WHITE, self.config.FONT_THICKNESS)
            
            # Draw face boxes if any
            for fx, fy, fw, fh in p.get('face_boxes', []):
                cv2.rectangle(display, (x1 + fx, y1 + fy), (x1 + fx + fw, y1 + fy + fh), self.config.COLOR_BLUE, 1)
                
            # Add confidence if available
            if p.get('confidence'):
                conf_text = f"Conf: {p['confidence']:.2f}"
                cv2.putText(display, conf_text, (x1, y2 + 20), self.config.FONT, self.config.FONT_SCALE_SMALL, color, 1)
                
        return display

    # ---------- Enhanced Main frame processing ----------
    def process_frame(self, frame: ndarray) -> ndarray:
        self.frame_count += 1
        timer = cv2.getTickCount()
        detections = self.detect_objects(frame)
        person_detections = [d for d in detections if d['class_name'] == 'person']
        detected_objects = [d['class_name'] for d in detections if d['class_name'] in self.config.OBJECTS_OF_INTEREST and d['class_name'] != 'person']

        person_results: List[Dict[str, Any]] = []
        process_faces = (self.frame_count % self.config.FACE_RECOGNITION_INTERVAL) == 0

        for det in person_detections:
            x1, y1, x2, y2 = det['bbox']
            x1, y1, x2, y2 = map(int, (x1, y1, x2, y2))
            person_roi = frame[y1:y2, x1:x2]
            person_id = self._get_person_id((x1, y1, x2, y2))
            
            # Initialize result with basic detection info
            result = {
                'bbox': (x1, y1, x2, y2), 
                'recognized': False, 
                'confirmed_known': False, 
                'name': None, 
                'face_boxes': [],
                'confidence': det['confidence']
            }
            
            if person_roi.size == 0:
                person_results.append(result)
                continue
                
            # Always detect faces for visual feedback
            face_boxes = self.detect_faces_mediapipe(person_roi)
            result['face_boxes'] = face_boxes
            
            # Process face recognition on specified intervals
            if process_faces and face_boxes:
                # Try all detected faces, not just the first one
                for i, (fx, fy, fw, fh) in enumerate(face_boxes):
                    h_roi, w_roi = person_roi.shape[:2]
                    x0 = self._clamp(fx, 0, w_roi-1)
                    y0 = self._clamp(fy, 0, h_roi-1)
                    x1f = self._clamp(fx+fw, 0, w_roi)
                    y1f = self._clamp(fy+fh, 0, h_roi)
                    
                    if x1f > x0 and y1f > y0:
                        face_crop = person_roi[y0:y1f, x0:x1f]
                        face_enc = self.get_face_encoding(face_crop)
                        
                        if face_enc is not None:
                            name, confirmed, dist = self.confirm_face(face_enc, (x1, y1, x2, y2))
                            
                            # Update result based on recognition outcome
                            if confirmed:
                                result['recognized'] = (name != self.config.UNKNOWN_LABEL)
                                result['confirmed_known'] = (name != self.config.UNKNOWN_LABEL)
                                result['name'] = name if name != self.config.UNKNOWN_LABEL else None
                                
                                # Process alert with person_id for cooldown tracking
                                self.process_alert(name, confirmed, detected_objects, person_id)
                            else:
                                # Show tentative recognition for immediate feedback
                                result['recognized'] = True
                                result['name'] = name if name and name != self.config.UNKNOWN_LABEL else None
                                # # Tentative or unknown
                                # result['recognized'] = True
                                # result['name'] = name if name and name != self.config.UNKNOWN_LABEL else None
                                # # Force alarm for unknown/unrecognized face
                                # if name == self.config.UNKNOWN_LABEL or name is None:
                                #     self.process_alert(self.config.UNKNOWN_LABEL, True, detected_objects, person_id)
                            
                            # Break after first successful face encoding to save processing
                            break
            
            person_results.append(result)

        annotated = self.draw_detections(frame, detections, person_results)
        elapsed = max(1, cv2.getTickCount() - timer)
        fps = int(cv2.getTickFrequency() / elapsed)
        
        # Enhanced status display
        status_y = 30
        cv2.putText(annotated, f"FPS: {fps}", (20, status_y), self.config.FONT, self.config.FONT_SCALE_MEDIUM, self.config.COLOR_GREEN, self.config.FONT_THICKNESS)
        
        # Show recognition status
        recognized_count = sum(1 for p in person_results if p.get('confirmed_known'))
        detecting_count = sum(1 for p in person_results if p.get('face_boxes'))
        
        status_y += 30
        cv2.putText(annotated, f"Recognized: {recognized_count}", (20, status_y), self.config.FONT, self.config.FONT_SCALE_SMALL, self.config.COLOR_GREEN, 1)
        
        status_y += 25
        cv2.putText(annotated, f"Detecting: {detecting_count}", (20, status_y), self.config.FONT, self.config.FONT_SCALE_SMALL, self.config.COLOR_BLUE, 1)
        
        # Show alarm status
        status_y += 25
        alarm_status = "ALARM READY" if self.alarm_loaded else "NO ALARM FILE"
        alarm_color = self.config.COLOR_GREEN if self.alarm_loaded else self.config.COLOR_RED
        cv2.putText(annotated, f"Alarm: {alarm_status}", (20, status_y), self.config.FONT, self.config.FONT_SCALE_SMALL, alarm_color, 1)
        
        if detected_objects:
            status_y += 25
            cv2.putText(annotated, f"Objects: {', '.join(set(detected_objects))}", (20, status_y), self.config.FONT, self.config.FONT_SCALE_SMALL, self.config.COLOR_YELLOW, 1)
            
        return annotated

    def run(self) -> None:
        cap = cv2.VideoCapture(self.config.VIDEO_SOURCE)
        if not cap.isOpened():
            self.logger.error("Could not open video capture")
            raise RuntimeError("Video capture failed")
        self.logger.info("Monitoring started. Press 'q' to quit.")
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    self.logger.warning("Failed to grab frame")
                    break
                annotated = self.process_frame(frame)
                cv2.imshow(self.config.WINDOW_NAME, annotated)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        except KeyboardInterrupt:
            self.logger.info("Interrupted")
        finally:
            self._cleanup(cap)

    def _cleanup(self, cap) -> None:
        try:
            cap.release()
            cv2.destroyAllWindows()
            if self.mp_face_detection:
                self.mp_face_detection.close()
            try:
                pygame.mixer.quit()
            except Exception:
                pass
            self.logger.info("Shutdown complete")
        except Exception:
            self.logger.exception("Cleanup error")

if __name__ == "__main__":
    cfg = Config()
    system = SecuritySystem(cfg)
    system.run()  # uncomment to run live monitoring

pygame 2.6.1 (SDL 2.28.4, Python 3.12.8)
Hello from the pygame community. https://www.pygame.org/contribute.html


NameError: name 'logging' is not defined

In [1]:
# security_surveillance.py
# Requirements:
#   pip install ultralytics face_recognition mediapipe pygame opencv-python numpy
# Optional (Raspberry Pi buzzer):
#   pip install RPi.GPIO
#
# Configure Config class below before running.

import os
import time
import cv2
import numpy as np
import threading
import logging
from collections import deque, defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Tuple, Dict, Any, Optional

# ultralytics libs  
from ultralytics import solutions
from ultralytics import YOLO
from ultralytics.solutions.config import SolutionConfig
from ultralytics.solutions.solutions import BaseSolution, SolutionAnnotator, SolutionResults
from ultralytics.utils.plotting import colors

# third-party libs
try:
    from ultralytics import YOLO
except Exception as e:
    raise RuntimeError("Install ultralytics: pip install ultralytics") from e

try:
    import face_recognition
except Exception as e:
    raise RuntimeError("Install face_recognition (dlib dependency): pip install face_recognition") from e

try:
    import mediapipe as mp
except Exception as e:
    raise RuntimeError("Install mediapipe: pip install mediapipe") from e

try:
    import pygame
except Exception as e:
    raise RuntimeError("Install pygame: pip install pygame") from e

# Optional Raspberry Pi GPIO (if running on Raspberry Pi with buzzer)
try:
    import RPi.GPIO as GPIO
    HAS_RPI = True
except Exception:
    HAS_RPI = False

# Optional: Telegram notification via bot
import requests

@dataclass
class Config:
    # model & known faces
    MODEL_PATH: str = "yolo11m.pt"  # change to your model
    KNOWN_FACES_DIR: str = "family_members/"
    ALARM_FILE: str = "pols-aagyi-pols.mp3"
    LOG_DIR: str = "security_logs"
    OUTPUT_DIR: str = "security_output"
    # VIDEO_SOURCE: str = 0  # camera index or video path
    VIDEO_SOURCE: str = "media_files/WIN_20251103_14_11_20_Pro.mp4"  # camera index or video path
    FACE_RECOGNITION_INTERVAL: int = 5
    ALERT_COOLDOWN: int = 10  # seconds global cooldown
    PERSON_COOLDOWN: int = 20  # per person cooldown seconds
    YOLO_CONFIDENCE: float = 0.45
    FACE_DETECTION_CONF: float = 0.5
    RECOGNITION_DISTANCE_THRESHOLD: float = 0.45
    RESIZE_FACTOR: float = 0.35

    # Clip saving
    SAVE_CLIP_SECONDS: int = 6  # seconds to save when alarm triggers (uses ring buffer)
    CLIP_FPS: int = 20

    # GPIO buzzer (optional)
    USE_GPIO: bool = False
    BUZZER_PIN: int = 18  # BCM pin; only used if USE_GPIO True and HAS_RPI True
    BUZZER_SECONDS: float = 5.0

    # Telegram
    USE_TELEGRAM: bool = False
    TELEGRAM_BOT_TOKEN: str = ""  # put your bot token
    TELEGRAM_CHAT_ID: str = ""    # put your chat id
    SEND_IMAGE_ON_ALERT: bool = True

    # secure zone: rectangle (x1,y1,x2,y2) relative fraction of frame: (left, top, right, bottom)
    # set to None to consider whole frame as secure zone
    SECURE_ZONE_REL: Optional[Tuple[float, float, float, float]] = (0.0, 0.0, 1.0, 1.0)

    # recognition thresholds & voting
    RECOGNITION_MIN_VOTES: int = 2
    RECOGNITION_CONSECUTIVE_FRAMES: int = 2
    RECOGNITION_TIME_WINDOW: float = 3.0

    # drawing & UI
    WINDOW_NAME: str = "Security Monitoring"
    OBJECTS_OF_INTEREST: List[str] = field(default_factory=lambda: ["person", "car", "truck", "bicycle", "motorcycle", "cell phone", "face"])

class SecuritySurveillance(solutions.VisionEye):
    # def __init__(self, cfg: Config):
    def __init__(self, cfg: Config, *args, known_face_encodings=None, known_face_names=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.cfg = cfg
        os.makedirs(self.cfg.LOG_DIR, exist_ok=True)
        os.makedirs(self.cfg.OUTPUT_DIR, exist_ok=True)
        self.logger = self._setup_logger()
        self._init_models()
        self._load_known_faces()
        self._setup_alarm()
        self.frame_count = 0

        # ring buffer for last N frames to save clip when alarm triggers
        self.ring_buffer = deque(maxlen=int(self.cfg.SAVE_CLIP_SECONDS * self.cfg.CLIP_FPS))
        self.last_alert_time = 0.0
        self.person_alert_times: Dict[str, float] = {}
        self.detection_history: Dict[str, Dict[str, Any]] = {}

        # gpio
        if self.cfg.USE_GPIO and HAS_RPI:
            GPIO.setmode(GPIO.BCM)
            GPIO.setup(self.cfg.BUZZER_PIN, GPIO.OUT)

    def _setup_logger(self):
        logger = logging.getLogger("SecuritySurv")
        if not logger.handlers:
            logger.setLevel(logging.INFO)
            fh = logging.FileHandler(os.path.join(self.cfg.LOG_DIR, f"sec_{datetime.now().strftime('%Y%m%d')}.log"))
            fh.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
            logger.addHandler(fh)
            sh = logging.StreamHandler()
            sh.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
            logger.addHandler(sh)
        return logger

    def _init_models(self):
        self.logger.info("Loading YOLO model...")
        self.yolo = YOLO(self.cfg.MODEL_PATH)
        mp_face = mp.solutions.face_detection
        self.mp_detector = mp_face.FaceDetection(model_selection=0, min_detection_confidence=self.cfg.FACE_DETECTION_CONF)
        self.logger.info("Models initialized")

    def _load_known_faces(self):
        self.known_encodings = []
        self.known_names = []
        if not os.path.exists(self.cfg.KNOWN_FACES_DIR):
            self.logger.warning("Known faces directory missing: %s", self.cfg.KNOWN_FACES_DIR)
            return
        for person in os.listdir(self.cfg.KNOWN_FACES_DIR):
            pdir = os.path.join(self.cfg.KNOWN_FACES_DIR, person)
            if not os.path.isdir(pdir):
                continue
            for f in os.listdir(pdir):
                path = os.path.join(pdir, f)
                try:
                    img = face_recognition.load_image_file(path)
                    encs = face_recognition.face_encodings(img)
                    if encs:
                        self.known_encodings.append(encs[0])
                        self.known_names.append(person)
                        self.logger.info("Loaded known face: %s (%s)", person, f)
                    else:
                        self.logger.warning("No face found in %s", path)
                except Exception:
                    self.logger.exception("Failed loading face %s", path)
        self.logger.info("Total known faces: %d", len(self.known_encodings))

    def _setup_alarm(self):
        pygame.mixer.init()
        self.alarm_loaded = False
        if os.path.exists(self.cfg.ALARM_FILE):
            try:
                pygame.mixer.music.load(self.cfg.ALARM_FILE)
                self.alarm_loaded = True
                self.logger.info("Alarm sound loaded")
            except Exception:
                self.logger.exception("Failed to load alarm sound")
        else:
            self.logger.warning("Alarm file not found: %s", self.cfg.ALARM_FILE)

    # ---------- detection helpers ----------
    def _is_in_secure_zone(self, bbox: Tuple[int,int,int,int], frame_shape) -> bool:
        if not self.cfg.SECURE_ZONE_REL:
            return True
        h, w = frame_shape[:2]
        rx1, ry1, rx2, ry2 = self.cfg.SECURE_ZONE_REL
        sx1, sy1, sx2, sy2 = int(rx1*w), int(ry1*h), int(rx2*w), int(ry2*h)
        x1,y1,x2,y2 = bbox
        # check center point inside zone
        cx, cy = (x1+x2)//2, (y1+y2)//2
        return (sx1 <= cx <= sx2) and (sy1 <= cy <= sy2)

    def _get_person_id(self, bbox: Tuple[int,int,int,int]) -> str:
        x1,y1,x2,y2 = bbox
        cx, cy = (x1+x2)//2, (y1+y2)//2
        return f"{cx//50}_{cy//50}"

    def _save_snapshot(self, frame, prefix="unknown"):
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        fname = os.path.join(self.cfg.OUTPUT_DIR, f"{prefix}_snap_{ts}.jpg")
        cv2.imwrite(fname, frame)
        self.logger.info("Snapshot saved: %s", fname)
        return fname

    def _save_clip_from_buffer(self, fps=None, prefix="unknown"):
        if fps is None:
            fps = self.cfg.CLIP_FPS
        if not self.ring_buffer:
            self.logger.warning("Ring buffer empty, no clip to save")
            return None
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        fname = os.path.join(self.cfg.OUTPUT_DIR, f"{prefix}_clip_{ts}.avi")
        h, w = self.ring_buffer[0].shape[:2]
        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        out = cv2.VideoWriter(fname, fourcc, fps, (w,h))
        for f in self.ring_buffer:
            out.write(f)
        out.release()
        self.logger.info("Clip saved: %s", fname)
        return fname

    def _play_alarm_sound(self):
        if not self.alarm_loaded:
            self.logger.warning("No alarm sound loaded")
            return
        try:
            if not pygame.mixer.music.get_busy():
                pygame.mixer.music.play()
                self.logger.info("Playing alarm sound")
        except Exception:
            self.logger.exception("Failed to play alarm")

    def _trigger_buzzer(self):
        if not (self.cfg.USE_GPIO and HAS_RPI):
            return
        try:
            GPIO.output(self.cfg.BUZZER_PIN, GPIO.HIGH)
            time.sleep(self.cfg.BUZZER_SECONDS)
            GPIO.output(self.cfg.BUZZER_PIN, GPIO.LOW)
            self.logger.info("Buzzer cycle complete")
        except Exception:
            self.logger.exception("GPIO buzzer failed")

    def _send_telegram(self, text: str, image_path: Optional[str]=None):
        if not self.cfg.USE_TELEGRAM or not self.cfg.TELEGRAM_BOT_TOKEN or not self.cfg.TELEGRAM_CHAT_ID:
            return
        try:
            token = self.cfg.TELEGRAM_BOT_TOKEN
            chat_id = self.cfg.TELEGRAM_CHAT_ID
            url = f"https://api.telegram.org/bot{token}/sendMessage"
            payload = {"chat_id": chat_id, "text": text}
            resp = requests.post(url, data=payload, timeout=10)
            if resp.ok:
                self.logger.info("Telegram message sent")
            if image_path and self.cfg.SEND_IMAGE_ON_ALERT:
                url2 = f"https://api.telegram.org/bot{token}/sendPhoto"
                with open(image_path, "rb") as f:
                    files = {"photo": f}
                    data = {"chat_id": chat_id, "caption": text}
                    r2 = requests.post(url2, files=files, data=data, timeout=20)
                    if r2.ok:
                        self.logger.info("Telegram image sent")
        except Exception:
            self.logger.exception("Telegram send failed")

    # ---------- recognition voting ----------
    def _update_history(self, person_id: str, name: str, dist: Optional[float]):
        now = time.time()
        e = self.detection_history.get(person_id)
        if not e:
            e = {"name_counts": defaultdict(int), "last_name": None, "consecutive":0, "last_update": now, "last_distance": None}
            self.detection_history[person_id] = e
        if now - e["last_update"] > self.cfg.RECOGNITION_TIME_WINDOW:
            e["name_counts"] = defaultdict(int)
            e["last_name"] = None
            e["consecutive"] = 0
            e["last_distance"] = None
        e["name_counts"][name] += 1
        if e["last_name"] == name:
            e["consecutive"] += 1
        else:
            e["last_name"] = name
            e["consecutive"] = 1
        e["last_update"] = now
        e["last_distance"] = dist

    def _confirm_recognition(self, person_id: str, name: str, dist: Optional[float]) -> bool:
        e = self.detection_history.get(person_id)
        if not e:
            return False
        now = time.time()
        if now - e["last_update"] > self.cfg.RECOGNITION_TIME_WINDOW:
            return False
        # strong acceptance if low distance
        if name != "UNKNOWN" and dist is not None and dist <= self.cfg.RECOGNITION_DISTANCE_THRESHOLD:
            return True
        # votes
        votes = e["name_counts"].get(name, 0)
        if name != "UNKNOWN" and votes >= self.cfg.RECOGNITION_MIN_VOTES:
            return True
        if name != "UNKNOWN" and e["consecutive"] >= self.cfg.RECOGNITION_CONSECUTIVE_FRAMES:
            return True
        # if UNKNOWN gets multiple votes treat as confirmed unknown
        if name == "UNKNOWN" and votes >= self.cfg.RECOGNITION_MIN_VOTES:
            return True
        return False

    def _match_face(self, encoding) -> Tuple[Optional[str], Optional[float]]:
        if not self.known_encodings:
            return None, None
        try:
            dists = face_recognition.face_distance(self.known_encodings, encoding)
        except Exception:
            self.logger.exception("face_distance error")
            return None, None
        idx = int(np.argmin(dists))
        best = float(dists[idx])
        if best <= self.cfg.RECOGNITION_DISTANCE_THRESHOLD:
            return self.known_names[idx], best
        else:
            return "UNKNOWN", best

    # ---------- alert logic ----------
    def _should_alert(self, person_id: str, is_unknown: bool) -> bool:
        now = time.time()
        if not is_unknown:
            return False
        if now - self.last_alert_time <= self.cfg.ALERT_COOLDOWN:
            return False
        last = self.person_alert_times.get(person_id)
        if last and (now - last) <= self.cfg.PERSON_COOLDOWN:
            return False
        return True

    def _handle_alert_actions(self, frame_for_save, person_bbox):
        # run non-blocking actions: play sound, buzzer, save files, send telegram
        # save snapshot and clip
        snap = self._save_snapshot(frame_for_save, prefix="unknown")
        clip = self._save_clip_from_buffer(prefix="unknown")
        self.last_alert_time = time.time()
        pid = self._get_person_id(person_bbox)
        self.person_alert_times[pid] = self.last_alert_time

        # spawn threads for alarm and notifications
        threads = []
        t_sound = threading.Thread(target=self._play_alarm_sound, daemon=True)
        threads.append(t_sound)
        if self.cfg.USE_GPIO and HAS_RPI:
            t_buzz = threading.Thread(target=self._trigger_buzzer, daemon=True)
            threads.append(t_buzz)
        if self.cfg.USE_TELEGRAM and self.cfg.TELEGRAM_BOT_TOKEN:
            text = f"ALERT: Unknown person detected at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
            t_tele = threading.Thread(target=self._send_telegram, args=(text, snap), daemon=True)
            threads.append(t_tele)
        for t in threads:
            t.start()
        self.logger.warning("Alert actions started (threads spawned)")

    # ---------- core processing ----------
    def process_frame(self, frame: np.ndarray) -> np.ndarray:
        self.frame_count += 1
        # keep in ring buffer (for clip saving)
        self.ring_buffer.append(frame.copy())

        annotated = frame.copy()
        # self.extract_tracks(frame.copy())
        annotator = SolutionAnnotator(annotated, line_width=self.line_width)
        
        # detect objects
        try:
            results = self.yolo.track(frame, conf=self.cfg.YOLO_CONFIDENCE, persist=True)
        except Exception:
            self.logger.exception("YOLO inference failed")
            results = []

        detections = []
        for res in results:
            if getattr(res, "boxes", None) is None:
                continue
            for box in res.boxes:
                x1,y1,x2,y2 = map(int, box.xyxy[0])
                cls = int(box.cls[0])
                conf = float(box.conf[0])
                t_id = int(box.id[0]) if box.id is not None else None
                name = res.names[cls] if hasattr(res, "names") else str(cls)
                detections.append({"bbox":(x1,y1,x2,y2), "class_name":name, "confidence":conf, "box": box, "cls": cls, "t_id": t_id})

        # person detections
        persons = [d for d in detections if d["class_name"] == "person"]
        objects = [d["class_name"] for d in detections if d["class_name"] in self.cfg.OBJECTS_OF_INTEREST and d["class_name"]!="person"]

        process_faces = (self.frame_count % self.cfg.FACE_RECOGNITION_INTERVAL) == 0
        for p in persons:
            box, cls, conf, t_id = p["box"], p["cls"], p["confidence"], p["t_id"]
            # extract bbox and clamp
            x1, y1, x2, y2 = p["bbox"]
            x1, y1, x2, y2 = p["bbox"]
            x1 = max(0, x1); y1 = max(0, y1)
            x2 = min(frame.shape[1]-1, x2); y2 = min(frame.shape[0]-1, y2)
            roi = frame[y1:y2, x1:x2]
            pid = self._get_person_id((x1, y1, x2, y2))

            # draw person box basic
            cv2.rectangle(annotator, (x1, y1), (x2, y2), (0,165,255), 2)

            # skip if ROI empty
            if roi.size == 0:
                continue

            # check secure zone
            in_zone = self._is_in_secure_zone((x1, y1, x2, y2), frame.shape)
            if not in_zone:
                # label as outside zone
                cv2.putText(annotator, "Outside Zone", (x1, y1-5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (200,200,200), 1)
                continue

            # face detection using mediapipe for visual box and cropping
            faces = []
            try:
                rgb = cv2.cvtColor(roi, cv2.COLOR_BGR2RGB)
                res = self.mp_detector.process(rgb)
                if res and getattr(res, "detections", None):
                    h, w = roi.shape[:2]
                    for det in res.detections:
                        r = det.location_data.relative_bounding_box
                        fx = int(r.xmin * w)
                        fy = int(r.ymin * h)
                        fw = int(r.width * w)
                        fh = int(r.height * h)
                        faces.append((fx, fy, fw, fh))
                        cv2.rectangle(annotator, (x1+fx, y1+fy), (x1+fx+fw, y1+fy+fh), (255,0,0), 1)
            except Exception:
                self.logger.exception("Face detection failure")

            # perform face recognition at intervals and if face found
            recognized_name = None
            is_confirmed_known = False
            is_confirmed_unknown = False

            if process_faces and faces:
                # Use the first detected face for simplicity and performance
                fx, fy, fw, fh = faces[0]
                fx0, fy0 = max(0, fx), max(0, fy)
                fx1, fy1 = min(roi.shape[1], fx + fw), min(roi.shape[0], fy + fh)

                if fx1 > fx0 and fy1 > fy0:
                    face_crop = roi[fy0:fy1, fx0:fx1]
                    try:
                        rgb_face = cv2.cvtColor(face_crop, cv2.COLOR_BGR2RGB)
                        encodings = face_recognition.face_encodings(rgb_face)
                        if encodings:
                            name, dist = self._match_face(encodings[0])
                            vote_name = name if name is not None else "UNKNOWN"
                            self._update_history(pid, vote_name, dist)
                    except Exception:
                        self.logger.exception("Face encoding or matching error")

            # --- Confirmation and Alerting Logic ---
            history = self.detection_history.get(pid)
            if history:
                # Check if confirmed as any known person
                for known_name in self.known_names:
                    if self._confirm_recognition(pid, known_name, history.get("last_distance")):
                        is_confirmed_known = True
                        recognized_name = known_name
                        break
                # If not a known person, check if confirmed as unknown
                if not is_confirmed_known:
                    if self._confirm_recognition(pid, "UNKNOWN", history.get("last_distance")):
                        is_confirmed_unknown = True

            # --- Annotation and Action ---
            # if is_confirmed_known:
            #     cv2.putText(annotated, f"{recognized_name} ✓", (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0,255,0), 2)
            # elif is_confirmed_unknown:
            #     if self._should_alert(pid, is_unknown=True):
            #         self._handle_alert_actions(frame, (x1, y1, x2, y2))
            #     cv2.putText(annotated, "UNKNOWN - ALERT", (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0,0,255), 2)
            if is_confirmed_known:
                color = (0, 255, 0)  # Green for Known
                label = f"{recognized_name}"
                base_label = self.adjust_box_label(cls, conf, t_id)
                final_label = f"{label}: {base_label}" if base_label else label
                annotator.box_label(box, label=final_label, color=color)
            elif is_confirmed_unknown:
                if self._should_alert(pid, is_unknown=True):
                    self._handle_alert_actions(frame, (x1, y1, x2, y2))
                color = (0, 0, 255)  # Red for Unknown
                label_text = f"Unknown ({conf:.2f})"
                base_label = self.adjust_box_label(int(cls), float(conf) if conf is not None else 0.0, t_id)

                # Custom label for 'person' class (COCO id 0).
                prefix = str(self.CFG.get("person_label_prefix", label_text))
                custom_label = f"{prefix}:"
                final_label = f"{custom_label} {base_label}" if base_label else custom_label
                annotator.box_label(box, label=final_label, color=color)
            else:
                # For an unconfirmed person, use default labeling
                annotator.box_label(box, label=self.adjust_box_label(cls, conf, t_id), color=colors(t_id if t_id is not None else cls, True))
                
            annotator.visioneye(box, self.vision_points.get(t_id, None))

        # draw secure zone
        if self.cfg.SECURE_ZONE_REL:
            h,w = frame.shape[:2]
            rx1,ry1,rx2,ry2 = self.cfg.SECURE_ZONE_REL
            sx1,sy1,sx2,sy2 = int(rx1*w), int(ry1*h), int(rx2*w), int(ry2*h)
            cv2.rectangle(annotator, (sx1,sy1), (sx2,sy2), (0,255,0), 1)
            cv2.putText(annotator, "Secure Zone", (sx1+5, sy1+15), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 1)

        # show fps & status
        fps = int(1.0 / max(1e-3, (time.time() - getattr(self, "_last_t", time.time()))))
        self._last_t = time.time()
        cv2.putText(annotator, f"FPS: {fps}", (10,20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,0), 2)

        # return annotator
        plot_im = annotator.result()
        self.display_output(plot_im) 
        
        
        # Display track count on the frame
        total_tracks = len(getattr(self, "track_ids", []))
        cv2.putText(plot_im, f"Tracks: {total_tracks}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

        return SolutionResults(plot_im=plot_im, total_tracks=len(self.track_ids))

    def run(self):
        cap = cv2.VideoCapture(self.cfg.VIDEO_SOURCE)
        if not cap.isOpened():
            self.logger.error("Cannot open video source: %s", self.cfg.VIDEO_SOURCE)
            return
        self.logger.info("Starting monitoring. Press 'q' to quit.")
        try:
            while True: 
                # ret, frame = cap.read()
                # if not ret:
                #     self.logger.warning("Frame read failed or video ended")
                #     break
                # # annotated = self.process_frame(frame)
                # annotator = process_frame(frame)
                # cv2.imshow(self.cfg.WINDOW_NAME, annotator)
                success, im0 = cap.read()

                if not success:
                    print("Video frame is empty or video processing has been successfully completed.")
                    break

                results = self.process_frame(im0)

                print(results) 
                
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        except KeyboardInterrupt:
            self.logger.info("Interrupted by user")
        finally:
            cap.release()
            cv2.destroyAllWindows()
            if self.cfg.USE_GPIO and HAS_RPI:
                GPIO.cleanup()
            self.logger.info("Shutdown complete")

if __name__ == "__main__":
    cfg = Config()
    # === USER: adjust configuration below as needed ===
    # cfg.VIDEO_SOURCE = "your_camera_or_video_path"
    # cfg.MODEL_PATH = "yolov8n.pt"
    # cfg.KNOWN_FACES_DIR = "./known_faces"
    # cfg.ALARM_FILE = "./alarm.mp3"
    # cfg.USE_TELEGRAM = True
    # cfg.TELEGRAM_BOT_TOKEN = "<token>"
    # cfg.TELEGRAM_CHAT_ID = "<chat_id>"
    # cfg.USE_GPIO = True  # only on Raspberry Pi
    # ===================================================
    system = SecuritySurveillance(cfg=cfg, model=cfg.MODEL_PATH, source=cfg.VIDEO_SOURCE)
    system.run()


pygame 2.6.1 (SDL 2.28.4, Python 3.12.8)
Hello from the pygame community. https://www.pygame.org/contribute.html
Ultralytics Solutions:  {'source': 'media_files/WIN_20251103_14_11_20_Pro.mp4', 'model': 'yolo11m.pt', 'classes': None, 'show_conf': True, 'show_labels': True, 'region': None, 'colormap': 21, 'show_in': True, 'show_out': True, 'up_angle': 145.0, 'down_angle': 90, 'kpts': [6, 8, 10], 'analytics_type': 'line', 'figsize': (12.8, 7.2), 'blur_ratio': 0.5, 'vision_point': (20, 20), 'crop_dir': 'cropped-detections', 'json_file': None, 'line_width': 2, 'records': 5, 'fps': 30.0, 'max_hist': 5, 'meter_per_pixel': 0.05, 'max_speed': 120, 'show': False, 'iou': 0.7, 'conf': 0.25, 'device': None, 'max_det': 300, 'half': False, 'tracker': 'botsort.yaml', 'verbose': True, 'data': 'images'}


2025-11-22 01:22:29,134 - INFO - Loading YOLO model...
2025-11-22 01:22:29,252 - INFO - Models initialized
2025-11-22 01:22:33,089 - INFO - Loaded known face: robin (robin_01.jpg)
2025-11-22 01:22:34,005 - INFO - Loaded known face: robin (robin_02.jpg)
2025-11-22 01:22:34,906 - INFO - Loaded known face: robin (robin_03.jpg)
2025-11-22 01:22:39,581 - INFO - Loaded known face: robin (WIN_20251008_18_56_08_Pro.jpg)
2025-11-22 01:22:39,582 - INFO - Total known faces: 4
2025-11-22 01:22:39,873 - INFO - Alarm sound loaded
2025-11-22 01:22:39,933 - INFO - Starting monitoring. Press 'q' to quit.



0: 384x640 1 person, 1 cup, 1 chair, 1 laptop, 67.5ms
Speed: 3.7ms preprocess, 67.5ms inference, 116.3ms postprocess per image at shape (1, 3, 384, 640)


2025-11-22 01:22:44,552 - INFO - Shutdown complete


error: OpenCV(4.11.0) :-1: error: (-5:Bad argument) in function 'rectangle'
> Overload resolution failed:
>  - img is not a numpy array, neither a scalar
>  - img is not a numpy array, neither a scalar
>  - Expected Ptr<cv::UMat> for argument 'img'
>  - Expected Ptr<cv::UMat> for argument 'img'
