In [19]:
import sys
from datetime import datetime
import logging
import time
import os
import tempfile
import cv2
import numpy as np
from collections import defaultdict, deque
from ultralytics import YOLO
from sklearn.ensemble import IsolationForest

In [20]:
# Central settings for the monitor; the rest of the notebook reads everything from here.
CONFIG = dict(
    # --- model and input ---
    model_path="model/best.pt",
    video_source=0,               # camera index 0, or a device path such as "/dev/video10"

    # --- frame geometry ---
    resolution=(1280, 720),       # size requested from the capture device
    process_res=(640, 640),       # size frames are stretched to for inference/display

    # --- detector ---
    imgsz=640,
    conf_threshold=0.50,          # starting point; raise only if false positives are a problem
    iou_threshold=0.35,           # NMS IoU used for detections

    # --- tracking / anomaly timing (all counted in frames) ---
    min_track_frames=20,          # a track is trusted once it has been seen this many frames
    calibration_frames=300,       # frames spent collecting "normal" behavior
    anomaly_cooldown=60,          # wait before the same track can be flagged again

    # --- anomaly model inputs ---
    history_len=50,               # number of speed samples kept per track
    contamination=0.01,           # assumed share of anomalies

    # --- drawing / housekeeping ---
    vector_scale=2.5,             # motion-arrow length multiplier (increase if arrows are tiny)
    max_frames=10_000,            # hard stop so a notebook run cannot loop forever
    dead_track_ttl=120,           # frames without a sighting before an ID is forgotten

    # --- heatmap ---
    heatmap_blur_sigma=15,
    heatmap_alpha=0.35,           # overlay strength
)

In [21]:
# Root logger: INFO and above, formatted as "timestamp | level | message".
logging.basicConfig(
    format="%(asctime)s | %(levelname)s | %(message)s",
    level=logging.INFO,
)

In [22]:
def write_bytetrack_yaml() -> str:
    """Write the ByteTrack tracker settings to a new temp file and return its path.

    The file is created with ``tempfile.mkstemp`` (suffix ``_bytetrack.yaml``)
    and is not removed here, so the caller owns its lifetime.

    Returns:
        Filesystem path of the YAML file that was written.
    """
    # One "key: value" entry per line; the file ends with a single newline.
    settings = (
        "tracker_type: bytetrack",
        "track_high_thresh: 0.60",
        "track_low_thresh: 0.25",
        "new_track_thresh: 0.60",
        "track_buffer: 45",
        "match_thresh: 0.80",
        "fuse_score: True",
    )
    handle, yaml_path = tempfile.mkstemp(suffix="_bytetrack.yaml")
    # fdopen takes ownership of the descriptor and closes it when the block exits.
    with os.fdopen(handle, "w") as out:
        out.write("\n".join(settings) + "\n")
    return yaml_path

In [25]:
class ChickenMonitor:
    """Track detected objects in a video stream, flag unusual motion and draw overlays.

    Per frame (see ``process_stream``): the frame is resized, passed to
    ``YOLO.track`` with a ByteTrack config, per-track speeds are appended to a
    rolling history, speed statistics are scored by an ``IsolationForest`` once
    it has been fitted, and boxes, motion arrows, a position heatmap and an FPS
    counter are drawn onto the frame.

    All tunables are read from the module-level ``CONFIG`` dict.
    """

    def __init__(self):
        """Load the detector, write the tracker config and initialise all state."""
        logging.info(f"Loading model: {CONFIG['model_path']}")
        self.model = YOLO(CONFIG["model_path"])

        # NOTE: this is a temp file on disk; nothing in this class deletes it.
        self.tracker_yaml = write_bytetrack_yaml()
        logging.info(f"Using ByteTrack config: {self.tracker_yaml}")

        self.iso_forest = IsolationForest(
            n_estimators=200,
            contamination=CONFIG["contamination"],
            random_state=42,
            n_jobs=-1
        )

        self.is_trained = False
        self.training_buffer = []   # feature rows collected until the forest is fitted
        self.frame_count = 0        # frames successfully read so far
        self.output_number = 0      # counter used to name saved frames

        # Track state, all keyed by tracker ID
        self.position_history = defaultdict(lambda: deque(maxlen=2))   # last two (cx, cy)
        self.velocity_history = defaultdict(lambda: deque(maxlen=CONFIG["history_len"]))
        self.track_age = defaultdict(int)          # number of frames the ID was seen
        self.anomaly_cooldown = defaultdict(int)   # frames left before re-flagging
        self.last_seen = defaultdict(int)          # frame_count at the last sighting
        self.position_time = {}                    # timestamp of the last stored position

        # Timing
        self.prev_time = time.time()   # timestamp of the most recent frame
        self.last_dt = 1 / 30          # seconds between the two most recent frames

        # Heatmap (H x W)
        w, h = CONFIG["process_res"]
        self.heatmap = np.zeros((h, w), dtype=np.float32)
        self.show_heatmap = True

    @staticmethod
    def extract_velocity_features(velocities):
        """Summarise a speed history as ``[mean, std, max, 90th percentile]``.

        Args:
            velocities: Sequence of speed samples.

        Returns:
            A list of four floats, or ``None`` when fewer than 10 samples are
            available (too little history to be meaningful).
        """
        if len(velocities) < 10:
            return None
        v = np.asarray(velocities, dtype=np.float32)
        return [
            float(np.mean(v)),
            float(np.std(v)),
            float(np.max(v)),
            float(np.percentile(v, 90)),
        ]

    @staticmethod
    def _speed_between(prev_xy, curr_xy, elapsed, fallback_dt=1 / 30):
        """Return the speed (distance per second) between two positions.

        Args:
            prev_xy: Earlier ``(x, y)`` position.
            curr_xy: Later ``(x, y)`` position.
            elapsed: Seconds between the two positions.
            fallback_dt: Interval used instead when ``elapsed`` is not positive.

        Returns:
            Euclidean distance divided by the interval, as a Python float. The
            interval is floored at 1e-6 to avoid division by zero.
        """
        if elapsed <= 0:
            elapsed = fallback_dt
        distance = np.hypot(curr_xy[0] - prev_xy[0], curr_xy[1] - prev_xy[1])
        return float(distance / max(elapsed, 1e-6))

    def _clean_dead_tracks(self):
        """Forget every ID not seen for more than ``CONFIG["dead_track_ttl"]`` frames."""
        ttl = CONFIG["dead_track_ttl"]
        dead_ids = [tid for tid, last in self.last_seen.items()
                    if self.frame_count - last > ttl]
        for tid in dead_ids:
            for d in (
                self.position_history,
                self.velocity_history,
                self.track_age,
                self.anomaly_cooldown,
                self.last_seen,
                self.position_time,
            ):
                d.pop(tid, None)

    def _draw_fps(self, frame):
        """Draw the FPS derived from ``self.last_dt`` in the bottom-left corner."""
        fps = 1.0 / max(self.last_dt, 1e-6)
        cv2.putText(
            frame, f"FPS: {fps:.1f}",
            (20, frame.shape[0] - 20),
            cv2.FONT_HERSHEY_SIMPLEX, 0.7,
            (255, 255, 255), 2
        )

    def _draw_motion_arrow(self, frame, tid, color=(255, 0, 0)):
        """Draw an arrow from the track's current centre along its last displacement.

        Nothing is drawn until the track has two stored positions.
        """
        if len(self.position_history[tid]) < 2:
            return

        (x_prev, y_prev), (x_curr, y_curr) = self.position_history[tid]
        dx = (x_curr - x_prev)
        dy = (y_curr - y_prev)

        # Scale arrows (make visible even if movement is small)
        scale = CONFIG["vector_scale"]
        start = (int(x_curr), int(y_curr))
        end = (int(x_curr + dx * scale), int(y_curr + dy * scale))

        cv2.arrowedLine(frame, start, end, color, 2, tipLength=0.25)

    def _update_heatmap(self, cx, cy):
        """Add one hit to the heatmap cell at ``(cx, cy)``; out-of-range points are ignored."""
        ix, iy = int(cx), int(cy)
        if 0 <= iy < self.heatmap.shape[0] and 0 <= ix < self.heatmap.shape[1]:
            self.heatmap[iy, ix] += 1.0

    def _overlay_heatmap(self, frame):
        """Blend the blurred, colour-mapped heatmap into ``frame`` in place.

        Does nothing when ``self.show_heatmap`` is False.
        """
        if not self.show_heatmap:
            return

        heat = self.heatmap

        # Blur for a smooth heat distribution
        sigma = CONFIG["heatmap_blur_sigma"]
        heat_blur = cv2.GaussianBlur(heat, (0, 0), sigmaX=sigma, sigmaY=sigma)

        # Normalize to 0..255
        heat_norm = cv2.normalize(heat_blur, None, 0, 255, cv2.NORM_MINMAX)
        heat_u8 = heat_norm.astype(np.uint8)

        heat_color = cv2.applyColorMap(heat_u8, cv2.COLORMAP_JET)

        alpha = CONFIG["heatmap_alpha"]
        # dst is ``frame`` itself, so the caller's image is modified
        cv2.addWeighted(heat_color, alpha, frame, 1 - alpha, 0, frame)

        cv2.putText(
            frame, "Heatmap: ON (press 'h' toggle, 'c' clear, 's' save output, 'q' exit)",
            (20, 30),
            cv2.FONT_HERSHEY_SIMPLEX, 0.6,
            (255, 255, 255), 2
        )

    def _update_and_draw(self, frame, results):
        """Update track state from one tracking result and draw all per-track overlays.

        Args:
            frame: Image to draw on (modified in place).
            results: One result object from ``self.model.track``; its ``boxes.xyxy``
                and ``boxes.id`` are read.
        """
        # If tracker didn't return IDs this frame, clean dead tracks and exit.
        # The heatmap is still drawn so the overlay does not flicker.
        if results.boxes is None or results.boxes.id is None:
            self._clean_dead_tracks()
            self._overlay_heatmap(frame)
            return

        boxes = results.boxes.xyxy.cpu().numpy()
        ids = results.boxes.id.cpu().numpy().astype(int)

        # process_stream stores the current frame's timestamp in prev_time
        frame_time = self.prev_time

        current_features, feature_ids = [], []
        anomalous = set()

        # Update global track bookkeeping
        for tid in ids:
            self.track_age[tid] += 1
            self.last_seen[tid] = self.frame_count
            self.anomaly_cooldown[tid] = max(0, self.anomaly_cooldown[tid] - 1)

        self._clean_dead_tracks()

        # Compute velocity history + collect features
        for box, tid in zip(boxes, ids):
            x1, y1, x2, y2 = box
            cx, cy = (x1 + x2) / 2.0, (y1 + y2) / 2.0

            # Update heatmap from raw centers (even before min_track_frames)
            self._update_heatmap(cx, cy)

            positions = self.position_history[tid]
            positions.append((cx, cy))

            previous_time = self.position_time.get(tid)
            self.position_time[tid] = frame_time

            if len(positions) == 2:
                # Divide by the time since THIS track's previous position, not
                # by the last frame interval: a track that was missing for
                # several frames would otherwise get an inflated speed.
                if previous_time is None:
                    elapsed = self.last_dt
                else:
                    elapsed = frame_time - previous_time
                speed = self._speed_between(
                    positions[0], positions[1], elapsed, fallback_dt=self.last_dt
                )
                self.velocity_history[tid].append(speed)

            if self.track_age[tid] >= CONFIG["min_track_frames"]:
                feats = self.extract_velocity_features(self.velocity_history[tid])
                if feats is not None:
                    if not self.is_trained:
                        self.training_buffer.append(feats)
                    current_features.append(feats)
                    feature_ids.append(tid)

        # Train anomaly model after calibration period
        if not self.is_trained and self.frame_count >= CONFIG["calibration_frames"]:
            if len(self.training_buffer) > 50:
                self.iso_forest.fit(self.training_buffer)
                self.is_trained = True
                logging.info("IsolationForest trained.")
                self.training_buffer.clear()

        # Predict anomalies
        if self.is_trained and current_features:
            preds = self.iso_forest.predict(current_features)  # -1 anomaly
            for tid, p in zip(feature_ids, preds):
                if p == -1 and self.anomaly_cooldown[tid] == 0:
                    anomalous.add(tid)
                    self.anomaly_cooldown[tid] = CONFIG["anomaly_cooldown"]

        # Draw heatmap overlay first (so boxes sit on top)
        self._overlay_heatmap(frame)

        # Draw boxes, arrows, text
        for box, tid in zip(boxes, ids):
            x1, y1, x2, y2 = map(int, box)

            is_anom = tid in anomalous
            color = (0, 0, 255) if is_anom else (0, 255, 0)

            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

            # Arrow from movement
            self._draw_motion_arrow(frame, tid, color=(255, 0, 0))

            # Speed text
            speed = self.velocity_history[tid][-1] if self.velocity_history[tid] else 0.0
            cv2.putText(
                frame, f"ID:{tid} v:{speed:.1f}px/s",
                (x1, max(20, y1 - 10)),
                cv2.FONT_HERSHEY_SIMPLEX, 0.55, color, 2
            )

            if is_anom:
                cv2.putText(
                    frame, "ANOMALY",
                    (x1, min(frame.shape[0] - 10, y2 + 20)),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2
                )
                # Reported through logging so it carries the same timestamped format
                logging.warning(f"ANOMALOUS BEHAVIOR - track ID {tid}")

    def _save_frame(self, frame):
        """Write ``frame`` to ``outputs/output-<n>.png`` and log whether it worked."""
        # cv2.imwrite does not create missing directories
        os.makedirs("outputs", exist_ok=True)
        self.output_number += 1
        path = "outputs/output-" + str(self.output_number) + ".png"
        if cv2.imwrite(path, frame):
            logging.info(f"Saved frame to {path}")
        else:
            logging.error(f"Could not write frame to {path}")

    def process_stream(self):
        """Run the capture -> track -> draw -> display loop.

        The loop ends when ``q`` is pressed, when ``CONFIG["max_frames"]`` frames
        have been processed, or when reading fails
        ``CONFIG.get("max_read_failures", 100)`` times in a row. The capture
        device and all OpenCV windows are released on exit.

        Keys: ``q`` quit, ``h`` toggle heatmap, ``c`` clear heatmap, ``s`` save frame.
        """
        # Open camera (V4L2 backend first, then the default backend)
        cap = cv2.VideoCapture(CONFIG["video_source"], cv2.CAP_V4L2)
        if not cap.isOpened():
            cap = cv2.VideoCapture(CONFIG["video_source"])

        if not cap.isOpened():
            logging.error("Video source could not be opened.")
            cap.release()
            return

        cap.set(cv2.CAP_PROP_FRAME_WIDTH, CONFIG["resolution"][0])
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, CONFIG["resolution"][1])

        logging.info("Video source opened successfully.")
        logging.info("Controls: q=quit, h=toggle heatmap, c=clear heatmap, s=save frame")

        # frame_count only advances on successful reads, so without this cap a
        # finished file or unplugged camera would spin here forever.
        max_read_failures = CONFIG.get("max_read_failures", 100)
        failed_reads = 0

        try:
            while self.frame_count < CONFIG["max_frames"]:
                ret, raw_frame = cap.read()
                if not ret:
                    failed_reads += 1
                    if failed_reads >= max_read_failures:
                        logging.error(
                            f"No frame after {failed_reads} consecutive reads; stopping."
                        )
                        break
                    time.sleep(0.05)
                    continue
                failed_reads = 0

                # Resize for inference (stretched to process_res)
                frame = cv2.resize(raw_frame, CONFIG["process_res"], interpolation=cv2.INTER_LINEAR)
                frame = np.ascontiguousarray(frame)

                # Timing
                now = time.time()
                self.last_dt = max(now - self.prev_time, 1e-6)
                self.prev_time = now

                self.frame_count += 1

                # Run YOLO tracking
                results = self.model.track(
                    frame,
                    imgsz=CONFIG["imgsz"],
                    conf=CONFIG["conf_threshold"],
                    iou=CONFIG["iou_threshold"],
                    tracker=self.tracker_yaml,
                    persist=True,
                    rect=False,
                    verbose=False
                )[0]

                # Update + draw overlays
                self._update_and_draw(frame, results)
                self._draw_fps(frame)

                cv2.imshow("Chicken Monitor", frame)

                key = cv2.waitKey(1) & 0xFF
                if key == ord("q"):
                    break
                elif key == ord("h"):
                    self.show_heatmap = not self.show_heatmap
                elif key == ord("c"):
                    self.heatmap.fill(0.0)
                elif key == ord('s'):
                    self._save_frame(frame)

        finally:
            cap.release()
            cv2.destroyAllWindows()

In [26]:
# Build the monitor (loads the model and writes the tracker config), then run the
# capture loop. process_stream() returns once its loop exits, e.g. when 'q' is
# pressed or CONFIG["max_frames"] frames have been processed.
monitor = ChickenMonitor()
monitor.process_stream()

2025-12-19 13:44:23,134 | INFO | Loading model: model/best.pt
2025-12-19 13:44:23,172 | INFO | Using ByteTrack config: /tmp/tmp6xzqkv4c_bytetrack.yaml
2025-12-19 13:44:23,173 | INFO | Video source opened successfully.
2025-12-19 13:44:23,174 | INFO | Controls: q=quit, h=toggle heatmap, c=clear heatmap
2025-12-19 13:44:33,429 | INFO | IsolationForest trained.


In [None]:
# Manual cleanup cell: closes any OpenCV windows that are still open.
import cv2
cv2.destroyAllWindows()
# NOTE(review): presumably this extra waitKey(1) gives the GUI a chance to
# process the close request — confirm on the target platform.
cv2.waitKey(1)