# Theta model 

In [None]:
import numpy as np
import pandas as pd
import statsmodels.api as sm
import matplotlib.pyplot as plt

%matplotlib inline

In [None]:
# Default figure size applied to every plot created below.
plt.rcParams.update({"figure.figsize": (9, 6)})

In [None]:
# Plot the CO2 series over time.
# NOTE(review): `df` is not created in any visible cell -- presumably it is
# loaded beforehand with a 'co2' column and a datetime index; confirm.
fig, ax = plt.subplots()

ax.plot(df['co2'])
ax.set_xlabel('Time')
# Unit label corrected from 'ppmw' to 'ppmv', the unit used by the forecast
# plots further down in this notebook.
ax.set_ylabel('CO2 concentration (ppmv)')

fig.autofmt_xdate()
plt.tight_layout()

In [None]:
# Plot the CO2 series over time.
# NOTE(review): this cell is identical to the preceding plot cell; presumably
# a data-cleaning step was meant to sit between the two -- confirm.
fig, ax = plt.subplots()

ax.plot(df['co2'])
ax.set_xlabel('Time')
# Unit label corrected from 'ppmw' to 'ppmv', the unit used by the forecast
# plots further down in this notebook.
ax.set_ylabel('CO2 concentration (ppmv)')

fig.autofmt_xdate()
plt.tight_layout()

## Modeling 

In [None]:
# Train/test split (keep the last 2 years of data for the test set)
# Assumes weekly observations (the notebook uses a seasonal period of 52),
# so two years correspond to 2 * 52 rows -- confirm against the data.
TEST_SIZE = 2 * 52

train = df[:-TEST_SIZE]
test = df[-TEST_SIZE:]



In [None]:
from statsmodels.tsa.forecasting.theta import ThetaModel

def rolling_forecast(df: pd.DataFrame, train_len: int, horizon: int, window: int, period: int, method: str) -> list:
    """Forecast ``horizon`` steps after ``train_len``, one ``window`` at a time.

    For every forecast origin ``i`` in ``range(train_len, train_len + horizon,
    window)`` only the rows ``df[:i]`` are used to predict the next ``window``
    steps. The per-window predictions are concatenated and cut to ``horizon``.

    Args:
        df: Full series (training rows followed by the rows to forecast).
        train_len: Number of leading rows available at the first origin.
        horizon: Total number of steps to forecast.
        window: Number of steps predicted from each origin.
        period: Length of one season, in rows.
        method: ``'last_season'`` (repeat the last observed season) or
            ``'theta'`` (Theta model refitted at every origin).

    Returns:
        The ``horizon`` predictions: a 1-D numpy array for ``'last_season'``,
        a list for ``'theta'``.

    Raises:
        ValueError: If ``method`` is not one of the supported names.
    """
    total_len = train_len + horizon

    if method == 'last_season':
        # How many whole seasons are needed to fill one forecast window.
        seasonal_steps = int(window / period)
        pred_last_season = []

        for i in range(train_len, total_len, window):
            last_season = df[:i][-period:].values
            pred_last_season.extend(last_season for _ in range(seasonal_steps))

        # Flatten the stacked seasons into one row of values.
        pred_last_season = np.array(pred_last_season).reshape(1, -1)

        return pred_last_season[0][:horizon]

    elif method == 'theta':
        pred_theta = []

        for i in range(train_len, total_len, window):
            tm = ThetaModel(endog=df[:i], period=period)
            res = tm.fit()
            pred_theta.extend(res.forecast(window))

        # The last window may run past the horizon; drop the surplus so the
        # result always has `horizon` entries, like the baseline above.
        return pred_theta[:horizon]

    raise ValueError(f"Unknown forecasting method: {method!r}")

In [None]:
# Forecast settings: cover the whole test set, predicting 52 steps per
# origin with a seasonal period of 52.
TRAIN_LEN, HORIZON = len(train), len(test)
PERIOD = WINDOW = 52

# Work on a copy before adding prediction columns (presumably to avoid
# pandas' chained-assignment warning if `test` is a slice of `df`).
test = test.copy()

pred_last_season = rolling_forecast(df, TRAIN_LEN, HORIZON, WINDOW, PERIOD, method='last_season')
pred_theta = rolling_forecast(df, TRAIN_LEN, HORIZON, WINDOW, PERIOD, method='theta')

test.loc[:, 'pred_last_season'] = pred_last_season
test.loc[:, 'pred_theta'] = pred_theta

test.head()

## Testing triple exponential smoothing 

In [None]:
from statsmodels.tsa.holtwinters import ExponentialSmoothing

def rolling_forecast(df: pd.DataFrame, train_len: int, horizon: int, window: int, period: int, method: str) -> list:
    """Forecast ``horizon`` steps after ``train_len``, one ``window`` at a time.

    For every forecast origin ``i`` in ``range(train_len, train_len + horizon,
    window)`` only the rows ``df[:i]`` are used to predict the next ``window``
    steps. The per-window predictions are concatenated and cut to ``horizon``.

    Args:
        df: Full series (training rows followed by the rows to forecast).
        train_len: Number of leading rows available at the first origin.
        horizon: Total number of steps to forecast.
        window: Number of steps predicted from each origin.
        period: Length of one season, in rows.
        method: ``'last_season'`` (repeat the last observed season),
            ``'theta'`` (Theta model) or ``'tes'`` (additive triple
            exponential smoothing). Models are refitted at every origin.

    Returns:
        The ``horizon`` predictions: a 1-D numpy array for ``'last_season'``,
        a list for the model-based methods.

    Raises:
        ValueError: If ``method`` is not one of the supported names.
    """
    total_len = train_len + horizon

    if method == 'last_season':
        # How many whole seasons are needed to fill one forecast window.
        seasonal_steps = int(window / period)
        pred_last_season = []

        for i in range(train_len, total_len, window):
            last_season = df[:i][-period:].values
            pred_last_season.extend(last_season for _ in range(seasonal_steps))

        # Flatten the stacked seasons into one row of values.
        pred_last_season = np.array(pred_last_season).reshape(1, -1)

        return pred_last_season[0][:horizon]

    elif method == 'theta':
        pred_theta = []

        for i in range(train_len, total_len, window):
            # Use the caller's seasonal period instead of a hard-coded 52.
            tm = ThetaModel(endog=df[:i], period=period)
            res = tm.fit()
            pred_theta.extend(res.forecast(window))

        # The last window may run past the horizon; drop the surplus.
        return pred_theta[:horizon]

    elif method == 'tes':
        pred_tes = []

        for i in range(train_len, total_len, window):
            tes = ExponentialSmoothing(
                df[:i],
                trend='add',
                seasonal='add',
                seasonal_periods=period,
                initialization_method='estimated'
            ).fit()
            pred_tes.extend(tes.forecast(window))

        # The last window may run past the horizon; drop the surplus.
        return pred_tes[:horizon]

    raise ValueError(f"Unknown forecasting method: {method!r}")

In [None]:
# Add the triple exponential smoothing forecasts as a new column of `test`.
pred_tes = rolling_forecast(df, TRAIN_LEN, HORIZON, WINDOW, PERIOD, method='tes')
test.loc[:, 'pred_tes'] = pred_tes

test.head()

In [None]:
fig, ax = plt.subplots()

# Whole series first, then the actual values and each forecast from `test`.
ax.plot(df['co2'])
for column, line_style, legend_label in [
    ('co2', 'b-', 'actual'),
    ('pred_last_season', 'r:', 'baseline'),
    ('pred_theta', 'g-.', 'Theta'),
    ('pred_tes', 'k--', 'TES'),
]:
    ax.plot(test[column], line_style, label=legend_label)

ax.set(xlabel='Time', ylabel='CO2 concentration (ppmv)')
# Grey band over 2000-01-08 .. 2001-12-29 (presumably the test period).
ax.axvspan('2000-01-08', '2001-12-29', color='#808080', alpha=0.2)

ax.legend(loc='best')

# Zoom in on the end of the series.
ax.set_xlim('1998-03-07', '2001-12-29')

fig.autofmt_xdate()
plt.tight_layout()

## Evaluation 

In [None]:
def mape(y_true, y_pred):
    """Return the mean absolute percentage error, in percent, rounded to 2 decimals.

    Both arguments must support element-wise arithmetic (e.g. numpy arrays
    or pandas Series).
    """
    relative_error = (y_true - y_pred) / y_true
    mean_abs_error = np.mean(np.abs(relative_error))
    return round(mean_abs_error * 100, 2)

In [None]:
# Score every forecast column of `test` against the observed 'co2' values.
mape_baseline, mape_theta, mape_tes = (
    mape(test['co2'], test[column])
    for column in ('pred_last_season', 'pred_theta', 'pred_tes')
)

In [None]:
fig, ax = plt.subplots()

x = ['Baseline', 'Theta', 'TES']
y = [mape_baseline, mape_theta, mape_tes]

ax.bar(x, y, width=0.4)
ax.set(xlabel='Exponential smoothing models', ylabel='MAPE (%)', ylim=(0, 1))

# Write each score slightly above its bar.
for position, score in enumerate(y):
    plt.text(x=position, y=score + 0.05, s=str(score), ha='center')

plt.tight_layout()

## Testing SARIMA 

In [None]:
from statsmodels.stats.diagnostic import acorr_ljungbox
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.stattools import adfuller
from tqdm import tqdm_notebook
from itertools import product

### Test for stationarity 

In [None]:
# Run the ADF test
# NOTE(review): applied to the full 'co2' series, which must not contain
# missing values -- confirm this is the series intended for the test.
ad_fuller_result = adfuller(df['co2'])

# adfuller returns a tuple whose first two items are the test statistic
# and the p-value.
print(f'ADF Statistic: {ad_fuller_result[0]}')
print(f'p-value: {ad_fuller_result[1]}')

In [None]:
# Difference the series and run the ADF test
# First-order differencing, matching d = 1 used by the grid search below.
# NOTE(review): applied to the full 'co2' series -- confirm.
co2_diff = np.diff(df['co2'], n=1)

ad_fuller_result = adfuller(co2_diff)

print(f'ADF Statistic: {ad_fuller_result[0]}')
print(f'p-value: {ad_fuller_result[1]}')

In [None]:
def SARIMA_gridsearch(endog, min_p, max_p, min_q, max_q, min_P, max_P, min_Q, max_Q, d, D, s):
    """Fit one SARIMA model per (p, q, P, Q) combination and rank them by AIC.

    Args:
        endog: Series the models are fitted on.
        min_p, max_p: Inclusive range of the non-seasonal AR order.
        min_q, max_q: Inclusive range of the non-seasonal MA order.
        min_P, max_P: Inclusive range of the seasonal AR order.
        min_Q, max_Q: Inclusive range of the seasonal MA order.
        d: Non-seasonal differencing order (same for every model).
        D: Seasonal differencing order (same for every model).
        s: Seasonal period.

    Returns:
        DataFrame with columns ``'(p,q,P,Q)'`` and ``'AIC'``, sorted by
        ascending AIC. Combinations whose fit failed are left out; the frame
        is empty (but keeps both columns) when nothing could be fitted.
    """
    all_orders = list(product(
        range(min_p, max_p + 1),
        range(min_q, max_q + 1),
        range(min_P, max_P + 1),
        range(min_Q, max_Q + 1),
    ))

    print(f'Fitting {len(all_orders)} unique models')

    results = []

    for order in tqdm_notebook(all_orders):
        p, q, P, Q = order
        try:
            model = SARIMAX(
                endog,
                order=(p, d, q),
                seasonal_order=(P, D, Q, s)).fit(disp=False)
        except Exception:
            # Best effort: skip combinations that cannot be fitted. Catching
            # Exception (not a bare except) keeps KeyboardInterrupt working,
            # so a long search can still be stopped.
            continue

        results.append([order, model.aic])

    # Naming the columns in the constructor also works when `results` is
    # empty; assigning `.columns` afterwards raises in that case.
    result_df = pd.DataFrame(results, columns=['(p,q,P,Q)', 'AIC'])

    # Sort in ascending order, lower AIC is better
    result_df = result_df.sort_values(by='AIC', ascending=True).reset_index(drop=True)

    return result_df

In [None]:
# Inclusive search ranges for the non-seasonal (p, q) and seasonal (P, Q) orders.
min_p, max_p = 1, 3
min_q, max_q = 1, 3
min_P, max_P = 1, 1
min_Q, max_Q = 1, 1

# Differencing orders and seasonal period shared by every candidate model.
d, D, s = 1, 0, 52

result_df = SARIMA_gridsearch(
    endog=train,
    min_p=min_p, max_p=max_p,
    min_q=min_q, max_q=max_q,
    min_P=min_P, max_P=max_P,
    min_Q=min_Q, max_Q=max_Q,
    d=d, D=D, s=s,
)
result_df.head()

### Residuals analysis 

In [None]:
def ljung_box_test(residuals, is_seasonal, period):
    """Run the Ljung-Box test on ``residuals``, plot the p-values and print a verdict.

    Args:
        residuals: Model residuals to test for autocorrelation.
        is_seasonal: If true, ``period`` is passed to ``acorr_ljungbox`` and
            the lags are left for it to choose; otherwise lags
            ``1 .. min(10, len(residuals) // 5)`` are tested.
        period: Seasonal period; only used when ``is_seasonal`` is true.

    Returns:
        None. The function draws a figure and prints a message.
    """
    if is_seasonal:
        lb_df = acorr_ljungbox(residuals, period=period)
    else:
        # Integer division keeps the lags whole numbers and within the
        # len/5 bound (true division produced float lags that could exceed
        # it); at least one lag is always tested.
        max_lag = max(1, min(10, len(residuals) // 5))

        lb_df = acorr_ljungbox(residuals, lags=np.arange(1, max_lag + 1))

    fig, ax = plt.subplots()
    ax.plot(lb_df['lb_pvalue'], 'b-', label='p-values')
    # Reference line at the 0.05 significance level.
    ax.hlines(y=0.05, xmin=1, xmax=len(lb_df), color='black')
    plt.tight_layout()

    if all(pvalue > 0.05 for pvalue in lb_df['lb_pvalue']):
        print('All values are above 0.05. We fail to reject the null hypothesis. The residuals are uncorrelated')
    else:
        print('At least one p-value is smaller than 0.05')

### Forecasting 

In [None]:
from statsmodels.tsa.holtwinters import ExponentialSmoothing

def rolling_forecast(df: pd.DataFrame, train_len: int, horizon: int, window: int, period: int, method: str,
                     order: tuple = (1, 1, 1), seasonal_order: tuple = None) -> list:
    """Forecast ``horizon`` steps after ``train_len``, one ``window`` at a time.

    For every forecast origin ``i`` in ``range(train_len, train_len + horizon,
    window)`` only the rows ``df[:i]`` are used to predict the next ``window``
    steps. The per-window predictions are concatenated and cut to ``horizon``.

    Args:
        df: Full series (training rows followed by the rows to forecast).
        train_len: Number of leading rows available at the first origin.
        horizon: Total number of steps to forecast.
        window: Number of steps predicted from each origin.
        period: Length of one season, in rows.
        method: ``'last_season'`` (repeat the last observed season),
            ``'theta'`` (Theta model), ``'tes'`` (additive triple exponential
            smoothing) or ``'SARIMA'``. Models are refitted at every origin.
        order: ``(p, d, q)`` used by ``'SARIMA'`` only.
        seasonal_order: ``(P, D, Q, s)`` used by ``'SARIMA'`` only; defaults
            to ``(1, 0, 1, period)``.
            NOTE(review): both SARIMA defaults are placeholders (the smallest
            candidate of this notebook's grid search) -- pass the orders the
            grid search actually selects.

    Returns:
        The ``horizon`` predictions: a 1-D numpy array for ``'last_season'``,
        a list for the model-based methods.

    Raises:
        ValueError: If ``method`` is not one of the supported names.
    """
    total_len = train_len + horizon

    if method == 'last_season':
        # How many whole seasons are needed to fill one forecast window.
        seasonal_steps = int(window / period)
        pred_last_season = []

        for i in range(train_len, total_len, window):
            last_season = df[:i][-period:].values
            pred_last_season.extend(last_season for _ in range(seasonal_steps))

        # Flatten the stacked seasons into one row of values.
        pred_last_season = np.array(pred_last_season).reshape(1, -1)

        return pred_last_season[0][:horizon]

    elif method == 'theta':
        pred_theta = []

        for i in range(train_len, total_len, window):
            # Use the caller's seasonal period instead of a hard-coded 52.
            tm = ThetaModel(endog=df[:i], period=period)
            res = tm.fit()
            pred_theta.extend(res.forecast(window))

        # The last window may run past the horizon; drop the surplus.
        return pred_theta[:horizon]

    elif method == 'tes':
        pred_tes = []

        for i in range(train_len, total_len, window):
            tes = ExponentialSmoothing(
                df[:i],
                trend='add',
                seasonal='add',
                seasonal_periods=period,
                initialization_method='estimated'
            ).fit()
            pred_tes.extend(tes.forecast(window))

        return pred_tes[:horizon]

    elif method == 'SARIMA':
        if seasonal_order is None:
            seasonal_order = (1, 0, 1, period)

        pred_SARIMA = []

        for i in range(train_len, total_len, window):
            res = SARIMAX(
                df[:i],
                order=order,
                seasonal_order=seasonal_order
            ).fit(disp=False)
            pred_SARIMA.extend(res.forecast(window))

        return pred_SARIMA[:horizon]

    raise ValueError(f"Unknown forecasting method: {method!r}")

In [None]:
# Add the SARIMA forecasts as a new column of `test`.
pred_SARIMA = rolling_forecast(df, TRAIN_LEN, HORIZON, WINDOW, PERIOD, method='SARIMA')
test.loc[:, 'pred_SARIMA'] = pred_SARIMA

test.head()

In [None]:
fig, ax = plt.subplots()

# Whole series first, then the actual values and each forecast from `test`.
# SARIMA has no explicit line style, so it gets matplotlib's default.
ax.plot(df['co2'])
for column, style_args, legend_label in [
    ('co2', ('b-',), 'actual'),
    ('pred_last_season', ('r:',), 'baseline'),
    ('pred_theta', ('g-.',), 'Theta'),
    ('pred_tes', ('k--',), 'TES'),
    ('pred_SARIMA', (), 'SARIMA'),
]:
    ax.plot(test[column], *style_args, label=legend_label)

ax.set(xlabel='Time', ylabel='CO2 concentration (ppmv)')
# Grey band over 2000-01-08 .. 2001-12-29 (presumably the test period).
ax.axvspan('2000-01-08', '2001-12-29', color='#808080', alpha=0.2)

ax.legend(loc='best')

# Zoom in on the end of the series.
ax.set_xlim('1998-03-07', '2001-12-29')

fig.autofmt_xdate()
plt.tight_layout()

In [None]:
# Score the SARIMA forecasts the same way as the other models; without this
# line `mape_SARIMA` is undefined when the chart is built.
mape_SARIMA = mape(test['co2'], test['pred_SARIMA'])

fig, ax = plt.subplots()

x = ['Baseline', 'Theta', 'TES', 'SARIMA']
y = [mape_baseline, mape_theta, mape_tes, mape_SARIMA]

ax.bar(x, y, width=0.4)
# Generic label: the baseline and SARIMA are not exponential smoothing models.
ax.set_xlabel('Models')
ax.set_ylabel('MAPE (%)')
ax.set_ylim(0, 1)

# Write each score slightly above its bar.
for index, value in enumerate(y):
    plt.text(x=index, y=value + 0.05, s=str(value), ha='center')

plt.tight_layout()