In [None]:
import cv2
import numpy as np
import onnxruntime as ort
from ultralytics import YOLO
import os

# --- 1. CONFIGURATION ---
YOLO_PATH = '/kaggle/input/yolo-v8/other/default/1/yolov8m.pt'
RTMPOSE_ONNX_PATH = '/kaggle/input/rtm-pose/onnx/default/1/rtmpose-m_ap10k_256.onnx'
VIDEO_PATH = '/kaggle/input/snatching-videos/snatching_videos/snatching_1.mp4'
OUTPUT_PATH = '/kaggle/working/output_snatching_skeleton.mp4'

# Model Input Size (RTMPose standard)
INPUT_H, INPUT_W = 256, 256

# --- 2. LOAD MODELS ---
print("Loading YOLOv8...")
yolo_model = YOLO(YOLO_PATH)

print("Loading RTMPose (AP-10K)...")
try:
    providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=providers)
    print(f"RTMPose running on: {ort_session.get_providers()[0]}")
except Exception as e:
    print(f"CUDA Error ({e}). Running on CPU.")
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=['CPUExecutionProvider'])

input_name = ort_session.get_inputs()[0].name

# --- 3. HELPER FUNCTIONS ---

def preprocess_for_rtmpose(img_crop, h, w):
    # Resize, Normalize, Transpose (HWC -> BCHW)
    resized = cv2.resize(img_crop, (w, h))
    mean = np.array([123.675, 116.28, 103.53])
    std = np.array([58.395, 57.12, 57.375])
    img_data = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
    img_data = (img_data - mean) / std
    img_data = img_data.astype(np.float32).transpose(2, 0, 1)
    img_data = np.expand_dims(img_data, axis=0)
    return img_data

def decode_simcc_output(outputs, crop_h, crop_w):
    """
    Decodes SimCC output from RTMPose.
    Returns: Array of (x, y) coordinates for each keypoint relative to the CROP.
    """
    # RTMPose usually returns 2 outputs: simcc_x and simcc_y
    # Shape is usually (Batch, Num_Points, Bins)
    simcc_x, simcc_y = outputs[0], outputs[1]
    
    # 1. Find the index with the highest score (Argmax)
    x_locs = np.argmax(simcc_x, axis=2)[0]
    y_locs = np.argmax(simcc_y, axis=2)[0]
    
    # 2. Scale these indices back to the crop size
    # The model output bins usually correspond to the Input Size (256) * 2
    # But strictly, we just map 0..Bins to 0..CropSize
    
    num_bins_x = simcc_x.shape[2]
    num_bins_y = simcc_y.shape[2]
    
    # Map model coordinates (0..256 usually) to Crop Coordinates (0..crop_w)
    # We assume the bins represent the range [0, 1] in normalized space
    x_locs = x_locs / (num_bins_x) * crop_w
    y_locs = y_locs / (num_bins_y) * crop_h
    
    # Combine into (K, 2) array
    keypoints = np.stack([x_locs, y_locs], axis=-1)
    return keypoints

# --- 4. PROCESSING LOOP ---
if not os.path.exists(VIDEO_PATH):
    print(f"Error: Video not found at {VIDEO_PATH}")
    exit()

cap = cv2.VideoCapture(VIDEO_PATH)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
if fps != fps or fps <= 0: fps = 25
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

out = cv2.VideoWriter(OUTPUT_PATH, cv2.VideoWriter_fourcc(*'mp4v'), int(fps), (width, height))

print(f"Processing {width}x{height} @ {fps}fps...")

frame_idx = 0

while cap.isOpened():
    ret, frame = cap.read()
    if not ret: break
    if frame is None or frame.size == 0: continue

    frame_idx += 1
    if frame_idx % 20 == 0:
        print(f"Processing frame {frame_idx}/{total_frames}...")

    # 1. Detect Person
    results = yolo_model.predict(frame, classes=0, verbose=False, conf=0.5)

    for result in results:
        boxes = result.boxes
        for box in boxes:
            x1, y1, x2, y2 = box.xyxy[0].cpu().numpy().astype(int)
            
            # Draw Box
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            
            # 2. Crop for Pose
            x1_c, y1_c = max(0, x1), max(0, y1)
            x2_c, y2_c = min(width, x2), min(height, y2)
            person_crop = frame[y1_c:y2_c, x1_c:x2_c]
            
            if person_crop.size > 0:
                crop_h, crop_w = person_crop.shape[:2]
                
                # Preprocess
                onnx_input = preprocess_for_rtmpose(person_crop, INPUT_H, INPUT_W)
                
                # Run Inference
                outputs = ort_session.run(None, {input_name: onnx_input})
                
                # Decode Keypoints (SimCC)
                # We check if output looks like SimCC (list of 2 arrays)
                if len(outputs) >= 2:
                    keypoints = decode_simcc_output(outputs, crop_h, crop_w)
                    
                    # Draw Keypoints on the MAIN frame
                    for i, (kp_x, kp_y) in enumerate(keypoints):
                        # Add the crop offset (x1, y1) to map back to full image
                        real_x = int(kp_x + x1_c)
                        real_y = int(kp_y + y1_c)
                        
                        # Draw Red Dot
                        cv2.circle(frame, (real_x, real_y), 3, (0, 0, 255), -1)
                else:
                    # Fallback for some export types that return just 1 tensor
                    cv2.putText(frame, "Format Error", (x1, y1), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,255), 1)

    out.write(frame)

cap.release()
out.release()
print(f"DONE. Video saved to: {OUTPUT_PATH}")

In [None]:
import cv2
import numpy as np
import onnxruntime as ort
from ultralytics import YOLO
import os

# --- 1. CONFIGURATION ---
YOLO_PATH = '/kaggle/input/yolo-v8/other/default/1/yolov8m.pt'
# NOTE: This should ideally be a Human Pose model (COCO trained) for correct indices
RTMPOSE_ONNX_PATH = '/kaggle/input/rtm-pose/onnx/default/1/rtmpose-m_ap10k_256.onnx' 
VIDEO_PATH = '/kaggle/input/snatching-videos/snatching_videos/snatching_1.mp4'
OUTPUT_PATH = '/kaggle/working/output_neck_zone.mp4'

INPUT_H, INPUT_W = 256, 256

# --- COCO KEYPOINT INDICES (Standard for Human Pose) ---
# We use these to find the neck and torso.
KEYPOINT_L_SHOULDER = 5
KEYPOINT_R_SHOULDER = 6
KEYPOINT_L_HIP = 11
KEYPOINT_R_HIP = 12

# --- 2. LOAD MODELS ---
print("Loading YOLOv8...")
yolo_model = YOLO(YOLO_PATH)

print("Loading RTMPose...")
try:
    providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=providers)
    print(f"RTMPose loaded on: {ort_session.get_providers()[0]}")
except Exception as e:
    print(f"CUDA Error ({e}). Using CPU.")
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=['CPUExecutionProvider'])

input_name = ort_session.get_inputs()[0].name

# --- 3. MATH FUNCTIONS (FROM YOUR DOCUMENT) ---

def get_distance(p1, p2):
    """Euclidean distance between two points (x,y)."""
    return np.sqrt((p1[0] - p2[0])**2 + (p1[1] - p2[1])**2)

def calculate_neck_logic(keypoints):
    """
    Implements Phase 2 Logic:
    1. Neck Center = Midpoint of Shoulders (5 & 6)
    2. Torso Length = Dist(Neck, Hips)
    3. Zone Radius = 0.3 * Torso Length
    """
    # We need at least up to index 12 (Right Hip) to calculate torso
    if len(keypoints) <= 12:
        return None, None

    # Extract coordinates
    l_shoulder = keypoints[KEYPOINT_L_SHOULDER]
    r_shoulder = keypoints[KEYPOINT_R_SHOULDER]
    l_hip = keypoints[KEYPOINT_L_HIP]
    r_hip = keypoints[KEYPOINT_R_HIP]

    # 1. Calculate Neck Center (Midpoint of Shoulders)
    neck_x = (l_shoulder[0] + r_shoulder[0]) / 2
    neck_y = (l_shoulder[1] + r_shoulder[1]) / 2
    neck_center = (int(neck_x), int(neck_y))

    # 2. Calculate Hip Center (Midpoint of Hips)
    hip_x = (l_hip[0] + r_hip[0]) / 2
    hip_y = (l_hip[1] + r_hip[1]) / 2
    hip_center = (hip_x, hip_y)

    # 3. Calculate Torso Length (Distance from Neck to Hips)
    torso_length = get_distance(neck_center, hip_center)

    # 4. Calculate Zone Radius (0.3 * Torso Length)
    # Using max(10, ...) ensures the circle doesn't disappear if torso is tiny
    radius = int(max(10, 0.3 * torso_length))

    return neck_center, radius

def preprocess_for_rtmpose(img_crop, h, w):
    resized = cv2.resize(img_crop, (w, h))
    mean = np.array([123.675, 116.28, 103.53])
    std = np.array([58.395, 57.12, 57.375])
    img_data = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
    img_data = (img_data - mean) / std
    img_data = img_data.astype(np.float32).transpose(2, 0, 1)
    img_data = np.expand_dims(img_data, axis=0)
    return img_data

def decode_simcc_output(outputs, crop_h, crop_w):
    simcc_x, simcc_y = outputs[0], outputs[1]
    x_locs = np.argmax(simcc_x, axis=2)[0]
    y_locs = np.argmax(simcc_y, axis=2)[0]
    num_bins_x = simcc_x.shape[2]
    num_bins_y = simcc_y.shape[2]
    x_locs = x_locs / (num_bins_x) * crop_w
    y_locs = y_locs / (num_bins_y) * crop_h
    return np.stack([x_locs, y_locs], axis=-1)

# --- 4. PROCESSING LOOP ---
if not os.path.exists(VIDEO_PATH):
    print("Video not found.")
    exit()

cap = cv2.VideoCapture(VIDEO_PATH)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
if fps != fps or fps <= 0: fps = 25
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

out = cv2.VideoWriter(OUTPUT_PATH, cv2.VideoWriter_fourcc(*'mp4v'), int(fps), (width, height))

print(f"Processing Logic: Neck Zone (Radius=0.3*Torso)...")

frame_idx = 0

while cap.isOpened():
    ret, frame = cap.read()
    if not ret: break
    if frame is None or frame.size == 0: continue

    frame_idx += 1
    if frame_idx % 20 == 0:
        print(f"Processing frame {frame_idx}/{total_frames}...")

    # 1. Detect Person
    results = yolo_model.predict(frame, classes=0, verbose=False, conf=0.5)

    for result in results:
        boxes = result.boxes
        for box in boxes:
            x1, y1, x2, y2 = box.xyxy[0].cpu().numpy().astype(int)
            
            # Crop Person
            x1_c, y1_c = max(0, x1), max(0, y1)
            x2_c, y2_c = min(width, x2), min(height, y2)
            person_crop = frame[y1_c:y2_c, x1_c:x2_c]
            
            if person_crop.size > 0:
                crop_h, crop_w = person_crop.shape[:2]
                
                # RTMPose Inference
                onnx_input = preprocess_for_rtmpose(person_crop, INPUT_H, INPUT_W)
                outputs = ort_session.run(None, {input_name: onnx_input})
                
                if len(outputs) >= 2:
                    # Get Keypoints relative to CROP
                    keypoints_crop = decode_simcc_output(outputs, crop_h, crop_w)
                    
                    # Convert to GLOBAL coordinates (Full Frame)
                    keypoints_global = []
                    for (kx, ky) in keypoints_crop:
                        keypoints_global.append([kx + x1_c, ky + y1_c])
                    
                    # --- APPLY NECK ZONE LOGIC ---
                    neck_center, neck_radius = calculate_neck_logic(keypoints_global)
                    
                    if neck_center is not None:
                        # Draw the "Neck Zone" (Cyan Circle)
                        cv2.circle(frame, neck_center, neck_radius, (255, 255, 0), 2)
                        
                        # Draw the Center Dot
                        cv2.circle(frame, neck_center, 4, (0, 0, 255), -1)
                        
                        # Label it
                        cv2.putText(frame, "Neck Zone", (neck_center[0] - 20, neck_center[1] - neck_radius - 5),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 0), 1)

    out.write(frame)

cap.release()
out.release()
print(f"DONE. Output saved to: {OUTPUT_PATH}")

In [None]:
import cv2
import numpy as np
import onnxruntime as ort
from ultralytics import YOLO
import os

# --- 1. CONFIGURATION ---
YOLO_PATH = '/kaggle/input/yolo-v8/other/default/1/yolov8m.pt'
RTMPOSE_ONNX_PATH = '/kaggle/input/rtm-pose/onnx/default/1/rtmpose-m_ap10k_256.onnx' 
VIDEO_PATH = '/kaggle/input/snatching-videos/snatching_videos/snatching_1.mp4'
OUTPUT_PATH = '/kaggle/working/output_neck_zone_fixed.mp4'

INPUT_H, INPUT_W = 256, 256

# --- 2. LOAD MODELS ---
print("Loading YOLOv8...")
yolo_model = YOLO(YOLO_PATH)

print("Loading RTMPose...")
try:
    providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=providers)
except Exception as e:
    print(f"CUDA Error ({e}). Using CPU.")
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=['CPUExecutionProvider'])

input_name = ort_session.get_inputs()[0].name

# --- 3. HELPER FUNCTIONS ---

def get_neck_from_box(x1, y1, x2, y2):
    """
    FALLBACK: Calculates Neck Zone using the Bounding Box geometry.
    Used because the Animal Pose model gives wrong keypoints for humans.
    """
    box_h = y2 - y1
    box_w = x2 - x1
    
    # 1. Neck Center: Horizontally centered, Vertically at top 20% of box
    neck_x = x1 + (box_w / 2)
    neck_y = y1 + (box_h * 0.20) 
    
    # 2. Zone Radius: Scaled to box size (approx 15% of height)
    radius = int(box_h * 0.15)
    
    return (int(neck_x), int(neck_y)), radius

def preprocess_for_rtmpose(img_crop, h, w):
    resized = cv2.resize(img_crop, (w, h))
    mean = np.array([123.675, 116.28, 103.53])
    std = np.array([58.395, 57.12, 57.375])
    img_data = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
    img_data = (img_data - mean) / std
    img_data = img_data.astype(np.float32).transpose(2, 0, 1)
    img_data = np.expand_dims(img_data, axis=0)
    return img_data

# --- 4. PROCESSING LOOP ---
if not os.path.exists(VIDEO_PATH):
    print("Video not found.")
    exit()

cap = cv2.VideoCapture(VIDEO_PATH)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
if fps != fps or fps <= 0: fps = 25
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

out = cv2.VideoWriter(OUTPUT_PATH, cv2.VideoWriter_fourcc(*'mp4v'), int(fps), (width, height))

print(f"Processing with GEOMETRIC FIX for Neck Zone...")

frame_idx = 0

while cap.isOpened():
    ret, frame = cap.read()
    if not ret: break
    if frame is None or frame.size == 0: continue

    frame_idx += 1
    if frame_idx % 20 == 0:
        print(f"Processing frame {frame_idx}/{total_frames}...")

    # 1. Detect Person
    results = yolo_model.predict(frame, classes=0, verbose=False, conf=0.5)

    for result in results:
        boxes = result.boxes
        for box in boxes:
            x1, y1, x2, y2 = box.xyxy[0].cpu().numpy().astype(int)
            
            # --- FIX: USE BOX GEOMETRY FOR NECK ZONE ---
            # This ignores the bad animal keypoints and calculates the neck 
            # based on where the human is standing in the box.
            neck_center, neck_radius = get_neck_from_box(x1, y1, x2, y2)
            
            # Draw the Corrected Neck Zone (Cyan)
            cv2.circle(frame, neck_center, neck_radius, (255, 255, 0), 2)
            cv2.circle(frame, neck_center, 4, (0, 0, 255), -1)
            cv2.putText(frame, "Neck Zone", (neck_center[0] - 20, neck_center[1] - neck_radius - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 0), 1)

            # (Optional) Still run pose estimation if you want to see the dots
            # But we don't use it for the circle anymore.
            x1_c, y1_c = max(0, x1), max(0, y1)
            x2_c, y2_c = min(width, x2), min(height, y2)
            person_crop = frame[y1_c:y2_c, x1_c:x2_c]
            
            if person_crop.size > 0:
                onnx_input = preprocess_for_rtmpose(person_crop, INPUT_H, INPUT_W)
                outputs = ort_session.run(None, {input_name: onnx_input})
                
                # If you want to see the confused red dots, uncomment below:
                # ... (drawing code omitted to keep visual clean) ...

    out.write(frame)

cap.release()
out.release()
print(f"DONE. Output saved to: {OUTPUT_PATH}")

In [None]:

!pip install "numpy<2.0" "opencv-python-headless<4.10" "ultralytics" "onnxruntime-gpu"

import cv2
import numpy as np
import onnxruntime as ort
from ultralytics import YOLO
import os

# --- 1. CONFIGURATION ---
YOLO_PATH = '/kaggle/input/yolo-v8/other/default/1/yolov8m.pt'
RTMPOSE_ONNX_PATH = '/kaggle/input/rtm-pose/onnx/default/1/rtmpose-m_ap10k_256.onnx' 
VIDEO_PATH = '/kaggle/input/snatching-videos/snatching_videos/snatching_1.mp4'
OUTPUT_PATH = '/kaggle/working/output_snatching_ALARM.mp4'

INPUT_H, INPUT_W = 256, 256

# --- LOGIC SETTINGS ---
SNATCH_TIME_THRESHOLD = 0.5  # Seconds
# Hand Keypoints (Wrists). In COCO these are 9 (Left) and 10 (Right).
# WARNING: In your Animal model, these might be "Paws". Look for Blue Dots!
KEYPOINT_L_WRIST = 9
KEYPOINT_R_WRIST = 10

# --- 2. LOAD MODELS ---
print("Loading YOLOv8...")
yolo_model = YOLO(YOLO_PATH)

print("Loading RTMPose...")
try:
    providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=providers)
except Exception as e:
    print(f"CUDA Error ({e}). Using CPU.")
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=['CPUExecutionProvider'])

input_name = ort_session.get_inputs()[0].name

# --- 3. HELPER FUNCTIONS ---

def get_neck_from_box(x1, y1, x2, y2):
    """Calculates Geometric Neck Zone from Bounding Box."""
    box_h = y2 - y1
    box_w = x2 - x1
    neck_x = x1 + (box_w / 2)
    neck_y = y1 + (box_h * 0.20) 
    radius = int(box_h * 0.15)
    return (int(neck_x), int(neck_y)), radius

def is_point_in_circle(point, circle_center, radius):
    """Returns True if point (x,y) is inside the circle."""
    dist = np.sqrt((point[0] - circle_center[0])**2 + (point[1] - circle_center[1])**2)
    return dist < radius

def preprocess_for_rtmpose(img_crop, h, w):
    resized = cv2.resize(img_crop, (w, h))
    mean = np.array([123.675, 116.28, 103.53])
    std = np.array([58.395, 57.12, 57.375])
    img_data = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
    img_data = (img_data - mean) / std
    img_data = img_data.astype(np.float32).transpose(2, 0, 1)
    img_data = np.expand_dims(img_data, axis=0)
    return img_data

def decode_simcc_output(outputs, crop_h, crop_w):
    simcc_x, simcc_y = outputs[0], outputs[1]
    x_locs = np.argmax(simcc_x, axis=2)[0]
    y_locs = np.argmax(simcc_y, axis=2)[0]
    num_bins_x = simcc_x.shape[2]
    num_bins_y = simcc_y.shape[2]
    x_locs = x_locs / (num_bins_x) * crop_w
    y_locs = y_locs / (num_bins_y) * crop_h
    return np.stack([x_locs, y_locs], axis=-1)

# --- 4. PROCESSING LOOP ---
if not os.path.exists(VIDEO_PATH):
    print("Video not found.")
    exit()

cap = cv2.VideoCapture(VIDEO_PATH)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
if fps != fps or fps <= 0: fps = 25
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

# Calculate required frames for alarm
REQUIRED_FRAMES = int(SNATCH_TIME_THRESHOLD * fps)
print(f"Logic: Trigger Alarm if hand in Neck Zone for {REQUIRED_FRAMES} frames ({SNATCH_TIME_THRESHOLD}s)")

out = cv2.VideoWriter(OUTPUT_PATH, cv2.VideoWriter_fourcc(*'mp4v'), int(fps), (width, height))

# --- THE MEMORY BANK ---
# format: { track_id: consecutive_frames_in_zone }
snatch_history = {}

frame_idx = 0

while cap.isOpened():
    ret, frame = cap.read()
    if not ret: break
    if frame is None or frame.size == 0: continue

    frame_idx += 1
    if frame_idx % 20 == 0:
        print(f"Processing frame {frame_idx}/{total_frames}...")

    # 1. TRACKING (model.track instead of predict)
    # persist=True keeps IDs across frames
    results = yolo_model.track(frame, classes=0, persist=True, verbose=False, conf=0.5, tracker="bytetrack.yaml")

    for result in results:
        boxes = result.boxes
        
        # If no one detected, skip
        if boxes.id is None:
            continue
            
        # Get IDs and Coordinates
        track_ids = boxes.id.int().cpu().tolist()
        coords = boxes.xyxy.int().cpu().tolist()
        
        for box_id, (x1, y1, x2, y2) in zip(track_ids, coords):
            # Ensure ID is in history
            if box_id not in snatch_history:
                snatch_history[box_id] = 0
            
            # --- A. DRAW NECK ZONE ---
            neck_center, neck_radius = get_neck_from_box(x1, y1, x2, y2)
            
            # --- B. GET POSE ---
            hand_in_zone = False
            
            x1_c, y1_c = max(0, x1), max(0, y1)
            x2_c, y2_c = min(width, x2), min(height, y2)
            person_crop = frame[y1_c:y2_c, x1_c:x2_c]
            
            if person_crop.size > 0:
                crop_h, crop_w = person_crop.shape[:2]
                onnx_input = preprocess_for_rtmpose(person_crop, INPUT_H, INPUT_W)
                outputs = ort_session.run(None, {input_name: onnx_input})
                
                if len(outputs) >= 2:
                    kpts_crop = decode_simcc_output(outputs, crop_h, crop_w)
                    
                    # Check Left Hand (9) and Right Hand (10)
                    for k_idx in [KEYPOINT_L_WRIST, KEYPOINT_R_WRIST]:
                        if k_idx < len(kpts_crop):
                            kx, ky = kpts_crop[k_idx]
                            real_x, real_y = int(kx + x1_c), int(ky + y1_c)
                            
                            # DEBUG: Draw Hands as BLUE DOTS
                            cv2.circle(frame, (real_x, real_y), 4, (255, 0, 0), -1)
                            
                            # CHECK PROXIMITY
                            if is_point_in_circle((real_x, real_y), neck_center, neck_radius):
                                hand_in_zone = True

            # --- C. UPDATE HISTORY LOGIC ---
            if hand_in_zone:
                snatch_history[box_id] += 1
            else:
                # Reset if hand leaves zone (optional: or decay slowly)
                snatch_history[box_id] = 0
            
            # --- D. VISUALIZE STATUS ---
            current_duration = snatch_history[box_id]
            
            # DEFAULT: GREEN (Safe)
            color = (0, 255, 0)
            status_text = f"ID:{box_id}"
            
            # WARNING: ORANGE (Hand in zone but not long enough)
            if current_duration > 0 and current_duration < REQUIRED_FRAMES:
                color = (0, 165, 255) # Orange-ish
                status_text = f"Warn: {current_duration}/{REQUIRED_FRAMES}"
                cv2.circle(frame, neck_center, neck_radius, color, 3)

            # ALARM: RED (Snatch Detected!)
            elif current_duration >= REQUIRED_FRAMES:
                color = (0, 0, 255) # RED
                status_text = "SNATCH DETECTED!"
                
                # Draw Big Alarm Box
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 4)
                cv2.circle(frame, neck_center, neck_radius, color, -1) # Fill zone
                
                # Add Alarm Label
                cv2.putText(frame, "!!! SNATCH !!!", (x1, y1 - 30), 
                           cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 3)

            # Draw normal box if not alarming
            if current_duration < REQUIRED_FRAMES:
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                cv2.circle(frame, neck_center, neck_radius, (255, 255, 0), 2)
            
            cv2.putText(frame, status_text, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    out.write(frame)

cap.release()
out.release()
print(f"DONE. Logic Applied. Video saved to: {OUTPUT_PATH}")

In [None]:
# 1. Uninstall the conflicting libraries
!pip uninstall -y numpy ultralytics opencv-python opencv-python-headless onnxruntime onnxruntime-gpu

# 2. Reinstall them with strict version matching
# We force numpy<2.0 and older opencv to satisfy the environment's pre-compiled PyTorch
!pip install "numpy<2.0" "ultralytics==8.2.0" "opencv-python-headless<4.10" "onnxruntime-gpu"

In [None]:
import cv2
import numpy as np
import onnxruntime as ort
from ultralytics import YOLO
import os

# --- 1. CONFIGURATION ---
YOLO_PATH = '/kaggle/input/yolo-v8/other/default/1/yolov8m.pt'
RTMPOSE_ONNX_PATH = '/kaggle/input/rtm-pose/onnx/default/1/rtmpose-m_ap10k_256.onnx' 
VIDEO_PATH = '/kaggle/input/snatching-videos/snatching_videos/snatching_1.mp4'
OUTPUT_PATH = '/kaggle/working/output_snatching_ALARM.mp4'

INPUT_H, INPUT_W = 256, 256
SNATCH_TIME_THRESHOLD = 0.5  # Seconds
REQUIRED_FRAMES = 15 # Will be recalculated based on FPS

# --- 2. LOAD MODELS ---
print("Loading YOLOv8...")
yolo_model = YOLO(YOLO_PATH)

print("Loading RTMPose...")
try:
    providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=providers)
    print(f"RTMPose loaded on: {ort_session.get_providers()[0]}")
except Exception as e:
    print(f"CUDA Error ({e}). Using CPU.")
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=['CPUExecutionProvider'])

input_name = ort_session.get_inputs()[0].name

# --- 3. HELPER FUNCTIONS ---
def get_neck_from_box(x1, y1, x2, y2):
    """Calculates Geometric Neck Zone from Bounding Box."""
    box_h = y2 - y1
    box_w = x2 - x1
    neck_x = x1 + (box_w / 2)
    neck_y = y1 + (box_h * 0.20) 
    radius = int(box_h * 0.15)
    return (int(neck_x), int(neck_y)), radius

def is_point_in_circle(point, circle_center, radius):
    """True if point is inside circle."""
    dist = np.sqrt((point[0] - circle_center[0])**2 + (point[1] - circle_center[1])**2)
    return dist < radius

def preprocess_for_rtmpose(img_crop, h, w):
    resized = cv2.resize(img_crop, (w, h))
    mean = np.array([123.675, 116.28, 103.53])
    std = np.array([58.395, 57.12, 57.375])
    img_data = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
    img_data = (img_data - mean) / std
    img_data = img_data.astype(np.float32).transpose(2, 0, 1)
    img_data = np.expand_dims(img_data, axis=0)
    return img_data

def decode_simcc_output(outputs, crop_h, crop_w):
    simcc_x, simcc_y = outputs[0], outputs[1]
    x_locs = np.argmax(simcc_x, axis=2)[0]
    y_locs = np.argmax(simcc_y, axis=2)[0]
    num_bins_x = simcc_x.shape[2]
    num_bins_y = simcc_y.shape[2]
    x_locs = x_locs / (num_bins_x) * crop_w
    y_locs = y_locs / (num_bins_y) * crop_h
    return np.stack([x_locs, y_locs], axis=-1)

# --- 4. PROCESSING LOOP ---
if not os.path.exists(VIDEO_PATH):
    print("Video not found.")
    exit()

cap = cv2.VideoCapture(VIDEO_PATH)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
if fps != fps or fps <= 0: fps = 25
REQUIRED_FRAMES = int(SNATCH_TIME_THRESHOLD * fps)

out = cv2.VideoWriter(OUTPUT_PATH, cv2.VideoWriter_fourcc(*'mp4v'), int(fps), (width, height))
snatch_history = {} # { track_id: consecutive_frames }

print(f"Processing... Alarm triggers at {REQUIRED_FRAMES} frames.")
frame_idx = 0

while cap.isOpened():
    ret, frame = cap.read()
    if not ret: break
    if frame is None or frame.size == 0: continue

    frame_idx += 1
    if frame_idx % 20 == 0: print(f"Processing frame {frame_idx}")

    # Track with persist=True
    results = yolo_model.track(frame, classes=0, persist=True, verbose=False, conf=0.5, tracker="bytetrack.yaml")

    for result in results:
        boxes = result.boxes
        if boxes.id is None: continue
        
        track_ids = boxes.id.int().cpu().tolist()
        coords = boxes.xyxy.int().cpu().tolist()
        
        for box_id, (x1, y1, x2, y2) in zip(track_ids, coords):
            if box_id not in snatch_history: snatch_history[box_id] = 0
            
            # 1. Neck Zone
            neck_center, neck_radius = get_neck_from_box(x1, y1, x2, y2)
            
            # 2. Check Hands
            hand_in_zone = False
            x1_c, y1_c = max(0, x1), max(0, y1)
            x2_c, y2_c = min(width, x2), min(height, y2)
            person_crop = frame[y1_c:y2_c, x1_c:x2_c]
            
            if person_crop.size > 0:
                onnx_input = preprocess_for_rtmpose(person_crop, INPUT_H, INPUT_W)
                outputs = ort_session.run(None, {input_name: onnx_input})
                kpts = decode_simcc_output(outputs, person_crop.shape[0], person_crop.shape[1])
                
                # Check wrists (Indices 9 & 10)
                for k in [9, 10]: 
                    if k < len(kpts):
                        kx, ky = kpts[k]
                        real_x, real_y = int(kx + x1_c), int(ky + y1_c)
                        cv2.circle(frame, (real_x, real_y), 3, (255, 0, 0), -1) # Blue Dot
                        if is_point_in_circle((real_x, real_y), neck_center, neck_radius):
                            hand_in_zone = True

            # 3. Update Status
            if hand_in_zone: snatch_history[box_id] += 1
            else: snatch_history[box_id] = max(0, snatch_history[box_id] - 1) # Decay
            
            # 4. Draw
            cnt = snatch_history[box_id]
            color = (0, 255, 0)
            if cnt > 0: color = (0, 165, 255) # Orange
            if cnt >= REQUIRED_FRAMES: 
                color = (0, 0, 255) # Red
                cv2.putText(frame, "SNATCH!", (x1, y1-30), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 3)
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 4)
            else:
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            
            cv2.circle(frame, neck_center, neck_radius, (255, 255, 0), 2)

    out.write(frame)

cap.release()
out.release()
print(f"DONE! Saved to {OUTPUT_PATH}")

In [None]:
import torch
import cv2
import numpy as np
import onnxruntime as ort
from ultralytics import YOLO
import os

# --- 1. THE "MAGIC FIX" FOR PYTORCH 2.6 ---
# This overrides the new strict security check so YOLO can load
_original_load = torch.load
def safe_load_wrapper(*args, **kwargs):
    # Force weights_only=False to allow loading older YOLO models
    if 'weights_only' not in kwargs:
        kwargs['weights_only'] = False
    return _original_load(*args, **kwargs)
torch.load = safe_load_wrapper
# ------------------------------------------

# --- 2. CONFIGURATION ---
YOLO_PATH = '/kaggle/input/yolo-v8/other/default/1/yolov8m.pt'
RTMPOSE_ONNX_PATH = '/kaggle/input/rtm-pose/onnx/default/1/rtmpose-m_ap10k_256.onnx' 
VIDEO_PATH = '/kaggle/input/snatching-videos/snatching_videos/snatching_1.mp4'
OUTPUT_PATH = '/kaggle/working/output_snatching_ALARM.mp4'

INPUT_H, INPUT_W = 256, 256
SNATCH_TIME_THRESHOLD = 0.5  # Seconds for alarm

# --- 3. LOAD MODELS ---
print("Loading YOLOv8...")
yolo_model = YOLO(YOLO_PATH)

print("Loading RTMPose...")
try:
    providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=providers)
    print(f"RTMPose loaded on: {ort_session.get_providers()[0]}")
except Exception as e:
    print(f"CUDA Error ({e}). Using CPU.")
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=['CPUExecutionProvider'])

input_name = ort_session.get_inputs()[0].name

# --- 4. HELPER FUNCTIONS ---

def get_neck_from_box(x1, y1, x2, y2):
    """
    Calculates Geometric Neck Zone from Bounding Box.
    Neck is roughly at top 20% of the bounding box.
    """
    box_h = y2 - y1
    box_w = x2 - x1
    neck_x = x1 + (box_w / 2)
    neck_y = y1 + (box_h * 0.20) 
    radius = int(box_h * 0.15)
    return (int(neck_x), int(neck_y)), radius

def is_point_in_circle(point, circle_center, radius):
    dist = np.sqrt((point[0] - circle_center[0])**2 + (point[1] - circle_center[1])**2)
    return dist < radius

def preprocess_for_rtmpose(img_crop, h, w):
    resized = cv2.resize(img_crop, (w, h))
    mean = np.array([123.675, 116.28, 103.53])
    std = np.array([58.395, 57.12, 57.375])
    img_data = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
    img_data = (img_data - mean) / std
    img_data = img_data.astype(np.float32).transpose(2, 0, 1)
    img_data = np.expand_dims(img_data, axis=0)
    return img_data

def decode_simcc_output(outputs, crop_h, crop_w):
    simcc_x, simcc_y = outputs[0], outputs[1]
    x_locs = np.argmax(simcc_x, axis=2)[0]
    y_locs = np.argmax(simcc_y, axis=2)[0]
    num_bins_x = simcc_x.shape[2]
    num_bins_y = simcc_y.shape[2]
    x_locs = x_locs / (num_bins_x) * crop_w
    y_locs = y_locs / (num_bins_y) * crop_h
    return np.stack([x_locs, y_locs], axis=-1)

# --- 5. PROCESSING LOOP ---
if not os.path.exists(VIDEO_PATH):
    print(f"Video not found at {VIDEO_PATH}")
    exit()

cap = cv2.VideoCapture(VIDEO_PATH)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
if fps != fps or fps <= 0: fps = 25

REQUIRED_FRAMES = int(SNATCH_TIME_THRESHOLD * fps)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

out = cv2.VideoWriter(OUTPUT_PATH, cv2.VideoWriter_fourcc(*'mp4v'), int(fps), (width, height))

# Memory Bank: { track_id: consecutive_frames_in_zone }
snatch_history = {} 

print(f"Processing Video... Alarm triggers after {REQUIRED_FRAMES} frames.")

frame_idx = 0

while cap.isOpened():
    ret, frame = cap.read()
    if not ret: break
    if frame is None or frame.size == 0: continue

    frame_idx += 1
    if frame_idx % 20 == 0: 
        print(f"Processing frame {frame_idx}/{total_frames}")

    # Track
    results = yolo_model.track(frame, classes=0, persist=True, verbose=False, conf=0.5, tracker="bytetrack.yaml")

    for result in results:
        boxes = result.boxes
        if boxes.id is None: continue
        
        track_ids = boxes.

In [None]:
import torch
import cv2
import numpy as np
import onnxruntime as ort
from ultralytics import YOLO
import os

# --- 1. THE "MAGIC FIX" FOR PYTORCH 2.6 ---
# This overrides the new strict security check so YOLO can load
_original_load = torch.load
def safe_load_wrapper(*args, **kwargs):
    # Force weights_only=False to allow loading older YOLO models
    if 'weights_only' not in kwargs:
        kwargs['weights_only'] = False
    return _original_load(*args, **kwargs)
torch.load = safe_load_wrapper
# ------------------------------------------

# --- 2. CONFIGURATION ---
YOLO_PATH = '/kaggle/input/yolo-v8/other/default/1/yolov8m.pt'
RTMPOSE_ONNX_PATH = '/kaggle/input/rtm-pose/onnx/default/1/rtmpose-m_ap10k_256.onnx' 
VIDEO_PATH = '/kaggle/input/snatching-videos/snatching_videos/snatching_1.mp4'
OUTPUT_PATH = '/kaggle/working/output_snatching_ALARM.mp4'

INPUT_H, INPUT_W = 256, 192
SNATCH_TIME_THRESHOLD = 0.5  # Seconds for alarm

# --- 3. LOAD MODELS ---
print("Loading YOLOv8...")
yolo_model = YOLO(YOLO_PATH)

print("Loading RTMPose...")
try:
    providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=providers)
    print(f"RTMPose loaded on: {ort_session.get_providers()[0]}")
except Exception as e:
    print(f"CUDA Error ({e}). Using CPU.")
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=['CPUExecutionProvider'])

input_name = ort_session.get_inputs()[0].name

# --- 4. HELPER FUNCTIONS ---

def get_neck_from_box(x1, y1, x2, y2):
    """
    Calculates Geometric Neck Zone from Bounding Box.
    Neck is roughly at top 20% of the bounding box.
    """
    box_h = y2 - y1
    box_w = x2 - x1
    neck_x = x1 + (box_w / 2)
    neck_y = y1 + (box_h * 0.20) 
    radius = int(box_h * 0.15)
    return (int(neck_x), int(neck_y)), radius

def is_point_in_circle(point, circle_center, radius):
    dist = np.sqrt((point[0] - circle_center[0])**2 + (point[1] - circle_center[1])**2)
    return dist < radius

def preprocess_for_rtmpose(img_crop, h, w):
    resized = cv2.resize(img_crop, (w, h))
    mean = np.array([123.675, 116.28, 103.53])
    std = np.array([58.395, 57.12, 57.375])
    img_data = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
    img_data = (img_data - mean) / std
    img_data = img_data.astype(np.float32).transpose(2, 0, 1)
    img_data = np.expand_dims(img_data, axis=0)
    return img_data

def decode_simcc_output(outputs, crop_h, crop_w):
    simcc_x, simcc_y = outputs[0], outputs[1]
    x_locs = np.argmax(simcc_x, axis=2)[0]
    y_locs = np.argmax(simcc_y, axis=2)[0]
    num_bins_x = simcc_x.shape[2]
    num_bins_y = simcc_y.shape[2]
    x_locs = x_locs / (num_bins_x) * crop_w
    y_locs = y_locs / (num_bins_y) * crop_h
    return np.stack([x_locs, y_locs], axis=-1)

# --- 5. PROCESSING LOOP ---
if not os.path.exists(VIDEO_PATH):
    print(f"Video not found at {VIDEO_PATH}")
    exit()

cap = cv2.VideoCapture(VIDEO_PATH)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
if fps != fps or fps <= 0: fps = 25

REQUIRED_FRAMES = int(SNATCH_TIME_THRESHOLD * fps)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

out = cv2.VideoWriter(OUTPUT_PATH, cv2.VideoWriter_fourcc(*'mp4v'), int(fps), (width, height))

# Memory Bank: { track_id: consecutive_frames_in_zone }
snatch_history = {} 

print(f"Processing Video... Alarm triggers after {REQUIRED_FRAMES} frames.")

frame_idx = 0

while cap.isOpened():
    ret, frame = cap.read()
    if not ret: break
    if frame is None or frame.size == 0: continue

    frame_idx += 1
    if frame_idx % 20 == 0: 
        print(f"Processing frame {frame_idx}/{total_frames}")

    # Track
    results = yolo_model.track(frame, classes=0, persist=True, verbose=False, conf=0.5, tracker="bytetrack.yaml")

    for result in results:
        boxes = result.boxes
        if boxes.id is None: continue
        
        track_ids = boxes.id.int().cpu().tolist()
        coords = boxes.xyxy.int().cpu().tolist()
        
        for box_id, (x1, y1, x2, y2) in zip(track_ids, coords):
            if box_id not in snatch_history: 
                snatch_history[box_id] = 0
            
            # Neck Zone
            neck_center, neck_radius = get_neck_from_box(x1, y1, x2, y2)
            
            # Check Hands
            hand_in_zone = False
            x1_c, y1_c = max(0, x1), max(0, y1)
            x2_c, y2_c = min(width, x2), min(height, y2)
            person_crop = frame[y1_c:y2_c, x1_c:x2_c]
            
            if person_crop.size > 0:
                onnx_input = preprocess_for_rtmpose(person_crop, INPUT_H, INPUT_W)
                outputs = ort_session.run(None, {input_name: onnx_input})
                kpts = decode_simcc_output(outputs, person_crop.shape[0], person_crop.shape[1])
                
                # Check wrists (Indices 9 & 10)
                for k in [9, 10]: 
                    if k < len(kpts):
                        kx, ky = kpts[k]
                        real_x, real_y = int(kx + x1_c), int(ky + y1_c)
                        cv2.circle(frame, (real_x, real_y), 3, (255, 0, 0), -1) 
                        if is_point_in_circle((real_x, real_y), neck_center, neck_radius):
                            hand_in_zone = True

            # Logic
            if hand_in_zone: 
                snatch_history[box_id] += 1
            else: 
                snatch_history[box_id] = max(0, snatch_history[box_id] - 1) 
            
            # Draw
            cnt = snatch_history[box_id]
            color = (0, 255, 0)
            status_text = f"ID:{box_id}"
            
            if cnt > 0: 
                color = (0, 165, 255) # Orange
                status_text = f"Check: {cnt}"
            
            if cnt >= REQUIRED_FRAMES: 
                color = (0, 0, 255) # Red
                cv2.putText(frame, "SNATCH!", (x1, y1-30), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 3)
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 4)
                cv2.circle(frame, neck_center, neck_radius, color, -1)
            else:
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                cv2.circle(frame, neck_center, neck_radius, (255, 255, 0), 2)
            
            cv2.putText(frame, status_text, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    out.write(frame)

cap.release()
out.release()
print(f"DONE! Video saved to {OUTPUT_PATH}")

In [None]:
import torch
import cv2
import numpy as np
import onnxruntime as ort
from ultralytics import YOLO
import os

# --- 1. THE "MAGIC FIX" FOR PYTORCH 2.6 ---
# This overrides the new strict security check so YOLO can load
_original_load = torch.load
def safe_load_wrapper(*args, **kwargs):
    # Force weights_only=False to allow loading older YOLO models
    if 'weights_only' not in kwargs:
        kwargs['weights_only'] = False
    return _original_load(*args, **kwargs)
torch.load = safe_load_wrapper
# ------------------------------------------

# --- 2. CONFIGURATION ---
YOLO_PATH = '/kaggle/input/yolo-v8/other/default/1/yolov8m.pt'
RTMPOSE_ONNX_PATH = '/kaggle/input/rtm-pose/onnx/default/1/rtmpose-m_ap10k_256.onnx' 
VIDEO_PATH = '/kaggle/input/snatching-videos/snatching_videos/snatching_1.mp4'
OUTPUT_PATH = '/kaggle/working/output_snatching_ALARM.mp4'

INPUT_H, INPUT_W = 256, 192
SNATCH_TIME_THRESHOLD = 0.5  # Seconds for alarm

# --- 3. LOAD MODELS ---
print("Loading YOLOv8...")
yolo_model = YOLO(YOLO_PATH)

print("Loading RTMPose...")
try:
    providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=providers)
    print(f"RTMPose loaded on: {ort_session.get_providers()[0]}")
except Exception as e:
    print(f"CUDA Error ({e}). Using CPU.")
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=['CPUExecutionProvider'])

input_name = ort_session.get_inputs()[0].name

# --- 4. HELPER FUNCTIONS ---

def get_neck_from_box(x1, y1, x2, y2):
    """
    Calculates Geometric Neck Zone from Bounding Box.
    Neck is roughly at top 20% of the bounding box.
    """
    box_h = y2 - y1
    box_w = x2 - x1
    neck_x = x1 + (box_w / 2)
    neck_y = y1 + (box_h * 0.20) 
    radius = int(box_h * 0.15)
    return (int(neck_x), int(neck_y)), radius

def is_point_in_circle(point, circle_center, radius):
    dist = np.sqrt((point[0] - circle_center[0])**2 + (point[1] - circle_center[1])**2)
    return dist < radius

def preprocess_for_rtmpose(img_crop, h, w):
    resized = cv2.resize(img_crop, (w, h))
    mean = np.array([123.675, 116.28, 103.53])
    std = np.array([58.395, 57.12, 57.375])
    img_data = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
    img_data = (img_data - mean) / std
    img_data = img_data.astype(np.float32).transpose(2, 0, 1)
    img_data = np.expand_dims(img_data, axis=0)
    return img_data

def decode_simcc_output(outputs, crop_h, crop_w):
    simcc_x, simcc_y = outputs[0], outputs[1]
    x_locs = np.argmax(simcc_x, axis=2)[0]
    y_locs = np.argmax(simcc_y, axis=2)[0]
    num_bins_x = simcc_x.shape[2]
    num_bins_y = simcc_y.shape[2]
    x_locs = x_locs / (num_bins_x) * crop_w
    y_locs = y_locs / (num_bins_y) * crop_h
    return np.stack([x_locs, y_locs], axis=-1)

# --- 5. PROCESSING LOOP ---
if not os.path.exists(VIDEO_PATH):
    print(f"Video not found at {VIDEO_PATH}")
    exit()

cap = cv2.VideoCapture(VIDEO_PATH)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
if fps != fps or fps <= 0: fps = 25

REQUIRED_FRAMES = int(SNATCH_TIME_THRESHOLD * fps)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

out = cv2.VideoWriter(OUTPUT_PATH, cv2.VideoWriter_fourcc(*'mp4v'), int(fps), (width, height))

# Memory Bank: { track_id: consecutive_frames_in_zone }
snatch_history = {} 

print(f"Processing Video... Alarm triggers after {REQUIRED_FRAMES} frames.")

frame_idx = 0

while cap.isOpened():
    ret, frame = cap.read()
    if not ret: break
    if frame is None or frame.size == 0: continue

    frame_idx += 1
    if frame_idx % 20 == 0: 
        print(f"Processing frame {frame_idx}/{total_frames}")

    # Track
    results = yolo_model.track(frame, classes=0, persist=True, verbose=False, conf=0.5, tracker="bytetrack.yaml")

    for result in results:
        boxes = result.boxes
        if boxes.id is None: continue
        
        track_ids = boxes.id.int().cpu().tolist()
        coords = boxes.xyxy.int().cpu().tolist()
        
        for box_id, (x1, y1, x2, y2) in zip(track_ids, coords):
            if box_id not in snatch_history: 
                snatch_history[box_id] = 0
            
            # Neck Zone
            neck_center, neck_radius = get_neck_from_box(x1, y1, x2, y2)
            
            # Check Hands
            hand_in_zone = False
            x1_c, y1_c = max(0, x1), max(0, y1)
            x2_c, y2_c = min(width, x2), min(height, y2)
            person_crop = frame[y1_c:y2_c, x1_c:x2_c]
            
            if person_crop.size > 0:
                onnx_input = preprocess_for_rtmpose(person_crop, INPUT_H, INPUT_W)
                outputs = ort_session.run(None, {input_name: onnx_input})
                kpts = decode_simcc_output(outputs, person_crop.shape[0], person_crop.shape[1])
                
                # Check wrists (Indices 9 & 10)
                for k in [9, 10]: 
                    if k < len(kpts):
                        kx, ky = kpts[k]
                        real_x, real_y = int(kx + x1_c), int(ky + y1_c)
                        cv2.circle(frame, (real_x, real_y), 3, (255, 0, 0), -1) 
                        if is_point_in_circle((real_x, real_y), neck_center, neck_radius):
                            hand_in_zone = True

            # Logic
            if hand_in_zone: 
                snatch_history[box_id] += 1
            else: 
                snatch_history[box_id] = max(0, snatch_history[box_id] - 1) 
            
            # Draw
            cnt = snatch_history[box_id]
            color = (0, 255, 0)
            status_text = f"ID:{box_id}"
            
            if cnt > 0: 
                color = (0, 165, 255) # Orange
                status_text = f"Check: {cnt}"
            
            if cnt >= REQUIRED_FRAMES: 
                color = (0, 0, 255) # Red
                cv2.putText(frame, "SNATCH!", (x1, y1-30), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 3)
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 4)
                cv2.circle(frame, neck_center, neck_radius, color, -1)
            else:
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                cv2.circle(frame, neck_center, neck_radius, (255, 255, 0), 2)
            
            cv2.putText(frame, status_text, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    out.write(frame)

cap.release()
out.release()
print(f"DONE! Video saved to {OUTPUT_PATH}")

In [None]:
import torch
import cv2
import numpy as np
import onnxruntime as ort
from ultralytics import YOLO
import os

# --- 1. THE "MAGIC FIX" (Safety Version) ---
# This version checks if we already fixed it, preventing the RecursionError
if not hasattr(torch.load, "_is_patched"):
    _original_load = torch.load
    def safe_load_wrapper(*args, **kwargs):
        if 'weights_only' not in kwargs:
            kwargs['weights_only'] = False
        return _original_load(*args, **kwargs)
    safe_load_wrapper._is_patched = True
    torch.load = safe_load_wrapper
# -------------------------------------------

# --- 2. CONFIGURATION ---
YOLO_PATH = '/kaggle/input/yolo-v8/other/default/1/yolov8m.pt'

# UPDATE THIS PATH to your new Human Pose Model (COCO)
# Example: '/kaggle/input/rtmpose-coco/rtmpose-m_coco_256x192.onnx'
RTMPOSE_ONNX_PATH = '/kaggle/input/rtm-pose/onnx/default/1/rtmpose-m_ap10k_256.onnx' 

VIDEO_PATH = '/kaggle/input/snatching-videos/snatching_videos/snatching_1.mp4'
OUTPUT_PATH = '/kaggle/working/output_snatching_ALARM_v2.mp4'

# Update Input Size for Human Model (usually 192 width)
INPUT_H, INPUT_W = 256, 192 
SNATCH_TIME_THRESHOLD = 0.5

# --- 3. LOAD MODELS ---
print("Loading YOLOv8...")
yolo_model = YOLO(YOLO_PATH)

print("Loading RTMPose...")
try:
    providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=providers)
    print(f"RTMPose loaded on: {ort_session.get_providers()[0]}")
except Exception as e:
    print(f"CUDA Error ({e}). Using CPU.")
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=['CPUExecutionProvider'])

input_name = ort_session.get_inputs()[0].name

# --- 4. HELPER FUNCTIONS ---

def get_neck_from_box(x1, y1, x2, y2):
    """Calculates Geometric Neck Zone from Bounding Box (Top 20%)."""
    box_h = y2 - y1
    box_w = x2 - x1
    neck_x = x1 + (box_w / 2)
    neck_y = y1 + (box_h * 0.20) 
    radius = int(box_h * 0.15)
    return (int(neck_x), int(neck_y)), radius

def is_point_in_circle(point, circle_center, radius):
    """True if point is inside circle."""
    dist = np.sqrt((point[0] - circle_center[0])**2 + (point[1] - circle_center[1])**2)
    return dist < radius

def preprocess_for_rtmpose(img_crop, h, w):
    resized = cv2.resize(img_crop, (w, h))
    mean = np.array([123.675, 116.28, 103.53])
    std = np.array([58.395, 57.12, 57.375])
    img_data = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
    img_data = (img_data - mean) / std
    img_data = img_data.astype(np.float32).transpose(2, 0, 1)
    img_data = np.expand_dims(img_data, axis=0)
    return img_data

def decode_simcc_output(outputs, crop_h, crop_w):
    simcc_x, simcc_y = outputs[0], outputs[1]
    x_locs = np.argmax(simcc_x, axis=2)[0]
    y_locs = np.argmax(simcc_y, axis=2)[0]
    num_bins_x = simcc_x.shape[2]
    num_bins_y = simcc_y.shape[2]
    x_locs = x_locs / (num_bins_x) * crop_w
    y_locs = y_locs / (num_bins_y) * crop_h
    return np.stack([x_locs, y_locs], axis=-1)

# --- 5. PROCESSING LOOP ---
if not os.path.exists(VIDEO_PATH):
    print(f"Video not found at {VIDEO_PATH}")
    exit()

cap = cv2.VideoCapture(VIDEO_PATH)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)

# Safety check for FPS
if fps != fps or fps <= 0: 
    fps = 25

REQUIRED_FRAMES = int(SNATCH_TIME_THRESHOLD * fps)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

out = cv2.VideoWriter(OUTPUT_PATH, cv2.VideoWriter_fourcc(*'mp4v'), int(fps), (width, height))

# Memory Bank: { track_id: consecutive_frames_in_zone }
snatch_history = {} 

print(f"Processing Video... Alarm triggers after {REQUIRED_FRAMES} frames.")

frame_idx = 0

while cap.isOpened():
    ret, frame = cap.read()
    if not ret: break
    if frame is None or frame.size == 0: continue

    frame_idx += 1
    if frame_idx % 20 == 0: 
        print(f"Processing frame {frame_idx}/{total_frames}")

    # 1. TRACKING
    results = yolo_model.track(frame, classes=0, persist=True, verbose=False, conf=0.5, tracker="bytetrack.yaml")

    for result in results:
        boxes = result.boxes
        if boxes.id is None: continue
        
        track_ids = boxes.id.int().cpu().tolist()
        coords = boxes.xyxy.int().cpu().tolist()
        
        for box_id, (x1, y1, x2, y2) in zip(track_ids, coords):
            if box_id not in snatch_history: 
                snatch_history[box_id] = 0
            
            # --- A. NECK ZONE ---
            neck_center, neck_radius = get_neck_from_box(x1, y1, x2, y2)
            
            # --- B. CHECK HANDS ---
            hand_in_zone = False
            x1_c, y1_c = max(0, x1), max(0, y1)
            x2_c, y2_c = min(width, x2), min(height, y2)
            person_crop = frame[y1_c:y2_c, x1_c:x2_c]
            
            if person_crop.size > 0:
                onnx_input = preprocess_for_rtmpose(person_crop, INPUT_H, INPUT_W)
                outputs = ort_session.run(None, {input_name: onnx_input})
                kpts = decode_simcc_output(outputs, person_crop.shape[0], person_crop.shape[1])
                
                # Check wrists (Indices 9 & 10 for COCO Humans)
                for k in [9, 10]: 
                    if k < len(kpts):
                        kx, ky = kpts[k]
                        real_x, real_y = int(kx + x1_c), int(ky + y1_c)
                        
                        # Draw Blue Dot for Hand
                        cv2.circle(frame, (real_x, real_y), 3, (255, 0, 0), -1) 
                        
                        if is_point_in_circle((real_x, real_y), neck_center, neck_radius):
                            hand_in_zone = True

            # --- C. UPDATE LOGIC ---
            if hand_in_zone: 
                snatch_history[box_id] += 1
            else: 
                snatch_history[box_id] = max(0, snatch_history[box_id] - 1) 
            
            # --- D. VISUALIZE ---
            cnt = snatch_history[box_id]
            
            color = (0, 255, 0)
            status_text = f"ID:{box_id}"
            
            if cnt > 0: 
                color = (0, 165, 255) # Orange
                status_text = f"Check: {cnt}"
            
            if cnt >= REQUIRED_FRAMES: 
                color = (0, 0, 255) # Red
                # ALARM VISUALS
                cv2.putText(frame, "SNATCH!", (x1, y1-30), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 3)
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 4)
                cv2.circle(frame, neck_center, neck_radius, color, -1)
            else:
                # Normal Visuals
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                cv2.circle(frame, neck_center, neck_radius, (255, 255, 0), 2)
            
            cv2.putText(frame, status_text, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    out.write(frame)

cap.release()
out.release()
print(f"DONE! Video saved to {OUTPUT_PATH}")

In [None]:
# 1. Uninstall the conflicting libraries
!pip uninstall -y numpy ultralytics opencv-python opencv-python-headless onnxruntime onnxruntime-gpu

# 2. Reinstall them with strict version matching
# We force numpy<2.0 and older opencv to satisfy the environment's pre-compiled PyTorch
!pip install "numpy<2.0" "ultralytics==8.2.0" "opencv-python-headless<4.10" "onnxruntime-gpu"



import torch
import cv2
import numpy as np
import onnxruntime as ort
from ultralytics import YOLO
import os

# --- SAFETY FIX FOR PYTORCH 2.6 ---
if not hasattr(torch.load, "_is_patched"):
    _original_load = torch.load
    def safe_load_wrapper(*args, **kwargs):
        if 'weights_only' not in kwargs: kwargs['weights_only'] = False
        return _original_load(*args, **kwargs)
    safe_load_wrapper._is_patched = True
    torch.load = safe_load_wrapper
# ----------------------------------

# CONFIGURATION
YOLO_PATH = '/kaggle/input/yolo-v8/other/default/1/yolov8m.pt'
RTMPOSE_ONNX_PATH = '/kaggle/input/rtm-pose/onnx/default/1/rtmpose-m_ap10k_256.onnx' 
VIDEO_PATH = '/kaggle/input/snatching-videos/snatching_videos/snatching_11.mp4'
OUTPUT_PATH = '/kaggle/working/output_debug_indices.mp4'

INPUT_H, INPUT_W = 256, 256 # Keeping square for Animal model

# LOAD MODELS
yolo_model = YOLO(YOLO_PATH)
try:
    providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=providers)
except:
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=['CPUExecutionProvider'])
input_name = ort_session.get_inputs()[0].name

# HELPERS
def preprocess_for_rtmpose(img_crop, h, w):
    resized = cv2.resize(img_crop, (w, h))
    mean = np.array([123.675, 116.28, 103.53])
    std = np.array([58.395, 57.12, 57.375])
    img_data = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
    img_data = (img_data - mean) / std
    img_data = img_data.astype(np.float32).transpose(2, 0, 1)
    img_data = np.expand_dims(img_data, axis=0)
    return img_data

def decode_simcc_output(outputs, crop_h, crop_w):
    simcc_x, simcc_y = outputs[0], outputs[1]
    x_locs = np.argmax(simcc_x, axis=2)[0]
    y_locs = np.argmax(simcc_y, axis=2)[0]
    num_bins_x = simcc_x.shape[2]
    num_bins_y = simcc_y.shape[2]
    x_locs = x_locs / (num_bins_x) * crop_w
    y_locs = y_locs / (num_bins_y) * crop_h
    return np.stack([x_locs, y_locs], axis=-1)

# PROCESSING LOOP
cap = cv2.VideoCapture(VIDEO_PATH)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
if fps != fps or fps <= 0: fps = 25

out = cv2.VideoWriter(OUTPUT_PATH, cv2.VideoWriter_fourcc(*'mp4v'), int(fps), (width, height))

print("Generating Diagnostic Video... Look for the number on the HANDS.")

while cap.isOpened():
    ret, frame = cap.read()
    if not ret: break
    
    # Track
    results = yolo_model.track(frame, classes=0, persist=True, verbose=False, conf=0.5, tracker="bytetrack.yaml")

    for result in results:
        boxes = result.boxes
        if boxes.id is None: continue
        
        coords = boxes.xyxy.int().cpu().tolist()
        
        for (x1, y1, x2, y2) in coords:
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 1)
            
            x1_c, y1_c = max(0, x1), max(0, y1)
            x2_c, y2_c = min(width, x2), min(height, y2)
            person_crop = frame[y1_c:y2_c, x1_c:x2_c]
            
            if person_crop.size > 0:
                onnx_input = preprocess_for_rtmpose(person_crop, INPUT_H, INPUT_W)
                outputs = ort_session.run(None, {input_name: onnx_input})
                kpts = decode_simcc_output(outputs, person_crop.shape[0], person_crop.shape[1])
                
                # DRAW EVERY NUMBER
                for i, (kx, ky) in enumerate(kpts):
                    real_x, real_y = int(kx + x1_c), int(ky + y1_c)
                    
                    # Draw White Dot
                    cv2.circle(frame, (real_x, real_y), 3, (255, 255, 255), -1)
                    # Draw Number in Blue
                    cv2.putText(frame, str(i), (real_x+4, real_y), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 0, 0), 1)

    out.write(frame)

cap.release()
out.release()
print(f"DONE. Watch {OUTPUT_PATH} and find the hand number.")

In [None]:
# Fix the conflict between Numpy and Scipy
!pip install "numpy<2.0" "scipy<1.13" --force-reinstall

In [None]:
import torch
import cv2
import numpy as np
import onnxruntime as ort
from ultralytics import YOLO
import os

# --- 1. PYTORCH SAFETY PATCH (Prevents RecursionError) ---
if not hasattr(torch.load, "_is_patched"):
    _original_load = torch.load
    def safe_load_wrapper(*args, **kwargs):
        if 'weights_only' not in kwargs:
            kwargs['weights_only'] = False
        return _original_load(*args, **kwargs)
    safe_load_wrapper._is_patched = True
    torch.load = safe_load_wrapper
# ---------------------------------------------------------

# --- 2. CONFIGURATION ---
YOLO_PATH = '/kaggle/input/yolo-v8/other/default/1/yolov8m.pt'
# ‚ö†Ô∏è Ensure this path points to your model (Animal or Human)
RTMPOSE_ONNX_PATH = '/kaggle/input/rtm-pose/onnx/default/1/rtmpose-m_ap10k_256.onnx' 
VIDEO_PATH = '/kaggle/input/snatching-videos/snatching_videos/snatching_1.mp4'
OUTPUT_PATH = '/kaggle/working/output_snatching_FINAL.mp4'

# Input Size: 256 for Animal, 192 for Human
INPUT_H, INPUT_W = 256, 256 
SNATCH_TIME_THRESHOLD = 0.5

# --- 3. LOAD MODELS ---
print("Loading YOLOv8...")
yolo_model = YOLO(YOLO_PATH)

print("Loading RTMPose...")
try:
    providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=providers)
except Exception as e:
    print(f"CUDA Error ({e}). Using CPU.")
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=['CPUExecutionProvider'])

input_name = ort_session.get_inputs()[0].name

# --- 4. HELPER FUNCTIONS ---

def get_neck_from_box(x1, y1, x2, y2):
    """Calculates Geometric Neck Zone from Bounding Box (Top 20%)."""
    box_h = y2 - y1
    box_w = x2 - x1
    neck_x = x1 + (box_w / 2)
    neck_y = y1 + (box_h * 0.20) 
    radius = int(box_h * 0.15)
    return (int(neck_x), int(neck_y)), radius

def is_point_in_circle(point, circle_center, radius):
    dist = np.sqrt((point[0] - circle_center[0])**2 + (point[1] - circle_center[1])**2)
    return dist < radius

def preprocess_for_rtmpose(img_crop, h, w):
    resized = cv2.resize(img_crop, (w, h))
    mean = np.array([123.675, 116.28, 103.53])
    std = np.array([58.395, 57.12, 57.375])
    img_data = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
    img_data = (img_data - mean) / std
    img_data = img_data.astype(np.float32).transpose(2, 0, 1)
    img_data = np.expand_dims(img_data, axis=0)
    return img_data

def decode_simcc_output(outputs, crop_h, crop_w):
    simcc_x, simcc_y = outputs[0], outputs[1]
    x_locs = np.argmax(simcc_x, axis=2)[0]
    y_locs = np.argmax(simcc_y, axis=2)[0]
    num_bins_x = simcc_x.shape[2]
    num_bins_y = simcc_y.shape[2]
    x_locs = x_locs / (num_bins_x) * crop_w
    y_locs = y_locs / (num_bins_y) * crop_h
    return np.stack([x_locs, y_locs], axis=-1)

# --- 5. PROCESSING LOOP ---
if not os.path.exists(VIDEO_PATH):
    print("Video not found.")
    exit()

cap = cv2.VideoCapture(VIDEO_PATH)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
if fps != fps or fps <= 0: fps = 25

REQUIRED_FRAMES = int(SNATCH_TIME_THRESHOLD * fps)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

out = cv2.VideoWriter(OUTPUT_PATH, cv2.VideoWriter_fourcc(*'mp4v'), int(fps), (width, height))
snatch_history = {} 

print(f"Processing... Alarm at {REQUIRED_FRAMES} frames.")

frame_idx = 0
while cap.isOpened():
    ret, frame = cap.read()
    if not ret: break
    
    frame_idx += 1
    if frame_idx % 20 == 0: print(f"Processing frame {frame_idx}/{total_frames}")

    # Track
    results = yolo_model.track(frame, classes=0, persist=True, verbose=False, conf=0.5, tracker="bytetrack.yaml")

    for result in results:
        boxes = result.boxes
        if boxes.id is None: continue
        
        track_ids = boxes.id.int().cpu().tolist()
        coords = boxes.xyxy.int().cpu().tolist()
        
        for box_id, (x1, y1, x2, y2) in zip(track_ids, coords):
            if box_id not in snatch_history: snatch_history[box_id] = 0
            
            # Neck Zone
            neck_center, neck_radius = get_neck_from_box(x1, y1, x2, y2)
            
            # Check Hands
            hand_in_zone = False
            x1_c, y1_c = max(0, x1), max(0, y1)
            x2_c, y2_c = min(width, x2), min(height, y2)
            person_crop = frame[y1_c:y2_c, x1_c:x2_c]
            
            if person_crop.size > 0:
                onnx_input = preprocess_for_rtmpose(person_crop, INPUT_H, INPUT_W)
                outputs = ort_session.run(None, {input_name: onnx_input})
                kpts = decode_simcc_output(outputs, person_crop.shape[0], person_crop.shape[1])
                
                # Check indices. 
                # If using ANIMAL model, try checking indices 5,6,7,8 (Paws)
                # If using HUMAN model, check 9,10 (Wrists)
                indices_to_check = [9, 10] 
                
                for k in indices_to_check: 
                    if k < len(kpts):
                        kx, ky = kpts[k]
                        real_x, real_y = int(kx + x1_c), int(ky + y1_c)
                        cv2.circle(frame, (real_x, real_y), 3, (255, 0, 0), -1) 
                        if is_point_in_circle((real_x, real_y), neck_center, neck_radius):
                            hand_in_zone = True

            # Logic
            if hand_in_zone: snatch_history[box_id] += 1
            else: snatch_history[box_id] = max(0, snatch_history[box_id] - 1) 
            
            # Draw
            cnt = snatch_history[box_id]
            color = (0, 255, 0)
            status = f"ID:{box_id}"
            
            if cnt > 0: 
                color = (0, 165, 255)
                status = f"Check:{cnt}"
            if cnt >= REQUIRED_FRAMES: 
                color = (0, 0, 255)
                cv2.putText(frame, "SNATCH!", (x1, y1-30), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 3)
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 4)
                cv2.circle(frame, neck_center, neck_radius, color, -1)
            else:
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                cv2.circle(frame, neck_center, neck_radius, (255, 255, 0), 2)
            
            cv2.putText(frame, status, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    out.write(frame)

cap.release()
out.release()
print(f"DONE! Saved to {OUTPUT_PATH}")

In [None]:
# Force install exact compatible versions
!pip install "numpy==1.26.4" "scipy==1.11.4" "ultralytics" "opencv-python-headless" --force-reinstall

In [None]:

import torch
import cv2
import numpy as np
import onnxruntime as ort
from ultralytics import YOLO
import os

# --- 1. PYTORCH SAFETY PATCH ---
if not hasattr(torch.load, "_is_patched"):
    _original_load = torch.load
    def safe_load_wrapper(*args, **kwargs):
        if 'weights_only' not in kwargs:
            kwargs['weights_only'] = False
        return _original_load(*args, **kwargs)
    safe_load_wrapper._is_patched = True
    torch.load = safe_load_wrapper
# -------------------------------

# --- 2. CONFIGURATION ---
YOLO_PATH = '/kaggle/input/yolo-v8/other/default/1/yolov8m.pt'
# PATH TO YOUR MODEL (Update if you uploaded a Human one)
RTMPOSE_ONNX_PATH = '/kaggle/input/rtm-pose/onnx/default/1/rtmpose-m_ap10k_256.onnx' 

# *** UPDATED VIDEO PATH ***
VIDEO_PATH = '/kaggle/input/snatching-videos/snatching_videos/snatching_11.mp4'
OUTPUT_PATH = '/kaggle/working/output_snatching_11_FINAL.mp4'

# Input Size: 256 for Animal, 192 for Human
INPUT_H, INPUT_W = 256, 256 
SNATCH_TIME_THRESHOLD = 0.5

# --- 3. LOAD MODELS ---
print("Loading YOLOv8...")
yolo_model = YOLO(YOLO_PATH)

print("Loading RTMPose...")
try:
    providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=providers)
except Exception as e:
    print(f"CUDA Error ({e}). Using CPU.")
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=['CPUExecutionProvider'])

input_name = ort_session.get_inputs()[0].name

# --- 4. HELPER FUNCTIONS ---

def get_neck_from_box(x1, y1, x2, y2):
    """Calculates Geometric Neck Zone from Bounding Box (Top 20%)."""
    box_h = y2 - y1
    box_w = x2 - x1
    neck_x = x1 + (box_w / 2)
    neck_y = y1 + (box_h * 0.20) 
    radius = int(box_h * 0.15)
    return (int(neck_x), int(neck_y)), radius

def is_point_in_circle(point, circle_center, radius):
    dist = np.sqrt((point[0] - circle_center[0])**2 + (point[1] - circle_center[1])**2)
    return dist < radius

def preprocess_for_rtmpose(img_crop, h, w):
    resized = cv2.resize(img_crop, (w, h))
    mean = np.array([123.675, 116.28, 103.53])
    std = np.array([58.395, 57.12, 57.375])
    img_data = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
    img_data = (img_data - mean) / std
    img_data = img_data.astype(np.float32).transpose(2, 0, 1)
    img_data = np.expand_dims(img_data, axis=0)
    return img_data

def decode_simcc_output(outputs, crop_h, crop_w):
    simcc_x, simcc_y = outputs[0], outputs[1]
    x_locs = np.argmax(simcc_x, axis=2)[0]
    y_locs = np.argmax(simcc_y, axis=2)[0]
    num_bins_x = simcc_x.shape[2]
    num_bins_y = simcc_y.shape[2]
    x_locs = x_locs / (num_bins_x) * crop_w
    y_locs = y_locs / (num_bins_y) * crop_h
    return np.stack([x_locs, y_locs], axis=-1)

# --- 5. PROCESSING LOOP ---
if not os.path.exists(VIDEO_PATH):
    print(f"Error: Video not found at {VIDEO_PATH}")
    exit()

cap = cv2.VideoCapture(VIDEO_PATH)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
if fps != fps or fps <= 0: fps = 25

REQUIRED_FRAMES = int(SNATCH_TIME_THRESHOLD * fps)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

out = cv2.VideoWriter(OUTPUT_PATH, cv2.VideoWriter_fourcc(*'mp4v'), int(fps), (width, height))
snatch_history = {} 

print(f"Processing... Alarm at {REQUIRED_FRAMES} frames.")

frame_idx = 0
while cap.isOpened():
    ret, frame = cap.read()
    if not ret: break
    
    frame_idx += 1
    if frame_idx % 20 == 0: print(f"Processing frame {frame_idx}/{total_frames}")

    # Track
    results = yolo_model.track(frame, classes=0, persist=True, verbose=False, conf=0.5, tracker="bytetrack.yaml")

    for result in results:
        boxes = result.boxes
        if boxes.id is None: continue
        
        track_ids = boxes.id.int().cpu().tolist()
        coords = boxes.xyxy.int().cpu().tolist()
        
        for box_id, (x1, y1, x2, y2) in zip(track_ids, coords):
            if box_id not in snatch_history: snatch_history[box_id] = 0
            
            # Neck Zone
            neck_center, neck_radius = get_neck_from_box(x1, y1, x2, y2)
            
            # Check Hands
            hand_in_zone = False
            x1_c, y1_c = max(0, x1), max(0, y1)
            x2_c, y2_c = min(width, x2), min(height, y2)
            person_crop = frame[y1_c:y2_c, x1_c:x2_c]
            
            if person_crop.size > 0:
                onnx_input = preprocess_for_rtmpose(person_crop, INPUT_H, INPUT_W)
                outputs = ort_session.run(None, {input_name: onnx_input})
                kpts = decode_simcc_output(outputs, person_crop.shape[0], person_crop.shape[1])
                
                # IMPORTANT: Indices to check for hands
                # Animal Model: Try [9, 10] first. If wrong, try [5, 6].
                indices_to_check = [9, 10] 
                
                for k in indices_to_check: 
                    if k < len(kpts):
                        kx, ky = kpts[k]
                        real_x, real_y = int(kx + x1_c), int(ky + y1_c)
                        # Draw Blue Dot
                        cv2.circle(frame, (real_x, real_y), 3, (255, 0, 0), -1) 
                        if is_point_in_circle((real_x, real_y), neck_center, neck_radius):
                            hand_in_zone = True

            # Logic
            if hand_in_zone: snatch_history[box_id] += 1
            else: snatch_history[box_id] = max(0, snatch_history[box_id] - 1) 
            
            # Draw
            cnt = snatch_history[box_id]
            color = (0, 255, 0)
            status = f"ID:{box_id}"
            
            if cnt > 0: 
                color = (0, 165, 255)
                status = f"Check:{cnt}"
            if cnt >= REQUIRED_FRAMES: 
                color = (0, 0, 255)
                cv2.putText(frame, "SNATCH!", (x1, y1-30), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 3)
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 4)
                cv2.circle(frame, neck_center, neck_radius, color, -1)
            else:
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                cv2.circle(frame, neck_center, neck_radius, (255, 255, 0), 2)
            
            cv2.putText(frame, status, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    out.write(frame)

cap.release()
out.release()
print(f"DONE! Saved to {OUTPUT_PATH}")

In [None]:
import torch
import cv2
import numpy as np
import onnxruntime as ort
from ultralytics import YOLO
import os

import time  # Make sure this is imported

class SystemEvaluator:
    def __init__(self):
        # Confusion Matrix Counters (Rule #2: Metrics First)
        self.true_positives = 0  
        self.false_positives = 0 
        self.false_negatives = 0 
        self.true_negatives = 0  
        
        # Latency Tracking (Tuning Playbook: Input Pipeline Health)
        self.total_time_ms = 0
        self.frame_count = 0
        self.start_t = 0

    def start_timer(self):
        self.start_t = time.time()

    def stop_timer(self):
        end_t = time.time()
        duration_ms = (end_t - self.start_t) * 1000
        self.total_time_ms += duration_ms
        self.frame_count += 1
        return duration_ms

    def update_metrics(self, prediction, ground_truth):
        if prediction == True and ground_truth == True:
            self.true_positives += 1
        elif prediction == True and ground_truth == False:
            self.false_positives += 1  # False Alarm
        elif prediction == False and ground_truth == True:
            self.false_negatives += 1  # Missed Detection
        elif prediction == False and ground_truth == False:
            self.true_negatives += 1

    def print_report(self):
        avg_latency = self.total_time_ms / max(1, self.frame_count)
        print(f"\n--- PERFORMANCE REPORT (Rules of ML Check) ---")
        print(f"Avg Latency per Frame: {avg_latency:.2f} ms")
        print(f"True Positives (Caught): {self.true_positives}")
        print(f"False Positives (Alarms): {self.false_positives}")
        print(f"False Negatives (Missed): {self.false_negatives}")
        print(f"----------------------------------------------")

# --- 1. PYTORCH SAFETY PATCH ---
if not hasattr(torch.load, "_is_patched"):
    _original_load = torch.load
    def safe_load_wrapper(*args, **kwargs):
        if 'weights_only' not in kwargs:
            kwargs['weights_only'] = False
        return _original_load(*args, **kwargs)
    safe_load_wrapper._is_patched = True
    torch.load = safe_load_wrapper
# -------------------------------

# --- 2. CONFIGURATION ---
YOLO_PATH = '/kaggle/input/yolo-v8/other/default/1/yolov8m.pt'
RTMPOSE_ONNX_PATH = '/kaggle/input/rtm-pose/onnx/default/1/rtmpose-m_ap10k_256.onnx' 
VIDEO_PATH = '/kaggle/input/snatching-videos/snatching_videos/snatching_11.mp4'
OUTPUT_PATH = '/kaggle/working/output_snatching_11_IMPROVED.mp4'

INPUT_H, INPUT_W = 256, 256 
# LOWERED THRESHOLD: Catch faster snatches (0.3s instead of 0.5s)
SNATCH_TIME_THRESHOLD = 0.3 

# --- 3. LOAD MODELS ---
print("Loading YOLOv8...")
yolo_model = YOLO(YOLO_PATH)

print("Loading RTMPose...")
try:
    providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=providers)
except Exception as e:
    print(f"CUDA Error ({e}). Using CPU.")
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=['CPUExecutionProvider'])

input_name = ort_session.get_inputs()[0].name

# --- 4. HELPER FUNCTIONS ---

def get_neck_from_box(x1, y1, x2, y2):
    """Calculates Geometric Neck Zone from Bounding Box."""
    box_h = y2 - y1
    box_w = x2 - x1
    neck_x = x1 + (box_w / 2)
    # Move neck slightly higher (15% from top instead of 20%) to catch bikers
    neck_y = y1 + (box_h * 0.15) 
    radius = int(box_h * 0.15)
    return (int(neck_x), int(neck_y)), radius

def is_point_in_circle(point, circle_center, radius):
    dist = np.sqrt((point[0] - circle_center[0])**2 + (point[1] - circle_center[1])**2)
    return dist < radius

def preprocess_for_rtmpose(img_crop, h, w):
    resized = cv2.resize(img_crop, (w, h))
    mean = np.array([123.675, 116.28, 103.53])
    std = np.array([58.395, 57.12, 57.375])
    img_data = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
    img_data = (img_data - mean) / std
    img_data = img_data.astype(np.float32).transpose(2, 0, 1)
    img_data = np.expand_dims(img_data, axis=0)
    return img_data

def decode_simcc_output(outputs, crop_h, crop_w):
    simcc_x, simcc_y = outputs[0], outputs[1]
    x_locs = np.argmax(simcc_x, axis=2)[0]
    y_locs = np.argmax(simcc_y, axis=2)[0]
    num_bins_x = simcc_x.shape[2]
    num_bins_y = simcc_y.shape[2]
    x_locs = x_locs / (num_bins_x) * crop_w
    y_locs = y_locs / (num_bins_y) * crop_h
    return np.stack([x_locs, y_locs], axis=-1)

# --- 5. PROCESSING LOOP ---
if not os.path.exists(VIDEO_PATH):
    print(f"Error: Video not found at {VIDEO_PATH}")
    exit()

cap = cv2.VideoCapture(VIDEO_PATH)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
if fps != fps or fps <= 0: fps = 25

REQUIRED_FRAMES = int(SNATCH_TIME_THRESHOLD * fps)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

out = cv2.VideoWriter(OUTPUT_PATH, cv2.VideoWriter_fourcc(*'mp4v'), int(fps), (width, height))
snatch_history = {} 

print(f"Processing... Alarm triggers after {REQUIRED_FRAMES} frames.")

frame_idx = 0
while cap.isOpened():
    ret, frame = cap.read()
    if not ret: break
    
    frame_idx += 1
    if frame_idx % 20 == 0: print(f"Processing frame {frame_idx}/{total_frames}")

    # Track
    results = yolo_model.track(frame, classes=0, persist=True, verbose=False, conf=0.5, tracker="bytetrack.yaml")

    for result in results:
        boxes = result.boxes
        if boxes.id is None: continue
        
        track_ids = boxes.id.int().cpu().tolist()
        coords = boxes.xyxy.int().cpu().tolist()
        
        for box_id, (x1, y1, x2, y2) in zip(track_ids, coords):
            if box_id not in snatch_history: snatch_history[box_id] = 0
            
            # Neck Zone
            neck_center, neck_radius = get_neck_from_box(x1, y1, x2, y2)
            
            # Check Hands
            hand_in_zone = False
            x1_c, y1_c = max(0, x1), max(0, y1)
            x2_c, y2_c = min(width, x2), min(height, y2)
            person_crop = frame[y1_c:y2_c, x1_c:x2_c]
            
            if person_crop.size > 0:
                onnx_input = preprocess_for_rtmpose(person_crop, INPUT_H, INPUT_W)
                outputs = ort_session.run(None, {input_name: onnx_input})
                kpts = decode_simcc_output(outputs, person_crop.shape[0], person_crop.shape[1])
                
                # --- WIDE NET LOGIC ---
                # Check ALL likely limb points (Front Paws 5-8, Back Paws 9-12)
                # This increases the chance we catch the arm even if model is confused
                indices_to_check = [5, 6, 7, 8, 9, 10, 11, 12] 
                
                for k in indices_to_check: 
                    if k < len(kpts):
                        kx, ky = kpts[k]
                        real_x, real_y = int(kx + x1_c), int(ky + y1_c)
                        
                        # Only draw if near the person (filter out random noise points)
                        if x1 <= real_x <= x2 and y1 <= real_y <= y2:
                            # Draw Small Purple Dot for "Limb Candidate"
                            cv2.circle(frame, (real_x, real_y), 2, (255, 0, 255), -1) 
                            
                            if is_point_in_circle((real_x, real_y), neck_center, neck_radius):
                                hand_in_zone = True

            # Logic
            if hand_in_zone: snatch_history[box_id] += 1
            else: snatch_history[box_id] = max(0, snatch_history[box_id] - 1) 
            
            # Draw
            cnt = snatch_history[box_id]
            color = (0, 255, 0)
            status = f"ID:{box_id}"
            
            if cnt > 0: 
                color = (0, 165, 255) # Orange
                status = f"Chk:{cnt}" # Shorter text
            if cnt >= REQUIRED_FRAMES: 
                color = (0, 0, 255) # Red
                cv2.putText(frame, "SNATCH!", (x1, y1-30), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 3)
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 4)
                cv2.circle(frame, neck_center, neck_radius, color, -1)
            else:
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                cv2.circle(frame, neck_center, neck_radius, (255, 255, 0), 2)
            
            cv2.putText(frame, status, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    out.write(frame)

cap.release()
out.release()
print(f"DONE! Saved to {OUTPUT_PATH}")

In [None]:
# 1. Install specific versions to prevent crashes
!pip install "numpy==1.26.4" "scipy==1.11.4" "ultralytics" "onnxruntime-gpu" "lapx" --force-reinstall

# 2. Clear confusing warnings
import warnings
warnings.filterwarnings('ignore')

print("‚úÖ INSTALLATION COMPLETE.")
print("‚ö†Ô∏è CRITICAL STEP: Go to the Menu Bar -> 'Run' -> 'Restart Session'.")
print("Then skip this cell and run STEP 2 below.")

In [None]:
import torch
import cv2
import numpy as np
import onnxruntime as ort
from ultralytics import YOLO
import sys
import os

# --- A. PYTORCH SAFETY PATCH (Prevents RecursionError) ---
if not hasattr(torch.load, "_is_patched"):
    _original_load = torch.load
    def safe_load_wrapper(*args, **kwargs):
        if 'weights_only' not in kwargs:
            kwargs['weights_only'] = False
        return _original_load(*args, **kwargs)
    safe_load_wrapper._is_patched = True
    torch.load = safe_load_wrapper

# --- B. SETUP DEEP OC-SORT TRACKER ---
# We look for the tracker in the input folder
sys.path.append('/kaggle/input/using-deep-oc-sort')

try:
    from deep_oc_sort.ocsort import OCSort
    print("‚úÖ Deep OC-SORT imported successfully!")
except ImportError:
    # If the folder name is slightly different, try a fallback path
    print("‚ö†Ô∏è Standard import failed. Trying fallback path...")
    sys.path.append('/kaggle/input/using-deep-oc-sort/deep_oc_sort')
    try:
        from ocsort import OCSort
        print("‚úÖ Deep OC-SORT imported from fallback!")
    except:
        print("‚ùå ERROR: Could not find OCSort. Tracking will fail.")
        # Dummy class to prevent code crashing (but tracking won't work)
        class OCSort:
            def __init__(self, **kwargs): pass
            def update(self, dets, img): return dets 

# Initialize Tracker (max_age=30 keeps IDs alive for 1 second of occlusion)
tracker = OCSort(det_thresh=0.4, max_age=30, min_hits=3)

# --- C. CONFIGURATION ---
# Path to YOUR CUSTOM YOLO MODEL
YOLO_PATH = '/kaggle/input/best-yolov8m/pytorch/default/1/best _yolov8m.pt'
# Path to RTMPose (Animal Model)
RTMPOSE_ONNX_PATH = '/kaggle/input/rtm-pose/onnx/default/1/rtmpose-m_ap10k_256.onnx' 
# Path to Video
VIDEO_PATH = '/kaggle/input/snatching-videos/snatching_videos/snatching_11.mp4'
OUTPUT_PATH = '/kaggle/working/output_final_custom_ocsort.mp4'

INPUT_H, INPUT_W = 256, 256
SNATCH_TIME_THRESHOLD = 0.3 # 0.3s trigger

# --- D. LOAD MODELS ---
print(f"Loading Custom YOLO from: {YOLO_PATH}")
try:
    yolo_model = YOLO(YOLO_PATH)
except Exception as e:
    print(f"‚ùå Error loading YOLO: {e}")
    print("Double check the filename (spaces?) or path.")
    exit()

print("Loading RTMPose...")
try:
    providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=providers)
except:
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=['CPUExecutionProvider'])
input_name = ort_session.get_inputs()[0].name

# --- E. HELPER FUNCTIONS ---
def get_neck_from_box(x1, y1, x2, y2):
    box_h, box_w = y2 - y1, x2 - x1
    # Neck is top 15% of the bounding box
    return (int(x1 + box_w/2), int(y1 + box_h*0.15)), int(box_h*0.15)

def is_point_in_circle(p, c, r):
    return np.sqrt((p[0]-c[0])**2 + (p[1]-c[1])**2) < r

def preprocess_for_rtmpose(img, h, w):
    resized = cv2.resize(img, (w, h))
    mean, std = np.array([123.675, 116.28, 103.53]), np.array([58.395, 57.12, 57.375])
    img_data = (cv2.cvtColor(resized, cv2.COLOR_BGR2RGB) - mean) / std
    return np.expand_dims(img_data.astype(np.float32).transpose(2, 0, 1), axis=0)

def decode_simcc_output(outputs, crop_h, crop_w):
    simcc_x, simcc_y = outputs[0], outputs[1]
    x_locs = np.argmax(simcc_x, axis=2)[0] / simcc_x.shape[2] * crop_w
    y_locs = np.argmax(simcc_y, axis=2)[0] / simcc_y.shape[2] * crop_h
    return np.stack([x_locs, y_locs], axis=-1)

# --- F. MAIN PROCESSING LOOP ---
if not os.path.exists(VIDEO_PATH):
    print(f"‚ùå Video not found at {VIDEO_PATH}")
    exit()

cap = cv2.VideoCapture(VIDEO_PATH)
width, height = int(cap.get(3)), int(cap.get(4))
fps = cap.get(cv2.CAP_PROP_FPS) or 25
REQUIRED_FRAMES = int(SNATCH_TIME_THRESHOLD * fps)

out = cv2.VideoWriter(OUTPUT_PATH, cv2.VideoWriter_fourcc(*'mp4v'), int(fps), (width, height))
snatch_history = {} 

print(f"üöÄ Processing... Alarm triggers at {REQUIRED_FRAMES} frames.")

while cap.isOpened():
    ret, frame = cap.read()
    if not ret: break
    
    # 1. DETECT (Using Custom YOLO)
    # We set classes=None to detect whatever your custom model was trained on
    results = yolo_model.predict(frame, verbose=False, conf=0.4)
    
    detections_list = []
    for r in results:
        for box in r.boxes:
            x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
            conf = box.conf[0].cpu().numpy()
            detections_list.append([x1, y1, x2, y2, conf])
    
    detections_array = np.array(detections_list)
    
    # 2. TRACK (Deep OC-SORT)
    if len(detections_array) > 0:
        track_results = tracker.update(detections_array, frame)
    else:
        track_results = np.empty((0, 5))

    # 3. ANALYZE BEHAVIOR
    for track in track_results:
        x1, y1, x2, y2, box_id = map(int, track[:5])
        
        if box_id not in snatch_history: snatch_history[box_id] = 0
        
        # Neck Zone
        neck_center, neck_radius = get_neck_from_box(x1, y1, x2, y2)
        
        # Check Hands (Wide Net for Animal Model)
        hand_in_zone = False
        crop = frame[max(0,y1):min(height,y2), max(0,x1):min(width,x2)]
        
        if crop.size > 0:
            onnx_input = preprocess_for_rtmpose(crop, INPUT_H, INPUT_W)
            outputs = ort_session.run(None, {input_name: onnx_input})
            kpts = decode_simcc_output(outputs, crop.shape[0], crop.shape[1])
            
            # Check Indices 5-12 (All potential limb points for Animal Model)
            for k in range(5, 13):
                if k < len(kpts):
                    real_x, real_y = int(kpts[k][0] + x1), int(kpts[k][1] + y1)
                    if is_point_in_circle((real_x, real_y), neck_center, neck_radius):
                        hand_in_zone = True

        # Update History
        if hand_in_zone: snatch_history[box_id] += 1
        else: snatch_history[box_id] = max(0, snatch_history[box_id] - 1) 
        
        # Draw Visuals
        cnt = snatch_history[box_id]
        color = (0, 255, 0)
        label = f"ID:{box_id}"
        
        if cnt > 0: 
            color = (0, 165, 255) # Orange
            label = f"ID:{box_id} Chk:{cnt}"
        if cnt >= REQUIRED_FRAMES: 
            color = (0, 0, 255) # Red
            cv2.putText(frame, "SNATCH!", (x1, y1-30), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 3)
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 4)
            cv2.circle(frame, neck_center, neck_radius, color, -1)
        else:
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            cv2.circle(frame, neck_center, neck_radius, (255, 255, 0), 2)
        
        cv2.putText(frame, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    out.write(frame)

cap.release()
out.release()
print(f"‚úÖ DONE! Output saved to: {OUTPUT_PATH}")

In [None]:
# Force reinstall Pillow and dependencies to fix the "Ink" error
!pip install "pillow>=10.3.0" "numpy==1.26.4" "scipy==1.11.4" "ultralytics" --force-reinstall

In [None]:
import torch
import cv2
import numpy as np
import onnxruntime as ort
from ultralytics import YOLO
import sys
import os

# --- A. PYTORCH SAFETY PATCH ---
if not hasattr(torch.load, "_is_patched"):
    _original_load = torch.load
    def safe_load_wrapper(*args, **kwargs):
        if 'weights_only' not in kwargs:
            kwargs['weights_only'] = False
        return _original_load(*args, **kwargs)
    safe_load_wrapper._is_patched = True
    torch.load = safe_load_wrapper

# --- B. SETUP DEEP OC-SORT TRACKER ---
sys.path.append('/kaggle/input/using-deep-oc-sort')

try:
    from deep_oc_sort.ocsort import OCSort
    print("‚úÖ Deep OC-SORT imported successfully!")
except ImportError:
    print("‚ö†Ô∏è Standard import failed. Trying fallback path...")
    sys.path.append('/kaggle/input/using-deep-oc-sort/deep_oc_sort')
    try:
        from ocsort import OCSort
        print("‚úÖ Deep OC-SORT imported from fallback!")
    except:
        print("‚ùå ERROR: Could not find OCSort. Tracking will fail.")
        class OCSort:
            def __init__(self, **kwargs): pass
            def update(self, dets, img): return dets 

# Initialize Tracker
tracker = OCSort(det_thresh=0.4, max_age=30, min_hits=3)

# --- C. CONFIGURATION ---
YOLO_PATH = '/kaggle/input/best-yolov8m/pytorch/default/1/best _yolov8m.pt'
RTMPOSE_ONNX_PATH = '/kaggle/input/rtm-pose/onnx/default/1/rtmpose-m_ap10k_256.onnx' 
VIDEO_PATH = '/kaggle/input/snatching-videos/snatching_videos/snatching_11.mp4'
OUTPUT_PATH = '/kaggle/working/output_final_custom_ocsort.mp4'

INPUT_H, INPUT_W = 256, 256
SNATCH_TIME_THRESHOLD = 0.3 

# --- D. LOAD MODELS ---
print(f"Loading Custom YOLO from: {YOLO_PATH}")
try:
    yolo_model = YOLO(YOLO_PATH)
except Exception as e:
    print(f"‚ùå Error loading YOLO: {e}")
    exit()

print("Loading RTMPose...")
try:
    providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=providers)
except:
    ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=['CPUExecutionProvider'])
input_name = ort_session.get_inputs()[0].name

# --- E. HELPER FUNCTIONS ---
def get_neck_from_box(x1, y1, x2, y2):
    box_h, box_w = y2 - y1, x2 - x1
    return (int(x1 + box_w/2), int(y1 + box_h*0.15)), int(box_h*0.15)

def is_point_in_circle(p, c, r):
    return np.sqrt((p[0]-c[0])**2 + (p[1]-c[1])**2) < r

def preprocess_for_rtmpose(img, h, w):
    resized = cv2.resize(img, (w, h))
    mean, std = np.array([123.675, 116.28, 103.53]), np.array([58.395, 57.12, 57.375])
    img_data = (cv2.cvtColor(resized, cv2.COLOR_BGR2RGB) - mean) / std
    return np.expand_dims(img_data.astype(np.float32).transpose(2, 0, 1), axis=0)

def decode_simcc_output(outputs, crop_h, crop_w):
    simcc_x, simcc_y = outputs[0], outputs[1]
    x_locs = np.argmax(simcc_x, axis=2)[0] / simcc_x.shape[2] * crop_w
    y_locs = np.argmax(simcc_y, axis=2)[0] / simcc_y.shape[2] * crop_h
    return np.stack([x_locs, y_locs], axis=-1)

# --- F. MAIN PROCESSING LOOP ---
if not os.path.exists(VIDEO_PATH):
    print(f"‚ùå Video not found at {VIDEO_PATH}")
    exit()

cap = cv2.VideoCapture(VIDEO_PATH)
width, height = int(cap.get(3)), int(cap.get(4))
fps = cap.get(cv2.CAP_PROP_FPS) or 25
REQUIRED_FRAMES = int(SNATCH_TIME_THRESHOLD * fps)

out = cv2.VideoWriter(OUTPUT_PATH, cv2.VideoWriter_fourcc(*'mp4v'), int(fps), (width, height))
snatch_history = {} 

print(f"üöÄ Processing... Alarm triggers at {REQUIRED_FRAMES} frames.")

while cap.isOpened():
    ret, frame = cap.read()
    if not ret: break
    
    # 1. DETECT (Custom YOLO)
    results = yolo_model.predict(frame, verbose=False, conf=0.4)
    
    detections_list = []
    for r in results:
        for box in r.boxes:
            x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
            conf = box.conf[0].cpu().numpy()
            detections_list.append([x1, y1, x2, y2, conf])
    
    detections_array = np.array(detections_list)
    
    # 2. TRACK (Deep OC-SORT)
    if len(detections_array) > 0:
        track_results = tracker.update(detections_array, frame)
    else:
        track_results = np.empty((0, 5))

    # 3. ANALYZE
    for track in track_results:
        x1, y1, x2, y2, box_id = map(int, track[:5])
        
        if box_id not in snatch_history: snatch_history[box_id] = 0
        
        # Neck Zone
        neck_center, neck_radius = get_neck_from_box(x1, y1, x2, y2)
        
        # Check Hands (Wide Net for Animal Model)
        hand_in_zone = False
        crop = frame[max(0,y1):min(height,y2), max(0,x1):min(width,x2)]
        
        if crop.size > 0:
            onnx_input = preprocess_for_rtmpose(crop, INPUT_H, INPUT_W)
            outputs = ort_session.run(None, {input_name: onnx_input})
            kpts = decode_simcc_output(outputs, crop.shape[0], crop.shape[1])
            
            # Check Indices 5-12
            for k in range(5, 13):
                if k < len(kpts):
                    real_x, real_y = int(kpts[k][0] + x1), int(kpts[k][1] + y1)
                    if is_point_in_circle((real_x, real_y), neck_center, neck_radius):
                        hand_in_zone = True

        # Update History
        if hand_in_zone: snatch_history[box_id] += 1
        else: snatch_history[box_id] = max(0, snatch_history[box_id] - 1) 
        
        # Draw Visuals
        cnt = snatch_history[box_id]
        color = (0, 255, 0)
        label = f"ID:{box_id}"
        
        if cnt > 0: 
            color = (0, 165, 255) # Orange
            label = f"ID:{box_id} Chk:{cnt}"
        if cnt >= REQUIRED_FRAMES: 
            color = (0, 0, 255) # Red
            cv2.putText(frame, "SNATCH!", (x1, y1-30), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 3)
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 4)
            cv2.circle(frame, neck_center, neck_radius, color, -1)
        else:
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            cv2.circle(frame, neck_center, neck_radius, (255, 255, 0), 2)
        
        cv2.putText(frame, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    out.write(frame)

cap.release()
out.release()
print(f"‚úÖ DONE! Output saved to: {OUTPUT_PATH}")

In [None]:
import torch
import cv2
import numpy as np
import onnxruntime as ort
from ultralytics import YOLO
import sys
import os

# --- A. FIX OCSORT IMPORT (Auto-Search) ---
# This block hunts for 'ocsort.py' instead of guessing the path
print("üîç Searching for Deep OC-SORT...")
dataset_root = '/kaggle/input/using-deep-oc-sort'
found_tracker = False

for root, dirs, files in os.walk(dataset_root):
    if 'ocsort.py' in files:
        sys.path.append(root)  # Add the folder containing the file
        sys.path.append(os.path.dirname(root)) # Add the parent folder
        print(f"‚úÖ FOUND tracker in: {root}")
        found_tracker = True
        try:
            from ocsort import OCSort
            print("‚úÖ Import Successful!")
            break
        except ImportError:
            try:
                # Try importing as a package
                from deep_oc_sort.ocsort import OCSort
                print("‚úÖ Package Import Successful!")
                break
            except:
                pass

if not found_tracker:
    print("‚ùå CRITICAL ERROR: Could not find 'ocsort.py'.")
    print("Listing files in dataset to help debug:")
    for root, dirs, files in os.walk(dataset_root):
        print(f"  {root} -> {files}")
    # Dummy backup to prevent crash (but tracking won't work)
    class OCSort:
        def __init__(self, **kwargs): pass
        def update(self, dets, img): return dets

# Initialize Tracker
tracker = OCSort(det_thresh=0.4, max_age=30, min_hits=3)

# --- B. CONFIGURATION ---
YOLO_PATH = '/kaggle/input/best-yolov8m/pytorch/default/1/best _yolov8m.pt'
RTMPOSE_ONNX_PATH = '/kaggle/input/rtm-pose/onnx/default/1/rtmpose-m_ap10k_256.onnx' 
VIDEO_PATH = '/kaggle/input/snatching-videos/snatching_videos/snatching_11.mp4'
OUTPUT_PATH = '/kaggle/working/output_final_fixed.mp4'

INPUT_H, INPUT_W = 256, 256
SNATCH_TIME_THRESHOLD = 0.3 

# --- C. LOAD MODELS (CPU Forced to stop errors) ---
print(f"Loading Custom YOLO...")
yolo_model = YOLO(YOLO_PATH)

print("Loading RTMPose (CPU Mode)...")
# We explicitly use CPU to stop the "CUDA failure 35" error log
ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=['CPUExecutionProvider'])
input_name = ort_session.get_inputs()[0].name

# --- D. HELPER FUNCTIONS ---
def get_neck_from_box(x1, y1, x2, y2):
    box_h, box_w = y2 - y1, x2 - x1
    return (int(x1 + box_w/2), int(y1 + box_h*0.15)), int(box_h*0.15)

def is_point_in_circle(p, c, r):
    return np.sqrt((p[0]-c[0])**2 + (p[1]-c[1])**2) < r

def preprocess_for_rtmpose(img, h, w):
    resized = cv2.resize(img, (w, h))
    mean, std = np.array([123.675, 116.28, 103.53]), np.array([58.395, 57.12, 57.375])
    img_data = (cv2.cvtColor(resized, cv2.COLOR_BGR2RGB) - mean) / std
    return np.expand_dims(img_data.astype(np.float32).transpose(2, 0, 1), axis=0)

def decode_simcc_output(outputs, crop_h, crop_w):
    simcc_x, simcc_y = outputs[0], outputs[1]
    x_locs = np.argmax(simcc_x, axis=2)[0] / simcc_x.shape[2] * crop_w
    y_locs = np.argmax(simcc_y, axis=2)[0] / simcc_y.shape[2] * crop_h
    return np.stack([x_locs, y_locs], axis=-1)

# --- E. MAIN LOOP ---
if not os.path.exists(VIDEO_PATH):
    print(f"‚ùå Video not found at {VIDEO_PATH}")
    exit()

cap = cv2.VideoCapture(VIDEO_PATH)
width, height = int(cap.get(3)), int(cap.get(4))
fps = cap.get(cv2.CAP_PROP_FPS) or 25
REQUIRED_FRAMES = int(SNATCH_TIME_THRESHOLD * fps)

out = cv2.VideoWriter(OUTPUT_PATH, cv2.VideoWriter_fourcc(*'mp4v'), int(fps), (width, height))
snatch_history = {} 

print(f"üöÄ Processing... Alarm triggers at {REQUIRED_FRAMES} frames.")

frame_idx = 0
while cap.isOpened():
    ret, frame = cap.read()
    if not ret: break
    frame_idx += 1
    
    # 1. DETECT (Custom YOLO)
    results = yolo_model.predict(frame, verbose=False, conf=0.4)
    
    detections_list = []
    for r in results:
        for box in r.boxes:
            x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
            conf = box.conf[0].cpu().numpy()
            detections_list.append([x1, y1, x2, y2, conf])
    
    detections_array = np.array(detections_list)
    
    # 2. TRACK (Deep OC-SORT)
    if len(detections_array) > 0:
        track_results = tracker.update(detections_array, frame)
    else:
        track_results = np.empty((0, 5))

    # 3. ANALYZE
    for track in track_results:
        x1, y1, x2, y2, box_id = map(int, track[:5])
        
        if box_id not in snatch_history: snatch_history[box_id] = 0
        
        # Neck Zone
        neck_center, neck_radius = get_neck_from_box(x1, y1, x2, y2)
        
        # Check Hands
        hand_in_zone = False
        crop = frame[max(0,y1):min(height,y2), max(0,x1):min(width,x2)]
        
        if crop.size > 0:
            onnx_input = preprocess_for_rtmpose(crop, INPUT_H, INPUT_W)
            outputs = ort_session.run(None, {input_name: onnx_input})
            kpts = decode_simcc_output(outputs, crop.shape[0], crop.shape[1])
            
            # Check Paws/Hands
            for k in range(5, 13):
                if k < len(kpts):
                    real_x, real_y = int(kpts[k][0] + x1), int(kpts[k][1] + y1)
                    if is_point_in_circle((real_x, real_y), neck_center, neck_radius):
                        hand_in_zone = True

        # Update
        if hand_in_zone: snatch_history[box_id] += 1
        else: snatch_history[box_id] = max(0, snatch_history[box_id] - 1) 
        
        # Draw
        cnt = snatch_history[box_id]
        color = (0, 255, 0)
        label = f"ID:{box_id}"
        
        if cnt > 0: 
            color = (0, 165, 255)
            label = f"ID:{box_id} Chk:{cnt}"
        if cnt >= REQUIRED_FRAMES: 
            color = (0, 0, 255)
            cv2.putText(frame, "SNATCH!", (x1, y1-30), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 3)
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 4)
            cv2.circle(frame, neck_center, neck_radius, color, -1)
        else:
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            cv2.circle(frame, neck_center, neck_radius, (255, 255, 0), 2)
        
        cv2.putText(frame, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    out.write(frame)
    if frame_idx % 20 == 0: print(f"Processing frame {frame_idx}")

cap.release()
out.release()
print(f"‚úÖ DONE! Output saved to: {OUTPUT_PATH}")

In [None]:
!pip install "onnxruntime-gpu" "ultralytics" "numpy==1.26.4" "scipy==1.11.4" "pillow>=10.3.0"

In [1]:
import cv2
import numpy as np
import onnxruntime as ort
from ultralytics import YOLO
import os

# --- A. CONFIGURATION ---
# 1. NEW PATH (YOLO11 Small)
YOLO_PATH = '/kaggle/input/yolo-11/yolo11s.pt'

# 2. POSE MODEL (Animal Model "Hacked" for Humans)
RTMPOSE_ONNX_PATH = '/kaggle/input/rtm-pose/onnx/default/1/rtmpose-m_ap10k_256.onnx' 

# 3. VIDEO PATH
VIDEO_PATH = '/kaggle/input/snatching-videos/snatching_videos/snatching_1.mp4'
OUTPUT_PATH = '/kaggle/working/final_output_yolo11_bytetrack.mp4'

# 4. SETTINGS
INPUT_H, INPUT_W = 256, 256
SNATCH_TIME_THRESHOLD = 0.3  # 0.3s trigger

# --- B. LOAD MODELS ---
print(f"Loading YOLO11 Model: {YOLO_PATH}...")
try:
    yolo_model = YOLO(YOLO_PATH)
except Exception as e:
    print(f"‚ùå Error loading YOLO: {e}")
    exit()

print("Loading Pose Model (CPU)...")
# Force CPU to avoid CUDA errors
ort_session = ort.InferenceSession(RTMPOSE_ONNX_PATH, providers=['CPUExecutionProvider'])
input_name = ort_session.get_inputs()[0].name

# --- C. HELPER FUNCTIONS ---
def get_neck_from_box(x1, y1, x2, y2):
    # Neck is roughly top 15% of the bounding box
    box_h, box_w = y2 - y1, x2 - x1
    return (int(x1 + box_w/2), int(y1 + box_h*0.15)), int(box_h*0.15)

def is_point_in_circle(p, c, r):
    return np.sqrt((p[0]-c[0])**2 + (p[1]-c[1])**2) < r

def preprocess_for_rtmpose(img, h, w):
    resized = cv2.resize(img, (w, h))
    mean, std = np.array([123.675, 116.28, 103.53]), np.array([58.395, 57.12, 57.375])
    img_data = (cv2.cvtColor(resized, cv2.COLOR_BGR2RGB) - mean) / std
    return np.expand_dims(img_data.astype(np.float32).transpose(2, 0, 1), axis=0)

def decode_simcc_output(outputs, crop_h, crop_w):
    simcc_x, simcc_y = outputs[0], outputs[1]
    x_locs = np.argmax(simcc_x, axis=2)[0] / simcc_x.shape[2] * crop_w
    y_locs = np.argmax(simcc_y, axis=2)[0] / simcc_y.shape[2] * crop_h
    return np.stack([x_locs, y_locs], axis=-1)

# --- D. MAIN PROCESSING LOOP ---
if not os.path.exists(VIDEO_PATH):
    print(f"‚ùå Video not found at {VIDEO_PATH}")
    exit()

cap = cv2.VideoCapture(VIDEO_PATH)
width, height = int(cap.get(3)), int(cap.get(4))
fps = cap.get(cv2.CAP_PROP_FPS) or 25
REQUIRED_FRAMES = int(SNATCH_TIME_THRESHOLD * fps)

out = cv2.VideoWriter(OUTPUT_PATH, cv2.VideoWriter_fourcc(*'mp4v'), int(fps), (width, height))
snatch_history = {} 

print(f"üöÄ Processing with YOLO11 + ByteTrack...")

while cap.isOpened():
    ret, frame = cap.read()
    if not ret: break
    
    # 1. TRACK (Using Built-in Ultralytics ByteTrack)
    # This replaces the entire old Deep OC-SORT block
    results = yolo_model.track(
        frame, 
        persist=True, 
        tracker="bytetrack.yaml",  # <--- HERE IS THE BYTETRACK IMPORT
        verbose=False,
        classes=[0] # Only track class 0 (Person)
    )
    
    # 2. ANALYZE RESULTS
    # Ultralytics handles the tracking internally now
    for r in results:
        if r.boxes.id is None: continue # Skip if no ID assigned yet
        
        boxes = r.boxes.xyxy.cpu().numpy()
        ids = r.boxes.id.cpu().numpy()
        
        for box, box_id in zip(boxes, ids):
            x1, y1, x2, y2 = map(int, box[:4])
            box_id = int(box_id)
            
            if box_id not in snatch_history: snatch_history[box_id] = 0
            
            # --- SNATCHING LOGIC STARTS HERE ---
            
            # A. Define Neck Zone
            neck_center, neck_radius = get_neck_from_box(x1, y1, x2, y2)
            
            # B. Check Hands (Pose Model)
            hand_in_zone = False
            crop = frame[max(0,y1):min(height,y2), max(0,x1):min(width,x2)]
            
            if crop.size > 0:
                onnx_input = preprocess_for_rtmpose(crop, INPUT_H, INPUT_W)
                outputs = ort_session.run(None, {input_name: onnx_input})
                kpts = decode_simcc_output(outputs, crop.shape[0], crop.shape[1])
                
                # Check indices 5-12 (Paws/Elbows mapped to Hands)
                for k in range(5, 13):
                    if k < len(kpts):
                        real_x, real_y = int(kpts[k][0] + x1), int(kpts[k][1] + y1)
                        if is_point_in_circle((real_x, real_y), neck_center, neck_radius):
                            hand_in_zone = True

            # C. Update History
            if hand_in_zone: snatch_history[box_id] += 1
            else: snatch_history[box_id] = max(0, snatch_history[box_id] - 1) 
            
            # D. Draw Visuals
            cnt = snatch_history[box_id]
            color = (0, 255, 0) # Green
            label = f"ID:{box_id}"
            
            if cnt > 0: 
                color = (0, 165, 255) # Orange
                label = f"ID:{box_id} Chk:{cnt}"
            if cnt >= REQUIRED_FRAMES: 
                color = (0, 0, 255) # Red (Alarm)
                cv2.putText(frame, "SNATCH!", (x1, y1-30), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 3)
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 4)
                cv2.circle(frame, neck_center, neck_radius, color, -1)
            else:
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                cv2.circle(frame, neck_center, neck_radius, (255, 255, 0), 2)
            
            cv2.putText(frame, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    out.write(frame)

cap.release()
out.release()
print(f"‚úÖ DONE! Video saved to: {OUTPUT_PATH}")

Loading YOLO11 Model: /kaggle/input/yolo-11/yolo11s.pt...
Loading Pose Model (CPU)...
üöÄ Processing with YOLO11 + ByteTrack...


KeyboardInterrupt: 