# Time Series Transformers
### We aim to generate a transformer-based structure that generates an embedding from a 1024 time sequences obtained from our database generated with [this matlab code](https://github.com/soykilian/Signal-Generator.git)
### Also following [this tutorial](https://towardsdatascience.com/the-time-series-transformer-2a521a0efad3)

In [1]:
# import libraries
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow_addons.layers import MultiHeadAttention
import numpy as np
import tensorflow.keras as keras

2022-08-26 10:45:28.421324: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.10.1


In [2]:
class Time2Vec(layers.Layer):

    def __init__(self, kernel_size : int):
        super(Time2Vec, self).__init__(trainable=True,name='Time2VecLayer')
        self.k = kernel_size 
    def build(self, input_shape : int = 1024):
        self.wb = self.add_weight(name='wb', shape=(input_shape[1],), initializer='uniform', trainable=True)
        self.bb = self.add_weight(name='bb', shape=(input_shape[1],), initializer='uniform', trainable=True)
        self.wa = self.add_weight(name='wa',shape=(1, input_shape[1], self.k),initializer='uniform',trainable=True)
        self.ba = self.add_weight(name='ba',shape=(1, input_shape[1], self.k),initializer='uniform',trainable=True)
        super(Time2Vec, self).build(input_shape)
    def call(self, inputs, **kwargs):
        bias = self.wb * inputs + self.bb
        dp = np.dot(inputs, self.wa) + self.ba
        wgts =np.sin(dp)
        ret = np.concatenate([K.expand_dims(bias, -1), wgts], -1)
        ret = np.reshape(ret, (-1, inputs.shape[1]*(self.k+1)))
        return ret
    def compute_output_shape(self, input_shape : int):
        return (input_shape[0], input_shape[1]*(self.k + 1))

In [3]:
class AttentionBlock(keras.Model):
    def __init__(self, name : str = 'AttentionBlock',
                 num_heads : int =2,
                 head_size : int = 128,
                ff_dim : int = None,
                dropout : int = 0,
                **kwargs):
        super().__init__(name=name, **kwargs)
        if ff_dim is None:
            ff_dim = head_size
        self.attention = MultiHeadAttention(num_heads=num_heads, head_size=128, dropout=dropout)
        self.attention_dropout = keras.layers.Dropout(dropout)
        self.attention_norm = keras.layers.LayerNormalization(epsilon=1e-6)
        self.conv1 = keras.layers.Conv1D(filters=ff_dim, kernel_size=1, activation='relu')
        self.dropout = keras.layers.Dropout(dropout)
        self.norm = keras.layers.LayerNormalization(epsilon=1e-6)
    
    def build(self, input_shape : int):
        self.conv2 = keras.layers.Conv1D(filters=input_shape[-1], kernel_size=1)
    
    def call(self, inputs):
        x = self.attention([inputs])
        x = self.attention_dropout(x)
        x = self.attention_norm(inputs + x)
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.dropout(x)
        x = self.norm(inputs + x)
        return x

In [4]:
class ModelTrunk(keras.Model):
    def __init__(self, name='ModelTrunk',
                 time2vec_dim=1,
                 num_heads=2,
                 head_size=128,
                 ff_dim=None,
                 num_layers=1,
                 dropout=0,
                **kwargs):
        super().__init__(name=name, **kwargs)
        self.time2vec = Time2Vec(kernel_size=time2vec_dim)
        if ff_dim is None:
            ff_dim = head_size
        self.dropout = dropout
        self.attention_layers = [AttentionBlock(num_heads=num_heads, head_size=head_size, ff_dim=ff_dim, dropout=dropout) for _ in range (num_layers)]
        
    
    def call(self, inputs):
        time_embeddings = keras.layers.TimeDistributed(self.time2vec)(inputs)
        x = np.concatenate([inputs,time_embeddings ])
        for layer in self.attention_layers :
            x = layer(x)
        return np.reshape(x, (-1, x.shape[1] * x.shape[2]))
            

In [None]:
def lr_scheduler(epoch, lr, warmup_epochs=15, decay_epochs=100, initial_lr=1e-6, base_lr=1e-3, min_lr=5e-5):
    if epoch <= warmup_epochs:
        pct = epoch / warmup_epochs
        return ((base_lr - initial_lr) * pct) + initial_lr

    if epoch > warmup_epochs and epoch < warmup_epochs+decay_epochs:
        pct = 1 - ((epoch - warmup_epochs) / decay_epochs)
        return ((base_lr - min_lr) * pct) + min_lr

    return min_lr

callbacks = [keras.callbacks.LearningRateScheduler(schedule=lr_scheduler, verbose=0)]

In [None]:
model = ModelTrunk()