# Financial Time Series Forecasting

A sophisticated ensemble forecasting system combining SARIMA and LSTM models for financial time series prediction.

## 🔍 Overview

This project implements a hybrid forecasting system that combines statistical (SARIMA) and deep learning (LSTM) approaches for financial time series prediction. The system includes data preprocessing, visualization, and ensemble prediction capabilities.

## ✨ Features

- Data preprocessing and feature engineering
- Interactive visualizations
- SARIMA model with hyperparameter optimization
- Bidirectional LSTM with configurable architecture
- Ensemble forecasting
- Comprehensive evaluation metrics


In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import yfinance as yf
import multiprocessing as mp
from itertools import product
from tqdm.auto import tqdm
from typing import Any, Dict, List, Tuple, Optional
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    mean_squared_error,
    mean_absolute_error,
    mean_absolute_percentage_error,
)
import scipy.stats as stats
import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.statespace.sarimax import SARIMAX
import optuna
import logging
import abc
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

## Configuration


In [None]:
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s: %(message)s",
    handlers=[logging.StreamHandler(), logging.FileHandler("sarima_lstm.log")],
)
logger = logging.getLogger(__name__)

In [None]:
np.random.seed(42)
torch.manual_seed(42)

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

## Data Preprocessing


In [None]:
class DataProcessor:
    """
    Data processor designed to fetch and process financial data from
    yahoo financial.

    Attributes:
        tickers (List[str]): List of tickers to download the data from
        start_date (str): Start date of the tickers
        end_date (str): End date of the tickers
        raw_data (dict): Raw data of the financial data
        processed_data (dict): Processed data
    """

    def __init__(
        self,
        tickers: List[str] = ["AAPL"],
        start_date: str = "2010-01-01",
        end_date: str = "2024-01-01",
    ):
        self.tickers = tickers
        self.start_date = start_date
        self.end_date = end_date
        self.raw_data, self.processed_data = {}, {}

    def fetch_market_data(self) -> Dict[str, pd.DataFrame]:
        """
        Fetches market data for the specified tickers

        Returns:
            Dict[str, pd.DataFrame]: A dictionary containing the raw data for each ticker
        """

        for ticker in tqdm(self.tickers, desc="Fetching data"):
            try:
                data = yf.download(ticker, start=self.start_date, end=self.end_date)

                if data.empty:
                    logger.warning(f"No data found for {ticker}")
                    continue

                if len(data) < 100:
                    logger.warning(
                        f"Limited data for {ticker}: only {len(data)} records"
                    )

                self.raw_data[ticker] = data

            except Exception as e:
                logger.error(f"Error fetching data for {ticker}: {e}")

        return self.raw_data

    def preprocess_data(self, target_column: str = "Close") -> Dict[str, dict]:
        """
        Data preprocessing pipeline with feature engineering and scaling. It applies the following transformations:

        - Log returns
        - Percentage returns
        - Moving averages (20 and 50 days)
        - Bollinger bands (20 days)
        - Min-max scaling
        - Standard scaling
        - Metadata (volatility, skewness, kurtosis)

        Args:
            target_column (str, optional): The column to predict. Defaults to "Close".

        Returns:
            Dict[str, dict]: A dictionary containing the processed data for each ticker
        """
        for ticker, data in self.raw_data.items():
            try:
                log_returns = np.log(
                    data[target_column] / data[target_column].shift(1)
                ).dropna()
                pct_returns = data[target_column].pct_change().dropna()

                ma_20 = data[target_column].rolling(window=20).mean()
                ma_50 = data[target_column].rolling(window=50).mean()
                std_20 = data[target_column].rolling(window=20).std()

                processed = {
                    "raw": data[target_column],
                    "log_returns": log_returns,
                    "pct_returns": pct_returns,
                    "MA_20": ma_20,
                    "MA_50": ma_50,
                    "bollinger_high": ma_20 + 2 * std_20,
                    "bollinger_low": ma_20 - 2 * std_20,
                }

                scaler_minmax = MinMaxScaler()
                scaler_standard = StandardScaler()

                processed["scaled_minmax"] = scaler_minmax.fit_transform(
                    data[target_column].values.reshape(-1, 1)
                )
                processed["scaled_standard"] = scaler_standard.fit_transform(
                    data[target_column].values.reshape(-1, 1)
                )

                processed["metadata"] = {
                    "volatility": log_returns.std(),
                    "skewness": log_returns.skew(),
                    "kurtosis": log_returns.kurtosis(),
                }

                self.processed_data[ticker] = processed

            except Exception as e:
                logger.error(f"Error preprocessing data: {e}")

        return self.processed_data

    @staticmethod
    def create_lagged_method(data: pd.Series, lags: int = 5) -> pd.DataFrame:
        """
        Creates lagged features for time series data

        Args:
            data (pd.Series): Time series data
            lags (int, optional): Number of lags to create. Defaults to 5.

        Returns:
            pd.DataFrame: DataFrame with lagged features
        """

        df = pd.DataFrame(data)
        for i in range(1, lags + 1):
            df[f"lag_{i}"] = df.iloc[:, 0].shift(i)

        return df.dropna()

    @staticmethod
    def add_rolling_features(data: pd.Series) -> pd.DataFrame:
        """
        Add rolling window features to the data

        Args:
            data (pd.Series): Time series data

        Returns:
            pd.DataFrame: DataFrame with rolling window features
        """

        df = pd.DataFrame(data)
        windows = [7, 14, 30]

        for window in windows:
            df[f"rolling_mean_{window}"] = df.iloc[:, 0].rolling(window=window).mean()
            df[f"rolling_std_{window}"] = df.iloc[:, 0].rolling(window=window).std()

        return df.dropna()

## Visualizer


In [None]:
class FinancialRatiosVisualizer:
    """"""

    def __init__(self, processed_data: pd.DataFrame, raw_data: pd.DataFrame):
        self.processed_data = processed_data
        self.raw_data = raw_data

    def plot_closing_prices(self):
        """
        Visualize the closing prices of the stock
        """

        plt.figure(figsize=(15, 7))
        for ticker, data in self.raw_data.items():
            plt.plot(data.index, data["Close"], label=ticker)

        plt.title("Closing Prices Comparison")
        plt.xlabel("Date")
        plt.ylabel("Price")
        plt.legend()
        plt.tight_layout()
        plt.show()

    def plot_returns_distribution(self):
        plt.figure(figsize=(15, 7))
        for ticker, data in self.raw_data.items():
            returns = data["Close"].pct_change().dropna()
            sns.histplot(returns, kde=True, label=ticker)

        plt.title("Returns Distribution")
        plt.xlabel("Daily Returns")
        plt.legend()
        plt.tight_layout()
        plt.show()

    def plot_correlation_heatmap(self):
        plt.figure(figsize=(10, 7))
        correlation_matrix = pd.concat(
            [data["Close"] for data in self.raw_data.values()], axis=1
        ).corr()

        sns.heatmap(correlation_matrix, annot=True, cmap="coolwarm")
        plt.title("Correlation Heatmap")
        plt.tight_layout()
        plt.show()

    def plot_rolling_volatility(self):
        plt.figure(figsize=(15, 7))
        for ticker, data in self.raw_data.items():
            returns = data["Close"].pct_change().dropna()
            rolling_std = returns.rolling(window=20).std()
            plt.plot(data.index, rolling_std, label=ticker)

        plt.title("Rolling Volatility Comparison")
        plt.xlabel("Date")
        plt.ylabel("Volatility")
        plt.legend()
        plt.tight_layout()
        plt.show()

    def perform_statistical_test(self):
        print("\nStatistical Tests\n")
        for ticker, data in self.raw_data.items():
            returns = data["Close"].pct_change().dropna()
            adf_result = adfuller(returns)

            print(f"\n{ticker} Augmented Dickey-Fuller Test")
            print(f"ADF Statistic: {adf_result[0]}")
            print(f"p-value: {adf_result[1]}")

            _, p_value = stats.normaltest(returns)
            print(f"Normality Test p-value: {p_value}")

    def visualize_single_stock_preprocessing(self, ticker):
        data = self.raw_data[ticker]
        processed = self.processed_data[ticker]
        target_column = "Close"

        plt.figure(figsize=(20, 10))

        # Original vs Moving Averages
        plt.subplot(2, 2, 1)
        plt.title("Original vs Moving Averages")
        plt.plot(data.index, data[target_column], label="Original")
        plt.plot(data.index, processed["MA_20"], label="20-day MA")
        plt.plot(data.index, processed["MA_50"], label="50-day MA")
        plt.legend()

        # Bollinger Bands
        plt.subplot(2, 2, 2)
        plt.title("Bollinger Bands")
        plt.plot(data.index, data[target_column], label="Price")
        plt.plot(data.index, processed["MA_20"], label="MA 20")
        plt.plot(data.index, processed["bollinger_high"], label="Bollinger High")
        plt.plot(data.index, processed["bollinger_low"], label="Bollinger Low")
        plt.legend()

        # Log Returns Distribution
        plt.subplot(2, 2, 3)
        plt.title("Log Returns Distribution")
        sns.histplot(processed["log_returns"], kde=True)
        plt.xlabel("Log Returns")

        # Returns Scaling Comparison
        plt.subplot(2, 2, 4)
        plt.title("Returns Scaling Comparison")
        plt.plot(data.index, processed["scaled_minmax"], label="MinMax Scaled")
        plt.plot(data.index, processed["scaled_standard"], label="Standard Scaled")
        plt.legend()

        plt.tight_layout()
        plt.show()

        # ACF and PACF for Autocorrelation
        plt.figure(figsize=(15, 5))

        plt.subplot(1, 2, 1)
        plot_acf(processed["log_returns"], lags=40, ax=plt.gca())
        plt.title("Autocorrelation Function")

        plt.subplot(1, 2, 2)
        plot_pacf(processed["log_returns"], lags=40, ax=plt.gca())
        plt.title("Partial Autocorrelation Function")

        plt.tight_layout()
        plt.show()

    def run_comprehensive_visualization(self):
        self.plot_closing_prices()
        self.plot_returns_distribution()
        self.plot_correlation_heatmap()
        # self.plot_rolling_volatility()

        self.perform_statistical_test()
        for ticker in self.processed_data.keys():
            self.visualize_single_stock_preprocessing(ticker)

## Base Forecaster


In [None]:
class BaseForecaster(abc.ABC):
    def __init__(self, config: Dict[str, Any] = None):
        self.config = config or {}
        self.model = None
        self.preprocessor = None

    @abc.abstractmethod
    def preprocess(self, data: pd.Series) -> np.ndarray:
        """
        Preprocess input data for forecasting

        Args:
            data (pd.Series): Input time series data

        Returns:
            np.ndarray: Preprocessed data
        """

        pass

    @abc.abstractmethod
    def train(self, X_train: np.ndarray, y_train: np.ndarray) -> None:
        """
        Train the forecasting model

        Args:
            X_train (np.ndarray): Training features data
            y_train (np.ndarray): Training target values
        """

        pass

    @abc.abstractmethod
    def forecast(self, steps: int) -> np.ndarray:
        """
        Generate forecast for specified number of steps ahead

        Args:
            steps (int): Number of steps to forecast

        Returns:
            np.ndarray: Forecasted values
        """

        pass

    def evaluate(self, X_test: np.ndarray, y_test: np.ndarray) -> Dict[str, float]:
        """
        Evaluates the model performance on test data

        Args:
            X_test (np.ndarray): Test features data
            y_test (np.ndarray): Test target values

        Returns:
            Dict[str, float]: Evaluation metrics
        """

        predictions = self.model.predict(X_test)
        return {
            "MSE": mean_squared_error(y_test, predictions),
            "MAE": mean_absolute_error(y_test, predictions),
            "MAPE": mean_absolute_percentage_error(y_test, predictions),
        }

## SARIMA Hyperparameter Tuner


In [None]:
class HyperparameterOptimizer:
    @staticmethod
    def optimize_sarima(data: pd.Series) -> Tuple[Dict[str, int], float]:
        """
        Optimize SARIMA hyperparameters using optuna

        Args:
            data (pd.Series): Input time series data

        Returns:
            Tuple[Dict[str, int], float]: Optimal hyperparameters and best score
        """

        def objective(trial):
            p = trial.suggest_int("p", 0, 3)
            d = trial.suggest_int("d", 0, 2)
            q = trial.suggest_int("q", 0, 3)
            P = trial.suggest_int("P", 0, 2)
            D = trial.suggest_int("D", 0, 1)
            Q = trial.suggest_int("Q", 0, 2)
            m = trial.suggest_categorical("m", [4, 12])

            try:
                model = SARIMAX(data, order=(p, d, q), seasonal_order=(P, D, Q, m))

                results = model.fit(disp=False)
                return results.aic

            except Exception as e:
                return np.inf

        study = optuna.create_study(direction="minimize")
        study.optimize(objective, n_trials=100)

        return study.best_params, study.best_value

## SARIMA Forecaster


In [None]:
class SARIMAForecaster(BaseForecaster):
    def preprocess(self, data):
        return data.values

    def train(self, X_train: np.ndarray, y_train: np.ndarray):
        """
        Train SARIMA model using the optimal hyperparameters

        Args:
            X_train (np.ndarray): Training features data
            y_train (np.ndarray): Training target values

        Returns:
            SARIMAForecaster: Trained SARIMA model
        """

        best_params, _ = HyperparameterOptimizer.optimize_sarima(
            pd.Series(y_train, name="target")
        )

        self.model = SARIMAX(
            y_train,
            order=(best_params["p"], best_params["d"], best_params["q"]),
            seasonal_order=(
                best_params["P"],
                best_params["D"],
                best_params["Q"],
                best_params["m"],
            ),
        ).fit()

    def forecast(self, steps: int) -> np.ndarray:
        """
        Forecast future values using SARIMA model

        Args:
            steps (int): Number of steps to forecast

        Returns:
            np.ndarray: Forecasted values
        """

        return self.model.forecast(steps)

## LSTM


In [None]:
class LSTM(nn.Module):
    def __init__(
        self,
        input_size: int = 1,
        hidden_size: int = 64,
        num_layers: int = 3,
        dropout: float = 0.3,
    ):
        super(LSTM, self).__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,
            dropout=dropout,
            bidirectional=True,
        )

        self.fc = nn.Sequential(
            nn.Linear(hidden_size * 2, 32),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(32, 1),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Forward pass of the LSTM model with a linear layer

        Args:
            x (torch.Tensor): Input tensor with shape (batch_size, seq_len, input_size)

        Returns:
            torch.Tensor: Output tensor with shape (batch_size, 1)
        """

        h0 = torch.zeros(self.num_layers * 2, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers * 2, x.size(0), self.hidden_size).to(x.device)

        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])

        return out

    def train_model(
        self,
        train_loader: DataLoader,
        test_loader: DataLoader,
        num_epochs: Optional[int] = 100,
    ):
        """
        Trains the LSTM model using the training and testing DataLoader

        Args:
            train_loader (DataLoader): DataLoader object for training
            test_loader (DataLoader): DataLoader object for testing
            num_epochs (int, optional): Number of epochs. Defaults to 100.

        Returns:
            LSTM model: Trained LSTM model
        """

        criterion = nn.MSELoss()
        optimizer = optim.Adam(self.parameters(), lr=0.001)
        num_epochs: int = num_epochs

        for epoch in tqdm(range(num_epochs), desc="Training model"):
            outputs = self.train()

            for i, (inputs, labels) in enumerate(train_loader):
                optimizer.zero_grad()

                outputs = self(inputs)
                loss = criterion(outputs, labels)

                loss.backward()
                optimizer.step()

            if epoch % 10 == 0:
                self.eval()

                with torch.no_grad():
                    test_loss = 0.0
                    for inputs, labels in test_loader:
                        outputs = self(inputs)
                        loss = criterion(outputs, labels)
                        test_loss += loss.item()

                    logger.info(
                        f"Epoch {epoch}/{num_epochs}, Loss: {test_loss / len(test_loader)}"
                    )

        return self

In [None]:
class LSTMForecaster(BaseForecaster):
    def __init__(
        self,
        input_size: int = 1,
        hidden_size: int = 64,
        num_layers: int = 3,
    ):
        super(LSTMForecaster, self).__init__()
        self.model = LSTM(
            input_size=input_size, hidden_size=hidden_size, num_layers=num_layers
        )

    def preprocess(self, data):
        scaler = StandardScaler()
        return scaler.fit_transform(data.reshape(-1, 1))

    def train(self, X_train: np.ndarray, y_train: np.ndarray):
        """
        Trains the model using the training data

        Args:
            X_train (np.ndarray): Training features data
            y_train (np.ndarray): Training target values

        Returns:
            LSTMForecaster: Trained LSTM model
        """

        X_tensor = torch.tensor(X_train, dtype=torch.float32).to(device)
        y_tensor = torch.tensor(y_train, dtype=torch.float32).to(device)

        train_dataset = TensorDataset(X_tensor, y_tensor)
        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

        self.model.train_model(train_loader)

    def forecast(self, steps: int) -> np.ndarray:
        """
        Predict future values using the LSTM model

        Args:
            steps (int): Number of steps to forecast

        Returns:
            np.ndarray: Forecasted values
        """

        last_sequence = self.model.data[-self.model.window_size :]
        last_sequence = torch.tensor(last_sequence, dtype=torch.float32).to(device)

        predictions = []
        for _ in range(steps):
            with torch.no_grad():
                output = self.model(last_sequence)
                predictions.append(output.item())
                last_sequence = torch.cat(
                    [last_sequence[1:], output.unsqueeze(0)], axis=0
                )

        return predictions

## Forecast Ensemble


In [None]:
class ForecastEnsemble:
    def __init__(self, forecasters: List[BaseForecaster]):
        if not forecasters:
            raise ValueError("Forecasters list cannot be empty")

        self.forecasters = forecasters
        self._cached_data: Dict[str, np.ndarray] = {}

    def train(self, X_train: np.ndarray, y_train: np.ndarray):
        """
        Train all forecasters in the ensemble

        Args:
            X_train (np.ndarray): Training input features
            y_train (np.ndarray): Training target values
        """

        if X_train is None or y_train is None:
            raise ValueError("Training data is missing")

        if len(X_train) == 0 or len(y_train) == 0:
            raise ValueError("Empty training data")

        if "sarima_x" not in self._cached_data:
            self._cached_data["sarima_x"] = X_train.reshape(-1)
            self._cached_data["sarima_y"] = y_train.reshape(-1)

        for forecaster in self.forecasters:
            try:
                if isinstance(forecaster, SARIMAForecaster):
                    forecaster.train(
                        self._cached_data["sarima_x"], self._cached_data["sarima_y"]
                    )
                else:
                    forecaster.train(X_train, y_train)

            except Exception as e:
                logger.error(f"Error training forecaster {type(forecaster)}: {e}")
                raise

    def forecast(self, steps: int) -> np.ndarray:
        """
        Generate ensemble forecast

        Args:
            steps (int): Number of steps to forecast

        Returns:
            np.ndarray: Ensemble forecasted values
        """

        forecasts = [f.forecast(steps) for f in self.forecasters]
        return np.mean(forecasts, axis=0)

    def evaluate(self, X_test: np.ndarray, y_test: np.ndarray) -> Dict[str, Any]:
        """
        Evaluate ensemble performance on test data

        Args:
            X_test (np.ndarray): Test input features
            y_test (np.ndarray): Test target values

        Returns:
            Dict[str, Any]: Evaluation metrics for each forecaster and ensemble
        """

        results: Dict[str, Any] = {"individual": {}, "ensemble": {}}
        for i, forecaster in enumerate(self.forecasters):
            results["individual"][f"Model_{i}"] = forecaster.evaluate(X_test, y_test)

        ensemble_predictions = self.forecast(len(y_test))
        results["ensemble"] = {
            "MSE": mean_squared_error(y_test, ensemble_predictions),
            "MAE": mean_absolute_error(y_test, ensemble_predictions),
            "MAPE": mean_absolute_percentage_error(y_test, ensemble_predictions),
        }

        return results

## Main Execution


### Auxiliary functions


In [None]:
def prepare_data(
    data: Dict[str, pd.DataFrame],
    target_column: str = "Close",
    test_size: float = 0.2,
    sequence_length: int = 30,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """
    Prepare time series data for forecasting

    Args:
        data (Dict[str, pd.DataFrame]): Dictionary of stock DataFrames
        target_column (str, optional): Target column for forecasting. Defaults to "Close".
        test_size (float, optional): Test data ratio. Defaults to 0.2.
        sequence_length (int, optional): Sequence length for LSTM. Defaults to 30.

    Returns:
        Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: Train and test data
    """

    stock_data = list(data.values())[0]
    time_series = stock_data[target_column]

    X, y = (
        DataProcessor.create_lagged_method(time_series, lags=sequence_length).values[
            :, :-1
        ],
        time_series.values[sequence_length:],
    )

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, shuffle=False
    )

    return X_train, X_test, y_train, y_test

### Data Loading


In [None]:
data_processor = DataProcessor(
    tickers=["AAPL"], start_date="2010-01-01", end_date="2024-01-01"
)

stock_data = data_processor.fetch_market_data()

In [None]:
X_train, X_test, y_train, y_test = prepare_data(stock_data)

In [None]:
X_train.shape, X_test.shape, y_train.shape, y_test.shape

### Model


In [None]:
sarima_forecaster = SARIMAForecaster()
lstm_forecaster = LSTMForecaster(input_size=X_train.shape[1])

In [None]:
ensemble = ForecastEnsemble([sarima_forecaster, lstm_forecaster])
ensemble.train(X_train, y_train)

## Forecasting


In [None]:
forecast_steps = len(y_test)
ensemble_forecast = ensemble.forecast(forecast_steps)

In [None]:
evaluation_results = ensemble.evaluate(X_test, y_test)

In [None]:
plt.figure(figsize=(12, 6))
plt.plot(stock_data.index[-len(y_test) :], y_test, label="Actual")
plt.plot(stock_data.index[-len(y_test) :], ensemble_forecast, label="Ensemble Forecast")
plt.title("Stock Price Forecast")
plt.legend()
plt.show()