# Модель и обучение

В этой части блокнота:
- Определяется модель U-Net для сегментации
- Реализуются функции метрик: DSC (Dice Similarity Coefficient) и IoU (Intersection over Union)
- Выполняется обучение с логированием в TensorBoard


In [None]:
import os
import random
import numpy as np
import torch
from torch.optim import Adam
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import train_test_split

# Импорт конфигурации из первого блокнота (при условии, что блокнот 01 выполнен ранее)
from IPython import get_ipython
get_ipython().run_line_magic('run', '01_Configuration_and_Data_Preparation.ipynb')

# Если вы не используете %run, можно просто скопировать нужные переменные:
print("Используемые настройки:")

BASE_DIR = os.path.join( "d:/", "NartovFatSegm", "Nartov_Fat_Segmentation")
print("IMAGE_DIR =", os.path.join(BASE_DIR, "data", "cropped_images"))
print("MASK_DIR =", os.path.join(BASE_DIR, "data", "cropped_masks"))


In [None]:
# Определение модели U-Net с использованием библиотеки segmentation_models_pytorch

import segmentation_models_pytorch as smp

def create_unet_model(in_channels=1, classes=1, encoder_name="efficientnet-b2", encoder_weights="imagenet"):
    """
    Создаёт модель U-Net для сегментации.
    
    Возвращает:
        Модель U-Net.
    """
    model = smp.Unet(
        encoder_name=encoder_name,
        encoder_weights=encoder_weights,
        in_channels=in_channels,
        classes=classes,
    )
    return model

model = create_unet_model()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
print("Модель создана и переведена на", device)


In [None]:
# Определение функций метрик: Dice (DSC) и IoU.


def dice_coefficient(pred, target, threshold=0.5, eps=1e-7):
    """
    Вычисляет Dice Similarity Coefficient (DSC).

    Dice = 2 * (|pred ∩ target|) / (|pred| + |target|)
    
    Возвращает среднее значение DSC по батчу.
    """
    pred = torch.sigmoid(pred)
    pred = (pred > threshold).float()
    # Вычисление пересечения и суммы
    intersection = (pred * target).sum(dim=(1, 2, 3))
    union = pred.sum(dim=(1, 2, 3)) + target.sum(dim=(1, 2, 3))
    dice = (2. * intersection + eps) / (union + eps)
    return dice.mean().item()

def iou_metric(pred, target, threshold=0.5, eps=1e-7):
    """
    Вычисляет метрику Intersection over Union (IoU).

    IoU = (|pred ∩ target|) / (|pred ∪ target|)
    
    Возвращает среднее значение IoU по батчу.
    """
    pred = torch.sigmoid(pred)
    pred = (pred > threshold).float()
    intersection = (pred * target).sum(dim=(1, 2, 3))
    union = pred.sum(dim=(1, 2, 3)) + target.sum(dim=(1, 2, 3)) - intersection
    iou = (intersection + eps) / (union + eps)
    return iou.mean().item()

print("Функции для Dice и IoU определены")


In [None]:
# Загружаем полный датасет
full_dataset = UltrasoundSegmentationDataset(
    images_dir=os.path.join(BASE_DIR, "data", "cropped_images"),
    masks_dir=os.path.join(BASE_DIR, "data", "cropped_masks"),
    transform=TRAIN_TRANSFORMS  # для обучения
)

indices = list(range(len(full_dataset)))
train_indices, temp_indices = train_test_split(indices, test_size=0.3, random_state=RANDOM_STATE)
val_indices, test_indices = train_test_split(temp_indices, test_size=0.5, random_state=RANDOM_STATE)

train_dataset = Subset(full_dataset, train_indices)
# Для валидации используем трансформации, исключающие случайные изменения
from copy import deepcopy
val_dataset = Subset(
    UltrasoundSegmentationDataset(
        images_dir=os.path.join(os.getcwd(), "data", "cropped_images"),
        masks_dir=os.path.join(os.getcwd(), "data", "cropped_masks"),
        transform=VAL_TRANSFORMS
    ),
    val_indices
)

# DataLoader-ы (можно задать в ячейке)
from torch.utils.data import DataLoader

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
print("Датасеты и DataLoader-ы подготовлены")


In [None]:
"""
Определяем цикл обучения.
Логируются метрики: Loss, Dice, IoU.
"""

from torch.utils.tensorboard import SummaryWriter
import segmentation_models_pytorch as smp

# Функции потерь
bce_loss = torch.nn.BCEWithLogitsLoss()
dice_loss_fn = smp.losses.DiceLoss(mode="binary")
def combined_loss_fn(outputs, masks):
    return bce_loss(outputs, masks) + dice_loss_fn(outputs, masks)

writer = SummaryWriter(LOG_DIR)

optimizer = Adam(model.parameters(), lr=LEARNING_RATE)
best_val_loss = float("inf")

for epoch in range(NUM_EPOCHS):
    model.train()
    running_loss = 0.0
    dice_train_sum = 0.0
    iou_train_sum = 0.0

    for images, masks in train_loader:
        images, masks = images.to(device), masks.to(device).float()
        if masks.ndim == 3:
            masks = masks.unsqueeze(1)

        optimizer.zero_grad()
        outputs = model(images)
        loss = combined_loss_fn(outputs, masks)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * images.size(0)
        dice_train_sum += dice_coefficient(outputs, masks) * images.size(0)
        iou_train_sum += iou_metric(outputs, masks) * images.size(0)

    epoch_train_loss = running_loss / len(train_loader.dataset)
    epoch_train_dice = dice_train_sum / len(train_loader.dataset)
    epoch_train_iou = iou_train_sum / len(train_loader.dataset)

    writer.add_scalar("Train/Loss", epoch_train_loss, epoch)
    writer.add_scalar("Train/Dice", epoch_train_dice, epoch)
    writer.add_scalar("Train/IoU", epoch_train_iou, epoch)

    # Валидиация
    model.eval()
    val_loss = 0.0
    dice_val_sum = 0.0
    iou_val_sum = 0.0
    with torch.no_grad():
        for images, masks in val_loader:
            images, masks = images.to(device), masks.to(device).float()
            if masks.ndim == 3:
                masks = masks.unsqueeze(1)
            outputs = model(images)
            loss = combined_loss_fn(outputs, masks)
            val_loss += loss.item() * images.size(0)
            dice_val_sum += dice_coefficient(outputs, masks) * images.size(0)
            iou_val_sum += iou_metric(outputs, masks) * images.size(0)
    
    epoch_val_loss = val_loss / len(val_loader.dataset)
    epoch_val_dice = dice_val_sum / len(val_loader.dataset)
    epoch_val_iou = iou_val_sum / len(val_loader.dataset)

    writer.add_scalar("Validation/Loss", epoch_val_loss, epoch)
    writer.add_scalar("Validation/Dice", epoch_val_dice, epoch)
    writer.add_scalar("Validation/IoU", epoch_val_iou, epoch)

    print(f"Epoch [{epoch+1}/{NUM_EPOCHS}]: "
          f"Train Loss: {epoch_train_loss:.4f} | Dice: {epoch_train_dice:.4f} | IoU: {epoch_train_iou:.4f} || "
          f"Val Loss: {epoch_val_loss:.4f} | Dice: {epoch_val_dice:.4f} | IoU: {epoch_val_iou:.4f}")
    
    # Сохранение лучшей модели
    if epoch_val_loss < best_val_loss:
        best_val_loss = epoch_val_loss
        torch.save(model.state_dict(), MODEL_SAVE_PATH)
        print(f"Сохранена лучшая модель на эпохе {epoch+1}")

writer.close()
print("Обучение завершено")


# Оценка модели и визуализация результатов

В этой части блокнота:
- Загружается сохранённая модель
- Выполняется тестирование на тестовой выборке
- Визуализируются результаты (входное изображение, истинная маска и предсказание)

In [None]:
# Загружаем полный датасет с трансформациями для валидации/теста
full_dataset = UltrasoundSegmentationDataset(
    images_dir=IMAGE_DIR,
    masks_dir=MASK_DIR,
    transform=VAL_TRANSFORMS
)

# Разбивка для теста (при условии, что индексы совпадают с предыдущим блокнотом)
indices = list(range(len(full_dataset)))
_, temp_indices = train_test_split(indices, test_size=0.3, random_state=42)
_, test_indices = train_test_split(temp_indices, test_size=0.5, random_state=42)

test_dataset = Subset(full_dataset, test_indices)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False, num_workers=0)



In [None]:
import os
import torch
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import train_test_split

# Переиспользуем те же настройки
IMAGE_DIR = os.path.join(os.getcwd(), "data", "cropped_images")
MASK_DIR = os.path.join(os.getcwd(), "data", "cropped_masks")


# Загружаем полный датасет с трансформациями для валидации/теста
full_dataset = UltrasoundSegmentationDataset(
    images_dir=IMAGE_DIR,
    masks_dir=MASK_DIR,
    transform=VAL_TRANSFORMS
)

# Разбивка для теста (при условии, что индексы совпадают с предыдущим блокнотом)
indices = list(range(len(full_dataset)))
_, temp_indices = train_test_split(indices, test_size=0.3, random_state=42)
_, test_indices = train_test_split(temp_indices, test_size=0.5, random_state=42)

test_dataset = Subset(full_dataset, test_indices)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False, num_workers=0)



In [None]:
import segmentation_models_pytorch as smp

# Создаём модель и загружаем сохранённые веса
model = create_unet_model()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
model.load_state_dict(torch.load(MODEL_SAVE_PATH, map_location=device))
model.eval()
print("Модель загружена для тестирования")

In [None]:
model.eval()
test_loss = 0.0
dice_test_sum = 0.0
iou_test_sum = 0.0

with torch.no_grad():
    for images, masks in test_loader:
        images = images.to(device)
        masks = masks.to(device).float()
        if images.ndim == 3:
            images = images.unsqueeze(1)
        if masks.ndim == 3:
            masks = masks.unsqueeze(1)

        outputs = model(images)
        loss = combined_loss_fn(outputs, masks)
        test_loss += loss.item() * images.size(0)
        dice_test_sum += dice_coefficient(outputs, masks) * images.size(0)
        iou_test_sum += iou_metric(outputs, masks) * images.size(0)

epoch_test_loss = test_loss / len(test_loader.dataset)
epoch_test_dice = dice_test_sum / len(test_loader.dataset)
epoch_test_iou = iou_test_sum / len(test_loader.dataset)

print("Тестовые метрики:")
print(f"Test Loss: {epoch_test_loss:.4f}")
print(f"Test Dice: {epoch_test_dice:.4f}")
print(f"Test IoU: {epoch_test_iou:.4f}")

import torch.nn.functional as F

# Получаем один батч для визуализации
with torch.no_grad():
    for images, masks in test_loader:
        images = images.to(device)
        masks = masks.to(device).float()
        if masks.ndim == 3:
            masks = masks.unsqueeze(1)
        outputs = model(images)
        preds = torch.sigmoid(outputs)
        preds = (preds > 0.5).float()
        break  # достаточно одного батча

# Перенос на CPU
images = images.cpu().numpy()
masks = masks.cpu().numpy()
preds = preds.cpu().numpy()

n = min(7, images.shape[0])
for i in range(n):
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 3, 1)
    plt.imshow(images[i].squeeze(0), cmap="gray")
    plt.title("Input Image")
    plt.axis("off")

    plt.subplot(1, 3, 2)
    plt.imshow(masks[i].squeeze(0), cmap="gray")
    plt.title("Ground Truth Mask")
    plt.axis("off")

    plt.subplot(1, 3, 3)
    plt.imshow(preds[i].squeeze(0), cmap="gray")
    plt.title("Predicted Mask")
    plt.axis("off")
    plt.show()
