---
title: "Stock Forecasting using ARIMA, Prophet & LSTM"
week: 4
author: "Praveen Kumar"
date: 2025-10-07
version: v1.0
---

# Week 4: Stock Forecasting using ARIMA, Prophet & LSTM

This notebook demonstrates time series forecasting using three different approaches:
1. **ARIMA** - Classical statistical approach
2. **Prophet** - Modern additive model
3. **LSTM** - Deep learning approach

We'll compare their performance on stock price forecasting.

In [None]:
# Parameters
SEED = 42
SAMPLE_MODE = True  # Use subset for faster execution
DATA_PATH = "data/synthetic/stock_prices.csv"
SYMBOL = "AAPL"  # Stock symbol to forecast

In [None]:
# Setup: Imports
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')

# Time series specific
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

# Prophet
try:
    from prophet import Prophet
    PROPHET_AVAILABLE = True
except ImportError:
    print("Prophet not available. Installing...")
    PROPHET_AVAILABLE = False

# Deep Learning
try:
    import tensorflow as tf
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import LSTM, Dense, Dropout
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.metrics import mean_squared_error, mean_absolute_error
    TF_AVAILABLE = True
except ImportError:
    print("TensorFlow not available for LSTM modeling")
    TF_AVAILABLE = False

# Financial data
import yfinance as yf

# Set random seeds
np.random.seed(SEED)
if TF_AVAILABLE:
    tf.random.set_seed(SEED)

print("Setup complete!")

## Data Loading

Load stock price data using yfinance with fallback options.

In [None]:
def load_stock_data(symbol="AAPL", period="5y", sample_mode=True):
    """Load stock data with fallback to synthetic data"""
    try:
        print(f"Loading {symbol} data...")
        ticker = yf.Ticker(symbol)
        df = ticker.history(period=period)
        
        if df.empty:
            raise ValueError("No data returned")
            
        # Convert to lowercase columns
        df.columns = df.columns.str.lower()
        
        # Subset for sample mode (last 1000 days for faster execution)
        if sample_mode and len(df) > 1000:
            df = df.tail(1000)
            
        print(f"✅ Loaded {len(df)} days of {symbol} data")
        return df
        
    except Exception as e:
        print(f"❌ Error loading {symbol}: {e}")
        return generate_synthetic_stock_data(sample_mode)

def generate_synthetic_stock_data(sample_mode=True):
    """Generate synthetic stock price data"""
    np.random.seed(SEED)
    
    n_days = 1000 if sample_mode else 2000
    dates = pd.date_range(start='2020-01-01', periods=n_days, freq='D')
    
    # Generate price series with realistic properties
    returns = np.random.normal(0.0005, 0.02, n_days)  # Daily returns
    prices = 100 * np.exp(np.cumsum(returns))  # Cumulative returns to prices
    
    # Add some trend
    trend = np.linspace(0, 0.3, n_days)
    prices *= (1 + trend)
    
    df = pd.DataFrame({
        'close': prices,
        'open': prices * (1 + np.random.normal(0, 0.005, n_days)),
        'high': prices * (1 + np.abs(np.random.normal(0, 0.01, n_days))),
        'low': prices * (1 - np.abs(np.random.normal(0, 0.01, n_days))),
        'volume': np.random.randint(1000000, 5000000, n_days)
    }, index=dates)
    
    print(f"✅ Generated {len(df)} days of synthetic data")
    return df

# Load data
df = load_stock_data(SYMBOL, period="5y", sample_mode=SAMPLE_MODE)

print(f"\nDataset Info:")
print(f"Shape: {df.shape}")
print(f"Date range: {df.index.min().date()} to {df.index.max().date()}")
print(f"Missing values: {df.isnull().sum().sum()}")

# Display first few rows
print("\nFirst 5 rows:")
print(df.head())

## Exploratory Data Analysis

Visualize the data and check for stationarity.

In [None]:
# Plot price series
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Price chart
axes[0, 0].plot(df.index, df['close'], linewidth=1.5)
axes[0, 0].set_title('Stock Price Over Time')
axes[0, 0].set_ylabel('Price')
axes[0, 0].grid(True, alpha=0.3)

# Returns
returns = df['close'].pct_change().dropna()
axes[0, 1].plot(df.index[1:], returns, linewidth=0.8)
axes[0, 1].set_title('Daily Returns')
axes[0, 1].set_ylabel('Returns')
axes[0, 1].grid(True, alpha=0.3)

# Returns distribution
axes[1, 0].hist(returns, bins=50, alpha=0.7, edgecolor='black')
axes[1, 0].set_title('Returns Distribution')
axes[1, 0].set_xlabel('Daily Returns')
axes[1, 0].set_ylabel('Frequency')
axes[1, 0].grid(True, alpha=0.3)

# Rolling statistics
window = 30
rolling_mean = df['close'].rolling(window=window).mean()
rolling_std = df['close'].rolling(window=window).std()

axes[1, 1].plot(df.index, df['close'], label='Price', alpha=0.7)
axes[1, 1].plot(df.index, rolling_mean, label=f'{window}-day Mean', linewidth=2)
axes[1, 1].fill_between(df.index, 
                       rolling_mean - 2*rolling_std, 
                       rolling_mean + 2*rolling_std, 
                       alpha=0.2, label='±2 Std')
axes[1, 1].set_title(f'Price with {window}-day Rolling Statistics')
axes[1, 1].set_ylabel('Price')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Basic statistics
print(f"Price Statistics:")
print(df['close'].describe())
print(f"\nReturns Statistics:")
print(returns.describe())

## Stationarity Testing

Check if the series is stationary using the Augmented Dickey-Fuller test.

In [None]:
def check_stationarity(series, name="Series"):
    """Perform ADF test and display results"""
    result = adfuller(series.dropna())
    
    print(f'Stationarity Test Results for {name}:')
    print(f'ADF Statistic: {result[0]:.6f}')
    print(f'p-value: {result[1]:.6f}')
    print(f'Critical Values:')
    for key, value in result[4].items():
        print(f'\t{key}: {value:.3f}')
    
    # Interpretation
    if result[1] <= 0.05:
        print("✅ Series is stationary (reject null hypothesis)")
        is_stationary = True
    else:
        print("❌ Series is non-stationary (fail to reject null hypothesis)")
        is_stationary = False
    
    return is_stationary, result[1]

# Test stationarity for prices and returns
print("=" * 60)
is_stationary_price, p_val_price = check_stationarity(df['close'], "Stock Prices")
print("\n" + "=" * 60)
is_stationary_returns, p_val_returns = check_stationarity(returns, "Daily Returns")

# Choose target variable based on stationarity
if is_stationary_returns:
    target_series = returns
    target_name = "Daily Returns"
    print(f"\n📊 Using {target_name} for modeling (stationary)")
else:
    # Use differenced prices if returns are not stationary
    target_series = df['close'].diff().dropna()
    target_name = "Price Differences"
    print(f"\n📊 Using {target_name} for modeling")

print(f"Target series length: {len(target_series)}")

## Data Preparation

Split data into train/test sets for modeling.

In [None]:
# Split data (80% train, 20% test)
split_point = int(len(target_series) * 0.8)
train_data = target_series[:split_point].copy()
test_data = target_series[split_point:].copy()

print(f"Data Split:")
print(f"Training set: {len(train_data)} observations")
print(f"Test set: {len(test_data)} observations")
print(f"Split date: {train_data.index[-1].date()}")

# Visualization of split
plt.figure(figsize=(12, 6))
plt.plot(train_data.index, train_data, label='Training Data', color='blue', alpha=0.7)
plt.plot(test_data.index, test_data, label='Test Data', color='red', alpha=0.7)
plt.axvline(x=train_data.index[-1], color='black', linestyle='--', alpha=0.5, label='Split Point')
plt.title(f'{target_name} - Train/Test Split')
plt.ylabel(target_name)
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

## Model 1: ARIMA

Implement ARIMA forecasting with automatic parameter selection.

In [None]:
def build_arima_model(train_data, max_p=3, max_q=3):
    """Build ARIMA model with automatic parameter selection"""
    
    # Since we're using returns (already differenced), d=0
    # If using prices, you might need d=1
    best_aic = np.inf
    best_order = None
    best_model = None
    
    print("Searching for optimal ARIMA parameters...")
    
    # Grid search for p and q (d=0 for stationary series)
    for p in range(max_p + 1):
        for q in range(max_q + 1):
            try:
                model = ARIMA(train_data, order=(p, 0, q))
                fitted_model = model.fit()
                
                if fitted_model.aic < best_aic:
                    best_aic = fitted_model.aic
                    best_order = (p, 0, q)
                    best_model = fitted_model
                    
            except Exception as e:
                continue
    
    print(f"Best ARIMA order: {best_order}")
    print(f"Best AIC: {best_aic:.4f}")
    
    return best_model, best_order

# Build ARIMA model
arima_model, arima_order = build_arima_model(train_data)

# Generate forecasts
forecast_steps = len(test_data)
arima_forecast = arima_model.forecast(steps=forecast_steps)
arima_conf_int = arima_model.get_forecast(steps=forecast_steps).conf_int()

# Model summary
print("\nARIMA Model Summary:")
print(arima_model.summary())

# Calculate ARIMA metrics
def calculate_metrics(actual, predicted):
    """Calculate forecast accuracy metrics"""
    mae = mean_absolute_error(actual, predicted)
    rmse = np.sqrt(mean_squared_error(actual, predicted))
    mape = np.mean(np.abs((actual - predicted) / actual)) * 100
    
    return {'MAE': mae, 'RMSE': rmse, 'MAPE': mape}

arima_metrics = calculate_metrics(test_data, arima_forecast)
print(f"\nARIMA Forecast Metrics:")
for metric, value in arima_metrics.items():
    print(f"{metric}: {value:.6f}")

## Model 2: Prophet

Implement Prophet forecasting with automatic seasonality detection.

In [None]:
if PROPHET_AVAILABLE:
    def build_prophet_model(train_data):
        """Build Prophet model"""
        # Prepare data in Prophet format
        prophet_df = pd.DataFrame({
            'ds': train_data.index,
            'y': train_data.values
        })
        
        # Create and fit Prophet model
        model = Prophet(
            daily_seasonality=True,
            weekly_seasonality=True,
            yearly_seasonality=False,  # Not enough data typically
            changepoint_prior_scale=0.05  # Flexibility of trend
        )
        
        model.fit(prophet_df)
        return model
    
    # Build Prophet model
    print("Building Prophet model...")
    prophet_model = build_prophet_model(train_data)
    
    # Create future dataframe for forecasting
    future_dates = pd.DataFrame({
        'ds': test_data.index
    })
    
    # Generate forecasts
    prophet_forecast = prophet_model.predict(future_dates)
    prophet_predictions = prophet_forecast['yhat'].values
    
    # Calculate Prophet metrics
    prophet_metrics = calculate_metrics(test_data, prophet_predictions)
    print(f"\nProphet Forecast Metrics:")
    for metric, value in prophet_metrics.items():
        print(f"{metric}: {value:.6f}")
    
    # Plot Prophet components
    fig, axes = plt.subplots(2, 1, figsize=(12, 8))
    
    # Forecast plot
    prophet_model.plot(prophet_forecast, ax=axes[0])
    axes[0].set_title('Prophet Forecast')
    axes[0].grid(True, alpha=0.3)
    
    # Components plot
    prophet_model.plot_components(prophet_forecast, ax=axes[1])
    plt.tight_layout()
    plt.show()
    
else:
    print("❌ Prophet not available. Skipping Prophet modeling.")
    prophet_predictions = np.zeros(len(test_data))  # Dummy values
    prophet_metrics = {'MAE': np.nan, 'RMSE': np.nan, 'MAPE': np.nan}

## Model 3: LSTM

Implement LSTM neural network for sequence forecasting.

In [None]:
if TF_AVAILABLE:
    def create_lstm_dataset(data, lookback=20):
        """Create supervised learning dataset for LSTM"""
        X, y = [], []
        for i in range(lookback, len(data)):
            X.append(data[i-lookback:i])
            y.append(data[i])
        return np.array(X), np.array(y)
    
    def build_lstm_model(train_data, lookback=20, epochs=50):
        """Build and train LSTM model"""
        
        # Scale the data
        scaler = MinMaxScaler(feature_range=(0, 1))
        train_scaled = scaler.fit_transform(train_data.values.reshape(-1, 1)).flatten()
        
        # Create supervised learning dataset
        X_train, y_train = create_lstm_dataset(train_scaled, lookback)
        
        # Reshape for LSTM [samples, time steps, features]
        X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
        
        # Build LSTM model
        model = Sequential([
            LSTM(50, return_sequences=True, input_shape=(lookback, 1)),
            Dropout(0.2),
            LSTM(50, return_sequences=False),
            Dropout(0.2),
            Dense(1)
        ])
        
        model.compile(optimizer='adam', loss='mse')
        
        # Train model
        print(f"Training LSTM model for {epochs} epochs...")
        history = model.fit(
            X_train, y_train,
            epochs=epochs,
            batch_size=32,
            validation_split=0.2,
            verbose=0,
            shuffle=False  # Important for time series
        )
        
        return model, scaler, history
    
    # Build LSTM model
    LOOKBACK = 20
    lstm_model, lstm_scaler, lstm_history = build_lstm_model(train_data, LOOKBACK, epochs=30)
    
    # Prepare test data for LSTM prediction
    # We need the last LOOKBACK points from training data
    full_data = pd.concat([train_data, test_data])
    full_scaled = lstm_scaler.transform(full_data.values.reshape(-1, 1)).flatten()
    
    # Generate predictions for test set
    lstm_predictions = []
    
    for i in range(len(train_data), len(full_data)):
        # Get the last LOOKBACK points
        X_test = full_scaled[i-LOOKBACK:i].reshape(1, LOOKBACK, 1)
        
        # Predict next value
        pred_scaled = lstm_model.predict(X_test, verbose=0)
        pred = lstm_scaler.inverse_transform(pred_scaled.reshape(-1, 1))[0, 0]
        lstm_predictions.append(pred)
    
    lstm_predictions = np.array(lstm_predictions)
    
    # Calculate LSTM metrics
    lstm_metrics = calculate_metrics(test_data, lstm_predictions)
    print(f"\nLSTM Forecast Metrics:")
    for metric, value in lstm_metrics.items():
        print(f"{metric}: {value:.6f}")
    
    # Plot training history
    plt.figure(figsize=(10, 4))
    plt.subplot(1, 2, 1)
    plt.plot(lstm_history.history['loss'], label='Training Loss')
    plt.plot(lstm_history.history['val_loss'], label='Validation Loss')
    plt.title('LSTM Training History')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    plt.subplot(1, 2, 2)
    plt.plot(test_data.index, test_data.values, label='Actual', linewidth=2)
    plt.plot(test_data.index, lstm_predictions, label='LSTM Prediction', linewidth=2)
    plt.title('LSTM Forecast vs Actual')
    plt.xlabel('Date')
    plt.ylabel(target_name)
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
else:
    print("❌ TensorFlow not available. Skipping LSTM modeling.")
    lstm_predictions = np.zeros(len(test_data))  # Dummy values
    lstm_metrics = {'MAE': np.nan, 'RMSE': np.nan, 'MAPE': np.nan}

## Model Comparison & Evaluation

Compare all three models and visualize results.

In [None]:
# Compile all results
models = ['ARIMA', 'Prophet', 'LSTM']
predictions = [arima_forecast, prophet_predictions, lstm_predictions]
metrics = [arima_metrics, prophet_metrics, lstm_metrics]

# Create comparison visualization
fig, axes = plt.subplots(2, 2, figsize=(16, 10))

# Forecast comparison
axes[0, 0].plot(test_data.index, test_data.values, label='Actual', linewidth=3, color='black')
colors = ['blue', 'green', 'red']
for i, (model, pred, color) in enumerate(zip(models, predictions, colors)):
    if not np.isnan(pred).all():  # Only plot if model was actually trained
        axes[0, 0].plot(test_data.index, pred, label=f'{model} Forecast', 
                       linewidth=2, color=color, alpha=0.8)

axes[0, 0].set_title('Forecast Comparison')
axes[0, 0].set_ylabel(target_name)
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

# Add confidence intervals for ARIMA
if 'arima_conf_int' in locals():
    axes[0, 0].fill_between(test_data.index, 
                           arima_conf_int.iloc[:, 0], 
                           arima_conf_int.iloc[:, 1], 
                           alpha=0.2, color='blue', label='ARIMA Conf. Int.')

# Metrics comparison
metrics_df = pd.DataFrame(metrics, index=models)
metrics_df = metrics_df.dropna()  # Remove models that weren't trained

for i, metric in enumerate(['RMSE', 'MAE', 'MAPE']):
    ax = axes[0, 1] if i == 0 else axes[1, i-1]
    
    if not metrics_df.empty and metric in metrics_df.columns:
        bars = ax.bar(metrics_df.index, metrics_df[metric], 
                     color=['blue', 'green', 'red'][:len(metrics_df)], alpha=0.7)
        ax.set_title(f'{metric} Comparison')
        ax.set_ylabel(metric)
        ax.grid(True, alpha=0.3)
        
        # Add value labels on bars
        for bar, value in zip(bars, metrics_df[metric]):
            ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + max(metrics_df[metric])*0.01, 
                   f'{value:.4f}', ha='center', va='bottom')

# Residuals analysis
residuals_data = []
for model, pred in zip(models, predictions):
    if not np.isnan(pred).all():
        residuals = test_data.values - pred
        residuals_data.append(residuals)

if residuals_data:
    axes[1, 2].boxplot(residuals_data, labels=[models[i] for i in range(len(residuals_data))])
    axes[1, 2].set_title('Residuals Distribution')
    axes[1, 2].set_ylabel('Residuals')
    axes[1, 2].grid(True, alpha=0.3)
    axes[1, 2].axhline(y=0, color='red', linestyle='--', alpha=0.7)

plt.tight_layout()
plt.show()

# Print detailed comparison
print("\n" + "="*60)
print("MODEL PERFORMANCE SUMMARY")
print("="*60)

best_model = None
best_rmse = float('inf')

for model, metric in zip(models, metrics):
    print(f"\n{model}:")
    if not any(np.isnan(list(metric.values()))):
        for k, v in metric.items():
            print(f"  {k}: {v:.6f}")
        
        if metric['RMSE'] < best_rmse:
            best_rmse = metric['RMSE']
            best_model = model
    else:
        print("  Model not available/trained")

if best_model:
    print(f"\n🏆 Best performing model: {best_model} (RMSE: {best_rmse:.6f})")

# Save forecasts
forecast_results = pd.DataFrame({
    'Date': test_data.index,
    'Actual': test_data.values,
    'ARIMA': predictions[0],
    'Prophet': predictions[1],
    'LSTM': predictions[2]
})

# Save to CSV
output_path = '/tmp/week4_forecasts.csv'  # Use /tmp for cross-platform compatibility
forecast_results.to_csv(output_path, index=False)
print(f"\n💾 Forecasts saved to: {output_path}")

## Exercises

Complete the following exercises to deepen your understanding:

### Exercise 1: Exponential Smoothing Baseline
**TODO**: Add Holt-Winters exponential smoothing as a baseline model and compare its performance.

### Exercise 2: Walk-forward Validation  
**TODO**: Implement walk-forward validation for ARIMA model to assess performance over multiple forecast periods.

### Exercise 3: LSTM Hyperparameter Tuning
**TODO**: Experiment with different LSTM lookback windows (10, 30, 50 days) and compare results.

*Implementation details and solutions are provided in the instructor notebook.*

## Summary and Insights

### Key Findings:
1. **Model Performance**: Each model has different strengths based on data characteristics
2. **Stationarity**: Important for ARIMA; Prophet and LSTM handle non-stationary data better  
3. **Seasonality**: Prophet excels with seasonal patterns; LSTM can learn complex patterns
4. **Data Requirements**: LSTM needs more data; ARIMA works with smaller datasets

### Practical Recommendations:
- Use **ARIMA** for short-term, linear forecasts with limited data
- Use **Prophet** for business forecasting with seasonal patterns
- Use **LSTM** for complex, non-linear patterns with sufficient data
- Consider **ensemble methods** combining multiple approaches

### Next Steps:
- Explore ensemble forecasting methods
- Implement real-time model updating
- Add exogenous variables (economic indicators)
- Develop automated model selection pipelines