In [2]:
pip install opencv-python

Note: you may need to restart the kernel to use updated packages.


In [30]:
pip show kafka-python

Name: kafka-python
Version: 2.2.15
Summary: Pure Python client for Apache Kafka
Home-page: https://github.com/dpkp/kafka-python
Author: 
Author-email: Dana Powers <dana.powers@gmail.com>
License: 
Location: E:\python projects\face recognition system\.venv_py311\Lib\site-packages
Requires: 
Required-by: 
Note: you may need to restart the kernel to use updated packages.


In [26]:
pip install numpy torch torchvision

Note: you may need to restart the kernel to use updated packages.


In [27]:
pip install facenet-pytorch

Note: you may need to restart the kernel to use updated packages.


In [28]:
pip install kafka-python

Note: you may need to restart the kernel to use updated packages.


In [29]:
# Cell 1: Imports and Setup
import torch
import cv2
import os
import time
from PIL import Image
from facenet_pytorch import MTCNN, InceptionResnetV1

# --- Imports (ensure these are at the top of your cell) ---
from collections import deque
import time
import csv
from datetime import datetime

import json # (NEW) For creating JSON strings

# --- (NEW) KAFKA INTEGRATION: Import the Kafka Producer ---
# You'll need to run: pip install kafka-python
from kafka import KafkaProducer


print("All libraries imported successfully.")

All libraries imported successfully.


In [31]:
# Cell 2: Model and Device Configuration
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(f'Running on device: {device}')

# Initialize MTCNN for face detection
mtcnn = MTCNN(image_size=160, margin=14, keep_all=True, device=device)

# Initialize InceptionResnetV1 for face embedding extraction
resnet = InceptionResnetV1(pretrained='vggface2').eval().to(device)

print("Models initialized successfully.")

Running on device: cpu
Models initialized successfully.


In [9]:
# Cell 3: Function to Build the Initial Database
def generate_known_embeddings(database_path):
    """
    Processes a directory of images to create a database of known face embeddings.
    This function will run only if a pre-saved embeddings file is not found.
    """
    known_embeddings = {}
    for person_name in os.listdir(database_path):
        person_dir = os.path.join(database_path, person_name)
        if not os.path.isdir(person_dir):
            continue
        embeddings_list = []
        for image_name in os.listdir(person_dir):
            image_path = os.path.join(person_dir, image_name)
            if not os.path.isfile(image_path):
                continue
            try:
                img = Image.open(image_path).convert('RGB')
                face_tensors = mtcnn(img)
                if face_tensors is not None:
                    for face_tensor in face_tensors:
                        embedding = resnet(face_tensor.unsqueeze(0).to(device))
                        embeddings_list.append(embedding.detach().cpu())
            except Exception as e:
                print(f"Could not process image {image_path}: {e}")
        if embeddings_list:
            known_embeddings[person_name] = torch.cat(embeddings_list).mean(0, keepdim=True)
            print(f"Generated initial embeddings for {person_name}")
    return known_embeddings

print("Database generation function is defined.")

Database generation function is defined.


In [32]:
# Cell 4: Load or Create the Face Database
print("Initializing face database...")

# Define paths
database_folder = 'face_database'
embeddings_file = 'known_faces_embeddings.pt'

# Create database folder if it doesn't exist
os.makedirs(database_folder, exist_ok=True) 

# Logic to load existing database or create a new one
if os.path.exists(embeddings_file):
    print("Loading known faces from saved file.")
    known_faces_embeddings = torch.load(embeddings_file)
else:
    print("No saved database found. Generating new embeddings from folder...")
    known_faces_embeddings = generate_known_embeddings(database_folder)
    # Save the newly generated embeddings for future runs
    torch.save(known_faces_embeddings, embeddings_file)

print("\n---------------------------------")
print("Known faces database is ready.")

if known_faces_embeddings:
    print("The following people are in the database:")
    print(list(known_faces_embeddings.keys()))
else:
    print("The database is empty. You can start with an empty 'face_database' folder.")

Initializing face database...
Loading known faces from saved file.

---------------------------------
Known faces database is ready.
The following people are in the database:
['noman', 'talha']


In [36]:
# Cell 5

# =============================================================================
# --- FINAL VERSION: Playback with PROVEN Confluent Cloud Integration ---
# =============================================================================

# --- Imports (ensure these are at the top of your cell) ---
import time
from datetime import datetime, timezone
import cv2
from PIL import Image
import json # this is for kafka response
from kafka import KafkaProducer

# --- General Configuration ---
DISTANCE_THRESHOLD = 0.75 # Threshold Level
MIN_FACE_SIZE = 40 * 40 # Pixels Size
DATABASE_FOLDER = 'face_database'  # Database Foler
VIDEO_FILE_PATH = 'recording 4.mkv' # <--- YOUR VIDEO FILE
FRAME_SKIP = 5 # it does this blank, blank, blank, blank, DRAWING, blank, blank, blank, blank, DRAWING

# --- System & Message Configuration ---
KAFKA_TOPIC = 'presence_events'
# BAKERY_ID = "MainStreetBakery"
CAMERA_ID = "FrontCounterCam"

# --- CONFLUENT CLOUD CREDENTIALS ---
# Paste the exact, proven-working credentials from your kafka_test.py
BOOTSTRAP_SERVERS = "pkc-w8nyg.me-central1.gcp.confluent.cloud:9092" # <-- YOUR BOOTSTRAP SERVER
API_KEY = "ZZDIITJZC7HLTZZX"           # <-- YOUR API KEY
API_SECRET = "cfltSL1dRIX15BSXZN31q4g2ellCXCAvxkT5rp+byUmjkh1bNc+xt1hhXvQJOcBQ"     # <-- YOUR API SECRET

# --- Initialize Kafka Producer using the EXACT logic from kafka_test.py ---
producer = None # Start with a None producer
try:
    print("Attempting to connect to Confluent Cloud...")
    producer = KafkaProducer(
        bootstrap_servers=BOOTSTRAP_SERVERS,
        security_protocol='SASL_SSL',
        sasl_mechanism='PLAIN',
        sasl_plain_username=API_KEY,
        sasl_plain_password=API_SECRET,
        # Adding a timeout can help in debugging, but is optional
        request_timeout_ms=15000, # 15 seconds
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    print(f"SUCCESS: Kafka producer created and connected to Confluent Cloud.")
except Exception as e:
    print(f"--- KAFKA WARNING: Could not connect to Confluent Cloud. ---")
    print(f"--- Reason: {e} ---")
    print(f"--- The script will run, but no signals will be sent. ---")


# --- Video Capture from a File ---
cap = cv2.VideoCapture(VIDEO_FILE_PATH) # Change this for LIVE FEED
fps = cap.get(cv2.CAP_PROP_FPS)
delay = int(1000 / fps) if fps > 0 else 1

if not cap.isOpened():
    print(f"Error: Could not open video file: {VIDEO_FILE_PATH}")
else:
    print(f"\nStarting video playback. Processing 1 in every {FRAME_SKIP} frames.")
    frame_count = 0 
    
    print("\n--- Real-time JSON Event Log ---")
    
    while True:
        ret, frame = cap.read()
        if not ret:
            print("\nEnd of video file reached.")
            break

        frame_count += 1 
        
        if frame_count % FRAME_SKIP != 0:
            cv2.imshow('CCTV Face Recognition', frame)
            if cv2.waitKey(delay) & 0xFF == ord('q'): break
            continue 
        
        try:
            img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            boxes, _ = mtcnn.detect(img)
            face_tensors = mtcnn(img)
            
            if face_tensors is not None:
                for face_tensor, box in zip(face_tensors, boxes):
                    # --- Face Recognition Logic ---
                    # ... (this part is unchanged) ...
                    unknown_embedding = resnet(face_tensor.to(device).unsqueeze(0)).detach().cpu()
                    min_dist = float('inf')
                    recognized_name = "Unknown"
                    for name, known_emb in known_faces_embeddings.items():
                        distance = (known_emb - unknown_embedding).norm().item()
                        if distance < min_dist:
                            min_dist = distance
                            if min_dist < DISTANCE_THRESHOLD:
                                recognized_name = name
                    
                    # --- JSON LOGGING AND KAFKA SIGNALING ---
                    event_message = {
                      "event_type": "FACE_RECOGNIZED",
                      "timestamp": datetime.now(timezone.utc).isoformat(),
                      "presence": 1 if recognized_name != "Unknown" else 0,
                      "personName": recognized_name,
               #       "bakeryId": BAKERY_ID, #When we are deploying it on the client-side
                      "cameraId": CAMERA_ID
                    }
                    
                    print(json.dumps(event_message))
                    
                    if producer is not None:
                        try:
                            producer.send(KAFKA_TOPIC, value=event_message)
                        except Exception as e:
                            print(f"KAFKA ERROR: Could not send message. {e}")
                    
                    # --- Drawing Logic ---
                    x1, y1, x2, y2 = [int(b) for b in box]
                    color = (0, 255, 0) if recognized_name != "Unknown" else (0, 0, 255)
                    cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                    display_text = f"{recognized_name} ({min_dist:.2f})"
                    cv2.rectangle(frame, (x1, y2 - 25), (x2, y2), color, cv2.FILLED)
                    cv2.putText(frame, display_text, (x1 + 6, y2 - 6), cv2.FONT_HERSHEY_DUPLEX, 0.7, (255, 255, 255), 1)

        except Exception as e:
            print(f"An error occurred during frame processing: {e}")

        cv2.imshow('CCTV Face Recognition', frame)
        if cv2.waitKey(delay) & 0xFF == ord('q'): break

    # --- Cleanup ---
    cap.release()
    cv2.destroyAllWindows()
    if producer is not None:
        print("\nFlushing remaining messages to Kafka...")
        producer.flush() 
        producer.close()
        print("Kafka producer connection closed.")
    for i in range(5): cv2.waitKey(1)
    print("Video playback stopped.")

Attempting to connect to Confluent Cloud...
SUCCESS: Kafka producer created and connected to Confluent Cloud.

Starting video playback. Processing 1 in every 5 frames.

--- Real-time JSON Event Log ---
{"event_type": "FACE_RECOGNIZED", "timestamp": "2025-09-25T12:47:06.160143+00:00", "presence": 1, "personName": "noman", "cameraId": "FrontCounterCam"}
{"event_type": "FACE_RECOGNIZED", "timestamp": "2025-09-25T12:47:06.754081+00:00", "presence": 0, "personName": "Unknown", "cameraId": "FrontCounterCam"}
{"event_type": "FACE_RECOGNIZED", "timestamp": "2025-09-25T12:47:06.878904+00:00", "presence": 0, "personName": "Unknown", "cameraId": "FrontCounterCam"}
{"event_type": "FACE_RECOGNIZED", "timestamp": "2025-09-25T12:47:07.035161+00:00", "presence": 0, "personName": "Unknown", "cameraId": "FrontCounterCam"}
{"event_type": "FACE_RECOGNIZED", "timestamp": "2025-09-25T12:47:09.063431+00:00", "presence": 1, "personName": "noman", "cameraId": "FrontCounterCam"}
{"event_type": "FACE_RECOGNIZED"

In [23]:
# Cell 6: Final Save of the Database
print("\nSaving updated face database to file...")
torch.save(known_faces_embeddings, embeddings_file)
print(f"Database saved successfully to '{embeddings_file}'.")
print("The following people are now in the database:")
print(list(known_faces_embeddings.keys()))


Saving updated face database to file...
Database saved successfully to 'known_faces_embeddings.pt'.
The following people are now in the database:
['noman', 'talha']
