In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Dropout, MultiHeadAttention, LayerNormalization, Add, Flatten, TimeDistributed
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import LearningRateScheduler
from tensorflow.keras.utils import plot_model

In [None]:
# Load and preprocess data
# Read the price history, index it by date and re-sample it onto a
# business-day calendar. `asfreq('b')` inserts all-NaN rows for business days
# that are missing from the file, and those gaps are filled below.
data = (
    pd.read_csv('GOOGL_2006-01-01_to_2018-01-01.csv', parse_dates=['Date'])
    .drop(columns="Name")
    .set_index('Date')
    .asfreq('b')
    # Forward-fill first so a gap repeats the last *known* values; filling
    # backwards first would copy the next day's values into the past, which is
    # look-ahead leakage for a forecasting task.
    .ffill()
    # Back-fill only covers leading rows that have no earlier observation.
    .bfill()
)
data_vals = data.values

In [None]:
# Define the scheduler function
def step_decay(epoch):
    """Return the learning rate to use for ``epoch`` under a step-decay schedule.

    The rate starts at 0.01 and is multiplied by 0.5 once for every 10 full
    epochs that have elapsed.

    Args:
        epoch: Zero-based epoch index.

    Returns:
        The learning rate as a float.
    """
    base_rate, decay_factor, step_size = 0.01, 0.5, 10
    completed_steps = epoch // step_size
    return base_rate * (decay_factor ** completed_steps)

# Normalize the data
# Fit the scaler on the leading (training) portion of the series only, so the
# min/max of the held-out tail never influences the features the model is
# trained on. 0.8 mirrors the train/test split ratio applied to the windows
# further down in this cell.
scaler_fit_ratio = 0.8
scaler_fit_rows = max(1, int(scaler_fit_ratio * len(data_vals)))
scaler = MinMaxScaler(feature_range=(0, 1))
scaler.fit(data_vals[:scaler_fit_rows])
# All rows are transformed with the training-period statistics; values outside
# the training range therefore map outside [0, 1], which is expected.
data_normalized = scaler.transform(data_vals)

# Define sequence length and lag
sequence_length = 100          # number of consecutive rows in each input window
lags = [1, 5, 10, 20, 30]      # forecast horizons evaluated, one model per lag
# Per-lag results, appended in the same order as `lags`
y_tests = []
preds = []
rmses = []

# Positional encoding function
def positional_encoding(sequence_length, d_model):
    """Build a sinusoidal positional-encoding table.

    Each column pair ``(2k, 2k + 1)`` shares the frequency
    ``1 / 10000 ** (2k / d_model)``; even columns hold the sine of
    ``position * frequency`` and odd columns hold the cosine.

    Args:
        sequence_length: Number of positions (rows) to encode.
        d_model: Number of encoding dimensions (columns).

    Returns:
        A NumPy array of shape ``(sequence_length, d_model)``.
    """
    positions = np.expand_dims(np.arange(sequence_length), axis=1)
    dims = np.expand_dims(np.arange(d_model), axis=0)

    # Consecutive even/odd columns share one exponent: 2 * floor(dim / 2) / d_model.
    exponents = (2 * (dims // 2)) / np.float32(d_model)
    angles = positions * (1 / np.power(10000, exponents))

    # Sine on even column indices, cosine on odd ones.
    is_even_column = (dims % 2) == 0
    return np.where(is_even_column, np.sin(angles), np.cos(angles))

# Transformer block
def transformer_block(inputs, head_size, num_heads, ff_dim, dropout=0.1):
    """Apply one self-attention + feed-forward encoder block to ``inputs``.

    The block runs multi-head self-attention (query and value are both
    ``inputs``), applies dropout, adds the result back onto ``inputs`` and
    layer-normalises. A two-layer dense network then follows, with its own
    residual connection and layer normalisation.

    Args:
        inputs: Tensor fed to the block; its last dimension sets the output width.
        head_size: ``key_dim`` passed to ``MultiHeadAttention``.
        num_heads: Number of attention heads.
        ff_dim: Width of the hidden (ReLU) dense layer.
        dropout: Dropout rate applied to the attention output only.

    Returns:
        A tensor whose last dimension equals that of ``inputs``.
    """
    feature_dim = inputs.shape[-1]

    # Self-attention sub-layer with residual connection.
    attention_out = MultiHeadAttention(num_heads=num_heads, key_dim=head_size)(inputs, inputs)
    attention_out = Dropout(dropout)(attention_out)
    attention_residual = Add()([attention_out, inputs])
    attention_normed = LayerNormalization(epsilon=1e-6)(attention_residual)

    # Position-wise feed-forward sub-layer, projected back to the input width
    # so it can be added onto the attention output.
    hidden = Dense(ff_dim, activation="relu")(attention_normed)
    projected = Dense(feature_dim)(hidden)
    ff_residual = Add()([attention_normed, projected])
    return LayerNormalization(epsilon=1e-6)(ff_residual)

def _build_lagged_windows(series, window, horizon, target_col):
    """Slice ``series`` into input windows and ``horizon``-step-ahead targets.

    Args:
        series: 2-D array indexed as ``series[row, column]``.
        window: Number of consecutive rows in each input window.
        horizon: How many rows after the *last row of the window* the target lies.
        target_col: Column index of the value to predict.

    Returns:
        ``(inputs, targets)`` as NumPy arrays; ``inputs[k]`` is rows
        ``k .. k + window - 1`` and ``targets[k]`` is the ``target_col`` value
        of row ``k + window - 1 + horizon``.

    Raises:
        ValueError: If ``series`` is too short to produce a single sample.
    """
    n_samples = len(series) - window - horizon + 1
    if n_samples <= 0:
        raise ValueError(
            f"Need at least {window + horizon} rows for window={window} and "
            f"horizon={horizon}, got {len(series)}"
        )
    inputs = [series[start:start + window] for start in range(n_samples)]
    # The window's last observed row is start + window - 1, so the value
    # `horizon` steps ahead sits at start + window - 1 + horizon. (Indexing
    # start + window + horizon would silently forecast horizon + 1 steps.)
    targets = [series[start + window - 1 + horizon, target_col] for start in range(n_samples)]
    return np.array(inputs), np.array(targets)


# Column used as the prediction target; treated as the Close price here.
# NOTE(review): assumes Close is the 4th column of the loaded frame - confirm.
close_col = 3

for lag in lags:
    # Prepare input (X) and output (Y) sequences
    X, Y = _build_lagged_windows(data_normalized, sequence_length, lag, close_col)

    # Split the data into training and testing sets (chronological, no shuffle)
    split_ratio = 0.8
    split_index = int(split_ratio * len(X))
    X_train, X_test = X[:split_index], X[split_index:]
    Y_train, Y_test = Y[:split_index], Y[split_index:]

    # Create the transformer model
    d_model = X.shape[2]  # number of features per time step
    inputs = Input(shape=(sequence_length, d_model))
    pos_encoding = positional_encoding(sequence_length, d_model)

    # Inject position information, then stack two encoder blocks
    x = inputs + pos_encoding
    x = transformer_block(x, head_size=64, num_heads=4, ff_dim=256, dropout=0.05)
    x = transformer_block(x, head_size=64, num_heads=4, ff_dim=256, dropout=0.05)
    x = Flatten()(x)
    x = Dense(64, activation="relu")(x)
    outputs = Dense(1)(x)

    model = Model(inputs, outputs)
    model.summary()
    plot_model(model, to_file="transformer_model.png", show_shapes=True, show_layer_names=True)

    # Compile the model
    model.compile(optimizer=Adam(learning_rate=0.01), loss='mean_squared_error')

    # Learning rate scheduler
    lr_scheduler = LearningRateScheduler(step_decay)

    # Train the model
    model.fit(X_train, Y_train, epochs=50, batch_size=32, verbose=1, callbacks=[lr_scheduler])

    # Predict on the test set
    predictions = model.predict(X_test)

    # Denormalize the predictions by inverting the min-max scaling of the
    # target column only (the scaler itself was fit on every column).
    min_val = scaler.data_min_[close_col]
    max_val = scaler.data_max_[close_col]

    predictions_rescaled = predictions * (max_val - min_val) + min_val
    Y_test_rescaled = Y_test * (max_val - min_val) + min_val

    # Evaluate the model using RMSE
    rmse = np.sqrt(mean_squared_error(Y_test_rescaled, predictions_rescaled))
    print(f"Root Mean Squared Error: {rmse}")

    y_tests.append(Y_test_rescaled)
    preds.append(predictions_rescaled)
    rmses.append(rmse)


In [None]:
# Plot results
fig, ax = plt.subplots(figsize=(10, 6))

# Each lag has its own test window: the windows differ in length, but every
# one of them ends on the final row of the series. Right-aligning all series
# therefore puts each prediction above the actual value it is forecasting;
# drawing them all from x=0 would shift the longer-lag curves by several days.
actual = max(y_tests, key=len)  # longest window; the others are its suffixes
x_end = len(actual)
ax.plot(np.arange(x_end), actual, label='Actual')

for lag, pred, rmse in zip(lags, preds, rmses):
    pred = np.ravel(pred)  # model output is (n, 1); flatten for plotting
    ax.plot(np.arange(x_end - len(pred), x_end), pred,
            label='Predictions: ' + str(lag) + ' days forward')
    print('Predictions ' + str(lag) + ' days forward rmse: ' + str(rmse))

ax.legend()
ax.set_xlabel("Test-set business day")
ax.set_ylabel("Close price")
ax.set_title("Actual vs Predicted (Close Price)")
fig.savefig("Transformer_preds.png")
plt.show()