In [None]:
pip install ultralytics


In [5]:
pip install pillow-heif


Collecting pillow-heif
  Downloading pillow_heif-0.22.0-cp312-cp312-macosx_13_0_x86_64.whl.metadata (9.6 kB)
Downloading pillow_heif-0.22.0-cp312-cp312-macosx_13_0_x86_64.whl (5.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.4/5.4 MB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: pillow-heif
Successfully installed pillow-heif-0.22.0
Note: you may need to restart the kernel to use updated packages.


## Change the input image file: convert HEIC to JPEG

In [11]:
from pillow_heif import register_heif_opener
from PIL import Image

register_heif_opener()  # Enable HEIC support in PIL

# JPEG cannot store an alpha channel, so normalise to RGB before saving;
# the context manager closes the source file once the conversion is done.
with Image.open("test2.heic") as image:
    image.convert("RGB").save("test2.jpg", "JPEG")


In [12]:
from ultralytics import YOLO

# Load the YOLOv9 tiny model
model = YOLO("yolov9t.pt")

# Perform inference on an image
# (test2.jpg is the file written by the HEIC conversion cell; the call
# returns a sequence of results, of which only the first is used below.)
results = model("test2.jpg")

# Display the results
results[0].show()

# Save the result to disk
results[0].save(filename="output2.jpg")



image 1/1 /Users/apple/Downloads/Capstone Project/test2.jpg: 640x480 11 cars, 2 trucks, 187.1ms
Speed: 4.7ms preprocess, 187.1ms inference, 0.8ms postprocess per image at shape (1, 3, 640, 480)


'output2.jpg'

In [10]:

# Run inference on a video file and save output
# NOTE(review): relies on `model` having been created by another cell.
model.predict(
    source="input.mp4",           # path to the input video
    save=True,                    # save annotated video
    save_txt=False,               # presumably set True to also save bounding-box data as text - confirm
    conf=0.3,                     # optional: confidence threshold
    iou=0.5,                      # optional: NMS threshold
    stream=False,                 # returns every frame's result in one list; the warning printed below suggests stream=True for long videos
)




errors for large sources or long-running streams and videos. See https://docs.ultralytics.com/modes/predict/ for help.

Example:
    results = model(source=..., stream=True)  # generator of Results objects
    for r in results:
        boxes = r.boxes  # Boxes object for bbox outputs
        masks = r.masks  # Masks object for segment masks outputs
        probs = r.probs  # Class probabilities for classification outputs

video 1/1 (frame 1/195) /Users/apple/Downloads/Capstone Project/input.mp4: 640x384 1 person, 2 cars, 2 traffic lights, 222.9ms
video 1/1 (frame 2/195) /Users/apple/Downloads/Capstone Project/input.mp4: 640x384 1 person, 2 cars, 2 traffic lights, 175.3ms
video 1/1 (frame 3/195) /Users/apple/Downloads/Capstone Project/input.mp4: 640x384 1 person, 2 cars, 2 traffic lights, 140.5ms
video 1/1 (frame 4/195) /Users/apple/Downloads/Capstone Project/input.mp4: 640x384 1 person, 2 cars, 2 traffic lights, 162.9ms
video 1/1 (frame 5/195) /Users/apple/Downloads/Capstone Project/

[ultralytics.engine.results.Results object with attributes:
 
 boxes: ultralytics.engine.results.Boxes object
 keypoints: None
 masks: None
 names: {0: 'person', 1: 'bicycle', 2: 'car', 3: 'motorcycle', 4: 'airplane', 5: 'bus', 6: 'train', 7: 'truck', 8: 'boat', 9: 'traffic light', 10: 'fire hydrant', 11: 'stop sign', 12: 'parking meter', 13: 'bench', 14: 'bird', 15: 'cat', 16: 'dog', 17: 'horse', 18: 'sheep', 19: 'cow', 20: 'elephant', 21: 'bear', 22: 'zebra', 23: 'giraffe', 24: 'backpack', 25: 'umbrella', 26: 'handbag', 27: 'tie', 28: 'suitcase', 29: 'frisbee', 30: 'skis', 31: 'snowboard', 32: 'sports ball', 33: 'kite', 34: 'baseball bat', 35: 'baseball glove', 36: 'skateboard', 37: 'surfboard', 38: 'tennis racket', 39: 'bottle', 40: 'wine glass', 41: 'cup', 42: 'fork', 43: 'knife', 44: 'spoon', 45: 'bowl', 46: 'banana', 47: 'apple', 48: 'sandwich', 49: 'orange', 50: 'broccoli', 51: 'carrot', 52: 'hot dog', 53: 'pizza', 54: 'donut', 55: 'cake', 56: 'chair', 57: 'couch', 58: 'potted p

In [16]:
!pip install plyer

Collecting plyer
  Downloading plyer-2.1.0-py2.py3-none-any.whl.metadata (61 kB)
Downloading plyer-2.1.0-py2.py3-none-any.whl (142 kB)
Installing collected packages: plyer
Successfully installed plyer-2.1.0


In [None]:

import cv2
import time
import csv
import os
import platform
import subprocess
from ultralytics import YOLO
from collections import defaultdict, deque
from datetime import datetime

if platform.system() == "Windows":
    from plyer import notification

# === Configuration ===
input_path = "delivery.mp4"
output_path = "delivery_detected_output.mp4"
csv_path = "delivery_log.csv"
model_path = "yolov9t.pt"
# Class ids must match the model's own names table, which lists
# 0 = person, 2 = car, 15 = cat, 16 = dog (17 is horse, not dog).
target_classes = {0: "person", 2: "car", 15: "cat", 16: "dog"}
frame_skip = 5  # detection/tracking runs only on frames whose number is a multiple of this
focal_px = 700  # focal length in pixels used by the distance estimate (focal_px * height_m / box_height)
# Assumed real-world height in metres per class id; keys must mirror target_classes.
real_height_m = {0: 1.7, 2: 1.5, 15: 0.3, 16: 0.5}
persistence_duration_sec = 2  # how long a track may go unseen before it is treated as gone

# === Setup ===
# Output folder for the clips cut at the end of the cell.
os.makedirs("suspicious", exist_ok=True)
model = YOLO(model_path)

# Video input: fall back to 30 fps when the container reports 0.
cap = cv2.VideoCapture(input_path)
fps = cap.get(cv2.CAP_PROP_FPS) or 30
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Annotated video output, same frame size and rate as the input.
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

# Event log: header row first, one row per flagged event later.
csv_file = open(csv_path, mode="w", newline="")
csv_writer = csv.writer(csv_file)
csv_header = ["Frame", "Behavior", "Class", "Distance (m)", "Timestamp (s)", "Event Time (system)"]
csv_writer.writerow(csv_header)

# Per-track state shared by the main loop.
track_history = defaultdict(deque)
distance_history = defaultdict(deque)
last_boxes = dict()
behavior_flags = set()
disappeared_tracks = dict()
track_timestamps = dict()
suspicious_events = list()

# === Notification ===
def notify_local(title, message):
    """Show a local desktop notification, stamped with the current system time.

    macOS uses ``osascript``; Windows uses the module-level ``plyer``
    ``notification`` object (imported only on Windows); every other platform
    falls back to printing the alert.

    Args:
        title: Notification title.
        message: Notification body; " at <timestamp>" is appended.
    """
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    system = platform.system()
    if system == "Darwin":
        # The text is embedded inside an AppleScript string literal, so
        # backslashes and double quotes must be escaped or the script breaks.
        def _applescript_quote(text):
            return str(text).replace("\\", "\\\\").replace('"', '\\"')

        script = (
            f'display notification "{_applescript_quote(message)} at {now}" '
            f'with title "{_applescript_quote(title)}"'
        )
        subprocess.run(["osascript", "-e", script])
    elif system == "Windows":
        notification.notify(
            title=title,
            message=f"{message} at {now}",
            timeout=5
        )
    else:
        print(f"[{title}] {message} at {now}")

# === Utility ===
def get_center(box):
    """Return the (x, y) midpoint of an (x1, y1, x2, y2) box, truncated to ints."""
    left, top, right, bottom = box
    mid_x = int((left + right) / 2)
    mid_y = int((top + bottom) / 2)
    return mid_x, mid_y

def save_suspicious_clip(start_frame, end_frame, output_filename):
    """Copy frames [start_frame, end_frame) of the annotated output video into a new clip.

    Reads from the module-level ``output_path`` and writes an mp4v file using
    the module-level ``fps``, ``width`` and ``height``.

    Args:
        start_frame: Index of the first frame to copy (seek is clamped to 0).
        end_frame: Index one past the last frame to copy.
        output_filename: Path of the clip to create.
    """
    frame_count = end_frame - start_frame
    if frame_count <= 0:
        # Nothing to copy; avoid leaving an empty video file behind.
        return

    cap_clip = cv2.VideoCapture(output_path)
    if not cap_clip.isOpened():
        cap_clip.release()
        print(f"[WARN] Could not open {output_path}; skipping {output_filename}")
        return

    out_clip = None
    try:
        cap_clip.set(cv2.CAP_PROP_POS_FRAMES, max(0, start_frame))
        out_clip = cv2.VideoWriter(output_filename, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))

        for _ in range(frame_count):
            ret, frame = cap_clip.read()
            if not ret:
                break
            out_clip.write(frame)
    finally:
        # Release both handles even if reading or writing raised.
        cap_clip.release()
        if out_clip is not None:
            out_clip.release()

# === Main Processing Loop ===
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    frame_num = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
    timestamp = frame_num / fps
    annotated = frame.copy()
    current_ids = set()

    # Detection/tracking runs only every `frame_skip`-th frame; boxes are
    # redrawn from `last_boxes` on the frames in between.
    if frame_num % frame_skip == 0:
        results = model.track(frame, persist=True, verbose=False)[0]

        # `boxes.id` is None when the tracker has no track ids for this frame;
        # zipping over None would raise TypeError, so treat it as "no tracks".
        track_ids = results.boxes.id
        if track_ids is None:
            track_ids = []

        for box, cls_id, track_id in zip(results.boxes.xyxy, results.boxes.cls, track_ids):
            cls_id = int(cls_id)
            track_id = int(track_id)
            if cls_id not in target_classes:
                continue

            label = target_classes[cls_id]
            x1, y1, x2, y2 = map(int, box.tolist())
            center = get_center((x1, y1, x2, y2))
            track_history[track_id].append(center)
            # Keep at most ~30 seconds' worth of frame count entries per track.
            if len(track_history[track_id]) > int(fps * 30):
                track_history[track_id].popleft()

            # Distance estimate: focal length (px) * real height (m) / box height (px).
            box_height = y2 - y1
            height_m = real_height_m[cls_id]
            distance_m = (focal_px * height_m) / box_height if box_height > 0 else None

            if distance_m:
                distance_history[track_id].append(distance_m)
                if len(distance_history[track_id]) > 5:
                    distance_history[track_id].popleft()

                # [first_seen_frame, last_seen_frame] for this track.
                if track_id not in track_timestamps:
                    track_timestamps[track_id] = [frame_num, frame_num]
                else:
                    track_timestamps[track_id][1] = frame_num

                last_boxes[track_id] = ((x1, y1, x2, y2), f"{label}: {distance_m:.2f} m", frame_num)
                disappeared_tracks[track_id] = (label, distance_m, frame_num)
                current_ids.add(track_id)

    # === Handle disappearance and behavior detection ===
    for track_id in list(disappeared_tracks):
        label, last_distance, last_frame = disappeared_tracks[track_id]
        if frame_num - last_frame > int(persistence_duration_sec * fps):
            if track_id in track_timestamps:
                first_seen, last_seen = track_timestamps[track_id]
                duration_sec = (last_seen - first_seen) / fps

                # Only apply suspicious behavior logic to person and car
                if (label == "person" and (last_distance < 5.0 or duration_sec > 10)) or \
                   (label == "car" and last_distance < 5.0 and duration_sec > 10):

                    behavior = "Suspicious Activity"
                    # Each track is reported at most once per behavior.
                    if f"{track_id}_{behavior}" not in behavior_flags:
                        behavior_flags.add(f"{track_id}_{behavior}")
                        event_time = round(frame_num / fps, 2)
                        real_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

                        csv_writer.writerow([
                            frame_num, behavior, label, round(last_distance, 2),
                            event_time, real_time
                        ])

                        notify_local(
                            "Suspicious Activity Detected",
                            f"{label} at {round(last_distance, 2)}m around {event_time}s"
                        )

                        # Queue a clip of +/- 4 seconds around the event; it is cut
                        # after the loop, once the output video is complete.
                        clip_margin = int(fps * 4)
                        clip_start = max(0, frame_num - clip_margin)
                        clip_end = min(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), frame_num + clip_margin)
                        clip_time_str = datetime.now().strftime("%Y-%m-%d_%H-%M-%S-%f")
                        suspicious_events.append((clip_start, clip_end, clip_time_str))

            del disappeared_tracks[track_id]
            track_timestamps.pop(track_id, None)

    # === Draw persistent bounding boxes ===
    for track_id, (box, text, last_seen_frame) in list(last_boxes.items()):
        if frame_num - last_seen_frame > int(persistence_duration_sec * fps):
            del last_boxes[track_id]
            continue
        x1, y1, x2, y2 = box
        cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(annotated, text, (x1, y1 - 4),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

    out.write(annotated)
    cv2.imshow("Suspicious Behavior Detection", annotated)
    # Esc or "q" stops processing early.
    if cv2.waitKey(1) in [27, ord("q")]:
        break

# === Finalize and save suspicious clips ===
# The writer must be released before clips are cut: save_suspicious_clip
# re-opens output_path to read the annotated frames back.
cap.release()
out.release()
csv_file.close()
cv2.destroyAllWindows()

print("Saving suspicious video clips from output video...")
# Each queued event is (first frame, end frame, timestamp string used as the file name).
for clip_start, clip_end, clip_time_str in suspicious_events:
    clip_filename = f"suspicious/{clip_time_str}.mp4"
    save_suspicious_clip(clip_start, clip_end, clip_filename)
print("All suspicious clips saved.")


Saving suspicious video clips from output video...
All suspicious clips saved.


: 

In [None]:
from ultralytics import YOLO
import cv2
import os
import time
import csv
import platform
import subprocess
from datetime import datetime
from collections import defaultdict, deque

# === Config ===
input_path = "delivery.mp4"
output_path = "delivery_output.mp4"
csv_path = "delivery_log.csv"
alert_log_path = "alert_log.csv"

# Two detectors: a general model that is run through the tracker, and a
# second model that is only used for class id 80 ("box") in the main loop.
model_general = YOLO("yolov9t.pt")
model_box = YOLO("best.pt")

# NOTE(review): the names table printed by the video-inference cell lists
# 15 = cat, 16 = dog, 17 = horse, so the ids 16/17 below look shifted by one.
# Any class-id filter in the main loop must be kept in agreement with these keys.
# NOTE(review): confirm that best.pt really emits class id 80 for boxes.
target_classes = {
    0: "person",
    2: "car",
    16: "cat",
    17: "dog",
    80: "box"
}
# Focal length in pixels used by the distance estimate (focal_px * height_m / box_height).
focal_px = 700
# Assumed real-world height in metres per class id.
real_height_m = {
    0: 1.7, 2: 1.4, 16: 0.1, 17: 0.1, 80: 0.1
}
frame_skip = 5  # detection runs only on frames whose number is a multiple of this
persistence_duration_sec = 2  # how long a box stays drawn after its last detection

# Output folder for clips, video reader, and annotated-video writer.
os.makedirs("suspicious", exist_ok=True)
cap = cv2.VideoCapture(input_path)
fps = cap.get(cv2.CAP_PROP_FPS)
fps = 30 if fps == 0 else fps  # fall back to 30 when the source reports 0 fps
width, height = int(cap.get(3)), int(cap.get(4))  # properties 3 / 4: frame width / height
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

# Event log (one row per detected behavior).
csv_file = open(csv_path, mode="w", newline="")
csv_writer = csv.writer(csv_file)
csv_writer.writerow(["Frame", "Behavior", "Class", "Distance (m)", "Timestamp (s)", "Event Time (system)", "Closest Person Distance (m)"])

# Alert log (one row per notification sent by notify_local).
alert_file = open(alert_log_path, mode="w", newline="")
alert_writer = csv.writer(alert_file)
alert_writer.writerow(["Alert Time", "Title", "Message"])

# Per-track state used by the main loop.
track_history = defaultdict(deque)
distance_history = defaultdict(deque)
min_distance_tracker = {}
last_boxes = {}
behavior_flags = set()  # NOTE(review): not used anywhere else in this cell
disappeared_tracks = {}
track_timestamps = {}
suspicious_events = []  # NOTE(review): never appended to in this cell, so no clips get saved at the end
proximity_flags = set()
box_appearance_nearby = defaultdict(bool)  # NOTE(review): written in the loop but never read
recent_persons = deque()  # NOTE(review): never appended to, so the "Delivery Detected" alert cannot fire

logged_boxes = set()

def notify_local(title, message):
    """Log an alert to the alert CSV and show a local desktop notification.

    The alert row is written (and flushed) first, so it is recorded even when
    the desktop notification itself fails. macOS uses ``osascript``, Windows
    uses ``plyer``; every other platform prints the alert.

    Args:
        title: Notification title.
        message: Notification body; " at <timestamp>" is appended on display.
    """
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    alert_writer.writerow([now, title, message])
    alert_file.flush()
    system = platform.system()
    if system == "Darwin":
        # The text is embedded inside an AppleScript string literal, so
        # backslashes and double quotes must be escaped or the script breaks.
        def _applescript_quote(text):
            return str(text).replace("\\", "\\\\").replace('"', '\\"')

        script = (
            f'display notification "{_applescript_quote(message)} at {now}" '
            f'with title "{_applescript_quote(title)}"'
        )
        subprocess.run(["osascript", "-e", script])
    elif system == "Windows":
        try:
            from plyer import notification
            notification.notify(title=title, message=f"{message} at {now}", timeout=5)
        except Exception as exc:
            # Notifications stay best-effort, but report the failure instead of
            # hiding it (a bare except would also swallow KeyboardInterrupt).
            print(f"[WARN] Desktop notification failed: {exc}")
    else:
        print(f"[{title}] {message} at {now}")

def get_center(box):
    """Return the (x, y) midpoint of an (x1, y1, x2, y2) box, truncated to ints."""
    left, top, right, bottom = box
    mid_x = int((left + right) / 2)
    mid_y = int((top + bottom) / 2)
    return mid_x, mid_y

def save_suspicious_clip(start_frame, end_frame, output_filename):
    """Copy frames [start_frame, end_frame) of the annotated output video into a new clip.

    Reads from the module-level ``output_path`` and writes an mp4v file using
    the module-level ``fps``, ``width`` and ``height``.

    Args:
        start_frame: Index of the first frame to copy (seek is clamped to 0).
        end_frame: Index one past the last frame to copy.
        output_filename: Path of the clip to create.
    """
    frame_count = end_frame - start_frame
    if frame_count <= 0:
        # Nothing to copy; avoid leaving an empty video file behind.
        return

    cap_clip = cv2.VideoCapture(output_path)
    if not cap_clip.isOpened():
        cap_clip.release()
        print(f"[WARN] Could not open {output_path}; skipping {output_filename}")
        return

    out_clip = None
    try:
        cap_clip.set(cv2.CAP_PROP_POS_FRAMES, max(0, start_frame))
        out_clip = cv2.VideoWriter(output_filename, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))
        for _ in range(frame_count):
            ret, frame = cap_clip.read()
            if not ret:
                break
            out_clip.write(frame)
    finally:
        # Release both handles even if reading or writing raised.
        cap_clip.release()
        if out_clip is not None:
            out_clip.release()

# === Main Loop ===
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    frame_num = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
    timestamp = frame_num / fps
    annotated = frame.copy()
    current_ids = set()

    # Detection runs only every `frame_skip`-th frame; boxes are redrawn from
    # `last_boxes` on the frames in between.
    if frame_num % frame_skip == 0:
        detections = []

        # --- Tracked detections from the general model ---
        result_general = model_general.track(frame, persist=True, verbose=False, tracker="bytetrack.yaml")[0]
        if result_general.boxes.id is not None:
            for box, cls_id, track_id in zip(result_general.boxes.xyxy, result_general.boxes.cls, result_general.boxes.id):
                cls_id = int(cls_id)
                track_id = int(track_id)
                # Accept every configured class except the box class (80),
                # which is handled by the second model below.
                if cls_id in target_classes and cls_id != 80:
                    detections.append((box, cls_id, track_id))

        # --- Untracked box detections from the box model ---
        result_box = model_box(frame, verbose=False)[0]
        for box, cls_id in zip(result_box.boxes.xyxy, result_box.boxes.cls):
            cls_id = int(cls_id)
            if cls_id == 80:
                # The box model is not tracked, so derive a pseudo track id from
                # the box centre, reusing the id of an already-logged box that
                # lies within 20 px on both axes.
                # NOTE(review): these hashed ids could collide with real tracker ids.
                box_center = get_center(box.tolist())
                for prev_center in logged_boxes:
                    if abs(box_center[0] - prev_center[0]) < 20 and abs(box_center[1] - prev_center[1]) < 20:
                        fake_track_id = hash(prev_center) % (10**6)
                        break
                else:
                    fake_track_id = hash(box_center) % (10**6)
                detections.append((box, cls_id, fake_track_id))

                # NOTE(review): recent_persons is never filled in this cell, so
                # this alert currently cannot fire.
                for pid, dist_p, last_seen in recent_persons:
                    if dist_p < 5.0:
                        notify_local("Delivery Detected", "There is a mailbox in front, go check that out!")
                        recent_persons.clear()
                        break

        for box, cls_id, track_id in detections:
            label = target_classes[cls_id]
            x1, y1, x2, y2 = map(int, box.tolist())
            center = get_center((x1, y1, x2, y2))
            # Distance estimate: focal length (px) * real height (m) / box height (px).
            box_height = y2 - y1
            height_m = real_height_m.get(cls_id, 1.0)
            distance_m = (focal_px * height_m) / box_height if box_height > 0 else None

            track_history[track_id].append(center)
            if len(track_history[track_id]) > int(fps * 30):
                track_history[track_id].popleft()

            # A degenerate (zero-height) box has no distance; skip it so None
            # never reaches min() or the ":.2f" label formatting below.
            if distance_m is None:
                continue

            if cls_id == 80 and distance_m < 5.0:
                # Use THIS detection's centre (computed the same way as in the
                # box loop above), not the centre left over from the last box
                # that loop happened to see.
                this_box_center = get_center(box.tolist())
                if all(abs(this_box_center[0] - prev[0]) > 20 or abs(this_box_center[1] - prev[1]) > 20 for prev in logged_boxes):
                    logged_boxes.add(this_box_center)
                    event_time = round(timestamp, 2)
                    real_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    csv_writer.writerow([frame_num, "Box Appeared", label, round(distance_m, 2), event_time, real_time, ""])
                    notify_local("Delivery Alert", "A mailbox appeared close by.")

            if cls_id in target_classes and cls_id != 80 and distance_m < 5.0:
                # One proximity notification per track.
                if track_id not in proximity_flags:
                    notify_local("Proximity Alert", f"There's a {label} approaching at {round(distance_m, 2)} meters.")
                    proximity_flags.add(track_id)

            if cls_id == 80:
                # Mark persons that were close by when a box is seen.
                for pid, (label_p, dist_p, _) in disappeared_tracks.items():
                    if label_p == "person" and dist_p < 5.0:
                        box_appearance_nearby[pid] = True

            distance_history[track_id].append(distance_m)
            if len(distance_history[track_id]) > 5:
                distance_history[track_id].popleft()

            # Running minimum distance over the lifetime of the track.
            min_distance = min(min_distance_tracker.get(track_id, distance_m), min(distance_history[track_id]))
            min_distance_tracker[track_id] = min_distance
            if track_id not in track_timestamps:
                track_timestamps[track_id] = [frame_num, frame_num]
            else:
                track_timestamps[track_id][1] = frame_num

            last_boxes[track_id] = ((x1, y1, x2, y2), f"{label}: {distance_m:.2f} m", frame_num)
            disappeared_tracks[track_id] = (label, min_distance, frame_num)

    # Draw persisted boxes; when a box expires, log the person's closest approach.
    for track_id, (box, text, last_seen_frame) in list(last_boxes.items()):
        if frame_num - last_seen_frame > int(persistence_duration_sec * fps):
            label, _, _ = disappeared_tracks.get(track_id, (None, None, None))
            min_dist = min_distance_tracker.get(track_id, None)
            if label == "person" and min_dist is not None:
                event_time = round(frame_num / fps, 2)
                real_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                csv_writer.writerow([frame_num, "Proximity Alert", label, round(min_dist, 2), event_time, real_time, round(min_dist, 2)])
                notify_local("Proximity Alert", f"There's a person approaching!")

            del last_boxes[track_id]
            continue
        x1, y1, x2, y2 = box
        cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(annotated, text, (x1, y1 - 4), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

    out.write(annotated)
    cv2.imshow("Suspicious Behavior Detection", annotated)
    # Esc or "q" stops processing early.
    if cv2.waitKey(1) in [27, ord("q")]:
        break

# Release the reader/writer and close both logs. The writer must be released
# before clips are cut, because save_suspicious_clip re-opens output_path.
cap.release()
out.release()
csv_file.close()
alert_file.close()
cv2.destroyAllWindows()

print("Saving suspicious video clips from output video...")
# Each queued event is (first frame, end frame, timestamp string used as the file name).
for clip_start, clip_end, clip_time_str in suspicious_events:
    clip_filename = f"suspicious/{clip_time_str}.mp4"
    save_suspicious_clip(clip_start, clip_end, clip_filename)
print("All suspicious clips saved.")


Saving suspicious video clips from output video...
All suspicious clips saved.


In [27]:
pip install --upgrade google-generativeai


Collecting google-generativeai
  Downloading google_generativeai-0.8.5-py3-none-any.whl.metadata (3.9 kB)
Downloading google_generativeai-0.8.5-py3-none-any.whl (155 kB)
Installing collected packages: google-generativeai
  Attempting uninstall: google-generativeai
    Found existing installation: google-generativeai 0.8.4
    Uninstalling google-generativeai-0.8.4:
      Successfully uninstalled google-generativeai-0.8.4
Successfully installed google-generativeai-0.8.5
Note: you may need to restart the kernel to use updated packages.


In [41]:
import os

import google.generativeai as genai
from PIL import Image

# Step 1: Authenticate
# The key is read from the environment so the secret never lives in the
# notebook or its history. Any key that was previously hard-coded here must be
# treated as compromised and rotated.
api_key = os.environ.get("GOOGLE_API_KEY")
if not api_key:
    raise RuntimeError("Set the GOOGLE_API_KEY environment variable before running this cell.")
genai.configure(api_key=api_key)

# Step 2: Load image
image = Image.open("geminitest.png")

# Step 3: Use the multimodal model
model = genai.GenerativeModel("models/gemini-1.5-flash")

# Step 4: Send image and text prompt
response = model.generate_content(
    [
        "is there anything infront of my house?(box or person,car is closed? don't include image processing, include the distance of box)",
        image
    ]
)

# Step 5: Print result
print(response.text)


Yes, there is a box in front of the house.  The box is 4.18 meters away.  There is also a car in the garage, which is closed.


# Face Recognition Testing

In [2]:
!pip install keras_facenet
!pip install mediapipe

Collecting keras_facenet
  Downloading keras-facenet-0.3.2.tar.gz (10 kB)
  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting mtcnn (from keras_facenet)
  Downloading mtcnn-1.0.0-py3-none-any.whl.metadata (5.8 kB)
Collecting lz4>=4.3.3 (from mtcnn->keras_facenet)
  Downloading lz4-4.4.4-cp312-cp312-macosx_10_13_x86_64.whl.metadata (3.8 kB)
Downloading mtcnn-1.0.0-py3-none-any.whl (1.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m10.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading lz4-4.4.4-cp312-cp312-macosx_10_13_x86_64.whl (220 kB)
Building wheels for collected packages: keras_facenet
  Building wheel for keras_facenet (setup.py) ... [?25ldone
[?25h  Created wheel for keras_facenet: filename=keras_facenet-0.3.2-py3-none-any.whl size=10368 sha256=3b9a6b67231ee5e17fcab7bcb1235811b9d87427991c6d3a18601fc1dd1af06a
  Stored in directory: /Users/apple/Library/Caches/pip/wheels/05/b0/f5/19ac49fedc10b1df3ee56b096edbcfa39d45794fccc6bcdbbf

In [12]:
# NOTE(review): this cell depends on names created by other cells - `cv2`,
# `known_faces` and `get_embedding` are not defined here - so on a fresh kernel
# it only works after those cells have been run.
img = cv2.imread("William.jpeg")
if img is None:
    raise FileNotFoundError("William.jpeg not found or invalid")
# Store the embedding under the person's name and print the raw vector.
known_faces["William"] = get_embedding(img)
print(known_faces["William"])


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 156ms/step
[ 0.01040817  0.01323743 -0.08136161  0.03703693 -0.00583531  0.09873086
 -0.02493108  0.06667081 -0.03646993 -0.02634571 -0.01645712  0.07746426
  0.02978943 -0.03657426 -0.00851209 -0.02543745  0.01909425  0.03340081
 -0.01135213 -0.12970874 -0.09350712 -0.00186395  0.07484038 -0.0429936
  0.03251211 -0.01524375 -0.01608281 -0.04873962 -0.04360047  0.0210877
 -0.00943478 -0.00025411  0.00208789 -0.02367698 -0.03761333  0.06066556
  0.04307506 -0.01791338 -0.11657034  0.02633477  0.01289767  0.0389986
 -0.00975662 -0.02284096  0.03940051  0.0072353   0.03982001  0.08939081
 -0.10278586 -0.0819013  -0.0051559   0.00425588  0.03345906 -0.0378776
 -0.07769363  0.08435165 -0.00782712  0.05853546 -0.00762564 -0.02032345
 -0.0259762   0.07670612 -0.0492494  -0.0445796  -0.04377818  0.0447792
  0.02838225  0.00930388  0.02227625 -0.00212531  0.04751861  0.03947476
 -0.03065567 -0.01284459  0.06253225 -0.09051684 -0.0759

In [13]:
pip install huggingface_hub


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 117ms/step
[ 0.01040817  0.01323743 -0.08136161  0.03703693 -0.00583531  0.09873086
 -0.02493108  0.06667081 -0.03646993 -0.02634571 -0.01645712  0.07746426
  0.02978943 -0.03657426 -0.00851209 -0.02543745  0.01909425  0.03340081
 -0.01135213 -0.12970874 -0.09350712 -0.00186395  0.07484038 -0.0429936
  0.03251211 -0.01524375 -0.01608281 -0.04873962 -0.04360047  0.0210877
 -0.00943478 -0.00025411  0.00208789 -0.02367698 -0.03761333  0.06066556
  0.04307506 -0.01791338 -0.11657034  0.02633477  0.01289767  0.0389986
 -0.00975662 -0.02284096  0.03940051  0.0072353   0.03982001  0.08939081
 -0.10278586 -0.0819013  -0.0051559   0.00425588  0.03345906 -0.0378776
 -0.07769363  0.08435165 -0.00782712  0.05853546 -0.00762564 -0.02032345
 -0.0259762   0.07670612 -0.0492494  -0.0445796  -0.04377818  0.0447792
  0.02838225  0.00930388  0.02227625 -0.00212531  0.04751861  0.03947476
 -0.03065567 -0.01284459  0.06253225 -0.09051684 -0.0759

In [31]:
pip install facenet-pytorch


Collecting facenet-pytorch
  Downloading facenet_pytorch-2.6.0-py3-none-any.whl.metadata (12 kB)
Collecting Pillow<10.3.0,>=10.2.0 (from facenet-pytorch)
  Downloading pillow-10.2.0-cp312-cp312-macosx_10_10_x86_64.whl.metadata (9.7 kB)
Downloading facenet_pytorch-2.6.0-py3-none-any.whl (1.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hDownloading pillow-10.2.0-cp312-cp312-macosx_10_10_x86_64.whl (3.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.5/3.5 MB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: Pillow, facenet-pytorch
  Attempting uninstall: Pillow
    Found existing installation: pillow 10.4.0
    Uninstalling pillow-10.4.0:
      Successfully uninstalled pillow-10.4.0
Successfully installed Pillow-10.2.0 facenet-pytorch-2.6.0
Note: you may need to restart the kernel to use updated packages.


In [4]:
from facenet_pytorch import InceptionResnetV1
import torch
import numpy as np
import cv2

# Pretrained FaceNet (VGGFace2 weights), switched to inference mode.
model = InceptionResnetV1(pretrained='vggface2').eval()


def get_embedding(img):
    """Return the L2-normalised FaceNet embedding of an image.

    The image is resized to 160x160 without any face detection, its channel
    order is reversed (BGR -> RGB for OpenCV images), and pixel values are
    scaled with (x - 127.5) / 128 before the forward pass.

    Args:
        img: Image array; presumably H x W x 3 uint8 in BGR order, as returned
            by cv2.imread - confirm against callers.

    Returns:
        1-D numpy array with unit L2 norm.
    """
    rgb = cv2.resize(img, (160, 160))[:, :, ::-1].copy()  # copy() removes the negative stride
    channels_first = rgb.transpose(2, 0, 1)
    batch = torch.tensor(channels_first).float().unsqueeze(0)  # [1, 3, 160, 160]
    batch = (batch - 127.5) / 128.0
    with torch.no_grad():
        output = model(batch)
    vector = output[0].numpy()
    return vector / np.linalg.norm(vector)



In [7]:
# Sanity check: embed two independent random-noise images and compare them.
# No seed is set, so the exact value differs between runs; note that
# randint's upper bound (255) is exclusive.
img1 = np.random.randint(0, 255, (160, 160, 3), dtype=np.uint8)
img2 = np.random.randint(0, 255, (160, 160, 3), dtype=np.uint8)

emb1 = get_embedding(img1)
emb2 = get_embedding(img2)

# get_embedding returns unit-length vectors, so the dot product is the cosine similarity.
# NOTE(review): the recorded value for pure noise (~0.98) is far above the one recorded
# for two real faces (~0.45) - presumably non-face inputs all map to similar embeddings,
# so a similarity threshold alone cannot reject non-faces; confirm.
print("Random image similarity:", np.dot(emb1, emb2))


Random image similarity: 0.98404086


In [43]:
img1 = cv2.imread("William.jpeg")  # Image of person A
img2 = cv2.imread("Lam.jpg")  # Image of person B
# cv2.imread returns None (it does not raise) when a file is missing or
# unreadable; fail here with a clear message instead of inside cv2.resize.
if img1 is None:
    raise FileNotFoundError("William.jpeg not found or invalid")
if img2 is None:
    raise FileNotFoundError("Lam.jpg not found or invalid")
emb1 = get_embedding(img1)
emb2 = get_embedding(img2)
# Embeddings are unit-length, so the dot product is the cosine similarity.
print("Cosine similarity (real faces):", np.dot(emb1, emb2))


Cosine similarity (real faces): 0.4475677


In [48]:
import cv2
import numpy as np
import torch
from facenet_pytorch import InceptionResnetV1
import mediapipe as mp

# === Initialize FaceNet ===
# Use the GPU when one is available; the model is put in inference mode.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = InceptionResnetV1(pretrained='vggface2').eval().to(device)

# === Embedding Function ===
def get_embedding(img):
    """Return the L2-normalised FaceNet embedding of an image.

    The image is resized to 160x160, its channel order is reversed
    (BGR -> RGB for OpenCV images), and pixel values are scaled with
    (x - 127.5) / 128 before the forward pass on `device`.

    Args:
        img: Image array; presumably H x W x 3 uint8 in BGR order - confirm
            against callers.

    Returns:
        1-D numpy array with unit L2 norm.
    """
    rgb = cv2.resize(img, (160, 160))[:, :, ::-1].copy()
    channels_first = rgb.transpose(2, 0, 1)
    batch = torch.tensor(channels_first, dtype=torch.float32).unsqueeze(0).to(device)
    batch = (batch - 127.5) / 128.0
    with torch.no_grad():
        output = model(batch)
    vector = output[0].cpu().numpy()
    return vector / np.linalg.norm(vector)

# === Load Known Faces with Face Detection ===
def load_known_face(name, filepath):
    """Load an image, crop the first detected face and return its embedding.

    Args:
        name: Label to associate with the face.
        filepath: Path of the reference image.

    Returns:
        Tuple ``(name, embedding)``. The embedding is a 512-element zero
        vector when the image cannot be read, no face is detected, or the
        face crop is empty.
    """
    img = cv2.imread(filepath)
    if img is None:
        print(f"[ERROR] Could not load {filepath}")
        return name, np.zeros(512)

    with mp.solutions.face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.7) as detector:
        results = detector.process(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
        if results.detections:
            bbox = results.detections[0].location_data.relative_bounding_box
            h, w = img.shape[:2]
            x, y = int(bbox.xmin * w), int(bbox.ymin * h)
            w_box, h_box = int(bbox.width * w), int(bbox.height * h)
            # The relative box can extend past the image; a negative start index
            # would wrap around when slicing and give a wrong or empty crop, so
            # clamp the box to the image bounds first.
            x1, y1 = max(0, x), max(0, y)
            x2, y2 = min(w, x + w_box), min(h, y + h_box)
            face_crop = img[y1:y2, x1:x2]
            if face_crop.size == 0:
                print(f"[ERROR] Empty face crop in {filepath}")
                return name, np.zeros(512)
            return name, get_embedding(face_crop)
        else:
            print(f"[ERROR] No face detected in {filepath}")
            return name, np.zeros(512)

# Name -> reference embedding (a zero vector when loading failed, which can
# never reach the match threshold below).
known_faces = dict([
    load_known_face("William", "William.jpeg"),
    load_known_face("Lam", "Lam.jpg")
])

# === Live Face Recognition ===
cap = cv2.VideoCapture(0)
print("[INFO] Press Q to quit.\n")

# try/finally guarantees the camera and the preview window are released even
# when the loop is interrupted (e.g. kernel interrupt) or raises.
try:
    if not cap.isOpened():
        print("❌ Camera error")
    else:
        with mp.solutions.face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.7) as mp_face:
            while True:
                ret, frame = cap.read()
                if not ret:
                    print("❌ Camera error")
                    break

                results = mp_face.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                if results.detections:
                    for detection in results.detections:
                        # Convert the relative bounding box to pixel coordinates.
                        bbox = detection.location_data.relative_bounding_box
                        h, w = frame.shape[:2]
                        x, y = int(bbox.xmin * w), int(bbox.ymin * h)
                        w_box, h_box = int(bbox.width * w), int(bbox.height * h)
                        x, y = max(0, x), max(0, y)
                        face_crop = frame[y:y+h_box, x:x+w_box]
                        if face_crop.size == 0:
                            continue

                        # Pick the known face with the highest cosine similarity.
                        emb = get_embedding(face_crop)
                        best_match, best_score = "Unknown", 0.0
                        for name, known_emb in known_faces.items():
                            score = np.dot(emb, known_emb)
                            if score > best_score:
                                best_match, best_score = name, score

                        # Only accept a match above the 0.7 similarity threshold.
                        label = f"{best_match} ({best_score:.2f})" if best_score > 0.7 else "Unknown"
                        cv2.rectangle(frame, (x, y), (x + w_box, y + h_box), (0, 255, 0), 2)
                        cv2.putText(frame, label, (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

                cv2.imshow("Face Recognition", frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
finally:
    cap.release()
    cv2.destroyAllWindows()


I0000 00:00:1748738366.601052 5969368 gl_context.cc:369] GL version: 2.1 (2.1 ATI-4.14.1), renderer: AMD Radeon Pro 560 OpenGL Engine
W0000 00:00:1748738366.605036 8783554 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
I0000 00:00:1748738367.090944 5969368 gl_context.cc:369] GL version: 2.1 (2.1 ATI-4.14.1), renderer: AMD Radeon Pro 560 OpenGL Engine
W0000 00:00:1748738367.096882 8783571 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


[INFO] Press Q to quit.



I0000 00:00:1748738368.250135 5969368 gl_context.cc:369] GL version: 2.1 (2.1 ATI-4.14.1), renderer: AMD Radeon Pro 560 OpenGL Engine
W0000 00:00:1748738368.255328 8783632 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


In [None]:
# --- Imports ---
import cv2
import os
import time
import csv
import platform
import subprocess
from datetime import datetime
from collections import defaultdict, deque
import torch
import numpy as np
from ultralytics import YOLO
from facenet_pytorch import InceptionResnetV1
import mediapipe as mp

# === Config ===
input_path = "face_test5.mp4"
output_path = "face_test5_output.mp4"
csv_path = "face_test5_log.csv"
alert_log_path = "alert_log.csv"

# General detector plus a second model; presumably the second one detects
# boxes (class id 80 below) - confirm against the main loop.
model_general = YOLO("yolov9t.pt")
model_box = YOLO("best.pt")

focal_px = 700  # presumably the focal length in pixels for distance estimation - confirm
frame_skip = 5
persistence_duration_sec = 2

# Target class mappings and heights in meters
# NOTE(review): the names table printed by the video-inference cell lists
# 15 = cat, 16 = dog, 17 = horse, so the ids 16/17 below look shifted by one.
target_classes = {0: "person", 2: "car", 16: "cat", 17: "dog", 80: "box"}
real_height_m = {0: 1.7, 2: 1.4, 16: 0.1, 17: 0.1, 80: 0.1}

# === Face Recognition Setup ===
device = 'cuda' if torch.cuda.is_available() else 'cpu'
face_model = InceptionResnetV1(pretrained='vggface2').eval().to(device)
mp_face = mp.solutions.face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.7)

def get_embedding(img):
    """Return the unit-length embedding of a face crop as a NumPy vector.

    The crop is resized to 160x160, its channel axis is reversed (callers pass
    OpenCV BGR images, so this yields RGB), pixels are scaled with
    ``(x - 127.5) / 128`` and the result is run through ``face_model``.
    """
    resized = cv2.resize(img, (160, 160))
    rgb = resized[:, :, ::-1].copy()  # materialise the reversed view before handing it to torch
    chw = rgb.transpose(2, 0, 1)
    batch = torch.tensor(chw, dtype=torch.float32).unsqueeze(0).to(device)
    batch = (batch - 127.5) / 128.0
    with torch.no_grad():
        output = face_model(batch)
    vector = output[0].cpu().numpy()
    return vector / np.linalg.norm(vector)

def load_known_face(name, filepath):
    """Build the reference embedding for one known person from an image file.

    Args:
        name: Label to associate with the face.
        filepath: Path of an image readable by ``cv2.imread``.

    Returns:
        ``(name, embedding)``. If the file cannot be read, no face is detected,
        or the detected box does not overlap the image, the embedding is an
        all-zero 512-vector; its dot product with any embedding is 0, so it can
        never exceed the matching threshold.
    """
    img = cv2.imread(filepath)
    if img is None:
        print(f"[ERROR] Couldn't read {filepath}")
        return name, np.zeros(512)

    result = mp_face.process(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    if not result.detections:
        print(f"[ERROR] No face detected in {filepath}")
        return name, np.zeros(512)

    # Only the first detection is used.
    bbox = result.detections[0].location_data.relative_bounding_box
    h, w = img.shape[:2]
    x, y = int(bbox.xmin * w), int(bbox.ymin * h)
    w_box = int(bbox.width * w)
    h_box = int(bbox.height * h)
    # Clamp to the image: a negative start index would make NumPy slice from the
    # opposite edge and silently embed the wrong region.
    x1, y1 = max(0, x), max(0, y)
    x2 = max(0, min(w, x + w_box))
    y2 = max(0, min(h, y + h_box))
    face_crop = img[y1:y2, x1:x2]
    if face_crop.size == 0:
        # cv2.resize raises on an empty array, so bail out with the sentinel instead.
        print(f"[ERROR] Face box lies outside the image in {filepath}")
        return name, np.zeros(512)
    return name, get_embedding(face_crop)

# Reference embeddings keyed by person name.
known_faces = dict([
    load_known_face("Lam", "Lam.jpg"),
    load_known_face("William", "William.jpeg")
])

# === Prepare output and logging ===
os.makedirs("suspicious", exist_ok=True)
cap = cv2.VideoCapture(input_path)
if not cap.isOpened():
    # Fail fast: otherwise width/height are 0 and the run silently produces nothing.
    raise RuntimeError(f"Could not open input video: {input_path}")
fps = cap.get(cv2.CAP_PROP_FPS)
# Fall back to 30 when the container reports no usable frame rate (0, negative or NaN).
fps = fps if fps and fps > 0 else 30
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

# Both CSV files stay open for the whole run and are closed after the main loop.
csv_file = open(csv_path, mode="w", newline="")
csv_writer = csv.writer(csv_file)
csv_writer.writerow(["Frame", "Behavior", "Class", "Distance (m)", "Timestamp (s)", "Event Time (system)", "Closest Person Distance (m)"])

alert_file = open(alert_log_path, mode="w", newline="")
alert_writer = csv.writer(alert_file)
alert_writer.writerow(["Alert Time", "Title", "Message"])

# Per-run tracking state shared by the main loop.
track_history = defaultdict(deque)
distance_history = defaultdict(deque)
min_distance_tracker = {}
last_boxes = {}
behavior_flags = set()
disappeared_tracks = {}
track_timestamps = {}
suspicious_events = []
proximity_flags = set()
box_appearance_nearby = defaultdict(bool)
recent_persons = deque()
logged_boxes = set()
home_arrivals = set()

# === Utilities ===
def notify_local(title, message):
    """Log an alert to the alert CSV and surface it to the user.

    A ``[time, title, message]`` row is written through ``alert_writer`` and
    flushed immediately. The alert is then shown with ``osascript`` on macOS,
    with ``plyer`` on Windows, and printed on any other platform.

    Args:
        title: Notification title.
        message: Notification text; the current time is appended when shown.
    """
    def _applescript_quote(text):
        # Backslash and double quote are the characters that would terminate or
        # corrupt an AppleScript string literal.
        return str(text).replace("\\", "\\\\").replace('"', '\\"')

    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    alert_writer.writerow([now, title, message])
    alert_file.flush()

    body = f"{message} at {now}"
    system = platform.system()
    if system == "Darwin":
        script = f'display notification "{_applescript_quote(body)}" with title "{_applescript_quote(title)}"'
        subprocess.run(["osascript", "-e", script])
    elif system == "Windows":
        try:
            from plyer import notification
            notification.notify(title=title, message=body, timeout=5)
        except Exception as exc:
            # plyer missing or failing must not stop processing, but the alert
            # should still be visible somewhere.
            print(f"[{title}] {body} (desktop notification unavailable: {exc})")
    else:
        print(f"[{title}] {body}")

def get_center(box):
    """Return the midpoint of an ``(x1, y1, x2, y2)`` box as a pair of ints (truncated toward zero)."""
    left, top, right, bottom = box
    mid_x = int((left + right) / 2)
    mid_y = int((top + bottom) / 2)
    return mid_x, mid_y

def save_suspicious_clip(start_frame, end_frame, output_filename):
    """Copy frames ``[start_frame, end_frame)`` of the annotated video into a new clip.

    Frames are read from ``output_path`` and written with the module-level
    ``fps``, ``width`` and ``height``.

    Args:
        start_frame: First frame index; values below 0 are treated as 0.
        end_frame: Frame index at which copying stops (exclusive).
        output_filename: Path of the clip to create.
    """
    first_frame = max(0, start_frame)
    cap_clip = cv2.VideoCapture(output_path)
    if not cap_clip.isOpened():
        print(f"[ERROR] Couldn't open {output_path} to extract {output_filename}")
        cap_clip.release()
        return

    out_clip = cv2.VideoWriter(output_filename, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))
    try:
        cap_clip.set(cv2.CAP_PROP_POS_FRAMES, first_frame)
        # Count from the clamped start so a negative start_frame cannot push the
        # clip past end_frame.
        for _ in range(end_frame - first_frame):
            ret, frame = cap_clip.read()
            if not ret:
                break
            out_clip.write(frame)
    finally:
        # Release both handles even if reading or writing raises.
        cap_clip.release()
        out_clip.release()

# === Main Loop ===
# Class ids accepted from the general detector: every configured target except
# the box class (80), which is produced by model_box.
general_class_ids = [cid for cid in target_classes if cid != 80]
# Faces from the most recent recognition pass as (x, y, w_box, h_box, name).
# Defined before the loop so frames that precede the first pass simply have
# nothing to draw instead of referencing an undefined name.
face_labels = []

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        frame_num = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
        timestamp = frame_num / fps
        annotated = frame.copy()
        current_ids = set()

        # --- Face Recognition Every 5 Frames ---
        if frame_num % 5 == 0:
            face_labels = []
            face_results = mp_face.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            if face_results.detections:
                h, w = frame.shape[:2]
                for detection in face_results.detections:
                    bbox = detection.location_data.relative_bounding_box
                    x, y = int(bbox.xmin * w), int(bbox.ymin * h)
                    w_box = int(bbox.width * w)
                    h_box = int(bbox.height * h)
                    # Each face gets its own match so one person's name is never
                    # drawn over somebody else's box.
                    best_match, best_score = "Unknown", 0.0
                    # Clamp the crop: a negative start index would slice from the far edge.
                    face_crop = frame[max(0, y):max(0, min(h, y + h_box)),
                                      max(0, x):max(0, min(w, x + w_box))]
                    if face_crop.size != 0:
                        try:
                            emb = get_embedding(face_crop)
                            for name, known_emb in known_faces.items():
                                score = np.dot(emb, known_emb)
                                if score > best_score:
                                    best_match, best_score = name, score
                            # proximity_flags doubles as the "already announced" set here.
                            if best_score > 0.7 and best_match not in proximity_flags:
                                proximity_flags.add(best_match)
                                home_arrivals.add(best_match)
                                timestamp_real = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                                csv_writer.writerow([frame_num, "Face Recognized", best_match, "", round(timestamp, 2), timestamp_real, ""])
                                # notify_local writes and flushes the alert-log row itself;
                                # writing it here as well duplicated every alert.
                                notify_local("Home Owner Detected", f"{best_match} just came home!")
                                print(f"[Frame {frame_num}] {best_match} just came home!")
                        except Exception as e:
                            print("[Face Error]", e)
                    face_labels.append((x, y, w_box, h_box, best_match if best_score > 0.7 else "Unknown"))

        # --- YOLO Detection and Annotation ---
        if frame_num % frame_skip == 0:
            detections = []
            result_general = model_general.track(frame, persist=True, verbose=False, tracker="bytetrack.yaml")[0]
            if result_general.boxes.id is not None:
                for box, cls_id, track_id in zip(result_general.boxes.xyxy, result_general.boxes.cls, result_general.boxes.id):
                    cls_id = int(cls_id)
                    track_id = int(track_id)
                    if cls_id in general_class_ids:
                        detections.append((box, cls_id, track_id))

            result_box = model_box(frame, verbose=False)[0]
            for box, cls_id in zip(result_box.boxes.xyxy, result_box.boxes.cls):
                cls_id = int(cls_id)
                if cls_id == 80:
                    # model_box is not tracked, so an id is derived from the coordinates.
                    # NOTE(review): the id changes whenever the box moves, so each
                    # variation is kept (and drawn) separately until it expires.
                    fake_track_id = hash(tuple(box.tolist())) % (10**6)
                    detections.append((box, cls_id, fake_track_id))

            for box, cls_id, track_id in detections:
                label = target_classes.get(cls_id, str(cls_id))
                x1, y1, x2, y2 = map(int, box.tolist())
                center = get_center((x1, y1, x2, y2))
                box_height = y2 - y1
                height_m = real_height_m.get(cls_id, 1.0)
                # Distance from apparent height: focal_px * real height / pixel height.
                distance_m = (focal_px * height_m) / box_height if box_height > 0 else 0
                distance_text = f"{label}: {distance_m:.2f} m"

                track_history[track_id].append(center)
                if len(track_history[track_id]) > int(fps * 30):
                    track_history[track_id].popleft()

                last_boxes[track_id] = ((x1, y1, x2, y2), distance_text, frame_num)

        # Draw persistent YOLO boxes (green)
        for track_id, (box, text, last_seen_frame) in list(last_boxes.items()):
            if frame_num - last_seen_frame <= int(persistence_duration_sec * fps):
                x1, y1, x2, y2 = box
                cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(annotated, text, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
            else:
                del last_boxes[track_id]

        # Draw face recognition bounding boxes (red) on every frame, reusing the
        # faces found by the most recent recognition pass.
        for x, y, w_box, h_box, face_name in face_labels:
            cv2.rectangle(annotated, (x, y), (x + w_box, y + h_box), (0, 0, 255), 2)
            cv2.putText(annotated, face_name, (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

        out.write(annotated)
        cv2.imshow("Detection + Face Recognition", annotated)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Release handles and close the logs even when a frame raises.
    cap.release()
    out.release()
    csv_file.close()
    alert_file.close()
    cv2.destroyAllWindows()

# === Print Summary of Home Arrivals ===
if not home_arrivals:
    print("\nNo known home owners were detected in the video.")
else:
    print("\n\U0001F3E0 People who came home in the video:")
    for person in home_arrivals:
        print(f" - {person}")

# Cut one clip per recorded (start_frame, end_frame, time_label) event.
print("Saving suspicious video clips from output video...")
for event in suspicious_events:
    clip_start, clip_end, clip_time_str = event
    save_suspicious_clip(clip_start, clip_end, f"suspicious/{clip_time_str}.mp4")
print("All suspicious clips saved.")


I0000 00:00:1748740734.783681 5969368 gl_context.cc:369] GL version: 2.1 (2.1 ATI-4.14.1), renderer: AMD Radeon Pro 560 OpenGL Engine
W0000 00:00:1748740734.788837 8849293 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


[Frame 70] Lam just came home!

🏠 People who came home in the video:
 - Lam
Saving suspicious video clips from output video...
All suspicious clips saved.


In [61]:
import cv2
import numpy as np
import torch
from facenet_pytorch import InceptionResnetV1
import mediapipe as mp

# === Setup ===
# NOTE(review): `model` is also the name bound to the YOLO detector in an earlier
# cell; running this cell rebinds it to the face-embedding network.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
model = InceptionResnetV1(pretrained='vggface2').eval().to(device)
mp_face = mp.solutions.face_detection.FaceDetection(
    model_selection=0,
    min_detection_confidence=0.7,
)

# === Embedding Function ===
def get_embedding(img):
    """Return the unit-length embedding of a face crop as a NumPy vector.

    The crop is resized to 160x160, its channel axis is reversed (callers pass
    OpenCV BGR images, so this yields RGB), pixels are scaled with
    ``(x - 127.5) / 128`` and the result is run through ``model``.
    """
    pixels = cv2.resize(img, (160, 160))
    pixels = pixels[:, :, ::-1].copy()  # materialise the reversed view before handing it to torch
    tensor = torch.tensor(pixels.transpose(2, 0, 1), dtype=torch.float32)
    tensor = tensor.unsqueeze(0).to(device)
    tensor = (tensor - 127.5) / 128.0
    with torch.no_grad():
        output = model(tensor)
    vector = output[0].cpu().numpy()
    return vector / np.linalg.norm(vector)

# === Load Known Faces ===
def load_known_face(name, filepath):
    """Build the reference embedding for one known person from an image file.

    Args:
        name: Label to associate with the face.
        filepath: Path of an image readable by ``cv2.imread``.

    Returns:
        ``(name, embedding)``. If the file cannot be read, no face is detected,
        or the detected box does not overlap the image, the embedding is an
        all-zero 512-vector; its dot product with any embedding is 0, so it can
        never exceed the matching threshold.
    """
    img = cv2.imread(filepath)
    if img is None:
        print(f"[ERROR] Couldn't read {filepath}")
        return name, np.zeros(512)

    result = mp_face.process(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    if not result.detections:
        print(f"[ERROR] No face detected in {filepath}")
        return name, np.zeros(512)

    # Only the first detection is used.
    bbox = result.detections[0].location_data.relative_bounding_box
    h, w = img.shape[:2]
    x, y = int(bbox.xmin * w), int(bbox.ymin * h)
    w_box = int(bbox.width * w)
    h_box = int(bbox.height * h)
    # Clamp to the image, as the video loop in this cell does for x and y: a
    # negative start index would make NumPy slice from the opposite edge.
    x1, y1 = max(0, x), max(0, y)
    x2 = max(0, min(w, x + w_box))
    y2 = max(0, min(h, y + h_box))
    face_crop = img[y1:y2, x1:x2]
    if face_crop.size == 0:
        # cv2.resize raises on an empty array, so bail out with the sentinel instead.
        print(f"[ERROR] Face box lies outside the image in {filepath}")
        return name, np.zeros(512)
    return name, get_embedding(face_crop)

# Reference embeddings keyed by person name.
known_faces = dict([
    load_known_face("Lam", "Lam.jpg"),
    load_known_face("William", "William.jpeg")
])

# === Video Setup ===
cap = cv2.VideoCapture("face_test5.mp4")
if not cap.isOpened():
    # Fail fast: otherwise width/height are 0 and the run silently produces nothing.
    raise RuntimeError("Could not open input video: face_test5.mp4")
fps = cap.get(cv2.CAP_PROP_FPS)
# Fall back to 30 when the container reports no usable frame rate (0, negative or NaN).
fps = fps if fps and fps > 0 else 30
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out = cv2.VideoWriter("only_face_detected_output.mp4", fourcc, fps, (width, height))

# === Logging Setup ===
frame_id = 0
log = []
# Faces to keep drawing: identity key -> (label, bbox, last_seen_frame).
# Keying by identity rather than by the score-bearing label means a fresh
# detection replaces that person's previous box instead of piling up beside it.
recent_faces = {}

# === Main Loop ===
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        frame_id += 1

        # Face detection and matching run on every 10th frame only.
        if frame_id % 10 == 0:
            results = mp_face.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            if results.detections:
                h, w = frame.shape[:2]
                unknown_count = 0
                for detection in results.detections:
                    bbox = detection.location_data.relative_bounding_box
                    x, y = int(bbox.xmin * w), int(bbox.ymin * h)
                    w_box = int(bbox.width * w)
                    h_box = int(bbox.height * h)
                    # Clamp all four edges so the crop and the drawn box stay on the
                    # detected face even when it extends past the frame border.
                    x1, y1 = max(0, x), max(0, y)
                    x2 = max(0, min(w, x + w_box))
                    y2 = max(0, min(h, y + h_box))

                    face_crop = frame[y1:y2, x1:x2]
                    if face_crop.size == 0:
                        continue

                    try:
                        emb = get_embedding(face_crop)
                        best_match, best_score = "Unknown", 0.0
                        for name, known_emb in known_faces.items():
                            score = np.dot(emb, known_emb)
                            if score > best_score:
                                best_match, best_score = name, score

                        if best_score > 0.7:
                            face_key = best_match
                            label = f"{best_match} ({best_score:.2f})"
                            log.append((frame_id, best_match, best_score))
                            print(f"[Frame {frame_id}] {best_match} detected with score {best_score:.2f}")
                        else:
                            # Number unknown faces so several of them in one frame
                            # do not overwrite each other.
                            face_key = f"Unknown #{unknown_count}"
                            unknown_count += 1
                            label = "Unknown"

                        # Save for drawing over next 1 second
                        recent_faces[face_key] = (label, (x1, y1, x2, y2), frame_id)

                    except Exception as e:
                        print("[Face Error]", e)

        # Draw bounding boxes from recent_faces (lasting 1 second)
        for face_key, (label, bbox, seen_frame) in list(recent_faces.items()):
            if frame_id - seen_frame <= int(fps):  # Last 1 second
                x1, y1, x2, y2 = bbox
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
            else:
                del recent_faces[face_key]

        out.write(frame)
        cv2.imshow("Face Recognition (1s Box)", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Release the capture, writer and window even when a frame raises.
    cap.release()
    out.release()
    cv2.destroyAllWindows()

# === Save Log to Text File ===
# One line per recognised face: frame number, matched name and similarity score.
with open("face_detection_log.txt", "w") as f:
    f.writelines(
        f"Frame {frame_no}: {who} (score={similarity:.2f})\n"
        for frame_no, who, similarity in log
    )

for status_line in ("✅ Saved: only_face_detected_output.mp4", "📝 Log saved to: face_detection_log.txt"):
    print(status_line)


I0000 00:00:1748739735.978094 5969368 gl_context.cc:369] GL version: 2.1 (2.1 ATI-4.14.1), renderer: AMD Radeon Pro 560 OpenGL Engine
W0000 00:00:1748739735.983988 8822747 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


[Frame 70] Lam detected with score 0.89
[Frame 80] Lam detected with score 0.85
[Frame 90] Lam detected with score 0.85
[Frame 100] Lam detected with score 0.74
[Frame 110] Lam detected with score 0.76
[Frame 120] Lam detected with score 0.84
[Frame 130] Lam detected with score 0.92
[Frame 150] Lam detected with score 0.74
[Frame 160] Lam detected with score 0.78
[Frame 170] Lam detected with score 0.86
[Frame 180] Lam detected with score 0.91
[Frame 190] Lam detected with score 0.82
[Frame 200] Lam detected with score 0.85
[Frame 210] Lam detected with score 0.87
[Frame 220] Lam detected with score 0.85
[Frame 240] Lam detected with score 0.91
[Frame 250] Lam detected with score 0.89
[Frame 260] Lam detected with score 0.91
[Frame 270] Lam detected with score 0.92
✅ Saved: only_face_detected_output.mp4
📝 Log saved to: face_detection_log.txt


In [6]:
# --- Imports ---
import cv2
import os
import time
import csv
import platform
import subprocess
from datetime import datetime, timedelta
from collections import defaultdict, deque
import torch
import numpy as np
from ultralytics import YOLO
from facenet_pytorch import InceptionResnetV1
import mediapipe as mp
import uuid

# === Config ===
input_path = "input_videos/face_test5.mp4"
output_path = "output_videos/face_test5_output.mp4"
csv_path = "log/face_test5_log.csv"
alert_log_path = "log/alert_log.csv"

# General COCO detector plus a second model whose class 80 is treated as "box".
model_general = YOLO("models/yolov9t.pt")
model_box = YOLO("models/best.pt")

focal_px = 700                      # focal length (pixels) used by the distance estimate
frame_skip = 5                      # run YOLO on every Nth frame
persistence_duration_sec = 2  # Reduced from 10 to 2 seconds for faster bounding box removal
alert_cooldown_sec = 10             # minimum gap between two alerts sharing a key
delivery_suppression_sec = 5        # alerts are muted this long after a delivery alert
log_cleanup_window_sec = 60         # look-back window used by cleanup_logs
person_proximity_cooldown_sec = 10  # minimum gap between person proximity alerts
mailbox_cooldown_sec = 3600         # a box at the same spot is not re-logged within this time
box_removal_timeout_sec = 60        # forget a box position not seen for this long

# Target class mappings and heights in meters.
# Ultralytics' COCO weights use the 0-based 80-class index: 15 = cat, 16 = dog
# (17 is horse). The previous 16/17 keys labelled dogs as cats and horses as dogs.
# Keep any hard-coded class-id filter in the detection loop in sync with these keys.
# NOTE(review): 80 lies outside COCO's 0-79 range and is the id expected from
# model_box - confirm that best.pt really emits class 80.
target_classes = {0: "person", 2: "car", 15: "cat", 16: "dog", 80: "box"}
real_height_m = {0: 1.7, 2: 1.4, 15: 0.1, 16: 0.1, 80: 0.1}

# === Face Recognition Setup ===
device = 'cuda' if torch.cuda.is_available() else 'cpu'
face_model = InceptionResnetV1(pretrained='vggface2').eval().to(device)
mp_face = mp.solutions.face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.7)

def get_embedding(img):
    """Return the unit-length embedding of a face crop as a NumPy vector.

    The crop is resized to 160x160, its channel axis is reversed (callers pass
    OpenCV BGR images, so this yields RGB), pixels are scaled with
    ``(x - 127.5) / 128`` and the result is run through ``face_model``.
    """
    resized = cv2.resize(img, (160, 160))
    rgb = resized[:, :, ::-1].copy()  # materialise the reversed view before handing it to torch
    chw = rgb.transpose(2, 0, 1)
    batch = torch.tensor(chw, dtype=torch.float32).unsqueeze(0).to(device)
    batch = (batch - 127.5) / 128.0
    with torch.no_grad():
        output = face_model(batch)
    vector = output[0].cpu().numpy()
    return vector / np.linalg.norm(vector)

def load_known_face(name, filepath):
    """Build the reference embedding for one known person from an image file.

    Args:
        name: Label to associate with the face.
        filepath: Path of an image readable by ``cv2.imread``.

    Returns:
        ``(name, embedding)``. If the file cannot be read, no face is detected,
        or the detected box does not overlap the image, the embedding is an
        all-zero 512-vector; its dot product with any embedding is 0, so it can
        never exceed the matching threshold.
    """
    img = cv2.imread(filepath)
    if img is None:
        print(f"[ERROR] Couldn't read {filepath}")
        return name, np.zeros(512)

    result = mp_face.process(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    if not result.detections:
        print(f"[ERROR] No face detected in {filepath}")
        return name, np.zeros(512)

    # Only the first detection is used.
    bbox = result.detections[0].location_data.relative_bounding_box
    h, w = img.shape[:2]
    x, y = int(bbox.xmin * w), int(bbox.ymin * h)
    w_box = int(bbox.width * w)
    h_box = int(bbox.height * h)
    # Clamp to the image: a negative start index would make NumPy slice from the
    # opposite edge and silently embed the wrong region.
    x1, y1 = max(0, x), max(0, y)
    x2 = max(0, min(w, x + w_box))
    y2 = max(0, min(h, y + h_box))
    face_crop = img[y1:y2, x1:x2]
    if face_crop.size == 0:
        # cv2.resize raises on an empty array, so bail out with the sentinel instead.
        print(f"[ERROR] Face box lies outside the image in {filepath}")
        return name, np.zeros(512)
    return name, get_embedding(face_crop)

# Reference embeddings keyed by person name.
known_faces = dict([
    load_known_face("Lam", "home_owner_imgs/Lam/Lam.jpg"),
    load_known_face("William", "home_owner_imgs/William/William.jpeg")
])

# === Prepare output and logging ===
os.makedirs("suspicious", exist_ok=True)
# The output video and both logs live in sub-folders. Create them up front so
# the writer and the end-of-run open() calls do not fail on a missing
# directory after the whole video has been processed.
for _target_path in (output_path, csv_path, alert_log_path):
    _target_dir = os.path.dirname(_target_path)
    if _target_dir:
        os.makedirs(_target_dir, exist_ok=True)

cap = cv2.VideoCapture(input_path)
if not cap.isOpened():
    # Fail fast: otherwise width/height are 0 and the run silently produces nothing.
    raise RuntimeError(f"Could not open input video: {input_path}")
fps = cap.get(cv2.CAP_PROP_FPS)
# Fall back to 30 when the container reports no usable frame rate (0, negative or NaN).
fps = fps if fps and fps > 0 else 30
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

# Log rows are buffered in memory and written to disk after the main loop.
csv_log_buffer = []
alert_log_buffer = []

# Per-run tracking state shared by the main loop.
track_history = defaultdict(deque)
distance_history = defaultdict(deque)
min_distance_tracker = {}
last_boxes = {}
behavior_flags = set()
disappeared_tracks = {}
track_timestamps = {}
suspicious_events = []
proximity_flags = set()
box_appearance_nearby = defaultdict(bool)
recent_persons = deque()
logged_boxes = set()
box_last_seen = {}
home_arrivals = set()
last_alert_time = {}
track_to_name = {}
delivery_suppression_until = 0
last_person_proximity_time = 0

# === Utilities ===
def notify_local(title, message, key=None):
    """Buffer an alert and surface it to the user, rate-limited per key.

    The alert is appended to ``alert_log_buffer`` and then shown with
    ``osascript`` on macOS, with ``plyer`` on Windows, and printed on any other
    platform.

    Args:
        title: Notification title.
        message: Notification text; the current time is appended when shown.
        key: Optional cooldown key. An alert whose key fired less than
            ``alert_cooldown_sec`` seconds ago (wall-clock) is dropped.

    Returns:
        ``False`` if the alert was suppressed by the cooldown, ``True`` otherwise.
    """
    def _applescript_quote(text):
        # Backslash and double quote are the characters that would terminate or
        # corrupt an AppleScript string literal.
        return str(text).replace("\\", "\\\\").replace('"', '\\"')

    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    current_time = time.time()
    if key and key in last_alert_time and (current_time - last_alert_time[key] < alert_cooldown_sec):
        return False
    if key:
        # Only keyed alerts take part in the cooldown; un-keyed calls no longer
        # leave a meaningless None entry behind.
        last_alert_time[key] = current_time
    alert_log_buffer.append([now, title, message])

    body = f"{message} at {now}"
    system = platform.system()
    if system == "Darwin":
        script = f'display notification "{_applescript_quote(body)}" with title "{_applescript_quote(title)}"'
        subprocess.run(["osascript", "-e", script])
    elif system == "Windows":
        try:
            from plyer import notification
            notification.notify(title=title, message=body, timeout=5)
        except Exception as exc:
            # plyer missing or failing must not stop processing, but the alert
            # should still be visible somewhere.
            print(f"[{title}] {body} (desktop notification unavailable: {exc})")
    else:
        print(f"[{title}] {body}")
    return True

def get_center(box):
    """Return the midpoint of an ``(x1, y1, x2, y2)`` box as a pair of ints (truncated toward zero)."""
    x_min, y_min, x_max, y_max = box
    center_x = int((x_min + x_max) / 2)
    center_y = int((y_min + y_max) / 2)
    return center_x, center_y

def save_suspicious_clip(start_frame, end_frame, output_filename):
    """Copy frames ``[start_frame, end_frame)`` of the annotated video into a new clip.

    Frames are read from ``output_path`` and written with the module-level
    ``fps``, ``width`` and ``height``.

    Args:
        start_frame: First frame index; values below 0 are treated as 0.
        end_frame: Frame index at which copying stops (exclusive).
        output_filename: Path of the clip to create.
    """
    first_frame = max(0, start_frame)
    cap_clip = cv2.VideoCapture(output_path)
    if not cap_clip.isOpened():
        print(f"[ERROR] Couldn't open {output_path} to extract {output_filename}")
        cap_clip.release()
        return

    out_clip = cv2.VideoWriter(output_filename, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))
    try:
        cap_clip.set(cv2.CAP_PROP_POS_FRAMES, first_frame)
        # Count from the clamped start so a negative start_frame cannot push the
        # clip past end_frame.
        for _ in range(end_frame - first_frame):
            ret, frame = cap_clip.read()
            if not ret:
                break
            out_clip.write(frame)
    finally:
        # Release both handles even if reading or writing raises.
        cap_clip.release()
        out_clip.release()

def is_face_in_box(face_bbox, person_box):
    """Tell whether a face rectangle lies entirely inside a person box (edges inclusive).

    Args:
        face_bbox: ``(x, y, width, height)`` of the face.
        person_box: ``(x1, y1, x2, y2)`` corners of the person box.
    """
    face_x, face_y, face_w, face_h = face_bbox
    left, top, right, bottom = person_box
    face_xs = (face_x, face_x + face_w)
    face_ys = (face_y, face_y + face_h)
    fits_horizontally = all(left <= edge <= right for edge in face_xs)
    fits_vertically = all(top <= edge <= bottom for edge in face_ys)
    return fits_horizontally and fits_vertically

def cleanup_logs(person_name, current_time_str):
    """Return copies of both log buffers without recent person-proximity entries.

    An entry is dropped when it is a person "Proximity Alert" whose time is at or
    after ``current_time_str`` minus ``log_cleanup_window_sec`` seconds. The
    module-level buffers themselves are not modified.

    Args:
        person_name: Accepted for the caller's convenience; not used here.
        current_time_str: Reference time formatted as ``%Y-%m-%d %H:%M:%S``.

    Returns:
        ``(cleaned_alert_buffer, cleaned_csv_buffer)``.
    """
    time_format = "%Y-%m-%d %H:%M:%S"
    window = timedelta(seconds=log_cleanup_window_sec)
    cutoff_time = datetime.strptime(current_time_str, time_format) - window

    def keep_alert(entry):
        entry_time_str, title, message = entry
        is_recent = datetime.strptime(entry_time_str, time_format) >= cutoff_time
        is_person_alert = title == "Proximity Alert" and "person approaching" in message.lower()
        return not (is_person_alert and is_recent)

    def keep_csv_row(entry):
        _event_id, _frame, behavior, cls, _distance, _ts, event_time, _closest_dist = entry
        is_recent = datetime.strptime(event_time, time_format) >= cutoff_time
        is_person_alert = behavior == "Proximity Alert" and cls == "person"
        return not (is_person_alert and is_recent)

    cleaned_alert_buffer = [entry for entry in alert_log_buffer if keep_alert(entry)]
    cleaned_csv_buffer = [entry for entry in csv_log_buffer if keep_csv_row(entry)]
    return cleaned_alert_buffer, cleaned_csv_buffer

# === Main Loop ===
# Class ids accepted from the general detector: every configured target except
# the box class (80), which is produced by model_box.
general_class_ids = [cid for cid in target_classes if cid != 80]

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        frame_num = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
        timestamp = frame_num / fps
        annotated = frame.copy()
        current_ids = set()
        delivery_triggered = False
        current_time = time.time()

        # --- Face Recognition Every 5 Frames ---
        face_results = None
        # (x, y, w_box, h_box, name) for each face found on this frame. Keeping the
        # name per face stops one person's match being drawn on every box.
        face_labels = []
        if frame_num % 5 == 0:
            face_results = mp_face.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            if face_results.detections:
                h, w = frame.shape[:2]
                for detection in face_results.detections:
                    bbox = detection.location_data.relative_bounding_box
                    x, y = int(bbox.xmin * w), int(bbox.ymin * h)
                    w_box = int(bbox.width * w)
                    h_box = int(bbox.height * h)
                    best_match, best_score = "Unknown", 0.0
                    # Clamp the crop: a negative start index would slice from the far edge.
                    face_crop = frame[max(0, y):max(0, min(h, y + h_box)),
                                      max(0, x):max(0, min(w, x + w_box))]
                    if face_crop.size != 0:
                        try:
                            emb = get_embedding(face_crop)
                            for name, known_emb in known_faces.items():
                                score = np.dot(emb, known_emb)
                                if score > best_score:
                                    best_match, best_score = name, score
                            if best_score > 0.7 and best_match not in home_arrivals:
                                home_arrivals.add(best_match)
                                timestamp_real = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                                event_id = str(uuid.uuid4())
                                csv_log_buffer.append([event_id, frame_num, "Face Recognized", best_match, "", round(timestamp, 2), timestamp_real, ""])
                                if notify_local("Home Owner Detected", f"{best_match} just came home!", key=f"face_{best_match}"):
                                    print(f"[Frame {frame_num}] {best_match} just came home!")
                                # Drop the recent "person approaching" entries the owner just caused.
                                alert_log_buffer[:], csv_log_buffer[:] = cleanup_logs(best_match, timestamp_real)
                                for track_id, ((x1, y1, x2, y2), _, _) in last_boxes.items():
                                    # disappeared_tracks stores the label string, not the class
                                    # id, so compare it directly. Looking it up in the id-keyed
                                    # target_classes never matched and no face was ever linked.
                                    if disappeared_tracks.get(track_id, (None,))[0] == "person":
                                        if is_face_in_box((x, y, w_box, h_box), (x1, y1, x2, y2)):
                                            track_to_name[track_id] = best_match
                                            proximity_flags.discard(track_id)
                                            break
                        except Exception as e:
                            print("[Face Error]", e)
                    face_labels.append((x, y, w_box, h_box, best_match if best_score > 0.7 else "Unknown"))

        # --- YOLO Detection and Annotation ---
        if frame_num % frame_skip == 0:
            detections = []
            detected_box_centers = set()
            result_general = model_general.track(frame, persist=True, verbose=False, tracker="bytetrack.yaml")[0]
            if result_general.boxes.id is not None:
                for box, cls_id, track_id in zip(result_general.boxes.xyxy, result_general.boxes.cls, result_general.boxes.id):
                    cls_id = int(cls_id)
                    track_id = int(track_id)
                    if cls_id in general_class_ids:
                        detections.append((box, cls_id, track_id))
                        current_ids.add(track_id)  # Track IDs detected in this frame

            result_box = model_box(frame, verbose=False)[0]
            for box, cls_id in zip(result_box.boxes.xyxy, result_box.boxes.cls):
                cls_id = int(cls_id)
                if cls_id == 80:
                    box_center = get_center(box.tolist())
                    detected_box_centers.add(box_center)
                    # New means: more than 20 px from every logged box, or that
                    # box's cooldown has expired.
                    is_new_box = all(
                        abs(box_center[0] - prev_center[0]) > 20 or
                        abs(box_center[1] - prev_center[1]) > 20 or
                        (current_time - prev_time) > mailbox_cooldown_sec
                        for prev_center, prev_time in logged_boxes
                    )
                    if is_new_box:
                        fake_track_id = hash(str(box_center) + str(current_time)) % (10**6)
                        detections.append((box, cls_id, fake_track_id))
                    box_last_seen[box_center] = current_time

            # Check for removed boxes
            for center in list(box_last_seen.keys()):
                if current_time - box_last_seen[center] > box_removal_timeout_sec:
                    # A position may never have been logged (for example a box that
                    # stayed beyond 5 m); next() without a default raised
                    # StopIteration and aborted the run in that case.
                    stale_entry = next(((c, t) for c, t in logged_boxes if c == center), None)
                    if stale_entry is not None:
                        logged_boxes.discard(stale_entry)
                    del box_last_seen[center]

            for box, cls_id, track_id in detections:
                label = target_classes.get(cls_id, str(cls_id))
                x1, y1, x2, y2 = map(int, box.tolist())
                center = get_center((x1, y1, x2, y2))
                box_height = y2 - y1
                height_m = real_height_m.get(cls_id, 1.0)
                # Distance from apparent height: focal_px * real height / pixel height.
                distance_m = (focal_px * height_m) / box_height if box_height > 0 else None
                distance_text = f"{label}: {distance_m:.2f} m" if distance_m else f"{label}: N/A"

                if distance_m and frame_num <= delivery_suppression_until:
                    delivery_triggered = True

                if distance_m and not delivery_triggered:
                    if cls_id == 80 and distance_m < 5.0:
                        is_within_cooldown = any(
                            abs(center[0] - prev_center[0]) <= 20 and
                            abs(center[1] - prev_center[1]) <= 20 and
                            (current_time - prev_time) <= mailbox_cooldown_sec
                            for prev_center, prev_time in logged_boxes
                        )
                        if not is_within_cooldown:
                            logged_boxes.add((center, current_time))
                            event_time = round(timestamp, 2)
                            real_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                            event_id = str(uuid.uuid4())
                            csv_log_buffer.append([event_id, frame_num, "Box Appeared", label, round(distance_m, 2), event_time, real_time, ""])
                            if notify_local("Delivery Alert", "A mailbox appeared close by.", key=f"box_{track_id}"):
                                delivery_suppression_until = frame_num + int(fps * delivery_suppression_sec)
                                delivery_triggered = True

                    if cls_id in general_class_ids and distance_m < 5.0:
                        # Recognised owners and persons inside the cooldown only skip the
                        # alert. The previous `continue` also skipped the bookkeeping
                        # below, which froze their boxes until they expired.
                        is_known_owner = cls_id == 0 and track_id in track_to_name
                        in_person_cooldown = cls_id == 0 and (current_time - last_person_proximity_time < person_proximity_cooldown_sec)
                        if not is_known_owner and not in_person_cooldown and track_id not in proximity_flags:
                            if notify_local("Proximity Alert", f"There's a {label} approaching at {round(distance_m, 2)} meters.", key=f"prox_{track_id}"):
                                proximity_flags.add(track_id)
                                if cls_id == 0:
                                    recent_persons.append((track_id, distance_m, frame_num))
                                    last_person_proximity_time = current_time

                    if cls_id == 80:
                        # A box seen shortly after a nearby, unrecognised person counts as a delivery.
                        for pid, dist_p, last_seen in recent_persons:
                            if dist_p < 5.0 and (frame_num - last_seen) <= int(fps * persistence_duration_sec):
                                if pid in track_to_name:
                                    continue
                                if notify_local("Delivery Detected", "There is a mailbox in front, go check that out!", key=f"delivery_{pid}"):
                                    event_id = str(uuid.uuid4())
                                    real_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                                    csv_log_buffer.append([event_id, frame_num, "Delivery Detected", "person+box", "", round(timestamp, 2), real_time, round(dist_p, 2)])
                                    recent_persons.clear()
                                    proximity_flags.clear()
                                    delivery_triggered = True
                                    delivery_suppression_until = frame_num + int(fps * delivery_suppression_sec)
                                    break  # recent_persons was just cleared; stop iterating it

                track_history[track_id].append(center)
                if len(track_history[track_id]) > int(fps * 30):
                    track_history[track_id].popleft()

                distance_history[track_id].append(distance_m)
                if len(distance_history[track_id]) > 5:
                    distance_history[track_id].popleft()

                min_distance = min(min_distance_tracker.get(track_id, distance_m or float('inf')), distance_m or float('inf'))
                min_distance_tracker[track_id] = min_distance
                if track_id not in track_timestamps:
                    track_timestamps[track_id] = [frame_num, frame_num]
                else:
                    track_timestamps[track_id][1] = frame_num

                # Update last_boxes with the latest detection
                last_boxes[track_id] = ((x1, y1, x2, y2), distance_text, frame_num)
                disappeared_tracks[track_id] = (label, min_distance, frame_num)

        # Handle disappeared tracks
        for track_id in list(last_boxes.keys()):
            if track_id not in current_ids:
                # If the track ID wasn't detected in this frame, check persistence
                last_seen_frame = last_boxes[track_id][2]
                if frame_num - last_seen_frame > int(persistence_duration_sec * fps):
                    if track_id in track_to_name:
                        del track_to_name[track_id]
                    del last_boxes[track_id]
                    if track_id in disappeared_tracks:
                        del disappeared_tracks[track_id]
                    if track_id in proximity_flags:
                        proximity_flags.remove(track_id)
                    continue

            # Draw the bounding box if it hasn't expired
            box, text, last_seen_frame = last_boxes[track_id]
            x1, y1, x2, y2 = box
            cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(annotated, text, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

        # Draw face recognition bounding boxes (red)
        for x, y, w_box, h_box, face_name in face_labels:
            cv2.rectangle(annotated, (x, y), (x + w_box, y + h_box), (0, 0, 255), 2)
            cv2.putText(annotated, face_name, (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

        out.write(annotated)
        cv2.imshow("Detection + Face Recognition", annotated)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Release handles first, then persist the logs. Running this in `finally`
    # keeps the buffered events when a frame raises part-way through the video.
    cap.release()
    out.release()
    cv2.destroyAllWindows()

    # Write logs to files
    with open(csv_path, mode="w", newline="") as csv_file:
        csv_writer = csv.writer(csv_file)
        csv_writer.writerow(["Event ID", "Frame", "Behavior", "Class", "Distance (m)", "Timestamp (s)", "Event Time (system)", "Closest Person Distance (m)"])
        for entry in csv_log_buffer:
            csv_writer.writerow(entry)

    with open(alert_log_path, mode="w", newline="") as alert_file:
        alert_writer = csv.writer(alert_file)
        alert_writer.writerow(["Alert Time", "Title", "Message"])
        for entry in alert_log_buffer:
            alert_writer.writerow(entry)


# Cut one clip per recorded (start_frame, end_frame, time_label) event.
print("Saving suspicious video clips from output video...")
for event in suspicious_events:
    clip_start, clip_end, clip_time_str = event
    save_suspicious_clip(clip_start, clip_end, f"suspicious/{clip_time_str}.mp4")
print("All suspicious clips saved.")

I0000 00:00:1748991781.580220 12329549 gl_context.cc:369] GL version: 2.1 (2.1 ATI-4.14.1), renderer: AMD Radeon Pro 560 OpenGL Engine
W0000 00:00:1748991781.586255 12387640 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


[Frame 70] Lam just came home!
Saving suspicious video clips from output video...
All suspicious clips saved.


In [None]:
# --- Imports ---
import cv2
import os
import time
import csv
import platform
import subprocess
from datetime import datetime, timedelta
from collections import defaultdict, deque
import torch
import numpy as np
from ultralytics import YOLO
from facenet_pytorch import InceptionResnetV1
import mediapipe as mp
import uuid

# === Config ===
# Input video, annotated output video, and the two CSV logs written after the main loop.
input_path = "input_videos/face_test5.mp4"
output_path = "output_videos/face_test5_output.mp4"
csv_path = "log/face_test5_log.csv"
alert_log_path = "log/alert_log.csv"

# model_general is run with tracking in the main loop (classes 0, 2, 16, 17 are kept);
# model_box is run without tracking and only its class-80 detections are used.
model_general = YOLO("models/yolov9t.pt")
model_box = YOLO("models/box_yolov9t.pt")

# Distance estimate in the main loop: focal_px * real height (m) / bounding-box height (px).
focal_px = 700
# YOLO runs only on frames where frame_num % frame_skip == 0.
frame_skip = 5
# How long (video time) an unseen track stays drawn; also the window for pairing a person with a box.
persistence_duration_sec = 2
# Minimum wall-clock gap between notify_local alerts that share the same key.
alert_cooldown_sec = 10
# After a delivery alert, distance-based alerts are skipped for this many seconds of video.
delivery_suppression_sec = 5
# cleanup_logs drops person proximity entries that are newer than this window.
log_cleanup_window_sec = 60
# Minimum wall-clock gap between two person proximity alerts.
person_proximity_cooldown_sec = 10
# A box within 20 px of an already logged box is not logged again inside this window.
mailbox_cooldown_sec = 3600
# Box positions that have not been re-detected for this long are forgotten.
box_removal_timeout_sec = 60

# Target class mappings and heights in meters
# Keys are model class ids; the values of real_height_m feed the distance estimate.
# NOTE(review): 0.1 m for cat/dog/box looks low as a real-world height - confirm.
target_classes = {0: "person", 2: "car", 16: "cat", 17: "dog", 80: "box"}
real_height_m = {0: 1.7, 2: 1.4, 16: 0.1, 17: 0.1, 80: 0.1}

# === Face Recognition Setup ===
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# Embedding network used by get_embedding (fed 160x160 crops), in eval mode on `device`.
face_model = InceptionResnetV1(pretrained='vggface2').eval().to(device)
# Face detector used to locate faces before they are cropped and embedded.
mp_face = mp.solutions.face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.7)

def get_embedding(img):
    """Return the unit-length face embedding of an image crop.

    The crop is resized to 160x160, its channel axis is reversed (BGR to RGB
    for images coming from OpenCV), scaled with (pixel - 127.5) / 128 and
    passed through `face_model` without gradient tracking.

    Args:
        img: Height x width x channel image array of a face crop.

    Returns:
        NumPy vector holding the first embedding of the batch divided by its
        L2 norm.
    """
    resized = cv2.resize(img, (160, 160))
    rgb = resized[:, :, ::-1].copy()  # copy() makes the reversed view contiguous
    channels_first = rgb.transpose(2, 0, 1)
    batch = torch.tensor(channels_first, dtype=torch.float32).unsqueeze(0).to(device)
    batch = (batch - 127.5) / 128.0
    with torch.no_grad():
        output = face_model(batch)
    vector = output[0].cpu().numpy()
    return vector / np.linalg.norm(vector)

def load_known_face(name, filepath):
    """Build the reference embedding for one known person from an image file.

    Args:
        name: Label returned together with the embedding.
        filepath: Path of the reference image, read with cv2.imread.

    Returns:
        Tuple (name, embedding). The embedding is np.zeros(512) when the image
        cannot be read, no face is detected, or the detected face box lies
        outside the image; a zero vector has dot product 0 with any embedding,
        so it can never pass the match threshold.
    """
    img = cv2.imread(filepath)
    if img is None:
        print(f"[ERROR] Couldn't read {filepath}")
        return name, np.zeros(512)

    result = mp_face.process(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    if not result.detections:
        print(f"[ERROR] No face detected in {filepath}")
        return name, np.zeros(512)

    # Only the first detection is used as the reference face.
    bbox = result.detections[0].location_data.relative_bounding_box
    h, w = img.shape[:2]
    x, y = int(bbox.xmin * w), int(bbox.ymin * h)
    w_box = int(bbox.width * w)
    h_box = int(bbox.height * h)

    # The relative box may extend past the image border. A negative start
    # index would be read as "from the end" by NumPy, so clamp to the image.
    x1, y1 = min(max(x, 0), w), min(max(y, 0), h)
    x2 = min(max(x + w_box, x1), w)
    y2 = min(max(y + h_box, y1), h)
    face_crop = img[y1:y2, x1:x2]
    if face_crop.size == 0:
        print(f"[ERROR] No face detected in {filepath}")
        return name, np.zeros(512)

    return name, get_embedding(face_crop)

# Reference embeddings keyed by person name.
known_faces = dict([
    load_known_face("Lam", "home_owner_imgs/Lam/Lam.jpg"),
    load_known_face("William", "home_owner_imgs/William/William.jpeg")
])

# === Prepare output and logging ===
os.makedirs("suspicious", exist_ok=True)
# The output video and both CSV logs live in sub-folders; create them up front
# so a missing folder does not break the run after the whole video was processed.
for _target_path in (output_path, csv_path, alert_log_path):
    _target_dir = os.path.dirname(_target_path)
    if _target_dir:
        os.makedirs(_target_dir, exist_ok=True)

cap = cv2.VideoCapture(input_path)
if not cap.isOpened():
    # The main loop simply does not run in this case; say why.
    print(f"[ERROR] Couldn't open {input_path}")
fps = cap.get(cv2.CAP_PROP_FPS)
fps = 30 if fps == 0 else fps  # fall back to 30 when the source reports no frame rate
width, height = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

# Rows collected during the run and written to csv_path / alert_log_path at the end.
csv_log_buffer = []
alert_log_buffer = []

# Per-track state, keyed by track id unless noted otherwise.
track_history = defaultdict(deque)      # recent box centers
distance_history = defaultdict(deque)   # recent distance estimates
min_distance_tracker = {}               # smallest distance seen so far
last_boxes = {}                         # (box, label text, frame last seen)
behavior_flags = set()
disappeared_tracks = {}                 # (label, min distance, frame last seen)
track_timestamps = {}                   # [first frame, last frame]
suspicious_events = []                  # (start frame, end frame, time label) clips to export
proximity_flags = set()                 # tracks that already raised a proximity alert
box_appearance_nearby = defaultdict(bool)
recent_persons = deque()                # (track id, distance, frame) of persons that raised an alert
logged_boxes = set()                    # (center, wall-clock time) of boxes already logged
box_last_seen = {}                      # box center -> wall-clock time of last detection
home_arrivals = set()                   # names already announced as arrived
last_alert_time = {}                    # alert key -> wall-clock time of the last alert
track_to_name = {}                      # track id -> recognized person name
delivery_suppression_until = 0          # frame number until which alerts are suppressed
last_person_proximity_time = 0          # wall-clock time of the last person proximity alert

# === Utilities ===
def _proximity_distance(message):
    """Return the number that follows the last " at " in a proximity message, or None."""
    try:
        # rsplit on " at " (with spaces) so labels containing "at", such as
        # "cat", do not break the parse.
        return float(message.rsplit(" at ", 1)[1].split()[0])
    except (AttributeError, IndexError, ValueError):
        return None


def _applescript_escape(text):
    """Escape backslashes and double quotes for use inside an AppleScript string literal."""
    return text.replace("\\", "\\\\").replace('"', '\\"')


def notify_local(title, message, key=None):
    """Record an alert and show it as a local notification, unless it is suppressed.

    Suppression rules, checked in order:
      * "Proximity Alert" only: the previous logged alert is also a proximity
        alert with the same message, or with a distance less than 0.05 m away.
      * "Proximity Alert" only: one of the last five logged alerts is a
        proximity alert less than 10 seconds old.
      * `key` is given and an alert with that key fired less than
        `alert_cooldown_sec` seconds ago.

    Args:
        title: Alert title; also selects the proximity-specific rules.
        message: Alert text.
        key: Optional cooldown key. Without a key no per-key cooldown applies.

    Returns:
        True if the alert was appended to `alert_log_buffer` and dispatched,
        False if it was suppressed.
    """
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    current_time = time.time()

    if title == "Proximity Alert":
        if alert_log_buffer:
            last_entry = alert_log_buffer[-1]
            if last_entry[1] == "Proximity Alert":
                last_msg = last_entry[2]
                if message == last_msg:
                    return False  # Exact message already exists
                last_dist = _proximity_distance(last_msg)
                new_dist = _proximity_distance(message)
                if last_dist is not None and new_dist is not None and abs(last_dist - new_dist) < 0.05:
                    return False  # Skip if difference < 5cm

        # Time-based suppression: ignore if similar alert within 10s
        for entry_time_str, entry_title, _ in reversed(alert_log_buffer[-5:]):
            if entry_title != "Proximity Alert":
                continue
            entry_time = datetime.strptime(entry_time_str, "%Y-%m-%d %H:%M:%S")
            if (datetime.now() - entry_time).total_seconds() < 10:
                return False

    if key and key in last_alert_time and (current_time - last_alert_time[key] < alert_cooldown_sec):
        return False
    if key:
        # Only keyed alerts take part in the cooldown; do not store a None key.
        last_alert_time[key] = current_time

    alert_log_buffer.append([now, title, message])

    system_name = platform.system()
    if system_name == "Darwin":
        # The text is embedded in an AppleScript string literal, so quotes and
        # backslashes must be escaped to keep the script well-formed.
        body = _applescript_escape(f"{message} at {now}")
        script = f'display notification "{body}" with title "{_applescript_escape(title)}"'
        subprocess.run(["osascript", "-e", script])
    elif system_name == "Windows":
        try:
            from plyer import notification
            notification.notify(title=title, message=f"{message} at {now}", timeout=5)
        except Exception as exc:
            # Notifications are best-effort: fall back to the console.
            print(f"[{title}] {message} at {now} (notification unavailable: {exc})")
    else:
        print(f"[{title}] {message} at {now}")
    return True



def get_center(box):
    """Return the integer (x, y) midpoint of an (x1, y1, x2, y2) box.

    Each coordinate is the mean of the two edges, truncated toward zero by int().
    """
    left, top, right, bottom = box
    center_x = int((left + right) / 2)
    center_y = int((top + bottom) / 2)
    return center_x, center_y

def save_suspicious_clip(start_frame, end_frame, output_filename):
    """Copy frames [start_frame, end_frame) of the annotated output video into a new clip.

    Args:
        start_frame: First frame index to copy; negative values are treated as 0.
        end_frame: Frame index at which copying stops (exclusive).
        output_filename: Path of the clip to write (mp4v codec, same fps and
            frame size as the output video).
    """
    # Clamp once and use the clamped value for both the seek and the frame
    # count, so a negative start does not make the clip run past end_frame.
    first_frame = max(0, start_frame)
    cap_clip = cv2.VideoCapture(output_path)
    out_clip = cv2.VideoWriter(output_filename, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))
    try:
        cap_clip.set(cv2.CAP_PROP_POS_FRAMES, first_frame)
        for _ in range(end_frame - first_frame):
            ret, frame = cap_clip.read()
            if not ret:
                break  # source video ended before end_frame
            out_clip.write(frame)
    finally:
        # Always release both handles so the clip file is closed even on error.
        cap_clip.release()
        out_clip.release()

def is_face_in_box(face_bbox, person_box):
    """Tell whether a face rectangle lies entirely inside a person box.

    Args:
        face_bbox: (x, y, width, height) of the face.
        person_box: (x1, y1, x2, y2) corners of the person box.

    Returns:
        True when both the top-left and the bottom-right corner of the face
        are within the person box (edges included).
    """
    face_left, face_top, face_width, face_height = face_bbox
    box_left, box_top, box_right, box_bottom = person_box
    face_right = face_left + face_width
    face_bottom = face_top + face_height

    top_left_inside = box_left <= face_left <= box_right and box_top <= face_top <= box_bottom
    bottom_right_inside = box_left <= face_right <= box_right and box_top <= face_bottom <= box_bottom
    return top_left_inside and bottom_right_inside

def cleanup_logs(person_name, current_time_str):
    """Return copies of both log buffers without recent person proximity entries.

    An entry counts as recent when its time is at or after `current_time_str`
    minus `log_cleanup_window_sec` seconds. The module-level buffers themselves
    are not modified here.

    Args:
        person_name: Not used by the filtering; kept for the callers.
        current_time_str: Reference time formatted as "%Y-%m-%d %H:%M:%S".

    Returns:
        Tuple (alert entries to keep, csv entries to keep).
    """
    time_format = "%Y-%m-%d %H:%M:%S"
    cutoff_time = datetime.strptime(current_time_str, time_format) - timedelta(seconds=log_cleanup_window_sec)

    def is_recent(stamp):
        return datetime.strptime(stamp, time_format) >= cutoff_time

    kept_alerts = []
    for record in alert_log_buffer:
        stamp, title, message = record
        recent = is_recent(stamp)
        is_person_proximity = title == "Proximity Alert" and "person approaching" in message.lower()
        if not (is_person_proximity and recent):
            kept_alerts.append(record)

    kept_rows = []
    for row in csv_log_buffer:
        _event_id, _frame, behavior, cls, _distance, _ts, event_time, _closest = row
        recent = is_recent(event_time)
        if not (behavior == "Proximity Alert" and cls == "person" and recent):
            kept_rows.append(row)

    return kept_alerts, kept_rows

# === Main Loop ===
# Per frame: (1) face detection/recognition every 5th frame, (2) YOLO tracking
# and alerting every `frame_skip`-th frame, (3) expiry and drawing of tracked
# boxes, (4) drawing of face boxes, (5) writing and previewing the frame.
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    frame_num = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
    timestamp = frame_num / fps
    annotated = frame.copy()
    current_ids = set()
    delivery_triggered = False
    current_time = time.time()

    # --- Face Recognition Every 5 Frames ---
    face_results = None
    # (x, y, w_box, h_box, label) for every face found in this frame. Keeping
    # the label per face avoids tagging all faces with the last match and
    # avoids reading match variables that were never assigned.
    face_annotations = []
    if frame_num % 5 == 0:
        face_results = mp_face.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        if face_results.detections:
            for detection in face_results.detections:
                bbox = detection.location_data.relative_bounding_box
                h, w = frame.shape[:2]
                x, y = int(bbox.xmin * w), int(bbox.ymin * h)
                w_box = int(bbox.width * w)
                h_box = int(bbox.height * h)
                face_label = "Unknown"

                # Clamp the crop to the frame: a negative start index would be
                # interpreted by NumPy as "from the end" and give a wrong crop.
                crop_x1, crop_y1 = min(max(x, 0), w), min(max(y, 0), h)
                crop_x2 = min(max(x + w_box, crop_x1), w)
                crop_y2 = min(max(y + h_box, crop_y1), h)
                face_crop = frame[crop_y1:crop_y2, crop_x1:crop_x2]

                if face_crop.size > 0:
                    try:
                        emb = get_embedding(face_crop)
                        best_match, best_score = "Unknown", 0.0
                        for name, known_emb in known_faces.items():
                            score = np.dot(emb, known_emb)
                            if score > best_score:
                                best_match, best_score = name, score
                        if best_score > 0.7:
                            face_label = best_match
                        if best_score > 0.7 and best_match not in home_arrivals:
                            home_arrivals.add(best_match)
                            timestamp_real = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                            event_id = str(uuid.uuid4())
                            csv_log_buffer.append([event_id, frame_num, "Door open", best_match, "", round(timestamp, 2), timestamp_real, ""])
                            if notify_local("Home Owner Detected", f"{best_match} just came home!", key=f"face_{best_match}"):
                                print(f"[Frame {frame_num}] {best_match} just came home!")
                                alert_log_buffer.append([datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "Door Open", f"Owner: {best_match}"])
                            # Drop the recent person proximity entries that were raised for the owner.
                            alert_log_buffer[:], csv_log_buffer[:] = cleanup_logs(best_match, timestamp_real)
                            for track_id, ((x1, y1, x2, y2), _, _) in last_boxes.items():
                                # disappeared_tracks stores the label string itself
                                # ("person"), not a class id, so compare it directly.
                                if disappeared_tracks.get(track_id, (None,))[0] == "person":
                                    if is_face_in_box((x, y, w_box, h_box), (x1, y1, x2, y2)):
                                        track_to_name[track_id] = best_match
                                        proximity_flags.discard(track_id)
                                        break
                    except Exception as e:
                        print("[Face Error]", e)

                face_annotations.append((x, y, w_box, h_box, face_label))

    # --- YOLO Detection and Annotation ---
    if frame_num % frame_skip == 0:
        detections = []
        detected_box_centers = set()
        result_general = model_general.track(frame, persist=True, verbose=False, tracker="bytetrack.yaml")[0]
        if result_general.boxes.id is not None:
            for box, cls_id, track_id in zip(result_general.boxes.xyxy, result_general.boxes.cls, result_general.boxes.id):
                cls_id = int(cls_id)
                track_id = int(track_id)
                if cls_id in [0, 2, 16, 17]:
                    detections.append((box, cls_id, track_id))
                    current_ids.add(track_id)

        result_box = model_box(frame, verbose=False)[0]
        for box, cls_id in zip(result_box.boxes.xyxy, result_box.boxes.cls):
            cls_id = int(cls_id)
            if cls_id == 80:
                box_center = get_center(box.tolist())
                detected_box_centers.add(box_center)
                # New when every logged box is more than 20 px away on some
                # axis or older than the mailbox cooldown.
                is_new_box = all(
                    abs(box_center[0] - prev_center[0]) > 20 or
                    abs(box_center[1] - prev_center[1]) > 20 or
                    (current_time - prev_time) > mailbox_cooldown_sec
                    for prev_center, prev_time in logged_boxes
                )
                if is_new_box:
                    # The box model is not tracked, so synthesize an id.
                    fake_track_id = hash(str(box_center) + str(current_time)) % (10**6)
                    detections.append((box, cls_id, fake_track_id))
                box_last_seen[box_center] = current_time

        for center in list(box_last_seen.keys()):
            if current_time - box_last_seen[center] > box_removal_timeout_sec:
                # A timed-out center may never have been logged (e.g. the box
                # was too far away), so remove whatever matches instead of
                # requiring exactly one match.
                for stale_entry in [entry for entry in logged_boxes if entry[0] == center]:
                    logged_boxes.discard(stale_entry)
                del box_last_seen[center]

        for box, cls_id, track_id in detections:
            label = target_classes.get(cls_id, str(cls_id))
            x1, y1, x2, y2 = map(int, box.tolist())
            center = get_center((x1, y1, x2, y2))
            box_height = y2 - y1
            height_m = real_height_m.get(cls_id, 1.0)
            distance_m = (focal_px * height_m) / box_height if box_height > 0 else None
            distance_text = f"{label}: {distance_m:.2f} m" if distance_m else f"{label}: N/A"

            if distance_m and frame_num <= delivery_suppression_until:
                delivery_triggered = True

            if distance_m and not delivery_triggered:
                if cls_id == 80 and distance_m < 5.0:
                    is_within_cooldown = any(
                        abs(center[0] - prev_center[0]) <= 20 and
                        abs(center[1] - prev_center[1]) <= 20 and
                        (current_time - prev_time) <= mailbox_cooldown_sec
                        for prev_center, prev_time in logged_boxes
                    )
                    if not is_within_cooldown:
                        logged_boxes.add((center, current_time))
                        event_time = round(timestamp, 2)
                        real_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                        event_id = str(uuid.uuid4())
                        csv_log_buffer.append([event_id, frame_num, "Box Appeared", label, round(distance_m, 2), event_time, real_time, ""])
                        if notify_local("Delivery Alert", "A mailbox appeared close by.", key=f"box_{track_id}"):
                            delivery_suppression_until = frame_num + int(fps * delivery_suppression_sec)
                            delivery_triggered = True

                if cls_id in [0, 2, 16, 17] and distance_m < 5.0:
                    # Recognized owners and persons inside the cooldown raise no
                    # alert, but their track bookkeeping below must still run so
                    # the box keeps being drawn and the name mapping stays alive.
                    is_known_person = cls_id == 0 and track_id in track_to_name
                    in_person_cooldown = cls_id == 0 and (current_time - last_person_proximity_time < person_proximity_cooldown_sec)
                    if not is_known_person and not in_person_cooldown and track_id not in proximity_flags:
                        if notify_local("Proximity Alert", f"There's a {label} approaching at {round(distance_m, 2)} meters.", key=f"prox_{track_id}"):
                            proximity_flags.add(track_id)
                            if cls_id == 0:
                                recent_persons.append((track_id, distance_m, frame_num))
                                last_person_proximity_time = current_time

                if cls_id == 80:
                    # A box seen shortly after an unrecognized person came close
                    # is reported as a delivery.
                    for pid, dist_p, last_seen in recent_persons:
                        if dist_p < 5.0 and (frame_num - last_seen) <= int(fps * persistence_duration_sec):
                            if pid in track_to_name:
                                continue
                            if notify_local("Delivery Detected", "There is a mailbox in front, go check that out!", key=f"delivery_{pid}"):
                                event_id = str(uuid.uuid4())
                                real_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                                csv_log_buffer.append([event_id, frame_num, "Delivery Detected", "person+box", "", round(timestamp, 2), real_time, round(dist_p, 2)])
                                recent_persons.clear()
                                proximity_flags.clear()
                                delivery_triggered = True
                                delivery_suppression_until = frame_num + int(fps * delivery_suppression_sec)
                                break

            # --- Track bookkeeping (runs for every detection) ---
            track_history[track_id].append(center)
            if len(track_history[track_id]) > int(fps * 30):
                track_history[track_id].popleft()

            distance_history[track_id].append(distance_m)
            if len(distance_history[track_id]) > 5:
                distance_history[track_id].popleft()

            min_distance = min(min_distance_tracker.get(track_id, distance_m or float('inf')), distance_m or float('inf'))
            min_distance_tracker[track_id] = min_distance
            if track_id not in track_timestamps:
                track_timestamps[track_id] = [frame_num, frame_num]
            else:
                track_timestamps[track_id][1] = frame_num

            last_boxes[track_id] = ((x1, y1, x2, y2), distance_text, frame_num)
            disappeared_tracks[track_id] = (label, min_distance, frame_num)

    # Forget tracks unseen for longer than the persistence window; draw the rest.
    for track_id in list(last_boxes.keys()):
        if track_id not in current_ids:
            last_seen_frame = last_boxes[track_id][2]
            if frame_num - last_seen_frame > int(persistence_duration_sec * fps):
                if track_id in track_to_name:
                    del track_to_name[track_id]
                del last_boxes[track_id]
                if track_id in disappeared_tracks:
                    del disappeared_tracks[track_id]
                if track_id in proximity_flags:
                    proximity_flags.remove(track_id)
                continue

        box, text, last_seen_frame = last_boxes[track_id]
        x1, y1, x2, y2 = box
        cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(annotated, text, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

    # Face boxes, each with the label decided for that face in this frame.
    for x, y, w_box, h_box, face_label in face_annotations:
        cv2.rectangle(annotated, (x, y), (x + w_box, y + h_box), (0, 0, 255), 2)
        cv2.putText(annotated, face_label, (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)

    out.write(annotated)
    cv2.imshow("Detection + Face Recognition", annotated)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# Release the capture, the writer and the preview window first, so the
# annotated video is finalized even if writing a log file below raises.
cap.release()
out.release()
cv2.destroyAllWindows()

# Write logs to files
# open() does not create missing folders, so make sure the log folders exist.
for _log_path in (csv_path, alert_log_path):
    _log_dir = os.path.dirname(_log_path)
    if _log_dir:
        os.makedirs(_log_dir, exist_ok=True)

with open(csv_path, mode="w", newline="") as csv_file:
    csv_writer = csv.writer(csv_file)
    csv_writer.writerow(["Event ID", "Frame", "Behavior", "Class", "Distance (m)", "Timestamp (s)", "Event Time (system)", "Closest Person Distance (m)"])
    csv_writer.writerows(csv_log_buffer)

with open(alert_log_path, mode="w", newline="") as alert_file:
    alert_writer = csv.writer(alert_file)
    alert_writer.writerow(["Alert Time", "Title", "Message"])
    alert_writer.writerows(alert_log_buffer)

# Clips are cut from the annotated output video, which is closed at this point.
print("Saving suspicious video...")
for clip_start, clip_end, clip_time_str in suspicious_events:
    clip_filename = f"suspicious/{clip_time_str}.mp4"
    save_suspicious_clip(clip_start, clip_end, clip_filename)
print("All suspicious videos saved.")

I0000 00:00:1749002542.392586 12329549 gl_context.cc:369] GL version: 2.1 (2.1 ATI-4.14.1), renderer: AMD Radeon Pro 560 OpenGL Engine
W0000 00:00:1749002542.398668 12629858 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


[Frame 70] Lam just came home!
Saving suspicious video...
All suspicious videos saved.
