In [5]:
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd
import cv2

@dataclass
class Config:
    """Settings for one dataset run: where the stage folders live and how masks are binarized."""

    # Dataset directory; build_index expects every name in `folders` to be a sub-folder of it.
    root: str
    # Stage sub-folders to index; only filenames present in all of them are processed.
    folders: Tuple[str, ...] = ("litho", "pixelILT", "printed", "resist", "target")

    # binary vs gray stages
    # Stages that process_one binarizes with Otsu thresholding.
    binary_stages: Tuple[str, ...] = ("target", "pixelILT", "printed")
    # Stages that process_one binarizes once per value in `thresholds`.
    gray_stages: Tuple[str, ...] = ("litho", "resist")

    # Fixed cut-offs applied to the gray stages (images are scaled to [0, 1] when loaded).
    thresholds: Tuple[float, ...] = (0.30, 0.40, 0.50)

    # Morph cleanup kernel size (set 0 to disable)
    morph_k: int = 3

    # Path of the CSV written by run(); make_cfg fills it in per dataset.
    out_csv: str = ""


# Dataset name -> dataset root directory. The name is also used by make_cfg to
# build the output CSV filename. Paths are machine-specific.
DATASETS: Dict[str, str] = {
    "StdContact": "C:\\Users\\asap0\\OneDrive\\바탕 화면\\yonsei\\26-1 DSL\\eda\\SemiConductor_EDA_2602\\LithoBench\\0_Datasets\\lithodata\\StdContact",
    "StdContactTest": "C:\\Users\\asap0\\OneDrive\\바탕 화면\\yonsei\\26-1 DSL\\eda\\SemiConductor_EDA_2602\\LithoBench\\0_Datasets\\lithodata\\StdContactTest",
    "ViaSet": "C:\\Users\\asap0\\OneDrive\\바탕 화면\\yonsei\\26-1 DSL\\eda\\SemiConductor_EDA_2602\\LithoBench\\0_Datasets\\lithodata\\ViaSet",
}


def make_cfg(name: str, root: str) -> Config:
    """Create the Config for one dataset.

    Args:
        name: Dataset label; embedded in the output CSV filename.
        root: Dataset root directory.

    Returns:
        A Config with ``root`` and ``out_csv`` set and every other field at
        its default.
    """
    csv_name = f"eda_6stage_features_errors_{name}.csv"
    return Config(root=root, out_csv=csv_name)


#File indexing (filename-based matching)
def list_files(folder: Path) -> Dict[str, Path]:
    """Map each regular file directly inside ``folder`` to its path, keyed by filename.

    Sub-directories are skipped and the listing is not recursive.
    """
    files: Dict[str, Path] = {}
    for entry in folder.iterdir():
        if entry.is_file():
            files[entry.name] = entry
    return files


def build_index(cfg: Config) -> Tuple[Dict[str, Dict[str, Path]], List[str]]:
    """Index every stage folder under ``cfg.root`` and find the shared filenames.

    Args:
        cfg: Run configuration; ``cfg.root`` is the dataset directory and
            ``cfg.folders`` names the stage sub-folders to index.

    Returns:
        ``(maps, common)`` where ``maps[folder][filename]`` is the file path
        and ``common`` is the sorted list of filenames present in every folder.

    Raises:
        ValueError: ``cfg.folders`` is empty.
        FileNotFoundError: The root or one of the stage folders is missing.
        RuntimeError: No filename is shared by all stage folders.
    """
    if not cfg.folders:
        raise ValueError("cfg.folders must name at least one stage folder.")

    root = Path(cfg.root)
    if not root.exists():
        raise FileNotFoundError(f"Root not found: {root.resolve()}")

    maps: Dict[str, Dict[str, Path]] = {}
    for f in cfg.folders:
        fp = root / f
        # is_dir() also rejects a regular file that happens to carry the stage name.
        if not fp.is_dir():
            raise FileNotFoundError(f"Missing folder: {fp.resolve()}")
        maps[f] = list_files(fp)

    common_set = set.intersection(*(set(files) for files in maps.values()))
    if not common_set:
        # Report the real folder count instead of a hard-coded number.
        raise RuntimeError(
            f"No common filenames across all {len(cfg.folders)} folders."
        )
    common = sorted(common_set)

    # Quick report. A folder can never be missing a common name (the common set
    # is the intersection of all folders), so only extras are worth listing.
    for f in cfg.folders:
        extra = sorted(set(maps[f]) - common_set)
        if extra:
            print(f"[INFO] {f}: extra {len(extra)} (first 5): {extra[:5]}")

    print(f"[OK] common masks = {len(common)}")
    return maps, common


#Robust loader (png/jpg + npy)
def load_as_float01(path: Path) -> np.ndarray:
    """Load an image or ``.npy`` array as a float32 array scaled to [0, 1].

    ``.npy`` arrays are averaged over the last axis when 3-D and then min-max
    normalized; a constant array becomes all zeros. Any other file is decoded
    as an 8-bit grayscale image and divided by 255.

    Args:
        path: File to load.

    Returns:
        float32 array with values in [0, 1].

    Raises:
        ValueError: A non-``.npy`` file is missing or has zero size.
    """
    if path.suffix.lower() == ".npy":
        arr = np.asarray(np.load(path))
        if arr.ndim == 3:
            # Collapse the last axis (assumed to be channels) to a single plane.
            arr = arr.mean(axis=-1)
        arr = arr.astype(np.float32)
        mn, mx = float(arr.min()), float(arr.max())
        if mx - mn < 1e-8:
            # Constant input: avoid dividing by (almost) zero.
            return np.zeros_like(arr, dtype=np.float32)
        return (arr - mn) / (mx - mn)

    # png/jpg
    if not path.exists() or path.stat().st_size == 0:
        raise ValueError(f"Missing or empty file: {path}")

    # cv2.imread is known to fail on Windows paths that contain non-ASCII
    # characters (such as the dataset paths in this file), so read the bytes
    # with NumPy and decode them in memory instead.
    raw = np.fromfile(str(path), dtype=np.uint8)
    img = cv2.imdecode(raw, cv2.IMREAD_GRAYSCALE)
    if img is None:
        # fallback: PIL, for formats OpenCV cannot decode
        from PIL import Image
        with Image.open(path) as im:
            img = np.array(im.convert("L"))

    return img.astype(np.float32) / 255.0


# Binarization + cleanup
def otsu_binarize(img01: np.ndarray, auto_invert: bool = True) -> np.ndarray:
    """Binarize a [0, 1] image to a 0/1 uint8 mask using Otsu's threshold.

    The image is converted to 8-bit and smoothed with a 5x5 Gaussian blur
    before thresholding.

    Args:
        img01: Image with values in [0, 1].
        auto_invert: When true, flip the mask if more than half of its pixels
            are set, so that the smaller region ends up as foreground.

    Returns:
        uint8 mask containing only 0 and 1.
    """
    scaled = np.clip(img01 * 255.0, 0, 255).astype(np.uint8)
    smoothed = cv2.GaussianBlur(scaled, (5, 5), 0)
    mask = cv2.threshold(smoothed, 0, 1, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]
    if not auto_invert:
        return mask.astype(np.uint8)
    mostly_on = mask.mean() > 0.5
    return (1 - mask if mostly_on else mask).astype(np.uint8)


def threshold_binarize(img01: np.ndarray, thr: float, auto_invert: bool = True) -> np.ndarray:
    """Binarize an image with a fixed cut-off: pixels strictly above ``thr`` become 1.

    Args:
        img01: Image array (values in [0, 1] in this pipeline).
        thr: Threshold; the comparison is strict (``>``).
        auto_invert: When true, flip the mask if more than half of its pixels
            are set.

    Returns:
        uint8 mask containing only 0 and 1.
    """
    above = img01 > thr
    if auto_invert and above.mean() > 0.5:
        above = ~above
    return above.astype(np.uint8)


def cleanup_binary(bin01: np.ndarray, k: int) -> np.ndarray:
    """Clean a binary mask with a morphological open followed by a close.

    Args:
        bin01: Binary mask.
        k: Side length of the square all-ones kernel; ``None`` or a value
            <= 0 disables the cleanup.

    Returns:
        The mask as uint8, cleaned unless the cleanup is disabled.
    """
    mask = bin01.astype(np.uint8)
    morph_enabled = k is not None and k > 0
    if not morph_enabled:
        return mask
    element = np.ones((k, k), np.uint8)
    opened = cv2.morphologyEx(mask, cv2.MORPH_OPEN, element)
    closed = cv2.morphologyEx(opened, cv2.MORPH_CLOSE, element)
    return closed.astype(np.uint8)


#Target feature extraction (geometry / complexity)
def perimeter_px(bin01: np.ndarray) -> int:
    """Estimate the perimeter of a 0/1 mask as its number of Canny edge pixels."""
    as_8bit = (bin01 * 255).astype(np.uint8)
    edges = cv2.Canny(as_8bit, 50, 150)
    return int(np.count_nonzero(edges > 0))


def count_components(bin01: np.ndarray) -> int:
    """Count the 4-connected foreground regions of a mask.

    OpenCV's label count includes the background label, hence the ``- 1``.
    """
    foreground = (bin01 > 0).astype(np.uint8)
    n_labels = cv2.connectedComponents(foreground, connectivity=4)[0]
    return int(n_labels - 1)


def extract_target_features(target_bin: np.ndarray) -> Dict[str, float]:
    """Compute geometry/complexity features of the binarized target mask.

    Args:
        target_bin: 0/1 target mask.

    Returns:
        Dict with the foreground area fraction, edge-pixel perimeter,
        connected-component count, compactness (perimeter squared over area)
        and edge density (edge pixels over all pixels).
    """
    n_px = int(target_bin.size)
    perim = float(perimeter_px(target_bin))
    # Small epsilon keeps the compactness finite for an empty mask.
    area = float(target_bin.sum()) + 1e-6

    # Edge density is the same Canny edge count that perimeter_px returns,
    # divided by the pixel count, so the edge detector only has to run once.
    edge_density = perim / n_px if n_px else float("nan")

    return {
        "target_area_frac": float(target_bin.mean()),
        "target_perimeter": perim,
        "target_components": float(count_components(target_bin)),
        "target_compactness": float((perim * perim) / area),
        "target_edge_density": edge_density,
    }


#Error metrics (IoU / XOR / component delta)
def iou(a: np.ndarray, b: np.ndarray) -> float:
    """Intersection-over-union of the foreground (> 0) pixels of two masks.

    Two empty masks are treated as a perfect match and score 1.0. This keeps
    the metric consistent with ``xor_rate``, which reports 0.0 for the same
    pair, and lets identical masks score exactly 1.0.

    Args:
        a: First mask.
        b: Second mask, broadcast-compatible with ``a``.

    Returns:
        IoU in [0, 1].
    """
    a_on = a > 0
    b_on = b > 0
    union = int(np.logical_or(a_on, b_on).sum())
    if union == 0:
        # Nothing set in either mask: identical by definition.
        return 1.0
    inter = int(np.logical_and(a_on, b_on).sum())
    return inter / union


def xor_rate(a: np.ndarray, b: np.ndarray) -> float:
    """Fraction of the union of two masks where they disagree.

    Pixels > 0 count as foreground. A small epsilon in the denominator makes
    two empty masks score 0.0 instead of dividing by zero.
    """
    fg_a, fg_b = a > 0, b > 0
    mismatch = np.count_nonzero(np.logical_xor(fg_a, fg_b))
    union = np.count_nonzero(np.logical_or(fg_a, fg_b))
    return float(mismatch / (union + 1e-8))


def comp_delta(a: np.ndarray, b: np.ndarray) -> int:
    """Return how many more connected components ``b`` has than ``a`` (negative if fewer)."""
    n_after = count_components(b)
    n_before = count_components(a)
    return int(n_after - n_before)

#Optional alignment sanity check (centroid shift)
def centroid_xy(bin01: np.ndarray) -> Tuple[float, float]:
    """Return the ``(x, y)`` centroid of the foreground (> 0) pixels of a 2-D mask.

    x is the mean column index and y the mean row index. An empty mask
    yields ``(nan, nan)``.
    """
    rows, cols = np.nonzero(bin01 > 0)
    if cols.size == 0:
        return (np.nan, np.nan)
    return float(cols.mean()), float(rows.mean())


def centroid_shift(a: np.ndarray, b: np.ndarray) -> float:
    """Euclidean distance in pixels between the foreground centroids of two masks.

    Returns NaN when either mask has no foreground.
    """
    ax, ay = centroid_xy(a)
    bx, by = centroid_xy(b)
    either_empty = np.isnan(ax) or np.isnan(bx)
    if either_empty:
        return float("nan")
    dx = ax - bx
    dy = ay - by
    return float(np.sqrt(dx ** 2 + dy ** 2))


#Process one mask (load -> preprocess -> features/errors)
def process_one(name: str, maps: Dict[str, Dict[str, Path]], cfg: Config) -> Dict[str, float]:
    """Build one result row (target features + stage-to-stage errors) for mask ``name``.

    Args:
        name: Filename shared by every stage folder.
        maps: ``maps[stage][filename]`` -> path, as returned by build_index.
        cfg: Run configuration (stages, thresholds, cleanup kernel size).

    Returns:
        Row dict keyed by column name. Key insertion order is the column
        order of the resulting CSV.
    """
    # Load every stage image scaled to [0, 1].
    imgs = {}
    for stage in cfg.folders:
        imgs[stage] = load_as_float01(maps[stage][name])

    # Binary stages: Otsu threshold followed by morphological cleanup.
    bins = {
        stage: cleanup_binary(otsu_binarize(imgs[stage], auto_invert=True), cfg.morph_k)
        for stage in cfg.binary_stages
    }

    # Gray stages: one cleaned mask per configured threshold.
    gray_bins = {
        stage: {
            float(t): cleanup_binary(
                threshold_binarize(imgs[stage], float(t), auto_invert=True),
                cfg.morph_k,
            )
            for t in cfg.thresholds
        }
        for stage in cfg.gray_stages
    }

    target, printed, pixel_ilt = bins["target"], bins["printed"], bins["pixelILT"]

    # Identifier and target geometry features come first.
    row = {"mask_name": name}
    row.update(extract_target_features(target))

    # Alignment sanity check: target vs printed.
    row["shift_target_printed"] = centroid_shift(target, printed)

    # Target vs pixelILT / printed. (Comparisons against a "glp" stage are
    # disabled; no such folder is configured.)
    row.update({
        "iou_target_pixelILT": iou(target, pixel_ilt),
        "xor_target_pixelILT": xor_rate(target, pixel_ilt),
        "iou_target_printed": iou(target, printed),
        "xor_target_printed": xor_rate(target, printed),
        "comp_delta_target_printed": comp_delta(target, printed),
    })

    # pixelILT -> litho, per threshold
    for t in cfg.thresholds:
        litho_mask = gray_bins["litho"][float(t)]
        row[f"iou_pixelILT_litho_t{t:.2f}"] = iou(pixel_ilt, litho_mask)
        row[f"xor_pixelILT_litho_t{t:.2f}"] = xor_rate(pixel_ilt, litho_mask)

    # litho -> resist, per threshold
    for t in cfg.thresholds:
        litho_mask = gray_bins["litho"][float(t)]
        resist_mask = gray_bins["resist"][float(t)]
        row[f"iou_litho_resist_t{t:.2f}"] = iou(litho_mask, resist_mask)
        row[f"xor_litho_resist_t{t:.2f}"] = xor_rate(litho_mask, resist_mask)

    # resist -> printed, per threshold
    for t in cfg.thresholds:
        resist_mask = gray_bins["resist"][float(t)]
        row[f"iou_resist_printed_t{t:.2f}"] = iou(resist_mask, printed)
        row[f"xor_resist_printed_t{t:.2f}"] = xor_rate(resist_mask, printed)
        row[f"comp_delta_resist_printed_t{t:.2f}"] = comp_delta(resist_mask, printed)

    # Threshold stability summary: spread of the resist->printed IoU across thresholds.
    rp_ious = [row[f"iou_resist_printed_t{t:.2f}"] for t in cfg.thresholds]
    row["iou_resist_printed_mean"] = float(np.mean(rp_ious))
    row["iou_resist_printed_std"] = float(np.std(rp_ious))

    return row


def run(cfg: Config) -> pd.DataFrame:
    """Process every common mask of one dataset and write the results to ``cfg.out_csv``.

    A mask that raises during processing is reported and skipped so that one
    bad file does not abort the whole run.

    Args:
        cfg: Run configuration.

    Returns:
        DataFrame with one row per successfully processed mask.
    """
    maps, common = build_index(cfg)
    total = len(common)

    rows = []
    for done, mask_name in enumerate(common, start=1):
        try:
            record = process_one(mask_name, maps, cfg)
        except Exception as e:
            print(f"[ERROR] {mask_name}: {e}")
        else:
            rows.append(record)

        # Progress line every 50 masks and once more at the end.
        if done == total or done % 50 == 0:
            print(f"Processed {done}/{total}")

    df = pd.DataFrame(rows)
    df.to_csv(cfg.out_csv, index=False, encoding="utf-8-sig")
    print(f"[DONE] Saved {cfg.out_csv} | rows={len(df)} cols={df.shape[1]}")
    return df


def run_all(datasets: Dict[str, str]) -> Dict[str, pd.DataFrame]:
    """Run the pipeline for each dataset in turn.

    Args:
        datasets: Dataset name -> dataset root directory.

    Returns:
        Dataset name -> result DataFrame, in the iteration order of ``datasets``.
    """
    frames: Dict[str, pd.DataFrame] = {}
    for label in datasets:
        dataset_root = datasets[label]
        print(f"\n[RUN] {label} -> {dataset_root}")
        frames[label] = run(make_cfg(label, dataset_root))
    return frames


# Script entry point: process every dataset in DATASETS, then preview each result table.
if __name__ == "__main__":
    results = run_all(DATASETS)
    for name, df in results.items():
        print(f"\n[HEAD] {name}")
        # First three rows only, printed without the index column.
        print(df.head(3).to_string(index=False))


[RUN] StdContact -> C:\Users\asap0\OneDrive\바탕 화면\yonsei\26-1 DSL\eda\SemiConductor_EDA_2602\LithoBench\0_Datasets\lithodata\StdContact
[OK] common masks = 163


KeyboardInterrupt: 

In [None]:
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd
import cv2

@dataclass
class Config:
    """Settings for a single-dataset run: where the stage folders live and how masks are binarized."""

    # Directory that must contain every sub-folder named in `folders`.
    # NOTE(review): this default ends at "lithodata\\" while out_csv names MetalSet —
    # confirm whether the dataset sub-folder should be appended.
    root: str = "C:\\Users\\asap0\\OneDrive\\바탕 화면\\yonsei\\26-1 DSL\\eda\\SemiConductor_EDA_2602\\LithoBench\\0_Datasets\\lithodata\\" # <-- change to your path
    # Stage sub-folders to index; only filenames present in all of them are processed.
    folders: Tuple[str, ...] = ("litho", "pixelILT", "printed", "resist", "target")

    # binary vs gray stages
    # Stages that process_one binarizes with Otsu thresholding.
    binary_stages: Tuple[str, ...] = ("target", "pixelILT", "printed")
    # Stages that process_one binarizes once per value in `thresholds`.
    gray_stages: Tuple[str, ...] = ("litho", "resist")

    # Fixed cut-offs applied to the gray stages (images are scaled to [0, 1] when loaded).
    thresholds: Tuple[float, ...] = (0.30, 0.40, 0.50)

    # Morph cleanup kernel size (set 0 to disable)
    morph_k: int = 3

    # Path of the CSV written by run().
    out_csv: str = "eda_6stage_features_errors_MetalSet.csv"


# Module-level configuration built from the Config defaults; passed to run() by the entry point.
CFG = Config()


#File indexing (filename-based matching)
def list_files(folder: Path) -> Dict[str, Path]:
    """Map each regular file directly inside ``folder`` to its path, keyed by filename.

    Sub-directories are skipped and the listing is not recursive.
    """
    files: Dict[str, Path] = {}
    for entry in folder.iterdir():
        if entry.is_file():
            files[entry.name] = entry
    return files


def build_index(cfg: Config) -> Tuple[Dict[str, Dict[str, Path]], List[str]]:
    """Index every stage folder under ``cfg.root`` and find the shared filenames.

    Args:
        cfg: Run configuration; ``cfg.root`` is the dataset directory and
            ``cfg.folders`` names the stage sub-folders to index.

    Returns:
        ``(maps, common)`` where ``maps[folder][filename]`` is the file path
        and ``common`` is the sorted list of filenames present in every folder.

    Raises:
        ValueError: ``cfg.folders`` is empty.
        FileNotFoundError: The root or one of the stage folders is missing.
        RuntimeError: No filename is shared by all stage folders.
    """
    if not cfg.folders:
        raise ValueError("cfg.folders must name at least one stage folder.")

    root = Path(cfg.root)
    if not root.exists():
        raise FileNotFoundError(f"Root not found: {root.resolve()}")

    maps: Dict[str, Dict[str, Path]] = {}
    for f in cfg.folders:
        fp = root / f
        # is_dir() also rejects a regular file that happens to carry the stage name.
        if not fp.is_dir():
            raise FileNotFoundError(f"Missing folder: {fp.resolve()}")
        maps[f] = list_files(fp)

    common_set = set.intersection(*(set(files) for files in maps.values()))
    if not common_set:
        # Report the real folder count instead of a hard-coded number.
        raise RuntimeError(
            f"No common filenames across all {len(cfg.folders)} folders."
        )
    common = sorted(common_set)

    # Quick report. A folder can never be missing a common name (the common set
    # is the intersection of all folders), so only extras are worth listing.
    for f in cfg.folders:
        extra = sorted(set(maps[f]) - common_set)
        if extra:
            print(f"[INFO] {f}: extra {len(extra)} (first 5): {extra[:5]}")

    print(f"[OK] common masks = {len(common)}")
    return maps, common


#Robust loader (png/jpg + npy)
def load_as_float01(path: Path) -> np.ndarray:
    """Load an image or ``.npy`` array as a float32 array scaled to [0, 1].

    ``.npy`` arrays are averaged over the last axis when 3-D and then min-max
    normalized; a constant array becomes all zeros. Any other file is decoded
    as an 8-bit grayscale image and divided by 255.

    Args:
        path: File to load.

    Returns:
        float32 array with values in [0, 1].

    Raises:
        ValueError: A non-``.npy`` file is missing or has zero size.
    """
    if path.suffix.lower() == ".npy":
        arr = np.asarray(np.load(path))
        if arr.ndim == 3:
            # Collapse the last axis (assumed to be channels) to a single plane.
            arr = arr.mean(axis=-1)
        arr = arr.astype(np.float32)
        mn, mx = float(arr.min()), float(arr.max())
        if mx - mn < 1e-8:
            # Constant input: avoid dividing by (almost) zero.
            return np.zeros_like(arr, dtype=np.float32)
        return (arr - mn) / (mx - mn)

    # png/jpg
    if not path.exists() or path.stat().st_size == 0:
        raise ValueError(f"Missing or empty file: {path}")

    # cv2.imread is known to fail on Windows paths that contain non-ASCII
    # characters (such as the default dataset path in this file), so read the
    # bytes with NumPy and decode them in memory instead.
    raw = np.fromfile(str(path), dtype=np.uint8)
    img = cv2.imdecode(raw, cv2.IMREAD_GRAYSCALE)
    if img is None:
        # fallback: PIL, for formats OpenCV cannot decode
        from PIL import Image
        with Image.open(path) as im:
            img = np.array(im.convert("L"))

    return img.astype(np.float32) / 255.0


# Binarization + cleanup
def otsu_binarize(img01: np.ndarray, auto_invert: bool = True) -> np.ndarray:
    """Binarize a [0, 1] image to a 0/1 uint8 mask using Otsu's threshold.

    The image is converted to 8-bit and smoothed with a 5x5 Gaussian blur
    before thresholding.

    Args:
        img01: Image with values in [0, 1].
        auto_invert: When true, flip the mask if more than half of its pixels
            are set, so that the smaller region ends up as foreground.

    Returns:
        uint8 mask containing only 0 and 1.
    """
    scaled = np.clip(img01 * 255.0, 0, 255).astype(np.uint8)
    smoothed = cv2.GaussianBlur(scaled, (5, 5), 0)
    mask = cv2.threshold(smoothed, 0, 1, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]
    if not auto_invert:
        return mask.astype(np.uint8)
    mostly_on = mask.mean() > 0.5
    return (1 - mask if mostly_on else mask).astype(np.uint8)


def threshold_binarize(img01: np.ndarray, thr: float, auto_invert: bool = True) -> np.ndarray:
    """Binarize an image with a fixed cut-off: pixels strictly above ``thr`` become 1.

    Args:
        img01: Image array (values in [0, 1] in this pipeline).
        thr: Threshold; the comparison is strict (``>``).
        auto_invert: When true, flip the mask if more than half of its pixels
            are set.

    Returns:
        uint8 mask containing only 0 and 1.
    """
    above = img01 > thr
    if auto_invert and above.mean() > 0.5:
        above = ~above
    return above.astype(np.uint8)


def cleanup_binary(bin01: np.ndarray, k: int) -> np.ndarray:
    """Clean a binary mask with a morphological open followed by a close.

    Args:
        bin01: Binary mask.
        k: Side length of the square all-ones kernel; ``None`` or a value
            <= 0 disables the cleanup.

    Returns:
        The mask as uint8, cleaned unless the cleanup is disabled.
    """
    mask = bin01.astype(np.uint8)
    morph_enabled = k is not None and k > 0
    if not morph_enabled:
        return mask
    element = np.ones((k, k), np.uint8)
    opened = cv2.morphologyEx(mask, cv2.MORPH_OPEN, element)
    closed = cv2.morphologyEx(opened, cv2.MORPH_CLOSE, element)
    return closed.astype(np.uint8)


#Target feature extraction (geometry / complexity)
def perimeter_px(bin01: np.ndarray) -> int:
    """Estimate the perimeter of a 0/1 mask as its number of Canny edge pixels."""
    as_8bit = (bin01 * 255).astype(np.uint8)
    edges = cv2.Canny(as_8bit, 50, 150)
    return int(np.count_nonzero(edges > 0))


def count_components(bin01: np.ndarray) -> int:
    """Count the 4-connected foreground regions of a mask.

    OpenCV's label count includes the background label, hence the ``- 1``.
    """
    foreground = (bin01 > 0).astype(np.uint8)
    n_labels = cv2.connectedComponents(foreground, connectivity=4)[0]
    return int(n_labels - 1)


def extract_target_features(target_bin: np.ndarray) -> Dict[str, float]:
    """Compute geometry/complexity features of the binarized target mask.

    Args:
        target_bin: 0/1 target mask.

    Returns:
        Dict with the foreground area fraction, edge-pixel perimeter,
        connected-component count, compactness (perimeter squared over area)
        and edge density (edge pixels over all pixels).
    """
    n_px = int(target_bin.size)
    perim = float(perimeter_px(target_bin))
    # Small epsilon keeps the compactness finite for an empty mask.
    area = float(target_bin.sum()) + 1e-6

    # Edge density is the same Canny edge count that perimeter_px returns,
    # divided by the pixel count, so the edge detector only has to run once.
    edge_density = perim / n_px if n_px else float("nan")

    return {
        "target_area_frac": float(target_bin.mean()),
        "target_perimeter": perim,
        "target_components": float(count_components(target_bin)),
        "target_compactness": float((perim * perim) / area),
        "target_edge_density": edge_density,
    }


#Error metrics (IoU / XOR / component delta)
def iou(a: np.ndarray, b: np.ndarray) -> float:
    """Intersection-over-union of the foreground (> 0) pixels of two masks.

    Two empty masks are treated as a perfect match and score 1.0. This keeps
    the metric consistent with ``xor_rate``, which reports 0.0 for the same
    pair, and lets identical masks score exactly 1.0.

    Args:
        a: First mask.
        b: Second mask, broadcast-compatible with ``a``.

    Returns:
        IoU in [0, 1].
    """
    a_on = a > 0
    b_on = b > 0
    union = int(np.logical_or(a_on, b_on).sum())
    if union == 0:
        # Nothing set in either mask: identical by definition.
        return 1.0
    inter = int(np.logical_and(a_on, b_on).sum())
    return inter / union


def xor_rate(a: np.ndarray, b: np.ndarray) -> float:
    """Fraction of the union of two masks where they disagree.

    Pixels > 0 count as foreground. A small epsilon in the denominator makes
    two empty masks score 0.0 instead of dividing by zero.
    """
    fg_a, fg_b = a > 0, b > 0
    mismatch = np.count_nonzero(np.logical_xor(fg_a, fg_b))
    union = np.count_nonzero(np.logical_or(fg_a, fg_b))
    return float(mismatch / (union + 1e-8))


def comp_delta(a: np.ndarray, b: np.ndarray) -> int:
    """Return how many more connected components ``b`` has than ``a`` (negative if fewer)."""
    n_after = count_components(b)
    n_before = count_components(a)
    return int(n_after - n_before)

#Optional alignment sanity check (centroid shift)
def centroid_xy(bin01: np.ndarray) -> Tuple[float, float]:
    """Return the ``(x, y)`` centroid of the foreground (> 0) pixels of a 2-D mask.

    x is the mean column index and y the mean row index. An empty mask
    yields ``(nan, nan)``.
    """
    rows, cols = np.nonzero(bin01 > 0)
    if cols.size == 0:
        return (np.nan, np.nan)
    return float(cols.mean()), float(rows.mean())


def centroid_shift(a: np.ndarray, b: np.ndarray) -> float:
    """Euclidean distance in pixels between the foreground centroids of two masks.

    Returns NaN when either mask has no foreground.
    """
    ax, ay = centroid_xy(a)
    bx, by = centroid_xy(b)
    either_empty = np.isnan(ax) or np.isnan(bx)
    if either_empty:
        return float("nan")
    dx = ax - bx
    dy = ay - by
    return float(np.sqrt(dx ** 2 + dy ** 2))


#Process one mask (load -> preprocess -> features/errors)
def process_one(name: str, maps: Dict[str, Dict[str, Path]], cfg: Config) -> Dict[str, float]:
    """Build one result row (target features + stage-to-stage errors) for mask ``name``.

    Args:
        name: Filename shared by every stage folder.
        maps: ``maps[stage][filename]`` -> path, as returned by build_index.
        cfg: Run configuration (stages, thresholds, cleanup kernel size).

    Returns:
        Row dict keyed by column name. Key insertion order is the column
        order of the resulting CSV.
    """
    # Load every stage image scaled to [0, 1].
    imgs = {}
    for stage in cfg.folders:
        imgs[stage] = load_as_float01(maps[stage][name])

    # Binary stages: Otsu threshold followed by morphological cleanup.
    bins = {
        stage: cleanup_binary(otsu_binarize(imgs[stage], auto_invert=True), cfg.morph_k)
        for stage in cfg.binary_stages
    }

    # Gray stages: one cleaned mask per configured threshold.
    gray_bins = {
        stage: {
            float(t): cleanup_binary(
                threshold_binarize(imgs[stage], float(t), auto_invert=True),
                cfg.morph_k,
            )
            for t in cfg.thresholds
        }
        for stage in cfg.gray_stages
    }

    target, printed, pixel_ilt = bins["target"], bins["printed"], bins["pixelILT"]

    # Identifier and target geometry features come first.
    row = {"mask_name": name}
    row.update(extract_target_features(target))

    # Alignment sanity check: target vs printed.
    row["shift_target_printed"] = centroid_shift(target, printed)

    # Target vs pixelILT / printed. (Comparisons against a "glp" stage are
    # disabled; no such folder is configured.)
    row.update({
        "iou_target_pixelILT": iou(target, pixel_ilt),
        "xor_target_pixelILT": xor_rate(target, pixel_ilt),
        "iou_target_printed": iou(target, printed),
        "xor_target_printed": xor_rate(target, printed),
        "comp_delta_target_printed": comp_delta(target, printed),
    })

    # pixelILT -> litho, per threshold
    for t in cfg.thresholds:
        litho_mask = gray_bins["litho"][float(t)]
        row[f"iou_pixelILT_litho_t{t:.2f}"] = iou(pixel_ilt, litho_mask)
        row[f"xor_pixelILT_litho_t{t:.2f}"] = xor_rate(pixel_ilt, litho_mask)

    # litho -> resist, per threshold
    for t in cfg.thresholds:
        litho_mask = gray_bins["litho"][float(t)]
        resist_mask = gray_bins["resist"][float(t)]
        row[f"iou_litho_resist_t{t:.2f}"] = iou(litho_mask, resist_mask)
        row[f"xor_litho_resist_t{t:.2f}"] = xor_rate(litho_mask, resist_mask)

    # resist -> printed, per threshold
    for t in cfg.thresholds:
        resist_mask = gray_bins["resist"][float(t)]
        row[f"iou_resist_printed_t{t:.2f}"] = iou(resist_mask, printed)
        row[f"xor_resist_printed_t{t:.2f}"] = xor_rate(resist_mask, printed)
        row[f"comp_delta_resist_printed_t{t:.2f}"] = comp_delta(resist_mask, printed)

    # Threshold stability summary: spread of the resist->printed IoU across thresholds.
    rp_ious = [row[f"iou_resist_printed_t{t:.2f}"] for t in cfg.thresholds]
    row["iou_resist_printed_mean"] = float(np.mean(rp_ious))
    row["iou_resist_printed_std"] = float(np.std(rp_ious))

    return row


def run(cfg: Config) -> pd.DataFrame:
    """Process every common mask of one dataset and write the results to ``cfg.out_csv``.

    A mask that raises during processing is reported and skipped so that one
    bad file does not abort the whole run.

    Args:
        cfg: Run configuration.

    Returns:
        DataFrame with one row per successfully processed mask.
    """
    maps, common = build_index(cfg)
    total = len(common)

    rows = []
    for done, mask_name in enumerate(common, start=1):
        try:
            record = process_one(mask_name, maps, cfg)
        except Exception as e:
            print(f"[ERROR] {mask_name}: {e}")
        else:
            rows.append(record)

        # Progress line every 50 masks and once more at the end.
        if done == total or done % 50 == 0:
            print(f"Processed {done}/{total}")

    df = pd.DataFrame(rows)
    df.to_csv(cfg.out_csv, index=False, encoding="utf-8-sig")
    print(f"[DONE] Saved {cfg.out_csv} | rows={len(df)} cols={df.shape[1]}")
    return df


# Script entry point: run the pipeline with the module-level CFG and preview the result.
if __name__ == "__main__":
    df = run(CFG)
    # First three rows only, printed without the index column.
    print(df.head(3).to_string(index=False))