## Stacked Transformer and Linear Regression - Weighted Method

In [8]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import pandas as pd
import yfinance as yf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error


# Parameters
stock_symbol = 'AAME'
start_date = '2013-01-01'
end_date = '2023-01-01'
window_size = 10  # number of consecutive closing prices per input sample

# Fetch data from Yahoo Finance
data = yf.download(stock_symbol, start=start_date, end=end_date)
# Flatten to 1-D: some yfinance releases return MultiIndex columns even for a
# single ticker, which makes data['Close'] a one-column DataFrame and .values a
# (N, 1) array. ravel() leaves an already 1-D array unchanged.
prices = data['Close'].values.ravel()

# Create sequences for training
def create_sequences(data, window):
    """Slice ``data`` into sliding windows and the value that follows each one.

    Returns a pair of arrays: the windows of length ``window`` and, for each
    window, the element located immediately after it.
    """
    offsets = range(len(data) - window)
    windows = [data[offset:offset + window] for offset in offsets]
    followers = [data[offset + window] for offset in offsets]
    return np.array(windows), np.array(followers)

X, y = create_sequences(prices, window_size)
# Chronological hold-out: consecutive windows overlap, so a shuffled split
# would place test-period prices inside training inputs (look-ahead leakage).
# shuffle=False keeps the last 20% of the samples as the test set, in order.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

# Scale features; the scaler statistics come from the training portion only
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)


class TransformerModel(nn.Module):
    """Encoder-only Transformer that maps a window of values to one scalar.

    Args:
        input_dim: Number of features per time step in the input.
        num_layers: Number of stacked ``nn.TransformerEncoderLayer`` blocks.
        num_heads: Attention heads per layer.
        ffn_hid_dim: Hidden width of each layer's feed-forward sub-network.
        model_dim: Width of the internal embedding the inputs are projected to.

    ``forward`` takes ``[batch_size, seq_len, input_dim]`` and returns a
    tensor of shape ``[batch_size]`` computed from the last time step.
    """

    def __init__(self, input_dim, num_layers=1, num_heads=1, ffn_hid_dim=128, model_dim=32):
        super().__init__()
        # The embedding must be wider than one feature: each encoder layer
        # ends in a LayerNorm over the feature axis, and normalising a single
        # value always gives 0 (plus the bias). With model_dim == input_dim == 1
        # the output would therefore not depend on the input at all.
        self.model_dim = model_dim
        # Learned projection from the raw features to the embedding width
        # (the attribute name is kept so existing references keep working).
        self.pos_encoder = nn.Linear(input_dim, self.model_dim)
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=self.model_dim, nhead=num_heads, dim_feedforward=ffn_hid_dim),
            num_layers=num_layers)
        self.fc_out = nn.Linear(self.model_dim, 1)

    def forward(self, x):
        x = self.pos_encoder(x)
        x = x * np.sqrt(self.model_dim)  # Scale embedding
        x = x.permute(1, 0, 2)  # Shape to [seq_len, batch_size, features]
        x = self.transformer_encoder(x)
        x = x.permute(1, 0, 2)  # Shape back to [batch_size, seq_len, features]
        # Regress from the representation of the final time step only
        return self.fc_out(x[:, -1, :]).squeeze(-1)

# Initialize the model with the correct input dimension
# (input_dim=1: one feature per time step), plus its optimizer and loss
transformer_model = TransformerModel(input_dim=1)
optimizer = optim.Adam(transformer_model.parameters(), lr=0.001)
criterion = nn.MSELoss()


# Convert to PyTorch tensors
# unsqueeze(-1) appends a trailing feature axis of size 1 to the scaled inputs
X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32).unsqueeze(-1)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float32).unsqueeze(-1)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

# DataLoader
# Mini-batches of 64 training samples, reshuffled every epoch
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)



# Training Function
def train_model(model, train_loader, optimizer, criterion, epochs=100):
    """Optimise ``model`` for ``epochs`` passes over ``train_loader``.

    After every pass the mean of the per-batch losses is printed. Nothing is
    returned; the model is updated in place and left in training mode.
    """
    def run_batch(batch_inputs, batch_targets):
        # One optimisation step; returns the batch loss as a Python float.
        optimizer.zero_grad()
        batch_loss = criterion(model(batch_inputs), batch_targets)
        batch_loss.backward()
        optimizer.step()
        return batch_loss.item()

    model.train()
    for epoch in range(epochs):
        total_loss = sum(run_batch(inputs, targets) for inputs, targets in train_loader)
        print(f'Epoch {epoch+1}, Loss: {total_loss / len(train_loader)}')


# mean_absolute_error is imported here because this cell's import list only
# brings in mean_squared_error.
from sklearn.metrics import mean_absolute_error

# Train the Transformer; without this call the predictions below would come
# from randomly initialised weights.
train_model(transformer_model, train_loader, optimizer, criterion)

# Train Linear Regression
linear_model = LinearRegression().fit(X_train_scaled.reshape(-1, window_size), y_train)

# Predictions
transformer_model.eval()
with torch.no_grad():
    transformer_preds = transformer_model(X_test_tensor).numpy()

linear_preds = linear_model.predict(X_test_scaled)
# Equal-weight average of the two models
ensemble_preds = (transformer_preds + linear_preds) / 2




# Evaluate performance
mse = mean_squared_error(y_test, ensemble_preds)
mae = mean_absolute_error(y_test, ensemble_preds)
rmse = np.sqrt(mse)
# Mean absolute percentage error, expressed in percent
mape = np.mean(np.abs((y_test - ensemble_preds) / y_test)) * 100
print(f'Ensemble MSE: {mse}')
print(f"Ensemble MAE: {mae}")
print(f"Ensemble RMSE: {rmse}")
print(f"Ensemble MAPE: {mape}")

In [15]:
## Calculate R2
from sklearn.metrics import r2_score

# Coefficient of determination of the ensemble predictions on the test targets
r2 = r2_score(y_test, ensemble_preds)
print(f'R-squared: {r2}')

In [9]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
import yfinance as yf
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt

# Parameters
stock_symbol = 'AAME'
start_date = '2013-01-01'
end_date = '2024-01-01'
window_size = 10  # number of consecutive closing prices per input sample

# Fetch data from Yahoo Finance
data = yf.download(stock_symbol, start=start_date, end=end_date)
# Flatten to 1-D: some yfinance releases return MultiIndex columns even for a
# single ticker, which makes data['Close'] a one-column DataFrame and .values a
# (N, 1) array. ravel() leaves an already 1-D array unchanged.
prices = data['Close'].values.ravel()

# Function to create sequences for training
def create_sequences(data, window):
    """Pair every length-``window`` slice of ``data`` with the element after it.

    Returns ``(inputs, targets)`` as two NumPy arrays of equal length.
    """
    samples = [
        (data[start:start + window], data[start + window])
        for start in range(len(data) - window)
    ]
    inputs = [history for history, _ in samples]
    targets = [upcoming for _, upcoming in samples]
    return np.array(inputs), np.array(targets)

X, y = create_sequences(prices, window_size)
# Chronological hold-out: consecutive windows overlap, so a shuffled split
# would place test-period prices inside training inputs (look-ahead leakage).
# shuffle=False keeps the last 20% of the samples as the test set, in order.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

# Scale features; the scaler statistics come from the training portion only
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Transformer Model Definition
class TransformerModel(nn.Module):
    """Encoder-only Transformer that maps a window of values to one scalar.

    Args:
        input_dim: Number of features per time step in the input.
        num_layers: Number of stacked ``nn.TransformerEncoderLayer`` blocks.
        num_heads: Attention heads per layer.
        ffn_hid_dim: Hidden width of each layer's feed-forward sub-network.
        model_dim: Width of the internal embedding the inputs are projected to.

    ``forward`` takes ``[batch_size, seq_len, input_dim]`` and returns a
    tensor of shape ``[batch_size]`` computed from the last time step.
    """

    def __init__(self, input_dim, num_layers=1, num_heads=1, ffn_hid_dim=128, model_dim=32):
        super().__init__()
        # The embedding must be wider than one feature: each encoder layer
        # ends in a LayerNorm over the feature axis, and normalising a single
        # value always gives 0 (plus the bias). With model_dim == input_dim == 1
        # the output would therefore not depend on the input at all.
        self.model_dim = model_dim
        # Learned projection from the raw features to the embedding width
        # (the attribute name is kept so existing references keep working).
        self.pos_encoder = nn.Linear(input_dim, self.model_dim)
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=self.model_dim, nhead=num_heads, dim_feedforward=ffn_hid_dim),
            num_layers=num_layers)
        self.fc_out = nn.Linear(self.model_dim, 1)

    def forward(self, x):
        x = self.pos_encoder(x)
        x = x * np.sqrt(self.model_dim)  # Scale embedding
        x = x.permute(1, 0, 2)  # Shape to [seq_len, batch_size, features]
        x = self.transformer_encoder(x)
        x = x.permute(1, 0, 2)  # Shape back to [batch_size, seq_len, features]
        # Regress from the representation of the final time step only
        return self.fc_out(x[:, -1, :]).squeeze(-1)

# Initialize and prepare models
# (input_dim=1: one feature per time step), plus the optimizer and loss
transformer_model = TransformerModel(input_dim=1)
optimizer = optim.Adam(transformer_model.parameters(), lr=0.001)
criterion = nn.MSELoss()

# Convert to PyTorch tensors
# unsqueeze(-1) appends a trailing feature axis of size 1 to the scaled inputs
X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32).unsqueeze(-1)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float32).unsqueeze(-1)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

# DataLoader setup
# Mini-batches of 64 training samples, reshuffled every epoch
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

# Training function for the Transformer model
def train_model(model, train_loader, optimizer, criterion, epochs=100):
    """Train ``model`` and return the mean batch loss of every epoch.

    The model is switched to training mode and updated in place; the returned
    list has one float per epoch.
    """
    def epoch_mean_loss():
        # One full pass over the loader; returns the average per-batch loss.
        running = 0
        for batch_x, batch_y in train_loader:
            optimizer.zero_grad()
            batch_loss = criterion(model(batch_x), batch_y)
            batch_loss.backward()
            optimizer.step()
            running += batch_loss.item()
        return running / len(train_loader)

    model.train()
    return [epoch_mean_loss() for _ in range(epochs)]

# Train the Transformer model
# (default epochs=100); the returned per-epoch mean losses are kept in
# transformer_losses
transformer_losses = train_model(transformer_model, train_loader, optimizer, criterion)

# Train Linear Regression model
# on the scaled windows, one column per time step of the window
linear_model = LinearRegression().fit(X_train_scaled.reshape(-1, window_size), y_train)

# Ensemble prediction function
def weighted_ensemble(transformer_model, linear_model, X_test_tensor, X_test_scaled, transformer_weight=0.70):
    """Blend Transformer and linear-model predictions with a fixed weight.

    The Transformer output is multiplied by ``transformer_weight`` and the
    linear model's output by ``1 - transformer_weight``; their sum is returned.
    The Transformer is put in eval mode and run without gradient tracking.
    """
    linear_weight = 1 - transformer_weight
    transformer_model.eval()
    with torch.no_grad():
        transformer_part = transformer_model(X_test_tensor).numpy() * transformer_weight
    linear_part = linear_model.predict(X_test_scaled) * linear_weight
    return transformer_part + linear_part

# Generate ensemble predictions
# (default transformer_weight=0.70, so the linear model contributes 0.30)
ensemble_preds = weighted_ensemble(transformer_model, linear_model, X_test_tensor, X_test_scaled)

# Evaluate performance
ensemble_mse = mean_squared_error(y_test, ensemble_preds)
print(f'Ensemble MSE: {ensemble_mse}')

# Plot results
# The x-axis is the sample position inside the test set.
# NOTE(review): the 'Time' label is only meaningful if the test samples are
# in chronological order - confirm against the split used above.
plt.figure(figsize=(10, 5))
plt.plot(y_test, label='Actual Values', alpha=0.7)
plt.plot(ensemble_preds, label='Ensemble Predictions', alpha=0.7)
plt.xlabel('Time')
plt.ylabel('Stock Price')
plt.title('Actual Prices vs. Ensemble Predictions')
plt.legend()
plt.show()


## ARIMA and Stacked Transformer - Weighted Method

In [10]:
import matplotlib.pyplot as plt
import statsmodels.api as sm
from statsmodels.tsa.arima.model import ARIMA

# Fit ARIMA model
# NOTE(review): ARIMA treats y_train as an ordered series and the forecast
# below continues from its last element - confirm that the split producing
# y_train / y_test keeps the samples in chronological order.
arima_model = ARIMA(y_train, order=(2,1,0))
arima_fitted = arima_model.fit()

# Predictions using ARIMA on the test set (one forecast step per test sample)
arima_preds = arima_fitted.forecast(steps=len(X_test))

# Using the weights
transformer_weight = 0.20  # Adjusted weight to Transformer
arima_weight = 0.80      # Adjusted weight to ARIMA predictions (not linear regression anymore)

# Make sure weights sum to 1. A tolerant comparison is used because exact
# float equality can fail for weights that are not exactly representable.
assert np.isclose(transformer_weight + arima_weight, 1.0), "Weights must sum to 1."

# Load Transformer predictions
transformer_model.eval()
with torch.no_grad():
    transformer_preds = transformer_model(X_test_tensor).numpy()

# Calculate the weighted average of predictions
ensemble_preds = (transformer_preds * transformer_weight) + (arima_preds * arima_weight)

# Evaluate performance
ensemble_mse = mean_squared_error(y_test, ensemble_preds)
print(f'Ensemble MSE: {ensemble_mse}')

# Plotting
# Two stacked panels: training loss on top, predictions vs. actuals below
fig, ax = plt.subplots(2, 1, figsize=(10, 10))

# Plot training losses for Transformer
# (transformer_losses holds one mean loss value per training epoch)
ax[0].plot(transformer_losses, label='Transformer Training Loss')
ax[0].set_xlabel('Epoch')
ax[0].set_ylabel('Loss')
ax[0].set_title('Training Loss per Epoch')
ax[0].legend()

# Plot predictions vs actual values
# The x-axis is the sample position inside the test set
ax[1].plot(y_test, label='Actual Values', alpha=0.7)
ax[1].plot(ensemble_preds, label='Ensemble Predictions', alpha=0.7)
ax[1].set_xlabel('Time')
ax[1].set_ylabel('Stock Price')
ax[1].set_title('Comparison of Actual Prices vs. Ensemble Predictions')
ax[1].legend()

plt.tight_layout()
plt.show()


## Stacked Transformer and Linear Regression - Weighted Method

In [14]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
import yfinance as yf
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt

# Parameters
stock_symbol = 'AAL'
start_date = '2013-01-01'
end_date = '2024-01-01'
window_size = 10  # number of consecutive closing prices per input sample

# Fetch data from Yahoo Finance
data = yf.download(stock_symbol, start=start_date, end=end_date)
# Flatten to 1-D: some yfinance releases return MultiIndex columns even for a
# single ticker, which makes data['Close'] a one-column DataFrame and .values a
# (N, 1) array. ravel() leaves an already 1-D array unchanged.
prices = data['Close'].values.ravel()

# Function to create sequences for training
def create_sequences(data, window):
    """Build supervised samples from a series.

    Each sample is ``window`` consecutive elements of ``data``; its label is
    the element that comes right after them. Returns ``(samples, labels)`` as
    NumPy arrays.
    """
    samples, labels = [], []
    position, limit = 0, len(data) - window
    while position < limit:
        cutoff = position + window
        samples.append(data[position:cutoff])
        labels.append(data[cutoff])
        position += 1
    return np.array(samples), np.array(labels)

X, y = create_sequences(prices, window_size)
# Chronological hold-out: consecutive windows overlap, so a shuffled split
# would place test-period prices inside training inputs (look-ahead leakage).
# shuffle=False keeps the last 20% of the samples as the test set, in order.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

# Scale features; the scaler statistics come from the training portion only
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Transformer Model Definition
class TransformerModel(nn.Module):
    """Encoder-only Transformer that maps a window of values to one scalar.

    Args:
        input_dim: Number of features per time step in the input.
        num_layers: Number of stacked ``nn.TransformerEncoderLayer`` blocks.
        num_heads: Attention heads per layer.
        ffn_hid_dim: Hidden width of each layer's feed-forward sub-network.
        model_dim: Width of the internal embedding the inputs are projected to.

    ``forward`` takes ``[batch_size, seq_len, input_dim]`` and returns a
    tensor of shape ``[batch_size]`` computed from the last time step.
    """

    def __init__(self, input_dim, num_layers=1, num_heads=1, ffn_hid_dim=128, model_dim=32):
        super().__init__()
        # The embedding must be wider than one feature: each encoder layer
        # ends in a LayerNorm over the feature axis, and normalising a single
        # value always gives 0 (plus the bias). With model_dim == input_dim == 1
        # the output would therefore not depend on the input at all.
        self.model_dim = model_dim
        # Learned projection from the raw features to the embedding width
        # (the attribute name is kept so existing references keep working).
        self.pos_encoder = nn.Linear(input_dim, self.model_dim)
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=self.model_dim, nhead=num_heads, dim_feedforward=ffn_hid_dim),
            num_layers=num_layers)
        self.fc_out = nn.Linear(self.model_dim, 1)

    def forward(self, x):
        x = self.pos_encoder(x)
        x = x * np.sqrt(self.model_dim)  # Scale embedding
        x = x.permute(1, 0, 2)  # Shape to [seq_len, batch_size, features]
        x = self.transformer_encoder(x)
        x = x.permute(1, 0, 2)  # Shape back to [batch_size, seq_len, features]
        # Regress from the representation of the final time step only
        return self.fc_out(x[:, -1, :]).squeeze(-1)

# Initialize and prepare models
# (input_dim=1: one feature per time step), plus the optimizer and loss
transformer_model = TransformerModel(input_dim=1)
optimizer = optim.Adam(transformer_model.parameters(), lr=0.001)
criterion = nn.MSELoss()

# Convert to PyTorch tensors
# unsqueeze(-1) appends a trailing feature axis of size 1 to the scaled inputs
X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32).unsqueeze(-1)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float32).unsqueeze(-1)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

# DataLoader setup
# Mini-batches of 64 training samples, reshuffled every epoch
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

# Training function for the Transformer model
def train_model(model, train_loader, optimizer, criterion, epochs=100):
    """Fit ``model`` with mini-batch gradient steps and report the loss curve.

    Returns a list with, for every epoch, the sum of the batch losses divided
    by the number of batches in ``train_loader``.
    """
    model.train()
    history = []
    for _ in range(epochs):
        batch_losses = []
        for features, labels in train_loader:
            optimizer.zero_grad()
            step_loss = criterion(model(features), labels)
            step_loss.backward()
            optimizer.step()
            batch_losses.append(step_loss.item())
        history.append(sum(batch_losses) / len(train_loader))
    return history

# Train the Transformer model
# (default epochs=100); the returned per-epoch mean losses are kept in
# transformer_losses
transformer_losses = train_model(transformer_model, train_loader, optimizer, criterion)

# Train Linear Regression model
# on the scaled windows, one column per time step of the window
linear_model = LinearRegression().fit(X_train_scaled.reshape(-1, window_size), y_train)

# Ensemble prediction function
def weighted_ensemble(transformer_model, linear_model, X_test_tensor, X_test_scaled, transformer_weight=0.70):
    """Return the weighted sum of both models' predictions.

    ``transformer_weight`` scales the Transformer output; the linear model
    receives the complementary weight ``1 - transformer_weight``. Inference on
    the Transformer runs in eval mode with gradients disabled.
    """
    transformer_model.eval()
    with torch.no_grad():
        network_output = transformer_model(X_test_tensor)
    weighted_network = np.multiply(network_output.numpy(), transformer_weight)
    weighted_linear = np.multiply(linear_model.predict(X_test_scaled), 1 - transformer_weight)
    return np.add(weighted_network, weighted_linear)

# mean_absolute_error is imported here because this cell's import list only
# brings in mean_squared_error.
from sklearn.metrics import mean_absolute_error

# Generate ensemble predictions
# (default transformer_weight=0.70, so the linear model contributes 0.30)
ensemble_preds = weighted_ensemble(transformer_model, linear_model, X_test_tensor, X_test_scaled)

# Evaluate performance
# The MSE is computed once and exposed under both names used by this cell
mse = mean_squared_error(y_test, ensemble_preds)
ensemble_mse = mse
print(f'Ensemble MSE: {ensemble_mse}')



# Calculate performance metrics
mae = mean_absolute_error(y_test, ensemble_preds)
rmse = np.sqrt(mse)
# Mean absolute percentage error, expressed in percent
mape = np.mean(np.abs((y_test - ensemble_preds) / y_test)) * 100

print(f"Test MSE: {mse}")
print(f"Test MAE: {mae}")
print(f"Test RMSE: {rmse}")
print(f"Test MAPE: {mape}")


# Plot results
# The x-axis is the sample position inside the test set
plt.figure(figsize=(10, 5))
plt.plot(y_test, label='Actual Values', alpha=0.7)
plt.plot(ensemble_preds, label='Ensemble Predictions', alpha=0.7)
plt.xlabel('Time')
plt.ylabel('Stock Price')
plt.title('Actual Prices vs. Ensemble Predictions')
plt.legend()
plt.show()


### Stacked Transformer and Linear Regression - Stacked Method

In [13]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import pandas as pd
import yfinance as yf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, mean_absolute_error
import matplotlib.pyplot as plt

# Fetch data
data = yf.download('AAME', start='2013-01-01', end='2023-01-01')
# Inputs are the daily Open/High/Low columns; the target is the Close column
features = data[['Open', 'High', 'Low']]
# squeeze() turns a one-column DataFrame into a Series (some yfinance releases
# return MultiIndex columns even for a single ticker) and leaves a Series
# untouched, so each target value is a scalar rather than a length-1 row.
target = data['Close'].squeeze()

# Prepare data
def create_sequences(features, targets, window):
    """Turn aligned feature/target tables into windowed samples.

    Sample ``i`` holds rows ``i .. i + window - 1`` of ``features`` (as a 2-D
    array) and is labelled with row ``i + window`` of ``targets``. Both inputs
    are accessed positionally through ``.iloc``.
    """
    starts = range(len(features) - window)
    windows = [features.iloc[first:(first + window)].values for first in starts]
    labels = [targets.iloc[first + window] for first in starts]
    return np.array(windows), np.array(labels)

X, y = create_sequences(features, target, window=10)
# Chronological 70/15/15 train/validation/test split. The windows overlap, so
# shuffling would leak later prices into the training inputs; shuffle=False
# keeps validation after training and test after validation.
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, shuffle=False)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, shuffle=False)

# Scale per feature column: windows are flattened to rows for the scaler and
# reshaped back afterwards. Statistics come from the training portion only.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train.reshape(-1, X_train.shape[-1])).reshape(X_train.shape)
X_val_scaled = scaler.transform(X_val.reshape(-1, X_val.shape[-1])).reshape(X_val.shape)
X_test_scaled = scaler.transform(X_test.reshape(-1, X_test.shape[-1])).reshape(X_test.shape)

# Flatten for Linear Regression (one row per sample, window * features columns)
X_train_flat = X_train_scaled.reshape(-1, X_train.shape[1]*X_train.shape[2])
X_val_flat = X_val_scaled.reshape(-1, X_val.shape[1]*X_val.shape[2])
X_test_flat = X_test_scaled.reshape(-1, X_test.shape[1]*X_test.shape[2])

# Linear Regression Model
linear_model = LinearRegression()
linear_model.fit(X_train_flat, y_train)
# Validation predictions become one input column of the meta-model
val_predictions_linear = linear_model.predict(X_val_flat)

# Transformer Model
class TransformerModel(nn.Module):
    """Transformer encoder regressor over multi-feature windows.

    ``num_features`` values per time step are projected to ``input_dim``
    channels, passed through ``num_layers`` encoder layers, and the last time
    step is mapped to a single output value per sample.
    """

    def __init__(self, input_dim, num_features, num_layers=1, num_heads=1, ffn_hid_dim=128):
        super().__init__()
        self.pos_encoder = nn.Linear(num_features, input_dim)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=input_dim,
            nhead=num_heads,
            dim_feedforward=ffn_hid_dim,
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc_out = nn.Linear(input_dim, 1)

    def forward(self, x):
        # Project the features and scale by sqrt of the embedding width
        embedded = self.pos_encoder(x)
        embedded = embedded * np.sqrt(self.pos_encoder.out_features)
        # The encoder expects [seq_len, batch, channels]; swap back afterwards
        encoded = self.transformer_encoder(embedded.transpose(0, 1)).transpose(0, 1)
        final_step = encoded[:, -1, :]
        return self.fc_out(final_step).squeeze(-1)

# 64-channel embedding over the three input features (Open, High, Low)
transformer_model = TransformerModel(input_dim=64, num_features=3)
optimizer = optim.Adam(transformer_model.parameters(), lr=0.001)
criterion = nn.MSELoss()

# DataLoader
# Mini-batches of 64 scaled training windows, reshuffled every epoch
train_dataset = TensorDataset(torch.tensor(X_train_scaled, dtype=torch.float32), torch.tensor(y_train, dtype=torch.float32))
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

def train_transformer(model, train_loader, optimizer, criterion, epochs=100):
    """Train ``model`` in place for ``epochs`` passes and return it.

    Prints a completion message once all epochs have finished.
    """
    def optimisation_step(batch_inputs, batch_targets):
        # Single gradient update on one mini-batch.
        optimizer.zero_grad()
        criterion(model(batch_inputs), batch_targets).backward()
        optimizer.step()

    model.train()
    for _ in range(epochs):
        for batch_inputs, batch_targets in train_loader:
            optimisation_step(batch_inputs, batch_targets)
    print("Transformer training complete.")
    return model

# Train Transformer
train_transformer(transformer_model, train_loader, optimizer, criterion)

# Validation predictions from Transformer
# (eval mode and no_grad: inference only)
transformer_model.eval()
with torch.no_grad():
    val_predictions_transformer = transformer_model(torch.tensor(X_val_scaled, dtype=torch.float32)).numpy()

# Train meta-model (Linear Regression)
# Its two input columns are the base models' validation predictions, so it is
# fitted on data the base models were not trained on
meta_model = LinearRegression()
X_val_meta = np.column_stack((val_predictions_transformer, val_predictions_linear))
meta_model.fit(X_val_meta, y_val)

# Test predictions from Transformer and Linear Regression
with torch.no_grad():
    test_predictions_transformer = transformer_model(torch.tensor(X_test_scaled, dtype=torch.float32)).numpy()
test_predictions_linear = linear_model.predict(X_test_flat)

# Test meta predictions
# Column order must match the order used when fitting the meta-model
X_test_meta = np.column_stack((test_predictions_transformer, test_predictions_linear))
final_predictions = meta_model.predict(X_test_meta)

# Evaluation
mse = mean_squared_error(y_test, final_predictions)
mae = mean_absolute_error(y_test, final_predictions)
rmse = np.sqrt(mse)
# Mean absolute percentage error, expressed in percent
mape = np.mean(np.abs((y_test - final_predictions) / y_test)) * 100

print(f'Final Ensemble MSE: {mse}, MAE: {mae}, RMSE: {rmse}, MAPE: {mape}%')

# Plotting
# The x-axis is the sample position inside the test set
plt.figure(figsize=(12, 6))
plt.plot(y_test, label='Actual Prices', alpha=0.7)
plt.plot(final_predictions, label='Predicted Prices', alpha=0.7)
plt.title('Test Data: Actual vs. Predicted Prices by Stacked Ensemble')
plt.xlabel('Index')
plt.ylabel('Price')
plt.legend()
plt.show()


## Stacked Transformer and Linear Regression - Stacking Method

In [12]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.base import BaseEstimator, RegressorMixin
import numpy as np
import pandas as pd
import yfinance as yf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import StackingRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error
import matplotlib.pyplot as plt

# Download data
data = yf.download('AAME', start='2013-01-01', end='2023-01-01')
# Inputs are the daily Open/High/Low columns; the target is the Close column
features = data[['Open', 'High', 'Low']]
# squeeze() turns a one-column DataFrame into a Series (some yfinance releases
# return MultiIndex columns even for a single ticker) and leaves a Series
# untouched, so each target value is a scalar rather than a length-1 row.
target = data['Close'].squeeze()

# Prepare data
def create_sequences(features, targets, window=10):
    """Build flattened windowed samples for scikit-learn style estimators.

    Each sample is ``window`` consecutive rows of ``features`` flattened into
    one vector; its label is the ``targets`` row right after the window. Both
    inputs are accessed positionally through ``.iloc``.
    """
    samples = [
        (features.iloc[first:(first + window)].values.flatten(), targets.iloc[first + window])
        for first in range(len(features) - window)
    ]
    flat_windows = [vector for vector, _ in samples]
    labels = [label for _, label in samples]
    return np.array(flat_windows), np.array(labels)

X, y = create_sequences(features, target, window=10)
# Chronological hold-out: consecutive windows overlap, so a shuffled split
# would place test-period prices inside training inputs (look-ahead leakage).
# shuffle=False keeps the last 20% of the samples as the test set, in order.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

# Scale features; the scaler statistics come from the training portion only
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Transformer Model wrapped for use in scikit-learn
class SklearnTransformerWrapper(BaseEstimator, RegressorMixin):
    """scikit-learn style regressor wrapped around a PyTorch module.

    Args:
        model: ``nn.Module`` called on a float32 tensor built from ``X``; it
            is expected to return one value per input row.
        epochs: Number of passes over the training data made by ``fit``.
        lr: Learning rate of the Adam optimizer used by ``fit``.
    """

    def __init__(self, model, epochs=100, lr=0.001):
        self.model = model
        self.epochs = epochs
        self.lr = lr
        self.criterion = nn.MSELoss()
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.lr)

    def fit(self, X, y):
        """Train the wrapped model on ``(X, y)`` and return ``self``."""
        # Rebuild the optimizer on every fit: an ``lr`` changed through
        # ``set_params`` after construction would otherwise be ignored, and
        # Adam's running moments would carry over from an earlier fit.
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.lr)
        X_tensor = torch.tensor(X, dtype=torch.float32)
        # Flatten the targets so they line up one-to-one with the flattened
        # outputs below instead of broadcasting inside the loss.
        y_tensor = torch.tensor(y, dtype=torch.float32).reshape(-1)
        dataset = TensorDataset(X_tensor, y_tensor)
        loader = DataLoader(dataset, batch_size=64, shuffle=True)
        self.model.train()
        for _ in range(self.epochs):
            for inputs, targets in loader:
                self.optimizer.zero_grad()
                outputs = self.model(inputs)
                loss = self.criterion(outputs.view(-1), targets)
                loss.backward()
                self.optimizer.step()
        return self

    def predict(self, X):
        """Return the model's predictions for ``X`` as a NumPy array."""
        self.model.eval()
        with torch.no_grad():
            X_tensor = torch.tensor(X, dtype=torch.float32)
            predictions = self.model(X_tensor)
            return predictions.numpy()

class TransformerModel(nn.Module):
    """Transformer encoder regressor that accepts flattened windows.

    Args:
        input_dim: Width of the internal embedding (``d_model``).
        num_features: Number of features per time step.
        num_layers: Number of stacked encoder layers.
        num_heads: Attention heads per layer.
        ffn_hid_dim: Hidden width of each layer's feed-forward sub-network.
        seq_len: Number of time steps packed into every flattened input row.

    ``forward`` takes ``[batch_size, seq_len * num_features]`` and returns a
    tensor of shape ``[batch_size]``.
    """

    def __init__(self, input_dim, num_features, num_layers=1, num_heads=1, ffn_hid_dim=128, seq_len=10):
        super().__init__()
        # Window length used to un-flatten the inputs (previously fixed at 10)
        self.seq_len = seq_len
        self.pos_encoder = nn.Linear(num_features, input_dim)
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=input_dim, nhead=num_heads, dim_feedforward=ffn_hid_dim),
            num_layers=num_layers)
        self.fc_out = nn.Linear(input_dim, 1)

    def forward(self, x):
        # Restore [batch, seq_len, features] from the flattened rows; reshape
        # (unlike view) also handles non-contiguous inputs.
        x = x.reshape(-1, self.seq_len, x.size(1) // self.seq_len)
        x = self.pos_encoder(x)
        x = x * np.sqrt(self.pos_encoder.out_features)  # Scale embedding
        x = x.permute(1, 0, 2)  # [seq_len, batch, channels] for the encoder
        x = self.transformer_encoder(x)
        x = x.permute(1, 0, 2)  # back to [batch, seq_len, channels]
        # Regress from the representation of the final time step only
        return self.fc_out(x[:, -1, :]).squeeze(-1)

# Rows of X_train are flattened 10-step windows, so columns // 10 is the
# number of features per time step
transformer = TransformerModel(input_dim=64, num_features=X_train.shape[1] // 10)
wrapped_transformer = SklearnTransformerWrapper(transformer)

# Linear Regression Model
linear_model = LinearRegression()

# Stacking Regressor
# Base models: linear regression and the wrapped Transformer; a second linear
# regression combines their 5-fold cross-validated predictions
stacking_regressor = StackingRegressor(
    estimators=[('lr', linear_model), ('transformer', wrapped_transformer)],
    final_estimator=LinearRegression(),
    cv=5
)

# Train the Stacking Regressor
stacking_regressor.fit(X_train_scaled, y_train)

# Predict and evaluate
y_pred = stacking_regressor.predict(X_test_scaled)
mse = mean_squared_error(y_test, y_pred)
mae = mean_absolute_error(y_test, y_pred)
rmse = np.sqrt(mse)
# Mean absolute percentage error, expressed in percent
mape = np.mean(np.abs((y_test - y_pred) / y_test)) * 100

print(f'Final Ensemble MSE: {mse}, MAE: {mae}, RMSE: {rmse}, MAPE: {mape}%')

# Plot results
# The x-axis is the sample position inside the test set
plt.figure(figsize=(12, 6))
plt.plot(y_test, label='Actual Prices', alpha=0.7)
plt.plot(y_pred, label='Predicted Prices', alpha=0.7)
plt.title('Test Data: Actual vs. Predicted Prices by Stacked Ensemble')
plt.xlabel('Index')
plt.ylabel('Price')
plt.legend()
plt.show()


## Hyperparameter tuning -  Bayesian 

In [6]:
from skopt import BayesSearchCV

# Define the parameter search space
param_search_space = {
    'final_estimator__fit_intercept': [True, False],
    'transformer__epochs': (5, 20),  # search space for number of epochs
    'transformer__lr': (1e-5, 1e-1, 'log-uniform'),  # search space for learning rate
}

# Perform Bayesian Optimization hyperparameter search
# (20 sampled configurations, each scored by 5-fold cross-validated MAPE)
opt = BayesSearchCV(stacking_regressor, param_search_space, cv=5, n_iter=20, scoring='neg_mean_absolute_percentage_error', n_jobs=-1)
opt.fit(X_train_scaled, y_train)

# Get the best parameters
best_params = opt.best_params_
# With the default refit=True the search has already refitted the best
# configuration on the full training set, so no further fit is needed.
best_estimator = opt.best_estimator_

# Predict with the best model
y_pred_tuned = best_estimator.predict(X_test_scaled)

# Evaluate the tuned model
mape_tuned = np.mean(np.abs((y_test - y_pred_tuned) / y_test)) * 100
print(f'Tuned Stacked Ensemble MAPE: {mape_tuned}%')


## Hyperparameter tuning - GridSearchCV

In [5]:
from sklearn.model_selection import GridSearchCV

# Define the parameter grid
# Every entry lists the discrete values to try; the grid is their full
# cross-product (2 x 2 x 3 = 12 configurations).
param_search_space = {
    'final_estimator__fit_intercept': [True, False],
    'transformer__epochs': [5, 20],  # candidate numbers of epochs
    'transformer__lr': [0.001, 0.01, 0.1]  # candidate learning rates
}


# Perform Grid Search with cross-validation
grid_search = GridSearchCV(stacking_regressor, param_search_space, cv=5, scoring='neg_mean_absolute_percentage_error')
grid_search.fit(X_train_scaled, y_train)

# Get the best parameters
best_params = grid_search.best_params_
# With the default refit=True the search has already refitted the best
# configuration on the full training set, so no further fit is needed.
best_estimator = grid_search.best_estimator_

# Predict with the best model
y_pred_tuned = best_estimator.predict(X_test_scaled)

# Evaluate the tuned model
mape_tuned = np.mean(np.abs((y_test - y_pred_tuned) / y_test)) * 100
print(f'Tuned Stacked Ensemble MAPE: {mape_tuned}%')


## Hyperparameter tuning - RandomizedSearchCV

In [4]:
from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import uniform, randint

# loguniform is imported here because this cell's import line only brings in
# uniform and randint.
from scipy.stats import loguniform

# Define the parameter search space
param_distributions = {
    'final_estimator__fit_intercept': [True, False],
    # randint's upper bound is exclusive, so 21 makes the range 5..20 inclusive
    'transformer__epochs': randint(5, 21),  # Search space for number of epochs
    # Learning rates span several orders of magnitude, so sample them on a log
    # scale. scipy's uniform(a, b) takes (loc, scale), not (low, high), and
    # samples linearly, so it would almost never try small learning rates.
    'transformer__lr': loguniform(1e-5, 1e-1)  # Search space for learning rate
}

# Perform Randomized Search hyperparameter search
random_search = RandomizedSearchCV(
    stacking_regressor,
    param_distributions,
    n_iter=20,
    cv=5,
    scoring='neg_mean_absolute_percentage_error',
    random_state=42,
    n_jobs=-1
)
random_search.fit(X_train_scaled, y_train)

# Get the best parameters
best_params = random_search.best_params_
# With the default refit=True the search has already refitted the best
# configuration on the full training set, so no further fit is needed.
best_estimator = random_search.best_estimator_

# Predict with the best model
y_pred_tuned = best_estimator.predict(X_test_scaled)

# Evaluate the tuned model
mape_tuned = np.mean(np.abs((y_test - y_pred_tuned) / y_test)) * 100
print(f'Tuned Stacked Ensemble MAPE: {mape_tuned}%')


## Hyperparameter Optimization - optuna

In [1]:
import optuna
from sklearn.model_selection import cross_val_score

# Define the objective function to optimize
def objective(trial):
    """Optuna objective: mean 5-fold cross-validated MAPE of the stacked model.

    Samples the final estimator's ``fit_intercept`` flag, the Transformer's
    epoch count and its learning rate, applies them to the module-level
    ``stacking_regressor`` (which is modified in place), and returns the mean
    score to be minimised.
    """
    # Define the search space for hyperparameters
    final_estimator_fit_intercept = trial.suggest_categorical('final_estimator__fit_intercept', [True, False])
    transformer_epochs = trial.suggest_int('transformer__epochs', 5, 20)
    # suggest_float(..., log=True) replaces the deprecated suggest_loguniform
    # and samples the same log-scaled range.
    transformer_lr = trial.suggest_float('transformer__lr', 1e-5, 1e-1, log=True)

    # Update the stacking regressor with the suggested hyperparameters
    stacking_regressor.set_params(
        final_estimator__fit_intercept=final_estimator_fit_intercept,
        transformer__epochs=transformer_epochs,
        transformer__lr=transformer_lr
    )

    # Perform cross-validation; the scorer is negated MAPE, so flip the sign
    scores = -cross_val_score(stacking_regressor, X_train_scaled, y_train, cv=5, scoring='neg_mean_absolute_percentage_error')

    # Return the mean of the scores
    return scores.mean()

# Create a study object and optimize the objective function
# (the objective returns a MAPE, hence direction='minimize')
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=20)

# Get the best parameters
# The parameter names match the estimator's set_params keys, so they can be
# applied directly. set_params returns the same stacking_regressor object.
best_params = study.best_params
best_estimator = stacking_regressor.set_params(**best_params)

# Train the model with the best parameters
best_estimator.fit(X_train_scaled, y_train)

# Predict with the best model
y_pred_tuned = best_estimator.predict(X_test_scaled)

# Evaluate the tuned model
mape_tuned = np.mean(np.abs((y_test - y_pred_tuned) / y_test)) * 100
print(f'Tuned Stacked Ensemble MAPE: {mape_tuned}%')
