In [1]:
import os
from torchvision import transforms, datasets
from torch.utils.data import DataLoader, TensorDataset, ConcatDataset, random_split
import torch
from sklearn.decomposition import PCA, IncrementalPCA
from sklearn.preprocessing import StandardScaler
from PIL import Image
from torch.utils.data import Dataset
import numpy as np
from tqdm import tqdm
import csv

Define a custom image dataset that reads the label CSV file and, for each row, returns the image together with only its class ID (the size and ROI columns are ignored).

In [2]:
class ImageDataset(Dataset):
    """Image dataset driven by a label CSV file.

    Every CSV row is stored as a dict of strings in ``self.data``. When an
    item is fetched only the ``Path`` and ``ClassId`` columns are read; any
    other columns in the file are carried along but not used.

    Args:
        csv_path: Path to the CSV file; it must have a header row containing
            at least the ``Path`` and ``ClassId`` columns.
        transform: Optional callable applied to the loaded RGB PIL image.
            With ``None`` (the default) the PIL image is returned unchanged.
        root: Directory that each row's ``Path`` value is joined onto.
    """

    # Kept as the default so existing calls resolve images exactly as before.
    DEFAULT_ROOT = "/work/flemingc/nvan21/projects/COMS_573_Project/Data"

    def __init__(self, csv_path: str, transform=None, root: str = DEFAULT_ROOT):
        # The previous default was the torchvision ``transforms`` module
        # itself, which is truthy but not callable and therefore crashed on
        # the first __getitem__ call.
        self.transform = transform
        self.root = root

        # newline="" is the csv module's recommended mode, so quoted fields
        # that contain line breaks are parsed correctly.
        with open(csv_path, "r", newline="") as f:
            # Each row is a dict keyed by the header names, all values str.
            self.data = list(csv.DictReader(f))

    def __len__(self):
        """Return the number of rows read from the CSV file."""
        return len(self.data)

    def __getitem__(self, idx: int):
        """Return ``(image, class_id)`` for the row at position ``idx``.

        Raises:
            IndexError: If ``idx`` is outside the stored rows.
            KeyError: If the row lacks a ``Path`` or ``ClassId`` column.
            ValueError: If ``ClassId`` is not an integer string.
        """
        row = self.data[idx]

        img_path = os.path.join(self.root, row["Path"])
        class_id = int(row["ClassId"])  # CSV values are strings

        # Open inside a context manager so the file handle is released as
        # soon as the pixel data has been converted.
        with Image.open(img_path) as img:
            image = img.convert("RGB")

        if self.transform is not None:
            image = self.transform(image)

        return image, class_id


In [3]:
# Preprocessing shared by every split: resize to 224x224, convert to a tensor,
# then normalize each channel with mean 0.5 and std 0.5.
transform = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ]
)
data_paths = ["./Data/Train", "./Data/train_augment"]
batch_size = 512
split_seed = 0  # fixes the train/validation split so saved features are reproducible

# Make one ImageFolder dataset per training image directory
image_datasets = []
for data_path in data_paths:
    folder_dataset = datasets.ImageFolder(data_path, transform=transform)

    # ImageFolder labels are indices into the *string-sorted* folder names
    # ("10" sorts before "2"), whereas the test set uses int(ClassId).
    # When every folder name is a number, map the index back to that number
    # so all splits share one label space.
    # NOTE(review): assumes numeric folder names are the same class IDs as
    # the ClassId column of Test.csv -- confirm against the data layout.
    if all(name.isdecimal() for name in folder_dataset.classes):
        idx_to_class_id = {
            idx: int(name) for name, idx in folder_dataset.class_to_idx.items()
        }
        folder_dataset.target_transform = idx_to_class_id.__getitem__

    image_datasets.append(folder_dataset)

# Make a combined training dataset and then split into training and validation
combined_dataset = ConcatDataset(image_datasets)
train_size = int(0.8 * len(combined_dataset))
validate_size = len(combined_dataset) - train_size
train_dataset, validate_dataset = random_split(
    combined_dataset,
    [train_size, validate_size],
    # A dedicated seeded generator makes the split identical on every run
    # without touching the global RNG state.
    generator=torch.Generator().manual_seed(split_seed),
)
test_dataset = ImageDataset(csv_path="./Data/Test.csv", transform=transform)

# Create dataloaders (only the training loader is shuffled)
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=batch_size,
    shuffle=True,
)
validate_loader = DataLoader(
    dataset=validate_dataset,
    batch_size=batch_size,
    shuffle=False,
)
test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=batch_size,
    shuffle=False,
)

In [4]:
def transform_to_pca(
    loader: DataLoader, scaler: StandardScaler, ipca: IncrementalPCA
):
    """Project every batch yielded by ``loader`` through ``scaler`` and ``ipca``.

    Neither estimator is fitted here: only their ``transform`` methods are
    called, so both must already be fitted.

    Args:
        loader: Yields ``(images, labels)`` batches of tensors. Each image
            batch is flattened to one row per sample before scaling.
        scaler: Fitted scaler applied to the flattened batch.
        ipca: Fitted incremental PCA applied to the scaled batch.

    Returns:
        A 4-tuple ``(X_tensor, y_tensor, X_np, y_np)``: the PCA features as a
        float32 tensor, the labels as an int64 tensor, and the NumPy arrays
        those two tensors were built from.

    Raises:
        ValueError: If ``loader`` yields no batches.
    """
    transformed_list = []
    label_list = []
    for images, labels in tqdm(loader, desc="Transforming dataset with PCA"):
        # One row per sample; reshape (unlike view) also accepts
        # non-contiguous batches.
        batch_np = images.reshape(images.size(0), -1).numpy()
        batch_scaled = scaler.transform(batch_np)
        batch_pca = ipca.transform(batch_scaled)
        transformed_list.append(batch_pca)
        label_list.append(labels.numpy())

    if not transformed_list:
        # np.concatenate would fail on an empty list with an unhelpful message.
        raise ValueError("transform_to_pca received an empty loader")

    # Concatenate all batches
    X_transformed = np.concatenate(transformed_list, axis=0)
    y = np.concatenate(label_list, axis=0)

    # Convert to torch tensors
    X_transformed_tensor = torch.from_numpy(X_transformed).float()
    y_tensor = torch.from_numpy(y).long()

    return X_transformed_tensor, y_tensor, X_transformed, y

In [5]:
# Number of principal components kept by the incremental PCA.
n_components = 150


def _flat_train_batches(desc):
    """Yield each training batch as a 2-D (batch, features) NumPy array.

    Labels are discarded; ``desc`` is the progress-bar label.
    """
    for image_batch, _labels in tqdm(train_loader, desc=desc):
        yield image_batch.view(image_batch.size(0), -1).numpy()


# Step 1: accumulate the standardization statistics over the whole training
# set, one batch at a time. The same scaler is later reused for the
# validation and testing datasets.
scaler = StandardScaler()
for flat_batch in _flat_train_batches("Getting PCA scaler"):
    scaler.partial_fit(flat_batch)

# Step 2: fit the PCA batch by batch on the standardized data so the full
# training set never has to be held in memory at once.
ipca = IncrementalPCA(n_components=n_components)
for flat_batch in _flat_train_batches("Fitting PCA"):
    ipca.partial_fit(scaler.transform(flat_batch))

Getting PCA scaler: 100%|██████████| 123/123 [02:02<00:00,  1.00it/s]
Fitting PCA: 100%|██████████| 123/123 [16:25<00:00,  8.01s/it]


In [6]:
# Step 3: run each split through the fitted scaler and PCA.
def _project(loader):
    """Apply the fitted ``scaler`` and ``ipca`` to every batch of ``loader``."""
    return transform_to_pca(loader=loader, scaler=scaler, ipca=ipca)


train_pca, train_labels, train_pca_np, train_labels_np = _project(train_loader)
validate_pca, validate_labels, validate_pca_np, validate_labels_np = _project(
    validate_loader
)
test_pca, test_labels, test_pca_np, test_labels_np = _project(test_loader)

Fitting PCA for dataset: 100%|██████████| 123/123 [02:11<00:00,  1.07s/it]
Fitting PCA for dataset: 100%|██████████| 31/31 [00:32<00:00,  1.05s/it]
Fitting PCA for dataset: 100%|██████████| 25/25 [00:29<00:00,  1.19s/it]


In [9]:
# Output directory is named after the number of PCA components.
path = os.path.join("./Data/pca", f"{n_components}")
os.makedirs(path, exist_ok=True)

# File name -> tensor, written in the listed order.
tensors_by_filename = {
    "X_train.pt": train_pca,
    "y_train.pt": train_labels,
    "X_validate.pt": validate_pca,
    "y_validate.pt": validate_labels,
    "X_test.pt": test_pca,
    "y_test.pt": test_labels,
}
for filename, tensor in tensors_by_filename.items():
    torch.save(tensor, os.path.join(path, filename))