# Transformer-style Encoder + FFN ONLY

This setup uses only the encoder part of the Transformer and attaches it directly to a feed-forward network, going straight from light curves to classification.

Not intended to be the primary algorithm. Similar to what is done in Allam et al. (2021).

Author: Kara Ponder (SLAC)

In [None]:
import tensorflow as tf
import numpy as np

from transformer import Encoder

In [None]:
# ---- Model dimensions ------------------------------------------------------
d_model = 128          # input vector must have length d_model
target_vocab_size = 6  # possible results to choose from

# ---- Light-curve layout ----------------------------------------------------
N = 10000              # number of objects
N_days = 100 + 1
Nf = 6                 # number of filters
lc_length = 100 + 1    # light curve length
input_vocab_size = lc_length

# ---- Hyperparameters -------------------------------------------------------
num_layers = 8
num_heads = 8          # d_model % num_heads == 0
dff = 64               # hidden layer size of the feed forward network, needs to be larger than 24
dropout_rate = 0.0

# ---- Classification targets ------------------------------------------------
# Both spellings are kept because later cells use `num_classes` for the model
# and `num_class` for the one-hot encoding of the labels.
num_classes = 4
num_class = num_classes

# ---- Training --------------------------------------------------------------
batch_size = 64
# NOTE(review): the fit cell passes a literal `epochs=15` and does not read
# EPOCHS -- confirm which value is intended.
EPOCHS = 5

Define FFN for classification.

In [None]:
def classify_ffn(nclass, dff, rate=0.0):
    """Build the feed-forward classification head.

    The head is two ReLU dense layers of width ``dff`` with dropout in
    between, a global max-pool over the sequence axis, and a softmax
    layer with ``nclass`` outputs.

    Args:
        nclass: Number of output classes (width of the softmax layer).
        dff: Number of units in each hidden dense layer.
        rate: Dropout rate applied after the first dense layer.

    Returns:
        An unbuilt ``tf.keras.models.Sequential`` model.
    """
    layers = tf.keras.layers
    return tf.keras.models.Sequential([
        layers.Dense(dff, activation='relu'),
        layers.Dropout(rate),
        layers.Dense(dff, activation='relu'),
        # Collapse the sequence axis so each object gets a single prediction.
        layers.GlobalMaxPool1D(),
        layers.Dense(nclass, activation='softmax'),
    ])

Define the loss function.

In [None]:
def loss_kld(layer1, alpha=0.3):
    """Return a KL-divergence sparsity penalty on a layer's first weight array.

    For every element ``w`` of ``abs(layer1[0])`` the Bernoulli KL divergence

        rho * log(rho / w) + (1 - rho) * log((1 - rho) / (1 - w))

    is evaluated with target sparsity ``rho = 1e-5``; the terms are summed
    and scaled by ``alpha``.

    Args:
        layer1: Indexable collection of weight arrays/variables; only the
            first entry is used.
        alpha: Scalar weight applied to the summed divergence.

    Returns:
        A scalar ``tf.float32`` tensor.
    """
    alpha = tf.constant(alpha, dtype=tf.float32)
    # Only the first weight array is regularised; its magnitude plays the
    # role of the "activation" in the sparsity penalty.
    layer1 = tf.math.abs(layer1[0])

    ones = tf.ones(layer1.shape, dtype=tf.float32)
    rhoc = 0.00001
    rho = rhoc * ones

    def kld(layer):
        # NOTE(review): divide_no_nan yields 0 where the denominator is 0, so
        # an exactly-zero weight gives log(0) = -inf, and |w| >= 1 makes the
        # second log undefined -- confirm the weights stay inside (0, 1).
        kld_1 = tf.math.multiply(
            rhoc, tf.math.log(tf.math.divide_no_nan(rho, layer)))
        # Log of the ratio, not a ratio of logs: the previous form
        # log(1 - rho) / log(1 - w) is not a KL term and decreases as the
        # weights grow, i.e. it rewarded large weights instead of penalising
        # them.
        kld_2 = tf.math.multiply(
            (1.0 - rhoc),
            tf.math.log(tf.math.divide_no_nan(ones - rho, ones - layer)))
        return tf.reduce_sum(kld_1 + kld_2)

    return tf.multiply(alpha, kld(layer1))


Define Encoder+FFN model

In [None]:
# Encoder stack from the project-local `transformer` module.
encoder = Encoder(num_layers, d_model, num_heads, dff,
                  lc_length, dropout_rate, embed=True)

# Classification head applied on top of the encoder output.
class_ffn = classify_ffn(num_classes, dff, rate=dropout_rate)

# Two inputs: the multi-filter light curve and the normalisation constants.
inp = tf.keras.layers.Input(shape=(None, Nf), dtype=tf.float32)
extras = tf.keras.layers.Input(shape=(None, 2), dtype=tf.float32) # min/max normalization constants

# Encode, append the normalisation constants as extra features, classify.
x = encoder(inp)
x = tf.keras.layers.Concatenate(axis=-1)([x, extras])
x = class_ffn(x)

model = tf.keras.models.Model(inputs=[inp, extras], outputs=x)

# Sparsity regulariser on the encoder's first weight array.
# The callable must read the live variables each time it is invoked. The
# previous form bound `get_weights()` as a lambda default argument, which is
# evaluated once and returns NumPy copies, so the penalty was a constant with
# no gradient. `encoder.weights` has the same ordering as `get_weights()`.
model.add_loss(lambda: loss_kld(encoder.weights))

model.summary()

Compile model

In [None]:
# Adam with a small, fixed learning rate.
optimizer = tf.keras.optimizers.Adam(learning_rate=0.00001)

# Categorical (not sparse) cross-entropy: targets are expected one-hot encoded.
loss_object = tf.keras.losses.CategoricalCrossentropy()

# Stand-alone metric trackers; they are created here but not handed to
# `compile`, which uses the built-in 'accuracy' metric instead.
train_accuracy = tf.keras.metrics.CategoricalAccuracy(name='train_accuracy')
train_loss = tf.keras.metrics.Mean(name='train_loss')

model.compile(
    optimizer=optimizer,
    loss=loss_object,
    metrics=['accuracy'],
)

Read in test data

In [None]:
# Load class labels, light curves and the per-object min/max constants.
label = np.load('label.npy')
lc_data = np.load('lc_data.npy')
norm_data = np.loadtxt('min_max.txt')

# One-hot encode the integer class labels.
labels = tf.keras.utils.to_categorical(
    label,
    num_classes=num_class,
    dtype="float64",
)

# Each dataset element is (light curve, one-hot label, normalisation row);
# `generator` relies on this ordering when it unpacks a batch.
dataset = tf.data.Dataset.from_tensor_slices(
    (lc_data, labels, norm_data)
)
batch_ds = dataset.batch(batch_size)

In [None]:
# Count the batches produced by one full pass over the dataset; this value is
# used as `steps_per_epoch` when fitting.
# Each element of `batch_ds` is a 3-tuple (light curves, labels, norms), so the
# earlier `(_, _)` unpacking raised ValueError, and keeping the last
# `enumerate` index under-counted the batches by one.
num_batches = sum(1 for _ in batch_ds)

Define the data generator

In [None]:
def generator(data_set):
    """Yield ``([inputs, norms], targets)`` pairs from ``data_set`` forever.

    Args:
        data_set: Re-iterable collection whose elements unpack into three
            items: (inputs, targets, normalisation constants).

    Yields:
        A 2-tuple ``([inputs, norms], targets)``, in the layout Keras expects
        for a two-input model. After the last element the iteration restarts
        from the beginning, so the generator never terminates on its own.
    """
    while True:
        for element in data_set:
            light_curves, targets, norm_consts = element
            model_inputs = [light_curves, norm_consts]
            yield model_inputs, targets

Fit the model

In [None]:
# Train from the endlessly repeating batch generator; `steps_per_epoch` tells
# Keras how many batches make up one epoch because the generator never ends.
# NOTE(review): the epoch count is a literal here and does not use the EPOCHS
# constant from the parameter cell -- confirm which value is intended.
history = model.fit(
    generator(batch_ds),
    steps_per_epoch=num_batches,
    epochs=15,
    # validation_data=generator(batch_ds_VALID),
    # validation_steps=num_batches_VALID,
)

In [None]:
#model.save_weights('save_encoderffn_weights.h5')