In [None]:
import pandas as pd
import numpy as np
import math
import pymc as pm
from pandas import Timedelta, date_range
import arviz as az
import yfinance as yf
import matplotlib.pyplot as plt

In [None]:

def add_seasonality(data, seasonality, amplitude, amplitude_noise = 1, noise = 1):
    for i in range(len(data)):
        amplitude = np.random.normal(amplitude, amplitude_noise)
        data[i] = data[i] + amplitude*np.sin(2*np.pi*i/seasonality) + np.random.normal(0, noise)
    return data


def add_slope(data, intercept, intercept_noise, slope, slope_noise = 1, noise = 1):
    for i in range(len(data)):
        slope = np.random.normal(slope, slope_noise)
        intercept = np.random.normal(intercept, intercept_noise)
        if i == 0:
            data[i] = data[i] + intercept + np.random.normal(0, noise)
        else:
            data[i] = data[i] + slope*i + np.random.normal(0, noise)
    return data 
    # y_t = intercept + np.arange(len(data)) * slope + np.random.normal(0, intercept_noise, len(data))
    # return y_t + ar1_data

def generate_ar_process(coeffs, warmup=50, steps=150, intercept=10, noise=0.5, random=False, num_coeff = 0):
    """
    Generate an autoregressive process based on provided coefficients.
    
    Parameters:
    - coeffs: List of AR coefficients
    - warmup: Number of warmup steps to discard
    - steps: Number of steps to keep
    - intercept: Baseline value
    - noise: Standard deviation of random noise
    - random: If True, generates random coefficients between -0.5 and 0.5
    
    Returns:
    - ar_data: Generated time series data
    """
    if random:
        coeffs = np.random.uniform(-0.99, 0.99, num_coeff)
        print(f"Generated random coefficients: {coeffs}")
    
    # Number of lags is determined by the length of coeffs
    n_lags = len(coeffs)
    
    # Initialize array for the entire process
    draws = np.zeros(warmup + steps)
    
    # Initialize first n_lags values to the intercept
    draws[:n_lags] = intercept
    
    # Generate the AR process: each value depends on previous values
    for step in range(n_lags, warmup + steps):
        # Start with intercept
        value = intercept
        
        # Add contribution from each lag
        for i, coef in enumerate(coeffs):
            value += coef * draws[step - i - 1]
        
        # Add random noise
        value += np.random.normal(0, noise)
        
        draws[step] = value
        
    # Return only the non-warmup portion
    return draws[warmup:]


def add_seasonality(data, seasonality, amplitude, noise = 1):
    y_t = amplitude * np.sin(2 * np.pi * np.arange(len(data)) / seasonality) + np.random.normal(0, noise, len(data))
    return y_t + data


# Generate the AR process


slope_intercept = 2
slope_intercept_noise = 0
slope_coeff = 1
slope_coeff_noise = 0.54


warmup = 100
steps = 300
intercept = 10
noise = 5
coeffs = [-0.9, -0.7] 
num_coeff = len(coeffs)
random = False


seasonality = 12
seasonality_amplitude = 120
seasonality_amplitude_noise = 0
seasonality_noise = 35
ticker = 'AAPL'


# ar1_data = generate_ar_process(coeffs, warmup, steps, intercept, noise, random=random, num_coeff = num_coeff)
# ar1_data = add_slope(ar1_data, intercept = slope_intercept, intercept_noise = slope_intercept_noise, slope = slope_coeff, slope_noise = slope_coeff_noise, noise = 0)
# ar1_data = add_seasonality(ar1_data, seasonality = seasonality, amplitude = seasonality_amplitude, noise = seasonality_noise)

end_date = pd.to_datetime('2025-06-03')
steps = 365*3
start_date = end_date - pd.Timedelta(days=steps)

ar1_data = yf.download(ticker, start=start_date, end=end_date, multi_level_index=False)
ar1_data = ar1_data['Close'] - ar1_data['Open']

# Create a figure to visualize the simulated time series
fig, ax = plt.subplots(figsize=(10, 3))
ax.set_title(f"{ticker} Timeseries", fontsize=15)
# Create date range for x-axis
date_range = pd.date_range(start=start_date, end=end_date, periods=len(ar1_data))
ax.plot(date_range, ar1_data)  # Plot the data with dates on x-axis
ax.tick_params(axis='x', rotation=45)  # Rotate date labels for better readability
ax.set_xlabel('Date')  # Add x-axis label
plt.show()  # Display the plot

In [7]:
import numpy as np
import pymc as pm
import pytensor
pytensor.config.cxx = '/usr/bin/clang++'


arx = 3
priors = {
    "coefs": {"mu": np.zeros(arx + 1), "sigma": np.ones(arx + 1) * 10, "size": arx + 1},
    "sigma": 8,
    "init": {"mu": 9, "sigma": 0.1, "size": 1},
    "alpha": {"mu": -0.3, "sigma": 0.1},
    "beta": {"mu": 0.3, "sigma": 0.2},
    "seasonality": {"mu": 0, "sigma": 0.1}
}
add_trend = True
add_seasonality = False

with pm.Model() as AR:
    t_data = list(range(len(ar1_data)))
    AR.add_coord("obs_id", t_data)
    
    t = pm.Data("t", t_data, dims="obs_id")
    y = pm.Data("y", ar1_data, dims="obs_id")
    coefs = pm.Normal("coefs", priors["coefs"]["mu"], priors["coefs"]["sigma"])
    sigma = pm.HalfNormal("sigma", priors["sigma"])
    init = pm.Normal.dist(
        priors["init"]["mu"], priors["init"]["sigma"], size=priors["init"]["size"]
    )
    
    # Calculate steps explicitly to avoid shape attribute access
    steps = len(t_data) - (priors["coefs"]["size"] - 1)
    
    # Explicitly specify ar_order to fix the NotConstantValueError
    ar1 = pm.AR(
        "ar",
        coefs,
        sigma=sigma,
        init_dist=init,
        constant=True,
        steps=steps,
        ar_order=arx,  # Added explicit ar_order parameter
        dims="obs_id",
    )
    
    alpha = pm.Normal("alpha", priors["alpha"]["mu"], priors["alpha"]["sigma"])
    
    if add_trend:
        beta = pm.Normal("beta", priors["beta"]["mu"], priors["beta"]["sigma"])
        trend = pm.Deterministic("trend", alpha + beta * t, dims="obs_id")
        mu = ar1 + trend
            
        outcome = pm.Normal("likelihood", mu=mu, sigma=sigma, observed=y, dims="obs_id")
    else:
        outcome = pm.Normal("likelihood", mu=ar1, sigma=sigma, observed=y, dims="obs_id")
        
    idata_ar = pm.sample_prior_predictive()
    idata_ar.extend(pm.sample(500, random_seed=100, target_accept=0.95))
    idata_ar.extend(pm.sample_posterior_predictive(idata_ar))

Sampling: [alpha, ar, beta, coefs, likelihood, sigma]
Initializing NUTS using jitter+adapt_diag...
Multiprocess sampling (4 chains in 4 jobs)
NUTS: [coefs, sigma, ar, alpha, beta]


Sampling 4 chains for 1_000 tune and 500 draw iterations (4_000 + 2_000 draws total) took 6 seconds.
The rhat statistic is larger than 1.01 for some parameters. This indicates problems during sampling. See https://arxiv.org/abs/1903.08008 for details
The effective sample size per chain is smaller than 100 for some parameters.  A higher number is needed for reliable rhat and ess computation. See https://arxiv.org/abs/1903.08008 for details
Sampling: [likelihood]


In [None]:
import arviz as az


def create_figure_layout(arx):
    """
    Create a figure layout for AR model visualization.
    
    Parameters:
    -----------
    arx : int
        The AR order
    
    Returns:
    --------
    fig : matplotlib.figure.Figure
        The figure object
    axes : list
        List of axes for parameter plots
    ax_fit : matplotlib.axes.Axes
        Axis for model fit plot
    """
    import matplotlib.pyplot as plt
    
    # Set up the figure for posterior plots
    fig = plt.figure(figsize=(20, 15))

    # Calculate how many rows we need for parameters (max 3 per row)
    param_rows = (arx + 2 + 2) // 3  # +2 for intercept and sigma, then ceiling division
    # Create a GridSpec layout with param_rows + 1 rows (extra row for model fit)
    gs = fig.add_gridspec(param_rows + 1, 3, height_ratios=[1] * param_rows + [1.5])

    # Create axes: one for each coefficient plus sigma
    axes = []
    for i in range(arx + 2):
        row = i // 3
        col = i % 3
        axes.append(fig.add_subplot(gs[row, col]))  # Parameters in rows of 3

    # Add the model fit plot in the last row, spanning all columns
    ax_fit = fig.add_subplot(gs[param_rows, :])  # bottom row spanning all columns for model fit
    
    return fig, axes, ax_fit

def plot_ar_model_results(idata_ar, t_data, ar1_data, priors, add_trend_coeffs = False, arx=5, view_prev = None):
    """
    Plot the posterior distributions and model fit for an AR model.
    
    Parameters:
    -----------
    idata_ar : InferenceData
        The inference data containing posterior samples and posterior predictive samples
    t_data : list or array
        Time points for the observed data
    ar1_data : list or array
        The observed time series data
    priors : dict
        Dictionary containing prior information
    arx : int, optional
        The AR order (default=5)
        
    Returns:
    --------
    fig : matplotlib.figure.Figure
        The figure object with all plots
    """
    import matplotlib.pyplot as plt
    if add_trend_coeffs:
        arx += 4
    fig, axes, ax_fit = create_figure_layout(arx)
    plot_posterior_parameters(idata_ar, axes, arx, priors, add_trend_coeffs)
    posterior_pred = plot_model_fit(idata_ar, t_data, ar1_data, ax_fit, view_prev = view_prev)
    plt.tight_layout()
    plt.show() 
    return fig, posterior_pred
def plot_posterior_parameters(idata_ar, axes, arx, priors, add_trend_coeffs = False):
    """
    Plot the posterior distributions of model parameters.
    
    Parameters:
    -----------
    idata_ar : InferenceData
        The inference data containing posterior samples
    axes : list
        List of matplotlib axes to plot on
    arx : int
        The AR order
    priors : dict
        Dictionary containing prior information
    """
    az.plot_posterior(idata_ar, var_names=["coefs"], coords={"coefs_dim_0": 0}, ax=axes[0])
    axes[0].set_title("Intercept")
    for i in range(1, arx-1):
        if i <= priors["coefs"]["size"] - 1:  # Check if coefficient exists in the model
            az.plot_posterior(idata_ar, var_names=["coefs"], coords={"coefs_dim_0": i}, ax=axes[i])
            axes[i].set_title(f"AR({i}) Coefficient")
    # Plot trend coefficients (alpha and beta)
    if add_trend_coeffs:
        az.plot_posterior(idata_ar, var_names=["alpha"], ax=axes[arx-1])
        axes[arx-1].set_title("Alpha (Trend Intercept)")
        az.plot_posterior(idata_ar, var_names=["beta"], ax=axes[arx])
        axes[arx].set_title("Beta (Trend Slope)")

    az.plot_posterior(idata_ar, var_names=["sigma"], ax=axes[arx+1])
    axes[arx+1].set_title("Sigma")
    # Plot sigma

def plot_model_fit(idata_ar, t_data, ar1_data, ax, view_prev = None):
    """
    Plot the model fit against the data.
    
    Parameters:
    -----------
    idata_ar : InferenceData
        The inference data containing posterior predictive samples
    t_data : list or array
        Time points for the observed data
    ar1_data : list or array
        The observed time series data
    ax : matplotlib.axes.Axes
        The axis to plot on
    """
    if not view_prev:
        view_prev = len(t_data)
    ax.plot(t_data[-view_prev:], ar1_data[-view_prev:], 'o', color='black', alpha=0.6, label='Observed data')
    posterior_pred = idata_ar.posterior_predictive["likelihood"].values
    n_samples = 100
    sample_idx = np.random.choice(posterior_pred.shape[0] * posterior_pred.shape[1], n_samples, replace=False)
    for idx in sample_idx:
        chain_idx, draw_idx = idx // posterior_pred.shape[1], idx % posterior_pred.shape[1]
        ax.plot(t_data[-view_prev:], posterior_pred[chain_idx, draw_idx][-view_prev:], color='blue', alpha=0.1)

    # # Plot the mean of the posterior predictive
    posterior_pred_mean = posterior_pred.mean(axis=(0, 1))
    ax.plot(t_data[-view_prev:], posterior_pred_mean[-view_prev:], color='red', linewidth=2, label='Posterior mean')

    ax.set_title('NVDA Stock - Model Fit')
    ax.set_xlabel('Time')
    ax.set_ylabel('Value')
    ax.legend()
    return posterior_pred

fig, posterior_pred = plot_ar_model_results(idata_ar, t_data, ar1_data, priors, arx=1, add_trend_coeffs = True, view_prev = 150);

In [None]:
az.plot_trace(idata_ar, var_names=["coefs"], combined=False, compact=False)
az.plot_trace(idata_ar, var_names=["sigma"], kind="rank_bars", combined=False, compact=False);

In [None]:
def setup_future_coordinates(model, data_length, prediction_length):
    """Set up coordinates for future predictions."""
    model.add_coords({"obs_id_fut_1": range(data_length - 1, prediction_length, 1)})
    model.add_coords({"obs_id_fut": range(data_length, prediction_length, 1)})
    return model

def plot_observed_data(ax, t_data, ar1_data, view_prev):
    """Plot the observed data points."""
    ax.plot(t_data[-view_prev:], ar1_data[-view_prev:], '-o', color='black', alpha=0.6, label='Observed data')

def plot_posterior_samples(ax, t_data, posterior_pred, view_prev, n_samples=100, alpha = 0.05):
    """Plot samples from the posterior distribution."""
    sample_idx = np.random.choice(posterior_pred.shape[0] * posterior_pred.shape[1], n_samples, replace=False)
    for idx in sample_idx:
        chain_idx, draw_idx = idx // posterior_pred.shape[1], idx % posterior_pred.shape[1]
        ax.plot(t_data[-view_prev:], posterior_pred[chain_idx, draw_idx][-view_prev:], color='blue', alpha=alpha)

def plot_posterior_mean(ax, t_data, posterior_pred, view_prev):
    """Plot the mean of the posterior predictive distribution."""
    posterior_pred_mean = posterior_pred.mean(axis=(0, 1))
    ax.plot(t_data[-view_prev:], posterior_pred_mean[-view_prev:], color='red', linewidth=2, label='Posterior mean')

def plot_future_samples(ax, future_predictions, t_future, n_samples=100, alpha = 0.05):
    """Plot samples of future predictions."""
    sample_idx = np.random.choice(future_predictions.shape[0] * future_predictions.shape[1], n_samples, replace=False)
    future_pred_mean = future_predictions.mean(axis=(0, 1))
    
    # Calculate max distance for normalization
    all_distances = []
    for idx in sample_idx:
        chain_idx, draw_idx = idx // future_predictions.shape[1], idx % future_predictions.shape[1]
        sample = future_predictions[chain_idx, draw_idx]
        distance = np.mean(np.abs(sample - future_pred_mean))
        all_distances.append(distance)
    
    max_distance = max(all_distances) if all_distances else 1
    
    for idx in sample_idx:
        chain_idx, draw_idx = idx // future_predictions.shape[1], idx % future_predictions.shape[1]
        sample = future_predictions[chain_idx, draw_idx]
        
        # Calculate distance from mean (normalized)
        distance = np.mean(np.abs(sample - future_pred_mean))
        norm_distance = distance / max_distance if max_distance > 0 else 0
        
        # Create a gradient from yellowish (closest) to reddish (middle) to blueish (furthest)
        if norm_distance < 0.5:
            # Yellow to red gradient (0 to 0.5)
            ratio = norm_distance * 2  # Scale to 0-1 range
            color = (1, 1 - ratio, 0)  # Yellow (1,1,0) to Red (1,0,0)
        else:
            # Red to blue gradient (0.5 to 1)
            ratio = (norm_distance - 0.5) * 2  # Scale to 0-1 range
            color = (1 - ratio, 0, ratio)  # Red (1,0,0) to Blue (0,0,1)
        
        ax.plot(t_future, sample, color=color, alpha=alpha)

def plot_future_mean(ax, future_predictions, t_future):
    """Plot the mean of future predictions."""
    future_pred_mean = future_predictions.mean(axis=(0, 1))
    ax.plot(t_future, future_pred_mean, color='green', linewidth=2, label='Future predictions (mean)', marker='o')


def setup_plot_labels(ax, arx):
    """Set up the plot title, labels, and legend."""
    # Fix: plt is a module, not an Axes object. We need to use the ax parameter instead.
    ax.set_title(f'AR({arx}) Model Predictions')
    ax.set_xlabel('Time')
    ax.set_ylabel('Value')
    ax.legend()

def predict_future_points(AR, ar1, ar1_data, coefs, sigma, steps, idata_ar, n=10):
    """
    Predict future points in the time series using the AR model.
    
    Parameters:
    -----------
    AR : PyMC model
        The AR model
    ar1 : PyMC variable
        The AR process variable
    ar1_data : array
        The observed data
    coefs : PyMC variable
        The AR coefficients
    sigma : PyMC variable
        The noise standard deviation
    steps : int
        Number of steps in the original data
    idata_ar : InferenceData
        The inference data from model fitting
    n : int, optional
        Number of future points to predict, default is 10
        
    Returns:
    --------
    tuple
        (future_predictions, t_future)
    """
    prediction_length = steps + n
    view_prev = n*3
    n_samples = 100
    
    # Create a new figure for the predictions
    plt.figure(figsize=(15, 5))
    
    # Try to make predictions, handling the case where variables might already exist
    try:
        with AR:
            # First check if coordinates already exist and remove them to avoid duplicates
            if "obs_id_fut_1" in AR.coords:
                AR.coords.pop("obs_id_fut_1")
            if "obs_id_fut" in AR.coords:
                AR.coords.pop("obs_id_fut")
                
            # Setup coordinates for future predictions
            AR.add_coords({"obs_id_fut_1": range(ar1_data.shape[0] - 1, prediction_length, 1)})
            AR.add_coords({"obs_id_fut": range(ar1_data.shape[0], prediction_length, 1)})
            
            # Handle t_fut data variable
            try:
                # Try to remove existing t_fut if it exists
                if 't_fut' in AR.named_vars:
                    AR.named_vars.pop('t_fut')
                t_fut = pm.Data("t_fut", list(range(ar1_data.shape[0], prediction_length, 1)))
            except Exception as e:
                print(f"Error handling t_fut: {e}")
                # Fallback to direct assignment
                t_fut = list(range(ar1_data.shape[0], prediction_length, 1))
                
            # Create the future AR model
            try:
                # Remove existing ar1_fut if it exists
                if 'ar1_fut' in AR.named_vars:
                    AR.named_vars.pop('ar1_fut')
                ar1_fut = pm.AR(
                    "ar1_fut",
                    init_dist=pm.DiracDelta.dist(ar1[..., -1]),
                    rho=coefs,
                    sigma=sigma,
                    constant=True,
                    dims="obs_id_fut_1",
                )
            except Exception as e:
                print(f"Error creating ar1_fut: {e}")
                ar1_fut = AR.named_vars.get('ar1_fut')
           
            # Handle trend and seasonality components
            if 'add_trend' in globals() and add_trend and 'add_seasonality' not in globals():
                print('Adding trend component')
                if 'trend_fut' in AR.named_vars:
                    AR.named_vars.pop('trend_fut')
                trend = pm.Deterministic("trend_fut", alpha + beta * t_fut, dims="obs_id_fut")
                mu = ar1_fut[1:] + trend
                if 'yhat_fut' in AR.named_vars:
                    AR.named_vars.pop('yhat_fut')
                yhat_fut = pm.Normal("yhat_fut", mu=mu, sigma=sigma, dims="obs_id_fut")
            elif 'add_trend' in globals() and 'add_seasonality' in globals() and add_trend and add_seasonality:
                print('Adding trend and seasonality components')
                if 'trend_fut' in AR.named_vars:
                    AR.named_vars.pop('trend_fut')
                if 'seasonality_fut' in AR.named_vars:
                    AR.named_vars.pop('seasonality_fut')
                trend = pm.Deterministic("trend_fut", alpha + beta * t_fut, dims="obs_id_fut")
                seasonality = pm.Deterministic("seasonality_fut", seasonality, dims="obs_id_fut")
                mu = ar1_fut[1:] + trend + seasonality
                if 'yhat_fut' in AR.named_vars:
                    AR.named_vars.pop('yhat_fut')
                yhat_fut = pm.Normal("yhat_fut", mu=mu, sigma=sigma, dims="obs_id_fut")
            else:
                print('Using basic AR model without trend or seasonality')
                if 'yhat_fut' in AR.named_vars:
                    AR.named_vars.pop('yhat_fut')
                yhat_fut = pm.Normal("yhat_fut", mu=ar1_fut[1:], sigma=sigma, dims="obs_id_fut")
                
            # Sample from the posterior predictive distribution
            idata_preds = pm.sample_posterior_predictive(
                idata_ar, var_names=["likelihood", "yhat_fut"], predictions=True, random_seed=100
            )
    except Exception as e:
        print(f"Error during prediction: {e}")
        # Try a more robust approach if the first attempt fails
        try:
            # Create a new model for predictions to avoid conflicts
            with pm.Model() as pred_model:
                # Set up coordinates
                pred_model.add_coords({"obs_id_fut_1": range(ar1_data.shape[0] - 1, prediction_length, 1)})
                pred_model.add_coords({"obs_id_fut": range(ar1_data.shape[0], prediction_length, 1)})
                
                # Get posterior samples
                posterior_samples = idata_ar.posterior
                
                # Extract parameters from posterior
                coefs_samples = posterior_samples.coefs.values
                sigma_samples = posterior_samples.sigma.values
                
                # Create deterministic variables for predictions
                ar_init = pm.ConstantData("ar_init", ar1[..., -1].eval())
                
                # Create AR process
                ar1_fut = pm.AR(
                    "ar1_fut",
                    init_dist=pm.DiracDelta.dist(ar_init),
                    rho=coefs_samples.mean(axis=(0, 1)),  # Use mean of posterior
                    sigma=sigma_samples.mean(axis=(0, 1)),  # Use mean of posterior
                    constant=True,
                    dims="obs_id_fut_1",
                )
                
                # Create predictions
                yhat_fut = pm.Normal("yhat_fut", mu=ar1_fut[1:], sigma=sigma_samples.mean(axis=(0, 1)), dims="obs_id_fut")
                
                # Sample from the posterior predictive
                idata_preds = pm.sample_posterior_predictive(
                    posterior_samples, var_names=["yhat_fut"], predictions=True, random_seed=100
                )
        except Exception as nested_e:
            print(f"Fallback prediction approach also failed: {nested_e}")
            raise RuntimeError(f"Could not generate predictions: {e}\nNested error: {nested_e}")
    
    # Extract future predictions
    future_predictions = idata_preds.predictions["yhat_fut"].values
    t_future = np.arange(ar1_data.shape[0], prediction_length)
    
    return future_predictions, t_future

# Predict the next 10 points in the time series
n = 15
future_predictions, t_future = predict_future_points(AR = AR, ar1 = ar1, ar1_data = ar1_data, coefs = coefs, sigma = sigma, steps = len(ar1_data), idata_ar = idata_ar, n=n)
view_prev = n*4
n_samples = 500
alpha = 0.05

end_date = pd.to_datetime('2025-05-27')
start_date = end_date - pd.Timedelta(days=steps)
forecast_start = end_date + pd.Timedelta(days=0)
forecast_end = end_date + pd.Timedelta(days=n-1)
fig, ax = plt.subplots(figsize=(10, 6))

# # Plot the data and predictions
from pandas import date_range
dr = date_range(start_date, end_date)
fr = date_range(forecast_start, forecast_end)
plot_posterior_samples(ax, dr, posterior_pred, view_prev, n_samples, alpha = alpha)
# plot_posterior_mean(ax, t_data, posterior_pred, view_prev)
plot_future_samples(ax, future_predictions, fr, n_samples, alpha = alpha)
plot_future_mean(ax, future_predictions, fr)
ax.axvline(x=min(fr), color='red', linestyle='--', alpha=0.5)
plot_observed_data(ax, dr, ar1_data, view_prev)
setup_plot_labels(ax, arx)

plt.tight_layout()
plt.show()
