In [1]:
import pandas as pd
import numpy as np

from statsmodels.tsa.api import VAR
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.stattools import grangercausalitytests
from statsmodels.tools.eval_measures import rmse, aic

import matplotlib.pyplot as plt
%matplotlib inline

import datetime

pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', -1)
pd.plotting.register_matplotlib_converters()
np.set_printoptions(suppress=True)

In [2]:
ita_regional=pd.read_csv("../input/covid19-in-italy/covid19_italy_region.csv")
ita_regional.head(3)

In [3]:
print(ita_regional.info())
# Checking the percentage of missing data in each column
per_missing = ita_regional.isna().sum()*100/len(ita_regional)
per_missing.sort_values(ascending=False)

In [4]:
ita_regional[ita_regional.TestsPerformed.isna()]

In [5]:
# Check for the period covered by the data (total # of days)
ita_regional['Date'] = pd.to_datetime(ita_regional['Date']).dt.normalize()
(ita_regional.Date.max()-ita_regional.Date.min()) + datetime.timedelta(days=1)

In [6]:
var_df = ita_regional.groupby('Date')[['HospitalizedPatients', 'IntensiveCarePatients', 'TotalHospitalizedPatients',
                                      'HomeConfinement', 'CurrentPositiveCases', 'NewPositiveCases',
                                      'Recovered', 'Deaths', 'TotalPositiveCases', 'TestsPerformed']].sum().reset_index()
print("df shape: ", var_df.shape)

In [7]:
var_df.drop(['HospitalizedPatients', 'IntensiveCarePatients', 'NewPositiveCases', 'TotalPositiveCases', 'CurrentPositiveCases'],
            axis=1, inplace=True)

var_df.head(3)

In [8]:
fig, axes = plt.subplots(nrows=1, ncols=5, figsize=(22,5))

for ycol, ax in zip(['TotalHospitalizedPatients', 'HomeConfinement',
                                                  'Recovered', 'Deaths', 'TestsPerformed'], axes):

    var_df.plot(kind='line', x='Date', y=ycol, ax=ax, alpha=0.5, color='r')

In [9]:
def grangers_causation_matrix(data, variables, test='ssr_chi2test', verbose=False, maxlag=5):    
    
    """Check Granger Causality of all possible combinations of the Time series.
    The rows are the response variable, columns are predictors. 

    data      : pandas dataframe containing the time series variables
    variables : list containing names of the time series variables.
    """
    df = pd.DataFrame(np.zeros((len(variables), len(variables))), columns=variables, index=variables)
    for c in df.columns:
        for r in df.index:
            test_result = grangercausalitytests(data[[r, c]], maxlag=maxlag, verbose=False)
            p_values = [round(test_result[i+1][0][test][1],4) for i in range(maxlag)]
            if verbose: print(f'Y = {r}, X = {c}, P Values = {p_values}')
            min_p_value = np.min(p_values)
            df.loc[r, c] = min_p_value
    df.columns = [var + '_x' for var in variables]
    df.index = [var + '_y' for var in variables]
    return df

In [10]:
grangers_causation_matrix(var_df, variables = ['TotalHospitalizedPatients', 'HomeConfinement',
                                                  'Recovered', 'Deaths', 'TestsPerformed']) 

The test Null Hypothesis is that the coefficients of the corresponding past values are zero; That is the X does not cause Y. The P-values in the table are lesser than our significance level (0.05), which implies that the Null Hypothesis can be rejected.

In [45]:
test_frec = 0.25
n_test = round((len(var_df)) * test_frec)
df_train, df_test = var_df[0:-n_test], var_df[-n_test:]
# df_train_copy = df_train.copy()
df_train.drop('Date',1, inplace=True)

In [21]:
## Test to check stationary
def adfuller_test(series, signif=0.05, name='', verbose=False):
    """Perform ADFuller to test for Stationarity of given series and print report"""
    r = adfuller(series, autolag='AIC')
    output = {'test_statistic':round(r[0], 4), 'pvalue':round(r[1], 4), 'n_lags':round(r[2], 4), 'n_obs':r[3]}
    p_value = output['pvalue'] 
    def adjust(val, length= 6): return str(val).ljust(length)

    # Print Summary
    print(f'    Augmented Dickey-Fuller Test on "{name}"', "\n   ", '-'*47)
    print(f' Null Hypothesis: Data has unit root. Non-Stationary.')
    print(f' Significance Level    = {signif}')
    print(f' Test Statistic        = {output["test_statistic"]}')
    print(f' No. Lags Chosen       = {output["n_lags"]}')

    for key,val in r[4].items():
        print(f' Critical value {adjust(key)} = {round(val, 3)}')

    if p_value <= signif:
        print(f" => P-Value = {p_value}. Rejecting Null Hypothesis.")
        print(f" => Series is Stationary.")
    else:
        print(f" => P-Value = {p_value}. Weak evidence to reject the Null Hypothesis.")
        print(f" => Series is Non-Stationary.")

In [22]:
import statsmodels as sm
for name, column in df_train.iteritems():
    adfuller_test(column, name=column.name)
    print('\n')
    
for i in var_df:
    sm.graphics.tsaplots.plot_acf(var_df[i], lags = 50)
    plt.title('ACF for %s' % i)
    plt.show()

In [24]:
## Take 2 step difference
df_differenced = df_train.diff().dropna().diff().dropna()
for name, column in df_differenced.iteritems():
    adfuller_test(column, name=column.name)
    print('\n')
    
for i in df_differenced:
    sm.graphics.tsaplots.plot_acf(df_differenced[i], lags = 50)
    plt.title('ACF for %s' % i)
    plt.show()

In [26]:
model = VAR(df_differenced[['TotalHospitalizedPatients', 'HomeConfinement',
                            'Recovered', 'Deaths', 'TestsPerformed']])

# fitted = model.fit(6)
fitted = model.fit(maxlags=6, ic='aic')
fitted.summary()

In [31]:
fitted.plot()

In [32]:
fitted.plot_acorr()

In [33]:
fitted.plot_forecast(10)

We'll use Durbin-Watson test for this (denoted as d):

* The value of d always lies between 0 and 4.

* d = 2 indicates no autocorrelation.

* If d < 2, there is evidence of positive serial correlation. As a rough rule of thumb, if d < 1.0, there may be cause > for alarm. Small values of d indicate successive error terms are positively correlated.

* If d > 2, successive error terms are negatively correlated. In regressions, this can imply an underestimation of the > level of statistical significance.

In [34]:
from statsmodels.stats.stattools import durbin_watson
out = durbin_watson(fitted.resid)

for col, val in zip(var_df[['TotalHospitalizedPatients', 'HomeConfinement',
                                                  'Recovered', 'Deaths', 'TestsPerformed']], out):
    print(col, ':', round(val, 2))


## Forcasting

In [35]:
lag_order = fitted.k_ar

# Input data for forecasting
forecast_input = df_differenced.values[-lag_order:]
forecast_input

In [36]:
var_df_forecast = var_df[['TotalHospitalizedPatients', 'HomeConfinement',
                                                  'Recovered', 'Deaths', 'TestsPerformed']]

fc = fitted.forecast(y=forecast_input, steps=n_test)
df_forecast = pd.DataFrame(fc, index=var_df_forecast.index[-n_test:], columns=var_df_forecast.columns + '_2d')
df_forecast

In [37]:
def invert_transformation(df_train, df_forecast, second_diff=False, third_diff=False):
    """Revert back the differencing to get the forecast to original scale."""
    df_fc = df_forecast.copy()
    columns = df_train.columns
    for col in columns:        
        # Roll back 3rd Diff
        if third_diff:
            df_fc[str(col)+'_2d'] = (df_train[col].iloc[-2]-df_train[col].iloc[-3]) + df_fc[str(col)+'_3d'].cumsum()
        # Roll back 2nd Diff
        if second_diff:
            df_fc[str(col)+'_1d'] = (df_train[col].iloc[-1]-df_train[col].iloc[-2]) + df_fc[str(col)+'_2d'].cumsum()
        # Roll back 1st Diff
        df_fc[str(col)+'_forecast'] = df_train[col].iloc[-1] + df_fc[str(col)+'_1d'].cumsum()
    return df_fc

In [42]:
df_results = invert_transformation(df_train, df_forecast, second_diff=True, third_diff=False)        
df_results.loc[:, ['TotalHospitalizedPatients_forecast', 'HomeConfinement_forecast',
                                                  'Recovered_forecast', 'Deaths_forecast', 'TestsPerformed_forecast']]

In [43]:
df_results

In [46]:
df_results['Date'] = var_df['Date'][215:287]
df_test.set_index('Date',inplace=True)

fig, axes = plt.subplots(nrows=1, ncols=5, figsize=(22,6))

for col, ax in zip(['TotalHospitalizedPatients', 'HomeConfinement',
                                                  'Recovered', 'Deaths', 'TestsPerformed'], axes):

    df_results.plot(kind='line', y=[col+'_forecast'], x='Date', ax=ax, alpha=0.5, color='r', legend=True).autoscale(axis='x',tight=True)
    df_test[col][-n_test:].plot(legend=True, ax=ax)
    ax.set_title(col + ": Forecast vs Actuals")
plt.tight_layout()

In [47]:
from statsmodels.tsa.stattools import acf
def forecast_accuracy(forecast, actual):
    mape = np.mean(np.abs(forecast - actual)/np.abs(actual))  # MAPE
    me = np.mean(forecast - actual)             # ME
    mae = np.mean(np.abs(forecast - actual))    # MAE
    mpe = np.mean((forecast - actual)/actual)   # MPE
    rmse = np.mean((forecast - actual)**2)**.5  # RMSE
    corr = np.corrcoef(forecast, actual)[0,1]   # corr
    mins = np.amin(np.hstack([forecast[:,None], 
                              actual[:,None]]), axis=1)
    maxs = np.amax(np.hstack([forecast[:,None], 
                              actual[:,None]]), axis=1)
    minmax = 1 - np.mean(mins/maxs)             # minmax
    return({'mape':mape, 'me':me, 'mae': mae, 
            'mpe': mpe, 'rmse':rmse, 'corr':corr, 'minmax':minmax})

In [48]:
print('Forecast Accuracy of: TotalHospitalizedPatients')
accuracy_prod = forecast_accuracy(df_results['TotalHospitalizedPatients_forecast'].values, df_test['TotalHospitalizedPatients'])
for k, v in accuracy_prod.items():
    print(k, ': ', round(v,4))

print('\nForecast Accuracy of: HomeConfinement')
accuracy_prod = forecast_accuracy(df_results['HomeConfinement_forecast'].values, df_test['HomeConfinement'])
for k, v in accuracy_prod.items():
    print(k, ': ', round(v,4))

print('\nForecast Accuracy of: Recovered')
accuracy_prod = forecast_accuracy(df_results['Recovered_forecast'].values, df_test['Recovered'])
for k, v in accuracy_prod.items():
    print(k, ': ', round(v,4))

print('\nForecast Accuracy of: Deaths')
accuracy_prod = forecast_accuracy(df_results['Deaths_forecast'].values, df_test['Deaths'])
for k, v in accuracy_prod.items():
    print(k, ': ', round(v,4))

print('\nForecast Accuracy of: TestsPerformed')
accuracy_prod = forecast_accuracy(df_results['TestsPerformed_forecast'].values, df_test['TestsPerformed'])
for k, v in accuracy_prod.items():
    print(k, ': ', round(v,4))