# Shuffling Methods

test cases

In [None]:
# Fixture: 14 string samples and, position by position, their class labels (1-4)
test_inputs = [
    'abc', 'def', 'egh', 'ijk', 'lmn', 'opq', 'rst',
    'ade', 'des', 'asd', 'a', 'sd', 'sda', 'e',
]
test_targets = [
    1, 2, 1, 1, 2, 1, 2,
    3, 4, 3, 4, 4, 2, 3,
]

# Expected (inputs, targets) for the homogeneous ordering with batch_size=3 and no shuffling:
# full single-class groups first, the per-class leftovers ('opq', 'sda') at the end
expected_output_one_class = (
    ['abc', 'egh', 'ijk', 'def', 'lmn', 'rst', 'ade', 'asd', 'e', 'des', 'a', 'sd', 'opq', 'sda'],
    [1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 1, 2],
)

# Expected (inputs, targets) for the heterogeneous ordering with batch_size=3 and no shuffling:
# the classes are visited round-robin, taking the last remaining sample of each class
expected_output_all_classes = (
    ['opq', 'sda', 'e', 'sd', 'ijk', 'rst', 'asd', 'a', 'egh', 'lmn', 'ade', 'des', 'abc', 'def'],
    [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2],
)

implementation

In [None]:
def build_map_classes_buffer(inputs, targets):
    '''
    Build a dictionary which has the targets as keys and all the corresponding inputs as values.
    The keys appear in the order in which each target is first seen, and the inputs of one
    target keep their original relative order.
    -- Params:
    @inputs: input data for the model
    @targets: desired outcome for the model given that it gets @inputs
    -- Return: defaultdict(list) mapping target -> list of inputs
    '''
    # imported locally so the function works regardless of which notebook cells ran before it
    from collections import defaultdict

    map_classes_buffer = defaultdict(list) # stores class -> inputs pairs
    # pairs are formed positionally; like the former map() call, zip stops at the shorter sequence
    for sample, target in zip(inputs, targets):
        map_classes_buffer[target].append(sample)
    return map_classes_buffer

In [None]:
from collections import defaultdict
import numpy as np
from sklearn.utils import shuffle

def sort_one_class(inputs, targets, batch_size, use_shuffle=True, random_state=None):
    '''
    Reorders the data (inputs and targets) so that consecutive slices of @batch_size samples
    are homogeneous in as many cases as possible, i.e. consist of one target only.
    For every target, all complete groups of @batch_size samples come first; the samples that
    do not fill a complete group are appended at the end, in the order the targets were met.
    -- Params:
    @inputs: input data for the model
    @targets: desired outcome for the model given that it gets @inputs
    @batch_size: number of samples per batch, must be at least 1
    @use_shuffle: shuffle the data beforehand
    @random_state: forwarded to the shuffle call, only used if @use_shuffle is set
    -- Return: tuple (reordered inputs, reordered targets), both flat lists
    -- Raises: ValueError if @batch_size is smaller than 1
    '''

    assert len(inputs) == len(targets), 'Inputs and targets do not have the same size'
    if batch_size < 1:
        # a non-positive batch size never consumed any sample and looped forever
        raise ValueError('batch_size must be a positive integer')

    if use_shuffle:
        inputs, targets = shuffle(inputs, targets, random_state=random_state)

    input_batches = []
    target_batches = []
    inputs_buffer = [] # store inputs that don't fit into homogeneous batches anymore
    targets_buffer = []

    for target, class_inputs in build_map_classes_buffer(inputs, targets).items():
        # number of samples of this class that fill complete batches
        num_full = len(class_inputs) - len(class_inputs) % batch_size
        input_batches.extend(class_inputs[:num_full])
        target_batches.extend([target] * num_full)

        # the incomplete tail of the class is postponed until all classes are handled
        leftovers = class_inputs[num_full:]
        inputs_buffer.extend(leftovers)
        targets_buffer.extend([target] * len(leftovers))

    # take missing values
    input_batches.extend(inputs_buffer)
    target_batches.extend(targets_buffer)

    return input_batches, target_batches
 
# Sanity check against the fixture; shuffling is disabled so the result is deterministic
assert sort_one_class(test_inputs, test_targets, batch_size=3, use_shuffle=False) == expected_output_one_class, \
    'Actual and expected outputs differ!'

In [None]:
from copy import deepcopy
from sklearn.utils import shuffle

def sort_all_classes(inputs, targets, batch_size, use_shuffle=True, random_state=None):
    '''
    Reorders the data (inputs and targets) so that consecutive slices are as heterogeneous as
    possible: the targets are visited round-robin (in the order they were first met) and each
    visit takes the last remaining sample of that target. Targets that run out of samples are
    skipped in later rounds. The resulting order does not depend on @batch_size.
    -- Params:
    @inputs: input data for the model
    @targets: desired outcome for the model given that it gets @inputs
    @batch_size: number of samples per batch, must be at least 1
    @use_shuffle: shuffle the data beforehand
    @random_state: forwarded to the shuffle call, only used if @use_shuffle is set
    -- Return: tuple (reordered inputs, reordered targets), both flat lists
    -- Raises: ValueError if @batch_size is smaller than 1
    '''

    assert len(inputs) == len(targets), 'Inputs and targets do not have the same size'
    if batch_size < 1:
        # a non-positive batch size made the former chunking loop run forever
        raise ValueError('batch_size must be a positive integer')

    if use_shuffle:
        inputs, targets = shuffle(inputs, targets, random_state=random_state)

    # target -> list of its inputs; the lists are consumed from the end like stacks
    class_stacks = build_map_classes_buffer(inputs, targets)

    input_batches = []
    target_batches = []
    while any(class_stacks.values()): # at least one target still has samples left
        # take one 'column': one sample of every target that is not exhausted yet
        for target, stack in class_stacks.items():
            if stack:
                # checking emptiness explicitly (instead of a None sentinel) keeps inputs
                # that are themselves None paired with their target
                input_batches.append(stack.pop())
                target_batches.append(target)

    return input_batches, target_batches

# Sanity check against the fixture; shuffling is disabled so the result is deterministic
assert sort_all_classes(test_inputs, test_targets, batch_size=3, use_shuffle=False) == expected_output_all_classes, \
    'Actual and expected outputs differ!'

In [66]:
import numpy as np
# seed NumPy's global random generator so the np.random.choice draws below are reproducible
np.random.seed(42)

def weighted_random_sampling(inputs, targets, weighted_indices, m=3):
    '''
    Pull @m distinct samples (and their targets) at random, without replacement, where the
    chance of an index being pulled is proportional to its weight. The draws use NumPy's
    global random generator. @weighted_indices is left untouched.
    -- Params:
    @inputs: list of samples to pull from
    @targets: targets belonging to @inputs, position by position
    @weighted_indices: dictionary index -> weight; the indices address @inputs / @targets
                       and the weights must add up to 1
    @m: number of samples to pull
    -- Return: tuple (pulled samples, pulled targets), both lists
    -- Raises: ValueError if @m exceeds the number of indices with a positive weight
    '''
    eps = 0.0000001 # small threshold to accept rounding errors when comparing float values

    assert len(inputs) == len(targets), 'The number of inputs to pull from must fit the number of targets'
    assert len(inputs) == len(weighted_indices), 'The number of inputs to pull from must fit the given weighted indices'
    assert isinstance(inputs, list), 'The inputs to pull from must be given as a list'
    assert isinstance(weighted_indices, dict), 'The weighted indices must be given as a dictionary'
    total_weight = sum(weighted_indices.values())
    assert total_weight < 1 + eps and total_weight > 1 - eps, \
        'The sum of the values for the input must add up to 1.'

    # work on a copy: pulled indices get their weight zeroed, which must not leak to the caller
    weights = dict(weighted_indices)

    num_drawable = sum(1 for weight in weights.values() if weight > 0)
    if m > num_drawable:
        raise ValueError('Cannot pull more samples than there are indices with a positive weight')

    pulled_samples = []
    pulled_targets = []
    for _ in range(m):
        pulled_index = np.random.choice(list(weights.keys()), p=list(weights.values()))
        pulled_samples.append(inputs[pulled_index])
        pulled_targets.append(targets[pulled_index])

        # exclude the pulled index from later rounds
        weights[pulled_index] = 0
        # the weights must be rescaled each round, as the probabilities have to add up to 1;
        # skipped once nothing is left (that used to divide by zero after the last possible pull)
        remaining_weight = sum(weights.values())
        if remaining_weight > 0:
            weights = {index: weight / remaining_weight for index, weight in weights.items()}
    return pulled_samples, pulled_targets
    
    
# Demo data: eight placeholder samples with binary targets
inputs = list('abcdefgh') # will be tensors
targets = [0, 0, 0, 1, 1, 0, 1, 1]
# index of a sample -> probability of pulling it (positions 0..7, values add up to 1)
weighted_indices = dict(enumerate([0.3, 0.05, 0.2, 0.05, 0.05, 0.15, 0.05, 0.15]))

weighted_random_sampling(inputs, targets, weighted_indices, m=3)

(['c', 'h', 'f'], [0, 1, 0])

---

test on real data

In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
from sklearn.utils import shuffle

class DataLoader:
    '''
    Loads CIFAR-10 completely into memory as numpy arrays, reorders it according to a
    batching strategy and serves it as torch tensors, batch by batch.
    Intended call order: download_cifar() -> prepare_cifar(...) -> yield_batches(...)
    '''

    # per-channel values handed to transforms.Normalize for both splits
    _NORM_MEAN = (0.4914, 0.4822, 0.4465)
    _NORM_STD = (0.2023, 0.1994, 0.2010)

    def __init__(self, root='./data'):
        '''
        -- Params:
        @root: directory handed to torchvision as the dataset root
        '''
        self.root = root
        # NOTE(review): trainset / testset are never assigned anywhere in this class
        self.trainset = None
        self.testset = None
        # everything below is filled by download_cifar() / prepare_cifar()
        self.batch_size = None
        self.X_train, self.Y_train = None, None
        self.X_test, self.Y_test = None, None
        self.X_batches_train, self.Y_batches_train = None, None
        self.X_batches_test, self.Y_batches_test = None, None

    @staticmethod
    def _to_arrays(dataset):
        '''
        Run @dataset once through a torch DataLoader (fixed order) and collect it as numpy arrays.
        -- Return: tuple (samples, targets) of numpy arrays
        '''
        # data needs to be loaded through dataloader to get into the correct format
        loader = torch.utils.data.DataLoader(dataset, batch_size=128, shuffle=False, num_workers=2)
        samples, labels = [], []
        for x, y in loader:
            samples.extend(x.numpy()) #numpy needed for a casting workaround
            labels.extend(y.numpy())
        return np.array(samples), np.array(labels)

    def download_cifar(self):
        '''
        Download CIFAR-10 (if needed) and store the transformed train and test split in
        X_train / Y_train and X_test / Y_test. Previously prepared batches are discarded.
        The random train transforms are applied during this single pass, so the stored
        training samples stay fixed afterwards.
        '''
        print('==> Preparing data..')
        normalize = transforms.Normalize(self._NORM_MEAN, self._NORM_STD)
        transform_train = transforms.Compose([
            transforms.RandomCrop(32, padding=4),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            normalize,
        ])
        transform_test = transforms.Compose([
            transforms.ToTensor(),
            normalize,
        ])

        trainset = torchvision.datasets.CIFAR10(root=self.root, train=True, download=True, transform=transform_train)
        self.X_train, self.Y_train = self._to_arrays(trainset)

        testset = torchvision.datasets.CIFAR10(root=self.root, train=False, download=True, transform=transform_test)
        self.X_test, self.Y_test = self._to_arrays(testset)

        self.X_batches_train, self.Y_batches_train = None, None
        self.X_batches_test, self.Y_batches_test = None, None

    def _sort_split(self, sorter, X, Y, random_state):
        '''
        Reorder one split with @sorter (sort_one_class or sort_all_classes).
        The sorters return plain lists, so the result is converted back to numpy arrays,
        which is what yield_batches() hands to torch.from_numpy.
        '''
        X_sorted, Y_sorted = sorter(X, Y, self.batch_size, use_shuffle=True, random_state=random_state)
        return np.asarray(X_sorted), np.asarray(Y_sorted)

    def prepare_cifar(self, strategy, batch_size=128, random_state=None):
        '''
        Arrange both splits for batching.
        -- Params:
        @strategy: 'freeze' (keep the loaded order), 'shuffle' (random order),
                   'homogeneous' (sort_one_class) or 'heterogeneous' (sort_all_classes)
        @batch_size: number of samples per batch
        @random_state: forwarded to the shuffle calls of the chosen strategy
        -- Raises: RuntimeError if download_cifar() has not been called yet
        '''
        self.batch_size = batch_size
        assert strategy in ['freeze', 'shuffle', 'homogeneous', 'heterogeneous'], 'Unknown action'
        if self.X_train is None or self.X_test is None:
            raise RuntimeError('No data loaded, call download_cifar() first')

        if strategy == 'freeze':
            self.X_batches_train, self.Y_batches_train = self.X_train, self.Y_train
            self.X_batches_test, self.Y_batches_test = self.X_test, self.Y_test
        elif strategy == 'shuffle':
            self.X_batches_train, self.Y_batches_train = shuffle(self.X_train, self.Y_train, random_state=random_state)
            self.X_batches_test, self.Y_batches_test = shuffle(self.X_test, self.Y_test, random_state=random_state)
        elif strategy == 'homogeneous':
            self.X_batches_train, self.Y_batches_train = self._sort_split(
                sort_one_class, self.X_train, self.Y_train, random_state)
            self.X_batches_test, self.Y_batches_test = self._sort_split(
                sort_one_class, self.X_test, self.Y_test, random_state)
        elif strategy == 'heterogeneous':
            self.X_batches_train, self.Y_batches_train = self._sort_split(
                sort_all_classes, self.X_train, self.Y_train, random_state)
            self.X_batches_test, self.Y_batches_test = self._sort_split(
                sort_all_classes, self.X_test, self.Y_test, random_state)

    def yield_batches(self, use_train=True):
        '''
        Generator over consecutive batches of the prepared data; the last batch may be smaller.
        -- Params:
        @use_train: serve the train split if set, the test split otherwise
        -- Return: yields tuples (inputs, targets) of torch tensors
        -- Raises: RuntimeError if no batches have been prepared yet
        '''
        X = self.X_batches_train if use_train else self.X_batches_test
        Y = self.Y_batches_train if use_train else self.Y_batches_test
        if X is None or Y is None:
            raise RuntimeError('No batches prepared, call download_cifar() and prepare_cifar() first')

        # np.asarray is a no-op for arrays and also accepts the list form of the data
        X, Y = torch.from_numpy(np.asarray(X)), torch.from_numpy(np.asarray(Y))
        for batch_idx in range(0, len(X), self.batch_size):
            yield X[batch_idx: batch_idx+self.batch_size], Y[batch_idx: batch_idx+self.batch_size]
            
        
# Load CIFAR-10 with the default root ('./data') and arrange it with the 'shuffle' strategy
dataloader = DataLoader()
dataloader.download_cifar()
dataloader.prepare_cifar('shuffle', random_state=42)

# Iterate once over all training batches; the loop only keeps references to the current
# batch, so afterwards _inputs / _targets hold the last batch that was yielded
for batch_idx, (inputs, targets) in enumerate(dataloader.yield_batches(use_train=True)):
    _inputs = inputs
    _targets = targets