1- Dataset Discovery (Kaggle Input Check)

In [None]:
import os

# Walk the Kaggle input mount and print the path of every directory found there
for folder, _subdirs, _files in os.walk('/kaggle/input'):
    print(folder)

2- List Available Kaggle Datasets

In [None]:
# Shell listing of the dataset folders mounted under /kaggle/input
ls /kaggle/input

3- Install and Import YOLO (Ultralytics)

In [None]:
# 1. Install the YOLO library
!pip install ultralytics

# 2. Re-run your import and training code
import os
import yaml
from ultralytics import YOLO

4- YOLO Setup, GPU Verification, and Smoke Test Training

In [None]:
# 1. Install YOLO (Takes ~30 seconds)
!pip install ultralytics -q

import torch
from ultralytics import YOLO
import os
import yaml

# 2. Ask PyTorch once whether a CUDA device is visible
cuda_ok = torch.cuda.is_available()
print(f"CUDA Available: {cuda_ok}")

if not cuda_ok:
    print("‚ùå GPU NOT DETECTED. Check the right sidebar 'Accelerator' setting.")
else:
    print(f"GPU Name: {torch.cuda.get_device_name(0)}")

    # 3. Recreate the dataset YAML when it is missing (e.g. after a session reset)
    yaml_path = '/kaggle/working/data.yaml'
    if not os.path.exists(yaml_path):
        data_config = {
            'train': '/kaggle/input/vehic-ped-intuition/images/train',
            'val': '/kaggle/input/vehic-ped-intuition/images/val',
            'test': '/kaggle/input/vehic-ped-intuition/images/test',
            'nc': 1,
            'names': ['pedestrian'],
        }
        with open(yaml_path, 'w') as f:
            yaml.dump(data_config, f)

    # 4. Two-epoch run on GPU 0 to prove the whole pipeline executes end to end
    model = YOLO('/kaggle/working/yolov8n.pt')
    smoke_args = dict(data=yaml_path, epochs=2, imgsz=640, batch=16, device=0)
    model.train(**smoke_args)
    print("\n‚úÖ SMOKE TEST PASSED! You are ready for the final Commit.")

5- Final YOLO Training Configuration and Execution (Kaggle)

In [None]:

!pip install ultralytics

import os
import yaml
import torch
from ultralytics import YOLO

# Pick the training device: first CUDA GPU when one is visible, otherwise the CPU
gpu_found = torch.cuda.is_available()
device_id = 0 if gpu_found else 'cpu'
if gpu_found:
    print(f"GPU is active: {torch.cuda.get_device_name(0)}")
else:
    print("GPU not found! Training will be slow on CPU.")

DATASET_ROOT = '/kaggle/input/vehic-ped-intuition'
WORKING_DIR = '/kaggle/working'

# Dataset description consumed by Ultralytics: one image folder per split, one class
data_config = {split: f'{DATASET_ROOT}/images/{split}' for split in ('train', 'val', 'test')}
data_config.update(nc=1, names=['pedestrian'])

yaml_file = f'{WORKING_DIR}/data.yaml'
with open(yaml_file, 'w') as f:
    yaml.dump(data_config, f)

model = YOLO('yolov8n.pt')

print("üöÄ Starting Final Training...")
train_args = dict(
    data=yaml_file,
    epochs=50,
    imgsz=640,
    batch=32,
    patience=10,      # stop early after 10 epochs without improvement
    save_period=5,    # checkpoint every 5 epochs
    name='jaad_final_model',
    project=f'{WORKING_DIR}/training_results',
    device=device_id,
    exist_ok=True,    # reuse the run folder instead of creating a numbered copy
)
model.train(**train_args)

6- List Current Working Directory Contents 

In [None]:
# Shell listing of the current working directory
ls

8- List Current Working Directory Contents

In [None]:
# Shell listing of the current working directory (same command as the previous cell)
ls

9- Analyze Test Set Video Distribution

In [None]:
import os
from collections import Counter
import re

# Folder holding the test-split frames
test_dir = '/kaggle/input/vehic-ped-intuition/images/test'
all_frames = [name for name in os.listdir(test_dir) if name.endswith('.jpg')]

# The video ID is the first two underscore-separated tokens of the filename
# (e.g., video_0051_frame_001.jpg -> video_0051); names without 'video_' are ignored here
video_ids = ['_'.join(name.split('_')[:2]) for name in all_frames if 'video_' in name]

# Frames per video
video_counts = Counter(video_ids)

separator = "-" * 40
print(f"Total Unique Videos in Test Set: {len(video_counts)}")
print(separator)
for vid in sorted(video_counts):
    print(f"üé¨ {vid}: {video_counts[vid]} frames")

print(separator)
# The grand total counts every .jpg, including any that did not match 'video_'
print(f"Grand Total of Test Frames: {len(all_frames)}")

10- Final Model Evaluation on Test Dataset

In [None]:
from ultralytics import YOLO
import os

# 1. Load the best weights produced by the final training run
model_path = '/kaggle/working/training_results/jaad_final_model/weights/best.pt'
model = YOLO(model_path)

# Fall back to CPU when no GPU is attached, mirroring the training cell, so the
# evaluation does not crash on a CPU-only session.
import torch
eval_device = 0 if torch.cuda.is_available() else 'cpu'

# Report the real number of test frames instead of a hard-coded figure
test_images_dir = '/kaggle/input/vehic-ped-intuition/images/test'
n_test_frames = sum(name.endswith('.jpg') for name in os.listdir(test_images_dir))

# 2. Run validation specifically on the TEST split
print(f"Starting Final Evaluation on {n_test_frames:,} test frames...")
metrics = model.val(
    data='/kaggle/working/data.yaml',
    split='test',             # evaluate the 'test' entry of data.yaml instead of 'val'
    imgsz=640,
    batch=32,
    name='final_test_evaluation',
    project='/kaggle/working/evaluation',
    device=eval_device
)

print("\n" + "="*35)
print(f"Mean Average Precision (mAP50): {metrics.box.map50:.4f}")
print(f"Recall (R): {metrics.box.mr:.4f}")
print(f"Precision (P): {metrics.box.mp:.4f}")
print("="*35)

11- Generate and Save Visual Prediction Results 

In [None]:
# 3. Save predicted images for visual inspection
# stream=True makes predict() return a generator, so results for the whole test
# folder are not accumulated in memory and the cell no longer echoes a huge list
# of Results objects. The generator must be consumed for the images to be written.
print("Saving sample predictions for the report...")
prediction_stream = model.predict(
    source='/kaggle/input/vehic-ped-intuition/images/test',
    conf=0.33,               # Optimal confidence from your F1 curve
    save=True,
    max_det=10,
    name='prediction_visuals',
    project='/kaggle/working/evaluation',
    exist_ok=True,
    stream=True
)
n_processed = sum(1 for _ in prediction_stream)
print(f"Processed {n_processed} test images.")

In [None]:
import pandas as pd

csv_path = "/kaggle/working/training_results/jaad_final_model/results.csv"
df = pd.read_csv(csv_path)
# Some Ultralytics releases pad the CSV header names with spaces; strip them so
# the column lookups below work regardless of the installed version.
df.columns = df.columns.str.strip()
print(df.tail(5))

# idxmax() returns the row label, not the epoch number, so read the epoch from
# the 'epoch' column of that row instead of printing the label.
map_col = 'metrics/mAP50(B)'
best_row = df[map_col].idxmax()
print("\nBest epoch by mAP50:", df.loc[best_row, 'epoch'], "=>", df.loc[best_row, map_col])

In [None]:
import pandas as pd

# Show the headline metrics for the last ten logged epochs
results_csv = "/kaggle/working/training_results/jaad_final_model/results.csv"
df = pd.read_csv(results_csv)
summary_cols = ['epoch', 'metrics/mAP50(B)', 'metrics/precision(B)', 'metrics/recall(B)']
df.loc[:, summary_cols].tail(10)

14-  Video-Level Data Leakage Check (Train/Val/Test Overlap)

In [None]:
import os
import glob

def video_ids_from(image_folder: str) -> set[str]:
    """
    Collect the distinct video IDs of the ``*.jpg`` files directly inside a folder.

    The ID is the first two underscore-separated tokens of the filename
    (video_0051_frame_001.jpg -> video_0051). Only names containing "video_"
    are considered; adjust the parsing if your naming differs.
    """
    pattern = os.path.join(image_folder, "*.jpg")
    names = (os.path.basename(path) for path in glob.glob(pattern))
    tokenized = (name.split("_") for name in names if "video_" in name)
    return {f"{tokens[0]}_{tokens[1]}" for tokens in tokenized if len(tokens) >= 2}

train_dir = "/kaggle/input/vehic-ped-intuition/images/train"
val_dir   = "/kaggle/input/vehic-ped-intuition/images/val"
test_dir  = "/kaggle/input/vehic-ped-intuition/images/test"

# One set of video IDs per split
train_vids, val_vids, test_vids = (
    video_ids_from(folder) for folder in (train_dir, val_dir, test_dir)
)

print("Unique videos:")
for caption, vids in (("  Train:", train_vids), ("  Val  :", val_vids), ("  Test :", test_vids)):
    print(caption, len(vids))

# A video appearing in two splits means frames of the same scene leak between them
print("\nOverlaps (should ideally be 0 for strict video-level split):")
print("  Train ‚à© Test:", len(train_vids.intersection(test_vids)))
print("  Val   ‚à© Test:", len(val_vids.intersection(test_vids)))
print("  Train ‚à© Val :", len(train_vids.intersection(val_vids)))

# To see which video IDs overlap, uncomment:
# print("Train‚à©Test IDs:", sorted(train_vids & test_vids))

**15- Random Test Sample Inference and Visualization**

In [None]:
from ultralytics import YOLO
import os, glob, cv2, shutil, random

# 1) Load the trained YOLO model
model = YOLO("/kaggle/working/training_results/jaad_final_model/weights/best.pt")

# 2) Collect all test images
test_dir = "/kaggle/input/vehic-ped-intuition/images/test"
all_imgs = glob.glob(os.path.join(test_dir, "*.jpg"))
if not all_imgs:
    raise FileNotFoundError(f"No .jpg images found in {test_dir}")

# 3) Output directory (wiped on every run so old samples never mix with new ones)
out_dir = "/kaggle/working/evaluation/random_samples"
if os.path.exists(out_dir):
    shutil.rmtree(out_dir)
os.makedirs(out_dir)

# 4) Randomly select test samples
N = 8            # number of images per run
CONF = 0.33      # confidence threshold

# Never request more images than exist; random.sample raises otherwise
selected_imgs = random.sample(all_imgs, min(N, len(all_imgs)))

for i, img_path in enumerate(selected_imgs, start=1):
    results = model.predict(
        source=img_path,
        conf=CONF,
        iou=0.5,
        max_det=10,
        save=False,
        verbose=False
    )

    # Annotated prediction image, written with OpenCV below
    annotated = results[0].plot()

    # Save prediction result
    save_path = os.path.join(out_dir, f"sample_{i}.jpg")
    cv2.imwrite(save_path, annotated)

# Report the number actually written, which can be smaller than N
print(f"‚úÖ {len(selected_imgs)} random prediction samples saved to ‚Üí {out_dir}")

In [None]:
# List the datasets mounted under /kaggle/input
!ls /kaggle/input

In [None]:
# Detailed listing of the dataset root and of its crops/ subfolder
!ls -lah /kaggle/input/vehic-ped-intuition
!ls -lah /kaggle/input/vehic-ped-intuition/crops

**PHASE 2**

Checking Random Samples

In [None]:
from ultralytics import YOLO
import torch

# Load the Phase 1 detector weights from the attached dataset
yolo = YOLO("/kaggle/input/first-phase-model/weights/best.pt")

# GPU index 0 when CUDA is visible, otherwise the CPU
DEVICE = "cpu" if not torch.cuda.is_available() else 0

print("YOLO model loaded ‚úî")


In [None]:
import glob
from ultralytics import YOLO

model = YOLO("/kaggle/input/first-phase-model/weights/best.pt")
# Sorted so the 200-image sample below is the same on every run (glob order is arbitrary)
imgs = sorted(glob.glob("/kaggle/input/vehic-ped-intuition/images/test/*.jpg"))

# Number of boxes detected in each sampled frame
detections = [
    len(model.predict(img, conf=0.33, verbose=False)[0].boxes)
    for img in imgs[:200]  # sample
]

if detections:
    print("Avg detections per frame:", sum(detections)/len(detections))
    print("Zero-detection frames:", sum(d == 0 for d in detections))
else:
    # Without this guard an empty folder would raise ZeroDivisionError
    print("No test images found; nothing to summarize.")


In [None]:
import os
import re
import random
from collections import defaultdict
from ultralytics import YOLO
import cv2

# Folder with the test frames, and the folder that will receive the annotated samples
TEST_IMG_DIR = "/kaggle/input/vehic-ped-intuition/images/test"
OUT_DIR = "/kaggle/working/evaluation/random_video_samples"
# Create the output folder up front; exist_ok makes the cell safe to re-run
os.makedirs(OUT_DIR, exist_ok=True)

def get_video_id(fname):
    """Return the first ``video_<digits>`` token found in ``fname``, or None if there is none."""
    hit = re.search(r"(video_\d+)", fname)
    if hit is None:
        return None
    return hit.group(1)

# Bucket every .jpg in the test folder under the video ID parsed from its name
video_frames = defaultdict(list)
for f in os.listdir(TEST_IMG_DIR):
    if not f.endswith(".jpg"):
        continue
    vid = get_video_id(f)
    if not vid:
        # filename carries no video_<digits> token
        continue
    video_frames[vid].append(f)

print("Total test videos:", len(video_frames))


In [None]:
random.seed(42)

# Sort the IDs first: video_frames was filled in os.listdir order, which is
# arbitrary, so seeding alone does not make the selection reproducible.
video_pool = sorted(video_frames)

# Cap the sample size so the cell also works when fewer than 10 videos exist
selected_videos = random.sample(video_pool, min(10, len(video_pool)))
print("Selected videos:", selected_videos)


In [None]:
# (Re)load the Phase 1 detector; the cells below call it through the name `yolo`
yolo = YOLO("/kaggle/input/first-phase-model/weights/best.pt")


In [None]:
CONF = 0.33
predict_kwargs = dict(conf=CONF, iou=0.5, max_det=10, verbose=False)

# For each selected video: annotate up to three random frames and store them
# in a sub-folder named after the video.
for vid in selected_videos:
    frames = video_frames[vid]
    n_pick = min(3, len(frames))
    sampled_frames = random.sample(frames, n_pick)

    vid_out = os.path.join(OUT_DIR, vid)
    os.makedirs(vid_out, exist_ok=True)

    for fname in sampled_frames:
        results = yolo.predict(source=os.path.join(TEST_IMG_DIR, fname), **predict_kwargs)
        # Draw the detections and save under the original frame name
        cv2.imwrite(os.path.join(vid_out, fname), results[0].plot())

print("‚úÖ Saved random detections per video to:", OUT_DIR)


In [None]:
import shutil
import os

SRC_DIR = "/kaggle/working/evaluation/random_video_samples"
ZIP_PATH = "/kaggle/working/random_video_samples.zip"

# Delete any archive left by a previous run before building a new one
if os.path.exists(ZIP_PATH):
    os.remove(ZIP_PATH)

# make_archive appends ".zip" itself, so it is given the path without the extension
archive_base = ZIP_PATH.replace(".zip", "")
shutil.make_archive(archive_base, "zip", SRC_DIR)

print("‚úÖ Zipped to:", ZIP_PATH)


In [None]:
import os
import re
from collections import defaultdict

# ======================================================
# 1) Paths
# ======================================================
# Folder whose .jpg frames are grouped by video further down in this cell
TEST_IMG_DIR = "/kaggle/input/vehic-ped-intuition/images/test"

# ======================================================
# 2) Helper: extract video ID
# ======================================================
def get_video_id(filename):
    """
    Pull the ``video_<digits>`` token out of a frame filename.

    Example: video_0024_frame_0050.jpg -> video_0024.
    Returns None when the name holds no such token.
    """
    match = re.search(r"(video_\d+)", filename)
    return None if not match else match.group(1)

# ======================================================
# 3) Group frames by video
# ======================================================
video_frames = defaultdict(list)

# Names are visited in sorted order, so each video's frame list is sorted too
jpg_names = (n for n in sorted(os.listdir(TEST_IMG_DIR)) if n.lower().endswith(".jpg"))
for fname in jpg_names:
    vid = get_video_id(fname)
    if vid is None:
        continue
    video_frames[vid].append(fname)

# ======================================================
# 4) Summary
# ======================================================
print("‚úÖ Total test videos found:", len(video_frames))

# show first 10 only
for vid, frames in list(video_frames.items())[:10]:
    print(f"{vid}: {len(frames)} frames")

# ======================================================
# 5) Optional: inspect a specific video
# ======================================================
TARGET_VIDEO = "video_0024"

if TARGET_VIDEO not in video_frames:
    print(f"\n‚ùå {TARGET_VIDEO} not found in test set")
else:
    print(f"\nüìå {TARGET_VIDEO} contains {len(video_frames[TARGET_VIDEO])} frames")


In [None]:
import shutil

OUT_DIR = "/kaggle/working/video_0024_test_frames"
os.makedirs(OUT_DIR, exist_ok=True)

# video_frames is a defaultdict: indexing a missing key would silently insert
# an empty list, so use .get() and report honestly when there is nothing to copy.
frames_to_copy = video_frames.get("video_0024", [])

for fname in frames_to_copy:
    src = os.path.join(TEST_IMG_DIR, fname)
    dst = os.path.join(OUT_DIR, fname)
    shutil.copy(src, dst)

if frames_to_copy:
    print(f"‚úÖ Copied frames to: {OUT_DIR}")
else:
    print("No frames found for video_0024 in the test set; nothing was copied.")


In [None]:
# Candidate inference settings.
# NOTE(review): IMGSZ and AUG are not referenced by any other cell visible here, and
# CONF / IOU are reassigned by the "Enhanced Light" cell — confirm these are still needed.
CONF  = 0.33      # optimal from F1-confidence curve
IMGSZ = 960       # improves small/night targets
AUG   = True      # test-time augmentation
IOU   = 0.40      # stricter NMS to reduce duplicates


Enhanced Light 

In [None]:
import os
import cv2
import glob
import random
import numpy as np
from ultralytics import YOLO

# 1. ENHANCEMENT FUNCTION DEFINITION (resolves the earlier NameError)
def enhance_low_light(img):
    """
    Improve visibility in dark scenes by applying CLAHE in the LAB color space.

    Only the lightness channel (L) is equalized; the color channels (A, B) are
    passed through untouched. Returns None when ``img`` is None, otherwise a
    BGR image.
    """
    if img is None:
        return None

    # Work in LAB so luminance can be changed without shifting the colors
    lightness, chan_a, chan_b = cv2.split(cv2.cvtColor(img, cv2.COLOR_BGR2LAB))

    # CLAHE: contrast-limited adaptive histogram equalization
    equalizer = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8))
    merged = cv2.merge((equalizer.apply(lightness), chan_a, chan_b))

    # Back to BGR for the detector
    return cv2.cvtColor(merged, cv2.COLOR_LAB2BGR)

# 2. CONFIGURATION
CONF = 0.25  # slightly lower, to catch pedestrians standing in shadow
IOU = 0.45
OUT_DIR = "/kaggle/working/enhanced_tracking_samples"
track_kwargs = dict(conf=CONF, iou=IOU, persist=True, classes=[0], verbose=False)

# Relies on selected_videos, video_frames, TEST_IMG_DIR and yolo from earlier cells.
# NOTE(review): persist=True is used for every call, so tracker state appears to carry
# over between videos, and the sampled frames are not consecutive — treat the drawn
# track IDs as illustrative only; confirm before relying on them.
for vid in selected_videos:
    frames = video_frames[vid]
    # Five samples per video, in filename order, to get a feel for continuity
    sampled_frames = sorted(random.sample(frames, min(5, len(frames))))

    vid_out = os.path.join(OUT_DIR, vid)
    os.makedirs(vid_out, exist_ok=True)

    print(f"üñºÔ∏è Procesando muestras mejoradas para: {vid}")

    for fname in sampled_frames:
        img_orig = cv2.imread(os.path.join(TEST_IMG_DIR, fname))
        if img_orig is None:
            # unreadable file: skip it
            continue

        # Enhance first, then run the tracker (classes=[0] keeps class 0 only)
        img_enhanced = enhance_low_light(img_orig)
        results = yolo.track(source=img_enhanced, **track_kwargs)

        # Draw the boxes (including any track ID) and stamp an informational caption
        annotated = results[0].plot(line_width=2)
        cv2.putText(
            annotated,
            f"ID Tracking Habilitado | Peatones Solo",
            (20, 40),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.8,
            (0, 255, 255),
            2,
        )

        cv2.imwrite(os.path.join(vid_out, f"enhanced_{fname}"), annotated)

print(f"‚úÖ Proceso completado. Revisa las im√°genes en: {OUT_DIR}")

CLAHE is proving effective: detections are more continuous across frames, and the average streak length almost doubled.

In [None]:
def streak_lengths(counts):
    """
    Return the lengths of the consecutive runs in which ``counts`` is >= 1.

    Each run of frames with at least one detection contributes one entry, in
    order of appearance; frames with a count below 1 end the current run.
    """
    streaks, run = [], 0
    for detected in (c >= 1 for c in counts):
        if detected:
            run += 1
            continue
        # a gap closes the run that was in progress, if any
        if run > 0:
            streaks.append(run)
        run = 0
    # a run that reaches the end of the sequence still counts
    if run > 0:
        streaks.append(run)
    return streaks

# orig_counts / enh_counts (detections per frame) must already exist in the kernel;
# they are not defined in the cells visible above this one.
orig_streaks = streak_lengths(orig_counts)
enh_streaks  = streak_lengths(enh_counts)

# Average run length, reported as 0 when there are no runs at all
orig_avg = np.mean(orig_streaks) if orig_streaks else 0
enh_avg  = np.mean(enh_streaks) if enh_streaks else 0

print("Original streaks:", orig_streaks)
print("Enhanced streaks:", enh_streaks)
print("Original avg streak:", orig_avg)
print("Enhanced avg streak:", enh_avg)
print("Streaks ‚â•3 (orig):", len([s for s in orig_streaks if s >= 3]))
print("Streaks ‚â•3 (enh): ", len([s for s in enh_streaks if s >= 3]))


In [None]:
import matplotlib.pyplot as plt
import numpy as np

# 1. X axis: one position per frame
frames = np.arange(len(orig_counts))

# 2. Build the figure through the explicit Figure/Axes interface
fig, ax = plt.subplots(figsize=(12, 4))
ax.plot(frames, orig_counts, label="Original (Oscuro)", marker="o", color='blue', alpha=0.7)
ax.plot(frames, enh_counts, label="Enhanced (CLAHE)", marker="s", color='green', alpha=0.7)

# Reference line at one detection (the minimum that counts as "detected")
ax.axhline(1, color='red', linestyle="--", alpha=0.5, label="Umbral de detecci√≥n")

# Labels and title
ax.set_xlabel("√çndice del Frame")
ax.set_ylabel("N√∫mero de Peatones Detectados")
ax.set_title("Efecto de la Mejora CLAHE en Video Nocturno")
ax.legend()
ax.grid(True, alpha=0.3)

# Show (or save) the figure
fig.tight_layout()
plt.show()
# plt.savefig("comparativa_deteccion.png") # Optional: to download the image

Quality Filter

In [None]:
def get_streak_indices(counts, min_len=3):
    """
    Return the frame indices of every run with count >= 1 lasting at least ``min_len`` frames.

    The result is a list of runs, each run being the list of consecutive
    indices it covers; shorter runs are discarded.
    """
    kept, current = [], []
    for idx, value in enumerate(counts):
        if value >= 1:
            current.append(idx)
            continue
        # a gap ends the current run; keep it only if it is long enough
        if len(current) >= min_len:
            kept.append(current)
        current = []
    if len(current) >= min_len:
        kept.append(current)
    return kept

# Frames that belong to runs of at least three consecutive detections
long_streaks = get_streak_indices(enh_counts, min_len=3)
long_frames  = sorted({idx for streak in long_streaks for idx in streak})

# Plus up to five frames with exactly one detection and up to five with none
indexed_counts = list(enumerate(enh_counts))
single_frames = [i for i, c in indexed_counts if c == 1][:5]
fail_frames   = [i for i, c in indexed_counts if c == 0][:5]

selected = sorted({*long_frames, *single_frames, *fail_frames})
print("Selected frames:", selected)


In [None]:
EVIDENCE_DIR = "/kaggle/working/night_evidence"
os.makedirs(EVIDENCE_DIR, exist_ok=True)

# For each selected frame, copy the original and the enhanced render side by side,
# renamed to <index>_orig.jpg / <index>_enh.jpg.
# img_files, OUT_ORIG and OUT_ENH must already exist in the kernel.
for i in selected:
    fname = img_files[i].rsplit("/", 1)[-1]

    for src_dir, tag in ((OUT_ORIG, "orig"), (OUT_ENH, "enh")):
        shutil.copy(
            os.path.join(src_dir, fname),
            os.path.join(EVIDENCE_DIR, f"{i:03d}_{tag}.jpg")
        )


In [None]:
ZIP_PATH = "/kaggle/working/night_detection_evidence.zip"

# Remove the archive from a previous run, if present
if os.path.exists(ZIP_PATH):
    os.remove(ZIP_PATH)

# make_archive adds the ".zip" suffix itself, so pass the path without it
archive_stem = ZIP_PATH.replace(".zip", "")
shutil.make_archive(base_name=archive_stem, format="zip", root_dir=EVIDENCE_DIR)

ZIP_PATH


## Phase 2 : Preparing Data for Combining YOLO + ViT + LSTM

In [None]:
!pip install ultralytics

# Testing to find out Tracking + Cropping

In [None]:
import os
import random
import cv2
import torch
import numpy as np
from ultralytics import YOLO
from torchvision import transforms
from PIL import Image


Smoke test for enhanced video, tracking and feature extraction to CSV

In [None]:
import os
import cv2
import numpy as np
import pandas as pd
import xml.etree.ElementTree as ET
from ultralytics import YOLO
from collections import defaultdict

# ==========================================
# 1. CONFIGURATION AND PATHS
# ==========================================
# ID of the single video processed by this smoke test
VIDEO_ID = "0161"
# Input locations: detector weights, test frames, per-frame label .txt files,
# and the per-video attribute XML files
PATHS = {
    'model': '/kaggle/input/first-phase-model/weights/best.pt',
    'images': '/kaggle/input/vehic-ped-intuition/images/test',
    'labels': '/kaggle/input/vehic-ped-intuition/labels/test',
    'attributes': '/kaggle/input/attributes-label/annotations_attributes'
}

# Detector used for tracking below, loaded from PATHS['model']
yolo = YOLO(PATHS['model'])

# ==========================================
# 2. SUPPORT FUNCTIONS (enhancement and handshake)
# ==========================================
def enhance_low_light(img):
    """Apply CLAHE to the LAB lightness channel of a BGR image; returns None for a None input."""
    if img is None:
        return None
    lightness, chan_a, chan_b = cv2.split(cv2.cvtColor(img, cv2.COLOR_BGR2LAB))
    equalized = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8)).apply(lightness)
    merged = cv2.merge((equalized, chan_a, chan_b))
    return cv2.cvtColor(merged, cv2.COLOR_LAB2BGR)

def calculate_iou(boxA, boxB):
    """
    Intersection-over-union of two boxes given as [x1, y1, x2, y2].

    A small constant (1e-6) is added to the denominator so that two empty
    boxes yield 0 instead of a division by zero.
    """
    # Corners of the overlap rectangle
    left = max(boxA[0], boxB[0])
    top = max(boxA[1], boxB[1])
    right = min(boxA[2], boxB[2])
    bottom = min(boxA[3], boxB[3])

    # Negative extents mean the boxes do not overlap along that axis
    overlap = max(0, right - left) * max(0, bottom - top)

    area_a = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])
    area_b = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])
    return overlap / float(area_a + area_b - overlap + 1e-6)

# ==========================================
# 3. TRACKING
# ==========================================
# Load the per-pedestrian metadata (decision point and crossing flag) from the XML
xml_path = os.path.join(PATHS['attributes'], f"video_{VIDEO_ID}_attributes.xml")
peds_meta = {}
tree = ET.parse(xml_path)
for p in tree.getroot().findall('pedestrian'):
    peds_meta[p.get('id')] = {'dp': int(p.get('decision_point')), 'crossing': int(p.get('crossing'))}

# Collect this video's frames. Match on the "video_<id>_" prefix: a plain
# substring test (`VIDEO_ID in f`) would also pick up frames of other videos
# whose frame number happens to contain the same digits.
video_prefix = f"video_{VIDEO_ID}_"
frame_files = sorted(os.path.join(PATHS['images'], f) for f in os.listdir(PATHS['images'])
                     if f.startswith(video_prefix) and f.endswith('.jpg'))

processed_data = defaultdict(list)
id_map = {}                 # tracker ID -> XML pedestrian ID
assigned_xml_ids = set()    # XML IDs already claimed by a tracker ID

print(f"üöÄ Iniciando Tracking Mejorado para Video {VIDEO_ID}...")

for fidx, path in enumerate(frame_files[:100]): # 100 frames, to make sure the DP is covered
    img = cv2.imread(path)
    if img is None:
        # unreadable frame: skip instead of passing None to the tracker
        continue
    img_enh = enhance_low_light(img)

    results = yolo.track(img_enh, persist=True, conf=0.25, classes=[0], verbose=False)[0]

    txt_path = os.path.join(PATHS['labels'], os.path.basename(path).replace('.jpg', '.txt'))

    # Both a label file and at least one tracked box are required for the handshake
    if not os.path.exists(txt_path) or results.boxes.id is None:
        continue

    h, w = results.orig_shape
    gt_boxes = []
    with open(txt_path, 'r') as f:
        for line in f:
            fields = line.split()
            if len(fields) < 5:
                # blank or malformed line
                continue
            # Fields 1-4 are read as normalized center-x, center-y, width, height
            xc, yc, bw, bh = map(float, fields[1:5])
            gt_boxes.append([(xc-bw/2)*w, (yc-bh/2)*h, (xc+bw/2)*w, (yc+bh/2)*h])

    t_boxes = results.boxes.xyxy.cpu().numpy()
    t_ids = results.boxes.id.int().cpu().numpy()

    for tb, tid in zip(t_boxes, t_ids):
        # Handshake: a new track that overlaps ANY ground-truth box claims exactly
        # one unassigned XML ID. (Previously a track overlapping several GT boxes
        # consumed one XML ID per overlapping box.)
        # NOTE(review): which XML pedestrian is chosen depends only on XML order,
        # not on which GT box matched — verify this pairing is what you intend.
        if tid not in id_map and any(calculate_iou(tb, gb) > 0.3 for gb in gt_boxes):
            free_xid = next((x for x in peds_meta if x not in assigned_xml_ids), None)
            if free_xid is not None:
                id_map[tid] = free_xid
                assigned_xml_ids.add(free_xid)

        if tid in id_map:
            xid = id_map[tid]
            processed_data[xid].append({
                'frame': fidx, 'bbox': tb, 'before_dp': fidx <= peds_meta[xid]['dp']
            })

# ==========================================
# 4. CSV DATASET GENERATION (Phase 2)
# ==========================================
rows = []
for xid, frames_list in processed_data.items():
    # Tracks shorter than three frames are dropped
    if len(frames_list) < 3: continue

    for i in range(len(frames_list)):
        curr = frames_list[i]
        b = curr['bbox']
        cx, cy, w, h = (b[0]+b[2])/2, (b[1]+b[3])/2, b[2]-b[0], b[3]-b[1]

        # Kinematic features; the first frame of a track has no predecessor,
        # so it gets zero velocity and an area ratio of 1
        vel_x, vel_y, delta_area = (0, 0, 1)
        if i > 0:
            p_b = frames_list[i-1]['bbox']
            vel_x = cx - (p_b[0]+p_b[2])/2
            vel_y = cy - (p_b[1]+p_b[3])/2
            delta_area = (w*h) / ((p_b[2]-p_b[0])*(p_b[3]-p_b[1]) + 1e-6)

        # Guard against zero-height boxes: w/h would otherwise write inf to the CSV
        aspect_ratio = w / h if h > 0 else 0.0

        rows.append({
            'ped_id': xid, 'frame': curr['frame'], 'x': cx, 'y': cy,
            'vel_x': vel_x, 'vel_y': vel_y, 'delta_area': delta_area,
            'aspect_ratio': aspect_ratio, 'before_dp': int(curr['before_dp']),
            'label': peds_meta[xid]['crossing']
        })

df = pd.DataFrame(rows)
df.to_csv("dataset_fase2_intencion.csv", index=False)
print(f"\n‚úÖ ¬°√âxito! CSV guardado con {len(df)} filas.")
print(df.head())

# 🎬 Multivideo Processor: Phase 1
---
> **Purpose:** Extract kinematic features from multiple video splits (train, val, test) for LSTM training.
Features are extracted for the train, val and test splits after enhancing the images with CLAHE, which makes it easier to identify the frame streaks in which pedestrians are detected without interruption. The enhancement improves image illumination, so dark videos become visible to YOLO, leading to fewer broken trajectories.

Our LSTM will only receive high-quality sequences of at least 5 frames per streak, so it learns from accurate data rather than noise.

`before_dp` makes it possible to identify the pedestrian's intention before the person steps into the road, because the model learns from the frames that precede the real decision point (DP).

`vel_x` and `vel_y`: directional movement (displacement of the box center between consecutive frames).
`delta_area`: if it is > 1.0, the pedestrian is getting closer to the camera.
`aspect_ratio`: helps detect changes in posture, for example from standing to walking.


Import Libraries

In [None]:
import os
import cv2
import numpy as np
import pandas as pd
import xml.etree.ElementTree as ET
from ultralytics import YOLO
from collections import defaultdict

In [None]:
import os
import cv2
import numpy as np
import pandas as pd
import xml.etree.ElementTree as ET
from ultralytics import YOLO
from collections import defaultdict

# --- 1. PATH CONFIGURATION ---
# Ensure these paths match your Kaggle input structure
# BASE_PATH: dataset root; process_split reads frames from <BASE_PATH>/images/<split>
BASE_PATH = '/kaggle/input/vehic-ped-intuition'
# XML_PATH: folder with one video_<id>_attributes.xml file per video
XML_PATH = '/kaggle/input/attributes-label/annotations_attributes'
# MODEL_PATH: weights of the Phase 1 detector
MODEL_PATH = '/kaggle/input/first-phase-model/weights/best.pt'

# Load the Phase 1 YOLO model
yolo = YOLO(MODEL_PATH)

def enhance_image(img):
    """
    Boost visibility in low-light or night scenes with CLAHE
    (Contrast Limited Adaptive Histogram Equalization).

    The equalization is applied to the lightness channel only, in LAB space.
    Returns None when ``img`` is None, otherwise a BGR image.
    """
    if img is None:
        return None

    # Split BGR -> LAB so the lightness channel (L) can be processed on its own
    lightness, chan_a, chan_b = cv2.split(cv2.cvtColor(img, cv2.COLOR_BGR2LAB))

    # clipLimit caps the contrast amplification; tileGridSize sets the local regions
    equalizer = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8))
    equalized = equalizer.apply(lightness)

    # Reassemble and convert back to BGR for YOLO
    merged = cv2.merge((equalized, chan_a, chan_b))
    return cv2.cvtColor(merged, cv2.COLOR_LAB2BGR)

def _load_pedestrian_meta(xml_file):
    """Parse one attributes XML into {pedestrian_id: {'dp': int, 'crossing': int}}."""
    meta = {}
    for p in ET.parse(xml_file).getroot().findall('pedestrian'):
        meta[p.get('id')] = {
            'dp': int(p.get('decision_point')),
            'crossing': int(p.get('crossing'))
        }
    return meta


def _track_to_rows(v_id, xid, frames_list, label):
    """Turn one pedestrian track into per-frame feature rows (position, velocity, scale change)."""
    track_rows = []
    for i, curr in enumerate(frames_list):
        b = curr['bbox']
        # Center coordinates and box dimensions
        cx, cy, w, h = (b[0]+b[2])/2, (b[1]+b[3])/2, b[2]-b[0], b[3]-b[1]

        # First frame of a track has no predecessor: zero velocity, area ratio 1
        vel_x, vel_y, d_area = (0, 0, 1)
        if i > 0:
            p_b = frames_list[i-1]['bbox']
            # Velocity = displacement of the center since the previous stored frame
            vel_x = cx - (p_b[0]+p_b[2])/2
            vel_y = cy - (p_b[1]+p_b[3])/2
            # Delta area = ratio of current to previous box area
            d_area = (w*h) / ((p_b[2]-p_b[0])*(p_b[3]-p_b[1]) + 1e-6)

        track_rows.append({
            'video_id': v_id,
            'ped_id': xid,
            'frame': curr['frame'],
            'x': cx, 'y': cy,
            'vel_x': vel_x, 'vel_y': vel_y,
            'delta_area': d_area,
            # Zero-height boxes would otherwise put inf into the CSV
            'aspect_ratio': w / h if h > 0 else 0.0,
            'before_dp': int(curr['before_dp']),
            'label': label
        })
    return track_rows


def process_split(split_name):
    """
    Process one data split (train, val, or test): track pedestrians in each
    video, pair tracker IDs with XML pedestrian IDs, and engineer features.

    Writes ``master_<split_name>_dataset.csv`` to the current working directory
    when at least one row was produced. Returns None in every case.

    Videos without an attributes XML are skipped. Only the first 100 frames
    (in sorted filename order) of each video are processed.

    NOTE(review): tracker IDs are paired with XML pedestrian IDs purely by order
    of appearance, with no spatial check, so a row's label may belong to a
    different pedestrian — confirm this pairing is acceptable.
    NOTE(review): 'frame' and 'before_dp' use the position of the frame within
    the sorted list, not a frame number parsed from the filename — confirm that
    this matches the units of 'decision_point' in the XML.
    """
    print(f"\nüìÇ PROCESSING SPLIT: {split_name.upper()}...")

    img_dir = os.path.join(BASE_PATH, 'images', split_name)
    if not os.path.exists(img_dir):
        print(f"‚ö†Ô∏è Directory not found: {img_dir}")
        return

    all_files = os.listdir(img_dir)
    # Unique video IDs (e.g., video_0161_f000.jpg -> 0161)
    video_ids = sorted({f.split('_')[1] for f in all_files if '_' in f})
    split_rows = []

    for v_id in video_ids:
        xml_file = os.path.join(XML_PATH, f"video_{v_id}_attributes.xml")
        if not os.path.exists(xml_file): continue

        peds_meta = _load_pedestrian_meta(xml_file)

        # Frames of this video only. Matching on the "video_<id>_" prefix avoids
        # the substring test picking up other videos whose names contain the ID.
        video_prefix = f"video_{v_id}_"
        v_frames = sorted(os.path.join(img_dir, f) for f in all_files if f.startswith(video_prefix))

        # A fresh model per video: with persist=True the tracker state lives on
        # the model object, so reusing one model would carry tracks from the
        # previous video into this one.
        video_tracker = YOLO(MODEL_PATH)

        processed_tracks = defaultdict(list)
        id_map = {}                # tracker ID -> XML pedestrian ID
        assigned_xml_ids = set()

        # --- INFERENCE & TRACKING LOOP ---
        for fidx, path in enumerate(v_frames[:100]):
            img = cv2.imread(path)
            if img is None:
                # unreadable or non-image file: skip rather than crash the split
                continue
            img_enh = enhance_image(img)

            # persist=True keeps track IDs stable across the frames of this video
            results = video_tracker.track(img_enh, persist=True, conf=0.28, classes=[0], verbose=False)[0]
            if results.boxes.id is None:
                continue

            t_boxes = results.boxes.xyxy.cpu().numpy()
            t_ids = results.boxes.id.int().cpu().numpy()

            for tb, tid in zip(t_boxes, t_ids):
                # XML handshake: a track seen for the first time claims the next free XML ID
                if tid not in id_map:
                    free_xid = next((x for x in peds_meta if x not in assigned_xml_ids), None)
                    if free_xid is not None:
                        id_map[tid] = free_xid
                        assigned_xml_ids.add(free_xid)

                if tid in id_map:
                    current_xid = id_map[tid]
                    processed_tracks[current_xid].append({
                        'frame': fidx,
                        'bbox': tb,
                        'before_dp': fidx <= peds_meta[current_xid]['dp']
                    })

        # --- FEATURE ENGINEERING ---
        for xid, frames_list in processed_tracks.items():
            # Keep only tracks with 5+ stored frames
            if len(frames_list) < 5: continue
            split_rows.extend(_track_to_rows(v_id, xid, frames_list, peds_meta[xid]['crossing']))

    # Save results to a CSV file for Phase 2 training
    if split_rows:
        df = pd.DataFrame(split_rows)
        output_name = f"master_{split_name}_dataset.csv"
        df.to_csv(output_name, index=False)
        print(f"‚úÖ Saved: {output_name} ({len(df)} rows)")
    else:
        print(f"‚ùå No data generated for split: {split_name}")

# --- EXECUTION ---
# One CSV per split is written when rows are produced:
# master_train_dataset.csv, master_val_dataset.csv, master_test_dataset.csv
for split in ('train', 'val', 'test'):
    process_split(split)

# Data Pre-Processor

In [None]:
import pandas as pd
import numpy as np
import torch
from sklearn.preprocessing import MinMaxScaler

# --- 1. CONFIGURATION ---
SEQ_LEN = 10 
FEATURES = ['x', 'y', 'vel_x', 'vel_y', 'delta_area', 'aspect_ratio']
TARGET = 'label'

def create_lstm_sequences(csv_path, seq_len=10):
    """
    Build fixed-length sliding-window sequences from a per-frame CSV.

    Rows whose TARGET is not 0 or 1 are dropped, missing FEATURES values are
    replaced by 0 and the FEATURES columns are min-max scaled. Rows are then
    grouped by (video_id, ped_id); each group with at least `seq_len` rows is
    sorted by frame and produces one window per start offset. Every window of
    a group carries that group's most frequent TARGET value.

    NOTE(review): a fresh MinMaxScaler is fitted on each file passed in, so
    every split ends up scaled with its own min/max.

    Args:
        csv_path: Path of the CSV to read.
        seq_len: Number of consecutive rows per window.

    Returns:
        (sequences, labels) as NumPy arrays. `sequences` has shape
        (n_windows, seq_len, len(FEATURES)), including when n_windows is 0.
    """
    n_features = len(FEATURES)

    df = pd.read_csv(csv_path)
    # .copy() makes the filtered frame independent, so the column assignments
    # below do not write into a slice of the original (SettingWithCopyWarning).
    df = df[df[TARGET].isin([0, 1])].copy()
    if df.empty:
        # Nothing usable: return correctly shaped empties instead of letting
        # the scaler fail on zero samples.
        return np.empty((0, seq_len, n_features)), np.empty((0,))

    df[FEATURES] = df[FEATURES].fillna(0)

    scaler = MinMaxScaler()
    df[FEATURES] = scaler.fit_transform(df[FEATURES])

    sequences, labels = [], []
    for _, group in df.groupby(['video_id', 'ped_id']):
        if len(group) < seq_len:
            continue  # track too short to fill a single window
        group = group.sort_values('frame')
        feature_data = group[FEATURES].values
        target_value = group[TARGET].mode()[0]

        for start in range(len(group) - seq_len + 1):
            sequences.append(feature_data[start:start + seq_len])
            labels.append(target_value)

    if not sequences:
        return np.empty((0, seq_len, n_features)), np.empty((0,))
    return np.array(sequences), np.array(labels)

# --- 2. THE BALANCE FIX (Oversampling) ---
print("‚öñÔ∏è Balancing training data...")
df_raw = pd.read_csv('master_train_dataset.csv')
df_raw = df_raw[df_raw[TARGET].isin([0, 1])]

# Work out which class is actually the larger one instead of assuming it is
# label 1. On a tie label 1 is treated as the majority, as before.
n_cross = int((df_raw[TARGET] == 1).sum())
n_stay = int((df_raw[TARGET] == 0).sum())
if min(n_cross, n_stay) == 0:
    raise ValueError(
        f"Cannot balance: label counts are 0 -> {n_stay}, 1 -> {n_cross}; both classes are required."
    )
majority_label, minority_label = (1, 0) if n_cross >= n_stay else (0, 1)

# Separate majority and minority
df_majority = df_raw[df_raw[TARGET] == majority_label]
df_minority = df_raw[df_raw[TARGET] == minority_label]

# Oversample the minority (duplicate rows until it matches the majority)
# NOTE(review): rows are duplicated individually. create_lstm_sequences later
# groups rows by (video_id, ped_id) and slides a window over them, so the
# duplicates show up as repeated frames inside windows. Consider oversampling
# whole tracks or finished sequences instead.
df_minority_upsampled = df_minority.sample(len(df_majority), replace=True, random_state=42)

# Combine and save as a new file
df_balanced = pd.concat([df_majority, df_minority_upsampled])
df_balanced.to_csv('balanced_train_dataset.csv', index=False)
print(f"‚úÖ Balanced file created: {len(df_balanced)} rows (50/50 split)")

# --- 3. EXECUTION ---
print("‚öôÔ∏è Transforming CSVs into Sequences...")

# NOTE: We use the BALANCED file for training, but ORIGINAL files for Val/Test
X_train, y_train = create_lstm_sequences('balanced_train_dataset.csv', SEQ_LEN)
X_val, y_val = create_lstm_sequences('master_val_dataset.csv', SEQ_LEN)
X_test, y_test = create_lstm_sequences('master_test_dataset.csv', SEQ_LEN)

print(f"\n‚úÖ Preprocessing Complete!")
print(f"Train Sequences: {X_train.shape} (Should be much larger now!)")
print(f"Validation Sequences: {X_val.shape}")

In [None]:
import torch.nn as nn
from sklearn.preprocessing import StandardScaler

# --- Move the scaler HERE (outside) so it becomes a global variable ---
scaler = StandardScaler()

def create_lstm_sequences(csv_path, seq_len=10, is_training=False):
    """
    Build fixed-length sliding-window sequences, scaled with the shared `scaler`.

    Rows whose TARGET is not 0 or 1 are dropped and missing FEATURES values are
    replaced by 0. The FEATURES columns are then scaled with the module-level
    `scaler`: fitted and applied when `is_training` is True, only applied
    otherwise. The training file therefore has to be processed before any
    other file.

    Rows are grouped by (video_id, ped_id); each group with at least `seq_len`
    rows is sorted by frame and produces one window per start offset. Every
    window of a group carries that group's most frequent TARGET value.

    Args:
        csv_path: Path of the CSV to read.
        seq_len: Number of consecutive rows per window.
        is_training: Fit the shared scaler on this file before transforming.

    Returns:
        (sequences, labels) as NumPy arrays. `sequences` has shape
        (n_windows, seq_len, len(FEATURES)), including when n_windows is 0.
    """
    n_features = len(FEATURES)

    df = pd.read_csv(csv_path)
    # .copy() makes the filtered frame independent, so the column assignments
    # below do not write into a slice of the original (SettingWithCopyWarning).
    df = df[df[TARGET].isin([0, 1])].copy()
    if df.empty:
        # Nothing usable: return correctly shaped empties instead of letting
        # the scaler fail on zero samples.
        return np.empty((0, seq_len, n_features)), np.empty((0,))

    df[FEATURES] = df[FEATURES].fillna(0)

    # Hand the scaler a plain array so it is fitted without column names.
    feature_values = df[FEATURES].values
    if is_training:
        scaled_values = scaler.fit_transform(feature_values)
    else:
        scaled_values = scaler.transform(feature_values)

    # Write the scaled numbers back so the groupby below sees them.
    df[FEATURES] = scaled_values

    sequences, labels = [], []
    for _, group in df.groupby(['video_id', 'ped_id']):
        if len(group) < seq_len:
            continue  # track too short to fill a single window
        group = group.sort_values('frame')
        feature_data = group[FEATURES].values
        target_value = group[TARGET].mode()[0]

        for start in range(len(group) - seq_len + 1):
            sequences.append(feature_data[start:start + seq_len])
            labels.append(target_value)

    if not sequences:
        return np.empty((0, seq_len, n_features)), np.empty((0,))
    return np.array(sequences), np.array(labels)

# --- RE-RUN YOUR DATA GENERATION ---
# Order matters: the training call (is_training=True) fits the shared scaler,
# and the validation/test calls only apply that already-fitted scaler.
X_train, y_train = create_lstm_sequences('balanced_train_dataset.csv', SEQ_LEN, is_training=True)
X_val, y_val = create_lstm_sequences('master_val_dataset.csv', SEQ_LEN, is_training=False)
X_test, y_test = create_lstm_sequences('master_test_dataset.csv', SEQ_LEN, is_training=False)

# LSTM Construction

# Data Audit Before Training

Identifying noisy label values, i.e. rows that are ignored when the sequences are built

In [None]:
import pandas as pd

def audit_csv_files(file_list):
    """
    Report the label quality of each CSV in `file_list`.

    For every path this prints the distinct values of the 'label' column and
    either a confirmation that only 0 and 1 occur, or the number of offending
    rows followed by the first few of them (video_id, ped_id, label).
    Nothing is returned and no file is modified.
    """
    print("üîç Auditing CSV files for 'dirty' labels...")
    for csv_path in file_list:
        frame = pd.read_csv(csv_path)
        label_column = frame['label']

        # True for rows whose label is NaN or anything other than 0 / 1
        is_invalid = ~label_column.isin([0, 1])
        n_invalid = is_invalid.sum()

        print(f"\nüìÑ File: {csv_path}")
        print(f"   - Unique labels found: {label_column.unique()}")

        if n_invalid <= 0:
            print(f"   - ‚úÖ Data is clean (only 0 and 1).")
            continue

        print(f"   - ‚ö†Ô∏è ALERT: Found {n_invalid} invalid rows (not 0 or 1).")
        # Preview of the offending rows
        print(frame.loc[is_invalid, ['video_id', 'ped_id', 'label']].head())

# Run audit on your 3 generated files
audit_csv_files(['master_train_dataset.csv', 'master_val_dataset.csv', 'master_test_dataset.csv'])

# LSTM TRAINING LOOP


# Calculating the weights

In [None]:
print(pd.Series(y_train).value_counts())

In [None]:
import torch

# --- 1. INITIALIZE DEVICE ---
# This tells the code to use the GPU if available (faster) or the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"üíª Using device: {device}")

# --- 2. CALCULATE CLASS WEIGHTS ---
# Count the classes from the labels that are actually used for training.
# Numbers pasted in from an earlier run go stale as soon as the data changes
# (for example after the training CSV has been rebalanced).
count_stay = int((y_train == 0).sum())    # Stay  (Class 0)
count_cross = int((y_train == 1).sum())   # Cross (Class 1)
if count_cross == 0:
    raise ValueError("y_train contains no Class 1 (crossing) samples; pos_weight is undefined.")

# The pos_weight formula for BCEWithLogitsLoss:
# pos_weight = total_negative_samples / total_positive_samples
pos_weight_value = count_stay / count_cross
class_weights = torch.tensor([pos_weight_value]).to(device)

print(f"‚öñÔ∏è Scale Weight for Class 1 (Crossing): {class_weights.item():.2f}")

# --- 3. INITIALIZE LOSS FUNCTION ---
# pos_weight multiplies the loss contribution of the positive (Class 1) samples.
criterion = torch.nn.BCEWithLogitsLoss(pos_weight=class_weights)

# Loss Function and Class Weights

These prevent the model from ignoring the minority class 0 (pedestrians that are not moving).

In [None]:
import torch
import torch.nn as nn
from sklearn.model_selection import train_test_split

# --- 1. DEVICE CONFIGURATION ---
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# --- 2. STRATIFIED DATA SPLIT ---
# Stratify ensures the 157/1380 ratio is preserved in both sets
# NOTE(review): `df_test` is not created anywhere above this cell in the part of
# the notebook visible here (it is read from CSV in a later cell), so on a fresh
# kernel this line likely raises NameError - confirm.
# NOTE(review): the frame being split is the *test* data, so the weights below
# describe a slice of the test set, not the sequences the model is trained on.
# train_df is only used for the class counts; val_df is not used in this cell.
train_df, val_df = train_test_split(
    df_test, 
    test_size=0.2, 
    stratify=df_test['label'], 
    random_state=42
)

# --- 3. SAFETY-FIRST WEIGHT CALCULATION ---
# Rows per label in the 80% part of the split: label 0 = Stay, label 1 = Cross.
counts = train_df['label'].value_counts()
stay_count = counts[0]
cross_count = counts[1]

# LOGIC: If 'Stay' is majority, we boost 'Cross'. 
# If 'Cross' is already majority (your case), we keep weight at 1.0.
# We NEVER set it to 0.11 because that would make the AI ignore pedestrians.
if stay_count > cross_count:
    pos_weight_val = stay_count / cross_count
else:
    pos_weight_val = 1.0  # Keep priority high for the dangerous class

# One-element tensor, the form BCEWithLogitsLoss takes for pos_weight.
class_weights = torch.tensor([pos_weight_val]).to(device)

print(f"üìä Dataset Split Complete:")
print(f"   - Training Samples: {stay_count} Stay, {cross_count} Cross")
print(f"‚öñÔ∏è Adjusted pos_weight for 'Cross': {pos_weight_val:.2f}")

# --- 4. MODEL INITIALIZATION ---
# NOTE(review): relies on an IntentionLSTM definition that accepts `output_size`;
# that definition is not part of this cell - confirm it has been run first.
model = IntentionLSTM(input_size=6, hidden_size=64, num_layers=2, output_size=1).to(device)

# --- 5. STRATEGY: OPTIMIZER & LOSS FUNCTION ---
# NOTE(review): the training-loop cell assigns model, criterion and optimizer
# again (with an unweighted BCEWithLogitsLoss), which replaces the objects
# created here - confirm which configuration is intended.
optimizer = torch.optim.Adam(model.parameters(), lr=0.0005, weight_decay=1e-3)
criterion = nn.BCEWithLogitsLoss(pos_weight=class_weights)

print("\nüõ°Ô∏è Safety Mode Active: Pedestrian crossing is prioritized.")
print("üöÄ Ready for training.")

# DATA LOADERS

In [None]:
# --- 3. PREPARE THE DATA FLOW (DataLoader) ---
# The NumPy sequence/label arrays are wrapped as tensors. Labels are reshaped
# into a single column so each sample carries one target value.
def _to_tensor_dataset(features, targets):
    """Pair a feature array with its labels (as an (N, 1) column) in a TensorDataset."""
    return torch.utils.data.TensorDataset(
        torch.Tensor(features),
        torch.Tensor(targets).reshape(-1, 1),
    )


train_dataset = _to_tensor_dataset(X_train, y_train)
val_dataset = _to_tensor_dataset(X_val, y_val)

# Training batches are reshuffled so the model never sees a block of '0's
# followed by a block of '1's; validation keeps its stored order.
train_loader = torch.utils.data.DataLoader(train_dataset, shuffle=True, batch_size=64)
val_loader = torch.utils.data.DataLoader(val_dataset, shuffle=False, batch_size=64)

import numpy as np

# Save whichever sequence/label arrays exist in this session so that a later
# notebook can np.load them. create_lstm_sequences returns NumPy arrays, so
# they are written as they are - no tensor/GPU conversion is involved.
for _split in ('test', 'train', 'val'):
    _x_name, _y_name = f'X_{_split}', f'y_{_split}'
    # A split is written only when both its features and its labels exist;
    # checking X alone would fail on the missing y.
    if _x_name in globals() and _y_name in globals():
        np.save(f'/kaggle/working/{_x_name}.npy', globals()[_x_name])
        np.save(f'/kaggle/working/{_y_name}.npy', globals()[_y_name])
        print(f"‚úÖ {_x_name}.npy and {_y_name}.npy saved!")

In [None]:
# --- 1. INITIALIZE LISTS ---
# Per-epoch history, consumed by the plotting cell.
train_losses, val_losses = [], []
train_accs, val_accs = [], []

# --- 2. CONFIGURATION ---
model = IntentionLSTM(input_size=6, hidden_size=64, num_layers=2, output_size=1).to(device)
# NOTE(review): this loss is unweighted, so a pos_weight / class_weights value
# computed in an earlier cell is not applied here - confirm that is intended.
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0005, weight_decay=1e-3)

PATIENCE = 15                  # epochs without a better validation loss before stopping
best_val_loss = float('inf')
counter = 0                    # epochs since the last improvement


def _run_epoch(loader, training):
    """
    Run one full pass over `loader` with the notebook's model/criterion/optimizer.

    Weights are updated only when `training` is True. Returns
    (mean loss per sample, accuracy in percent). The loss is weighted by batch
    size, so a short final batch does not count as much as a full one.
    """
    if training:
        model.train()
    else:
        model.eval()

    loss_sum, n_correct, n_seen = 0.0, 0, 0
    # Gradients are tracked for the training pass only.
    with torch.set_grad_enabled(training):
        for batch_x, batch_y in loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)

            logits = model(batch_x)
            loss = criterion(logits, batch_y)

            if training:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            batch_size = batch_y.size(0)
            # criterion returns the batch mean; scale back to a batch total.
            loss_sum += loss.item() * batch_size
            predicted = (torch.sigmoid(logits) > 0.5).float()
            n_correct += (predicted == batch_y).sum().item()
            n_seen += batch_size

    if n_seen == 0:
        raise ValueError("The DataLoader produced no samples.")
    return loss_sum / n_seen, (n_correct / n_seen) * 100


print("üöÄ Starting Training Loop...")

for epoch in range(40):
    # --- TRAINING + VALIDATION PHASE ---
    epoch_train_loss, epoch_train_acc = _run_epoch(train_loader, training=True)
    epoch_val_loss, epoch_val_acc = _run_epoch(val_loader, training=False)

    # Store metrics for plotting
    train_losses.append(epoch_train_loss)
    val_losses.append(epoch_val_loss)
    train_accs.append(epoch_train_acc)
    val_accs.append(epoch_val_acc)

    print(f"Epoch [{epoch+1}] | Train Acc: {epoch_train_acc:.2f}% | Val Acc: {epoch_val_acc:.2f}%")

    # --- EARLY STOPPING & SAVING ---
    # Keep the weights with the lowest validation loss so far; stop after
    # PATIENCE consecutive epochs that fail to beat it.
    if epoch_val_loss < best_val_loss:
        best_val_loss = epoch_val_loss
        torch.save(model.state_dict(), 'best_pedestrian_model.pth')
        print(f"‚≠ê New Best Model Saved! Loss: {epoch_val_loss:.4f}")
        counter = 0
    else:
        counter += 1
        if counter >= PATIENCE:
            print(f"üõë Early Stopping triggered at epoch {epoch+1}")
            break

Training Graphics

In [None]:
import matplotlib.pyplot as plt
import os
from datetime import datetime

def plot_and_save_training_results(t_loss, v_loss, t_acc, v_acc):
    """
    Plot training/validation loss and accuracy side by side and save the figure.

    The image is written as training_metrics_<YYYYMMDD_HHMMSS>.png. It goes into
    the notebook-level OUTPUT_DIR when that variable is defined (the directory
    is created if needed), otherwise into the current working directory.

    Args:
        t_loss: Training loss per epoch.
        v_loss: Validation loss per epoch.
        t_acc: Training accuracy per epoch, in percent.
        v_acc: Validation accuracy per epoch, in percent.
    """
    epochs = range(1, len(t_loss) + 1)

    fig, (ax_loss, ax_acc) = plt.subplots(1, 2, figsize=(15, 6))

    # 1. LOSS GRAPH
    ax_loss.plot(epochs, t_loss, color='blue', linestyle='-', marker='o', label='Train Loss', linewidth=2)
    ax_loss.plot(epochs, v_loss, color='red', linestyle='-', marker='o', label='Val Loss', linewidth=2)
    ax_loss.set_title('Model Loss (Convergence Analysis)', fontsize=14)
    ax_loss.set_xlabel('Epochs')
    ax_loss.set_ylabel('Loss Value')
    ax_loss.legend()
    ax_loss.grid(True, linestyle='--', alpha=0.7)

    # 2. ACCURACY GRAPH
    ax_acc.plot(epochs, t_acc, color='green', linestyle='--', marker='s', label='Train Acc', markersize=6)
    ax_acc.plot(epochs, v_acc, color='orange', linestyle='--', marker='s', label='Val Acc', markersize=6)
    ax_acc.set_title('Model Accuracy (Generalization Analysis)', fontsize=14)
    ax_acc.set_xlabel('Epochs')
    ax_acc.set_ylabel('Accuracy (%)')
    ax_acc.legend()
    ax_acc.grid(True, linestyle='--', alpha=0.7)

    fig.tight_layout()

    # --- SAVE LOGIC ---
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"training_metrics_{timestamp}.png"

    # OUTPUT_DIR is optional: an explicit lookup replaces catching NameError.
    output_dir = globals().get('OUTPUT_DIR')
    if output_dir:
        # savefig does not create missing folders, so make sure it exists.
        os.makedirs(output_dir, exist_ok=True)
        save_path = os.path.join(output_dir, filename)
    else:
        save_path = filename

    fig.savefig(save_path, dpi=300)
    print(f"üìà Training graphs saved successfully at: {save_path}")

    plt.show()

# --- EXECUTION ---
plot_and_save_training_results(train_losses, val_losses, train_accs, val_accs)

# Confusion Matrix

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report
import torch
import os
from datetime import datetime

# --- 1. DEFINE OUTPUT PATH ---
# Defined here so this cell does not depend on another cell for OUTPUT_DIR.
OUTPUT_DIR = '/kaggle/working/perception_results'
os.makedirs(OUTPUT_DIR, exist_ok=True)

# --- 2. LOAD MODEL ---
# Restore the checkpoint written by the training loop. map_location lets the
# file load on whichever device this session is using.
model.load_state_dict(torch.load('best_pedestrian_model.pth', map_location=device))
model.to(device)
model.eval()

all_preds = []
all_labels = []

# --- 3. GET PREDICTIONS ---
# NOTE(review): predictions are collected from val_loader, so the figures below
# describe the validation split rather than the held-out test split.
with torch.no_grad():
    for batch_x, batch_y in val_loader:
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)
        outputs = model(batch_x)
        preds = (torch.sigmoid(outputs) > 0.5).float()

        # Flatten (batch, 1) -> (batch,) and cast to int so sklearn receives
        # plain 1-D vectors of 0/1 class ids.
        all_preds.extend(preds.cpu().numpy().ravel().astype(int))
        all_labels.extend(batch_y.cpu().numpy().ravel().astype(int))

# --- 4. PLOT & SAVE HEATMAP ---
# labels=[0, 1] keeps the matrix 2x2 even if one class never occurs.
cm = confusion_matrix(all_labels, all_preds, labels=[0, 1])
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Greens', 
            xticklabels=['Stay', 'Cross'], 
            yticklabels=['Stay', 'Cross'],
            annot_kws={"size": 16, "weight": "bold"})

plt.xlabel('AI Prediction', fontsize=12)
plt.ylabel('Actual Reality (Label)', fontsize=12)
plt.title('Confusion Matrix: Pedestrian Intention Analysis', fontsize=14)

timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
cm_filename = f"confusion_matrix_{timestamp}.png"
save_path_cm = os.path.join(OUTPUT_DIR, cm_filename)

plt.savefig(save_path_cm, dpi=300)
print(f"‚úÖ Heatmap saved as: {save_path_cm}")
plt.show()

# --- 5. SAVE TEXT REPORT ---
# Passing labels explicitly keeps target_names aligned when a class is missing
# from the data; zero_division=0 reports 0 instead of warning in that case.
report = classification_report(
    all_labels,
    all_preds,
    labels=[0, 1],
    target_names=['Stay', 'Cross'],
    zero_division=0,
)
report_filename = f"classification_report_{timestamp}.txt"
save_path_report = os.path.join(OUTPUT_DIR, report_filename)

with open(save_path_report, "w") as f:
    f.write("--- Pedestrian Intention Classification Report ---\n")
    f.write(report)

print(f"‚úÖ Text report saved as: {save_path_report}")
print("\n--- Classification Report ---\n")
print(report)

# Prediction Script

In [None]:
import torch
import numpy as np

def predict_pedestrian_intention(sequence_data, model, scaler, device, threshold=0.5):
    """
    Classify a single feature sequence as CROSSING or STAYING.

    Args:
        sequence_data (np.array): Unscaled features, one row per frame, in the
            column layout the scaler was fitted on - e.g. shape (10, 6) for
            [x, y, vel_x, vel_y, delta_area, aspect_ratio].
        model: Trained network; called on a (1, frames, features) tensor and
            expected to return a single logit.
        scaler: Fitted scaler exposing transform(); applied to sequence_data here.
        device: Device the input tensor is moved to ('cuda' or 'cpu').
        threshold (float): Probability strictly above this means CROSSING.

    Returns:
        (intention, confidence, probability): the label string, the probability
        of that label, and the raw crossing probability from the sigmoid.
    """
    model.eval()

    # Scale the (frames, features) matrix, then add a leading batch axis.
    scaled = scaler.transform(sequence_data)
    batch = torch.Tensor(scaled).unsqueeze(0).to(device)

    # Forward pass without gradient tracking; sigmoid maps the logit to [0, 1].
    with torch.no_grad():
        probability = torch.sigmoid(model(batch)).item()

    # Confidence is always the probability of whichever label is returned.
    if probability > threshold:
        return "CROSSING", probability, probability
    return "STAYING", 1 - probability, probability

# --- EXAMPLE USAGE ---

# X_test was produced by create_lstm_sequences, which already applied the
# scaler, while predict_pedestrian_intention applies scaler.transform itself.
# Map the sample back to raw feature units first so it is scaled exactly once.
sample_sequence = scaler.inverse_transform(X_test[0])  # first test sequence, unscaled

# Get prediction
label, certitude, raw_prob = predict_pedestrian_intention(
    sample_sequence, 
    model, 
    scaler, # Use the scaler instance from your preprocessing cell
    device
)

print(f"--- Real-Time Prediction ---")
print(f"Predicted Action: {label}")
print(f"Confidence: {certitude:.2%} (Raw Score: {raw_prob:.4f})")

# AI's Internal Debate

In [None]:
# Spot-check ten randomly drawn test sequences against their true labels
import random

print(f"{'Index':<8} | {'Prediction':<10} | {'Confidence':<12} | {'Actual':<10}")
print("-" * 50)

for i in range(10):
    idx = random.randint(0, len(X_test) - 1)

    # X_test is already scaled, so the sequence goes straight into the model
    # as a batch of one.
    with torch.no_grad():
        prob = torch.sigmoid(model(torch.Tensor(X_test[idx]).unsqueeze(0).to(device))).item()

    says_cross = prob > 0.5
    if says_cross:
        pred_label, conf = "CROSS", prob
    else:
        pred_label, conf = "STAY", (1 - prob)
    truth = "CROSS" if y_test[idx] == 1 else "STAY"

    # Tick when the prediction matches the label, cross otherwise
    status = "‚úÖ" if pred_label == truth else "‚ùå"

    print(f"{idx:<8} | {pred_label:<10} | {conf:<12.2%} | {truth:<10} {status}")

# OpenCV Visualization

"Serialization and Deployment Phase."

In [None]:
import torch
# Pick the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Estamos usando: {device}") # should say 'cuda' when a GPU is attached

In [None]:
import torch
import joblib
import os


# --- STEP A: SAVE ---
# Persist the LSTM weights and the fitted scaler next to each other; inference
# needs both, because the model was trained on scaled features.
torch.save(model.state_dict(), 'lstm_intention_model.pth')
joblib.dump(scaler, 'data_scaler.pkl')

# --- STEP B: VERIFY ---
if os.path.exists('lstm_intention_model.pth') and os.path.exists('data_scaler.pkl'):
    print("‚úÖ Success! Files are saved and visible.")
    print(f"Model size: {os.path.getsize('lstm_intention_model.pth') / 1024:.2f} KB")
else:
    print("‚ùå Error: Files were not saved correctly.")

# --- STEP C: LOAD ---
# Round-trip check: read both artifacts back in. map_location makes the load
# work on the current device, matching how the deployment cell loads this file.
model.load_state_dict(torch.load('lstm_intention_model.pth', map_location=device))
scaler = joblib.load('data_scaler.pkl')
model.eval()
print("üß† Model and Scaler are now loaded in memory.")

In [None]:
import pandas as pd

# 1. Load the test DataFrame (the per-frame rows, before they are turned into LSTM sequences)
df_test = pd.read_csv('master_test_dataset.csv') 

# Make sure the column names match the ones the visualization functions read.
# NOTE(review): the original comment listed w and h as columns, but the rows
# written by the extraction step visible in this notebook contain x, y, vel_x,
# vel_y, delta_area and aspect_ratio - confirm against the actual CSV.
print(df_test.head())
print(f"Total de filas en df_test: {len(df_test)}")
print(f"Total de secuencias en X_test: {len(X_test)}")

In [None]:
import cv2
import os
import torch
import matplotlib.pyplot as plt

def draw_pedestrian_logic(img, box, prob):
    """
    Annotate `img` in place with a pedestrian box and its predicted intention.

    Args:
        img: BGR image to draw on.
        box: (x, y, w, h) with x, y the top-left corner of the box.
        prob: Crossing probability. Above 0.5 the box is red and labelled
            CROSSING, otherwise green and labelled STAYING with 1 - prob.

    Returns:
        The same image object, after drawing.
    """
    left, top, width, height = box

    # Colours are BGR: red for a predicted crossing, green otherwise.
    is_crossing = prob > 0.5
    color = (0, 0, 255) if is_crossing else (0, 255, 0)
    label = f"CROSSING ({prob:.1%})" if is_crossing else f"STAYING ({1-prob:.1%})"

    # Outline of the pedestrian box
    cv2.rectangle(img, (left, top), (left + width, top + height), color, 3)

    # Filled strip above the box so the white caption stays readable
    cv2.rectangle(img, (left, top - 35), (left + 220, top), color, -1)
    cv2.putText(
        img, label, (left + 5, top - 10),
        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2,
    )

    return img

# Configuration for Kaggle environment
PATH_TO_IMAGES = '/kaggle/input/vehic-ped-intuition/images/test/' 

import numpy as np
import pandas as pd

# Load the saved test sequences (X)
X_test = np.load('/kaggle/input/phase-3-dataset/X_test.npy')

# Load the CSV (df_test) to get the labels and the data needed to build file names.
# Make sure this path points at your test CSV.
df_test = pd.read_csv('/kaggle/input/phase-3-dataset/master_test_dataset.csv') 

print(f"‚úÖ X_test cargado con forma: {X_test.shape}")
print(f"‚úÖ df_test cargado con {len(df_test)} filas")

def visualize_real_prediction(sequence_idx, X_data, df_original):
    """
    Show the LSTM prediction for one sequence on the image of its last frame.

    Sequences are built per pedestrian track: rows with label 0/1 are grouped
    by (video_id, ped_id), sorted by frame, and tracks shorter than the window
    are dropped. Sequence i therefore does NOT correspond to CSV row i + 9. The
    matching row is found by repeating that same grouping on `df_original`.

    Args:
        sequence_idx: Index into `X_data`.
        X_data: Array of scaled sequences, shape (n_sequences, seq_len, n_features).
        df_original: The per-frame DataFrame the sequences were built from.

    Returns:
        None. Prints a message and returns early when the CSV and the sequences
        do not line up or the image file cannot be read.
    """
    model.eval()

    def _last_row_of_each_sequence(frame, window):
        """Positional row (in `frame`) of the final frame of every sequence, in sequence order."""
        usable = frame.reset_index(drop=True)
        usable = usable[usable['label'].isin([0, 1])]
        rows = []
        for _, track in usable.groupby(['video_id', 'ped_id']):
            if len(track) < window:
                continue
            ordered = track.sort_values('frame').index
            # Window k ends at ordered[k + window - 1].
            rows.extend(ordered[window - 1:])
        return rows

    # 1. Retrieve metadata from the final frame of the sequence
    seq_len = X_data.shape[1]
    end_rows = _last_row_of_each_sequence(df_original, seq_len)
    if len(end_rows) != len(X_data):
        print(f"Sequence/CSV mismatch: {len(X_data)} sequences given, "
              f"but the CSV yields {len(end_rows)} windows of length {seq_len}.")
        return
    final_info = df_original.iloc[end_rows[sequence_idx]]

    # 2. Format Video ID (e.g., 8 -> video_0008)
    v_id = int(final_info['video_id'])
    video_folder = f"video_{v_id:04d}"

    # 3. FREQUENCY ADJUSTMENT:
    # Our CSV uses a relative index (0, 1, 2...), but the files are saved
    # every 5 frames (0, 5, 10...). We multiply by 5 to find the real file.
    csv_frame = int(final_info['frame'])
    actual_frame = csv_frame * 5

    file_name = f"{video_folder}_f{actual_frame:04d}.jpg"
    full_path = os.path.join(PATH_TO_IMAGES, file_name)

    # 4. Load the image
    img = cv2.imread(full_path)
    if img is None:
        print(f"‚ùå File not found: {full_path}")
        print("üí° Tip: Check if the video frames start at f0000 or f0005")
        return

    # 5. Model Inference
    # X_data is already scaled, so the sequence goes straight into the LSTM.
    sequence = X_data[sequence_idx]
    input_tensor = torch.Tensor(sequence).unsqueeze(0).to(device)
    with torch.no_grad():
        # Apply Sigmoid to get a probability between 0 and 1
        probability = torch.sigmoid(model(input_tensor)).item()

    # 6. Computer Vision Drawing
    # The row only provides the box centre (x, y), so a fixed-size box is drawn around it.
    x_coord = int(final_info['x'])
    y_coord = int(final_info['y'])
    bounding_box = [x_coord - 30, y_coord - 100, 60, 130]
    final_img = draw_pedestrian_logic(img, bounding_box, probability)

    # 7. Display Result
    plt.figure(figsize=(12, 8))
    plt.imshow(cv2.cvtColor(final_img, cv2.COLOR_BGR2RGB))

    ground_truth = "CROSS" if final_info['label'] == 1 else "STAY"
    plt.title(f"Real-Time Visualization | CSV Frame: {csv_frame} -> File: f{actual_frame:04d}\nAI Confidence: {probability:.2%} | Ground Truth: {ground_truth}")
    plt.axis('off')
    plt.show()

# --- EXECUTE ---
visualize_real_prediction(0, X_test, df_test)

In [None]:
import os
import random
import cv2
import torch
import matplotlib.pyplot as plt
from datetime import datetime

# Define the output directory in Kaggle's working space
OUTPUT_DIR = '/kaggle/working/perception_results'
if not os.path.exists(OUTPUT_DIR):
    os.makedirs(OUTPUT_DIR)

# Fix: Define the missing path variable
IMAGES_BASE_PATH = '/kaggle/input/vehic-ped-intuition/images/test/' 

# Double-check that your drawing function is also in memory
# If you get an error for 'draw_pedestrian_logic', run that function definition again.

def run_random_perception_test_and_save(num_frames, X_data, original_df):
    """
    Pick a random video, predict on its first sequences and save the panel as a PNG.

    Sequences are built per pedestrian track (rows with label 0/1, grouped by
    (video_id, ped_id), sorted by frame, short tracks dropped), so a CSV row
    number is not a sequence index. The function repeats that grouping to find
    the CSV row of each sequence's last frame, and from it which sequences
    belong to the chosen video.

    Args:
        num_frames: Maximum number of sequences (panels) to draw.
        X_data: Array of scaled sequences, shape (n_sequences, seq_len, n_features).
        original_df: The per-frame DataFrame the sequences were built from.

    Returns:
        None. The figure is written to OUTPUT_DIR as video_<id>_<timestamp>.png.
    """
    def _last_row_of_each_sequence(frame, window):
        """Positional row (in `frame`) of the final frame of every sequence, in sequence order."""
        usable = frame.reset_index(drop=True)
        usable = usable[usable['label'].isin([0, 1])]
        rows = []
        for _, track in usable.groupby(['video_id', 'ped_id']):
            if len(track) < window:
                continue
            ordered = track.sort_values('frame').index
            # Window k ends at ordered[k + window - 1].
            rows.extend(ordered[window - 1:])
        return rows

    # 1. Random Selection Logic
    seq_len = X_data.shape[1]
    end_rows = _last_row_of_each_sequence(original_df, seq_len)
    if len(end_rows) != len(X_data):
        print(f"Sequence/CSV mismatch: {len(X_data)} sequences given, "
              f"but the CSV yields {len(end_rows)} windows of length {seq_len}.")
        return

    video_of_row = original_df['video_id'].to_numpy()
    video_of_sequence = [video_of_row[row] for row in end_rows]
    # Only videos that actually produced sequences can be visualised.
    candidate_videos = sorted(set(video_of_sequence))
    if not candidate_videos:
        print("No sequences available to visualise.")
        return
    random_video = random.choice(candidate_videos)
    video_sequences = [k for k, vid in enumerate(video_of_sequence) if vid == random_video]

    # 2. Generate Timestamp for the filename
    # Format: YYYYMMDD_HHMMSS (e.g., 20260113_143005)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    save_name = f"video_{int(random_video):04d}_{timestamp}.png"
    save_path = os.path.join(OUTPUT_DIR, save_name)

    # 3. Visualization Pipeline
    plt.figure(figsize=(20, 10))

    for i, current_idx in enumerate(video_sequences[:num_frames]):
        metadata = original_df.iloc[end_rows[current_idx]]
        video_id = int(metadata['video_id'])
        # CSV frame numbers are relative; image files are named every 5 frames.
        frame_number = int(metadata['frame']) * 5

        # Construct Image Path
        img_name = f"video_{video_id:04d}_f{frame_number:04d}.jpg"
        full_path = os.path.join(IMAGES_BASE_PATH, img_name)

        frame = cv2.imread(full_path)
        if frame is None:
            continue  # image missing: leave this panel empty

        # LSTM Inference (X_data is already scaled)
        sequence = torch.Tensor(X_data[current_idx]).unsqueeze(0).to(device)
        with torch.no_grad():
            prediction_prob = torch.sigmoid(model(sequence)).item()

        # Draw Bounding Box and Label
        # Only the box centre is available, so a fixed-size box is drawn around it.
        x, y = int(metadata['x']), int(metadata['y'])
        bbox = [x - 30, y - 100, 60, 130]
        processed_img = draw_pedestrian_logic(frame, bbox, prediction_prob)

        # Subplot setup
        plt.subplot(1, num_frames, i + 1)
        plt.imshow(cv2.cvtColor(processed_img, cv2.COLOR_BGR2RGB))
        color = 'red' if prediction_prob > 0.5 else 'green'
        plt.title(f"T+{i}\nProb: {prediction_prob:.1%}", color=color, fontsize=12, fontweight='bold')
        plt.axis('off')

    plt.tight_layout()

    # 4. Save and Close
    plt.savefig(save_path)
    print(f"‚úÖ Successfully saved: {save_name}")
    plt.show()
    plt.close() # Free up memory

# --- RUN MULTIPLE TESTS ---
# This will create 5 unique files in your output folder
for i in range(5):
    run_random_perception_test_and_save(5, X_test, df_test)

In [None]:
import torch
import torch.nn as nn
from ultralytics import YOLO

# 1. Re-define the Architecture (Must match your trained model)
class IntentionLSTM(nn.Module):
    """
    Stacked LSTM followed by a linear head.

    The head reads the final hidden state of the last LSTM layer and returns
    raw logits (no sigmoid), shape (batch, output_size).

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of each LSTM layer.
        num_layers: Number of stacked LSTM layers.
        output_size: Number of logits produced. Defaults to 1, so both
            IntentionLSTM(6, 64, 2) and calls that pass output_size=1 work.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size=1):
        super(IntentionLSTM, self).__init__()
        # batch_first=True: inputs are (batch, time, features)
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # hn has shape (num_layers, batch, hidden); hn[-1] is the last layer's
        # hidden state after the final time step.
        _, (hn, _) = self.lstm(x)
        out = self.fc(hn[-1])
        return out

# 2. Initialize and Load the LSTM
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
lstm_path = '/kaggle/input/phase-3-lstm-yolo/Phase_3_models/lstm_intention_model.pth'

# Note: Adjust input_size, hidden_size, num_layers to your training config
# load_state_dict needs the layer names and shapes to match the saved weights,
# so these values must equal the ones used when the checkpoint was trained.
model = IntentionLSTM(input_size=6, hidden_size=64, num_layers=2).to(device)
model.load_state_dict(torch.load(lstm_path, map_location=device))
model.eval()
print("‚úÖ LSTM Model loaded successfully")
# NOTE(review): only the LSTM weights are loaded in this cell. The pipeline call
# further down also passes `scaler`, which must still be in memory from an
# earlier cell (it was dumped to 'data_scaler.pkl') - confirm for a fresh kernel.

# 3. Initialize YOLO
yolo_model = YOLO('yolo11n.pt') 
print("‚úÖ YOLO Model initialized")

# 4. Run the Pipeline
import cv2
import torch
import numpy as np
import glob
import os
from tqdm import tqdm

def process_image_folder(folder_path, output_path, yolo_model, lstm_model, scaler):
    """
    Track pedestrians through a folder of frames and write an annotated video.

    Every *.jpg in `folder_path` (sorted by name) is run through the YOLO
    tracker. For each tracked id the function keeps the last 10 feature rows,
    laid out like the training FEATURES:
    [x, y, vel_x, vel_y, delta_area, aspect_ratio], where x, y is the box
    centre, the velocity is the centre displacement since the track's previous
    detection, and delta_area is the ratio of the current to the previous box
    area. A track's first detection uses velocity 0 and delta_area 1. Once 10
    rows exist they are scaled and passed to the LSTM, and the box is drawn red
    (CROSSING) or green (STAYING).

    NOTE(review): velocity and delta_area are measured between consecutive
    images here - confirm that matches the frame spacing of the training CSV.

    Args:
        folder_path: Directory containing the frames.
        output_path: Destination .mp4 file (written at 10 FPS).
        yolo_model: Ultralytics model used for tracking (class 0 only).
        lstm_model: Trained intention model returning one logit per sequence.
        scaler: Fitted scaler matching the training features.

    Returns:
        None. Prints a message and returns early when no usable image is found.
    """
    seq_len = 10  # window length the LSTM was trained with

    # 1. Get and sort images (to ensure they are in temporal order)
    images = sorted(glob.glob(os.path.join(folder_path, "*.jpg"))) # or .png
    if not images:
        print(f"‚ùå No images found in {folder_path}")
        return

    # Read first image to get dimensions
    first_frame = cv2.imread(images[0])
    if first_frame is None:
        print(f"Could not read image: {images[0]}")
        return
    height, width = first_frame.shape[:2]

    # Make sure the destination folder exists before opening the writer.
    out_dir = os.path.dirname(output_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    # Setup VideoWriter to save the results as a video
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, 10, (width, height)) # 10 FPS for sequence

    history = {}       # track id -> most recent feature rows (at most seq_len)
    previous_box = {}  # track id -> (cx, cy, w, h) of that track's previous detection
    print(f"üé¨ Processing {len(images)} frames from folder...")

    try:
        for img_path in tqdm(images):
            frame = cv2.imread(img_path)
            if frame is None:
                continue  # unreadable image: skip it

            # 2. YOLO Tracking
            # Note: We use 'persist=True' to keep the IDs across separate images
            results = yolo_model.track(frame, persist=True, classes=[0], verbose=False)

            if results[0].boxes.id is not None:
                boxes = results[0].boxes.xywh.cpu().numpy()
                ids = results[0].boxes.id.cpu().numpy().astype(int)

                for box, raw_id in zip(boxes, ids):
                    track_id = int(raw_id)
                    # xywh boxes are (centre x, centre y, width, height)
                    cx, cy, w, h = (float(v) for v in box)

                    # Kinetic features relative to this track's previous detection
                    prev = previous_box.get(track_id)
                    if prev is None:
                        vel_x, vel_y, delta_area = 0.0, 0.0, 1.0
                    else:
                        vel_x = cx - prev[0]
                        vel_y = cy - prev[1]
                        delta_area = (w * h) / (prev[2] * prev[3] + 1e-6)
                    previous_box[track_id] = (cx, cy, w, h)

                    aspect_ratio = w / h if h else 0.0
                    # Same column order as the training FEATURES list
                    features = [cx, cy, vel_x, vel_y, delta_area, aspect_ratio]

                    track_history = history.setdefault(track_id, [])
                    track_history.append(features)

                    # Maintain the same window size as training
                    if len(track_history) > seq_len:
                        track_history.pop(0)

                    if len(track_history) == seq_len:
                        seq_scaled = scaler.transform(np.array(track_history))
                        input_tensor = torch.FloatTensor(seq_scaled).unsqueeze(0).to(device)

                        with torch.no_grad():
                            # LSTM Prediction
                            prob = torch.sigmoid(lstm_model(input_tensor)).item()

                        # 3. Visual Feedback
                        color = (0, 0, 255) if prob > 0.5 else (0, 255, 0)
                        label = "CROSSING" if prob > 0.5 else "STAYING"
                        x1, y1 = int(cx - w / 2), int(cy - h / 2)
                        x2, y2 = int(cx + w / 2), int(cy + h / 2)

                        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 3)
                        cv2.putText(frame, f"ID:{track_id} {label} {prob:.1%}", (x1, y1-15), 
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

            out.write(frame)
    finally:
        # Always finalise the file, even if tracking or inference fails midway.
        out.release()

    print(f"‚úÖ Video saved to: {output_path}")

# --- EXECUTION ---
# Update this folder path to one of your test frame folders
IMAGE_FOLDER = "/kaggle/input/vehic-ped-intuition/images/test" 
OUTPUT_VIDEO = "/kaggle/working/perception_results/demo_from_frames.mp4"

process_image_folder(IMAGE_FOLDER, OUTPUT_VIDEO, yolo_model, model, scaler)

In [None]:
import cv2
import torch
import numpy as np
import glob
import os
from tqdm import tqdm

def process_video_realtime(folder_path, video_id, output_path, yolo_model, lstm_model, scaler):
    """
    Render an annotated MP4 for one video: track pedestrians with YOLO and
    label every track CROSSING / STAYING from an LSTM over its last 10 boxes.

    Args:
        folder_path: Directory containing frames named ``{video_id}_*.jpg``.
        video_id: Frame-name prefix identifying the video (e.g. "video_0039").
        output_path: Destination path of the MP4 file.
        yolo_model: Detector/tracker; ``.track()`` is called once per frame.
        lstm_model: Model returning a single logit for a (1, 10, 6) tensor.
        scaler: Fitted scaler whose ``transform`` accepts a (10, 6) array.

    Returns:
        None. Prints a message and returns early when no readable frame exists.

    Relies on the notebook-level global ``device``.

    NOTE(review): the per-frame feature vector is [xc, yc, w, h, w/h, 1.0];
    confirm this matches the features the LSTM and scaler were trained on.
    """
    search_pattern = os.path.join(folder_path, f"{video_id}_*.jpg")
    all_frames = sorted(glob.glob(search_pattern))

    if not all_frames:
        print(f"‚ùå No se encontraron frames para {video_id}")
        return

    # Size the writer from the first frame that actually decodes: cv2.imread
    # returns None (it does not raise) for unreadable files.
    sample_img = next(
        (img for img in (cv2.imread(p) for p in all_frames) if img is not None),
        None,
    )
    if sample_img is None:
        print(f"‚ùå No se encontraron frames para {video_id}")
        return
    h, w = sample_img.shape[:2]

    # VideoWriter does not create missing directories, so make sure it exists.
    out_dir = os.path.dirname(output_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), 20, (w, h))

    # Track ID -> list of the most recent feature vectors for that pedestrian
    history = {}

    try:
        for img_path in tqdm(all_frames, desc=f"Video {video_id}"):
            frame = cv2.imread(img_path)
            if frame is None:
                continue

            results = yolo_model.track(frame, persist=True, classes=[0], conf=0.2, verbose=False, device=device)

            if results[0].boxes.id is not None:
                boxes = results[0].boxes.xywh.cpu().numpy()
                ids = results[0].boxes.id.cpu().numpy().astype(int)

                for box, id_ped in zip(boxes, ids):
                    xc, yc, wb, hb = box
                    # A degenerate box would put inf/NaN into the aspect ratio
                    # and from there into the scaler and the LSTM.
                    if hb <= 0:
                        continue
                    current_features = [xc, yc, wb, hb, wb/hb, 1.0]

                    if id_ped not in history:
                        history[id_ped] = []

                    # Every frame is appended so the motion stays continuous
                    history[id_ped].append(current_features)

                    # Keep only the last 10 frames
                    if len(history[id_ped]) > 10:
                        history[id_ped].pop(0)

                    # Defaults shown while the first 10 frames are collected
                    color = (0, 255, 0)  # green
                    label = "WAITING..."

                    # --- LSTM prediction ---
                    if len(history[id_ped]) == 10:
                        seq_array = np.array(history[id_ped])
                        seq_scaled = scaler.transform(seq_array)
                        input_tensor = torch.FloatTensor(seq_scaled).unsqueeze(0).to(device)

                        with torch.no_grad():
                            prob = torch.sigmoid(lstm_model(input_tensor)).item()

                        # Sensitivity threshold: 0.25
                        if prob > 0.25:
                            color = (0, 0, 255)  # red
                            label = f"CROSSING {prob:.0%}"
                        else:
                            color = (0, 255, 0)  # green
                            label = f"STAYING {prob:.0%}"

                    # --- Draw on the frame ---
                    x1, y1 = int(xc - wb/2), int(yc - hb/2)
                    x2, y2 = int(xc + wb/2), int(yc + hb/2)

                    cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                    # Filled strip behind the text
                    cv2.rectangle(frame, (x1, y1 - 20), (x1 + 130, y1), color, -1)
                    cv2.putText(frame, f"ID:{id_ped} {label}", (x1 + 2, y1 - 5),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1)

            out.write(frame)
    finally:
        # Finalize the container even if tracking or inference raised
        out.release()

    print(f"‚úÖ Video generado con √©xito en: {output_path}")

# --- EXECUTION ---
TARGET = "video_0039" 
OUTPUT = f"/kaggle/working/perception_results/demo_{TARGET}.mp4"

# NOTE(review): INPUT_FOLDER is presumably defined by another cell; confirm.
# Nearby cells name the same test directory IMAGE_FOLDER and INPUT_DIR instead.
process_video_realtime(INPUT_FOLDER, TARGET, OUTPUT, yolo_model, model, scaler)

# Initializing Models for Pipeline YOLO+LSTM and CV

In [None]:
import torch
import torch.nn as nn
from ultralytics import YOLO

# 1. Re-define the Architecture (Must match your trained model)
class IntentionLSTM(nn.Module):
    """Stacked LSTM followed by a linear head that emits one raw logit.

    The forward pass returns the un-squashed score; callers apply the sigmoid.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of each LSTM layer.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, input_size, hidden_size, num_layers):
        super().__init__()
        # Attribute names become state-dict key prefixes, so they stay as-is.
        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        # nn.LSTM returns (outputs, (h_n, c_n)); only h_n is used here.
        final_hidden = self.lstm(x)[1][0]
        # h_n is indexed by layer; the last entry belongs to the top layer.
        top_layer_state = final_hidden[-1]
        return self.fc(top_layer_state)

# 2. Initialize and Load the LSTM
# Use the GPU when one is visible, otherwise fall back to CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
lstm_path = '/kaggle/input/phase-3-lstm-yolo/Phase_3_models/lstm_intention_model.pth'

# Note: Adjust input_size, hidden_size, num_layers to your training config
# (load_state_dict below fails if these do not match the checkpoint's tensors).
model = IntentionLSTM(input_size=6, hidden_size=64, num_layers=2).to(device)
# map_location lets a checkpoint saved on GPU load on a CPU-only session.
model.load_state_dict(torch.load(lstm_path, map_location=device))
# Inference mode: disables dropout and similar train-only behavior.
model.eval()
print("‚úÖ LSTM Model loaded successfully")

# 3. Initialize YOLO
yolo_model = YOLO('yolo11n.pt') 
print("‚úÖ YOLO Model initialized")


In [None]:
import numpy as np
from sklearn.preprocessing import StandardScaler

# 1. Load the training data
X_train = np.load('/kaggle/input/phase-3-dataset/X_train.npy')

# 2. Reshape to 2D for the scaler (from [N, 10, 6] to [N*10, 6])
# This assumes the data has 6 feature columns
X_train_reshaped = X_train.reshape(-1, X_train.shape[-1])

# 3. Initialize and fit the scaler
# NOTE(review): this refits a scaler instead of loading the data_scaler.pkl
# that the packaging cell exports; it presumes X_train.npy holds unscaled
# features. Confirm, otherwise inference inputs are scaled differently.
scaler = StandardScaler()
scaler.fit(X_train_reshaped)

print(f"‚úÖ Scaler ajustado con √©xito usando X_train de forma {X_train.shape}")
print(f"üìä Media calculada para xc: {scaler.mean_[0]:.2f}, yc: {scaler.mean_[1]:.2f}")

In [None]:
import cv2
import torch
import numpy as np
import glob
import os
from tqdm import tqdm

def process_video_realtime(folder_path, video_id, output_path, yolo_model, lstm_model, scaler,
                           window=10, threshold=0.25, fps=20):
    """
    Processes video frames, tracks pedestrians with YOLO,
    and predicts intention (Crossing/Staying) using an LSTM model.

    Args:
        folder_path: Directory containing frames named ``{video_id}_*.jpg``.
        video_id: Frame-name prefix identifying the video (e.g. "video_0039").
        output_path: Destination path of the annotated MP4.
        yolo_model: Detector/tracker; ``.track()`` is called once per frame.
        lstm_model: Model returning a single logit for a (1, window, 6) tensor.
        scaler: Fitted scaler whose ``transform`` accepts a (window, 6) array.
        window: Number of most recent boxes fed to the LSTM per track.
        threshold: Probability above which a track is labelled CROSSING.
        fps: Frame rate of the written video.

    Returns:
        None. Prints a message and returns early when no readable frame exists.

    Relies on the notebook-level global ``device``.

    NOTE(review): the per-frame feature vector is [xc, yc, w, h, w/h, 1.0];
    confirm this matches the features the LSTM and scaler were trained on.
    """
    # 1. Setup Frame Paths
    search_pattern = os.path.join(folder_path, f"{video_id}_*.jpg")
    all_frames = sorted(glob.glob(search_pattern))

    if not all_frames:
        print(f"‚ùå No frames found for {video_id}")
        return

    # 2. Initialize Video Writer
    # Use the first frame that decodes: cv2.imread returns None on failure.
    sample_img = next(
        (img for img in (cv2.imread(p) for p in all_frames) if img is not None),
        None,
    )
    if sample_img is None:
        print(f"‚ùå No frames found for {video_id}")
        return
    h, w = sample_img.shape[:2]

    # VideoWriter does not create missing directories, so make sure it exists.
    out_dir = os.path.dirname(output_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))

    # 3. History Dictionary for Temporal Tracking (track ID -> recent features)
    history = {}

    try:
        for img_path in tqdm(all_frames, desc=f"Processing {video_id}"):
            frame = cv2.imread(img_path)
            if frame is None:
                continue

            # 4. YOLO Tracking (Pedestrians only: class 0)
            results = yolo_model.track(frame, persist=True, classes=[0], conf=0.2, verbose=False, device=device)

            if results[0].boxes.id is not None:
                boxes = results[0].boxes.xywh.cpu().numpy()
                ids = results[0].boxes.id.cpu().numpy().astype(int)

                for box, id_ped in zip(boxes, ids):
                    xc, yc, wb, hb = box
                    # A degenerate box would put inf/NaN into the aspect ratio
                    # and from there into the scaler and the LSTM.
                    if hb <= 0:
                        continue
                    # Feature vector: [xc, yc, w, h, ratio, constant]
                    current_features = [xc, yc, wb, hb, wb/hb, 1.0]

                    track_history = history.setdefault(id_ped, [])
                    track_history.append(current_features)

                    # Keep only the last `window` frames (LSTM window)
                    if len(track_history) > window:
                        track_history.pop(0)

                    # --- DEFAULT VALUES (shown until the window is full) ---
                    color = (0, 255, 0)  # Green
                    label = "ANALYZING..."

                    # --- LSTM INFERENCE ---
                    if len(track_history) == window:
                        seq_array = np.array(track_history)
                        seq_scaled = scaler.transform(seq_array)
                        input_tensor = torch.FloatTensor(seq_scaled).unsqueeze(0).to(device)

                        with torch.no_grad():
                            # Model emits a logit; sigmoid turns it into a probability
                            prob = torch.sigmoid(lstm_model(input_tensor)).item()

                        # Binary decision against `threshold`
                        if prob > threshold:
                            color = (0, 0, 255)  # Red for danger
                            label = f"CROSSING {prob:.0%}"
                        else:
                            color = (0, 255, 0)  # Green for safe
                            label = f"STAYING {prob:.0%}"

                    # --- DRAWING ON FRAME ---
                    x1, y1 = int(xc - wb/2), int(yc - hb/2)
                    x2, y2 = int(xc + wb/2), int(yc + hb/2)

                    cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                    # Text Background
                    cv2.rectangle(frame, (x1, y1 - 20), (x1 + 140, y1), color, -1)
                    cv2.putText(frame, f"ID:{id_ped} {label}", (x1 + 2, y1 - 5),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1)

            out.write(frame)
    finally:
        # Finalize the container even if tracking or inference raised
        out.release()

    print(f"‚úÖ Video successfully saved to: {output_path}")

# --- BATCH EXECUTION ---
video_list = [
    'video_0008', 'video_0024', 'video_0039', 'video_0042', 
    'video_0048', 'video_0054', 'video_0078', 'video_0081', 
    'video_0084', 'video_0085'
]

INPUT_DIR = "/kaggle/input/vehic-ped-intuition/images/test"
OUTPUT_DIR = "/kaggle/working/final_results"
if not os.path.exists(OUTPUT_DIR): os.makedirs(OUTPUT_DIR)

for vid in video_list:
    save_path = os.path.join(OUTPUT_DIR, f"result_{vid}.mp4")
    process_video_realtime(INPUT_DIR, vid, save_path, yolo_model, model, scaler)

# --- ZIP AND DOWNLOAD ---
!zip -j final_submission_videos.zip /kaggle/working/final_results/*.mp4
print("üöÄ DONE! Download 'final_submission_videos.zip' from the Output panel.")

In [None]:
import cv2
import glob
from ultralytics import YOLO

# 1. Start from a freshly loaded model
yolo_model = YOLO('yolov8n.pt').to(device)

# 2. Take a sequence of frames (not just one)
video_path = "/kaggle/input/vehic-ped-intuition/images/test/video_0039_*.jpg"
frames_paths = sorted(glob.glob(video_path))[:10] # take the first 10

print(f"Probando secuencia de {len(frames_paths)} frames...")

for p in frames_paths:
    img = cv2.imread(p)
    # Lower conf to force detections
    results = yolo_model.track(img, persist=True, conf=0.1, device=device)
    
    # boxes.id is None when no track ID was assigned on this frame.
    if results[0].boxes.id is not None:
        ids = results[0].boxes.id.cpu().numpy()
        print(f"‚úÖ Frame {p.split('_')[-1]}: IDs detectados: {ids}")
    else:
        print(f"‚ùå Frame {p.split('_')[-1]}: No se detectaron IDs (solo cajas: {len(results[0].boxes)})")

In [None]:
from ultralytics import YOLO
yolo_model = YOLO('yolo11n.pt') # or whichever model you are using
yolo_model.to(device)

# Quick test with one frame of video 0024
# (cv2 and device are not defined in this cell; they come from earlier cells.)
test_img_path = "/kaggle/input/vehic-ped-intuition/images/test/video_0024_f0155.jpg"
test_frame = cv2.imread(test_img_path)


# cv2.imread returns None instead of raising when the path cannot be read.
if test_frame is not None:
    res = yolo_model.track(test_frame, persist=True, classes=[0], device=device)
    if res[0].boxes.id is not None:
        print(f"‚úÖ YOLO funciona: Detectados {len(res[0].boxes.id)} peatones con ID.")
    else:
        print("‚ùå YOLO detecta, pero NO hay IDs. Revisa el tracker.")
else:
    print("‚ùå No se pudo leer la imagen. Revisa la ruta.")

In [None]:
# Compress only the .mp4 files, without including the folders (-j drops paths)
!zip -j final_demos_videos.zip /kaggle/working/perception_results/*.mp4

print("‚úÖ ¬°ZIP creado con √©xito! Solo contiene los archivos .mp4.")
print("Descarga 'final_demos_videos.zip' desde el panel derecho (Data > Output).")

In [None]:
import shutil
import os

# --- 1. PREPARE THE RESULTS DIRECTORY ---
results_dir = '/kaggle/working/perception_results'
os.makedirs(results_dir, exist_ok=True)

# List of specific files you mentioned to include in the Results ZIP
# This ensures your best metrics are packaged together
# NOTE(review): the file names embed run timestamps, so a new run will produce
# different names and these entries will hit the "not found" branch below.
important_files = [
    '/kaggle/working/training_metrics_20260113_145818.png',
    '/kaggle/working/training_metrics_20260113_145925.png',
    '/kaggle/working/perception_results/classification_report_20260113_150426.txt',
    '/kaggle/working/perception_results/confusion_matrix_20260113_150426.png'
]

for file_path in important_files:
    if os.path.exists(file_path):
        # Copy file to the results folder if it's not already there
        # (files already inside results_dir are skipped rather than copied onto themselves)
        if os.path.dirname(file_path) != results_dir:
            shutil.copy(file_path, results_dir)
    else:
        print(f"‚ö†Ô∏è File not found for ZIP: {file_path}")

# --- 2. CREATE THE ZIP FILES ---
# The archive names below are relative, so the ZIPs are written to the
# current working directory.

# ZIP 1: Phase 3 Results (Metrics + Report + Matrix + Prediction visuals)
shutil.make_archive('Phase_3_Results', 'zip', results_dir)
print("‚úÖ Created: Phase_3_Results.zip")

# ZIP 2: Tracking Results (The /runs folder from YOLO/Tracking)
runs_dir = '/kaggle/working/runs'
if os.path.exists(runs_dir):
    shutil.make_archive('tracking_results', 'zip', runs_dir)
    print("‚úÖ Created: tracking_results.zip")

# ZIP 3: Phase 3 Models (Weights .pth, Scaler .pkl, and Config .yaml)
# Files are staged in a temporary folder so the archive contains only them.
models_temp_dir = '/kaggle/working/temp_models'
os.makedirs(models_temp_dir, exist_ok=True)

models_to_save = [
    '/kaggle/working/best_pedestrian_model.pth',
    '/kaggle/working/data_scaler.pkl',
    '/kaggle/working/lstm_intention_model.pth',
    '/kaggle/working/data.yaml'
]

# Missing model files are skipped silently (no warning is printed here).
for model_file in models_to_save:
    if os.path.exists(model_file):
        shutil.copy(model_file, models_temp_dir)

shutil.make_archive('Phase_3_models', 'zip', models_temp_dir)
shutil.rmtree(models_temp_dir) # Clean up temporary folder
print("‚úÖ Created: Phase_3_models.zip")

print("\nüì¶ DONE! You can now download the 3 ZIP files from the 'Output' pane on the right.")

In [None]:
import shutil
import os
import glob

# --- 1. SETUP DIRECTORIES ---
# Staging folder: files are copied here, zipped, and the folder is then removed.
dataset_temp_dir = '/kaggle/working/temp_dataset_master'
os.makedirs(dataset_temp_dir, exist_ok=True)

# --- 2. DEFINE FILE PATHS ---
# CSV Files (Tabular data)
csv_files = [
    '/kaggle/working/balanced_train_dataset.csv',
    '/kaggle/working/master_test_dataset.csv',
    '/kaggle/working/master_train_dataset.csv',
    '/kaggle/working/master_val_dataset.csv'
]

# Config and NumPy files (The "Actual" LSTM inputs)
config_files = ['/kaggle/working/data.yaml']
# Picks up every .npy directly under /kaggle/working at the time this cell runs.
npy_files = glob.glob('/kaggle/working/*.npy') # This will now find the files we just saved

all_dataset_files = csv_files + config_files + npy_files

print("üì¶ Gathering files for Dataset Master...")
for file_path in all_dataset_files:
    if os.path.exists(file_path):
        shutil.copy(file_path, dataset_temp_dir)
        print(f"‚úÖ Included: {os.path.basename(file_path)}")
    else:
        # A missing file only produces a warning; the archive is still built.
        print(f"‚ö†Ô∏è Warning: File not found -> {file_path}")

# --- 3. CREATE THE MASTER ZIP ---
# make_archive appends the ".zip" extension to this base name.
zip_filename = '/kaggle/working/Phase_3_Dataset_Master'
shutil.make_archive(zip_filename, 'zip', dataset_temp_dir)

# Clean up
shutil.rmtree(dataset_temp_dir)

print(f"\nüöÄ SUCCESS! '{zip_filename}.zip' is ready for download.")
print("This package is now complete with CSVs, NumPy tensors, and the YAML config.")

# Model Performance Summary
This project developed a Temporal Intention Predictor using a Long Short-Term Memory (LSTM) network to classify pedestrian behavior into two categories: Staying (Safe) or Crossing (Hazard). The model processes 10-frame sequences of spatial and kinematic data ($x, y, v_x, v_y, \text{area delta, aspect ratio}$) to predict future intent.

After addressing severe class imbalance through Minority Class Oversampling and stabilizing gradients with Standard Scaling, the model achieved the following results on the unseen test set:

| Metric | Value | Interpretation |
|---|---|---|
| Overall Accuracy | 84% | High reliability across the entire dataset. |
| Crossing Recall | 86% | Successfully identifies the vast majority of pedestrians entering the roadway. |
| Crossing Precision | 96% | Extremely low "False Alarm" rate for crossing events. |
| Staying Recall | 60% | Significant Achievement: Captured the majority of the minority "staying" class. |

**Vehicular Technology Insights (Failure Mode Analysis)**

While the model shows high confidence (up to 99.9%) on clear trajectories, the Failure Analysis of the 16% error rate reveals critical insights for autonomous vehicle (AV) deployment:

Temporal Latency: Incorrect predictions (e.g., Index 291) often occur when a pedestrian's transition from static to active movement is faster than the 10-frame observation window (approx. 0.3–0.5 seconds).

Safety Thresholding: To mitigate the risk of "False Negatives" (predicting Stay when they Cross), a safety-critical system should implement a Non-Symmetric Threshold.

Example: Trigger braking at 30% Crossing Probability but only resume acceleration at 10% Probability.

# System Architecture for Deployment
The system is serialized into two lightweight components for real-time edge computing (e.g., NVIDIA Jetson):

lstm_intention_model.pth: A 2-layer LSTM state-dict (approx. 150 KB).

data_scaler.pkl: A Scikit-Learn StandardScaler object to ensure input consistency.

In [None]:
# Final Performance Note
# NOTE(review): val_accs is expected to exist from a training cell; the ":.2f}%"
# format presumes its values are already percentages. Confirm.
print("--- Final Model Statistics ---")
print(f"Validation Accuracy: {max(val_accs):.2f}%")
# The epoch number below is a hard-coded literal, not computed from the run.
print(f"Best Loss reached at Epoch: 4")
print("Status: DEPLOYMENT READY")

# Save the summary to a text file for your report
# The recall values are hard-coded literals, not read from an evaluation result.
with open('performance_summary.txt', 'w') as f:
    f.write(f"Model: LSTM Intention Predictor\n")
    f.write(f"Accuracy: {max(val_accs):.2f}%\n")
    f.write(f"Class 0 (Stay) Recall: 0.60\n")
    f.write(f"Class 1 (Cross) Recall: 0.86\n")

Configuration (All Thresholds in One Place)

RAW TRACKING
→ “What does YOLO + tracker see?”

FILTERING & RELEVANCE SELECTION
→ “Which pedestrians matter for intention?”

DECISION & SEQUENCE PREPARATION
→ “Which tracks become sequences?”

In [None]:
# Frames
# Dataset split whose frames are processed; selects the image sub-folder.
split = "train"
IMG_DIR = f"/kaggle/input/vehic-ped-intuition/images/{split}"

# YOLO model
MODEL_PATH = "/kaggle/input/first-phase-model/weights/best.pt"
CONF = 0.33                # detection confidence passed to yolo.track
IMGSZ = 640                # inference image size passed to yolo.track
TRACKER = "botsort.yaml"   # tracker config passed to yolo.track

# Filtering
MIN_TRACK_LEN = 16         # tracks with fewer observations are dropped
INTENTION_CONF = 0.50      # minimum mean detection confidence per track

# Relevance (distance proxy)
# A track is kept if either condition holds (see is_relevant).
MIN_MEDIAN_HEIGHT = 90     # px
MIN_HEIGHT_GROWTH = 15     # px

# Duplicate suppression
DUP_IOU_THR = 0.70         # mean IoU above which a track counts as a duplicate

# Context crop
EXPAND_RATIO = 1.8

# Outputs
OUT_DIR = "/kaggle/working/smoking_test"
os.makedirs(OUT_DIR, exist_ok=True)

RAW_VIDEO_PATH      = f"{OUT_DIR}/tracking_raw.mp4"
FILTERED_VIDEO_PATH = f"{OUT_DIR}/tracking_filtered.mp4"
DECISION_VIDEO_PATH = f"{OUT_DIR}/tracking_decision.mp4"

CROPS_DIR = f"{OUT_DIR}/crops"         # crops per track and per frame
SEQS_DIR  = f"{OUT_DIR}/sequences"     # sequences saved as folders
FEAT_DIR  = f"{OUT_DIR}/vit_features"  # tensors .pt
for d in [CROPS_DIR, SEQS_DIR, FEAT_DIR]:
    os.makedirs(d, exist_ok=True)

# Frame rate used when writing the tracking videos.
FPS = 10


In [None]:
# Load the detector from MODEL_PATH; YOLO must already be imported by an earlier cell.
yolo = YOLO(MODEL_PATH)


Pick One Random Video (ALL Frames)

In [None]:
# List the directory once and derive every frame's video ID with one rule.
frame_names = sorted(os.listdir(IMG_DIR))
video_ids = sorted(set(f.split("_f")[0] for f in frame_names))
# NOTE(review): no random seed is set in this cell, so the chosen video may
# differ between runs.
video_id = random.choice(video_ids)

# Select frames by exact ID equality rather than a filename prefix, so an ID
# that is a prefix of another ID can never pull in the other video's frames.
frames = [
    os.path.join(IMG_DIR, f)
    for f in frame_names
    if f.split("_f")[0] == video_id
]

print("VIDEO:", video_id)
print("TOTAL FRAMES:", len(frames))


# Frame loading & raw tracking (NO FILTERING)

Frame loading & raw tracking (NO FILTERING)

Load all frames of one video (from IMG_DIR)

Run YOLO + BoT-SORT on every frame

Track ALL detected pedestrians

Assign track IDs

Draw all bounding boxes + IDs

In [None]:
def draw_tracks_on_frame(img_bgr, frame_idx, tracks, color=(0,255,255), put_conf=False):
    """Draw, in place, every track observation that belongs to ``frame_idx``.

    Args:
        img_bgr: Image to draw on (modified in place).
        frame_idx: Index of the frame being rendered.
        tracks: Mapping of track ID to a list of (frame_idx, box, conf)
            entries, with box given as (x1, y1, x2, y2).
        color: Colour used for both the rectangle and the caption.
        put_conf: When True, the confidence is appended to the caption.
    """
    for track_id, observations in tracks.items():
        for obs_frame, box, conf in observations:
            if obs_frame == frame_idx:
                left, top, right, bottom = (int(v) for v in box)
                cv2.rectangle(img_bgr, (left, top), (right, bottom), color, 2)
                caption = f"ID {track_id}"
                if put_conf:
                    caption = caption + f" {conf:.2f}"
                # Clamp so the caption is not placed above the image's top edge.
                caption_origin = (left, max(0, top - 7))
                cv2.putText(img_bgr, caption, caption_origin,
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

def track_video_build_db_and_raw_video(frames):
    """Track one video frame by frame, write the raw overlay video, and
    return the per-track observation database.

    Args:
        frames: Ordered list of frame image paths.

    Returns:
        dict mapping track ID to a list of (frame_index, xyxy_box, confidence)
        tuples, where frame_index is the position of the frame in ``frames``.

    Raises:
        FileNotFoundError: If the first frame cannot be read (it defines the
            output video size).

    Uses the notebook-level globals yolo, CONF, IMGSZ, TRACKER, FPS and
    RAW_VIDEO_PATH.
    """
    track_db = {}
    first = cv2.imread(frames[0])
    if first is None:
        # cv2.imread returns None instead of raising; fail with a clear message.
        raise FileNotFoundError(f"Could not read first frame: {frames[0]}")
    H, W = first.shape[:2]
    writer = cv2.VideoWriter(RAW_VIDEO_PATH, cv2.VideoWriter_fourcc(*"mp4v"), FPS, (W, H))

    try:
        for fidx, frame_path in enumerate(frames):
            img = cv2.imread(frame_path)
            if img is None:
                # Unreadable frames are skipped but still consume their index,
                # so stored frame indices keep matching positions in `frames`.
                continue

            r = yolo.track(img, conf=CONF, imgsz=IMGSZ, persist=True, tracker=TRACKER, verbose=False)[0]

            if r.boxes is not None and r.boxes.id is not None:
                boxes = r.boxes.xyxy.cpu().numpy()
                ids = r.boxes.id.cpu().numpy().astype(int)
                confs = r.boxes.conf.cpu().numpy()

                # Record and draw each detection in one pass.
                for box, tid, c in zip(boxes, ids, confs):
                    if tid == -1:
                        continue
                    track_db.setdefault(tid, []).append((fidx, box, float(c)))

                    x1, y1, x2, y2 = map(int, box)
                    cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 255), 2)
                    cv2.putText(img, f"ID {tid} {c:.2f}", (x1, max(0, y1 - 7)),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)

            writer.write(img)
    finally:
        # Always finalize the video file, even if tracking raised.
        writer.release()

    return track_db

# Run tracking over the selected video's frames and write the raw overlay video.
track_db = track_video_build_db_and_raw_video(frames)
print("Saved RAW tracking video:", RAW_VIDEO_PATH)
print("RAW track count:", len(track_db))

# Show top tracks
# Each tuple is (track id, observation count, first frame index, last frame index);
# the ten longest tracks are kept.
stats = sorted([(tid, len(seq), seq[0][0], seq[-1][0]) for tid, seq in track_db.items()],
               key=lambda x: x[1], reverse=True)[:10]
print("Top tracks (tid,len,start,end):", stats)


Now we do not touch frames yet — we analyze tracks.

For each track ID, we compute:

Track-level metrics (from the config)
Filters: length + relevance + confidence + duplicate suppression

In [None]:
def iou(a, b):
    """Intersection-over-union of two boxes given as (x1, y1, x2, y2).

    A small epsilon in the denominator avoids division by zero for two
    zero-area boxes.
    """
    overlap_w = max(0, min(a[2], b[2]) - max(a[0], b[0]))
    overlap_h = max(0, min(a[3], b[3]) - max(a[1], b[1]))
    inter = overlap_w * overlap_h

    area_a = max(0, (a[2] - a[0])) * max(0, (a[3] - a[1]))
    area_b = max(0, (b[2] - b[0])) * max(0, (b[3] - b[1]))

    union = area_a + area_b - inter + 1e-6
    return inter / union

def bbox_heights(seq):
    """Return an array with the box height (y2 - y1) of every track entry.

    Each entry of ``seq`` is a 3-tuple whose middle element is an
    (x1, y1, x2, y2) box.
    """
    heights = []
    for _fidx, box, _conf in seq:
        heights.append(box[3] - box[1])
    return np.array(heights)

def is_relevant(seq):
    """Decide whether a track is worth keeping, based on its box heights.

    A track passes when its median box height reaches MIN_MEDIAN_HEIGHT, or
    when the box grew by at least MIN_HEIGHT_GROWTH pixels between its first
    and last observation.
    """
    heights = bbox_heights(seq)
    tall_enough = np.median(heights) >= MIN_MEDIAN_HEIGHT
    if tall_enough:
        return tall_enough
    growth = heights[-1] - heights[0]
    return growth >= MIN_HEIGHT_GROWTH

def high_conf(seq, thr=0.5):
    """Return True when the mean confidence over a track is at least ``thr``.

    Each entry of ``seq`` is a 3-tuple whose last element is the confidence.
    """
    confidences = []
    for _fidx, _box, conf in seq:
        confidences.append(conf)
    mean_conf = float(np.mean(confidences))
    return mean_conf >= thr

def mean_iou_tracks(seq1, seq2):
    """Mean IoU of two tracks over the frames in which both were observed.

    Boxes are paired by frame index, not by position in the list, so two
    tracks that occupy the same image region at different times are not
    mistaken for duplicates of each other.

    Args:
        seq1, seq2: Lists of (frame_index, box, conf) entries.

    Returns:
        Mean IoU over the shared frames, or 0.0 when the tracks share two
        frames or fewer (too little evidence to call them duplicates).
    """
    boxes2 = {fidx: box for (fidx, box, _) in seq2}
    shared = [(box, boxes2[fidx]) for (fidx, box, _) in seq1 if fidx in boxes2]
    if len(shared) <= 2:
        return 0.0
    return float(np.mean([iou(b1, b2) for b1, b2 in shared]))

def suppress_duplicates(tracks):
    """Drop tracks that overlap too strongly with a longer track already kept.

    Tracks are visited from longest to shortest; a track is discarded when
    its mean IoU with any kept track exceeds DUP_IOU_THR.

    Args:
        tracks: Mapping of track ID to its list of observations.

    Returns:
        A new dict containing only the surviving tracks.
    """
    survivors = {}
    # Longest first, so the longer track of a duplicate pair is the one kept.
    longest_first = sorted(tracks, key=lambda t: len(tracks[t]), reverse=True)
    for tid in longest_first:
        candidate = tracks[tid]
        is_duplicate = any(
            mean_iou_tracks(candidate, kept_seq) > DUP_IOU_THR
            for kept_seq in survivors.values()
        )
        if not is_duplicate:
            survivors[tid] = candidate
    return survivors


Context-Aware Cropping

In [None]:
# Each stage filters the output of the previous one; the printed counts show
# how many tracks survive after every step.

# 1) length
t1 = {tid: seq for tid, seq in track_db.items() if len(seq) >= MIN_TRACK_LEN}
print("After MIN_TRACK_LEN:", len(t1))

# 2) relevance
t2 = {tid: seq for tid, seq in t1.items() if is_relevant(seq)}
print("After relevance:", len(t2))

# 3) confidence
t3 = {tid: seq for tid, seq in t2.items() if high_conf(seq, INTENTION_CONF)}
print("After confidence:", len(t3))

# 4) duplicate suppression
filtered_tracks = suppress_duplicates(t3)
print("After duplicate suppression:", len(filtered_tracks))

# show some stats
def track_stats(seq):
    """Summarize one track as a dict of plain Python numbers.

    Keys: length, median_h, growth (last height minus first height),
    mean_conf, start and end (first and last frame index).
    """
    heights = bbox_heights(seq)
    confidences = [conf for (_fidx, _box, conf) in seq]
    first_entry, last_entry = seq[0], seq[-1]
    return {
        "length": len(seq),
        "median_h": int(np.median(heights)),
        "growth": int(heights[-1] - heights[0]),
        "mean_conf": float(np.mean(confidences)),
        "start": int(first_entry[0]),
        "end": int(last_entry[0]),
    }

for tid in list(filtered_tracks.keys())[:5]:
    print("Track", tid, track_stats(filtered_tracks[tid]))


In [None]:
# The first frame defines the output video size.
# cv2.imread returns None for an unreadable file, in which case .shape below raises.
first = cv2.imread(frames[0])
H,W = first.shape[:2]
writer = cv2.VideoWriter(FILTERED_VIDEO_PATH, cv2.VideoWriter_fourcc(*"mp4v"), FPS, (W,H))

for fidx, frame_path in enumerate(frames):
    img = cv2.imread(frame_path)
    if img is None:
        # Skipped frames still consume an index, matching how track_db was built.
        continue
    # Overlay only the tracks that survived filtering, in green.
    draw_tracks_on_frame(img, fidx, filtered_tracks, color=(0,255,0), put_conf=False)
    writer.write(img)

writer.release()
print("Saved FILTERED tracking video:", FILTERED_VIDEO_PATH)


In [None]:
# Pick the longest, most stable track
# max() raises ValueError when filtered_tracks is empty, i.e. when no track
# survived the filtering stages.
selected_tid = max(filtered_tracks.keys(),
                   key=lambda t: len(filtered_tracks[t]))

selected_track = filtered_tracks[selected_tid]

print("Selected TID:", selected_tid)
print("Track length:", len(selected_track))


In [None]:
# Pattern for the attribute annotation files.
XML_GLOB = "/kaggle/input/attributes-label/annotations_attributes/video_*_*.xml"
xml_files = sorted(glob.glob(XML_GLOB))
print("XML files found:", len(xml_files))

# Find XML candidates containing video_id string
# (substring match on the file name only, not on the directory part)
candidates = [x for x in xml_files if video_id in os.path.basename(x)]
print("XML candidates for this video:", len(candidates))
print("Example candidates:", candidates[:3])


In [None]:
if candidates:
    xml_path = candidates[0]
elif xml_files:
    # The fallback keeps the notebook running, but these annotations belong to
    # a different video, so any label derived from them is not valid for
    # video_id. Say so loudly instead of substituting silently.
    xml_path = xml_files[0]
    print(f"WARNING: no XML matches {video_id}; falling back to "
          f"{os.path.basename(xml_path)} (labels will not correspond to this video)")
else:
    raise FileNotFoundError(f"No annotation XML files matched {XML_GLOB}")
print("Using XML:", xml_path)


In [None]:
# NOTE(review): ped_attrs is not created in any cell shown nearby; presumably
# it is parsed from xml_path by a cell that is missing here. Confirm.
# Only the first pedestrian entry is used to label the selected track.
xml_label = "crossing" if ped_attrs[0]["crossing"] == 1 else "not_crossing"
print("Assigned label:", xml_label)


In [None]:
# Keep the first 80% of the selected track's observations.
cut = int(0.8 * len(selected_track))
usable_seq = selected_track[:cut]

# Each observation's first element is its frame index.
print("Using frames:", usable_seq[0][0], "to", usable_seq[-1][0])


Building Sliding-Window Sequences

In [None]:
SEQ_LEN = 16
STRIDE = 4

def build_windows(seq, T=16, stride=4):
    """Cut ``seq`` into overlapping windows of length ``T``.

    Window starts advance by ``stride``; only full-length windows are
    produced, so a sequence shorter than ``T`` yields an empty list.
    """
    last_start = len(seq) - T
    return [seq[start:start + T] for start in range(0, last_start + 1, stride)]

windows = build_windows(usable_seq, SEQ_LEN, STRIDE)
print("Total sequences:", len(windows))


======================

# Phase2

In [None]:
import torch
import timm
import os
import cv2
import numpy as np
from PIL import Image
from torchvision import transforms
from tqdm import tqdm

# Constants
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
IMG_ROOT = '/kaggle/input/vehic-ped-intuition/images'
LBL_ROOT = '/kaggle/input/vehic-ped-intuition/labels'

# Load ViT Model (Base version with 768-dimensional features)
# num_classes=0 requests the model without a classification head.
# NOTE: this rebinds the notebook-level name `model`, which an earlier cell
# assigns to the IntentionLSTM instance.
print(f"Loading ViT on {DEVICE}...")
model = timm.create_model('vit_base_patch16_224', pretrained=True, num_classes=0).to(DEVICE)
model.eval()

# Preprocessing for ViT
# Resize to the 224x224 input the model name indicates, then normalize per channel.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

In [None]:
def extract_pedestrian_features(img_path, label_path):
    """Extract one ViT feature vector per labelled box in an image.

    Args:
        img_path: Path to the frame image.
        label_path: Path to the YOLO-format label file (one box per line:
            cls, x_center, y_center, width, height, all normalized to [0, 1]).

    Returns:
        None if the image cannot be read; otherwise a list of flattened
        feature vectors (empty when the label file is missing or holds no
        usable box).

    Uses the notebook-level globals ``model``, ``transform`` and ``DEVICE``.
    """
    # Load image
    img = cv2.imread(img_path)
    if img is None:
        return None
    h, w = img.shape[:2]
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    features_list = []
    if not os.path.exists(label_path):
        return features_list

    # Read YOLO labels
    with open(label_path, 'r') as f:
        for line in f:
            fields = line.split()
            # Blank or truncated lines (e.g. a trailing newline) carry no box.
            if len(fields) < 5:
                continue
            # Only the first five fields are the box; extra trailing columns
            # are ignored instead of breaking the unpacking.
            _, x, y, nw, nh = map(float, fields[:5])

            # Convert to pixel coordinates
            x1 = int((x - nw/2) * w)
            y1 = int((y - nh/2) * h)
            x2 = int((x + nw/2) * w)
            y2 = int((y + nh/2) * h)

            # Ensure crop is within image bounds
            crop = img_rgb[max(0, y1):min(h, y2), max(0, x1):min(w, x2)]
            if crop.size == 0:
                continue

            # Transform and extract features
            pil_img = Image.fromarray(crop)
            img_tensor = transform(pil_img).unsqueeze(0).to(DEVICE)

            with torch.no_grad():
                feat = model(img_tensor)
            features_list.append(feat.cpu().numpy().flatten())

    return features_list  # one flattened feature vector per usable box

In [None]:
import numpy as np
import torch
from torch.utils.data import DataLoader, TensorDataset

def create_baseline_sequences(features_path, seq_len=10, stride=5):
    """Slice a saved feature array into fixed-length, overlapping sequences.

    Args:
        features_path: Path to a .npy file whose first axis indexes frames.
        seq_len: Number of consecutive rows per sequence.
        stride: Step between consecutive window starts.

    Returns:
        Array of stacked windows with shape (num_windows, seq_len, ...), or
        an empty array when the data has fewer than ``seq_len`` rows.

    Simplified baseline: windows slide across the whole array and are not
    grouped by video ID, so a window may straddle two videos.
    """
    # Load the extracted ViT features
    data = np.load(features_path)

    # "+ 1" keeps the final full window, the one starting at
    # len(data) - seq_len, which a bare "len(data) - seq_len" bound drops.
    starts = range(0, len(data) - seq_len + 1, stride)
    sequences = [data[i : i + seq_len] for i in starts]

    return np.array(sequences)

# Prepare Train and Val baseline data
X_train_base = create_baseline_sequences('/kaggle/working/vit_train_features.npy')
X_val_base = create_baseline_sequences('/kaggle/working/vit_val_features.npy')

# Placeholder labels (replace with your actual intent labels)
# These are unseeded random 0/1 values: any loss or accuracy computed from them
# only exercises the pipeline and will differ between runs.
y_train_base = np.random.randint(0, 2, len(X_train_base))
y_val_base = np.random.randint(0, 2, len(X_val_base))

print(f"Baseline Train Shapes: {X_train_base.shape}") # Goal: (N, 10, 768)

21-

In [None]:
import torch
import torch.nn as nn

# Device
device = "cuda" if torch.cuda.is_available() else "cpu"

# Baseline LSTM model
class BaselineLSTM(nn.Module):
    """Single-layer LSTM classifier that outputs a probability.

    The sigmoid is applied inside ``forward``, so the output lies in [0, 1]
    and pairs with a probability-based loss such as nn.BCELoss.

    Args:
        input_dim: Feature size of each time step.
        hidden_dim: LSTM hidden size.
    """

    def __init__(self, input_dim=768, hidden_dim=128):
        super().__init__()
        self.lstm = nn.LSTM(
            input_dim,
            hidden_dim,
            batch_first=True,
        )
        self.fc = nn.Linear(hidden_dim, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # x: (batch, seq_len, input_dim) because the LSTM is batch_first
        final_hidden = self.lstm(x)[1][0]    # (num_layers, B, hidden_dim)
        logit = self.fc(final_hidden[-1])    # (B, 1)
        probability = self.sigmoid(logit)
        return probability

# Initialize model
baseline_model = BaselineLSTM().to(device)

# Check parameters
print(
    "Trainable parameters:",
    sum(p.numel() for p in baseline_model.parameters() if p.requires_grad)
)

22- 

In [None]:
print(X_train_base.shape)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Using device:", device)

In [None]:
import torch
import torch.nn as nn

class BaselineLSTM(nn.Module):
    """Baseline sequence classifier: LSTM, then linear layer, then sigmoid.

    Returns a probability per sequence, not a raw logit.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Size of the LSTM hidden state.
    """

    def __init__(self, input_dim=768, hidden_dim=128):
        super().__init__()
        recurrent = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        head = nn.Linear(hidden_dim, 1)
        # Assigned in the same order the submodules were created.
        self.lstm = recurrent
        self.fc = head
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # x: (batch, seq_len, input_dim)
        _outputs, state = self.lstm(x)
        hidden, _cell = state
        score = self.fc(hidden[-1])      # (B, 1)
        return self.sigmoid(score)       # probability

In [None]:
baseline_model = BaselineLSTM().to(device)

print(
    "Trainable parameters:",
    sum(p.numel() for p in baseline_model.parameters() if p.requires_grad)
)

In [None]:
from torch.utils.data import DataLoader, TensorDataset
import torch

# Training batches are reshuffled every epoch.
train_loader = DataLoader(
    TensorDataset(
        torch.tensor(X_train_base, dtype=torch.float32),
        # Labels are stored as float32 because the loss compares them to probabilities.
        torch.tensor(y_train_base, dtype=torch.float32)
    ),
    batch_size=32,
    shuffle=True
)

# Validation data keeps its original order (no shuffle argument).
val_loader = DataLoader(
    TensorDataset(
        torch.tensor(X_val_base, dtype=torch.float32),
        torch.tensor(y_val_base, dtype=torch.float32)
    ),
    batch_size=32
)

print("Train batches:", len(train_loader))
print("Val batches:", len(val_loader))

In [None]:
import torch.optim as optim
import torch.nn as nn

# Adam over all parameters of the baseline model.
optimizer = optim.Adam(baseline_model.parameters(), lr=1e-3)

# Model outputs probability (Sigmoid already applied)
# so plain BCELoss is used rather than a loss that expects raw logits.
criterion = nn.BCELoss()

print("Optimizer and loss are ready ‚úÖ")

**TRAINING LOOP**

In [None]:
# Number of full passes over train_loader.
EPOCHS = 15

for epoch in range(EPOCHS):
    baseline_model.train()
    total_loss = 0.0

    for batch_x, batch_y in train_loader:
        batch_x = batch_x.to(device)
        # Reshape labels to (B, 1) so they match the model's output shape.
        batch_y = batch_y.to(device).view(-1, 1)

        optimizer.zero_grad()
        preds = baseline_model(batch_x)      # probability in [0,1]
        loss = criterion(preds, batch_y)     # BCELoss
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Mean of the per-batch losses (not weighted by batch size).
    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch+1:02d}/{EPOCHS} | Train Loss: {avg_loss:.4f}")

**VALIDATION / BASELINE ACCURACY**

In [None]:
# Switch to evaluation mode and count correct predictions over the validation set.
baseline_model.eval()
correct = 0
total = 0

with torch.no_grad():
    for batch_x, batch_y in val_loader:
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device).view(-1, 1)

        probs = baseline_model(batch_x)   # ALREADY probability
        # Threshold at 0.5 to obtain hard 0/1 predictions.
        preds = (probs > 0.5).float()

        correct += (preds == batch_y).sum().item()
        total += batch_y.size(0)

# Raises ZeroDivisionError if val_loader produced no batches.
accuracy = correct / total
print(f"Baseline Validation Accuracy: {accuracy:.4f}")

In [None]:
import numpy as np

# Class balance of the baseline label arrays: (unique values, count per value).
for _caption, _targets in (
    ("y_train unique/counts:", y_train_base),
    ("y_val   unique/counts:", y_val_base),
):
    print(_caption, np.unique(_targets, return_counts=True))

In [None]:
import os
import re
from collections import defaultdict

IMG_DIR = "/kaggle/input/vehic-ped-intuition/images/train"

def get_video_id(filename):
    """
    Return the first ``video_<digits>`` token found in ``filename``.

    Example: video_0051_frame_000123.jpg -> video_0051
    Returns None when the name contains no such token.
    """
    found = re.search(r"(video_\d+)", filename)
    if found is None:
        return None
    return found.group(1)

# Group the .jpg file names of IMG_DIR by the video id embedded in each name.
video_frames = defaultdict(list)

jpg_names = (n for n in sorted(os.listdir(IMG_DIR)) if n.lower().endswith(".jpg"))
for fname in jpg_names:
    vid = get_video_id(fname)
    if vid is None:
        # Name carries no video_<digits> token: not part of any video.
        continue
    video_frames[vid].append(fname)

print("Total videos found:", len(video_frames))

# Preview the first five videos (dict insertion order).
for vid, frames in list(video_frames.items())[:5]:
    print(f"{vid}: {len(frames)} frames")

**3.2**

In [None]:
import numpy as np

SEQ_LEN = 10

def sample_frames_uniform(frames, seq_len=10):
    """
    Select ``seq_len`` frames spread uniformly over ``frames``.

    Parameters
    ----------
    frames : sequence
        Ordered frames of one video (any sequence, e.g. list or tuple).
    seq_len : int
        Number of frames to return.

    Returns
    -------
    list
        ``seq_len`` items. When the video is long enough, evenly spaced
        indices from first to last frame are used; otherwise all frames are
        kept and the last one is repeated as padding.

    Raises
    ------
    ValueError
        If ``frames`` is empty (there is no frame to sample or to pad with).
    """
    frames = list(frames)  # accept tuples / other sequences, always return a list
    n = len(frames)
    if n == 0:
        raise ValueError("sample_frames_uniform: 'frames' is empty")

    if n >= seq_len:
        indices = np.linspace(0, n - 1, seq_len).astype(int)
        return [frames[i] for i in indices]

    # Pad by repeating last frame
    return frames + [frames[-1]] * (seq_len - n)


# Smoke test: sample the first five videos and report how many frames came back.
for vid in list(video_frames)[:5]:
    sampled = sample_frames_uniform(video_frames[vid], SEQ_LEN)
    print(vid, "‚Üí sampled frames:", len(sampled))

**3.3**
Extract features for each video from a sequence of 20 uniformly sampled frames.

In [None]:
!pip install -U ultralytics


In [None]:
ls

In [None]:
import os
import re
import cv2
import numpy as np
from PIL import Image
from collections import defaultdict
from tqdm import tqdm
import torch

# ---- SETTINGS ----
SEQ_LEN = 20
DATA_ROOT = "/kaggle/input/vehic-ped-intuition"
OUT_DIR = "/kaggle/working/phase3_video_features"
os.makedirs(OUT_DIR, exist_ok=True)

# ---- REQUIRED: ViT model + transform + DEVICE must exist ----
# These are created by earlier cells; fail fast with a hint if one is missing.
for _required_name, _hint in (
    ("model", "ViT model not found. Run the ViT setup cell first (timm.create_model...)."),
    ("transform", "transform not found. Run the ViT preprocessing cell first."),
    ("DEVICE", "DEVICE not found. Define DEVICE = 'cuda' if available else 'cpu'."),
):
    assert _required_name in globals(), _hint

def get_video_id(filename):
    """Return the first ``video_<digits>`` token in ``filename``, or None."""
    hit = re.search(r"(video_\d+)", filename)
    if not hit:
        return None
    return hit.group(1)

def group_frames_by_video(img_dir):
    """
    Map each video id to the sorted list of its .jpg file names in ``img_dir``.

    Files without a .jpg extension (case-insensitive) or without a video id in
    their name are ignored.
    """
    grouped = defaultdict(list)
    for fname in sorted(os.listdir(img_dir)):
        if not fname.lower().endswith(".jpg"):
            continue
        vid = get_video_id(fname)
        if not vid:
            continue
        grouped[vid].append(fname)
    return grouped

def sample_frames_uniform(frames, seq_len=10):
    """
    Return ``seq_len`` frames spread uniformly over ``frames``.

    Long videos are subsampled at evenly spaced indices (first and last frame
    included); short videos keep every frame and repeat the last one as padding.
    Accepts any sequence and always returns a list.

    Raises
    ------
    ValueError
        If ``frames`` is empty.
    """
    frames = list(frames)
    n = len(frames)
    if n == 0:
        raise ValueError("sample_frames_uniform: 'frames' is empty")
    if n >= seq_len:
        idx = np.linspace(0, n - 1, seq_len).astype(int)
        return [frames[i] for i in idx]
    return frames + [frames[-1]] * (seq_len - n)

def extract_one_frame_feature(img_path, lbl_path):
    """
    Embed the largest labelled bounding box of one frame.

    Each label row is read as five whitespace-separated numbers
    ``class cx cy w h``; the centre/size values are multiplied by the image
    width/height below, i.e. they are treated as normalised coordinates.
    The box with the largest pixel area is cropped, passed through the
    notebook-level ``transform`` and ``model`` on ``DEVICE`` and returned as a
    float32 NumPy vector.

    Parameters
    ----------
    img_path : str
        Image file, read with ``cv2.imread``.
    lbl_path : str
        Label text file belonging to the image.

    Returns
    -------
    np.ndarray or None
        The feature vector (768 values according to the original notes --
        TODO confirm against the model cell), or None when the label file is
        missing, the image cannot be read, or no usable box exists.
    """
    # Cheap check first: without a label file there is nothing to crop, so do
    # not pay for decoding the image.
    if not os.path.exists(lbl_path):
        return None

    img = cv2.imread(img_path)
    if img is None:
        return None

    h, w = img.shape[:2]

    best_box = None
    best_area = 0

    with open(lbl_path, "r") as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) != 5:
                continue

            try:
                _, x, y, nw, nh = map(float, parts)
            except ValueError:
                # Non-numeric row: skip it like any other malformed row.
                continue

            x1 = int((x - nw/2) * w)
            y1 = int((y - nh/2) * h)
            x2 = int((x + nw/2) * w)
            y2 = int((y + nh/2) * h)

            # Clamp the box to the image.
            x1 = max(0, min(w-1, x1))
            y1 = max(0, min(h-1, y1))
            x2 = max(0, min(w-1, x2))
            y2 = max(0, min(h-1, y2))

            if x2 <= x1 or y2 <= y1:
                continue

            area = (x2 - x1) * (y2 - y1)
            if area > best_area:
                best_area = area
                best_box = (x1, y1, x2, y2)

    if best_box is None:
        return None

    # Colour conversion is only needed once a usable box has been found.
    x1, y1, x2, y2 = best_box
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    best_crop = img_rgb[y1:y2, x1:x2]
    if best_crop.size == 0:
        return None

    pil_img = Image.fromarray(best_crop)
    img_tensor = transform(pil_img).unsqueeze(0).to(DEVICE)

    with torch.no_grad():
        feat = model(img_tensor)
        if feat.ndim == 3:
            # Sequence-shaped output: keep the first position only
            # (presumably the CLS token -- confirm against the model).
            feat = feat[:, 0, :]
        feat = feat.cpu().numpy().squeeze().astype(np.float32)

    return feat  # (768,)

def build_split_video_tensors(split_name):
    """
    Build one feature sequence per video for a dataset split.

    For every video found under ``{DATA_ROOT}/images/{split_name}``, SEQ_LEN
    frames are sampled uniformly and each is embedded with
    ``extract_one_frame_feature``. A frame without a feature reuses the most
    recent valid feature of the same video, or a zero vector if none has been
    seen yet.

    Parameters
    ----------
    split_name : str
        Sub-directory name under ``images/`` and ``labels/`` (e.g. "val").

    Returns
    -------
    (np.ndarray, list)
        Array of shape (num_videos, SEQ_LEN, feature_dim) and the sorted video
        ids in the same order as the first axis.

    Raises
    ------
    ValueError
        If the split contains no video frames.
    """
    feature_dim = 768  # size of the zero placeholder; must match the model output
    img_dir = f"{DATA_ROOT}/images/{split_name}"
    lbl_dir = f"{DATA_ROOT}/labels/{split_name}"

    split_video_frames = group_frames_by_video(img_dir)
    video_ids = sorted(split_video_frames.keys())
    if not video_ids:
        # np.stack([]) would fail below with a message that hides the cause.
        raise ValueError(f"No video frames found in {img_dir}")

    X = []
    meta = []

    def label_path_for(fname):
        return os.path.join(lbl_dir, fname.rsplit(".", 1)[0] + ".txt")

    print(f"\nExtracting {split_name} videos: {len(video_ids)}")
    for vid in tqdm(video_ids):
        frames = split_video_frames[vid]
        sampled = sample_frames_uniform(frames, SEQ_LEN)

        feats = []
        last_valid = None

        for fname in sampled:
            img_path = os.path.join(img_dir, fname)
            lbl_path = label_path_for(fname)

            fvec = extract_one_frame_feature(img_path, lbl_path)

            if fvec is None:
                # Forward-fill from the last usable frame, else zeros.
                fvec = last_valid if last_valid is not None else np.zeros((feature_dim,), dtype=np.float32)
            else:
                last_valid = fvec

            feats.append(fvec)

        X.append(np.stack(feats, axis=0))  # (SEQ_LEN, feature_dim)
        meta.append(vid)

    X = np.stack(X, axis=0)  # (N, SEQ_LEN, feature_dim)
    return X, meta

# ---- BUILD VAL + TEST ----
X_val_videos, val_video_ids = build_split_video_tensors("val")
print("‚úÖ X_val_videos shape:", X_val_videos.shape)

X_test_videos, test_video_ids = build_split_video_tensors("test")
print("‚úÖ X_test_videos shape:", X_test_videos.shape)

# ---- SAVE ----
# Features and the matching video-id order are stored side by side in OUT_DIR.
for _file_name, _payload in (
    ("X_val_videos.npy", X_val_videos),
    ("X_test_videos.npy", X_test_videos),
    ("val_video_ids.npy", np.array(val_video_ids)),
    ("test_video_ids.npy", np.array(test_video_ids)),
):
    np.save(os.path.join(OUT_DIR, _file_name), _payload)

print("\n‚úÖ Saved to:", OUT_DIR)
print(" - X_val_videos.npy, X_test_videos.npy")
print(" - val_video_ids.npy, test_video_ids.npy")

In [None]:
# Same extraction for the training split, then persist features and id order.
X_train_videos, train_video_ids = build_split_video_tensors(split_name="train")
print("‚úÖ X_train_videos shape:", X_train_videos.shape)

train_features_path = os.path.join(OUT_DIR, "X_train_videos.npy")
train_ids_path = os.path.join(OUT_DIR, "train_video_ids.npy")
np.save(train_features_path, X_train_videos)
np.save(train_ids_path, np.array(train_video_ids))


In [None]:
import os
import re
import cv2
import numpy as np
from PIL import Image
from collections import defaultdict
from tqdm import tqdm
import torch

# Must match the sequence length used when the val/test features were built
# (SEQ_LEN = 20 in the extraction cell above); with 10 here the training
# sequences would be half as long as the validation/test sequences.
SEQ_LEN = 20
DATA_ROOT = "/kaggle/input/vehic-ped-intuition"
IMG_DIR = f"{DATA_ROOT}/images/train"
LBL_DIR = f"{DATA_ROOT}/labels/train"

# --- helpers ---
def get_video_id(filename):
    """Extract the ``video_<digits>`` id from a frame file name (None if absent)."""
    for found in re.finditer(r"(video_\d+)", filename):
        # Only the first occurrence matters.
        return found.group(1)
    return None

def group_frames_by_video(img_dir):
    """
    Group the .jpg files of ``img_dir`` by video id.

    Returns a defaultdict(list) mapping video id -> file names in sorted order;
    non-.jpg files and names without a video id are skipped.
    """
    by_video = defaultdict(list)
    jpg_names = [name for name in sorted(os.listdir(img_dir)) if name.lower().endswith(".jpg")]
    for name in jpg_names:
        video_id = get_video_id(name)
        if video_id:
            by_video[video_id].append(name)
    return by_video

def sample_frames_uniform(frames, seq_len=10):
    """
    Pick ``seq_len`` frames uniformly from ``frames``.

    If the video has at least ``seq_len`` frames, evenly spaced indices between
    the first and last frame are used. Otherwise every frame is kept and the
    last frame is repeated until ``seq_len`` items are reached. Any sequence
    is accepted; a list is returned.

    Raises
    ------
    ValueError
        If ``frames`` is empty.
    """
    frames = list(frames)
    n = len(frames)
    if n == 0:
        raise ValueError("sample_frames_uniform: 'frames' is empty")
    if n < seq_len:
        return frames + [frames[-1]] * (seq_len - n)
    idx = np.linspace(0, n - 1, seq_len).astype(int)
    return [frames[i] for i in idx]

def extract_one_frame_feature(img_path, lbl_path):
    """
    Return the model feature of the largest labelled box in one frame.

    Label rows are read as ``class cx cy w h``; the centre/size values are
    scaled by the image width/height below, i.e. treated as normalised. The
    largest box (by pixel area) is cropped and sent through the notebook-level
    ``transform`` and ``model`` on ``DEVICE``.

    Returns a float32 NumPy vector (768 values according to the original
    notes -- TODO confirm against the model cell), or None when the label
    file is missing, the image cannot be read, or no usable box exists.
    """
    # Check the label file before decoding the image: frames without labels
    # are rejected without any image I/O.
    if not os.path.exists(lbl_path):
        return None

    img = cv2.imread(img_path)
    if img is None:
        return None

    h, w = img.shape[:2]

    best_box, best_area = None, 0

    with open(lbl_path, "r") as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) != 5:
                continue

            try:
                _, x, y, nw, nh = map(float, parts)
            except ValueError:
                # Non-numeric row: treat it like any other malformed row.
                continue

            x1 = int((x - nw/2) * w)
            y1 = int((y - nh/2) * h)
            x2 = int((x + nw/2) * w)
            y2 = int((y + nh/2) * h)

            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(w-1, x2), min(h-1, y2)

            if x2 <= x1 or y2 <= y1:
                continue

            area = (x2 - x1) * (y2 - y1)
            if area > best_area:
                best_area = area
                best_box = (x1, y1, x2, y2)

    if best_box is None:
        return None

    # Convert colours only when there is something to crop.
    x1, y1, x2, y2 = best_box
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    best_crop = img_rgb[y1:y2, x1:x2]
    if best_crop.size == 0:
        return None

    pil_img = Image.fromarray(best_crop)
    img_tensor = transform(pil_img).unsqueeze(0).to(DEVICE)

    with torch.no_grad():
        feat = model(img_tensor)
        if feat.ndim == 3:
            # Sequence-shaped output: keep position 0 only
            # (presumably the CLS token -- confirm against the model).
            feat = feat[:, 0, :]
        feat = feat.cpu().numpy().squeeze().astype(np.float32)

    return feat  # (768,)

# --- build train ---
video_frames = group_frames_by_video(IMG_DIR)
video_ids = sorted(video_frames)

X_train_videos = []
train_meta = []

print("Extracting TRAIN video features:", len(video_ids))

for vid in tqdm(video_ids):
    frames = video_frames[vid]
    sampled = sample_frames_uniform(frames, SEQ_LEN)

    feats = []
    last_valid = None

    for fname in sampled:
        stem = fname.rsplit(".", 1)[0]
        img_path = os.path.join(IMG_DIR, fname)
        lbl_path = os.path.join(LBL_DIR, stem + ".txt")

        fvec = extract_one_frame_feature(img_path, lbl_path)
        if fvec is not None:
            last_valid = fvec
        elif last_valid is not None:
            # No feature for this frame: reuse the previous valid one.
            fvec = last_valid
        else:
            # Nothing valid seen yet in this video: zero placeholder.
            fvec = np.zeros((768,), dtype=np.float32)

        feats.append(fvec)

    X_train_videos.append(np.stack(feats, axis=0))
    train_meta.append(vid)

# One (SEQ_LEN, feature) block per video, stacked along a new first axis.
X_train_videos = np.stack(X_train_videos, axis=0)
print("‚úÖ X_train_videos shape:", X_train_videos.shape)

**3.4**

In [None]:
import os
import numpy as np

OUT_DIR = "/kaggle/working/phase3_video_features"
os.makedirs(OUT_DIR, exist_ok=True)

# The train tensors must already be in kernel memory (built by an earlier cell).
assert "X_train_videos" in globals(), "X_train_videos not found. Run Phase 3 Step 3.2 (train extraction) first."
assert "train_meta" in globals(), "train_meta (train video ids) not found."

train_features_file = os.path.join(OUT_DIR, "X_train_videos.npy")
train_ids_file = os.path.join(OUT_DIR, "train_video_ids.npy")
np.save(train_features_file, X_train_videos)
np.save(train_ids_file, np.array(train_meta))

print("‚úÖ Saved train tensors too:")
print(" - X_train_videos.npy:", X_train_videos.shape)
print(" - train_video_ids.npy:", len(train_meta))
print("Folder:", OUT_DIR)

**4**

In [None]:
# Sanity check: every validation video id should have an entry in the label map.
# NOTE: video_to_label is built by the XML-parsing cell further down in this
# notebook, so that cell has to be executed before this one.
missing = [vid for vid in val_video_ids if vid not in video_to_label]

print("Missing labels for val:", missing[:5])
print("Total missing:", len(missing))


**5. PCA , Embedding 2D**

In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA

OUT_DIR = "/kaggle/working/phase3_video_features"

# NOTE(review): E_train/E_val/E_test.npy are not written by any cell visible in
# this part of the notebook -- confirm which step produces them.
E_train = np.load(os.path.join(OUT_DIR, "E_train.npy"))
E_val   = np.load(os.path.join(OUT_DIR, "E_val.npy"))
E_test  = np.load(os.path.join(OUT_DIR, "E_test.npy"))

# Combine for visualization
E_all = np.vstack([E_train, E_val, E_test])
split_tags = (["train"] * len(E_train)) + (["val"] * len(E_val)) + (["test"] * len(E_test))

pca = PCA(n_components=2, random_state=0)
Z = pca.fit_transform(E_all)

print("Explained variance ratio (PC1, PC2):", pca.explained_variance_ratio_)

# Draw the scatter once, save it, then show it (the original drew it twice).
fig_path = os.path.join(OUT_DIR, "pca_video_embeddings.png")
tag_array = np.array(split_tags)

fig, ax = plt.subplots(figsize=(7, 6))
for tag in ["train", "val", "test"]:
    in_split = tag_array == tag
    ax.scatter(Z[in_split, 0], Z[in_split, 1], label=tag, alpha=0.7)

ax.set_title("PCA of Video Embeddings (Phase 3)")
ax.set_xlabel("PC1")
ax.set_ylabel("PC2")
ax.legend()

# Save before show so the written file never depends on the display backend.
fig.savefig(fig_path, dpi=200, bbox_inches="tight")
plt.show()
plt.close(fig)

print("‚úÖ Saved figure:", fig_path)

----------

In [None]:
import os
import glob
import xml.etree.ElementTree as ET

XML_DIR = "/kaggle/input/attributes-label/annotations_attributes/"

def parse_video_crossing_label(xml_path):
    """
    Derive a video-level crossing label from an attributes XML file.

    Every ``<pedestrian>`` element's ``crossing`` attribute is read as an int;
    elements without the attribute, or with a non-integer value, are ignored.

    Returns:
      1  if ANY pedestrian has crossing == 1
      0  if at least one crossing == 0 and none == 1
      None if no usable value exists (e.g. all are -1) or the file cannot be
           parsed
    """
    try:
        tree = ET.parse(xml_path)
        root = tree.getroot()
    except Exception:
        # Deliberate best effort: an unreadable/invalid file means "no label".
        return None

    crossings = []
    for ped in root.findall(".//pedestrian"):
        c = ped.attrib.get("crossing", None)
        if c is None:
            continue
        try:
            crossings.append(int(c))
        except ValueError:
            # Only a non-integer attribute value is expected here; a bare
            # except would also swallow KeyboardInterrupt and real bugs.
            continue

    if len(crossings) == 0:
        return None

    if any(c == 1 for c in crossings):
        return 1
    if any(c == 0 for c in crossings):
        return 0

    return None  # all -1

# Build mapping: video_id -> label (file name minus the "_attributes.xml" suffix)
xml_files = sorted(glob.glob(os.path.join(XML_DIR, "video_*_attributes.xml")))
video_to_label = {
    os.path.basename(xp).split("_attributes.xml")[0]: parse_video_crossing_label(xp)
    for xp in xml_files
}

# Summary
labels = [v for v in video_to_label.values() if v is not None]
print("Total XML files:", len(xml_files))
print("Labeled videos:", len(labels))
print("Crossing=1:", labels.count(1))
print("Crossing=0:", labels.count(0))
print("Unknown (-1):", list(video_to_label.values()).count(None))

In [None]:
def build_labels(video_ids, video_to_label):
    """
    Keep the video ids that have a non-None label and return them with labels.

    Returns
    -------
    (np.ndarray, np.ndarray)
        Kept ids (input order preserved) and their labels as int64.
    """
    looked_up = [(vid, video_to_label.get(vid, None)) for vid in video_ids]
    kept = [(vid, lab) for vid, lab in looked_up if lab is not None]
    X_ids = [vid for vid, _ in kept]
    y = [lab for _, lab in kept]
    return np.array(X_ids), np.array(y, dtype=np.int64)


In [None]:
# Labelled subset of the val/test video ids and the matching 0/1 targets.
val_ids_labeled, y_val = build_labels(video_ids=val_video_ids, video_to_label=video_to_label)
test_ids_labeled, y_test = build_labels(video_ids=test_video_ids, video_to_label=video_to_label)

# bincount -> [count of label 0, count of label 1]
print("Val labels:", np.bincount(y_val))
print("Test labels:", np.bincount(y_test))


In [None]:
def filter_X_by_ids(X, video_ids, keep_ids):
    """
    Select the rows of ``X`` that belong to ``keep_ids``, in ``keep_ids`` order.

    ``video_ids[i]`` names row ``i`` of ``X``. A KeyError is raised for an id in
    ``keep_ids`` that is not in ``video_ids``.
    """
    row_of = {}
    for row, vid in enumerate(video_ids):
        row_of[vid] = row
    rows = list(map(row_of.__getitem__, keep_ids))
    return X[rows]


In [None]:
# Keep only the feature rows whose video has a label, in label order.
X_val_labeled = filter_X_by_ids(X=X_val_videos, video_ids=val_video_ids, keep_ids=val_ids_labeled)
X_test_labeled = filter_X_by_ids(X=X_test_videos, video_ids=test_video_ids, keep_ids=test_ids_labeled)

# Features and targets must agree on the first dimension.
print(X_val_labeled.shape, y_val.shape)
print(X_test_labeled.shape, y_test.shape)


In [None]:
import os
import numpy as np

# Paths
OUT_DIR = "/kaggle/working/phase3_video_features"


def _load_from_out_dir(file_name, **np_load_kwargs):
    """np.load a file that lives directly inside OUT_DIR."""
    return np.load(os.path.join(OUT_DIR, file_name), **np_load_kwargs)


# Load embeddings (the per-video feature sequences saved by the extraction cells)
E_train = _load_from_out_dir("X_train_videos.npy")
E_val   = _load_from_out_dir("X_val_videos.npy")
E_test  = _load_from_out_dir("X_test_videos.npy")

# Load video ids
# NOTE(review): allow_pickle=True permits arbitrary object loading; only use it
# on files this notebook wrote itself.
train_ids = _load_from_out_dir("train_video_ids.npy", allow_pickle=True)
val_ids   = _load_from_out_dir("val_video_ids.npy", allow_pickle=True)
test_ids  = _load_from_out_dir("test_video_ids.npy", allow_pickle=True)

# video_to_label MUST exist from Step 1
assert "video_to_label" in globals(), "video_to_label not found. Run Step 1 (XML parsing) first."

def filter_labeled(E, ids, video_to_label):
    """
    Drop the entries of ``E`` whose video has no label.

    ``E`` and ``ids`` are walked in parallel; ids are converted with ``str``
    before the lookup. Returns (features float32, labels int64, kept ids list).
    """
    rows = []
    for emb, raw_id in zip(E, ids):
        vid = str(raw_id)
        lab = video_to_label.get(vid, None)
        if lab is not None:
            rows.append((emb, lab, vid))

    X = [emb for emb, _, _ in rows]
    y = [lab for _, lab, _ in rows]
    kept = [vid for _, _, vid in rows]
    return np.array(X, dtype=np.float32), np.array(y, dtype=np.int64), kept

# Apply filtering
Xtr, ytr, tr_ids = filter_labeled(E_train, train_ids, video_to_label)
Xva, yva, va_ids = filter_labeled(E_val,   val_ids,   video_to_label)
Xte, yte, te_ids = filter_labeled(E_test,  test_ids,  video_to_label)

# Summary: tensor shape plus positive / negative counts per split
for _caption, _features, _targets in (
    ("Train:", Xtr, ytr),
    ("Val:  ", Xva, yva),
    ("Test: ", Xte, yte),
):
    print(_caption, _features.shape, "Pos:", int((_targets==1).sum()), "Neg:", int((_targets==0).sum()))

In [None]:
# Class-imbalance weight for the positive class: (#negatives / #positives).
pos = (ytr == 1).sum()
neg = (ytr == 0).sum()
# Guard against a training set without positives (a zero divisor would give an
# infinite weight); this mirrors the max(pos, 1) used by the Phase 4 cell.
pos_weight = torch.tensor([neg / max(pos, 1)]).to(device)

criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)


**PHASE 4**

In [None]:
import os, glob, random
import numpy as np
import xml.etree.ElementTree as ET

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

from sklearn.metrics import (
    accuracy_score,
    confusion_matrix,
    classification_report,
    roc_auc_score
)

# ======================================================
# 0) Reproducibility
# ======================================================
SEED = 42
# Seed every RNG this cell relies on, in the same order as before.
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
    _seed_fn(SEED)

# ======================================================
# 1) Paths & device
# ======================================================
FEAT_DIR = "/kaggle/working/phase3_video_features"
XML_DIR  = "/kaggle/input/attributes-label/annotations_attributes"
SAVE_DIR = "/kaggle/working/phase3_lstm_supervised"
os.makedirs(SAVE_DIR, exist_ok=True)

if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print("Using device:", device)

# ======================================================
# 2) Load features + video IDs
# ======================================================
def _load_feat(file_name, **np_load_kwargs):
    """np.load a file stored directly inside FEAT_DIR."""
    return np.load(os.path.join(FEAT_DIR, file_name), **np_load_kwargs)


X_train = _load_feat("X_train_videos.npy")
X_val   = _load_feat("X_val_videos.npy")
X_test  = _load_feat("X_test_videos.npy")

train_ids = _load_feat("train_video_ids.npy", allow_pickle=True)
val_ids   = _load_feat("val_video_ids.npy",   allow_pickle=True)
test_ids  = _load_feat("test_video_ids.npy",  allow_pickle=True)

# ======================================================
# 3) XML parsing ‚Üí video-level crossing label
# ======================================================
def parse_crossing(xml_path):
    """
    Reduce the per-pedestrian ``crossing`` attributes of one XML file to a
    single video-level label.

    Returns 1 if any pedestrian has crossing == 1, else 0 if any has
    crossing == 0, else None (no usable value, or the file cannot be parsed).
    """
    try:
        root = ET.parse(xml_path).getroot()
    except Exception:
        # Deliberate best effort: unreadable/invalid file -> unlabeled video.
        return None

    vals = []
    for ped in root.findall(".//pedestrian"):
        c = ped.attrib.get("crossing", None)
        if c is not None:
            try:
                vals.append(int(c))
            except ValueError:
                # Non-integer attribute value: ignore this pedestrian. A bare
                # except here would also hide KeyboardInterrupt and real bugs.
                pass

    if not vals:
        return None
    if any(v == 1 for v in vals):
        return 1
    if any(v == 0 for v in vals):
        return 0
    return None

# video id (file name without the "_attributes.xml" suffix) -> crossing label
video_to_label = {
    os.path.basename(xp).replace("_attributes.xml", ""): parse_crossing(xp)
    for xp in glob.glob(os.path.join(XML_DIR, "video_*_attributes.xml"))
}

def filter_labeled(X, ids):
    """
    Keep the rows of ``X`` whose video id has a label in ``video_to_label``.

    Returns (features, labels), both float32, in the original row order.
    """
    labelled = [
        (x, video_to_label.get(str(vid), None))
        for x, vid in zip(X, ids)
    ]
    X_out = [x for x, lab in labelled if lab is not None]
    y_out = [lab for _, lab in labelled if lab is not None]
    return np.asarray(X_out, np.float32), np.asarray(y_out, np.float32)

Xtr, ytr = filter_labeled(X_train, train_ids)
Xva, yva = filter_labeled(X_val,   val_ids)
Xte, yte = filter_labeled(X_test,  test_ids)

print("Train:", Xtr.shape, "Pos:", int((ytr==1).sum()), "Neg:", int((ytr==0).sum()))
print("Val:  ", Xva.shape)
print("Test: ", Xte.shape)

# ======================================================
# 4) DataLoaders
# ======================================================
def _as_loader(features, targets, shuffle=False):
    """Batch (features, column-shaped targets) 32 at a time."""
    dataset = TensorDataset(torch.tensor(features), torch.tensor(targets).view(-1,1))
    return DataLoader(dataset, batch_size=32, shuffle=shuffle)


# Only training batches are shuffled; val/test keep the order of the arrays.
train_loader = _as_loader(Xtr, ytr, shuffle=True)
val_loader = _as_loader(Xva, yva)
test_loader = _as_loader(Xte, yte)

# ======================================================
# 5) LSTM Classifier
# ======================================================
class LSTMClassifier(nn.Module):
    """
    Single-layer LSTM followed by dropout and a one-unit linear head.

    ``forward`` takes a batch-first sequence tensor and returns one raw score
    (logit, no sigmoid) per sequence, computed from the last time step.
    """

    def __init__(self, input_dim=768, hidden_dim=128, dropout=0.4):
        super().__init__()
        self.lstm = nn.LSTM(input_size=input_dim, hidden_size=hidden_dim, batch_first=True)
        self.dropout = nn.Dropout(p=dropout)
        self.fc = nn.Linear(in_features=hidden_dim, out_features=1)

    def forward(self, x):
        sequence_out, _ = self.lstm(x)
        # Only the output at the final time step feeds the classifier head.
        final_step = sequence_out[:, -1, :]
        return self.fc(self.dropout(final_step))

# NOTE: this rebinds the notebook-level name `model`, which the feature
# extraction cells above also use for their image model.
model = LSTMClassifier().to(device)

# ======================================================
# 6) Loss + Optimizer (imbalance handled)
# ======================================================
pos = (ytr == 1).sum()
neg = (ytr == 0).sum()
# Positive-class weight = negatives per positive (divisor floored at 1).
pos_weight = torch.tensor([neg / max(pos, 1)], device=device)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)

print("pos_weight:", pos_weight.item())

# ======================================================
# 7) Training + Early Stopping + METRIC LOGGING
# ======================================================
EPOCHS = 20
PATIENCE = 5
best_auc = -1.0
patience_ctr = 0
best_state = None

train_losses = []
val_losses   = []
val_aucs     = []

def eval_val_metrics(loader):
    """
    Evaluate the current ``model`` on ``loader``.

    Returns
    -------
    (float, float or None)
        Mean per-batch loss and ROC-AUC. The AUC is None when it cannot be
        computed (only one class present); for an empty loader the result is
        (nan, None).
    """
    model.eval()
    total_loss = 0.0
    ys, ps = [], []

    with torch.no_grad():
        for xb, yb in loader:
            xb, yb = xb.to(device), yb.to(device)
            logits = model(xb)
            loss = criterion(logits, yb)
            total_loss += loss.item()

            probs = torch.sigmoid(logits)
            ys.append(yb.cpu().numpy())
            ps.append(probs.cpu().numpy())

    if not ys:
        # Empty loader: nothing to average or to score.
        return float("nan"), None

    y = np.vstack(ys).ravel()
    p = np.vstack(ps).ravel()
    avg_loss = total_loss / len(loader)
    auc = roc_auc_score(y, p) if len(np.unique(y)) > 1 else None
    return avg_loss, auc


for epoch in range(1, EPOCHS + 1):
    model.train()
    total_loss = 0.0

    for xb, yb in train_loader:
        xb, yb = xb.to(device), yb.to(device)
        optimizer.zero_grad()
        loss = criterion(model(xb), yb)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    train_loss = total_loss / len(train_loader)
    val_loss, val_auc = eval_val_metrics(val_loader)

    train_losses.append(train_loss)
    val_losses.append(val_loss)
    # Log NaN instead of None so the history stays a plain float array that
    # can be saved, reloaded and plotted without special handling.
    val_aucs.append(val_auc if val_auc is not None else float("nan"))

    # Formatting None with ":.4f" raises TypeError, so render it separately.
    auc_text = f"{val_auc:.4f}" if val_auc is not None else "n/a"
    print(
        f"Epoch {epoch:02d} | "
        f"train_loss={train_loss:.4f} | "
        f"val_loss={val_loss:.4f} | "
        f"val_auc={auc_text}"
    )

    if val_auc is not None and val_auc > best_auc:
        best_auc = val_auc
        patience_ctr = 0
        best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()}
    else:
        patience_ctr += 1
        if patience_ctr >= PATIENCE:
            print("‚èπ Early stopping triggered")
            break


# Restore & save best model
if best_state is None:
    # No epoch produced a usable AUC, so there is no "best" snapshot; keep the
    # final weights instead of crashing in load_state_dict(None).
    best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()}
model.load_state_dict(best_state)
torch.save(model.state_dict(), os.path.join(SAVE_DIR, "best_lstm_crossing.pt"))

# Save logs
np.save(os.path.join(SAVE_DIR, "train_losses.npy"), np.array(train_losses))
np.save(os.path.join(SAVE_DIR, "val_losses.npy"),   np.array(val_losses))
np.save(os.path.join(SAVE_DIR, "val_aucs.npy"),     np.array(val_aucs))


# ======================================================
# 8) Test Evaluation
# ======================================================
model.eval()
ys, ps = [], []
with torch.no_grad():
    for xb, yb in test_loader:
        xb = xb.to(device)
        probs = torch.sigmoid(model(xb)).cpu().numpy()
        ys.append(yb.numpy())
        ps.append(probs)

# Flatten the per-batch columns into 1-D arrays aligned with test_loader order.
y_true = np.vstack(ys).ravel()
y_prob = np.vstack(ps).ravel()
y_pred = (y_prob >= 0.5).astype(int)

print("\n===== TEST RESULTS =====")
print("Accuracy:", accuracy_score(y_true, y_pred))
# roc_auc_score raises when only one class is present; guard it the same way
# the validation metrics do.
if len(np.unique(y_true)) > 1:
    print("ROC-AUC:", roc_auc_score(y_true, y_prob))
else:
    print("ROC-AUC: undefined (only one class present in the test labels)")
print("Confusion matrix:\n", confusion_matrix(y_true, y_pred))
print(classification_report(y_true, y_pred, digits=4))


In [None]:
# Output folder for every figure saved by the plotting cells below.
FIG_DIR = "/kaggle/working/phase3_figures"
os.makedirs(name=FIG_DIR, exist_ok=True)


In [None]:
# One x position per logged epoch (early stopping may end before EPOCHS).
# `epochs` was never defined anywhere, so derive it from the loss history.
epochs = range(1, len(train_losses) + 1)

plt.figure(figsize=(6,4))
plt.plot(epochs, train_losses, label="Training Loss", marker="o")
plt.plot(epochs, val_losses, label="Validation Loss", marker="s")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Training vs Validation Loss")
plt.legend()
plt.grid(True)
plt.tight_layout()

plt.savefig(os.path.join(FIG_DIR, "loss_curve.png"), dpi=300)
plt.show()


In [None]:
# `epochs` was never defined anywhere; derive it from the AUC history itself.
epochs = range(1, len(val_aucs) + 1)
# Epochs without a computable AUC may be logged as None; plot them as gaps.
auc_values = [np.nan if auc is None else auc for auc in val_aucs]

plt.figure(figsize=(6,4))
plt.plot(epochs, auc_values, marker="o", color="darkgreen")
plt.xlabel("Epoch")
plt.ylabel("ROC-AUC")
plt.title("Validation ROC-AUC Over Epochs")
plt.ylim(0, 1)
plt.grid(True)
plt.tight_layout()

plt.savefig(os.path.join(FIG_DIR, "val_auc_curve.png"), dpi=300)
plt.show()


In [None]:
from sklearn.metrics import ConfusionMatrixDisplay, confusion_matrix

# Compute the matrix here: in notebook order `cm` is only assigned by a later
# cell, so this cell would otherwise fail on a fresh run.
cm = confusion_matrix(y_true, y_pred)

disp = ConfusionMatrixDisplay(
    confusion_matrix=cm,
    display_labels=["Non-crossing", "Crossing"]
)

# Draw on an explicit axes: calling plt.figure() and then disp.plot() without
# `ax` makes disp.plot() open a second figure and leaves the first one blank.
fig, ax = plt.subplots(figsize=(4,4))
disp.plot(ax=ax, cmap="Blues", values_format="d")
ax.set_title("Confusion Matrix (Test Set)")
fig.tight_layout()

fig.savefig(os.path.join(FIG_DIR, "confusion_matrix_test.png"), dpi=300)
plt.show()


In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

# Figure output folder (created on demand).
FIG_DIR = "/kaggle/working/phase3_figures"
os.makedirs(name=FIG_DIR, exist_ok=True)

# Tick labels for the confusion-matrix plots, index 0 = negative class.
labels = [
    "Non-crossing (0)",
    "Crossing (1)",
]


In [None]:
def plot_confusion(cm_to_plot, title, fname, fmt=".2f", cmap="Blues"):
    """
    Draw a confusion matrix as an annotated heat map, save it and show it.

    Parameters
    ----------
    cm_to_plot : 2-D array
        Matrix to draw (rows = true label, columns = predicted label).
    title : str
        Figure title.
    fname : str
        File name inside FIG_DIR for the saved image.
    fmt : str
        Format spec applied to every cell value.
    cmap : str
        Matplotlib colour map name.

    Uses the notebook-level ``labels`` list for the tick labels.
    """
    plt.figure(figsize=(5,4))
    plt.imshow(cm_to_plot, interpolation="nearest", cmap=cmap)
    plt.title(title)
    plt.colorbar()

    tick_marks = np.arange(len(labels))
    plt.xticks(tick_marks, labels, rotation=25, ha="right")
    plt.yticks(tick_marks, labels)

    # Cells darker than half the maximum get white text for contrast.
    thresh = cm_to_plot.max() / 2.0
    n_rows, n_cols = cm_to_plot.shape[0], cm_to_plot.shape[1]
    for i, j in np.ndindex(n_rows, n_cols):
        cell = cm_to_plot[i, j]
        text_colour = "white" if cell > thresh else "black"
        plt.text(j, i, format(cell, fmt), ha="center", va="center", color=text_colour)

    plt.ylabel("True label")
    plt.xlabel("Predicted label")
    plt.tight_layout()

    plt.savefig(os.path.join(FIG_DIR, fname), dpi=300)
    plt.show()


In [None]:
# Raw counts, and the same matrix with every true-label row normalised to 1.
cm = confusion_matrix(y_true=y_true, y_pred=y_pred)
cm_norm = confusion_matrix(y_true=y_true, y_pred=y_pred, normalize="true")


In [None]:
import os
import matplotlib.pyplot as plt

FIG_DIR = "/kaggle/working/phase3_figures"
os.makedirs(FIG_DIR, exist_ok=True)

plt.figure(figsize=(6,4))

# One histogram of predicted probabilities per true class (positives first).
for _class_value, _legend, _colour in (
    (1, "Crossing (1)", "tab:blue"),
    (0, "Non-crossing (0)", "tab:orange"),
):
    plt.hist(
        y_prob[y_true == _class_value],
        bins=15,
        alpha=0.75,
        label=_legend,
        color=_colour,
        edgecolor="black"
    )

plt.xlabel("Predicted Probability of Crossing")
plt.ylabel("Number of Videos")
plt.title("Prediction Probability Distribution (Test Set)")
plt.legend()
plt.grid(True, linestyle="--", alpha=0.6)
plt.tight_layout()

hist_path = os.path.join(FIG_DIR, "05_probability_distribution.png")
plt.savefig(hist_path, dpi=300)

plt.show()


In [None]:
import random

# Print five randomly chosen test predictions next to their ground truth.
indices = list(range(len(y_true)))
random.shuffle(indices)

print("Random test samples:\n")

for idx in indices[:5]:
    gt, pred, prob = int(y_true[idx]), int(y_pred[idx]), float(y_prob[idx])
    status = "‚ùå Wrong" if gt != pred else "‚úÖ Correct"

    print(f"Sample {idx:02d} | GT={gt} | Pred={pred} | P(crossing)={prob:.3f} | {status}")


In [None]:
# Test-split image and label folders of the input dataset.
_TEST_DATA_ROOT = "/kaggle/input/vehic-ped-intuition"
IMG_ROOT = f"{_TEST_DATA_ROOT}/images/test"
LBL_ROOT = f"{_TEST_DATA_ROOT}/labels/test"


In [None]:
import cv2
import matplotlib.pyplot as plt

def show_video_sample(video_id, gt, pred, prob):
    """
    Save and display the middle frame of a test video with its prediction.

    Parameters
    ----------
    video_id : str
        Video id such as "video_0051"; frames are looked up in IMG_ROOT.
    gt, pred : int
        Ground-truth and predicted label (used in the title and file name).
    prob : float
        Predicted crossing probability (shown in the title).

    The figure is written to FIG_DIR as ``{video_id}_gt{gt}_pred{pred}.png``,
    shown, and then closed. Nothing is returned; a message is printed and the
    call returns early when no frame is found or the image cannot be read.
    """
    import re  # local import: keeps this cell independent of earlier imports

    # Match the exact id: a plain prefix test would let "video_005" also pick
    # up the frames of "video_0051".
    id_pattern = re.compile(re.escape(video_id) + r"(?!\d)")

    # find frames belonging to this video
    frames = sorted(
        f for f in os.listdir(IMG_ROOT)
        if id_pattern.match(f)
    )

    if len(frames) == 0:
        print("No frames found for", video_id)
        return

    # take middle frame (stable & interpretable)
    fname = frames[len(frames)//2]
    img_path = os.path.join(IMG_ROOT, fname)

    img = cv2.imread(img_path)
    if img is None:
        # cv2.imread returns None instead of raising on unreadable files.
        print("Could not read image", img_path)
        return
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    title = (
        f"{video_id}\n"
        f"GT: {gt} | Pred: {pred} | P(crossing)={prob:.2f}"
    )

    fig = plt.figure(figsize=(5,4))
    plt.imshow(img)
    plt.axis("off")
    plt.title(title)
    plt.tight_layout()
    plt.savefig(
        os.path.join(FIG_DIR, f"{video_id}_gt{gt}_pred{pred}.png"),
        dpi=300
    )
    plt.show()
    # Close inside the function so repeated calls do not pile up open figures
    # (the original plt.close() sat at column 0, outside the function).
    plt.close(fig)


In [None]:
import random

# Video ids in the order of the saved test features.
test_video_ids = np.load(
    os.path.join(FEAT_DIR, "test_video_ids.npy"),
    allow_pickle=True
)

# y_true / y_pred / y_prob only cover the test videos that have a label:
# filter_labeled() dropped the unlabeled ones (order preserved, and the test
# loader is not shuffled). Apply the same filter to the ids, otherwise
# position idx in the predictions points at the wrong video as soon as one
# test video is unlabeled.
labeled_test_ids = [
    str(v) for v in test_video_ids
    if video_to_label.get(str(v), None) is not None
]
assert len(labeled_test_ids) == len(y_true), (
    "labeled test ids and predictions are out of sync; "
    "re-run the training/evaluation cell first."
)

indices = list(range(len(y_true)))
random.shuffle(indices)

shown = 0
for idx in indices[:5]:
    vid = labeled_test_ids[idx]
    gt  = int(y_true[idx])
    pred = int(y_pred[idx])
    prob = float(y_prob[idx])

    # show both correct and wrong
    show_video_sample(vid, gt, pred, prob)
    shown += 1


Fine-Tuned Training

In [None]:
import os, glob, random
import numpy as np
import xml.etree.ElementTree as ET
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report, roc_auc_score

# ======================================================
# 0) Reproducibility
# ======================================================
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)

# ======================================================
# 1) Paths & device
# ======================================================
FEAT_DIR = "/kaggle/working/phase3_video_features"
XML_DIR  = "/kaggle/input/attributes-label/annotations_attributes"
SAVE_DIR = "/kaggle/working/phase3_lstm_finetuned"
os.makedirs(SAVE_DIR, exist_ok=True)

device = "cuda" if torch.cuda.is_available() else "cpu"
print("Using device:", device)

# ======================================================
# 2) Load features + video IDs
# ======================================================
X_train = np.load(os.path.join(FEAT_DIR, "X_train_videos.npy"))
X_val   = np.load(os.path.join(FEAT_DIR, "X_val_videos.npy"))
X_test  = np.load(os.path.join(FEAT_DIR, "X_test_videos.npy"))

train_ids = np.load(os.path.join(FEAT_DIR, "train_video_ids.npy"), allow_pickle=True)
val_ids   = np.load(os.path.join(FEAT_DIR, "val_video_ids.npy"),   allow_pickle=True)
test_ids  = np.load(os.path.join(FEAT_DIR, "test_video_ids.npy"),  allow_pickle=True)

# ======================================================
# 3) XML parsing ‚Üí video-level crossing label
# ======================================================
def parse_crossing(xml_path):
    """
    Collapse the ``crossing`` attributes of all ``<pedestrian>`` elements in
    one XML file into a video-level label.

    Returns
    -------
    int or None
        1 if any pedestrian has crossing == 1; otherwise 0 if any has
        crossing == 0; otherwise None (no usable value, or the file could not
        be parsed).
    """
    try:
        root = ET.parse(xml_path).getroot()
    except Exception:
        # Deliberate best effort: unreadable/invalid file -> unlabeled video.
        return None

    vals = []
    for ped in root.findall(".//pedestrian"):
        c = ped.attrib.get("crossing", None)
        if c is not None:
            try:
                vals.append(int(c))
            except ValueError:
                # Skip non-integer values only; a bare except would also
                # swallow KeyboardInterrupt and genuine programming errors.
                pass

    if not vals:
        return None
    if any(v == 1 for v in vals):
        return 1
    if any(v == 0 for v in vals):
        return 0
    return None

video_to_label = {}
for xp in glob.glob(os.path.join(XML_DIR, "video_*_attributes.xml")):
    vid = os.path.basename(xp).replace("_attributes.xml", "")
    video_to_label[vid] = parse_crossing(xp)

def filter_labeled(X, ids):
    """Keep only the samples whose video id has a non-None label.

    Each id is converted with ``str()`` and looked up in the module-level
    ``video_to_label`` mapping; ids that are missing or mapped to ``None``
    are dropped together with their feature entry.

    Returns:
        ``(features, labels)`` as two float32 NumPy arrays in the original
        order.
    """
    kept = [
        (feat, video_to_label[str(vid)])
        for feat, vid in zip(X, ids)
        if video_to_label.get(str(vid), None) is not None
    ]
    feats = [feat for feat, _ in kept]
    labels = [lab for _, lab in kept]
    return np.asarray(feats, np.float32), np.asarray(labels, np.float32)

Xtr, ytr = filter_labeled(X_train, train_ids)
Xva, yva = filter_labeled(X_val,   val_ids)
Xte, yte = filter_labeled(X_test,  test_ids)

print("Train:", Xtr.shape, "Pos:", int((ytr==1).sum()), "Neg:", int((ytr==0).sum()))
print("Val:  ", Xva.shape)
print("Test: ", Xte.shape)

# ======================================================
# 4) DataLoaders
# ======================================================
train_loader = DataLoader(
    TensorDataset(torch.tensor(Xtr), torch.tensor(ytr).view(-1,1)),
    batch_size=32, shuffle=True
)
val_loader = DataLoader(
    TensorDataset(torch.tensor(Xva), torch.tensor(yva).view(-1,1)),
    batch_size=32
)
test_loader = DataLoader(
    TensorDataset(torch.tensor(Xte), torch.tensor(yte).view(-1,1)),
    batch_size=32
)

# ======================================================
# 5) LSTM Classifier (REGULARIZED)
# ======================================================
class LSTMClassifier(nn.Module):
    """Single-layer LSTM followed by dropout and a one-logit linear head.

    The forward pass returns raw logits of shape ``(batch, 1)``; no sigmoid
    is applied here.
    """

    def __init__(self, input_dim=768, hidden_dim=64, dropout=0.5):
        super().__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_dim, 1)

    def forward(self, x):
        # For this single-layer, unidirectional LSTM the final hidden state
        # holds the same values as the output at the last time step.
        _, (h_n, _) = self.lstm(x)
        return self.fc(self.dropout(h_n[-1]))

model = LSTMClassifier().to(device)

# ======================================================
# 6) Loss + Optimizer (imbalance + L2)
# ======================================================
pos = (ytr == 1).sum()
neg = (ytr == 0).sum()
pos_weight = torch.tensor([neg / max(pos, 1)], device=device)

criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
optimizer = torch.optim.Adam(
    model.parameters(),
    lr=5e-4,
    weight_decay=1e-4   # L2 regularization
)

print("pos_weight:", float(pos_weight))

# ======================================================
# 7) Training + Early Stopping (LOSS + AUC)
# ======================================================
EPOCHS = 30
PATIENCE = 6

train_losses = []
val_losses   = []
val_aucs     = []

best_auc = -1.0
patience_ctr = 0
best_state = None

def eval_metrics(loader):
    """Evaluate the global ``model`` on ``loader`` without gradient tracking.

    Returns:
        ``(mean_loss, auc)`` where ``mean_loss`` is the unweighted mean of the
        per-batch ``criterion`` values and ``auc`` is the ROC-AUC of the
        sigmoid probabilities, or ``None`` when the targets contain only one
        distinct value.
    """
    model.eval()
    batch_losses, targets, scores = [], [], []

    with torch.no_grad():
        for xb, yb in loader:
            xb = xb.to(device)
            yb = yb.to(device)
            logits = model(xb)
            batch_losses.append(criterion(logits, yb).item())
            targets.append(yb.cpu().numpy())
            scores.append(torch.sigmoid(logits).cpu().numpy())

    y = np.vstack(targets).ravel()
    p = np.vstack(scores).ravel()

    # ROC-AUC is undefined when only one class is present.
    if len(np.unique(y)) > 1:
        auc = roc_auc_score(y, p)
    else:
        auc = None
    return np.mean(batch_losses), auc

for epoch in range(1, EPOCHS + 1):
    model.train()
    total_loss = 0.0

    for xb, yb in train_loader:
        xb, yb = xb.to(device), yb.to(device)
        optimizer.zero_grad()
        loss = criterion(model(xb), yb)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    train_loss = total_loss / len(train_loader)
    val_loss, val_auc = eval_metrics(val_loader)

    train_losses.append(train_loss)
    val_losses.append(val_loss)
    val_aucs.append(val_auc)

    # eval_metrics() returns None for the AUC when the validation targets hold
    # a single class; formatting None with ":.4f" would raise TypeError.
    auc_str = f"{val_auc:.4f}" if val_auc is not None else "n/a"
    print(f"Epoch {epoch:02d} | train_loss={train_loss:.4f} | val_loss={val_loss:.4f} | val_auc={auc_str}")

    # Early stopping is driven by validation AUC; an undefined AUC counts as
    # "no improvement".
    if val_auc is not None and val_auc > best_auc:
        best_auc = val_auc
        patience_ctr = 0
        # Snapshot on CPU so the checkpoint does not hold extra device memory.
        best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()}
    else:
        patience_ctr += 1
        if patience_ctr >= PATIENCE:
            print("‚èπ Early stopping triggered")
            break

# Restore best model (only if at least one epoch produced a valid AUC;
# load_state_dict(None) would raise).
if best_state is not None:
    model.load_state_dict(best_state)
else:
    print("WARNING: no epoch produced a valid validation AUC; keeping the last-epoch weights.")
torch.save(model.state_dict(), os.path.join(SAVE_DIR, "best_finetuned_lstm_crossing.pt"))

np.save(os.path.join(SAVE_DIR, "train_losses.npy"), np.array(train_losses))
np.save(os.path.join(SAVE_DIR, "val_losses.npy"),   np.array(val_losses))
# Force a float array so undefined AUCs (None) are stored as NaN instead of
# turning the file into a pickled object array.
np.save(os.path.join(SAVE_DIR, "val_aucs.npy"),     np.array(val_aucs, dtype=np.float64))

# ======================================================
# 8) Test Evaluation
# ======================================================
model.eval()
ys, ps = [], []
with torch.no_grad():
    for xb, yb in test_loader:
        xb = xb.to(device)
        probs = torch.sigmoid(model(xb)).cpu().numpy()
        ys.append(yb.numpy())
        ps.append(probs)

y_true = np.vstack(ys).ravel()
y_prob = np.vstack(ps).ravel()
y_pred = (y_prob >= 0.5).astype(int)

print("\n===== TEST RESULTS (FINETUNED) =====")
print("Accuracy:", accuracy_score(y_true, y_pred))
print("ROC-AUC:", roc_auc_score(y_true, y_prob))
print("Confusion matrix:\n", confusion_matrix(y_true, y_pred))
print(classification_report(y_true, y_pred, digits=4))


In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.metrics import confusion_matrix

FIG_DIR = "/kaggle/working/figures"
os.makedirs(FIG_DIR, exist_ok=True)


In [None]:
epochs = np.arange(1, len(train_losses) + 1)

plt.figure(figsize=(7,5))
plt.plot(epochs, train_losses, marker="o", label="Training Loss")
plt.plot(epochs, val_losses, marker="s", label="Validation Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Training vs Validation Loss (LSTM Finetuned)")
plt.legend()
plt.grid(True)
plt.tight_layout()

plt.savefig(os.path.join(FIG_DIR, "01_loss_curve.png"), dpi=300)
plt.show()


In [None]:
plt.figure(figsize=(7,5))
plt.plot(epochs, val_aucs, marker="o", color="green")
plt.xlabel("Epoch")
plt.ylabel("ROC-AUC")
plt.title("Validation ROC-AUC Over Epochs")
plt.grid(True)
plt.tight_layout()

plt.savefig(os.path.join(FIG_DIR, "02_val_auc_curve.png"), dpi=300)
plt.show()


In [None]:
labels = ["Non-crossing (0)", "Crossing (1)"]
cm = confusion_matrix(y_true, y_pred)
cm_norm = confusion_matrix(y_true, y_pred, normalize="true")

def plot_cm(cm, title, fname, fmt=".2f"):
    """Draw a confusion matrix as an annotated heat map, save it, and show it.

    Args:
        cm: 2-D array of cell values.
        title: Figure title.
        fname: File name; the figure is written to ``FIG_DIR/fname``.
        fmt: Format spec applied to each cell value for the text annotation.

    Tick labels come from the module-level ``labels`` list.
    """
    plt.figure(figsize=(5,4))
    plt.imshow(cm, cmap="Blues")
    plt.title(title)
    plt.colorbar()

    tick_pos = np.arange(len(labels))
    plt.xticks(tick_pos, labels, rotation=25, ha="right")
    plt.yticks(tick_pos, labels)

    # Annotate every cell; text x is the column index and y is the row index.
    for row, col in np.ndindex(cm.shape[0], cm.shape[1]):
        cell_text = format(cm[row, col], fmt)
        plt.text(col, row, cell_text, ha="center", va="center")

    plt.ylabel("True label")
    plt.xlabel("Predicted label")
    plt.tight_layout()
    out_file = os.path.join(FIG_DIR, fname)
    plt.savefig(out_file, dpi=300)
    plt.show()

plot_cm(cm, "Confusion Matrix (Test Set)", "03_confusion_raw.png", fmt="d")
plot_cm(cm_norm, "Confusion Matrix (Normalized)", "04_confusion_normalized.png")


In [None]:
results = pd.DataFrame({
    "Model": ["Baseline LSTM", "Finetuned LSTM"],
    "Test Accuracy": [0.9167, 0.9167],
    "Test ROC-AUC": [0.9802, 0.9683],
    "Crossing Recall": [0.9524, 1.0000],
    "Non-crossing Recall": [0.6667, 0.3333]
})

results


In [None]:
results.to_csv(os.path.join(FIG_DIR, "06_model_comparison.csv"), index=False)


In [None]:
fig, ax = plt.subplots(figsize=(8,2))
ax.axis("off")
ax.table(cellText=results.values,
         colLabels=results.columns,
         loc="center")
plt.tight_layout()
plt.savefig(os.path.join(FIG_DIR, "06_model_comparison_table.png"), dpi=300)
plt.show()


# Full pipeline for preparing sequence folders before ViT preprocessing

In [None]:
import os, glob, json, zipfile, shutil
import cv2
import numpy as np
import xml.etree.ElementTree as ET
from collections import Counter
from ultralytics import YOLO


In [None]:
# Dataset dirs
IMG_ROOT = "/kaggle/input/vehic-ped-intuition/images"
XML_GLOB = "/kaggle/input/attributes-label/annotations_attributes/video_*_*.xml"

SPLITS = ["train", "val", "test"]

# YOLO+Tracking
MODEL_PATH = "/kaggle/input/first-phase-model/weights/best.pt"
CONF = 0.33
IMGSZ = 640
TRACKER = "botsort.yaml"

# Filters (freeze these)
MIN_TRACK_LEN = 16
INTENTION_CONF = 0.50

# Relevance filtering (distance proxy)
MIN_MEDIAN_HEIGHT = 90     # pixels
MIN_HEIGHT_GROWTH = 15     # pixels

# Duplicate suppression
DUP_IOU_THR = 0.70

# Cropping (context-aware)
EXPAND_RATIO = 1.8

# Sequences
SEQ_LEN = 16
STRIDE = 4

# Decision safety
USE_DECISION_POINT_IF_AVAILABLE = True
FALLBACK_CUTOFF_RATIO = 0.8  # used when decision_point == -1 or not present

# Output
OUT_ROOT = "/kaggle/working/intent_sequences_dataset"
os.makedirs(OUT_ROOT, exist_ok=True)

# If True: only process videos where XML contains exactly 1 pedestrian
# (Recommended if you do not have a reliable mapping between XML ped IDs and tracker IDs)
REQUIRE_SINGLE_PED_XML = True


Helper functions (tracking + filtering + XML + cropping + sequences)
* IoU, relevance, confidence, duplicate suppression
* XML parsing (attributes only)
* Crop with context
* Temporal cutoff using decision_point (if available)
* Build XML index (fast lookup)
* Initialize YOLO once
* Main Batch Runner (ALL videos in a split)
* Run ALL splits and save dataset (crops as sequences)

In [None]:
def iou(a, b):
    """Intersection-over-union of two boxes indexed as (x1, y1, x2, y2).

    A small epsilon (1e-6) is added to the denominator, so two zero-area
    boxes yield 0 rather than a division error.
    """
    left = max(a[0], b[0])
    top = max(a[1], b[1])
    right = min(a[2], b[2])
    bottom = min(a[3], b[3])

    # Negative extents mean the boxes do not overlap along that axis.
    overlap = max(0, right - left) * max(0, bottom - top)
    area_a = max(0, (a[2] - a[0])) * max(0, (a[3] - a[1]))
    area_b = max(0, (b[2] - b[0])) * max(0, (b[3] - b[1]))
    union = area_a + area_b - overlap
    return overlap / (union + 1e-6)

def bbox_heights(seq):
    """Return the box height (``box[3] - box[1]``) of every track entry.

    ``seq`` is a sequence of 3-tuples whose middle element is the box; the
    result is a float32 NumPy array with one height per entry.
    """
    heights = []
    for _first, box, _last in seq:
        heights.append(box[3] - box[1])
    return np.array(heights, dtype=np.float32)

def is_relevant(seq):
    """Decide whether a track is relevant, based on its box heights.

    A track passes when its median box height is at least
    ``MIN_MEDIAN_HEIGHT`` or when the height grew by at least
    ``MIN_HEIGHT_GROWTH`` between its first and last entry.
    """
    heights = bbox_heights(seq)
    tall_enough = np.median(heights) >= MIN_MEDIAN_HEIGHT
    if tall_enough:
        return tall_enough
    growth = heights[-1] - heights[0]
    return growth >= MIN_HEIGHT_GROWTH

def high_conf(seq):
    """Return True when the track's mean confidence reaches ``INTENTION_CONF``.

    The confidence is the third element of each track entry.
    """
    confidences = [conf for _, _, conf in seq]
    mean_conf = float(np.mean(confidences))
    return mean_conf >= INTENTION_CONF

def mean_iou_tracks(seq1, seq2):
    """Mean IoU between two tracks over the frames they have in common.

    Track entries are ``(frame_idx, box, conf)`` tuples. Boxes are paired by
    ``frame_idx`` rather than by list position: two tracks that start on
    different frames or contain gaps would otherwise be compared across
    different frames, which says nothing about whether they cover the same
    object.

    Returns:
        The mean IoU over the shared frames, or ``0.0`` when fewer than five
        frames are shared (too little evidence to call the tracks duplicates).
    """
    boxes2 = {fidx: box for fidx, box, _ in seq2}
    paired = [(box, boxes2[fidx]) for fidx, box, _ in seq1 if fidx in boxes2]
    if len(paired) < 5:
        return 0.0
    return float(np.mean([iou(b1, b2) for b1, b2 in paired]))

def suppress_duplicates(tracks):
    """Drop tracks that overlap too much with an already-kept longer track.

    Tracks are visited from longest to shortest. A track is discarded when
    its ``mean_iou_tracks`` score against any kept track exceeds
    ``DUP_IOU_THR``; otherwise it is kept.

    Returns:
        A new ``{track_id: track}`` dict holding the survivors.
    """
    by_length = sorted(tracks.keys(), key=lambda t: len(tracks[t]), reverse=True)
    kept = {}
    for tid in by_length:
        candidate = tracks[tid]
        is_duplicate = any(
            mean_iou_tracks(candidate, survivor) > DUP_IOU_THR
            for survivor in kept.values()
        )
        if is_duplicate:
            continue
        kept[tid] = candidate
    return kept


In [None]:
def parse_pedestrian_attributes(xml_path):
    """Read every ``<pedestrian>`` element of an XML file into a dict.

    Each dict starts as a copy of the element's attributes (string values).
    Three fields are cast to ``int`` when present:

    * ``crossing`` - left as the original string if it is not an integer.
    * ``decision_point`` / ``crossing_point`` - replaced by ``-1`` if the
      text is not an integer.

    Args:
        xml_path: Path to the XML file.

    Returns:
        A list with one attribute dict per pedestrian, in document order.

    Raises:
        Whatever ``ET.parse`` raises for a missing or malformed file.
    """
    root = ET.parse(xml_path).getroot()

    peds = []
    for ped in root.findall(".//pedestrian"):
        d = dict(ped.attrib)

        if "crossing" in d:
            try:
                d["crossing"] = int(d["crossing"])
            except ValueError:
                # Keep the raw string; callers treat anything but int 1 as
                # "not crossing". Narrowed from a bare ``except``.
                pass

        # Frame-index style fields fall back to the -1 sentinel.
        for key in ("decision_point", "crossing_point"):
            if key in d:
                try:
                    d[key] = int(d[key])
                except ValueError:
                    d[key] = -1

        peds.append(d)

    return peds

def label_from_ped_attr(ped_attr):
    """Map a pedestrian attribute dict to a text label.

    Returns ``"crossing"`` only when ``ped_attr["crossing"]`` equals 1; any
    other value, or a missing key, gives ``"not_crossing"``.
    """
    # Your dataset: crossing = -1 means NOT crossing; crossing = 1 means crossing
    is_crossing = ped_attr.get("crossing", None) == 1
    return "crossing" if is_crossing else "not_crossing"


In [None]:
def crop_with_context(frame_path, box, expand_ratio=1.8):
    """Cut an enlarged region around ``box`` out of the image at ``frame_path``.

    The box is scaled about its centre by ``expand_ratio`` and clipped to the
    image bounds.

    Returns:
        The RGB crop as an array, or ``None`` when the image cannot be read
        or the clipped region is empty.
    """
    bgr = cv2.imread(frame_path)
    if bgr is None:
        return None
    rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
    img_h, img_w = rgb.shape[:2]

    bx1, by1, bx2, by2 = (float(v) for v in box)
    center_x = (bx1 + bx2) / 2
    center_y = (by1 + by2) / 2
    half_w = (bx2 - bx1) * expand_ratio / 2
    half_h = (by2 - by1) * expand_ratio / 2

    # Clip the expanded box to the image.
    left = int(max(0, center_x - half_w))
    top = int(max(0, center_y - half_h))
    right = int(min(img_w, center_x + half_w))
    bottom = int(min(img_h, center_y + half_h))

    if right <= left or bottom <= top:
        return None

    patch = rgb[top:bottom, left:right]
    return None if patch.size == 0 else patch


In [None]:
def build_windows(track_seq, seq_len=16, stride=4):
    """Slice a track into fixed-length sliding windows.

    Windows start at 0, ``stride``, ``2 * stride``, ... and only full windows
    of ``seq_len`` entries are returned; a track shorter than ``seq_len``
    yields an empty list.
    """
    last_start = len(track_seq) - seq_len
    return [
        track_seq[start:start + seq_len]
        for start in range(0, last_start + 1, stride)
    ]


In [None]:
def apply_temporal_cutoff(track_seq, ped_attr):
    """Trim a track so it stops at the decision point or at a fixed ratio.

    When ``USE_DECISION_POINT_IF_AVAILABLE`` is set and ``ped_attr`` holds a
    non-negative integer ``decision_point``, only entries whose first element
    is ``<= decision_point`` are kept, provided at least ``SEQ_LEN`` remain.
    In every other case the first ``FALLBACK_CUTOFF_RATIO`` share of the
    track is returned, but never fewer than ``SEQ_LEN`` entries (if the track
    has that many).
    """
    if USE_DECISION_POINT_IF_AVAILABLE:
        dp = ped_attr.get("decision_point", -1)
        if isinstance(dp, int) and dp >= 0:
            # keep only frames <= decision point
            upto_decision = [item for item in track_seq if item[0] <= dp]
            if len(upto_decision) >= SEQ_LEN:
                return upto_decision

    # fallback
    keep = max(int(len(track_seq) * FALLBACK_CUTOFF_RATIO), SEQ_LEN)
    return track_seq[:keep]


In [None]:
xml_files = sorted(glob.glob(XML_GLOB))
print("XML files:", len(xml_files))

# Store by filename (basename without extension)
xml_by_base = {os.path.basename(x).replace(".xml", ""): x for x in xml_files}
all_xml_paths = set(xml_files)


In [None]:
def find_xml_for_video(video_id):
    """Locate the XML file that belongs to ``video_id``.

    Files in the module-level ``xml_files`` list are matched on their
    basename. An exact match is preferred: the basename is ``<video_id>.xml``
    or starts with ``<video_id>_``. This stops an id that is a prefix of
    another id (e.g. ``video_1`` vs ``video_10``) from picking up the other
    video's file. Only when no exact match exists does the function fall
    back to the original substring test.

    Returns:
        The path with the shortest basename among the matches (the first one
        in ``xml_files`` order on ties), or ``None`` when nothing matches.
    """
    named = [(os.path.basename(p), p) for p in xml_files]

    exact = [
        p for base, p in named
        if base == f"{video_id}.xml" or base.startswith(f"{video_id}_")
    ]
    # best effort fallback: any xml basename containing video_id
    candidates = exact or [p for base, p in named if video_id in base]

    if not candidates:
        return None
    # pick shortest basename match (often most specific)
    return min(candidates, key=lambda p: len(os.path.basename(p)))


In [None]:
yolo = YOLO(MODEL_PATH)


In [None]:
def list_video_ids(split):
    """Return the sorted, de-duplicated video ids found in a split folder.

    The id of a file is the part of its name before the first ``"_f"``.
    """
    img_dir = f"{IMG_ROOT}/{split}"
    unique_ids = {name.split("_f")[0] for name in os.listdir(img_dir)}
    return sorted(unique_ids)

def list_frames(split, video_id):
    """Return the sorted frame paths of one video inside a split folder.

    A file belongs to ``video_id`` when the part of its name before the first
    ``"_f"`` equals ``video_id`` - the same rule ``list_video_ids`` uses to
    derive ids. A plain ``startswith(video_id)`` test would also collect the
    frames of any video whose id merely begins with ``video_id``.
    """
    img_dir = f"{IMG_ROOT}/{split}"
    frs = sorted(
        os.path.join(img_dir, f)
        for f in os.listdir(img_dir)
        if f.split("_f")[0] == video_id
    )
    return frs

def run_tracking(frames):
    """Run YOLO tracking over one video's frames and collect per-track data.

    Args:
        frames: Ordered list of image paths for a single video.

    Returns:
        ``{track_id: [(frame_idx, box_xyxy, conf), ...]}`` where ``frame_idx``
        is the position of the frame in ``frames``. Unreadable frames and
        frames without tracked boxes contribute nothing.
    """
    # The shared ``yolo`` model is called with persist=True, so its tracker
    # objects survive between calls. Clear them before starting a new video;
    # otherwise tracks still alive from the previous video can be matched
    # against detections in this one.
    # NOTE(review): relies on ultralytics exposing ``predictor.trackers`` with
    # a ``reset()`` method - guarded so older versions simply skip the reset.
    predictor = getattr(yolo, "predictor", None)
    for tracker in getattr(predictor, "trackers", None) or []:
        if hasattr(tracker, "reset"):
            tracker.reset()

    track_db = {}
    for fidx, frame_path in enumerate(frames):
        img = cv2.imread(frame_path)
        if img is None:
            continue
        r = yolo.track(img, conf=CONF, imgsz=IMGSZ, persist=True, tracker=TRACKER, verbose=False)[0]
        if r.boxes is None or r.boxes.id is None:
            # No detection received a track id on this frame.
            continue
        boxes = r.boxes.xyxy.cpu().numpy()
        ids   = r.boxes.id.cpu().numpy().astype(int)
        confs = r.boxes.conf.cpu().numpy()
        for box, tid, c in zip(boxes, ids, confs):
            if tid == -1:
                continue
            track_db.setdefault(int(tid), []).append((fidx, box, float(c)))
    return track_db

def filter_tracks(track_db):
    """Apply the length, relevance, confidence and duplicate filters in turn.

    A track survives the first three checks when it has at least
    ``MIN_TRACK_LEN`` entries, ``is_relevant`` accepts it and ``high_conf``
    accepts it; the survivors are then passed through
    ``suppress_duplicates``.
    """
    survivors = {}
    for tid, seq in track_db.items():
        # Checks short-circuit in the same order as the staged filters:
        # length -> relevance -> confidence.
        if len(seq) >= MIN_TRACK_LEN and is_relevant(seq) and high_conf(seq):
            survivors[tid] = seq
    # duplicates
    return suppress_duplicates(survivors)


In [None]:
sequence_index = []   # global index rows (we save to CSV later)
fail_log = []         # list of dicts describing failures

for split in SPLITS:
    print("\n====================")
    print("PROCESSING SPLIT:", split)
    print("====================")

    split_out = os.path.join(OUT_ROOT, split)
    os.makedirs(split_out, exist_ok=True)

    video_ids = list_video_ids(split)
    print("Videos found:", len(video_ids))

    for vid_i, video_id in enumerate(video_ids):
        frames = list_frames(split, video_id)
        if len(frames) < SEQ_LEN:
            fail_log.append({"split": split, "video": video_id, "reason": "too_few_frames"})
            continue

        # 1) find XML
        xml_path = find_xml_for_video(video_id)
        if xml_path is None:
            fail_log.append({"split": split, "video": video_id, "reason": "xml_not_found"})
            continue

        # 2) parse pedestrians in XML
        peds = parse_pedestrian_attributes(xml_path)
        if len(peds) == 0:
            fail_log.append({"split": split, "video": video_id, "reason": "xml_no_pedestrians"})
            continue

        if REQUIRE_SINGLE_PED_XML and len(peds) != 1:
            fail_log.append({"split": split, "video": video_id, "reason": f"xml_ped_count_{len(peds)}"})
            continue

        # 3) tracking
        track_db = run_tracking(frames)
        if len(track_db) == 0:
            fail_log.append({"split": split, "video": video_id, "reason": "no_tracks"})
            continue

        # 4) filtering
        filtered = filter_tracks(track_db)
        if len(filtered) == 0:
            fail_log.append({"split": split, "video": video_id, "reason": "no_tracks_after_filter"})
            continue

        # 5) pick track(s) and label
        if len(peds) == 1:
            ped = peds[0]
            label = label_from_ped_attr(ped)

            # dominant track = longest filtered track
            selected_tid = max(filtered.keys(), key=lambda t: len(filtered[t]))
            track = filtered[selected_tid]

            # decision cutoff
            track = apply_temporal_cutoff(track, ped)
            if len(track) < SEQ_LEN:
                fail_log.append({"split": split, "video": video_id, "reason": "track_too_short_after_cutoff"})
                continue

            # 6) make sequences
            windows = build_windows(track, SEQ_LEN, STRIDE)
            if len(windows) == 0:
                fail_log.append({"split": split, "video": video_id, "reason": "no_windows"})
                continue

            # 7) save sequences
            video_out = os.path.join(split_out, video_id)
            seq_out_root = os.path.join(video_out, "sequences")
            os.makedirs(seq_out_root, exist_ok=True)

            seq_counter = 0
            saved_any = False

            for w in windows:
                # crop all frames in window
                crops = []
                for fidx, box, conf in w:
                    crop = crop_with_context(frames[fidx], box, EXPAND_RATIO)
                    if crop is None:
                        crops = []
                        break
                    crops.append((fidx, crop))

                if len(crops) != SEQ_LEN:
                    continue

                seq_id = f"{video_id}_tid{selected_tid}_seq{seq_counter:05d}"
                seq_dir = os.path.join(seq_out_root, seq_id)
                os.makedirs(seq_dir, exist_ok=True)

                for j, (fidx, crop) in enumerate(crops):
                    out_path = os.path.join(seq_dir, f"{j:02d}_frame_{fidx:05d}.jpg")
                    cv2.imwrite(out_path, cv2.cvtColor(crop, cv2.COLOR_RGB2BGR))

                sequence_index.append({
                    "split": split,
                    "video_id": video_id,
                    "xml": os.path.basename(xml_path),
                    "tid": int(selected_tid),
                    "seq_id": seq_id,
                    "label": label,
                    "crossing": int(ped.get("crossing", -1)) if isinstance(ped.get("crossing", -1), int) else -1,
                    "decision_point": int(ped.get("decision_point", -1)) if isinstance(ped.get("decision_point", -1), int) else -1,
                    "start_frame": int(crops[0][0]),
                    "end_frame": int(crops[-1][0]),
                    "seq_dir": seq_dir,
                })

                seq_counter += 1
                saved_any = True

            if not saved_any:
                fail_log.append({"split": split, "video": video_id, "reason": "all_windows_failed_cropping"})
                continue

        # progress print sometimes
        if (vid_i + 1) % 50 == 0:
            print(f"[{split}] Processed {vid_i+1}/{len(video_ids)} videos. Seqs so far: {len(sequence_index)}")

print("\nDONE.")
print("Total sequences saved:", len(sequence_index))
print("Failures:", len(fail_log))
print("Label distribution:", Counter([r["label"] for r in sequence_index]))


In [None]:
import csv

index_csv = os.path.join(OUT_ROOT, "sequence_index.csv")
with open(index_csv, "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=list(sequence_index[0].keys()) if sequence_index else [])
    if sequence_index:
        writer.writeheader()
        writer.writerows(sequence_index)

fail_json = os.path.join(OUT_ROOT, "fail_log.json")
with open(fail_json, "w") as f:
    json.dump(fail_log, f, indent=2)

print("Saved index:", index_csv)
print("Saved fail log:", fail_json)


In [None]:
zip_path = os.path.join("/kaggle/working", "intent_sequences_dataset.zip")

def zipdir(folder, ziph):
    """Add every file under ``folder`` to an open zip handle.

    Archive names are the file paths relative to ``folder``, so the folder
    name itself does not appear inside the archive.
    """
    for parent, _subdirs, names in os.walk(folder):
        for name in names:
            src = os.path.join(parent, name)
            ziph.write(src, arcname=os.path.relpath(src, folder))

with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as z:
    # zipdir() walks all of OUT_ROOT. index_csv and fail_json were written
    # directly under OUT_ROOT, so they are already archived there as
    # "sequence_index.csv" and "fail_log.json". Writing them explicitly as
    # well would store each file twice under the same archive name.
    zipdir(OUT_ROOT, z)

print("Zipped dataset to:", zip_path)


In [None]:
seqs_per_video = idx.groupby("video_id").size()

print("Min sequences per video:", seqs_per_video.min())
print("Max sequences per video:", seqs_per_video.max())
print(seqs_per_video.describe())


# ViT Preprocessing

In [None]:
import os, glob, csv
import cv2
import numpy as np
import pandas as pd
from tqdm import tqdm

import torch
import torch.nn as nn
from torchvision import transforms
import timm


In [None]:
vit_preprocess = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    ),
])


In [None]:
import timm

device = "cuda" if torch.cuda.is_available() else "cpu"
print("Device:", device)

vit = timm.create_model(
    "vit_base_patch16_224",
    pretrained=True,
    num_classes=0  # feature extractor
)
vit.eval()
vit.to(device)

print("ViT feature dim:", vit.num_features)


In [None]:
def load_sequence_tensor(row, T_expected=16):
    """Load one sequence folder as a stacked tensor of preprocessed frames.

    The folder comes from ``resolve_seq_dir(row)``; its ``.jpg`` files are
    read in sorted order, converted BGR -> RGB and passed through
    ``vit_preprocess``. An ``AssertionError`` is raised when the folder does
    not contain exactly ``T_expected`` frames.
    """
    seq_dir = resolve_seq_dir(row)
    frame_names = sorted(name for name in os.listdir(seq_dir) if name.endswith(".jpg"))

    assert len(frame_names) == T_expected, "‚ùå Sequence length mismatch"

    tensors = []
    for name in frame_names:
        bgr = cv2.imread(os.path.join(seq_dir, name))
        rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
        tensors.append(vit_preprocess(rgb))

    return torch.stack(tensors, dim=0)  # [T,3,224,224]


In [None]:
row = idx.sample(1, random_state=1).iloc[0]

print("Testing seq:", row["seq_id"], "| label:", row["label"])

x = load_sequence_tensor(row)
print("Input tensor shape:", x.shape)

with torch.no_grad():
    feats = vit(x.to(device))

print("Output feature shape:", feats.shape)


In [None]:
import os

FEAT_ROOT = "/kaggle/working/vit_features"
os.makedirs(FEAT_ROOT, exist_ok=True)

print("Feature output dir:", FEAT_ROOT)


In [None]:
def load_sequence_tensor(row, T_expected=16):
    """Load one sequence folder as a stacked tensor, or ``None`` if unusable.

    The folder comes from ``resolve_seq_dir(row)``; its ``.jpg`` files are
    read in sorted order, converted BGR -> RGB and passed through
    ``vit_preprocess``.

    Returns:
        A tensor stacked along dim 0 (one entry per frame), or ``None`` when
        the folder does not hold exactly ``T_expected`` frames or any frame
        cannot be decoded. Callers skip ``None`` results.
    """
    seq_dir = resolve_seq_dir(row)
    frames = sorted([f for f in os.listdir(seq_dir) if f.endswith(".jpg")])
    if len(frames) != T_expected:
        return None

    imgs = []
    for f in frames:
        bgr = cv2.imread(os.path.join(seq_dir, f))
        if bgr is None:
            # cv2.imread signals an unreadable file with None; passing that to
            # cvtColor would abort the whole extraction run, so treat the
            # sequence like an incomplete one instead.
            return None
        img = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
        imgs.append(vit_preprocess(img))

    return torch.stack(imgs, dim=0)  # [T,3,224,224]


In [None]:
import torch
import pandas as pd
from tqdm import tqdm

features_index = []

vit.eval()

for split in ["train", "val", "test"]:
    split_df = idx[idx["split"] == split]
    out_dir = os.path.join(FEAT_ROOT, split)
    os.makedirs(out_dir, exist_ok=True)

    print(f"\nProcessing split: {split} | sequences: {len(split_df)}")

    for _, row in tqdm(split_df.iterrows(), total=len(split_df)):
        seq_id = row["seq_id"]
        out_path = os.path.join(out_dir, f"{seq_id}.pt")

        # Safe resume: only the computation is skipped for files that already
        # exist. The index entry below is still recorded, otherwise a resumed
        # run would write a features_index that lists only the new files.
        if not os.path.exists(out_path):
            x = load_sequence_tensor(row)
            if x is None:
                continue

            x = x.to(device)

            # Disable autograd locally. A global torch.set_grad_enabled(False)
            # would stay in effect for the rest of the kernel session and make
            # later loss.backward() calls fail.
            with torch.no_grad():
                feats = vit(x).cpu()  # [T,768]

            torch.save(
                {
                    "seq_id": seq_id,
                    "features": feats,
                    "label": row["label"],
                    "split": split,
                    "video_id": row["video_id"],
                    "tid": int(row["tid"]),
                },
                out_path
            )

        features_index.append(
            {
                "seq_id": seq_id,
                "split": split,
                "label": row["label"],
                "pt_path": out_path,
            }
        )


In [None]:
feat_index_path = os.path.join(FEAT_ROOT, "features_index.csv")
pd.DataFrame(features_index).to_csv(feat_index_path, index=False)

print("Saved feature index:", feat_index_path)
print("Total feature tensors:", len(features_index))


In [None]:
sample = pd.read_csv(feat_index_path).sample(3, random_state=0)

for _, r in sample.iterrows():
    d = torch.load(r["pt_path"])
    print(
        r["seq_id"],
        d["features"].shape,
        d["label"]
    )


# LSTM Phase

In [None]:
import os, copy
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix, classification_report
import cv2
import matplotlib.pyplot as plt

device = "cuda" if torch.cuda.is_available() else "cpu"


In [None]:
FEATURE_ROOT = "/kaggle/input/vit-features"
FEATURE_INDEX = os.path.join(FEATURE_ROOT, "features_index.csv")

SEQ_ROOT = "/kaggle/input/dataset-sequences/intent_sequences_dataset"
SEQ_INDEX = os.path.join(SEQ_ROOT, "sequence_index_final.csv")


# Dataset Loader

In [None]:
def resolve_pt_path(row):
    """Build the feature-file path ``FEATURE_ROOT/<split>/<seq_id>.pt`` for a row."""
    filename = f"{row['seq_id']}.pt"
    return os.path.join(FEATURE_ROOT, row["split"], filename)

class IntentDataset(Dataset):
    """Dataset over the rows of a feature index CSV that belong to one split.

    Each item is ``(features, target)`` where ``features`` is the
    ``"features"`` entry of the row's saved ``.pt`` file and ``target`` is 1
    for the label ``"crossing"`` and 0 otherwise.
    """

    def __init__(self, index_csv, split):
        full_index = pd.read_csv(index_csv)
        in_split = full_index["split"] == split
        # Re-index from 0 so positional access in __getitem__ is contiguous.
        self.df = full_index[in_split].reset_index(drop=True)

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        payload = torch.load(resolve_pt_path(row))
        features = payload["features"]  # [16, 768]
        target = int(row["label"] == "crossing")
        return features, target


In [None]:
train_ds = IntentDataset(FEATURE_INDEX, "train")
val_ds   = IntentDataset(FEATURE_INDEX, "val")
test_ds  = IntentDataset(FEATURE_INDEX, "test")

train_loader = DataLoader(train_ds, batch_size=32, shuffle=True)
val_loader   = DataLoader(val_ds, batch_size=32)
test_loader  = DataLoader(test_ds, batch_size=32)


# LSTM Model

In [None]:
class IntentLSTM(nn.Module):
    """Single-layer LSTM with dropout and a two-logit linear head.

    The forward pass returns raw logits of shape ``(batch, 2)``.
    """

    def __init__(self, input_dim=768, hidden_dim=128, dropout=0.3):
        super().__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_dim, 2)

    def forward(self, x):
        # With one unidirectional layer, the output at the last time step
        # holds the same values as the final hidden state.
        outputs, _ = self.lstm(x)
        final_step = outputs[:, -1, :]
        return self.fc(self.dropout(final_step))


# Training

In [None]:
def run_epoch(model, loader, optimizer=None):
    """Run one pass over ``loader`` and return ``(mean_loss, accuracy)``.

    Args:
        model: Module producing class logits.
        loader: Iterable of ``(x, y)`` batches.
        optimizer: When given, the pass is a training pass (gradients,
            clipping to norm 1.0, optimizer step). When ``None`` the model is
            put in eval mode and no gradients are tracked.

    Returns:
        The sample-weighted mean of the global ``criterion`` and the fraction
        of samples whose arg-max prediction equals the target.
    """
    train = optimizer is not None
    total_loss, correct, total = 0, 0, 0

    if train:
        model.train()
    else:
        model.eval()

    for x, y in loader:
        x, y = x.to(device), y.to(device)

        if train:
            optimizer.zero_grad()

        # Track gradients only when training; evaluation passes would
        # otherwise build an autograd graph that is never used.
        with torch.set_grad_enabled(train):
            out = model(x)
            loss = criterion(out, y)

        if train:
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()

        # criterion returns a batch mean, so weight it by the batch size.
        total_loss += loss.item() * x.size(0)
        correct += (out.argmax(1) == y).sum().item()
        total += y.size(0)

    return total_loss / total, correct / total


In [None]:
model = IntentLSTM().to(device)

optimizer = torch.optim.Adam(
    model.parameters(),
    lr=1e-3,
    weight_decay=1e-4
)

criterion = nn.CrossEntropyLoss()

best_state = None
best_val_loss = float("inf")
patience = 5
pat_left = patience

EPOCHS = 30

for epoch in range(EPOCHS):
    train_loss, train_acc = run_epoch(model, train_loader, optimizer)
    val_loss, val_acc     = run_epoch(model, val_loader)

    print(f"Epoch {epoch:02d} | "
          f"Train loss {train_loss:.3f} acc {train_acc:.3f} | "
          f"Val loss {val_loss:.3f} acc {val_acc:.3f}")

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        best_state = copy.deepcopy(model.state_dict())
        pat_left = patience
    else:
        pat_left -= 1
        if pat_left == 0:
            print("Early stopping")
            break

model.load_state_dict(best_state)


# Evaluation

In [None]:
model.eval()
ys, preds = [], []

with torch.no_grad():
    for x, y in test_loader:
        x = x.to(device)
        out = model(x)
        preds.extend(out.argmax(1).cpu().tolist())
        ys.extend(y.tolist())

print("Accuracy:", accuracy_score(ys, preds))
print("F1:", f1_score(ys, preds))
print("Confusion Matrix:\n", confusion_matrix(ys, preds))
print(classification_report(ys, preds, target_names=["not_crossing", "crossing"]))


# Visualize Predictions with Frames

In [None]:
feat_idx = pd.read_csv(FEATURE_INDEX)
seq_idx  = pd.read_csv(SEQ_INDEX)

# Merge to get video_id for each feature row
merged = feat_idx.merge(
    seq_idx[["seq_id", "video_id", "split"]],
    on=["seq_id", "split"],
    how="left"
)

# Keep only test split
test_merged = merged[merged["split"] == "test"].reset_index(drop=True)

print("Total test sequences:", len(test_merged))
print("Unique videos:", test_merged["video_id"].nunique())


In [None]:
# One random sequence per video
# Seed the per-video draw too, so the selection is reproducible across runs.
one_per_video = (
    test_merged
    .groupby("video_id", group_keys=False)
    .apply(lambda x: x.sample(1, random_state=42))
)

# Cap at the number of available videos: DataFrame.sample raises when asked
# for more rows than exist (without replacement).
samples = (
    one_per_video
    .sample(min(10, len(one_per_video)), random_state=42)
    .reset_index(drop=True)
)

print("Sampled sequences from videos:")
print(samples[["video_id", "seq_id", "label"]])


In [None]:
def visualize_sequence(seq_dir, title):
    """Show eight evenly spaced frames of a sequence folder in one row.

    Frame files are taken from ``seq_dir`` in sorted name order; the picked
    positions run from the first to the last file.
    """
    frame_names = sorted(os.listdir(seq_dir))
    chosen = np.linspace(0, len(frame_names) - 1, 8).astype(int)

    plt.figure(figsize=(16,3))
    for slot, frame_pos in enumerate(chosen, start=1):
        bgr = cv2.imread(os.path.join(seq_dir, frame_names[frame_pos]))
        rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
        plt.subplot(1, len(chosen), slot)
        plt.imshow(rgb)
        plt.axis("off")

    plt.suptitle(title, fontsize=13)
    plt.show()


In [None]:
model.eval()

for _, row in samples.iterrows():
    # Load features
    data = torch.load(resolve_pt_path(row))
    x = data["features"].unsqueeze(0).to(device)

    # Predict
    with torch.no_grad():
        logits = model(x)
        pred = logits.argmax(1).item()

    pred_label = "crossing" if pred == 1 else "not_crossing"
    true_label = row["label"]

    print("=" * 70)
    print(f"VIDEO  : {row['video_id']}")
    print(f"SEQ_ID : {row['seq_id']}")
    print(f"TRUE   : {true_label}")
    print(f"PRED   : {pred_label}")

    seq_dir = os.path.join(
        SEQ_ROOT,
        row["split"],
        row["video_id"],
        "sequences",
        row["seq_id"]
    )

    visualize_sequence(
        seq_dir,
        title=f"VIDEO {row['video_id']} | TRUE={true_label} | PRED={pred_label}"
    )


In [None]:
import pandas as pd

SEQ_INDEX = "/kaggle/input/dataset-sequences/intent_sequences_dataset/sequence_index_final.csv"

seq_idx = pd.read_csv(SEQ_INDEX)
print(seq_idx.head())


In [None]:
all_videos = set(seq_idx["video_id"].unique())

print("Used videos:", len(all_videos))


# Fine-Tuned parameters

In [None]:
import os, glob, json, shutil, random
from collections import Counter
import numpy as np
import pandas as pd
import cv2
import xml.etree.ElementTree as ET
import matplotlib.pyplot as plt

from ultralytics import YOLO


In [None]:
# Paths
# IMG_ROOT/<split> is listed by list_video_ids / list_frames; XML_GLOB selects the attribute XMLs.
IMG_ROOT = "/kaggle/input/vehic-ped-intuition/images"
XML_GLOB = "/kaggle/input/attributes-label/annotations_attributes/video_*_*.xml"
SPLITS = ["train", "val", "test"]

# YOLO + tracking
# CONF, IMGSZ and TRACKER are passed to `.track(conf=..., imgsz=..., tracker=...)` in run_tracking.
MODEL_PATH = "/kaggle/input/first-phase-model/weights/best.pt"
CONF = 0.33
IMGSZ = 640
TRACKER = "botsort.yaml"

# Minimal keep (dataset construction)
# filter_tracks drops tracks with fewer than this many entries.
MIN_TRACK_LEN = 8

# Stability filter (anti-drift)
# filter_tracks drops tracks whose mean consecutive-box IoU (track_stability) is below this.
MIN_STABILITY_IOU = 0.25

# Duplicate suppression
# suppress_duplicates drops a track whose mean IoU with an already kept track exceeds this.
DUP_IOU_THR = 0.70

# Cropping context
# Factor by which the box width/height is enlarged around its center before cropping.
EXPAND_RATIO = 1.8

# Sequences
# Window length and step between window starts, as passed to build_windows.
SEQ_LEN = 16
STRIDE = 4

# Temporal cutoff
# apply_temporal_cutoff keeps entries up to the annotated decision_point when enabled and
# available; otherwise it keeps the first FALLBACK_CUTOFF_RATIO share of the track.
USE_DECISION_POINT_IF_AVAILABLE = True
FALLBACK_CUTOFF_RATIO = 0.8

# Output
# Directory for generated artifacts (the tracking videos below are written here).
OUT_ROOT = "/kaggle/working/intent_sequences_dataset_clean"
os.makedirs(OUT_ROOT, exist_ok=True)

print("OUT_ROOT:", OUT_ROOT)


In [None]:
# Load the detector weights from MODEL_PATH; this shared instance is used by run_tracking below.
yolo = YOLO(MODEL_PATH)
print("Loaded YOLO:", MODEL_PATH)


In [None]:
def iou(a, b):
    """Intersection-over-union of two boxes indexed as (x1, y1, x2, y2).

    A small epsilon (1e-6) is added to the union so two empty boxes do not divide by zero.
    """
    overlap_w = max(0, min(a[2], b[2]) - max(a[0], b[0]))
    overlap_h = max(0, min(a[3], b[3]) - max(a[1], b[1]))
    inter = overlap_w * overlap_h

    area_a = max(0, (a[2] - a[0])) * max(0, (a[3] - a[1]))
    area_b = max(0, (b[2] - b[0])) * max(0, (b[3] - b[1]))

    union = area_a + area_b - inter
    return inter / (union + 1e-6)

def track_stability(seq):
    """Mean IoU between each box of a track and the box that precedes it.

    Each entry of `seq` carries its box at index 1. Tracks with fewer than two
    entries have no consecutive pair and score 0.0.
    """
    if len(seq) < 2:
        return 0.0
    consecutive_ious = []
    for idx in range(1, len(seq)):
        previous_box, current_box = seq[idx - 1][1], seq[idx][1]
        consecutive_ious.append(iou(previous_box, current_box))
    return float(np.mean(consecutive_ious))

def bbox_heights(seq):
    """Return the height (y2 - y1) of every box in a track as a float32 array."""
    heights = []
    for _fidx, box, _conf in seq:
        heights.append(box[3] - box[1])
    return np.array(heights, dtype=np.float32)

def track_metrics(seq):
    """Summarize a track of (frame_index, box, confidence) entries.

    Returns:
        dict with keys ``len`` (entry count), ``avg_conf`` (mean confidence),
        ``median_h`` (median box height), ``growth_h`` (last height minus first
        height) and ``stability`` (see track_stability). Numeric fields fall back
        to 0.0 for an empty track.
    """
    heights = bbox_heights(seq)

    if len(seq):
        avg_conf = float(np.mean([conf for (_, _, conf) in seq]))
    else:
        avg_conf = 0.0

    if len(heights):
        med_h = float(np.median(heights))
        growth = float(heights[-1] - heights[0])
    else:
        med_h = 0.0
        growth = 0.0

    return dict(
        len=int(len(seq)),
        avg_conf=avg_conf,
        median_h=med_h,
        growth_h=growth,
        stability=track_stability(seq),
    )

def mean_iou_tracks(seq1, seq2, min_common=5):
    """Mean IoU of two tracks over the frames in which both are present.

    Entries are (frame_index, box, confidence). Boxes are paired by frame index,
    not by list position: two tracks that start on different frames would
    otherwise have boxes from different moments compared with each other.

    Args:
        seq1, seq2: Tracks to compare.
        min_common: Minimum number of shared frames required for a meaningful
            comparison (default 5).

    Returns:
        float mean IoU over shared frames, or 0.0 when fewer than `min_common`
        frames are shared.
    """
    boxes1 = {fidx: box for fidx, box, _ in seq1}
    boxes2 = {fidx: box for fidx, box, _ in seq2}
    shared = sorted(boxes1.keys() & boxes2.keys())
    if len(shared) < min_common:
        return 0.0
    return float(np.mean([iou(boxes1[f], boxes2[f]) for f in shared]))

def suppress_duplicates(tracks):
    """Drop tracks that overlap an already kept, longer-or-equal track.

    Tracks are visited from longest to shortest; a track is discarded when its
    mean_iou_tracks score against any kept track exceeds DUP_IOU_THR.

    Returns:
        dict mapping the surviving track IDs to their sequences.
    """
    kept = {}
    for tid in sorted(tracks, key=lambda t: len(tracks[t]), reverse=True):
        candidate = tracks[tid]
        is_duplicate = any(
            mean_iou_tracks(candidate, survivor) > DUP_IOU_THR
            for survivor in kept.values()
        )
        if not is_duplicate:
            kept[tid] = candidate
    return kept


In [None]:
def parse_pedestrian_attributes(xml_path):
    """Read the attributes of every ``<pedestrian>`` element in an XML file.

    The ``crossing``, ``decision_point`` and ``crossing_point`` attributes are
    converted to int when present; a value that is not a valid integer is
    replaced by the sentinel -999. All other attributes stay as strings.

    Args:
        xml_path: Path of the XML file to parse.

    Returns:
        list of dicts, one per pedestrian element.

    Raises:
        Whatever ``ET.parse`` raises for a missing or malformed file.
    """
    root = ET.parse(xml_path).getroot()
    peds = []
    for ped in root.findall(".//pedestrian"):
        d = dict(ped.attrib)
        for k in ["crossing", "decision_point", "crossing_point"]:
            if k in d:
                try:
                    d[k] = int(d[k])
                except ValueError:
                    # Only a non-numeric string is expected here; a bare except would
                    # also swallow KeyboardInterrupt / SystemExit.
                    d[k] = -999
        peds.append(d)
    return peds

def label_from_ped_attr(ped):
    """Map a pedestrian's ``crossing`` attribute to a text label.

    1 -> "crossing"; 0 or -1 -> "not_crossing"; anything else (including a
    missing attribute) -> "unknown".
    """
    crossing = ped.get("crossing")
    if crossing == 1:
        return "crossing"
    return "not_crossing" if crossing in [-1, 0] else "unknown"

def find_xml_for_video(video_id):
    """Return the XML path in `xml_files` whose file name contains `video_id`.

    When several file names contain the ID, the one with the shortest file name
    wins (first one on ties). Returns None when nothing matches.
    """
    matches = []
    for path in xml_files:
        if video_id in os.path.basename(path):
            matches.append(path)
    if not matches:
        return None
    return min(matches, key=lambda p: len(os.path.basename(p)))


In [None]:
# Sorted list of every attributes XML matching XML_GLOB; read by find_xml_for_video.
xml_files = sorted(glob.glob(XML_GLOB))
print("XML files:", len(xml_files))

def find_xml_for_video(video_id):
    """Look up the XML in `xml_files` whose file name contains `video_id`.

    Several matches are ranked by file-name length and the shortest is returned
    (the first one on ties); no match yields None.
    """
    ranked = sorted(
        filter(lambda p: video_id in os.path.basename(p), xml_files),
        key=lambda p: len(os.path.basename(p)),
    )
    return ranked[0] if ranked else None

def parse_pedestrian_attributes(xml_path):
    """Read the attributes of every ``<pedestrian>`` element in an XML file.

    The ``crossing``, ``decision_point`` and ``crossing_point`` attributes are
    converted to int when present; a value that is not a valid integer becomes
    -1. All other attributes stay as strings.

    Args:
        xml_path: Path of the XML file to parse.

    Returns:
        list of dicts, one per pedestrian element; an empty list when the file
        cannot be read or parsed (deliberate best-effort).
    """
    try:
        root = ET.parse(xml_path).getroot()
    except Exception:
        # Best-effort: an unreadable or malformed file contributes no pedestrians.
        return []

    peds = []
    for ped in root.findall(".//pedestrian"):
        d = dict(ped.attrib)
        for k in ["crossing", "decision_point", "crossing_point"]:
            if k in d:
                try:
                    d[k] = int(d[k])
                except ValueError:
                    # Only a non-numeric string is expected here; a bare except would
                    # also swallow KeyboardInterrupt / SystemExit.
                    # NOTE(review): for "crossing", -1 is also a value that
                    # label_from_ped_attr maps to "not_crossing" - confirm this is intended.
                    d[k] = -1
        peds.append(d)
    return peds

def label_from_ped_attr(ped_attr):
    """Translate the ``crossing`` attribute into a text label.

    1 gives "crossing", 0 or -1 gives "not_crossing", and every other value
    (or a missing attribute) gives "unknown".
    """
    crossing = ped_attr.get("crossing", None)
    label = "unknown"
    if crossing == 1:
        label = "crossing"
    elif crossing in (0, -1):
        label = "not_crossing"
    return label


In [None]:
def list_video_ids(split):
    """Return the sorted, distinct video IDs found in IMG_ROOT/<split>.

    A file's video ID is the part of its name before the first "_f".
    """
    img_dir = f"{IMG_ROOT}/{split}"
    video_ids = {name.partition("_f")[0] for name in os.listdir(img_dir)}
    return sorted(video_ids)

def list_frames(split, video_id):
    """Return the sorted frame paths of one video inside IMG_ROOT/<split>.

    A file belongs to `video_id` when the part of its name before the first "_f"
    equals `video_id` - the same rule list_video_ids uses to derive IDs. A plain
    prefix test would also pick up frames of e.g. "video_10" when asked for
    "video_1".

    Args:
        split: Sub-directory of IMG_ROOT to search.
        video_id: Video identifier as produced by list_video_ids.

    Returns:
        list of file paths, sorted lexicographically.
    """
    img_dir = f"{IMG_ROOT}/{split}"
    frames = sorted([
        os.path.join(img_dir, f)
        for f in os.listdir(img_dir)
        if f.split("_f")[0] == video_id
    ])
    return frames


In [None]:
def run_tracking(frames):
    """Run detection + tracking over the frames of one video.

    Args:
        frames: Ordered list of image paths.

    Returns:
        dict mapping track ID (int) to a list of (frame_index, box_xyxy, confidence)
        entries, where frame_index is the position of the frame in `frames`.
    """
    # Use a fresh model per video: with persist=True the tracker state is kept between
    # calls, so reusing the shared `yolo` instance would carry tracks and IDs over from
    # whatever video was processed before.
    tracker_model = YOLO(MODEL_PATH)

    track_db = {}
    for fidx, frame_path in enumerate(frames):
        img = cv2.imread(frame_path)
        if img is None:
            # Unreadable frame: skip it but keep fidx aligned with `frames`.
            continue

        r = tracker_model.track(
            img,
            conf=CONF,
            imgsz=IMGSZ,
            persist=True,
            tracker=TRACKER,
            verbose=False
        )[0]

        # No detections, or detections without assigned track IDs.
        if r.boxes is None or r.boxes.id is None:
            continue

        boxes = r.boxes.xyxy.cpu().numpy()
        ids   = r.boxes.id.cpu().numpy().astype(int)
        confs = r.boxes.conf.cpu().numpy()

        for box, tid, c in zip(boxes, ids, confs):
            if tid == -1:
                continue
            track_db.setdefault(int(tid), []).append((fidx, box, float(c)))

    return track_db


In [None]:
def filter_tracks(track_db):
    """Keep only long, stable, non-duplicated tracks.

    A track survives when it has at least MIN_TRACK_LEN entries and its
    track_stability is at least MIN_STABILITY_IOU; the survivors are then passed
    through suppress_duplicates.
    """
    survivors = {}
    for tid, seq in track_db.items():
        # Stability is only evaluated for tracks that already passed the length check.
        if len(seq) >= MIN_TRACK_LEN and track_stability(seq) >= MIN_STABILITY_IOU:
            survivors[tid] = seq
    return suppress_duplicates(survivors)


In [None]:
def select_best_track(tracks):
    """Pick the track with the highest weighted score of its track_metrics.

    The score is the weighted sum, in this order, of ``len`` (1.0), ``median_h``
    (0.03), ``avg_conf`` (2.0), ``growth_h`` (0.02) and ``stability`` (5.0).

    Returns:
        (best_tid, best_score); (None, -1e9) when `tracks` is empty. On equal
        scores the track seen first wins.
    """
    weights = (
        ("len", 1.0),
        ("median_h", 0.03),
        ("avg_conf", 2.0),
        ("growth_h", 0.02),
        ("stability", 5.0),
    )

    best_tid, best_score = None, -1e9
    for tid in tracks:
        metrics = track_metrics(tracks[tid])
        # Accumulate left to right so the result matches a plain a + b + c + d + e sum.
        score = 0.0
        for key, weight in weights:
            score = score + weight * metrics[key]
        if score > best_score:
            best_tid, best_score = tid, score
    return best_tid, best_score


In [None]:
def draw_tracks(img_bgr, boxes, ids, color=(0,255,255)):
    """Draw each box and its "ID <tid>" caption onto `img_bgr` in place.

    Box coordinates are truncated to int; the caption is placed 5 px above the
    box's top edge, clamped to the image top.
    """
    for tid, box in zip(ids, boxes):
        left, top, right, bottom = (int(v) for v in box)
        cv2.rectangle(img_bgr, (left, top), (right, bottom), color, 2)
        caption_origin = (left, max(0, top - 5))
        cv2.putText(img_bgr, f"ID {tid}", caption_origin,
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

def save_tracking_video(frames, track_db, out_path, fps=10, only_tids=None, title=None):
    """Render tracked boxes over the frames and write the result as an mp4.

    Args:
        frames: Ordered list of image paths.
        track_db: dict of track ID -> list of (frame_index, box, confidence).
        out_path: Destination video file.
        fps: Frame rate of the written video.
        only_tids: Optional collection of track IDs to draw; all tracks when None.
        title: Optional text drawn in the top-left corner of every frame.

    Raises:
        ValueError: if no frame in `frames` can be read.
        RuntimeError: if the video writer cannot be opened for `out_path`.
    """
    # The writer needs a fixed frame size; take it from the first readable frame
    # instead of assuming frames[0] exists and loads.
    first = None
    for fp in frames:
        first = cv2.imread(fp)
        if first is not None:
            break
    if first is None:
        raise ValueError("save_tracking_video: no readable frames to write")

    H, W = first.shape[:2]
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    vw = cv2.VideoWriter(out_path, fourcc, fps, (W, H))
    if not vw.isOpened():
        vw.release()
        raise RuntimeError(f"Could not open video writer for: {out_path}")

    # Build quick lookup: frame -> list of (box, tid)
    by_frame = {}
    for tid, seq in track_db.items():
        if only_tids is not None and tid not in only_tids:
            continue
        for fidx, box, _ in seq:
            by_frame.setdefault(fidx, []).append((box, tid))

    try:
        for fidx, fp in enumerate(frames):
            img = cv2.imread(fp)
            if img is None:
                continue
            items = by_frame.get(fidx, [])
            if items:
                boxes = [b for (b, _) in items]
                ids   = [t for (_, t) in items]
                draw_tracks(img, boxes, ids)
            if title:
                cv2.putText(img, title, (10,30), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255,255,255), 2)
            vw.write(img)
    finally:
        # Always finalize the file, even if drawing or reading fails midway.
        vw.release()

    print("Saved video:", out_path)


In [None]:
def apply_temporal_cutoff(track_seq, ped_attr):
    """Truncate a track so it ends at the decision point or at a fixed ratio.

    When USE_DECISION_POINT_IF_AVAILABLE is set and `ped_attr` holds a
    non-negative int ``decision_point``, the entries whose frame index is at or
    before it are returned (if there are any). In every other case the first
    FALLBACK_CUTOFF_RATIO share of the track is returned, at least one entry.
    """
    if USE_DECISION_POINT_IF_AVAILABLE:
        dp = ped_attr.get("decision_point", -1)
        if isinstance(dp, int) and dp >= 0:
            before_decision = [entry for entry in track_seq if entry[0] <= dp]
            if before_decision:
                return before_decision

    # Fallback shared by the disabled-flag, missing-point and empty-selection paths.
    keep = max(int(len(track_seq) * FALLBACK_CUTOFF_RATIO), 1)
    return track_seq[:keep]

def build_windows(track_seq, seq_len=16, stride=4):
    """Slice a track into overlapping windows of exactly `seq_len` entries.

    Window starts advance by `stride`; a track shorter than `seq_len` yields no
    windows.
    """
    total = len(track_seq)
    if total < seq_len:
        return []
    windows = []
    for start in range(0, total - seq_len + 1, stride):
        windows.append(track_seq[start:start + seq_len])
    return windows

def crop_with_context(frame_path, box, expand_ratio=1.8):
    """Crop an enlarged region around `box` from the image at `frame_path`.

    The box (x1, y1, x2, y2) is scaled by `expand_ratio` around its center and
    clamped to the image borders.

    Returns:
        The RGB crop, or None when the image cannot be read or the clamped region
        is empty.
    """
    bgr = cv2.imread(frame_path)
    if bgr is None:
        return None
    rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
    img_h, img_w = rgb.shape[:2]

    left, top, right, bottom = (float(v) for v in box)
    center_x = (left + right) / 2
    center_y = (top + bottom) / 2
    wide = (right - left) * expand_ratio
    tall = (bottom - top) * expand_ratio

    # Clamp the enlarged box to the image and truncate to pixel indices.
    col_start = int(max(0, center_x - wide / 2))
    row_start = int(max(0, center_y - tall / 2))
    col_stop = int(min(img_w, center_x + wide / 2))
    row_stop = int(min(img_h, center_y + tall / 2))

    if col_stop <= col_start or row_stop <= row_start:
        return None
    region = rgb[row_start:row_stop, col_start:col_stop]
    if region.size:
        return region
    return None


In [None]:
# Smoke test of the tracking pipeline on one randomly chosen training video.
# The globals set here (split, video_id, frames, label, track_raw, track_f) are reused by later cells.
split = "train"
video_id = random.choice(list_video_ids(split))
frames = list_frames(split, video_id)

print("Smoking video:", video_id, "frames:", len(frames))

# XML: a label is only derived when the file lists exactly one pedestrian;
# otherwise a placeholder attribute dict is used and the label is "unknown".
xml_path = find_xml_for_video(video_id)
peds = parse_pedestrian_attributes(xml_path) if xml_path else []
ped = peds[0] if len(peds)==1 else {"decision_point":-1, "crossing":-1}
label = label_from_ped_attr(ped) if len(peds)==1 else "unknown"

# Raw tracking over every frame of the video
track_raw = run_tracking(frames)

# Save raw tracking video
raw_path = f"{OUT_ROOT}/{video_id}_tracking_raw.mp4"
save_tracking_video(frames, track_raw, raw_path, fps=10, title="RAW TRACKS")

# Filter tracks (length, stability, duplicates)
track_f = filter_tracks(track_raw)

filtered_path = f"{OUT_ROOT}/{video_id}_tracking_filtered.mp4"
save_tracking_video(frames, track_f, filtered_path, fps=10, title="FILTERED TRACKS")

print("Raw tracks:", len(track_raw), "Filtered tracks:", len(track_f), "Label:", label)

# If labeled, select best track and save a video that draws only that track
if label in ["crossing", "not_crossing"] and len(track_f) > 0:
    best_tid, score = select_best_track(track_f)
    best_path = f"{OUT_ROOT}/{video_id}_tracking_best_tid{best_tid}.mp4"
    save_tracking_video(frames, track_f, best_path, fps=10, only_tids={best_tid}, title=f"BEST TRACK {best_tid}")
    print("Best tid:", best_tid, "score:", score)


In [None]:
def show_original_vs_crop(frames, track_seq, expand=EXPAND_RATIO):
    """Plot the first, middle and last entry of a track: full frame vs. crop.

    The top row shows each frame with the tracked box drawn on it; the bottom
    row shows the matching crop_with_context result.

    Args:
        frames: Ordered list of image paths, indexed by the track's frame indices.
        track_seq: List of (frame_index, box, confidence) entries.
        expand: Expansion ratio forwarded to crop_with_context.
    """
    if len(track_seq) == 0:
        # Nothing to index: track_seq[0] would raise IndexError.
        print("show_original_vs_crop: empty track, nothing to display")
        return

    # take 1st, middle, last
    picks = [track_seq[0], track_seq[len(track_seq)//2], track_seq[-1]]
    plt.figure(figsize=(12,6))

    for i, (fidx, box, _) in enumerate(picks):
        orig = cv2.imread(frames[fidx])
        crop = crop_with_context(frames[fidx], box, expand)

        plt.subplot(2,3,i+1)
        if orig is not None:
            # cvtColor returns a new array, so drawing on it leaves `orig` untouched.
            annotated = cv2.cvtColor(orig, cv2.COLOR_BGR2RGB)
            x1,y1,x2,y2 = map(int, box)
            cv2.rectangle(annotated, (x1,y1),(x2,y2),(0,255,0),2)
            plt.imshow(annotated)
        plt.title(f"Original f{fidx}")
        plt.axis("off")

        plt.subplot(2,3,3+i+1)
        if crop is not None:
            # crop_with_context may return None (unreadable image or empty region);
            # leave the panel blank rather than failing in imshow.
            plt.imshow(crop)
        plt.title(f"Crop f{fidx}")
        plt.axis("off")

    plt.tight_layout()
    plt.show()

# Show frame-vs-crop panels for the best filtered track of the smoke-test video.
# Unlike the video export above, "unknown" labels are included here.
if label in ["crossing","not_crossing", "unknown"] and len(track_f)>0:
    best_tid, _ = select_best_track(track_f)
    seq = track_f[best_tid]
    show_original_vs_crop(frames, seq, expand=EXPAND_RATIO)


# Cleaning XML 

In [None]:
import glob
import os
import pandas as pd
import xml.etree.ElementTree as ET

# Input: glob pattern of the attribute XML files. Output: CSV of per-video labels written below.
XML_GLOB = "/kaggle/input/attributes-label/annotations_attributes/video_*_*.xml"
OUT_CSV  = "/kaggle/working/clean_video_labels.csv"


In [None]:
def parse_xml_peds(xml_path):
    """Return one {"video_id", "crossing"} row per ``<pedestrian>`` in an XML file.

    The video ID is the file's base name without extension and without the
    "_attributes" part. ``crossing`` is parsed as int; a missing or non-integer
    value becomes -1.

    Args:
        xml_path: Path of the attributes XML file.

    Returns:
        list of row dicts; empty when the file cannot be read or parsed
        (deliberate best-effort).
    """
    rows = []
    base = os.path.splitext(os.path.basename(xml_path))[0]

    # Normalize ID: video_0001_attributes -> video_0001
    video_id = base.replace("_attributes", "")

    try:
        root = ET.parse(xml_path).getroot()
    except Exception:
        # Best-effort: an unreadable or malformed file contributes no rows.
        return rows

    for ped in root.findall(".//pedestrian"):
        try:
            crossing = int(ped.attrib.get("crossing", -1))
        except (TypeError, ValueError):
            # Non-numeric attribute text; a bare except would also swallow
            # KeyboardInterrupt / SystemExit.
            crossing = -1

        rows.append({
            "video_id": video_id,
            "crossing": crossing
        })

    return rows


In [None]:
# Collect one row per annotated pedestrian across all attribute XML files.
xml_files = sorted(glob.glob(XML_GLOB))
print("XML files found:", len(xml_files))

rows = []
for xp in xml_files:
    rows.extend(parse_xml_peds(xp))

# Explicit columns keep the schema stable even when no rows were parsed; without them an
# empty result has no "crossing" column and the filtering step that follows raises KeyError.
df_peds = pd.DataFrame(rows, columns=["video_id", "crossing"])
print("Total pedestrian annotations:", len(df_peds))


In [None]:
# Keep only annotations whose crossing value is 0 or 1; everything else (including the
# -1 produced for missing/unparseable values) is dropped.
# Note: this rebinds df_peds, so the unfiltered frame is no longer available afterwards.
df_peds = df_peds[df_peds["crossing"].isin([0, 1])].copy()

print("After removing unknown (-1):", len(df_peds))
print(df_peds["crossing"].value_counts())


In [None]:
# Video-level label: 1 when at least one pedestrian of the video is crossing, else 0.
per_video_crossing = (
    df_peds
    .groupby("video_id")["crossing"]
    .agg(lambda s: int((s == 1).any()))
)

df_video_labels = (
    per_video_crossing
    .reset_index()
    .rename(columns={"crossing": "label"})
)


In [None]:
# Persist the per-video labels (without the index column) and report the class balance.
df_video_labels.to_csv(OUT_CSV, index=False)

print("Saved CSV to:", OUT_CSV)
print("Total labeled videos:", len(df_video_labels))
print(df_video_labels["label"].value_counts())


# Checking again: single-video walkthrough using the cleaned labels

In [None]:
import os, glob, random
import pandas as pd
import numpy as np
import cv2
from ultralytics import YOLO


In [None]:
# Reload the per-video labels written by the XML-cleaning step.
LABELS_CSV = "/kaggle/working/clean_video_labels.csv"
labels_df = pd.read_csv(LABELS_CSV)

# video_id -> label lookup
label_map = dict(zip(labels_df["video_id"], labels_df["label"]))
print("Loaded labels:", len(label_map))
print(labels_df["label"].value_counts())


In [None]:
def list_video_ids(split):
    """Sorted, distinct video IDs in IMG_ROOT/<split> (file name up to the first "_f")."""
    unique_ids = set()
    for fname in os.listdir(f"{IMG_ROOT}/{split}"):
        unique_ids.add(fname.split("_f")[0])
    return sorted(unique_ids)

def list_frames(split, video_id):
    """Return the sorted frame paths of one video inside IMG_ROOT/<split>.

    A file belongs to `video_id` when the part of its name before the first "_f"
    equals `video_id` - the same rule list_video_ids uses to derive IDs. A plain
    prefix test would also pick up frames of e.g. "video_10" when asked for
    "video_1".
    """
    img_dir = f"{IMG_ROOT}/{split}"
    return sorted(
        os.path.join(img_dir, f)
        for f in os.listdir(img_dir)
        if f.split("_f")[0] == video_id
    )

# Pick a random training video that has an entry in label_map.
# This rebinds the globals split / video_id / frames / label set by the earlier smoke test.
split = "train"
video_ids = [v for v in list_video_ids(split) if v in label_map]
video_id = random.choice(video_ids)

frames = list_frames(split, video_id)
label = label_map[video_id]

print(video_id, "frames:", len(frames), "label:", label)


In [None]:
# Preview the middle frame of the selected video (OpenCV loads BGR, matplotlib expects RGB).
img = cv2.imread(frames[len(frames)//2])
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
plt.imshow(img); plt.axis("off"); plt.title(f"{video_id} | label={label}")


In [None]:
# Tracking / filtering settings for this section (same values as the configuration cell above).
CONF = 0.33
MIN_TRACK_LEN = 8
MIN_STABILITY_IOU = 0.25
TRACKER = "botsort.yaml"


In [None]:
def run_tracking(frames):
    """Track objects across the frames of one video with a freshly loaded model.

    Returns:
        dict of track ID -> list of (frame_index, box_xyxy, confidence), with at
        most one entry per track and frame.
    """
    # A new model instance per call means tracker state never carries over between videos.
    tracker_model = YOLO(MODEL_PATH)
    track_kwargs = dict(
        conf=CONF,
        imgsz=IMGSZ,
        persist=True,
        tracker=TRACKER,
        verbose=False,
    )

    track_db = {}
    for fidx, fp in enumerate(frames):
        img = cv2.imread(fp)
        if img is None:
            continue

        result = tracker_model.track(img, **track_kwargs)[0]
        detections = result.boxes
        if detections is None or detections.id is None:
            continue

        boxes = detections.xyxy.cpu().numpy()
        ids = detections.id.cpu().numpy().astype(int)
        confs = detections.conf.cpu().numpy()

        # Keep only the first detection of each track ID within a frame.
        seen = set()
        for box, tid, c in zip(boxes, ids, confs):
            if tid != -1 and tid not in seen:
                seen.add(tid)
                track_db.setdefault(tid, []).append((fidx, box, float(c)))

    return track_db


In [None]:
def filter_tracks(track_db):
    """Drop short or unstable tracks, then remove duplicates.

    A track is kept when it has at least MIN_TRACK_LEN entries and a
    track_stability of at least MIN_STABILITY_IOU.
    """
    long_and_stable = {
        tid: seq
        for tid, seq in track_db.items()
        # Stability is only evaluated for tracks that already passed the length check.
        if len(seq) >= MIN_TRACK_LEN and track_stability(seq) >= MIN_STABILITY_IOU
    }
    return suppress_duplicates(long_and_stable)


In [None]:
# Re-run tracking for the video selected in this section. Without this, `track_raw` still
# holds the tracks of the earlier smoke-test video, whose frame indices and boxes do not
# belong to the current `frames`.
track_raw = run_tracking(frames)
track_f = filter_tracks(track_raw)

save_tracking_video(
    frames,
    track_f,
    f"{OUT_ROOT}/{video_id}_FILTERED.mp4",
    title="FILTERED TRACKS"
)

print("Filtered tracks:", len(track_f))


In [None]:
# Pick the filtered track with the highest track_score.
# NOTE(review): track_score is not defined in this part of the notebook - confirm it is
# defined in an earlier cell, or whether select_best_track(track_f) was intended.
# NOTE(review): max() raises ValueError when track_f is empty.
best_tid = max(track_f, key=lambda t: track_score(track_f[t]))
best_seq = track_f[best_tid]
print("Best track ID:", best_tid)


In [None]:
# Order the chosen track by frame index and wrap it as a single-track dict,
# the shape save_tracking_video expects.
best_seq = sorted(best_seq, key=lambda x: x[0])
locked_track = {best_tid: best_seq}


In [None]:
# Export a video that draws only the selected track, with the video label in the title.
save_tracking_video(
    frames,
    locked_track,
    f"{OUT_ROOT}/{video_id}_BEST_LOCKED.mp4",
    title=f"BEST LOCKED | label={label}"
)


In [None]:
def crop_with_context(frame_path, box, expand_ratio=1.8):
    """Crop an enlarged region around `box` from the image at `frame_path`.

    The box (x1, y1, x2, y2) is scaled by `expand_ratio` around its center and
    clamped to the image borders.

    Returns:
        The RGB crop, or None when the image cannot be read or the clamped region
        is empty (same contract as the earlier definition of this function).
    """
    img = cv2.imread(frame_path)
    if img is None:
        # cv2.imread signals an unreadable file with None; cvtColor would raise on it.
        return None
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    H, W = img.shape[:2]

    x1,y1,x2,y2 = map(float, box)
    cx,cy = (x1+x2)/2,(y1+y2)/2
    bw,bh = (x2-x1)*expand_ratio,(y2-y1)*expand_ratio

    nx1 = int(max(0, cx-bw/2))
    ny1 = int(max(0, cy-bh/2))
    nx2 = int(min(W, cx+bw/2))
    ny2 = int(min(H, cy+bh/2))

    # A box outside the image (or with no area) clamps to an empty region.
    if nx2 <= nx1 or ny2 <= ny1:
        return None
    return img[ny1:ny2, nx1:nx2]


In [None]:
# Show the crops for the first, middle and last entry of the best track.
# NOTE(review): this rebinds the global `samples`, which an earlier cell uses for the
# DataFrame of sampled sequences; re-running that inference cell afterwards would see
# this list instead.
samples = [best_seq[0], best_seq[len(best_seq)//2], best_seq[-1]]

plt.figure(figsize=(12,4))
for i,(fidx,box,_) in enumerate(samples):
    crop = crop_with_context(frames[fidx], box)
    plt.subplot(1,3,i+1)
    plt.imshow(crop); plt.axis("off")
plt.show()


In [None]:
def build_windows(seq, seq_len=16, stride=4):
    """Cut `seq` into overlapping slices of exactly `seq_len` items.

    Slice starts advance by `stride`; a sequence shorter than `seq_len` gives an
    empty list.
    """
    if len(seq) >= seq_len:
        starts = range(0, len(seq) - seq_len + 1, stride)
        return [seq[begin:begin + seq_len] for begin in starts]
    return []


In [None]:
# Sliding windows of SEQ_LEN entries (step STRIDE) over the best track;
# the list is empty when the track has fewer than SEQ_LEN entries.
windows = build_windows(best_seq, SEQ_LEN, STRIDE)
print("Windows:", len(windows))


In [None]:
# Preview the first three crops of the middle window.
if windows:
    w = windows[len(windows)//2]

    plt.figure(figsize=(12,4))
    for i,(fidx,box,_) in enumerate(w[:3]):
        crop = crop_with_context(frames[fidx], box)
        plt.subplot(1,3,i+1)
        if crop is not None:
            plt.imshow(crop)
        plt.axis("off")
    plt.suptitle(f"Sequence sample | label={label}")
    plt.show()
else:
    # build_windows returns [] for tracks shorter than SEQ_LEN, and MIN_TRACK_LEN allows
    # such tracks through the filter; indexing the empty list would raise IndexError.
    print(f"No windows to display: best track has {len(best_seq)} entries, fewer than SEQ_LEN={SEQ_LEN}.")
