# Databricks notebook source


This notebook applies the Chow test to check whether a structural break has occurred in the NET_VALUE time series.

A structural break can affect the mean, variance, seasonality, or dependence between series observations, causing predictive model degradation.

After a structural break the model degrades because it was fitted to the old behavior; it must be retrained on recent data so that it adapts to the new pattern.


## 1. Libraries

In [None]:
!pip install holidays==0.45

In [None]:
# Statistical analysis
from scipy import stats
import statsmodels.api as sm

# Utilities
import pandas as pd
import numpy as np
from datetime import timedelta
import holidays
from sklearn.preprocessing import PowerTransformer
from statsmodels.tsa.seasonal import seasonal_decompose

import warnings
warnings.filterwarnings('ignore')


## 2. Data Import and Preparation

In [None]:
# Sales Series
# Loads the aggregated sales table, resamples it to daily totals and builds
# NET_VALUE_clean: NET_VALUE with residual outliers (|z| > 3) smoothed out.
str_select_sales = '''
                        SELECT *
                        FROM analytics.refined_sales_orders_agg
                        '''

df_series_sales = spark.sql(str_select_sales).toPandas()
df_series_sales.sort_values('SYSTEM_TIMESTAMP', inplace=True)
df_series_sales.set_index('SYSTEM_TIMESTAMP', inplace=True)

# Converts granularity to Daily
df_series_sales_resampled = df_series_sales.resample('D').sum()

# Outlier removal
# Decomposing the series for residual outlier verification
# NOTE(review): period=8 on daily data is unusual (weekly seasonality would be 7) - confirm it is intended.
result = seasonal_decompose(df_series_sales_resampled['NET_VALUE'], model='additive', period=8)
trend = result.trend
seasonal = result.seasonal
residual = result.resid

# Calculates residual z-scores (only where the decomposition produced a residual)
residual_valid = residual.dropna()
z_scores = stats.zscore(residual_valid)

# Gets outlier indices
zscore_threshold = 3 # Since the series has seasonality and high amplitude, a lower threshold would cut natural peaks
outliers = np.abs(z_scores) > zscore_threshold
outliers_index = residual_valid.index[outliers]

# Residual interpolation at outlier points
residual_adjusted = residual.copy()
residual_adjusted.loc[outliers_index] = np.nan
residual_adjusted = residual_adjusted.interpolate()
# bfill()/ffill() replace the deprecated fillna(method=...) form and cover any gaps
# left at the start/end of the residual after interpolation
residual_adjusted = residual_adjusted.bfill().ffill()

# Reconstruction of adjusted time series
adjusted_time_series = trend + seasonal + residual_adjusted
df_series_sales_resampled['NET_VALUE_clean'] = adjusted_time_series
# Where the reconstruction is NaN (e.g. the trend is undefined), fall back to the raw value
df_series_sales_resampled['NET_VALUE_clean'] = df_series_sales_resampled['NET_VALUE_clean'].fillna(df_series_sales_resampled['NET_VALUE'])

df_series_sales_resampled.info()

In [None]:
# Function to calculate Easter date
def calculate_easter(year):
    """Return the Easter date of ``year`` as a ``pd.Timestamp``.

    The month and day are derived purely from integer arithmetic on the
    year number; no calendar lookup is involved.
    """
    cycle_pos = year % 19
    century, year_in_century = divmod(year, 100)
    century_div4, century_mod4 = divmod(century, 4)
    century_adjust = (century - (century + 8) // 25 + 1) // 3
    h_term = (19 * cycle_pos + century - century_div4 - century_adjust + 15) % 30
    yic_div4, yic_mod4 = divmod(year_in_century, 4)
    l_term = (32 + 2 * century_mod4 + 2 * yic_div4 - h_term - yic_mod4) % 7
    m_term = (cycle_pos + 11 * h_term + 22 * l_term) // 451
    # The combined offset encodes the month (quotient) and zero-based day (remainder)
    month, day_zero_based = divmod(h_term + l_term - 7 * m_term + 114, 31)
    return pd.Timestamp(year, month, day_zero_based + 1)

# Function to calculate Carnival date
def calculate_carnival(year):
    """Return the Carnival date of ``year``: 47 days before that year's Easter."""
    days_before_easter = timedelta(days=47)
    return calculate_easter(year) - days_before_easter
    
# Function to determine custom events
def custom_events(date):
    """Return the name of the custom (non-official) event falling on ``date``.

    Parameters
    ----------
    date : pandas.Timestamp
        Day to classify; only its year, month and day are used, except for
        the Carnival checks, which compare the full timestamp.

    Returns
    -------
    str or None
        One of 'Carnival', 'Carnival Eve', 'Mothers Day', 'Mothers Day Eve',
        'Valentines Day', 'Black Friday', or None when the day is none of them.
    """
    month_day = date.strftime('%m-%d')
    year = date.year

    # Carnival (date provided by calculate_carnival)
    carnival = calculate_carnival(year)
    # Carnival Eve
    carnival_eve = carnival - timedelta(days=1)
    # Mother's Day (second Sunday of May): jump one week from May 1st, then
    # roll forward to the first Sunday on or after that day
    mothers_day = pd.Timestamp(year, 5, 1) + pd.DateOffset(weekday=6, weeks=1)
    # Mother's Day Eve
    mothers_day_eve = mothers_day - timedelta(days=1)
    # Valentines Day (June 12th in Brazil)
    valentines_day = pd.Timestamp(year, 6, 12)
    # Black Friday is the day after the fourth Thursday of November. Using the
    # "fourth Friday" lands one week early whenever November 1st is a Friday
    # (e.g. 2019 and 2024).
    fourth_thursday = pd.Timestamp(year, 11, 1) + pd.DateOffset(weekday=3, weeks=3)
    black_friday = fourth_thursday + timedelta(days=1)

    # The original compared against the undefined name `carnaval`, which
    # raised NameError on every call.
    if date == carnival:
        return 'Carnival'
    elif date == carnival_eve:
        return 'Carnival Eve'
    elif month_day == mothers_day.strftime('%m-%d'):
        return 'Mothers Day'
    elif month_day == mothers_day_eve.strftime('%m-%d'):
        return 'Mothers Day Eve'
    elif month_day == valentines_day.strftime('%m-%d'):
        return 'Valentines Day'
    elif month_day == black_friday.strftime('%m-%d'):
        return 'Black Friday'
    else:
        return None

# Function to include events in holiday column
def include_events(row):
    """Return the row's existing event, or the custom event for its date when none is set.

    ``row`` is expected to expose an 'event' entry; ``row.name`` (the row's
    index label) is passed to ``custom_events`` only when that entry is missing.
    """
    current_event = row['event']
    return custom_events(row.name) if pd.isna(current_event) else current_event

In [None]:
# Official Brazilian holidays: name of the holiday for each day, None otherwise
brazil_holidays = holidays.Brazil()
daily_index = df_series_sales_resampled.index
df_series_sales_resampled['event'] = daily_index.map(lambda day: brazil_holidays.get(day, None))

# Days without an official holiday are checked against the custom events
df_series_sales_resampled['event'] = df_series_sales_resampled.apply(include_events, axis=1)

# Weekday number of each day (Monday=0 ... Sunday=6)
df_series_sales_resampled['day_of_week'] = df_series_sales_resampled.index.day_of_week

# Power transform of the cleaned series (PowerTransformer with default settings)
scaler = PowerTransformer()
net_value_clean_frame = df_series_sales_resampled[['NET_VALUE_clean']]
df_series_sales_resampled['NET_VALUE_clean_scld'] = scaler.fit_transform(net_value_clean_frame)

In [None]:
# Sanity check: print columns, dtypes and non-null counts now that the event, day_of_week and scaled columns exist
df_series_sales_resampled.info()


## 3. Structural Break Verification

In [None]:
def _lagged_regression_inputs(serie):
    '''Builds the design matrix [1, y(t-1)] and target y(t) for one period.'''
    values = np.asarray(serie, dtype=float).ravel()
    y = values[1:]
    X = np.column_stack([np.ones(len(y)), values[:-1]])
    return X, y


def chow_test(serie_before, serie_after):

    '''
    Calculates the Chow test for a time series, determining if there is a structural break 
    between two specified periods.

    The Chow test is used to verify if there is a significant difference between coefficients 
    of two linear regressions in different time periods, suggesting a structural change in the time series.

    Each period is modelled as y(t) = a + b * y(t-1), i.e. k = 2 coefficients (intercept and slope).
    The lag is built inside each period separately, and the pooled regression stacks exactly those
    same observations, so no artificial pair linking the last "before" value to the first "after"
    value is introduced.

    Parameters:
    -----------
    serie_before : pandas.Series
        Time series corresponding to the period before the possible structural break.
        A single-column DataFrame is also accepted.
    serie_after : pandas.Series
        Time series corresponding to the period after the possible structural break.
        A single-column DataFrame is also accepted.

    Returns:
    --------
    chow : float
        The Chow F-test value, which can be compared to an F distribution with
        (k, N1 + N2 - 2k) degrees of freedom to determine the significance
        of the structural break, where N1 and N2 are the number of lagged pairs
        in each period (len(serie) - 1).

    Raises:
    -------
    ValueError
        If the two periods together do not provide more than 2k lagged observations.

    References:
    ------------
    - Chow, Gregory C. "Tests of equality between sets of coefficients in two linear regressions." Econometrica: 
      Journal of the Econometric Society (1960): 591-605.
    - Toyoda, Toshihisa. "Use of the Chow test under heteroscedasticity." Econometrica: Journal of the Econometric 
      Society (1974): 601-608.
    - https://en.wikipedia.org/wiki/Chow_test (Accessed on 06/19/2024)
    '''

    k = 2 # estimated coefficients per regression: slope and intercept

    # Regression inputs for each period
    X_before, y_before = _lagged_regression_inputs(serie_before)
    X_after, y_after = _lagged_regression_inputs(serie_after)

    N1 = len(y_before) # number of observations before break
    N2 = len(y_after) # number of observations after break
    residual_dof = N1 + N2 - 2 * k
    if residual_dof <= 0:
        raise ValueError(
            f'Chow test needs more than {2 * k} lagged observations in total, got {N1 + N2}'
        )

    # Regression for each period
    ssr_1 = sm.OLS(y_before, X_before).fit().ssr
    ssr_2 = sm.OLS(y_after, X_after).fit().ssr

    # Pooled regression: same observations as above, with a single set of coefficients
    X_pooled = np.vstack([X_before, X_after])
    y_pooled = np.concatenate([y_before, y_after])
    ssr_pooled = sm.OLS(y_pooled, X_pooled).fit().ssr

    # F-test for coefficients
    ssr_split = ssr_1 + ssr_2
    chow = ((ssr_pooled - ssr_split) / k) / (ssr_split / residual_dof)

    return chow

In [None]:
# Series under analysis: only days without any flagged event that fall on Monday-Friday
has_no_event = df_series_sales_resampled['event'].isna()
is_weekday = df_series_sales_resampled['day_of_week'] < 5
series = df_series_sales_resampled.loc[has_no_event & is_weekday, ['NET_VALUE_clean_scld']]

# Windows to compare: the most recent months vs. the same window one year earlier
months_to_analyze = 3
window = pd.DateOffset(months=months_to_analyze)
one_year = pd.DateOffset(years=1)

current_end = series.index.max()
current_start = current_end - window

comparison_end = current_end - one_year
comparison_start = comparison_end - window

# Label-based slices (both endpoints included)
serie_before = series.loc[comparison_start:comparison_end]
serie_after = series.loc[current_start:current_end]

# Chow statistic for the two windows
f_chow = chow_test(serie_before, serie_after)

# Critical value of the F distribution at the 99% quantile
k = 2 # degrees of freedom: slope and intercept
N1 = len(serie_before.iloc[1:]) # number of observations before break
N2 = len(serie_after.iloc[1:]) # number of observations after break
critical_value = stats.f.ppf(q=0.99, dfn=k, dfd=N1 + N2 - (2 * k))

# Report both numbers, then decide
print(f'Chow Statistic: {round(f_chow, 3)}')
print(f'Critical Value: {round(critical_value, 3)}')

break_detected = f_chow > critical_value
if break_detected:
    result = 'retrain'
    print('H0 rejected: Periods are structurally different')
    print('Result: retrain')
else:
    result = 'keep'
    print('H0 accepted: Periods are structurally equal')
    print('Result: keep')

In [None]:
# Returns whether retraining should happen. This output is used by the Synapse pipeline to trigger the retraining notebook if necessary.
# `result` is the string 'retrain' or 'keep' set by the Chow-test decision above.
dbutils.notebook.exit(result)