In [22]:
import whisper
import sounddevice as sd
import soundfile as sf
import tempfile
import threading
import time
import pyttsx3
import cv2
import os
import matplotlib.pyplot as plt
from IPython.display import display, clear_output
import ipywidgets as widgets
import numpy as np
import time
from sklearn.metrics.pairwise import cosine_similarity

In [23]:
# -------- Configuration --------
MODEL_NAME = "medium.en"
RECORD_SECONDS = 4
SAMPLE_RATE = 16000
WAKE_PHRASES = ["guard my room", "guard my room please", "guard", "activate guard", "start guard"]
DISARM_PHRASES = ["stop guard", "disarm", "deactivate guard", "stop guard please", "stop"]

In [24]:
# Set default audio input device (change index if needed)
try:
    sd.default.device = sd.query_devices(kind='input')['name']
except Exception as e:
    print("[audio] Could not set default device:", e)

# persistent webcam
cap = cv2.VideoCapture(0)
if not cap.isOpened():
    print("[webcam] ERROR: cannot open webcam")

def record_chunk(seconds=RECORD_SECONDS, samplerate=SAMPLE_RATE):
    print(f"[audio] recording {seconds}s...")
    try:
        recording = sd.rec(int(seconds * samplerate), samplerate=samplerate, channels=1)
        sd.wait()
    except Exception as e:
        print("[audio] ERROR:", e)
        return None
    tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    sf.write(tmp.name, recording, samplerate)
    return tmp.name

def contains_phrase(text, phrases):
    text = text.lower()
    return any(p in text for p in phrases)

def get_webcam_frame():
    ret, frame = cap.read()
    if not ret:
        return None
    return frame

out_widget = widgets.Output()
display(out_widget)
cap.release()


Output()

In [25]:
# def main():
#     global cap
#     cap = cv2.VideoCapture(0)
#     if not cap.isOpened():
#         print("[webcam] ERROR: cannot open webcam")
#         return

#     agent = GuardAgent()
#     print("Say a wake phrase (e.g., 'Guard my room') to activate. Say a disarm phrase to stop.")

#     try:
#         while True:
#             agent.listen_and_toggle()

#             if not agent.guard_mode:
#                 frame = get_webcam_frame()
#                 if frame is not None:
#                     _, encoded_img = cv2.imencode('.png', frame)
#                     img_bytes = encoded_img.tobytes()
#                     with out_widget:
#                         clear_output(wait=True)
#                         display(widgets.Image(value=img_bytes, format='png', width=640, height=480))
#             time.sleep(0.1)

#     except KeyboardInterrupt:
#         print("\n[exit] KeyboardInterrupt received — cleaning up.")
#     finally:
#         cap.release()
#         print("[exit] done.")

# main()

In [26]:
import mediapipe as mp
import cv2
import numpy as np
import os
from sklearn.metrics.pairwise import cosine_similarity

mp_face_mesh = mp.solutions.face_mesh

BASE_DIR = "enrolled_faces"
os.makedirs(BASE_DIR, exist_ok=True)


def extract_embedding(image_path):
    """Extract a 3D face landmark embedding from an image."""
    img = cv2.imread(image_path)
    if img is None:
        print("[enroll] Could not read image:", image_path)
        return None

    with mp_face_mesh.FaceMesh(static_image_mode=True, max_num_faces=1, refine_landmarks=True) as face_mesh:
        results = face_mesh.process(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
        if not results.multi_face_landmarks:
            print("[enroll] No face found in", image_path)
            return None
        landmarks = results.multi_face_landmarks[0].landmark
        emb = np.array([[lm.x, lm.y, lm.z] for lm in landmarks]).flatten()
        emb = emb / np.linalg.norm(emb)
        return emb

def enroll_face(person_name, image_paths):
    """
    Enroll one person using multiple images.
    Creates a folder for that person and saves one .npy file per image.
    """
    person_dir = os.path.join(BASE_DIR, person_name)
    os.makedirs(person_dir, exist_ok=True)

    for i, path in enumerate(image_paths):
        emb = extract_embedding(path)
        if emb is not None:
            np.save(os.path.join(person_dir, f"{i}.npy"), emb)
            print(f"[enroll] Saved embedding {i+1} for {person_name}")
    print(f"[enroll] Completed enrollment for {person_name} ✅")


In [27]:
enroll_face("darshan", ["darshan1.jpg", "darshan2.jpg", "darshan3.jpg"])
enroll_face("roommate", ["roommate1.jpg", "roommate2.jpg"])

[enroll] Could not read image: darshan1.jpg
[enroll] Could not read image: darshan2.jpg
[enroll] Could not read image: darshan3.jpg
[enroll] Completed enrollment for darshan ✅
[enroll] Could not read image: roommate1.jpg
[enroll] Could not read image: roommate2.jpg
[enroll] Completed enrollment for roommate ✅


[ WARN:0@1923.188] global loadsave.cpp:268 findDecoder imread_('darshan1.jpg'): can't open/read file: check file path/integrity
[ WARN:0@1923.189] global loadsave.cpp:268 findDecoder imread_('darshan2.jpg'): can't open/read file: check file path/integrity
[ WARN:0@1923.189] global loadsave.cpp:268 findDecoder imread_('darshan3.jpg'): can't open/read file: check file path/integrity
[ WARN:0@1923.189] global loadsave.cpp:268 findDecoder imread_('roommate1.jpg'): can't open/read file: check file path/integrity
[ WARN:0@1923.189] global loadsave.cpp:268 findDecoder imread_('roommate2.jpg'): can't open/read file: check file path/integrity


In [None]:
def recognize_face_with_score(frame):
    """
    Recognize face from frame by comparing with all enrolled faces.
    Averages similarity scores per person.
    """
    with mp_face_mesh.FaceMesh(static_image_mode=True, max_num_faces=1, refine_landmarks=True) as face_mesh:
        results = face_mesh.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        if not results.multi_face_landmarks:
            return "Unknown", frame, 0

        landmarks = results.multi_face_landmarks[0].landmark
        emb = np.array([[lm.x, lm.y, lm.z] for lm in landmarks]).flatten()
        emb = emb / np.linalg.norm(emb)

        best_name, best_sim = "Unknown", 0.0

        for person_name in os.listdir(BASE_DIR):
            person_dir = os.path.join(BASE_DIR, person_name)
            if not os.path.isdir(person_dir):
                continue

            sims = []
            for f in os.listdir(person_dir):
                if not f.endswith(".npy"):
                    continue
                known_emb = np.load(os.path.join(person_dir, f))
                sim = cosine_similarity([emb], [known_emb])[0][0]
                sims.append(sim)

            if sims:
                avg_sim = np.mean(sims)
                if avg_sim > best_sim:
                    best_sim = avg_sim
                    best_name = person_name

        # Apply threshold
        if best_sim < 0.90:
            best_name = "Unknown"

        return best_name, frame, best_sim

In [29]:
import threading
import cv2
import matplotlib.pyplot as plt
from IPython.display import display, clear_output
import mediapipe as mp
import time

class GuardAgent:
    def __init__(self, cap):
        print("[init] loading Whisper model:", MODEL_NAME)
        self.model = whisper.load_model(MODEL_NAME)
        self.tts = pyttsx3.init()
        self.tts.setProperty("rate", 150)
        self.guard_mode = False
        self.cap = cap
        self.last_name = None
        self.last_detect_time = 0
        self.guard_thread = None
        self.guard_stop_event = threading.Event()

    def listen_and_toggle(self):
        wav_path = record_chunk()
        if not wav_path: return
        try:
            result = self.model.transcribe(wav_path, language="en")
            transcript = result.get("text", "").strip().lower()
        except Exception as e:
            print("[transcribe] error:", e)
            transcript = ""
        finally:
            os.remove(wav_path)

        print("[heard]", transcript)

        if not self.guard_mode and contains_phrase(transcript, WAKE_PHRASES):
            self.guard_mode = True
            self.tts.say("Guard mode activated.")
            self.tts.runAndWait()
            self.guard_stop_event.clear()
            self.guard_thread = threading.Thread(target=self.run_guard_mode, daemon=True)
            self.guard_thread.start()

        elif self.guard_mode and contains_phrase(transcript, DISARM_PHRASES):
            self.guard_mode = False
            self.tts.say("Guard mode deactivated.")
            self.tts.runAndWait()
            self.guard_stop_event.set()
            if self.guard_thread:
                self.guard_thread.join()

    def run_guard_mode(self):
        print("[guard] Running face detection...")
        mp_face = mp.solutions.face_detection
        plt.ion()
        fig, ax = plt.subplots()

        with mp_face.FaceDetection(model_selection=0, min_detection_confidence=0.5) as detector:
            while self.guard_mode and not self.guard_stop_event.is_set():
                ret, frame = self.cap.read()
                if not ret: break

                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                results = detector.process(rgb)

                if results.detections:
                    for det in results.detections:
                        bboxC = det.location_data.relative_bounding_box
                        ih, iw, _ = frame.shape
                        x1 = int(bboxC.xmin * iw)
                        y1 = int(bboxC.ymin * ih)
                        w = int(bboxC.width * iw)
                        h = int(bboxC.height * ih)
                        x2 = x1 + w
                        y2 = y1 + h

                        # Draw rectangle
                        cv2.rectangle(frame, (x1, y1), (x2, y2), (0,255,0), 2)

                        # Crop face for recognition
                        face_crop = frame[max(0,y1):min(ih,y2), max(0,x1):min(iw,x2)]
                        name = "Chamar"
                        if face_crop.size > 0:
                            rec_name, _, sim = recognize_face_with_score(face_crop)
                            name = rec_name

                            # Welcome back logic
                            if name != "Chamar" and name != self.last_name:
                                if time.time() - self.last_detect_time > 5:  # avoid repeated announcements
                                    self.tts.say(f"Welcome back, {name}.")
                                    self.tts.runAndWait()
                                    self.last_name = name
                                    self.last_detect_time = time.time()

                        # Draw name below rectangle
                        cv2.putText(frame, f"{name}", (x1, y2 + 30),
                                    cv2.FONT_HERSHEY_SIMPLEX, 1.0,
                                    (0,255,0) if name != "Chamar" else (0,0,255), 2)

                # Display frame in Jupyter
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                ax.clear()
                ax.imshow(frame_rgb)
                ax.axis('off')
                clear_output(wait=True)
                display(fig)
                plt.pause(0.01)

        plt.close(fig)


In [None]:
def main():
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("[webcam] cannot open webcam")
        return
    agent = GuardAgent(cap)
    print("Say 'guard my room' to activate. Say 'stop guard' to stop.")

    try:
        while True:
            agent.listen_and_toggle()
            # Optional: show webcam preview when not in guard mode
            if not agent.guard_mode:
                frame = get_webcam_frame()
                if frame is not None:
                    _, encoded_img = cv2.imencode('.png', frame)
                    img_bytes = encoded_img.tobytes()
                    with out_widget:
                        clear_output(wait=True)
                        display(widgets.Image(value=img_bytes, format='png', width=640, height=480))
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\n[exit] KeyboardInterrupt received — cleaning up.")
    finally:
        cap.release()
        print("[exit] done.")

main()