In [1]:
from pathlib import Path
import torch
import cv2
from ultralytics import YOLO
import math
from IPython.display import display, clear_output
from PIL import Image
import numpy as np

In [2]:
# Video verkleinern
import cv2

input_path = "altendorfer_1.mp4"
output_path = "altendorfer_2.mp4"
scale = 0.3  # 0.3 = 30% der Originalgröße

cap = cv2.VideoCapture(input_path)
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) * scale)
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) * scale)
fps = cap.get(cv2.CAP_PROP_FPS)

fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (w, h))

while True:
    ret, frame = cap.read()
    if not ret:
        break
    frame = cv2.resize(frame, (w, h), interpolation=cv2.INTER_AREA)
    out.write(frame)

cap.release()
out.release()
print("Fertig:", output_path)

Fertig: altendorfer_2.mp4


In [3]:
# Pfade/Dateinamen (plattformunabhängig)
MODEL_PATH   = Path("best.pt")            # YOLO11-Gewichte (.pt)
VIDEO_PATH   = Path("altendorfer_2.mp4")     # Eingabevideo
OUTPUT_VIDEO = Path(f"{VIDEO_PATH.stem}_bboxes.mp4")  # Ausgabevideo

# Inferenz-Parameter:
CONF_THRES     = 0.25   # Mindest-Konfidenz für Detections
IOU_THRES      = 0.45   # IoU-Schwelle für NMS (Dubletten-Unterdrückung)
DISPLAY_EVERY  = 10     # Jeden n-ten Frame inline anzeigen (0 = nie)

# Device-Handhabung:
DEVICE_AUTO = True      # True = automatisch wählen (cuda falls verfügbar, sonst cpu)
DEVICE      = "cpu"     # Wird nur benutzt, wenn DEVICE_AUTO = False

In [4]:
# Device bestimmen (fail-safe)
_device = ("cuda" if torch.cuda.is_available() else "cpu") if DEVICE_AUTO else DEVICE

# Sanity-Checks
if not MODEL_PATH.exists():
    raise FileNotFoundError(f"Modell nicht gefunden: {MODEL_PATH}")
if not VIDEO_PATH.exists():
    raise FileNotFoundError(f"Video nicht gefunden: {VIDEO_PATH}")

# YOLOv11 laden
model = YOLO(str(MODEL_PATH))  # Gerät wird später beim Predict übergeben
print(f"Torch: {torch.__version__} | OpenCV: {cv2.__version__} | Device: {_device}")


Torch: 2.8.0+cpu | OpenCV: 4.12.0 | Device: cpu


In [5]:
_video_abs = Path(VIDEO_PATH).resolve()
_out_abs   = Path(OUTPUT_VIDEO).resolve()

if not _video_abs.is_file():
    raise FileNotFoundError(f"Video-Datei nicht gefunden: {_video_abs}")

cap = cv2.VideoCapture(str(_video_abs))
if not cap.isOpened():
    raise RuntimeError(f"Konnte Video nicht öffnen: {_video_abs}")

fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
w   = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h   = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
if w == 0 or h == 0:
    raise RuntimeError(f"Ungültige Videodimensionen für: {_video_abs}")

fourcc = cv2.VideoWriter_fourcc(*"mp4v")
writer = cv2.VideoWriter(str(_out_abs), fourcc, fps, (w, h))

def _show(frame_bgr):
    rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
    clear_output(wait=True); display(Image.fromarray(rgb))

print(f"Video: {_video_abs} | {w}x{h}@{fps:.2f} -> Ausgabe: {_out_abs}")


Video: C:\Develop\Python\PyCharmProjects\iu_computer_vision\analysis\notebooks\altendorfer_2.mp4 | 576x324@29.97 -> Ausgabe: C:\Develop\Python\PyCharmProjects\iu_computer_vision\analysis\notebooks\altendorfer_2_bboxes.mp4


In [None]:
# MicronNetPlus laden & für ROIs nutzbar machen – NUR torch, cv2, numpy
import torch, torch.nn as nn
import cv2, numpy as np

# ==== Architektur aus dem Training (ohne externe Module) ====
IMG_SIZE  = 48
N_CLASSES = 43
USE_MISH  = True  # falls du im Training False hattest, hier auch auf False setzen

class Mish(nn.Module):
    def forward(self, x):
        return x * torch.tanh(torch.nn.functional.softplus(x))

def act_layer():
    return Mish() if USE_MISH else nn.ReLU(inplace=True)

class MicronNetPlus(nn.Module):
    def __init__(self, n_classes=43):
        super().__init__()
        A = act_layer
        self.features = nn.Sequential(
            nn.Conv2d(3, 32, 3, padding=1), nn.BatchNorm2d(32), A(), nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3, padding=1), nn.BatchNorm2d(64), A(), nn.MaxPool2d(2),
            nn.Conv2d(64, 128, 3, padding=1), nn.BatchNorm2d(128), A(), nn.MaxPool2d(2),
        )
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(128*6*6, 256), A(), nn.Dropout(0.4),
            nn.Linear(256, n_classes)
        )
    def forward(self, x):
        return self.classifier(self.features(x))

# ==== Checkpoint laden (speichert unter "model") ====
path_ckpt = r"micronnet_best.pt"
ckpt = torch.load(path_ckpt, map_location="cpu")
key = "model" if "model" in ckpt else (
      "model_state_dict" if "model_state_dict" in ckpt else (
      "state_dict" if "state_dict" in ckpt else None))
if key is None:
    raise ValueError(f"Checkpoint-Schlüssel nicht gefunden. Verfügbare Keys: {list(ckpt.keys())}")

model_cls = MicronNetPlus(N_CLASSES)
# Eventuelles 'module.'-Präfix entfernen
sd = ckpt[key]
if any(k.startswith("module.") for k in sd.keys()):
    sd = {k.replace("module.","",1): v for k,v in sd.items()}
missing, unexpected = model_cls.load_state_dict(sd, strict=False)
if missing or unexpected:
    print("[WARN] load_state_dict:", "missing:", missing, "unexpected:", unexpected)
model_cls.eval()

# ==== Klassenliste (optional echte GTSRB-Namen einsetzen) ====
GTSRB_CLASSES = [str(i) for i in range(N_CLASSES)]

@torch.no_grad()
def classify_sign(crop_bgr: np.ndarray) -> str:
    # Preprocessing wie im Training: Resize 48x48, ToTensor, Normalize(mean=std=0.5)
    img = cv2.resize(crop_bgr, (IMG_SIZE, IMG_SIZE), interpolation=cv2.INTER_AREA)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0
    img = (img - 0.5) / 0.5
    t = torch.from_numpy(img).permute(2, 0, 1).unsqueeze(0)  # 1x3x48x48
    logits = model_cls(t)
    cls_id = int(torch.argmax(logits, dim=1))
    return GTSRB_CLASSES[cls_id]

# Kurztest
_ = classify_sign(np.zeros((60,60,3), dtype=np.uint8))
print("MicronNetPlus geladen & classify_sign bereit.")


[WARN] load_state_dict: missing: ['features.1.weight', 'features.1.bias', 'features.1.running_mean', 'features.1.running_var', 'features.4.weight', 'features.4.bias', 'features.5.weight', 'features.5.bias', 'features.5.running_mean', 'features.5.running_var', 'features.8.weight', 'features.8.bias', 'features.9.weight', 'features.9.bias', 'features.9.running_mean', 'features.9.running_var', 'classifier.4.weight', 'classifier.4.bias'] unexpected: ['features.3.weight', 'features.3.bias', 'features.6.weight', 'features.6.bias', 'classifier.3.weight', 'classifier.3.bias']
MicronNetPlus geladen & classify_sign bereit.


In [None]:
BATCH_SIZE = 32   # größer = mehr Durchsatz, mehr Latenz
frame_idx = 0
frames = []

while True:
    ret, frame = cap.read()
    if not ret:
        break
    frames.append(frame)

    if len(frames) == BATCH_SIZE:
        results = model.predict(
            source=frames,
            conf=CONF_THRES,
            iou=IOU_THRES,
            device=_device,
            verbose=False
        )

        for res, frm in zip(results, frames):
            boxes = res.boxes
            if boxes and len(boxes) > 0:  # nur zeigen, wenn Detektion vorhanden
                for b in boxes:
                    x1, y1, x2, y2 = map(int, b.xyxy[0].tolist())
                    conf = float(b.conf[0])
                    cv2.rectangle(frm, (x1, y1), (x2, y2), (0, 255, 0), 2)
                    cv2.putText(frm, f"{conf:.2f}", (x1, max(0, y1-5)),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 1, cv2.LINE_AA)
                _show(frm)  # nur hier, kein DISPLAY_EVERY
            writer.write(frm)
            frame_idx += 1
        frames.clear()

# Restbatch
if frames:
    results = model.predict(source=frames, conf=CONF_THRES, iou=IOU_THRES, device=_device, verbose=False)
    for res, frm in zip(results, frames):
        boxes = res.boxes
        if boxes and len(boxes) > 0:
            for b in boxes:
                x1, y1, x2, y2 = map(int, b.xyxy[0].tolist())
                conf = float(b.conf[0])
                cv2.rectangle(frm, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frm, f"{conf:.2f}", (x1, max(0, y1-5)),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 1, cv2.LINE_AA)
            _show(frm)
        writer.write(frm)
        frame_idx += 1

print(f"Fertig: {frame_idx} Frames verarbeitet.")


AttributeError: 'dict' object has no attribute 'eval'

: 

In [27]:
# Cleanup
cap.release()
writer.release()
cv2.destroyAllWindows()
print(f"Ausgabe gespeichert unter: {OUTPUT_VIDEO}")


Ausgabe gespeichert unter: generated_video_1_bboxes.mp4
