In [14]:
import torch
from torch.utils.data.dataset import ConcatDataset


class MyFirstDataset(torch.utils.data.Dataset):
    def __init__(self):
        # dummy dataset
        self.samples = torch.cat((-torch.ones(5), torch.ones(5)))

    def __getitem__(self, index):
        # change this to your samples fetching logic
        return self.samples[index]

    def __len__(self):
        # change this to return number of samples in your dataset
        return self.samples.shape[0]
    
    def get_labels(self):
        return [1]*len(self.samples)

class MySecondDataset(torch.utils.data.Dataset):
    def __init__(self):
        # dummy dataset
        self.samples = torch.cat((torch.ones(50) * 5, torch.ones(5) * -5))

    def __getitem__(self, index):
        # change this to your samples fetching logic
        return self.samples[index]

    def __len__(self):
        # change this to return number of samples in your dataset
        return self.samples.shape[0]
    
    def get_labels(self):
        return [2]*len(self.samples)


class MyThirdDataset(torch.utils.data.Dataset):
    def __init__(self):
        # dummy dataset
        self.samples = torch.cat((torch.ones(20) * 10, torch.ones(10) * -10))

    def __getitem__(self, index):
        # change this to your samples fetching logic
        return self.samples[index]

    def __len__(self):
        # change this to return number of samples in your dataset
        return self.samples.shape[0]
    
    def get_labels(self):
        return [3]*len(self.samples)

In [43]:
import math
import torch
from torch.utils.data.sampler import RandomSampler


class BatchSchedulerSampler(torch.utils.data.sampler.Sampler):
    """
    iterate over tasks and provide a random batch per task in each mini-batch
    """
    def __init__(self, dataset, batch_size):
        self.dataset = dataset
        self.batch_size = batch_size
        self.number_of_datasets = len(dataset.datasets)
        self.largest_dataset_size = max([len(cur_dataset.samples) for cur_dataset in dataset.datasets])

    def __len__(self):
        # return self.batch_size * math.ceil(self.largest_dataset_size / self.batch_size) * len(self.dataset.datasets)
        return sum([len(cur_dataset) for cur_dataset in self.dataset.datasets])

    def __iter__(self):
        samplers_list = []
        sampler_iterators = []
        for dataset_idx in range(self.number_of_datasets):
            cur_dataset = self.dataset.datasets[dataset_idx]
            sampler = RandomSampler(cur_dataset)
            samplers_list.append(sampler)
            cur_sampler_iterator = sampler.__iter__()
            sampler_iterators.append(cur_sampler_iterator)

        push_index_val = [0] + self.dataset.cumulative_sizes[:-1]
        step = self.batch_size * self.number_of_datasets
        samples_to_grab = self.batch_size
        # for this case we want to get all samples in dataset, this force us to resample from the smaller datasets
        # epoch_samples = self.largest_dataset_size * self.number_of_datasets
        # epoch_samples = sum([len(cur_dataset) for cur_dataset in self.dataset.datasets])

        final_samples_list = []  # this is a list of indexes from the combined dataset
        for i in range(self.number_of_datasets):
            cur_batch_sampler = sampler_iterators[i]
            cur_samples = []
            while True:
                try:
                    cur_sample_org = cur_batch_sampler.__next__()
                    cur_sample = cur_sample_org + push_index_val[i]
                    cur_samples.append(cur_sample)
                except StopIteration:
                    # got to the end of iterator - restart the iterator and continue to get samples
                    # until reaching "epoch_samples"
                    # sampler_iterators[i] = samplers_list[i].__iter__()
                    # cur_batch_sampler = sampler_iterators[i]
                    # cur_sample_org = cur_batch_sampler.__next__()
                    # cur_sample = cur_sample_org + push_index_val[i]
                    # cur_samples.append(cur_sample)
                    break
            if cur_sample:
                final_samples_list.append(cur_samples)
        import random
        random.shuffle(final_samples_list)
        for d in final_samples_list:
            yield d

In [44]:
first_dataset = MyFirstDataset()
second_dataset = MySecondDataset()
third_dataset = MyThirdDataset()
concat_dataset = ConcatDataset([first_dataset, second_dataset, third_dataset])

batch_size = 5

print(len(first_dataset), len(second_dataset), len(third_dataset))

sampler = BatchSchedulerSampler(dataset=concat_dataset,
                                                                       batch_size=batch_size)

# dataloader with BatchSchedulerSampler
dataloader = torch.utils.data.DataLoader(dataset=concat_dataset,
                                         batch_sampler=BatchSchedulerSampler(dataset=concat_dataset,
                                                                       batch_size=batch_size),
                                         # batch_size=batch_size,
                                         # shuffle=False
                                        )

t = 0
for inputs in dataloader:
    t += inputs.shape[0]
    print(inputs)
print(t, len(sampler))

10 55 30
tensor([ 10.,  10., -10.,  10., -10.])
tensor([ 10., -10., -10.,  10.,  10.])
tensor([ 5.,  5.,  5.,  5., -5.])
tensor([ 10.,  10.,  10.,  10., -10.])
tensor([5., 5., 5., 5., 5.])
tensor([5., 5., 5., 5., 5.])
tensor([5., 5., 5., 5., 5.])
tensor([ 5., -5.,  5.,  5., -5.])
tensor([ 10.,  10., -10., -10., -10.])
tensor([5., 5., 5., 5., 5.])
tensor([5., 5., 5., 5., 5.])
tensor([10., 10., 10., 10., 10.])
tensor([-1., -1.,  1., -1.,  1.])
tensor([-1.,  1., -1.,  1.,  1.])
tensor([-10.,  10., -10.,  10.,  10.])
75 95


In [16]:

from typing import Callable

import pandas as pd
import torch
import torch.utils.data

import math
import torch
from torch.utils.data.sampler import RandomSampler
from torch.utils.data.distributed import DistributedSampler
import random

"""
https://www.kaggle.com/code/haithemhermessi/nlp-multi-task-learning-with-transformers/notebook
https://github.com/bomri/code-for-posts/blob/master/mtl-data-loading/batch_scheduler_dataloader_example.py
""" 
    

class ImbalancedDatasetSampler(torch.utils.data.sampler.Sampler):
    """Samples elements randomly from a given list of indices for imbalanced dataset
    Arguments:
        indices: a list of indices
        num_samples: number of samples to draw
        callback_get_label: a callback-like function which takes two arguments - dataset and index
    """

    def __init__(self, dataset, indices: list = None, num_samples: int = None, callback_get_label: Callable = None):
        # if indices is not provided, all elements in the dataset will be considered
        self.indices = list(range(len(dataset))) if indices is None else indices

        # define custom callback
        self.callback_get_label = callback_get_label

        # if num_samples is not provided, draw `len(indices)` samples in each iteration
        self.num_samples = len(self.indices) if num_samples is None else num_samples

        # distribution of classes in the dataset
        df = pd.DataFrame()
        df["label"] = self._get_labels(dataset)
        df.index = self.indices
        df = df.sort_index()

        label_to_count = df["label"].value_counts()

        weights = 1.0 / label_to_count[df["label"]]

        self.weights = torch.DoubleTensor(weights.to_list())

    def _get_labels(self, dataset):
        if self.callback_get_label:
            return self.callback_get_label(dataset)
        elif isinstance(dataset, torch.utils.data.Dataset):
            return dataset.get_labels()
        else:
            raise NotImplementedError

    def __iter__(self):
        return (self.indices[i] for i in torch.multinomial(self.weights, self.num_samples, replacement=True))

    def __len__(self):
        return self.num_samples
    
class ExampleImbalancedDatasetSampler(ImbalancedDatasetSampler):
    """
    ImbalancedDatasetSampler is taken from:
    https://github.com/ufoym/imbalanced-dataset-sampler/blob/master/torchsampler/imbalanced.py
    In order to be able to show the usage of ImbalancedDatasetSampler in this example I am editing the _get_label
    to fit my datasets
    """
    def _get_label(self, dataset, idx):
        return dataset.data[idx].item()


class BalancedBatchSchedulerSampler(torch.utils.data.sampler.Sampler):
    """
    iterate over tasks and provide a balanced batch per task in each mini-batch
    """
    def __init__(self, dataset, batch_size, balanced=False, mix_batch=False):
        self.dataset = dataset
        self.batch_size = batch_size
        self.number_of_datasets = len(dataset.datasets)
        self.balanced = balanced
        self.largest_dataset_size = max([len(cur_dataset.samples) for cur_dataset in dataset.datasets])
        self.mix_batch = mix_batch

    def __len__(self):
        return self.batch_size * math.ceil(self.largest_dataset_size / self.batch_size) * len(self.dataset.datasets)

    def __iter__(self):
        samplers_list = []
        sampler_iterators = []
        for dataset_idx in range(self.number_of_datasets):
            cur_dataset = self.dataset.datasets[dataset_idx]
            if dataset_idx == 0:
                # the first dataset is kept at RandomSampler
                sampler = RandomSampler(cur_dataset)
            else:
                # the second unbalanced dataset is changed
                sampler = ExampleImbalancedDatasetSampler(cur_dataset)
            samplers_list.append(sampler)
            cur_sampler_iterator = sampler.__iter__()
            sampler_iterators.append(cur_sampler_iterator)

        push_index_val = [0] + self.dataset.cumulative_sizes[:-1]
        step = self.batch_size * self.number_of_datasets
        samples_to_grab = self.batch_size
        # for this case we want to get all samples in dataset, this force us to resample from the smaller datasets
        epoch_samples = self.largest_dataset_size * self.number_of_datasets

        final_samples_list = []  # this is a list of indexes from the combined dataset
        for _ in range(0, epoch_samples, step):
            for i in range(self.number_of_datasets):
                cur_batch_sampler = sampler_iterators[i]
                cur_samples = []
                for _ in range(samples_to_grab):
                    try:
                        cur_sample_org = cur_batch_sampler.__next__()
                        cur_sample = cur_sample_org + push_index_val[i]
                        cur_samples.append(cur_sample)
                    except StopIteration:
                        # got to the end of iterator - restart the iterator and continue to get samples
                        # until reaching "epoch_samples"
                        sampler_iterators[i] = samplers_list[i].__iter__()
                        cur_batch_sampler = sampler_iterators[i]
                        cur_sample_org = cur_batch_sampler.__next__()
                        cur_sample = cur_sample_org + push_index_val[i]
                        cur_samples.append(cur_sample)
                final_samples_list.extend(cur_samples)
        if self.mix_batch:
            random.shuffle(final_samples_list)
        return iter(final_samples_list)
    


In [17]:
# dataloader with BalancedBatchSchedulerSampler
dataloader = torch.utils.data.DataLoader(dataset=concat_dataset,
                                         sampler=BalancedBatchSchedulerSampler(dataset=concat_dataset,
                                                                               batch_size=batch_size),
                                         batch_size=batch_size,
                                         shuffle=False)

for inputs in dataloader:
    print(inputs)

tensor([-1., -1.,  1., -1.,  1.,  1.,  1., -1.])
tensor([5., 5., 5., 5., 5., 5., 5., 5.])
tensor([ 10., -10.,  10.,  10.,  10.,  10.,  10., -10.])
tensor([ 1., -1., -1., -1.,  1., -1.,  1., -1.])
tensor([-5.,  5.,  5.,  5.,  5.,  5.,  5.,  5.])
tensor([ 10.,  10.,  10.,  10.,  10.,  10.,  10., -10.])
tensor([ 1., -1.,  1.,  1., -1.,  1., -1., -1.])
tensor([ 5.,  5., -5.,  5.,  5.,  5.,  5.,  5.])
tensor([-10.,  10.,  10.,  10., -10.,  10.,  10.,  10.])
tensor([-1.,  1., -1.,  1.,  1.,  1.,  1., -1.])
tensor([5., 5., 5., 5., 5., 5., 5., 5.])
tensor([ 10., -10.,  10., -10.,  10., -10., -10., -10.])
tensor([-1.,  1., -1.,  1., -1.,  1.,  1., -1.])
tensor([ 5.,  5., -5.,  5., -5.,  5.,  5.,  5.])
tensor([-10.,  10.,  10.,  10.,  10., -10., -10., -10.])
tensor([ 1., -1.,  1.,  1., -1.,  1.,  1., -1.])
tensor([5., 5., 5., 5., 5., 5., 5., 5.])
tensor([ 10.,  10.,  10., -10.,  10.,  10., -10.,  10.])
tensor([-1., -1., -1., -1.,  1.,  1., -1., -1.])
tensor([5., 5., 5., 5., 5., 5., 5., 5.])
tens