<a href="https://colab.research.google.com/github/mitsuo/juntendo-hds/blob/main/MONAI_Segmentation_ChestRadiologyImages_py312_fixed.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# MONAI: Chest CT (LCTSC) 右肺セグメンテーション（Colab / Python 3.12 対応版）

このノートは、LCTSC (AAPM 2017) の **CT(DICOM) + RTSTRUCT** から右肺（`Lung_R`）マスクを作成し、MONAI の 3D UNet で学習・評価する最小実装です。

- **修正ポイント**: Python 3.12 で動くように依存関係を更新し、元ノートに含まれていた `...`（記事の省略記号）や切れた変数名など **実行不能な箇所を全て排除** しています。
- データ読み込みの安定性のため、**CT とマスクを `.npy` に変換してから学習**します（DICOM フォルダを直接 Loader に渡すより事故が少ないため）。

> 右肺以外にしたい場合は、`ROI_NAME` を変更してください（存在しない場合は自動で候補から選びます）。


In [1]:
\
# --- Install (Colab / Python 3.12) ---
# NOTE:
# - ColabのGPU環境ではPyTorchは通常プリインストールされています。
# - MONAIはPython 3.12対応版を入れる必要があるため、古い固定版(1.0.0等)は使いません。
!pip -q install -U "monai[pydicom]" "pydicom" "rt-utils" "nibabel" "gdown" "wandb"

import os, sys, random
import numpy as np
import torch

print("Python:", sys.version)
print("Torch :", torch.__version__, "CUDA:", torch.version.cuda, "GPU available:", torch.cuda.is_available())


[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.4/2.4 MB[0m [31m38.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.7/2.7 MB[0m [31m42.4 MB/s[0m eta [36m0:00:00[0m
[?25hPython: 3.12.12 (main, Oct 10 2025, 08:52:57) [GCC 11.4.0]
Torch : 2.9.0+cu126 CUDA: 12.6 GPU available: True


In [2]:
\
# --- Reproducibility ---
def seed_everything(seed: int = 42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

seed_everything(42)


In [3]:
\
# --- Download dataset (LCTSC DICOM) ---
# 元ノートのIDをそのまま使用しています（アクセス不能な場合は別URL/IDに置き換えてください）
import os, pathlib, zipfile
DATA_ZIP = "LCTSC_DICOM.zip"
DATA_DIR = pathlib.Path("LCTSC_DICOM")

if not pathlib.Path(DATA_ZIP).exists():
    !gdown --id 1TXH4PnMcU-23irtZZShZ80ClpJPedZL_ -O {DATA_ZIP}

if not DATA_DIR.exists():
    with zipfile.ZipFile(DATA_ZIP, "r") as zf:
        zf.extractall(".")
print("Extracted:", DATA_DIR.exists(), "=>", DATA_DIR.resolve())


Downloading...
From (original): https://drive.google.com/uc?id=1TXH4PnMcU-23irtZZShZ80ClpJPedZL_
From (redirected): https://drive.google.com/uc?id=1TXH4PnMcU-23irtZZShZ80ClpJPedZL_&confirm=t&uuid=44e83af2-6f1d-461a-bf61-622991a88300
To: /content/LCTSC_DICOM.zip
100% 480M/480M [00:04<00:00, 98.4MB/s]
Extracted: True => /content/LCTSC_DICOM


In [4]:
\
# --- Scan patients: find CT series folder and RTSTRUCT file ---
from glob import glob
import pydicom
from pathlib import Path

def modality_of(path: str) -> str | None:
    try:
        ds = pydicom.dcmread(path, stop_before_pixels=True, force=True)
        return getattr(ds, "Modality", None)
    except Exception:
        return None

def find_ct_and_rtstruct(patient_root: Path):
    # まずは.dcmを全部拾う（患者あたりのファイル数が多い場合があるので、必要なら制限してください）
    dcm_files = glob(str(patient_root / "**" / "*.dcm"), recursive=True)
    if not dcm_files:
        return None, None

    ct_dirs = {}
    rtstruct_files = []
    for f in dcm_files:
        mod = modality_of(f)
        if mod == "CT":
            d = str(Path(f).parent)
            ct_dirs[d] = ct_dirs.get(d, 0) + 1
        elif mod == "RTSTRUCT":
            rtstruct_files.append(f)

    if not ct_dirs or not rtstruct_files:
        return None, None

    # CTは「最もCT枚数が多いディレクトリ」を採用
    ct_dir = max(ct_dirs.items(), key=lambda kv: kv[1])[0]
    rt_path = rtstruct_files[0]
    return ct_dir, rt_path

patient_roots = sorted([p for p in DATA_DIR.glob("LCTSC-Train-S1-*") if p.is_dir()])
print("Train patients found:", len(patient_roots))
ct_rt = []
for p in patient_roots:
    ct_dir, rt_path = find_ct_and_rtstruct(p)
    if ct_dir and rt_path:
        ct_rt.append((p.name, ct_dir, rt_path))

print("Usable patients:", len(ct_rt))
ct_rt[:3]


Train patients found: 12
Usable patients: 12


[('LCTSC-Train-S1-001',
  'LCTSC_DICOM/LCTSC-Train-S1-001/11-16-2003-NA-RTRCCTTHORAX8FLow Adult-39664/0.000000-CTRespCT-cran  3.0  B30s  50 Ex-63530',
  'LCTSC_DICOM/LCTSC-Train-S1-001/11-16-2003-NA-RTRCCTTHORAX8FLow Adult-39664/1.000000-.simplified-62948/1-1.dcm'),
 ('LCTSC-Train-S1-002',
  'LCTSC_DICOM/LCTSC-Train-S1-002/11-01-2003-NA-RTRCCTTHORAX8F Adult-00439/0.000000-CT2p2b2 50 Ex-45938',
  'LCTSC_DICOM/LCTSC-Train-S1-002/11-01-2003-NA-RTRCCTTHORAX8F Adult-00439/1.000000-.simplified-15042/1-1.dcm'),
 ('LCTSC-Train-S1-003',
  'LCTSC_DICOM/LCTSC-Train-S1-003/12-20-2003-NA-RTRCCTTHORAX8FHigh Adult-78009/0.000000-CTP2B1 long RC  3.0  B30f  50 Ex-63868',
  'LCTSC_DICOM/LCTSC-Train-S1-003/12-20-2003-NA-RTRCCTTHORAX8FHigh Adult-78009/1.000000-.simplified-49059/1-1.dcm')]

In [5]:
\
# --- Convert DICOM CT + RTSTRUCT -> npy volumes ---
from rt_utils import RTStructBuilder

OUT_DIR = Path("lctsc_npy")
OUT_DIR.mkdir(exist_ok=True)

ROI_NAME = "Lung_R"  # 右肺（別ROIにしたい場合ここを変更）

def sort_key(ds):
    # なるべく安定してスライス順になるように
    if hasattr(ds, "ImagePositionPatient"):
        try:
            return float(ds.ImagePositionPatient[2])
        except Exception:
            pass
    if hasattr(ds, "InstanceNumber"):
        try:
            return int(ds.InstanceNumber)
        except Exception:
            pass
    return 0

def load_ct_volume_hu(ct_dir: str) -> np.ndarray:
    files = sorted(glob(os.path.join(ct_dir, "*.dcm")))
    dss = []
    for f in files:
        ds = pydicom.dcmread(f, force=True)
        if getattr(ds, "Modality", None) == "CT":
            dss.append(ds)
    dss = sorted(dss, key=sort_key)
    if not dss:
        raise RuntimeError(f"No CT slices in {ct_dir}")

    vol = np.stack([ds.pixel_array for ds in dss], axis=-1).astype(np.float32)  # (H,W,D)
    slope = float(getattr(dss[0], "RescaleSlope", 1.0))
    intercept = float(getattr(dss[0], "RescaleIntercept", 0.0))
    vol = vol * slope + intercept  # HU
    return vol

def pick_roi(rtstruct, preferred: str):
    names = rtstruct.get_roi_names()
    if preferred in names:
        return preferred
    # ありがちな表記ゆれをゆるく探索
    candidates = [n for n in names if ("lung" in n.lower() and ("r" in n.lower() or "right" in n.lower()))]
    if candidates:
        return candidates[0]
    # 最後の手段: Lungを含むもの
    candidates = [n for n in names if "lung" in n.lower()]
    return candidates[0] if candidates else None

MAX_PATIENTS = 10  # デモ用。全部やるなら None にする

records = []
for idx, (pid, ct_dir, rt_path) in enumerate(ct_rt):
    if MAX_PATIENTS is not None and idx >= MAX_PATIENTS:
        break

    img_out = OUT_DIR / f"{pid}_ct.npy"
    msk_out = OUT_DIR / f"{pid}_{ROI_NAME}.npy"

    if img_out.exists() and msk_out.exists():
        records.append({"patient_id": pid, "image": str(img_out), "label": str(msk_out)})
        continue

    rtstruct = RTStructBuilder.create_from(dicom_series_path=ct_dir, rt_struct_path=rt_path)
    roi = pick_roi(rtstruct, ROI_NAME)
    if roi is None:
        print(f"[SKIP] {pid}: no lung ROI. available={rtstruct.get_roi_names()[:10]}...")
        continue

    ct_vol = load_ct_volume_hu(ct_dir)
    mask = rtstruct.get_roi_mask_by_name(roi).astype(np.uint8)  # (H,W,D)

    if ct_vol.shape != mask.shape:
        # 形状が違う場合は、この患者は学習に回さず原因解析した方が安全
        print(f"[SKIP] {pid}: shape mismatch CT{ct_vol.shape} vs MASK{mask.shape}")
        continue

    np.save(img_out, ct_vol)
    np.save(msk_out, mask)
    records.append({"patient_id": pid, "image": str(img_out), "label": str(msk_out), "roi_used": roi})

print("Converted patients:", len(records))
records[:2]


Converted patients: 10


[{'patient_id': 'LCTSC-Train-S1-001',
  'image': 'lctsc_npy/LCTSC-Train-S1-001_ct.npy',
  'label': 'lctsc_npy/LCTSC-Train-S1-001_Lung_R.npy',
  'roi_used': 'Lung_R'},
 {'patient_id': 'LCTSC-Train-S1-002',
  'image': 'lctsc_npy/LCTSC-Train-S1-002_ct.npy',
  'label': 'lctsc_npy/LCTSC-Train-S1-002_Lung_R.npy',
  'roi_used': 'Lung_R'}]

In [6]:
\
# --- Train/Val/Test split ---
import pandas as pd

df = pd.DataFrame(records)
df = df.sample(frac=1.0, random_state=42).reset_index(drop=True)

# 小規模デモ用の分割。データが増えたら比率で切るのがおすすめ。
test_df = df.iloc[:2].reset_index(drop=True)
val_df  = df.iloc[2:4].reset_index(drop=True)
train_df= df.iloc[4:].reset_index(drop=True)

print("train/val/test:", len(train_df), len(val_df), len(test_df))
df.head()


train/val/test: 6 2 2


Unnamed: 0,patient_id,image,label,roi_used
0,LCTSC-Train-S1-009,lctsc_npy/LCTSC-Train-S1-009_ct.npy,lctsc_npy/LCTSC-Train-S1-009_Lung_R.npy,Lung_R
1,LCTSC-Train-S1-002,lctsc_npy/LCTSC-Train-S1-002_ct.npy,lctsc_npy/LCTSC-Train-S1-002_Lung_R.npy,Lung_R
2,LCTSC-Train-S1-006,lctsc_npy/LCTSC-Train-S1-006_ct.npy,lctsc_npy/LCTSC-Train-S1-006_Lung_R.npy,Lung_R
3,LCTSC-Train-S1-001,lctsc_npy/LCTSC-Train-S1-001_ct.npy,lctsc_npy/LCTSC-Train-S1-001_Lung_R.npy,Lung_R
4,LCTSC-Train-S1-008,lctsc_npy/LCTSC-Train-S1-008_ct.npy,lctsc_npy/LCTSC-Train-S1-008_Lung_R.npy,Lung_R


In [7]:
\
# --- MONAI datasets / transforms ---
from monai.data import Dataset, DataLoader
from monai.transforms import (
    Compose, LoadImaged, EnsureChannelFirstd, Resized,
    ScaleIntensityRanged, EnsureTyped
)

SPATIAL_SIZE = (96, 96, 96)

train_transforms = Compose([
    LoadImaged(keys=["image", "label"]),  # .npy は自動で NumpyReader が選ばれるはず
    EnsureChannelFirstd(keys=["image", "label"], channel_dim="no_channel"),
    # CTはHUなので、よく使う範囲にクリップして[0,1]へ
    ScaleIntensityRanged(keys="image", a_min=-1000, a_max=400, b_min=0.0, b_max=1.0, clip=True),
    Resized(keys=["image", "label"], spatial_size=SPATIAL_SIZE),
    EnsureTyped(keys=["image", "label"], dtype=torch.float32),
])

val_transforms = Compose([
    LoadImaged(keys=["image", "label"]),
    EnsureChannelFirstd(keys=["image", "label"], channel_dim="no_channel"),
    ScaleIntensityRanged(keys="image", a_min=-1000, a_max=400, b_min=0.0, b_max=1.0, clip=True),
    Resized(keys=["image", "label"], spatial_size=SPATIAL_SIZE),
    EnsureTyped(keys=["image", "label"], dtype=torch.float32),
])

train_data = train_df[["image","label"]].to_dict("records")
val_data   = val_df[["image","label"]].to_dict("records")
test_data  = test_df[["image","label"]].to_dict("records")

train_ds = Dataset(data=train_data, transform=train_transforms)
val_ds   = Dataset(data=val_data,   transform=val_transforms)
test_ds  = Dataset(data=test_data,  transform=val_transforms)

train_loader = DataLoader(train_ds, batch_size=2, shuffle=True, num_workers=0)
val_loader   = DataLoader(val_ds,   batch_size=2, shuffle=False, num_workers=0)
test_loader  = DataLoader(test_ds,  batch_size=1, shuffle=False, num_workers=0)

next(iter(train_loader))["image"].shape, next(iter(train_loader))["label"].shape




(torch.Size([2, 1, 96, 96, 96]), torch.Size([2, 1, 96, 96, 96]))

In [8]:
\
# --- Model / loss / metric ---
from monai.networks.nets import UNet
from monai.networks.layers import Norm
from monai.losses import DiceLoss
from monai.metrics import DiceMetric
from monai.transforms import Activations, AsDiscrete

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = UNet(
    spatial_dims=3,
    in_channels=1,
    out_channels=1,
    channels=(16, 32, 64, 128, 256),
    strides=(2, 2, 2, 2),
    num_res_units=2,
    norm=Norm.BATCH,
).to(device)

loss_function = DiceLoss(sigmoid=True)   # sigmoid込みのDiceLoss（2値）
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

dice_metric = DiceMetric(include_background=True, reduction="mean")
post_pred = Compose([Activations(sigmoid=True), AsDiscrete(threshold=0.5)])
post_label = AsDiscrete(threshold=0.5)

print("device:", device)


device: cuda


In [11]:
\
# --- (Optional) Weights & Biases ---
# wandbにログを送らない場合は "disabled" にできます
# os.environ["WANDB_MODE"] = "disabled"

import wandb, os
wandb.init(project="MONAI_Segmentation_ChestRadiologyImages", config={
    "spatial_size": SPATIAL_SIZE,
    "lr": 1e-3,
    "max_epochs": 50,
    "batch_size": 2,
    "roi": ROI_NAME,
})


0,1
best_dice,▁
best_epoch,▁
epoch,▁▂▃▃▄▅▆▆▇█
train/loss,█▆▄▃▂▂▁▁▁▁
val/dice,▁▂▁▁▂▂▆▄▆█

0,1
best_dice,0.31296
best_epoch,10.0
epoch,10.0
train/loss,0.90799
val/dice,0.31296


In [12]:
\
# --- Training loop ---
max_epochs = int(wandb.config.get("max_epochs", 10))
val_interval = 1
best_metric = -1.0
best_metric_epoch = -1
best_path = "best_metric_model.pth"

for epoch in range(max_epochs):
    model.train()
    epoch_loss = 0.0

    for step, batch_data in enumerate(train_loader, start=1):
        inputs = batch_data["image"].to(device)
        labels = batch_data["label"].to(device)

        optimizer.zero_grad(set_to_none=True)
        outputs = model(inputs)
        loss = loss_function(outputs, labels)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    epoch_loss /= max(1, step)
    print(f"epoch {epoch+1}/{max_epochs}  train_loss={epoch_loss:.4f}")
    wandb.log({"train/loss": epoch_loss, "epoch": epoch+1}, step=epoch+1)

    if (epoch + 1) % val_interval == 0 and len(val_loader) > 0:
        model.eval()
        dice_metric.reset()
        with torch.no_grad():
            for val_data in val_loader:
                val_inputs = val_data["image"].to(device)
                val_labels = val_data["label"].to(device)

                val_outputs = model(val_inputs)
                val_outputs = post_pred(val_outputs)
                val_labels  = post_label(val_labels)

                dice_metric(y_pred=val_outputs, y=val_labels)

        metric = dice_metric.aggregate().item()
        dice_metric.reset()
        print(f"           val_dice={metric:.4f}")
        wandb.log({"val/dice": metric}, step=epoch+1)

        if metric > best_metric:
            best_metric = metric
            best_metric_epoch = epoch + 1
            torch.save(model.state_dict(), best_path)
            print("           saved new best model")

print(f"done. best_dice={best_metric:.4f} @epoch={best_metric_epoch}")
wandb.log({"best_dice": best_metric, "best_epoch": best_metric_epoch})


epoch 1/50  train_loss=0.9073
           val_dice=0.2925
           saved new best model
epoch 2/50  train_loss=0.9067
           val_dice=0.5335
           saved new best model
epoch 3/50  train_loss=0.9062
           val_dice=0.3263
epoch 4/50  train_loss=0.9057
           val_dice=0.5738
           saved new best model
epoch 5/50  train_loss=0.9052
           val_dice=0.5480
epoch 6/50  train_loss=0.9047
           val_dice=0.4519
epoch 7/50  train_loss=0.9042
           val_dice=0.4343
epoch 8/50  train_loss=0.9036
           val_dice=0.6845
           saved new best model
epoch 9/50  train_loss=0.9032
           val_dice=0.6678
epoch 10/50  train_loss=0.9026
           val_dice=0.5419
epoch 11/50  train_loss=0.9021
           val_dice=0.5936
epoch 12/50  train_loss=0.9017
           val_dice=0.5491
epoch 13/50  train_loss=0.9011
           val_dice=0.6401
epoch 14/50  train_loss=0.9004
           val_dice=0.5787
epoch 15/50  train_loss=0.8998
           val_dice=0.5915
epoch 16/50

In [13]:
\
# --- Test / inference ---
from monai.inferers import sliding_window_inference

# load best
if os.path.exists(best_path):
    model.load_state_dict(torch.load(best_path, map_location=device))
model.eval()

dice_metric.reset()
pred_dir = Path("pred_npy"); pred_dir.mkdir(exist_ok=True)

with torch.no_grad():
    for i, batch in enumerate(test_loader):
        img = batch["image"].to(device)
        lab = batch["label"].to(device)

        # ここではサイズが小さいので、1パッチで推論（必要ならSWに）
        pred = model(img)
        pred = post_pred(pred)

        dice_metric(y_pred=pred, y=post_label(lab))

        pred_np = pred.detach().cpu().numpy()[0,0]  # (D,H,W) or (H,W,D) depending on transforms; here (96,96,96)
        lab_np  = lab.detach().cpu().numpy()[0,0]
        np.save(pred_dir / f"test_pred_{i}.npy", pred_np.astype(np.uint8))
        np.save(pred_dir / f"test_gt_{i}.npy", lab_np.astype(np.uint8))

test_dice = dice_metric.aggregate().item() if len(test_loader)>0 else float("nan")
dice_metric.reset()
print("test dice:", test_dice)
wandb.log({"test/dice": test_dice})


test dice: 0.8471269607543945


In [14]:
\
# --- Visualize a prediction (animation) ---
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import HTML

pred = np.load("pred_npy/test_pred_0.npy")
gt   = np.load("pred_npy/test_gt_0.npy")

def show_anim(vol, title="volume"):
    fig, ax = plt.subplots()
    ax.set_title(title)
    def update(i):
        ax.clear()
        ax.imshow(vol[:, :, i], cmap="gray")
        ax.set_title(f"{title} z={i}")
    ani = animation.FuncAnimation(fig, update, frames=range(vol.shape[2]), interval=80)
    plt.close()
    return HTML(ani.to_jshtml())

show_anim(pred, "prediction")


In [13]:
\
show_anim(gt, "ground truth")
