# Auto- and Partial Autocorrelation Matrices
> Module for computing auto- and partial autocorrelation matrices

In [None]:
#| default_exp corrmat

In [None]:
#| export
#| hide
import pandas as pd
import numpy as np
import matplotlib.cm as cm
import matplotlib.pyplot as plt
import pingouin as pg
from dPCA.lag import *

In [None]:
X = pd.DataFrame([[2,3,4],[5,6,7],[8,8,6],[9,10,3],[11,4,6]]); X

Unnamed: 0,0,1,2
0,2,3,4
1,5,6,7
2,8,8,6
3,9,10,3
4,11,4,6


In [None]:
lag_uniform(X,1)

Variable,0,1,2,0,1,2
Time,t,t,t,t-1,t-1,t-1
0,5,6,7,2,3,4
1,8,8,6,5,6,7
2,9,10,3,8,8,6
3,11,4,6,9,10,3


Here we define the so-called imposter matrix, which will be used for comparing correlation matrices against randomly shuffled ones to determine when a correlation is better than a random one.

In [None]:
#| export
def imposter_matrix(X: pd.DataFrame, # Matrix to be randomized
                   random_state = 42, # Random state to be used
                   )-> pd.DataFrame: # The input matrix with shuffled values for each column
    d = pd.DataFrame(0, index=np.arange(len(X)), columns=X.columns)
    
    for column in d:
        d[column] = X[column].sample(frac = 1, random_state=random_state).reset_index(drop=True)
        
    return d
    

In [None]:
imposter_matrix(X)

Unnamed: 0,0,1,2
0,5,6,7
1,11,4,6
2,8,8,6
3,2,3,4
4,9,10,3


In [None]:
#| export
def correlation_matrix(X: pd.DataFrame, # Matrix 1 for calculating the correlation matrix
                       y: pd.DataFrame, # Matrix 2 for calculating the correlation matrix
                      )-> np.ndarray: # Correlation matrix as a Numpy array
    n = np.shape(X)[0]
    
    X_scaled = (X-np.mean(X,axis = 0))/np.std(X, axis = 0, ddof = 1)
    y_scaled = (y-np.mean(y,axis = 0))/np.std(y, axis = 0, ddof = 1)
    
    correlation_matrix = np.dot(y_scaled.T,X_scaled) / (n-1)
    
    return correlation_matrix
    

In [None]:
correlation_matrix(X,X)

array([[ 1.        ,  0.41978508,  0.0860663 ],
       [ 0.41978508,  1.        , -0.27628324],
       [ 0.0860663 , -0.27628324,  1.        ]])

In [None]:
#| export
def ACM(X: pd.DataFrame, # Dataframe of raw data for which to calculate the ACM
        lag: int, # Lag to calculate correlation against
        )-> pd.DataFrame: # Autocorrelation Matrix (ACM)
    
    n = np.shape(X)[0] # Number of observations
    p = np.shape(X)[1] # Number of variables
    
    X_val = X.values
    l = lag
    
    
    Xtrim = X_val[:n-l] # Observations from start to n-lag
    Xlag =  X_val[l:]   # Observations from lag to end
    
    # Calculate correlation matrix between Xtrim and Xlag
    ACM = correlation_matrix(Xtrim, Xlag)
    
    # New column names
    t0 = ['t' for i in range(p)]
    columns = [X.columns.to_list(),t0]
    
    # New index names
    t = ['t-'+f'{lag}' for i in range(p)]
    index = [X.columns.to_list(),t]
    
    ACM = pd.DataFrame(ACM)
    ACM.index = index
    ACM.columns = columns
    
    return ACM

In [None]:
ACM(X,2)

Unnamed: 0_level_0,Unnamed: 1_level_0,0,1,2
Unnamed: 0_level_1,Unnamed: 1_level_1,t,t,t
0,t-2,0.981981,0.953821,0.5
1,t-2,-0.654654,-0.563621,0.142857
2,t-2,0.0,-0.114708,-0.755929


In [None]:
#| export
def partial_correlation_matrix(X: pd.DataFrame, # Matrix for calculating partial correlation
                       )-> np.ndarray: # Partial correlation matrix as a Numpy array 
    V = X.cov()
    Vi = np.linalg.pinv(V)
    D = np.diag(np.sqrt(1/np.diag(Vi)))
    pcor = -1 * (D @ Vi @ D)
    pcor[np.diag_indices_from(pcor)] = 1
    
    return pcor

In [None]:
partial_correlation_matrix(X)

array([[ 1.        ,  0.46324708,  0.23162552],
       [ 0.46324708,  1.        , -0.34549142],
       [ 0.23162552, -0.34549142,  1.        ]])

In [None]:
#| export
def PACM(X: pd.DataFrame, # Matrix for calculating partial autocorrelation
         lag: int, # Lag to calculate partial correlation against
        ):
    
    p = np.shape(X)[1]
    
    X_lag = lag_uniform(X, lag)
    
    pcor_mat = partial_correlation_matrix(X_lag)
    
    pcor_df = pd.DataFrame(pcor_mat)
    pcor_df.columns = X_lag.columns
    pcor_df.index = X_lag.columns
    
    
    PACM = pcor_df.iloc[lag*p:(lag*p+p),0:p]
    
    return PACM

In [None]:
PACM(X,2)

Unnamed: 0_level_0,Variable,0,1,2
Unnamed: 0_level_1,Time,t,t,t
Variable,Time,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2
0,t-2,-0.807661,-0.109185,0.375985
1,t-2,-0.606224,-0.387828,0.624375
2,t-2,0.018772,-0.876408,0.974483


In [None]:
#| hide
import nbdev; nbdev.nbdev_export()