In [1]:
from tensorflow import keras
import numpy as np

In [2]:
def construct_autoencoder(n, l, act_layer_config, lin_layer_config):
    # First build the encoder
    input_ = keras.layers.Input(shape=n)
    hidden1 = keras.layers.Dense(n, **act_layer_config)(input_)
    hidden2 = keras.layers.Dense(n, **act_layer_config)(hidden1)
    hidden3 = keras.layers.Dense(n, **lin_layer_config)(hidden2)
    added = keras.layers.Add()([input_, hidden3])
    latentspace = keras.layers.Dense(l, **lin_layer_config)(added)
    encoder = keras.Model(inputs=[input_], outputs=[latentspace])
    # Now the decoder
    latent_ = keras.layers.Input(shape=l)
    hidden4 = keras.layers.Dense(n, **lin_layer_config)(latent_)
    hidden5 = keras.layers.Dense(n, **act_layer_config)(hidden4)
    hidden6 = keras.layers.Dense(n, **act_layer_config)(hidden5)
    hidden7 = keras.layers.Dense(n, **act_layer_config)(hidden6)
    added_ = keras.layers.Add()([hidden4, hidden7])
    decoder = keras.Model(inputs=[latent_], outputs=[added_])
    # Tie them together
    autoencoder = keras.models.Sequential([encoder, decoder])
    # And return the autoencoder model
    return autoencoder

In [3]:
data_name = 'Duffing_Equation_expt4'  ## FILL IN HERE (from file name)

# data is num_steps x num_examples x n
data_train_u = np.load(('../data/%s_train1_u.npy' % (data_name)))
data_train_f = np.load(('../data/%s_train1_f.npy' % (data_name)))

# data is num_steps x num_examples x n
data_val_u = np.load(('../data/%s_val_u.npy' % (data_name)))
data_val_f = np.load(('../data/%s_val_f.npy' % (data_name)))

# data is num_steps x num_examples x n
data_test_u = np.load(('../data/%s_test2_u.npy' % (data_name)))
data_test_f = np.load(('../data/%s_test2_f.npy' % (data_name)))

In [4]:
_, n = data_train_u.shape
l = int(n/4)
l = n

print("Training contains:", data_train_u.shape[0], "samples.")
print("Validation contains:", data_val_u.shape[0], "samples.")
print("Input vector is", n, "neurons and latent space is", l, "neurons.")

Training contains: 4798 samples.
Validation contains: 1200 samples.
Input vector is 128 neurons and latent space is 128 neurons.


In [5]:
# Set the configuration to be used for layers with activation functions and linear, non-activated functions
act_layer = dict(activation="relu", kernel_initializer='he_normal')
act_layer = dict(activation="elu", kernel_initializer='he_normal')
lin_layer = dict(activation=None)

# Encoder for u and encoder for f
u_aec = construct_autoencoder(n, l, act_layer, lin_layer)
f_aec = construct_autoencoder(n, l, act_layer, lin_layer)

In [18]:
u_aec.inputs + f_aec.inputs

[<tf.Tensor 'model_input:0' shape=(None, 128) dtype=float32>,
 <tf.Tensor 'model_2_input:0' shape=(None, 128) dtype=float32>]

In [17]:
u_aec.outputs + f_aec.outputs

[<tf.Tensor 'model_1/Identity:0' shape=(None, 128) dtype=float32>,
 <tf.Tensor 'model_3/Identity:0' shape=(None, 128) dtype=float32>]

In [6]:
# Tie them together
dual_autoencoder = keras.models.Model(inputs = u_aec.inputs + f_aec.inputs, 
                                      outputs = u_aec.outputs + f_aec.outputs)

# Set the optimizer to be used
optimizer = keras.optimizers.SGD(lr=0.01)
optimizer = keras.optimizers.Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999, amsgrad=False)

# Compile the autoencoders
dual_autoencoder.compile(loss=["mse","mse"], optimizer=optimizer)

# Specify fit options
cbs = [keras.callbacks.ModelCheckpoint("dae.h5", save_best_only=True),
       tf.keras.callbacks.EarlyStopping()]
fit_options = dict(batch_size = 5, epochs = 500)

In [7]:
%%time
hist = dual_autoencoder.fit(x=[data_train_u, data_train_f], y=[data_train_u, data_train_f], 
                            validation_data=[(data_val_u, data_val_f), (data_val_u, data_val_f)],
                            callbacks=cbs,
                            **fit_options)

CPU times: user 1e+03 ns, sys: 1 µs, total: 2 µs
Wall time: 4.29 µs


In [8]:
# look at the dual encoder's layers
# see if you can break it down easily from top-down view to access encoder/decoder

# if so, build a lyaer between latent spaces

# if not, split autoencoder constructions into encoder constructor and decoder constructor
# that way, it is easier to put together functional API-based model for linking latent spaces


# Looks like modules can be built into layers fairly easily

# Modules as layers: https://www.tensorflow.org/api_docs/python/tf/Module
# LinearOperator is subclass of Module

# another thought -- consider linking weights of both autoencoders?

In [9]:
## also -- consider how to build in superposition 
#  https://en.wikipedia.org/wiki/Superposition_principle
# additivity and homogeneity are two different techniques...

In [10]:
# This is how to access the encoders:
u_encoder = dual_autoencoder.layers[2]
F_encoder = dual_autoencoder.layers[3]
# And get their outputs:
v_space = u_encoder.outputs[0]
f_space = F_encoder.outputs[0]

In [11]:
# the above will be used for computing Lv and then building loss function ||Lv-f||

In [20]:
for layer in dual_autoencoder.layers:
    print(layer.name)

model_input
model_2_input
model
model_2
model_1
model_3


In [12]:
v_space

<tf.Tensor 'dense_3/Identity:0' shape=(None, 128) dtype=float32>

In [13]:
import tensorflow as tf


class SelfAdjointOperator(tf.linalg.LinearOperator):
    def __init__(self, dtype=tf.float32, graph_parents=None):
        super(SelfAdjointOperator, self).__init__(dtype, graph_parents=graph_parents, is_self_adjoint=True)
    
    def _matmul(self, x):
        return self.matmul(x, adjoint=False)
        
    def __call__(self):
        y = self.matmul(x, adjoint=False)
        return y

In [14]:
L = SelfAdjointOperator(graph_parents=v_space)

TypeError: Can't instantiate abstract class SelfAdjointOperator with abstract methods _shape