# Diagnostyka raka skóry z wykorzystaniem analizy obrazów dermatologicznych - trening DenseNet

Celem tego notatnika jest wczytanie wstępnie wytrenowanego modelu DenseNet-121 i dostrojenie go na zbiorze ISIC2019 w celu klasyfikowania obrazu ze zmianą skórnej do jednej z klas. Do tego zadania wykorzystamy wszelkie implementacje i pliki, które wykorzystaliśmy w notatniku `01_przygotowanie_danych.ipynb`.

## Konfiguracja środowiska

Importujemy potrzebne biblioteki.

In [None]:
import os
import shutil
import glob
from tqdm import tqdm
from google.colab import drive, files
import kagglehub
import random
import time
import copy
import json
import numpy as np
import pandas as pd
from PIL import Image
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from torch.optim.lr_scheduler import ReduceLROnPlateau
from sklearn.metrics import matthews_corrcoef, accuracy_score, precision_score, recall_score, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import multiprocessing

Definiujemy zmienne globalne.

In [None]:
# parametry globalne
SEED = 42
BATCH_SIZE = 64                                                         # wielkość partii danych
NUM_WORKERS = min(4, multiprocessing.cpu_count())                       # liczba wątków procesora
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
MODEL_NAME = 'DenseNet-121'                                             # nazwa modelu
NUM_CLASSES = 8                                                         # liczba klas

# parametry transformacji obrazów
IMG_SIZE = 300                      # wymiar wejściowy obrazu
NORM_MEAN = [0.485, 0.456, 0.406]   # wartości średniej do normalizacji
NORM_STD = [0.229, 0.224, 0.225]    # wartości odchyleń do normalizacji

# parametry czyszczenia danych
VIGNETTE_THRESHOLD = 20      # próg czerni
EXTRA_CROP = 0.05            # margines dodatkowego przycięcia (5%)
MIN_FILL_RATIO = 0.95        # bezpiecznik dla zdjęć pełnokadrowych

# podział danych
CV_FOLDS = 5                 # liczba foldów walidacyjnych

# hiperparametry treningu w pierwszym etapie
WARMUP_EPOCHS = 10           # liczba epok (mała)
WARMUP_LR = 1e-3            # wartość współczynnika uczenia (duża)
WARMUP_PATIENCE = 5         # cierpliość na poprawę modelu (mała)

# hiperparametry treningu w drugim etapie
FINETUNE_EPOCHS = 60      # licza epok (duża)
FINETUNE_LR = 3e-5        # wartość współczynnika uczenia (mała)
FINETUNE_PATIENCE = 12    # cierpliwość na poprawę modelu (duża)

WEIGHT_DECAY = 1e-2      # wielkość kary za duże wartości wag

Aby móc uzyskać dostęp do GPU w notatniku należy wybrać:

$\textbf{Środowisko wykonawcze} ⟶ \textbf{Zmień typ środowiska wykonwaczego} ⟶ \text{Wybieramy jedno z dostępnych GPU} ⟶ \textbf{Zapisz}$

Po takiej sekwencji operacji należy przepuścić kod notatnika od samego początku, gdyż zostanie przydzielona nowa sesja, a poprzednia zostanie usunięta.

Ustawiamy ziarno losości dla powtarzalności wyników.

In [None]:
def set_seed(seed=42):
    """Ustawia ziarno losowości dla reprodukowalności wyników."""
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

In [None]:
set_seed(SEED)

Dodatkowo konfigurujemy wygląd wykresów w całym notatniku.

In [None]:
# bazowy styl seaborn z siatką
sns.set_theme(style="whitegrid", context="notebook", font_scale=1.1)

# ustawienia matplotlib
plt.rcParams.update({
    'figure.figsize': (10, 6),       # domyślny rozmiar wykresu
    'figure.dpi': 120,               # wyraźny wygląd wykresów w Colabie
    'savefig.dpi': 300,              # wysoka rozdzielczość zapisywanych wykresów
    'savefig.bbox': 'tight',         # automatycznie przycinanie białych marginesów przy zapisie
    'font.family': 'sans-serif',     # wykorzystanie czcionki bezszerfyowej
    'font.sans-serif': ['DejaVu Sans', 'Arial'],
    'axes.spines.top': False,        # usuwanie górnej ramki
    'axes.spines.right': False,      # usuwanie prawej ramki
})

# wyostrzone grafiki w Colabie
%config InlineBackend.figure_format = 'retina'

Konfigurujemy wszelki potrzebne ścieżki.

In [None]:
# montowanie Google Drive
drive.mount('/content/drive')

# podajemy nazwę głównego folderu projektu
PROJECT_FOLDER_NAME = 'Implementacja'
BASE_DIR = os.path.join('/content/drive/MyDrive/Diagnostyka raka skóry z wykorzystaniem analizy obrazów dermatologicznych', PROJECT_FOLDER_NAME)

# podajemy ścieżkę w Colabie do zapisania pobieranych danych
DATA_ROOT_DIR = '/content/ISIC2019'
IMG_DIR = os.path.join(DATA_ROOT_DIR, 'images')
PROCESSED_DIR = os.path.join(DATA_ROOT_DIR, 'images_processed')

# definijemy podfoldery
MODELS_DIR = os.path.join(BASE_DIR, 'Modele')
RESULTS_DIR = os.path.join(BASE_DIR, 'Wyniki')
DATA_DIR = os.path.join(BASE_DIR, 'Dane')

# tworzymy foldery na Google Drive (jeśli nie istnieją)
os.makedirs(MODELS_DIR, exist_ok=True)
os.makedirs(RESULTS_DIR, exist_ok=True)
os.makedirs(DATA_DIR, exist_ok=True)

print(f"Katalog roboczy projektu: {BASE_DIR}")
print(f"Dane będą zapisywane w: {DATA_ROOT_DIR}")

### Konfiguracja Kaggle API

Kaggle API jest nam potrzebne do pobrania danych bezpośrednio do notatnika. Folder ZIP jest bardzo obszernym plikiem i ręczne wgrywanie go na Google Drive może zająć bardzo dużo czasu.

Kofigurujemy ścieżkę dla Kaggle API. Klucz będzie zapisywany bezpośrednio w folderze projektu.

In [None]:
kaggle_key_drive_path = os.path.join(BASE_DIR, 'kaggle.json')

Tworzymy w Colabie folder na plik z Kaggle API.

In [None]:
kaggle_sys_dir = '/root/.kaggle'
os.makedirs(kaggle_sys_dir, exist_ok=True)

Wprowadzamy Kaggle API. Klucz ten należy utworzyć indywidualnie dla każdego użytkownika notatników z implementacją pracy inżynierskie. Aby go zdobyć należy:
1. Zalogować się na swój profil na stronie Kaggle.
2. Wejście w ustawienia swojego profilu (kilkamy na swoje zdjęcie profilowe i wybieramy ,,Settings'').
3. W sekcji ,,API'' klikamy ,,Generate New Token'', następnie wprowadzamy nazwę tokenu i klikamy ,,Generate''.
4. Kopiujemy kod z okienka ,,API TOKEN''.
5. W dowolnym edytorze tekstu (np. Notatniku) wprowadzamy tekst o następującej strykturze
$$\{\text{"username":"nazwa_uzytkownika_kaggle","key":"skopiowane_kaggle_api"}\}$$
i zapisujemy plik jako ,,kaggle.json''.
W poniższej funkcji zostaniemy poproszeni o wgranie utworzonego pliku.


In [None]:
# sprawdzamy czy klucz jest już na Drive
if not os.path.exists(kaggle_key_drive_path):
    print(f"UWAGA: Nie znaleziono pliku kaggle.json w lokalizacji: {kaggle_key_drive_path}")
    print("Proszę przesłać plik kaggle.json teraz (zostanie zapisany na Drive):")

    # przesyłamy plik kaggle.json
    uploaded = files.upload()

    for filename in uploaded.keys():
        if filename == 'kaggle.json':
            # zapisujemy trwale na Google Drive
            with open(kaggle_key_drive_path, 'wb') as f:
                f.write(uploaded[filename])
            print(f"Zapisano kaggle.json w: {kaggle_key_drive_path}")
        else:
            print(f"Pominięto plik: {filename} (oczekiwano kaggle.json)")

## Pobranie danych

Pobieramy, rozpakowujemy i porządkujemy zbiór ISIC2019. Nie pobieramy obrazów na dysk Google Drive, lecz do pamięci Google Colab. Pozwoli to na sprawniejsze wczytywanie plików w notatniku.

In [None]:
if os.path.exists(kaggle_key_drive_path):
    # instalacja klucza w systemie
    shutil.copy(kaggle_key_drive_path, os.path.join(kaggle_sys_dir, 'kaggle.json'))
    os.chmod(os.path.join(kaggle_sys_dir, 'kaggle.json'), 600)
    print("Klucz Kaggle API skonfigurowany.")

    # POBIERANIE I PORZĄDKOWANIE DANYCH

    # sprawdzamy, czy folder ze zdjęciami już istnieje i jest pełny
    # (np. czy nie uruchamiamy komórki drugi raz w tej samej sesji)
    if not os.path.exists(IMG_DIR):
        print("\nPobieranie danych z Kaggle...")
        # pobieramy do folderu tymczasowego
        path = kagglehub.dataset_download("andrewmvd/isic-2019")
        print(f"Pobrano dane do folderu tymczasowego: {path}")

        # tworzymy docelowe foldery na obrazy
        os.makedirs(IMG_DIR, exist_ok=True)
        os.makedirs(PROCESSED_DIR, exist_ok=True)

        # przenosimy metadane w folderu z danymi w Colabie
        for csv_file in glob.glob(os.path.join(path, "*.csv")):
            shutil.copy(csv_file, DATA_ROOT_DIR)
            print(f"Przeniesiono: {os.path.basename(csv_file)}")

        # szukamy rekursywnie zdjęć we wszystkich podfolderach pobranych danych
        jpg_files = glob.glob(os.path.join(path, "**", "*.jpg"), recursive=True)

        print(f"Znaleziono {len(jpg_files)} zdjęć.")

        for file_path in tqdm(jpg_files, desc="Kopiowanie zdjęć", unit="plik"):
            # przenosimy plik do wspólnego folderu
            shutil.copy(file_path, os.path.join(IMG_DIR, os.path.basename(file_path)))
        print("Przeniesiono zdjęcia do folderu z danymi.")

        # weryfikujemy liczbę plików
        num_images = len(os.listdir(IMG_DIR))
        print(f"\nSUKCES: Dane gotowe w {DATA_ROOT_DIR}")
        print(f"Liczba zdjęć w {IMG_DIR}: {num_images}")

    else:
        print("Dane są już pobrane i uporządkowane.")
        print(f"Lokalizacja zdjęć: {IMG_DIR}")

else:
    print("BŁĄD: Brak pliku kaggle.json. Nie można pobrać danych.")

## Przygotowanie danych do treningu

Wczytujemy metadane.

In [None]:
metadata_processed_path = os.path.join(DATA_DIR, 'ISIC_metadata_processed.csv')
df = pd.read_csv(metadata_processed_path)
df.head()

Wczytujemy i przycinamy zdjęcia.

In [None]:
def crop_vignette(image_path, output_path, threshold=20, extra_crop_percent=0.05, min_fill_ratio=0.95):
    """
    Funkcja usuwająca czarne ramki (winiety) z obrazu z dodatkowym marginesem bezpieczeństwa
    oraz mechanizmem chroniącym zdjęcia pełnokadrowe.

    Parametry:
    - image_path: ścieżka do pliku wejściowego.
    - output_path: ścieżka zapisu pliku wynikowego.
    - threshold: próg jasności (0-255). Piksele ciemniejsze niż ta wartość są traktowane jako tło.
    - extra_crop_percent: ułamek (np. 0.05 = 5%), o jaki dodatkowo zmniejszamy ramkę z każdej strony,
      aby usunąć postrzępione krawędzie lub pozostałości winiety w rogach.
    - min_fill_ratio: współczynnik wypełnienia (zakres 0.0 - 1.0). Określa próg decyzyjny,
      czy zdjęcie w ogóle wymaga przycinania. Jeśli wykryty obszar treści zajmuje większą część
      zdjęcia niż ta wartość (np. > 95%), algorytm uznaje, że winieta nie występuje (lub jest pomijalna)
      i rezygnuje z przycinania, aby nie uszkodzić zmian skórnych dotykających krawędzi.
    """

    # wczytujemy obraz z dysku
    img = cv2.imread(image_path)

    # jeśli plik jest uszkodzony lub nie istnieje, przerywamy
    if img is None:
        return False, None

    # przekształcamy obraz do skali szarości i wyznaczmy obszar zdjęcia
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    h_img, w_img = gray.shape
    total_area = h_img * w_img

    # tworzymy maskę binarną:
    # - piksele > threshold (treść) dostają wartość 255 (biały)
    # - piksele <= threshold (tło/winieta) dostają wartość 0 (czarny)
    _, mask = cv2.threshold(gray, threshold, 255, cv2.THRESH_BINARY)

    # znajdujemy położenie treści
    # cv2.findNonZero zwraca listę współrzędnych wszystkich białych pikseli na masce
    coords = cv2.findNonZero(mask)

    # jeśli nie znaleziono żadnych jasnych punktów (zdjęcie jest całe czarne), kończymy
    if coords is None:
        return False, None

    # znajdujemy najmniejszy prostokąt (x, y, szerokość, wysokość), który obejmuje wszystkie jasne punkty
    x, y, w, h = cv2.boundingRect(coords)

    # obliczamy obszar czystego obrazu oraz współczynnik wypełnienia
    rect_area = w * h
    fill_ratio = rect_area / total_area

    # jeśli treść zajmuje prawie cały obraz (> 95%), to znaczy, że nie ma winiety,
    # albo są tylko mikro-paski, których nie warto ruszać, by nie uciąć zmiany
    if fill_ratio > min_fill_ratio:
        return False, None

    # ETAP PRZYCINANIA

    # obliczamy ile pikseli uciąć dodatkowo z każdej strony (5% szerokości/wysokości znalezionej ramki)
    margin_w = int(w * extra_crop_percent)
    margin_h = int(h * extra_crop_percent)

    # wyznaczamy nowe współrzędne ramki, zawężając ją do środka:
    # przesuwamy początek X w prawo i Y w dół
    new_x = x + margin_w
    new_y = y + margin_h

    # zmniejszamy szerokość i wysokość o marginesy z obu stron (lewo+prawo, góra+dół)
    new_w = w - (2 * margin_w)
    new_h = h - (2 * margin_h)

    # jeśli zdjęcie było bardzo małe lub margines był za duży i "zjadł" cały obraz (wymiar <= 0),
    # to cofamy się do wersji podstawowej (bez cięcia).
    if new_w <= 0 or new_h <= 0:
        new_x, new_y, new_w, new_h = x, y, w, h

    # sprawdzamy czy przycięcie jest konieczne
    # pobieramy oryginalne wymiary obrazu
    h_img, w_img = gray.shape

    # czy nowa, wyliczona ramka jest mniejsza niż oryginalny obraz?
    # jeśli tak, wykonujemy fizyczne cięcie obrazu
    if (new_w < w_img) or (new_h < h_img):
        # wycinanie fragmentu tablicy
        cropped_img = img[new_y:new_y+new_h, new_x:new_x+new_w]

        # zapisujemy wynik na dysk
        cv2.imwrite(output_path, cropped_img)

        # zwracamy sukces oraz informację, ile pikseli ucięto z każdej strony (góra, dół, lewo, prawo)
        return True, (new_y, h_img-(new_y+new_h), new_x, w_img-(new_x+new_w))

    # jeśli nie trzeba było ciąć (ramka pokrywa się z całym obrazem), zwracamy False
    return False, None

In [None]:
# filtrujemy pliki, biorąc tylko te z rozszerzeniem .jpg lub .png
image_files = [f for f in os.listdir(IMG_DIR) if f.lower().endswith(('.jpg', '.png'))]

cropped_count = 0
copied_count = 0

print(f"Start: {len(image_files)} zdjęć. Dodatkowe cięcie: {EXTRA_CROP*100}%")

# pętla po wszystkich zdjęciach z użyciem paska postępu
for f in tqdm(image_files, desc="Przetwarzanie"):

    # tworzenie pełnych ścieżek
    src_path = os.path.join(IMG_DIR, f)
    dst_path = os.path.join(PROCESSED_DIR, f)

    # wywołujemy cięcie
    is_cropped, _ = crop_vignette(src_path, dst_path, threshold=VIGNETTE_THRESHOLD, extra_crop_percent=EXTRA_CROP, min_fill_ratio=MIN_FILL_RATIO)

    if is_cropped:
        # jeśli funkcja wykonała cięcie i zapisała plik -> zwiększ licznik
        cropped_count += 1
    else:
        # jeśli funkcja nie przycięła zdjęcia (zwróciła False) -> kopiujemy oryginał bez zmian
        shutil.copy(src_path, dst_path)
        copied_count += 1

print("\nZakończono przycinanie.")
print(f"Przycięto: {cropped_count}")
print(f"Skopiowano (bez zmian): {copied_count}")

## Trening modelu DenseNet

### Wczytanie i modyfikacja modelu

Wczytujemy model DenseNet-121, aby przyjrzeć się jego warstwie klasyfikacyjnej.

In [None]:
# pobieramy model na chwilę, tylko do podglądu
model = models.densenet121(weights='DEFAULT')
print(model.classifier)

Widzimy, że model na wyjściu ma 1000 klas, gdzie w naszym zbiorze jest tylko 8 klas. Należy zatem zmodyfikać warstwę klasyfikacjną.

Zbudujemy funkcję, która będzie wczytywać już wstępnie wytrenowany model oraz będzie modyfikować warstwę klasyfikacyjną.

In [None]:
def get_model(device=DEVICE):
    """
    Inicjalizuje architekturę DenseNet-121 i dostosowuje ją do zadania klasyfikacji zmian skórnych.

    Funkcja realizuje strategięuczenia transferowego:
    1. Pobiera wagi wstępnie wytrenowane na zbiorze ImageNet.
    2. Wprowadza mechanizm regularyzacji wewnątrz bloków zagęszczonych (drop_rate).
    3. Zastępuje oryginalny klasyfikator z dodatkowym Dropoutem,
       dostosowaną do specyficznej liczby klas zbioru ISIC.

    Args:
        device (torch.device): Urządzenie obliczeniowe, na które zostanie przeniesiony model (CPU lub CUDA).

    Returns:
        torch.nn.Module: Skompilowany model PyTorch gotowy do treningu.
    """
    # pobieramy model z wytrenowanymi wagami
    # wprowadzamy wyłączanie neuronów w drop_rate
    model = models.densenet121(weights='DEFAULT', drop_rate=0.2)

    # pobieramy liczbę cech wchodzących do ostatniej warstwy (dla DenseNet121 to 1024)
    in_features = model.classifier.in_features

    # podmieniamy warstwę klasyfikacyjną (dodajemy Dropout)
    model.classifier = nn.Sequential(
        nn.Dropout(0.5),
        nn.Linear(in_features, NUM_CLASSES)
    )

    # przenosimy model na GPU (jeśli dostępne)
    model = model.to(device)

    return model

In [None]:
model = get_model(DEVICE)
print(model.classifier)

Zobaczymy również jego złożoność (liczbętrenowanych parametrów).

In [None]:
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f"Wszystkie parametry: {total_params:,}")
print(f"Trenowalne parametry: {trainable_params:,}")

### Budowa pętli treningowej

Możemy teraz rozpocząć budowanie pętli treningowej modelu. Trening będzie przebiegał następująco dla kazdego z podzbiorów.
1. Konstruujemy zbiór treningowy i walidacyjny na podstawie metadanych i przy pomocy klasy `ISICDataset`
2. Konstruujemy instancje klasy `DataLoader
3. Inicjalizujemy wstępnie wytrenowany model DenseNet-121 i podmieniamy warstwę klasyfikacyjną.
3. Definiujemy funkcję straty
4. Przeprowadzamy pierwszy etap treningu:
    1. Mrozimy wszystkie wagi modelu (poza warstwą klasyfikacyjną).
    2. Trenujemy model przez małą liczbę epok wraz z mechanizmem wczesnego zatrzymania (w oparciu o wartość MCC).
5. Przeprowadzamy drugi etap treningu:
    1. Odmrażamy dwa ostatnie bloki gęste.
    2. Trenujemy model przez dużą liczbę epok z mechanizmem wczensgeo zatrzymania (w oparciu o wartość MCC) oraz harmonogramowaniem współczynnika uczenia.
6. Zapisujemy wyniki i wagi modelu.


Przywołujemy klasę `ISICDataset` oraz transformacje augumentacyjne.

In [None]:
class ISICDataset(Dataset):
    """
    Niestandardowy Dataset PyTorch do wczytywania obrazów dermatologicznych ISIC.

    Klasa wczytuje obrazy na podstawie ścieżek z DataFrame, konwertuje je do RGB
    i zwraca w formie gotowej do przetworzenia przez model.
    """

    def __init__(self, dataframe, root_dir, transform=None):
        """
        Inicjalizuje dataset.

        Args:
            dataframe (pd.DataFrame): Ramka danych zawierająca co najmniej kolumny:
                                      - 'nazwa_pliku': nazwa pliku z rozszerzeniem (np. 'IMG_1.jpg')
                                      - 'target': numeryczna etykieta klasy (int).
            root_dir (str): Ścieżka do katalogu głównego, w którym znajdują się obrazy.
            transform (callable, optional): Opcjonalne transformacje (np. augmentacja, normalizacja)
                                            aplikowane na obrazie. Domyślnie None.
        """
        self.df = dataframe
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        """Zwraca całkowitą liczbę próbek w zbiorze."""
        return len(self.df)

    def __getitem__(self, idx):
        """
        Pobiera pojedynczą próbkę danych o podanym indeksie.

        Args:
            idx (int/tensor): Indeks próbki do pobrania.

        Returns:
            tuple: Krotka (image, label), gdzie:
                   - image (PIL.Image lub Tensor): Obraz po transformacjach.
                   - label (int): Numeryczna etykieta klasy.
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        # pobranie nazwy pliku
        img_name = self.df.iloc[idx]['nazwa_pliku']
        img_path = os.path.join(self.root_dir, img_name)

        # wczytanie obrazu i konwersja na RGB (dla bezpieczeństwa)
        image = Image.open(img_path).convert('RGB')

        # pobranie etykiety numerycznej (target)
        label = self.df.iloc[idx]['target']

        if self.transform:
            image = self.transform(image)

        return image, label

In [None]:
working_size = int(IMG_SIZE * 1.25)

train_transforms = transforms.Compose([
    # losujemy wycinek od 80% do 100% powierzchni oryginalnego obrazu
    transforms.RandomResizedCrop(working_size, scale=(0.8, 1.0), ratio=(0.9, 1.1)),

    # losowe odbicia lustrzane pionowo i poziomo
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),

    # delikatna zmiana koloru
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.1, hue=0.0),

    # obrót zdjęcia o kąt z przedziału (-15 stopni, 15 stopni)
    transforms.RandomRotation(15),

    # wycinamy środek obrazu w docelowych wymiarach
    transforms.CenterCrop(IMG_SIZE),

    # przeniesienie na tensor i normalizacja
    transforms.ToTensor(),
    transforms.Normalize(NORM_MEAN, NORM_STD)
])

val_transforms = transforms.Compose([
    # zmiana wymiarów
    transforms.Resize(working_size),
    # wycinamy środek obrazu w docelowych wymiarach
    transforms.CenterCrop(IMG_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(NORM_MEAN, NORM_STD)
])

Konstruujemy funkcją przeprowadzającą trening na jednej epoce.

In [None]:
def train_one_epoch(model, dataloader, criterion, optimizer, device, scaler=None):
    """
    Realizuje jedną pełną epokę treningową modelu.

    Funkcja iteruje przez wszystkie partie danych treningowych,
    oblicza błędy predykcji, wyznacza gradienty i aktualizuje wagi modelu.

    Args:
        model (torch.nn.Module): Model sieci neuronowej do wytrenowania.
        dataloader (DataLoader): Generator dostarczający dane treningowe w partiach.
        criterion (nn.Module): Funkcja straty (np. CrossEntropyLoss).
        optimizer (torch.optim.Optimizer): Algorytm optymalizujący wagi (np. AdamW).
        device (torch.device): Urządzenie obliczeniowe (CPU/CUDA).

    Returns:
        float: Średnia wartość funkcji straty dla całej epoki.
    """
    model.train()
    running_loss = 0.0

    # pasek postępu
    pbar = tqdm(dataloader, desc="  Trening", leave=False)

    for inputs, labels in pbar:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()

        if scaler is not None:
            # szybkie obliczenia
            with torch.amp.autocast(device_type='cuda'):
                outputs = model(inputs)
                loss = criterion(outputs, labels)

            # skalowanie gradientów i krok optymalizatora
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
        else:
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

        running_loss += loss.item() * inputs.size(0)
        pbar.set_postfix({'loss': loss.item()})

    epoch_loss = running_loss / len(dataloader.dataset)
    return epoch_loss

Konstruujemy funkcję przeprowadzającą ewaluację. Dodatkowo konstruujemy funkcję do obliczenia specyficzności (nie znaleziono gotowej funkcji).

In [None]:
def calculate_specificity(y_true, y_pred, num_classes):
    """
    Oblicza średnią specyficzność dla klasyfikacji wieloklasowej.

    Funkcja wykorzystuje strategię One-vs-Rest, obliczając specyficzność niezależnie
    dla każdej klasy (binarna klasyfikacja: dana klasa vs reszta), a następnie zwraca
    ich średnią arytmetyczną. Jest to kluczowa metryka w diagnostyce,
    określająca zdolność modelu do poprawnego wykluczania danej choroby.

    Args:
        y_true (array-like): Wektor rzeczywistych etykiet.
        y_pred (array-like): Wektor przewidzianych etykiet.
        num_classes (int): Całkowita liczba klas w modelu.

    Returns:
        float: Uśredniona wartość specyficzności (zakres 0.0 - 1.0).
    """
    cm = confusion_matrix(y_true, y_pred, labels=range(num_classes))
    specificity_per_class = []

    for i in range(num_classes):
        # dla klasy 'i':
        # TN = suma wszystkich próbek, które nie są 'i' i nie zostały przewidziane jako 'i'
        # FP = próbki, które nie są 'i', ale zostały przewidziane jako 'i'

        # wartości w macierzt pomyłek:
        # cm[i, i] to TP
        # cm[:, i] to kolumna predykcji (suma to TP + FP) -> FP = suma_kol - TP
        # cm[i, :] to wiersz prawdy (suma to TP + FN)
        # suma całkowita to total

        tp = cm[i, i]
        fp = cm[:, i].sum() - tp
        fn = cm[i, :].sum() - tp
        tn = cm.sum() - (tp + fp + fn)

        # Specyficzność = TN / (TN + FP) z mechanizm uniknięcia dzielenia przez 0
        if (tn + fp) > 0:
            spec = tn / (tn + fp)
        else:
            spec = 0.0
        specificity_per_class.append(spec)

    return np.mean(specificity_per_class)

In [None]:
def evaluate(model, dataloader, criterion, device):
    """
    Przeprowadza proces ewaluacji modelu na zbiorze walidacyjnym lub testowym.

    Funkcja przełącza model w tryb inferencji (wyłącza obliczanie gradientów), co optymalizuje
    zużycie pamięci i przyspiesza obliczenia. Następnie generuje predykcje dla całego zbioru
    i oblicza zestaw metryk diagnostycznych. Zastosowanie uśredniania typu 'macro' pozwala
    na rzetelną ocenę klas rzadkich (np. czerniaka) w niezbalansowanym zbiorze danych.

    Args:
        model (torch.nn.Module): Model sieci neuronowej poddawany ocenie.
        dataloader (DataLoader): Generator dostarczający dane walidacyjne/testowe.
        criterion (nn.Module): Funkcja straty używana do oceny błędu predykcji.
        device (torch.device): Urządzenie obliczeniowe (CPU lub CUDA).

    Returns:
        dict: Słownik zawierający obliczone wartości metryk:
            - 'strata': Średnia wartość funkcji straty.
            - 'dokładność': Dokładność.
            - 'mcc': Współczynnik korelacji Matthewsa.
            - 'czułość': Czułość.
            - 'precyzja': Precyzja.
            - 'specyficzność': Specyficznosć.
    """
    model.eval()
    running_loss = 0.0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            _, preds = torch.max(outputs, 1)

            running_loss += loss.item() * inputs.size(0)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    epoch_loss = running_loss / len(dataloader.dataset)

    # obliczamy mertyki
    # average='macro' traktuje każdą klasę na równi
    acc = accuracy_score(all_labels, all_preds) # dokładność
    mcc = matthews_corrcoef(all_labels, all_preds) # współczynnik korelacji Mattewsa
    sens = recall_score(all_labels, all_preds, average='macro', zero_division=0) # czułość
    prec = precision_score(all_labels, all_preds, average='macro', zero_division=0) # precyzja
    spec = calculate_specificity(all_labels, all_preds, NUM_CLASSES) # specyficzność

    return {
        'strata': epoch_loss,
        'dokładność': acc,
        'mcc': mcc,
        'czułość': sens,
        'precyzja': prec,
        'specyficzność': spec
    }

Konstruujemy funkcją inicjalizującą funkcję straty. Funkcja oblicza wagi poszczególnych klas, aby móc balansować nierówne ich rózłożenie w zbiorze. Dodatkowo zacieramy delikatnie granice między klasami ze względu na to, że wiele zmian złośliwych i łagodnych jest do siebie zbliżonych wizualnie.

In [None]:
def get_loss(train_df, device):
    """
    Konfiguruje funkcję straty (Loss Function) z mechanizmem równoważenia klas.

    Funkcja oblicza wagi dla każdej klasy na podstawie ich liczebności w zbiorze treningowym,
    stosując strategię "odwrotności pierwiastka" (1/sqrt(N)). Jest to podejście łagodniejsze
    niż prosta odwrotność częstości, zapobiegające niestabilności gradientów w przypadku
    bardzo rzadkich klas. Dodatkowo wagi są normalizowane do średniej 1.0, a funkcja straty
    wykorzystuje Label Smoothing (0.1) w celu zapobiegania nadmiernej pewności modelu.

    Args:
        train_df (pd.DataFrame): Ramka danych zbioru treningowego (wymagana kolumna 'target').
        device (torch.device): Urządzenie obliczeniowe (CPU/CUDA), na którym umieszczone zostaną wagi.

    Returns:
        nn.CrossEntropyLoss: Instancja funkcji straty z załadowanymi wagami i wygładzaniem etykiet.
    """
    # zliczamy wystąpienia klas
    class_counts = train_df['target'].value_counts().sort_index().values

    # obliczamy wagi
    weights = 1. / np.sqrt(class_counts)

    # normalizujemy wagi
    weights = weights / np.mean(weights)

    # zmieniamy na tensor i przeniesienie na GPU/CPU
    weights_tensor = torch.FloatTensor(weights).to(device)

    # zwracamy gotową funkcję
    return nn.CrossEntropyLoss(weight=weights_tensor, label_smoothing=0.1)

Konstruujemy funkcję, która będzie zamrażać lub odmarażać wagi modeli. Do tego będzie zwracać odpwiedni optymalizator i ewentualnie obiekt do harmonogramowania współczynnika uczenia.

In [None]:
def setup_stage(model, stage):
    """
    Konfiguruje hiperparametry treningu oraz stan zamrożenia warstw dla danego etapu.

    Funkcja realizuje strategię dwuetapowego uczenia transferowego:
    1. 'trening klasyfikatora': Zamraża cały ekstraktor cech, aktualizując wagi jedynie
       w nowej głowicy klasyfikacyjnej.
    2. 'dostrajanie modelu': Odmraża kluczowe bloki konwolucyjne (DenseBlock 3 i 4),
       zmniejsza learning rate i aktywuje scheduler w celu precyzyjnego dopasowania wag.

    Args:
        model (torch.nn.Module): Model DenseNet-121.
        stage (str): Identyfikator etapu ('trening klasyfikatora' lub 'dostrajanie modelu').

    Returns:
        tuple: Krotka konfiguracyjna (optimizer, scheduler, num_epochs, patience).
    """
    if stage == 'trening klasyfikatora':
        print("\nKonfiguracja etapu 1: Trening klasyfikatora")
        for param in model.features.parameters(): param.requires_grad = False
        for param in model.classifier.parameters(): param.requires_grad = True

        optimizer = optim.AdamW(model.classifier.parameters(), lr=WARMUP_LR, weight_decay=WEIGHT_DECAY)
        scheduler = None # w pierwszym etapie scheduler jest zbędny

        return optimizer, scheduler, WARMUP_EPOCHS, WARMUP_PATIENCE

    elif stage == 'dostrajanie modelu':
        print("\nKonfiguracja etapu 2: Dostrajanie modelu")
        # odmrażamy ostatni blok
        for param in model.features.denseblock4.parameters(): param.requires_grad = True
        for param in model.features.norm5.parameters(): param.requires_grad = True

        # odmrażamy kolejny blok
        for param in model.features.transition3.parameters(): param.requires_grad = True
        for param in model.features.denseblock3.parameters(): param.requires_grad = True

        optimizer = optim.AdamW(filter(lambda p: p.requires_grad, model.parameters()),
                                lr=FINETUNE_LR, weight_decay=WEIGHT_DECAY)

        # definiujemy harmonogramowane learning_rate w oparciu o poprawę MCC
        scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=3)

        return optimizer, scheduler, FINETUNE_EPOCHS, FINETUNE_PATIENCE

Konstruujemy funkcję do obsługi instancji klas `ISICDataset`, `DataLoader` oraz do dzielenia zbioru na podstawie metadanych

In [None]:
def get_fold_data(df, fold_idx):
    """
    Inicjalizuje generatory danych (DataLoaders) dla określonego foldu walidacji krzyżowej.

    Funkcja dzieli zbiór danych na podzbiór treningowy (wszystkie foldy poza obecnym)
    i walidacyjny (obecny fold). Następnie tworzy instancje klasy Dataset z odpowiednimi
    transformacjami oraz konfiguruje DataLoadery z obsługą wielowątkowości i transferu
    pamięci do GPU (pin_memory).

    Args:
        df (pd.DataFrame): Główna ramka danych zawierająca metadane i przypisane numery foldów.
        fold_idx (int): Indeks aktualnego foldu, który ma służyć jako zbiór walidacyjny.

    Returns:
        tuple: (train_loader, val_loader, train_df) - Krotka zawierająca gotowe loadery
               oraz ramkę danych treningowych (potrzebną np. do wyliczenia wag klas).
    """
    # filtrujemy zbiór treningowy
    train_df = df[(df['typ_puli'] == 'trening') & (df['nr_podzbioru'] != fold_idx)].reset_index(drop=True)

    # filtrujemy zbiór walidacyjny
    val_df = df[(df['typ_puli'] == 'trening') & (df['nr_podzbioru'] == fold_idx)].reset_index(drop=True)

    # inicjalizujemy instancje klasy
    train_ds = ISICDataset(train_df, PROCESSED_DIR, transform=train_transforms)
    val_ds = ISICDataset(val_df, PROCESSED_DIR, transform=val_transforms)

    # konfigurujemy loadery
    # drop_last=True usuwa ostatnią niepełną partię danych
    train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True,
                              num_workers=NUM_WORKERS, pin_memory=True,
                              drop_last=True)

    val_loader = DataLoader(val_ds, batch_size=BATCH_SIZE, shuffle=False,
                            num_workers=NUM_WORKERS, pin_memory=True)

    return train_loader, val_loader, train_df

Konstruujemy również funkcję do zbierania predykcji na zbiorze walidacyjnym.

In [None]:
def get_predictions(model, loader, device):
    """
    Generuje predykcje dla całego dataloader'a (używane na koniec foldu).
    Zwraca: y_true (list), y_pred (list)
    """
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            # pobieramy klasę o najwyższym prawdopodobieństwie
            preds = torch.argmax(outputs, dim=1)

            all_preds.extend(preds.cpu().numpy().tolist())
            all_labels.extend(labels.cpu().numpy().tolist())

    return all_labels, all_preds

Na koniec konstruujemy funkcję, która przeprowadza trening na pojedynczym foldzie. Trening odbywa się w dwóch etapach:
1. Trenujemy zamrożony model, czyli trenujemy wyłącznie warstwę klasyfikacyjną. Ma to na celu uniknięcie tzw. szkoku gradientowego, czyli drastycznej zmiany gradientów na początku fazy treningowej. Szok gradientowy zaburzyłby nam wagi już wytrenowanego modelu i wydłużyłby czas trwania treningu.
2. Trenujemy model z odmrożonymi dwoma ostatnimi blokami. Pozowli dostroić warstwy konwolucyjne do wykrywania cech związanych ze zmianami skórnymi.

In [None]:
def run_fold(fold_idx, df):
    """
    Przeprowadza pełny cykl treningowy i walidacyjny dla pojedynczego foldu.

    Funkcja zarządza całym procesem uczenia dla danego podziału danych:
    1. Przygotowuje DataLoadery (trening/walidacja) dla wskazanego foldu.
    2. Inicjalizuje model (transfer learning) i funkcję straty.
    3. Realizuje dwuetapowy trening:
        - Etap 1: Trening tylko głowicy klasyfikującej.
        - Etap 2: Dostrajanie kluczowych warstw modelu.
    4. Monitoruje metrykę MCC w celu zapisu najlepszego modelu .
       Zapewnia bezpieczny zapis na dysk (wymuszenie synchronizacji os.fsync).
    5. Stosuje mechanizm wczesnego zatrzymania  w przypadku braku poprawy.
    6. Po zakończeniu treningu generuje predykcje walidacyjne, wczytując wagi
       najlepszego modelu (a nie ostatniego stanu z pamięci).

    Args:
        fold_idx (int): Indeks bieżącego foldu (0..CV_FOLDS-1).
        df (pd.DataFrame): Główna ramka danych z metadanymi (ścieżki, etykiety, split).

    Returns:
        tuple: Krotka (history, fold_predictions) zawierająca:
            - history (dict): Słownik ze szczegółowym przebiegiem treningu (wartości straty,
              metryki dla każdej epoki, czasy wykonania, najlepszy wynik MCC).
            - fold_predictions (dict): Słownik z kluczami 'fold', 'y_true', 'y_pred',
              zawierający ostateczne predykcje najlepszego modelu na zbiorze walidacyjnym.
    """
    fold_start_time = time.time()
    print(f"\n{'='*5}START TRENINGU FOLD {fold_idx}/{CV_FOLDS-1}{'='*5}")

    train_loader, val_loader, train_df = get_fold_data(df, fold_idx)
    model = get_model(DEVICE)
    criterion = get_loss(train_df, DEVICE)
    scaler = torch.amp.GradScaler('cuda')

    history = {'fold': fold_idx, 'metryki': []}
    best_mcc_overall = -1.0 # najlepszy MCC w tym foldzie (z obu etapów)

    # ścieżka docelowa (na Google Drive)
    drive_model_path = os.path.join(MODELS_DIR, f'{MODEL_NAME}_fold{fold_idx}.pth')

    # ścieżka tymczasowa (na szybkim dysku lokalnym Colaba)
    local_model_path = f'/content/{MODEL_NAME}_fold{fold_idx}.pth'

    # pętla po etapach
    for stage_name in ['trening klasyfikatora', 'dostrajanie modelu']:
        optimizer, scheduler, n_epochs, patience_limit = setup_stage(model, stage_name)

        # reset liczników dla etapu
        patience_cnt = 0
        best_mcc_stage = -1.0

        print(f"Etap - {stage_name}: Liczba epok={n_epochs}, Cierpliwość={patience_limit}")

        for epoch in range(n_epochs):
            # trening
            t_loss = train_one_epoch(model, train_loader, criterion, optimizer, DEVICE, scaler)

            # walidacja
            metrics = evaluate(model, val_loader, criterion,DEVICE)

            # wyniki
            print(f"\tEpoka {epoch+1} | "
                    f"Strata: {t_loss:.4f} | "
                    f"Val MCC: {metrics['mcc']:.4f} | "
                    f"Val Czułość: {metrics['czułość']:.4f} | "
                    f"Val Specyficzność: {metrics['specyficzność']:.4f} | "
                    f"Val Precyzja: {metrics['precyzja']:.4f} | "
                    f"Val Dokładność: {metrics['dokładność']:.4f}")

            # zapis do historii
            record = {
                'etap': stage_name,
                'ostatnia_epoka': epoch+1,
                'strata_treningowa': t_loss,
                **metrics # rozpakowanie słownika z metrykami
            }
            history['metryki'].append(record)

            # harmonogramowanie współczynnika uczenia
            if scheduler:
                scheduler.step(metrics['mcc'])

            # MECHANIZM WCZESNEGO ZATRZYMANIA
            # sprawdzamy, czy to najlepszy model w ogóle w tym foldzie
            # jeśli tak to zapisujemy jego wagi
            if metrics['mcc'] > best_mcc_overall:
                best_mcc_overall = metrics['mcc']

                # zapisujemy na szybki dysk lokalny
                torch.save(model.state_dict(), local_model_path)

                # próbujemy skopiować na Drive
                try:
                    shutil.copy(local_model_path, drive_model_path)
                    print(f"\t>>> Poprawa modelu! MCC: {best_mcc_overall:.4f}. Zapisano na Drive.")
                except Exception as e:
                    print(f"\t>>> Poprawa modelu! Zapisano LOKALNIE. Błąd kopiowania na Drive: {e}")

            # logika wczesnego zatrzymania na poziomie etapu
            if metrics['mcc'] > best_mcc_stage:
                best_mcc_stage = metrics['mcc']
                patience_cnt = 0
            else:
                patience_cnt += 1
                if patience_cnt >= patience_limit:
                    print(f"  [Info] Zatrzymano trening w etapie {stage_name} po {epoch+1} epokach.")
                    break

    fold_duration = time.time() - fold_start_time
    history['czas_trwania_foldu'] = fold_duration
    history['najlepsze_mcc'] = best_mcc_overall

    # mechanizm generowanie predykcji na zbiorze walidacyjnym
    print(f"\nGenerowanie predykcji dla foldu {fold_idx}...")

    # wczytujemy najlepszy model
    if os.path.exists(local_model_path):
        model.load_state_dict(torch.load(local_model_path))
        print("Wczytano najlepszy model (kopia lokalna).")
    elif os.path.exists(drive_model_path):
        model.load_state_dict(torch.load(drive_model_path))
        print("Wczytano najlepszy model (z Google Drive).")
    else:
        print("[Ostrzeżenie] Nie znaleziono zapisanego modelu, używam ostatniego stanu.")

    # generujemy predykcje
    final_y_true, final_y_pred = get_predictions(model, val_loader, DEVICE)

    # tworzymy osobny obiekt na predykcje
    fold_predictions = {
        'fold': fold_idx,
        'y_true': final_y_true,
        'y_pred': final_y_pred
    }

    print(f"Koniec trening na foldzie {fold_idx}. Czas: {fold_duration/60:.1f} min. Najlepszy MCC: {best_mcc_overall:.4f}")

    # czyścimy pamięć
    del model, optimizer, scheduler, train_loader, val_loader
    torch.cuda.empty_cache()

    return history, fold_predictions

### Trening modelu

Mamy już przygotowany wszelkie potrzebne funkcje możemy przeprowadzić trening modelu DenseNet-121 na zbiorze ISIC2019. Zapisujemy wyniki treningu. Wprowadzamy również zabezpieczenia w razie przerwania sesji, aby oszczędzać jednostki obliczeniowe na GPU.

In [None]:
cv_results = []
densenet_preds = {'y_true': [], 'y_pred': []}
global_start_time = time.time()

print(f"Trening na {CV_FOLDS} foldach.")

for fold_idx in range(CV_FOLDS):
    # definiujemy ścieżki do plików, które mają powstać
    results_path = os.path.join(RESULTS_DIR, f'{MODEL_NAME}_wyniki_treningu_fold_{fold_idx}.json')
    preds_path = os.path.join(RESULTS_DIR, f'{MODEL_NAME}_predykcje_fold_{fold_idx}.json')
    model_path = os.path.join(MODELS_DIR, f'{MODEL_NAME}_fold{fold_idx}.pth')

    # MECHANIZM WZNAWIANIA
    # sprawdzamy, czy ten fold został już w pełni policzony
    if os.path.exists(results_path) and os.path.exists(preds_path) and os.path.exists(model_path):
        print(f"\n[INFO] Fold {fold_idx} został już ukończony. Wczytuję wyniki z dysku i pomijam trening.")

        # wczytujemy historię treningu
        with open(results_path, 'r') as f:
            fold_hist = json.load(f)
            cv_results.append(fold_hist)

        # wczytujemy predykcje i doklejamy do listy zbiorczej
        with open(preds_path, 'r') as f:
            fold_preds = json.load(f)
            densenet_preds['y_true'].extend(fold_preds['y_true'])
            densenet_preds['y_pred'].extend(fold_preds['y_pred'])

        continue # przechodzimy do następnego foldu

    # jeśli plików nie ma, uruchamiamy trening normalnie
    print(f"\n[INFO] Brak pełnych wyników dla Foldu {fold_idx}. Uruchamiam trening.")

    try:
        # trenujemy model i zbieramy predykcje
        fold_hist, fold_preds = run_fold(fold_idx, df)

        # dodajemy metryki do listy wyników
        cv_results.append(fold_hist)

        # dodajemy predykcje do listy zbiorczej
        densenet_preds['y_true'].extend(fold_preds['y_true'])
        densenet_preds['y_pred'].extend(fold_preds['y_pred'])

        # zapisujemy wyniki modelu
        with open(results_path, 'w') as f:
            json.dump(fold_hist, f)

        # zapisujemy predykcje
        with open(preds_path, 'w') as f:
            json.dump(fold_preds, f)

        print(f"[SUKCES] Fold {fold_idx} zapisany bezpiecznie na Dysku.")

    except Exception as e:
        print(f"\n[BŁĄD KRYTYCZNY] Wystąpił błąd podczas foldu {fold_idx}: {e}")
        print("Zatrzymuję pętlę, abyś mógł naprawić błąd. Poprzednie foldy są bezpieczne.")
        break # przerywamy pętlę, żeby nie nadpisać czegoś błędami

global_duration = time.time() - global_start_time
print(f"\n{'='*5} Zakończono cały proces. Czas: {global_duration/60:.2f} min {'='*5}")

# zapisujemy zbiorczy plik z wynikami
with open(os.path.join(RESULTS_DIR, f'{MODEL_NAME}_wyniki_FULL.json'), 'w') as f:
    json.dump(cv_results, f)

# zapisujemy zbiorczy plik z predykcjami
with open(os.path.join(RESULTS_DIR, f'{MODEL_NAME}_cv_predictions_FULL.json'), 'w') as f:
    json.dump(densenet_preds, f)

### Wizualizacja wyników treningu

Spróbujemy zwizualizować wyniki treningu.

In [None]:
def plot_training_history(cv_results, save_dir=None, filename="analiza_treningu.png"):
    """
    Generuje kompleksową wizualizację przebiegu treningu w układzie siatki 2x3.

    Funkcja tworzy wykresy typu 'spaghetti', prezentując stabilność modelu poprzez
    nałożenie wyników poszczególnych foldów (cienkie linie) na średni trend (gruba linia).
    Wizualizuje kluczowe metryki diagnostyczne oraz automatycznie oznacza moment
    rozpoczęcia etapu dostrajania czerwoną linią przerywaną.

    Args:
        cv_results (list): Lista słowników z historią treningu dla każdego foldu.
        save_dir (str, optional): Ścieżka do katalogu zapisu wykresu.
        filename (str, optional): Nazwa pliku wynikowego.
    """

    # KROK 1: przygotowanie danych
    plot_data = []
    fine_tune_start_epochs = []

    for fold_res in cv_results:
        fold_idx = fold_res['fold']
        metrics = fold_res['metryki']

        global_epoch = 0
        ft_start_found = False

        for record in metrics:
            global_epoch += 1
            stage = record.get('etap', 'nieznany')

            # wykrycie momentu dostrajania
            if stage == 'dostrajanie modelu' and not ft_start_found:
                fine_tune_start_epochs.append(global_epoch)
                ft_start_found = True

            # dodajemy dane do listy
            # strata
            plot_data.append({'Fold': fold_idx, 'Epoka': global_epoch, 'Wartość': record.get('strata_treningowa', 0), 'Metryka': 'Strata treningowa'})
            plot_data.append({'Fold': fold_idx, 'Epoka': global_epoch, 'Wartość': record.get('strata', 0), 'Metryka': 'Strata walidacyjna'})

            # metryki
            plot_data.append({'Fold': fold_idx, 'Epoka': global_epoch, 'Wartość': record.get('mcc', 0), 'Metryka': 'MCC'})
            plot_data.append({'Fold': fold_idx, 'Epoka': global_epoch, 'Wartość': record.get('dokładność', 0), 'Metryka': 'Dokładność'})
            plot_data.append({'Fold': fold_idx, 'Epoka': global_epoch, 'Wartość': record.get('czułość', 0), 'Metryka': 'Czułość'})
            plot_data.append({'Fold': fold_idx, 'Epoka': global_epoch, 'Wartość': record.get('specyficzność', 0), 'Metryka': 'Specyficzność'})
            plot_data.append({'Fold': fold_idx, 'Epoka': global_epoch, 'Wartość': record.get('precyzja', 0), 'Metryka': 'Precyzja'})

    df = pd.DataFrame(plot_data)
    avg_ft_start = np.mean(fine_tune_start_epochs) if fine_tune_start_epochs else None

    # KROK 2: rysowanie
    fig, axes = plt.subplots(2, 3, figsize=(24, 12))
    fig.suptitle(f'Analiza treningu ({len(cv_results)} Foldów)', fontsize=20, y=0.95)

    # spłaszczamy tablicę osi
    axes_flat = axes.flatten()

    # definiujemy co i gdzie rysować
    # format: (Tytuł, lista metryk do pokazania, zakres osi Y)
    charts_config = [
        ('Funkcja straty', ['Strata treningowa', 'Strata walidacyjna'], None),
        ('Współczynnik korelacji Mathewsa', ['MCC'], (0.0, 1.0)),
        ('Dokładność', ['Dokładność'], (0.0, 1.0)),
        ('Czułość', ['Czułość'], (0.0, 1.0)),
        ('Specyficzność', ['Specyficzność'], (0.0, 1.0)),
        ('Precyzja', ['Precyzja'], (0.0, 1.0))
    ]

    for i, (title, metrics_to_plot, ylim) in enumerate(charts_config):
        ax = axes_flat[i]

        # filtrujemy dane dla danego wykresu
        subset = df[df['Metryka'].isin(metrics_to_plot)]

        # wykres spaghetti - cienkie linie dla każdego foldu
        sns.lineplot(data=subset, x='Epoka', y='Wartość', hue='Metryka', units='Fold',
                     estimator=None, lw=0.6, alpha=0.3, ax=ax, legend=False, palette='tab10')

        # średnia - gruba linia
        sns.lineplot(data=subset, x='Epoka', y='Wartość', hue='Metryka',
                     lw=3, errorbar=None, ax=ax, palette='tab10')

        # linia startu dostrajania
        if avg_ft_start:
            ax.axvline(avg_ft_start, color='red', linestyle='--', alpha=0.5)
            # podpis tylko na pierwszym wykresie
            if i == 0:
                ax.text(avg_ft_start + 0.2, ax.get_ylim()[1]*0.95, 'Fine-tune', color='red', rotation=90)

        # aspekty kosmetyczne
        ax.set_title(title, fontsize=14, fontweight='bold')
        ax.set_xlabel('Epoka')
        ax.grid(True, alpha=0.3)
        if ylim: ax.set_ylim(ylim)

        # legenda
        handles, labels = ax.get_legend_handles_labels()
        if handles:
            by_label = dict(zip(labels, handles))
            ax.legend(by_label.values(), by_label.keys(), loc='lower right')

    plt.tight_layout(rect=[0, 0.03, 1, 0.95]) # Zostaw miejsce na tytuł główny

    if save_dir:
        save_path = os.path.join(save_dir, filename)
        plt.savefig(save_path)
        print(f"Wykresy zapisano w: {save_path}")

    plt.show()

In [None]:
plot_training_history(cv_results, save_dir=RESULTS_DIR, filename="analiza_treningu_densenet.png")