In [None]:
# -*- coding: utf-8 -*-
"""
Модуль для проведения K-Fold Cross-Validation модели SSD300 с VGG16.

Содержит функции для:
1. Загрузки и подготовки данных в формате COCO
2. Разделения данных на K фолдов
3. Обучения модели на каждом фолде
4. Оценки качества модели с расчетом доверительных интервалов
"""

import os
import json
import numpy as np
from tqdm import tqdm
from PIL import Image
import matplotlib.pyplot as plt

import torch
import torchvision
from torchvision.models.detection import SSD300_VGG16_Weights
from torchvision.models.detection.ssd import SSDHead
from torch.utils.data import Dataset, DataLoader
from torch.optim import AdamW
from torch.optim.lr_scheduler import StepLR
from torchvision.transforms import functional as F

from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
from sklearn.model_selection import KFold


class Config:
    """Класс для хранения конфигурации обучения."""

    # Пути к данным
    TRAIN_ANNOTATIONS = "/kaggle/working/kpk_dataset-9/train/_annotations.coco.json"
    TRAIN_DATA_DIR = "/kaggle/working/kpk_dataset-9/train"
    VAL_ANNOTATIONS = "/kaggle/working/kpk_dataset-9/valid/_annotations.coco.json"
    VAL_DATA_DIR = "/kaggle/working/kpk_dataset-9/valid"

    # Параметры обучения
    BATCH_SIZE = 8
    NUM_EPOCHS = 50
    LEARNING_RATE = 0.0005
    DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Параметры K-Fold
    K_FOLDS = 5
    CURRENT_FOLD = 1  # Текущий фолд для обучения


class FoldCocoDataset(Dataset):
    """Кастомный датасет для работы с COCO аннотациями и K-Fold разбиением."""

    def __init__(self, root, annotation, image_ids, transforms=None):
        """
        Инициализация датасета.

        Args:
            root (str): Путь к директории с изображениями
            annotation (str): Путь к файлу аннотаций COCO
            image_ids (list): Список ID изображений для включения в датасет
            transforms (callable, optional): Трансформы для изображений
        """
        self.root = root
        self.transforms = transforms
        self.coco = COCO(annotation)
        self.image_ids = image_ids
        self.cat_ids = self.coco.getCatIds()
        self.cat2label = {cat_id: i + 1 for i, cat_id in enumerate(self.cat_ids)}

        # Фильтруем изображения только из выбранного фолда
        self.ids = [img_id for img_id in self.coco.imgs.keys()
                   if img_id in image_ids]

    def __getitem__(self, idx):
        """Получение одного элемента датасета."""
        img_id = self.ids[idx]
        img_info = self.coco.loadImgs(img_id)[0]
        img_path = os.path.join(self.root, img_info['file_name'])
        img = Image.open(img_path).convert("RGB")

        ann_ids = self.coco.getAnnIds(imgIds=img_id)
        anns = self.coco.loadAnns(ann_ids)

        boxes = []
        labels = []
        for ann in anns:
            x, y, w, h = ann['bbox']
            boxes.append([x, y, x + w, y + h])
            labels.append(self.cat2label[ann['category_id']])

        boxes = torch.as_tensor(boxes, dtype=torch.float32)
        labels = torch.as_tensor(labels, dtype=torch.int64)

        target = {
            "boxes": boxes,
            "labels": labels,
            "image_id": torch.tensor([img_id])
        }

        if self.transforms:
            img, target = self.transforms(img, target)

        return img, target

    def __len__(self):
        """Количество элементов в датасете."""
        return len(self.ids)


def get_transform():
    """Возвращает трансформы для изображений."""
    def transform(image, target):
        image = F.to_tensor(image)
        image = F.normalize(
            image,
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]
        )
        return image, target
    return transform


def create_model(num_classes):
    """Создает модель SSD300 с VGG16 backbone.

    Args:
        num_classes (int): Количество классов (включая background)

    Returns:
        torch.nn.Module: Модель SSD300
    """
    model = torchvision.models.detection.ssd300_vgg16(
        weights=SSD300_VGG16_Weights.DEFAULT
    )
    in_channels = [512, 1024, 512, 256, 256, 256]
    num_anchors = model.anchor_generator.num_anchors_per_location()
    model.head = SSDHead(in_channels, num_anchors, num_classes)
    return model.to(Config.DEVICE)


def train_one_epoch(model, optimizer, data_loader, device, epoch):
    """Обучение модели на одной эпохе.

    Args:
        model (torch.nn.Module): Модель для обучения
        optimizer (torch.optim.Optimizer): Оптимизатор
        data_loader (DataLoader): Загрузчик данных
        device (torch.device): Устройство для обучения
        epoch (int): Номер текущей эпохи

    Returns:
        float: Среднее значение функции потерь на эпохе
    """
    model.train()
    total_loss = 0
    progress = tqdm(data_loader, desc=f"Epoch {epoch}")

    for images, targets in progress:
        images = [img.to(device) for img in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())

        optimizer.zero_grad()
        losses.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 2.0)
        optimizer.step()

        total_loss += losses.item()
        progress.set_postfix({"loss": losses.item()})

    return total_loss / len(data_loader)


def create_kfold_split():
    """Создает K-Fold разбиение данных.

    Returns:
        tuple: (train_image_ids, val_image_ids) - списки ID изображений
    """
    # Загружаем аннотации
    coco_train = COCO(Config.TRAIN_ANNOTATIONS)
    coco_val = COCO(Config.VAL_ANNOTATIONS)

    # Получаем все image ids
    train_ids = list(sorted(coco_train.imgs.keys()))
    val_ids = list(sorted(coco_val.imgs.keys()))
    all_ids = train_ids + val_ids

    # Создаем KFold разбиение
    kf = KFold(n_splits=Config.K_FOLDS, shuffle=True, random_state=42)
    splits = list(kf.split(all_ids))

    # Получаем train/val индексы для текущего фолда
    train_idx, val_idx = splits[Config.CURRENT_FOLD - 1]

    # Получаем соответствующие image ids
    train_image_ids = [all_ids[i] for i in train_idx]
    val_image_ids = [all_ids[i] for i in val_idx]

    return train_image_ids, val_image_ids


def train_with_kfold():
    """Основной цикл обучения с K-Fold кросс-валидацией."""
    # Создаем K-Fold разбиение
    train_image_ids, val_image_ids = create_kfold_split()

    # Создаем датасеты для текущего фолда
    train_dataset = FoldCocoDataset(
        Config.TRAIN_DATA_DIR,
        Config.TRAIN_ANNOTATIONS,
        train_image_ids,
        get_transform()
    )

    val_dataset = FoldCocoDataset(
        Config.VAL_DATA_DIR,
        Config.VAL_ANNOTATIONS,
        val_image_ids,
        get_transform()
    )

    num_classes = len(train_dataset.cat_ids) + 1  # +background
    model = create_model(num_classes)

    # DataLoader
    train_loader = DataLoader(
        train_dataset,
        batch_size=Config.BATCH_SIZE,
        shuffle=True,
        collate_fn=lambda x: tuple(zip(*x)),
        num_workers=2
    )

    val_loader = DataLoader(
        val_dataset,
        batch_size=Config.BATCH_SIZE,
        shuffle=False,
        collate_fn=lambda x: tuple(zip(*x)),
        num_workers=2
    )

    # Оптимизатор
    params = [p for p in model.parameters() if p.requires_grad]
    optimizer = AdamW(params, lr=Config.LEARNING_RATE, weight_decay=0.0005)
    scheduler = StepLR(optimizer, step_size=3, gamma=0.1)

    # Обучение
    for epoch in range(1, Config.NUM_EPOCHS + 1):
        train_loss = train_one_epoch(
            model, optimizer, train_loader, Config.DEVICE, epoch
        )

        print(
            f"Fold {Config.CURRENT_FOLD}, "
            f"Epoch {epoch}/{Config.NUM_EPOCHS} - "
            f"Train Loss: {train_loss:.4f}"
        )

    # Сохранение модели
    os.makedirs(f"fold_{Config.CURRENT_FOLD}", exist_ok=True)
    torch.save(
        model.state_dict(),
        f"fold_{Config.CURRENT_FOLD}/final_model.pth"
    )

    print(f"Обучение для фолда {Config.CURRENT_FOLD} завершено!")
    return model


def evaluate_model(model, dataset):
    """Оценка модели на датасете с использованием метрик COCO.

    Args:
        model (torch.nn.Module): Обученная модель
        dataset (Dataset): Датасет для оценки

    Returns:
        np.array: Массив с метриками оценки
    """
    model.eval()
    results = []
    coco = dataset.coco
    cat_ids = coco.getCatIds()

    for idx in tqdm(range(len(dataset))):
        image, _ = dataset[idx]
        with torch.no_grad():
            prediction = model([image.to(Config.DEVICE)])[0]

        prediction = {k: v.cpu() for k, v in prediction.items()}
        image_info = coco.loadImgs(dataset.ids[idx])[0]

        for box, label, score in zip(
            prediction['boxes'],
            prediction['labels'],
            prediction['scores']
        ):
            results.append({
                "image_id": image_info['id'],
                "category_id": cat_ids[label - 1],
                "bbox": [
                    box[0].item(),
                    box[1].item(),
                    (box[2] - box[0]).item(),
                    (box[3] - box[1]).item()
                ],
                "score": score.item()
            })

    coco_pred = coco.loadRes(results)
    coco_eval = COCOeval(coco, coco_pred, 'bbox')
    coco_eval.evaluate()
    coco_eval.accumulate()
    coco_eval.summarize()

    return coco_eval.stats


def calculate_confidence_intervals(data, confidence=0.95):
    """Вычисляет среднее значение и доверительный интервал.

    Args:
        data (list or np.array): Массив значений
        confidence (float): Уровень доверия (по умолчанию 0.95)

    Returns:
        tuple: (mean, (lower_bound, upper_bound))
    """
    data = np.array(data)
    mean = np.mean(data)
    std = np.std(data, ddof=1)
    n = len(data)

    from scipy import stats
    t = stats.t.ppf((1 + confidence) / 2., n - 1)
    margin = t * std / np.sqrt(n)

    return mean, (mean - margin, mean + margin)


if __name__ == "__main__":
    # Запуск кросс-валидации
    model = train_with_kfold()

    # Оценка модели
    test_dataset = FoldCocoDataset(
        Config.VAL_DATA_DIR,
        Config.VAL_ANNOTATIONS,
        list(range(100)),
        get_transform()
    )

    metrics = evaluate_model(model, test_dataset)
    print("Metrics:", metrics)