In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf

import matplotlib.pyplot as plt
import yfinance as yf
from datetime import datetime
from keras import layers, Model
from sklearn.metrics import mean_absolute_percentage_error, root_mean_squared_error, mean_absolute_error, r2_score
from sklearn.preprocessing import MinMaxScaler
import time

In [35]:

class ETL:
    """
    ticker: str
    period: string
    test_size: float betwee 0 and 1
    n_input: int
    timestep: int
    Extracts data for stock with ticker `ticker` from yf api,
    splits the data into train and test sets by date,
    reshapes the data into np.array of shape [#weeks, 5, 1],
    converts our problem into supervised learning problem.
    """

    def __init__(self, datainput, test_size=0.2, n_input=5, timestep=5, normalize_bool=False, scaler=None) -> None:
        self.datainput = datainput
        self.test_size = test_size
        self.n_input = n_input
        self.timestep = timestep
        self.normalize_bool = normalize_bool
        self.scaler = scaler
        self.df = self.extract_historic_data()
        self.train, self.test = self.etl()
        self.X_train, self.y_train = self.to_supervised(self.train)
        self.X_test, self.y_test = self.to_supervised(self.test)

        if self.normalize_bool:
            print('normalized', normalize_bool)
            self.scaler = scaler
        else:
            print('not normalized', normalize_bool)
            self.scaler = None

    def extract_historic_data(self) -> pd.Series:
        """
        Extracts historic data and optionally normalizes it.
        """
        # data = self.datainput
        if self.normalize_bool:
            data = self.scaler.fit_transform(self.datainput.values)
            return pd.DataFrame(data, columns=self.datainput.columns)
        else:
            return self.datainput

    def split_data(self) -> tuple:
        """
        Splits our pd.Series into train and test series with
        test series representing test_size * 100 % of data.
        """
        # data = self.extract_historic_data()
        data = self.df
        print("data shape:", data.shape)
        if len(data) != 0:
            train_idx = round(len(data) * (1-self.test_size))
            train = data[:train_idx]
            test = data[train_idx:]
            # train = np.array(train)
            # test = np.array(test)
            # return train[:, np.newaxis], test[:, np.newaxis]
            return train.values, test.values
        else:
            raise Exception('Data set is empty, cannot split.')

    def window_and_reshape(self, data) -> np.array:
        """
        Reformats data into shape our model needs,
        namely, [# samples, timestep, # feautures]
        samples
        """
        # NUM_FEATURES = 1
        samples = int(data.shape[0] // self.timestep)
        # result = np.array(np.array_split(data, samples))
        # output = result.reshape((samples, self.timestep, NUM_FEATURES))
        trimmed_data = data[:samples * self.timestep]
        reshaped_data = trimmed_data.reshape(
            (samples, self.timestep, data.shape[1]))
        print(reshaped_data.shape)
        # print(output.shape)
        return reshaped_data

    def transform(self, train, test) -> np.array:
        train_remainder = train.shape[0] % self.timestep
        test_remainder = test.shape[0] % self.timestep
        # if train_remainder != 0:
        #     train = train[train_remainder:]
        # if test_remainder != 0:
        #     test = test[test_remainder:]
        if train_remainder != 0:
            train = train[:-train_remainder]
        if test_remainder != 0:
            test = test[:-test_remainder]
        print("Train shape:", train.shape)
        print("Test shape:", test.shape)
        # print("train:", train, "test:", test)
        return self.window_and_reshape(train), self.window_and_reshape(test)

    def etl(self) -> tuple[np.array, np.array]:
        """
        Runs complete ETL
        """
        train, test = self.split_data()
        print("train shape:", train.shape, "test shape:", test.shape)
        return self.transform(train, test)

    def to_supervised(self, data, n_out=5) -> tuple:
        """
        Converts our time series prediction problem to a
        supervised learning problem.
        """
        # flatted the data
        data = data.reshape((data.shape[0] * data.shape[1], data.shape[2]))
        X, y = [], []
        in_start = 0
        # step over the entire history one time step at a time
        for _ in range(len(data)):
            # define the end of the input sequence
            in_end = in_start + self.n_input
            out_end = in_end + n_out
            # ensure we have enough data for this instance
            if out_end <= len(data):
                x_input = data[in_start:in_end, 0]
                x_input = x_input.reshape((len(x_input), 1))
                X.append(x_input)
                y.append(data[in_end:out_end, 0])
                # move along one time step
                in_start += 1
        print(np.array(X), np.array(y))
        return np.array(X), np.array(y)

# *******************************
# Implementing a Transformer


def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0.2, epsilon=1e-6, attention_axes=None, kernel_size=1):
    """
    Creates a single transformer block.
    """
    x = layers.LayerNormalization(epsilon=epsilon)(inputs)
    x = layers.MultiHeadAttention(
        key_dim=head_size, num_heads=num_heads, dropout=dropout,
        attention_axes=attention_axes
    )(x, x)
    x = layers.Dropout(dropout)(x)
    res = x + inputs

    # Feed Forward Part
    x = layers.LayerNormalization(epsilon=epsilon)(res)
    x = layers.Conv1D(filters=ff_dim, kernel_size=kernel_size,
                      activation="relu")(x)
    x = layers.Dropout(dropout)(x)
    x = layers.Conv1D(filters=inputs.shape[-1], kernel_size=kernel_size)(x)
    return x + res


def build_transfromer(head_size, num_heads, ff_dim, num_trans_blocks, mlp_units, dropout=0.2, mlp_dropout=0.2, attention_axes=None, epsilon=1e-6, kernel_size=1):
    """
    Creates final model by building many transformer blocks.
    """
    n_timesteps, n_features, n_outputs = 5, 3, 5
    inputs = tf.keras.Input(shape=(n_timesteps, n_features))
    x = inputs
    for _ in range(num_trans_blocks):
        x = transformer_encoder(x, head_size=head_size, num_heads=num_heads, ff_dim=ff_dim,
                                dropout=dropout, attention_axes=attention_axes, kernel_size=kernel_size, epsilon=epsilon)

    x = layers.GlobalAveragePooling1D(data_format="channels_first")(x)
    for dim in mlp_units:
        x = layers.Dense(dim, activation="relu")(x)
        x = layers.Dropout(mlp_dropout)(x)

    outputs = layers.Dense(n_outputs)(x)
    return tf.keras.Model(inputs, outputs)


def fit_transformer(transformer: tf.keras.Model, x_train, y_train, X_val, y_val, display_loss=False):
    """
    Compiles and fits our transformer.
    """
    transformer.compile(
        loss="mse",
        optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
        metrics=["mae", 'mape'])

    callbacks = [tf.keras.callbacks.EarlyStopping(
        monitor='loss', patience=10, restore_best_weights=True)]
    # hist = transformer.fit(data.X_train, data.y_train, batch_size=32, epochs=25, verbose=1, callbacks=callbacks)
    hist = transformer.fit(x_train, y_train, validation_data=(X_val, y_val),
                           batch_size=32, epochs=25, verbose=1, callbacks=callbacks)
    history_df = pd.DataFrame(hist.history)

    if display_loss:
        history_df['val_loss'].plot()
        plt.title('Validation Loss')
        plt.ylabel('Loss')
        plt.xlabel('Epoch')
        plt.legend(['Validation'], loc='upper right')
        plt.show()
        history_df['loss'].plot()
        plt.title('Training Loss')
        plt.ylabel('Loss')
        plt.xlabel('Epoch')
        plt.legend(['Train'], loc='upper right')
        plt.show()

        # Display minimum validation loss
        min_val_loss = history_df['val_loss'].min()
        print("Minimum validation loss: {:.4f}".format(min_val_loss))
        # Plot mae
        history_df.loc[:, ['mae', 'val_mae']].plot()
        plt.title('Model MAE')
        plt.ylabel('MAE')
        plt.xlabel('Epoch')
        plt.legend(['Train', 'Validation'], loc='upper right')
        plt.show()
        # Display minimum validation mae and mape
        min_val_mae = history_df['val_mae'].min()
        min_val_mape = history_df['val_mape'].min()
        print("Minimum validation MAE: {:.4f}".format(min_val_mae))
        print("Minimum validation MAPE: {:.4f}".format(min_val_mape))

    return hist


def recursive_predict(model, input_seq, n_steps, n_features):

    predictions = []
    current_seq = input_seq

    for _ in range(n_steps):
        # Make a prediction
        next_pred = model.predict(current_seq)

        # Append the prediction to the results
        predictions.append(next_pred[0, 0])  # Assuming single output

        # Update the current sequence
        next_seq = np.append(current_seq[:, 1:, :], [[next_pred]], axis=1)
        current_seq = next_seq

    return np.array(predictions)
# **********************


class PredictAndForecast:
    """
    model: tf.keras.Model
    train: np.array
    test: np.array
    Takes a trained model, train, and test datasets and returns predictions
    of len(test) with same shape.
    """

    def __init__(self, model, train, test, n_input=5, scaler=None, normalize_bool=False) -> None:
        self.model = model
        self.train = train
        self.test = test
        self.n_input = n_input
        self.scaler = scaler
        self.normalize_bool = normalize_bool
        self.predictions = self.get_predictions()

    def forecast(self, history) -> np.array:
        """
        Given last weeks actual data, forecasts next weeks prices.
        """
        # flatten data
        data = np.array(history)
        data = data.reshape((data.shape[0]*data.shape[1], data.shape[2]))
        # retrieve last observations for input data
        input_x = data[-self.n_input:, :]
        # reshape into [1, n_input, 1]
        input_x = input_x.reshape((1, len(input_x), input_x.shape[1]))
        # forecast the next week
        yhat = self.model.predict(input_x, verbose=0)
        # we only want the vector forecast
        yhat = yhat[0]
        return yhat

    def get_predictions(self) -> np.array:
        """
        compiles models predictions week by week over entire
        test set.
        """
        # history is a list of weekly data
        history = [x for x in self.train]
        # walk-forward validation over each week
        predictions = []
        for i in range(len(self.test)):
            yhat_sequence = self.forecast(history)
            # store the predictions
            predictions.append(yhat_sequence)
            # get real observation and add to history for predicting the next week
            history.append(self.test[i, :])
        predictions = np.array(predictions)
        # if self.normalize_bool:
        # Inverse transform to get back to original scale
        #    predictions = self.scaler.inverse_transform(
        #        predictions.reshape(-1, 1)).reshape(predictions.shape)
        return predictions


class Evaluate:

    def __init__(self, actual, predictions, normalize_bool, scaler) -> None:
        print(
            f"Initial lengths - actual: {len(actual)}, predictions: {len(predictions)}")
        if normalize_bool == True:
            actual = scaler.inverse_transform(
                actual.reshape(-1, 1)).reshape(actual.shape)
            predictions = scaler.inverse_transform(
                predictions.reshape(-1, 1)).reshape(predictions.shape)
        self.actual = actual
        self.predictions = predictions
        self.var_ratio = self.compare_var()
        self.mape = self.evaluate_model_with_mape()
        self.rmse = self.evaluate_model_with_rmse()
        self.mae = self.evaluate_model_with_mae()
        self.r2 = self.evaluate_model_with_r2()

    def compare_var(self):
        return abs(1 - (np.var(self.predictions) / np.var(self.actual)))

    def evaluate_model_with_mape(self):
        return mean_absolute_percentage_error(self.actual.flatten(), self.predictions.flatten())

    def evaluate_model_with_rmse(self):
        return root_mean_squared_error(self.actual.flatten(), self.predictions.flatten())

    def evaluate_model_with_mae(self):
        return mean_absolute_error(self.actual.flatten(), self.predictions.flatten())

    def evaluate_model_with_r2(self):
        return r2_score(self.actual.flatten(), self.predictions.flatten())


def plot_results(train, test, preds, df, normalize_bool, scaler, title_suffix=None, xlabel='AAPL stock Price'):
    """
    Plots training data in blue, actual values in red, and predictions in green,
    over time.
    """

    fig, ax = plt.subplots(figsize=(20, 6))
    if not isinstance(df, pd.DataFrame):
        df = pd.DataFrame(df)
    # x = df.Close[-498:].index
    plot_test = test[1:]
    plot_preds = preds[1:]
    if normalize_bool == True:
        # Inverse transform to get back to original scale
        plot_test = scaler.inverse_transform(
            plot_test.reshape(-1, 1)).reshape(plot_test.shape)
        train = scaler.inverse_transform(
            train.reshape(-1, 1)).reshape(train.shape)
        plot_preds = scaler.inverse_transform(
            plot_preds.reshape(-1, 1)).reshape(plot_preds.shape)
        print("normalization convert active")

    # x = df[-(plot_test.shape[0]*plot_test.shape[1]):].index
    plot_test = plot_test.reshape((plot_test.shape[0]*plot_test.shape[1], 1))
    plot_preds = plot_preds.reshape((plot_test.shape[0]*plot_test.shape[1], 1))
    plot_train = train.reshape((train.shape[0]*train.shape[1], 1))
    print(f'plot_train shape: {plot_train.shape}')
    print(f'plot_test shape: {plot_test.shape}')
    print(f'plot_preds shape: {plot_preds.shape}')

    x_train = df[:len(plot_train)].index
    x_test = df[len(plot_train):len(plot_train) + len(plot_test)].index

    ax.plot(x_train, plot_train, label='Train', color='blue')
    ax.plot(x_test, plot_test, label='actual', color='red')
    ax.plot(x_test, plot_preds, label='preds', color='green')
    if title_suffix == None:
        ax.set_title('Predictions vs. Actual')
    else:
        ax.set_title(f'Predictions vs. Actual, {title_suffix}')
    ax.set_xlabel('Date')
    ax.set_ylabel(xlabel)
    ax.legend()
    plt.show()

In [None]:
def calculate_TEMA(data, period=10):
    ema1 = data.ewm(span=period, adjust=False).mean()
    ema2 = ema1.ewm(span=period, adjust=False).mean()
    ema3 = ema2.ewm(span=period, adjust=False).mean()
    tema = (3 * ema1) - (3 * ema2) + ema3
    return tema.dropna()


def calculate_DEMA(data, period=10):
    ema1 = data.ewm(span=period, adjust=False).mean()
    ema2 = ema1.ewm(span=period, adjust=False).mean()
    dema = 2 * ema1 - ema2
    return dema


ticker = 'AAPL'
# timeframe days
dataset = yf.download(ticker, start='1990-01-01',
                      end=datetime.now().strftime('%Y-%m-%d'), interval='1d')
dataset = dataset[['Close']]

data1 = calculate_DEMA(dataset['Close'], 10)
data2 = calculate_TEMA(dataset['Close'], 10)

normaliza_bool_update = False
type_normalisation_update = None

etl = ETL(datainput1=dataset, datainput2=data1,
          test_size=0.2, n_input=5, timestep=5,
          normalize_bool=normaliza_bool_update,
          scaler=type_normalisation_update)

X_train1, y_train1 = etl.X_train1, etl.y_train1
X_train2, _ = etl.X_train2, None
X_test1, y_test1 = etl.X_test1, etl.y_test1
X_test2, _ = etl.X_test2, None

num_layers = 4
d_model = 128
dff = 512
num_heads = 8
input_vocab_size = 10000  # Size of the input vocabulary
target_vocab_size = 8000  # Size of the target vocabulary
dropout_rate = 0.1
transformer = Transformer(num_layers, num_heads, d_model, dff, input_vocab_size, target_vocab_size,
                          pe_input=input_vocab_size, pe_target=target_vocab_size, rate=dropout_rate)

##############################

Multi-Head Attention 
Final

https://www.kaggle.com/code/miljan/stock-predictions-with-multi-head-attention/notebook

In [9]:
import yfinance as yf
import pandas as pd
import numpy as np
from tensorflow.keras.layers import Dense, Dropout, LayerNormalization, Concatenate, TimeDistributed, Lambda
from tensorflow.keras import layers
from tensorflow.keras import backend as K
import tensorflow as tf
# from keras.engine.topology import Layer
from datetime import datetime
from sklearn.metrics import mean_absolute_percentage_error, root_mean_squared_error, mean_absolute_error, r2_score
from sklearn.preprocessing import MinMaxScaler

import matplotlib.pyplot as plt

In [10]:

class ETL:

    def __init__(self, dataInput, test_size=0.2, n_input=5, timestep=5, normalize_bool=False, scaler=None) -> None:
        self.dataInput = dataInput
        self.test_size = test_size
        self.n_input = n_input
        self.timestep = timestep
        self.normalize_bool = normalize_bool
        self.scaler = scaler

        self.dataframes = [self.extract_historic_data(
            data) for data in dataInput]
        self.train_data, self.test_data = self.process_dataframes()

        if self.normalize_bool:
            print('normalized', normalize_bool)
            self.scaler = scaler
        else:
            print('not normalized', normalize_bool)
            self.scaler = None

    def extract_historic_data(self, datainput) -> pd.Series:
        """
        Extracts historic data and optionally normalizes it.
        """
        # data = self.datainput
        if self.normalize_bool:
            data = self.scaler.fit_transform(
                datainput.values.reshape(-1, 1))
        else:
            data = datainput
        return data

    def split_data(self, data) -> tuple:
        if len(data) != 0:
            train_idx = round(len(data) * (1-self.test_size))
            train = data[:train_idx]
            test = data[train_idx:]
            train = np.array(train)
            test = np.array(test)
            # return train[:, np.newaxis], test[:, np.newaxis]
            return np.array(train)[:, np.newaxis], np.array(test)[:, np.newaxis]
        else:
            raise Exception('Data set is empty, cannot split.')

    def window_and_reshape(self, data) -> np.array:
        """
        Reformats data into shape our model needs,
        namely, [# samples, timestep, # feautures]
        samples
        """
        NUM_FEATURES = 1
        samples = int(data.shape[0] / self.timestep)
        result = np.array(np.array_split(data, samples))
        output = result.reshape((samples, self.timestep, NUM_FEATURES))
        print(output.shape)
        return output

    def transform(self, train, test) -> np.array:
        train_remainder = train.shape[0] % self.timestep
        test_remainder = test.shape[0] % self.timestep
        if train_remainder != 0 and test_remainder != 0:
            train = train[train_remainder:]
            test = test[test_remainder:]
        elif train_remainder != 0:
            train = train[train_remainder:]
        elif test_remainder != 0:
            test = test[test_remainder:]
        # print("train:", train, "test:", test)
        return self.window_and_reshape(train), self.window_and_reshape(test)

    def process_dataframes(self):
        train_data = []
        test_data = []
        for df in self.dataframes:
            train, test = self.split_data(df)
            train_transformed, test_transformed = self.transform(train, test)
            train_data.append(train_transformed)
            test_data.append(test_transformed)
        return train_data, test_data

    def to_supervised(self, data, n_out=5) -> tuple:
        """
        Converts our time series prediction problem to a
        supervised learning problem.
        """
        # flatted the data
        # data = train.reshape((train.shape[0]*train.shape[1], train.shape[2]))
        data = data.reshape((data.shape[0] * data.shape[1], data.shape[2]))
        X, y = [], []
        in_start = 0
        # step over the entire history one time step at a time
        for _ in range(len(data)):
            # define the end of the input sequence
            in_end = in_start + self.n_input
            out_end = in_end + n_out
            # ensure we have enough data for this instance
            if out_end <= len(data):
                x_input = data[in_start:in_end, 0]
                x_input = x_input.reshape((len(x_input), 1))
                X.append(x_input)
                y.append(data[in_end:out_end, 0])
                # move along one time step
                in_start += 1
        print(np.array(X), np.array(y))
        return np.array(X), np.array(y)

In [13]:

class LayerNormalization(Layer):
    def __init__(self, eps=1e-6, **kwargs):
        self.eps = eps
        super(LayerNormalization, self).__init__(**kwargs)

    def build(self, input_shape):
        self.gamma = self.add_weight(name='gamma', shape=input_shape[-1:],
                                     initializer='ones', trainable=True)
        self.beta = self.add_weight(name='beta', shape=input_shape[-1:],
                                    initializer='zeros', trainable=True)
        super(LayerNormalization, self).build(input_shape)

    def call(self, x):
        mean = K.mean(x, axis=-1, keepdims=True)
        std = K.std(x, axis=-1, keepdims=True)
        return self.gamma * (x - mean) / (std + self.eps) + self.beta

    def compute_output_shape(self, input_shape):
        return input_shape


class ScaledDotProductAttention():
    def __init__(self, d_model, attn_dropout=0.1):
        self.temper = np.sqrt(d_model)
        self.dropout = Dropout(attn_dropout)

    def __call__(self, q, k, v, mask):
        attn = Lambda(lambda x: K.batch_dot(
            x[0], x[1], axes=[2, 2])/self.temper)([q, k])
        if mask is not None:
            mmask = Lambda(lambda x: (-1e+10)*(1-x))(mask)
            attn = Add()([attn, mmask])
        attn = Activation('softmax')(attn)
        attn = self.dropout(attn)
        output = Lambda(lambda x: K.batch_dot(x[0], x[1]))([attn, v])
        return output, attn


class MultiHeadAttention():
    # mode 0 - big martixes, faster; mode 1 - more clear implementation
    def __init__(self, n_head, d_model, d_k, d_v, dropout, mode=0, use_norm=True):
        self.mode = mode
        self.n_head = n_head
        self.d_k = d_k
        self.d_v = d_v
        self.dropout = dropout
        if mode == 0:
            self.qs_layer = Dense(n_head*d_k, use_bias=False)
            self.ks_layer = Dense(n_head*d_k, use_bias=False)
            self.vs_layer = Dense(n_head*d_v, use_bias=False)
        elif mode == 1:
            self.qs_layers = []
            self.ks_layers = []
            self.vs_layers = []
            for _ in range(n_head):
                # time series tensorflow
                self.qs_layers.append(
                    TimeDistributed(Dense(d_k, use_bias=False)))
                self.ks_layers.append(
                    TimeDistributed(Dense(d_k, use_bias=False)))
                self.vs_layers.append(
                    TimeDistributed(Dense(d_v, use_bias=False)))
        self.attention = ScaledDotProductAttention(d_model)
        self.layer_norm = LayerNormalization() if use_norm else None
        self.w_o = TimeDistributed(Dense(d_model))

    def __call__(self, q, k, v, mask=None):
        d_k, d_v = self.d_k, self.d_v
        n_head = self.n_head

        if self.mode == 0:
            qs = self.qs_layer(q)  # [batch_size, len_q, n_head*d_k]
            ks = self.ks_layer(k)
            vs = self.vs_layer(v)

            def reshape1(x):
                s = tf.shape(x)   # [batch_size, len_q, n_head * d_k]
                x = tf.reshape(x, [s[0], s[1], n_head, d_k])
                x = tf.transpose(x, [2, 0, 1, 3])
                # [n_head * batch_size, len_q, d_k]
                x = tf.reshape(x, [-1, s[1], d_k])
                return x
            qs = Lambda(reshape1)(qs)
            ks = Lambda(reshape1)(ks)
            vs = Lambda(reshape1)(vs)

            if mask is not None:
                mask = Lambda(lambda x: K.repeat_elements(x, n_head, 0),
                              output_shape=lambda s: (s[0] * n_head, s[1], s[2]))(mask)
            head, attn = self.attention(qs, ks, vs, mask=mask)

            def reshape2(x):
                s = tf.shape(x)   # [n_head * batch_size, len_v, d_v]
                x = tf.reshape(x, [n_head, -1, s[1], s[2]])
                x = tf.transpose(x, [1, 2, 0, 3])
                # [batch_size, len_v, n_head * d_v]
                x = tf.reshape(x, [-1, s[1], n_head*d_v])
                return x
            head = Lambda(reshape2)(head)
        elif self.mode == 1:
            heads = []
            attns = []
            for i in range(n_head):
                qs = self.qs_layers[i](q)
                ks = self.ks_layers[i](k)
                vs = self.vs_layers[i](v)
                head, attn = self.attention(qs, ks, vs, mask)
                heads.append(head)
                attns.append(attn)
            head = Concatenate()(heads) if n_head > 1 else heads[0]
            attn = Concatenate()(attns) if n_head > 1 else attns[0]

        outputs = self.w_o(head)
        outputs = Dropout(self.dropout)(outputs)
        if not self.layer_norm:
            return outputs, attn
        # outputs = Add()([outputs, q]) # sl: fix
        return self.layer_norm(outputs), attn


class CustomMultiHeadAttention(layers.Layer):
    def __init__(self, head_size, num_heads, dropout=0.2):
        super(CustomMultiHeadAttention, self).__init__()
        self.head_size = head_size
        self.num_heads = num_heads
        self.dropout = layers.Dropout(dropout)
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=head_size)

    def call(self, inputs, mask=None):
        return self.attention(inputs, inputs, attention_mask=mask)


def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0.2, epsilon=1e-6, kernel_size=1):
    """
    Creates a single transformer block.
    """
    x = layers.LayerNormalization(epsilon=epsilon)(inputs)
    mask = layers.Lambda(lambda x: tf.reduce_sum(tf.cast(x != 0, 'float32'), axis=-1, keepdims=True) != 0,
                         output_shape=lambda s: (s[0], s[1], 1))(inputs)
    mask = layers.Lambda(lambda x: tf.repeat(x, repeats=inputs.shape[1], axis=-1),
                         output_shape=lambda s: (s[0], s[1], s[1]))(mask)

    multi_head_attention = MultiHeadAttention(
        num_heads, head_size, head_size, head_size, dropout)
    x, attn = multi_head_attention(x, x, x, mask=mask)
    res = x + inputs

    # Feed Forward Part
    x = layers.LayerNormalization(epsilon=epsilon)(res)
    x = layers.Conv1D(filters=ff_dim, kernel_size=kernel_size,
                      activation="relu")(x)
    x = layers.Dropout(dropout)(x)
    x = layers.Conv1D(filters=inputs.shape[-1], kernel_size=kernel_size)(x)
    return x + res


def build_transformer(head_size, num_heads, ff_dim, num_trans_blocks, mlp_units, dropout=0.2, mlp_dropout=0.2, epsilon=1e-6, kernel_size=1):
    """
    Creates final model by building many transformer blocks.
    """
    n_timesteps, n_features, n_outputs = 5, 1, 1  # Example values for the shape
    inputs = [tf.keras.Input(shape=(n_timesteps, n_features))
              for _ in range(num_trans_blocks)]

    # Masking layer
    x = [layers.Masking(mask_value=0.0)(inp) for inp in inputs]

    for i in range(num_trans_blocks):
        x[i] = transformer_encoder(x[i], head_size=head_size, num_heads=num_heads, ff_dim=ff_dim,
                                   dropout=dropout, kernel_size=kernel_size, epsilon=epsilon)

    x = [layers.GlobalAveragePooling1D(
        data_format="channels_first")(xi) for xi in x]
    combined = layers.Concatenate()(x)

    for dim in mlp_units:
        combined = layers.Dense(dim, activation="relu")(combined)
        combined = layers.Dropout(mlp_dropout)(combined)

    outputs = layers.Dense(n_outputs)(combined)
    return tf.keras.Model(inputs=inputs, outputs=outputs)


def fit_transformers(transformer: tf.keras.Model, x_train_list, y_train, x_val_list, y_val, display_loss=False):
    transformer.compile(
        loss="mse",
        optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
        metrics=["mae", 'mape'])
    callbacks = [tf.keras.callbacks.EarlyStopping(
        monitor='loss', patience=10, restore_best_weights=True)]
    hist = transformer.fit(x_train_list, y_train, validation_data=(
        x_val_list, y_val), batch_size=32, epochs=25, verbose=1, callbacks=callbacks)
    history_df = pd.DataFrame(hist.history)

    if display_loss:
        history_df['loss'].plot()
        plt.title('Training Loss')
        plt.ylabel('Loss')
        plt.xlabel('Epoch')
        plt.legend(['Train'], loc='upper right')
        plt.show()
        history_df['val_loss'].plot()
        plt.title('Validation Loss')
        plt.ylabel('Loss')
        plt.xlabel('Epoch')
        plt.legend(['Validation'], loc='upper right')
        plt.show()

        # Display minimum validation loss
        min_val_loss = history_df['val_loss'].min()
        print("Minimum validation loss: {:.4f}".format(min_val_loss))
        # Plot mae
        history_df.loc[:, ['mae', 'val_mae']].plot()
        plt.title('Model MAE')
        plt.ylabel('MAE')
        plt.xlabel('Epoch')
        plt.legend(['Train', 'Validation'], loc='upper right')
        plt.show()
        # Display minimum validation mae and mape
        min_val_mae = history_df['val_mae'].min()
        min_val_mape = history_df['val_mape'].min()
        print("Minimum validation MAE: {:.4f}".format(min_val_mae))
        print("Minimum validation MAPE: {:.4f}".format(min_val_mape))

    return hist


class PredictAndForecast:
    """
    model: tf.keras.Model
    train: np.array
    test: np.array
    Takes a trained model, train, and test datasets and returns predictions
    of len(test) with same shape.
    """

    def __init__(self, model, train_data, test_data, n_input=5, n_steps=5, scaler=None, normalize_bool=False) -> None:
        self.model = model
        self.train_data = train_data
        self.test_data = test_data
        self.n_input = n_input
        self.n_steps = n_steps
        self.scaler = scaler
        self.normalize_bool = normalize_bool
        self.predictions = self.get_predictions()

    def forecast(self, history_data) -> np.array:
        """
        Given last weeks actual data, forecasts next weeks prices.
        """
        data_list = [np.array(history) for history in history_data]
        data_list = [data.reshape(
            (data.shape[0]*data.shape[1], data.shape[2])) for data in data_list]

        predictions = []
        for _ in range(self.n_steps):
            input_x_list = [data[-self.n_input:, :] for data in data_list]
            input_x_list = [input_x.reshape(
                (1, len(input_x), input_x.shape[1])) for input_x in input_x_list]
            yhat = self.model.predict(input_x_list, verbose=0)
            yhat = yhat[0]
            predictions.append(yhat)
            data_list = [np.vstack([data, yhat]) for data in data_list]
        return yhat

    def get_predictions(self) -> np.array:
        """
        compiles models predictions week by week over entire test set.
        """
        history_data = [[x for x in train] for train in self.train_data]
        predictions = []
        for i in range(len(self.test_data[0])):
            yhat_sequence = self.forecast(history_data)  # Recursive prediction
            predictions.append(yhat_sequence)
            for history, test in zip(history_data, self.test_data):
                history.append(test[i, :])
        predictions = np.array(predictions)
        print(f"Total predictions shape: {predictions.shape}")
        total_elements = predictions.size
        if total_elements % self.n_steps != 0:
            new_size = total_elements - (total_elements % self.n_steps)
            predictions = predictions[:new_size]
            print(f"Trimmed predictions to new size: {new_size}")

        return predictions.reshape(-1, self.n_steps)


def plot_results(train_list, test_list, preds, df, convertActive, normalize_bool=False, scaler=None, title_suffix=None, xlabel='AAPL stock Price'):
    """
    Plots training data in blue, actual values in red, and predictions in green,
    over time.
    """

    fig, ax = plt.subplots(figsize=(20, 6))

    print(f'train shapes: {[train.shape for train in train_list]}')
    print(f'test shapes: {[test.shape for test in test_list]}')
    print(f'preds shape: {preds.shape}')

    if convertActive:
        plot_train_list = [train.squeeze(axis=-1).flatten()
                           for train in train_list]
        plot_test_list = [test.squeeze(axis=-1).flatten()
                          for test in test_list]
        plot_preds = preds.flatten()

    if normalize_bool:
        # Inverse transform to get back to original scale
        plot_test_list = [scaler.inverse_transform(
            plot_test.reshape(-1, 1)).reshape(plot_test.shape) for plot_test in plot_test_list]
        plot_train_list = [scaler.inverse_transform(
            train.reshape(-1, 1)).reshape(train.shape) for train in train_list]
        plot_preds = scaler.inverse_transform(
            plot_preds.reshape(-1, 1)).reshape(plot_preds.shape)
        print("normalization convert active")

    plot_train = np.concatenate(plot_train_list)
    plot_test = np.concatenate(plot_test_list)

    print(f'plot_train shape: {plot_train.shape}')
    print(f'plot_test shape: {plot_test.shape}')
    print(f'plot_pret shape: {plot_preds.shape}')

    if not isinstance(df, pd.DataFrame):
        df = pd.DataFrame(df)

    x_train = df[:len(plot_train)].index
    x_test = df[len(plot_train):len(plot_train) + len(plot_test)].index

    print(f'x_train shape: {len(x_train)}')
    print(f'x_test shape: {len(x_test)}')
    print(f'plot_train shape after concatenate: {plot_train.shape}')
    print(f'plot_test shape after concatenate: {plot_test.shape}')
    print(f'plot_preds shape after inverse transform: {plot_preds.shape}')

    if len(plot_preds) > len(plot_test):
        plot_preds = plot_preds[:len(plot_test)]
    elif len(plot_preds) < len(plot_test):
        plot_preds = np.pad(plot_preds, (0, len(
            plot_test) - len(plot_preds)), 'constant', constant_values=np.nan)

    print(f'plot_preds shape after padding: {plot_preds.shape}')

    if len(x_train) != len(plot_train):
        print(f'Length mismatch: x_train ({
              len(x_train)}) and plot_train ({len(plot_train)})')
    if len(x_test) != len(plot_test):
        print(f'Length mismatch: x_test ({
              len(x_test)}) and plot_test ({len(plot_test)})')

    ax.plot(x_train, plot_train, label='Train', color='blue')
    ax.plot(x_test, plot_test, label='Actual', color='red')
    ax.plot(x_test, plot_preds, label='Predictions', color='green')
    if title_suffix is None:
        ax.set_title('Predictions vs. Actual')
    else:
        ax.set_title(f'Predictions vs. Actual, {title_suffix}')
    ax.set_xlabel('Date')
    ax.set_ylabel(xlabel)
    ax.legend()
    plt.show()


class Evaluate:
    def __init__(self, actual, predictions, normalize_bool=False, scaler=None) -> None:
        actual = np.array(actual).flatten()
        predictions = np.array(predictions).flatten()

        # Check dimensions and reshape if necessary
        if normalize_bool and scaler is not None:
            actual = scaler.inverse_transform(actual.reshape(-1, 1)).flatten()
            predictions = scaler.inverse_transform(
                predictions.reshape(-1, 1)).flatten()

        # Debugging transformed data
        print(f"Transformed actual: {actual[:10]}")
        print(f"Transformed predictions: {predictions[:10]}")
        min_len = min(len(actual), len(predictions))
        self.actual = actual[:min_len]
        self.predictions = predictions[:min_len]
        # Debugging lengths after transformation
        print(f"Final lengths - actual: {len(self.actual)
                                         }, predictions: {len(self.predictions)}")
        self.var_ratio = self.compare_var()
        self.mape = self.evaluate_model_with_mape()
        self.rmse = self.evaluate_model_with_rmse()
        self.mae = self.evaluate_model_with_mae()
        self.r2 = self.evaluate_model_with_r2()

    def compare_var(self):
        return abs(1 - (np.var(self.predictions) / np.var(self.actual)))

    def evaluate_model_with_mape(self):
        mape = mean_absolute_percentage_error(self.actual, self.predictions)
        return mape

    def evaluate_model_with_rmse(self):
        rmse = root_mean_squared_error(self.actual, self.predictions)
        return rmse

    def evaluate_model_with_mae(self):
        mae = mean_absolute_error(self.actual, self.predictions)
        return mae

    def evaluate_model_with_r2(self):
        r2 = r2_score(self.actual, self.predictions)
        return r2

In [28]:


def calculate_TEMA(data, period=10):
    ema1 = data.ewm(span=period, adjust=False).mean()
    ema2 = ema1.ewm(span=period, adjust=False).mean()
    ema3 = ema2.ewm(span=period, adjust=False).mean()
    tema = (3 * ema1) - (3 * ema2) + ema3
    return tema.dropna()


def calculate_DEMA(data, period=10):
    ema1 = data.ewm(span=period, adjust=False).mean()
    ema2 = ema1.ewm(span=period, adjust=False).mean()
    dema = 2 * ema1 - ema2
    return dema


ticker = 'AAPL'
dataset = yf.download(ticker, start='1990-01-01',
                      end=datetime.now().strftime('%Y-%m-%d'), interval='1d')
dataset = dataset[['Close']]

data1 = calculate_DEMA(dataset['Close'], 10)
data2 = calculate_TEMA(dataset['Close'], 10)

data_inputs = [dataset, data1, data2]

result = pd.DataFrame({
    'Close': dataset['Close'],
    'DEMA': data1,
    'TEMA': data2
})
print(result)
print(result.shape)

[*********************100%%**********************]  1 of 1 completed

                 Close        DEMA        TEMA
Date                                          
1990-01-02    0.332589    0.332589    0.332589
1990-01-03    0.334821    0.333327    0.333599
1990-01-04    0.335938    0.334239    0.334771
1990-01-05    0.337054    0.335275    0.336033
1990-01-08    0.339286    0.336766    0.337844
...                ...         ...         ...
2024-07-26  217.960007  220.698230  217.263572
2024-07-29  218.240005  219.626401  216.564154
2024-07-30  218.800003  219.048188  216.497588
2024-07-31  222.080002  219.737212  218.076320
2024-08-01  218.360001  219.046155  217.562488

[8712 rows x 3 columns]
(8712, 3)





In [36]:
normaliza_bool_update = False
type_normalisation_update = None

data = ETL(datainput=result, test_size=0.2, n_input=5, timestep=5,
           normalize_bool=normaliza_bool_update, scaler=type_normalisation_update)
x_train, y_train = data.X_train, data.y_train
X_val, y_val = data.X_test, data.y_test


transformer = build_transfromer(head_size=128, num_heads=4, ff_dim=2, num_trans_blocks=4,
                                mlp_units=[256], mlp_dropout=0.10, dropout=0.10, attention_axes=1)

hist = fit_transformer(transformer, x_train,
                       y_train, X_val, y_val, display_loss=False)

transformer_preds = PredictAndForecast(transformer, data.train, data.test, n_input=5,
                                       scaler=type_normalisation_update, normalize_bool=normaliza_bool_update)

plot_results(data.train, data.test, transformer_preds.predictions,
             data.df, normalize_bool=normaliza_bool_update, scaler=type_normalisation_update, title_suffix=f'Transformer Apple - {list_type_used[j]}')
train_evaluation = Evaluate(data.test, transformer_preds.predictions,
                            normalize_bool=normaliza_bool_update, scaler=type_normalisation_update)
print(f"MAPE for : {train_evaluation.mape}")
print(f"RMSE for : {train_evaluation.rmse}")
print(f"MAE for : {train_evaluation.mae}")
print(f"R-squared for : {train_evaluation.r2}")

data shape: (8712, 3)
train shape: (6970, 3) test shape: (1742, 3)
Train shape: (6970, 3)
Test shape: (1740, 3)
(1394, 5, 3)
(348, 5, 3)
[[[ 0.332589  ]
  [ 0.33482099]
  [ 0.33593801]
  [ 0.33705401]
  [ 0.339286  ]]

 [[ 0.33482099]
  [ 0.33593801]
  [ 0.33705401]
  [ 0.339286  ]
  [ 0.33593801]]

 [[ 0.33593801]
  [ 0.33705401]
  [ 0.339286  ]
  [ 0.33593801]
  [ 0.32142901]]

 ...

 [[39.36999893]
  [39.96250153]
  [40.40000153]
  [40.23749924]
  [39.46500015]]

 [[39.96250153]
  [40.40000153]
  [40.23749924]
  [39.46500015]
  [39.375     ]]

 [[40.40000153]
  [40.23749924]
  [39.46500015]
  [39.375     ]
  [39.30250168]]] [[ 0.33593801  0.32142901  0.308036    0.308036    0.30580401]
 [ 0.32142901  0.308036    0.308036    0.30580401  0.31138399]
 [ 0.308036    0.308036    0.30580401  0.31138399  0.296875  ]
 ...
 [39.375      39.30250168 39.94499969 39.99499893 39.81750107]
 [39.30250168 39.94499969 39.99499893 39.81750107 39.96500015]
 [39.94499969 39.99499893 39.81750107 39.9650

ValueError: Exception encountered when calling LayerNormalization.call().

[1mCannot reshape a tensor with 3 elements to shape [1,1,1] (1 elements) for '{{node functional_9_1/layer_normalization_39_1/Reshape}} = Reshape[T=DT_FLOAT, Tshape=DT_INT32](functional_9_1/layer_normalization_39_1/Reshape/ReadVariableOp, functional_9_1/layer_normalization_39_1/Reshape/shape)' with input shapes: [3], [3] and with input tensors computed as partial shapes: input[1] = [1,1,1].[0m

Arguments received by LayerNormalization.call():
  • inputs=tf.Tensor(shape=(None, 5, 1), dtype=float32)

ljhbkuhkjhjh

In [15]:
def calculate_TEMA(data, period=10):
    ema1 = data.ewm(span=period, adjust=False).mean()
    ema2 = ema1.ewm(span=period, adjust=False).mean()
    ema3 = ema2.ewm(span=period, adjust=False).mean()
    tema = (3 * ema1) - (3 * ema2) + ema3
    return tema.dropna()


def calculate_DEMA(data, period=10):
    ema1 = data.ewm(span=period, adjust=False).mean()
    ema2 = ema1.ewm(span=period, adjust=False).mean()
    dema = 2 * ema1 - ema2
    return dema


ticker = 'AAPL'
dataset = yf.download(ticker, start='1990-01-01',
                      end=datetime.now().strftime('%Y-%m-%d'), interval='1d')
dataset = dataset[['Close']]

data1 = calculate_DEMA(dataset['Close'], 10)
data2 = calculate_TEMA(dataset['Close'], 10)

data_inputs = [dataset, data1, data2]

# Initialize the ETL process
etl = ETL(data_inputs, test_size=0.2,
          normalize_bool=False, scaler=MinMaxScaler())
print(etl.dataframes.shape)
# Process the data into supervised learning format
X_train_list, y_train_list = [], []
X_test_list, y_test_list = [], []

for train, test in zip(etl.train_data, etl.test_data):
    X_train, y_train = etl.to_supervised(train)
    X_test, y_test = etl.to_supervised(test)
    X_train_list.append(X_train)
    y_train_list.append(y_train)
    X_test_list.append(X_test)
    y_test_list.append(y_test)

# Convert lists to numpy arrays
X_train_list = [np.array(x) for x in X_train_list]
X_test_list = [np.array(x) for x in X_test_list]
y_train = y_train_list[0]  # Assuming both datasets have the same y
y_test = y_test_list[0]  # Assuming both datasets have the same y

# Build the transformer model
model = build_transformer(
    head_size=64,
    num_heads=4,
    ff_dim=64,
    num_trans_blocks=len(data_inputs),
    mlp_units=[128],
    dropout=0.2,
    mlp_dropout=0.2
)

# Fit the transformer model
fit_transformers(model, X_train_list, y_train,
                 X_test_list, y_test, display_loss=True)

predict_and_forecast = PredictAndForecast(
    model=model,
    train_data=X_train_list,
    test_data=X_test_list,
    n_input=5,
    n_steps=5,
    scaler=MinMaxScaler(),
    normalize_bool=False
)

# Get predictions
predictions = predict_and_forecast.predictions
print(predictions)

[*********************100%%**********************]  1 of 1 completed

(1394, 5, 1)
(348, 5, 1)
(1394, 5, 1)
(348, 5, 1)
(1394, 5, 1)
(348, 5, 1)
not normalized False





AttributeError: 'list' object has no attribute 'shape'

In [None]:
evaluate = Evaluate(etl.test_data, [predictions],
                    normalize_bool=False, scaler=MinMaxScaler())
print(f"MAPE: {evaluate.mape}")
print(f"RMSE: {evaluate.rmse}")
print(f"MAE: {evaluate.mae}")
print(f"R2: {evaluate.r2}")

In [None]:
# Plot the results
# plot_results(train=X_train_list, test=X_test_list, preds=predictions, df=dataset, convertActive=True,
#              normalize_bool=False, scaler=MinMaxScaler(), title_suffix="AAPL", xlabel='AAPL stock Price')
plot_results(train=etl.train_data, test=etl.test_data, preds=predictions, df=dataset, convertActive=True,
             normalize_bool=False, scaler=MinMaxScaler(), title_suffix="AAPL", xlabel='AAPL stock Price')