In [28]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Subset
from torchvision import datasets, transforms
from torch.utils.tensorboard import SummaryWriter
import torch.profiler as profiler
import math
import shutil
from pathlib import Path
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report
import pandas as pd
from tqdm import tqdm
from datetime import datetime
import time
import copy
import timm
from PIL import Image
import gc
from sklearn.metrics import f1_score, precision_score, recall_score, classification_report

In [2]:
# Установка фиксированных сидов для воспроизводимости
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.backends.cudnn.deterministic = True


In [3]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [4]:
BATCH_SIZE = 32
NUM_EPOCHS = 10
BASE_LR = 0.001
WARMUP_EPOCHS = 2  # Количество эпох для warmup
NUM_CLASSES = 10
VAL_SPLIT = 0.1
OVERFIT_BATCHES = 3  # Для sanity check

In [5]:
# Метки классов CIFAR10
CIFAR10_CLASSES = ['airplane', 'automobile', 'bird', 'cat', 'deer',
                   'dog', 'frog', 'horse', 'ship', 'truck']

# Пути для сохранения данных
BASE_PATH = Path("./cifar10")
LOGS_PATH = Path("./logs")
RESULTS_PATH = Path("./results")

In [31]:
BASE_PATH.mkdir(exist_ok=True)
LOGS_PATH.mkdir(exist_ok=True)
RESULTS_PATH.mkdir(exist_ok=True)

### Сборка датасета CIFAR10 для CNN

In [32]:
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(32, scale=(0.8, 1.0)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2470, 0.2435, 0.2616))
])

test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2470, 0.2435, 0.2616))
])


In [33]:
# Функция для разделения данных на train/val
def split_dataset(dataset, val_split=0.1):
    dataset_size = len(dataset)
    val_size = int(val_split * dataset_size)
    train_size = dataset_size - val_size

    indices = list(range(dataset_size))
    np.random.shuffle(indices)

    train_indices, val_indices = indices[:train_size], indices[train_size:]

    return train_indices, val_indices

In [34]:
def download_cifar10_imagefolder(root_dir, val_frac=0.1):
    os.makedirs(root_dir, exist_ok=True)

    # Загрузка CIFAR-10
    dataset = datasets.CIFAR10(root="./data", train=True, download=True)

    # Разбиение на train и val
    train_indices, val_indices = split_dataset(dataset, val_frac)

    # Сохранение изображений
    def save_subset(indices, split_name):
        for idx in indices:
            img, label = dataset[idx]
            class_name = dataset.classes[label]
            save_dir = os.path.join(root_dir, split_name, class_name)
            os.makedirs(save_dir, exist_ok=True)

            if not isinstance(img, Image.Image):
                img = transforms.ToPILImage()(img)
            save_path = os.path.join(save_dir, f"{idx}.png")
            img.save(save_path)

    save_subset(train_indices, "train")
    save_subset(val_indices, "val")

    print(f"CIFAR-10 сохранен в {root_dir}/train и {root_dir}/val")
    return root_dir

In [19]:
cifar10_root = download_cifar10_imagefolder(BASE_PATH)

CIFAR-10 сохранен в cifar10/train и cifar10/val


In [39]:
train_dataset = datasets.ImageFolder(root=os.path.join(BASE_PATH, "train"), transform=train_transform)
val_dataset = datasets.ImageFolder(root=os.path.join(BASE_PATH, "val"), transform=train_transform)

In [40]:
overfit_dataset = Subset(train_dataset, list(range(BATCH_SIZE * OVERFIT_BATCHES)))
overfit_loader = DataLoader(
    overfit_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=2,
    pin_memory=True
)

In [42]:


# Загрузка тестового датасета
test_dataset = datasets.CIFAR10(
    root=str(BASE_PATH),
    train=False,
    download=True,
    transform=test_transform
)

# Создаем DataLoaders
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=2,
    pin_memory=True
)

val_loader = DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=2,
    pin_memory=True
)

test_loader = DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=2,
    pin_memory=True
)

## Baseline CNN

In [43]:
# Определение модели CNN
class SimpleCNN(nn.Module):
    def __init__(self, num_classes=10):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(128 * 4 * 4, 512)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(512, num_classes)

    def forward(self, x):
        x = self.pool(F.relu(self.bn1(self.conv1(x))))
        x = self.pool(F.relu(self.bn2(self.conv2(x))))
        x = self.pool(F.relu(self.bn3(self.conv3(x))))
        x = x.view(-1, 128 * 4 * 4)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

## Train pipeline

In [44]:
def train_model(model, dataloaders, criterion, optimizer, scheduler=None, num_epochs=10, experiment_name="default"):
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    writer = SummaryWriter(f'{LOGS_PATH}/{experiment_name}_{timestamp}')
    print(f'{LOGS_PATH}/{experiment_name}_{timestamp}')

    with profiler.profile(
        activities=[
            profiler.ProfilerActivity.CPU,
            profiler.ProfilerActivity.CUDA,
        ],
        schedule=profiler.schedule(wait=1, warmup=1, active=3),
        on_trace_ready=profiler.tensorboard_trace_handler(str(LOGS_PATH)),
        record_shapes=True,
        profile_memory=True,
        with_stack=True
    ) as prof:

      for epoch in range(num_epochs):

          gc.collect()
          torch.cuda.empty_cache() if torch.cuda.is_available() else None

          print(f'Эпоха {epoch}/{num_epochs - 1}')
          print('-' * 10)

          for phase in ['train', 'val']:
              if phase == 'train':
                  model.train()
              else:
                  model.eval()

              running_loss = 0.0
              running_corrects = 0

              with tqdm(dataloaders[phase], unit="batch") as tepoch:
                  for inputs, labels in tepoch:
                      tepoch.set_description(f"Epoch {epoch} - {phase}")

                      inputs = inputs.to(device)
                      labels = labels.to(device)

                      optimizer.zero_grad()

                      with torch.set_grad_enabled(phase == 'train'):
                          outputs = model(inputs)
                          _, preds = torch.max(outputs, 1)
                          loss = criterion(outputs, labels)

                          if phase == 'train':
                              loss.backward()
                              optimizer.step()

                      running_loss += loss.item() * inputs.size(0)
                      running_corrects += torch.sum(preds == labels.data)

                  if phase == 'train' and scheduler is not None:
                      scheduler.step()

              epoch_loss = running_loss / len(dataloaders[phase].dataset)
              epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)

              print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

              writer.add_scalar(f'Loss/{phase}', epoch_loss, epoch)
              writer.add_scalar(f'Accuracy/{phase}', epoch_acc, epoch)
              current_lr = optimizer.param_groups[0]['lr']
              print("curent_lr: ", current_lr)
              writer.add_scalar('learning_rate/epoch', current_lr, epoch)

              prof.step()

              if phase == 'val' and epoch_acc > best_acc:
                  best_acc = epoch_acc
                  best_model_wts = copy.deepcopy(model.state_dict())

    time_elapsed = time.time() - since
    print(f'Обучение завершено за {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Лучшая точность на валидации: {best_acc:.4f}')

    model.load_state_dict(best_model_wts)
    return {"best_model": model,
            "SummaryWriter": writer,
            "best_validation_accuracy": best_acc}

## Evaluation pipeline

In [29]:
def evaluate_model_with_confusion_matrix(model, test_loader, writer):
    model.eval()

    all_predictions = []
    all_targets = []

    with torch.no_grad():
        for inputs, targets in test_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            _, predicted = outputs.max(1)

            all_predictions.extend(predicted.cpu().numpy())
            all_targets.extend(targets.cpu().numpy())

    # Вычисление confusion matrix
    cm = confusion_matrix(all_targets, all_predictions)
    cm_normalized = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]

    cm_df = pd.DataFrame(
        cm,
        index=CIFAR10_CLASSES,
        columns=CIFAR10_CLASSES
    )

    cm_norm_df = pd.DataFrame(
        cm_normalized,
        index=CIFAR10_CLASSES,
        columns=CIFAR10_CLASSES
    )

    # Визуализация
    plt.figure(figsize=(12, 10))

    plt.subplot(1, 2, 1)
    sns.heatmap(cm_df, annot=True, fmt="d", cmap="Blues")
    plt.title("Confusion Matrix")
    plt.ylabel("True Label")
    plt.xlabel("Predicted Label")

    plt.subplot(1, 2, 2)
    sns.heatmap(cm_norm_df, annot=True, fmt=".2f", cmap="Blues")
    plt.title("Normalized Confusion Matrix")
    plt.ylabel("True Label")
    plt.xlabel("Predicted Label")

    plt.tight_layout()

    confusion_matrix_path = RESULTS_PATH / "confusion_matrix.png"
    plt.savefig(confusion_matrix_path)

    writer.add_figure("Confusion Matrix", plt.gcf())

    plt.close()

    report = classification_report(
        all_targets,
        all_predictions,
        target_names=CIFAR10_CLASSES,
        digits=3
    )

    print("\nClassification Report:")
    print(report)

    report_path = RESULTS_PATH / "classification_report.txt"
    with open(report_path, 'w') as f:
        f.write(report)

    # Считаем метрики
    accuracy = sum(1 for p, t in zip(all_predictions, all_targets) if p == t) / len(all_targets)
    f1_macro = f1_score(all_targets, all_predictions, average='macro')
    f1_micro = f1_score(all_targets, all_predictions, average='micro')
    f1_weighted = f1_score(all_targets, all_predictions, average='weighted')

    print(f"\nTest Accuracy: {accuracy*100:.2f}%")
    print("f1_macro", f1_macro)
    print("f1_micro", f1_micro)
    print("f1_mweighted", f1_weighted)

    return cm, accuracy

## Sanity check pipeline

In [64]:
def sanity_check(model, criterion, dataloader, optimizer, num_epochs, experiment_name):
    print("\n--- Starting Sanity Check (Overfitting test) ---")

    result = train_model(model, dataloaders, criterion, optimizer,
                          num_epochs=num_epochs, experiment_name=experiment_name)
    acc = result["best_validation_accuracy"]

    if acc > 98.0:
        print(f"Sanity check passed! Model successfully overfit with {acc:.2f}% accuracy")

    print("--- Sanity Check Completed ---\n")

## Sanity check и обучение CNN

In [8]:
# Шедулер с вормапом и cosineAnnealing
def warmup_then_cosine_scheduler(
    optimizer,
    start_factor,
    warmup_duration,
    total_steps
):
  warmup = optim.lr_scheduler.LinearLR(
    optimizer,
    start_factor=start_factor,
    end_factor=1.0,
    total_iters=warmup_duration,
  )

  cos_annealing = optim.lr_scheduler.CosineAnnealingLR(
      optimizer,
      T_max=total_steps - warmup_duration,
  )

  scheduler = optim.lr_scheduler.SequentialLR(
      optimizer,
      schedulers=[warmup, cos_annealing],
      milestones=[warmup_duration],
  )
  return scheduler

In [None]:
# for sanity check
model = SimpleCNN(num_classes=NUM_CLASSES).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=BASE_LR)

start_factor = 0.1
warmup_duration = len(train_loader) * 2
total_steps = len(train_loader) * NUM_EPOCHS

scheduler = warmup_then_cosine_scheduler(optimizer=optimizer, start_factor=start_factor, warmup_duration=warmup_duration, total_steps=total_steps)

In [None]:
dataloaders = {"train": DataLoader(
                            overfit_dataset,
                            batch_size=BATCH_SIZE,
                            shuffle=True,
                            num_workers=2,
                            pin_memory=True
                            ),
                   "val": DataLoader(
                          overfit_dataset,
                          batch_size=BATCH_SIZE,
                          shuffle=True,
                          num_workers=2,
                          pin_memory=True
                          )}

sanity_check(model, criterion, dataloaders, optimizer, 50, "cnn_sanity_check")

In [46]:
# for train
model = SimpleCNN(num_classes=NUM_CLASSES).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=BASE_LR)

start_factor = 0.1
warmup_duration = len(train_loader) * 2
total_steps = len(train_loader) * NUM_EPOCHS

scheduler = warmup_then_cosine_scheduler(optimizer=optimizer, start_factor=start_factor, warmup_duration=warmup_duration, total_steps=total_steps)
dataloaders = {"train": train_loader, "val": val_loader}

In [None]:
result = train_model(model, dataloaders, criterion, optimizer, scheduler=scheduler,
                          num_epochs=NUM_EPOCHS, experiment_name="cnn_main")

logs/cnn_main_20251002_201159
Эпоха 0/9
----------


Epoch 0 - train: 100%|██████████| 1407/1407 [00:41<00:00, 33.58batch/s]


train Loss: 1.5011 Acc: 0.4591
curent_lr:  0.00010031982942430704


Epoch 0 - val: 100%|██████████| 157/157 [00:03<00:00, 46.16batch/s]


val Loss: 1.2483 Acc: 0.5554
curent_lr:  0.00010031982942430704
Эпоха 1/9
----------


Epoch 1 - train: 100%|██████████| 1407/1407 [01:17<00:00, 18.13batch/s]


train Loss: 1.1835 Acc: 0.5788
curent_lr:  0.00010063965884861408


Epoch 1 - val: 100%|██████████| 157/157 [00:09<00:00, 17.33batch/s]


val Loss: 1.0569 Acc: 0.6232
curent_lr:  0.00010063965884861408
Эпоха 2/9
----------


Epoch 2 - train: 100%|██████████| 1407/1407 [01:27<00:00, 16.07batch/s]

In [41]:
cm, test_accuracy = evaluate_model_with_confusion_matrix(model, test_loader, result["SummaryWriter"])


Classification Report:
              precision    recall  f1-score   support

    airplane      0.762     0.777     0.769      1000
  automobile      0.823     0.858     0.840      1000
        bird      0.731     0.528     0.613      1000
         cat      0.634     0.433     0.515      1000
        deer      0.671     0.703     0.687      1000
         dog      0.552     0.764     0.641      1000
        frog      0.770     0.856     0.811      1000
       horse      0.788     0.795     0.791      1000
        ship      0.885     0.788     0.834      1000
       truck      0.765     0.835     0.799      1000

    accuracy                          0.734     10000
   macro avg      0.738     0.734     0.730     10000
weighted avg      0.738     0.734     0.730     10000


Test Accuracy: 73.37%


## Подготовка датасета для Vit

In [9]:
mean = (0.4914, 0.4822, 0.4465)
std = (0.2470, 0.2435, 0.2616)

train_transform_vit = transforms.Compose([
    transforms.Resize(224),  # Масштабируем до размера, который ожидает ViT
    transforms.RandomCrop(224, padding=16),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean, std)
])

test_transform_vit = transforms.Compose([
    transforms.Resize(224),  # Масштабируем до размера, который ожидает ViT
    transforms.ToTensor(),
    transforms.Normalize(mean, std)
])



In [10]:
train_dataset_vit = datasets.ImageFolder(root=os.path.join(BASE_PATH, "train"), transform=train_transform_vit)
val_dataset_vit = datasets.ImageFolder(root=os.path.join(BASE_PATH, "val"), transform=train_transform_vit)

In [11]:
overfit_dataset_vit = Subset(train_dataset_vit, list(range(BATCH_SIZE * OVERFIT_BATCHES)))
overfit_loader_vit = DataLoader(
    overfit_dataset_vit,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=2,
    pin_memory=True
)

In [12]:


# Создаем DataLoaders
train_loader_vit = DataLoader(
    train_dataset_vit,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=2,
    pin_memory=True
)

val_loader_vit = DataLoader(
    val_dataset_vit,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=2,
    pin_memory=True
)

## ViT Init, sanity check and train

In [17]:
model_vit = timm.create_model('vit_tiny_patch16_224', pretrained=True)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


### sanity check

In [27]:

for param in model_vit.parameters():
    param.requires_grad = False
embedding_size = model_vit.head.in_features

model_vit.head = nn.Linear(embedding_size, NUM_CLASSES)

for param in model_vit.head.parameters():
    param.requires_grad = True
model_vit = model_vit.to(device)

In [30]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model_vit.parameters(), lr=0.01)

start_factor = 0.1
warmup_duration = 2
total_steps = NUM_EPOCHS

scheduler = warmup_then_cosine_scheduler(optimizer=optimizer, start_factor=start_factor, warmup_duration=warmup_duration, total_steps=total_steps)

In [None]:
dataloaders = {"train": DataLoader(
                            overfit_dataset_vit,
                            batch_size=BATCH_SIZE,
                            shuffle=True,
                            num_workers=2,
                            pin_memory=True
                            ),
                   "val": DataLoader(
                          overfit_dataset_vit,
                          batch_size=BATCH_SIZE,
                          shuffle=True,
                          num_workers=2,
                          pin_memory=True
                          )}

sanity_check(model_vit, criterion, dataloaders, optimizer, 50, "vit_sanity_check")

### train

In [17]:
model_vit = timm.create_model('vit_tiny_patch16_224', pretrained=True)
for param in model_vit.parameters():
    param.requires_grad = False
embedding_size = model_vit.head.in_features

model_vit.head = nn.Linear(embedding_size, NUM_CLASSES)

for param in model_vit.head.parameters():
    param.requires_grad = True
model_vit = model_vit.to(device)

criterion = nn.CrossEntropyLoss()
params_to_update = [p for p in model_vit.parameters() if p.requires_grad]
optimizer = optim.Adam(params_to_update, lr=0.01)

start_factor = 0.1
warmup_duration = 1
total_steps = 1

scheduler = optim.lr_scheduler.LinearLR(
    optimizer,
    start_factor=start_factor,
    end_factor=1.0,
    total_iters=warmup_duration,
  )

In [18]:
dataloaders_vit = {"train": train_loader_vit, "val": val_loader_vit}

In [19]:
gc.collect()
torch.cuda.empty_cache()

In [20]:
result_vit = train_model(model_vit, dataloaders_vit, criterion, optimizer, scheduler=scheduler,
                          num_epochs=1, experiment_name="vit_main")

logs/vit_main_20251002_195047
Эпоха 0/0
----------


Epoch 0 - train: 100%|██████████| 1407/1407 [01:58<00:00, 11.88batch/s]


train Loss: 0.6482 Acc: 0.7818
curent_lr:  0.01


Epoch 0 - val: 100%|██████████| 157/157 [00:14<00:00, 10.56batch/s]


val Loss: 0.5348 Acc: 0.8120
curent_lr:  0.01
Обучение завершено за 2m 14s
Лучшая точность на валидации: 0.8120


In [21]:
test_dataset_vit = datasets.CIFAR10(
    root=str(BASE_PATH),
    train=False,
    download=True,
    transform=test_transform_vit
)

100%|██████████| 170M/170M [00:26<00:00, 6.39MB/s]


In [24]:
test_loader_vit = DataLoader(
    test_dataset_vit,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=2,
    pin_memory=True
)

### eval

In [30]:
cm, test_accuracy = evaluate_model_with_confusion_matrix(model_vit, test_loader_vit, result_vit["SummaryWriter"])


Classification Report:
              precision    recall  f1-score   support

    airplane      0.823     0.818     0.820      1000
  automobile      0.890     0.867     0.878      1000
        bird      0.845     0.763     0.802      1000
         cat      0.803     0.565     0.663      1000
        deer      0.820     0.789     0.804      1000
         dog      0.651     0.860     0.741      1000
        frog      0.786     0.929     0.852      1000
       horse      0.883     0.814     0.847      1000
        ship      0.866     0.883     0.874      1000
       truck      0.850     0.864     0.857      1000

    accuracy                          0.815     10000
   macro avg      0.822     0.815     0.814     10000
weighted avg      0.822     0.815     0.814     10000


Test Accuracy: 81.52%
f1_macro 0.8138462249930718
f1_micro 0.8152
f1_mweighted 0.8138462249930717
