In [1]:
import os
import re
import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from skimage.filters import sobel, gaussian
from skimage.feature import canny

In [2]:
# ---------------------------
# Config
# ---------------------------
# input_dir is globbed for "deg_*.png" projection images; output_dir receives
# one mask CSV per image plus the summary CSV. The output directory is created
# up front (no error if it already exists).
input_dir  = "figs/projections/chest/projections"
output_dir = "figs/projections/chest/mask_edge"
os.makedirs(output_dir, exist_ok=True)

# Mask mode:
#   'edge'      -> edge detection
#   'threshold' -> keep pixels inside a gray-level band
#   'extreme'   -> mask out the extreme gray levels (0 / 255)
MASK_MODE   = "edge"

# Band limits, expressed as fractions of 255, used by 'threshold' mode.
# NOTE(review): the original note listed 'extreme' mode as well, but
# build_initial_mask does not read these values in that mode.
LOW_RATIO   = 0.00
HIGH_RATIO  = 0.95

# Parameters used when MASK_MODE == 'edge'
EDGE_METHOD = "sobel"   # 'sobel' or 'canny'
EDGE_PCT    = 90.0      # sobel threshold, given as a percentile of the edge magnitude
EDGE_SIGMA  = 1.0       # sigma of the Gaussian smoothing applied before sobel

# Optional parameters for the canny detector
CANNY_SIGMA = 1.2
CANNY_LOW   = 0.1
CANNY_HIGH  = 0.3

In [None]:
# ---------------------------
# Utilities
# ---------------------------
def to_uint8_gray(img):
    """Convert an image array to a single-channel uint8 grayscale array.

    Accepted layouts are (H, W), (H, W, 3) and (H, W, 4); a fourth channel is
    dropped before conversion. Colour images are reduced with an unweighted
    mean over the channel axis.

    Floating-point input is multiplied by 255, clipped to [0, 255] and cast
    (so it is presumably expected in [0, 1]); the cast truncates. Integer
    colour input is averaged and rounded; integer 2-D input is cast as-is
    with no rescaling.

    Raises:
        ValueError: if the array has any other shape.
    """
    pixels = img

    # RGBA -> RGB: discard the alpha channel.
    if pixels.ndim == 3 and pixels.shape[2] == 4:
        pixels = pixels[:, :, :3]

    has_rgb = pixels.ndim == 3 and pixels.shape[2] == 3
    if not has_rgb and pixels.ndim != 2:
        raise ValueError(f"Unsupported image shape: {pixels.shape}")

    is_float = np.issubdtype(pixels.dtype, np.floating)

    if has_rgb:
        channel_mean = pixels.mean(axis=2)
        if is_float:
            scaled = channel_mean * 255.0
            return np.clip(scaled, 0, 255).astype(np.uint8)
        return channel_mean.round().astype(np.uint8)

    # Already single-channel.
    if is_float:
        return np.clip(pixels * 255.0, 0, 255).astype(np.uint8)
    return pixels.astype(np.uint8)

def build_initial_mask(gray, mode="threshold", low_ratio=0.00, high_ratio=0.95):
    """Build the base 0/1 mask (uint8) for one grayscale image.

    Modes:
    - "threshold": pixels whose value lies in
      [int(255 * low_ratio), int(255 * high_ratio)] become 1, all others 0.
    - "extreme":   pixels equal to 0 or 255 become 0, all others 1.
    - "edge":      delegates to build_edge_mask, configured from the
                   module-level EDGE_* / CANNY_* settings (low_ratio and
                   high_ratio are not used).

    Raises:
        ValueError: if mode is none of the three names above.
    """
    if mode == "edge":
        # Edge parameters come from the global configuration, not the arguments.
        return build_edge_mask(
            gray=gray,
            method=EDGE_METHOD,
            gaussian_sigma=EDGE_SIGMA,
            thresh_mode="percentile",
            thresh_value=EDGE_PCT,
            canny_sigma=CANNY_SIGMA,
            canny_low=CANNY_LOW,
            canny_high=CANNY_HIGH
        )

    if mode == "extreme":
        saturated = (gray == 0) | (gray == 255)
        return np.where(saturated, np.uint8(0), np.uint8(1))

    if mode == "threshold":
        lower = int(255 * low_ratio)
        upper = int(255 * high_ratio)
        inside_band = (gray >= lower) & (gray <= upper)
        return np.where(inside_band, np.uint8(1), np.uint8(0))

    raise ValueError("mode must be 'threshold' or 'extreme' or 'edge'")

def build_edge_mask(
    gray: np.ndarray,
    method: str = "sobel",        # 'sobel' or 'canny'
    gaussian_sigma: float = 1.0,  # smoothing applied before sobel to reduce noise
    thresh_mode: str = "percentile",
    thresh_value: float = 90.0,   # percentile in [0, 100] when thresh_mode='percentile'
    canny_sigma: float = 1.2,     # method='canny' only
    canny_low: float = 0.1,       # method='canny' only, low threshold (0~1)
    canny_high: float = 0.3       # method='canny' only, high threshold (0~1)
) -> np.ndarray:
    """Build a binary keep-mask from edge detection.

    The image is scaled to float32 by dividing by 255, edges are detected,
    and the *inverse* of the edge map is returned.

    Args:
        gray: grayscale image; divided by 255 before filtering, so uint8-range
            values are presumably expected.
        method: 'sobel' or 'canny' (case-insensitive).
        gaussian_sigma: sigma of the Gaussian blur applied before sobel;
            falsy or non-positive values disable the blur.
        thresh_mode: sobel only. 'percentile' treats thresh_value as a
            percentile of the edge magnitude; 'value' treats it as an
            absolute magnitude threshold.
        thresh_value: threshold interpreted according to thresh_mode.
        canny_sigma, canny_low, canny_high: forwarded to the canny detector.

    Returns:
        uint8 array with the shape of ``gray``: 0 on detected edge pixels,
        1 everywhere else.

    Raises:
        ValueError: for an unknown ``method`` or ``thresh_mode``.
    """
    g = gray.astype(np.float32) / 255.0
    method_key = method.lower()

    if method_key == "sobel":
        # Fail fast on a bad mode instead of after the filtering work.
        if thresh_mode not in ("percentile", "value"):
            raise ValueError("thresh_mode must be 'percentile' or 'value'.")

        if gaussian_sigma and gaussian_sigma > 0:
            g = gaussian(g, sigma=gaussian_sigma, preserve_range=True)
        edge_mag = sobel(g)

        if thresh_mode == "percentile":
            t = np.percentile(edge_mag, thresh_value)
            # When the image is flat (or mostly flat) the percentile is 0 and
            # `edge_mag >= 0` would flag *every* pixel as an edge, producing an
            # all-zero keep-mask. A pixel with zero gradient is never an edge,
            # so additionally require a strictly positive magnitude.
            mask = ((edge_mag >= t) & (edge_mag > 0)).astype(np.uint8)
        else:
            t = float(thresh_value)  # absolute threshold on the [0, 1]-scaled magnitude
            mask = (edge_mag >= t).astype(np.uint8)

    elif method_key == "canny":
        edges_bool = canny(g, sigma=canny_sigma, low_threshold=canny_low, high_threshold=canny_high)
        mask = edges_bool.astype(np.uint8)
    else:
        raise ValueError("method must be 'sobel' or 'canny'.")

    # Invert: edges become 0 (masked out), everything else 1 (kept).
    return 1 - mask

def angle_from_filename(path):
    """Extract the integer angle from a file name ending in 'deg_XXX.png'.

    Only the base name of ``path`` is inspected. The parsed number is
    reduced modulo 360 before being returned.

    Raises:
        ValueError: if the base name does not end in 'deg_<digits>.png'.
    """
    file_name = os.path.basename(path)
    found = re.search(r"deg_(\d+)\.png$", file_name)
    if found is None:
        raise ValueError(f"Bad filename: {path}")
    degrees = int(found.group(1))
    return degrees % 360



# ---------------------------
# Collect files by angle
# ---------------------------
# Sorted so that the result does not depend on the filesystem's listing order.
paths = sorted(glob.glob(os.path.join(input_dir, "deg_*.png")))
by_angle = {}
for p in paths:
    ang = angle_from_filename(p)
    if ang in by_angle:
        # Two names can map to the same angle (e.g. different zero padding, or
        # 0 vs 360). The later path still wins, but no longer silently.
        print(f"Warning: angle {ang} already mapped to {by_angle[ang]}; replacing with {p}.")
    by_angle[ang] = p

# ---------------------------
# Process pairs (ang, ang+180)
# ---------------------------

# One record per processed pair; turned into the summary table below.
mask_rates = []

# Angles already handled as part of a pair, so each pair is processed once.
visited = set()

for ang in sorted(by_angle.keys()):
    if ang in visited:
        continue
    mate = (ang + 180) % 360
    if mate not in by_angle:
        print(f"Skip angle {ang}: pair {mate} not found.")
        continue

    # Load both images of the pair and normalise them to uint8 grayscale.
    img_a = plt.imread(by_angle[ang])
    img_b = plt.imread(by_angle[mate])
    gray_a = to_uint8_gray(img_a)
    gray_b = to_uint8_gray(img_b)

    # The masks are combined element-wise, so both views must have the same
    # shape; otherwise the OR below would fail or silently broadcast.
    if gray_a.shape != gray_b.shape:
        print(f"Skip angle {ang}: shape {gray_a.shape} differs from pair {mate} shape {gray_b.shape}.")
        visited.add(ang)
        visited.add(mate)
        continue

    # Initial mask per image (threshold / extreme / edge, selected by MASK_MODE).
    mask_a0 = build_initial_mask(gray_a, mode=MASK_MODE, low_ratio=LOW_RATIO, high_ratio=HIGH_RATIO)
    mask_b0 = build_initial_mask(gray_b, mode=MASK_MODE, low_ratio=LOW_RATIO, high_ratio=HIGH_RATIO)

    # Flip the +180° mask horizontally to align it with the θ coordinate frame.
    mask_b0_aligned = np.fliplr(mask_b0)

    # Union of the two keep-masks: a pixel is kept if either view keeps it.
    union_mask_theta = (mask_a0 | mask_b0_aligned).astype(np.uint8)

    # Save: θ gets the union in θ coordinates; θ+180° gets the union flipped
    # back, so each file stays in its own native coordinate frame.
    out_a = os.path.join(output_dir, os.path.splitext(os.path.basename(by_angle[ang]))[0]   + ".csv")
    out_b = os.path.join(output_dir, os.path.splitext(os.path.basename(by_angle[mate]))[0] + ".csv")

    pd.DataFrame(union_mask_theta).to_csv(out_a, index=False, header=False)
    pd.DataFrame(np.fliplr(union_mask_theta)).to_csv(out_b, index=False, header=False)

    kept = int(union_mask_theta.sum())
    total = union_mask_theta.size
    print(f"[{ang:03d} & {mate:03d}] kept {kept}/{total} ({kept/total:.2%}) -> {out_a}, {out_b}")

    # NOTE: the "mask_rate" column stores the *kept* fraction (kept / total);
    # the final report below prints its complement.
    mask_rates.append({
        "angle": ang,
        "mate": mate,
        "kept": kept,
        "total": total,
        "mask_rate": kept / total
    })

    visited.add(ang)
    visited.add(mate)

# Explicit columns so the summary keeps its header (and the "mask_rate" lookup
# stays valid) even when no pair was processed.
summary_columns = ["angle", "mate", "kept", "total", "mask_rate"]
df_rates = pd.DataFrame(mask_rates, columns=summary_columns)
out_summary = os.path.join(output_dir, "mask_rates_summary.csv")
df_rates.to_csv(out_summary, index=False)

if mask_rates:
    avg_rate = df_rates["mask_rate"].mean()

    print("✅ Pairwise-union masks (with +180° flip alignment) generated for all available angle pairs, using edge-based masking if configured.")
    print(f"\n📊 average mask rate: {1-avg_rate:.2%} (save to {out_summary})")
else:
    # Nothing was processed: report it instead of failing on an empty table.
    avg_rate = float("nan")
    print(f"No complete (angle, angle+180) pairs were processed from {input_dir}; no masks were generated.")

[000 & 180] kept 62753/65536 (95.75%) -> figs/projections/chest/mask_edge/deg_00.csv, figs/projections/chest/mask_edge/deg_180.csv
[003 & 183] kept 62694/65536 (95.66%) -> figs/projections/chest/mask_edge/deg_03.csv, figs/projections/chest/mask_edge/deg_183.csv
[006 & 186] kept 63328/65536 (96.63%) -> figs/projections/chest/mask_edge/deg_06.csv, figs/projections/chest/mask_edge/deg_186.csv
[009 & 189] kept 64314/65536 (98.14%) -> figs/projections/chest/mask_edge/deg_09.csv, figs/projections/chest/mask_edge/deg_189.csv
[012 & 192] kept 64477/65536 (98.38%) -> figs/projections/chest/mask_edge/deg_12.csv, figs/projections/chest/mask_edge/deg_192.csv
[015 & 195] kept 64706/65536 (98.73%) -> figs/projections/chest/mask_edge/deg_15.csv, figs/projections/chest/mask_edge/deg_195.csv
[018 & 198] kept 65045/65536 (99.25%) -> figs/projections/chest/mask_edge/deg_18.csv, figs/projections/chest/mask_edge/deg_198.csv
[021 & 201] kept 65144/65536 (99.40%) -> figs/projections/chest/mask_edge/deg_21.cs