In [8]:
import numpy as np
import pandas as pd
import statsmodels.api as sm
from itertools import product


In [None]:
class CipsTest:
  '''
  Cross-sectionally augmented Dickey-Fuller (CADF) procedure for testing
  non-stationarity, I(1), in panel data. The reported CIPS statistic is the
  average over spatial units of the t-statistic on the lagged level in each
  unit's CADF regression.

  The hypotheses are as follows:
  - *H0*: The Target variable is I(1)
  - *H1*: The Target variable is I(0)

  PARAMETERS:
  ----------
  - *df*: A standard Pandas DataFrame containing panel data.
    Only the first three columns are used, in this exact order:
    0 - spatial units. Must contain homogeneous data, e.g. only countries,
        companies, regions, etc.
    1 - temporal column. Must contain homogeneous data, e.g. only years,
        months, quarters, etc.
    2 - target variable. Must not contain NaN values. The natural logarithm
        of this column is taken before testing, so values must be strictly
        positive.

  - *T*: Your temporal window. Used only to pick the test critical value.
  - *N*: Your spatial window. Used only to pick the test critical value.

  - *trend*: Whether the ADF model has a trend. Adds a linear trend regressor
    and selects the critical-value table.

  - *intercept*: Whether the ADF model has an intercept. Adds a constant to
    each regression and selects the critical-value table.

  - *n_lags*: Number of lagged differences in the augmented regression
    (0 gives the non-augmented regression). For relatively small T, going
    over 3 lags may lead to the test losing power.

  - *level*: Significance level of the test in %. Only 5 and 1 are allowed.

  RAISES:
  ------
  - *TypeError*: the (log-transformed) target contains NaN.
  - *ValueError*: fewer than three columns, negative *n_lags*, unsupported
    *level*, a non-finite log target (e.g. zeros in the raw data), a trend
    without an intercept, or too few observations to fit a regression.
  '''
  # Workbook with the critical-value tables; the path is relative to the
  # current working directory.
  _CRIT_VALUES_FILE = 'CADF_Crit_Values.xlsx'
  # Sheet names; each sheet is stored as a class attribute of the same name.
  _TABLE_SHEETS = ('NTNC_1P', 'NTNC_5P', 'NTC_1P', 'NTC_5P', 'TC_1P', 'TC_5P')
  # (trend, intercept) -> sheet-name prefix. No sheet exists for a trend
  # without an intercept.
  _TABLE_PREFIX = {(False, False): 'NTNC', (False, True): 'NTC', (True, True): 'TC'}

  def __init__(self, df: pd.DataFrame, T: int, N: int, trend: bool =  False, intercept: bool = False, n_lags: int = 2, level: int = 5) -> None:
    CipsTest.__build_tables()
    if df.shape[1] < 3:
      raise ValueError('df must contain at least three columns: spatial unit, time, target.')
    if n_lags < 0:
      raise ValueError('n_lags must be non-negative.')
    # Work on a private copy of the first three columns only: the caller's
    # frame is never mutated and extra columns cannot collide with the
    # internal names.
    panel = df.iloc[:, :3].copy()
    panel.columns = ['SpUnit', 'time', 'target']
    panel['target'] = np.log(panel['target'])
    self.__df = panel
    self.__T = T
    self.__N = N
    self.__trend = trend
    self.__C = intercept
    self.__n_lags = n_lags
    self.__alpha = level/100
    self.verify()
    self.__table = self.get_table()
    self.__CADF_Crit = self.get_critical_value()
    self.__CADF = self.estimate()

  def verify(self) -> None:
    '''
    Validate the prepared panel and the significance level.
    Raises TypeError on NaN targets and ValueError on infinite targets or an
    unsupported significance level.
    '''
    target = self.__df['target']
    if target.isnull().any():
      raise TypeError('Values in Target must NOT be NaN!')
    # log(0) yields -inf, which is not NaN but would poison every regression.
    if not np.isfinite(target).all():
      raise ValueError('Values in Target must be strictly positive (their logarithm is taken)!')
    if self.__alpha != 0.01 and self.__alpha != 0.05:
      raise ValueError('The Significance Level must be either 1 or 5!')

  @classmethod
  def __build_tables(cls) -> None:
    '''Load the critical-value sheets into class attributes (once per class).'''
    if all(hasattr(cls, sheet) for sheet in cls._TABLE_SHEETS):
      return
    # A list of sheet names makes pandas open the workbook a single time and
    # return a {sheet name: DataFrame} mapping.
    tables = pd.read_excel(cls._CRIT_VALUES_FILE, sheet_name=list(cls._TABLE_SHEETS), index_col=0)
    for sheet, table in tables.items():
      setattr(cls, sheet, table)

  def get_table(self) -> pd.DataFrame:
    '''
    Return the critical-value table matching the trend/intercept
    specification and the significance level.
    Raises ValueError for a trend without an intercept (no table available).
    '''
    spec = (bool(self.__trend), bool(self.__C))
    if spec not in CipsTest._TABLE_PREFIX:
      raise ValueError('No critical values are available for a model with a trend but no intercept.')
    suffix = '1P' if self.__alpha == 0.01 else '5P'
    return getattr(CipsTest, f'{CipsTest._TABLE_PREFIX[spec]}_{suffix}')

  def get_critical_value(self) -> float:
    '''
    Return the tabulated critical value whose (row, column) labels are
    closest to (T, N).
    '''
    # NOTE(review): this keeps the original lookup convention, rows labelled
    # by T and columns by N - confirm against the workbook layout.
    # Squared Euclidean distance is separable, so the nearest grid point is
    # the nearest row label paired with the nearest column label; ties keep
    # the first label, as an exhaustive row-major search would.
    table = self.__table
    nearest_T = min(table.index, key=lambda label: abs(label - self.__T))
    nearest_N = min(table.columns, key=lambda label: abs(label - self.__N))
    return float(table.loc[nearest_T, nearest_N])

  def build_regressions(self) -> list[pd.DataFrame]:
    '''
    Build one regression frame per spatial unit.

    Column 0 is the dependent variable (target_diff); the remaining columns
    are the regressors: optional trend t, lagged level target_1, lagged
    cross-sectional average cs_avg_1, its current difference cs_avg_diff and,
    for each lag k, target_diff_k and cs_avg_diff_k. Rows with missing values
    (the first n_lags + 1 periods) are dropped.

    Lags are taken row-wise after sorting by time, so each unit's
    observations are assumed to be consecutive periods.
    '''
    panel = self.__df
    lags = range(1, self.__n_lags + 1)
    # The cross-sectional average depends only on the period: compute it once
    # and align it to each unit by time label rather than by position.
    cs_avg_by_time = panel.groupby('time')['target'].mean()

    columns = ['target_diff']
    if self.__trend:
      columns.append('t')
    columns += ['target_1', 'cs_avg_1', 'cs_avg_diff']
    for k in lags:
      columns += [f'target_diff_{k}', f'cs_avg_diff_{k}']

    frames = []
    for unit in panel['SpUnit'].unique():
      unit_df = panel.loc[panel['SpUnit'] == unit].sort_values('time', kind='stable')
      level = unit_df['target']
      cs_avg = unit_df['time'].map(cs_avg_by_time)
      level_diff = level.diff()
      cs_avg_diff = cs_avg.diff()
      data = {
        'target_diff': level_diff,
        'target_1': level.shift(1),
        'cs_avg_1': cs_avg.shift(1),
        'cs_avg_diff': cs_avg_diff,
      }
      if self.__trend:
        data['t'] = pd.Series(np.arange(1, len(unit_df) + 1, dtype=float), index=unit_df.index)
      for k in lags:
        data[f'target_diff_{k}'] = level_diff.shift(k)
        data[f'cs_avg_diff_{k}'] = cs_avg_diff.shift(k)
      # Plain arrays keep the assembly positional, so duplicate index labels
      # in the caller's frame cannot trigger a reindex error.
      frame = pd.DataFrame({name: data[name].to_numpy() for name in columns}, index=unit_df.index)
      frames.append(frame.dropna())
    return frames

  def estimate(self) -> float:
    '''
    Fit the regression of every unit by OLS and return the CIPS statistic:
    the mean t-value of target_1 across the fitted units.
    Raises ValueError when there is no unit to fit or a unit has too few
    observations for its regression.
    '''
    t_values = []
    for frame in self.build_regressions():
      endog = frame.iloc[:, 0]
      exog = frame.iloc[:, 1:]
      if self.__C:
        exog = sm.add_constant(exog)
      n_obs, n_regressors = exog.shape
      # Without residual degrees of freedom the t-statistic is undefined.
      if n_obs <= n_regressors:
        raise ValueError(f'Not enough observations ({n_obs}) to estimate {n_regressors} coefficients; reduce n_lags or supply a longer panel.')
      res = sm.OLS(endog, exog).fit()
      t_values.append(res.tvalues['target_1'])
    if not t_values:
      raise ValueError('The panel contains no spatial units.')
    # Average over the units actually fitted; N only selects the critical
    # value and may differ from the number of units in the data.
    return float(np.mean(t_values))

  def verdict(self) -> None:
    '''
    Print the test outcome: H0 (unit root) is rejected when the CIPS
    statistic lies below the critical value.
    '''
    if self.__CADF < self.__CADF_Crit:
      print(f'{self.__CADF} < {self.__CADF_Crit}\n Your target variable is I(0) according to the CIPS test\n Significance level : {self.__alpha*100}%')
    else:
      print(f'{self.__CADF} > {self.__CADF_Crit}\n Your target variable is I(1) according to the CIPS test\n significance level: {self.__alpha*100}%')

In [179]:
# Load the example workbook, run the CIPS test with a trend, an intercept and
# one lag, then print the verdict.
df = pd.read_excel('Test_data.xlsx')
cips_test = CipsTest(df, T=13, N=11, trend=True, intercept=True, n_lags=1)
cips_test.verdict()

-2.8953285344880846 > -2.92
 Your target variable is I(1) according to the CIPS test
 significance level: 5.0%
