In [None]:
pip install ultralytics


In [5]:
pip install pillow-heif


Collecting pillow-heif
  Downloading pillow_heif-0.22.0-cp312-cp312-macosx_13_0_x86_64.whl.metadata (9.6 kB)
Downloading pillow_heif-0.22.0-cp312-cp312-macosx_13_0_x86_64.whl (5.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.4/5.4 MB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: pillow-heif
Successfully installed pillow-heif-0.22.0
Note: you may need to restart the kernel to use updated packages.


## Change input image file

In [11]:
from pillow_heif import register_heif_opener
from PIL import Image

register_heif_opener()  # Enable HEIC support in PIL

image = Image.open("test2.heic")
image.save("test2.jpg", "JPEG")


In [12]:
from ultralytics import YOLO

# Load the YOLOv9 tiny model
model = YOLO("yolov9t.pt")

# Perform inference on an image
results = model("test2.jpg")

# Display the results
results[0].show()

# Save the result to disk
results[0].save(filename="output2.jpg")



image 1/1 /Users/apple/Downloads/Capstone Project/test2.jpg: 640x480 11 cars, 2 trucks, 187.1ms
Speed: 4.7ms preprocess, 187.1ms inference, 0.8ms postprocess per image at shape (1, 3, 640, 480)


'output2.jpg'

In [10]:

# Run inference on a video file and save output
model.predict(
    source="input.mp4",           # path to your .mov video
    save=True,                    # save annotated video
    save_txt=False,               # if you want bounding box data saved too
    conf=0.3,                     # optional: confidence threshold
    iou=0.5,                      # optional: NMS threshold
    stream=False,                 # process as a video (not frame-by-frame)
)




errors for large sources or long-running streams and videos. See https://docs.ultralytics.com/modes/predict/ for help.

Example:
    results = model(source=..., stream=True)  # generator of Results objects
    for r in results:
        boxes = r.boxes  # Boxes object for bbox outputs
        masks = r.masks  # Masks object for segment masks outputs
        probs = r.probs  # Class probabilities for classification outputs

video 1/1 (frame 1/195) /Users/apple/Downloads/Capstone Project/input.mp4: 640x384 1 person, 2 cars, 2 traffic lights, 222.9ms
video 1/1 (frame 2/195) /Users/apple/Downloads/Capstone Project/input.mp4: 640x384 1 person, 2 cars, 2 traffic lights, 175.3ms
video 1/1 (frame 3/195) /Users/apple/Downloads/Capstone Project/input.mp4: 640x384 1 person, 2 cars, 2 traffic lights, 140.5ms
video 1/1 (frame 4/195) /Users/apple/Downloads/Capstone Project/input.mp4: 640x384 1 person, 2 cars, 2 traffic lights, 162.9ms
video 1/1 (frame 5/195) /Users/apple/Downloads/Capstone Project/

[ultralytics.engine.results.Results object with attributes:
 
 boxes: ultralytics.engine.results.Boxes object
 keypoints: None
 masks: None
 names: {0: 'person', 1: 'bicycle', 2: 'car', 3: 'motorcycle', 4: 'airplane', 5: 'bus', 6: 'train', 7: 'truck', 8: 'boat', 9: 'traffic light', 10: 'fire hydrant', 11: 'stop sign', 12: 'parking meter', 13: 'bench', 14: 'bird', 15: 'cat', 16: 'dog', 17: 'horse', 18: 'sheep', 19: 'cow', 20: 'elephant', 21: 'bear', 22: 'zebra', 23: 'giraffe', 24: 'backpack', 25: 'umbrella', 26: 'handbag', 27: 'tie', 28: 'suitcase', 29: 'frisbee', 30: 'skis', 31: 'snowboard', 32: 'sports ball', 33: 'kite', 34: 'baseball bat', 35: 'baseball glove', 36: 'skateboard', 37: 'surfboard', 38: 'tennis racket', 39: 'bottle', 40: 'wine glass', 41: 'cup', 42: 'fork', 43: 'knife', 44: 'spoon', 45: 'bowl', 46: 'banana', 47: 'apple', 48: 'sandwich', 49: 'orange', 50: 'broccoli', 51: 'carrot', 52: 'hot dog', 53: 'pizza', 54: 'donut', 55: 'cake', 56: 'chair', 57: 'couch', 58: 'potted p

In [16]:
!pip install plyer

Collecting plyer
  Downloading plyer-2.1.0-py2.py3-none-any.whl.metadata (61 kB)
Downloading plyer-2.1.0-py2.py3-none-any.whl (142 kB)
Installing collected packages: plyer
Successfully installed plyer-2.1.0


In [2]:
import cv2
import time
import csv
import os
import platform
import subprocess
from ultralytics import YOLO
from collections import defaultdict, deque
from datetime import datetime

if platform.system() == "Windows":
    from plyer import notification

# === Configuration ===
input_path = "library2.mp4"
output_path = "library2_detected_output.mp4"
csv_path = "library2_log.csv"
model_path = "yolov9t.pt"
target_classes = {0: "person", 2: "car"}
frame_skip = 5
focal_px = 700
real_height_m = {0: 1.7, 2: 1.5}
persistence_duration_sec = 2

# === Setup ===
os.makedirs("suspicious", exist_ok=True)
model = YOLO(model_path)
cap = cv2.VideoCapture(input_path)
fps = cap.get(cv2.CAP_PROP_FPS)
if fps == 0:
    fps = 30
width, height = int(cap.get(3)), int(cap.get(4))
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

csv_file = open(csv_path, mode="w", newline="")
csv_writer = csv.writer(csv_file)
csv_writer.writerow(["Frame", "Behavior", "Class", "Distance (m)", "Timestamp (s)", "Event Time (system)"])

track_history = defaultdict(deque)
distance_history = defaultdict(deque)
last_boxes = {}
behavior_flags = set()
disappeared_tracks = {}
track_timestamps = {}
suspicious_events = []  # Store clip info for post-processing

# === Notification ===
def notify_local(title, message):
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    if platform.system() == "Darwin":
        script = f'display notification "{message} at {now}" with title \"{title}\"'
        subprocess.run(["osascript", "-e", script])
    elif platform.system() == "Windows":
        notification.notify(
            title=title,
            message=f"{message} at {now}",
            timeout=5
        )
    else:
        print(f"[{title}] {message} at {now}")

# === Utility ===
def get_center(box):
    x1, y1, x2, y2 = box
    return int((x1 + x2) / 2), int((y1 + y2) / 2)

def save_suspicious_clip(start_frame, end_frame, output_filename):
    cap_clip = cv2.VideoCapture(output_path)
    cap_clip.set(cv2.CAP_PROP_POS_FRAMES, max(0, start_frame))
    out_clip = cv2.VideoWriter(output_filename, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))
    
    for _ in range(end_frame - start_frame):
        ret, frame = cap_clip.read()
        if not ret:
            break
        out_clip.write(frame)

    cap_clip.release()
    out_clip.release()

# === Main Processing Loop ===
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    frame_num = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
    timestamp = frame_num / fps
    annotated = frame.copy()
    current_ids = set()

    if frame_num % frame_skip == 0:
        results = model.track(frame, persist=True, verbose=False)[0]

        for box, cls_id, track_id in zip(results.boxes.xyxy, results.boxes.cls, results.boxes.id):
            cls_id = int(cls_id)
            track_id = int(track_id)
            if cls_id not in target_classes:
                continue

            label = target_classes[cls_id]
            x1, y1, x2, y2 = map(int, box.tolist())
            center = get_center((x1, y1, x2, y2))
            track_history[track_id].append(center)
            if len(track_history[track_id]) > int(fps * 30):
                track_history[track_id].popleft()

            box_height = y2 - y1
            height_m = real_height_m[cls_id]
            distance_m = (focal_px * height_m) / box_height if box_height > 0 else None

            if distance_m:
                distance_history[track_id].append(distance_m)
                if len(distance_history[track_id]) > 5:
                    distance_history[track_id].popleft()

                if track_id not in track_timestamps:
                    track_timestamps[track_id] = [frame_num, frame_num]
                else:
                    track_timestamps[track_id][1] = frame_num

                last_boxes[track_id] = ((x1, y1, x2, y2), f"{label}: {distance_m:.2f} m", frame_num)
                disappeared_tracks[track_id] = (label, distance_m, frame_num)
                current_ids.add(track_id)

    # === Handle disappearance and behavior detection ===
    for track_id in list(disappeared_tracks):
        label, last_distance, last_frame = disappeared_tracks[track_id]
        if frame_num - last_frame > int(persistence_duration_sec * fps):
            if track_id in track_timestamps:
                first_seen, last_seen = track_timestamps[track_id]
                duration_sec = (last_seen - first_seen) / fps

                if (label == "person" and (last_distance < 5.0 or duration_sec > 10)) or \
                   (label == "car" and last_distance < 5.0 and duration_sec > 10):

                    behavior = "Suspicious Activity"
                    if f"{track_id}_{behavior}" not in behavior_flags:
                        behavior_flags.add(f"{track_id}_{behavior}")
                        event_time = round(frame_num / fps, 2)
                        real_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

                        csv_writer.writerow([
                            frame_num, behavior, label, round(last_distance, 2),
                            event_time, real_time
                        ])

                        notify_local(
                            "Suspicious Activity Detected",
                            f"{label} at {round(last_distance, 2)}m around {event_time}s"
                        )

                        # Defer clip creation until after video is finalized
                        clip_margin = int(fps * 4)
                        clip_start = max(0, frame_num - clip_margin)
                        clip_end = min(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), frame_num + clip_margin)
                        clip_time_str = datetime.now().strftime("%Y-%m-%d_%H-%M-%S-%f")
                        suspicious_events.append((clip_start, clip_end, clip_time_str))

            del disappeared_tracks[track_id]
            track_timestamps.pop(track_id, None)

    # === Draw persistent bounding boxes ===
    for track_id, (box, text, last_seen_frame) in list(last_boxes.items()):
        if frame_num - last_seen_frame > int(persistence_duration_sec * fps):
            del last_boxes[track_id]
            continue
        x1, y1, x2, y2 = box
        cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(annotated, text, (x1, y1 - 4),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

    out.write(annotated)
    cv2.imshow("Suspicious Behavior Detection", annotated)
    if cv2.waitKey(1) in [27, ord("q")]:
        break

# === Finalize and save suspicious clips ===
cap.release()
out.release()
csv_file.close()
cv2.destroyAllWindows()

print("Saving suspicious video clips from output video...")
for clip_start, clip_end, clip_time_str in suspicious_events:
    clip_filename = f"suspicious/{clip_time_str}.mp4"
    save_suspicious_clip(clip_start, clip_end, clip_filename)
print("All suspicious clips saved.")


Saving suspicious video clips from output video...
All suspicious clips saved.


0