In [1]:
import itertools

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import plotly.subplots as sp
import statsmodels.api as sm
from plotly.offline import init_notebook_mode, iplot
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.stattools import adfuller, kpss

## Stationarity EDA

In [2]:
class StationaryAnalysis:
    """Exploratory helpers for checking and inducing stationarity in a time series.

    Typical workflow: ``run_tests`` to check stationarity, ``find_seasonality``
    to decompose the series (this stores ``self.data`` and
    ``self.decomposition``), then ``detrend`` / ``deseasonalize`` to inspect the
    series with a component removed, or ``stationarize`` to transform and re-test.
    """

    def __init__(self):
        # Both are populated by find_seasonality(); detrend()/deseasonalize() read them.
        self.data = None
        self.decomposition = None

    def adf_test(self, df, alpha=0.05):
        """Run the Augmented Dickey-Fuller test.

        The ADF null hypothesis is a unit root (non-stationarity), so the series
        is reported as stationary when the null is rejected (``p_value < alpha``).

        Args:
            df: Series-like data accepted by ``statsmodels`` ``adfuller``.
            alpha: Significance level used for the stationarity verdict.

        Returns:
            ``(is_stationary, {'adf': {...}})`` where the inner dict holds the
            test statistic, p-value and critical values.
        """
        result = adfuller(df)
        p_value = result[1]
        adf_results = {
            'statistic': result[0],
            'p_value': p_value,
            'critical_values': result[4],
        }
        return bool(p_value < alpha), {'adf': adf_results}

    def kpss_test(self, df, alpha=0.05):
        """Run the KPSS test.

        The KPSS null hypothesis is stationarity -- the opposite of ADF -- so the
        series is reported as stationary when the null is NOT rejected
        (``p_value >= alpha``).

        Args:
            df: Series-like data accepted by ``statsmodels`` ``kpss``.
            alpha: Significance level used for the stationarity verdict.

        Returns:
            ``(is_stationary, {'kpss': {...}})`` where the inner dict holds the
            test statistic, p-value and critical values.
        """
        result = kpss(df)
        p_value = result[1]
        kpss_results = {
            'statistic': result[0],
            'p_value': p_value,
            'critical_values': result[3],
        }
        return bool(p_value >= alpha), {'kpss': kpss_results}

    def print_results(self, result):
        """Print the dictionary produced by ``run_tests`` in a readable form.

        Entries whose ``'test_result'`` is ``None`` (unrecognised test names)
        are reported as unavailable instead of raising.
        """
        print("Analysis Results:")
        for test_name, test_result in result.items():
            print(f"{test_name.upper()}:")
            print(f"Passed: {test_result['passed']}")
            print("Test Results:")
            details = test_result['test_result']
            if details is None:
                print("unavailable (unrecognised test name)")
            else:
                for key, value in details.items():
                    print(f"{key}: {value}")
            print("\n")

    def run_tests(self, df, tests=('adf', 'kpss')):
        """Run the requested stationarity tests, print them and return the results.

        Args:
            df: Series-like data to test.
            tests: Iterable of test names; ``'adf'`` and ``'kpss'`` are supported.
                Any other name is recorded as not passed with a ``None`` result.

        Returns:
            ``{test_name: {'passed': bool, 'test_result': dict | None}}``.
        """
        runners = {'adf': self.adf_test, 'kpss': self.kpss_test}
        result = {}
        for test_name in tests:
            runner = runners.get(test_name)
            if runner is None:
                passed, test_result = False, None
            else:
                passed, test_result = runner(df)
            result[test_name] = {
                'passed': passed,
                'test_result': test_result,
            }
        self.print_results(result)
        return result

    def find_autocorrelation(self, df, lags=-1):
        """Compute and plot the autocorrelation function.

        Args:
            df: Series-like data.
            lags: Number of lags; ``-1`` means every available lag (``len(df) - 1``).

        Returns:
            Dict with ``'acf_vals'`` and the 95% ``'confidence_intervals'``.
        """
        lags = len(df) - 1 if lags == -1 else lags
        acf_vals, conf_int = sm.tsa.stattools.acf(df, nlags=lags, fft=True, alpha=0.05)

        fig, ax = plt.subplots(figsize=(8, 5))
        # np.asarray accepts pandas objects and plain arrays alike.
        plot_acf(np.asarray(df), ax=ax, lags=lags)
        ax.fill_between(range(len(acf_vals)), conf_int[:, 0], conf_int[:, 1], color='gray', alpha=0.3)
        ax.set_title('Autocorrelation Function (ACF)')
        ax.set_xlabel('Lag')
        ax.set_ylabel('ACF Value')
        ax.legend(['ACF', 'Confidence Intervals'])
        plt.show()

        return {
            'acf_vals': acf_vals,
            'confidence_intervals': conf_int,
        }

    def find_partautocorrelation(self, df, lags=20):
        """Compute and plot the partial autocorrelation function.

        Args:
            df: Series-like data.
            lags: Number of lags to evaluate.

        Returns:
            Dict with ``'pacf_vals'`` and the 95% ``'confidence_intervals'``.
        """
        pacf_vals, conf_int = sm.tsa.stattools.pacf(df, nlags=lags, alpha=0.05)

        fig, ax = plt.subplots(figsize=(8, 5))
        plot_pacf(np.asarray(df), ax=ax, lags=lags)
        ax.fill_between(range(len(pacf_vals)), conf_int[:, 0], conf_int[:, 1], color='gray', alpha=0.3)
        ax.set_title('Partial Autocorrelation Function (PACF)')
        ax.set_xlabel('Lag')
        ax.set_ylabel('PACF Value')
        ax.legend(['PACF', 'Confidence Intervals'])
        plt.show()

        return {
            'pacf_vals': pacf_vals,
            'confidence_intervals': conf_int,
        }

    def find_seasonality(self, df):
        """Decompose the series additively and plot original, trend and seasonal parts.

        The series is resampled to its inferred frequency (business days when no
        frequency can be inferred) and gaps introduced by resampling are
        back-filled. The resampled data and the decomposition are stored on the
        instance for ``detrend`` / ``deseasonalize``.

        Args:
            df: pandas Series/DataFrame indexed by dates.

        Returns:
            ``{'seasonality': {...}}`` with mean/std/min/max of the seasonal
            component, or the string ``"Nan values in data."`` when the input
            already contains missing values (nothing is stored in that case).
        """
        if df.isna().any().any():
            return "Nan values in data."

        frequency = pd.infer_freq(df.index)
        df = df.asfreq(frequency if frequency is not None else 'B')
        if df.isna().any().any():
            df = df.bfill()
        # Always remember the series that was decomposed, not only when gaps were filled.
        self.data = df
        self.decomposition = seasonal_decompose(df, model='additive')
        trend = self.decomposition.trend
        seasonal = self.decomposition.seasonal

        fig = go.Figure()
        fig.add_trace(go.Scatter(x=df.index, y=df.values.flatten(), name='Original Data'))
        fig.add_trace(go.Scatter(x=trend.index, y=trend.values.flatten(), name='Trend'))
        fig.add_trace(go.Scatter(x=seasonal.index, y=seasonal.values.flatten(), name='Seasonal'))
        fig.show()

        seasonal_stats = {
            'seasonal_mean': np.mean(seasonal),
            'seasonal_std': np.std(seasonal),
            'seasonal_min': np.min(seasonal),
            'seasonal_max': np.max(seasonal),
        }
        return {'seasonality': seasonal_stats}

    def _require_decomposition(self):
        """Return ``(data, decomposition)`` or raise if ``find_seasonality`` has not stored them."""
        data = getattr(self, 'data', None)
        decomposition = getattr(self, 'decomposition', None)
        if data is None or decomposition is None:
            raise AttributeError(
                "No decomposition available: call find_seasonality() on data "
                "without missing values first."
            )
        return data, decomposition

    def _plot_adjusted(self, adjusted, label):
        """Plot the stored series above its adjusted version in a two-row figure."""
        fig = sp.make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.1)
        panels = (('Original', self.data), (label, adjusted))
        for row, (name, series) in enumerate(panels, start=1):
            fig.add_trace(
                go.Scatter(x=series.index, y=series.values.flatten(), name=name),
                row=row, col=1,
            )
            fig.update_xaxes(title_text='Time', row=row, col=1)
            fig.update_yaxes(title_text=name, row=row, col=1)
        fig.update_layout(height=600, width=800, title=f'{label} Time Series')
        fig.show()

    def detrend(self):
        """Plot the stored series with the trend component removed.

        Returns:
            The trend component aligned to the stored series' index.

        Raises:
            AttributeError: if ``find_seasonality`` has not been run successfully.
        """
        data, decomposition = self._require_decomposition()
        trend = decomposition.trend.loc[data.index]
        # Transposing lets the component align on the time axis for DataFrame input.
        detrended = (data.T - trend).T
        self._plot_adjusted(detrended, 'Detrended')
        return trend

    def deseasonalize(self):
        """Plot the stored series with the seasonal component removed.

        Returns:
            The seasonal component aligned to the stored series' index.

        Raises:
            AttributeError: if ``find_seasonality`` has not been run successfully.
        """
        data, decomposition = self._require_decomposition()
        seasonal = decomposition.seasonal.loc[data.index]
        deseasonalized = (data.T - seasonal).T
        self._plot_adjusted(deseasonalized, 'Deseasonalized')
        return seasonal

    def method_stationarize(self, df, method, order):
        """Apply one transformation to the data.

        Args:
            df: pandas Series/DataFrame.
            method: ``'diff'`` (difference, leading gaps back-filled),
                ``'std'`` (StandardScaler) or ``'minmax'`` (MinMaxScaler).
            order: Passed to ``df.diff`` as ``periods`` -- i.e. the lag of the
                difference, not the number of repeated differencings. Ignored
                by the scaling methods.

        Returns:
            The differenced pandas object, or whatever the scaler's
            ``fit_transform`` returns for the scaling methods.

        Raises:
            ValueError: if ``method`` is not one of the supported names.
        """
        if method == 'diff':
            return df.diff(order).bfill()

        # NOTE(review): StandardScaler / MinMaxScaler are not imported in the
        # visible cells -- presumably sklearn.preprocessing; confirm they are
        # imported before this method is called.
        if method == 'std':
            scaler = StandardScaler()
        elif method == 'minmax':
            scaler = MinMaxScaler()
        else:
            raise ValueError(
                f"Unknown method {method!r}; expected 'diff', 'std' or 'minmax'."
            )
        return scaler.fit_transform(df)

    def stationarize(self, df, method, order=1):
        """Transform the data, plot the result and re-run the stationarity tests.

        Args:
            df: pandas Series/DataFrame.
            method: Transformation name understood by ``method_stationarize``.
            order: Lag for the ``'diff'`` method.

        Returns:
            The transformed data.
        """
        stationed = self.method_stationarize(df, method, order)
        pd.DataFrame(stationed).plot()
        plt.show()
        self.run_tests(stationed)
        return stationed
    
    
