<a href="https://colab.research.google.com/github/smaugcow/test_test/blob/main/examples/notebooks/pytorch/byol.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This example requires the following dependencies to be installed:
pip install lightly

In [1]:
# Установка необходимых библиотек
!pip install lightly
!pip install matplotlib



In [2]:
# Импорт необходимых библиотек
import copy
import torch
import torchvision
from torch import nn
from lightly.loss import NegativeCosineSimilarity
from lightly.models.modules import BYOLPredictionHead, BYOLProjectionHead
from lightly.models.utils import deactivate_requires_grad, update_momentum
from lightly.transforms.byol_transform import (
    BYOLTransform,
    BYOLView1Transform,
    BYOLView2Transform,
)
from lightly.utils.scheduler import cosine_schedule
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
import numpy as np
import matplotlib.pyplot as plt



In [3]:
# Определяем класс BYOL (Bootstrap Your Own Latent)
# Это self-supervised learning архитектура, не требующая аннотированных меток для обучения
class BYOL(nn.Module):
    def __init__(self, backbone):
        super().__init__()
        # Определяем базовый свёрточный слой (backbone), используем ResNet-18 с обрезанной головой
        self.backbone = backbone
        # Создаем проекционный и предсказательный головы, обе используют линейные слои с активацией ReLU
        self.projection_head = BYOLProjectionHead(512, 1024, 256)
        self.prediction_head = BYOLPredictionHead(256, 1024, 256)

        # Моментум версии backbone и проекционной головы, которые обучаются медленнее
        self.backbone_momentum = copy.deepcopy(self.backbone)
        self.projection_head_momentum = copy.deepcopy(self.projection_head)

        # Деактивируем вычисление градиентов для моментум структур
        deactivate_requires_grad(self.backbone_momentum)
        deactivate_requires_grad(self.projection_head_momentum)

    # Метод forward для основной модели
    def forward(self, x):
        # Получаем представления данных от backbone
        y = self.backbone(x).flatten(start_dim=1)
        # Пропускаем через проекционную голову
        z = self.projection_head(y)
        # Пропускаем через предсказательную голову
        p = self.prediction_head(z)
        return p

    # Метод forward для моментум версии модели
    def forward_momentum(self, x):
        y = self.backbone_momentum(x).flatten(start_dim=1)
        z = self.projection_head_momentum(y)
        z = z.detach()  # Отключаем градиенты для этого выхода, чтобы не обновлять эту часть сети
        return z

# Инициализация модели с использованием ResNet-18 в качестве backbone
resnet = torchvision.models.resnet18(pretrained=False)
backbone = nn.Sequential(*list(resnet.children())[:-1])  # Убираем последний полносвязный слой
model = BYOL(backbone)

# Использование GPU, если доступен
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)



BYOL(
  (backbone): Sequential(
    (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (4): Sequential(
      (0): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (1): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
 

In [4]:
# Подготовка данных CIFAR10 для обучения
# Используем трансформации для создания двух различных представлений одного и того же изображения
transform = BYOLTransform(
    view_1_transform=BYOLView1Transform(input_size=32, gaussian_blur=0.0),
    view_2_transform=BYOLView2Transform(input_size=32, gaussian_blur=0.0),
)

# Загрузка датасета CIFAR10 с применением BYOL трансформаций
dataset = torchvision.datasets.CIFAR10(
    "datasets/cifar10", download=True, transform=transform
)

# Создание DataLoader для эффективной загрузки данных во время обучения
dataloader = torch.utils.data.DataLoader(
    dataset,
    batch_size=256,  # Использование достаточно большого размера для улучшения стабилизации градиентов
    shuffle=True,  # Перемешивание данных для каждой эпохи для улучшения качества обучения
    drop_last=True,  # Отбрасываем последний батч, если он меньше заявленного размера
    num_workers=8,  # Используем несколько потоков для ускорения загрузки данных
)

# Определение функции потерь: NegativeCosineSimilarity используется для минимизации угловой дистанции
criterion = NegativeCosineSimilarity()

# Использование стохастического градиентного спуска для обновления весов сети
optimizer = torch.optim.SGD(model.parameters(), lr=0.06)

100%|██████████| 170M/170M [00:02<00:00, 66.7MB/s]


In [None]:
# Начало обучения модели
epochs = 10  # Количество эпох, на протяжении которых будет проходить обучение
all_avg_losses = []

print("Starting Training")
for epoch in range(epochs):
    total_loss = 0  # Инициализируем переменную для накопления потерь

    # Косинусное расписание изменения моментума для более плавного обучения
    momentum_val = cosine_schedule(epoch, epochs, 0.996, 1)

    for batch in dataloader:
        x0, x1 = batch[0]  # Получаем два различных представления одного батча изображений

        # Обновляем моментум весов для backbone и проекционной головы
        update_momentum(model.backbone, model.backbone_momentum, m=momentum_val)
        update_momentum(model.projection_head, model.projection_head_momentum, m=momentum_val)

        # Перемещаем данные на устройство (GPU или CPU)
        x0, x1 = x0.to(device), x1.to(device)

        # Прямое распространение для двух представлений
        p0, p1 = model(x0), model(x1)
        # Прямое распространение для моментум версии модели
        z0, z1 = model.forward_momentum(x0), model.forward_momentum(x1)

        # Вычисляем loss как среднее между двумя противоположными парами представлений
        loss = 0.5 * (criterion(p0, z1) + criterion(p1, z0))
        total_loss += loss.item()  # Накопление потерь в целях логирования

        # Шаги обратного распространения и оптимизации
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

    # Вычисляем среднюю потерю за одну эпоху и добавляем её для логирования
    avg_loss = total_loss / len(dataloader)
    all_avg_losses.append(avg_loss)
    print(f"Epoch: {epoch+1}/{epochs}, Avg Loss: {avg_loss:.5f}")

Starting Training


In [None]:
# Построение графика потерь для визуализации обучения
plt.figure(figsize=(10, 6))
plt.plot(range(1, epochs + 1), all_avg_losses, marker='o', color='b', label='Train Loss')
plt.title('Average Loss over Epochs for BYOL Training')
plt.xlabel('Epochs')  # Метка оси x
plt.ylabel('Average Loss')  # Метка оси y
plt.legend()
plt.grid(True)
plt.show()

In [None]:
# Функция для извлечения признаков из данных
def extract_features(dataloader, model):
    model.eval()  # Перевод модели в режим оценки (выключает dropout, batch norm и др.)
    features, labels = [], []
    with torch.no_grad():  # Отключаем градиенты для ускорения и снижения потребления памяти
        for (x, y) in dataloader:
            x = x.to(device)
            feats = model.backbone(x).flatten(start_dim=1)  # Извлекаем признаки и преобразуем их в плоский вектор
            features.append(feats.cpu().numpy())
            labels.extend(y.numpy())
    return np.concatenate(features), np.array(labels)

In [None]:
# Создание данных для KNN классификатора
transform_test = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor(),  # Конвертация изображений в тензоры
    torchvision.transforms.Normalize((0.5,), (0.5,)),  # Нормализация данных для глубоких моделей
])

# Загрузка тренировочного и тестового набора данных с новыми трансформациями
train_dataset_knn = torchvision.datasets.CIFAR10(
    "datasets/cifar10", train=True, transform=transform_test, download=True
)
test_dataset_knn = torchvision.datasets.CIFAR10(
    "datasets/cifar10", train=False, transform=transform_test, download=True
)

# Создание DataLoader для загрузки данных в KNN
dataloader_train_knn = torch.utils.data.DataLoader(
    train_dataset_knn, batch_size=256, shuffle=False, num_workers=8)
dataloader_test_knn = torch.utils.data.DataLoader(
    test_dataset_knn, batch_size=256, shuffle=False, num_workers=8)

In [None]:
# Извлечение признаков и обучение KNN классификатора
features_train, labels_train = extract_features(dataloader_train_knn, model)
features_test, labels_test = extract_features(dataloader_test_knn, model)

# Инициализация и обучение KNN классификатора на признаках, извлеченных из SSL модели
knn_ssl = KNeighborsClassifier(n_neighbors=5)  # Используем 5 ближайших соседей
knn_ssl.fit(features_train, labels_train)  # Обучение модели KNN
predictions_ssl = knn_ssl.predict(features_test)  # Предсказания на тестовом наборе данных
accuracy_ssl = accuracy_score(labels_test, predictions_ssl)  # Вычисление точности модели

print(f"Accuracy using SSL features: {accuracy_ssl:.2f}")

# Реализация KNN на сырых данных (без извлеченных признаков)
def flatten_images(dataloader):
    images, labels = [], []
    for x, y in dataloader:
        images.append(x.view(x.size(0), -1).numpy())  # Преобразование изображений в вектор
        labels.extend(y.numpy())
    return np.concatenate(images), np.array(labels)

# Извлечение признаков с сырых изображений
raw_train, raw_train_labels = flatten_images(dataloader_train_knn)
raw_test, raw_test_labels = flatten_images(dataloader_test_knn)

# Инициализация и обучение KNN классификатора на сырых изображениях
knn_raw = KNeighborsClassifier(n_neighbors=5)
knn_raw.fit(raw_train, raw_train_labels)
predictions_raw = knn_raw.predict(raw_test)
accuracy_raw = accuracy_score(raw_test_labels, predictions_raw)  # Вычисление точности для сырых данных

print(f"Accuracy using raw images: {accuracy_raw:.2f}")

In [None]:
# Сравнение и визуализация точности KNN классификаторов
accuracies = [accuracy_ssl, accuracy_raw]
labels = ['SSL Features', 'Raw Images']

plt.figure(figsize=(8, 6))
plt.bar(labels, accuracies, color=['navy', 'gray'])
plt.title('Comparison of KNN Classifier Accuracy')
plt.ylabel('Accuracy')
plt.ylim(0, 1)
plt.grid(axis='y')
plt.show()

# Вывод итоговых результатов и сравнение
print("\nComparison of KNN Classifier Performance:")
print(f"Accuracy with SSL features: {accuracy_ssl:.2f}")
print(f"Accuracy with raw pixels: {accuracy_raw:.2f}")

# Вывод о том, какая из моделей показала лучшую точность
if accuracy_ssl > accuracy_raw:
    print("\nSSL features improved the classification accuracy.")
else:
    print("\nRaw pixels provided better classification results.")