# Stereo Matching (WCT + Guided Filter + WTA)

這份 notebook 收納完整實作與簡易使用方式，包含：
- 影像讀取與灰階正規化
- WCT/Census cost volume
- Guided Filter 聚合
- WTA 輸出視差圖


In [None]:
from __future__ import annotations

from typing import List, Sequence, Tuple

import numpy as np
from PIL import Image


In [None]:
def read_image(path: str) -> np.ndarray:
    """讀取影像並回傳為 numpy 陣列（保持原色彩通道）。

    參數:
        path: 影像檔案路徑。

    回傳:
        影像陣列，dtype 依原始影像而定。
    """
    image: Image.Image = Image.open(path)
    return np.array(image)


def to_gray(image: np.ndarray) -> np.ndarray:
    """將影像轉為灰階 float32，並正規化到 0~1。

    參數:
        image: 輸入影像陣列，形狀為 HxW 或 HxWx3/4。

    回傳:
        灰階影像陣列，dtype 為 float32，範圍 0~1。
    """
    if image.ndim == 2:
        gray: np.ndarray = image.astype(np.float32)
    elif image.ndim == 3 and image.shape[2] >= 3:
        rgb: np.ndarray = image[..., :3].astype(np.float32)
        gray = 0.299 * rgb[..., 0] + 0.587 * rgb[..., 1] + 0.114 * rgb[..., 2]
    else:
        raise ValueError("不支援的影像形狀，需為 HxW 或 HxWx3/4。")

    if gray.max() > 1.0:
        gray = gray / 255.0

    return gray.astype(np.float32)


def ensure_same_shape(left: np.ndarray, right: np.ndarray) -> Tuple[int, int]:
    """確認左右影像尺寸一致，回傳高度與寬度。

    參數:
        left: 左影像灰階陣列。
        right: 右影像灰階陣列。

    回傳:
        (height, width)。
    """
    if left.shape != right.shape:
        raise ValueError("左右影像尺寸不一致。")
    if left.ndim != 2:
        raise ValueError("灰階影像維度必須為 2。")
    height: int = int(left.shape[0])
    width: int = int(left.shape[1])
    return height, width


In [None]:
def generate_offsets(radius: int = 4) -> List[Tuple[int, int, int]]:
    """產生 8 方向、距離 1..radius 的位移清單。

    參數:
        radius: 位移最大距離。

    回傳:
        位移清單，每個元素為 (dy, dx, r)。
    """
    if radius <= 0:
        raise ValueError("radius 必須為正整數。")
    directions: Sequence[Tuple[int, int]] = (
        (1, 0),
        (-1, 0),
        (0, 1),
        (0, -1),
        (1, 1),
        (1, -1),
        (-1, 1),
        (-1, -1),
    )
    offsets: List[Tuple[int, int, int]] = []
    for dx, dy in directions:
        for r in range(1, radius + 1):
            offsets.append((dy * r, dx * r, r))
    return offsets


def compute_weights(offsets: Sequence[Tuple[int, int, int]], base_weight: float = 8.0) -> np.ndarray:
    """依距離生成權重，距離每增加 1 權重除以 2。

    參數:
        offsets: 位移清單，包含 (dy, dx, r)。
        base_weight: r=1 的基準權重。

    回傳:
        權重陣列，順序對應 offsets。
    """
    weights: List[float] = []
    for _, _, r in offsets:
        weight: float = base_weight / (2 ** (r - 1))
        weights.append(weight)
    return np.array(weights, dtype=np.float32)


def compute_wct_cost_volume(
    left: np.ndarray,
    right: np.ndarray,
    dmax: int,
    radius: int = 4,
    base_weight: float = 8.0,
) -> np.ndarray:
    """計算加權 Census Transform (WCT) 的 DSI cost volume。

    參數:
        left: 左影像灰階陣列。
        right: 右影像灰階陣列。
        dmax: 最大視差數量。
        radius: Census 半徑。
        base_weight: r=1 的基準權重。

    回傳:
        DSI cost volume，形狀為 (H, W, D)。
    """
    if left.ndim != 2 or right.ndim != 2:
        raise ValueError("left/right 必須為 2D 灰階影像。")
    if left.shape != right.shape:
        raise ValueError("left/right 影像尺寸不一致。")
    if dmax <= 0:
        raise ValueError("dmax 必須為正整數。")

    height: int = int(left.shape[0])
    width: int = int(left.shape[1])

    offsets: List[Tuple[int, int, int]] = generate_offsets(radius)
    weights: np.ndarray = compute_weights(offsets, base_weight)
    large_value: float = float(np.sum(weights))

    dsi: np.ndarray = np.full((height, width, dmax), large_value, dtype=np.float32)

    for y in range(height):
        for x in range(width):
            left_center: float = float(left[y, x])
            for d in range(dmax):
                xr: int = x - d
                if xr < 0:
                    continue
                right_center: float = float(right[y, xr])
                cost: float = 0.0
                valid: bool = True
                for (dy, dx, _), weight in zip(offsets, weights):
                    yl: int = y + dy
                    xl: int = x + dx
                    yr: int = y + dy
                    xr2: int = xr + dx
                    if (
                        yl < 0
                        or yl >= height
                        or xl < 0
                        or xl >= width
                        or yr < 0
                        or yr >= height
                        or xr2 < 0
                        or xr2 >= width
                    ):
                        valid = False
                        break
                    left_bit: bool = bool(left[yl, xl] > left_center)
                    right_bit: bool = bool(right[yr, xr2] > right_center)
                    if left_bit != right_bit:
                        cost += float(weight)
                if valid:
                    dsi[y, x, d] = np.float32(cost)

    return dsi


In [None]:
def integral_image(image: np.ndarray) -> np.ndarray:
    """計算 integral image，回傳大小為 (H+1, W+1)。

    參數:
        image: 輸入 2D 影像。

    回傳:
        integral image，形狀為 (H+1, W+1)。
    """
    if image.ndim != 2:
        raise ValueError("image 必須為 2D。")
    integral: np.ndarray = np.zeros((image.shape[0] + 1, image.shape[1] + 1), dtype=np.float32)
    integral[1:, 1:] = np.cumsum(np.cumsum(image, axis=0), axis=1)
    return integral


def _box_sum_from_integral(integral: np.ndarray, radius: int) -> np.ndarray:
    """使用 integral image 計算每個像素的視窗總和。

    參數:
        integral: integral image，大小為 (H+1, W+1)。
        radius: 視窗半徑。

    回傳:
        每個像素的視窗總和，形狀為 (H, W)。
    """
    if radius < 0:
        raise ValueError("radius 必須為非負整數。")
    height: int = integral.shape[0] - 1
    width: int = integral.shape[1] - 1

    ys: np.ndarray = np.arange(height)
    xs: np.ndarray = np.arange(width)
    y0: np.ndarray = np.clip(ys - radius, 0, height - 1)
    y1: np.ndarray = np.clip(ys + radius, 0, height - 1)
    x0: np.ndarray = np.clip(xs - radius, 0, width - 1)
    x1: np.ndarray = np.clip(xs + radius, 0, width - 1)

    sum_region: np.ndarray = (
        integral[y1[:, None] + 1, x1[None, :] + 1]
        - integral[y0[:, None], x1[None, :] + 1]
        - integral[y1[:, None] + 1, x0[None, :]]
        + integral[y0[:, None], x0[None, :]]
    )
    return sum_region.astype(np.float32)


def box_filter_mean(image: np.ndarray, radius: int) -> np.ndarray:
    """計算 box filter 的區域平均。

    參數:
        image: 輸入 2D 影像。
        radius: 視窗半徑。

    回傳:
        區域平均影像，形狀為 (H, W)。
    """
    integral: np.ndarray = integral_image(image.astype(np.float32))
    sum_region: np.ndarray = _box_sum_from_integral(integral, radius)
    height: int = image.shape[0]
    width: int = image.shape[1]
    ys: np.ndarray = np.arange(height)
    xs: np.ndarray = np.arange(width)
    y0: np.ndarray = np.clip(ys - radius, 0, height - 1)
    y1: np.ndarray = np.clip(ys + radius, 0, height - 1)
    x0: np.ndarray = np.clip(xs - radius, 0, width - 1)
    x1: np.ndarray = np.clip(xs + radius, 0, width - 1)
    area: np.ndarray = (y1 - y0 + 1)[:, None] * (x1 - x0 + 1)[None, :]
    return sum_region / area.astype(np.float32)


def guided_filter(
    guide: np.ndarray,
    src: np.ndarray,
    radius: int,
    eps: float,
) -> np.ndarray:
    """使用 Guided Image Filter 對 src 做平滑。

    參數:
        guide: 引導影像（灰階）。
        src: 輸入影像（要被濾波的 cost）。
        radius: 視窗半徑。
        eps: 正則化項。

    回傳:
        濾波後影像，形狀為 (H, W)。
    """
    if guide.shape != src.shape:
        raise ValueError("guide 與 src 尺寸必須一致。")
    if guide.ndim != 2:
        raise ValueError("guide 與 src 必須為 2D。")
    if radius <= 0:
        raise ValueError("radius 必須為正整數。")
    if eps <= 0:
        raise ValueError("eps 必須為正值。")

    guide_f: np.ndarray = guide.astype(np.float32)
    src_f: np.ndarray = src.astype(np.float32)

    mean_guide: np.ndarray = box_filter_mean(guide_f, radius)
    mean_src: np.ndarray = box_filter_mean(src_f, radius)
    mean_gg: np.ndarray = box_filter_mean(guide_f * guide_f, radius)
    mean_gs: np.ndarray = box_filter_mean(guide_f * src_f, radius)

    var_g: np.ndarray = mean_gg - mean_guide * mean_guide
    cov_gs: np.ndarray = mean_gs - mean_guide * mean_src

    a: np.ndarray = cov_gs / (var_g + np.float32(eps))
    b: np.ndarray = mean_src - a * mean_guide

    mean_a: np.ndarray = box_filter_mean(a, radius)
    mean_b: np.ndarray = box_filter_mean(b, radius)

    q: np.ndarray = mean_a * guide_f + mean_b
    return q.astype(np.float32)


In [None]:
def aggregate_cost_volume(
    dsi: np.ndarray,
    guide: np.ndarray,
    radius: int,
    eps: float,
) -> np.ndarray:
    """使用 Guided Filter 對每個 disparity layer 做 cost aggregation。

    參數:
        dsi: 原始 cost volume，形狀為 (H, W, D)。
        guide: 引導影像（灰階）。
        radius: Guided Filter 視窗半徑。
        eps: 正則化項。

    回傳:
        聚合後 cost volume，形狀為 (H, W, D)。
    """
    if dsi.ndim != 3:
        raise ValueError("dsi 必須為 3D (H, W, D)。")
    if guide.ndim != 2:
        raise ValueError("guide 必須為 2D 灰階影像。")
    if dsi.shape[0] != guide.shape[0] or dsi.shape[1] != guide.shape[1]:
        raise ValueError("dsi 與 guide 尺寸不一致。")

    height: int = dsi.shape[0]
    width: int = dsi.shape[1]
    dmax: int = dsi.shape[2]
    aggregated: np.ndarray = np.zeros((height, width, dmax), dtype=np.float32)

    for d in range(dmax):
        aggregated[:, :, d] = guided_filter(guide, dsi[:, :, d], radius, eps)

    return aggregated


def winner_take_all(cost_volume: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """沿 disparity 維度取最小 cost，回傳視差與最小 cost。

    參數:
        cost_volume: 聚合後 cost volume，形狀為 (H, W, D)。

    回傳:
        (disparity, min_cost)。
    """
    if cost_volume.ndim != 3:
        raise ValueError("cost_volume 必須為 3D (H, W, D)。")
    disparity: np.ndarray = np.argmin(cost_volume, axis=2).astype(np.int32)
    min_cost: np.ndarray = np.min(cost_volume, axis=2).astype(np.float32)
    return disparity, min_cost


def compute_disparity(
    left_gray: np.ndarray,
    right_gray: np.ndarray,
    dmax: int,
    wct_radius: int = 4,
    base_weight: float = 8.0,
    guided_radius: int = 3,
    guided_eps: float = 1e-3,
) -> Tuple[np.ndarray, np.ndarray]:
    """完整流程：WCT cost volume -> Guided Filter 聚合 -> WTA。

    參數:
        left_gray: 左影像灰階陣列。
        right_gray: 右影像灰階陣列。
        dmax: 最大視差數量。
        wct_radius: WCT 半徑。
        base_weight: WCT 基準權重。
        guided_radius: Guided Filter 半徑。
        guided_eps: Guided Filter 正則化項。

    回傳:
        (disparity, min_cost)。
    """
    ensure_same_shape(left_gray, right_gray)
    dsi: np.ndarray = compute_wct_cost_volume(
        left_gray,
        right_gray,
        dmax=dmax,
        radius=wct_radius,
        base_weight=base_weight,
    )
    aggregated: np.ndarray = aggregate_cost_volume(dsi, left_gray, guided_radius, guided_eps)
    disparity, min_cost = winner_take_all(aggregated)
    return disparity, min_cost


def save_disparity_image(disparity: np.ndarray, dmax: int, path: str) -> None:
    """將視差圖正規化到 0~255 並輸出。

    參數:
        disparity: 視差圖。
        dmax: 最大視差數量。
        path: 輸出檔案路徑。

    回傳:
        None。
    """
    if dmax <= 0:
        raise ValueError("dmax 必須為正整數。")
    disp_norm: np.ndarray = (disparity.astype(np.float32) / float(dmax - 1)) * 255.0
    disp_img: Image.Image = Image.fromarray(disp_norm.astype(np.uint8), mode="L")
    disp_img.save(path)


## 使用方式

請自行提供左右影像路徑與 dmax，執行下列範例即可產生視差圖：
- `disp.png`：灰階視差圖（0~255）


In [None]:
# 範例用法（請替換路徑）
left_path: str = "left.png"
right_path: str = "right.png"
dmax: int = 64

left_img: np.ndarray = read_image(left_path)
right_img: np.ndarray = read_image(right_path)
left_gray: np.ndarray = to_gray(left_img)
right_gray: np.ndarray = to_gray(right_img)

disparity, min_cost = compute_disparity(
    left_gray,
    right_gray,
    dmax=dmax,
    wct_radius=4,
    base_weight=8.0,
    guided_radius=3,
    guided_eps=1e-3,
)

save_disparity_image(disparity, dmax, "disp.png")

_ = min_cost
