In [3]:
import numpy as np
import pandas as pd
import statsmodels.api as sm
from statsmodels.tsa.stattools import grangercausalitytests
from statsmodels.tsa.stattools import acf, pacf
from statsmodels.tsa.tsatools import lagmat, lagmat2ds

def generate_fake_data(num_obs=100, num_vars=5, max_lag=3, noise_std=1):
    np.random.seed(42)
    data = np.random.randn(num_obs, num_vars + 1)  # +1 for the target variable
    for i in range(1, num_vars + 1):
        for lag in range(1, max_lag + 1):
            data[lag:, i] += 0.5 * data[:-lag, i - 1]
    data[:, 0] = 0.5 * data[:, 1] + noise_std * np.random.randn(num_obs)  # Target variable
    return pd.DataFrame(data, columns=['Target'] + [f'Var_{i}' for i in range(1, num_vars + 1)])

def select_variables_granger_causality(data, max_lag_method='aic', num_vars_to_select=2):
    results = []

    for var in data.columns[1:]:  # Exclude the target variable
        target_series = data['Target']
        var_series = data[var]

        if max_lag_method == 'acf':
            max_lag = select_lag_acf(var_series)
        elif max_lag_method == 'pacf':
            max_lag = select_lag_pacf(var_series)
        elif max_lag_method == 'aic':
            max_lag = select_lag_aic(target_series, var_series)
        else:
            raise ValueError("Invalid max_lag_method. Choose 'acf', 'pacf', or 'aic'.")

        test_result = granger_causality_test(target_series, var_series, max_lag)
        results.append((var, max_lag, test_result))

    results.sort(key=lambda x: x[2])  # Sort by Granger causality test result
    selected_vars = [(var, max_lag) for var, max_lag, _ in results[:num_vars_to_select]]

    return selected_vars

def select_lag_acf(series, max_lag=10):
    acf_vals = acf(series, nlags=max_lag)
    return np.argmax(np.abs(acf_vals[1:])) + 1

def select_lag_pacf(series, max_lag=10):
    pacf_vals = pacf(series, nlags=max_lag)
    return np.argmax(np.abs(pacf_vals[1:])) + 1

def select_lag_aic(target_series, var_series, max_lag=10):
    lags = range(1, max_lag + 1)
    aic_values = []

    for lag in lags:
        X = lagmat2ds(var_series, lag, trim='both', dropex=1)
        X = sm.add_constant(X)
        model = sm.OLS(target_series[lag:], X)
        result = model.fit()
        aic_values.append(result.aic)

    return lags[np.argmin(aic_values)]

def granger_causality_test(target_series, var_series, max_lag):
    X = lagmat2ds(var_series, max_lag, trim='both', dropex=1)
    X = sm.add_constant(X)
    y = target_series[max_lag:]
    model = sm.OLS(y, X)
    result = model.fit()

    return result.f_pvalue

In [4]:
# Example usage
data = generate_fake_data(num_obs=100, num_vars=5, max_lag=3, noise_std=1)

In [5]:
select_lag_pacf(data['Var_5'])

1

In [6]:
selected_vars = select_variables_granger_causality(data, max_lag_method='aic', num_vars_to_select=2)
print("Selected Variables with Max Lags:", selected_vars)

Selected Variables with Max Lags: [('Var_1', 10), ('Var_3', 10)]


In [14]:
def create_lagged_data(data, selected_vars):
    lagged_data = pd.DataFrame(index=data.index)
    for var, max_lag in selected_vars:
        lagged_values = lagmat2ds(data[var], max_lag, trim='both', dropex=1)
        for lag in range(1, max_lag + 1):
            lagged_data[f'{var}_lag{lag}'] = np.nan  # Initialize the column with NaN values
            lagged_data.loc[max_lag:, f'{var}_lag{lag}'] = lagged_values[:, lag]
        # Include the original series in lagged_data
        lagged_data[var] = data[var]
    return pd.concat([data['Target'], lagged_data], axis=1)

In [15]:
lagged_data = create_lagged_data(data, selected_vars)