<a href="https://colab.research.google.com/github/niranjananitha/NLP/blob/main/NLP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Stock Price Modeling Using AR & MA Models
# Install required packages
!pip install yfinance statsmodels -q

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import yfinance as yf
from statsmodels.tsa.ar_model import AutoReg
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.stats.diagnostic import acorr_ljungbox
from sklearn.metrics import mean_squared_error, mean_absolute_error
import warnings
warnings.filterwarnings('ignore')

# ============================================================
# 1. DATA ACQUISITION AND PREPROCESSING
# ============================================================

def fetch_stock_data(ticker, start_date, end_date):
    """Fetch stock data from Yahoo Finance"""
    print(f"Fetching data for {ticker}...")
    data = yf.download(ticker, start=start_date, end=end_date)
    return data

def prepare_data(data):
    """Prepare data: calculate returns and check stationarity"""
    # Use closing prices
    prices = data['Close'].dropna()

    # Calculate log returns (more stationary than raw prices)
    returns = np.log(prices / prices.shift(1)).dropna()

    print(f"\nData Summary:")
    print(f"Total observations: {len(prices)}")
    print(f"Date range: {prices.index[0]} to {prices.index[-1]}")
    print(f"\nPrice Statistics:")
    print(prices.describe())
    print(f"\nReturn Statistics:")
    print(returns.describe())

    return prices, returns

# ============================================================
# 2. STATIONARITY TESTING
# ============================================================

from statsmodels.tsa.stattools import adfuller

def test_stationarity(series, name="Series"):
    """Perform Augmented Dickey-Fuller test"""
    result = adfuller(series.dropna())

    print(f'\n{"="*60}')
    print(f'ADF Test Results for {name}')
    print(f'{"="*60}')
    print(f'ADF Statistic: {result[0]:.6f}')
    print(f'p-value: {result[1]:.6f}')
    print(f'Critical Values:')
    for key, value in result[4].items():
        print(f'  {key}: {value:.3f}')

    if result[1] <= 0.05:
        print(f"\n✓ {name} is STATIONARY (reject null hypothesis)")
        return True
    else:
        print(f"\n✗ {name} is NON-STATIONARY (fail to reject null hypothesis)")
        return False

# ============================================================
# 3. ACF AND PACF ANALYSIS
# ============================================================

def plot_acf_pacf(series, lags=40, title=""):
    """Plot ACF and PACF for lag identification"""
    fig, axes = plt.subplots(2, 1, figsize=(14, 8))

    plot_acf(series, lags=lags, ax=axes[0], alpha=0.05)
    axes[0].set_title(f'ACF - {title}', fontsize=14, fontweight='bold')
    axes[0].set_xlabel('Lag')
    axes[0].set_ylabel('Autocorrelation')

    plot_pacf(series, lags=lags, ax=axes[1], method='ywm', alpha=0.05)
    axes[1].set_title(f'PACF - {title}', fontsize=14, fontweight='bold')
    axes[1].set_xlabel('Lag')
    axes[1].set_ylabel('Partial Autocorrelation')

    plt.tight_layout()
    plt.show()

    print(f"\nLag Selection Guidelines:")
    print("AR model: Use PACF - significant lags indicate AR order")
    print("MA model: Use ACF - significant lags indicate MA order")
    print("ARMA: Combine both - gradual decay in both suggests ARMA")

# ============================================================
# 4. MODEL FITTING - AR MODEL
# ============================================================

def fit_ar_model(series, lags, name="AR Model"):
    """Fit Autoregressive model"""
    print(f"\n{'='*60}")
    print(f"Fitting AR({lags}) Model")
    print(f"{'='*60}")

    # Split data: 80% train, 20% test
    train_size = int(len(series) * 0.8)
    train, test = series[:train_size], series[train_size:]

    # Fit AR model
    model = AutoReg(train, lags=lags, old_names=False)
    fitted_model = model.fit()

    print(f"\nModel Summary:")
    print(fitted_model.summary())

    # Predictions
    predictions = fitted_model.predict(start=len(train), end=len(train)+len(test)-1)

    # Metrics
    mse = mean_squared_error(test, predictions)
    mae = mean_absolute_error(test, predictions)
    rmse = np.sqrt(mse)

    print(f"\nPerformance Metrics (Test Set):")
    print(f"MSE:  {mse:.6f}")
    print(f"RMSE: {rmse:.6f}")
    print(f"MAE:  {mae:.6f}")
    print(f"AIC:  {fitted_model.aic:.2f}")
    print(f"BIC:  {fitted_model.bic:.2f}")

    return fitted_model, predictions, test

# ============================================================
# 5. MODEL FITTING - MA MODEL
# ============================================================

def fit_ma_model(series, q, name="MA Model"):
    """Fit Moving Average model"""
    print(f"\n{'='*60}")
    print(f"Fitting MA({q}) Model")
    print(f"{'='*60}")

    train_size = int(len(series) * 0.8)
    train, test = series[:train_size], series[train_size:]

    # MA model is ARIMA(0,0,q)
    model = ARIMA(train, order=(0, 0, q))
    fitted_model = model.fit()

    print(f"\nModel Summary:")
    print(fitted_model.summary())

    # Forecast
    forecast = fitted_model.forecast(steps=len(test))

    # Metrics
    mse = mean_squared_error(test, forecast)
    mae = mean_absolute_error(test, forecast)
    rmse = np.sqrt(mse)

    print(f"\nPerformance Metrics (Test Set):")
    print(f"MSE:  {mse:.6f}")
    print(f"RMSE: {rmse:.6f}")
    print(f"MAE:  {mae:.6f}")
    print(f"AIC:  {fitted_model.aic:.2f}")
    print(f"BIC:  {fitted_model.bic:.2f}")

    return fitted_model, forecast, test

# ============================================================
# 6. MODEL FITTING - ARMA MODEL
# ============================================================

def fit_arma_model(series, p, q, name="ARMA Model"):
    """Fit ARMA model"""
    print(f"\n{'='*60}")
    print(f"Fitting ARMA({p},{q}) Model")
    print(f"{'='*60}")

    train_size = int(len(series) * 0.8)
    train, test = series[:train_size], series[train_size:]

    # ARMA model is ARIMA(p,0,q)
    model = ARIMA(train, order=(p, 0, q))
    fitted_model = model.fit()

    print(f"\nModel Summary:")
    print(fitted_model.summary())

    # Forecast
    forecast = fitted_model.forecast(steps=len(test))

    # Metrics
    mse = mean_squared_error(test, forecast)
    mae = mean_absolute_error(test, forecast)
    rmse = np.sqrt(mse)

    print(f"\nPerformance Metrics (Test Set):")
    print(f"MSE:  {mse:.6f}")
    print(f"RMSE: {rmse:.6f}")
    print(f"MAE:  {mae:.6f}")
    print(f"AIC:  {fitted_model.aic:.2f}")
    print(f"BIC:  {fitted_model.bic:.2f}")

    return fitted_model, forecast, test

# ============================================================
# 7. RESIDUAL DIAGNOSTICS
# ============================================================

def residual_diagnostics(fitted_model, name="Model"):
    """Comprehensive residual analysis"""
    print(f"\n{'='*60}")
    print(f"Residual Diagnostics for {name}")
    print(f"{'='*60}")

    residuals = fitted_model.resid

    # Statistical tests
    print(f"\nResidual Statistics:")
    print(f"Mean: {residuals.mean():.6f} (should be ≈ 0)")
    print(f"Std:  {residuals.std():.6f}")

    # Ljung-Box test for autocorrelation in residuals
    lb_test = acorr_ljungbox(residuals, lags=10, return_df=True)
    print(f"\nLjung-Box Test (Residual Autocorrelation):")
    print(lb_test)
    print("p-value > 0.05 indicates no significant autocorrelation (good)")

    # Visualization
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))

    # Residual plot
    axes[0, 0].plot(residuals)
    axes[0, 0].axhline(y=0, color='r', linestyle='--')
    axes[0, 0].set_title(f'Residuals Over Time - {name}')
    axes[0, 0].set_xlabel('Time')
    axes[0, 0].set_ylabel('Residuals')

    # Histogram
    axes[0, 1].hist(residuals, bins=30, edgecolor='black', alpha=0.7)
    axes[0, 1].set_title('Residual Distribution')
    axes[0, 1].set_xlabel('Residual Value')
    axes[0, 1].set_ylabel('Frequency')

    # ACF of residuals
    plot_acf(residuals, lags=20, ax=axes[1, 0], alpha=0.05)
    axes[1, 0].set_title('ACF of Residuals')

    # Q-Q plot
    from scipy import stats
    stats.probplot(residuals, dist="norm", plot=axes[1, 1])
    axes[1, 1].set_title('Q-Q Plot')

    plt.tight_layout()
    plt.show()

# ============================================================
# 8. MODEL COMPARISON
# ============================================================

def compare_models(results_dict):
    """Compare multiple models using AIC and BIC"""
    print(f"\n{'='*60}")
    print("MODEL COMPARISON")
    print(f"{'='*60}")

    comparison = pd.DataFrame({
        'Model': list(results_dict.keys()),
        'AIC': [results_dict[m]['model'].aic for m in results_dict],
        'BIC': [results_dict[m]['model'].bic for m in results_dict],
        'MSE': [results_dict[m]['mse'] for m in results_dict],
        'RMSE': [results_dict[m]['rmse'] for m in results_dict]
    })

    comparison = comparison.sort_values('AIC')
    print("\n", comparison.to_string(index=False))

    print(f"\nInterpretation:")
    print(f"• Lower AIC/BIC = Better model")
    print(f"• Best by AIC: {comparison.iloc[0]['Model']}")
    print(f"• Best by BIC: {comparison.sort_values('BIC').iloc[0]['Model']}")
    print(f"\n⚠ BIC penalizes complexity more than AIC[web:7]")

    return comparison

# ============================================================
# 9. VISUALIZATION
# ============================================================

def plot_predictions(test, predictions, title="Model Predictions"):
    """Plot actual vs predicted values"""
    plt.figure(figsize=(14, 6))
    plt.plot(test.index, test.values, label='Actual', linewidth=2, alpha=0.7)
    plt.plot(test.index, predictions, label='Predicted', linewidth=2, alpha=0.7)
    plt.title(title, fontsize=16, fontweight='bold')
    plt.xlabel('Date')
    plt.ylabel('Returns')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

# ============================================================
# 10. AUTO LAG SELECTION
# ============================================================

def auto_select_order(series, max_p=10, max_q=10):
    """Automatically select best p and q using AIC"""
    print(f"\n{'='*60}")
    print("AUTO LAG SELECTION (Grid Search)")
    print(f"{'='*60}")

    best_aic = np.inf
    best_order = None
    aic_results = []

    train_size = int(len(series) * 0.8)
    train = series[:train_size]

    for p in range(0, max_p + 1):
        for q in range(0, max_q + 1):
            if p == 0 and q == 0:
                continue
            try:
                model = ARIMA(train, order=(p, 0, q))
                fitted = model.fit()
                aic = fitted.aic
                aic_results.append((p, q, aic, fitted.bic))

                if aic < best_aic:
                    best_aic = aic
                    best_order = (p, q)

            except:
                continue

    print(f"\nBest Model: ARMA{best_order}")
    print(f"AIC: {best_aic:.2f}")

    # Show top 5 models
    aic_df = pd.DataFrame(aic_results, columns=['p', 'q', 'AIC', 'BIC'])
    aic_df = aic_df.sort_values('AIC').head(10)
    print(f"\nTop 10 Models:")
    print(aic_df.to_string(index=False))

    return best_order

# ============================================================
# MAIN EXECUTION
# ============================================================

if __name__ == "__main__":

    print("="*60)
    print("STOCK PRICE MODELING USING AR & MA MODELS")
    print("="*60)

    # Configuration
    TICKER = "AAPL"  # Change to any stock: "TSLA", "GOOGL", "MSFT", etc.
    START_DATE = "2023-01-01"
    END_DATE = "2025-12-31"

    # Step 1: Fetch data
    data = fetch_stock_data(TICKER, START_DATE, END_DATE)
    prices, returns = prepare_data(data)

    # Step 2: Test stationarity
    print("\n" + "="*60)
    print("STATIONARITY TESTING")
    print("="*60)
    is_stationary_price = test_stationarity(prices, "Price Series")
    is_stationary_returns = test_stationarity(returns, "Return Series")

    # Use returns (stationary) for modeling
    series = returns

    # Step 3: ACF/PACF Analysis
    print("\n" + "="*60)
    print("ACF & PACF ANALYSIS")
    print("="*60)
    plot_acf_pacf(series, lags=40, title=f"{TICKER} Returns")

    # Step 4: Auto lag selection
    best_order = auto_select_order(series, max_p=5, max_q=5)

    # Step 5: Fit models
    results = {}

    # AR Model
    ar_model, ar_pred, ar_test = fit_ar_model(series, lags=best_order[0] if best_order[0] > 0 else 2)
    results['AR'] = {
        'model': ar_model,
        'predictions': ar_pred,
        'test': ar_test,
        'mse': mean_squared_error(ar_test, ar_pred),
        'rmse': np.sqrt(mean_squared_error(ar_test, ar_pred))
    }
    residual_diagnostics(ar_model, f"AR({best_order[0] if best_order[0] > 0 else 2})")
    plot_predictions(ar_test, ar_pred, f"AR Model - {TICKER}")

    # MA Model
    ma_model, ma_pred, ma_test = fit_ma_model(series, q=best_order[1] if best_order[1] > 0 else 2)
    results['MA'] = {
        'model': ma_model,
        'predictions': ma_pred,
        'test': ma_test,
        'mse': mean_squared_error(ma_test, ma_pred),
        'rmse': np.sqrt(mean_squared_error(ma_test, ma_pred))
    }
    residual_diagnostics(ma_model, f"MA({best_order[1] if best_order[1] > 0 else 2})")
    plot_predictions(ma_test, ma_pred, f"MA Model - {TICKER}")

    # ARMA Model
    arma_model, arma_pred, arma_test = fit_arma_model(series, p=best_order[0], q=best_order[1])
    results['ARMA'] = {
        'model': arma_model,
        'predictions': arma_pred,
        'test': arma_test,
        'mse': mean_squared_error(arma_test, arma_pred),
        'rmse': np.sqrt(mean_squared_error(arma_test, arma_pred))
    }
    residual_diagnostics(arma_model, f"ARMA{best_order}")
    plot_predictions(arma_test, arma_pred, f"ARMA Model - {TICKER}")

    # Step 6: Model Comparison
    comparison_df = compare_models(results)

    print("\n" + "="*60)
    print("PROJECT COMPLETE")
    print("="*60)
