# tensorflow

In [11]:
# Purpose: Imports for TensorFlow video inference
import os
from pathlib import Path
import cv2
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.applications import MobileNetV2
import numpy as np
from PIL import Image
import threading
import queue
import time

In [12]:
# Purpose: TensorFlow config - paths, grid settings, thresholds (edit these)
# Raw strings keep the Windows backslashes literal.
VIDEO_PATH_TF = r"C:\Users\thaim\Videos\AI_LEDS\second_dataset\preparations\detector_video\30.mp4"
WEIGHTS_PATH_TF = r"C:\Users\thaim\Videos\AI_LEDS\second_dataset\preparations\model_output\detector_classifier_mobilenetv2_final (3).h5"
# NOTE(review): this path has no file extension although the writer is created
# with the 'mp4v' codec - confirm OpenCV can open it, or append ".mp4".
OUTPUT_VIDEO_PATH_TF = r"C:\Users\thaim\Videos\AI_LEDS\second_dataset\video_test_output_tf"

# Model and grid settings
MODEL_INPUT_SIZE_TF = 224  # side length (pixels) each crop is resized to before prediction
BOX_SIZE_TF = 448  # Larger boxes = fewer predictions = faster
OVERLAP_PERCENT_TF = 0  # percent overlap between neighbouring boxes; feeds the stride computation
PROCESS_EVERY_N_FRAMES_TF = 5  # Process every 5th frame
CONF_THRESHOLD_TF = 0.5  # probabilities >= this value are drawn with COLOR_POS_TF

# Display and output
DISPLAY_WINDOW_TF = True  # show a live preview window (ESC closes it)
SAVE_OUTPUT_TF = True  # also write the displayed frames to OUTPUT_VIDEO_PATH_TF
DRAW_GRID_TF = True  # draw a thin grid rectangle for every box
FONT_TF = cv2.FONT_HERSHEY_SIMPLEX
FONT_SCALE_TF = 0.4
THICKNESS_TF = 1  # text stroke thickness

# Color BGR tuples
COLOR_POS_TF = (0, 200, 0)  # green: probability at or above the threshold
COLOR_NEG_TF = (0, 0, 200)  # red: probability below the threshold
COLOR_GRID_TF = (180, 180, 180)  # light grey grid lines

# Not referenced by the cells visible here.
SEED_TF = 42


In [None]:
# Load TensorFlow model directly from saved file
# The .h5 file is loaded as a complete model, so no architecture is rebuilt here.
model_tf = keras.models.load_model(WEIGHTS_PATH_TF)
# Re-compile with Keras defaults (no optimizer/loss arguments are given).
# NOTE(review): predict() should not need this - confirm why it is here.
model_tf.compile()
print(f'✓ Model loaded from {WEIGHTS_PATH_TF}')

def compute_stride(box_size, overlap_percent):
    """Return the step, in pixels, between neighbouring grid boxes.

    The step is ``box_size`` reduced by ``overlap_percent`` percent,
    truncated to an integer and never smaller than 1.
    """
    keep_fraction = 1 - overlap_percent / 100.0
    step = int(box_size * keep_fraction)
    if step < 1:
        return 1
    return step

def preprocess_tf(crop, target_size):
    """Turn a BGR crop into a model-ready float array.

    The crop is converted BGR -> RGB, resized with PIL (bilinear) to
    ``target_size`` x ``target_size``, converted with Keras' ``img_to_array``
    and divided by 255.0. Per the original author's note this deliberately
    uses PIL + Keras rather than cv2 resizing so it matches training.
    """
    side = (target_size, target_size)
    resized = Image.fromarray(cv2.cvtColor(crop, cv2.COLOR_BGR2RGB)).resize(side, Image.BILINEAR)
    return keras.preprocessing.image.img_to_array(resized) / 255.0

# Grid step derived from the configured box size and overlap.
stride_tf = compute_stride(BOX_SIZE_TF, OVERLAP_PERCENT_TF)

cap_tf = cv2.VideoCapture(VIDEO_PATH_TF)
if not cap_tf.isOpened():
    raise RuntimeError(f'Could not open video: {VIDEO_PATH_TF}')

# Fall back to 30 when the container reports an FPS of 0.
fps_tf = cap_tf.get(cv2.CAP_PROP_FPS) or 30
width_tf = int(cap_tf.get(cv2.CAP_PROP_FRAME_WIDTH))
height_tf = int(cap_tf.get(cv2.CAP_PROP_FRAME_HEIGHT))
# cv2.waitKey(0) waits for a key press indefinitely, so never let the
# per-frame delay round down to 0.
delay_ms_tf = max(1, int(1000 / fps_tf))

writer_tf = None
if SAVE_OUTPUT_TF and OUTPUT_VIDEO_PATH_TF:
    fourcc_tf = cv2.VideoWriter_fourcc(*'mp4v')
    writer_tf = cv2.VideoWriter(OUTPUT_VIDEO_PATH_TF, fourcc_tf, fps_tf, (width_tf, height_tf))
    if not writer_tf.isOpened():
        # A writer that failed to open discards frames without any error;
        # report it once and disable saving instead of failing silently.
        print(f'⚠ Could not open video writer for {OUTPUT_VIDEO_PATH_TF}; output will not be saved')
        writer_tf.release()
        writer_tf = None

# Size-1 queue: at most one frame is ever waiting for the worker.
frame_queue_tf = queue.Queue(maxsize=1)
last_overlay_tf = None  # most recent annotated frame published by the worker
processed_frames_tf = 0  # number of frames the worker has finished

# Worker thread does inference with BATCHED predictions (all boxes at once)
def worker_tf():
    """Annotate queued frames with per-box model probabilities.

    Blocks on ``frame_queue_tf``; a ``None`` item ends the thread. For every
    frame, all full-size grid boxes are cropped, preprocessed and scored in a
    single ``model_tf.predict`` call, then drawn onto a copy of the frame.
    The annotated copy is published through the global ``last_overlay_tf``
    and ``processed_frames_tf`` is incremented.
    """
    global last_overlay_tf, processed_frames_tf
    while True:
        frame_tf = frame_queue_tf.get()
        if frame_tf is None:
            return
        canvas = frame_tf.copy()

        # Gather the top-left corner and pixels of every full-size box.
        origins = []
        patches = []
        for top in range(0, height_tf - BOX_SIZE_TF + 1, stride_tf):
            for left in range(0, width_tf - BOX_SIZE_TF + 1, stride_tf):
                patch = frame_tf[top:top + BOX_SIZE_TF, left:left + BOX_SIZE_TF]
                if patch.shape[0] != BOX_SIZE_TF or patch.shape[1] != BOX_SIZE_TF:
                    continue
                origins.append((left, top))
                patches.append(patch)

        if patches:
            # One predict call for the whole grid; column 0 holds the probability.
            batch = np.array([preprocess_tf(patch, MODEL_INPUT_SIZE_TF) for patch in patches])
            scores = model_tf.predict(batch, verbose=0)[:, 0]

            for (left, top), score in zip(origins, scores):
                top_left = (left, top)
                bottom_right = (left + BOX_SIZE_TF, top + BOX_SIZE_TF)

                if DRAW_GRID_TF:
                    cv2.rectangle(canvas, top_left, bottom_right, COLOR_GRID_TF, 1)

                if score >= CONF_THRESHOLD_TF:
                    tint = COLOR_POS_TF
                else:
                    tint = COLOR_NEG_TF
                cv2.rectangle(canvas, top_left, bottom_right, tint, 2)
                cv2.putText(canvas, f"{score*100:.1f}%", (left + 4, top + 20), FONT_TF, FONT_SCALE_TF, tint, THICKNESS_TF, cv2.LINE_AA)

        last_overlay_tf = canvas
        processed_frames_tf += 1

# Inference runs on a daemon thread so reading/displaying frames is never
# blocked by model.predict.
worker_thread_tf = threading.Thread(target=worker_tf, daemon=True)
worker_thread_tf.start()

frame_idx_tf = 0
failed_reads_tf = 0  # consecutive failed reads; >1 means rewinding did not help

try:
    while True:
        ret_tf, frame_tf = cap_tf.read()
        if not ret_tf:
            failed_reads_tf += 1
            # The video is looped only while a window is shown, because ESC is
            # the sole way to leave the loop; without a window, stop after one
            # pass. A second failure right after rewinding means the source
            # cannot be read, so stop instead of spinning forever.
            # NOTE(review): while looping with saving enabled, every pass is
            # appended to the output file until ESC is pressed.
            if not DISPLAY_WINDOW_TF or failed_reads_tf > 1:
                break
            cap_tf.set(cv2.CAP_PROP_POS_FRAMES, 0)
            frame_idx_tf = 0
            continue
        failed_reads_tf = 0

        # Hand a frame to the worker only when it is idle; frames are never queued up.
        if frame_idx_tf % PROCESS_EVERY_N_FRAMES_TF == 0 and frame_queue_tf.empty():
            frame_queue_tf.put(frame_tf.copy())

        # Show the latest annotated frame, or the raw frame until one exists.
        overlay_tf = last_overlay_tf if last_overlay_tf is not None else frame_tf

        if writer_tf is not None:
            writer_tf.write(overlay_tf)
        if DISPLAY_WINDOW_TF:
            cv2.imshow('TensorFlow Model Grid Predictions', overlay_tf)
            if cv2.waitKey(delay_ms_tf) & 0xFF == 27:  # 27 == ESC
                break

        frame_idx_tf += 1
finally:
    # Runs on normal exit and on interruption, so resources are always released.
    # Drop any frame still waiting so the stop signal can be queued without
    # blocking, even if the worker thread is no longer consuming.
    try:
        frame_queue_tf.get_nowait()
    except queue.Empty:
        pass
    frame_queue_tf.put(None)
    worker_thread_tf.join(timeout=2)
    cap_tf.release()
    if writer_tf is not None:
        writer_tf.release()
    cv2.destroyAllWindows()

print(f'✓ TensorFlow done: processed {processed_frames_tf} inference frames')




✓ Model loaded from C:\Users\thaim\Videos\AI_LEDS\second_dataset\preparations\model_output\detector_classifier_mobilenetv2_final (3).h5


In [15]:
# Test on single random frame from video - standalone (requires only Cell 2 & 3)
import random

# Load model if not already loaded
# dir() with no arguments lists the names defined in the current scope, so
# this skips the reload when an earlier cell already created model_tf.
if 'model_tf' not in dir():
    model_tf = keras.models.load_model(WEIGHTS_PATH_TF)
    # Re-compile with Keras defaults (no optimizer/loss arguments are given).
    model_tf.compile()
    print(f'✓ Model loaded from {WEIGHTS_PATH_TF}')

# Define preprocessing function - MATCHES TRAINING EXACTLY
def preprocess_single(crop, target_size):
    """Convert one BGR crop into a float array scaled by 1/255.

    Steps: BGR -> RGB, wrap as a PIL image, bilinear resize to
    ``target_size`` x ``target_size``, Keras ``img_to_array``, divide by 255.0.
    PIL + Keras are used instead of cv2 resizing to match training, per the
    original author's note.
    """
    dims = (target_size, target_size)
    image = Image.fromarray(cv2.cvtColor(crop, cv2.COLOR_BGR2RGB))
    pixels = keras.preprocessing.image.img_to_array(image.resize(dims, Image.BILINEAR))
    return pixels / 255.0

# Calculate stride
def compute_stride_single(box_size, overlap_percent):
    """Return the grid step in pixels: box size minus the overlap, at least 1."""
    scaled = box_size * (1 - overlap_percent / 100.0)
    return max(int(scaled), 1)

stride_single = compute_stride_single(BOX_SIZE_TF, OVERLAP_PERCENT_TF)

# Grab one random frame; the capture is released even if something fails.
cap_single = cv2.VideoCapture(VIDEO_PATH_TF)
try:
    if not cap_single.isOpened():
        raise RuntimeError(f'Could not open video: {VIDEO_PATH_TF}')

    total_frames = int(cap_single.get(cv2.CAP_PROP_FRAME_COUNT))
    if total_frames <= 0:
        # randint(0, -1) would otherwise fail with an unrelated-looking ValueError.
        raise RuntimeError(f'Video reports no frames: {VIDEO_PATH_TF}')

    width_single = int(cap_single.get(cv2.CAP_PROP_FRAME_WIDTH))
    height_single = int(cap_single.get(cv2.CAP_PROP_FRAME_HEIGHT))
    random_frame_idx = random.randint(0, total_frames - 1)
    # Seek to the chosen frame index, then decode it.
    cap_single.set(cv2.CAP_PROP_POS_FRAMES, random_frame_idx)
    ret_single, frame_single = cap_single.read()
finally:
    cap_single.release()

if not ret_single:
    raise RuntimeError(f'Could not read frame {random_frame_idx}')

print(f'Testing frame {random_frame_idx}/{total_frames} ({width_single}×{height_single})')

# Same grid inference as video worker
# Draw on a copy so frame_single itself stays untouched.
overlay_single = frame_single.copy()
boxes_data_single = []  # (x, y, crop) for every full-size grid box

# Walk the frame in stride_single steps; the range bounds keep each box inside the frame.
for y_s in range(0, height_single - BOX_SIZE_TF + 1, stride_single):
    for x_s in range(0, width_single - BOX_SIZE_TF + 1, stride_single):
        crop_s = frame_single[y_s:y_s + BOX_SIZE_TF, x_s:x_s + BOX_SIZE_TF]
        # Keep only crops that really are BOX_SIZE_TF x BOX_SIZE_TF.
        if crop_s.shape[0] == BOX_SIZE_TF and crop_s.shape[1] == BOX_SIZE_TF:
            boxes_data_single.append((x_s, y_s, crop_s))

# Batch predict all boxes at once
if boxes_data_single:
    batch_tensors_single = np.array([preprocess_single(box[2], MODEL_INPUT_SIZE_TF) for box in boxes_data_single])
    # Column 0 of the prediction is used as the per-box probability.
    probs_single = model_tf.predict(batch_tensors_single, verbose=0)[:, 0]

    # Draw results
    for idx, (x_s, y_s, _) in enumerate(boxes_data_single):
        prob_s = probs_single[idx]
        is_det_s = (prob_s >= CONF_THRESHOLD_TF)

        if DRAW_GRID_TF:
            cv2.rectangle(overlay_single, (x_s, y_s), (x_s + BOX_SIZE_TF, y_s + BOX_SIZE_TF), COLOR_GRID_TF, 1)

        # The result rectangle uses the same corners as the grid rectangle, with thickness 2.
        color_s = COLOR_POS_TF if is_det_s else COLOR_NEG_TF
        cv2.rectangle(overlay_single, (x_s, y_s), (x_s + BOX_SIZE_TF, y_s + BOX_SIZE_TF), color_s, 2)
        label_s = f"{prob_s*100:.1f}%"
        cv2.putText(overlay_single, label_s, (x_s + 4, y_s + 20), FONT_TF, FONT_SCALE_TF, color_s, THICKNESS_TF, cv2.LINE_AA)

# Display result
cv2.imshow('Single Frame Test', overlay_single)
print(f'Processed {len(boxes_data_single)} boxes - Press any key to close')
# waitKey(0) waits for a key press with no timeout.
cv2.waitKey(0)
cv2.destroyAllWindows()


Testing frame 79/132 (1280×960)
Processed 4 boxes - Press any key to close


# torch

In [None]:
# Purpose: Imports for video handling, model inference, and utilities
import os
from pathlib import Path
import cv2
import torch
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.models as models
import numpy as np
from PIL import Image
import threading
import queue
import time

In [None]:
# Purpose: Configurable paths, grid settings, and thresholds (edit these)
# NOTE(review): VIDEO_PATH ends in a folder name ("detector_video") while the
# TensorFlow config points at "...\detector_video\30.mp4" - confirm this is a
# video file, since it is passed straight to cv2.VideoCapture.
VIDEO_PATH = r"C:\Users\thaim\Videos\AI_LEDS\second_dataset\preparations\detector_video"
# NOTE(review): WEIGHTS_PATH has no file name/extension and is passed straight
# to torch.load - confirm it points at a saved state dict file.
WEIGHTS_PATH = r"C:\Users\thaim\Videos\AI_LEDS\first_dataset\preparations\model_output"
# NOTE(review): no file extension on the output path either - confirm the
# 'mp4v' writer can open it.
OUTPUT_VIDEO_PATH = r"C:\Users\thaim\Videos\AI_LEDS\second_dataset\video_test_output"

# Model and grid settings
MODEL_INPUT_SIZE = 224  # matches training
BOX_SIZE = 224          # grid box size (can change)
OVERLAP_PERCENT = 0     # 0-90, controls stride; 0 means no overlap
PROCESS_EVERY_N_FRAMES = 1  # process every Nth frame
CONF_THRESHOLD = 0.5        # probability threshold for detector

# Display and output
DISPLAY_WINDOW = True  # show a live preview window (ESC closes it)
SAVE_OUTPUT = True  # also write the displayed frames to OUTPUT_VIDEO_PATH
DRAW_GRID = True  # draw a thin grid rectangle for every box
FONT = cv2.FONT_HERSHEY_SIMPLEX
FONT_SCALE = 0.4
THICKNESS = 1  # text stroke thickness

# Color BGR tuples
COLOR_POS = (0, 200, 0)  # green: probability at or above the threshold
COLOR_NEG = (0, 0, 200)  # red: probability below the threshold
COLOR_GRID = (180, 180, 180)  # light grey grid lines

# Fixed seed for any randomness (not used in deterministic grid)
SEED = 42

In [None]:
class DetectorClassifier(nn.Module):
    """MobileNetV2 backbone with a single-probability sigmoid head.

    The head replaces ``base.classifier`` and begins with its own
    ``AdaptiveAvgPool2d`` + ``Flatten``, so it expects the 4-D feature map
    produced by ``base.features``.

    Args:
        weights: Passed to ``torchvision.models.mobilenet_v2``. The default
            keeps the original behaviour (ImageNet initialisation); pass
            ``None`` to skip that initialisation when a checkpoint is loaded
            right afterwards anyway.
    """

    def __init__(self, weights='IMAGENET1K_V1'):
        super().__init__()
        self.base = models.mobilenet_v2(weights=weights)
        # Layer positions are kept exactly as before so existing checkpoints
        # (keys such as base.classifier.2.* and base.classifier.5.*) still load.
        self.base.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Linear(1280, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Return one sigmoid probability per input image, shape (N, 1)."""
        # torchvision's MobileNetV2.forward already pools and flattens before
        # calling `classifier`, which would feed a 2-D tensor into the head's
        # AdaptiveAvgPool2d and raise. Run the feature extractor and the head
        # directly so the head does its own pooling.
        # NOTE(review): presumably this is how the checkpoint was trained - confirm.
        feature_map = self.base.features(x)
        return self.base.classifier(feature_map)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = DetectorClassifier().to(device)
# map_location loads the tensors onto the selected device;
# weights_only=True is passed so only tensor data is accepted from the file.
model.load_state_dict(torch.load(WEIGHTS_PATH, map_location=device, weights_only=True))
# Inference mode: disables dropout and uses stored batch-norm statistics.
model.eval()

# Per-crop preprocessing: resize, convert to a tensor, then normalise per channel.
# NOTE(review): the mean/std values look like the usual ImageNet statistics;
# the TensorFlow cells only divide by 255 - confirm each matches its training.
transform = transforms.Compose([
    transforms.Resize((MODEL_INPUT_SIZE, MODEL_INPUT_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

def compute_stride(box_size, overlap_percent):
    """Return the pixel step between grid boxes for the given overlap.

    ``overlap_percent`` is a percentage of ``box_size``; the result is
    truncated to an integer and clamped to a minimum of 1.
    """
    overlap_fraction = overlap_percent / 100.0
    pixels = int(box_size * (1 - overlap_fraction))
    return pixels if pixels > 1 else 1

# Grid step derived from the configured box size and overlap.
stride = compute_stride(BOX_SIZE, OVERLAP_PERCENT)

cap = cv2.VideoCapture(VIDEO_PATH)
if not cap.isOpened():
    raise RuntimeError(f'Could not open video: {VIDEO_PATH}')

# Fall back to 30 when the container reports an FPS of 0.
fps = cap.get(cv2.CAP_PROP_FPS) or 30
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

writer = None
if SAVE_OUTPUT and OUTPUT_VIDEO_PATH:
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    writer = cv2.VideoWriter(OUTPUT_VIDEO_PATH, fourcc, fps, (width, height))
    if not writer.isOpened():
        # A writer that failed to open discards frames without any error;
        # report it once and disable saving instead of failing silently.
        print(f'⚠ Could not open video writer for {OUTPUT_VIDEO_PATH}; output will not be saved')
        writer.release()
        writer = None

# Size-1 queue: at most one frame is ever waiting for the worker.
frame_queue = queue.Queue(maxsize=1)
last_overlay = None  # most recent annotated frame published by the worker
processed_frames = 0  # number of frames the worker has finished

# Worker thread does inference so display loop stays smooth
def worker():
    """Annotate queued frames with per-box model probabilities.

    Blocks on ``frame_queue``; a ``None`` item ends the thread. For every
    frame, all full-size grid boxes are cropped and scored in batches (rather
    than one forward pass per box), then drawn onto a copy of the frame. The
    annotated copy is published through the global ``last_overlay`` and
    ``processed_frames`` is incremented.
    """
    global last_overlay, processed_frames
    batch_size = 32  # boxes per forward pass; bounds memory when the grid is dense
    while True:
        frame = frame_queue.get()
        if frame is None:
            break
        overlay = frame.copy()

        # Collect every full-size grid box first so they can be scored together.
        cells = []
        for y in range(0, height - BOX_SIZE + 1, stride):
            for x in range(0, width - BOX_SIZE + 1, stride):
                crop = frame[y:y + BOX_SIZE, x:x + BOX_SIZE]
                if crop.shape[0] == BOX_SIZE and crop.shape[1] == BOX_SIZE:
                    cells.append((x, y, crop))

        probs = []
        with torch.no_grad():
            for start in range(0, len(cells), batch_size):
                # OpenCV frames are BGR; the transform expects an RGB PIL image.
                tensors = [
                    transform(Image.fromarray(cv2.cvtColor(crop, cv2.COLOR_BGR2RGB)))
                    for _, _, crop in cells[start:start + batch_size]
                ]
                batch = torch.stack(tensors).to(device)
                # The model emits one probability per box; flatten to a list of floats.
                probs.extend(model(batch).reshape(-1).cpu().tolist())

        for (x, y, _), prob in zip(cells, probs):
            is_det = (prob >= CONF_THRESHOLD)
            top_left = (x, y)
            bottom_right = (x + BOX_SIZE, y + BOX_SIZE)

            if DRAW_GRID:
                cv2.rectangle(overlay, top_left, bottom_right, COLOR_GRID, 1)

            # Every box is labelled, whether or not it crosses the threshold.
            color = COLOR_POS if is_det else COLOR_NEG
            cv2.rectangle(overlay, top_left, bottom_right, color, 1)
            label = f"{prob*100:.1f}%"
            cv2.putText(overlay, label, (x + 4, y + 14), FONT, FONT_SCALE, color, THICKNESS, cv2.LINE_AA)

        last_overlay = overlay
        processed_frames += 1

# Inference runs on a daemon thread so reading/displaying frames is never
# blocked by the model.
worker_thread = threading.Thread(target=worker, daemon=True)
worker_thread.start()

frame_idx = 0
failed_reads = 0  # consecutive failed reads; >1 means rewinding did not help

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            failed_reads += 1
            # Loop video instead of stopping - but only while a window is
            # shown, because ESC is the sole way to leave the loop; without a
            # window, stop after one pass. A second failure right after
            # rewinding means the source cannot be read, so stop instead of
            # spinning forever.
            # NOTE(review): while looping with saving enabled, every pass is
            # appended to the output file until ESC is pressed.
            if not DISPLAY_WINDOW or failed_reads > 1:
                break
            cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
            frame_idx = 0
            continue
        failed_reads = 0

        # Hand a frame to the worker only when it is idle; frames are never queued up.
        if frame_idx % PROCESS_EVERY_N_FRAMES == 0 and frame_queue.empty():
            frame_queue.put(frame.copy())

        # Show the latest annotated frame, or the raw frame until one exists.
        overlay = last_overlay if last_overlay is not None else frame

        if writer is not None:
            writer.write(overlay)
        if DISPLAY_WINDOW:
            cv2.imshow('Model Grid Predictions', overlay)
            if cv2.waitKey(1) & 0xFF == 27:  # 27 == ESC
                break

        frame_idx += 1
finally:
    # Runs on normal exit and on interruption, so resources are always released.
    # Drop any frame still waiting so the stop signal can be queued without
    # blocking, even if the worker thread is no longer consuming.
    try:
        frame_queue.get_nowait()
    except queue.Empty:
        pass
    frame_queue.put(None)
    worker_thread.join(timeout=2)
    cap.release()
    if writer is not None:
        writer.release()
    cv2.destroyAllWindows()

print(f'✓ PyTorch done: processed {processed_frames} inference frames')
