In [1]:

import numpy as np
import pandas as pd
from scipy.stats import shapiro

from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.stattools import pacf, adfuller

import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import matplotlib.pyplot as plt

import yfinance as yf

from datetime import datetime

%matplotlib inline

# VALE 

**SUMMARY**

Vale is a Brazilian multinational corporation in the metals and mining industry, and is one of the largest logistics operators in Brazil. They are the largest producer of iron ore and nickel globally, and also produce manganese, ferroalloys, copper, bauxite, potash, kaolin, and cobalt. The company operates hydroelectric plants, railroads, ships, and ports to transport their products. Despite being the most valuable company in Latin America, Vale has faced criticism for its two catastrophic tailings dam failures in Mariana (2015) and Brumadinho (2019), which resulted in the loss of its license to operate eight dams and a decline in stock value. (- Wikipedia page)

**RATIONAL FOR CHOOSING TIME SERIES**

The article mentions that Vale, the Brazilian mining group, has received multiple bids for a stake in its base metals business, which includes nickel, copper, cobalt, and platinum group metals that are vital for the energy transition. The CEO of Vale has stated that the base metals unit could one day outgrow the company and float on the stock market. As the world moves towards electrification of transport and power, the demand for energy transition metals is expected to grow. Share prices for major mining companies have risen over the past year due to the growth in demand for energy transition metals. Therefore, as Vale's base metals unit is positioned to benefit from this trend, the stock of Vale could be worth watching for investors interested in the energy transition. Therefore, this report investigates this hypothesis by investigating the last 300 trading days of VALE stock.

**FT ARTICLE:** https://www.ft.com/content/b03bc946-73da-44d5-8d9e-5541ddb5038b

# **Time Series**

## 1. Download a price time series using an API. The length of the time series T, with $T=300$. The resolution could be any, from tick data to months.

In [2]:
def download_vale_data( start_date, end_date ):
    """ 
    Downloades VALE time series data from Yahoo! Finance's API\\
        between the specified start and end date.
    
    Arguments:
    ----------
        start_date (datetime) : earliest time point from which to collect data
        end_date (datetime) : lates time point from which to collect data
    
    Returns:
    ---------
        data (dataframe) : dataframe of VALE stock data
    """
    vale = yf.download(tickers = 'VALE', start = start_date, end = end_date)
    vale['Date'] = [idx.date() for idx in vale.index] # add date

    return vale

In [3]:
# get VALE data
start_date = datetime(2021, 11, 24) # yyyy-mm-dd
end_date = datetime(2023, 2, 6) # yyyy-mm-dd
vale = download_vale_data( start_date, end_date )

[*********************100%***********************]  1 of 1 completed


In [4]:
# print data
vale.head()

Unnamed: 0_level_0,Open,High,Low,Close,Adj Close,Volume,Date
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2021-11-24 00:00:00-05:00,12.56,12.74,12.48,12.71,12.045043,27093600,2021-11-24
2021-11-26 00:00:00-05:00,12.2,12.45,12.11,12.37,11.722832,23473100,2021-11-26
2021-11-29 00:00:00-05:00,12.64,12.66,12.35,12.44,11.789168,25153400,2021-11-29
2021-11-30 00:00:00-05:00,12.66,12.81,12.21,12.37,11.722832,37493900,2021-11-30
2021-12-01 00:00:00-05:00,12.69,12.81,12.24,12.25,11.60911,35263600,2021-12-01


In [5]:
# print meta
vale.info();


<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 300 entries, 2021-11-24 00:00:00-05:00 to 2023-02-03 00:00:00-05:00
Data columns (total 7 columns):
 #   Column     Non-Null Count  Dtype  
---  ------     --------------  -----  
 0   Open       300 non-null    float64
 1   High       300 non-null    float64
 2   Low        300 non-null    float64
 3   Close      300 non-null    float64
 4   Adj Close  300 non-null    float64
 5   Volume     300 non-null    int64  
 6   Date       300 non-null    object 
dtypes: float64(5), int64(1), object(1)
memory usage: 18.8+ KB


In [6]:
# print data statistics
vale.describe()

Unnamed: 0,Open,High,Low,Close,Adj Close,Volume
count,300.0,300.0,300.0,300.0,300.0,300.0
mean,15.602733,15.832533,15.376033,15.618333,15.10448,34696640.0
std,2.292406,2.301912,2.271924,2.296789,2.17158,12598940.0
min,12.0,12.19,11.72,12.14,11.60911,11023100.0
25%,13.5675,13.7875,13.37,13.6,13.226705,25951650.0
50%,15.295,15.56,15.135,15.355,14.760154,32599800.0
75%,17.24,17.5175,17.01,17.2825,16.692041,41765350.0
max,21.09,21.290001,21.040001,21.23,20.119297,83276300.0


## 2. Plot the price time series

In [7]:
def plot_time_series( dates, title, xlabel, ylabel, **series ):
    """
    Plots a given time series

    Arguments:
    ----------
        series (array_like) : time series to plot
        dates (datetime) : coresponding time stamps of time series
        title (str) : plot title
        xlabel (str) : x-axis label
        ylabel (str) : y-axis label

    Returns:
    ----------
        fig (figure) : figure object of plot
    """

    # blues = ["#072F5F", "#1261A0", "#3895D3", "#58CCED"]

    names = series.keys()
    data = [x for x in series.values()]
    palette = ['#000000', '#58CCED', '#1261A0', '#072F5F']

    fig = go.Figure()

    for name, P, colour in zip(names, data, palette[:len(names)]):
        n_dates = len(dates)
        n_series = len(P)
        dif = n_dates - n_series
        fig.add_trace(go.Scatter(x=dates[dif:], y=P, mode='lines', name=name, line=dict(color=colour))) # Price series

    fig.update_layout(
        height = 500,
        title={'text': f"<b>{title}</b>", 'y':0.9, 'x':0.5, 'xanchor': 'center', 'yanchor': 'top'},
        xaxis_title="Date",
        yaxis_title="Open Price USD",
        showlegend = True,
        legend_title_text='Series:',
        legend=dict(
                    orientation="h",
                    yanchor="bottom",
                    y=1.02,
                    xanchor="right",
                    x=1
                ))

    return fig


In [8]:
# plot time series
dates = vale.Date.values
open_price = vale.Open.values
open_price_plot = plot_time_series( dates, title='VALE open price USD', xlabel='Date', ylabel='Open Price USD', open_price = open_price)
open_price_plot.update_layout(height = 400, width = 1300)
open_price_plot.show()
open_price_plot.write_image("images/fig_1_vale_open_price.png")

# **Moving Averages**

## 3. Define mathematically the moving average of the price time series with an arbitrary time window $\tau$

The moving average of a time series with time window $\tau$ is the average of the $\tau$ consecutive values in the time series, calculated for each time step and shifted by one time step at a time, resulting in a smoothed version of the original time series. The choice of $\tau$ determines the smoothness of the moving average. A larger $\tau$ results in a smoother moving average, while a smaller $\tau$ will result in a less smooth but more responsive moving average that is closer to the original time series.

Mathematically we can define it as follows: 

Let $X$ be the time series and $X_t$ be the value of the time series at time step $t$. Then, the moving average of $X$ with window size $\tau$ is defined as:

$$ Y_t = \frac{1}{\tau}(X_{t} + X_{t-1} + X_{t-2} + ... + X_{t-\tau}) $$

where $Y_t$ is the moving average at time step t. The moving average is calculated for each time step by taking the average of the $\tau$ consecutive values in the time series, starting from the current time step and including the $\tau - 1$ preceding values. The result is a smoothed version of the original time series.

In [9]:
def moving_average( X, t ):
    """ 
    Computes the moving average for a time sereis X and window t

    Arguments:
    ----------
        X (array_like) : time series 
        t (int) : window of moving average
    
    Returns:
    __________
        Y (int) : moving average time series
    """
    assert t < len(X), 'Specified window is larger than series'

    cumsum = np.cumsum(X)
    Y = (cumsum[t-1:] - cumsum[:-t+1]) / t
    
    return Y

In [10]:
# get moving average
open_price_ma_10 = moving_average(open_price, 10) # light blue
open_price_ma_20 = moving_average(open_price, 20) # medium blue
open_price_ma_30 = moving_average(open_price, 30) # dark blue

open_price_and_ma_plot = plot_time_series( dates, title='VALE open price USD with three moving averages', xlabel='Date', ylabel='Open Price USD', 
                                        open_price = open_price, ma_10 = open_price_ma_10, ma_20 = open_price_ma_20, ma_30 = open_price_ma_30,)
open_price_and_ma_plot.update_layout(showlegend=True, height = 400, width = 1300)
open_price_and_ma_plot.show()
open_price_and_ma_plot.write_image("images/fig_2_vale_open_price_with_moving_averages.png")

## 6. Compute the linear and log-return of the price time series

**Linear Return**

Let $\mathbf{p} = \{p_t : t = 1, ..., T\}$ be a time series with $p_t$ denoting the value at time $t$. The linear return $r_{\text{linear}(t)}$ of $\mathbf{p}$ at time $t$ is defined as the ratio of the difference between $p_t$ and $p_{t-1}$ to $p_t$. 

$$ r_{\text{linear}(t)} = \frac{p_t - p_{t-1}}{p_t} $$

In this way, the linear return series $\mathbf{r}_{\text{linear}} = \{r_{\text{linear}(t)} : t = 1, ..., T-1\}$ measures the proportionate change in the value of a time series between consecutive time periods.

**Log Return**

Let $\mathbf{p} = \{p_t : t = 1, ..., T\}$ be a time series with $p_t$ denoting the value at time $t$. The log return $r_{\text{log}(t)}$ of $\mathbf{p}$ at time $t$ is defined as the log of the ratio of the difference between $p_t$ and $p_{t-1}$. 

$$ r_{\text{log}(t)} = \log\left(\frac{p_t}{p_{t-1}}\right) $$

In this way, the log return series $\mathbf{r}_{\text{log}} = \{r_{\text{log}(t)} : t = 1, ..., T-1\}$ measures the log of the rate of change between consecutive time periods.

In [11]:
def linear_return(P):
    """ 
    Computes the linear return of a price times series P

    Arguments:
    ----------
        P (array_like) : price time series

    Returns:
    ----------
        R (array_like) : linear return series
    """

    R = (P[1:] - P[:-1]) / P[:-1]
    
    return R


def log_return(P):
    """ 
    Computes the log return of a price times series P

    Arguments:
    ----------
        P (array_like) : price time series

    Returns:
    ----------
        R (array_like) : log return series
    """

    R = np.log(P[1:]) - np.log(P[:-1])
    
    return R


def add_series_to_plot( figure, row, col, title, xlabel, ylabel, showlegend, dates, **series ):
    """
    Plots a given time series

    Arguments:
    ----------
        figure (object) : plotly figure object
        row (int) : specified row in destination plot
        col (int) : specified col in destination plot
        title (str) : plot title
        xlabel (str) : x-axis label
        ylabel (str) : y-axis label
        showlegend (bool) : whether to show legend
        dates (datetime) : coresponding time stamps of time series
        series (array_like) : series to plot

    Returns:
    ----------
        None
    """
    
    names = series.keys()
    data = [x for x in series.values()]
    palette = ['#000000', '#58CCED', '#1261A0', '#072F5F']

    for name, P, colour in zip(names, data, palette[:len(names)]):
        n_dates = len(dates)
        n_series = len(P)
        dif = n_dates - n_series
        figure.add_trace(go.Scatter(x=dates[dif:], y=P, mode='lines', name=name, line=dict(color=colour), showlegend=showlegend), row=row, col=col) # Price series

    figure.update_layout(
        showlegend = showlegend,
        legend_title_text='Series:',
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),)

    # Update xaxis properties
    figure.update_xaxes(title_text=xlabel, row=row, col=col)

    # Update yaxis properties
    figure.update_yaxes(title_text=ylabel, row=row, col=col)

    return None

In [12]:
# linear return
linear_r = linear_return(open_price)
linear_r_ma_10 = moving_average(linear_r, 10)
linear_r_ma_20 = moving_average(linear_r, 20)
linear_r_ma_30 = moving_average(linear_r, 30)

# log return
log_r = log_return(open_price)
log_r_ma_10 = moving_average(log_r, 10)
log_r_ma_20 = moving_average(log_r, 20)
log_r_ma_30 = moving_average(log_r, 30)

# plot
linear_log_return_and_ma_plot = make_subplots(rows=2, cols=1, subplot_titles= ['<b>Linear Return</b>', '<b>Log Return</b>'])
add_series_to_plot( linear_log_return_and_ma_plot, 1, 1, 'Linear Return', 'Date', 'Linear Return', False, dates, return_series = linear_r, ma_10 = linear_r_ma_10, ma_20 = linear_r_ma_20, ma30 = linear_r_ma_30 )
add_series_to_plot( linear_log_return_and_ma_plot, 2, 1, 'Log Return', 'Date', 'Log Return', True, dates, return_series = log_r, ma_10 = log_r_ma_10, ma_20 = log_r_ma_20, ma30 = log_r_ma_30 )
linear_log_return_and_ma_plot.update_layout(height = 500, width = 1300)
linear_log_return_and_ma_plot.show()
linear_log_return_and_ma_plot.write_image("images/fig_3_return_series_with_moving_averages.png")


In [13]:
# print descriptive statistics of series
print(pd.DataFrame({'linear_return' : linear_r, 'log_return' : log_r}).describe())

       linear_return  log_return
count     299.000000  299.000000
mean        0.001548    0.001138
std         0.028704    0.028651
min        -0.086070   -0.090002
25%        -0.017701   -0.017859
50%         0.000627    0.000627
75%         0.018974    0.018796
max         0.084421    0.081046


In [14]:
def plot_scatter( x, y, title, xlabel, ylabel, showlinear = True ):
    """ 
    Plots scatter plot data
    
    Arguments:
    ----------
        x (array_like) : series to be plotted against
        y (array_like) : seris to plot against
        title (str) : plot title
        xlabel (str) : x-axis label
        ylabel (str) : y-axis label
    
    Returns:
    ----------
        figure (object) : plotly figure object
    """
    fig = go.Figure()

    if showlinear:
        min_val = round(min(linear_r), 1)
        max_val = round(max(linear_r), 1)
        lin_data = [min_val, max_val]
        fig.add_trace(go.Scatter(x=lin_data, y=lin_data, mode='lines', line=dict(color='red'))) # Linear Relationshiop

    fig.add_trace(go.Scatter(x=x, y=y, mode='markers', line=dict(color='#000000'))) # Scatter plot
    fig.update_layout(
                title={'text': f"<b>{title}</b>", 'y':0.9, 'x':0.5, 'xanchor': 'center','yanchor': 'top'},
                xaxis_title=f"{xlabel}",
                yaxis_title=f"{ylabel}",
                showlegend = False)
    
    return fig

In [15]:
linear_vs_log_return_fig = plot_scatter( linear_r, log_r, 'Linear Return against Log Return', 'Linear Return', 'Log Return', True )
linear_vs_log_return_fig.update_layout(height = 400, width = 1300)
linear_vs_log_return_fig.show()
linear_vs_log_return_fig.write_image("images/fig_4_linear_return_vs_log_return.png")

# **Time Series Analysis**

## 8. Define the auto-correlation function (for a stationary time-series)


**AUTO-CORRELATION FUNCTION (ACF) OF A STATIONARY TIME SERIES**

The acf of a stationary time series is defined as follow:

$$ \rho(h) = \frac{\gamma(t+h, t)}{\sqrt{\gamma(t+h, t+h)\gamma(t, t)}} = \frac{\gamma(h, 0)}{\gamma(0, 0)} $$

where the following results have be used:
1. A weakly stationary times series has constant mean $\mu$
2. A weekly staitonary times sereies's $\gamma(s,t)$ depends only on $s$ and $t$ via their lag $h = |s - t| = |t+h - t|$. Therefore, $\gamma(t+h , t) = \gamma(h, 0)$ because the "lag" of these two shifts are equal (justifying the numerator). Further note that for any $\gamma(t, t) = \gamma(0,0)$ because again, the value of the "lag" are equal (justifying the denominator). 

**SAMPLE AUTO-CORRELATION FUNCTION**

The ACF for a limited number of observations $x_1, ..., x_n$ is define as:

$$ \hat{\rho}(h) = \frac{\hat{\gamma}(h, 0)}{\hat{\gamma}(0, 0)} $$

where,

* $ \hat{\rho}(h) = \frac{1}{n} \sum_{t=1}^{n-h} (x_{t+h} - \bar{x})(x_t - \bar{x}) $
* $ \bar{x} = \frac{1}{n} \sum_{t=1}^n x_t $

In [16]:
def acf( P ):
    """  
    Returns the sample auto-correlation function for a price seiers P

    Arguments:
        P: Price time series

    Returns:
        rho: acf for the price time series P
    """
    n = len(P)
    mu = np.mean(P)

    # get gamma(0,0)
    gamma_0 = (1/n) * ((P - mu) @ (P - mu))

    # get gamma(h, 0)
    def gamma( h ):
        gamma_h = (1/n) * ((P[h:] - mu) @ (P[:n-h] - mu))
        return gamma_h
    
    gamma_h = np.array([gamma(h) for h in range(n)])

    # get rho(h)
    rho = gamma_h / gamma_0

    return rho


def plot_correlation_function( correlation_function, type, title, xlabel, ylabel, n_observation ):
    """
    Plots the ACF or PCAF with a provided length of the series for the significance estimates

    Arguments:
    ----------
    correlation_function (array_like) : the computed ACF / PCAF function
    type (str) : either 'ACF' or 'PCAF'
    title (str) : figure title
    xlabel (str) : x-axis label
    ylabel (str) : y-axis label
    n_observations (int) : number of samples in the original series

    Returns:
    ---------
        figure (object) : plotly figure object
    """
    N = len(correlation_function)
    x = [x for x in range(N)]

    fig = go.Figure()
    fig.add_trace(go.Bar(x=x, y=correlation_function, marker_color='black')) # correlation function
    fig.add_trace(go.Scatter(x = x, y = [2/(np.sqrt(n_observation))]*N, mode='lines', line=dict(color='red', dash='dash', width = 1.5),))
    fig.add_trace(go.Scatter(x = x, y = [-2/(np.sqrt(n_observation))]*N, mode='lines', line=dict(color='red', dash='dash', width = 1.5),))
    fig.update_layout(title={'text': f"<b>{title}</b>", 'y':0.9, 'x':0.5, 'xanchor': 'center', 'yanchor': 'top'},
                        xaxis_title=f"{xlabel}",
                        yaxis_title=f"{ylabel}",
                        showlegend = False)
    return fig




In [17]:
# get ACF
acf_open_price = acf(open_price)

# plot
acf_open_price_plot = plot_correlation_function( acf_open_price, 'ACF', 'ACF of Vale Open Price USD', 'Lag', 'ACF', 300 )
acf_open_price_plot.update_layout(height = 400, width = 1300)
acf_open_price_plot.show()
acf_open_price_plot.write_image("images/fig_5_acf_open_price.png")

## 11. Compute the partial auto-correlation function (PACF) of the price time series


**PARTIAL AUTO-CORRELATION FUNCTION (ACF)**

_The partial autocorrelation function (PCAF) of a stationary time series x_t denoted $\phi_{hh}$, for $h = 1, 2, ...,$ is_

$$ \phi_{11} = corr(x_{t+1}, x_t) = \rho(1) $$

_and for $h \geq 2$ ,_

$$ \phi_{hh} = corr(x_{t+h} - \hat{x}_{t+h}, x_t - \hat{x}_t) $$

In the above definition, $\hat{x}_{t+h}$ denotes the regression of $x_{t+h}$ on $\{ x_{t+h−1} , x_{t+h−2} , ... , x_{t+1} \}$, that is, 

$$ \hat{x}_{t+h} =  \beta_1 x_{t+h−1} + \beta_2 x_{t+h−2} + ... + \beta_{h−1}x_{t+1} $$

andn similiarly, $\hat{x}_t$ denotes the regression of $x_t$ on $\{ x_{t+1}, x_{t+2}, ... , x_{t+h−1} \}$ that is, 

$$ \hat{x}_t = \beta_1 x_{t+1} + \beta_2 x_{t+2} + ... + \beta_{h−1} x_{t+h−1} $$

Further, if the mean is not $0$, then we need only replace $x_t$ by $x_t - \mu_x$ where $\mu_x$ is the mean of the series.

In [18]:
# get ACF
pacf_open_price = pacf(open_price, nlags= len(open_price) // 2 - 1, method='ols')

# plot
pacf_open_price_plot = plot_correlation_function( pacf_open_price, 'PACF', 'PACF of Vale Open Price USD', 'Lag', 'ACF', 300 )
pacf_open_price_plot.update_layout(height = 400, width = 1300)
pacf_open_price_plot.show()
pacf_open_price_plot.write_image("images/fig_6_pacf_open_price.png")

In [19]:
def add_barplot_to_plot( figure, row, col, xlabel, ylabel, showlegend, n_observation, correlation_function ):
    """
    Plots a given time series

    Arguments:
    ----------
        figure (object) : plotly figure object
        row (int) : specified row in destination plot
        col (int) : specified col in destination plot
        xlabel (str) : x-axis label
        ylabel (str) : y-axis label
        showlegend (bool) : whether to show legend
        correlation_functions (array_like) : the correlation function to plot

    Returns:
    ----------
        None
    """

    N = len(log_r) // 2 - 1
    x = [x for x in range(N)]

    # plot traces
    figure.add_trace(go.Bar(x = x, y = correlation_function, marker_color='black'), row=row, col=col) # correlation function
    figure.add_trace(go.Scatter(x = x, y = [2/(np.sqrt(n_observation))]*N, mode='lines', line=dict(color='red', dash='dash', width = 1.5),), row=row, col=col)
    figure.add_trace(go.Scatter(x = x, y = [-2/(np.sqrt(n_observation))]*N, mode='lines', line=dict(color='red', dash='dash', width = 1.5),), row=row, col=col)
    figure.update_layout(showlegend=False)  

    # label axes
    figure.update_xaxes(title_text=xlabel, row=row, col=col)
    figure.update_yaxes(title_text=ylabel, row=row, col=col)
    

    return None

In [20]:
# get ACF and PCAF of log return
acf_log_return = acf(log_r)
pacf_log_return = pacf(log_r, nlags= len(log_r) // 2 - 1, method='ols')

# plot
log_return_acf_and_pcaf_plot = make_subplots(rows=2, cols=1, subplot_titles=['<b>Log Return ACF</b>', '<b>Log Return PCAF</b>'])
add_barplot_to_plot(log_return_acf_and_pcaf_plot, 1, 1, 'Lag', 'ACF', False, 300, acf_log_return)
add_barplot_to_plot(log_return_acf_and_pcaf_plot, 2, 1, 'Lag', 'PACF', False, 300, pacf_log_return)
log_return_acf_and_pcaf_plot.update_layout(height=500, width = 1300)
log_return_acf_and_pcaf_plot.show()
log_return_acf_and_pcaf_plot.write_image("images/fig_7_acf_pacf_log_return.png")

# **ARMA Models**

## 17. Define mathematically an ARMA(p,q) model

**ARMA(p,q)**

The notion of autoregressive (AR) and moving average (MA) models can be mixed to an autoregressive moving average (ARMA), models for stationary time series. It can formally be defined as follows:

_A time series $\{ x_t; t = 0, \pm1, \pm2, ... \}$ is ARMA(p,q) if it is,_

1. _Stationary_
2. $ x_t  = \phi_1 x_{t-1} + ... + \phi_p x_{t-p} + w_t + \theta_1 w_{t-1} + ... + \theta_q w_{t-q} $

_with $\phi_p \neq 0$, $\theta_q \neq 0$, and $\sigma_w^2 > 0$._ 

Note: The parameters p and q are called the autoregressive and the moving average orders, respectively.

_If the series $x_t$ has a nonzero mean $\mu$, we set $\alpha=\mu(1−\phi_1 − ... −\phi_p)$ and write the model as_

$$ x_t = \alpha + \phi_1 x_{t−1} + ... + \phi_p x_{t−p} + w_t + \theta_1 w_{t−1} + ... + \theta_q w_{t−q} $$

_Alternatively, the ARMA model can be written in terms of the AR and MA operators concisely as_

$$ \phi(B)x_t = \theta(B)w_t  $$




## 18. Define a training and test set and fit an ARMA model to the price time series

In [21]:
def train_test_split( X, test_size ):
    """
    Split data into a training and test set preserving 
    the order of the data using spefied split

    Arguments:
    ----------
        X (array_like) :  Time series data to split
        test_size (float) : percentage of data to put in training set
    
    Returns:
        train (array_like) : Training data containing 100*(1-test_size)% of the data
        test (array_like) : test data containing 100*test_size% of the data
    """

    assert test_size <= 1, 'Not a valid percentage, requires 0 <= test_size <= 1'

    # split dataset
    split_value = int(len(X) * (1-test_size))
    train, test = X[:split_value], X[split_value:]

    return train, test

def train_val_test_split( X, val_size, test_size ):
    """
    Split data into a training, validation and test set preserving 
    the order of the data using spefied split

    Arguments:
    ----------
        X (array_like) :  Time series data to split
        val_size (float) : percentage of data to put in training set
    
    Returns:
        train (array_like) : Training data containing 100*(1-val_size-test_size)% of the data
        val (array_like) : Validation data containing 100*val_size of the data
        test (array_like) : test data containing 100*test_size% of the data
    """

    assert val_size + test_size <= 1, 'Not a valid percentage, requires 0 <= val_size + test_size <= 1'

    # split dataset
    train, test = train_test_split( X, test_size )
    train, val = train_test_split( train, val_size )

    return train, val, test

In [22]:
# split dataset
train, test = train_test_split(open_price, 0.2)

## 19. Display the parameters of the model and its Mean Squared Error (MSE) in the training set and in the test set

In [23]:
def print_model_summary( arma_model ):
    """
    Prints model summary for a given ARMA(p,q) model

    Arguments:
    ----------
        arma_model: fitted ARMA(p, q) model
    
    Returns:
    ----------
        None
    """
    print(arma_model.summary())

    return None

def print_model_measures_of_fit( arma_model ):
    """
    Prints the AIC, BIC and HQIC for a given ARMA(p, q) model

    Arguments:
    ----------
        model: fitted model 
    
    Returns:
    ----------
        none
    """
    print('##### MEASURES OF FIT #####')
    print(f'AIC:     {arma_model.aic:.3f}')
    print(f'BIC:     {arma_model.bic:.3f}')
    print(f'HQIC:    {arma_model.hqic:.3f}')

    return None


def mean_squared_error( y_true, y_pred ):
    """ 
    Computes the MSE for predicted values

    Parameters
    ----------
        y_true : array_like
                True values 

        y_pred : array_like
                Predicted values from model

    Returns
    -------
        mse : float
                MSE for predicted values given true observations
    """
    return ((y_true - y_pred) ** 2).mean()

def multi_arma_aic_bic_mse( train_data, max_p, max_q ):
    """
    Cycles through all ARMA models formed by combinations 
    of 0 <= p <= max_p, and 0 <= q <= max_q and stores the
    respective AIC and BIC in dictionary fomat

    Arguments:
    ----------
        train_data (array_like) : Training data for model
        max_p (int) : max auto-regressive order of ARMA model
        max_q (int) : max moving average order of ARMA model
    
    Returns:
    ----------
        aic_bic_dict (dict) : Dictionary with respective AIC and BIC
                                indexed by string 'ARMA(p,q)' : {'AIC' : value, 'BIC': value}
    """
    aic_bic_dict = {}
    for p in range(1, max_p+1):
        for q in range(1, max_q+1):
            arma_model = ARIMA(train_data, order=(p, 0, q)).fit()
            y_train_pred  = arma_model.predict(start=max(p,q)+1, end = len(train_data))
            mse = mean_squared_error(train_data[max(p,q):], y_train_pred)
            aic_bic_dict[f'ARMA({p},{q})'] = {'AIC' : round(arma_model.aic,3), 'BIC' : round(arma_model.bic, 3), 'MSE' : round(mse, 5)}
    
    return aic_bic_dict

def print_model_selection_criteria( model_selection_dict ):
    """
    Print the AIC, BIC and MSE of models in the dectionary 

    Arguments:
    ----------
        model_section_dict (dictionary) : Dictionary indexed by model names holding the corresponding AIC, BIC and MSE

    Return:
    ----------
        None
    """
    for a in model_selection_dict:
        print('Model:   ', a)
        print('Results: ', model_selection_dict[a],'\n')

    return None

In [24]:
# model selection
criterion_dict = multi_arma_aic_bic_mse( train, 2, 2 )
print_model_selection_criteria(criterion_dict)


Non-stationary starting autoregressive parameters found. Using zeros as starting parameters.


Non-invertible starting MA parameters found. Using zeros as starting parameters.



Model:    ARMA(1,1)
Results:  {'AIC': 327.691, 'BIC': 341.613, 'MSE': 0.00393} 

Model:    ARMA(1,2)
Results:  {'AIC': 329.48, 'BIC': 346.884, 'MSE': 0.00402} 

Model:    ARMA(2,1)
Results:  {'AIC': 329.532, 'BIC': 346.935, 'MSE': 0.00397} 

Model:    ARMA(2,2)
Results:  {'AIC': 329.003, 'BIC': 349.887, 'MSE': 0.00732} 



In [25]:
# train ARMA(1, 1) model
arma11 = ARIMA(train, order=(1, 0, 1)).fit()

## 19. Display the parameters of the model and its Mean Squared Error (MSE) in the training set and in the test set

In [26]:
def get_out_of_sample_model_train_test_mse( model, train, test ):
    """
    Prints a models training and test MSE

    Arguments:
    ----------
        model (object) : given model to be evaluated
        train (array_like) : model training set
        test (array_like) : model test set

    Returns:
    ----------
        train_mse (float) : training MSE
        test_mse (float) : test MSE
    """
    # get lengths
    n_train = len(train)
    n_test = len(test)

    # make predictions
    y_train_pred = model.predict(start=1, end = n_train)
    y_test_pred = model.forecast(steps = n_test)

    # get mse
    train_mse = mean_squared_error(train, y_train_pred)
    test_mse = mean_squared_error(test, y_test_pred)

    return train_mse, test_mse


def get_pseudo_out_of_sample_forecast( model, train, test ):
    """
    Computes the pseudo out-of-sample forecast for a given model
    
    Arguments:
    ----------
        model (object) : statsmodels ARMA object
        train (array_like) : model training data
        test (array_like) : model test data
    
    Returns:
        pseudo_fcast (dictionary) : pseudo out-of-sample forecast
                                    with observational index
                                    i.e {index : forecast}
    """
    # get lengths
    n_train = len(train)
    n_test = len(test)

    # get forecast
    forecasts = {}
    forecasts[n_train] = model.forecast()

    # Step through the rest of the sample
    for t in range(1, n_test):

        # Update the model with the new observation without changing param estimates
        model = model.append(test[t-1:t], refit=False)

        # Save the new set of forecasts
        forecasts[n_train+t] = model.forecast()
    
    return forecasts


def get_pseudo_out_of_sample_model_train_test_mse( model, train, test ):
    """
    Prints a models training and test MSE

    Arguments:
    ----------
        model (object) : given model to be evaluated
        train (array_like) : model training set
        test (array_like) : model test set

    Returns:
    ----------
        train_mse (float) : training MSE
        test_mse (float) : test MSE
    """
    # get lengths
    n_train = len(train)
    n_test = len(test)

    # make predictions
    y_train_pred = model.predict(start=1, end = n_train)

    # get forecast
    fcast =  get_pseudo_out_of_sample_forecast( model, train, test )
    y_test_pred = np.array([x[0] for x in fcast.values()])

    # get mse
    train_mse = mean_squared_error(train, y_train_pred)
    test_mse = mean_squared_error(test, y_test_pred)

    return train_mse, test_mse

In [27]:
# print coeffients
print_model_summary(arma11)

                               SARIMAX Results                                
Dep. Variable:                      y   No. Observations:                  240
Model:                 ARIMA(1, 0, 1)   Log Likelihood                -159.845
Date:                Sun, 19 Feb 2023   AIC                            327.691
Time:                        16:38:59   BIC                            341.613
Sample:                             0   HQIC                           333.301
                                - 240                                         
Covariance Type:                  opg                                         
                 coef    std err          z      P>|z|      [0.025      0.975]
------------------------------------------------------------------------------
const         14.8411      1.337     11.098      0.000      12.220      17.462
ar.L1          0.9765      0.013     76.633      0.000       0.952       1.001
ma.L1          0.0818      0.059      1.375      0.1

In [28]:
# get MSEs
arma11_train_mse, arma11_test_mse = get_out_of_sample_model_train_test_mse( arma11, train, test )

# print MSEs for out of sample
print('Out of Sample:')
print(f'- Training MSE:  |  {arma11_train_mse:.3f}')
print(f'- Test MSE:      |  {arma11_test_mse:.3f}\n')

# print MSEs for pseudo out of sample
arma11_train_mse_pseudo, arma11_test_mse_pseudo = get_pseudo_out_of_sample_model_train_test_mse( arma11, train, test )
print('Pseudo Out of Sample:')
print(f'- Training MSE:  |  {arma11_train_mse_pseudo:.3f}')
print(f'- Test MSE:      |  {arma11_test_mse_pseudo:.3f}')

Out of Sample:
- Training MSE:  |  0.004
- Test MSE:      |  5.999

Pseudo Out of Sample:
- Training MSE:  |  0.004
- Test MSE:      |  0.123


## 20. Plot the price time series vs the ARMA forecast in the test set

In [29]:
def plot_out_of_sample_forecast( model, train, test, title, xlabel, ylabel  ):
    """
    Plots the out-of-sample forecast for a given model

    Arguments:
    ----------
        model (object) : statsmodels ARMA object
        train (array_like) : model training data
        test (array_like) : model test data
        title (str) : plot title
        xlabel (str) : x-axis label
        ylabel (str) : y-axis label
    
    Returns:
        figure (object) : plotly figure object
    """
    # get lengths
    n_train = len(train)
    n_test = len(test)

    # make forecast
    fcast = model.get_forecast(steps = n_test).summary_frame()
    y_pred = fcast['mean']
    y_pred_mean_ci_lower = fcast['mean_ci_lower'].values
    y_pred_mean_ci_upper = fcast['mean_ci_upper'].values

    # plot
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=dates[n_train:], y=y_pred_mean_ci_lower, mode='lines', name='Lower 95% CI', line=dict(color='grey'))) # Lower CI
    fig.add_trace(go.Scatter(x=dates[n_train:], y=y_pred_mean_ci_upper, mode='lines', name='Upper 95% CI', line=dict(color='grey'), fill='tonexty')) # Upper CI
    fig.add_trace(go.Scatter(x=dates[n_train-n_test:n_train], y=train[-n_test:], mode='lines', name='train', line=dict(color='#000000'))) # Train Price series
    fig.add_trace(go.Scatter(x=dates[n_train:], y=test, mode='lines', name='test', line=dict(color='blue'))) # Test Price series
    fig.add_trace(go.Scatter(x=dates[n_train:], y=y_pred, mode='lines', name='forecast', line=dict(color='red'))) # Out of sample forecast


    fig.update_layout(
        title={'text': f"<b>{title}</b>", 'y':0.9, 'x':0.5, 'xanchor': 'center','yanchor': 'top'},
        xaxis_title=f"{xlabel}",
        yaxis_title=f"{ylabel}",
        showlegend=True,
        legend_title_text='Series:',
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),)
    
    return fig



def plot_pseudo_out_of_sample_forecast( model, train, test, title, xlabel, ylabel ):
    """
    Plots the pseudo out-of-sample forecast for a given model

    Arguments:
    ----------
        model (object) : statsmodels ARMA object
        train (array_like) : model training data
        test (array_like) : model test data
        title (str) : plot title
        xlabel (str) : x-axis label
        ylabel (str) : y-axis label
    
    Returns:
        figure (object) : plotly figure object
    """
    # get lengths
    n_train = len(train)
    n_test = len(test)

    # get forecast
    fcast = get_pseudo_out_of_sample_forecast( model, train, test )
    y_pred = np.array([x[0] for x in fcast.values()])

    # plot
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=dates[n_train-n_test:n_train], y=train[-n_test:], mode='lines', name='train', line=dict(color='#000000'))) # Price series
    fig.add_trace(go.Scatter(x=dates[n_train:], y=test, mode='lines', name='test', line=dict(color='blue'))) # Price series
    fig.add_trace(go.Scatter(x=dates[n_train:], y=y_pred, mode='lines', name='forecast', line=dict(color='red'))) # Price series

    fig.update_layout(
        title={'text': f"<b>{title}</b>", 'y':0.9, 'x':0.5, 'xanchor': 'center', 'yanchor': 'top'},
        xaxis_title=f"{xlabel}",
        yaxis_title=f"{ylabel}",
        showlegend=True,
        legend_title_text='Series:',
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),)

    return fig

In [30]:
# plot
open_price_out_of_sample_forecast_plot = plot_out_of_sample_forecast( arma11, train, test, 'Out of Sample Test Set Forecast for ARMA(1,1)', 'Date', 'Vale Price USD' )
open_price_out_of_sample_forecast_plot.update_layout(height = 400, width = 1300)
open_price_out_of_sample_forecast_plot.show()
open_price_out_of_sample_forecast_plot.write_image("images/fig_8_open_price_out_of_sample_forecast.png")

In [31]:
open_price_pseudo_out_of_sample_forecast_plot = plot_pseudo_out_of_sample_forecast( arma11, train, test, 'Pseudo Out-of-Sample Forecast for ARMA(1,1)', 'Date', 'Vale Price USD' )
open_price_pseudo_out_of_sample_forecast_plot.update_layout(height = 400, width = 1300)
open_price_pseudo_out_of_sample_forecast_plot.show()
open_price_pseudo_out_of_sample_forecast_plot.write_image("images/fig_9_open_price_pseudo_out_of_sample_forecast.png")

## 21. Fit an ARMA model to the return time series

In [32]:
# split data
train, test = train_test_split(log_r, 0.2)

In [33]:
# select model
criterion_dict = multi_arma_aic_bic_mse( train, 2, 1 )
print_model_selection_criteria(criterion_dict)

Model:    ARMA(1,1)
Results:  {'AIC': -988.515, 'BIC': -974.609, 'MSE': 0.00081} 

Model:    ARMA(2,1)
Results:  {'AIC': -986.749, 'BIC': -969.366, 'MSE': 0.00082} 



In [34]:
# train ARMA(2, 1) model
arma21 = ARIMA(train, order=(2, 0, 1)).fit()

## 22. Display the parameters of the model and its Mean Squared Error (MSE) in the training set and in the test set

In [35]:
# print coeffients
print_model_summary(arma21)

                               SARIMAX Results                                
Dep. Variable:                      y   No. Observations:                  239
Model:                 ARIMA(2, 0, 1)   Log Likelihood                 498.374
Date:                Sun, 19 Feb 2023   AIC                           -986.749
Time:                        16:39:00   BIC                           -969.366
Sample:                             0   HQIC                          -979.744
                                - 239                                         
Covariance Type:                  opg                                         
                 coef    std err          z      P>|z|      [0.025      0.975]
------------------------------------------------------------------------------
const          0.0005      0.002      0.258      0.796      -0.003       0.004
ar.L1          0.0257      1.625      0.016      0.987      -3.160       3.212
ar.L2         -0.0403      0.109     -0.371      0.7

In [36]:
# get MSEs
arma21_train_mse, arma21_test_mse = get_out_of_sample_model_train_test_mse( arma21, train, test )

# print MSEs for out of sample
print('Out of Sample:')
print(f'- Training MSE:  |  {arma21_train_mse:.3f}')
print(f'- Test MSE:      |  {arma21_test_mse:.3f}\n')

# print MSEs for pseudo out of sample
arma21_train_mse_pseudo, arma21_test_mse_pseudo = get_pseudo_out_of_sample_model_train_test_mse( arma21, train, test )
print('Pseudo Out of Sample:')
print(f'- Training MSE:  |  {arma21_train_mse_pseudo:.3f}')
print(f'- Test MSE:      |  {arma21_test_mse_pseudo:.3f}')

Out of Sample:
- Training MSE:  |  0.001
- Test MSE:      |  0.000

Pseudo Out of Sample:
- Training MSE:  |  0.001
- Test MSE:      |  0.000


## 23. Plot the return time series vs the ARMA forecast in the test set

In [37]:
# plot
log_return_out_of_sample_forecast_plot = plot_out_of_sample_forecast( arma21, train, test, 'Out of Sample Test Set Forecast for ARMA(2,1)', 'Date', 'Log Return' )
log_return_out_of_sample_forecast_plot.update_layout(height = 400, width = 1300)
log_return_out_of_sample_forecast_plot.show()
log_return_out_of_sample_forecast_plot.write_image("images/fig_10_log_return_out_of_sample_forecast.png")

In [38]:
log_return_pseudo_out_of_sample_forecast_plot = plot_pseudo_out_of_sample_forecast( arma21, train, test, 'Pseudo Out-of-Sample Forecast for ARMA(2,1)', 'Date', 'Log Return' )
log_return_pseudo_out_of_sample_forecast_plot.update_layout(height = 400, width = 1300)
log_return_pseudo_out_of_sample_forecast_plot.show()
log_return_pseudo_out_of_sample_forecast_plot.write_image("images/fig_11_log_return_pseudo_out_of_sample_forecast.png")

# **Gaussianity and Stationarity test**

## 24. Introduce mathematically a Gaussianity test

**Shaprio-Wilk Test**

The Shapiro-Wilk test is a statistical test used to test the null hypothesis that a sample of data comes from a normal distribution. In the context of ARMA models, the Shapiro-Wilk test can be used to test whether the residuals of the model follow a normal distribution specifically in our context whether they follow a white noise distribution i.e $w_t \sim N(0, \sigma_w^2)$.

The test statistic for the Shapiro-Wilk test is given by:

$$ W = \frac{\left( \sum_{i=1}^T a_i r_{(i)} \right)^2}{\sum_{i=1}^T (r_{(i)} - \mu)^2} $$

where $r_{(i)}$ are the ordered sample values of the time series, and $a_i$ are the coefficients that depend on the sample size given by,

$$ a_i = \frac{m^T V^{-1}}{N} $$

where N is a normalization factor., such that $\sum_{i=1}^T a_i^2 = 1$, and $m$ is the vector of expected values of all the order statistics in a Gaussian distribution, and $V$ is the expected covariance of pairs of order statistics. The test statistic $W$ measures the degree of departure of the sample from the normal distribution. Under the null hypothesis of normality, the Shapiro-Wilk test statistic follows a distribution that is close to a standard normal distribution.

If the p-value is less than the significance level (e.g., 0.05), then the null hypothesis of normality is rejected, and it is concluded that the residuals are not normally distributed, and therefore the residuals do not follow a normal distribution violating a modelling assumption.

## 25. Perform a Gaussianity test of the return time series

In [39]:
def shaprio_wilk_normality_test( sample ):
    """ 
    Computes the test statistic and p-value of 
    the Shapiro-Wilk normality test on a provided sample. \\
    Precisely, it tests the null hypothesis that the give sample was 
    sampled from a normal population. i.e \\
    H0: Sample is drawn from normal population \\
    H1: Sample is not drawn from normal population \\

    Arguments:
    ----------
    sample (array_like) : sample to be tested

    Returns:
    ---------
    W (float) : test stiatics
    p_value (float) : p_value for associated test statistic W
    rejected (bool) : truth value of test 
    """
    # get stats
    W, p_value = shapiro(sample)
    rejected = False

    # accept / reject
    if p_value >= 0.05:
        print('Insuficient evidence to reject null, i.e sample is normally distributed')
        print(f'Test Statistics:  |   {W:.3f}')
        print(f'p_value:          |   {p_value:.3f}')
        print(f'Null rejected:    |   {rejected}')
    else:
        rejected = True
        print('Evidence rejects null, i.e sample is not normally distributed')
        print(f'Test Statistics:  |   {W:.3f}')
        print(f'p_value:          |   {p_value:.3f}')
        print(f'Null rejected:    |   {rejected}')
        
    
    return W, p_value, rejected


In [40]:
# Normality test for Log Return Series
W, p_value, rejected = shaprio_wilk_normality_test( log_r )

Insuficient evidence to reject null, i.e sample is normally distributed
Test Statistics:  |   0.996
p_value:          |   0.737
Null rejected:    |   False


In [41]:
# Normality test for ARMA(2,1) residuals on Log Return Series
W, p_value, rejected = shaprio_wilk_normality_test( arma21.resid )

Insuficient evidence to reject null, i.e sample is normally distributed
Test Statistics:  |   0.997
p_value:          |   0.883
Null rejected:    |   False


## 26. Introduce mathematically a stationarity test

**Augmented Dickey-Fuller (ADF) Test**

_The Augmented Dickey-Fuller (ADF) test is a statistical test for stationarity._ Specifically, the ADF test tests the null hypothesis that a unit root is present in the sample of the time series (i.n $H_0$: Series is not stationary) against the alternative that no such root is present (i.e $H_1$: series is stationary).

The testing procedure for the ADF test uses the augmented Dickey-Fuller regression equation, which takes the form:

$$ \Delta y_t = \alpha + \beta t + \gamma t_{t-1} + \delta_1 \Delta y_{t-1} + ... + \delta_{p-1} \Delta y_{t-p+1} + \epsilon_t $$


where:

* $\Delta y_t$ is the first difference of the time series $y_t$
* $\gamma$ is the coefficient of the lagged value $y_{t-1}$ which tests for the presence of a unit root
* $p$ is the number of lags
* $\delta_i$ are the coefficients of the lagged differences 
* $\epsilon$ is the error term (assumed to be white noise)

Formally, the ADF test specified the null $H_0: \gamma = 0$ (that the series is stationary) against the alternative $H_1: \gamma > 0$ by computing the test statistics:

$$ DF_{p} = \frac{\hat{\gamma}}{SE(\hat{\gamma})} $$ 

The test statistic and its p-value is compared to a critical value from a table based on the sample size and significance level.

## 27. Perform a stationarity test of the return time series

In [42]:
def adfull_test( sample ):
    """ 
    Computes the test statistic and p-value of 
    the ADF stationarity test on a provided sample. \\
    Precisely, it tests the null hypothesis that the give sample has 
    a unit root i.e \\
    H0: Sample has unit root => non-stationary \\
    H1: Sample does not have unit root => stationary \\

    Arguments:
    ----------
    sample (array_like) : sample to be tested

    Returns:
    ---------
    DF (float) : test stiatics
    p_value (float) : p_value for associated test statistic W
    rejected (bool) : truth value of test
    """
    DF, p_value, _, _, _, _ = adfuller(sample)
    rejected = False

    # accept / reject
    if p_value >= 0.05:
        print('Insuficient evidence to reject null, i.e sample has unit root => series is non-stationary')
        print(f'Test Statistics:  |   {DF:.3f}')
        print(f'p_value:          |   {p_value:.3f}')
        print(f'Null rejected:    |   {rejected}')
    else:
        rejected = True
        print('Evidence rejects null, i.e sample has no unit root => series is stationary')
        print(f'Test Statistics:  |   {DF:.3f}')
        print(f'p_value:          |   {p_value:.3f}')
        print(f'Null rejected:    |   {rejected}')
    
    return DF, p_value, rejected
    

In [43]:
# Stationarity test on Log Return Series
DF, p_value, rejected = adfull_test(log_r)

Evidence rejects null, i.e sample has no unit root => series is stationary
Test Statistics:  |   -8.373
p_value:          |   0.000
Null rejected:    |   True


In [44]:
# Stationarity test on Original Opening Price Time Series
DF, p_value, rejected = adfull_test(open_price)

Insuficient evidence to reject null, i.e sample has unit root => series is non-stationary
Test Statistics:  |   -2.035
p_value:          |   0.272
Null rejected:    |   False
