In [4]:
import os

# Resolve every dataset location relative to the parent of the current
# working directory, then echo the three resolved paths (one per line).
ROOT_DIR = os.path.dirname(os.path.abspath(os.curdir))
DATA_DIR = os.path.join(ROOT_DIR, 'Tsing-hua/low-resolution')
TRAINVAL_DIR = os.path.join(ROOT_DIR, 'Tsing-hua/TrainAndValList')

print(ROOT_DIR, DATA_DIR, TRAINVAL_DIR, sep='\n')

c:\jason\semester 8\Magang
c:\jason\semester 8\Magang\Tsing-hua/low-resolution
c:\jason\semester 8\Magang\Tsing-hua/TrainAndValList


In [5]:
# Load the training split listing (one entry per line).
train_list_path = os.path.join(TRAINVAL_DIR, 'train.lst')

with open(train_list_path, "r") as list_file:
    lst_contents = list(list_file)

# Trim surrounding whitespace / newlines from every entry.
train_array_of_paths = list(map(str.strip, lst_contents))

# Drop the first two characters of each entry and prefix the image root.
# NOTE(review): presumably the entries start with "./" -- verify against the
# .lst file format.
train_file_paths = [DATA_DIR + entry[2:] for entry in train_array_of_paths]

In [6]:
# Load the validation split listing (one entry per line).
val_list_path = os.path.join(TRAINVAL_DIR, 'validation.lst')

with open(val_list_path, "r") as list_file:
    lst_contents = list(list_file)

# Trim surrounding whitespace / newlines from every entry.
val_array_of_paths = list(map(str.strip, lst_contents))

# Drop the first two characters of each entry and prefix the image root.
# NOTE(review): presumably the entries start with "./" -- verify against the
# .lst file format.
val_file_paths = [DATA_DIR + entry[2:] for entry in val_array_of_paths]

In [7]:
from torchvision import transforms

# Per-channel mean / std handed to transforms.Normalize below.
RGB_MEAN = [0.51442681, 0.43435301, 0.33421855]
RGB_STD = [0.24099932, 0.246478, 0.23652802]
# Size passed to transforms.CenterCrop below.
INPUT_SIZE = (384, 384)


def _crop_and_normalize():
    """Return fresh instances of the steps shared by both pipelines.

    The steps are, in order: center crop to ``INPUT_SIZE``, conversion to a
    tensor, and normalization with ``RGB_MEAN`` / ``RGB_STD``.
    """
    return [
        transforms.CenterCrop(INPUT_SIZE),
        transforms.ToTensor(),
        transforms.Normalize(mean=RGB_MEAN, std=RGB_STD),
    ]


# Training pipeline: random rotation and random flips, then the shared steps.
TRAIN_TRANSFORMATION = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomRotation(45),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    *_crop_and_normalize(),
])

# Validation pipeline: no random augmentation, only the shared steps.
VAL_TRANSFORMATION = transforms.Compose([
    transforms.ToPILImage(),
    *_crop_and_normalize(),
])

In [8]:
import glob
import os
from typing import Optional, Tuple

import cv2
import numpy as np
import torch
from torch.utils.data import Dataset
import torchvision

class ImageFolderDataset(Dataset):
    """Structured image folder dataset.

    Images are expected at ``root_dir/<class_name>/<image_file>``. The label
    of an image is the index of its parent directory name within the sorted
    list of class sub-directories of ``root_dir``.

    Parameters
    ----------
    root_dir : str
        Path to image root directory
    filepath_list : Optional[list], optional
        Paths of the images to serve, by default None. When None, every file
        with a known image extension located directly inside a class
        sub-directory of ``root_dir`` is used (sorted by path).
    transform : Optional[torchvision.transforms.Compose], optional
        Data augmentation pipeline, by default None

    Raises
    ------
    RuntimeError
        If ``root_dir`` does not exist.
    """

    # Extensions (lower case) picked up when the image list is discovered
    # from the folder structure instead of being passed in.
    _IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.bmp')

    def __init__(
        self,
        root_dir: str,
        filepath_list: Optional[list] = None,
        # Quoted so the annotation is not evaluated when the class is defined.
        transform: Optional["torchvision.transforms.Compose"] = None,
    ):
        self.root_dir = root_dir
        if not os.path.exists(root_dir):
            raise RuntimeError('Path to dataset is not valid')

        if filepath_list is None:
            # No explicit list: fall back to scanning root_dir/<class>/<file>
            # so that len() and indexing work with the declared default.
            pattern = os.path.join(glob.escape(root_dir), '*', '*')
            filepath_list = sorted(
                path for path in glob.glob(pattern)
                if path.lower().endswith(self._IMAGE_EXTENSIONS)
            )
        self.list_images = filepath_list
        self.transform = transform

        # Only directories are classes; a stray file in root_dir must not
        # shift the label indices.
        self.labels_name = sorted(
            entry for entry in os.listdir(root_dir)
            if os.path.isdir(os.path.join(root_dir, entry))
        )

    def __len__(self) -> int:
        """Get number of images."""
        return len(self.list_images)

    def _label_index(self, img_path: str) -> int:
        """Map an image path to the index of its class directory.

        The class name is the image's parent directory name; ``os.path`` is
        used so both separators are understood on Windows.

        Raises
        ------
        ValueError
            If the parent directory is not one of the known class names.
        """
        class_name = os.path.basename(os.path.dirname(img_path))
        try:
            return self.labels_name.index(class_name)
        except ValueError as exc:
            raise ValueError(
                f'Class directory {class_name!r} of {img_path!r} '
                f'not found under {self.root_dir!r}'
            ) from exc

    @staticmethod
    def _to_chw_float_tensor(image: np.ndarray) -> torch.Tensor:
        """Convert an H,W,C array to a float32 C,H,W tensor scaled by 1/255.

        The input array is left untouched.
        """
        chw = np.ascontiguousarray(np.transpose(image, (2, 0, 1)))
        # Cast to float before dividing: an in-place float division on an
        # integer tensor is rejected by torch.
        return torch.from_numpy(chw).float().div(255.0)

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        """Get sample for the current idx.

        Returns
        -------
        Tuple[torch.Tensor, torch.Tensor]
            The image (output of ``transform``, or a float C,H,W tensor in
            [0, 1] when the image is still a numpy array) and the label as an
            int64 tensor of shape (1,).

        Raises
        ------
        RuntimeError
            If the image file cannot be read.
        ValueError
            If the image's parent directory is not a known class.
        """
        img_path = self.list_images[idx]

        # NOTE: cv2 reads images as BGR and returns None instead of raising
        # when the file is missing or cannot be decoded.
        image = cv2.imread(img_path)
        if image is None:
            raise RuntimeError(f'Could not read image: {img_path}')
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Convert class name to its label index.
        # Note : root_path/class_name/image_file.jpg
        label_index = self._label_index(img_path)

        # Apply transformations
        if self.transform:
            image = self.transform(image)

        # Still a numpy array (no transform, or a transform that returned
        # one): convert to C,H,W and normalize to [0,1].
        if isinstance(image, np.ndarray):
            image = self._to_chw_float_tensor(image)

        # The label keeps its original shape of (1,).
        return image, torch.tensor([label_index], dtype=torch.int64)

In [22]:
# Smoke test for the training split: fetch one sample, then check the split
# size and the sample types.
train_dataset = ImageFolderDataset(
    root_dir=DATA_DIR,
    filepath_list=train_file_paths,
    transform=TRAIN_TRANSFORMATION,
)
image, label = train_dataset[10]

assert len(train_dataset) == 65228
assert isinstance(image, torch.Tensor)
assert isinstance(label, torch.Tensor)

# Same checks for the validation split.
val_dataset = ImageFolderDataset(
    root_dir=DATA_DIR,
    filepath_list=val_file_paths,
    transform=VAL_TRANSFORMATION,
)
image, label = val_dataset[10]

# Number of class names found under the image root.
print(len(train_dataset.labels_name))

assert len(val_dataset) == 5200
assert isinstance(image, torch.Tensor)
assert isinstance(label, torch.Tensor)

# Drop the references so the objects can be garbage collected.
train_dataset = val_dataset = None
image = label = None

130


In [25]:
from typing import List, Optional

from lightning import LightningDataModule
from torch.utils.data import DataLoader, Dataset
from torchvision.transforms import transforms


class TsingHuaLitDatamodule(LightningDataModule):
    """LightningDataModule for the Tsing-hua image data pipeline.

    Read the docs:
        https://lightning.ai/docs/pytorch/latest/data/datamodule.html

    The file lists (``train_file_paths`` / ``val_file_paths``) and the
    transform pipelines (``TRAIN_TRANSFORMATION`` / ``VAL_TRANSFORMATION``)
    are read from module scope. The validation file list also backs the test
    and predict datasets.

    Parameters
    ----------
    data_dir : str, optional
        Image root directory, by default ``DATA_DIR``
    input_size : Tuple[int, int], optional
        Input model size, by default (600, 500). Only stored as a
        hyperparameter; this class does not read it.
    batch_size : int, optional
        Number of training batch size, by default 64
    num_workers : int, optional
        Number of workers to process data, by default 0
    pin_memory : bool, optional
        Enable memory pinning, by default False
    """

    def __init__(
        self,
        data_dir: str = DATA_DIR,
        input_size: Tuple[int, int] = (600, 500),
        batch_size: int = 64,
        num_workers: int = 0,
        pin_memory: bool = False,
    ):
        super().__init__()

        # this line allows to access init params with 'self.hparams' attribute
        # also ensures init params will be stored in ckpt
        self.save_hyperparameters(logger=False)

        self.data_train: Optional[Dataset] = None
        self.data_val: Optional[Dataset] = None
        self.data_test: Optional[Dataset] = None
        # Initialised here so setup('predict') can test it like the others.
        self.data_predict: Optional[Dataset] = None

    @property
    def num_classes(self):
        """Get number of classes."""
        return 130

    def _build_dataset(self, filepath_list, transform, split_name: str):
        """Create the dataset for one split and reject an empty one.

        Raises
        ------
        ValueError
            If the created dataset contains no samples.
        """
        dataset = ImageFolderDataset(
            root_dir=self.hparams.data_dir,
            filepath_list=filepath_list,
            transform=transform,
        )
        if len(dataset) == 0:
            raise ValueError(f'{split_name} dataset is empty.')
        return dataset

    def _make_dataloader(self, dataset, shuffle: bool) -> DataLoader:
        """Wrap a dataset in a DataLoader configured from the hyperparameters."""
        return DataLoader(
            dataset=dataset,
            batch_size=self.hparams.batch_size,
            num_workers=self.hparams.num_workers,
            pin_memory=self.hparams.pin_memory,
            shuffle=shuffle,
        )

    def setup(self, stage: Optional[str] = None):
        """Load the data with specified stage.

        Datasets that already exist are kept, so calling this repeatedly is
        cheap.

        Raises
        ------
        ValueError
            If a dataset required by the stage turns out to be empty.
        """
        if stage in ['train', 'fit', None] and self.data_train is None:
            self.data_train = self._build_dataset(
                train_file_paths, TRAIN_TRANSFORMATION, 'Train')

        # 'validate' is the stage name Lightning passes for Trainer.validate();
        # 'validation' is kept so existing manual calls keep working.
        if stage in ['validation', 'validate', 'test', 'fit', None]:
            if self.data_val is None:
                self.data_val = self._build_dataset(
                    val_file_paths, VAL_TRANSFORMATION, 'Validation')
            # There is no separate test list: the validation list is reused.
            if self.data_test is None:
                self.data_test = self._build_dataset(
                    val_file_paths, VAL_TRANSFORMATION, 'Test')

        # Guard on data_predict itself (not data_test) and pass an explicit
        # file list; the validation list is reused, as for the test split.
        if stage == 'predict' and self.data_predict is None:
            self.data_predict = self._build_dataset(
                val_file_paths, VAL_TRANSFORMATION, 'Predict')

    def train_dataloader(self):
        """Get train dataloader."""
        return self._make_dataloader(self.data_train, shuffle=True)

    def val_dataloader(self):
        """Get validation dataloader."""
        return self._make_dataloader(self.data_val, shuffle=False)

    def test_dataloader(self):
        """Get test dataloader."""
        return self._make_dataloader(self.data_test, shuffle=False)

    def predict_dataloader(self):
        """Get predict dataloader."""
        return self._make_dataloader(self.data_predict, shuffle=False)

In [29]:
dm = TsingHuaLitDatamodule(data_dir=DATA_DIR, batch_size=4)

# No dataset exists before setup() has been called.
assert not any((dm.data_train, dm.data_val, dm.data_test))

dm.setup()
assert all((dm.data_train, dm.data_val, dm.data_test))

train_dataloader = dm.train_dataloader()
val_dataloader = dm.val_dataloader()
test_dataloader = dm.test_dataloader()
assert all((train_dataloader, val_dataloader, test_dataloader))

# train = 65228, val = 5200, test = 5200 (test reuses the validation list)
assert sum(len(split) for split in (dm.data_train, dm.data_val, dm.data_test)) == 75628

# One training batch: four samples, float images and int64 labels.
batch = next(iter(train_dataloader))
x, y = batch
assert len(x) == len(y) == 4
assert (x.dtype, y.dtype) == (torch.float32, torch.int64)

# Cleanup
dm = batch = None
train_dataloader = val_dataloader = test_dataloader = None