In [7]:
# -*- coding: utf-8 -*-
"""Generate a *DefMap* video (plasma speed heat‑map) via **PIV**.

➤ Version 2025‑04‑16 C
   • Interpolation bei Upscaling zurück auf *INTER_CUBIC* (weicher).
   • 3×3‑Mittelwert‑In‑Painting füllt Null‑Löcher vor dem Glätten –
     vermeidet weiße Punkte.
   • Größeres Gauß‑Kernel (7×7) + FIG_DPI = 200 für weniger Pixelkanten.
"""
from __future__ import annotations

import cv2
import numpy as np
import pandas as pd
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from pathlib import Path

from openpiv import pyprocess, validation

# ----------------------------------------------------------------------------
# CONFIGURATION
# ----------------------------------------------------------------------------
ROOT_DIR = Path("/Users/macbook/Library/Mobile Documents/com~apple~CloudDocs/Studium/Spezialisierung/CodeExampleRayan")
INPUT_VIDEO = ROOT_DIR / "data/input/LinearStackAlignmentSift_Gauss5px.avi"
OUTPUT_DIR = ROOT_DIR / "data/output/defmap_video_piv"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
VIDEO_OUT = OUTPUT_DIR / "defmap_piv_plasma.mp4"
CSV_OUT = OUTPUT_DIR / "defmap_piv_speed_metrics.csv"

ROI_MARGIN = 100
WINDOW_SIZE = 32
OVERLAP = 16
DOWNSAMPLE = 1
FPS_OUT = 10
MAX_SPEED_VIS = 10.0
FIG_DPI = 200  # higher dpi → smoother rendering

# ----------------------------------------------------------------------------

def sig2noise_safe(u, v, s2n):
    for args in [(u, v, s2n, 2.0), (u, v, s2n)]:
        try:
            return validation.sig2noise_val(*args)[:2]
        except Exception:
            continue
    try:
        return validation.sig2noise_val(u=u, v=v, s2n=s2n, threshold=2.0)[:2]
    except Exception:
        return u, v


def fill_zero_holes(arr: np.ndarray) -> np.ndarray:
    """Replace zeros by local 3×3 mean to eliminate white squares."""
    mask = (arr == 0)
    if not np.any(mask):
        return arr
    mean = cv2.blur(arr, (3, 3))
    filled = arr.copy()
    filled[mask] = mean[mask]
    return filled


def piv_pass(im0: np.ndarray, im1: np.ndarray):
    h, w = im0.shape
    res = pyprocess.extended_search_area_piv(
        im0.astype(np.int32), im1.astype(np.int32),
        window_size=WINDOW_SIZE, overlap=OVERLAP,
        dt=1.0, search_area_size=WINDOW_SIZE, sig2noise_method="peak2peak")

    if len(res) == 5:
        _, _, u, v, s2n = res
    elif len(res) == 3:
        u, v, s2n = res
    else:
        raise RuntimeError("Unexpected openpiv return format")

    u, v = sig2noise_safe(u, v, s2n)

    # upscale to image resolution with smooth cubic interpolation
    u_dense = cv2.resize(u, (w, h), interpolation=cv2.INTER_CUBIC)
    v_dense = cv2.resize(v, (w, h), interpolation=cv2.INTER_CUBIC)

    # fill zero blocks, then smooth
    u_dense = cv2.GaussianBlur(fill_zero_holes(u_dense), (7, 7), 0)
    v_dense = cv2.GaussianBlur(fill_zero_holes(v_dense), (7, 7), 0)
    return u_dense, v_dense


def render(speed: np.ndarray, frame_no: int) -> np.ndarray:
    fig = plt.figure(figsize=(4, 4), dpi=FIG_DPI)
    ax = fig.add_axes([0.05, 0.05, 0.8, 0.9])
    im = ax.imshow(speed, cmap="plasma", vmin=0, vmax=MAX_SPEED_VIS)
    ax.set_axis_off()
    ax.set_title(f"DefMap at Time: {frame_no}", fontsize=14, weight="bold")

    cb_ax = fig.add_axes([0.88, 0.1, 0.03, 0.8])
    cb = fig.colorbar(im, cax=cb_ax)
    cb.set_ticks([0, MAX_SPEED_VIS])
    cb.set_ticklabels(["0", str(int(MAX_SPEED_VIS))])

    fig.canvas.draw()
    w, h = fig.canvas.get_width_height()
    rgb = np.frombuffer(fig.canvas.buffer_rgba(), dtype=np.uint8)
    plt.close(fig)
    return rgb.reshape((h, w, 4))[..., :3]

# ----------------------------------------------------------------------------
# MAIN
# ----------------------------------------------------------------------------
cap = cv2.VideoCapture(str(INPUT_VIDEO))
if not cap.isOpened():
    raise IOError(f"Cannot open {INPUT_VIDEO}")
ret, frame = cap.read()
if not ret:
    raise RuntimeError("Video empty")
prev = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)[ROI_MARGIN:-ROI_MARGIN, ROI_MARGIN:-ROI_MARGIN]

h_fig, w_fig, _ = render(np.zeros_like(prev, float), 0).shape
writer = cv2.VideoWriter(str(VIDEO_OUT), cv2.VideoWriter_fourcc(*"mp4v"), FPS_OUT, (w_fig, h_fig))

metrics = []
idx = 0
while True:
    for _ in range(DOWNSAMPLE):
        ret, frame = cap.read()
        if not ret:
            break
    if not ret:
        break
    cur = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)[ROI_MARGIN:-ROI_MARGIN, ROI_MARGIN:-ROI_MARGIN]
    try:
        u, v = piv_pass(prev, cur)
    except Exception as e:
        print(f"⚠️ frame {idx}: {e} – skipping")
        prev = cur
        idx += 1
        continue

    speed = np.sqrt(u**2 + v**2)
    writer.write(cv2.cvtColor(render(speed, idx), cv2.COLOR_RGB2BGR))

    metrics.append({"frame": idx,
                    "speed_mean": float(np.mean(speed)),
                    "speed_std": float(np.std(speed)),
                    "speed_max": float(np.max(speed))})

    prev = cur
    idx += 1

cap.release()
writer.release()

pd.DataFrame(metrics).to_csv(CSV_OUT, index=False)
print(f"✅ DefMap (PIV) video → {VIDEO_OUT}\n📈 Metrics CSV → {CSV_OUT}")

✅ DefMap (PIV) video → /Users/macbook/Library/Mobile Documents/com~apple~CloudDocs/Studium/Spezialisierung/CodeExampleRayan/data/output/defmap_video_piv/defmap_piv_plasma.mp4
📈 Metrics CSV → /Users/macbook/Library/Mobile Documents/com~apple~CloudDocs/Studium/Spezialisierung/CodeExampleRayan/data/output/defmap_video_piv/defmap_piv_speed_metrics.csv
