In [None]:
# default_exp data_generator

# Data Generator

> Data Generation classes for experimentation

In [None]:
# export
from fastcore.test import *
from nbdev.showdoc import *
import numpy as np
import jovsatools
import tensorflow as tf

### MNIST multi-target 

In [None]:
#export

class MNISTDataGenerator(object):
    """Generates multi-target regression/classification data based on MNIST.

        x = MNIST images, each flattened to a column vector of shape (n_pixels, 1)
        y = MNIST digit label followed by the synthetic targets registered
            by self._initialize()

        Synthetic targets (in order, enabled by ``additional_y``):
            1. binary classification target: 1 with probability 0.9 when
               2*x[10]**3 - 2*x[3] + 15 > 8.9, otherwise 0
            2. unbounded regression target: log(sum(x) + epsilon)
            3. regression target intended to lie in [0, 1]: mean(x) / C

        Arguments:
            additional_y: Integer, number of additional targets to generate.
                Only three target functions are defined; a larger value raises
                KeyError when data is generated.
            seed: Integer, passed to np.random.seed(). The global NumPy RNG is
                seeded once at construction; the binary target draws from it
                every time data is generated.

        Calling the instance with ``sample_n`` returns:
            train_x: numpy.ndarray, first ``sample_n`` MNIST train images flattened
            train_y: numpy.ndarray, all the targets; MNIST label + additional_y
            test_x: numpy.ndarray, all MNIST test images flattened
            test_y: numpy.ndarray, all the targets; MNIST label + additional_y
    """
    # Offset added inside the log so an all-zero image still yields a finite value.
    _LOG_EPSILON = 0.000123
    # Empirical value from analyzing MNIST data; divisor of the mean-pixel target.
    _MEAN_SCALE = 102

    def __init__(self, additional_y=0, seed=1123):
        self.additional_y = additional_y
        # np.random.seed() returns None, so store the seed value itself and
        # seed the global RNG as a separate step.
        self.seed = seed
        np.random.seed(seed=seed)

        # needed to store additional generation functions
        self._func_map = {}
        self._initialize(additional_y)

        # _initialize() must have populated all four MNIST arrays
        assert all(
            data is not None
            for data in (
                self.mnist_train_x,
                self.mnist_train_y,
                self.mnist_test_x,
                self.mnist_test_y,
            )
        )

        self.train_n = len(self.mnist_train_x)
        self.test_n = len(self.mnist_test_x)

    def _register_target_functions(self, additional_y):
        """Store the first ``additional_y`` target functions in self._func_map.

        Keys are 0-based: key ``k`` produces label ``k + 1`` (label 0 is the
        MNIST class itself).
        """
        epsilon = self._LOG_EPSILON
        scale = self._MEAN_SCALE

        def binary_target(x):
            # label 1: classification (0, 1) target.
            # Compute in float64: MNIST pixels are uint8, and cubing or
            # subtracting them in uint8 silently wraps around modulo 256.
            pixels = np.asarray(x, dtype=np.float64).ravel()
            score = 2 * pixels[10] ** 3 - 2 * pixels[3] + 15
            # the random draw only happens when the threshold is exceeded
            return int(np.random.random() < 0.9) if score > 8.9 else 0

        def log_sum_target(x):
            # label 2: unbounded regression target
            return np.log(np.sum(x) + epsilon)

        def scaled_mean_target(x):
            # label 3: bounded (0, 1) regression target
            return np.mean(x) / scale

        available = (binary_target, log_sum_target, scaled_mean_target)
        for key, func in enumerate(available):
            if additional_y >= key + 1:
                self._func_map[key] = func

    def _initialize(self, additional_y):
        """Register the target functions and load the MNIST train/test data."""
        self._register_target_functions(additional_y)

        # regular MNIST train/test data
        train, test = tf.keras.datasets.mnist.load_data()
        self.mnist_train_x, self.mnist_train_y = train[0], train[1]
        self.mnist_test_x, self.mnist_test_y = test[0], test[1]

    def prepare_datasets(self, mnist_x, mnist_y, n):
        """Main worker function for dataset preparation.

        Arguments:
            mnist_x: indexable collection of image arrays
            mnist_y: indexable collection of the matching MNIST labels
            n: Integer, number of leading samples to process

        Returns:
            (x, y): x stacks the flattened (n_pixels, 1) images, y stacks one
            row per sample holding the MNIST label followed by the
            ``self.additional_y`` synthetic targets.

        Raises:
            KeyError: if ``self.additional_y`` exceeds the number of registered
                target functions (and n > 0).
        """
        x, y = [], []
        for i in range(n):
            features = mnist_x[i].flatten().reshape(-1, 1)
            labels = [mnist_y[i]]
            labels.extend(
                self._func_map[f](features) for f in range(self.additional_y)
            )
            x.append(features)
            y.append(labels)
        return np.asarray(x), np.asarray(y)

    def __call__(self, sample_n):
        """Data generation call.

        Builds the first ``sample_n`` train samples and the full test set.
        Returns the tuple (train_x, train_y, test_x, test_y).
        """
        assert sample_n <= self.train_n, f"max alloable size is {self.train_n}"

        train_x, train_y = self.prepare_datasets(self.mnist_train_x, self.mnist_train_y, n=sample_n)
        test_x, test_y = self.prepare_datasets(self.mnist_test_x, self.mnist_test_y, n=self.test_n)
        return (train_x, train_y, test_x, test_y)

In [None]:
# data hyperparameters: number of synthetic targets and train sample count
additional_y = 3
train_n = 1000

# build the generator, then materialize the train/test splits
data_generator = MNISTDataGenerator(additional_y)
datasets = data_generator(train_n)
train_x, train_y, test_x, test_y = datasets

In [None]:
# tests

# every dataset returned must be a numpy.ndarray
test_eq(all(isinstance(dataset, np.ndarray) for dataset in datasets), True)

# sizes: requested train samples, full MNIST test set, one column per target
test_eq(len(train_y), train_n)
test_eq(len(test_y), 10000) # based on MNIST
test_eq(train_y.shape[1], additional_y+1)
test_eq(test_y.shape[1], additional_y+1)

# head 0 tests: all ten digits are present
# (sorted() because set iteration order is not guaranteed)
test_eq(sorted(set(train_y[:, 0])), list(range(10)))
test_eq(sorted(set(test_y[:, 0])), list(range(10)))

# head 1 tests: binary classification target
test_eq(sorted(set(train_y[:, 1])), list(range(2)))
test_eq(sorted(set(test_y[:, 1])), list(range(2)))

# head 2 tests: the log target stays finite thanks to the epsilon offset
test_eq(bool(np.all(np.isfinite(train_y[:, 2]))), True)
test_eq(bool(np.all(np.isfinite(test_y[:, 2]))), True)

# head 3 tests: bounded regression target stays within [0, 1]
test_eq(bool(np.min(train_y[:, 3]) >= 0.0), True)
test_eq(bool(np.min(test_y[:, 3]) >= 0.0), True)
test_eq(bool(np.max(train_y[:, 3]) <= 1.0), True)
test_eq(bool(np.max(test_y[:, 3]) <= 1.0), True)