 # Реализация визуального трансформера

 Напишем свой визуальный трансформер для бинарной классификации картинок с кошками и собаками. Попробуем реализовать модель, которая принимает на вход картинку с произвольными размерами.

In [17]:
from google.colab import drive
drive.mount('/content/drive')
import torch
from torch import nn
import torch.nn.functional as F

import math

from PIL import Image
from torchvision.transforms import v2

from torch.utils.data import Dataset, DataLoader
import os

from tqdm import tqdm

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Скачиваем [датасет](https://www.kaggle.com/datasets/tongpython/cat-and-dog)

In [18]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("tongpython/cat-and-dog")

## 1. Построение модели

Изображение сначала нужно разбить на токены. Каждый токен - это часть изображения, представленная в виде ембеддинга.

Напишем Токенизатор для разбиения изображения на токены. В нем мы используем сверточный слой, размер ядра которого равен размеру патча (фрагмента) картинки, а количество выходных каналов - размеру эмбединга. Предполагается, что размеры картинки нацело делятся на размер патча. В ембедингах не будет информации о позиции патча.

In [19]:
class Tokenizer(nn.Module):
	"""
  Класс для разбиения картинки на токены
	"""

	def __init__(self, embed_size: int, patch_size: int):
		"""
		embed_size - размер эмбеддинга
		patch_size - размер патча в пикселях
		"""
		super().__init__()

		self.embed_size = embed_size
		self.patch_size = patch_size

		self.conv = nn.Conv2d(
			in_channels=3,
			out_channels=self.embed_size,
			kernel_size=self.patch_size,
			stride=self.patch_size
		)


	def forward(self, x):
		"""
		Разбивает картинку на токены

		x - тензор картинок (тоже в виде тензора) размерностью (n_samples, n_channels, h_pixels, w_pixels)

		возвращает тензор размера (n_samples, h_patches, w_patches, embedding_size)
		"""

		x = self.conv(x).transpose(1,3).transpose(2,1)

		return x

Далее нужно добавить к эмбеддингам информацию о положении патча на картинке. Для этого возьмем `Positional Encoding Generator` из статьи [Conditional Positional Encodings for Vision Transformers](https://arxiv.org/pdf/2102.10882v2). Этот способ кодировки позволяет модели обрабатывать картинки произвольного размера. В нем кодировка хранит информацию о ближайших соседях токена.

In [20]:
class PositionalEncodingGenerator(nn.Module):
	"""
	Двумерный positional encoding из статьи Conditional Positional Encodings for Vision Transformers
	"""

	def __init__(self, embed_size: int, kernel_size: int):
		"""
		embed_size - размер эмбединга токена
		kernel_size - размер куба, в котором рассматриваются ближайшие соседи, должен быть нечетным
		"""
		super().__init__()

		if kernel_size % 2 == 0:
			raise ValueError("kernel_size должен быть нечетным!")

		self.conv = nn.Conv2d(
			in_channels=embed_size,
			out_channels=embed_size,
			kernel_size=kernel_size,
			stride=1,
			padding = (kernel_size - 1) // 2
		)


	def forward(self, x):
		"""
		Добавляет к эмбеддингам патчей информацию об их положении

		x - тензор эмбеддингов патчей размерностью (n_samples, h_patches, w_patches, embedding_size)

		возвращает новый тензор с эмбеддингами того же размера
		"""

		# считаем позиционные ембеддинги
		pos_x = x.transpose(2,1).transpose(1,3)
		pos_x = self.conv(pos_x).transpose(1,3).transpose(2,1)

		# складываем позиционные ембеддинги с эмбеддингами токенов
		return x + pos_x

Мы преобразовали картинку в токены и теперь можем приступать к написанию основной части трансформара. Сначала реализуем механизм `multi-head self attention`, в котором эмбеддинги насыщаются нформацией о связях с другими эмбеддингами.

In [21]:
class MultiHeadAttention(nn.Module):
	"""
	Класс реализации  multi-head self attention
	"""

	def __init__(self, emb_size: int, heads: int):
		"""
		emb_size - размер эмбеддингов
		heads - количество голов (видов связей между токенами)
		"""
		super().__init__()
		self.k, self.heads = emb_size, heads

		# преобразования каждого эмбеддинга в heads векторов Q, K и V
		self.tokeys    = nn.Linear(emb_size, emb_size * heads, bias=False)  # Wk
		self.toqueries = nn.Linear(emb_size, emb_size * heads, bias=False)  # Wq
		self.tovalues  = nn.Linear(emb_size, emb_size * heads, bias=False)  # Wv

		# преобразование heads эмбеддингов обратно в 1 эмбеддинг
		self.lin = nn.Linear(heads * emb_size, emb_size)


	def forward(self, x):
		"""
		Обработать последовательность

		x - тензор с эмбеддингами токенов вида (n_samples, embedding_size)

		возвращает обработанный тензор того же вида
		"""

		b, t, k = x.size()
		h = self.heads

		# считаем матрицы Q, K и V
		queries = self.toqueries(x).view(b, t, h, k)
		keys    = self.tokeys(x).view(b, t, h, k)
		values  = self.tovalues(x).view(b, t, h, k)

		queries = queries.transpose(1, 2).contiguous().view(b * h, t, k)
		keys    = keys.transpose(1, 2).contiguous().view(b * h, t, k)
		values  = values.transpose(1, 2).contiguous().view(b * h, t, k)

		# считаем веса каждого токена для друг друга
		dot = torch.bmm(queries, keys.transpose(1, 2)) / (k ** (1/4))
		dot = F.softmax(dot, dim=2)

		# считаем взвешанную сумму
		out = torch.bmm(dot, values).view(b, h, t, k)
		out = out.transpose(1, 2).contiguous().view(b, t, h * k)

		# преобразуем обратно в исходную размерность
		out = self.lin(out)
		return out


Имея на руках реализацию механизма внимания можно написать блок трансформера. Он аналогичен енкодеру из статьи `attention is all you need`

In [22]:
class TransformerBlock(nn.Module):
	"""
	Блок трансформера
	"""

	def __init__(self, heads: int, embed_size: int, hiden_size: int):
		"""
		heads - количество голов (видов связей) для self attention
		embed_size - размер эмбеддингов токенов
		hiden_size - размер скрытого слоя полносвязной части блока
		"""
		super().__init__()

		self.hiden_size = hiden_size

		self.norm1 = nn.LayerNorm(embed_size)   # нормализация после self attention
		self.attention = MultiHeadAttention(
			embed_size,
			heads,
		)
		self.norm2 = nn.LayerNorm(embed_size)   # нормализация после полносвязной части

		self.mlp = nn.Sequential(
			nn.Linear(embed_size, hiden_size),
			nn.GELU(),
			nn.LayerNorm(hiden_size),
			nn.Linear(hiden_size, embed_size),
		)


	def forward(self, x):
		"""
		Обработка последовательности

		x - тензор с эмбеддингами токенов вида (n_samples, embedding_size)

		возвращает обработанный тензор того же вида
		"""

		x = self.attention(x) + x
		x = self.norm1(x)

		x1 = self.mlp(x)
		x = x + x1
		x = self.norm2(x)

		return x


У нас есть все части трансформера, напишем класс самой модели. В нем мы, вместо использования специального токена для классификации, полсе прохождения блоков трансформера делаем `Global Avarage Polling` всех токенов, как предложено в `Conditional Positional Encodings for Vision Transformers`, и затем по получившемуся среднему эмбеддингу делаем предсказание.

In [24]:
class VisionTransformer(nn.Module):
	"""
	Визуальный трансформер
	"""

	def __init__(self, blocks: int, heads: int, embed_size: int, hiden_size: int, patch_size: int, kernel_size: int):
		"""
		blocks - количество блоков рансформара
		embed_size - размер эмбеддинга токенов изображения
		hiden_size - размер скрытого слоя полносвязной части в блоке транформера
		patch_size - размер патча в пикселях
		kernel_size - размер куба, в котором рассматриваются ближайшие соседи, должен быть нечетным
		"""
		super().__init__()

		self.tokenizer = Tokenizer(embed_size, patch_size)
		self.pos_encoding = PositionalEncodingGenerator(embed_size, kernel_size)

		self.first_block = TransformerBlock(
								heads,
								embed_size,
								hiden_size
							)

		self.blocks = [
			TransformerBlock(
				heads,
				embed_size,
				hiden_size
			)
			for _ in range(blocks - 1)
		]

		# нормализация и линейный слой для предсказания класса
		self.norm = nn.LayerNorm(embed_size)
		self.head = nn.Linear(embed_size, 1)

		self.sigmoid = nn.Sigmoid()
		self.loss = nn.BCELoss()


	# переопределяем метод to, так как стандартный не переводит блоки и скрытые ембеддинги на device
	def to(self, device):
		for block in self.blocks:
			block.to(device)

		return super().to(device)

	def forward(self, x):
		"""
		прямой проход модели

		x - тензор картинок (тоже в виде тензора) размерностью (n_samples, n_channels, h_pixels, w_pixels)

		возвращает тензор с предсказаниями
		"""

		# превращаем картинку в последовательность токенов
		x = self.tokenizer(x)

		# раскладываем токены в ряд и пропускаем через первый блок
		batch_size, h, w, embed_size = x.shape
		x = x.reshape(batch_size, h * w, embed_size)
		x = self.first_block(x)

		# складываем токены обратно в матрицу и добавляем positional encoding
		x = x.reshape(batch_size, h, w, embed_size)
		x = self.pos_encoding(x)

		# раскладываем токены в ряд
		x = x.reshape(batch_size, h * w, embed_size)

		# проходим блоки
		for block in self.blocks:
			x = block(x)

		# делаем GAP и считаем предсказание
		x = self.head(x.mean(dim=1))

		return self.sigmoid(x)


 ## 2. Подготовка датасета

 Выше было сказано, что класс `Tokenizer` предполагает, что ему на вход подается картинка, размеры которой кратны размеру патча, то есть всю картинку можно разбить на непересекающиеся патчи без потери информации. Конечно, на деле далеко не всегда размер картинки будет такой, так что перед токенизацией картинки мы увеличим ее размер, добавив черные рамки справа и снизу, чтобы ее размеры удовлетворяли условию выше. Напишем функцию для изменения размера изображения.

In [26]:
def pad_img(image: torch.Tensor, patch_size: int) -> torch.Tensor:
    """
    Функция добавляет черные рамки справа и снизу, чтобы картинку можно было разбить на патчи

    аргументы:
      image - картинка в виде тензора размером (n_channels, h_pixels, w_pixels)
      patch_size - размер патча в пикселях

    возвращает картинку в виде тензора размером (n_channels, h_pixels, w_pixels)
    """

    channels, height, width = image.shape

    # Вычисляем количество патчей по каждой координате
    num_patches_h = math.ceil(height / patch_size)
    num_patches_w = math.ceil(width / patch_size)

    # Вычисляем необходимый паддинг
    padded_height = num_patches_h * patch_size
    padded_width = num_patches_w * patch_size

    # Создаем новый тензор с паддингом
    padded_image = torch.zeros(channels, padded_height, padded_width, device=image.device)
    padded_image[:, :height, :width] = image

    return padded_image


Определим класс датасета и заложим в него необходимые преобразования картинки

In [27]:
class CatsDogsDataSet(Dataset):
	"""
	Класс датасета картинок кошек и собак
	"""

	def __init__(self, path: str, patch_size = 32):
		"""
		path - путь к датасету
		patch_size - размер патча
		"""

		self.patch_size = patch_size
		self.path = path

		self.data_list = []     # список пар (путь к картинке, класс картинки)

		# перебираем папки и файлы по указанному пути
		for folder in os.walk(path):

			for file in folder[2]:

				# файл _DS_Store пропускаем
				if file == "_DS_Store":
					continue

				# добавляем файл и его класс в список
				self.data_list.append((folder[0] + "/" + file, torch.tensor([1.0]) if folder[0][-2] == "t" else torch.tensor([0.0])))

		self.to_tensor = v2.Compose(
			[
				v2.ToTensor(),
				v2.ToDtype(torch.float32, scale=True),
			]
		)
		self.norm = v2.Normalize(mean=(0.5,), std=(0.5,))


	def __len__(self):
		return len(self.data_list)


	def __getitem__(self, index: int):
		'''
		index - индекс семпла датасета
		'''

		# открываем картинку
		file_path, target = self.data_list[index]
		sample = Image.open(file_path)

		# преобразуем в тензор, добавляем рамки и нормализуем
		sample = self.to_tensor(sample)
		sample = pad_img(sample, self.patch_size)
		sample = self.norm(sample)

		return sample, target

Создаем объекты класса датасета для тренеровочной и валидационной выборок

In [28]:
train_data = CatsDogsDataSet(path + "/training_set/training_set", 32)
test_data = CatsDogsDataSet(path  + "/test_set/test_set", 32)



Создаем лоадеры для загрузки картинок по батчам. Размер батча равен единице, так как у картинок разный размер

In [29]:
train_loader = DataLoader(train_data, batch_size=1, shuffle=True)
test_loader = DataLoader(test_data, batch_size=1, shuffle=False)

## 3. Обучение модели

In [30]:
device ="cuda" if torch.cuda.is_available() else "gpu"
device

'cuda'

Создаем модель

In [31]:
model = VisionTransformer(
    blocks=8,
    heads=16,
    embed_size=768,
    hiden_size=1024,
    patch_size=32,
    kernel_size=5
)

model = model.to(device)

Обучаем модель

In [34]:
# Оптимизатор Adam
opt = torch.optim.Adam(model.parameters(), lr=0.0005)

epochs = 5        # количество эпох

train_acc = []    # массив accuracy на тренеровочной выборке по эпохам
val_acc = []      # массив accuracy на валидационной выборке по эпохам

train_loss = []   # массив лоссов на тренеровочной выборке по эпохам
val_loss = []     # массив лоссов на валидационной выборке по эпохам

best_acc = 0      # лучший accuracy на валидационной выборке

for epoch in range(epochs):

	model.train()

	train_loop = tqdm(train_loader, leave=False)

	cur_acc = 0
	train_loss_list = []

	for sample, target in train_loop:

		sample = sample.to(device)
		target = target.to(device)

		pred = model(sample)
		loss = model.loss(pred, target)
		train_loss_list.append(loss.item())

		opt.zero_grad()
		loss.backward()
		opt.step()

		cur_acc += int(int(pred.item() >= 0.5) == target.item())
		mean_train_loss = sum(train_loss_list) / len(train_loss_list)

		train_loop.set_description(f'Epoch {epoch + 1}/{epochs}, train_loss {mean_train_loss:.4f}')

	train_acc.append(cur_acc / (len(train_loader) * train_loader.batch_size))
	train_loss.append(mean_train_loss)

	model.eval()

	with torch.no_grad():

		cur_acc = 0
		val_loss_list = []

		for sample, target in test_loader:

			sample = sample.to(device)
			target = target.to(device)

			pred = model(sample)
			loss = model.loss(pred, target)
			val_loss_list.append(loss.item())

			cur_acc += int(int(pred.item() >= 0.5) == target.item())
			mean_val_loss = sum(val_loss_list) / len(val_loss_list)

	val_loss.append(mean_val_loss)
	val_acc.append(cur_acc / (len(test_loader) * test_loader.batch_size))

	if val_acc[-1] > best_acc:
		filename = f"model_acc_{val_acc[-1]:.3f}".replace(".", "") + ".pt"
		torch.save(model.state_dict(), filename)
		drive_path = f"/content/drive/MyDrive/{filename}"
		torch.save(model.state_dict(), drive_path)

		best_acc = val_acc[-1]

	print(f'Epoch {epoch + 1}/{epochs}, train loss {train_loss[-1]:.4f}, val loss {val_loss[-1]:.4f}, train accuracy {train_acc[-1]:.4f}, val accuracy {val_acc[-1]:.4f}')



Epoch 1/5, train loss 0.7125, val loss 0.6950, train accuracy 0.5283, val accuracy 0.5329




Epoch 2/5, train loss 0.7094, val loss 0.7108, train accuracy 0.5332, val accuracy 0.5245




Epoch 3/5, train loss 0.7099, val loss 0.6915, train accuracy 0.5339, val accuracy 0.5487




Epoch 4/5, train loss 0.7122, val loss 0.6983, train accuracy 0.5328, val accuracy 0.5437




Epoch 5/5, train loss 0.7089, val loss 0.6967, train accuracy 0.5408, val accuracy 0.5630


Предсказание класса по изображению

In [39]:
from PIL import Image
import torch
import torchvision.transforms.v2 as v2

image_path = "/content/drive/MyDrive/images/dog.png"


patch_size = 32


to_tensor = v2.Compose([
    v2.ToTensor(),
    v2.ToDtype(torch.float32, scale=True),
])
norm = v2.Normalize(mean=(0.5,), std=(0.5,))

img = Image.open(image_path).convert("RGB")
img_tensor = to_tensor(img)
img_tensor = pad_img(img_tensor, patch_size)
img_tensor = norm(img_tensor)

# Добавляем batch размер и отправляем на устройство
img_tensor = img_tensor.unsqueeze(0).to(device)  # (1, C, H, W)

# Переводим модель в режим оценки
model.eval()

# Предсказание
with torch.no_grad():
    pred = model(img_tensor)
    predicted_class = int(pred.item() >= 0.5)

# Вывод результата
print(f"Предсказанный класс: {predicted_class} ({'кошка' if predicted_class == 1 else 'собака'})")

Предсказанный класс: 0 (собака)
