In [1]:
import torch
from torchvision import transforms
from torch.utils.data import Dataset
from PIL import Image
import os
from typing import Optional, Union
import torch.nn as nn

In [2]:
#===================training set -> preprocessing + augmentation====================
#===================test set -> preprocessing====================

# add Gaussian noise
class AddGaussianNoise(object):
    """Tensor transform that adds Gaussian noise with a given probability.

    On each call a single uniform sample decides whether noise is applied.
    When it is, the noise is ``randn_like(tensor) * std + mean``. The returned
    tensor is clamped to [0, 1] in both cases, so inputs outside that range
    are clipped even when no noise is added.

    Args:
        mean: Gaussian noise mean.
        std:  Gaussian noise standard deviation.
        p:    Probability of applying noise.
    """

    def __init__(self, mean=0.0, std=0.05, p=0.5):
        self.mean, self.std, self.p = mean, std, p

    def __call__(self, tensor):
        # One uniform draw in [0, 1) per call; p=0 never fires, p=1 always does.
        skip_noise = not (torch.rand(1).item() < self.p)
        if skip_noise:
            return tensor.clamp(0., 1.)

        perturbation = torch.randn_like(tensor) * self.std + self.mean
        noisy = tensor + perturbation
        return noisy.clamp(0., 1.)  # keep the result inside [0, 1]

    def __repr__(self):
        return f"{self.__class__.__name__}(mean={self.mean}, std={self.std}, p={self.p})"


# Training pipeline: augment the PIL image first, then convert to a tensor,
# optionally add noise, and finally normalize each channel with mean 0.5 / std 0.5.
train_transform = transforms.Compose([
    transforms.Resize(size=(224, 224)),                    # fixed 224x224 input
    transforms.RandomHorizontalFlip(p=0.5),                # mirror half of the samples
    transforms.RandomRotation(degrees=15),                 # rotate within ±15°
    transforms.ColorJitter(contrast=0.2, brightness=0.2),  # jitter brightness and contrast
    transforms.ToTensor(),
    AddGaussianNoise(p=0.5, std=0.05, mean=0.0),           # noise on half of the samples, clamped to [0, 1]
    transforms.Normalize(std=[0.5] * 3, mean=[0.5] * 3),
])

# Evaluation pipeline: no augmentation, only the deterministic steps
# (resize, tensor conversion, per-channel normalization with mean 0.5 / std 0.5).
test_transform = transforms.Compose([
    transforms.Resize(size=(224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(std=[0.5] * 3, mean=[0.5] * 3),
])


# dataset definition
class ImageDataset(Dataset):
    """
    General-purpose and CelebA-friendly image dataset.

    - Every image is opened with PIL and converted to RGB; resizing and
      normalization are whatever ``transform`` does.
    - Supports both supervised/unsupervised learning (``labels=None`` yields
      the placeholder label 0 for every sample).
    - Supports mapping CelebA labels {-1, 1} to {0, 1} (can be disabled/customized)
    - labels can be pandas.Series / dict / None

    Args:
        image_file_list: Iterable of image file names, relative to ``image_dir``.
        image_dir: Directory containing the images; must exist.
        labels: Lookup from file name to raw label (pandas.Series indexed by
            file name, or a dict), or None for unlabeled data.
        transform: Optional callable applied to the RGB PIL image.
        label_mapping: Optional dict applied to the integer label; values not
            present in the mapping pass through unchanged. None disables mapping.
        return_filename: If True, ``__getitem__`` also returns the file name.

    Raises:
        AssertionError: If ``image_dir`` does not exist (note that ``assert``
            is skipped when Python runs with ``-O``).
    """
    def __init__(
        self,
        image_file_list,
        image_dir: str = "data/img_align_celeba/img_align_celeba",
        labels: Optional[Union["pd.Series", dict]] = None,
        transform: Optional[nn.Module] = None,
        label_mapping: Optional[dict] = {-1: 0, 1: 1},
        return_filename: bool = False,
    ):
        assert os.path.exists(image_dir), f"Image dir not found: {image_dir}"
        self.image_dir = image_dir
        self.image_files = list(image_file_list)
        self.transform = transform
        self.labels = labels
        # Store a private copy: the default dict in the signature is created once
        # and shared by every call, so keeping a reference to it would let a
        # mutation on one dataset leak into all datasets built afterwards.
        self.label_mapping = None if label_mapping is None else dict(label_mapping)
        self.return_filename = return_filename

        # Detect pandas.Series without importing pandas here: if `labels` is a
        # Series, pandas is necessarily already loaded. This also avoids
        # swallowing arbitrary exceptions from a failing import.
        import sys
        pandas_module = sys.modules.get("pandas")
        series_type = getattr(pandas_module, "Series", None)
        self._is_pandas_series = series_type is not None and isinstance(labels, series_type)

    def __len__(self):
        """Return the number of image files in the dataset."""
        return len(self.image_files)

    def _get_label(self, img_name):
        """Return the integer label for ``img_name`` (0 when there are no labels).

        Raises KeyError if ``img_name`` is missing from ``labels``.
        """
        if self.labels is None:
            return 0  # unsupervised: placeholder label

        if self._is_pandas_series:
            y = self.labels.loc[img_name]   # label-based lookup on the Series index
        else:  # dict-like
            y = self.labels[img_name]

        # Optional mapping (for CelebA: -1/1 -> 0/1); unmapped values pass through.
        if self.label_mapping is not None:
            y = self.label_mapping.get(int(y), y)
        return int(y)

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns ``(image, label)``, or ``(image, label, img_name)`` when
        ``return_filename`` is True. ``image`` is the transformed image, or the
        RGB PIL image when no transform is set.
        """
        img_name = self.image_files[idx]
        img_path = os.path.join(self.image_dir, img_name)

        # The context manager closes the underlying file deterministically;
        # convert() returns an independent image that stays valid afterwards.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert("RGB")

        if self.transform is not None:
            image = self.transform(image)

        label = self._get_label(img_name)

        if self.return_filename:
            return image, label, img_name
        return image, label


In [3]:
import pandas as pd

# Folder holding the CelebA metadata files. Set the CELEBA_DATA_DIR environment
# variable to run this notebook on another machine without editing the cell;
# the fallback is the original local folder.
DATA_DIR = os.environ.get("CELEBA_DATA_DIR", r"C:\Users\HOU HENGJIN\Desktop\90051\data")

# Use pandas to read the train/val/test partition CSV
df = pd.read_csv(os.path.join(DATA_DIR, "list_eval_partition.csv"))

# check the first few rows
print(df.head())

# verify the column names: ["image_id", "partition"]
missing_columns = {"image_id", "partition"} - set(df.columns)
if missing_columns:
    raise KeyError(f"list_eval_partition.csv is missing columns: {sorted(missing_columns)}")

# partition == 0 is used as the training split and partition == 2 as the test split
# (partition 1 is presumably the validation split, unused here -- confirm against the CelebA docs).
train_files = df.loc[df["partition"] == 0, "image_id"].tolist()
test_files  = df.loc[df["partition"] == 2, "image_id"].tolist()

print("Train:", len(train_files), "Test:", len(test_files))


     image_id  partition
0  000001.jpg          0
1  000002.jpg          0
2  000003.jpg          0
3  000004.jpg          0
4  000005.jpg          0
Train: 162770 Test: 19962


In [4]:
from pathlib import Path
from torch.utils.data import DataLoader

# Define the directory of the image dataset.
# Set the CELEBA_IMAGE_DIR environment variable to point at another location
# without editing the notebook; the fallback is the original local folder.
IMAGE_DIR = Path(
    os.environ.get("CELEBA_IMAGE_DIR", r"C:\Users\HOU HENGJIN\Desktop\90051\data\img_align_celeba")
)

# Create the training dataset
train_ds = ImageDataset(
    image_file_list=train_files,   # list of training image filenames
    image_dir=str(IMAGE_DIR),      # directory where the images are stored
    labels=None,                   # no labels for now (unsupervised / placeholder)
    transform=train_transform      # preprocessing + augmentation for training set
)

# Create the testing dataset
test_ds = ImageDataset(
    image_file_list=test_files,    # list of testing image filenames
    image_dir=str(IMAGE_DIR),      # directory where the images are stored
    labels=None,                   # no labels for now
    transform=test_transform       # preprocessing only for test set
)

# Create DataLoaders to efficiently load data in batches.
# Worker subprocesses do not work from a Windows Jupyter kernel with a Dataset
# class defined in the notebook, so load in the main process there and keep
# 4 workers elsewhere. Pinned memory is only requested when CUDA is present,
# since it is of no use on a CPU-only machine.
NUM_WORKERS = 0 if os.name == "nt" else 4
PIN_MEMORY = torch.cuda.is_available()

train_loader = DataLoader(
    train_ds,
    batch_size=64,             # number of images per batch
    shuffle=True,              # shuffle the training data at every epoch
    num_workers=NUM_WORKERS,   # number of subprocesses to use for data loading
    pin_memory=PIN_MEMORY      # speeds up transfer to GPU
)

test_loader = DataLoader(
    test_ds,
    batch_size=64,             # number of images per batch
    shuffle=False,             # do not shuffle test data
    num_workers=NUM_WORKERS,
    pin_memory=PIN_MEMORY
)

# Print dataset sizes
print(len(train_ds), len(test_ds))  # Expected output: 162770, 19962

162770 19962


In [5]:
from torch.utils.data import DataLoader

# Training loader: batches of 64, reshuffled every epoch.
# num_workers=0 loads in the main process (for windows + jupyter notebook);
# on a GPU machine num_workers can be 4 or 8.
train_loader = DataLoader(
    dataset=train_ds,
    shuffle=True,
    batch_size=64,
    pin_memory=torch.cuda.is_available(),   # pin host memory only when CUDA is available
    num_workers=0,
)

# Test loader: same batch size and loading settings, but the sample order stays fixed.
test_loader = DataLoader(
    dataset=test_ds,
    shuffle=False,
    batch_size=64,
    pin_memory=torch.cuda.is_available(),   # pin host memory only when CUDA is available
    num_workers=0,
)

In [6]:
# Smoke test: pull a single batch from the training loader and inspect it.
images, labels = next(iter(train_loader))
print(images.shape)   # torch.Size([64, 3, H, W]); H = W = 224 after the Resize transform
print(labels)         # all zeros while the datasets use labels=None; pass CelebA attributes as `labels` to get real targets

torch.Size([64, 3, 224, 224])
tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
