<a href="https://colab.research.google.com/github/mhtabkrklt/ML_Tasks/blob/main/product_quantization.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

ДЗ: Реализовать product quantization самостоятельно и посчитать ошибку представления на 2 датасетах:
- сгенерированом так, чтобы в каждом под-пространстве вектора хорошо кластеризовались (центр + белый шум)
- эмбеддингах любого датасета от любой нейронки

In [1]:
import numpy as np
import tqdm
from sklearn.cluster import KMeans
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
import torch
import torchvision
import torchvision.transforms as transforms

np.random.seed(42)
torch.manual_seed(42)

<torch._C.Generator at 0x781eeb717110>

Реализация Product Quantizer

In [2]:
class ProductQuantizer:
    def __init__(self, m: int, nbits: int = 8):
        """
        m: количество подпространств (сегментов)
        nbits: количество бит на сегмент (обычно 8 -> 256 кластеров)
        """
        self.m = m
        self.k = 2 ** nbits
        self.codebooks = None
        self.d_s = None # Размерность одного сегмента

    def fit(self, x: np.ndarray):
        n_samples, d = x.shape
        assert d % self.m == 0, f"Размерность вектора {d} должна делиться на m={self.m}"
        self.d_s = d // self.m

        # codebooks: (m, k, d_s) - m книг, в каждой k слов длины d_s
        self.codebooks = np.empty((self.m, self.k, self.d_s), dtype=np.float32)

        print(f"Обучение PQ: Вектор {d} dim -> {self.m} сегментов по {self.d_s} dim.")

        # Для каждого подпространства обучаем свой K-Means
        for i in tqdm.tqdm(range(self.m), desc="Обучение кодовых книг"):
            # Вырезаем i-й сегмент у всех векторов
            x_sub = x[:, i * self.d_s : (i + 1) * self.d_s]

            # Кластеризуем
            kmeans = KMeans(n_clusters=self.k, n_init=5, max_iter=50, random_state=42)
            kmeans.fit(x_sub)

            # Сохраняем центроиды
            self.codebooks[i] = kmeans.cluster_centers_

    def encode(self, x: np.ndarray) -> np.ndarray:
        n_samples, d = x.shape
        codes = np.empty((n_samples, self.m), dtype=np.uint8)

        for i in range(self.m):
            x_sub = x[:, i * self.d_s : (i + 1) * self.d_s]
            codebook = self.codebooks[i]

            # Считаем расстояния: (N, 1, d_s) - (1, K, d_s) -> (N, K)
            # Используем broadcasting для скорости
            dists = np.linalg.norm(x_sub[:, None, :] - codebook[None, :, :], axis=2)

            # Индекс ближайшего центроида
            codes[:, i] = np.argmin(dists, axis=1)

        return codes

    def decode(self, codes: np.ndarray) -> np.ndarray:
        n_samples, m = codes.shape
        d = self.m * self.d_s
        x_reconstructed = np.empty((n_samples, d), dtype=np.float32)

        for i in range(self.m):
            # Извлекаем векторы из книги по индексам
            x_reconstructed[:, i * self.d_s : (i + 1) * self.d_s] = self.codebooks[i][codes[:, i]]

        return x_reconstructed

**Часть 1**: Эксперимент на синтетических данных (Валидация)
На этом этапе мы проводим Sanity Check (проверку корректности) нашего алгоритма. Мы генерируем искусственный набор данных, который по своему строению идеально ложится на гипотезу Product Quantization.

Суть эксперимента: Данные создаются так, чтобы они уже имели скрытую структуру центроидов.
Мы задаем размерность вектора D, количество подпространств M и количество кластеров K.
- В каждом из M подпространств генерируются K случайных центров (идеальных значений).
- Каждый обучающий вектор собирается как конструктор: мы берем по одному случайному центру из каждого подпространства и склеиваем их.
- К полученному "идеальному" вектору добавляется Гауссовский шум.

Ожидаемый результат: Если алгоритм PQ реализован верно, он должен "проигнорировать" добавленный шум и найти исходные центроиды. Ошибка восстановления (MSE) должна быть очень близка к дисперсии добавленного шума.

Синтетические данные

In [3]:
def generate_clustered_data(n_samples, d, m, n_clusters_per_subspace, noise_level=0.1):
    d_s = d // m

    # 1. Генерируем центры кластеров для каждого подпространства
    true_centroids = np.random.randn(m, n_clusters_per_subspace, d_s).astype(np.float32)

    data = np.empty((n_samples, d), dtype=np.float32)

    perfect_data = np.empty((n_samples, d), dtype=np.float32)

    for i in range(m):
        cluster_indices = np.random.randint(0, n_clusters_per_subspace, size=n_samples)

        sub_data = true_centroids[i, cluster_indices, :]
        perfect_data[:, i*d_s : (i+1)*d_s] = sub_data

        noise = np.random.randn(n_samples, d_s) * noise_level
        data[:, i*d_s : (i+1)*d_s] = sub_data + noise

    return data, perfect_data

print("\nЭКСПЕРИМЕНТ 1: Синтетические данные")
D_synth = 128
M_synth = 8
X_synth, X_perfect = generate_clustered_data(n_samples=5000, d=D_synth, m=M_synth, n_clusters_per_subspace=256)

pq_synth = ProductQuantizer(m=M_synth, nbits=8)
pq_synth.fit(X_synth)

codes_synth = pq_synth.encode(X_synth)
X_rec_synth = pq_synth.decode(codes_synth)

mse_synth = mean_squared_error(X_synth, X_rec_synth)
mse_noise = mean_squared_error(X_synth, X_perfect)

print(f"MSE восстановления: {mse_synth:.6f}")
print(f"MSE (уровень шума): {mse_noise:.6f}")
print("Вывод: PQ должен найти центроиды, поэтому ошибка восстановления близка к уровню шума.")


--- ЭКСПЕРИМЕНТ 1: Синтетические данные ---
Обучение PQ: Вектор 128 dim -> 8 сегментов по 16 dim.


Обучение кодовых книг: 100%|██████████| 8/8 [00:11<00:00,  1.44s/it]


MSE восстановления: 0.009872
MSE (уровень шума): 0.010008
Вывод: PQ должен найти центроиды, поэтому ошибка восстановления близка к уровню шума.


**Часть 2**: Реальные данные (Эмбеддинги ResNet-18)
В этом эксперименте мы тестируем алгоритм на задаче сжатия изображений CIFAR-10. Вместо сырых пикселей используются семантические векторы (эмбеддинги) размерностью 512, полученные с выхода предобученной нейросети ResNet-18.

Параметры эксперимента:

Входные данные: Векторы 512 float32 (2048 байт).

Настройки PQ: Разбиение вектора на 16 подпространств (M=16).

Сжатие: Каждый сегмент кодируется 1 байтом. Итоговый размер кода — 16 байт.

Цель: Достичь коэффициента сжатия 128x и оценить ошибку восстановления (MSE).

In [4]:
print("\nЭКСПЕРИМЕНТ 2: Эмбеддинги (ResNet18 + CIFAR-10)")
def get_resnet_embeddings(n_images=2000):
    model = torchvision.models.resnet18(weights='DEFAULT')

    model.fc = torch.nn.Identity()
    model.eval()

    # Препроцессинг для ResNet
    preprocess = transforms.Compose([
        transforms.Resize(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

    dataset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=preprocess)

    loader = torch.utils.data.DataLoader(dataset, batch_size=50, shuffle=True)

    embeddings_list = []
    print("Генерация эмбеддингов с помощью нейросети...")

    with torch.no_grad():
        for i, (images, labels) in enumerate(loader):
            if i * 50 >= n_images: break

            emb = model(images) # Shape: (batch, 512)
            embeddings_list.append(emb.numpy())

    return np.vstack(embeddings_list)

X_emb = get_resnet_embeddings(n_images=2000)
print(f"Получены эмбеддинги формы: {X_emb.shape}")

# Настраиваем PQ
M_emb = 16
D_emb = X_emb.shape[1]

if D_emb % M_emb != 0:
    print("Warning: Размерность не делится на M. Обрезаем.")
    X_emb = X_emb[:, :(D_emb // M_emb) * M_emb]

pq_emb = ProductQuantizer(m=M_emb, nbits=8)
pq_emb.fit(X_emb)

# Считаем ошибку
codes_emb = pq_emb.encode(X_emb)
X_rec_emb = pq_emb.decode(codes_emb)

mse_emb = mean_squared_error(X_emb, X_rec_emb)

# Считаем относительную ошибку
relative_error = np.linalg.norm(X_emb - X_rec_emb) / np.linalg.norm(X_emb)

print(f"MSE на эмбеддингах: {mse_emb:.6f}")
print(f"Относительная ошибка восстановления: {relative_error:.4f}")

original_size_kb = X_emb.nbytes / 1024
compressed_size_kb = codes_emb.nbytes / 1024
print(f"Сжатие: {original_size_kb:.1f} KB -> {compressed_size_kb:.1f} KB (в {original_size_kb/compressed_size_kb:.1f} раз)")


--- ЭКСПЕРИМЕНТ 2: Эмбеддинги (ResNet18 + CIFAR-10) ---
Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth


100%|██████████| 44.7M/44.7M [00:00<00:00, 77.6MB/s]
100%|██████████| 170M/170M [00:04<00:00, 34.4MB/s]


Генерация эмбеддингов с помощью нейросети...
Получены эмбеддинги формы: (2000, 512)
Обучение PQ: Вектор 512 dim -> 16 сегментов по 32 dim.


Обучение кодовых книг: 100%|██████████| 16/16 [00:13<00:00,  1.18it/s]


MSE на эмбеддингах: 0.190406
Относительная ошибка восстановления: 0.3525
Сжатие: 4000.0 KB -> 31.2 KB (в 128.0 раз)
