In [4]:
import torch
import numpy as np
from torchvision.datasets import CIFAR10
from torchvision.transforms import Compose, RandomCrop, RandomHorizontalFlip, CenterCrop

In [9]:
class Cutout:
    """Zero out one randomly placed square patch of an image tensor.

    The patch covers ``size`` x ``size`` elements of the last two axes and is
    applied identically across every leading axis (e.g. channels).

    Args:
        size: Side length of the square patch.
    """

    def __init__(self, size):
        self.size = size

    def __call__(self, x):
        """Return a copy of ``x`` with one random patch set to zero.

        Args:
            x: Tensor whose last two axes are (rows, columns).

        Returns:
            A new tensor of the same shape; ``x`` itself is not modified.

        Raises:
            RuntimeError: Raised by ``torch.randint`` when ``size`` exceeds
                either of the last two dimensions of ``x``.
        """
        n_rows, n_cols = x.shape[-2], x.shape[-1]
        # The row offset is bounded by the row count and the column offset by
        # the column count. ``torch.randint`` excludes its upper bound, hence
        # the ``+ 1`` so the window can also touch the bottom/right edge.
        y0 = int(torch.randint(n_rows - self.size + 1, (1,)))
        x0 = int(torch.randint(n_cols - self.size + 1, (1,)))
        img = x.clone()
        img[..., y0 : y0 + self.size, x0 : x0 + self.size] = 0.0
        return img
    
def preprocess(x_tr, x_te, border=4):
    """Pad, standardize and transpose train/test image arrays.

    Both arrays are reflect-padded by ``border`` on axes 1 and 2, standardized
    with statistics computed over axis 0 of the *padded training* array (one
    mean/std per remaining position), and finally transposed with
    ``(0, 3, 1, 2)`` so that the last axis moves in front of axes 1 and 2.

    Args:
        x_tr: 4-D training array; axes 1 and 2 are the padded spatial axes and
            axis 3 is moved to position 1 (channels-last in, channels-first out).
        x_te: 4-D test array with the same trailing shape as ``x_tr``.
        border: Number of elements added on each side of axes 1 and 2.

    Returns:
        Tuple ``(x_tr, x_te)`` of the processed arrays.
    """
    pad_width = [(0, 0), (border, border), (border, border), (0, 0)]

    # Pad.
    x_tr = np.pad(x_tr, pad_width, mode="reflect")
    x_te = np.pad(x_te, pad_width, mode="reflect")

    # Normalize with training statistics only.
    mean, std = x_tr.mean(axis=0), x_tr.std(axis=0)
    # A position that is constant over the training set has std == 0; dividing
    # by it would yield NaN/inf, so leave such positions merely mean-centred.
    std = np.where(std > 0, std, 1.0)
    x_tr = (x_tr - mean) / std
    x_te = (x_te - mean) / std

    # Transpose to put channels before height and width.
    x_tr = x_tr.transpose(0, 3, 1, 2)
    x_te = x_te.transpose(0, 3, 1, 2)

    return x_tr, x_te

class ImageClassificationDataLoader:
    """Random-batch loader that runs each image through a transform pipeline.

    Args:
        features: NumPy array of images indexed along its first axis; each
            ``features[i]`` is converted with ``torch.from_numpy``.
        labels: NumPy array of labels aligned with ``features``.
        batch_size: Number of examples drawn per call to ``get_batch``.
        device: Target device (string or ``torch.device``) for the batches.
        center: If truthy, the pipeline is only ``CenterCrop(center)`` and the
            ``crop``/``flip``/``cutout`` arguments are ignored.
        crop: Argument for ``RandomCrop``; a falsy value disables the step.
        flip: Argument for ``RandomHorizontalFlip``; a falsy value disables it.
        cutout: Argument for ``Cutout``; a falsy value disables the step.
    """

    def __init__(
        self,
        features,
        labels,
        batch_size,
        device,
        center=None,
        crop=32,
        flip=0.5,
        cutout=8,
    ):
        if center:
            self.pipeline = CenterCrop(center)
        else:
            # Keep only the augmentations whose parameter is truthy, in the
            # fixed order crop -> flip -> cutout.
            pipeline = []
            for param, transform in zip(
                [crop, flip, cutout], [RandomCrop, RandomHorizontalFlip, Cutout]
            ):
                if param:
                    pipeline.append(transform(param))
            self.pipeline = Compose(pipeline)
        self.batch_size = batch_size
        self.device = device

        self.features = features
        self.labels = labels

    def get_batch(self):
        """Draw one batch uniformly at random, with replacement.

        Returns:
            Tuple ``(x, y)``: the stacked, transformed images and their labels.
            For a non-CPU ``device`` both tensors are pinned and transferred
            with ``non_blocking=True``; otherwise they are returned as built.
        """
        ix = torch.randint(len(self.features), (self.batch_size,))
        # Index the NumPy arrays with plain ints / a NumPy index array rather
        # than with torch tensors.
        x = torch.stack(
            [self.pipeline(torch.from_numpy(self.features[i])) for i in ix.tolist()]
        )
        y = torch.from_numpy(self.labels[ix.numpy()])
        # Compare the device *type* so that both "cpu" and torch.device("cpu")
        # skip pinning; a plain string comparison misclassifies the latter.
        if torch.device(self.device).type != "cpu":
            # pin arrays x,y, which allows us to move them to GPU asynchronously (non_blocking=True)
            x, y = x.pin_memory().to(self.device, non_blocking=True), y.pin_memory().to(
                self.device, non_blocking=True
            )
        return x, y

In [6]:
def load_cifar10(batch_size, root="/mnt/ssd/ronak/datasets/", device="cuda:0"):
    """Build train and test batch loaders for CIFAR-10.

    Args:
        batch_size: Number of examples per batch for both loaders.
        root: Directory passed to ``CIFAR10`` (created with ``download=True``).
        device: Device the loaders move their batches to.

    Returns:
        Tuple ``(train_loader, test_loader)``. The train loader uses the
        default random augmentations of ``ImageClassificationDataLoader``; the
        test loader only centre-crops back to the unpadded image size.
    """
    # Get train data.
    train_data = CIFAR10(root, download=True)
    test_data = CIFAR10(root, train=False, download=True)

    x_train = train_data.data
    x_test = test_data.data
    y_train = np.array(train_data.targets)
    y_test = np.array(test_data.targets)

    # Side length of the raw images, taken before ``preprocess`` pads them.
    # Assumes ``.data`` is laid out (N, H, W, C) with H == W, which is what
    # ``preprocess`` pads and transposes.
    image_size = x_test.shape[1]

    # Apply preprocessing.
    x_train, x_test = preprocess(x_train, x_test)

    train_loader = ImageClassificationDataLoader(x_train, y_train, batch_size, device)
    # Evaluation data must not be randomly cropped, flipped or cut out:
    # ``center`` switches the loader to a deterministic centre crop that
    # removes the padding added by ``preprocess``.
    test_loader = ImageClassificationDataLoader(
        x_test, y_test, batch_size, device, center=image_size
    )

    return train_loader, test_loader

In [10]:
# Build both CIFAR-10 loaders with a batch size of 512 (default root and device).
train_loader, test_loader = load_cifar10(512)

Files already downloaded and verified
Files already downloaded and verified


In [11]:
# Draw one randomly sampled, augmented training batch.
x, y = train_loader.get_batch()

In [12]:
# Sanity check: display the shape of the image batch.
x.shape

torch.Size([512, 3, 32, 32])

In [2]:
# Scratch check: splitting a string that contains no ":" yields a one-element list.
"0".split(":")

['0']