In [5]:
import cv2
import numpy as np
import os
from numpy.linalg import norm
import tflite_runtime.interpreter as tflite

# -----------------------------
# SETUP
# -----------------------------
MODEL_PATH = "facenet.tflite"
EMBEDDINGS_DIR = "face_embeddings"
# Passed positionally to detectMultiScale (scale factor, min neighbours).
FACE_DETECTION_SCALE = 1.3
FACE_DETECTION_MIN_NEIGHBORS = 5
# A saved face counts as a match when its cosine similarity exceeds this value.
MATCH_THRESHOLD = 0.7

# Load Haar Cascade for face detection
face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
# CascadeClassifier does not raise when the XML file cannot be read; it is
# simply left empty and only fails later inside detectMultiScale. Fail early
# with a clear message instead.
if face_cascade.empty():
    raise IOError("Failed to load Haar cascade: haarcascade_frontalface_default.xml")

# Load TFLite model and cache its input/output tensor descriptions
interpreter = tflite.Interpreter(model_path=MODEL_PATH)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

# Ensure embeddings directory exists
os.makedirs(EMBEDDINGS_DIR, exist_ok=True)

# Load saved embeddings
def load_embeddings():
    """Return a dict mapping each saved name to its embedding array.

    Every file in ``EMBEDDINGS_DIR`` whose name ends with ``.npy`` is read
    with ``np.load``; the file name without its extension is used as the key.
    """
    return {
        os.path.splitext(entry)[0]: np.load(os.path.join(EMBEDDINGS_DIR, entry))
        for entry in os.listdir(EMBEDDINGS_DIR)
        if entry.endswith(".npy")
    }

# Preprocess face image

def preprocess_face(face_img):
    """Resize a face crop to 160x160 and rescale its values as float32.

    Values are mapped with ``x / 127.5 - 1.0``, i.e. to [-1, 1] for 8-bit
    pixel data.

    NOTE(review): the main loop passes crops taken directly from the OpenCV
    frame (BGR channel order) — confirm this is the order the model expects.
    """
    scaled = cv2.resize(face_img, (160, 160)) / 127.5
    return (scaled - 1.0).astype(np.float32)

# Get embedding from TFLite model
def get_embedding(face_img):
    """Run the TFLite interpreter on one face and return the first output row.

    A leading batch axis is added to ``face_img`` before it is written to the
    interpreter's first input tensor; the result is read from the first
    output tensor.
    """
    in_index = input_details[0]['index']
    out_index = output_details[0]['index']
    interpreter.set_tensor(in_index, np.expand_dims(face_img, axis=0))
    interpreter.invoke()
    return interpreter.get_tensor(out_index)[0]

# Cosine similarity
def cosine_similarity(a, b):
    """Return the cosine similarity of vectors ``a`` and ``b``.

    Returns ``0.0`` when either vector has zero length, instead of dividing
    by zero (which would yield ``nan`` and a RuntimeWarning and could never
    compare above a match threshold anyway).
    """
    denom = norm(a) * norm(b)
    if denom == 0:
        return 0.0
    return np.dot(a, b) / denom

# Save embedding to file
def save_embedding(name, embedding):
    """Save ``embedding`` as ``<name>.npy`` inside ``EMBEDDINGS_DIR``.

    ``name`` is reduced to its alphanumeric characters before it is used as a
    file name. If nothing is left after that (for example the name was only
    punctuation), nothing is written: saving would otherwise create a file
    called ``.npy`` that later loads as an entry with an empty name.
    """
    safe_name = ''.join(c for c in name if c.isalnum())
    if not safe_name:
        print(f"Cannot save embedding: {name!r} contains no letters or digits")
        return
    path = os.path.join(EMBEDDINGS_DIR, f"{safe_name}.npy")
    np.save(path, embedding)
    print(f"Saved embedding for {safe_name}")

# -----------------------------
# MAIN LOOP
# -----------------------------
cap = cv2.VideoCapture(0)
saved_embeddings = load_embeddings()
save_mode = False
input_name = ""

print("Press 's' to save a new face, 'q' to quit")

# try/finally so the camera and the window are released even if a frame
# fails to process.
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = face_cascade.detectMultiScale(gray, FACE_DETECTION_SCALE, FACE_DETECTION_MIN_NEIGHBORS)

        for (x, y, w, h) in faces:
            face_crop = frame[y:y+h, x:x+w]
            processed_face = preprocess_face(face_crop)
            embedding = get_embedding(processed_face)

            # Label with the MOST similar saved face above the threshold, not
            # merely the first one encountered: with several enrolled people
            # the first hit depends on directory listing order.
            best_name = None
            best_sim = MATCH_THRESHOLD
            for name, saved_emb in saved_embeddings.items():
                sim = cosine_similarity(embedding, saved_emb)
                if sim > best_sim:
                    best_name, best_sim = name, sim

            if best_name is not None:
                cv2.putText(frame, f"{best_name} ({best_sim:.2f})", (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
            else:
                cv2.putText(frame, "Unknown", (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

            cv2.rectangle(frame, (x, y), (x+w, y+h), (255, 0, 0), 2)

            # After 's' was pressed, the first face seen is stored under the
            # entered name and the embeddings are reloaded from disk.
            if save_mode:
                save_embedding(input_name, embedding)
                saved_embeddings = load_embeddings()
                save_mode = False
                input_name = ""

        cv2.imshow("Face Recognition (TFLite)", frame)
        key = cv2.waitKey(1) & 0xFF

        if key == ord('q'):
            break
        elif key == ord('s'):
            # input() blocks, so the video window does not update until a
            # name has been entered on the console.
            input_name = input("Enter name: ").strip()
            if input_name:
                save_mode = True
finally:
    cap.release()
    cv2.destroyAllWindows()


ModuleNotFoundError: No module named 'tflite_runtime'

In [None]:
# import torch
# import cv2
# import numpy as np
# from facenet_pytorch import MTCNN, InceptionResnetV1
# from scipy.spatial.distance import cosine
# from torchvision import transforms

# # Initialize MTCNN (Face Detection)
# mtcnn = MTCNN()

# # Initialize Inception Resnet (Face Recognition)
# model = InceptionResnetV1(pretrained='vggface2').eval()

# # Load the saved face embedding (if it exists)
# saved_embedding = torch.load('my_face_embeddings.pth')

# # Load the webcam
# cap = cv2.VideoCapture(0)

# # Threshold for face recognition (cosine distance threshold)
# threshold = 0.6

# # Image preprocessing pipeline
# preprocess = transforms.Compose([
#     transforms.ToPILImage(),
#     transforms.Resize((160, 160)),
#     transforms.ToTensor(),
#     transforms.Normalize(mean=[0, 0, 0], std=[255, 255, 255])  # NOTE: applied to ToTensor output (already in [0, 1]) this scales to [0, 1/255], not [-1, 1]
# ])

# # Function to recognize face by comparing embeddings
# def recognize_face(face_tensor):
#     # Get the face embedding
#     embedding = model(face_tensor)
    
#     # Flatten both the saved embedding and the current face embedding to 1D
#     embedding_flat = embedding.flatten()
#     saved_embedding_flat = saved_embedding.flatten()
    
#     # Calculate the cosine distance between saved embedding and the current face embedding
#     distance = cosine(saved_embedding_flat.detach().numpy(), embedding_flat.detach().numpy())
#     print(f'Cosine distance: {distance}')
    
#     if distance < threshold:
#         print("User recognized: Gaindu")  # Print the recognized user on the console
#         return "Gaindu"
#     else:
#         print("User recognized: Unknown")  # Print "Unknown" on the console if not recognized
#         return "Unknown"

# while True:
#     ret, frame = cap.read()
#     if not ret:
#         break

#     # Detect faces
#     faces, _ = mtcnn.detect(frame)
    
#     if faces is not None:
#         for face in faces:
#             # Draw bounding box around the detected face
#             cv2.rectangle(frame, 
#                           (int(face[0]), int(face[1])), 
#                           (int(face[2]), int(face[3])), 
#                           (0, 255, 0), 2)
            
#             # Crop the face from the frame
#             face_crop = frame[int(face[1]):int(face[3]), int(face[0]):int(face[2])]
            
#             if face_crop.size != 0:
#                 # Preprocess the cropped face
#                 face_tensor = preprocess(face_crop)

#                 # Add batch dimension (for model compatibility)
#                 face_tensor = face_tensor.unsqueeze(0)

#                 # Recognize the face
#                 label = recognize_face(face_tensor)

#                 # Display the label on the image
#                 cv2.putText(frame, label, (int(face[0]), int(face[1]) - 10), 
#                             cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2, cv2.LINE_AA)

#         # Show the frame with bounding boxes and labels
#         cv2.imshow('Face Recognition', frame)

#     # Exit when the 'Esc' key is pressed (key code 27)
#     if cv2.waitKey(1) & 0xFF == 27:
#         break

# cap.release()
# cv2.destroyAllWindows()


In [8]:
import torch
import cv2
import numpy as np
import os
from facenet_pytorch import MTCNN, InceptionResnetV1
from scipy.spatial.distance import cosine
from torchvision import transforms
import time
import logging
import gc
import threading
import queue

# ===== Logging Setup =====
# Log records at INFO level and above go to intruder_detection.log.
logging.basicConfig(
    filename='intruder_detection.log',
    level=logging.INFO,
    format='%(asctime)s:%(levelname)s:%(message)s',
)
logging.info("Starting intruder detection system.")

# ===== GPIO SETUP (Handles PC or Pi) =====
# ON_PI records whether the real RPi.GPIO module could be imported.
try:
    import RPi.GPIO as GPIO
except (ImportError, RuntimeError):
    logging.info("GPIO not available — running in mock mode.")
    ON_PI = False
else:
    ON_PI = True

if not ON_PI:
    # No hardware: there are no pins, and GPIO becomes a stand-in whose
    # output() and cleanup() accept any arguments and do nothing.
    GREEN_LED_PIN = RED_LED_PIN = None

    def gpio_mock(*args, **kwargs):
        pass

    GPIO = type('GPIO', (), {'output': gpio_mock, 'cleanup': gpio_mock})
    logging.info("Running without physical GPIO.")
else:
    # Real hardware: BCM numbering, both LED pins as outputs, both off.
    GPIO.setmode(GPIO.BCM)
    GREEN_LED_PIN = 17
    RED_LED_PIN = 27
    GPIO.setup(GREEN_LED_PIN, GPIO.OUT)
    GPIO.setup(RED_LED_PIN, GPIO.OUT)
    GPIO.output(GREEN_LED_PIN, GPIO.LOW)
    GPIO.output(RED_LED_PIN, GPIO.LOW)

# ===== Initialize Face Detection and Recognition =====
# The embedding network runs on the GPU when CUDA is available, else on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
mtcnn = MTCNN(thresholds=[0.7, 0.8, 0.8])
model = InceptionResnetV1(pretrained='vggface2').eval().to(device)

# Directory scanned for saved face embeddings
embeddings_dir = 'face_embeddings'

# Capture size requested from the camera (kept low for Raspberry Pi)
CAP_WIDTH = 640
CAP_HEIGHT = 480

# A face is accepted as known only when its cosine distance to the closest
# saved embedding is below this value.
threshold = 0.5

# Turns a face crop into a 160x160 tensor normalised with mean 0.5 / std 0.5
# per channel.
preprocess = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((160, 160)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5] * 3, std=[0.5] * 3),
])

# Load saved face embeddings from disk
def load_embeddings():
    """Load every ``*.pth`` file in ``embeddings_dir`` into a name -> tensor dict.

    The key is the file name without its extension. A missing directory
    yields an empty dict. The number of loaded embeddings is logged.

    NOTE(review): ``torch.load`` unpickles the file contents, so this
    directory should only ever contain trusted files.
    """
    if os.path.exists(embeddings_dir):
        candidates = os.listdir(embeddings_dir)
    else:
        candidates = []

    embeddings = {
        os.path.splitext(fname)[0]: torch.load(
            os.path.join(embeddings_dir, fname), map_location=device
        )
        for fname in candidates
        if fname.endswith('.pth')
    }
    logging.info(f"Loaded {len(embeddings)} face embeddings")
    return embeddings

# Embeddings are read once, when this cell runs.
saved_embeddings = load_embeddings()

# Prepare face tensor for recognition model
def preprocess_face(face_crop):
    """Return ``face_crop`` as a model-ready tensor with a leading batch axis.

    The crop is resized to 160x160 with OpenCV, run through the module-level
    ``preprocess`` transform, and given a batch dimension via ``unsqueeze(0)``.

    NOTE(review): the worker passes crops of the raw OpenCV frame (BGR
    channel order) — confirm this matches what the model was enrolled with.
    """
    return preprocess(cv2.resize(face_crop, (160, 160))).unsqueeze(0)

# Recognize face by comparing embedding distances
def recognize_face(face_tensor):
    """Return ``(name, distance)`` for the saved face closest to ``face_tensor``.

    The face is embedded with ``model`` and compared by cosine distance with
    every entry of ``saved_embeddings``. The closest name is returned only if
    its distance is below ``threshold``; otherwise the name is ``"Unknown"``.
    The distance of the closest entry is returned in both cases
    (``inf`` when no embeddings are loaded).
    """
    with torch.no_grad():
        embedding = model(face_tensor.to(device)).detach().cpu().numpy()
    # Flatten once instead of on every loop iteration.
    probe = embedding.flatten()

    min_distance = float('inf')
    recognized_name = "Unknown"

    for name, saved_embedding in saved_embeddings.items():
        # detach() first: a tensor that was saved while still attached to the
        # autograd graph cannot be converted with .numpy() directly.
        reference = saved_embedding.detach().cpu().numpy().flatten()
        distance = cosine(probe, reference)
        if distance < min_distance:
            min_distance = distance
            recognized_name = name

    # The closest candidate is logged even when it is rejected below.
    logging.info(f"→ Closest match: {recognized_name}, Cosine distance: {min_distance:.4f}")
    if min_distance < threshold:
        return recognized_name, min_distance
    return "Unknown", min_distance

# ===== Motion Detection Setup =====
# Sum of thresholded frame-difference pixels above which recognition runs.
# Adjust based on your environment.
motion_threshold = 50000
# Last blurred grayscale frame seen by the worker (None until the first frame).
previous_frame = None

# Latest annotated frame produced by the worker; guarded by frame_lock.
frame_lock = threading.Lock()
processed_frame = None

# Hands frames from the capture loop to the worker; holds at most two.
frame_queue = queue.Queue(maxsize=2)

# Thread worker to process motion and face recognition
def recognition_worker():
    """Background loop: pull frames from ``frame_queue`` and publish annotated copies.

    Each frame is compared with the previous one (absolute difference of the
    blurred grayscale images, thresholded at 25). When the summed difference
    exceeds ``motion_threshold``, faces are detected with ``mtcnn``, labelled
    through ``recognize_face`` and drawn onto a copy of the frame, and the
    LEDs (or mock log lines) are driven per face. The copy is then stored in
    the global ``processed_frame`` under ``frame_lock``. The loop never returns.

    NOTE(review): queued frames come straight from the OpenCV capture (BGR
    channel order) — confirm that is what ``mtcnn.detect`` should receive.
    """
    global previous_frame, processed_frame
    while True:
        try:
            frame = frame_queue.get(timeout=1)
        except queue.Empty:
            # Nothing was queued within a second; keep waiting.
            continue

        # Blurred grayscale version, used only for motion detection.
        gray = cv2.GaussianBlur(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY), (21, 21), 0)

        if previous_frame is None:
            # First frame seen: there is nothing to diff against yet.
            previous_frame = gray
            continue

        _, thresh = cv2.threshold(cv2.absdiff(previous_frame, gray), 25, 255, cv2.THRESH_BINARY)
        motion_score = np.sum(thresh)
        previous_frame = gray

        # All drawing happens on a copy of the frame.
        display_frame = frame.copy()

        if motion_score <= motion_threshold:
            logging.debug("Motion score too low, skipping recognition.")
        else:
            start_time = time.time()

            # Both LEDs are switched off before each recognition pass.
            if ON_PI:
                GPIO.output(GREEN_LED_PIN, GPIO.LOW)
                GPIO.output(RED_LED_PIN, GPIO.LOW)

            boxes, probs = mtcnn.detect(frame, landmarks=False)

            valid_boxes = []
            if boxes is not None and len(boxes) > 0:
                # Keep only detections reported with probability above 0.7.
                valid_boxes = [
                    boxes[i]
                    for i, prob in enumerate(probs)
                    if prob is not None and prob > 0.7
                ]

            frame_h, frame_w = frame.shape[0], frame.shape[1]
            for box in valid_boxes:
                x1, y1, x2, y2 = (int(coord) for coord in box)
                # Clamp the box to the image bounds.
                x1 = max(0, x1)
                y1 = max(0, y1)
                x2 = min(frame_w, x2)
                y2 = min(frame_h, y2)

                # Skip boxes that are empty after clamping.
                if x1 >= x2 or y1 >= y2:
                    continue
                face_crop = frame[y1:y2, x1:x2]
                if face_crop.size <= 0:
                    continue

                try:
                    label, distance = recognize_face(preprocess_face(face_crop))
                    known = label != "Unknown"

                    # Green box for a known face, red for an unknown one.
                    box_color = (0, 255, 0) if known else (0, 0, 255)
                    cv2.rectangle(display_frame, (x1, y1), (x2, y2), box_color, 2)

                    label_text = f"{label} ({distance:.2f})"
                    (text_w, text_h), _ = cv2.getTextSize(label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2)
                    # Filled strip above the box as background for the label.
                    cv2.rectangle(display_frame,
                                  (x1, y1 - text_h - 10),
                                  (x1 + text_w, y1),
                                  box_color, -1)
                    cv2.putText(display_frame, label_text, (x1, y1 - 5),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2, cv2.LINE_AA)

                    # LED status and logging
                    if known:
                        logging.info(f"[+] Recognized: {label}")
                        if ON_PI:
                            GPIO.output(GREEN_LED_PIN, GPIO.HIGH)
                        else:
                            logging.info(f"[MOCK] Green LED ON for {label}")
                    else:
                        logging.warning("[!] ALERT: Unrecognized face!")
                        if ON_PI:
                            GPIO.output(RED_LED_PIN, GPIO.HIGH)
                        else:
                            logging.info("[MOCK] Red LED ON")

                except Exception as e:
                    # One bad face must not stop the worker thread.
                    logging.error(f"Error processing face: {e}")

            cycle_time = time.time() - start_time
            logging.info(f"Recognition cycle took {cycle_time:.4f} seconds")
            if cycle_time > 2.0:
                logging.warning("Recognition cycle took longer than expected.")

        # Publish the result; the capture loop reads it under the same lock.
        with frame_lock:
            processed_frame = display_frame

        # Perform garbage collection after every processed frame
        gc.collect()

# Start the recognition thread
# Run recognition in a daemon thread so it does not keep the process alive.
recognition_thread = threading.Thread(target=recognition_worker, daemon=True)
recognition_thread.start()

# Open the default camera and request the configured capture size.
cap = cv2.VideoCapture(0)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, CAP_WIDTH)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, CAP_HEIGHT)

try:
    logging.info("Starting video capture loop")
    while True:
        ret, frame = cap.read()
        if not ret:
            logging.error("Failed to capture frame from camera.")
            break

        # Hand the frame to the worker unless it is still busy with earlier
        # ones; frames are dropped rather than waited on.
        if not frame_queue.full():
            frame_queue.put(frame)

        # Take a private copy of the latest annotated frame, if there is one.
        with frame_lock:
            display_frame = None if processed_frame is None else processed_frame.copy()

        # Show the annotated frame when available, otherwise the raw frame.
        cv2.imshow('Live Feed', frame if display_frame is None else display_frame)
        if cv2.waitKey(1) & 0xFF == 27:  # ESC key to exit
            break

except KeyboardInterrupt:
    logging.info("KeyboardInterrupt received; shutting down.")

finally:
    # Always release the camera and the window; reset GPIO only on a Pi.
    cap.release()
    cv2.destroyAllWindows()
    if ON_PI:
        GPIO.cleanup()


In [None]:
import torch
import cv2
import numpy as np
import os
from facenet_pytorch import MTCNN, InceptionResnetV1
from scipy.spatial.distance import cosine
from torchvision import transforms
import time
import logging
import gc
import threading
import queue

# ===== Logging Setup =====
# Log records at INFO level and above go to intruder_detection.log.
logging.basicConfig(
    filename='intruder_detection.log',
    level=logging.INFO,
    format='%(asctime)s:%(levelname)s:%(message)s',
)
logging.info("Starting intruder detection system.")

# ===== GPIO SETUP =====
# Use the real RPi.GPIO module when it imports; otherwise fall back to a mock.
try:
    import RPi.GPIO as GPIO
except (ImportError, RuntimeError):
    logging.info("GPIO not available — running in mock mode.")
    ON_PI = False
else:
    ON_PI = True

if not ON_PI:
    # Mock mode: no pins, and a GPIO stand-in whose output()/cleanup() do nothing.
    GREEN_LED_PIN = RED_LED_PIN = None

    def gpio_mock(*args, **kwargs):
        pass

    GPIO = type('GPIO', (), {'output': gpio_mock, 'cleanup': gpio_mock})
    logging.info("Running without physical GPIO.")
else:
    # Hardware mode: BCM numbering, both LED pins configured as outputs and off.
    GPIO.setmode(GPIO.BCM)
    GREEN_LED_PIN = 17
    RED_LED_PIN = 27
    GPIO.setup(GREEN_LED_PIN, GPIO.OUT)
    GPIO.setup(RED_LED_PIN, GPIO.OUT)
    GPIO.output(GREEN_LED_PIN, GPIO.LOW)
    GPIO.output(RED_LED_PIN, GPIO.LOW)

# ===== Initialize Face Detection and Recognition =====
# The embedding network runs on the GPU when CUDA is available, else on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
mtcnn = MTCNN(thresholds=[0.7, 0.8, 0.8])
model = InceptionResnetV1(pretrained='vggface2').eval().to(device)

# Directory scanned for saved face embeddings
embeddings_dir = 'face_embeddings'
# Reduced capture resolution for smoother performance
CAP_WIDTH = 320
CAP_HEIGHT = 240
# Cosine distance below which the closest saved face is accepted
threshold = 0.5

# Turns a face crop into a 160x160 tensor normalised with mean 0.5 / std 0.5
# per channel.
preprocess = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((160, 160)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5] * 3, std=[0.5] * 3),
])

# ===== Load TFLite-Compatible `.npy` Embeddings =====
def load_embeddings():
    """Load every ``*.npy`` file in ``embeddings_dir`` into a name -> array dict.

    The key is the file name without its extension. A missing directory
    yields an empty dict. The number of loaded embeddings is logged.
    """
    if os.path.exists(embeddings_dir):
        npy_files = [f for f in os.listdir(embeddings_dir) if f.endswith('.npy')]
    else:
        npy_files = []

    embeddings = {
        os.path.splitext(f)[0]: np.load(os.path.join(embeddings_dir, f))
        for f in npy_files
    }
    logging.info(f"Loaded {len(embeddings)} face embeddings")
    return embeddings

# Embeddings are read once, when this cell runs.
saved_embeddings = load_embeddings()

def preprocess_face(face_crop):
    """Return ``face_crop`` as a model-ready tensor with a leading batch axis.

    The crop is resized to 160x160 with OpenCV, passed through the
    module-level ``preprocess`` transform, then ``unsqueeze(0)`` adds the
    batch dimension.
    """
    resized = cv2.resize(face_crop, (160, 160))
    batch = preprocess(resized).unsqueeze(0)
    return batch

def recognize_face(face_tensor):
    """Return ``(name, distance)`` for the saved face closest to ``face_tensor``.

    The face is embedded with ``model`` and compared by cosine distance with
    every array in ``saved_embeddings``. The closest name is returned only if
    its distance is below ``threshold``; otherwise the name is ``"Unknown"``.
    The smallest distance found is returned in both cases.

    NOTE(review): the saved ``.npy`` vectors are compared directly with this
    model's output — confirm they were produced by a compatible model with
    the same embedding length.
    """
    with torch.no_grad():
        embedding = model(face_tensor.to(device)).detach().cpu().numpy()
    probe = embedding.flatten()

    recognized_name = "Unknown"
    min_distance = float('inf')
    for candidate, reference in saved_embeddings.items():
        distance = cosine(probe, reference.flatten())
        if distance < min_distance:
            recognized_name, min_distance = candidate, distance

    # The closest candidate is logged even when it is rejected below.
    logging.info(f"→ Closest match: {recognized_name}, Cosine distance: {min_distance:.4f}")
    if min_distance < threshold:
        return recognized_name, min_distance
    return "Unknown", min_distance

# ===== Motion Detection Setup =====
# Sum of thresholded frame-difference pixels above which recognition runs.
motion_threshold = 50000
# Last blurred grayscale frame seen by the worker (None until the first frame).
previous_frame = None
# Latest annotated frame produced by the worker; guarded by frame_lock.
processed_frame = None
frame_lock = threading.Lock()
# Hands frames from the capture loop to the worker; holds at most two.
frame_queue = queue.Queue(maxsize=2)

# ===== Worker Thread =====
# Count of frames captured so far, used for frame skipping.
frame_counter = 0
# Only every `frame_skip`-th captured frame is used; the frames in between
# are skipped to reduce processing load.
frame_skip = 5

def recognition_worker():
    """Background loop: pull frames from ``frame_queue`` and publish annotated copies.

    Each frame is compared with the previous one (absolute difference of the
    blurred grayscale images, thresholded at 25). When the summed difference
    exceeds ``motion_threshold``, faces are detected with ``mtcnn``, labelled
    through ``recognize_face`` and drawn onto a copy of the frame, and on a
    Pi the matching LED is switched on per face. The copy is then stored in
    the global ``processed_frame`` under ``frame_lock``. The loop never returns.

    NOTE(review): queued frames come straight from the OpenCV capture (BGR
    channel order) — confirm that is what ``mtcnn.detect`` should receive.
    """
    global previous_frame, processed_frame
    while True:
        try:
            frame = frame_queue.get(timeout=1)
        except queue.Empty:
            # Nothing was queued within a second; keep waiting.
            continue

        # Blurred grayscale version, used only for motion detection.
        gray = cv2.GaussianBlur(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY), (21, 21), 0)

        if previous_frame is None:
            # First frame seen: there is nothing to diff against yet.
            previous_frame = gray
            continue

        _, thresh = cv2.threshold(cv2.absdiff(previous_frame, gray), 25, 255, cv2.THRESH_BINARY)
        motion_score = np.sum(thresh)
        previous_frame = gray

        # All drawing happens on a copy of the frame.
        display_frame = frame.copy()

        if motion_score > motion_threshold:
            start_time = time.time()
            # Both LEDs are switched off before each recognition pass.
            if ON_PI:
                GPIO.output(GREEN_LED_PIN, GPIO.LOW)
                GPIO.output(RED_LED_PIN, GPIO.LOW)

            boxes, probs = mtcnn.detect(frame, landmarks=False)

            valid_boxes = []
            if boxes is not None:
                # Keep only detections reported with probability above 0.7.
                valid_boxes = [
                    boxes[i]
                    for i, prob in enumerate(probs)
                    if prob and prob > 0.7
                ]

            frame_h, frame_w = frame.shape[0], frame.shape[1]
            for box in valid_boxes:
                x1, y1, x2, y2 = (int(coord) for coord in box)
                # Clamp the box to the image bounds.
                x1 = max(0, x1)
                y1 = max(0, y1)
                x2 = min(frame_w, x2)
                y2 = min(frame_h, y2)

                # Skip boxes that are empty after clamping.
                if x1 >= x2 or y1 >= y2:
                    continue
                face_crop = frame[y1:y2, x1:x2]
                if face_crop.size <= 0:
                    continue

                try:
                    label, distance = recognize_face(preprocess_face(face_crop))
                    known = label != "Unknown"

                    # Green box for a known face, red for an unknown one.
                    color = (0, 255, 0) if known else (0, 0, 255)
                    cv2.rectangle(display_frame, (x1, y1), (x2, y2), color, 2)

                    label_text = f"{label} ({distance:.2f})"
                    (text_w, text_h), _ = cv2.getTextSize(label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2)
                    # Filled strip above the box as background for the label.
                    cv2.rectangle(display_frame,
                                  (x1, y1 - text_h - 10),
                                  (x1 + text_w, y1),
                                  color, -1)
                    cv2.putText(display_frame, label_text, (x1, y1 - 5),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2, cv2.LINE_AA)

                    if known:
                        logging.info(f"[+] Recognized: {label}")
                        if ON_PI:
                            GPIO.output(GREEN_LED_PIN, GPIO.HIGH)
                    else:
                        logging.warning("[!] ALERT: Unrecognized face!")
                        if ON_PI:
                            GPIO.output(RED_LED_PIN, GPIO.HIGH)

                except Exception as e:
                    # One bad face must not stop the worker thread.
                    logging.error(f"Error processing face: {e}")

            logging.info(f"Recognition cycle took {time.time() - start_time:.4f} seconds")

        # Publish the result; the capture loop reads it under the same lock.
        with frame_lock:
            processed_frame = display_frame

        # Perform garbage collection after every processed frame
        gc.collect()

# Start thread
# Run recognition in a daemon thread so it does not keep the process alive.
recognition_thread = threading.Thread(target=recognition_worker, daemon=True)
recognition_thread.start()

# ===== Video Capture Loop =====
# Open the default camera and request the configured capture size.
cap = cv2.VideoCapture(0)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, CAP_WIDTH)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, CAP_HEIGHT)

try:
    logging.info("Starting video capture loop")
    while True:
        ret, frame = cap.read()
        if not ret:
            logging.error("Failed to capture frame from camera.")
            break

        frame_counter += 1
        # Only every `frame_skip`-th frame is queued for recognition. The skip
        # applies to queueing alone: skipping the rest of the loop body as
        # well would mean the window is refreshed and the ESC key is polled
        # on just one frame in `frame_skip`.
        if frame_counter % frame_skip == 0 and not frame_queue.full():
            frame_queue.put(frame)

        # Take a private copy of the latest annotated frame, if there is one.
        display_frame = None
        with frame_lock:
            if processed_frame is not None:
                display_frame = processed_frame.copy()

        # Show the annotated frame when available, otherwise the raw frame.
        cv2.imshow('Live Feed', display_frame if display_frame is not None else frame)
        if cv2.waitKey(1) & 0xFF == 27:  # Exit on 'Esc' key
            break

except KeyboardInterrupt:
    logging.info("KeyboardInterrupt received; shutting down.")

finally:
    # Always release the camera and the window; reset GPIO only on a Pi.
    cap.release()
    cv2.destroyAllWindows()
    if ON_PI:
        GPIO.cleanup()
