# Utilities

This notebook contains functions for applying, testing, and detecting shifts. Additionally, it contains implementations of all neural network models, including the end-to-end model, the concept bottleneck model (CBM), and concept model extraction (CME).

## Loader Functions

Contains functions to load data.

In [None]:
def get_latent_sizes():
    """
    Return how many distinct values each latent factor (concept) can take.

    :return: integer array with one entry per concept
    """

    sizes = (1, 3, 6, 40, 32, 32)
    return np.array(sizes)

def get_latent_bases():
    """
    Compute the positional weight (mixed-radix base) of every latent dimension.

    The weight of a dimension is the product of the sizes of all dimensions
    that come after it; the last dimension has weight 1. The dot product of a
    latent vector with these weights therefore yields a single flat index.

    :return: array of weights, one per latent dimension
    """

    sizes = get_latent_sizes()

    # Suffix products: entry i is the product of sizes[i:]
    suffix_products = np.cumprod(sizes[::-1])[::-1]

    # Drop the leading total and append 1 as the weight of the last dimension
    return np.concatenate((suffix_products[1:], np.array([1,])))

In [None]:
def sample_latent(size=1):
    """
    Draw random latent vectors; every latent dimension is sampled independently
    from the integers 0 .. (number of values of that dimension - 1).

    :param size: how many random samples to draw

    :return: float array with 'size' rows and one column per latent dimension
    """

    # One integer column per latent dimension, drawn in dimension order
    columns = [np.random.randint(n_values, size=size) for n_values in get_latent_sizes()]

    return np.stack(columns, axis=1).astype(np.float64)

In [None]:
def latent_to_index(latents):
    """
    Map latent vectors to the position of the matching instance in the dataset.

    :param latents: array of latents (one latent vector per row)

    :return: integer indices, one per latent vector
    """

    weights = get_latent_bases()

    # Weighted sum of the latent values gives the flat dataset index
    flat_positions = np.dot(latents, weights)
    return flat_positions.astype(int)

In [None]:
def show_images_grid(imgs_, num_images=25):
    """
    Used to visualise dSprite images in a roughly square grid.

    :param imgs_: images to be drawn
    :param num_images: number of images shown in the grid; if fewer images are
        supplied, the remaining grid cells are left blank
    """

    # Never index past the images that were actually supplied
    n_shown = min(num_images, len(imgs_))
    if n_shown <= 0:
        return

    grid_rows = int(np.ceil(num_images**0.5))
    grid_cols = int(np.ceil(num_images / grid_rows))

    # squeeze=False always yields a 2-D array of axes, so a 1x1 grid
    # (num_images=1) can be flattened like any other grid
    _, axes = plt.subplots(grid_rows, grid_cols,
                           figsize=(grid_cols * 2, grid_rows * 2),
                           squeeze=False)
    axes = axes.flatten()

    # Draw images on the given grid and hide the cells that stay empty
    for ax_i, ax in enumerate(axes):
        if ax_i < n_shown:
            ax.imshow(imgs_[ax_i], cmap='Greys_r',  interpolation='nearest')
            ax.set_xticks([])
            ax.set_yticks([])
        else:
            ax.axis('off')

In [None]:
def load_dsprites(path, dataset_size_used, train_size=0.85, class_index=1):
    """
    Load the dSprites dataset, draw a random subset, and split it into train
    and test sets.

    :param path: the path of the dataset (.npz archive)
    :param dataset_size_used: how many instances are sampled and returned
    :param train_size: size of the training set (forwarded to train_test_split)
    :param class_index: column of latents_classes used as the task label (1 for shape)

    :return: x_train, x_test, y_train, y_test, c_train, c_test
    """

    # Read only the arrays that are used and close the archive afterwards
    with np.load(path) as dataset_zip:
        imgs = dataset_zip["imgs"] # contains image data (737280 x 64 x 64)
        latents_classes = dataset_zip['latents_classes'] # classification targets (integer index of latents_values)

    # Select data that will be used
    # NOTE(review): indices are drawn with replacement, so the same image can be
    # sampled more than once and may then land in both the train and test split.
    indices_sampled = np.random.randint(0, imgs.shape[0], dataset_size_used)

    # Subset first, then add the channel axis and cast: converting the whole
    # dataset to float32 before indexing would need several times more memory
    X = np.expand_dims(imgs[indices_sampled], axis=-1).astype('float32')
    y = latents_classes[indices_sampled, class_index] # shape for task 1
    c = latents_classes[indices_sampled] # concepts

    # Split X (image), y (shape for task 1), concepts to train test sets
    x_train, x_test, y_train, y_test, c_train, c_test = train_test_split(X, y, c, train_size=train_size)
    print('Training samples:', x_train.shape[0])
    print('Testing samples:', x_test.shape[0])

    return x_train, x_test, y_train, y_test, c_train, c_test

## Shifts Functions

Contains various shift applicators and shift detection tests. The code here is adapted from https://github.com/steverab/failing-loudly and modified by Maleakhi. Credit to the original authors.

### Shift Applicator

Functions to apply various types of shifts.

In [None]:
def apply_gaussian_shift(X_te_orig, y_te_orig, shift_intensity, shift_prop):
    """
    Given a dataset (in the experimentation, the test dataset), this function applies
    Gaussian noise shift to a random subset of the instances.

    The inputs are left untouched; shifted copies are returned.

    :param X_te_orig: the X matrix (dataset features) where we will apply shifts.
    :param y_te_orig: the y (label) belonging to X_te_orig.
    :param shift_intensity: "large", "medium" or "small"; any other value is
        treated like "small".
    :param shift_prop: proportion of the instances that receive noise.

    :return: shifted X and (copied) y
    """

    # Noise amount per intensity; it is divided by `normalization` in the helper
    noise_amounts = {"large": 100.0, "medium": 10.0}
    noise_amt = noise_amounts.get(shift_intensity, 1.0)

    normalization = 255.0

    # gaussian_noise_subset writes into the array it is given, so hand it a copy
    # to keep the caller's original data intact (y is copied as well)
    X_te_1, _ = gaussian_noise_subset(X_te_orig.copy(), noise_amt, normalization=normalization, delta_total=shift_prop)
    y_te_1 = y_te_orig.copy()

    return (X_te_1, y_te_1)

In [None]:
def apply_ko_shift(X_te_orig, y_te_orig, shift_intensity, cl=0):
    """
    Apply the knockout shift to a dataset (in the experimentation, the test
    dataset): instances of one class are removed to create class imbalance.

    :param X_te_orig: the X matrix (dataset features) where we will apply shifts.
    :param y_te_orig: the y (label) where we will apply shifts.
    :param shift_intensity: "large" removes a proportion of 1.0 of the class,
        "medium" 0.5, and any other value 0.1.
    :param cl: the class whose instances are removed

    :return: shifted X and y
    """

    # Proportion of class `cl` that is knocked out
    prop = 1.0 if shift_intensity == "large" else (0.5 if shift_intensity == "medium" else 0.1)

    shifted_X, shifted_y = knockout_shift(X_te_orig, y_te_orig, cl, prop)
    return (shifted_X, shifted_y)

In [None]:
def apply_img_shift(X_te_orig, y_te_orig, orig_dims, shift_intensity, shift_prop, shift_types=None):
    """
    Given a dataset (in the experimentation, the test dataset), this function applies
    image shifts (translation, rotation, shear, zoom, flip) to a random subset
    of the images.

    The inputs are left untouched; shifted copies are returned.

    :param X_te_orig: the X matrix (dataset features) where we will apply shifts.
    :param y_te_orig: the y (label) belonging to X_te_orig.
    :param orig_dims: original dimensions of the image (img.shape[1:] - width, height, channel)
    :param shift_intensity: "small", "medium", "large"
    :param shift_prop: proportion of the data shifted, e.g. 0.1, 0.5, 1.0
    :param shift_types: iterable of the shifts that we want to apply. If empty or None,
        all shifts are applied. Shifts that are not listed use their "default" setting.
        Possible shift types include:
        - 'width_shift'
        - 'height_shift'
        - 'rotation'
        - 'shear'
        - 'zoom'
        - 'flip'

    :return: shifted X and (copied) y

    :raises ValueError: if shift_types contains an unknown shift name
    """

    all_shifts = ("width_shift", "height_shift", "rotation", "shear", "zoom", "flip")

    # Which shifts run at the requested intensity; a set tolerates duplicates
    if not shift_types:
        selected = set(all_shifts)
    else:
        selected = set(shift_types)
        unknown = selected.difference(all_shifts)
        if unknown:
            raise ValueError("Unknown shift type(s): {}".format(sorted(map(repr, unknown))))

    # Get default configuration
    config = image_data_generator_config()

    def setting(config_key, shift_type):
        # Selected shifts use the requested intensity, all others their default
        level = shift_intensity if shift_type in selected else "default"
        return config[config_key][level]

    rotation_range = setting("rotation_range", "rotation")
    width_shift_range = setting("width_shift_range", "width_shift")
    height_shift_range = setting("height_shift_range", "height_shift")
    shear_range = setting("shear_range", "shear")
    zoom_range = setting("zoom_range", "zoom")
    horizontal_flip, vertical_flip = setting("flip_range", "flip")

    ## Apply shift using the parameters
    # image_generator writes into the array it is given, so hand it a copy
    # to keep the caller's original data intact (y is copied as well)
    X_te_1, _ = image_generator(X_te_orig.copy(),
                                orig_dims,
                                rotation_range,
                                width_shift_range,
                                height_shift_range,
                                shear_range,
                                zoom_range,
                                horizontal_flip,
                                vertical_flip,
                                delta=shift_prop)
    y_te_1 = y_te_orig.copy()

    return (X_te_1, y_te_1)

### Shift Tester

Contains functions related to statistical tests.

In [None]:
def test_shift_bin(n_successes, n, p):
    """
    Binomial test for domain classifier. Used to check whether accuracy is statistically
    significant.

    :param n_successes: number of correctly predicted instances
    :param n: number of predictions made (test instances)
    :param p: hypothesised probability of success

    :return: pvalue
    """
    p_val = binom_test(n_successes, n, p)
    return p_val

In [None]:
    def one_dimensional_test(X1, X2, test_type="KS"):
        """
        Run a univariate two-sample test on every feature (column) of two matrices.

        Column i of X1 is compared with column i of X2 using the Kolmogorov-Smirnov
        test (test_type "KS") or, for any other test_type, the Anderson-Darling
        k-sample test.

        :param X1: matrix of data1 of size n x number of features
        :param X2: matrix of data2 of size n x number of features
        :param test_type: specify the one dimensional test to compare distributions
            of components (can be KS || AD)

        :return: tuple (p_val, p_vals, t_vals): the minimum p-value over all
            features (to be compared against alpha / number of features), the
            array of per-feature p-values, and the list of per-feature test statistics
        """
        use_ks = test_type == "KS"
        test_stats = []
        pvalues = []

        # One separate statistical test per feature
        for col in range(X1.shape[1]):
            sample_a, sample_b = X1[:, col], X2[:, col]

            if use_ks:
                stat, pvalue = ks_2samp(sample_a, sample_b)
            else:
                stat, _, pvalue = anderson_ksamp([sample_a.tolist(), sample_b.tolist()])

            test_stats.append(stat)
            pvalues.append(pvalue)

        # Bonferroni correction bounds the family-wise error rate: the caller
        # compares the smallest p-value against alpha / number of features.
        pvalues = np.array(pvalues)
        return np.min(pvalues), pvalues, test_stats

In [None]:
def test_chi2_shift(X1, X2, nb_classes):
    """
    Used for testing BBSD with hard threshold. Theoretically we conduct categorical
    chi2 test to test whether the distribution of the class follow theoretical
    chi2 distributions.

    :param X1: matrix of data1 of size n x number of features
    :param X2: matrix of data2 of size n x number of features
    :param nb_classes: number of classes (for degree of freedom)

    :return: p-value
    """

    # Calculate observed and expected counts
    freq_exp = np.zeros(nb_classes)
    freq_obs = np.zeros(nb_classes)

    unique_X1, counts_X1 = np.unique(X1, return_counts=True)
    total_counts_X1 = np.sum(counts_X1)
    unique_X2, counts_X2 = np.unique(X2, return_counts=True)
    total_counts_X2 = np.sum(counts_X2)

    for i in range(len(unique_X1)):
        freq_exp[unique_X1[i]] = counts_X1[i]
        
    for i in range(0, len(unique_X2)):
        i = int(i)
        freq_obs[unique_X2[i]] = counts_X2[i]

    if np.amin(freq_exp) == 0 or np.amin(freq_obs) == 0:
        # The chi-squared test using contingency tables is not well defined if zero-element classes exist, which
        # might happen in the low-sample regime. In this case, we calculate the standard chi-squared test.
        for i in range(0, len(unique_X1)):
            i = int(i)
            val = counts_X1[i] / total_counts_X1 * total_counts_X2
            freq_exp[unique_X1[i]] = val
        chi2, p_val = chisquare(freq_obs, f_exp=freq_exp)
    else:
        # In almost all cases, we resort to obtaining a p-value from the chi-squared test's contingency table.
        freq_conc = np.array([freq_exp, freq_obs])
        chi2, p_val, _, _ = chi2_contingency(freq_conc)
    
    return chi2, p_val

In [None]:
def multi_dimensional_test(X1, X2):
    """
    Multi-dimensional two-sample test based on the MMD statistic.

    :param X1: matrix of data1 of size n x number of features (or a 1-D array)
    :param X2: matrix of data2 of size n x number of features (or a 1-D array)

    :return: p-value
    """

    # torch_two_sample wants the inputs to be explicitly cast to float32
    X1 = X1.astype(np.float32)
    X2 = X2.astype(np.float32)

    mmd_test = MMDStatistic(len(X1), len(X2))

    # 1-D inputs are treated as single-feature matrices
    if len(X1.shape) == 1:
        X1 = X1.reshape((len(X1), 1))
        X2 = X2.reshape((len(X2), 1))

    # Kernel bandwidth heuristic: the median Euclidean distance between the
    # points of X1 and the points of X2
    median_dist = np.median(distance.cdist(X1, X2, 'euclidean'))

    # Compute the MMD statistic, then derive the p-value from the returned matrix
    sample_1 = torch.autograd.Variable(torch.tensor(X1))
    sample_2 = torch.autograd.Variable(torch.tensor(X2))
    _, matrix = mmd_test(sample_1, sample_2, alphas=[1/median_dist], ret_matrix=True)

    return mmd_test.pval(matrix)

## Dimensionality Reduction Functions

Contains functions implementing the various dimensionality reduction methods that we apply in the experiments.

### Standard Dimensionality Reduction Methods

In [None]:
def principal_components_anaylsis(X, n_components=None):
    """
    Fit a PCA model to the data.

    :param X: datasets to be fitted (matrix)
    :param n_components: number of PCA components; if None, the number of
        components needed to explain 80% of the variance is determined first

    :return: the fitted PCA object
    """

    if n_components is None:
        # Preliminary fit: how many components explain 80% of the variance
        n_components = PCA(n_components=.8, svd_solver="full").fit(X).n_components_

    return PCA(n_components=n_components).fit(X)

In [None]:
def sparse_random_projection(X, n_components=None):
    """
    Fit a sparse random projection (SRP) to the data.

    :param X: datasets to be fitted (matrix)
    :param n_components: number of SRP components; if None, the number of PCA
        components needed to explain 80% of the variance is used

    :return: the fitted SparseRandomProjection object
    """

    if n_components is None:
        # Borrow the dimensionality from a PCA explaining 80% of the variance
        n_components = PCA(n_components=.8, svd_solver="full").fit(X).n_components_

    return SparseRandomProjection(n_components=n_components).fit(X)

### BBSD Models

Blackbox shift detection models are the original models for the original task.

#### End to End Models

In [None]:
## Shared convolutional feature extractor
class CNNBlock(layers.Layer):
    """
    Feature extractor shared by the input-to-concept models and the end-to-end
    model: three convolutions followed by two dense layers, with dropout,
    batch normalization and max pooling in between.
    """

    def __init__(self):
        super().__init__()

        # First convolution stage
        self.conv1 = layers.Conv2D(64, (8, 8), strides=(2, 2), padding='same')
        self.do1 = layers.Dropout(0.3)

        # Second convolution stage
        self.conv2 = layers.Conv2D(128, (6, 6), strides=(2, 2), padding='valid')
        self.bn1 = layers.BatchNormalization()

        # Third convolution stage
        self.conv3 = layers.Conv2D(128, (5, 5), strides=(1, 1), padding='valid')
        self.pool1 = layers.MaxPooling2D((2, 2))
        self.do2 = layers.Dropout(0.3)

        # Dense head producing the 64-unit shared representation
        self.flatten = layers.Flatten()
        self.dense1 = layers.Dense(128, activation="relu")
        self.do3 = layers.Dropout(0.4)
        self.dense2 = layers.Dense(64, activation="relu")
        self.do4 = layers.Dropout(0.2)
    
    def call(self, input):
        # Apply the layers one after another, in the order they were created
        pipeline = (
            self.conv1, self.do1,
            self.conv2, self.bn1,
            self.conv3, self.pool1, self.do2,
            self.flatten,
            self.dense1, self.do3,
            self.dense2, self.do4,
        )

        x = input
        for stage in pipeline:
            x = stage(x)

        return x

In [None]:
def NeuralNetworkClassifier(num_classes):
    """
    Build the end-to-end model: shared CNN block followed by a softmax layer.

    :param num_classes: number of classes.

    :return: model architecture
    """

    image_in = tf.keras.Input(shape=(64, 64, 1))

    # Shared feature extractor
    features = CNNBlock()(image_in)

    # Class probabilities via softmax
    class_probs = layers.Dense(num_classes, activation="softmax")(features)

    return tf.keras.Model(inputs=image_in, outputs=class_probs)

#### Concept Bottleneck Models

In [None]:
## Input to Concept Model (one shared trunk, one softmax head per concept)
def MultitaskModel():
    """
    Build the multi-task model: a shared CNN block feeding one softmax head
    per concept.

    :return: model architecture with outputs ordered as
        color, shape, scale, rotation, x, y
    """

    image_in = tf.keras.Input(shape=(64, 64, 1))

    # Shared layers
    features = CNNBlock()(image_in)

    # Task specific heads: (output name, number of units)
    heads = (("color", 1), ("shape", 3), ("scale", 6), ("rotation", 40), ("x", 32), ("y", 32))
    task_outputs = [layers.Dense(units, activation="softmax", name=task)(features)
                    for task, units in heads]

    return tf.keras.Model(inputs=image_in, outputs=task_outputs)

In [None]:
## Input to Concept Model (single concept)
def IndividualModel(num_classes, name):
    """
    Build a stand-alone model predicting a single task (concept).

    :param num_classes: number of possible values for a particular task.
    :param name: name of the task (used as the name of the output layer)

    :return: model architecture
    """

    image_in = tf.keras.Input(shape=(64, 64, 1))

    # CNN feature extractor
    features = CNNBlock()(image_in)

    # Softmax head for this task
    task_probs = layers.Dense(num_classes, activation="softmax", name=name)(features)

    return tf.keras.Model(inputs=image_in, outputs=task_probs)

def EnsembleModel():
    """
    Build one individual model per task.

    :return: list of models ordered as color, shape, scale, rotation, x, y
    """

    # (task name, number of possible values)
    tasks = (("color", 1), ("shape", 3), ("scale", 6), ("rotation", 40), ("x", 32), ("y", 32))

    return [IndividualModel(num_classes=n_values, name=task) for task, n_values in tasks]

In [None]:
## Input to Concept Model (all concept values as independent sigmoid outputs)
def BinaryModel():
    """
    Build the binary model: a shared CNN block followed by a single sigmoid
    layer with one unit per concept value (1 + 3 + 6 + 40 + 32 + 32 units).

    :return: model architecture
    """

    image_in = tf.keras.Input(shape=(64, 64, 1))

    # CNN feature extractor
    features = CNNBlock()(image_in)

    # One sigmoid unit for every possible value of every concept
    n_units = sum((1, 3, 6, 40, 32, 32))
    concept_scores = layers.Dense(n_units, activation="sigmoid")(features)

    return tf.keras.Model(inputs=image_in, outputs=concept_scores)

## Domain Classifier Functions

Contains functions related to the domain classifier, including data generation, model builder, etc.

In [None]:
def generate_domain_classifier_data(x_clean, y_clean, x_altered, y_altered, delta=0.5):
    """
    Given two sets of data (x_clean, y_clean) and (x_altered, y_altered), we merge
    them to create a new dataset that will be the input for the domain classifier.

    :param x_clean: the clean (training) data feature
    :param y_clean: the clean (training) data label
    :param x_altered: the o.o.d/ shifted/ real world (test) data feature
    :param y_altered: the o.o.d/ shifted/ real world (test) data label
    :param delta: proportion of the rows assigned to the new training set; the
        remaining rows form the new test set

    :return: x_train_new, y_train_new, y_train_old, x_test_new, y_test_new, y_test_old
       where y_train_new is label 0 if comes from clean and 1 if comes from altered
    """

    ## Clean
    # Random rows for the new training set
    n_clean = x_clean.shape[0]
    training_indices = np.random.choice(n_clean, ceil(n_clean * delta), replace=False)

    # All remaining rows (ascending) form the new test set. A boolean mask
    # computes the complement in O(n); testing `i not in training_indices`
    # for every i would scan the whole index array each time.
    is_test = np.ones(n_clean, dtype=bool)
    is_test[training_indices] = False
    test_indices = np.flatnonzero(is_test)

    # Extract datasets
    x_clean_train = x_clean[training_indices, :]
    x_clean_test = x_clean[test_indices, :]
    y_clean_train = y_clean[training_indices]
    y_clean_test = y_clean[test_indices]

    ## Altered
    # NOTE: the row indices drawn for the clean data are reused here, so
    # x_altered / y_altered must have at least as many rows as x_clean.
    x_altered_train = x_altered[training_indices, :]
    x_altered_test = x_altered[test_indices, :]
    y_altered_train = y_altered[training_indices]
    y_altered_test = y_altered[test_indices]

    ## Recombine datasets (domain label: 0 = clean, 1 = altered)
    x_train_new = np.append(x_clean_train, x_altered_train, axis=0)
    y_train_old = np.append(y_clean_train, y_altered_train)
    y_train_new = np.concatenate((np.zeros(len(x_clean_train)), np.ones(len(x_altered_train))))

    x_test_new = np.append(x_clean_test, x_altered_test, axis=0)
    y_test_old = np.append(y_clean_test, y_altered_test)
    y_test_new = np.concatenate((np.zeros(len(x_clean_test)), np.ones(len(x_altered_test))))

    ## Shuffle them
    x_train_new, y_train_new, y_train_old = unison_shuffled_copies(x_train_new, y_train_new, y_train_old)
    x_test_new, y_test_new, y_test_old = unison_shuffled_copies(x_test_new, y_test_new, y_test_old)

    return x_train_new, y_train_new, y_train_old, x_test_new, y_test_new, y_test_old

## Helper Functions

Contains helper functions used by the other functions.

In [None]:
def gaussian_noise_subset(x, noise_amt, normalization=1.0, delta_total=1.0, clip=True):
    """
    Apply gaussian noise to a random subset of the instances (first axis) of x.

    The noise is written back into x itself; x is modified in place and also returned.

    :param x: array of instances, with any number of feature axes after the first axis
    :param noise_amt: the amount of gaussian noise applied.
    :param normalization: max range of the value, for pixel it would be 255 (represent color);
        the standard deviation of the noise is noise_amt / normalization
    :param delta_total: proportion of data which we randomly applied noise
    :param clip: whether to clip the new result between 0 and 1.

    :return: x (with noise applied), and the indices of the instances that received noise
    """
    
    # Indices of instances where we apply noise (random indices, no repeats)
    indices = np.random.choice(x.shape[0], ceil(x.shape[0] * delta_total), replace=False)
    x_mod = x[indices] # fancy indexing returns a copy of the selected instances
    
    # One noise value per element of the selection, whatever the number of axes
    noise = np.random.normal(0, noise_amt / normalization, x_mod.shape)
    
    # Add the noise, optionally clipping to [0, 1]
    if clip:
        x_mod = np.clip(x_mod + noise, 0., 1.)
    else:
        x_mod = x_mod + noise
    
    # Write the noisy instances back into x
    x[indices] = x_mod
    return x, indices

In [None]:
def knockout_shift(x, y, cl, delta):
    """
    Remove instances of one class to create class imbalance (knockout shift).

    :param x: the feature matrix
    :param y: the label array
    :param cl: the class label whose instances are removed
    :param delta: proportion of the instances of class `cl` to remove

    :return: x and y without the removed instances (new arrays)
    """

    # Positions of all instances of the class, in dataset order
    class_rows = np.nonzero(y == cl)[0]

    # Number of instances to drop, rounded up to the next even number
    n_remove = ceil(delta * len(class_rows))
    n_remove += n_remove % 2

    doomed = class_rows[:n_remove]

    # np.delete returns reduced copies; the inputs are not modified
    return np.delete(x, doomed, axis=0), np.delete(y, doomed, axis=0)

In [None]:
def image_generator(x, 
                    orig_dims, 
                    rot_range, 
                    width_range, 
                    height_range, 
                    shear_range, 
                    zoom_range, 
                    horizontal_flip, 
                    vertical_flip, 
                    delta=1.0):
    """
    Perturb a random subset of images (e.g., translation, rotation, shear, zoom, flip).

    The perturbed images are written back into x itself; x is modified in place
    and also returned.

    :param x: matrix of flattened images, one image per row.
    :param orig_dims: original dimension of a single image (not including batch size: height, width, channel only).
    :param rot_range, width_range, height_range, shear_range, zoom_range, horizontal_flip, vertical_flip:
        range of augmentation values (for flip - boolean indicating whether to do horizontal and vertical flip)
    :param delta: proportion of data where we will apply the shift.
    
    :return: x (with perturbed images), and indices where we apply the image perturbations.
    """
    
    # Pick the images to perturb (random rows, no repeats)
    n_images = x.shape[0]
    indices = np.random.choice(n_images, ceil(n_images * delta), replace=False)

    augmentation = dict(rotation_range=rot_range,
                        width_shift_range=width_range,
                        height_shift_range=height_range,
                        shear_range=shear_range,
                        zoom_range=zoom_range,
                        horizontal_flip=horizontal_flip,
                        vertical_flip=vertical_flip,
                        fill_mode="constant")
    datagen = ImageDataGenerator(**augmentation)
    
    # Perturb the selected images one at a time
    x_mod = x[indices, :]
    for row in range(len(x_mod)):
        # Restore the image shape, run it through the generator as a batch of one
        original_img = x_mod[row, :].reshape(orig_dims)
        shifted_batch = datagen.flow(np.array([original_img]), batch_size=1)[0]

        # Flatten the result again and store it
        x_mod[row, :] = shifted_batch.reshape(-1)
    x[indices, :] = x_mod
    
    return x, indices

In [None]:
def unison_shuffled_copies(a, b, c):
    """
    Used to shuffle a, b, c together, i.e. with the same random permutation.

    :param a, b, c: arrays of same length.

    :return: shuffled a, b, c

    :raises AssertionError: if the three arrays do not all have the same length
    """

    # All three lengths must agree: a longer c would otherwise be silently
    # truncated, because the permutation only covers len(a) positions
    assert len(a) == len(b) == len(c), "a, b and c must have the same length"
    p = np.random.permutation(len(a))
    return a[p], b[p], c[p]

In [None]:
def image_data_generator_config():
    """
    Return the image augmentation settings for every shift type and intensity.

    :return: dictionary mapping each setting name to a dictionary with the
        value used at the "default", "small", "medium" and "large" intensity.
    """

    levels = ("default", "small", "medium", "large")

    def by_level(*values):
        # Pair the values with the intensity levels, in order
        return dict(zip(levels, values))

    return {
        # Rotation
        "rotation_range": by_level(0, 10, 40, 90),
        # Width shift (x-translation)
        "width_shift_range": by_level(0.0, 0.05, 0.2, 0.4),
        # Height shift (y-translation)
        "height_shift_range": by_level(0.0, 0.05, 0.2, 0.4),
        # Shear
        "shear_range": by_level(0.0, 0.1, 0.2, 0.3),
        # Zoom
        "zoom_range": by_level(0.0, 0.1, 0.2, 0.4),
        # Flip as (horizontal, vertical)
        "flip_range": by_level((False, False), (False, False), (True, False), (True, True)),
    }
