# Import Libraries and Data

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import statsmodels.api as sm
import seaborn as sns
sns.set_theme(style="darkgrid")

import warnings
warnings.filterwarnings("ignore")

In [None]:
df = pd.read_csv('/kaggle/input/gold-silver-oil/gold_silver_oil.csv', header=0, index_col=0, parse_dates=True)
df.head()

# Exploratory Data Analysis

In [None]:
import statsmodels.tsa.api as smt
import scipy.stats as scs

def tsplot(y, lags=None, figsize=(10, 8), style='bmh'):
    if not isinstance(y, pd.Series):
        y = pd.Series(y)
    with plt.style.context(style):    
        fig = plt.figure(figsize=figsize)
        layout = (3, 2)
        ts_ax = plt.subplot2grid(layout, (0, 0), colspan=2)
        acf_ax = plt.subplot2grid(layout, (1, 0))
        pacf_ax = plt.subplot2grid(layout, (1, 1))
        qq_ax = plt.subplot2grid(layout, (2, 0))
        pp_ax = plt.subplot2grid(layout, (2, 1))
        
        y.plot(ax=ts_ax)
        ts_ax.set_title('Time Series Analysis Plots')
        smt.graphics.plot_acf(y, lags=lags, ax=acf_ax, alpha=0.5)
        smt.graphics.plot_pacf(y, lags=lags, ax=pacf_ax, alpha=0.5)
        sm.qqplot(y, line='s', ax=qq_ax)
        qq_ax.set_title('QQ Plot')        
        scs.probplot(y, sparams=(y.mean(), y.std()), plot=pp_ax)

        plt.tight_layout()
    return

In [None]:
tsplot(df['Gold'], lags=30)

In [None]:
tsplot(df['Silver'], lags=30)

In [None]:
tsplot(df['Oil'], lags=30)

In [None]:
df = df[df['Oil'] > 0]
df['Oil'].hist(figsize=(8,5));

In [None]:
from sklearn.preprocessing import MinMaxScaler
mm = MinMaxScaler()
scaled_values = mm.fit_transform(df)
df_scaled = pd.DataFrame(scaled_values, columns=('Gold Scaled', 'Silver_Scaled', 'Oil_Scaled'))
df_scaled.plot(figsize=(13,6));

In [None]:
from sklearn.preprocessing import StandardScaler
sc = StandardScaler()
std_values = sc.fit_transform(df)
df_std = pd.DataFrame(std_values, columns=('Gold Std', 'Sliver Std', 'Oil Std'))
df_std.plot(figsize=(13,6));

In [None]:
sns.heatmap(df.corr(), annot=True);

In [None]:
_ = sns.pairplot(df)

# Error Metrics

In [None]:
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_absolute_error
from math import sqrt

def mape(actual, pred): 
    actual, pred = np.array(actual), np.array(pred)
    return np.mean(np.abs((actual - pred) / actual)) * 100

def rmse(actual, pred):
    return np.sqrt(np.mean((actual - pred)**2))

# Baseline Methods

In [None]:
def train_test_split(data, n_test):
    return data[:-n_test], data[-n_test:]

## Naive Forecast

### Walkforward Validation

In [None]:
def naive_forecast_walkforward(train, test):
    history = [x for x in train]
    predictions = list()
    for i in range(len(test)):
        yhat = history[-1]
        predictions.append(yhat)
        history.append(test[i])
    return predictions

In [None]:
def evaluate_forecast(actuals, predictions):
    rmse_result = rmse(actuals, predictions)
    mae_result = mean_absolute_error(actuals, predictions)
    mape_result = mape(actuals, predictions)

    print('MAPE: %.2f%%' % mape_result)
    print('RMSE: %.2f' % rmse_result)
    print('MAE: %.2f' % mae_result)

    plt.figure(figsize=(10,5))
    plt.plot(actuals)
    plt.plot(predictions, 'r')
    plt.show();

In [None]:
series = 'Gold'
n_test = 10
X = df[series].values
train, test = train_test_split(X, n_test)

In [None]:
predictions = naive_forecast_walkforward(train,test)
evaluate_forecast(test, predictions)

### Multi-Step Forecast

In [None]:
for x in range(len(predictions)):
    predictions[x] = predictions[0]
evaluate_forecast(test, predictions)

## Double Exponential Smoothing

In [None]:
def forecast(history, param_model):
    model = param_model(history)
    model_fit = model.fit(optimized=True)
    yhat = model_fit.predict(len(history), len(history))
    return yhat[0]

### Walkforward Validation

In [None]:
def walkforward_val(history, test, param_model):
    predictions = list()
    history = [x for x in train]
    for i in range(len(test)):
        yhat = forecast(history, param_model)
        predictions.append(yhat)
        history.append(test[i])
    evaluate_forecast(test,predictions)

In [None]:
from statsmodels.tsa.holtwinters import Holt
holt = Holt(df[series])
result = holt.fit()
result.params

In [None]:
predictions = walkforward_val(train, test, Holt)

### Multi-Step Forecast

In [None]:
holt = Holt(train)
result = holt.fit(optimized=True)
predictions = result.predict(start=len(train)+1, end=len(train)+n_test)

In [None]:
evaluate_forecast(test, predictions)

## ARIMA

In [None]:
!pip install pmdarima
import pmdarima as pm
model = pm.auto_arima(train[:],
                      trace=True,
                      suppress_warnings=True)

In [None]:
from statsmodels.tsa.arima.model import ARIMA

In [None]:
X = df[series].values
train, test = train_test_split(X, n_test)

### Walkforward Validation

In [None]:
def walkforward_val_arima(history, test):
    predictions = list()
    history = [x for x in train]
    for i in range(len(test)):
        arima = ARIMA(history, order=(0,1,0))
        result = arima.fit()
        yhat = result.forecast()[0]
        predictions.append(yhat)
        history.append(test[i])
    
    return predictions

In [None]:
predictions = walkforward_val_arima(train, test)

In [None]:
evaluate_forecast(test, predictions)

### Multi-Step Forecast

In [None]:
arima = ARIMA(train, order=(0,1,0))
result = arima.fit()
predictions = result.predict(start=len(train)+1, end=len(train)+n_test)

In [None]:
evaluate_forecast(test, predictions)

# Stationarity

In [None]:
from statsmodels.tsa.stattools import adfuller

def adf(x):
    res = adfuller(x)
    print("Test-Statistic:", res[0])
    print("P-Value:", res[1])
    if res[1] < 0.05:
        print("Stationary")
    else:
        print("Non-Stationary")

In [None]:
adf(df['Gold'])

In [None]:
adf(df['Silver'])

In [None]:
adf(df['Oil'])

In [None]:
df_diff = pd.DataFrame()
df_diff['Gold_Diff'] = df['Gold'].diff()
df_diff['Silver_Diff'] = df['Silver'].diff()
df_diff['Oil_Diff'] = df['Oil'].diff()
df_diff.dropna(how='any', inplace=True)

In [None]:
adf(df_diff['Gold_Diff'])

In [None]:
adf(df_diff['Silver_Diff'])

In [None]:
adf(df_diff['Oil_Diff'])

# Granger Causality

In [None]:
from statsmodels.tsa.stattools import grangercausalitytests
maxlag=20
def grangers_causation_matrix(data, variables, test='ssr_chi2test', verbose=False):    
    df = pd.DataFrame(np.zeros((len(variables), len(variables))), columns=variables, index=variables)
    for c in df.columns:
        for r in df.index:
            test_result = grangercausalitytests(data[[r, c]], maxlag=maxlag, verbose=False)
            p_values = [round(test_result[i+1][0][test][1],4) for i in range(maxlag)]
            if verbose: print(f'Y = {r}, X = {c}, P Values = {p_values}')
            min_p_value = np.min(p_values)
            df.loc[r, c] = min_p_value
    df.columns = [var + '_x' for var in variables]
    df.index = [var + '_y' for var in variables]
    return df

grangers_causation_matrix(df, variables = df.columns) 

# VARMAX

### Parameter Selection - p

In [None]:
from statsmodels.tsa.api import VAR

In [None]:
train, test = train_test_split(df_diff, n_test)
train.drop(columns="Oil_Diff", inplace=True)
test.drop(columns="Oil_Diff", inplace=True)

In [None]:
model = VAR(train)
lag_order_result = model.select_order(maxlags=20)
lag_order_result.selected_orders

### Parameter Selection - q

In [None]:
from statsmodels.tsa.statespace.varmax import VARMAX

In [None]:
p = 1
for q in range(0,7):
    model = VARMAX(train, order=(p, q))
    result = model.fit(maxiter=10)
    print(q, result.aic)

### Walkforward Validation

In [None]:
p = 1
q = 0

pred_gold = np.array([])
pred_silver = np.array([])
history = train
    
for i in range(0,len(test)):

    model = VARMAX(history, order=(p,q))
    result = model.fit(maxiter=20)
    yhat = result.predict(start=len(history)+1,end=len(history)+1)

    pred_gold = np.append(pred_gold, yhat.values.flatten()[0])
    pred_silver = np.append(pred_silver, yhat.values.flatten()[1])
    history = history.append(test.iloc[0:1,:])

In [None]:
test['Gold_Pred_Undiff'] = np.zeros(len(test))
test['Silver_Pred_Undiff'] = np.zeros(len(test))

test['Gold_Pred_Undiff'][0] = df['Gold'][len(train['Gold_Diff'])] + pred_gold[0]
test['Silver_Pred_Undiff'][0] = df['Silver'][len(train['Silver_Diff'])] + pred_silver[0]

for i in range (1, len(test)):
    test['Gold_Pred_Undiff'][i] = test['Gold_Pred_Undiff'][i-1] + pred_gold[i]
    test['Silver_Pred_Undiff'][i] = test['Silver_Pred_Undiff'][i-1] + pred_silver[i]

In [None]:
df_train, df_test = train_test_split(df, n_test)

In [None]:
evaluate_forecast(df_test['Gold'], test['Gold_Pred_Undiff'])

In [None]:
evaluate_forecast(df_test['Silver'], test['Silver_Pred_Undiff'])

# VARMAX Parameter Grid Search

In [None]:
test.drop(columns='Gold_Pred_Undiff', inplace=True)
test.drop(columns='Silver_Pred_Undiff', inplace=True)

In [None]:
print("p", "q", "rmse-g", "rmse-s")

for p in range(1,5):
    for q in range(1,5):
        pred_gold = np.array([])
        pred_silver = np.array([])
        history = train
        try:
            for i in range(0,len(test)):
                model = VARMAX(history, order=(p,q))
                result = model.fit(maxiter=10)
                yhat = result.predict(start=len(history)+1,end=len(history)+1)
                pred_gold = np.append(pred_gold, yhat.values.flatten()[0])
                pred_silver = np.append(pred_silver, yhat.values.flatten()[1])
                history = history.append(test.iloc[i-1:i,:])
            print(p, q, round(rmse(test['Gold_Diff'].values,pred_gold),3), round(rmse(test['Silver_Diff'].values,pred_silver),3))
        except:
            print(p, q, 'LinAlg Error')

### Walkforward Validation

In [None]:
p = 2
q = 3

pred_gold = np.array([])
pred_silver = np.array([])
history = train
    
for i in range(0,len(test)):

    model = VARMAX(history, order=(p,q))
    result = model.fit(maxiter=20)
    yhat = result.predict(start=len(history)+1,end=len(history)+1)

    pred_gold = np.append(pred_gold, yhat.values.flatten()[0])
    pred_silver = np.append(pred_silver, yhat.values.flatten()[1])
    history = history.append(test.iloc[i-1:i,:])

In [None]:
test['Gold_Pred_Undiff'] = np.zeros(len(test))
test['Silver_Pred_Undiff'] = np.zeros(len(test))

test['Gold_Pred_Undiff'][0] = df['Gold'][len(train['Gold_Diff'])] + pred_gold[0]
test['Silver_Pred_Undiff'][0] = df['Silver'][len(train['Silver_Diff'])] + pred_silver[0]

for i in range (1, len(test)):
    test['Gold_Pred_Undiff'][i] = test['Gold_Pred_Undiff'][i-1] + pred_gold[i]
    test['Silver_Pred_Undiff'][i] = test['Silver_Pred_Undiff'][i-1] + pred_silver[i]

In [None]:
evaluate_forecast(df_test['Gold'], test['Gold_Pred_Undiff'])

In [None]:
evaluate_forecast(df_test['Silver'], test['Silver_Pred_Undiff'])

### Multi-Step Forecast

In [None]:
p = 2
q = 3

model = VARMAX(train, order=(p,q))
result = model.fit(maxiter=20)
predictions = result.predict(start=len(train)+1,end=len(train)+len(test)+1)

In [None]:
test['Gold_Pred_Undiff'] = np.zeros(len(test))
test['Silver_Pred_Undiff'] = np.zeros(len(test))

test['Gold_Pred_Undiff'][0] = df['Gold'][len(train['Gold_Diff'])] + predictions['Gold_Diff'].values[0]
test['Silver_Pred_Undiff'][0] = df['Silver'][len(train['Silver_Diff'])] + predictions['Silver_Diff'].values[0]

for i in range (1, len(test)):
    test['Gold_Pred_Undiff'][i] = test['Gold_Pred_Undiff'][i-1] + predictions['Gold_Diff'].values[i]
    test['Silver_Pred_Undiff'][i] = test['Silver_Pred_Undiff'][i-1] + predictions['Silver_Diff'].values[i]

In [None]:
evaluate_forecast(df['Gold'].iloc[len(train)+1:].values,test['Gold_Pred_Undiff'].values)

In [None]:
evaluate_forecast(df['Silver'].iloc[len(train)+1:].values,test['Silver_Pred_Undiff'].values)

# Ensembling

### Walkforward Validation

In [None]:
def naive_forecast_walkforward(train, test):
    history = [x for x in train]
    predictions = list()
    for i in range(len(test)):
        yhat = history[-1]
        predictions.append(yhat)
        history.append(test[i])
    return predictions

In [None]:
series = 'Gold'
n_test = 10
X = df[series].values
train, test = train_test_split(X, n_test)

In [None]:
gold_pred = naive_forecast_walkforward(train,test)
evaluate_forecast(test, gold_pred)

In [None]:
combined = list()
for i in range(len(test)):
    yhat = (test[i] + gold_pred[i]) / 2
    combined.append(yhat)
combined

In [None]:
evaluate_forecast(test,combined)