<a href="https://colab.research.google.com/github/otakunoichin/test/blob/main/Ai%E7%B6%B2%E8%86%9CwhiteAddResult.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# ===== 完全版：1M vs 1Y 変化量比較（モデルロード込み）=====

from google.colab import drive
drive.mount('/content/drive')

import os, re
import numpy as np
import pandas as pd
from PIL import Image
import matplotlib.pyplot as plt
import cv2

import torch
import torch.nn as nn

Mounted at /content/drive


In [2]:
# -------------------------
# 設定
# -------------------------
ROOT = "/content/drive/MyDrive/AiFundas"

pairs = [
    ("pt1_1M.png",   "pt1_1Y.png"),
    ("pt13R_1M.png", "pt13R_1Y.png"),
    ("pt21R_1M.png", "pt21R_1Y.png"),
    ("pt27R_1M.png", "pt27R_1Y.png"),
]

In [3]:
# ★白学習済みモデル
CKPT_PATH = os.path.join(ROOT, "best_unet_with_white.pt")
# もし無いなら旧モデルに切り替える
if not os.path.exists(CKPT_PATH):
    CKPT_PATH = os.path.join(ROOT, "best_unet.pt")
print("using ckpt:", CKPT_PATH)

OUT_DIR = os.path.join(ROOT, "followup_results_v2")  # 新しい保存先（消してもOK）
os.makedirs(OUT_DIR, exist_ok=True)

THR = 0.5
ALPHA = 0.35
TARGET_SIZE = 4096
PATCH_SIZE  = 512

using ckpt: /content/drive/MyDrive/AiFundas/best_unet_with_white.pt


In [4]:
# -------------------------
# 画像読み込み
# -------------------------
def load_gray_u8(path):
    return np.array(Image.open(path).convert("L"), dtype=np.uint8)

# -------------------------
# UNet（state_dictロード用に必要）
# -------------------------
class DoubleConv(nn.Module):
    def __init__(self, in_ch, out_ch):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv2d(in_ch, out_ch, 3, padding=1, bias=False),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_ch, out_ch, 3, padding=1, bias=False),
            nn.BatchNorm2d(out_ch),
            nn.ReLU(inplace=True),
        )
    def forward(self, x):
        return self.net(x)

class UNet(nn.Module):
    def __init__(self, in_ch=1, out_ch=1, base=32):
        super().__init__()
        self.enc1 = DoubleConv(in_ch, base);    self.pool1 = nn.MaxPool2d(2)
        self.enc2 = DoubleConv(base, base*2);   self.pool2 = nn.MaxPool2d(2)
        self.enc3 = DoubleConv(base*2, base*4); self.pool3 = nn.MaxPool2d(2)
        self.enc4 = DoubleConv(base*4, base*8); self.pool4 = nn.MaxPool2d(2)
        self.bottleneck = DoubleConv(base*8, base*16)
        self.up4 = nn.ConvTranspose2d(base*16, base*8, 2, stride=2); self.dec4 = DoubleConv(base*16, base*8)
        self.up3 = nn.ConvTranspose2d(base*8,  base*4, 2, stride=2); self.dec3 = DoubleConv(base*8,  base*4)
        self.up2 = nn.ConvTranspose2d(base*4,  base*2, 2, stride=2); self.dec2 = DoubleConv(base*4,  base*2)
        self.up1 = nn.ConvTranspose2d(base*2,  base,   2, stride=2); self.dec1 = DoubleConv(base*2,  base)
        self.outc = nn.Conv2d(base, out_ch, 1)

    def forward(self, x):
        e1 = self.enc1(x)
        e2 = self.enc2(self.pool1(e1))
        e3 = self.enc3(self.pool2(e2))
        e4 = self.enc4(self.pool3(e3))
        b  = self.bottleneck(self.pool4(e4))
        d4 = self.up4(b); d4 = torch.cat([d4, e4], dim=1); d4 = self.dec4(d4)
        d3 = self.up3(d4); d3 = torch.cat([d3, e3], dim=1); d3 = self.dec3(d3)
        d2 = self.up2(d3); d2 = torch.cat([d2, e2], dim=1); d2 = self.dec2(d2)
        d1 = self.up1(d2); d1 = torch.cat([d1, e1], dim=1); d1 = self.dec1(d1)
        return self.outc(d1)

In [5]:
# -------------------------
# device & モデルロード
# -------------------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("device:", device)

ckpt = torch.load(CKPT_PATH, map_location=device)
state_dict = ckpt["model"] if isinstance(ckpt, dict) and "model" in ckpt else ckpt

model = UNet(in_ch=1, out_ch=1, base=32).to(device)
model.load_state_dict(state_dict, strict=True)
model.eval()
print("model loaded")

device: cuda
model loaded


In [6]:
# -------------------------
# 4096pad→512推論
# -------------------------
def reflect_pad_to(arr, target=4096):
    h, w = arr.shape
    if h > target or w > target:
        raise ValueError(f"image too large: {arr.shape} > {target}")
    pad_h = target - h
    pad_w = target - w
    top = pad_h // 2
    bottom = pad_h - top
    left = pad_w // 2
    right = pad_w - left
    arr_pad = np.pad(arr, ((top, bottom), (left, right)), mode="reflect")
    return arr_pad, (top, left, h, w)

@torch.no_grad()
def predict_mask(gray_u8, thr=0.5):
    H, W = gray_u8.shape
    img_pad, (top, left, h, w) = reflect_pad_to(gray_u8, TARGET_SIZE)
    prob_canvas = np.zeros((TARGET_SIZE, TARGET_SIZE), dtype=np.float32)

    for r in range(TARGET_SIZE // PATCH_SIZE):
        for c in range(TARGET_SIZE // PATCH_SIZE):
            y0 = r * PATCH_SIZE
            x0 = c * PATCH_SIZE
            patch = img_pad[y0:y0+PATCH_SIZE, x0:x0+PATCH_SIZE].astype(np.float32) / 255.0
            x = torch.from_numpy(patch).unsqueeze(0).unsqueeze(0).to(device)
            logits = model(x)
            prob = torch.sigmoid(logits).squeeze().cpu().numpy()
            prob_canvas[y0:y0+PATCH_SIZE, x0:x0+PATCH_SIZE] = prob

    prob_crop = prob_canvas[top:top+h, left:left+w]
    mask01 = (prob_crop > thr).astype(np.uint8)
    return prob_crop, mask01


In [7]:
# -------------------------
# ECC（1Yを1Mへ合わせる）
# -------------------------
def normalize_u8(img_u8):
    img = img_u8.astype(np.float32)
    mn, mx = np.percentile(img, 1), np.percentile(img, 99)
    img = np.clip(img, mn, mx)
    img = (img - img.min()) / (img.max() - img.min() + 1e-8)
    return (img * 255).astype(np.uint8)

def register_ecc(ref_u8, mov_u8, warp_mode=cv2.MOTION_AFFINE, iters=300, eps=1e-6):
    ref_f = ref_u8.astype(np.float32)
    mov_f = mov_u8.astype(np.float32)

    if warp_mode == cv2.MOTION_HOMOGRAPHY:
        warp = np.eye(3, 3, dtype=np.float32)
    else:
        warp = np.eye(2, 3, dtype=np.float32)

    criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, iters, eps)
    cc, warp = cv2.findTransformECC(ref_f, mov_f, warp, warp_mode, criteria)
    return warp, float(cc)

def warp_image(img_u8, ref_shape_hw, warp, warp_mode=cv2.MOTION_AFFINE):
    h, w = ref_shape_hw
    if warp_mode == cv2.MOTION_HOMOGRAPHY:
        return cv2.warpPerspective(img_u8, warp, (w, h),
                                   flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP)
    else:
        return cv2.warpAffine(img_u8, warp, (w, h),
                              flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP)

In [8]:
# -------------------------
# overlay（元画像はそのまま + 赤半透明）
# -------------------------
def overlay_on_gray(img_u8, mask01, alpha=0.35):
    img_rgb = np.stack([img_u8]*3, axis=-1).astype(np.float32)
    red = np.zeros_like(img_rgb); red[..., 0] = 255
    m3 = np.stack([mask01]*3, axis=-1)
    out = img_rgb.copy()
    out[m3 == 1] = out[m3 == 1] * (1 - alpha) + red[m3 == 1] * alpha
    return out.astype(np.uint8)

In [9]:
# -------------------------
# 実行：CSV + 8枚overlay保存
# -------------------------
rows = []

for f1M, f1Y in pairs:
    case = f1M.replace("_1M.png", "")

    p1 = os.path.join(ROOT, f1M)
    p2 = os.path.join(ROOT, f1Y)
    assert os.path.exists(p1) and os.path.exists(p2), f"missing file: {f1M} or {f1Y}"

    img1M = load_gray_u8(p1)
    img1Y = load_gray_u8(p2)

    # ECC registration（基本AFFINE、失敗したらHOMOGRAPHY）
    warp_mode = cv2.MOTION_AFFINE
    try:
        warp, cc = register_ecc(normalize_u8(img1M), normalize_u8(img1Y), warp_mode=warp_mode)
        img1Y_aligned = warp_image(img1Y, img1M.shape, warp, warp_mode=warp_mode)
    except cv2.error:
        warp_mode = cv2.MOTION_HOMOGRAPHY
        warp, cc = register_ecc(normalize_u8(img1M), normalize_u8(img1Y), warp_mode=warp_mode)
        img1Y_aligned = warp_image(img1Y, img1M.shape, warp, warp_mode=warp_mode)

    # segmentation
    _, maskM = predict_mask(img1M, thr=THR)
    _, maskY = predict_mask(img1Y_aligned, thr=THR)

    areaM = int(maskM.sum())
    areaY = int(maskY.sum())

    rows.append({
        "case": case,
        "file_1M": f1M,
        "file_1Y": f1Y,
        "ecc_cc": cc,
        "warp_mode": "AFFINE" if warp_mode == cv2.MOTION_AFFINE else "HOMOGRAPHY",
        "area_1M": areaM,
        "area_1Y": areaY,
        "area_delta": areaY - areaM,
        "area_ratio": areaY / (areaM + 1e-8),
    })

    # overlay保存（元画像はそのまま + 赤半透明）
    ovM = overlay_on_gray(img1M, maskM, alpha=ALPHA)
    ovY = overlay_on_gray(img1Y_aligned, maskY, alpha=ALPHA)

    Image.fromarray(ovM).save(os.path.join(OUT_DIR, f"{case}_1M_overlay.png"))
    Image.fromarray(ovY).save(os.path.join(OUT_DIR, f"{case}_1Y_overlay.png"))

df = pd.DataFrame(rows).sort_values("case")
csv_path = os.path.join(OUT_DIR, "followup_change_summary.csv")
df.to_csv(csv_path, index=False)

print("saved:", csv_path)
display(df)

print("overlay saved to:", OUT_DIR)

saved: /content/drive/MyDrive/AiFundas/followup_results_v2/followup_change_summary.csv


Unnamed: 0,case,file_1M,file_1Y,ecc_cc,warp_mode,area_1M,area_1Y,area_delta,area_ratio
0,pt1,pt1_1M.png,pt1_1Y.png,0.937967,AFFINE,4650028,4140144,-509884,0.890348
1,pt13R,pt13R_1M.png,pt13R_1Y.png,0.956527,AFFINE,3887638,3520596,-367042,0.905587
2,pt21R,pt21R_1M.png,pt21R_1Y.png,0.924565,AFFINE,3566347,3770612,204265,1.057276
3,pt27R,pt27R_1M.png,pt27R_1Y.png,0.963204,AFFINE,3844098,3742250,-101848,0.973505


overlay saved to: /content/drive/MyDrive/AiFundas/followup_results_v2
