In [3]:
import numpy as np
import torch
from torchvision import datasets, transforms
import torch.nn as nn
from torch.utils.data import DataLoader

In [4]:
class DataAugmenter:
    def __init__(self, transforms, num_aug):
        self.num_aug = num_aug
        self.transforms = transforms
        
        assert num_aug > 0, "number of augmentations should be at least 1."
        
    def apply_augments(self, img):
        augmented = []
        for i in range(self.num_aug):
            if i == 0:
                augmented += [transforms.ToTensor()(img)]
            else:
                augmented += [self.transforms(img)]
        return augmented
    
transform = transforms.Compose([
                transforms.RandomChoice([
                    transforms.RandomAffine((-90, 90)),
                    transforms.RandomAffine(0, translate=(0.2, 0.4)),
                    transforms.RandomAffine(0, scale=(0.8, 1.1)),
                    transforms.RandomAffine(0, shear=(-20, 20))]), 
                transforms.ToTensor()])

In [5]:
def collate_fn(augmenter):
    def form_batch(batch):
        batch_img = []
        batch_lbl = []
        batch_idx = []

        for idx, (img, lbl) in enumerate(batch):
            aug_img = augmenter.apply_augments(img)
            aug_lbl = np.repeat(lbl, augmenter.num_aug).tolist()
            aug_idx = np.repeat(idx, augmenter.num_aug).tolist()
            
            batch_img += aug_img
            batch_lbl += aug_lbl
            batch_idx += aug_idx
        
        return (torch.stack(batch_img).float(),
                torch.tensor(batch_lbl, dtype=torch.int),
                torch.tensor(batch_idx, dtype=torch.int))
    return form_batch

In [6]:
trainset = datasets.MNIST("~/Datasets/mnist/")
trainset.num_classes = 10
augmenter = DataAugmenter(transform, 10)
trainloader = DataLoader(trainset, batch_size=100 // 10, collate_fn=collate_fn(augmenter))

In [7]:
def asymmetric_noise(trainset, ratio, seed):
    assert 0 <= ratio <= 1., 'ratio is bounded between 0 and 1' 
    np.random.seed(seed)
    train_labels = trainset.targets.numpy()
    train_labels_gt = train_labels.copy()
    for i in range(trainset.num_classes):
        indices = np.where(train_labels == i)[0]
        np.random.shuffle(indices)
        for j, idx in enumerate(indices):
            if j < ratio * len(indices):
#                 self.noise_indx.append(idx)
                # truck -> automobile
                if i == 9:
                    train_labels[idx] = 1
                # bird -> airplane
                elif i == 2:
                    train_labels[idx] = 0
                # cat -> dog
                elif i == 3:
                    train_labels[idx] = 5
                # dog -> cat
                elif i == 5:
                    train_labels[idx] = 3
                # deer -> horse
                elif i == 4:
                    train_labels[idx] = 7

In [8]:
print(trainset.targets[:70])
asymmetric_noise(trainset, 0.5, 10)
print(trainset.targets[:70])

tensor([5, 0, 4, 1, 9, 2, 1, 3, 1, 4, 3, 5, 3, 6, 1, 7, 2, 8, 6, 9, 4, 0, 9, 1,
        1, 2, 4, 3, 2, 7, 3, 8, 6, 9, 0, 5, 6, 0, 7, 6, 1, 8, 7, 9, 3, 9, 8, 5,
        9, 3, 3, 0, 7, 4, 9, 8, 0, 9, 4, 1, 4, 4, 6, 0, 4, 5, 6, 1, 0, 0])
tensor([5, 0, 7, 1, 9, 2, 1, 5, 1, 7, 3, 5, 3, 6, 1, 7, 2, 8, 6, 1, 4, 0, 9, 1,
        1, 2, 7, 5, 2, 7, 5, 8, 6, 9, 0, 3, 6, 0, 7, 6, 1, 8, 7, 1, 3, 1, 8, 5,
        9, 3, 5, 0, 7, 4, 1, 8, 0, 1, 4, 1, 7, 4, 6, 0, 7, 5, 6, 1, 0, 0])


In [36]:
def multiclass_noisify(y, P, random_state):
    """ Flip classes according to transition probability matrix T.
    It expects a number between 0 and the number of classes - 1.
    """
#     print (np.max(y), P.shape[0])
    assert P.shape[0] == P.shape[1]
    assert np.max(y) < P.shape[0]

    # row stochastic matrix
    assert np.allclose(P.sum(axis=1), np.ones(P.shape[1]))
    assert (P >= 0.0).all()

    m = y.shape[0]
#     print(m)
    new_y = y.copy()
    flipper = np.random.RandomState(random_state)

    for idx in np.arange(m):
        i = y[idx]
        # draw a vector with only an 1
        flipped = flipper.multinomial(1, P[i, :], 1)[0]
        new_y[idx] = np.where(flipped == 1)[0]

    return new_y


# noisify_pairflip call the function "multiclass_noisify"
def noisify_pairflip(y_train, noise, seed=None):
    """mistakes:
        flip in the pair
    """
    y_train = trainset.targets.numpy()
    nb_classes = np.unique(trainset.targets).size
    P = np.eye(nb_classes)
    n = noise

    if n > 0.0:
        # 0 -> 1
        P[0, 0], P[0, 1] = 1. - n, n
        for i in range(1, nb_classes-1):
            P[i, i], P[i, i + 1] = 1. - n, n
        P[nb_classes-1, nb_classes-1], P[nb_classes-1, 0] = 1. - n, n

        y_train_noisy = multiclass_noisify(y_train, P=P,
                                           random_state=seed)
        actual_noise = (y_train_noisy != y_train).mean()
        assert actual_noise > 0.0
#         print('Actual noise %.2f' % actual_noise)
        y_train = y_train_noisy

    return y_train, actual_noise,P

def noisify_multiclass_symmetric(trainset, noise, seed=10):
    """mistakes:
        flip in the symmetric way
    """
    y_train = trainset.targets.numpy()
    nb_classes = np.unique(y_train).size
    P = np.ones((nb_classes, nb_classes))
    n = noise
    P = (n / (nb_classes - 1)) * P

    if n > 0.0:
        # 0 -> 1
        P[0, 0] = 1. - n
        for i in range(1, nb_classes-1):
            P[i, i] = 1. - n
        P[nb_classes-1, nb_classes-1] = 1. - n

        y_train_noisy = multiclass_noisify(y_train, P=P,
                                           random_state=seed)
        actual_noise = (y_train_noisy != y_train).mean()
        assert actual_noise > 0.0
#         print('Actual noise %.2f' % actual_noise)
        y_train = y_train_noisy
    
    return y_train, actual_noise, P

In [38]:
before = trainset.targets 
noisify_pairflip(trainset, 0.5, 10)[0][:60]
after = trainset.targets
print(before.eq(after).sum().item())
# noisify_multiclass_symmetric(trainset, 0.5, 10)[0][:60]

60000


In [42]:
torch.tensor([False, True]).eq(torch.tensor([True, False])).sum()

tensor(0)