# Farneback Optical Flow — Ordered frames (preserve original filenames)

Questa versione salva gli overlay **con lo stesso nome** dei file di input (prefisso `overlay_` aggiunto solo per chiarezza), senza alcun indice numerico davanti. Esempio: input `D2013.02.19_S0675_I141_1_3_0_0.5h.jpg` -> output `overlay_D2013.02.19_S0675_I141_1_3_0_0.5h.jpg`.

Usa questo notebook se preferisci avere gli stessi nomi nei file di output.

In [2]:
# %%
# Environment & imports
%matplotlib inline
import cv2
import os
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt
from typing import List, Tuple, Dict, Any
import json
import re

print('OpenCV version:', cv2.__version__)


OpenCV version: 4.10.0


## Parameters (here to change)

In [11]:
# %%
# Input: can be a video file or a folder containing sequential frames (png/jpg).
DIRNAME = "D2013.03.09_S0695_I141_7" # "D2013.02.19_S0675_I141_1"
BASE_PATH = Path('/home/phd2/Scrivania/CorsoData/OF_prova/')
INPUT_PATH = BASE_PATH / DIRNAME
# If folder: frames are read sorted lexicographically.

# Output
OUTPUT_DIR = BASE_PATH / f'output_{DIRNAME}'
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Processing options
MAX_FRAMES = 200     # None for all
FRAME_STEP = 1       # process every FRAME_STEP frames
QUIVER_STRIDE = 16   # sampling for quiver overlay
SAVE_OVERLAYS = True # Save overlay frames (and optionally make a video)
MAKE_VIDEO = True    # Save results as video per parameter set

# Parameter grid for Farneback (list of dicts). Add or remove dicts to test.
PARAM_GRID = [
    # 5) Più veloce / meno smoothing (per confronto): diminuire iterations ma aumentare winsize
    {'pyr_scale': 0.5, 'levels': 4, 'winsize': 25, 'iterations': 3, 'poly_n': 5, 'poly_sigma': 1.2, 'flags': 0},    # seems to work ok according to scores

    # 6) Usare gaussian window (flag): spesso migliora rumore per immagini con texture finissima
    {'pyr_scale': 0.5, 'levels': 5, 'winsize': 21, 'iterations': 5, 'poly_n': 5, 'poly_sigma': 1.2, 'flags': cv2.OPTFLOW_FARNEBACK_GAUSSIAN},   # seems to work ok qualitatively

    # 7) mix tra 5 e 6: più veloce ma con gaussian window
    {'pyr_scale': 0.5, 'levels': 4, 'winsize': 25, 'iterations': 3, 'poly_n': 5, 'poly_sigma': 1.2, 'flags': cv2.OPTFLOW_FARNEBACK_GAUSSIAN},
    ]
    


# Visual options
VIZ_HSV = True
VIZ_QUIVER = True
QUIVER_COLOR = (0,0,0)

# Internal options
VERBOSE = True

print('INPUT_PATH:', INPUT_PATH)
print('OUTPUT_DIR:', OUTPUT_DIR)
print('PARAM_GRID length:', len(PARAM_GRID))


INPUT_PATH: /home/phd2/Scrivania/CorsoData/OF_prova/D2013.03.09_S0695_I141_7
OUTPUT_DIR: /home/phd2/Scrivania/CorsoData/OF_prova/output_D2013.03.09_S0695_I141_7
PARAM_GRID length: 3


## Utilities (natural sort & read frames that also returns filenames)

In [12]:
# %%
import math

def natural_key(s: str):
    s = str(s)
    return [int(text) if text.isdigit() else text.lower() for text in re.split(r'(\d+)', s)]


def read_frames_ordered(input_path: Path, max_frames: int=None, step: int=1, to_gray: bool=True):
    frames = []
    frame_names = []
    if input_path.is_file():
        cap = cv2.VideoCapture(str(input_path))
        idx = 0
        saved_idx = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if idx % step == 0:
                frames.append(frame)
                frame_names.append(f'frame_{saved_idx:06d}.png')
                saved_idx += 1
            idx += 1
            if max_frames and len(frames) >= max_frames:
                break
        cap.release()
    elif input_path.is_dir():
        files = [p for p in input_path.iterdir() if p.suffix.lower() in ('.png','.jpg','.jpeg','.tif','.tiff')]
        files_sorted = sorted(files, key=lambda p: natural_key(p.name))
        for i, p in enumerate(files_sorted):
            if i % step != 0:
                continue
            frame = cv2.imread(str(p))
            if frame is None:
                continue
            frames.append(frame)
            frame_names.append(p.name)  # preserve original filename exactly
            if max_frames and len(frames) >= max_frames:
                break
    else:
        raise FileNotFoundError(f"Input path not found: {input_path}")

    if to_gray:
        frames_gray = [cv2.cvtColor(f, cv2.COLOR_BGR2GRAY) for f in frames]
        return frames, frames_gray, frame_names
    return frames, None, frame_names


## Flow, viz and metrics

In [13]:
# %%

def compute_farneback(prev_gray: np.ndarray, next_gray: np.ndarray, params: Dict[str,Any]):
    flow = cv2.calcOpticalFlowFarneback(prev_gray, next_gray,
                                        None,
                                        params['pyr_scale'], params['levels'], params['winsize'],
                                        params['iterations'], params['poly_n'], params['poly_sigma'],
                                        params.get('flags', 0))
    return flow


def flow_to_hsv(flow: np.ndarray):
    mag, ang = cv2.cartToPolar(flow[...,0], flow[...,1])
    hsv = np.zeros((flow.shape[0], flow.shape[1], 3), dtype=np.uint8)
    hsv[...,0] = np.uint8(ang * 180 / np.pi / 2)
    hsv[...,1] = 255
    v = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)
    hsv[...,2] = np.uint8(v)
    bgr = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
    return bgr, mag, ang


def overlay_quiver_bgr(bgr_img: np.ndarray, flow: np.ndarray, stride: int=16, color=(0,0,0)):
    h, w = bgr_img.shape[:2]
    Y, X = np.mgrid[0:h:stride, 0:w:stride]
    fx = flow[Y, X, 0]
    fy = flow[Y, X, 1]
    fig, ax = plt.subplots(figsize=(8,6))
    ax.imshow(cv2.cvtColor(bgr_img, cv2.COLOR_BGR2RGB))
    ax.quiver(X, Y, fx, fy, angles='xy', scale_units='xy', scale=1, color='yellow')
    ax.set_axis_off()
    fig.tight_layout()
    return fig


def warp_frame(frame_src_gray: np.ndarray, flow_fwd: np.ndarray):
    h, w = frame_src_gray.shape
    flow_x = flow_fwd[...,0].astype(np.float32)
    flow_y = flow_fwd[...,1].astype(np.float32)
    coords_x, coords_y = np.meshgrid(np.arange(w), np.arange(h))
    map_x = (coords_x + flow_x).astype(np.float32)
    map_y = (coords_y + flow_y).astype(np.float32)
    warped = cv2.remap(frame_src_gray, map_x, map_y, interpolation=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REFLECT)
    return warped


def compute_forward_backward_consistency(flow_fwd: np.ndarray, flow_bwd: np.ndarray):
    h, w = flow_fwd.shape[:2]
    flow_bwd_x = flow_bwd[...,0]
    flow_bwd_y = flow_bwd[...,1]
    coords_x, coords_y = np.meshgrid(np.arange(w), np.arange(h))
    map_x = (coords_x + flow_fwd[...,0]).astype(np.float32)
    map_y = (coords_y + flow_fwd[...,1]).astype(np.float32)
    warped_bwd_x = cv2.remap(flow_bwd_x.astype(np.float32), map_x, map_y, interpolation=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REFLECT)
    warped_bwd_y = cv2.remap(flow_bwd_y.astype(np.float32), map_x, map_y, interpolation=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REFLECT)
    sum_x = flow_fwd[...,0] + warped_bwd_x
    sum_y = flow_fwd[...,1] + warped_bwd_y
    fb_error = np.sqrt(sum_x**2 + sum_y**2)
    return fb_error


def photometric_warp_error(frame_t_gray: np.ndarray, frame_t1_gray: np.ndarray, flow_fwd: np.ndarray):
    warped = warp_frame(frame_t_gray, flow_fwd)
    err = np.abs(warped.astype(np.float32) - frame_t1_gray.astype(np.float32))
    return err


def summarize_metrics(metrics_list: List[Dict[str,Any]]):
    agg = {}
    if not metrics_list:
        return agg
    numeric_keys = [k for k,v in metrics_list[0].items() if isinstance(v, (int, float, np.floating, np.integer))]
    for k in numeric_keys:
        vals = np.array([m[k] for m in metrics_list], dtype=np.float32)
        agg[k + '_mean'] = float(np.nanmean(vals))
        agg[k + '_std'] = float(np.nanstd(vals))
        agg[k + '_median'] = float(np.nanmedian(vals))
    return agg


## Processing loop (save overlays using original filenames)

In [14]:
# %%

def process_video_preserve_filenames(input_path: Path, param_grid: List[Dict[str,Any]],
                                 max_frames: int=None, frame_step: int=1, quiver_stride: int=16,
                                 save_overlays: bool=True, make_video: bool=True, output_base: Path=OUTPUT_DIR):
    frames_bgr, frames_gray, frame_names = read_frames_ordered(input_path, max_frames=max_frames, step=frame_step, to_gray=True)
    n = len(frames_gray)
    if n < 2:
        raise RuntimeError('Not enough frames to compute flow')

    results = []
    for i, params in enumerate(param_grid):
        tag = f"FB_pg{i}_w{params['winsize']}_it{params['iterations']}_lv{params['levels']}"
        out_dir = output_base / tag
        out_dir.mkdir(parents=True, exist_ok=True)
        print(f'Processing param set {i+1}/{len(param_grid)} ->', tag)

        per_frame_metrics = []
        overlay_frames = []

        prev_flow = None
        for t in range(n-1):
            I0 = frames_gray[t]
            I1 = frames_gray[t+1]
            flow_fwd = compute_farneback(I0, I1, params)
            flow_bwd = compute_farneback(I1, I0, params)

            fb_err_map = compute_forward_backward_consistency(flow_fwd, flow_bwd)
            fb_err = float(np.nanmean(fb_err_map))
            photo_err_map = photometric_warp_error(I0, I1, flow_fwd)
            photo_err = float(np.nanmean(photo_err_map))
            mag = np.sqrt(flow_fwd[...,0]**2 + flow_fwd[...,1]**2)
            mag_mean = float(np.nanmean(mag))
            mag_std = float(np.nanstd(mag))
            temporal_smooth = float(np.nanmean(np.sqrt(((flow_fwd - prev_flow)**2).sum(axis=2)))) if prev_flow is not None else np.nan

            per_frame_metrics.append({'frame_idx': t, 'fb_err': fb_err, 'photo_err': photo_err, 'mag_mean': mag_mean, 'mag_std': mag_std, 'temporal_smooth': temporal_smooth})

            # visualization
            vis_hsv_bgr, mag_map, ang_map = flow_to_hsv(flow_fwd)
            blended = cv2.addWeighted(frames_bgr[t], 0.7, vis_hsv_bgr, 0.3, 0)

            if quiver_stride and quiver_stride > 0:
                fig_q = overlay_quiver_bgr(blended, flow_fwd, stride=quiver_stride)
                fig_q.canvas.draw()
                w,h = fig_q.canvas.get_width_height()
                img_q = np.frombuffer(fig_q.canvas.buffer_rgba(), dtype='uint8').reshape(h, w, 4)
                plt.close(fig_q)
                overlay_frames.append(img_q)
                if save_overlays:
                    # use original filename exactly (no prefix)
                    name = frame_names[t]
                    out_name = f'overlay_{name}'
                    cv2.imwrite(str(out_dir / out_name), cv2.cvtColor(img_q, cv2.COLOR_RGB2BGR))
            else:
                overlay_frames.append(cv2.cvtColor(blended, cv2.COLOR_BGR2RGB))
                if save_overlays:
                    name = frame_names[t]
                    out_name = f'overlay_{name}'
                    cv2.imwrite(str(out_dir / out_name), blended)

            prev_flow = flow_fwd

        agg = summarize_metrics(per_frame_metrics)
        stats = {'tag': tag, 'params': params, 'n_frames': n, 'metrics': agg}
        results.append(stats)

        # make video
        if make_video and overlay_frames:
            h, w = overlay_frames[0].shape[:2]
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            video_path = out_dir / f'overlay_{tag}.mp4'
            writer = cv2.VideoWriter(str(video_path), fourcc, 10.0, (w, h))
            for img in overlay_frames:
                bgr = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
                writer.write(bgr)
            writer.release()
            print('Saved video:', video_path)

        with open(out_dir / 'metrics_summary.json', 'w') as f:
            json.dump(stats, f, indent=2)
        print('Saved metrics for', tag)

    return results

# To run: uncomment and execute
results = process_video_preserve_filenames(INPUT_PATH, PARAM_GRID, max_frames=MAX_FRAMES, frame_step=FRAME_STEP, quiver_stride=QUIVER_STRIDE, save_overlays=SAVE_OVERLAYS, make_video=MAKE_VIDEO, output_base=OUTPUT_DIR)

print('Cell ready. Overlays will be saved using original filenames (prefixed with overlay_).')


Processing param set 1/3 -> FB_pg0_w25_it3_lv4
Saved video: /home/phd2/Scrivania/CorsoData/OF_prova/output_D2013.03.09_S0695_I141_7/FB_pg0_w25_it3_lv4/overlay_FB_pg0_w25_it3_lv4.mp4
Saved metrics for FB_pg0_w25_it3_lv4
Processing param set 2/3 -> FB_pg1_w21_it5_lv5
Saved video: /home/phd2/Scrivania/CorsoData/OF_prova/output_D2013.03.09_S0695_I141_7/FB_pg1_w21_it5_lv5/overlay_FB_pg1_w21_it5_lv5.mp4
Saved metrics for FB_pg1_w21_it5_lv5
Processing param set 3/3 -> FB_pg2_w25_it3_lv4
Saved video: /home/phd2/Scrivania/CorsoData/OF_prova/output_D2013.03.09_S0695_I141_7/FB_pg2_w25_it3_lv4/overlay_FB_pg2_w25_it3_lv4.mp4
Saved metrics for FB_pg2_w25_it3_lv4
Cell ready. Overlays will be saved using original filenames (prefixed with overlay_).


## Quick preview (preserve filenames)

In [15]:
# %%

def preview_one_param_preserve(input_path: Path, params: Dict[str,Any], frame_idx: int=0, save_image: bool=False, outdir: Path=OUTPUT_DIR):
    frames_bgr, frames_gray, frame_names = read_frames_ordered(input_path, max_frames=frame_idx+2, step=1, to_gray=True)
    if len(frames_gray) < frame_idx+2:
        raise RuntimeError('Not enough frames for preview')
    I0 = frames_gray[frame_idx]
    I1 = frames_gray[frame_idx+1]
    flow = compute_farneback(I0, I1, params)
    vis_hsv_bgr, mag_map, ang_map = flow_to_hsv(flow)
    blended = cv2.addWeighted(frames_bgr[frame_idx], 0.7, vis_hsv_bgr, 0.3, 0)
    fig = overlay_quiver_bgr(blended, flow, stride=QUIVER_STRIDE)
    display(fig)
    if save_image:
        outdir.mkdir(parents=True, exist_ok=True)
        p = outdir / f'overlay_{frame_names[frame_idx]}'
        fig.savefig(p, dpi=150)
        print('Saved preview to', p)
    plt.close(fig)

# Example usage:
# preview_one_param_preserve(INPUT_PATH, PARAM_GRID[0], frame_idx=5, save_image=False)


## Notes

- Questo notebook mantiene esattamente i nomi dei file di input (solo con il prefisso `overlay_` per distinguerli). Se preferisci rimuovere anche il prefisso `overlay_`, dimmi e lo tolgo.
- Se processi una cartella con nomi complessi come `D2013.02.19_S0675_I141_1_3_0_0.5h.jpg`, i file di output saranno `overlay_D2013.02.19_S0675_I141_1_3_0_0.5h.jpg`.
