# Stock Forecasting using Transformers

In this notebook we implement a Transformer model to forecast stock data.

In [1]:
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' # https://stackoverflow.com/a/64438413

In [2]:
import seaborn as sns
import tensorflow as tf
import tensorflow.keras as keras

## Time2Vec Embedding

https://arxiv.org/abs/1907.05321

In [3]:
class Time2Vec(keras.layers.Layer):
    def __init__(self, embed_dim: int, activation: str = 'sin', **kwargs):
        """Vector embedding representation of time.

        Based on the original concept proposed by Kazemi et al., 2019 (https://arxiv.org/abs/1907.05321).

        Args:
            embed_dim (int): Number of periodic components in the embedding. The layer output has `embed_dim + 1` features: one linear term followed by `embed_dim` periodic terms.
            activation (str, optional): Periodic activation function. Any value starting with 'sin' or 'cos' (case-insensitive) is accepted. Defaults to 'sin'.

        Raises:
            ValueError: If `activation` starts with neither 'sin' nor 'cos'.
        """
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.activation = activation.lower() # Convert to lower-case.

        # Set periodic activation function.
        if self.activation.startswith('sin'):
            self.activation_func = tf.sin
        elif self.activation.startswith('cos'):
            self.activation_func = tf.cos
        else:
            raise ValueError(f'Unsupported periodic activation function "{activation}"')

    def build(self, input_shape: list[int]):
        """Create the trainable weights of the embedding.

        Kernels are indexed by the feature axis because `call` contracts
        the last axis of the input with them; biases are indexed by the
        sequence axis and broadcast over the batch.

        Args:
            input_shape (list[int]): Input shape, (batch_size, sequence_length, feature_size).
        """
        seq_len = input_shape[1]
        feat_size = input_shape[-1]

        # Weight and bias term for linear portion (i = 0)
        # of embedding.
        # NOTE: the kernel's first dimension must be the feature size, not
        # the sequence length, otherwise `tf.tensordot(x, w_linear, axes=1)`
        # only works when sequence_length == feature_size.
        self.w_linear = self.add_weight(
            name='w_linear',
            shape=(feat_size, 1,),
            initializer='uniform',
            trainable=True,
        )
        self.b_linear = self.add_weight(
            name='b_linear',
            shape=(seq_len, 1,),
            initializer='uniform',
            trainable=True,
        )

        # Weight and bias terms for the periodic
        # portion (1 <= i <= k) of embedding.
        self.w_periodic = self.add_weight(
            name='w_periodic',
            shape=(feat_size, self.embed_dim,),
            initializer='uniform',
            trainable=True,
        )
        self.b_periodic = self.add_weight(
            name='b_periodic',
            shape=(seq_len, self.embed_dim,),
            initializer='uniform',
            trainable=True,
        )

    def call(self, x: tf.Tensor) -> tf.Tensor:
        """Embed input into linear and periodic feature components.

        Args:
            x (tf.Tensor): Input tensor with shape (batch_size, sequence_length, feature_size)

        Returns:
            tf.Tensor: Output tensor with shape (batch_size, sequence_length, embed_dim + 1). The linear term is the first feature, followed by the periodic terms.
        """

        # Linear term (i = 0): contract the feature axis -> (batch, seq, 1).
        embed_linear = tf.tensordot(x, self.w_linear, axes=1) + self.b_linear

        # Periodic terms (1 <= i <= k): contract the feature axis -> (batch, seq, embed_dim).
        inner = tf.tensordot(x, self.w_periodic, axes=1) + self.b_periodic
        embed_periodic = self.activation_func(inner)

        # Return concatenated linear and periodic features.
        return tf.concat([embed_linear, embed_periodic], axis=-1)

    def get_config(self) -> dict:
        """Retrieve custom layer configuration for future loading.

        Returns:
            dict: Base layer configuration extended with the `embed_dim` and (lower-cased) `activation` constructor arguments.
        """
        config = super().get_config().copy()
        config.update({
            'embed_dim': self.embed_dim,
            'activation': self.activation,
        })
        return config

# Shape check: run a symbolic (seq_len, stock_feat) input through Time2Vec
# and append the resulting embedding to the raw input features.
stock_feat, seq_len, embed_dim = 5, 128, 32

inp = keras.Input(shape=(seq_len, stock_feat))
print(f"{inp.shape=}")

# Time embedding: one linear feature plus `embed_dim` periodic features.
x = Time2Vec(embed_dim=embed_dim)(inp)
print(f"{x.shape=}")

# Join raw features and embedding along the feature (last) axis.
x = keras.layers.concatenate([inp, x], axis=-1)
print(f"{x.shape=}")

inp.shape=TensorShape([None, 128, 5])
x.shape=TensorShape([None, 128, 33])
x.shape=TensorShape([None, 128, 38])


## Transformer Architecture

https://arxiv.org/abs/1706.03762

### Attention Layers

Currently uses attention layers provided by TensorFlow. See https://www.tensorflow.org/api_docs/python/tf/keras/layers/MultiHeadAttention.

In [4]:
# class MultiHeadAttention(keras.layers.Layer):
#     def __init__(self, d_k: int, d_v: int, n_heads: int):
#         """Multi-head attention layer.

#         Based on the original concept proposed by Vaswani et al., 2017 (https://arxiv.org/abs/1706.03762).

#         Args:
#             d_k (int): Key dimension (also used for Query dimension).
#             d_v (int): Value dimension.
#             n_heads (int): Number of attention heads.
#         """
#         self.d_k = d_k # Query and Key have same dimension.
#         self.d_v = d_v
#         self.n_heads = n_heads # Number of attention heads.
#         self.heads = [] # List of attention layers as heads.

#     def build(self, input_shape: list[int]):

#         # Build attention heads.
#         self.heads = [
#             keras.layers.Attention()
#             for i in range(self.n_heads)
#         ]

#         # TODO (unfinished draft): build the linear layer that combines the outputs of the attention heads.

In [5]:
# class Attention(keras.layers.Layer):
#     def __init__(self, d_k: int, d_v: int):
#         """Single-head attention layer.

#         Based on the original concept proposed by Vaswani et al., 2017 (https://arxiv.org/abs/1706.03762).

#         Args:
#             d_k (int): Key dimension (also used for Query dimension).
#             d_v (int): Value dimension.
#         """
#         self.d_k = d_k # Query and Key have same dimension.
#         self.d_v = d_v

#     def build(self, input_shape: list[int]):
#         self.query = keras.layers.Dense(
#             units=self.d_k,
#             input_shape=input_shape,
#             kernel_initializer='glorot_uniform', 
#             bias_initializer='glorot_uniform',
#         )
#         self.key = keras.layers.Dense(
#             units=self.d_k,
#             input_shape=input_shape,
#             kernel_initializer='glorot_uniform', 
#             bias_initializer='glorot_uniform',
#         )
#         self.value = keras.layers.Dense(
#             units=self.d_v,
#             input_shape=input_shape,
#             kernel_initializer='glorot_uniform', 
#             bias_initializer='glorot_uniform',
#         )

#     def call(self, x: tf.Tensor) -> tf.Tensor:
        


#     def get_config(self) -> dict:
#         """Retrieve custom layer configuration for future loading.

#         Returns:
#             dict: Configuration dictionary.
#         """
#         config = super().get_config().copy()
#         config.update({
#             'd_k': self.d_k,
#             'd_v': self.d_v,
#         })
#         return config

### Transformer Encoder Layer

In [7]:
class TransformerEncoder(keras.layers.Layer):
    def __init__(self,
        d_k: int,
        d_v: int,
        n_heads: int,
        d_model: int,
        dropout: float = 0.0,
        **kwargs,
        ):
        """Transformer encoder layer.

        Based on the original concept proposed by Vaswani et al., 2017 (https://arxiv.org/abs/1706.03762).

        Args:
            d_k (int): Key dimension (also used for Query dimension).
            d_v (int): Value dimension.
            n_heads (int): Number of attention heads.
            d_model (int): Number of units in the first (hidden, ReLU) dense layer of the feed forward sublayer.
            dropout (float, optional): Dropout rate applied after the attention and feed forward sublayers. Defaults to 0.0.
        """
        super().__init__(**kwargs)
        self.d_k = d_k # Query and Key have same dimension.
        self.d_v = d_v
        self.n_heads = n_heads # Number of attention heads.
        self.d_model = d_model
        self.dropout = dropout

    def build(self, input_shape: tuple[tf.TensorShape,tf.TensorShape,tf.TensorShape]):
        """Create the attention and feed forward sublayers.

        Args:
            input_shape: Shapes of the Query, Value, and (optional) Key tensors, in that order.
        """

        # First sublayer.
        # Multi-head attention with add and norm.
        self.attn_multi = keras.layers.MultiHeadAttention(
            num_heads=self.n_heads,
            key_dim=self.d_k,
            value_dim=self.d_v,
        )
        # Create the attention weights now instead of on the first call.
        # `_build_from_signature` is a private hook that is not available in
        # every Keras release; when it is missing, fall back to the public
        # `build(query_shape, value_shape, key_shape=None)` method.
        build_attention = getattr(self.attn_multi, '_build_from_signature', None)
        if build_attention is None:
            build_attention = self.attn_multi.build
        build_attention(*input_shape)
        self.attn_dropout = keras.layers.Dropout(rate=self.dropout)
        self.attn_add = keras.layers.Add()
        self.attn_norm = keras.layers.LayerNormalization(epsilon=1e-6)

        # Second sublayer.
        # Feed forward with add and norm.
        # The second dense layer projects back to the Query feature size so
        # that the residual addition below is shape-compatible.
        d_query_feat = input_shape[0][-1] # Query feature size.
        self.ff_dense_1 = keras.layers.Dense(units=self.d_model, activation='relu')
        self.ff_dense_2 = keras.layers.Dense(units=d_query_feat)
        self.ff_dropout = keras.layers.Dropout(rate=self.dropout)
        self.ff_add = keras.layers.Add()
        self.ff_norm = keras.layers.LayerNormalization(epsilon=1e-6)

    def call(self, x: tuple[tf.Tensor,tf.Tensor,tf.Tensor]) -> tf.Tensor:
        """Encode input using multi-head attention followed by a feed forward network.

        Args:
            x (tf.Tensor): Tuple of Query, Value, and Key tensors. Note that the Key tensor is optional, if omitted the Value tensor will be used for both Key and Value.

        Returns:
            tf.Tensor: Output tensor with the same shape as the Query tensor.
        """
        # x = (query, value, key)
        # note that "key" is optional.

        # First, do the attention sublayer.
        x_attn = self.attn_multi(*x) # Unpack input as Query, Value, and optional Key.
        x_attn = self.attn_dropout(x_attn)
        x_attn = self.attn_add([x[0], x_attn]) # (residual) Add Query matrix with result of attention layer.
        x_attn = self.attn_norm(x_attn) # Normalize the residual.

        # Second, do the feed forward sublayer.
        x_ff = self.ff_dense_1(x_attn)
        x_ff = self.ff_dense_2(x_ff)
        x_ff = self.ff_dropout(x_ff)
        x_ff = self.ff_add([x_attn, x_ff]) # (residual) Add attention output with result of feed forward layers.
        x_ff = self.ff_norm(x_ff)

        # Return output of feed forward sublayer.
        return x_ff

    def get_config(self) -> dict:
        """Retrieve custom layer configuration for future loading.

        Returns:
            dict: Base layer configuration extended with the constructor arguments of this layer.
        """
        config = super().get_config().copy()
        config.update({
            'n_heads': self.n_heads,
            'd_k': self.d_k,
            'd_v': self.d_v,
            'd_model': self.d_model,
            'dropout': self.dropout,
        })
        return config


# Shape check for the stack so far:
# Input -> Time2Vec -> Concatenate -> TransformerEncoder.
stock_feat, seq_len, embed_dim = 5, 128, 32
d_k, d_v = 512, 256
n_heads = 8
d_model = 512

inp = keras.Input(shape=(seq_len, stock_feat))
print(f"{inp.shape=}")

x = Time2Vec(embed_dim=embed_dim)(inp)
print(f"Time2Vec {x.shape=}")

# Join raw features and time embedding along the feature (last) axis.
x = keras.layers.concatenate([inp, x], axis=-1)
print(f"Concatenate {x.shape=}")

# Self-attention: the same tensor is passed as Query, Value, and Key.
x = TransformerEncoder(d_k=d_k, d_v=d_v, n_heads=n_heads, d_model=d_model)([x] * 3)
print(f"TransformerEncoder {x.shape=}")

inp.shape=TensorShape([None, 128, 5])
Time2Vec x.shape=TensorShape([None, 128, 33])
Concatenate x.shape=TensorShape([None, 128, 38])
TransformerEncoder x.shape=TensorShape([None, 128, 38])
