# Dataset Distribution

In [1]:
import numpy as np
import math

from torch.utils.data import random_split

## Calculating Mean & Std

Calculates the mean and standard deviation of a dataset's `data` array over its first three axes, each divided by 255.

In [2]:
def get_norm(dataset):
    """Return the mean and std of `dataset.data`, each divided by 255.

    Parameters
    ----------
    dataset : datasets
        Object exposing a `data` array with at least three axes.
        Presumably laid out as (N, H, W, C) 8-bit images, which would
        make the results per-channel statistics in [0, 1] -- confirm
        against the caller.

    Returns
    -------
    out : tuple
        `(mean, std)`, both reduced over axes 0, 1 and 2 of
        `dataset.data`.
    """
    pixels = dataset.data
    reduce_axes = (0, 1, 2)
    scale = 255.

    channel_mean = pixels.mean(axis=reduce_axes) / scale
    channel_std = pixels.std(axis=reduce_axes) / scale
    return channel_mean, channel_std

## Split Dataset

Splits a dataset into multiple random subsets whose sizes follow a given distribution.

### TODO

- [ ] bias

In [3]:
def random_split_by_dist(
    dataset,
    size: int,
    dist: callable = None,
    **params
):
    """Split `dataset` into random subsets sized by a distribution function.

    Parameters
    ----------
    dataset : datasets
        See `torchvision.datasets` . Only `len(dataset)` is read here;
        the dataset itself is handed to `random_split`.
    size : int
        Number (Length) of subsets.
        MUST be positive and MUST NOT exceed `len(dataset)`, because
        every subset receives at least one element.
    dist : function
        Distribution function which returns np.array of length `size`.
        Sum of returned array SHOULD be 1.
        Called as `dist(size, **params)`; defaults to `uniform`.
    **params
        Extra keyword arguments forwarded to `dist`.

    Returns
    -------
    out : subsets
         Of `dataset`, ordered by ascending length.

    Raises
    ------
    AssertionError
        If `size` is 0 or the values returned by `dist` do not sum to 1.
    ValueError
        If `size` is negative or larger than `len(dataset)`, or if
        `dist` does not return exactly `size` values.
    """

    # Kept as an assertion so the exception type seen by callers is unchanged.
    assert size != 0, "`size` > 0"

    dist = dist or uniform  # default value

    N = len(dataset)
    # Every subset must hold at least one element. Without this guard the
    # adjustment loop below yields zero/negative lengths, or never
    # terminates (e.g. `size` == 1 with an empty dataset).
    if not 0 < size <= N:
        raise ValueError(
            "`size` SHOULD be in [1, len(dataset)], got {} for {} items."
            .format(size, N)
        )

    # calculates distribution `dist_val`
    dist_val = np.asarray(dist(size, **params), dtype=float)
    if dist_val.shape != (size,):
        # A wrong-length result would otherwise be silently broadcast.
        raise ValueError(
            "`dist` SHOULD return {} values, got shape {}."
            .format(size, dist_val.shape)
        )
    assert math.isclose(sum(dist_val), 1.), "sum of `dist` SHOULD be 1."

    result = np.around(N * dist_val).astype('int')  # to integers
    result = result.clip(1, None)                   # to positive integers
    # adjustment for that summation of `result` SHOULD be `N`:
    # the last subset absorbs the rounding/clipping error ...
    result[-1] = N - result[:-1].sum()
    # ... and, if that leaves it empty, takes elements one at a time from
    # the currently largest subset. Because `size` <= `N`, the largest
    # subset always has at least 2 elements while the last one has < 1,
    # so no subset drops below 1 and the loop terminates.
    while result[-1] < 1:
        result[result.argmax()] -= 1
        result[-1] += 1

    # Plain Python ints: `random_split` is documented for int lengths.
    lengths = [int(n) for n in sorted(result)]
    return random_split(dataset, lengths)

In [4]:
def uniform(
    size: int,
    **params  # no longer needed
):
    """Calculate uniform distribution.

    Parameters
    ----------
    size : int
        Number (Length) of chunks.
        Same as length of returned np.array.
    **params
        Accepted only so that unexpected keyword arguments can be
        rejected with an assertion; MUST be empty.

    Returns
    -------
    out : np.array
        `size` equal weights, each `1 / size`.
    """
    assert not params, (
        "uniform() got an unexpected keyword argument {}".format(
            ', '.join("'{}'".format(name) for name in params)
        )
    )

    weights = np.ones(size)
    return weights / size

In [5]:
def normal(
    size: int,
    loc: float = 0.,
    scale: float = 1.,
    lower: float = 0.,
    upper: float = None
):
    """Calculate weights from a folded normal (Gaussian) sample.

    Draws `size` values with `np.random.normal`, takes their absolute
    value so that no weight is negative, clips them to
    [`lower`, `upper`] and finally rescales them to sum to 1.

    Because of the absolute value the result is not a true normal
    distribution.

    See https://numpy.org/doc/stable/reference/random/generated/numpy.random.normal.html .

    Parameters
    ----------
    size : int
        Number (Length) of chunks.
        Same as length of returned np.array.
    loc : float
        Mean ("centre") of the distribution.
    scale : float
        Standard deviation (spread or "width") of the distribution.
        MUST be non-negative.
    lower : float
        Lower-bound before applying scaling.
    upper : float
        Upper-bound before applying scaling.
        `None` leaves the values unbounded above.

    Returns
    -------
    out : np.array
        Non-negative weights which sum to 1.
    """

    samples = np.random.normal(loc, scale, size)
    magnitudes = np.abs(samples)  # weights SHOULD NOT be negative.
    bounded = np.clip(magnitudes, lower, upper)
    total = sum(bounded)
    return bounded / total

In [6]:
def pareto(
    size: int,
    alpha: float = 1.16,  # by 80-20 rule, log(5)/log(4)
    lower: float = 0.,
    upper: float = None
):
    """Calculate weights from a Pareto sample.

    Draws `size` values with `np.random.pareto`, clips them to
    [`lower`, `upper`] and rescales them to sum to 1.

    See https://numpy.org/doc/stable/reference/random/generated/numpy.random.pareto.html .

    Parameters
    ----------
    size : int
        Number (Length) of chunks.
        Same as length of returned np.array.
    alpha : float
        Shape of the distribution.
        Must be positive.
    lower : float
        Lower-bound before applying scaling.
    upper : float
        Upper-bound before applying scaling.
        `None` leaves the values unbounded above.

    Returns
    -------
    out : np.array
        Weights which sum to 1.
    """

    samples = np.random.pareto(alpha, size)
    bounded = np.clip(samples, lower, upper)
    total = sum(bounded)
    return bounded / total

# main

In [7]:
if __name__ == "__main__":
    from pprint import pprint

    import torchvision.datasets as dset
    import torchvision.transforms as transforms

    # Test `get_norm` on the CIFAR-10 training set
    # (downloaded into ./cifar when not already present).
    transform = transforms.Compose([transforms.ToTensor()])
    trainDataset = dset.CIFAR10(
        root='cifar',
        train=True,
        download=True,
        transform=transform,
    )
    norm_stats = get_norm(trainDataset)
    pprint(norm_stats)

    # Test `random_split_by_dist`: print the length of each of the
    # 10 subsets drawn with a Pareto distribution (alpha = 2).
    subsets = random_split_by_dist(
        trainDataset,
        size=10,
        dist=pareto,
        alpha=2.,
    )
    subset_lengths = [len(subset) for subset in subsets]
    pprint(subset_lengths)

Files already downloaded and verified
(array([0.49139968, 0.48215841, 0.44653091]),
 array([0.24703223, 0.24348513, 0.26158784]))
[200, 318, 559, 781, 955, 1116, 1466, 3230, 15064, 26311]
