**Imports**


In [0]:
import tensorflow as tf
import numpy as np
import random
from tensorflow.keras import Sequential, Model
from tensorflow.keras.layers import Dense, Flatten, Conv2D, MaxPooling2D, BatchNormalization, Input, Reshape
from tensorflow.keras.optimizers import Adam
from scipy.spatial import distance
from scipy.special import softmax
from tensorflow.keras import regularizers

**Declaration of the Experiment class, which includes methods for creating the neural networks, retrieving and pre-processing the dataset, and starting the experiment**



In [0]:
class Experiment:
  """Compares per-digit autoencoders with a single multi-class classifier on MNIST.

  `start` trains one shallow autoencoder per digit class and labels a test
  image with the index of the autoencoder that reconstructs it with the
  smallest Euclidean error; it then adds one more class and re-evaluates.
  `test_with_one_network` trains one conventional classifier ('conv' or 'ff')
  on the same picked images for comparison.

  NOTE(review): `learning_rate` is stored on the instance, but no method of
  this class reads it; the autoencoders are compiled with the 'rmsprop'
  optimizer string. Confirm whether that is intended.
  """

  # MNIST contains the ten digits 0-9; the per-digit bins are sized by this.
  NUM_DIGITS = 10
  # Every caller in this class appends a channel axis before feeding a model,
  # so all models declare the same 3-D per-sample input shape.
  INPUT_SHAPE = (28, 28, 1)


  def __init__(self, learning_rate, num_classes, num_train_images_per_digit, num_test_images_per_digit, num_epochs):
    """Loads the dataset and stores the experiment parameters.

    Args:
      learning_rate: stored as `self.learning_rate` (see the class note).
      num_classes: number of digit classes (0 .. num_classes-1) used initially.
      num_train_images_per_digit: training images picked for each digit.
      num_test_images_per_digit: test images picked for each digit.
      num_epochs: epochs used for each autoencoder `fit` call.
    """
    self.dataset = self.load_dataset()
    self.learning_rate = learning_rate
    self.num_classes = num_classes
    self.num_train_images_per_digit = num_train_images_per_digit
    self.num_test_images_per_digit = num_test_images_per_digit
    self.num_epochs = num_epochs


  def shallow_autoencoder_model(self):
    """Builds and compiles a one-layer autoencoder.

    The model flattens its input, applies a single 784-unit SELU Dense layer
    whose kernel and bias both start at zero, and reshapes the result to
    (28, 28). It is compiled with RMSprop and mean-absolute-error loss.

    Returns:
      The compiled Keras `Sequential` model.
    """
    zeros = tf.constant_initializer(value=0.0)
    neural_net = Sequential()
    # Declared with the channel axis because that is what fit/predict are fed.
    neural_net.add(Flatten(input_shape=self.INPUT_SHAPE))
    neural_net.add(Dense(784, activation='selu', bias_initializer=zeros, kernel_initializer=zeros))
    neural_net.add(Reshape((28, 28)))
    neural_net.compile(optimizer='rmsprop', loss='mae', metrics=['accuracy'])
    return neural_net


  def _finish_classifier(self, neural_net, learning_rate):
    """Appends the shared dense head to an already-flattened model and compiles it."""
    for units in (128, 32, 10):
      neural_net.add(Dense(units, activation='relu'))
    neural_net.add(Dense(self.num_classes, activation='softmax'))
    # `learning_rate` is the supported keyword; the old `lr` alias is deprecated.
    neural_net.compile(optimizer=Adam(learning_rate=learning_rate), loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    return neural_net


  def conv_multi_classifier_model(self, learning_rate):
    """Builds and compiles the convolutional multi-class classifier.

    Args:
      learning_rate: learning rate passed to the Adam optimizer.

    Returns:
      The compiled Keras `Sequential` model with `self.num_classes` outputs.
    """
    neural_net = Sequential()
    neural_net.add(Conv2D(64, activation='relu', input_shape=self.INPUT_SHAPE, kernel_size=(3, 3), strides=(1, 1), padding='same'))
    neural_net.add(BatchNormalization())
    neural_net.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
    neural_net.add(Flatten())
    return self._finish_classifier(neural_net, learning_rate)


  def ff_multi_classifier_model(self, learning_rate):
    """Builds and compiles the feed-forward multi-class classifier.

    Args:
      learning_rate: learning rate passed to the Adam optimizer.

    Returns:
      The compiled Keras `Sequential` model with `self.num_classes` outputs.
    """
    neural_net = Sequential()
    # Same input shape as the conv model: test_with_one_network feeds both
    # network types images that carry a trailing channel axis.
    neural_net.add(Flatten(input_shape=self.INPUT_SHAPE))
    return self._finish_classifier(neural_net, learning_rate)


  def load_dataset(self):
    """Loads MNIST through Keras and divides the pixel values by 255.

    Returns:
      ((x_train, y_train), (x_test, y_test)) with the images rescaled.
    """
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    return (x_train / 255.0, y_train), (x_test / 255.0, y_test)


  def pick_n_images_per_digit(self, num_images_per_digit=1, train=True):
    """Picks the first `num_images_per_digit` samples of every digit.

    Samples are scanned in dataset order; scanning stops as soon as
    `num_images_per_digit * 10` samples have been collected.

    Args:
      num_images_per_digit: maximum number of samples kept for each label.
      train: take samples from the training split if True, else the test split.

    Returns:
      A list of (image, label) tuples in the order they were encountered.
    """
    train_data, test_data = self.dataset
    images, labels = train_data if train else test_data
    target_total = num_images_per_digit * self.NUM_DIGITS
    picked_counts = [0] * self.NUM_DIGITS
    picked_data = []
    for image, label in zip(images, labels):
      if picked_counts[label] >= num_images_per_digit:
        continue
      picked_data.append((image, label))
      picked_counts[label] += 1
      if len(picked_data) >= target_total:
        break
    return picked_data


  def put_picked_data_in_bins(self, picked_data, train=True):
    """Groups (image, label) tuples into ten lists indexed by label.

    Side effect: the result is also stored on `self.picked_data` (train=True)
    or `self.picked_data_test` (train=False).

    Args:
      picked_data: iterable of (image, label) tuples.
      train: selects which instance attribute receives the bins.

    Returns:
      A list of ten lists; entry `d` holds the tuples whose label is `d`.
    """
    binned_picked_data = [[] for _ in range(self.NUM_DIGITS)]
    for image, label in picked_data:
      binned_picked_data[label].append((image, label))
    if train:
      self.picked_data = binned_picked_data
    else:
      self.picked_data_test = binned_picked_data
    return binned_picked_data


  def flatten_data(self, binned_data):
    """Concatenates the bins into one list and shuffles it in place.

    Args:
      binned_data: list of lists of (image, label) tuples.

    Returns:
      A single shuffled list containing every tuple from every bin.
    """
    flat_data = [datum for digit_bin in binned_data for datum in digit_bin]
    np.random.shuffle(flat_data)
    return flat_data


  def _pick_and_bin(self):
    """Picks the configured number of train/test images per digit and bins both by label."""
    picked_training_data = self.pick_n_images_per_digit(num_images_per_digit=self.num_train_images_per_digit, train=True)
    binned_training_data = self.put_picked_data_in_bins(picked_data=picked_training_data, train=True)
    picked_test_data = self.pick_n_images_per_digit(num_images_per_digit=self.num_test_images_per_digit, train=False)
    binned_test_data = self.put_picked_data_in_bins(picked_data=picked_test_data, train=False)
    return binned_training_data, binned_test_data


  def _bins_to_arrays(self, binned_data):
    """Turns bins of (image, label) tuples into (images-with-channel-axis, labels) arrays."""
    images = []
    labels = []
    for digit_bin in binned_data:
      for image, label in digit_bin:
        images.append(np.expand_dims(np.asarray(image), axis=2))
        labels.append(label)
    return np.asarray(images), np.asarray(labels)


  def test_with_one_network(self, learning_rate, num_epochs, network_type='conv'):
    """Trains and evaluates a single conventional classifier.

    Only the first `self.num_classes` digits of the picked data are used.

    Args:
      learning_rate: learning rate for the classifier's Adam optimizer.
      num_epochs: number of training epochs.
      network_type: 'conv' for the convolutional model, 'ff' for feed-forward.

    Returns:
      The list returned by `classifier.evaluate` (index 1 is printed as accuracy).

    Raises:
      ValueError: if `network_type` is neither 'conv' nor 'ff'.
    """
    builders = {
        'conv': self.conv_multi_classifier_model,
        'ff': self.ff_multi_classifier_model,
    }
    # Fail early with a clear message instead of hitting an unbound local later.
    if network_type not in builders:
      raise ValueError("network_type must be 'conv' or 'ff', got %r" % (network_type,))

    binned_training_data, binned_test_data = self._pick_and_bin()
    x_train, y_train = self._bins_to_arrays(binned_training_data[:self.num_classes])
    x_test, y_test = self._bins_to_arrays(binned_test_data[:self.num_classes])

    classifier = builders[network_type](learning_rate=learning_rate)

    print('Training The Traditional Network')
    classifier.fit(x_train, y_train, batch_size=self.num_classes, epochs=num_epochs, verbose=0)
    metrics = classifier.evaluate(x_test, y_test)
    print('Accuracy With Traditional Network : %f' % metrics[1])
    return metrics


  def evaluate_test_data(self, test_data, classifiers):
    """Classifies samples by smallest reconstruction error and returns accuracy.

    Each model in `classifiers` reconstructs every image; a sample's predicted
    label is the index of the model whose output has the smallest Euclidean
    distance to the input image.

    Args:
      test_data: sequence of (image, label) tuples.
      classifiers: sequence of models exposing `predict`; position == label.

    Returns:
      Fraction of samples whose predicted label equals the true label, or 0.0
      when `test_data` is empty.
    """
    num_samples = len(test_data)
    if num_samples == 0:
      # Nothing to score; avoid dividing by zero below.
      return 0.0

    images = np.asarray([np.asarray(image) for image, _ in test_data])
    labels = np.asarray([label for _, label in test_data])
    flat_images = images.reshape(num_samples, -1)
    # The models are fed images with a trailing channel axis.
    batch = np.expand_dims(images, axis=3)

    # One batched predict per model; column j holds the distances for model j.
    distances = np.stack(
        [
            np.linalg.norm(np.asarray(encoder.predict(batch)).reshape(num_samples, -1) - flat_images, axis=1)
            for encoder in classifiers
        ],
        axis=1,
    )
    # Rank the raw distances: a softmax adds nothing to the ordering and can
    # underflow the small entries to 0, which would make argmin pick wrongly.
    predicted_labels = np.argmin(distances, axis=1)
    correct = int(np.sum(predicted_labels == labels))
    return correct / num_samples


  def train_on_new_class(self, digit_data, encoders):
    """Trains the most recently added autoencoder on one digit's images.

    Each image is fitted on its own (input == target) for `self.num_epochs`
    epochs before the next image is used.
    NOTE(review): this fits the images one after another rather than as one
    dataset; confirm that sequential per-image training is intended.

    Args:
      digit_data: iterable of (image, label) tuples; the labels are not used.
      encoders: list of models; only the last one is trained.
    """
    newest_encoder = encoders[-1]
    for x_digit, _ in digit_data:
      model_input = np.asarray(np.expand_dims([x_digit], axis=3))
      target = np.asarray([x_digit])
      newest_encoder.fit(model_input, target, batch_size=1, epochs=self.num_epochs, verbose=0)


  def start(self):
    """Runs the per-class autoencoder experiment and prints the accuracies.

    Trains one autoencoder for each of the first `self.num_classes` digits,
    reports accuracy, then adds an autoencoder for digit `self.num_classes`
    and reports accuracy again on the enlarged test set.

    Raises:
      ValueError: if `self.num_classes` is outside 1..9, because digit
        `self.num_classes` itself is needed as the additional class.
    """
    if not 1 <= self.num_classes < self.NUM_DIGITS:
      raise ValueError(
          'num_classes must be between 1 and %d so that one more digit can be added, got %r'
          % (self.NUM_DIGITS - 1, self.num_classes))

    binned_training_data, binned_test_data = self._pick_and_bin()

    # The digit right after the initial classes is the one added later.
    picked_digit = self.num_classes
    new_digit_training_data = binned_training_data[picked_digit]
    new_digit_test_data = binned_test_data[picked_digit]

    test_data_with_additional_class = self.flatten_data(binned_test_data[:self.num_classes + 1])

    base_training_bins = binned_training_data[:self.num_classes]
    base_test_bins = binned_test_data[:self.num_classes]
    test_data = self.flatten_data(base_test_bins)

    autoencoders = []

    for digit in range(self.num_classes):
      autoencoders.append(self.shallow_autoencoder_model())
      self.train_on_new_class(digit_data=base_training_bins[digit], encoders=autoencoders)

      # Scored against only the autoencoders trained so far.
      digit_accuracy = self.evaluate_test_data(base_test_bins[digit], autoencoders)
      print('digit : %d , accuracy : %f' % (digit, digit_accuracy))

    accuracy = self.evaluate_test_data(test_data, autoencoders)
    print('Accuracy of Specialized Networks Before Adding Class : %f' % accuracy)

    autoencoders.append(self.shallow_autoencoder_model())
    self.train_on_new_class(digit_data=new_digit_training_data, encoders=autoencoders)

    accuracy = self.evaluate_test_data(new_digit_test_data, autoencoders)
    print('digit : %d , accuracy : %f' % (picked_digit, accuracy))

    accuracy = self.evaluate_test_data(test_data_with_additional_class, autoencoders)
    print('Accuracy of Specialized Networks After Adding Class : %f' % accuracy)



**Calls To Start The Experiment**

Options for network_type: 'conv' or 'ff'


In [0]:
# Parameters of the experiment
num_epochs = 10
ipd = 10        # training images picked per digit
ipd_test = 10   # test images picked per digit
learning_rate_sp = 0.1      # passed to Experiment as `learning_rate`
learning_rate_trad = 0.001  # learning rate for the traditional conv/ff network
num_classes = 6

# Instantiate the Experiment class. `num_classes` is passed through from the
# parameter above so that editing it there actually takes effect.
experiment = Experiment(
    learning_rate=learning_rate_sp,
    num_classes=num_classes,
    num_train_images_per_digit=ipd,
    num_test_images_per_digit=ipd_test,
    num_epochs=num_epochs)
# Start the experiment, which trains the specialized networks and tests on them.
experiment.start()
# Test on a traditional conv net or ff net.
experiment.test_with_one_network(learning_rate_trad, num_epochs, network_type='conv')