**脑部MRI分割系统项目需求文档**

------------------------------------------------------------------------

#### **1. 项目概述**

**目标**：开发一个端到端的脑部MRI肿瘤分割系统，实现从数据预处理到模型训练评估的全流程

**核心价值**：

-   为医学影像分析提供可复现的深度学习基准方案
-   验证U-Net在脑肿瘤分割任务中的有效性
-   输出可直接用于临床研究的模型和可视化结果

#### **2. 功能需求**

##### **2.1 数据预处理模块**

| 功能       | 技术指标                                             |
|------------|------------------------------------------------------|
| 数据加载   | 支持`kaggle_3m`数据集格式，自动匹配图像-掩膜对       |
| 数据过滤   | 剔除空白掩膜样本（肿瘤面积为0的病例）                |
| 数据增强   | 标准化Resize(256×256) + ToTensor转换                 |
| 数据集划分 | 按8:2比例分割训练集/验证集，固定随机种子保证可复现性 |

##### **2.2 深度学习模型**

| 组件     | 参数规格                       |
|----------|--------------------------------|
| 网络架构 | 经典U-Net：4层编码器-4层解码器 |
| 卷积模块 | DoubleConv(Conv3×3+BN+ReLU)×2  |
| 下采样   | MaxPool2d(k=2) 实现4倍降采样   |
| 上采样   | ConvTranspose2d(k=2, s=2)      |
| 输出层   | Conv1×1 + Sigmoid激活          |

##### **2.3 训练评估流程**

    训练流程：
    1. 输入：3通道256×256 MRI图像 
    2. 优化器：Adam(lr=0.001)
    3. 损失函数：BCEWithLogitsLoss 
    4. 评估指标：Dice系数（医学分割金标准）
     
    验证流程：
    1. 每epoch验证集测试 
    2. 自动保存最佳模型（best_model.pth ）

##### **2.4 可视化系统**

-   **训练监控**：实时输出Train/Val Loss和Dice系数
-   **结果对比**：三联图显示（原始图像、金标准掩膜、预测结果）
-   **输出格式**：支持Matplotlib交互式显示和PNG导出

##### **3.1 性能指标**

| 场景          | 要求                       |
|---------------|----------------------------|
| 单GPU训练速度 | ≥8 samples/sec（RTX 3090） |
| 推理延迟      | ≤50ms/图像（256×256）      |
| 模型体积      | ≤200MB（.pth格式）         |

##### **3.2 数据要求**

-   **输入数据**：T1加权脑部MRI（推荐3T扫描仪数据）
-   **标注标准**：二值掩膜（0=正常组织，1=肿瘤区域）
-   **最小数据集**：≥100个有效病例（含肿瘤样本）

<!-- -->

    软件环境：
    - Python ≥3.8 
    - PyTorch ≥2.0 
    - OpenCV-Python 
    - Pillow ≥9.0 
     
    硬件环境：
    - 最低配置：NVIDIA GPU(≥8GB显存)
    - 备用方案：CPU模式（需≥32GB内存）

#### **4. 技术架构**

##### **4.1 系统流程图**

    graph TD 
        A[原始DICOM数据] --> B[转换为PNG格式]
        B --> C[数据预处理]
        C --> D[U-Net模型训练]
        D --> E[模型评估]
        E --> F[可视化输出]

##### **4.2 关键算法**

1.  **数据平衡策略**：自动过滤无肿瘤样本
2.  **医学图像增强**：保留HU值范围的归一化方法
3.  **损失函数优化**：BCE+Dice联合损失

#### 导包导库

In [None]:
# 颅脑影像图像分割项目
"""
脑部MRI分割完整流程(U-Net实现)
包含数据加载、模型定义、训练和评估全流程
支持GPU加速 
"""
# 显示每个cell执行时间
%load_ext autotime
# matplotlib交互绘图
%matplotlib ipympl

In [None]:

# ====================== 导入必要的库 ======================
import os  # 文件路径操作 
import numpy as np  # 数值计算 
import glob  # 文件路径匹配 
import matplotlib.pyplot as plt  # 可视化
import torch  # PyTorch主库 
import torch.nn  as nn  # 神经网络模块 
from torch.utils.data  import Dataset, DataLoader  # 数据加载 
from torchvision import transforms  # 图像预处理 
from tqdm import tqdm  # 进度条显示 

from data_prep import BrainMRIDataset

from matplotlib import get_backend
get_backend()

#### 模型定义

In [None]:
# ====================== 2. U-Net模型定义 ======================
class DoubleConv(nn.Module):
    """
    双卷积块(卷积+BN+ReLU)*2 
    用于U-Net的编码器和解码器路径 
    """
    def __init__(self, in_channels, out_channels):
        """
        参数:
            in_channels: int - 输入通道数 
            out_channels: int - 输出通道数 
        """
        super().__init__()
        self.double_conv  = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )
    
    def forward(self, x):
        """前向传播"""
        return self.double_conv(x) 

class Down(nn.Module):
    """
    下采样块（最大池化+双卷积）
    用于U-Net的编码器路径 
    """
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.maxpool_conv  = nn.Sequential(
            nn.MaxPool2d(2),  # 2×2最大池化，尺寸减半 
            DoubleConv(in_channels, out_channels)
        )
    
    def forward(self, x):
        return self.maxpool_conv(x) 
 
class Up(nn.Module):
    """
    上采样块（转置卷积+双卷积）
    用于U-Net的解码器路径 
    """
    def __init__(self, in_channels, out_channels):
        super().__init__()
        # 使用转置卷积进行上采样 
        self.up  = nn.ConvTranspose2d(
            in_channels, in_channels // 2,  # 通道数减半 
            kernel_size=2, stride=2 
        )
        self.conv  = DoubleConv(in_channels, out_channels)
    
    def forward(self, x1, x2):
        """
        参数:
            x1: 上采样特征 
            x2: 跳跃连接特征 
        """
        x1 = self.up(x1)   # 上采样 
        
        # 处理尺寸不匹配问题（由于池化导致的奇数尺寸）
        diffY = x2.size()[2]  - x1.size()[2] 
        diffX = x2.size()[3]  - x1.size()[3] 
        
        # 对称填充 
        x1 = nn.functional.pad( 
            x1, 
            [diffX // 2, diffX - diffX // 2,
             diffY // 2, diffY - diffY // 2]
        )
        
        # 沿通道维度拼接特征 
        x = torch.cat([x2,  x1], dim=1)
        return self.conv(x) 
 
class UNet(nn.Module):
    """
    U-Net模型主结构 
    经典的编码器-解码器架构，带有跳跃连接 
    """
    def __init__(self, n_channels=3, n_classes=1):
        """
        参数:
            n_channels: int - 输入图像通道数（默认3，RGB）
            n_classes: int - 输出类别数（默认1，二分类）
        """
        super().__init__()
        self.n_channels = n_channels 
        self.n_classes = n_classes 
        
        # ===== 编码器路径（下采样） ===== 
        self.inc  = DoubleConv(n_channels, 64)  # 初始卷积 
        self.down1  = Down(64, 128)    # 第一次下采样 
        self.down2  = Down(128, 256)   # 第二次下采样 
        self.down3  = Down(256, 512)   # 第三次下采样 
        self.down4  = Down(512, 1024)  # 第四次下采样 
        
        # ===== 解码器路径（上采样） ===== 
        self.up1  = Up(1024, 512)  # 第一次上采样 
        self.up2  = Up(512, 256)   # 第二次上采样 
        self.up3  = Up(256, 128)   # 第三次上采样 
        self.up4  = Up(128, 64)    # 第四次上采样 
        
        # 输出层（1×1卷积）
        self.outc  = nn.Conv2d(64, n_classes, kernel_size=1)
    
    def forward(self, x):
        # ===== 编码器 ===== 
        x1 = self.inc(x)     # 初始特征 [B,64,H,W]
        x2 = self.down1(x1)  # [B,128,H/2,W/2]
        x3 = self.down2(x2)  # [B,256,H/4,W/4]
        x4 = self.down3(x3)  # [B,512,H/8,W/8]
        x5 = self.down4(x4)  # [B,1024,H/16,W/16] 瓶颈层 
        
        # ===== 解码器 ===== 
        x = self.up1(x5,  x4)  # [B,512,H/8,W/8]
        x = self.up2(x,  x3)   # [B,256,H/4,W/4]
        x = self.up3(x,  x2)   # [B,128,H/2,W/2]
        x = self.up4(x,  x1)   # [B,64,H,W]
        
        # 输出层 
        logits = self.outc(x)   # [B,1,H,W]
        return logits 
 

#### 训练和评估

In [None]:
# ====================== 3. 训练与评估函数 ======================
def dice_coeff(pred, target, smooth=1.0):
    """
    计算Dice系数（分割任务评估指标）
    公式: (2*|X∩Y|) / (|X| + |Y|)
    
    参数:
        pred: Tensor - 模型预测结果（二值化后）
        target: Tensor - 真实标签 
        smooth: float - 平滑系数（避免除以0）
    """
    # 展平预测和标签 
    pred_flat = pred.contiguous().view(-1) 
    target_flat = target.contiguous().view(-1) 
    
    # 计算交集 
    intersection = (pred_flat * target_flat).sum()
    
    # 计算Dice系数 
    return (2. * intersection + smooth) / (pred_flat.sum()  + target_flat.sum()  + smooth)
 
def train_epoch(model, device, train_loader, optimizer, criterion, epoch):
    """
    训练单个epoch 
    
    参数:
        model: nn.Module - 待训练模型 
        device: torch.device  - 计算设备（CPU/GPU）
        train_loader: DataLoader - 训练数据加载器 
        optimizer: torch.optim  - 优化器 
        criterion: nn.Module - 损失函数 
        epoch: int - 当前epoch数（用于显示）
    """
    model.train()   # 设置为训练模式 
    running_loss = 0.0  # 累计损失 
    running_dice = 0.0  # 累计Dice系数 
    
    # 使用tqdm显示进度条 
    for images, masks in tqdm(train_loader, desc=f'Train Epoch {epoch}'):
        # 将数据移动到设备 
        images = images.to(device) 
        masks = masks.to(device).unsqueeze(1)   # 添加通道维度 [B,1,H,W]
        
        # 梯度清零 
        optimizer.zero_grad() 
        
        # 前向传播 
        outputs = model(images)
        
        # 计算损失 
        loss = criterion(outputs, masks.float()) 
        
        # 反向传播 
        loss.backward() 
        optimizer.step() 
        
        # 计算统计量 
        running_loss += loss.item() 
        
        # 将输出转为二值预测并计算Dice 
        preds = torch.sigmoid(outputs)  > 0.5 
        running_dice += dice_coeff(preds, masks).item()
    
    # 计算平均损失和Dice 
    epoch_loss = running_loss / len(train_loader)
    epoch_dice = running_dice / len(train_loader)
    
    return epoch_loss, epoch_dice 
 
def validate(model, device, val_loader, criterion):
    """
    验证模型 
    
    参数:
        model: nn.Module - 待验证模型 
        device: torch.device  - 计算设备 
        val_loader: DataLoader - 验证数据加载器 
        criterion: nn.Module - 损失函数 
    """
    model.eval()   # 设置为评估模式 
    val_loss = 0.0 
    val_dice = 0.0 
    
    with torch.no_grad():   # 禁用梯度计算 
        for images, masks in tqdm(val_loader, desc='Validating'):
            images = images.to(device) 
            masks = masks.to(device).unsqueeze(1)   # 添加通道维度 
            
            # 前向传播 
            outputs = model(images)
            
            # 计算损失 
            val_loss += criterion(outputs, masks.float()).item() 
            
            # 计算Dice 
            preds = torch.sigmoid(outputs)  > 0.5 
            val_dice += dice_coeff(preds, masks).item()
    
    # 计算平均指标 
    val_loss /= len(val_loader)
    val_dice /= len(val_loader)
    
    return val_loss, val_dice 
 

#### 可视化

In [None]:
# ====================== 4.数据和结果可视化 =====================
def plot_metrics(train_losses, val_losses, train_dices, val_dices):
    """绘制 Loss 和 Dice 曲线图"""
    plt.figure(figsize=(12, 5))
    plt.gcf().canvas.toolbar_visible=True

    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Train Loss')
    plt.plot(val_losses, label='Val Loss')
    plt.title('Loss Curve')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(train_dices, label='Train Dice')
    plt.plot(val_dices, label='Val Dice')
    plt.title('Dice Coefficient Curve')
    plt.xlabel('Epoch')
    plt.ylabel('Dice')
    plt.legend()

    plt.tight_layout()
    plt.show()

def visualize_results(model, device, val_loader, num_samples=3):
    """修正后的可视化函数"""
    model.eval() 
    with torch.no_grad(): 
        # 获取一个batch的数据 
        images, masks = next(iter(val_loader))
        images = images.to(device) 
        
        # 模型预测 [B,1,H,W] -> [B,H,W]
        outputs = model(images)
        preds = torch.sigmoid(outputs).squeeze(1).cpu().numpy()   # 移除通道维度 
        preds = (preds > 0.5).astype(np.float32)   # 二值化并确保数据类型正确 
        
        # 转换其他数据格式 
        images_np = images.permute(0,  2, 3, 1).cpu().numpy()  # [B,H,W,C]
        masks_np = masks.cpu().numpy()   # [B,H,W]
        
        # 调试信息（可选）
        print(f"图像形状: {images_np.shape}")  
        print(f"掩码形状: {masks_np.shape}") 
        print(f"预测形状: {preds.shape}") 
        
        # 可视化 
        plt.figure(figsize=(15,  5*num_samples))
        plt.gcf().canvas.toolbar_visible=True
        for i in range(num_samples):
            # 原始图像 [H,W,C]
            plt.subplot(num_samples,  3, i*3+1)
            plt.imshow(images_np[i]) 
            plt.title('Input  Image')
            plt.axis('off') 
            
            # 真实掩码 [H,W]
            plt.subplot(num_samples,  3, i*3+2)
            plt.imshow(masks_np[i],  cmap='gray', vmin=0, vmax=1)
            plt.title('Ground  Truth')
            plt.axis('off') 
            
            # 预测结果 [H,W]
            plt.subplot(num_samples,  3, i*3+3)
            plt.imshow(preds[i],  cmap='gray', vmin=0, vmax=1)
            plt.title('Prediction') 
            plt.axis('off') 
        
        plt.tight_layout() 
        plt.show()

In [None]:
# ====================== 5. 主程序 ======================
def main():
    """主训练流程"""
    # ===== 参数设置 =====
    base_dir = '/'
    data_dir = base_dir+'/kaggle_3m'  # 数据目录路径 
    batch_size = 8                 # 批大小 
    num_epochs = 5                # 训练轮数 
    lr = 0.000001                  # 学习率 
    
    # 自动检测设备（优先使用GPU）
    device = torch.device('cuda'  if torch.cuda.is_available()  else 'cpu')
    print(f"Using device: {device}")
    
    # ===== 数据预处理 ===== 
    transform = transforms.Compose([
        transforms.Resize((256, 256)),  # 调整尺寸 
        transforms.ToTensor(),          # 转为Tensor并归一化到[0,1]
    ])
    
    # 创建数据集 
    train_dataset = BrainMRIDataset(data_dir, transform=transform, train=True)
    val_dataset = BrainMRIDataset(data_dir, transform=transform, train=False)
    
    # plt_pt = val_dataset.image_paths[0:3]
    # for pt in plt_pt:
    #     print(pt)
    
    # 创建数据加载器 
    train_loader = DataLoader(
        train_dataset, batch_size=batch_size, 
        shuffle=True, num_workers=4  # 多线程加载 
    )
    val_loader = DataLoader(
        val_dataset, batch_size=batch_size, 
        shuffle=False, num_workers=4 
    )
    
    # ===== 初始化模型 ===== 
    model = UNet(n_channels=3, n_classes=1).to(device)
    
    # ===== 损失函数和优化器 ===== 
    criterion = nn.BCEWithLogitsLoss()  # 二分类交叉熵损失（带sigmoid）
    optimizer = torch.optim.Adam(model.parameters(),  lr=lr)
    
    # ===== 存储训练过程指标 =====
    train_losses, val_losses = [], []
    train_dices, val_dices = [], []
    best_dice = 0.0

    for epoch in range(1,num_epochs+1):
        # 训练阶段
        train_loss, train_dice = train_epoch(model, device, train_loader, optimizer, criterion, epoch)

        # 验证阶段
        val_loss, val_dice = validate(model, device, val_loader, criterion)

        # 实时输出
        print(f'Epoch {epoch}/{num_epochs}')
        print(f'Train Loss: {train_loss:.4f} | Train Dice: {train_dice:.4f}')
        print(f'Val   Loss: {val_loss:.4f} | Val   Dice: {val_dice:.4f}')
        print('-' * 50)

        # 保存最优模型
        if val_dice > best_dice:
            best_dice = val_dice
            torch.save(model.state_dict(), 'best_model.pth')
            print("Saved best model!")

        # 记录指标
        train_losses.append(train_loss)
        val_losses.append(val_loss)
        train_dices.append(train_dice)
        val_dices.append(val_dice)
    print(f'Best Dice: {best_dice:.4f}')
    # ===== 可视化结果 ===== 
    visualize_results(model, device, val_loader)
    plot_metrics(train_losses, val_losses, train_dices, val_dices)


if __name__ == '__main__':
    """程序入口"""
    main()

In [None]:
# 避免IPython后台异常显存占用
# os.system('kill %d' % os.getpid())