needed imports, parameters and reproducibility

In [None]:
import torch
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import numpy as np
import random

# --- Configuration ---------------------------------------------------------
DATA_DIR = "data/artbench-10"  # dataset root; the train/ and test/ sub-folders are read from here
IMAGE_SIZE = 64                # images are resized to IMAGE_SIZE x IMAGE_SIZE
BATCH_SIZE = 512
SEED = 42

# --- Reproducibility -------------------------------------------------------
# Seed every global RNG used by the notebook: Python, NumPy, torch (CPU and CUDA).
for _seed_fn in (
    random.seed,
    np.random.seed,
    torch.manual_seed,
    torch.cuda.manual_seed,
    torch.cuda.manual_seed_all,
):
    _seed_fn(SEED)

# Dedicated, explicitly seeded generator that is passed to the training DataLoader.
g = torch.Generator()
g.manual_seed(SEED)

pytorch transforms

In [None]:
# Preprocessing pipeline applied to every image: resize to a fixed square,
# then convert to a tensor.
resize_tf = transforms.Compose(
    [
        transforms.Resize(size=(IMAGE_SIZE, IMAGE_SIZE)),
        transforms.ToTensor(),
    ]
)

Each batch produced by the dataloader is an `(imgs, labs)` tuple.

imgs.shape = [B, C, H, W]

labs.shape = [B]

In [None]:
def _make_split(split):
    """Build the ImageFolder dataset for one sub-directory of DATA_DIR."""
    return datasets.ImageFolder(root=f"{DATA_DIR}/{split}", transform=resize_tf)


train_ds = _make_split("train")
test_ds = _make_split("test")

# Options shared by both loaders; only shuffling (and its generator) differs.
_loader_opts = dict(
    batch_size=BATCH_SIZE,
    num_workers=8,
    pin_memory=False,
    persistent_workers=True,
    prefetch_factor=4,
)

# Training batches are shuffled, with the shuffle driven by the seeded generator `g`.
train_loader = DataLoader(train_ds, shuffle=True, generator=g, **_loader_opts)

# Test batches keep the dataset order.
test_loader = DataLoader(test_ds, shuffle=False, **_loader_opts)

# Quick summary of what was loaded.
class_names = train_ds.classes
for _summary_line in (
    f"Classes: {class_names}",
    f"Number of classes: {len(class_names)}",
    f"Train images: {len(train_ds)}",
    f"Test images:  {len(test_ds)}",
    f"Image shape: {train_ds[0][0].shape}",
):
    print(_summary_line)

see a subset of resized images

In [None]:
def show_batch(imgs, labels, class_names, n=6):
    """Plot the first ``n`` images of a batch in a grid, titled by class name.

    Args:
        imgs: Batch of image tensors. Each image is transposed from CHW to HWC
            and its values are clipped to [0, 1] before display.
        labels: Per-image class indices, used to index ``class_names``.
        class_names: Sequence mapping a class index to its display name.
        n: Maximum number of images to show. The grid has 3 columns and as
            many rows as needed, so any ``n`` is accepted (the default of 6
            gives the 2x3 layout).
    """
    imgs = imgs[:n]
    labels = labels[:n]
    count = min(len(imgs), len(labels))
    if count == 0:
        return  # nothing to draw

    # Size the grid from the number of images instead of hard-coding 2x3,
    # which made plt.subplot raise for n > 6.
    n_cols = 3
    n_rows = -(-count // n_cols)  # ceiling division
    plt.figure(figsize=(4 * n_cols, 3 * n_rows))
    for i, (img, lab) in enumerate(zip(imgs, labels)):
        # detach/cpu so tensors on an accelerator or tracking gradients also work
        npimg = img.detach().cpu().numpy().transpose(1, 2, 0)  # CHW -> HWC
        plt.subplot(n_rows, n_cols, i + 1)
        plt.imshow(np.clip(npimg, 0, 1))
        plt.title(class_names[int(lab)])
        plt.axis("off")
    plt.tight_layout()
    plt.show()

# Preview the first images of one training batch.
imgs, labs = next(iter(train_loader))
show_batch(imgs, labs, class_names, n=6)

La K óptima sería min(N, D), pero eso es demasiado, así que se fija manualmente (`KMAX`).
Aquí calculamos el PCA incremental sobre el conjunto de entrenamiento.

In [None]:
from sklearn.decomposition import IncrementalPCA
from tqdm.auto import tqdm
import joblib
import os

KMAX = 512
RESULTS_DIR = "results"
os.makedirs(RESULTS_DIR, exist_ok=True)
MODEL_PATH = os.path.join(RESULTS_DIR, "ipca_artbench.pkl")

if os.path.exists(MODEL_PATH):
    print(f"Loading existing IncrementalPCA from {MODEL_PATH}")
    # NOTE: joblib.load unpickles the file; only load model files you trust.
    ipca = joblib.load(MODEL_PATH)

else:
    print("Fitting IncrementalPCA from scratch...")
    ipca = IncrementalPCA(n_components=KMAX)

    # partial_fit can reject a batch that has fewer rows than n_components
    # (e.g. a short final batch when BATCH_SIZE == KMAX). Rows are therefore
    # buffered and at least KMAX of them are always held back, so the tail of
    # the dataset is merged into the last partial_fit call instead of being
    # fitted on its own. Memory stays bounded by roughly 2*KMAX rows + one batch.
    held = None
    with tqdm(total=len(train_loader), desc="Fitting IncrementalPCA") as pbar:
        for xb, _ in train_loader:  # xb: [B,C,H,W]
            Xb = xb.flatten(1).cpu().numpy().astype(np.float32)  # -> [B,D]
            held = Xb if held is None else np.concatenate([held, Xb], axis=0)
            if held.shape[0] >= 2 * KMAX:
                ipca.partial_fit(held[:-KMAX])  # this slice has >= KMAX rows
                held = held[-KMAX:]
            pbar.update(1)
            pbar.set_postfix(seen=getattr(ipca, "n_samples_seen_", "?"))

    # Fit whatever is left (>= KMAX rows whenever the dataset itself has >= KMAX).
    if held is not None:
        ipca.partial_fit(held)

    joblib.dump(ipca, MODEL_PATH)
    print(f"Saved IncrementalPCA to {MODEL_PATH}")

reload the fitted IncrementalPCA from disk and inspect its type, number of components and component matrix shape

In [None]:
# Reload the persisted model, then print its type, its number of components
# and the shape of its component matrix.
ipca = joblib.load(MODEL_PATH)

for _model_fact in (type(ipca), ipca.n_components_, ipca.components_.shape):
    print(_model_fact)

In [None]:
import os
import numpy as np
from tqdm.auto import tqdm
import matplotlib.pyplot as plt

def mse_curve(loader, ipca, Ks):
    """Per-sample reconstruction error of ``ipca`` truncated to each K in ``Ks``.

    Every batch is projected with ``ipca.transform``. For each K, all latent
    columns from index K onward are zeroed, the result is mapped back with
    ``ipca.inverse_transform`` and the squared residual is accumulated.

    Args:
        loader: Sized iterable yielding ``(xb, _)`` pairs; ``xb`` is flattened
            to one row per sample before projection.
        ipca: Fitted model exposing ``transform`` and ``inverse_transform``.
        Ks: Iterable of component counts to evaluate. Duplicates are ignored.
            A K that is >= the number of latent columns zeroes nothing, i.e.
            it is evaluated as the full reconstruction.

    Returns:
        dict mapping each K to the sum of squared errors per sample (summed
        over features, averaged over samples), as a Python float.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    # Materialise and de-duplicate: Ks is iterated several times, and a repeated
    # K would otherwise be added twice into the same dict entry.
    Ks = list(dict.fromkeys(Ks))

    n_samples = 0
    total_sse = {K: 0.0 for K in Ks}

    with tqdm(total=len(loader), desc="Reconstruction error", leave=True) as pbar:
        for xb, _ in loader:
            Xb = xb.flatten(1).cpu().numpy().astype(np.float32)  # [B, D]
            Z = ipca.transform(Xb)                               # [B, n_components]

            for K in Ks:
                Zt = Z.copy()
                Zt[:, K:] = 0.0  # keep only the first K latent coordinates
                residual = Xb - ipca.inverse_transform(Zt)
                # Accumulate in float64 so the running total does not inherit
                # float32 precision from the batch data.
                total_sse[K] += float(np.sum(np.square(residual), dtype=np.float64))

            n_samples += Xb.shape[0]
            pbar.update(1)

    if n_samples == 0:
        raise ValueError("mse_curve: loader yielded no samples")
    return {K: total_sse[K] / n_samples for K in Ks}

In [None]:
# Component counts to evaluate: a few small values, every multiple of 500 up to
# KMAX, and KMAX itself.
Ks = sorted(set([10, 20, 50, 100, 150, 200, 250] + list(range(500, KMAX+1, 500)) + [KMAX]))

train_path = os.path.join(RESULTS_DIR, "pca_curve_train.npy")
test_path  = os.path.join(RESULTS_DIR, "pca_curve_test.npy")


def _load_cached_curve(path):
    """Return the curve dict stored at ``path``, or None if the file does not exist."""
    if not os.path.exists(path):
        return None
    # The dict is stored as a pickled 0-d object array, hence allow_pickle;
    # only load cache files written by this notebook.
    return np.load(path, allow_pickle=True).item()


L_train = _load_cached_curve(train_path)
L_test  = _load_cached_curve(test_path)

# Reuse the cache only if both curves exist and cover exactly the current Ks;
# otherwise a change to Ks would silently plot stale results.
_cache_is_valid = all(
    isinstance(curve, dict) and sorted(curve) == Ks
    for curve in (L_train, L_test)
)

if _cache_is_valid:
    print("Loading curves from disk...")
else:
    print("Computing curves from scratch...")
    L_train = mse_curve(train_loader, ipca, Ks)
    L_test  = mse_curve(test_loader,  ipca, Ks)
    np.save(train_path, L_train)
    np.save(test_path,  L_test)
    print(f"Saved curves to {train_path} and {test_path}")

In [None]:
# Reconstruction error versus number of principal components, train vs. test.
fig, ax = plt.subplots(figsize=(6.5, 3.2))
ax.grid(True)
for curve, line_style, split_name in ((L_train, '-x', 'Train'), (L_test, '-o', 'Test')):
    ax.plot(list(curve.keys()), list(curve.values()), line_style, label=split_name)
ax.set_xlabel('K (principal components)')
ax.set_ylabel('Reconstruction error (SSE per image)')
ax.legend()
fig.tight_layout()
plt.show()