In [1]:
# ==========================================================================
# CELL 1: DRIVER CLASS (Run this first)
# ==========================================================================
from pynq import Overlay, allocate
import numpy as np
import time

class FisherfaceDriver:
    """Host-side driver for the ``fisherface_accel_0`` IP of a PYNQ overlay.

    Vectors are quantized to int32 on the host, streamed to the accelerator
    through ``axi_dma_0`` and the per-class results are read back over MMIO.
    The IP's ``mode`` register selects what an incoming vector means:

    * ``1`` -- mean vector (:meth:`load_mean`)
    * ``2`` -- weight vector for ``class_id`` (:meth:`load_weights`)
    * ``0`` -- input vector to project (:meth:`inference`)
    """

    def __init__(self, bitfile_path, max_classes=5):
        """Program the overlay and allocate the DMA staging buffer.

        Args:
            bitfile_path: Bitstream path handed to ``pynq.Overlay``.
            max_classes: Upper bound on the number of weight vectors
                :meth:`load_weights` will send; extra columns are dropped.
        """
        print(f"Loading overlay: {bitfile_path}")
        self.overlay = Overlay(bitfile_path)
        self.dma = self.overlay.axi_dma_0
        self.ip = self.overlay.fisherface_accel_0

        self.VECTOR_SIZE = 10000
        self.MAX_CLASSES = max_classes
        # Scale applied to mean / input vectors before truncation to int32.
        self.HW_SCALE = 128.0
        # Extra gain applied to weights only; divided out again in inference().
        self.SW_BOOST = 20.0
        self.TOTAL_SCALE = self.HW_SCALE * self.SW_BOOST

        self.input_buffer = allocate(shape=(self.VECTOR_SIZE,), dtype=np.int32)
        self.num_classes = 0
        print(f"Driver initialized. MAX_CLASSES={self.MAX_CLASSES}")

    def _send_chunked(self, data_array, scale):
        """Quantize ``data_array`` and stream it to the accelerator.

        Args:
            data_array: Array-like holding exactly ``VECTOR_SIZE`` values
                (any shape; it is flattened).
            scale: Multiplier applied before conversion to int32.

        Raises:
            ValueError: If the number of values is not ``VECTOR_SIZE``.
        """
        flat_data = np.asarray(data_array).flatten().astype(np.float64)
        # Checked explicitly: np.copyto would silently broadcast a size-1 array.
        if flat_data.size != self.VECTOR_SIZE:
            raise ValueError(
                f"Expected {self.VECTOR_SIZE} values, got {flat_data.size}"
            )
        # astype(int32) truncates toward zero (no rounding).
        quantized = (flat_data * scale).astype(np.int32)
        np.copyto(self.input_buffer, quantized)

        # NOTE(review): 4000 words per transfer is presumably chosen to stay
        # below the DMA engine's maximum transfer length -- confirm.
        CHUNK_SIZE = 4000
        for i in range(0, self.VECTOR_SIZE, CHUNK_SIZE):
            end = min(i + CHUNK_SIZE, self.VECTOR_SIZE)
            self.dma.sendchannel.transfer(self.input_buffer[i:end])
            self.dma.sendchannel.wait()

    def load_mean(self, mean_vector):
        """Send the mean vector to the accelerator (mode 1, scaled by HW_SCALE)."""
        assert len(mean_vector) == self.VECTOR_SIZE
        self.ip.register_map.mode = 1
        self.ip.register_map.class_id = 0
        self.ip.register_map.CTRL.AP_START = 1
        self._send_chunked(mean_vector, self.HW_SCALE)
        print("Mean vector loaded.")

    def load_weights(self, weight_matrix):
        """Send one weight vector per class to the accelerator (mode 2).

        Args:
            weight_matrix: Either a single vector of ``VECTOR_SIZE`` values,
                or a 2-D array with ``VECTOR_SIZE`` rows (one column per
                class), or its transpose. Columns beyond ``MAX_CLASSES`` are
                dropped with a warning.

        Raises:
            ValueError: If no axis of the input has length ``VECTOR_SIZE`` or
                the input is not 1-D / 2-D.
        """
        weight_matrix = np.asarray(weight_matrix)
        original_shape = weight_matrix.shape

        # Promote a lone vector to a single column *before* looking at
        # shape[1]; previously a 1-D input of the wrong length crashed with
        # IndexError instead of the intended ValueError.
        if weight_matrix.ndim == 1:
            weight_matrix = weight_matrix.reshape(-1, 1)
        if weight_matrix.ndim != 2:
            raise ValueError(f"Invalid weight shape: {original_shape}")

        if weight_matrix.shape[0] != self.VECTOR_SIZE:
            if weight_matrix.shape[1] != self.VECTOR_SIZE:
                raise ValueError(f"Invalid weight shape: {original_shape}")
            # Given as (classes, VECTOR_SIZE): flip to column-per-class.
            weight_matrix = weight_matrix.T
        num_classes = weight_matrix.shape[1]

        if num_classes > self.MAX_CLASSES:
            print(f"WARNING: Truncating {num_classes} to {self.MAX_CLASSES} classes")
            num_classes = self.MAX_CLASSES
            weight_matrix = weight_matrix[:, :self.MAX_CLASSES]

        for c in range(num_classes):
            self.ip.register_map.mode = 2
            self.ip.register_map.class_id = c
            self.ip.register_map.CTRL.AP_START = 1
            self._send_chunked(weight_matrix[:, c], self.TOTAL_SCALE)
            print(f"  Class {c} weights loaded.")

        self.num_classes = num_classes
        print(f"All {num_classes} weight vectors loaded.")

    def inference(self, face_vector, timeout=None):
        """Project one input vector and return one value per loaded class.

        Args:
            face_vector: Sequence of ``VECTOR_SIZE`` values.
            timeout: Optional limit in seconds on the wait for ``AP_DONE``.
                ``None`` (the default) waits indefinitely, as before.

        Returns:
            ``np.float64`` array of length ``self.num_classes``.

        Raises:
            TimeoutError: If ``timeout`` is given and the accelerator does
                not report completion in time.
        """
        assert len(face_vector) == self.VECTOR_SIZE

        self.ip.register_map.mode = 0
        self.ip.register_map.class_id = 0
        self.ip.register_map.CTRL.AP_START = 1

        self._send_chunked(face_vector, self.HW_SCALE)

        deadline = None if timeout is None else time.monotonic() + timeout
        while self.ip.register_map.CTRL.AP_DONE == 0:
            if deadline is not None and time.monotonic() > deadline:
                raise TimeoutError(
                    f"Accelerator did not signal AP_DONE within {timeout} s"
                )

        # NOTE(review): 4096 is presumably the accelerator's fixed-point
        # accumulator scale -- confirm against the HLS source.
        ACC_SCALE = 4096.0
        results = np.zeros(self.num_classes, dtype=np.float64)
        # Result for class c is the 32-bit word at OUTPUT_BASE_OFFSET + 4 * c.
        OUTPUT_BASE_OFFSET = 0x20

        for c in range(self.num_classes):
            raw_int = self.ip.mmio.read(OUTPUT_BASE_OFFSET + c * 4)
            # Reinterpret the word read from MMIO as signed two's complement.
            if raw_int >= 0x80000000:
                raw_int -= 0x100000000
            results[c] = raw_int / (ACC_SCALE * self.SW_BOOST)

        return results
    
"""
==========================================================================
CELL 2: LOAD MODEL & INITIALIZE HARDWARE (Run after Cell 1)
==========================================================================
"""
# Stage banner.
print("=" * 60, "Loading Model Data...", "=" * 60, sep="\n")

# NOTE(review): allow_pickle=True lets np.load unpickle object arrays, which
# can execute arbitrary code -- only load archives from a trusted source.
data = np.load("face_model_data.npz", allow_pickle=True)

# Arrays pulled out of the archive, in the order they are used below.
mean_vec = data['mean_vec']
eigen_vecs = data['eigen_vecs']
train_proj_db = data['train_proj_db']
train_lbls_db = data['train_lbls_db']

# The label map is stored either as one pickled dict ('label_names') or as
# two parallel arrays ('label_keys' / 'label_vals').
if 'label_keys' not in data:
    label_names = data['label_names'].item()
else:
    keys = data['label_keys']
    vals = data['label_vals']
    label_names = dict(zip(keys, vals))

print("Model loaded:")
print(f"  - eigen_vecs shape: {eigen_vecs.shape}")
print(f"  - train_proj_db shape: {train_proj_db.shape}")
print(f"  - Classes: {list(label_names.values())}")

# One weight vector per column of eigen_vecs (a 1-D array counts as one).
num_classes = 1 if eigen_vecs.ndim <= 1 else eigen_vecs.shape[1]
# NOTE(review): max(num_classes, 5) means the driver's MAX_CLASSES truncation
# can never trigger here -- confirm the hardware really supports that many.
driver = FisherfaceDriver("fisherface.bit", max_classes=max(num_classes, 5))

# Push the model parameters to the FPGA: mean first, then the weights.
driver.load_mean(mean_vec)
driver.load_weights(eigen_vecs)

print("=" * 60, "HARDWARE READY - All classes loaded!", "=" * 60, sep="\n")

Loading Model Data...
Model loaded:
  - eigen_vecs shape: (10000, 2)
  - train_proj_db shape: (1035, 2)
  - Classes: ['993', 'peidong', 'unknown']
Loading overlay: fisherface.bit


Driver initialized. MAX_CLASSES=5
Mean vector loaded.
  Class 0 weights loaded.
  Class 1 weights loaded.
All 2 weight vectors loaded.
HARDWARE READY - All classes loaded!


In [13]:
"""
==========================================================================
CELL 3: FACE RECOGNITION WITH VOTING (Fixed Broadcasting Error)
==========================================================================
"""
import cv2
import ipywidgets as widgets
from IPython.display import display
import threading
from collections import Counter
import time
import numpy as np

# --------------------------------------------------------------
# CONFIGURATION
# --------------------------------------------------------------
# Size every face crop is resized to in preprocess(); the 100 x 100 result is
# flattened into the vector handed to driver.inference().
IMG_SIZE = (100, 100)
# Largest nearest-neighbour distance still accepted as a match; anything
# farther from the training projections is reported as "Unknown".
THRESHOLD = 450.0
# Factor applied to each frame before face detection (1.0 = no resizing).
SCALE_FACTOR = 1.0
# Number of per-frame predictions collected before a vote is taken.
BATCH_SIZE = 10

# --------------------------------------------------------------
# SETUP LBP CASCADE
# --------------------------------------------------------------
cascade_path = 'lbpcascade_frontalface_improved.xml'
face_cascade = cv2.CascadeClassifier(cascade_path)

if not face_cascade.empty():
    print(f"LBP Cascade loaded successfully from {cascade_path}")
else:
    # Keep the notebook running without a detector: warn, then substitute an
    # object with the same detectMultiScale() call that never finds a face.
    print(f"WARNING: LBP Cascade XML not found at {cascade_path}!")

    class DummyCascade:
        """Stand-in detector that reports no faces for any input."""

        def detectMultiScale(self, *args, **kwargs):
            return []

    face_cascade = DummyCascade()

# --------------------------------------------------------------
# GLOBAL VARIABLES
# --------------------------------------------------------------
# Most recent camera frame published by the display loop; reads and writes
# are guarded by frame_lock.
current_frame = None
# Latest detection outcome, written by the worker thread and read by the
# display loop. These three are NOT protected by frame_lock.
latest_result_box = None  # (x, y, w, h) of the face, or None when no face
latest_result_name = ""  # label text drawn above the box
latest_result_color = (255, 255, 255)  # colour passed to cv2.rectangle / cv2.putText
# Loop flag polled by the worker thread; set to False to let it exit.
system_running = True
frame_lock = threading.Lock()

# --------------------------------------------------------------
# HELPER FUNCTIONS
# --------------------------------------------------------------
def preprocess(img):
    """Convert a face crop into the normalized grayscale image used for recognition.

    A 3-D input is converted from BGR to grayscale; any other input is used
    as given. The image is then resized to ``IMG_SIZE`` with area
    interpolation, histogram-equalized and min-max normalized to 0..255.
    """
    gray = img
    if img.ndim == 3:
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    resized = cv2.resize(gray, IMG_SIZE, interpolation=cv2.INTER_AREA)
    equalized = cv2.equalizeHist(resized)
    return cv2.normalize(equalized, None, 0, 255, cv2.NORM_MINMAX)

# --------------------------------------------------------------
# AI WORKER THREAD
# --------------------------------------------------------------
def _nearest_identity(test_proj):
    """Return the per-frame name for one projected face.

    Finds the training projection closest (Euclidean distance) to
    ``test_proj`` and maps its label through ``label_names``. Returns
    "Unknown" when the distance exceeds ``THRESHOLD``, when the label has no
    name, or when the name itself is "unknown" in any letter case.
    """
    dims = test_proj.shape[0]
    if train_proj_db.shape[1] >= dims:
        # Compare on the first `dims` columns so the shapes broadcast.
        dists = np.linalg.norm(train_proj_db[:, :dims] - test_proj, axis=1)
    else:
        # Database has fewer columns than the result: trim the result instead.
        dists = np.linalg.norm(
            train_proj_db - test_proj[:train_proj_db.shape[1]], axis=1
        )

    min_index = np.argmin(dists)
    if dists[min_index] > THRESHOLD:
        return "Unknown"

    name = label_names.get(train_lbls_db[min_index], "Unknown")
    if name.lower() == "unknown":
        return "Unknown"
    return name


def _vote_verdict(votes):
    """Collapse a full batch of per-frame names into ``(text, color)``.

    The most common name wins. It is reported as confident with at least 80%
    of ``BATCH_SIZE`` votes, as tentative ("Maybe ...?") with at least 50%,
    and as "Unknown (Low Conf)" otherwise. With ``BATCH_SIZE == 10`` these
    are the previous fixed thresholds of 8 and 5.
    """
    winner_name, vote_count = Counter(votes).most_common(1)[0]
    confident_votes = (4 * BATCH_SIZE + 4) // 5  # ceil(0.8 * BATCH_SIZE)
    tentative_votes = (BATCH_SIZE + 1) // 2      # ceil(0.5 * BATCH_SIZE)

    if winner_name == "Unknown":
        return "Unknown", (0, 0, 255)
    if vote_count >= confident_votes:
        return f"{winner_name} ({vote_count}/{BATCH_SIZE})", (0, 255, 0)
    if vote_count >= tentative_votes:
        return f"Maybe {winner_name}? ({vote_count}/{BATCH_SIZE})", (0, 255, 255)
    return "Unknown (Low Conf)", (0, 0, 255)


def ai_processing_worker():
    """Background loop: detect the largest face, classify it, vote over batches.

    Runs until ``system_running`` is false. Each pass copies the latest
    ``current_frame`` under ``frame_lock``, runs the cascade detector, sends
    the largest face through ``driver.inference`` and appends the resulting
    name to a buffer. Once ``BATCH_SIZE`` names are collected a verdict is
    published and the buffer restarts. Results are handed to the display
    loop through ``latest_result_box`` / ``latest_result_name`` /
    ``latest_result_color``. When no face is found the buffer and the box
    are cleared.
    """
    global latest_result_box, latest_result_name, latest_result_color

    prediction_buffer = []
    print("[THREAD] FPGA AI Worker Started")

    while system_running:
        frame_to_process = None
        with frame_lock:
            if current_frame is not None:
                frame_to_process = current_frame.copy()

        if frame_to_process is None:
            # No frame published yet: back off briefly.
            time.sleep(0.05)
            continue

        if SCALE_FACTOR != 1.0:
            small_frame = cv2.resize(frame_to_process, (0, 0),
                                     fx=SCALE_FACTOR, fy=SCALE_FACTOR)
        else:
            small_frame = frame_to_process

        gray = cv2.cvtColor(small_frame, cv2.COLOR_BGR2GRAY)

        # LBP detection
        faces = face_cascade.detectMultiScale(gray, 1.1, 5)

        if len(faces) > 0:
            largest_face = max(faces, key=lambda r: r[2] * r[3])
            (sx, sy, sw, sh) = largest_face

            # Map the detection back to full-resolution coordinates.
            scale_inv = 1.0 / SCALE_FACTOR
            x = int(sx * scale_inv)
            y = int(sy * scale_inv)
            w = int(sw * scale_inv)
            h = int(sh * scale_inv)

            # `<=`: a box that ends exactly on the bottom/right edge is still
            # a valid slice; the former `<` silently dropped such faces.
            if y + h <= frame_to_process.shape[0] and x + w <= frame_to_process.shape[1]:
                face_roi = cv2.cvtColor(frame_to_process[y:y+h, x:x+w],
                                        cv2.COLOR_BGR2GRAY)
                processed_face = preprocess(face_roi)
                flat_face = processed_face.reshape(-1).astype(np.float64)

                # FPGA INFERENCE
                test_proj = driver.inference(flat_face)

                # VOTING LOGIC
                prediction_buffer.append(_nearest_identity(test_proj))

                if len(prediction_buffer) < BATCH_SIZE:
                    latest_result_name = f"Analyzing... {len(prediction_buffer)}/{BATCH_SIZE}"
                    latest_result_color = (200, 200, 200)
                else:
                    latest_result_name, latest_result_color = _vote_verdict(prediction_buffer)
                    print(f">>> [RESULT] {latest_result_name}")
                    prediction_buffer = []

                latest_result_box = (x, y, w, h)
        else:
            # Face lost: restart the vote and remove the overlay.
            prediction_buffer = []
            latest_result_box = None

        time.sleep(0.01)

# --------------------------------------------------------------
# MAIN DISPLAY LOOP
# --------------------------------------------------------------
# Widget that shows the annotated camera stream.
image_widget = widgets.Image(format='jpeg', width=160, height=120)
display(image_widget)

cap = cv2.VideoCapture(0, cv2.CAP_V4L2)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 160)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 120)

if not cap.isOpened():
    print("ERROR: Could not open camera.")
else:
    system_running = True
    worker_thread = threading.Thread(target=ai_processing_worker)
    worker_thread.start()
    print("[INFO] System Running! Press 'Stop' in Jupyter to end.")

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Publish a private copy. The overlay below is drawn on `frame`
            # in place; sharing the same array let the worker pick up frames
            # with the box and label already painted over the face.
            with frame_lock:
                current_frame = frame.copy()

            # Read each shared result once. The worker may set the box to
            # None at any moment, so checking and then re-reading the global
            # could fail while unpacking.
            box = latest_result_box
            label = latest_result_name
            color = latest_result_color
            if box is not None:
                (x, y, w, h) = box
                cv2.rectangle(frame, (x, y), (x+w, y+h), color, 2)
                cv2.putText(frame, label, (x, y-10),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)

            _, encoded = cv2.imencode('.jpg', frame,
                                      [int(cv2.IMWRITE_JPEG_QUALITY), 50])
            image_widget.value = encoded.tobytes()
            time.sleep(0.001)

    except KeyboardInterrupt:
        print("Stopping...")

    finally:
        # Ask the worker to exit, wait for it, then free the camera.
        system_running = False
        worker_thread.join()
        cap.release()
        print("[INFO] System Stopped")

LBP Cascade loaded successfully from lbpcascade_frontalface_improved.xml


Image(value=b'', format='jpeg', height='120', width='160')

[THREAD] FPGA AI Worker Started
[INFO] System Running! Press 'Stop' in Jupyter to end.
>>> [RESULT] 993 (10/10)
>>> [RESULT] Maybe 993? (7/10)
>>> [RESULT] Maybe peidong? (5/10)
>>> [RESULT] Maybe 993? (7/10)
>>> [RESULT] Maybe peidong? (6/10)
>>> [RESULT] Maybe peidong? (6/10)
>>> [RESULT] Maybe peidong? (6/10)
>>> [RESULT] Maybe 993? (6/10)
>>> [RESULT] Maybe 993? (5/10)
>>> [RESULT] Maybe 993? (7/10)
>>> [RESULT] Maybe 993? (7/10)
>>> [RESULT] Maybe 993? (6/10)
>>> [RESULT] 993 (8/10)
>>> [RESULT] Maybe peidong? (7/10)
>>> [RESULT] Maybe 993? (6/10)
>>> [RESULT] 993 (9/10)
>>> [RESULT] 993 (10/10)
>>> [RESULT] Maybe 993? (7/10)
>>> [RESULT] Maybe peidong? (5/10)
>>> [RESULT] Maybe peidong? (5/10)
>>> [RESULT] 993 (10/10)
>>> [RESULT] 993 (8/10)
>>> [RESULT] Maybe 993? (7/10)
>>> [RESULT] 993 (8/10)
>>> [RESULT] Maybe 993? (7/10)
>>> [RESULT] Maybe 993? (7/10)
>>> [RESULT] 993 (8/10)
>>> [RESULT] Maybe 993? (6/10)
>>> [RESULT] 993 (8/10)
>>> [RESULT] 993 (10/10)
>>> [RESULT] Maybe 99