In [None]:
# Import necessary libraries
import yfinance as yf
import torch
import torch.nn as nn
import math
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, explained_variance_score, mean_absolute_percentage_error
import ta # Using the 'ta' library for more features
from datetime import datetime

# ===== 0. Set Random Seeds for Reproducibility =====
# This is a critical step to ensure that the results are the same every time the code is run.
seed_value = 42
np.random.seed(seed_value)
torch.manual_seed(seed_value)
if torch.cuda.is_available():
    torch.cuda.manual_seed(seed_value)
    torch.cuda.manual_seed_all(seed_value) # if you are using multi-GPU.
    # The two lines below are often needed for full reproducibility on GPUs
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# ===== 1. Download Stock Data =====
ticker = input("Enter stock ticker (e.g., TSLA): ").upper()
start_date = datetime(2015, 1, 1)
end_date = datetime(2024, 12, 31)

print(f"Downloading {ticker} data from {start_date.date()} to {end_date.date()}...")
data = yf.download(ticker, start=start_date, end=end_date)
# Flatten columns if yfinance returns a MultiIndex
if isinstance(data.columns, pd.MultiIndex):
    data.columns = data.columns.get_level_values(0)
data = data[['Open', 'High', 'Low', 'Close', 'Volume']].dropna()
print("Data download complete.")


# ===== 2. Outlier Detection and Removal using Rolling Z-Score =====
# This robust method helps prevent the model from being skewed by extreme, one-off events.
print("Identifying and removing outliers...")
data['Daily_Return'] = data['Close'].pct_change()
window = 252 # Approximately one year of trading days
rolling_mean = data['Daily_Return'].rolling(window=window).mean()
rolling_std = data['Daily_Return'].rolling(window=window).std()
data['Z_Score'] = (data['Daily_Return'] - rolling_mean) / rolling_std
threshold = 3.0
outliers = data[data['Z_Score'].abs() > threshold]

if not outliers.empty:
    print(f"Found and removed {len(outliers)} outlier(s).")
    data = data.drop(outliers.index)
else:
    print("No significant outliers found.")

data = data.drop(columns=['Daily_Return', 'Z_Score'])
print("Outlier removal complete.")


# ===== 3. Feature Engineering (using 'ta' library) =====
# Adding a rich set of technical indicators to give the model more context.
print("Adding advanced technical indicators...")
data['sma_10'] = ta.trend.SMAIndicator(close=data['Close'], window=10).sma_indicator()
data['ema_10'] = ta.trend.EMAIndicator(close=data['Close'], window=10).ema_indicator()
data['rsi'] = ta.momentum.RSIIndicator(close=data['Close'], window=14).rsi()
macd = ta.trend.MACD(close=data['Close'])
data['macd'] = macd.macd()
data['macd_signal'] = macd.macd_signal()
bollinger = ta.volatility.BollingerBands(close=data['Close'])
data['bb_mavg'] = bollinger.bollinger_mavg()
data['bb_hband'] = bollinger.bollinger_hband()
data['bb_lband'] = bollinger.bollinger_lband()

# Add S&P 500 Market Context
print("Adding market context (S&P 500)...")
spy_data = yf.download('SPY', start=start_date, end=end_date)
if isinstance(spy_data.columns, pd.MultiIndex):
    spy_data.columns = spy_data.columns.get_level_values(0)
spy_data['SPY_Return'] = spy_data['Close'].pct_change()
data = data.join(spy_data['SPY_Return'])

# Add time-based features
data['DayOfWeek'] = data.index.dayofweek
data['IsMonthStart'] = data.index.is_month_start.astype(int)
data['IsMonthEnd'] = data.index.is_month_end.astype(int)
data = pd.get_dummies(data, columns=['DayOfWeek'], prefix='DOW')

data.dropna(inplace=True)
print("Feature engineering complete.")

# Find the index of 'Close' column BEFORE scaling
close_price_index = data.columns.get_loc('Close')


# ===== 4. Scale Data =====
# We use two scalers: one for all data and one specifically for the 'Close' price.
# This makes reversing the scaling on our predictions much more accurate.
print("Scaling data...")
scaler = MinMaxScaler()
scaled_data = scaler.fit_transform(data)

close_price_scaler = MinMaxScaler()
close_price_scaler.fit(data[['Close']])
print("Data scaling complete.")


# ===== 5. Sequence Creation =====
def create_sequences(data, seq_len, close_price_idx):
    X, y = [], []
    for i in range(seq_len, len(data)):
        X.append(data[i-seq_len:i])
        y.append(data[i, close_price_idx]) # The target is the 'Close' price
    return np.array(X), np.array(y)

seq_len = 60 # Using a 60-day window to predict the next day
X, y = create_sequences(scaled_data, seq_len, close_price_index)
print(f"Created {len(X)} sequences with a window size of {seq_len} days.")


# ===== 6. Data Splitting (70/30) =====
n_total = len(X)
train_size = int(0.7 * n_total)
X_train, y_train = X[:train_size], y[:train_size]
X_test, y_test = X[train_size:], y[train_size:]

train_dataset = TensorDataset(torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.float32))
test_dataset = TensorDataset(torch.tensor(X_test, dtype=torch.float32), torch.tensor(y_test, dtype=torch.float32))

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True) # Shuffle training data
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
print(f"Data split into {len(X_train)} training samples and {len(X_test)} testing samples.")


# ===== 7. Model Definition: LSTM + Transformer =====
# This is the hybrid model from your first script.
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        return x + self.pe[:, :x.size(1)]

class LSTMTransformer(nn.Module):
    def __init__(self, input_dim, model_dim=64, lstm_hidden=64, n_heads=4, num_layers=2):
        super().__init__()
        # Project input features into the model's dimension
        self.input_proj = nn.Linear(input_dim, model_dim)
        # LSTM layer to process sequences
        self.lstm = nn.LSTM(model_dim, lstm_hidden, batch_first=True, bidirectional=False)
        # Positional encoding to give the Transformer time-context
        self.pos_enc = PositionalEncoding(lstm_hidden)
        # Transformer Encoder to weigh the importance of different time steps
        encoder_layer = nn.TransformerEncoderLayer(d_model=lstm_hidden, nhead=n_heads, batch_first=True, activation='relu')
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        # Final fully-connected layer to produce the output
        self.fc = nn.Linear(lstm_hidden, 1)

    def forward(self, x):
        x = self.input_proj(x)
        x, _ = self.lstm(x) # LSTM processes the sequence
        x = self.pos_enc(x) # Add positional info
        x = self.transformer(x) # Transformer finds complex relationships
        # We take the output from the last time step for prediction
        out = self.fc(x[:, -1, :])
        return out


# ===== 8. Model Training =====
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

model = LSTMTransformer(input_dim=X.shape[2]).to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001) # A smaller learning rate is often better for Adam

epochs = 50
print(f"Starting training for {epochs} epochs...")

for epoch in range(epochs):
    model.train()
    total_train_loss = 0
    for xb, yb in train_loader:
        xb, yb = xb.to(device), yb.to(device).unsqueeze(1)
        optimizer.zero_grad()
        pred = model(xb)
        loss = criterion(pred, yb)
        loss.backward()
        optimizer.step()
        total_train_loss += loss.item()
    avg_train_loss = total_train_loss / len(train_loader)
    print(f"Epoch {epoch+1:2d}/{epochs} | Train Loss: {avg_train_loss:.5f}")

print("Training complete.")


# ===== 9. Evaluation on Test Set =====
model.eval()
preds, actuals = [], []
with torch.no_grad():
    for xb, yb in test_loader:
        xb = xb.to(device)
        pred = model(xb).cpu().numpy()
        preds.extend(pred.flatten())
        actuals.extend(yb.numpy().flatten())


# ===== 10. Inverse Scaling using the dedicated 'close_price_scaler' =====
# This provides a more accurate representation of the real price values.
preds_reshaped = np.array(preds).reshape(-1, 1)
actuals_reshaped = np.array(actuals).reshape(-1, 1)

pred_unscaled = close_price_scaler.inverse_transform(preds_reshaped).flatten()
actual_unscaled = close_price_scaler.inverse_transform(actuals_reshaped).flatten()


# ===== 11. Report Metrics & Plot Results (Expanded) =====
print("\n--- Model Evaluation on Test Data ---")
r2 = r2_score(actual_unscaled, pred_unscaled)
explained_variance = explained_variance_score(actual_unscaled, pred_unscaled)
mae = mean_absolute_error(actual_unscaled, pred_unscaled)
rmse = np.sqrt(mean_squared_error(actual_unscaled, pred_unscaled))
mape = mean_absolute_percentage_error(actual_unscaled, pred_unscaled) * 100
# Symmetric MAPE is useful as it's bounded and fair if actuals or preds are zero.
smape_numerator = np.abs(pred_unscaled - actual_unscaled)
smape_denominator = (np.abs(actual_unscaled) + np.abs(pred_unscaled)) / 2
smape_mask = smape_denominator != 0
smape = np.mean(smape_numerator[smape_mask] / smape_denominator[smape_mask]) * 100

print(f"\n--- Evaluation for {ticker} ---")
print(f"  Goodness of Fit:")
print(f"    R-squared (R²):               {r2:.4f}")
print(f"    Explained Variance:           {explained_variance:.4f}")
print(f"\n  Average Error:")
print(f"    Mean Absolute Error (MAE):    {mae:.4f} (Error in $)")
print(f"    Root Mean Squared Error (RMSE): {rmse:.4f} (Error in $)")
print(f"    Mean Absolute % Error (MAPE): {mape:.2f}%")
print(f"    Symmetric MAPE (SMAPE):       {smape:.2f}%")

plt.style.use('seaborn-v0_8-whitegrid')
plt.figure(figsize=(15, 7))
plt.plot(actual_unscaled, label='Actual Close Price', color='blue', alpha=0.9)
plt.plot(pred_unscaled, label='Predicted Close Price', color='red', linestyle='--', alpha=0.8)
plt.title(f"{ticker} Stock Price Prediction (LSTM-Transformer)", fontsize=16)
plt.xlabel("Time (Test Set Days)", fontsize=12)
plt.ylabel("Close Price (USD)", fontsize=12)
plt.legend(fontsize=12)
plt.grid(True)
plt.tight_layout()
plt.show()
