
<div style="width: 100%; max-width: 100%; background-color: #fefefe; border: 1px solid #333; border-radius: 10px; padding: 20px; font-family: Arial, sans-serif; box-sizing: border-box;">
  <h3 style="color: #2c3e50; text-align: center;">VAR(p) Model (work in progress)</h3>
  
  <p style="color: #34495e; line-height: 1.6;">
    The VAR model is highly sensitive to data fluctuations, and even attempts at normalization were ineffective. I suspect this is due to overfitting. Given its poor performance, it will not be displayed in Streamlit.
  </p>
  <p style="color: #34495e; line-height: 1.6;">
    The VAR model follows a structure similar to the AR model, but with a sliding Test Step that adjusts the training set size and retrains the model. This allows observation of how the training-to-test ratio affects model performance.
  </p>
  
</div>

In [1]:
import pandas as pd
import numpy as np
from statsmodels.tsa.api import VAR
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
from ipywidgets import interact, Dropdown, IntSlider, Layout
import warnings
from functools import reduce

warnings.filterwarnings('ignore')

# 1. Load the input tables
# Main data table; the first two CSV columns form a two-level row index
# (presumably country and feature name -- confirm against the file).
data_full = pd.read_csv(
    'data_imputation_full.csv',
    index_col=[0, 1],
)

# Feature-selection table, keyed by a 'Country' column.
selected_features_df = pd.read_csv('selected_features_per_country_elastic_net.csv')
countries = selected_features_df['Country'].unique()

# Reshape the wide table to long format (one selected feature per row),
# then drop every row that contains a missing value.
selected_features_long = selected_features_df.melt(
    id_vars='Country',
    value_name='Feature',
)
selected_features_long = selected_features_long.dropna()

# 2. Prepare data for VAR model training
def prepare_country_data(country):
    """
    Assemble the table of selected features (plus GDP) for one country.

    Parameters:
    - country: country name

    Returns:
    - data: DataFrame with one column per feature that could be prepared,
      indexed by a DatetimeIndex built from the year labels and sorted
      ascending; None when no feature could be prepared
    """
    # Features chosen for this country in the long-format selection table
    is_country = selected_features_long['Country'] == country
    wanted = selected_features_long[is_country]['Feature'].tolist()

    # GDP is always part of the model, whether or not it was selected
    if 'Economics: GDP' not in wanted:
        wanted.append('Economics: GDP')

    columns = []
    for feature in wanted:
        try:
            # Slice the feature out of the second index level, keep this
            # country's row, and flip it so the years become the index
            by_feature = data_full.xs(feature, level=1)
            yearly = by_feature.loc[by_feature.index == country].T
            # Fails (and is reported below) unless exactly one row matched
            yearly.columns = [feature]
        except Exception as e:
            print(f"{country}: Error preparing data for feature '{feature}': {e}")
            continue
        else:
            columns.append(yearly)

    if not columns:
        print(f"{country}: No data available for the selected features.")
        return None

    def _outer_join(left, right):
        # Index-on-index outer join keeps years present in either side
        return pd.merge(left, right, left_index=True, right_index=True, how='outer')

    merged = reduce(_outer_join, columns)

    # Year labels -> int (for a numeric sort) -> DatetimeIndex
    merged.index = merged.index.astype(int)
    merged = merged.sort_index()
    merged.index = pd.to_datetime(merged.index.astype(str), format='%Y')

    return merged

# Prepared table per country; countries whose preparation fails or yields
# nothing are simply left out.
country_data_dict = {}

for country in countries:
    try:
        data = prepare_country_data(country)
    except Exception as e:
        print(f"Error preparing data for {country}: {e}")
        continue
    if data is None:
        continue
    country_data_dict[country] = data

# Guard for the rest of the analysis: everything in the else branch is skipped
# when no country produced a prepared table.
if not country_data_dict:
    print("No data available for any country.")
else:
    # Rebind `countries` to only the countries that have prepared data
    # (the keys of country_data_dict).
    countries = list(country_data_dict.keys())

    # 3. Define a function to get the best lag order p for each country
    def get_best_p(country, max_p=10):
        """
        Pick the VAR lag order that best predicts held-out GDP for a country.

        The country's table is split chronologically (first 70% of the rows
        for training, the rest for testing). For each candidate lag order a
        VAR model is fit on the training rows, the whole test horizon is
        forecast from the last training observations, and the MSE of the
        target column is recorded. The lag order with the lowest MSE wins.

        Parameters:
        - country: key into country_data_dict
        - max_p: largest lag order to try (default 10)

        Returns:
        - dict with keys 'Country', 'Best_p' and 'MSE'; 'Best_p' and 'MSE'
          are None when no lag order could be evaluated
        """
        no_result = {'Country': country, 'Best_p': None, 'MSE': None}
        try:
            data = country_data_dict[country]
            min_train_size = max_p + 1

            if len(data) < min_train_size:
                print(f"{country}: Not enough data to train VAR model (requires at least {min_train_size} data points).")
                return no_result

            # The target column does not depend on p: resolve it once and
            # report a missing column explicitly instead of letting every
            # candidate fail on the same KeyError.
            target_col = 'Economics: GDP' if 'Economics: GDP' in data.columns else 'GDP'
            if target_col not in data.columns:
                print(f"{country}: Target column '{target_col}' not found in the data.")
                return no_result

            # Divide the data into training and testing sets (70% training, 30% testing)
            train_size = int(len(data) * 0.7)
            train_data = data.iloc[:train_size]
            test_data = data.iloc[train_size:]
            y_true = test_data[target_col]

            mse_dict = {}
            failures = {}

            for p in range(1, min(max_p, train_size - 1) + 1):
                try:
                    results = VAR(train_data).fit(p)

                    # Forecast the full test horizon from the last k_ar training rows
                    lag_order = results.k_ar
                    forecast = results.forecast(y=train_data.values[-lag_order:], steps=len(test_data))
                    forecast_df = pd.DataFrame(forecast, index=test_data.index, columns=test_data.columns)

                    mse_dict[p] = mean_squared_error(y_true, forecast_df[target_col])
                except Exception as e:
                    # Best-effort search: an unusable lag order is skipped,
                    # but the reason is kept so a total failure can be explained.
                    failures[p] = e
                    continue

            if not mse_dict:
                print(f"{country}: No suitable p value found.")
                if failures:
                    last_p = max(failures)
                    print(f"{country}: last failure (p={last_p}): {failures[last_p]}")
                return no_result

            # Get the best p value with the minimum MSE
            best_p = min(mse_dict, key=mse_dict.get)

            return {'Country': country, 'Best_p': best_p, 'MSE': mse_dict[best_p]}
        except Exception as e:
            print(f"{country}: Error getting best p value: {e}")
            return no_result

    # One result dict per country (lag search capped at 10)
    best_p_results = [get_best_p(name, max_p=10) for name in countries]

    # Keep only the countries for which a lag order was found
    best_p_df = pd.DataFrame(best_p_results).dropna(subset=['Best_p'])
    best_p_df['Best_p'] = best_p_df['Best_p'].astype(int)
    countries = best_p_df['Country'].tolist()

    # 4. Define a function to process each country and get the future GDP forecast
    def process_country(country, forecast_steps=10):
        """
        Fit the final VAR model for one country and forecast its GDP.

        The model is fit on the country's complete table using the lag order
        stored in best_p_df, then forecast forward from the last observations.

        Parameters:
        - country: key into country_data_dict / value in best_p_df['Country']
        - forecast_steps: number of future yearly steps to forecast (default 10)

        Returns:
        - dict with keys 'Country', 'Best_p', 'Model' (fitted VAR results),
          'Data' (the table the model was fit on), 'Target_Col' and
          'GDP_Forecast' (DataFrame with columns Country, Year, GDP_Forecast);
          None when the country has no lag order, no target column, or
          fitting/forecasting fails
        """
        try:
            data = country_data_dict[country]

            # Ensure the index is a DatetimeIndex; convert on a copy so the
            # frame cached in country_data_dict is not modified in place
            if not isinstance(data.index, pd.DatetimeIndex):
                data = data.copy()
                data.index = pd.to_datetime(data.index.astype(str), format='%Y')

            # Get the best lag order p
            best_p_row = best_p_df[best_p_df['Country'] == country]
            if best_p_row.empty or pd.isnull(best_p_row['Best_p'].values[0]):
                print(f"{country}: Best p value not found, skipping.")
                return None
            best_p = int(best_p_row['Best_p'].values[0])

            # Determine the target variable column name and check it up front,
            # before spending time on fitting a model that cannot be used
            target_col = 'Economics: GDP' if 'Economics: GDP' in data.columns else 'GDP'
            if target_col not in data.columns:
                print(f"{country}: Target column '{target_col}' not found, skipping.")
                return None

            # Train the VAR model (using all data)
            model_fitted = VAR(data).fit(best_p)

            # Forecast future values from the last k_ar observations
            lag_order = model_fitted.k_ar
            future_forecast = model_fitted.forecast(y=data.values[-lag_order:], steps=forecast_steps)

            # Create the index for future years (one entry per year start)
            last_year = data.index[-1].year
            future_years = pd.date_range(start=pd.Timestamp(last_year + 1, 1, 1), periods=forecast_steps, freq='YS')
            future_forecast_df = pd.DataFrame(future_forecast, index=future_years, columns=data.columns)

            # Extract the forecasted GDP values as (Country, Year, GDP_Forecast) rows
            gdp_forecast = future_forecast_df[[target_col]].reset_index()
            gdp_forecast.columns = ['Year', 'GDP_Forecast']
            gdp_forecast['Country'] = country
            gdp_forecast['Year'] = gdp_forecast['Year'].dt.year
            gdp_forecast = gdp_forecast[['Country', 'Year', 'GDP_Forecast']]

            # Save results for plotting
            return {
                'Country': country,
                'Best_p': best_p,
                'Model': model_fitted,
                'Data': data,
                'Target_Col': target_col,
                'GDP_Forecast': gdp_forecast
            }

        except Exception as e:
            print(f"{country}: Error processing: {e}")
            return None

    # Per-country results (fitted model, data, forecast); failures are skipped
    results = []

    for country in countries:
        res = process_country(country)
        if res is not None:
            results.append(res)

    if results:
        # Combine future GDP forecasts for all countries
        future_forecasts_df = pd.concat([res['GDP_Forecast'] for res in results], ignore_index=True)

        # Save the future GDP forecasts to a CSV file
        future_forecasts_df.to_csv('future_GDP_forecasts.csv', index=False)
        print("Saved as 'future_GDP_forecasts.csv'")
    else:
        # pd.concat raises ValueError on an empty list, so handle the
        # "every country failed" case explicitly and keep the name defined
        future_forecasts_df = pd.DataFrame(columns=['Country', 'Year', 'GDP_Forecast'])
        print("No forecasts were produced; 'future_GDP_forecasts.csv' was not written.")

    # 5. Define a function to plot the VAR forecast for each country
    def plot_var_forecast(country, test_steps=10, forecast_steps=5):
        """
        Plot a back-test and a future forecast of GDP for one country.

        The last `test_steps` observations are held out, a VAR model with the
        country's stored lag order is retrained on the remaining rows and used
        to predict the held-out period. The future forecast beyond the last
        observation comes from the model stored in `results`, which was fit on
        the complete series. Two figures are shown: the series with both sets
        of predictions, and a bar chart of the back-test errors.

        Parameters:
        - country: country name to look up in `results`
        - test_steps: number of trailing observations held out for the back-test
        - forecast_steps: number of future yearly steps to forecast

        Returns:
        - None; problems are reported with print instead of raising
        """
        try:
            # Check if the country is in the results
            country_result = next((res for res in results if res['Country'] == country), None)
            if country_result is None:
                print(f"{country}: Not found in the results.")
                return

            data = country_result['Data']
            target_col = country_result['Target_Col']
            best_p = country_result['Best_p']
            # Model fit on the complete series. It is kept separate from the
            # retrained back-test model so that the future forecast is not
            # produced by a model that ignored the newest observations.
            full_model = country_result['Model']

            # Ensure the index is a DatetimeIndex (on a copy, so the stored
            # frame is not modified in place)
            if not isinstance(data.index, pd.DatetimeIndex):
                data = data.copy()
                data.index = pd.to_datetime(data.index.astype(str), format='%Y')

            # The training part must be strictly longer than the lag order,
            # otherwise there is no observation left to fit on
            min_points = best_p + test_steps + 1
            if len(data) < min_points:
                print(f"{country}: Not enough data to plot (requires at least {min_points} data points).")
                return

            # Split the data into training and testing sets
            train_size = len(data) - test_steps
            train_data = data.iloc[:train_size]
            test_data = data.iloc[train_size:]

            # Retrain the VAR model on the training part only (back-test)
            backtest_model = VAR(train_data).fit(best_p)

            # Predict on the test set from the last k_ar training rows
            lag_order = backtest_model.k_ar
            forecast = backtest_model.forecast(y=train_data.values[-lag_order:], steps=len(test_data))
            forecast_df = pd.DataFrame(forecast, index=test_data.index, columns=test_data.columns)

            # Forecast future values with the full-data model
            future_lags = full_model.k_ar
            future_forecast = full_model.forecast(y=data.values[-future_lags:], steps=forecast_steps)
            last_year = data.index[-1].year
            future_years = pd.date_range(start=pd.Timestamp(last_year + 1, 1, 1), periods=forecast_steps, freq='YS')
            future_forecast_df = pd.DataFrame(future_forecast, index=future_years, columns=data.columns)

            # Figure 1: observed series, back-test predictions and future forecast
            plt.figure(figsize=(12, 6))
            plt.plot(train_data.index.year, train_data[target_col], marker='o', label='Train Data')
            plt.plot(test_data.index.year, test_data[target_col], marker='o', label='Test Data')
            plt.plot(forecast_df.index.year, forecast_df[target_col], marker='o', linestyle='--', label='VAR Model Predictions')
            plt.plot(future_forecast_df.index.year, future_forecast_df[target_col], marker='o', linestyle='--', label='Future GDP Forecast')
            plt.title(f'{country} GDP with VAR Model Predictions and Forecast')
            plt.xlabel('Year')
            plt.ylabel('GDP')
            plt.legend()
            plt.grid(True)
            plt.show()

            # Calculate the forecast errors (actual minus predicted)
            errors = test_data[target_col].values - forecast_df[target_col].values

            # Figure 2: back-test error per year
            plt.figure(figsize=(12, 6))
            plt.bar(test_data.index.year, errors)
            plt.title(f'{country} Prediction Errors (Test Data - Predictions)')
            plt.xlabel('Year')
            plt.ylabel('Error')
            plt.grid(True)
            plt.show()

        except Exception as e:
            print(f"{country}: Plot failed. Error: {e}\n")

    # Create interactive widgets
    default_country = 'United States'
    default_test_steps = 5
    default_forecast_steps = 5

    # The Dropdown rejects an initial value that is not one of its options,
    # and the preferred default may have been filtered out earlier, so fall
    # back to the first available country (or no selection at all).
    country_options = sorted(countries)
    if default_country in country_options:
        initial_country = default_country
    elif country_options:
        initial_country = country_options[0]
    else:
        initial_country = None

    # Shared look for all three controls
    description_style = {'description_width': '120px'}

    country_dropdown = Dropdown(
        options=country_options,
        description='Country:',
        value=initial_country,
        style=description_style,
        layout=Layout(width='400px', margin='0 0 0 100px'),
        disabled=False
    )

    test_slider = IntSlider(
        min=1,
        max=10,
        step=1,
        value=default_test_steps,
        description='Test Steps:',
        style=description_style,
        layout=Layout(width='400px', margin='0 0 0 100px')
    )

    forecast_slider = IntSlider(
        min=1,
        max=10,
        step=1,
        value=default_forecast_steps,
        description='Forecast Steps:',
        style=description_style,
        layout=Layout(width='400px', margin='0 0 0 100px')
    )

    # Re-run the plot whenever any control changes
    interact(
        plot_var_forecast,
        country=country_dropdown,
        test_steps=test_slider,
        forecast_steps=forecast_slider
    )

Saved as 'future_GDP_forecasts.csv'


interactive(children=(Dropdown(description='Country:', index=160, layout=Layout(margin='0 0 0 100px', width='4…