# MNIST model testing
In this notebook we will be working with the MNIST dataset. We will use it to create strips of images with a maximum length of 10 images.
The purpose of this notebook is to train a model that determines whether the last image (target image) in the strip contains a class that is already represented by the previous images (context images) in the strip.

We will test several operations for joining the embedding obtained from the context images into one context embedding:
- Maximum
- Average
- Addition
- Dense

# Make the necessary imports

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.datasets import mnist
from tensorflow.keras import layers
from tensorflow.keras.models import *

from matplotlib.pyplot import imshow
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics import roc_curve, auc

import pandas as pd
import numpy as np
import random
import math
import os

# Import and preprocess the MNIST dataset

In [None]:
# Load the raw MNIST splits.
(Xtrain_orig, ytrain_orig), (Xtest_orig, ytest_orig) = mnist.load_data()

# Scale pixel values by 1/255 and append a trailing channel axis.
Xtrain_norm = tf.expand_dims(Xtrain_orig.astype('float32') / 255., axis=-1)
Xtest_norm = tf.expand_dims(Xtest_orig.astype('float32') / 255., axis=-1)

# Labels are used as loaded.
ytrain_norm = ytrain_orig
ytest_norm = ytest_orig

print(f"Xtrain shape: {Xtrain_norm.shape}")
print(f"ytrain shape: {ytrain_norm.shape}")

print(f"Xtest shape: {Xtest_norm.shape}")
print(f"ytest shape: {ytest_norm.shape}")

# Support functions

In [None]:
# Repetitions are allowed among the context images.
def create_set(X, y, strip_size=4, set_size=60000):
    """Build labelled image strips for the "is the target class repeated?" task.

    Every strip consists of ``strip_size`` context images sampled uniformly at
    random from ``X`` (the same image or class may appear more than once),
    followed by one target image. About half of the strips receive a target
    whose class already occurs among the context images (label 1); the others
    receive a target whose class does not occur among them (label 0).

    When the requested kind of target is impossible, the other kind is used
    instead: a strip whose context already contains every available class is
    always labelled 1, and a strip without context images is always labelled 0.

    Args:
        X: Indexable image collection. ``len(X)``, ``X.shape[1]``,
            ``X.shape[2]`` and ``X[i]`` are used; each ``X[i]`` must fit a
            ``(X.shape[1], X.shape[2], 1)`` slot.
        y: Array of class labels in ``0..9`` aligned with ``X``.
        strip_size: Number of context images per strip.
        set_size: Number of strips to generate.

    Returns:
        Tuple ``(X_processed, y_processed)`` of NumPy arrays created with
        ``np.zeros``/``np.array``: images of shape
        ``(set_size, strip_size + 1, X.shape[1], X.shape[2], 1)`` and labels of
        shape ``(set_size,)`` holding 0.0 or 1.0.

    Raises:
        ValueError: If strips are requested but ``X`` is empty.
    """
    if set_size > 0 and len(X) == 0:
        raise ValueError("create_set needs at least one image to sample from")

    n_classes = 10
    # indices_by_class[c] lists the positions in y whose label equals c.
    indices_by_class = [np.where(y == c)[0] for c in range(n_classes)]

    strips = []  # image indices of every strip: context first, target last
    labels = []  # 1 when the target class is repeated, 0 otherwise

    for _ in range(set_size):
        # Context images: uniform sampling with replacement.
        context_idx = [random.randint(0, len(X) - 1) for _ in range(strip_size)]
        context_classes = [y[idx] for idx in context_idx]

        # A fair coin keeps the two labels balanced.
        wants_repeat = bool(np.random.choice([0, 1], p=[0.50, 0.50]))

        # Classes that have images and are not yet present in the context.
        absent_classes = [
            c for c in range(n_classes)
            if len(indices_by_class[c]) and c not in context_classes
        ]

        if not context_classes:
            # Nothing to repeat: only a negative example is possible.
            is_repeat = False
        elif not absent_classes:
            # Every class is already in the context: any target is a repeat.
            is_repeat = True
        else:
            is_repeat = wants_repeat

        if is_repeat:
            # Reuse the class of a randomly chosen context image.
            target_class = context_classes[random.randint(0, len(context_classes) - 1)]
        else:
            target_class = random.choice(absent_classes)

        # Pick a random image that represents the chosen class.
        candidates = indices_by_class[target_class]
        target_idx = candidates[random.randint(0, len(candidates) - 1)]

        strips.append(context_idx + [target_idx])
        labels.append(1 if is_repeat else 0)

    # Materialise the strips as (N, strip_size + 1, height, width, 1).
    X_processed = np.zeros([len(strips), strip_size + 1, X.shape[1], X.shape[2], 1])
    for i, strip in enumerate(strips):
        for j, image_idx in enumerate(strip):
            X_processed[i, j] = X[image_idx]
    y_processed = np.array(labels, dtype=float)

    return X_processed, y_processed

In [None]:
def create_data_sets(X, y, Xt, yt, strip_size, training_size=50000, test_size=10000,
                     val_fraction=0.2, verbose=False):
    """Create the training, validation and test strip sets for one strip size.

    Training strips are generated from ``(X, y)`` with ``create_set`` and then
    split into training and validation parts with ``train_test_split``; test
    strips are generated independently from ``(Xt, yt)``.

    Args:
        X, y: Images and labels used for the training/validation strips.
        Xt, yt: Images and labels used for the test strips.
        strip_size: Number of context images per strip.
        training_size: Number of strips generated before the validation split.
        test_size: Number of test strips.
        val_fraction: Share of the generated training strips moved to the
            validation set (passed to ``train_test_split`` as ``test_size``).
        verbose: When true, print how many examples of each label ended up in
            every split.

    Returns:
        Tuple ``(Xtrain, Xval, Xtest, ytrain, yval, ytest)``.
    """
    Xtrain, ytrain = create_set(X, y, strip_size, set_size=training_size)
    Xtrain, Xval, ytrain, yval = train_test_split(Xtrain, ytrain, test_size=val_fraction)
    Xtest, ytest = create_set(Xt, yt, strip_size, set_size=test_size)

    if verbose:
        # Class-balance report for the three splits.
        for split_name, split_labels in (("Training", ytrain), ("Validation", yval), ("Test", ytest)):
            for label_value in (0, 1):
                count = len(np.where(split_labels == label_value)[0])
                print(split_name + " examples classified as " + str(label_value) + ": " + str(count))

    return Xtrain, Xval, Xtest, ytrain, yval, ytest

In [None]:
def train_model(model, Xtrain, ytrain, Xval, yval, Xtest, ytest, batch_size=32,
                model_save_name="best_model", epochs=60):
    """Compile, train and evaluate ``model``; return its test accuracy.

    The model is compiled with Adam (learning rate 1e-3), binary cross-entropy
    loss and the accuracy metric. During training the weights with the lowest
    validation loss are checkpointed under ``<cwd>/<model_save_name>/model``;
    training stops early after 7 epochs without improvement and the learning
    rate is multiplied by 0.2 after 5 such epochs (never below 1e-4). The
    checkpointed weights are restored before evaluating on the test data.

    Args:
        model: Keras model to train (it is compiled here).
        Xtrain, ytrain: Training inputs and labels.
        Xval, yval: Validation inputs and labels, monitored by the callbacks.
        Xtest, ytest: Test inputs and labels used for the final evaluation.
        batch_size: Batch size passed to ``model.fit``.
        model_save_name: Directory name (relative to the current working
            directory) that receives the weight checkpoint.
        epochs: Maximum number of training epochs.

    Returns:
        The second value reported by ``model.evaluate`` on the test data, i.e.
        the accuracy metric.
    """
    # Checkpoint location: <cwd>/<model_save_name>/model
    filepath = os.path.join(os.getcwd(), model_save_name, 'model')

    callbacks = [
        # Keep only the weights of the epoch with the lowest validation loss.
        tf.keras.callbacks.ModelCheckpoint(filepath, monitor='val_loss', mode='min',
                                           verbose=0, save_best_only=True,
                                           save_weights_only=True),
        # Stop when the validation loss has not improved for 7 epochs.
        tf.keras.callbacks.EarlyStopping(monitor='val_loss', mode='min', patience=7, verbose=0),
        # Reduce the learning rate when the validation loss plateaus.
        tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5,
                                             min_lr=0.0001),
    ]

    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
                  loss='binary_crossentropy', metrics=['accuracy'])
    model.fit(Xtrain, ytrain,
              batch_size=batch_size,
              epochs=epochs,
              validation_data=(Xval, yval),
              callbacks=callbacks,
              verbose=1)

    # Evaluate the best checkpoint rather than the weights of the last epoch.
    model.load_weights(filepath)
    score = model.evaluate(Xtest, ytest, verbose=0)
    print("Test loss:", score[0])
    print("Test accuracy:", score[1])

    return score[1]

In [None]:
def save_to_db(db_name, column_name, list_to_save):
    """Store ``list_to_save`` as column ``column_name`` of ``<db_name>.csv``.

    The CSV file is read, the column is added (or overwritten) and the table
    is written back to the same file without the index.
    """
    csv_path = db_name + '.csv'
    table = pd.read_csv(csv_path)
    table[column_name] = list_to_save
    table.to_csv(csv_path, index=False)

def create_db(db_name):
    """Create (or reset) ``<db_name>.csv`` from an empty DataFrame."""
    # NOTE(review): index=True presumably keeps the file non-empty so that
    # pd.read_csv can parse it later - confirm before changing.
    empty_table = pd.DataFrame()
    empty_table.to_csv(db_name + '.csv', index=True)

In [None]:
def plot_db_columns(db_name, title, xlabel, ylabel, save_name):
    """Plot every result column of ``<db_name>.csv`` as a line and save the figure.

    Args:
        db_name: CSV file name without the ``.csv`` extension.
        title: Figure title.
        xlabel: Label of the x axis.
        ylabel: Label of the y axis.
        save_name: Output image name without the ``.png`` extension.
    """
    df = pd.read_csv(db_name + '.csv')

    # One x position per stored row, starting at 1, so the plot follows the
    # table length instead of assuming exactly nine rows.
    # NOTE(review): row k is presumably the result for strip size k - confirm.
    x = list(range(1, len(df) + 1))
    plt.figure(figsize=(9, 7))

    for column in df:
        # Skip the placeholder index column found in files made by create_db.
        if column == 'Unnamed: 0':
            continue
        plt.plot(x, df[column], label=column)

    plt.title(title, fontsize=20)
    plt.xlabel(xlabel, fontsize=16)
    plt.ylabel(ylabel, fontsize=16)
    plt.ylim(0, 1)
    plt.legend()
    plt.grid(axis='y', color='gray', linestyle='--', linewidth=0.5)
    plt.tick_params(labelsize=14)
    plt.savefig(save_name + '.png')
    plt.show()

In [None]:
# Start both result tables from scratch.
create_db(db_name='join_operations')
create_db(db_name='max_operations')

# First models to test the combining operations

## Maximum operation

In [None]:
class Max(keras.Model):
    """Strip classifier that joins the context embeddings with an element-wise maximum.

    Every image of the strip is encoded by the same CNN into a ``latent_dim``
    vector. The first ``channels - 1`` embeddings (context) are reduced with a
    maximum, paired with the embedding of the last image (target) and passed
    to a dense classifier with a sigmoid output.

    Args:
        latent_dim: Size of the embedding produced for each image.
        channels: Number of images per strip (context images plus the target).
    """

    def __init__(self, latent_dim, channels):
        super().__init__()
        self.latent_dim = latent_dim
        self.channels = channels

        # Per-image encoder: 28x28x1 image -> latent_dim vector.
        self.encoder = tf.keras.Sequential([
            layers.Conv2D(filters=1, kernel_size=(3, 3), padding="same", strides=1, input_shape=[28, 28, 1]),
            layers.BatchNormalization(),
            layers.ReLU(),

            layers.Conv2D(filters=32, kernel_size=(3, 3), padding="same", strides=2),
            layers.BatchNormalization(),
            layers.ReLU(),

            layers.Conv2D(filters=64, kernel_size=(3, 3), padding="same", strides=1),
            layers.BatchNormalization(),
            layers.ReLU(),

            layers.Flatten(),

            layers.Dense(units=latent_dim)
        ])
        # Built once here so that call() does not create new layers on every
        # forward pass.
        self.encode_strip = layers.TimeDistributed(self.encoder)

        self.classifier = tf.keras.Sequential([
            layers.Flatten(),
            layers.Dense(128, activation='relu'),
            layers.Dense(64, activation='relu'),
            layers.Dense(1, activation='sigmoid')
        ])

    def call(self, x):
        """Return the sigmoid output of the classifier for a batch of strips ``x``."""
        encoded_images = self.encode_strip(x)

        # Context = the first channels - 1 embeddings (at least one).
        n_context = max(self.channels - 1, 1)
        max_image = tf.reduce_max(encoded_images[:, :n_context, :], axis=1)

        target_image = encoded_images[:, -1, :]
        stacked_image = tf.stack([max_image, target_image], axis=-1)
        return self.classifier(stacked_image)

In [None]:
# Max join, 32-dimensional embeddings: one accuracy per strip size 1-9.
latent_dim = 32
accuracy_per_strip_size_max = []
for strip_size in range(1, 10):
    print(f'-------------------{strip_size}-------------------')
    channels = strip_size + 1  # context images plus the target image
    iteration_scores = []
    for i in range(1):
        print(f'------------- Iteration {i + 1} -------------')
        Xtrain, Xval, Xtest, ytrain, yval, ytest = create_data_sets(
            Xtrain_norm, ytrain_norm, Xtest_norm, ytest_norm, strip_size)
        model = Max(latent_dim, channels)
        score = train_model(
            model, Xtrain, ytrain, Xval, yval, Xtest, ytest,
            batch_size=32, model_save_name="max_model_32")
        iteration_scores.append(score)
    accuracy_per_strip_size_max.append(np.mean(iteration_scores))
# The Max results are stored in both result tables.
save_to_db('join_operations', 'max_32', accuracy_per_strip_size_max)
save_to_db('max_operations', 'max_32', accuracy_per_strip_size_max)

In [None]:
# Max join, 10-dimensional embeddings: one accuracy per strip size 1-9.
latent_dim = 10
accuracy_per_strip_size_max = []
for strip_size in range(1, 10):
    print(f'-------------------{strip_size}-------------------')
    channels = strip_size + 1  # context images plus the target image
    iteration_scores = []
    for i in range(1):
        print(f'------------- Iteration {i + 1} -------------')
        Xtrain, Xval, Xtest, ytrain, yval, ytest = create_data_sets(
            Xtrain_norm, ytrain_norm, Xtest_norm, ytest_norm, strip_size)
        model = Max(latent_dim, channels)
        score = train_model(
            model, Xtrain, ytrain, Xval, yval, Xtest, ytest,
            batch_size=32, model_save_name="max_model_10")
        iteration_scores.append(score)
    accuracy_per_strip_size_max.append(np.mean(iteration_scores))
# The Max results are stored in both result tables.
save_to_db('join_operations', 'max_10', accuracy_per_strip_size_max)
save_to_db('max_operations', 'max_10', accuracy_per_strip_size_max)

## Average operation

In [None]:
class Avg(keras.Model):
    """Strip classifier that joins the context embeddings with an element-wise average.

    Every image of the strip is encoded by the same CNN into a ``latent_dim``
    vector. The first ``channels - 1`` embeddings (context) are averaged,
    paired with the embedding of the last image (target) and passed to a dense
    classifier with a sigmoid output.

    Args:
        latent_dim: Size of the embedding produced for each image.
        channels: Number of images per strip (context images plus the target).
    """

    def __init__(self, latent_dim, channels):
        super().__init__()
        self.latent_dim = latent_dim
        self.channels = channels

        # Per-image encoder: 28x28x1 image -> latent_dim vector.
        self.encoder = tf.keras.Sequential([
            layers.Conv2D(filters=1, kernel_size=(3, 3), padding="same", strides=1, input_shape=[28, 28, 1]),
            layers.BatchNormalization(),
            layers.ReLU(),

            layers.Conv2D(filters=32, kernel_size=(3, 3), padding="same", strides=2),
            layers.BatchNormalization(),
            layers.ReLU(),

            layers.Conv2D(filters=64, kernel_size=(3, 3), padding="same", strides=1),
            layers.BatchNormalization(),
            layers.ReLU(),

            layers.Flatten(),

            layers.Dense(units=latent_dim)
        ])
        # Built once here so that call() does not create new layers on every
        # forward pass.
        self.encode_strip = layers.TimeDistributed(self.encoder)

        self.classifier = tf.keras.Sequential([
            layers.Flatten(),
            layers.Dense(128, activation='relu'),
            layers.Dense(64, activation='relu'),
            layers.Dense(1, activation='sigmoid')
        ])

    def call(self, x):
        """Return the sigmoid output of the classifier for a batch of strips ``x``."""
        encoded_images = self.encode_strip(x)

        # Context = the first channels - 1 embeddings (at least one).
        n_context = max(self.channels - 1, 1)
        avg_image = tf.reduce_mean(encoded_images[:, :n_context, :], axis=1)

        target_image = encoded_images[:, -1, :]
        stacked_image = tf.stack([avg_image, target_image], axis=-1)
        return self.classifier(stacked_image)

In [None]:
# Average join, 32-dimensional embeddings: one accuracy per strip size 1-9.
latent_dim = 32
accuracy_per_strip_size_avg = []
for strip_size in range(1, 10):
    print(f'-------------------{strip_size}-------------------')
    channels = strip_size + 1  # context images plus the target image
    iteration_scores = []
    for i in range(1):
        print(f'------------- Iteration {i + 1} -------------')
        Xtrain, Xval, Xtest, ytrain, yval, ytest = create_data_sets(
            Xtrain_norm, ytrain_norm, Xtest_norm, ytest_norm, strip_size)
        model = Avg(latent_dim, channels)
        score = train_model(
            model, Xtrain, ytrain, Xval, yval, Xtest, ytest,
            batch_size=32, model_save_name="avg_model_32")
        iteration_scores.append(score)
    accuracy_per_strip_size_avg.append(np.mean(iteration_scores))
save_to_db('join_operations', 'avg_32', accuracy_per_strip_size_avg)

In [None]:
# Average join, 10-dimensional embeddings: one accuracy per strip size 1-9.
latent_dim = 10
accuracy_per_strip_size_avg = []
for strip_size in range(1, 10):
    print(f'-------------------{strip_size}-------------------')
    channels = strip_size + 1  # context images plus the target image
    iteration_scores = []
    for i in range(1):
        print(f'------------- Iteration {i + 1} -------------')
        Xtrain, Xval, Xtest, ytrain, yval, ytest = create_data_sets(
            Xtrain_norm, ytrain_norm, Xtest_norm, ytest_norm, strip_size)
        model = Avg(latent_dim, channels)
        score = train_model(
            model, Xtrain, ytrain, Xval, yval, Xtest, ytest,
            batch_size=32, model_save_name="avg_model_10")
        iteration_scores.append(score)
    accuracy_per_strip_size_avg.append(np.mean(iteration_scores))
save_to_db('join_operations', 'avg_10', accuracy_per_strip_size_avg)

## Addition operation

In [None]:
class Sum(keras.Model):
    """Strip classifier that joins the context embeddings with an element-wise sum.

    Every image of the strip is encoded by the same CNN into a ``latent_dim``
    vector. The first ``channels - 1`` embeddings (context) are added up,
    paired with the embedding of the last image (target) and passed to a dense
    classifier with a sigmoid output.

    Args:
        latent_dim: Size of the embedding produced for each image.
        channels: Number of images per strip (context images plus the target).
    """

    def __init__(self, latent_dim, channels):
        super().__init__()
        self.latent_dim = latent_dim
        self.channels = channels

        # Per-image encoder: 28x28x1 image -> latent_dim vector.
        self.encoder = tf.keras.Sequential([
            layers.Conv2D(filters=1, kernel_size=(3, 3), padding="same", strides=1, input_shape=[28, 28, 1]),
            layers.BatchNormalization(),
            layers.ReLU(),

            layers.Conv2D(filters=32, kernel_size=(3, 3), padding="same", strides=2),
            layers.BatchNormalization(),
            layers.ReLU(),

            layers.Conv2D(filters=64, kernel_size=(3, 3), padding="same", strides=1),
            layers.BatchNormalization(),
            layers.ReLU(),

            layers.Flatten(),

            layers.Dense(units=latent_dim)
        ])
        # Built once here so that call() does not create new layers on every
        # forward pass.
        self.encode_strip = layers.TimeDistributed(self.encoder)

        self.classifier = tf.keras.Sequential([
            layers.Flatten(),
            layers.Dense(128, activation='relu'),
            layers.Dense(64, activation='relu'),
            layers.Dense(1, activation='sigmoid')
        ])

    def call(self, x):
        """Return the sigmoid output of the classifier for a batch of strips ``x``."""
        encoded_images = self.encode_strip(x)

        # Context = the first channels - 1 embeddings (at least one).
        n_context = max(self.channels - 1, 1)
        add_image = tf.reduce_sum(encoded_images[:, :n_context, :], axis=1)

        target_image = encoded_images[:, -1, :]
        stacked_image = tf.stack([add_image, target_image], axis=-1)
        return self.classifier(stacked_image)

In [None]:
# Sum join, 32-dimensional embeddings: one accuracy per strip size 1-9.
latent_dim = 32
accuracy_per_strip_size_sum = []
for strip_size in range(1, 10):
    print(f'-------------------{strip_size}-------------------')
    channels = strip_size + 1  # context images plus the target image
    iteration_scores = []
    for i in range(1):
        print(f'------------- Iteration {i + 1} -------------')
        Xtrain, Xval, Xtest, ytrain, yval, ytest = create_data_sets(
            Xtrain_norm, ytrain_norm, Xtest_norm, ytest_norm, strip_size)
        model = Sum(latent_dim, channels)
        score = train_model(
            model, Xtrain, ytrain, Xval, yval, Xtest, ytest,
            batch_size=32, model_save_name="sum_model_32")
        iteration_scores.append(score)
    accuracy_per_strip_size_sum.append(np.mean(iteration_scores))
save_to_db('join_operations', 'sum_32', accuracy_per_strip_size_sum)

In [None]:
# Sum join, 10-dimensional embeddings: one accuracy per strip size 1-9.
latent_dim = 10
accuracy_per_strip_size_sum = []
for strip_size in range(1, 10):
    print(f'-------------------{strip_size}-------------------')
    channels = strip_size + 1  # context images plus the target image
    iteration_scores = []
    for i in range(1):
        print(f'------------- Iteration {i + 1} -------------')
        Xtrain, Xval, Xtest, ytrain, yval, ytest = create_data_sets(
            Xtrain_norm, ytrain_norm, Xtest_norm, ytest_norm, strip_size)
        model = Sum(latent_dim, channels)
        score = train_model(
            model, Xtrain, ytrain, Xval, yval, Xtest, ytest,
            batch_size=32, model_save_name="sum_model_10")
        iteration_scores.append(score)
    accuracy_per_strip_size_sum.append(np.mean(iteration_scores))
save_to_db('join_operations', 'sum_10', accuracy_per_strip_size_sum)

## Dense layer
The idea is for the neural network to learn the best operation to join the embeddings

In [None]:
class Dense(keras.Model):
    """Strip classifier that learns how to join the context embeddings.

    Every image of the strip is encoded by the same CNN into a ``latent_dim``
    vector. When there is more than one context image, the first
    ``channels - 1`` embeddings are flattened and mapped back to a single
    ``latent_dim`` vector by a trainable dense layer; with a single context
    image its embedding is used directly. The result is paired with the
    embedding of the last image (target) and passed to a dense classifier with
    a sigmoid output.

    Args:
        latent_dim: Size of the embedding produced for each image.
        channels: Number of images per strip (context images plus the target).
    """

    def __init__(self, latent_dim, channels):
        super().__init__()
        self.latent_dim = latent_dim
        self.channels = channels

        # Per-image encoder: 28x28x1 image -> latent_dim vector.
        self.encoder = tf.keras.Sequential([
            layers.Conv2D(filters=1, kernel_size=(3, 3), padding="same", strides=1, input_shape=[28, 28, 1]),
            layers.BatchNormalization(),
            layers.ReLU(),

            layers.Conv2D(filters=32, kernel_size=(3, 3), padding="same", strides=2),
            layers.BatchNormalization(),
            layers.ReLU(),

            layers.Conv2D(filters=64, kernel_size=(3, 3), padding="same", strides=1),
            layers.BatchNormalization(),
            layers.ReLU(),

            layers.Flatten(),

            layers.Dense(units=latent_dim)
        ])
        # Built once here so that call() does not create new layers on every
        # forward pass.
        self.encode_strip = layers.TimeDistributed(self.encoder)

        # Learned join: (latent_dim, channels - 1) -> latent_dim.
        self.connection = tf.keras.Sequential([
            layers.Flatten(input_shape=[self.latent_dim, self.channels-1]),
            layers.Dense(units=self.latent_dim)
        ])

        self.classifier = tf.keras.Sequential([
            layers.Flatten(),
            layers.Dense(128, activation='relu'),
            layers.Dense(64, activation='relu'),
            layers.Dense(1, activation='sigmoid')
        ])

    def call(self, x):
        """Return the sigmoid output of the classifier for a batch of strips ``x``."""
        encoded_images = self.encode_strip(x)

        if self.channels > 2:
            context = encoded_images[:, :self.channels - 1, :]
            # (batch, context, latent) -> (batch, latent, context): the same
            # layout as stacking the context embeddings on the last axis.
            dense_image = self.connection(tf.transpose(context, perm=[0, 2, 1]))
        else:
            # A single context image needs no join.
            dense_image = encoded_images[:, 0, :]

        target_image = encoded_images[:, -1, :]
        stacked_image = tf.stack([dense_image, target_image], axis=-1)
        return self.classifier(stacked_image)

In [None]:
# Learned dense join, 32-dimensional embeddings: one accuracy per strip size 1-9.
latent_dim = 32
accuracy_per_strip_size_dense = []
for strip_size in range(1, 10):
    print(f'-------------------{strip_size}-------------------')
    channels = strip_size + 1  # context images plus the target image
    iteration_scores = []
    for i in range(1):
        print(f'------------- Iteration {i + 1} -------------')
        Xtrain, Xval, Xtest, ytrain, yval, ytest = create_data_sets(
            Xtrain_norm, ytrain_norm, Xtest_norm, ytest_norm, strip_size)
        model = Dense(latent_dim, channels)
        score = train_model(
            model, Xtrain, ytrain, Xval, yval, Xtest, ytest,
            batch_size=32, model_save_name="dense_model_32")
        iteration_scores.append(score)
    accuracy_per_strip_size_dense.append(np.mean(iteration_scores))
save_to_db('join_operations', 'dense_32', accuracy_per_strip_size_dense)

In [None]:
# Learned dense join, 10-dimensional embeddings: one accuracy per strip size 1-9.
latent_dim = 10
accuracy_per_strip_size_dense = []
for strip_size in range(1, 10):
    print(f'-------------------{strip_size}-------------------')
    channels = strip_size + 1  # context images plus the target image
    iteration_scores = []
    for i in range(1):
        print(f'------------- Iteration {i + 1} -------------')
        Xtrain, Xval, Xtest, ytrain, yval, ytest = create_data_sets(
            Xtrain_norm, ytrain_norm, Xtest_norm, ytest_norm, strip_size)
        model = Dense(latent_dim, channels)
        score = train_model(
            model, Xtrain, ytrain, Xval, yval, Xtest, ytest,
            batch_size=32, model_save_name="dense_model_10")
        iteration_scores.append(score)
    accuracy_per_strip_size_dense.append(np.mean(iteration_scores))
save_to_db('join_operations', 'dense_10', accuracy_per_strip_size_dense)

## Plot the results

In [None]:
# Compare the join operations across strip sizes.
plot_db_columns(
    db_name='join_operations',
    title='Model accuracy per strip size',
    xlabel='Strip size',
    ylabel='Accuracy',
    save_name='join_operations',
)

# Models for testing the classifier block

## Dot product

In [None]:
class Dot(keras.Model):
    """Strip model that scores the target against the max-joined context by cosine similarity.

    Every image of the strip is encoded by the same CNN into a ``latent_dim``
    vector. The first ``channels - 1`` embeddings (context) are reduced with
    an element-wise maximum and the model output is the normalised dot product
    between that vector and the embedding of the last image (target).

    Args:
        latent_dim: Size of the embedding produced for each image.
        channels: Number of images per strip (context images plus the target).
    """

    def __init__(self, latent_dim, channels):
        super().__init__()
        self.latent_dim = latent_dim
        self.channels = channels

        # Per-image encoder: 28x28x1 image -> latent_dim vector.
        self.encoder = tf.keras.Sequential([
            layers.Conv2D(filters=1, kernel_size=(3, 3), padding="same", strides=1, input_shape=[28, 28, 1]),
            layers.BatchNormalization(),
            layers.ReLU(),

            layers.Conv2D(filters=32, kernel_size=(3, 3), padding="same", strides=2),
            layers.BatchNormalization(),
            layers.ReLU(),

            layers.Conv2D(filters=64, kernel_size=(3, 3), padding="same", strides=1),
            layers.BatchNormalization(),
            layers.ReLU(),

            layers.Flatten(),

            layers.Dense(units=latent_dim)
        ])
        # Built once here so that call() does not create new layers on every
        # forward pass.
        self.encode_strip = layers.TimeDistributed(self.encoder)
        self.similarity = layers.Dot(axes=1, normalize=True)

    def call(self, x):
        """Return the cosine similarity between joined context and target for strips ``x``."""
        encoded_images = self.encode_strip(x)

        # Context = the first channels - 1 embeddings (at least one).
        n_context = max(self.channels - 1, 1)
        max_image = tf.reduce_max(encoded_images[:, :n_context, :], axis=1)

        last_embedding = encoded_images[:, -1, :]
        # NOTE(review): a normalised dot product lies in [-1, 1], while
        # train_model uses binary cross-entropy on this output - confirm that
        # this is intended.
        return self.similarity([max_image, last_embedding])

In [None]:
# Cosine-similarity head, 32-dimensional embeddings: one accuracy per strip size 1-9.
latent_dim = 32
accuracy_per_strip_size_dot_32 = []
for strip_size in range(1, 10):
    print(f'-------------------{strip_size}-------------------')
    channels = strip_size + 1  # context images plus the target image
    iteration_scores = []
    for i in range(1):
        print(f'------------- Iteration {i + 1} -------------')
        Xtrain, Xval, Xtest, ytrain, yval, ytest = create_data_sets(
            Xtrain_norm, ytrain_norm, Xtest_norm, ytest_norm, strip_size)
        model = Dot(latent_dim, channels)
        score = train_model(
            model, Xtrain, ytrain, Xval, yval, Xtest, ytest,
            batch_size=32, model_save_name="dot_model_32")
        iteration_scores.append(score)
    accuracy_per_strip_size_dot_32.append(np.mean(iteration_scores))
save_to_db('max_operations', 'dot_32', accuracy_per_strip_size_dot_32)

In [None]:
# Cosine-similarity head, 10-dimensional embeddings: one accuracy per strip size 1-9.
latent_dim = 10
accuracy_per_strip_size_dot_10 = []
for strip_size in range(1, 10):
    print(f'-------------------{strip_size}-------------------')
    channels = strip_size + 1  # context images plus the target image
    iteration_scores = []
    for i in range(1):
        print(f'------------- Iteration {i + 1} -------------')
        Xtrain, Xval, Xtest, ytrain, yval, ytest = create_data_sets(
            Xtrain_norm, ytrain_norm, Xtest_norm, ytest_norm, strip_size)
        model = Dot(latent_dim, channels)
        score = train_model(
            model, Xtrain, ytrain, Xval, yval, Xtest, ytest,
            batch_size=32, model_save_name="dot_model_10")
        iteration_scores.append(score)
    accuracy_per_strip_size_dot_10.append(np.mean(iteration_scores))
save_to_db('max_operations', 'dot_10', accuracy_per_strip_size_dot_10)

## Dot product + dense layer

In [None]:
class DotDense(keras.Model):
    """Strip model that feeds the context/target cosine similarity to a one-unit dense layer.

    Every image of the strip is encoded by the same CNN into a ``latent_dim``
    vector. The first ``channels - 1`` embeddings (context) are reduced with
    an element-wise maximum, the normalised dot product with the embedding of
    the last image (target) is computed, and a single dense unit with a
    sigmoid activation turns that similarity into the prediction.

    Args:
        latent_dim: Size of the embedding produced for each image.
        channels: Number of images per strip (context images plus the target).
    """

    def __init__(self, latent_dim, channels):
        super().__init__()
        self.latent_dim = latent_dim
        self.channels = channels

        # Per-image encoder: 28x28x1 image -> latent_dim vector.
        self.encoder = tf.keras.Sequential([
            layers.Conv2D(filters=1, kernel_size=(3, 3), padding="same", strides=1, input_shape=[28, 28, 1]),
            layers.BatchNormalization(),
            layers.ReLU(),

            layers.Conv2D(filters=32, kernel_size=(3, 3), padding="same", strides=2),
            layers.BatchNormalization(),
            layers.ReLU(),

            layers.Conv2D(filters=64, kernel_size=(3, 3), padding="same", strides=1),
            layers.BatchNormalization(),
            layers.ReLU(),

            layers.Flatten(),

            layers.Dense(units=latent_dim)
        ])
        # Built once here so that call() does not create new layers on every
        # forward pass.
        self.encode_strip = layers.TimeDistributed(self.encoder)
        self.similarity = layers.Dot(axes=1, normalize=True)

        # The sigmoid keeps the output a probability, which is what the
        # binary cross-entropy loss set up in train_model expects.
        self.classifier = tf.keras.Sequential([
            layers.Dense(1, activation='sigmoid', input_shape=[1])
        ])

    def call(self, x):
        """Return the predicted probability that the target class is repeated for strips ``x``."""
        encoded_images = self.encode_strip(x)

        # Context = the first channels - 1 embeddings (at least one).
        n_context = max(self.channels - 1, 1)
        max_image = tf.reduce_max(encoded_images[:, :n_context, :], axis=1)

        last_embedding = encoded_images[:, -1, :]
        dot = self.similarity([max_image, last_embedding])
        return self.classifier(dot)

In [None]:
# Cosine similarity + dense head, 32-dimensional embeddings: one accuracy per strip size 1-9.
latent_dim = 32
accuracy_per_strip_size_dot_dense_32 = []
for strip_size in range(1, 10):
    print(f'-------------------{strip_size}-------------------')
    channels = strip_size + 1  # context images plus the target image
    iteration_scores = []
    for i in range(1):
        print(f'------------- Iteration {i + 1} -------------')
        Xtrain, Xval, Xtest, ytrain, yval, ytest = create_data_sets(
            Xtrain_norm, ytrain_norm, Xtest_norm, ytest_norm, strip_size)
        model = DotDense(latent_dim, channels)
        score = train_model(
            model, Xtrain, ytrain, Xval, yval, Xtest, ytest,
            batch_size=32, model_save_name="dot_dense_model_32")
        iteration_scores.append(score)
    accuracy_per_strip_size_dot_dense_32.append(np.mean(iteration_scores))
save_to_db('max_operations', 'dot_dense_32', accuracy_per_strip_size_dot_dense_32)

In [None]:
# Cosine similarity + dense head, 10-dimensional embeddings: one accuracy per strip size 1-9.
latent_dim = 10
accuracy_per_strip_size_dot_dense_10 = []
for strip_size in range(1, 10):
    print(f'-------------------{strip_size}-------------------')
    channels = strip_size + 1  # context images plus the target image
    iteration_scores = []
    for i in range(1):
        print(f'------------- Iteration {i + 1} -------------')
        Xtrain, Xval, Xtest, ytrain, yval, ytest = create_data_sets(
            Xtrain_norm, ytrain_norm, Xtest_norm, ytest_norm, strip_size)
        model = DotDense(latent_dim, channels)
        score = train_model(
            model, Xtrain, ytrain, Xval, yval, Xtest, ytest,
            batch_size=32, model_save_name="dot_dense_model_10")
        iteration_scores.append(score)
    accuracy_per_strip_size_dot_dense_10.append(np.mean(iteration_scores))
save_to_db('max_operations', 'dot_dense_10', accuracy_per_strip_size_dot_dense_10)

## Maximum and classifier
Already run above, in the "Maximum operation" section.

# Plot results

In [None]:
# Compare the classifier heads built on the Max join across strip sizes.
plot_db_columns(
    db_name='max_operations',
    title='Model accuracy per strip size',
    xlabel='Strip size',
    ylabel='Accuracy',
    save_name='max_operations',
)