In [1]:
# Detect whether the notebook is running inside Google Colab by probing for
# the `google.colab` package, and mount Google Drive when it is available.
try:
    from google.colab import drive
    # zipfile is imported only on this path; later cells use it only under
    # `if in_colab:`.
    import zipfile
    drive.mount('/content/drive')
    in_colab = True
except ImportError:
    # An ImportError anywhere in the try body (normally the missing
    # google.colab package) marks this as a non-Colab run.
    in_colab = False
print("In Colab:", in_colab)

In Colab: False


In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import os
from tqdm.notebook import tqdm
import datetime
import numpy as np

# Dataset from https://visualqa.org/download.html
class InitialImageDataset(Dataset):
    """Flat-folder image dataset that yields images only (no labels).

    Attributes:
        root_dir: Folder that was scanned for images.
        transform: Optional callable applied to every loaded PIL image.
        image_paths: Paths of the discovered images, sorted by file name so
            that an index refers to the same file on every run.
    """

    # Dotted suffixes, so a name such as "fakepng" is not mistaken for an image.
    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.bmp')

    def __init__(self, root_dir, transform=None):
        """Collect the image files located directly inside ``root_dir``.

        Args:
            root_dir: Folder containing the images (not scanned recursively).
            transform: Optional callable applied to each RGB image.
        """
        self.root_dir = root_dir
        self.transform = transform
        # os.listdir returns entries in arbitrary order; sorting keeps the
        # index -> file mapping stable between runs and machines.
        self.image_paths = [
            os.path.join(root_dir, fname)
            for fname in sorted(os.listdir(root_dir))
            if fname.lower().endswith(self.IMAGE_EXTENSIONS)
        ]

    def __len__(self):
        """Return the number of discovered images."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load image ``idx`` as RGB and apply ``transform`` when one is set."""
        img_path = self.image_paths[idx]
        # The context manager closes the file handle right away; convert()
        # returns an independent, fully loaded image.
        with Image.open(img_path) as source:
            image = source.convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image

# Encoder architecture
class Encoder(nn.Module):
    """Convolutional encoder mapping an RGB image to a latent vector in (0, 1).

    Three stride-2 convolutions each halve the spatial size, so a square
    input of side ``input_size`` ends up as a 256-channel map of side
    ``input_size // 8``, which is flattened and projected to ``latent_dim``.

    Args:
        latent_dim: Length of the produced latent vector.
        input_size: Side length of the (square) input images. The default of
            128 reproduces the original fixed 256 * 16 * 16 flatten size.

    Raises:
        ValueError: If ``input_size`` is smaller than 8.
    """

    def __init__(self, latent_dim=512, input_size=128):
        super().__init__()
        if input_size < 8:
            raise ValueError("input_size must be at least 8 (three stride-2 convolutions).")

        # Submodule names and Sequential indices are part of the state_dict
        # keys; they are kept unchanged so existing checkpoints still load.
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(0.2),
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2),
            nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.2),
        )

        # With kernel 4 / stride 2 / padding 1 each convolution maps a side
        # of n to n // 2, hence input_size // 8 after three of them.
        feature_size = input_size // 8
        self.flatten_size = 256 * feature_size * feature_size

        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(self.flatten_size, latent_dim),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Encode a batch of images into latent vectors of length ``latent_dim``."""
        x = self.encoder(x)
        return self.fc(x)

# Decoder architecture
class Decoder(nn.Module):
    """Transposed-convolution decoder turning a latent vector into an image.

    The latent vector is projected to a 256 x 16 x 16 feature map and then
    upsampled by three stride-2 transposed convolutions (each doubles the
    side) to a 3 x 128 x 128 output squashed into (0, 1) by a sigmoid.
    """

    def __init__(self, latent_dim=512):
        super().__init__()
        seed_channels, seed_side = 256, 16

        # Layer order fixes the Sequential indices and therefore the
        # state_dict keys of saved checkpoints.
        layers = [
            nn.Linear(latent_dim, seed_channels * seed_side * seed_side),
            # (disabled) nn.ReLU()
            nn.Unflatten(1, (seed_channels, seed_side, seed_side)),
            nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 3, kernel_size=4, stride=2, padding=1),
            nn.Sigmoid(),
        ]
        self.decoder = nn.Sequential(*layers)

    def forward(self, x):
        """Decode a batch of latent vectors into images."""
        reconstruction = self.decoder(x)
        return reconstruction

# Autoencoder
class Autoencoder(nn.Module):
    """Encoder/decoder pair sharing one latent dimensionality."""

    def __init__(self, latent_dim=512):
        super().__init__()
        self.encoder = Encoder(latent_dim)
        self.decoder = Decoder(latent_dim)

    def forward(self, x):
        """Return ``(reconstruction, latent)`` for the input batch ``x``."""
        code = self.encoder(x)
        return self.decoder(code), code

# Training loop with a checkpoint saved after every epoch
def train_autoencoder(autoencoder, dataloader, start_epoch=0, epochs=10, lr=0.001, save_dir="saved_models"):
    """Train ``autoencoder`` with Adam on an MSE reconstruction loss.

    After every epoch the model's ``state_dict`` is written to ``save_dir``
    as ``autoencoder_<timestamp>_epoch_<n>.pt``.

    Args:
        autoencoder: Module whose forward pass returns ``(reconstruction, latent)``.
        dataloader: Sized iterable yielding batches of image tensors.
        start_epoch: Number of epochs already trained; only offsets the epoch
            numbers used in the progress bar, log lines and file names.
        epochs: Number of epochs to run in this call.
        lr: Adam learning rate.
        save_dir: Folder for the checkpoints (created when missing).
    """
    optimizer = optim.Adam(autoencoder.parameters(), lr=lr)
    criterion = nn.MSELoss()
    os.makedirs(save_dir, exist_ok=True)

    # Use the device the model already lives on rather than a notebook-level
    # global, so the function does not depend on cell execution order.
    model_device = next(autoencoder.parameters()).device
    num_batches = len(dataloader)

    for epoch in range(epochs):
        epoch_number = start_epoch + epoch + 1
        # The model may have been left in eval mode (extract_latent_vectors
        # switches it); BatchNorm must run in training mode here.
        autoencoder.train()
        epoch_loss = 0.0
        with tqdm(dataloader, desc=f"Epoch {epoch_number}/{start_epoch + epochs}", unit="batch") as tqdm_dataloader:
            for imgs in tqdm_dataloader:
                imgs = imgs.to(model_device)
                reconstructed, _ = autoencoder(imgs)
                loss = criterion(reconstructed, imgs)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                batch_loss = loss.item()
                epoch_loss += batch_loss
                tqdm_dataloader.set_postfix(loss=batch_loss)

        # Save the model; the file name carries the time and the epoch number
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        model_filename = f"autoencoder_{timestamp}_epoch_{epoch_number}.pt"
        model_path = os.path.join(save_dir, model_filename)
        torch.save(autoencoder.state_dict(), model_path)
        print(f"Model saved as {model_filename}")

        # An empty dataloader yields NaN instead of a ZeroDivisionError.
        mean_loss = epoch_loss / num_batches if num_batches else float("nan")
        print(f"Epoch {epoch_number} finished with Loss: {mean_loss:.6f}")


# Device, dataset and model setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

if not in_colab:
    # Local run: images are read straight from the local folder.
    dataset_folder = "F:\\!Институт МУИВ\\4 курс\\Clagnosco\\data\\train2014"
else:
    # Colab run: unpack the archive stored on Google Drive into /content/data.
    zip_path = '/content/drive/MyDrive/diploma/data/train2014_lite.zip'
    dataset_folder = "/content/data/train2014_lite"

    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall("/content/data")

# Every image is resized to 128x128 and converted to a tensor.
transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
])

dataset = InitialImageDataset(dataset_folder, transform=transform)
# shuffle=False keeps batch order aligned with dataset.image_paths.
dataloader = DataLoader(dataset, batch_size=64, shuffle=False)

autoencoder = Autoencoder(latent_dim=512).to(device)


In [5]:
# Load the saved model state
model_path = "saved_models/autoencoder_2024-12-24_06-58-14_epoch_32.pt"
if in_colab:
    model_path = "/content/drive/MyDrive/diploma/" + model_path

# map_location="cpu" lets a checkpoint saved on a GPU load on a CPU-only
# machine as well; load_state_dict then copies the tensors onto whatever
# device the model is on. weights_only=True restricts unpickling to tensors
# and plain containers instead of arbitrary Python objects.
autoencoder.load_state_dict(torch.load(model_path, map_location="cpu", weights_only=True))


  autoencoder.load_state_dict(torch.load(model_path))


In [6]:
# Resume training (a checkpoint is saved after every epoch).
# The epoch already reached is parsed from the checkpoint name "..._epoch_<n>.pt".
epoch_suffix = model_path.split("epoch_")[1]
start_epoch = int(epoch_suffix.split(".pt")[0])
train_autoencoder(
    autoencoder,
    dataloader,
    start_epoch=start_epoch,
    epochs=1000,
    save_dir="saved_models",
)

Epoch 33/1032:   0%|          | 0/1294 [00:00<?, ?batch/s]

KeyboardInterrupt: 

In [8]:
# Extract latent features for every image
def extract_latent_vectors(autoencoder, dataloader):
    """Encode every batch of ``dataloader`` and stack the latent vectors.

    The model is switched to eval mode and is left in eval mode on return.

    Args:
        autoencoder: Module exposing an ``encoder`` submodule.
        dataloader: Iterable yielding batches of image tensors.

    Returns:
        NumPy array with the latent vectors of all batches stacked row-wise,
        in the order the batches were yielded.
    """
    autoencoder.eval()
    # Use the device the model already lives on rather than a notebook-level
    # global, so the function does not depend on cell execution order.
    model_device = next(autoencoder.parameters()).device
    latent_vectors = []
    with torch.no_grad():
        for imgs in tqdm(dataloader, desc="Extracting latent vectors"):
            latent = autoencoder.encoder(imgs.to(model_device))
            latent_vectors.append(latent.cpu().numpy())
    return np.vstack(latent_vectors)

# Compute the latent vectors of the whole dataset
latent_vectors = extract_latent_vectors(autoencoder=autoencoder, dataloader=dataloader)

Extracting latent vectors:   0%|          | 0/1294 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [9]:
import random
from matplotlib import pyplot as plt

# Similar-image search
def euclidean_distance(query_vector, latent_vectors):
    """Return the L2 distance from ``query_vector`` to each row of ``latent_vectors``."""
    offsets = latent_vectors - query_vector
    return np.linalg.norm(offsets, ord=2, axis=1)

def manhattan_distance(query_vector, latent_vectors):
    """Return the L1 distance from ``query_vector`` to each row of ``latent_vectors``."""
    offsets = latent_vectors - query_vector
    return np.abs(offsets).sum(axis=1)

def find_similar_images(latent_vectors, query_vector, top_k=10, metric="euclidean"):
    """Return the indices and distances of the ``top_k`` nearest latent vectors.

    Args:
        latent_vectors: Array with one latent vector per row.
        query_vector: Latent vector to compare against every row.
        top_k: Number of nearest rows to return.
        metric: Either "euclidean" or "manhattan".

    Returns:
        Tuple ``(indices, distances)`` ordered by increasing distance.

    Raises:
        ValueError: If ``metric`` is not one of the supported names.
    """
    if metric not in ("euclidean", "manhattan"):
        raise ValueError("Unsupported metric. Use 'euclidean' or 'manhattan'.")

    measure = euclidean_distance if metric == "euclidean" else manhattan_distance
    distances = measure(query_vector, latent_vectors)

    ranking = np.argsort(distances)
    nearest = ranking[:top_k]
    return nearest, distances[nearest]

# Image display helper
def display_images(image_paths, indices, similarities, query_image_path, max_per_row=5):
    """Show the query image followed by a grid of its most similar images.

    The first grid row holds only the query image; the similar images fill
    the following rows, ``max_per_row`` per row, each titled with its rank
    and the matching entry of ``similarities``.

    Args:
        image_paths: Sequence of image file paths indexed by ``indices``.
        indices: Indices into ``image_paths`` of the images to show.
        similarities: Value shown in the title of each similar image,
            aligned with ``indices``.
        query_image_path: Path of the query image.
        max_per_row: Number of grid columns.
    """
    num_similar = len(indices)
    num_rows = (num_similar + max_per_row - 1) // max_per_row
    # squeeze=False keeps `axes` two-dimensional even for a single row or a
    # single column, so the axes[row, col] indexing below always works.
    fig, axes = plt.subplots(
        num_rows + 1,
        max_per_row,
        figsize=(max_per_row * 4, (num_rows + 1) * 4),
        squeeze=False,
    )

    # Hide every axis up front so unused grid cells stay blank.
    for ax in axes.flat:
        ax.axis("off")

    query_image = Image.open(query_image_path).convert("RGB")
    axes[0, 0].imshow(query_image)
    axes[0, 0].set_title("Query Image")
    axes[0, 0].axis("off")

    for i, idx in enumerate(indices):
        row = (i // max_per_row) + 1  # row 0 is reserved for the query image
        col = i % max_per_row
        similar_image = Image.open(image_paths[idx]).convert("RGB")
        axes[row, col].imshow(similar_image)
        axes[row, col].set_title(f"Similar {i + 1}, {similarities[i]}")
        axes[row, col].axis("off")

    plt.tight_layout()
    plt.show()

# Pick a random image from a folder
def get_random_image(query_folder_path):
    """Return the path of a randomly chosen image file in ``query_folder_path``.

    Raises:
        ValueError: If the folder contains no .png/.jpg/.jpeg/.bmp file.
    """
    candidates = []
    for entry in os.listdir(query_folder_path):
        if entry.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp')):
            candidates.append(entry)

    if len(candidates) == 0:
        raise ValueError("No valid image files found in the specified folder.")

    chosen = random.choice(candidates)
    return os.path.join(query_folder_path, chosen)

# Folder the query image is drawn from
if in_colab:
    query_folder_path = "/content/data/train2014_lite"
else:
    query_folder_path = "F:/!Институт МУИВ/4 курс/Clagnosco/data/train2014/"

# Pick a random query image and preprocess it like the dataset images
query_image_path = get_random_image(query_folder_path)
query_image = Image.open(query_image_path).convert("RGB")
query_image_tensor = transform(query_image).unsqueeze(0).to(device)

# Encode the query. eval() makes BatchNorm use its running statistics; in
# training mode a single-image batch would be normalized with its own
# statistics and would also overwrite the running ones. no_grad() skips
# building the autograd graph.
autoencoder.eval()
with torch.no_grad():
    query_vector = autoencoder.encoder(query_image_tensor).cpu().numpy()

# Search for the most similar images
similar_indices, similarities = find_similar_images(latent_vectors, query_vector, top_k=10, metric="euclidean")

# Show the results
display_images(dataset.image_paths, similar_indices, similarities, query_image_path, max_per_row=5)


NameError: name 'latent_vectors' is not defined