In [7]:
import sys
import os
import json

# Add the project directory to the path
project_dir = os.path.abspath('..')
if project_dir not in sys.path:
    sys.path.append(project_dir)

# Now you can import your modules
from src.data.video_stream import VideoStream
from src.processing.motion_detector import MotionDetector
from src.data.database import get_database, test_connection
from src.utils.config_loader import load_config

In [8]:
# Load configuration
config = load_config()
print(json.dumps(config, indent=4))

{
    "video": {
        "source": 0,
        "frame_width": 640,
        "frame_height": 480,
        "fps": 30
    },
    "motion_detection": {
        "enabled": true,
        "threshold": 25,
        "min_area": 500
    },
    "face_recognition": {
        "enabled": true,
        "model": "hog",
        "recognition_threshold": 0.6,
        "unknown_face_threshold": 0.7
    },
    "database": {
        "mongodb_uri": "mongodb://localhost:27017/",
        "database_name": "face_recognition_db",
        "face_collection": "faces",
        "event_collection": "recognition_events"
    }
}


In [9]:
# Test MongoDB connection
test_connection()

Successfully connected to MongoDB. Collections: []


True

In [10]:
# Test VideoStream
# Note: This will open a window with your webcam feed
stream = VideoStream().start()
print(f"Stream properties: {stream.width}x{stream.height} @ {stream.fps}fps")

import cv2
try:
    for _ in range(100):  # Show 100 frames
        ret, frame = stream.read()
        if not ret:
            break
        
        cv2.imshow('Video Test', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    stream.stop()
    cv2.destroyAllWindows()

Stream properties: 640x480 @ 30.0fps


In [11]:
# Test Motion Detection
# This will show video with motion detection
stream = VideoStream().start()
detector = MotionDetector()

try:
    for _ in range(200):  # Show 200 frames with motion detection
        ret, frame = stream.read()
        if not ret:
            break
        
        motion, processed_frame = detector.detect(frame)
        cv2.imshow('Motion Detection', processed_frame)
        
        if motion:
            print("Motion detected!")
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    stream.stop()
    cv2.destroyAllWindows()

Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detected!
Motion detecte

In [None]:
# Test Face Detection
from src.processing.face_processor import FaceProcessor

# Create face processor
face_processor = FaceProcessor(detection_model="hog")

# Test with video stream
stream = VideoStream().start()

try:
    for _ in range(100):  # Process 100 frames
        ret, frame = stream.read()
        if not ret:
            break
        
        # Process frame for face detection only
        processed_frame, results = face_processor.process_frame(frame, detect_only=True)
        
        # Display number of faces detected
        cv2.putText(
            processed_frame, 
            f"Faces: {len(results)}", 
            (10, 30), 
            cv2.FONT_HERSHEY_SIMPLEX, 
            1, 
            (0, 0, 255), 
            2
        )
        
        # Display result
        cv2.imshow("Face Detection", processed_frame)
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    stream.stop()
    cv2.destroyAllWindows()

In [None]:
# Test Face Database
from src.data.face_database import FaceDatabase

# Create database connection
face_db = FaceDatabase()

# Print counts
face_count = face_db.faces_collection.count_documents({})
event_count = face_db.events_collection.count_documents({})

print(f"Connected to MongoDB. Found {face_count} faces and {event_count} recognition events.")

# Close connection
face_db.close()

In [None]:
# Test Integrated System
from src.processing.video_processor import VideoProcessor
from src.data.video_stream import VideoStream
from src.processing.motion_detector import MotionDetector
from src.processing.face_processor import FaceProcessor
from src.data.face_database import FaceDatabase
import time

# Create components
stream = VideoStream(0).start()
motion_detector = MotionDetector(threshold=20, min_area=500)
face_processor = FaceProcessor(detection_model="hog")
face_db = FaceDatabase()

# Create video processor
processor = VideoProcessor(
    video_stream=stream,
    motion_detector=motion_detector,
    face_processor=face_processor,
    face_database=face_db
)

try:
    # Start processing
    processor.start()
    
    # Let it run for 30 seconds
    time.sleep(30)
    
except KeyboardInterrupt:
    print("Interrupted by user")
    
finally:
    # Clean up
    processor.stop()
    stream.stop()
    face_db.close()
    print("Processing stopped")