In [36]:
#@title Packages

import random
import math
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt 
from scipy import stats
from sklearn import preprocessing
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score  
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Input, LSTM, GRU, Dense, LeakyReLU, Dropout, BatchNormalization
from tensorflow.keras.callbacks import EarlyStopping, LearningRateScheduler
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers import Adam, SGD, RMSprop, Adagrad, Adadelta
from tensorflow.keras.losses import MeanSquaredError, MeanAbsoluteError, MeanAbsolutePercentageError, Huber
from tensorflow.keras.backend import sqrt, mean, square
from tensorflow.keras import backend as K

In [37]:
#@tile Read and Prepare Data

def read_prepare_data(symbol):
    #read
    data = pd.read_csv('/Users/pedroalexleite/Desktop/Tese/Dados/dataset4.csv')
    train = pd.read_csv('/Users/pedroalexleite/Desktop/Tese/Dados/train.csv')
    test = pd.read_csv('/Users/pedroalexleite/Desktop/Tese/Dados/test.csv')
    
    #we're going to use only one symbol
    data = data[data['Symbol'] == symbol].copy()
    train = train[train['Symbol'] == symbol].copy()
    test = test[test['Symbol'] == symbol].copy()
    
    #we're going to use the price variable
    data = data[['Date', 'Close']].copy()
    train = train[['Date', 'Close']].copy()
    test = test[['Date', 'Close']].copy()
    
    #set date as index
    data.set_index('Date', inplace=True)
    train.set_index('Date', inplace=True)
    test.set_index('Date', inplace=True)

    #normalize
    scaler = MinMaxScaler(feature_range=(0, 1))
    train = pd.DataFrame(scaler.fit_transform(train), columns=train.columns, index=train.index)
    test = pd.DataFrame(scaler.transform(test), columns=test.columns, index=test.index)
    data = pd.DataFrame(scaler.transform(data), columns=data.columns, index=data.index) 

    return scaler, data, train, test

scaler, data, train, test = read_prepare_data('AAPL')

#verify
#print(data.head())
#print(data.index)   
#print(data.columns)

In [38]:
#@title Create Dataset

def create_dataset(dataframe, look_back):
    dataset = dataframe.values
    dataX, dataY = [], []
    for i in range(len(dataset)-look_back-1):
        a = dataset[i:(i+look_back)]
        dataX.append(a)
        dataY.append(dataset[i + look_back])
        
    return np.array(dataX), np.array(dataY)

In [39]:
#@title Reshape

def reshape(train, test, look_back):
    trainX, trainY = create_dataset(train, look_back)
    testX, testY = create_dataset(test, look_back)
    trainX = np.reshape(trainX, (trainX.shape[0], trainX.shape[1], trainX.shape[2]))
    testX = np.reshape(testX, (testX.shape[0], testX.shape[1], testX.shape[2]))

    return trainX, trainY, testX, testY

In [40]:
#@title Forecast

def forecast_values(testY, look_back, horizon, model):
    testY_copy = testY.copy()
    for val in range(0, horizon+1):
        a = testY_copy[-(1+look_back):-1]
        a = np.reshape(a, (1, look_back, 1)) 
        a_predict = model.predict(a, verbose=0)[0]
        a_predict = np.reshape(a_predict, (1, 1))
        testY_copy = np.concatenate((testY_copy, a_predict), axis=0)
    
    forecast = testY_copy[len(testY):]
    return forecast

In [41]:
#@title Auxiliary Function

def predict_forecast_plot(data, train, test, trainX, trainY, testX, testY, nepochs, look_back, horizon, plot_predictions, model):
    #make predictions
    trainPredict = model.predict(trainX)
    testPredict = model.predict(testX)
    
    #forecast
    forecast = forecast_values(testY, look_back, horizon, model)

    #invert predictions
    trainPredict = scaler.inverse_transform(trainPredict)
    trainY = scaler.inverse_transform(trainY)
    testPredict = scaler.inverse_transform(testPredict)
    testY = scaler.inverse_transform(testY)
    forecast = scaler.inverse_transform(forecast)

    #calculate root mean squared error
    trainScore = np.sqrt(mean_squared_error(trainY, trainPredict))
    print('Train Score: %.2f RMSE' % (trainScore))
    testScore = np.sqrt(mean_squared_error(testY, testPredict))
    print('Test Score: %.2f RMSE' % (testScore))

    #plot predictions
    if plot_predictions==True: 
        #shift train predictions for plotting
        trainPredictPlot = np.empty_like(data)
        trainPredictPlot[:, :] = np.nan
        trainPredictPlot[look_back:len(trainPredict)+look_back, :] = trainPredict
        
        #shift test predictions for plotting
        testPredictPlot = np.empty_like(data)
        testPredictPlot[:, :] = np.nan
        testPredictPlot[len(trainPredict)+(look_back*2)+1:len(data)-1, :] = testPredict
        
        #shift forecast for plotting
        forecastPlot = np.empty_like(pd.concat([data, pd.DataFrame(forecast)]))
        forecastPlot[:, :] = np.nan
        forecastPlot[len(data):len(forecastPlot),:] = forecast
        
        #plot baseline, predictions and forecast
        plt.figure(figsize=(15,7))
        plt.plot(scaler.inverse_transform(data), label='real')
        plt.plot(trainPredictPlot, label='train set prediction')
        plt.plot(testPredictPlot, label='test set prediction')
        plt.plot(forecastPlot, label='forecast')
        plt.legend()
        plt.show()

    return testScore

In [42]:
#@title Train and Predict

def rmse_loss(y_true, y_pred):
    return K.sqrt(K.mean(K.square(y_pred - y_true)))

def apply_data_augmentation(trainX, trainY, noise_factor=0.01):
    noise = np.random.normal(0, noise_factor, trainX.shape)
    trainX_aug = trainX + noise
    return trainX_aug, trainY

def read_prepare_data_with_regularization(symbol):
    data = pd.read_csv('/Users/pedroalexleite/Desktop/Tese/Dados/dataset4.csv')
    train = pd.read_csv('/Users/pedroalexleite/Desktop/Tese/Dados/train.csv')
    test = pd.read_csv('/Users/pedroalexleite/Desktop/Tese/Dados/test.csv')
    
    data = data[data['Symbol'] == symbol].copy()
    train = train[train['Symbol'] == symbol].copy()
    test = test[test['Symbol'] == symbol].copy()
    
    data = data[['Date', 'Close']].copy()
    train = train[['Date', 'Close']].copy()
    test = test[['Date', 'Close']].copy()
    
    data.set_index('Date', inplace=True)
    train.set_index('Date', inplace=True)
    test.set_index('Date', inplace=True)
    
    scaler = MinMaxScaler(feature_range=(0, 1))
    train_normalized = pd.DataFrame(scaler.fit_transform(train), columns=train.columns, index=train.index)
    test_normalized = pd.DataFrame(scaler.transform(test), columns=test.columns, index=test.index)
    data_normalized = pd.DataFrame(scaler.transform(data), columns=data.columns, index=data.index)
    
    return scaler, data_normalized, train_normalized, test_normalized

def model_with_regularization(data, train, test, look_back=30, nepochs=50, horizon=7, plot_predictions=False, 
                              batch_size=1, learning_rate=0.001, optimizer='adam', activation='relu', loss='mse',
                              regularization_technique=None, **reg_params):
    
    #reshape
    trainX, trainY, testX, testY = reshape(train, test, look_back)
    
    #data augmentation
    if regularization_technique == 'data_augmentation':
        trainX, trainY = apply_data_augmentation(trainX, trainY, **reg_params)
        print(f"Applied Data Augmentation with parameters: {reg_params}")
    
    #build model
    input_layer = Input(shape=(trainX.shape[1], trainX.shape[2]))
    
    #regularization
    if regularization_technique == 'l2_regularization':
        if activation == 'leaky_relu':
            x = LSTM(16, kernel_regularizer=l2(reg_params.get('l2_lambda', 0.01)))(input_layer)
            x = LeakyReLU(alpha=0.01)(x)
        else:
            x = LSTM(16, activation=activation, kernel_regularizer=l2(reg_params.get('l2_lambda', 0.01)))(input_layer)
        print(f"Applied L2 Regularization with lambda: {reg_params.get('l2_lambda', 0.01)}")
        
    elif regularization_technique == 'recurrent_dropout':
        if activation == 'leaky_relu':
            x = LSTM(16, recurrent_dropout=reg_params.get('recurrent_dropout_rate', 0.2))(input_layer)
            x = LeakyReLU(alpha=0.01)(x)
        else:
            x = LSTM(16, activation=activation, recurrent_dropout=reg_params.get('recurrent_dropout_rate', 0.2))(input_layer)
        print(f"Applied Recurrent Dropout with rate: {reg_params.get('recurrent_dropout_rate', 0.2)}")
        
    else:
        if activation == 'leaky_relu':
            x = LSTM(16)(input_layer)
            x = LeakyReLU(alpha=0.01)(x)
        else:
            x = LSTM(16, activation=activation)(input_layer)
    
    if regularization_technique == 'dropout':
        x = Dropout(0.1)(x) 
        print("Applied Dropout with rate: 0.1 after LSTM")
        
    elif regularization_technique == 'batch_normalization':
        x = BatchNormalization()(x)
        print("Applied Batch Normalization after LSTM")
    
    output = Dense(1, activation='linear')(x)
    model_instance = Model(inputs=input_layer, outputs=output)
    
    #optimizer
    optimizer = optimizer.lower()
    optimizers_dict = {
        'adam': Adam(learning_rate=learning_rate),
        'sgd': SGD(learning_rate=learning_rate),
        'rmsprop': RMSprop(learning_rate=learning_rate),
        'adagrad': Adagrad(learning_rate=learning_rate),
        'adadelta': Adadelta(learning_rate=learning_rate)
    }
    
    if optimizer not in optimizers_dict:
        raise ValueError(f"Unsupported optimizer: {optimizer}")
    
    opt = optimizers_dict[optimizer]
    
    if regularization_technique == 'gradient_clipping':
        if optimizer == 'adam':
            opt = Adam(learning_rate=learning_rate, clipnorm=reg_params.get('clip_norm', 1.0))
        elif optimizer == 'sgd':
            opt = SGD(learning_rate=learning_rate, clipnorm=reg_params.get('clip_norm', 1.0))
        elif optimizer == 'rmsprop':
            opt = RMSprop(learning_rate=learning_rate, clipnorm=reg_params.get('clip_norm', 1.0))
        print(f"Applied Gradient Clipping with norm: {reg_params.get('clip_norm', 1.0)}")
    
    #loss function
    loss_map = {
        'mse': MeanSquaredError(),
        'mean_squared_error': MeanSquaredError(),
        'rmse': rmse_loss,
        'mae': MeanAbsoluteError(),
        'mean_absolute_error': MeanAbsoluteError(),
        'mape': MeanAbsolutePercentageError(),
        'mean_absolute_percentage_error': MeanAbsolutePercentageError()
    }
    loss = loss.lower()
    if loss not in loss_map:
        raise ValueError(f"Unsupported loss function: {loss}")
    loss_fn = loss_map[loss]
    
    model_instance.compile(loss=loss_fn, optimizer=opt)
    
    #callbacks
    callbacks = []
    
    if regularization_technique == 'early_stopping':
        early_stop = EarlyStopping(
            monitor='loss',
            patience=reg_params.get('patience', 10),
            restore_best_weights=True,
            verbose=1
        )
        callbacks.append(early_stop)
        print(f"Applied Early Stopping with patience: {reg_params.get('patience', 10)}")
    
    if regularization_technique is None:
        print("No regularization applied")
    
    #fit model
    model_instance.fit(
        trainX, trainY, 
        epochs=nepochs, 
        batch_size=batch_size, 
        verbose=1, 
        callbacks=callbacks if callbacks else None
    )
    
    #predict, forecast and plot
    testScore = predict_forecast_plot(
        data, train, test, trainX, trainY, testX, testY, 
        nepochs, look_back, horizon, plot_predictions, model_instance
    )
    
    return testScore

#model_with_regularization(data, train, test, look_back=30, nepochs=50, horizon=7, plot_predictions=False, batch_size=1, learning_rate=0.001, optimizer='adam', activation='relu', loss='mse',regularization_technique=None, **reg_params)

In [43]:
#@title Optimize Regularization Techniques - PART 2

def run_regularization_methods(symbol='AAPL', n_runs=5, look_back=30, horizon=7):
    configurations = [
        {
            'name': 'Dropout (0.1)',
            'technique': 'dropout',
            'params': {},
            'nepochs': 50
        },
        {
            'name': 'Early Stopping',
            'technique': 'early_stopping',
            'params': {'patience': 10},
            'nepochs': 500
        },
        {
            'name': 'L2 Regularization (0.001)',
            'technique': 'l2_regularization',
            'params': {'l2_lambda': 0.001},
            'nepochs': 50
        },
        {
            'name': 'Batch Normalization',
            'technique': 'batch_normalization',
            'params': {},
            'nepochs': 50
        },
        {
            'name': 'Recurrent Dropout (0.1)',
            'technique': 'recurrent_dropout',
            'params': {'recurrent_dropout_rate': 0.1},
            'nepochs': 50
        },
        {
            'name': 'Gradient Clipping',
            'technique': 'gradient_clipping',
            'params': {'clip_norm': 1.0},
            'nepochs': 50
        },
        {
            'name': 'Data Augmentation (Gaussian Noise 0.01)',
            'technique': 'data_augmentation',
            'params': {'noise_factor': 0.01},
            'nepochs': 50
        }
    ]

    results = {}

    for config in configurations:
        config_scores = []

        for run in range(n_runs):
            print(f"\nRun {run + 1}/{n_runs} for {config['name']}")
            try:
                scaler, data, train, test = read_prepare_data_with_regularization(symbol=symbol)

                score = model_with_regularization(
                    data, train, test,
                    look_back=look_back,
                    nepochs=config['nepochs'],
                    horizon=horizon,
                    plot_predictions=False,
                    regularization_technique=config['technique'],
                    **config['params']
                )
                config_scores.append(score)

            except Exception as e:
                 print(f"Error in run {run + 1}: {str(e)}")
                 continue

        if config_scores:
            results[config['name']] = {
                'scores': config_scores,
                'mean': np.mean(config_scores)
            }
            print(f"\n{config['name']} Results:")
            print(f"Mean RMSE: {results[config['name']]['mean']:.2f}\n")
        else:
            print(f"No successful runs for {config['name']}")

    return results

results = run_regularization_methods(symbol='AAPL', n_runs=5)
print("Final results for Overfitting Rgularization Techniques:")
for config_name, config_results in results.items():
    print(f"{config_name}: Mean RMSE = {config_results['mean']:.2f}")
best_config = min(results.items(), key=lambda x: x[1]['mean'])
print(f"\nBest Configuration: {best_config[0]}")
print(f"Best Mean RMSE: {best_config[1]['mean']:.2f}")

Testing Configuration: No Regularization

Run 1/1 for No Regularization
No regularization applied
Epoch 1/10
[1m1428/1428[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 4ms/step - loss: 0.0592
Epoch 2/10
[1m1428/1428[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 4ms/step - loss: 0.0020
Epoch 3/10
[1m1428/1428[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 4ms/step - loss: 0.0014
Epoch 4/10
[1m1428/1428[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 4ms/step - loss: 0.0010
Epoch 5/10
[1m1428/1428[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 4ms/step - loss: 9.7728e-04
Epoch 6/10
[1m1428/1428[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 4ms/step - loss: 8.1290e-04
Epoch 7/10
[1m1428/1428[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 4ms/step - loss: 6.8693e-04
Epoch 8/10
[1m1428/1428[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 4ms/step - loss: 6.3794e-04
Epoch 9/10
[1m1428/1428[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37