**Imports**


In [None]:
import tensorflow as tf
import tensorflow.keras.backend as kb
import numpy as np
import random
from tensorflow.keras import Sequential, Model
from tensorflow.keras.layers import Dense, Flatten, Conv2D, Conv1D, MaxPooling2D, MaxPooling1D, BatchNormalization, Input, Reshape, Dropout, ActivityRegularization, Activation
from tensorflow.keras.optimizers import Adam, RMSprop
from scipy.spatial import distance
from scipy.special import softmax
from tensorflow.keras import regularizers
from google.colab.patches import cv2_imshow
import time
import math
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error
from sklearn.decomposition import PCA
import matplotlib.cm as cm

**Declaration of the `NegativeWeightsConstraint` weight constraint and of the `Experiment` class, which includes methods for creating the neural networks, retrieving and pre-processing the dataset, and starting the experiment**



In [None]:
class NegativeWeightsConstraint(tf.keras.constraints.Constraint):
  """Keras weight constraint that keeps every weight non-positive.

  Positive entries are replaced by 0; zero and negative entries are returned
  unchanged.
  """

  @staticmethod
  def map_pos_to_zero(element):
    """Return `element` with every positive value replaced by 0.

    Works element-wise, so `element` may be a scalar or a tensor of any rank.
    """
    # min(x, 0) is 0 for x > 0 and x otherwise. Doing this as a single tensor
    # op avoids a Python `if` on a tensor, which cannot be evaluated
    # element-wise.
    return tf.minimum(element, tf.zeros_like(element))

  def __call__(self, w):
    """Apply the constraint to the weight tensor `w` and return the result."""
    return self.map_pos_to_zero(w)

class Experiment:


  def __init__(self, learning_rate, learning_rate_trad, learning_rate_preconv, num_classes, num_train_images_per_digit, num_test_images_per_digit, num_epochs, encoder_sizes):
    """Load the dataset and store the experiment configuration.

    Args:
      learning_rate: step size used when compiling the shallow autoencoders.
      learning_rate_trad: stored on the instance; not read by any method of
        this class (the baseline classifiers take their rate as an argument).
      learning_rate_preconv: step size used when compiling the conv encoder.
      num_classes: number of digit classes kept for training and evaluation.
      num_train_images_per_digit: training images sampled per digit.
      num_test_images_per_digit: test images sampled per digit.
      num_epochs: passes over the training data in `train_autoencoders`.
      encoder_sizes: layer widths of the autoencoder stack; consecutive
        entries give the input and code size of each autoencoder.
    """
    # The dataset is fetched before anything else is stored.
    self.dataset = self.load_dataset()

    # Plain configuration values.
    self.learning_rate = learning_rate
    self.learning_rate_trad = learning_rate_trad
    self.learning_rate_preconv = learning_rate_preconv
    self.num_classes = num_classes
    self.num_train_images_per_digit = num_train_images_per_digit
    self.num_test_images_per_digit = num_test_images_per_digit
    self.num_epochs = num_epochs

    # Models are created later by `initialize_autoencoders`.
    self.autoencoders = []
    self.intermediates = []
    self.encoder_sizes = encoder_sizes

    final_code_size = self.encoder_sizes[-1]

    def zeros_per_class():
      # One independent zero-filled list of length `final_code_size` per class.
      return [[0.0] * final_code_size for _ in range(self.num_classes)]

    # Per-class running statistics, filled in by `train_autoencoders`.
    self.outputs = [[] for _ in range(self.num_classes)]
    self.averages = zeros_per_class()
    self.sums = zeros_per_class()
    self.std_devs = zeros_per_class()
    
  def custom_loss(self, y_actual, y_pred):
    """Euclidean distance between `y_pred` and `y_actual` over the last axis."""
    residual = y_pred - y_actual
    squared_norm = kb.sum(kb.square(residual), axis=-1)
    return kb.sqrt(squared_norm)

  class PoolingKernelInitializer(tf.keras.initializers.Initializer):
    """Initializer for a block-sparse, pooling-like dense kernel.

    Both layers are treated as square grids whose side is the floored square
    root of the layer size. Input unit `i` is connected to output unit `j`
    only when the input's grid cell, integer-divided by the side ratio, equals
    the output's grid cell. Connected weights are drawn from a standard normal
    distribution via `np.random.normal`; all other weights are 0.
    """

    def __call__(self, shape, dtype=None):
      """Build the kernel.

      Args:
        shape: `(first_layer_size, second_layer_size)` of the kernel.
        dtype: requested dtype of the returned tensor; `tf.float32` when None.

      Returns:
        A tensor of shape `shape` with the sparsity pattern described on the
        class.

      Raises:
        ValueError: if the output grid is empty or larger than the input grid,
          which would make the pooling ratio zero.
      """
      first_layer_size = int(shape[0])
      second_layer_size = int(shape[1])
      sqrt_first_layer = math.floor(math.sqrt(first_layer_size))
      sqrt_second_layer = math.floor(math.sqrt(second_layer_size))
      if sqrt_second_layer == 0 or sqrt_first_layer < sqrt_second_layer:
        raise ValueError(
            'PoolingKernelInitializer needs a non-empty output layer that is not '
            'larger than the input layer, got shape {}'.format(tuple(shape)))
      ratio = sqrt_first_layer // sqrt_second_layer

      input_units = np.arange(first_layer_size)
      output_units = np.arange(second_layer_size)
      # Grid cell of every input unit, already divided by the pooling ratio.
      pooled_row = (input_units // sqrt_first_layer) // ratio
      pooled_col = (input_units % sqrt_first_layer) // ratio
      # Grid cell of every output unit.
      out_row = output_units // sqrt_second_layer
      out_col = output_units % sqrt_second_layer
      connected = ((pooled_row[:, None] == out_row[None, :]) &
                   (pooled_col[:, None] == out_col[None, :]))

      tf_dtype = tf.float32 if dtype is None else tf.dtypes.as_dtype(dtype)
      weight_matrix = np.zeros((first_layer_size, second_layer_size), dtype=tf_dtype.as_numpy_dtype)
      # One normal draw per connected entry, filled in row-major order.
      weight_matrix[connected] = np.random.normal(size=int(connected.sum()))
      return tf.convert_to_tensor(weight_matrix, dtype=tf_dtype)

  class InversePoolingKernelInitializer(tf.keras.initializers.Initializer):
    """Initializer for a block-sparse, un-pooling-like dense kernel.

    Both layers are treated as square grids whose side is the floored square
    root of the layer size. Input unit `i` is connected to output unit `j`
    only when the output's grid cell, integer-divided by the side ratio, equals
    the input's grid cell. Connected weights are drawn from a standard normal
    distribution via `np.random.normal`; all other weights are 0.
    """

    def __call__(self, shape, dtype=None):
      """Build the kernel.

      Args:
        shape: `(first_layer_size, second_layer_size)` of the kernel.
        dtype: requested dtype of the returned tensor; `tf.float32` when None.

      Returns:
        A tensor of shape `shape` with the sparsity pattern described on the
        class.

      Raises:
        ValueError: if the input grid is empty or larger than the output grid,
          which would make the un-pooling ratio zero.
      """
      first_layer_size = int(shape[0])
      second_layer_size = int(shape[1])
      sqrt_first_layer = math.floor(math.sqrt(first_layer_size))
      sqrt_second_layer = math.floor(math.sqrt(second_layer_size))
      if sqrt_first_layer == 0 or sqrt_second_layer < sqrt_first_layer:
        raise ValueError(
            'InversePoolingKernelInitializer needs a non-empty input layer that is not '
            'larger than the output layer, got shape {}'.format(tuple(shape)))
      ratio = sqrt_second_layer // sqrt_first_layer

      input_units = np.arange(first_layer_size)
      output_units = np.arange(second_layer_size)
      # Grid cell of every input unit.
      in_row = input_units // sqrt_first_layer
      in_col = input_units % sqrt_first_layer
      # Grid cell of every output unit, already divided by the un-pooling ratio.
      pooled_row = (output_units // sqrt_second_layer) // ratio
      pooled_col = (output_units % sqrt_second_layer) // ratio
      connected = ((in_row[:, None] == pooled_row[None, :]) &
                   (in_col[:, None] == pooled_col[None, :]))

      tf_dtype = tf.float32 if dtype is None else tf.dtypes.as_dtype(dtype)
      weight_matrix = np.zeros((first_layer_size, second_layer_size), dtype=tf_dtype.as_numpy_dtype)
      # One normal draw per connected entry, filled in row-major order.
      weight_matrix[connected] = np.random.normal(size=int(connected.sum()))
      return tf.convert_to_tensor(weight_matrix, dtype=tf_dtype)


  def shallow_autoencoder_model(self, encoder_size, encoder_index):
    """Build and compile one shallow (single hidden layer) autoencoder.

    Args:
      encoder_size: number of input units; the input is a square image of side
        `int(sqrt(encoder_size))` with one channel.
      encoder_index: position of this autoencoder in the stack; the code size
        is `self.encoder_sizes[encoder_index + 1]`.

    Returns:
      A compiled `Sequential` model. The layer order is
      Flatten, Dense, Activation('selu'), Dense, Activation('sigmoid'), Reshape;
      `initialize_autoencoders` reads the code from layer index 2, so the order
      must not change.
    """
    side = int(math.sqrt(encoder_size))
    code_size = round(self.encoder_sizes[encoder_index + 1])

    neural_net = Sequential()
    neural_net.add(Flatten(input_shape=(side, side, 1)))
    # Encoder: linear projection with an activity penalty, then SELU.
    neural_net.add(Dense(code_size,
                         activation='linear',
                         activity_regularizer=tf.keras.regularizers.l1_l2(l1=0.01, l2=0.01)))
    neural_net.add(Activation('selu'))
    # Decoder: linear projection back to the input size, then sigmoid.
    neural_net.add(Dense(encoder_size,
                         activation='linear'))
    neural_net.add(Activation('sigmoid'))
    neural_net.add(Reshape((side, side, 1)))
    # `learning_rate` is the supported keyword; `lr` is a deprecated alias that
    # newer Keras releases reject.
    neural_net.compile(optimizer=RMSprop(learning_rate=self.learning_rate), loss='mse', metrics=['accuracy'])
    return neural_net

  def conv_encoder(self):
    """Build and compile the convolutional autoencoder used as a pre-processing stage.

    Returns:
      A compiled `Sequential` model mapping a (28, 28, 1) image to a
      (28, 28, 1) reconstruction. `train_conv_model` reads the 196-unit Dense
      layer at layer index 7, so the layer order must not change.
    """
    neural_net = Sequential()
    neural_net.add(Conv2D(32, activation='relu', input_shape=(28, 28, 1), kernel_size=(3, 3), strides=(1, 1), padding='same'))
    neural_net.add(BatchNormalization())
    neural_net.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
    # Only the first layer of a Sequential model needs an input shape.
    neural_net.add(Conv2D(16, activation='relu', kernel_size=(3, 3), strides=(1, 1), padding='same'))
    neural_net.add(BatchNormalization())
    neural_net.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
    neural_net.add(Flatten())
    neural_net.add(Dense(196, activation='relu'))
    neural_net.add(Dense(784, activation='relu'))
    neural_net.add(Reshape((28, 28, 1)))
    # `learning_rate` is the supported keyword; `lr` is a deprecated alias that
    # newer Keras releases reject.
    neural_net.compile(optimizer=Adam(learning_rate=self.learning_rate_preconv), loss='mse', metrics=['accuracy'])
    return neural_net

  def conv_multi_classifier_model(self, learning_rate):
    """Build and compile the baseline convolutional classifier.

    Args:
      learning_rate: step size for the Adam optimizer.

    Returns:
      A compiled `Sequential` model taking (28, 28, 1) images and producing a
      softmax over `self.num_classes` classes, trained with sparse categorical
      cross-entropy.
    """
    neural_net = Sequential()

    # Feature extractor.
    neural_net.add(Conv2D(64, activation='relu', input_shape=(28, 28, 1), kernel_size=(3, 3), strides=(1, 1), padding='same'))
    neural_net.add(BatchNormalization())
    neural_net.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))

    neural_net.add(Flatten())

    # Classification head.
    neural_net.add(Dense(128, activation='relu'))
    neural_net.add(Dense(32, activation='relu'))
    neural_net.add(Dense(10, activation='relu'))
    neural_net.add(Dense(self.num_classes, activation='softmax'))

    # `learning_rate` is the supported keyword; `lr` is a deprecated alias that
    # newer Keras releases reject.
    neural_net.compile(optimizer=Adam(learning_rate=learning_rate), loss='sparse_categorical_crossentropy', metrics=['accuracy'])

    return neural_net


  def ff_multi_classifier_model(self, learning_rate):
    """Build and compile the baseline fully connected classifier.

    Args:
      learning_rate: step size for the Adam optimizer.

    Returns:
      A compiled `Sequential` model declared with a (28, 28) input and a
      softmax over `self.num_classes` classes, trained with sparse categorical
      cross-entropy.
    """
    neural_net = Sequential()
    neural_net.add(Flatten(input_shape=(28, 28)))
    neural_net.add(Dense(128, activation='relu'))
    neural_net.add(Dense(32, activation='relu'))
    neural_net.add(Dense(10, activation='relu'))
    neural_net.add(Dense(self.num_classes, activation='softmax'))
    # `learning_rate` is the supported keyword; `lr` is a deprecated alias that
    # newer Keras releases reject.
    neural_net.compile(optimizer=Adam(learning_rate=learning_rate), loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    return neural_net


  def load_dataset(self):
    """Load MNIST through Keras and divide the pixel values by 255.

    Returns:
      `((x_train, y_train), (x_test, y_test))` with the images scaled and the
      labels untouched.
    """
    (train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.mnist.load_data()
    scaled_train_images = train_images / 255.0
    scaled_test_images = test_images / 255.0
    return (scaled_train_images, train_labels), (scaled_test_images, test_labels)


  def pick_n_images_per_digit(self, num_images_per_digit=1, train=True):
    """Randomly sample up to `num_images_per_digit` images for each of the 10 digits.

    Args:
      num_images_per_digit: maximum number of images to keep per digit.
      train: sample from the training split when True, else from the test split.

    Returns:
      A list of `(image, label)` tuples in the shuffled order they were found.
      It holds fewer than `10 * num_images_per_digit` items only if the split
      does not contain enough images of some digit.
    """
    train_data, test_data = self.dataset
    images, labels = train_data if train else test_data

    # Shuffle (image, label) pairs together so the pairing is preserved.
    shuffled_pairs = list(zip(images, labels))
    np.random.shuffle(shuffled_pairs)

    picked_numbers = [0] * 10
    picked_data = []
    # Iterate the shuffled pairs directly; unzipping and re-zipping them is
    # redundant and fails outright when the split is empty.
    for x, y in shuffled_pairs:
      if picked_numbers[y] < num_images_per_digit:
        picked_data.append((x, y))
        picked_numbers[y] += 1
        # Stop scanning as soon as every digit has its quota.
        if len(picked_data) >= num_images_per_digit * 10:
          break
    return picked_data


  def put_picked_data_in_bins(self, picked_data, train=True):
    """Group `(image, label)` pairs into 10 bins indexed by label.

    The bins are also stored on the instance: in `self.picked_data` when
    `train` is truthy, otherwise in `self.picked_data_test`.

    Returns:
      The list of 10 bins; bin `d` holds the `(image, label)` pairs whose
      label is `d`, in their original order.
    """
    bins_by_digit = [[] for _ in range(10)]
    for image, label in picked_data:
      bins_by_digit[label].append((image, label))

    target_attribute = 'picked_data' if train else 'picked_data_test'
    setattr(self, target_attribute, bins_by_digit)
    return bins_by_digit


  def flatten_data(self, binned_data, randomize=True):
    """Concatenate all bins into one list, optionally shuffling the result.

    Args:
      binned_data: iterable of bins, each an iterable of data points.
      randomize: the list is shuffled in place with `np.random.shuffle` only
        when this is exactly `True`.

    Returns:
      A new flat list containing every data point of every bin.
    """
    flat_data = [datum for group in binned_data for datum in group]
    if randomize is True:
      np.random.shuffle(flat_data)
    return flat_data


  def test_with_one_network(self, learning_rate, num_epochs, network_type='conv'):
    """Train and evaluate a single baseline classifier on freshly sampled data.

    Args:
      learning_rate: step size passed to the classifier builder.
      num_epochs: number of training epochs.
      network_type: 'conv' for the convolutional classifier or 'ff' for the
        fully connected one.

    Returns:
      The list returned by `classifier.evaluate` (loss followed by accuracy).

    Raises:
      ValueError: if `network_type` is neither 'conv' nor 'ff'.
    """
    builders = {
        'conv': self.conv_multi_classifier_model,
        'ff': self.ff_multi_classifier_model,
    }
    # Fail early: an unknown type would otherwise leave `classifier` unbound
    # and only surface as an UnboundLocalError after the data has been sampled.
    if network_type not in builders:
      raise ValueError("network_type must be 'conv' or 'ff', got {!r}".format(network_type))

    picked_training_data = self.pick_n_images_per_digit(num_images_per_digit=self.num_train_images_per_digit, train=True)
    binned_picked_training_data = self.put_picked_data_in_bins(picked_data=picked_training_data, train=True)

    picked_test_data = self.pick_n_images_per_digit(num_images_per_digit=self.num_test_images_per_digit, train=False)
    binned_picked_test_data = self.put_picked_data_in_bins(picked_data=picked_test_data, train=False)

    # Keep only the first `num_classes` digits.
    binned_picked_training_data = binned_picked_training_data[:self.num_classes]
    binned_picked_test_data = binned_picked_test_data[:self.num_classes]

    classifier = builders[network_type](learning_rate=learning_rate)

    def bins_to_arrays(binned_data):
      # Stack every (image, label) pair, adding a trailing channel axis to the image.
      images, labels = [], []
      for digit_bin in binned_data:
        for image, label in digit_bin:
          images.append(np.expand_dims(np.asarray(image), axis=2))
          labels.append(label)
      return np.asarray(images), np.asarray(labels)

    x_train, y_train = bins_to_arrays(binned_picked_training_data)
    x_test, y_test = bins_to_arrays(binned_picked_test_data)

    print('Training The Traditional Network')
    classifier.fit(x_train, y_train, batch_size=self.num_classes, epochs=num_epochs, verbose=0)
    metrics = classifier.evaluate(x_test, y_test)
    print('Accuracy With Traditional Network : %f' % metrics[1])
    return metrics


  def initialize_autoencoders(self):
    """Create one shallow autoencoder, and its encoder-only view, per stack level.

    For each size in `self.encoder_sizes` except the last, an autoencoder is
    built and stored in `self.autoencoders`; a model exposing the output of its
    layer at index 2 (the SELU activation after the first Dense layer) is
    stored at the same position in `self.intermediates`.
    """
    # Start from empty lists so that calling this method again rebuilds the
    # stack instead of appending a second, shape-incompatible copy after it.
    self.autoencoders = []
    self.intermediates = []
    for encoder_index, encoder_size in enumerate(self.encoder_sizes[:-1]):
      autoencoder = self.shallow_autoencoder_model(encoder_size, encoder_index)
      intermediate = tf.keras.Model(inputs=autoencoder.input, outputs=autoencoder.layers[2].output)
      self.autoencoders.append(autoencoder)
      self.intermediates.append(intermediate)

  def train_autoencoders(self, train_data):
    """Train the autoencoder stack one sample at a time and track per-class code statistics.

    For every sample, each autoencoder is fitted for one step on its current
    input, and the matching intermediate model's output (reshaped to a square)
    becomes the input of the next autoencoder. The final code is flattened and
    folded into the running statistics of the sample's class:
    `self.outputs`, `self.sums`, `self.averages` and `self.std_devs`.

    `self.std_devs[y]` is a scalar: the square root of the summed per-dimension
    sample variance of the class's codes, i.e. the typical Euclidean distance
    of a code from the class average.

    Args:
      train_data: list of `(image, label)` pairs. It is shuffled in place at
        the start of every epoch.
    """
    for _ in range(self.num_epochs):
      np.random.shuffle(train_data)
      for x, y in train_data:
        out = x
        for autoencoder, intermediate in zip(self.autoencoders, self.intermediates):
          batch = np.asarray([out])
          autoencoder.fit(batch, batch, batch_size=1, epochs=1, verbose=0)
          out = np.squeeze(intermediate(batch), axis=0)
          side = int(math.sqrt(out.shape[0]))
          out = np.reshape(out, (side, side))
        out_flattened = np.reshape(out, (out.shape[0]*out.shape[0],))

        # Class average before this sample is added; needed for the variance update.
        previous_average = np.asarray(self.averages[y], dtype=float)

        self.outputs[y].append(out_flattened)
        count = len(self.outputs[y])
        self.sums[y] = self.sums[y] + out_flattened
        self.averages[y] = self.sums[y] / count

        if count > 1:
          # Welford update on the sum of squared deviations (M2):
          #   M2_n = M2_{n-1} + (x - mean_{n-1}) . (x - mean_n)
          # The stored value is a standard deviation, so it has to be squared
          # to recover the previous variance before rescaling by (n - 2).
          previous_m2 = (self.std_devs[y] ** 2) * (count - 2)
          increment = float(np.dot(out_flattened - previous_average, out_flattened - self.averages[y]))
          # max(...) only guards against a tiny negative value from rounding.
          self.std_devs[y] = math.sqrt(max(previous_m2 + increment, 0.0) / (count - 1))
        else:
          self.std_devs[y] = 0.0

  def evaluate_on_test_data(self, test_data):
    """Classify test samples by normalised distance to the class averages.

    Each image is passed through every intermediate (encoder) model; the final
    code is compared with every entry of `self.averages` using the Euclidean
    distance divided by that class's `self.std_devs` value. The class with the
    smallest ratio is the prediction.

    The flattened codes are stored per true class in `self.results`, and the
    accuracy is printed.

    Args:
      test_data: sequence of `(image, label)` pairs.

    Returns:
      The fraction of correctly classified samples (0.0 for empty input).
    """
    score = 0
    results = [[] for _ in range(self.num_classes)]
    for x, y in test_data:
      out = x
      for intermediate in self.intermediates:
        out = np.squeeze(intermediate(np.asarray([out])), axis=0)
        side = int(math.sqrt(out.shape[0]))
        out = np.reshape(out, (side, side))
      out_flattened = np.reshape(out, (out.shape[0]*out.shape[0],))

      z_values = []
      for class_index, avg in enumerate(self.averages):
        dist = distance.euclidean(avg, out_flattened)
        spread = self.std_devs[class_index]
        if np.ndim(spread) == 0 and spread > 0:
          z_values.append(abs(dist / spread))
        else:
          # No usable spread (class never trained, or zero deviation): rank it
          # last instead of dividing by zero.
          z_values.append(math.inf)

      # argmin is taken on the raw ratios. A softmax in between cannot change
      # the ordering, but it can underflow distinct values to the same 0.0
      # (or produce nan from inf) and so pick the wrong class.
      predicted_digit = np.argmin(z_values)
      if predicted_digit == y:
        score += 1
      results[y].append(out_flattened)

    self.results = results
    accuracy = score / len(test_data) if len(test_data) else 0.0
    print('Accuracy is {}'.format(accuracy))
    return accuracy

  def train_conv_model(self, train_data):
    """Fit the convolutional autoencoder on the training images and return its encoder.

    The model is trained for one epoch with batch size 1 to reconstruct its
    own input.

    Args:
      train_data: sequence of `(image, label)` pairs; only the images are used.

    Returns:
      A Keras model mapping the conv encoder's input to the output of its
      layer at index 7.
    """
    autoencoder = self.conv_encoder()
    images, _ = zip(*train_data)
    # Add the trailing channel axis expected by the convolutional input.
    image_batch = np.expand_dims(np.asarray(images), axis=-1)
    target_batch = np.expand_dims(np.asarray(images), axis=-1)
    autoencoder.fit(image_batch, target_batch, batch_size=1, epochs=1)
    bottleneck = autoencoder.layers[7].output
    return tf.keras.Model(inputs=autoencoder.input, outputs=bottleneck)
  
  def transform_train_and_test_data(self, conv_intermediate, train_data, test_data):
    """Replace every image by its `conv_intermediate` output, reshaped to a square.

    Args:
      conv_intermediate: callable model applied to a batch holding one image.
      train_data: sequence of `(image, label)` pairs.
      test_data: sequence of `(image, label)` pairs.

    Returns:
      `(transformed_train_data, transformed_test_data)`, two lists of
      `(code, label)` pairs where `code` has shape `(side, side, 1)` and
      `side` is `int(sqrt(code_length))`.
    """
    def encode(sample):
      code = conv_intermediate(np.asarray([sample[0]]))
      code = np.squeeze(code, axis=0)
      side = int(math.sqrt(code.shape[0]))
      return (np.reshape(code, newshape=(side, side, 1)), sample[1])

    # Training samples are encoded first, then test samples.
    encoded_train = [encode(sample) for sample in train_data]
    encoded_test = [encode(sample) for sample in test_data]
    return encoded_train, encoded_test
    

  def plot_pca_reduced_points(self):
    """Scatter-plot the stored test codes after reducing them to 2-D with PCA.

    PCA is fitted on all codes in `self.results` together; each class is then
    projected and drawn as its own scatter series.

    Raises:
      RuntimeError: if `evaluate_on_test_data` has not stored `self.results` yet.
      ValueError: if `self.results` holds no codes at all.
    """
    results = getattr(self, 'results', None)
    if results is None:
      raise RuntimeError('No results to plot: run evaluate_on_test_data first.')

    pca_input = [entry for result in results for entry in result]
    if not pca_input:
      raise ValueError('No results to plot: every class is empty.')

    pca = PCA(n_components=2)
    pca.fit(np.array(pca_input))
    for result in results:
      if len(result) == 0:
        # Nothing to project for this class; draw an empty series so the
        # remaining classes keep the colour they would otherwise get.
        plt.scatter([], [])
        continue
      output = pca.transform(result)
      x, y = zip(*output)
      plt.scatter(x, y)
    plt.show()
  
  def start(self):
    """Run the full experiment: sample data, train the autoencoder stack, evaluate.

    Training data is sampled and binned first, then test data; both are cut
    down to the first `self.num_classes` digits, flattened and shuffled before
    the autoencoders are created, trained and evaluated.
    """
    split_settings = (
        (self.num_train_images_per_digit, True),
        (self.num_test_images_per_digit, False),
    )

    binned_splits = []
    for images_per_digit, is_train in split_settings:
      picked = self.pick_n_images_per_digit(num_images_per_digit=images_per_digit, train=is_train)
      binned = self.put_picked_data_in_bins(picked_data=picked, train=is_train)
      binned_splits.append(binned[:self.num_classes])

    train_bins, test_bins = binned_splits
    train_data = self.flatten_data(train_bins, randomize=True)
    test_data = self.flatten_data(test_bins, randomize=True)

    ### Using A Pre-Conv Layer - Turn Off If Not Desired
    # conv_intermediate = self.train_conv_model(train_data)
    # train_data, test_data = self.transform_train_and_test_data(conv_intermediate, train_data, test_data)
    ###
    self.initialize_autoencoders()
    self.train_autoencoders(train_data)
    self.evaluate_on_test_data(test_data)


**Calls To Start The Experiment**

Options for `network_type` (the argument of the optional `test_with_one_network` baseline run): 'conv' or 'ff'


In [None]:
# Parameters of the experiment
num_epochs = 1  # passes over the training data in Experiment.train_autoencoders
ipd = 20  # training images sampled per digit
ipd_test = 100  # test images sampled per digit
learning_rate_sp = 0.000001  # learning rate of the shallow autoencoders (RMSprop)
learning_rate_preconv = 0.0005  # learning rate of the optional conv encoder (Adam)
learning_rate_trad = 0.001  # learning rate for the optional baseline classifier call below
num_classes = 10  # number of digit classes kept (the first num_classes digits)
encoder_sizes = [784, 196, 64]  # layer widths of the autoencoder stack: 784 -> 196 -> 64

# Instantiate the Experiment class (this also loads the MNIST dataset)
experiment = Experiment(
                  learning_rate=learning_rate_sp,
                  learning_rate_trad=learning_rate_trad,
                  learning_rate_preconv=learning_rate_preconv,
                  num_classes=num_classes,
                  num_train_images_per_digit=ipd,
                  num_test_images_per_digit=ipd_test,
                  num_epochs=num_epochs,
                  encoder_sizes=encoder_sizes)
# Start the experiment, which trains the autoencoder stack and prints its test accuracy
experiment.start()
# Optional: plot the 2-D PCA projection of the test codes stored by the run above
#experiment.plot_pca_reduced_points()
# Optional: show the weights of the first autoencoder's first Dense layer
#im = plt.matshow(experiment.autoencoders[0].layers[1].weights[0])
# Optional: train and test a traditional conv net ('conv') or feed-forward net ('ff') instead
#experiment.test_with_one_network(learning_rate_trad, num_epochs, network_type='ff')

Accuracy is 0.565
