### Understanding Granger Causality:

The test examines whether the past values of $ X_t $ provide statistically significant information about $ Y_t $, given the past values of $ Y_t $ for lag $ p $.

#### Mathematical Form:

1. **Restricted Model**: Regress $ Y_t $ on its own past values:

$$
Y_t = \alpha + \beta_1 Y_{t-1} + \beta_2 Y_{t-2} + \dots + \epsilon_t
$$

2. **Full Model**: Regress $ Y_t $ on its own past values and the past values of \( X_t \):

$$
Y_t = \alpha + \beta_1 Y_{t-1} + \dots + \gamma_1 X_{t-1} + \gamma_2 X_{t-2} + \dots + \epsilon_t
$$

3. **F-Test**: Perform an F-test to determine whether the additional terms from \( X \) significantly reduce the residual error.


#### 1. Lagged Matrix Construction:

##### Purpose:
Creates lagged versions of a time series $ X_t $, constructing the lagged predictors for regression models.

##### Mathematical Expression:
The lagged matrix for a time series $ X_t $ is:

$$
\text{Lagged}_X = 
\begin{bmatrix}
X_{t-1} & X_{t-2} & \dots & X_{t-p}
\end{bmatrix}
$$

where $ p = \text{max_lag} $.


#### Example:

##### Original Series: $ [1, 2, 3, 4, 5, 6] $ 

##### Lags:
- **Lag 1** ($ X_{t-1} $ ): 
$ [2, 3, 4, 5, 6] $ 

- **Lag 2** ($ X_{t-2} $ ): 
$ [1, 2, 3, 4, 5] $ 

##### Aligned Output:
The lagged matrix aligns these two columns:

$ 
\begin{array}{|c|c|}
\hline
\text{Lag 2} & \text{Lag 1} \\
\hline
2 & 1 \\
3 & 2 \\
4 & 3 \\
5 & 4 \\
6 & 5 \\
\hline
\end{array}
$ 


In [48]:
import numpy as np

def lagged_matrix(series, max_lag):
    n = len(series)
    lagged = np.column_stack([series[max_lag - lag:n - lag] for lag in range(1, max_lag + 1)])
    return lagged

#### 2. Restricted Model:

##### Purpose:
Regress $ Y_t $ on its own lagged values, forming the restricted model.

#### Mathematical Formula:
$$
Y_t = \alpha + \sum_{i=1}^{p} \beta_i Y_{t-i} + \epsilon_t
$$

Where:
- $ \alpha $: Intercept term.
- $ \beta_i $: Coefficients for lagged $ Y_t $.
- $ \epsilon_t $: Residual (error term).

The Residual Sum of Squares (RSS) for this model is computed as:

$$
\text{RSS}_{\text{restricted}} = \sum_t (\epsilon_t)^2
$$



(
##### Auxiliary: Ordinary Least Squares (OLS) Regression
###### Purpose:
OLS regression is a method used to estimate the relationship between a dependent variable $ Y $ and one or more independent variables $ X $. The goal of OLS is to minimize the sum of squared residuals, which represents the vertical distance between the observed data points and the predicted values.

###### Mathematical Formula:

The model for simple linear regression is:

$$
Y_t = \alpha + \beta X_t + \epsilon_t
$$

Where:
- $ Y_t $: Dependent variable at time $ t $
- $ X_t $: Independent variable at time $ t $
- $ \alpha $: Intercept term
- $ \beta $: Slope coefficient (the effect of $ X_t $ on $ Y_t $)
- $ \epsilon_t $: Residual (error term)

For multiple linear regression with multiple predictors $ X_1, X_2, ..., X_k $, the model becomes:

$$
Y_t = \alpha + \beta_1 X_{1t} + \beta_2 X_{2t} + \dots + \beta_k X_{kt} + \epsilon_t
$$

###### Objective of OLS:

The goal of OLS is to find the coefficients $ \alpha $ and $ \beta $ that minimize the **Residual Sum of Squares (RSS)**, which is the sum of squared differences between the observed values and the predicted values:

$$
\text{RSS} = \sum_{t=1}^{n} \left( Y_t - \hat{Y_t} \right)^2
$$

Where:
- $ \hat{Y_t} $ is the predicted value of $ Y_t $, which is obtained from the regression model.

###### OLS Estimation:

The estimated coefficients ($ \hat{\alpha} $) and $ \hat{\beta} $) are obtained using the following formula:

$$
\hat{\beta} = (X^T X)^{-1} X^T Y
$$

Where:
- $ X $ is the matrix of independent variables (including a column of ones for the intercept).
- $ Y $ is the vector of observed values of the dependent variable.
- $ \hat{\beta} $ is the vector of estimated coefficients.

This formula minimizes the RSS and provides the best linear fit for the data.
)

In [61]:
def ols(y, X):
    # Compute coefficients: (X'X)^(-1) X'y
    XTX_inv = np.linalg.inv(X.T@X)
    coefficients = XTX_inv@X.T@y
    
    # Compute residuals
    y_pred = X @ coefficients
    residual = y - y_pred
    
    rss = np.sum(residual ** 2)
    return coefficients, residual, rss

##### Auxiliary: F-Test

###### Purpose:
Compare the Residual Sum of Squares (RSS) values between the restricted and full models using the **F-statistic**. F-test is usually used to compare to models. According to the formula, when F has a big value, it means the full model significatly more superior than the restricted mode. In Granger Causality,it means the lagged X causes the inflence to Y.

###### Mathematical Formula:
$$
F = \frac{\text{RSS}_{\text{restricted}} - \text{RSS}_{\text{full}}}{k} \div \frac{\text{RSS}_{\text{full}}}{n - k - p}
$$

Where:
- $ k $: Number of additional parameters in the full model (lags of \( X \)), k is used to standardization, to eliminate the influence from the number of the added parameter
- $ p $: Total number of predictors in the restricted model.
- $ n $: Number of observations after accounting for lags.


In [65]:
def f_test(rss_restricted, rss_full, df1, df2):
    f_stat = ((rss_restricted - rss_full) / df1) / (rss_full / df2)
    return f_stat

In [81]:
from scipy.stats import f
def p_value_function(f_stat, df1, df2):
    p = 1 - f.cdf(f_stat, df1, df2)
    if p < 0.05:
        print("Reject the null hypothesis: X Granger-causes Y.")
    else:
        print("Fail to reject the null hypothesis: X does not Granger-cause Y.")
    return p

)

In [82]:
def granger_causality_test(y, x, max_lag):
    n = len(y)
    if n != len(x):
        raise ValueError("Both time series must have the same length.")
    if max_lag >= n:
        raise ValueError("Max lag must be less than the length of the series.")

    # Create lagged matrices
    y_lagged = lagged_matrix(y, max_lag)
    x_lagged = lagged_matrix(x, max_lag)
    y_target = y[max_lag:]  # Dependent variable

    # Restricted model: only Y's lags
    restricted_vars = np.column_stack([np.ones(len(y_target)), y_lagged])
    _, _, rss_restricted = ols(y_target, restricted_vars)

    # Full model: Y's lags + X's lags
    full_vars = np.column_stack([np.ones(len(y_target)), y_lagged, x_lagged])
    _, _, rss_full = ols(y_target, full_vars)

    # Degrees of freedom
    df1 = max_lag  # Number of additional parameters
    df2 = len(y_target) - full_vars.shape[1]  # Degrees of freedom for the full model

    # Compute F-statistic
    f_stat = f_test(rss_restricted, rss_full, df1, df2)

    # Compute p-value
    p_value = p_value_function(f_stat, df1, df2)

    return f_stat, p_value

In [86]:
# Example: Granger Causality test
np.random.seed(42)
n = 100

x = np.cos(np.linspace(0, 10, n)) + np.random.normal(0, 0.1, n)  # Predictor
y = np.sin(np.linspace(0, 10, n)) + np.random.normal(0, 0.1, n)  # Target variable

# Test if x Granger-causes y
max_lag = 2
f_stat, p_value = granger_causality_test(y, x, max_lag)

print(f"F-statistic: {f_stat}")
print(f"P-value: {p_value}")


Reject the null hypothesis: X Granger-causes Y.
F-statistic: 35.50454288226407
P-value: 3.49220652395843e-12
