In [18]:
# Radiomics: Process NIfTI for Deep Learning Training

import os
import numpy as np
import SimpleITK as sitk
import pathlib
import matplotlib.pyplot as plt
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from skimage.transform import resize
from mpl_toolkits.mplot3d.art3d import Poly3DCollection
from skimage import measure

# === CONFIG ===
DATA_DIR = pathlib.Path("data")  # folder with nested case folders
OUTPUT_DIR = pathlib.Path("processed_data")
TARGET_SHAPE = (128, 128, 64)  # standard volume shape
TEST_SIZE = 0.2
VAL_SIZE = 0.1
EXPORT_PNG = True  # export slices for inspection

# Create folders
for split in ["train", "val", "test"]:
    (OUTPUT_DIR / split / "X").mkdir(parents=True, exist_ok=True)
    (OUTPUT_DIR / split / "y").mkdir(parents=True, exist_ok=True)
    if EXPORT_PNG:
        (OUTPUT_DIR / split / "png").mkdir(parents=True, exist_ok=True)

def load_nii_image(path):
    image = sitk.ReadImage(str(path))
    array = sitk.GetArrayFromImage(image)  # shape: (Z, Y, X)
    return array.astype(np.float32)

def normalize_volume(volume):
    volume = np.clip(volume, np.percentile(volume, 1), np.percentile(volume, 99))
    volume -= volume.min()
    volume /= volume.max() + 1e-8
    return volume

def resize_volume(volume, target_shape):
    return resize(volume, target_shape, mode='constant', preserve_range=True)

def save_middle_slices(image, label, case_name, output_folder):
    z_mid = image.shape[2] // 2
    y_mid = image.shape[1] // 2
    x_mid = image.shape[0] // 2

    fig, axs = plt.subplots(2, 3, figsize=(10, 6))
    axs[0, 0].imshow(image[x_mid, :, :], cmap='gray'); axs[0, 0].set_title('X slice')
    axs[0, 1].imshow(image[:, y_mid, :], cmap='gray'); axs[0, 1].set_title('Y slice')
    axs[0, 2].imshow(image[:, :, z_mid], cmap='gray'); axs[0, 2].set_title('Z slice')

    axs[1, 0].imshow(label[x_mid, :, :]); axs[1, 0].set_title('Label X')
    axs[1, 1].imshow(label[:, y_mid, :]); axs[1, 1].set_title('Label Y')
    axs[1, 2].imshow(label[:, :, z_mid]); axs[1, 2].set_title('Label Z')

    for ax in axs.flat:
        ax.axis('off')

    plt.tight_layout()
    plt.savefig(output_folder / f"{case_name}_slices.png")
    plt.close()

def visualize_3d(volume, threshold=0.5):
    verts, faces, _, _ = measure.marching_cubes(volume, level=threshold)
    fig = plt.figure(figsize=(8, 8))
    ax = fig.add_subplot(111, projection='3d')
    mesh = Poly3DCollection(verts[faces], alpha=0.7)
    mesh.set_facecolor([0.5, 0.5, 1])
    ax.add_collection3d(mesh)
    ax.set_xlim(0, volume.shape[0])
    ax.set_ylim(0, volume.shape[1])
    ax.set_zlim(0, volume.shape[2])
    plt.title("3D Segmentation")
    plt.show()

def process_case(case_dir):
    image_paths = list(case_dir.rglob("body-parts.nii.gz"))  # <- nome real no seu diretório
    label_paths = list(case_dir.rglob("body-regions.nii.gz"))
    if not image_paths:
        print(f"⚠️ Imagem não encontrada em {case_dir}")
        return None

    image = load_nii_image(image_paths[0])
    image = normalize_volume(image)
    image = resize_volume(image, TARGET_SHAPE)

    if label_paths:
        label = load_nii_image(label_paths[0])
        label = resize_volume(label, TARGET_SHAPE)
        label = (label > 0).astype(np.uint8)  # binarize
    else:
        label = np.zeros(TARGET_SHAPE, dtype=np.uint8)

    return image, label

# === Find All Valid Cases (folders that contain body-parts.nii.gz) ===
all_cases = sorted({p.parent for p in DATA_DIR.rglob("body-parts.nii.gz")})

if len(all_cases) == 0:
    print("❌ Nenhum caso encontrado. Verifique a pasta 'data'.")
    train, val, test = [], [], []
elif len(all_cases) == 1:
    train, val, test = all_cases, [], []
    print("⚠️ Apenas 1 caso encontrado: todos os dados serão usados como treino.")
else:
    test_size = TEST_SIZE if len(all_cases) > 4 else max(1 / len(all_cases), 0.2)
    val_size = VAL_SIZE if len(all_cases) > 4 else 0
    train_val, test = train_test_split(all_cases, test_size=test_size, random_state=42)
    if len(train_val) > 1 and val_size > 0:
        train, val = train_test_split(train_val, test_size=val_size, random_state=42)
    else:
        train, val = train_val, []

splits = [(train, "train"), (val, "val"), (test, "test")]

for split_cases, split_name in splits:
    if not split_cases:
        continue
    print(f"\nProcessing {split_name} ({len(split_cases)} cases)...")
    for case_dir in tqdm(split_cases):
        result = process_case(case_dir)
        if result is None:
            continue
        image, label = result
        np.save(OUTPUT_DIR / split_name / "X" / f"{case_dir.name}.npy", image)
        np.save(OUTPUT_DIR / split_name / "y" / f"{case_dir.name}.npy", label)

        if EXPORT_PNG:
            save_middle_slices(image, label, case_dir.name, OUTPUT_DIR / split_name / "png")

print("\n✅ Preprocessing complete. Ready for model training.")


Processing train (582 cases)...


100%|██████████| 582/582 [37:45<00:00,  3.89s/it]



Processing val (65 cases)...


100%|██████████| 65/65 [04:10<00:00,  3.86s/it]



Processing test (162 cases)...


100%|██████████| 162/162 [10:06<00:00,  3.74s/it]


✅ Preprocessing complete. Ready for model training.





In [None]:
# U-Net 3D Training for Radiomics Segmentation

import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from pathlib import Path
from tqdm import tqdm
import matplotlib.pyplot as plt
import nibabel as nib

# ===================
# Configuration
# ===================
DATA_DIR = Path("processed_data")
BATCH_SIZE = 2
EPOCHS = 20
LEARNING_RATE = 1e-3
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
CHECKPOINT_PATH = Path("unet3d_model.pt")

# ===================
# Dataset
# ===================
class NumpyVolumeDataset(Dataset):
    def __init__(self, x_dir, y_dir):
        self.x_paths = sorted(list(Path(x_dir).glob("*.npy")))
        self.y_paths = sorted(list(Path(y_dir).glob("*.npy")))

    def __len__(self):
        return len(self.x_paths)

    def __getitem__(self, idx):
        x = np.load(self.x_paths[idx])[None, ...]  # add channel
        y = np.load(self.y_paths[idx])[None, ...]  # binary mask
        return torch.tensor(x, dtype=torch.float32), torch.tensor(y, dtype=torch.float32)

# ===================
# U-Net 3D Model
# ===================
class DoubleConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.double_conv = nn.Sequential(
            nn.Conv3d(in_channels, out_channels, 3, padding=1),
            nn.BatchNorm3d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv3d(out_channels, out_channels, 3, padding=1),
            nn.BatchNorm3d(out_channels),
            nn.ReLU(inplace=True),
        )

    def forward(self, x):
        return self.double_conv(x)

class UNet3D(nn.Module):
    def __init__(self, in_channels=1, out_channels=1):
        super().__init__()
        self.enc1 = DoubleConv(in_channels, 32)
        self.enc2 = DoubleConv(32, 64)
        self.enc3 = DoubleConv(64, 128)
        self.pool = nn.MaxPool3d(2)
        self.up2 = nn.ConvTranspose3d(128, 64, 2, stride=2)
        self.dec2 = DoubleConv(128, 64)
        self.up1 = nn.ConvTranspose3d(64, 32, 2, stride=2)
        self.dec1 = DoubleConv(64, 32)
        self.out_conv = nn.Conv3d(32, out_channels, 1)

    def forward(self, x):
        x1 = self.enc1(x)
        x2 = self.enc2(self.pool(x1))
        x3 = self.enc3(self.pool(x2))
        x = self.up2(x3)
        x = self.dec2(torch.cat([x, x2], dim=1))
        x = self.up1(x)
        x = self.dec1(torch.cat([x, x1], dim=1))
        return torch.sigmoid(self.out_conv(x))

# ===================
# Training Utilities
# ===================
def dice_loss(pred, target, smooth=1.0):
    pred = pred.contiguous()
    target = target.contiguous()
    intersection = (pred * target).sum(dim=(2, 3, 4))
    union = pred.sum(dim=(2, 3, 4)) + target.sum(dim=(2, 3, 4))
    dice = (2. * intersection + smooth) / (union + smooth)
    return 1 - dice.mean()

def train_one_epoch(model, loader, optimizer, criterion):
    model.train()
    epoch_loss = 0
    for x, y in tqdm(loader):
        x, y = x.to(DEVICE), y.to(DEVICE)
        optimizer.zero_grad()
        pred = model(x)
        loss = criterion(pred, y)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    return epoch_loss / len(loader)

def validate(model, loader):
    model.eval()
    dices = []
    with torch.no_grad():
        for x, y in loader:
            x, y = x.to(DEVICE), y.to(DEVICE)
            pred = model(x)
            pred = (pred > 0.5).float()
            dice = 1 - dice_loss(pred, y)
            dices.append(dice.item())
    return np.mean(dices)

# ===================
# Main Training Loop
# ===================
train_ds = NumpyVolumeDataset(DATA_DIR / "train/X", DATA_DIR / "train/y")
val_ds = NumpyVolumeDataset(DATA_DIR / "val/X", DATA_DIR / "val/y")
train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=1)

model = UNet3D().to(DEVICE)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

train_losses = []
val_scores = []

for epoch in range(EPOCHS):
    print(f"Epoch {epoch+1}/{EPOCHS}")
    loss = train_one_epoch(model, train_loader, optimizer, dice_loss)
    val_dice = validate(model, val_loader)
    print(f"Train Loss: {loss:.4f} | Val Dice: {val_dice:.4f}")
    train_losses.append(loss)
    val_scores.append(val_dice)

# Save model
torch.save(model.state_dict(), CHECKPOINT_PATH)
print(f"✅ Modelo salvo em: {CHECKPOINT_PATH}")

# Plot
plt.figure()
plt.plot(train_losses, label="Train Loss")
plt.plot(val_scores, label="Val Dice")
plt.xlabel("Epoch")
plt.ylabel("Metric")
plt.legend()
plt.title("Training Curve")
plt.show()

# ===================
# Prediction on Test Set + Save
# ===================
test_ds = NumpyVolumeDataset(DATA_DIR / "test/X", DATA_DIR / "test/y")
test_loader = DataLoader(test_ds, batch_size=1)
model.eval()

PRED_DIR = DATA_DIR / "predictions"
PRED_DIR.mkdir(exist_ok=True)

with torch.no_grad():
    for i, (x, _) in enumerate(test_loader):
        x = x.to(DEVICE)
        pred = model(x)
        pred = (pred > 0.5).float().cpu().numpy()[0, 0]
        nib.save(nib.Nifti1Image(pred, affine=np.eye(4)), PRED_DIR / f"pred_{i:03d}.nii.gz")

print(f"🎯 Predições salvas em: {PRED_DIR}")

Epoch 1/20


  3%|▎         | 8/291 [04:23<2:33:45, 32.60s/it]