In [1]:
import numpy as np
import pandas as pd
from scipy import signal
from scipy import stats
from statsmodels.regression.linear_model import OLS
from statsmodels.tools import add_constant
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.stattools import adfuller
from statsmodels.nonparametric.smoothers_lowess import lowess
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import PolynomialFeatures
from sklearn.model_selection import TimeSeriesSplit
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, WhiteKernel
import itertools
import warnings
warnings.filterwarnings('ignore')

In [2]:
# class TimeSeriesRegression:
#     def clean_series(self, X, Y):
#         """
#         Aligns two time series based on the first valid index.
#         Inputs:
#         X           : yearly time series data
#         Y           : yearly time series data
#         Outputs:
#         tuple       : (X_aligned, Y_aligned)
#         X_aligned   : time series data aligned based on first valid index
#         Y_aligned   : time series data aligned based on first valid index
#         """
#         first_index = max(X.first_valid_index(), Y.first_valid_index()) 
#         last_index = min(X.last_valid_index(), Y.last_valid_index())
#         return X[first_index:last_index], Y[first_index:last_index]

#     def max_lag(self, X, Y, max_lag_years=10):
#         """
#         Finds the lag that maximizes correlation between two time series.
#         Inputs:
#         X             : yearly time series data
#         Y             : yearly time series data
#         max_lag_years : optional integer that specifies 
#                         maximum number of years to check for lag
#         Outputs:
#         tuple           : (optimal_lag, max_correlation)
#         optimal_lag     : maximizes the correlation of the 2 series
#         max_correlation : the value of the correlation
#         """
#         X = np.array(X)
#         Y = np.array(Y)
        
#         # Convert to standard normal
#         X_norm = (X - np.mean(X)) / np.std(X)
#         Y_norm = (Y - np.mean(Y)) / np.std(Y)
        
#         # Calculate cross-correlation
#         correlations = signal.correlate(Y_norm, X_norm, mode='full')
#         lags = signal.correlation_lags(len(X_norm), len(Y_norm))
        
#         # Only consider positive lags up to max_lag_years
#         valid_indices = (lags >= 0) & (lags <= max_lag_years)
#         valid_correlations = correlations[valid_indices]
#         valid_lags = lags[valid_indices]
        
#         # Find the lag with maximum correlation
#         max_corr_index = np.argmax(valid_correlations)
#         optimal_lag = valid_lags[max_corr_index]
#         max_correlation = valid_correlations[max_corr_index] / len(X)  # Normalize by series length
        
#         return optimal_lag, max_correlation
    
    
#     def linear_regression(self, X, Y):
#         """
#         Finds the lag that maximizes correlation between two time series.
#         Inputs:
#         X             : yearly time series data, regression parameter
#         Y             : yearly time series data, target
#         max_lag_years : optional integer that specifies 
#                         maximum number of years to check for lag
#         Outputs:
#         dict           : {
#             'best lag': optimal lag value,
#             'prediction for next year': predicted value for Y in the next period,    
#             'dataset correlation': correlation between lagged series,
#             'R value': R-squared value of the regression
#         }
#         """
#         # Clean series
#         X_aligned, Y_aligned = self.clean_series(X, Y)

#         # Find optimal lag
#         best_lag, correlation = self.max_lag(X, Y)
        
#         # Align series based on optimal lag
#         if best_lag > 0:
#             X_aligned = X[:-best_lag]
#             Y_aligned = Y[best_lag:]
#         else:
#             X_aligned = X
#             Y_aligned = Y
        
#         # Ensure equal lengths
#         min_length = min(len(X_aligned), len(Y_aligned))
#         X_aligned = X_aligned[:min_length]
#         Y_aligned = Y_aligned[:min_length]
        
#         # Perform linear regression
#         X_with_const = add_constant(X_aligned)

#         model = OLS(Y_aligned, X_with_const).fit()
        
#         # Calculate prediction for next year
#         last_X = X[-1]
#         next_year_pred = model.params[0] + model.params[1] * last_X
        
#         # Calculate dataset correlation
#         dataset_corr = stats.pearsonr(X_aligned, Y_aligned)[0]
        
#         return {
#             "lag": best_lag,
#             "prediction": next_year_pred,
#             "correlation": dataset_corr,
#             "R": model.rsquared
#         }

In [3]:
# Version 2

class TimeSeriesRegression:
    def cross_validate(self, X, Y, k_folds=5):
        """
        Performs k-fold cross validation on the time series regression,
        cross validation splits are done with TimeSeriesSplit to maintin time ordering
        Inputs:
        X               : yearly time series data, regression parameter
        Y               : yearly time series data, target
        k_folds         : number of folds for cross validation
        
        Outputs:
        dict            : {
            'lag'                   : optimal lag value,
            'prediction'            : predicted value for Y in the next period,
            'correlation'           : correlation between lagged series,
            'R'                     : R-squared value of the full model,
            'Cross Validation Score': mean R-squared across validation folds,
            'Cross Validation Error': standard deviation of R-squared across folds
        }
        """
        # Ensure we're working with pandas Series
        if not isinstance(X, pd.Series):
            X = pd.Series(X)
        if not isinstance(Y, pd.Series):
            Y = pd.Series(Y)
        
        # Find optimal lag using full dataset
        best_lag, correlation = self.max_lag(X, Y)
        
        # Align series based on optimal lag
        X_aligned, Y_aligned = self.align_with_lag(X, Y, best_lag)
        
        # Initialize TimeSeriesSplit
        tscv = TimeSeriesSplit(n_splits=k_folds)
        
        # Storage for validation scores
        val_scores = []
        
        # Perform cross validation
        for train_idx, val_idx in tscv.split(X_aligned):
            # Split the data
            X_train = X_aligned.iloc[train_idx]
            Y_train = Y_aligned.iloc[train_idx]
            X_val = X_aligned.iloc[val_idx]
            Y_val = Y_aligned.iloc[val_idx]
            
            # Add constant to training data
            X_train_const = add_constant(X_train)
            X_val_const = add_constant(X_val)
            
            # Fit model on training data
            model = OLS(Y_train, X_train_const).fit()
            
            # Predict on validation set
            Y_pred = model.predict(X_val_const)
            
            # Calculate RÂ² for this fold
            r2 = r2_score(Y_val, Y_pred)
            val_scores.append(r2)
        
        # Fit final model on full dataset
        X_full_const = add_constant(X_aligned)
        final_model = OLS(Y_aligned, X_full_const).fit()
        
        # Find the most recent X value (2024 or latest available)
        current_year = 2024
        if current_year not in X.index:
            current_year = X.index.max()
        
        # Use the most recent X value for prediction
        current_X = X[current_year]
        next_year_pred = final_model.params[0] + final_model.params[1] * current_X
        
        # Calculate dataset correlation
        dataset_corr = stats.pearsonr(X_aligned, Y_aligned)[0]
        
        return {
            "Lag": best_lag,
            "Prediction": next_year_pred,
            "Correlation": dataset_corr,
            "R": final_model.rsquared,
            "Cross Validation Score": np.mean(val_scores),
            "Cross Validation Error": np.std(val_scores)
        }
    
    def clean_series(self, X, Y):
        """
        Aligns two time series based on the first valid index.
        Inputs:
        X           : yearly time series data
        Y           : yearly time series data
        Outputs:
        tuple       : (X_aligned, Y_aligned)
        X_aligned   : time series data aligned based on first valid index
        Y_aligned   : time series data aligned based on first valid index
        """
        # Ensure we're working with pandas Series
        if not isinstance(X, pd.Series):
            X = pd.Series(X)
        if not isinstance(Y, pd.Series):
            Y = pd.Series(Y)
            
        # Get the overlapping time period
        first_index = max(X.first_valid_index(), Y.first_valid_index())
        last_index = min(X.last_valid_index(), Y.last_valid_index())
        
        return X[first_index:last_index], Y[first_index:last_index]

    def align_with_lag(self, X, Y, lag):
        """
        Aligns two time series based on a given lag value.
        Inputs:
        X    : yearly time series data
        Y    : yearly time series data
        lag  : integer lag value to shift X backwards
        Outputs:
        tuple: (X_aligned, Y_aligned) where X is shifted back by lag periods
        """
        # Ensure we're working with pandas Series
        if not isinstance(X, pd.Series):
            X = pd.Series(X)
        if not isinstance(Y, pd.Series):
            Y = pd.Series(Y)
        
        # Get Y's time range
        y_start = Y.first_valid_index()
        y_end = Y.last_valid_index()
        
        # Select X values from (y_start - lag) to (y_end - lag)
        X_lagged = X.loc[y_start - lag:y_end - lag]
        
        # Ensure the indices align properly
        X_aligned = X_lagged.reset_index(drop=True)
        Y_aligned = Y.loc[y_start:y_end].reset_index(drop=True)
        
        return X_aligned, Y_aligned

    def max_lag(self, X, Y, max_lag_years=10):
        """
        Finds the lag that maximizes correlation between two time series.
        Inputs:
        X             : yearly time series data
        Y             : yearly time series data
        max_lag_years : optional integer that specifies 
                        maximum number of years to check for lag
        Outputs:
        tuple           : (optimal_lag, max_correlation)
        optimal_lag     : maximizes the correlation of the 2 series
        max_correlation : the value of the correlation
        """
        # Get the clean, aligned series first
        X_clean, Y_clean = self.clean_series(X, Y)
        
        # Convert to numpy arrays for correlation calculation
        X_arr = np.array(X_clean)
        Y_arr = np.array(Y_clean)
        
        # Convert to standard normal
        X_norm = (X_arr - np.mean(X_arr)) / np.std(X_arr)
        Y_norm = (Y_arr - np.mean(Y_arr)) / np.std(Y_arr)
        
        # Calculate cross-correlation
        correlations = signal.correlate(Y_norm, X_norm, mode='full')
        lags = signal.correlation_lags(len(X_norm), len(Y_norm))
        
        # Only consider positive lags up to max_lag_years
        valid_indices = (lags >= 0) & (lags <= max_lag_years)
        valid_correlations = correlations[valid_indices]
        valid_lags = lags[valid_indices]
        
        # Find the lag with maximum correlation
        max_corr_index = np.argmax(valid_correlations)
        optimal_lag = valid_lags[max_corr_index]
        max_correlation = valid_correlations[max_corr_index] / len(X_arr)  # Normalize by series length
        
        return optimal_lag, max_correlation
    
    def linear_regression(self, X, Y):
        """
        Finds the lag that maximizes correlation between two time series.
        Inputs:
        X             : yearly time series data, regression parameter
        Y             : yearly time series data, target
        max_lag_years : optional integer that specifies 
                        maximum number of years to check for lag
        Outputs:
        dict           : {
            'best lag': optimal lag value,
            'prediction for next year': predicted value for Y in the next period,    
            'dataset correlation': correlation between lagged series,
            'R value': R-squared value of the regression
        }
        """
        # Find optimal lag
        best_lag, correlation = self.max_lag(X, Y)
        
        # Align series based on optimal lag
        X_aligned, Y_aligned = self.align_with_lag(X, Y, best_lag)
        
        # Perform linear regression
        X_with_const = add_constant(X_aligned)
        model = OLS(Y_aligned, X_with_const).fit()
        
        # Calculate prediction for next year
        last_X = X.iloc[-1]  # Use last available X value
        next_year_pred = model.params[0] + model.params[1] * last_X
        
        # Calculate dataset correlation
        dataset_corr = stats.pearsonr(X_aligned, Y_aligned)[0]
        
        return {
            "lag": best_lag,
            "prediction": next_year_pred,
            "correlation": dataset_corr,
            "R": model.rsquared
        }

In [5]:
class TimeSeriesRegression:
    def clean_series(self, X, Y):
        """
        Aligns two time series based on the first valid index.
        Inputs:
        X           : yearly time series data
        Y           : yearly time series data
        Outputs:
        tuple       : (X_aligned, Y_aligned)
        X_aligned   : time series data aligned based on first valid index
        Y_aligned   : time series data aligned based on first valid index
        """
        if not isinstance(X, pd.Series):
            X = pd.Series(X)
        if not isinstance(Y, pd.Series):
            Y = pd.Series(Y)
            
        first_index = max(X.first_valid_index(), Y.first_valid_index())
        last_index = min(X.last_valid_index(), Y.last_valid_index())
        
        return X[first_index:last_index], Y[first_index:last_index]

    def align_with_lag(self, X, Y, lag):
        """
        Aligns two time series based on a given lag value.
        Inputs:
        X    : yearly time series data
        Y    : yearly time series data
        lag  : integer lag value to shift X backwards
        Outputs:
        tuple: (X_aligned, Y_aligned) where X is shifted back by lag periods
        """
        if not isinstance(X, pd.Series):
            X = pd.Series(X)
        if not isinstance(Y, pd.Series):
            Y = pd.Series(Y)
        
        y_start = Y.first_valid_index()
        y_end = Y.last_valid_index()
        
        X_lagged = X.loc[y_start - lag:y_end - lag]
        
        X_aligned = X_lagged.reset_index(drop=True)
        Y_aligned = Y.loc[y_start:y_end].reset_index(drop=True)
        
        return X_aligned, Y_aligned

    def max_lag(self, X, Y, max_lag_years=10):
        """
        Finds the lag that maximizes correlation between two time series.
        Inputs:
        X             : yearly time series data
        Y             : yearly time series data
        max_lag_years : optional integer that specifies 
                        maximum number of years to check for lag
        Outputs:
        tuple           : (optimal_lag, max_correlation)
        optimal_lag     : maximizes the correlation of the 2 series
        max_correlation : the value of the correlation
        """
        X_clean, Y_clean = self.clean_series(X, Y)
        
        X_arr = np.array(X_clean)
        Y_arr = np.array(Y_clean)
        
        X_norm = (X_arr - np.mean(X_arr)) / np.std(X_arr)
        Y_norm = (Y_arr - np.mean(Y_arr)) / np.std(Y_arr)
        
        correlations = signal.correlate(Y_norm, X_norm, mode='full')
        lags = signal.correlation_lags(len(X_norm), len(Y_norm))
        
        valid_indices = (lags >= 0) & (lags <= max_lag_years)
        valid_correlations = correlations[valid_indices]
        valid_lags = lags[valid_indices]
        
        max_corr_idx = np.argmax(valid_correlations)
        optimal_lag = valid_lags[max_corr_idx]
        max_correlation = valid_correlations[max_corr_idx] / len(X_arr)
        
        return optimal_lag, max_correlation

    def _do_cross_validation(self, X, Y, model_func, params, k_folds=5):
        """
        Helper function to perform cross validation for any model
        Inputs:
        X           : aligned X data for validation
        Y           : aligned Y data for validation
        model_func  : model fitting function
        params      : dictionary of model parameters
        k_folds     : number of folds for CV
        
        Outputs:
        tuple       : (cv_score, cv_error)
        """
        tscv = TimeSeriesSplit(n_splits=k_folds)
        val_scores = []
        
        for train_idx, val_idx in tscv.split(X):
            X_train = X.iloc[train_idx]
            Y_train = Y.iloc[train_idx]
            X_val = X.iloc[val_idx]
            Y_val = Y.iloc[val_idx]
            
            model = model_func(X_train, Y_train, **params)
            Y_pred = model.predict(X_val) if hasattr(model, 'predict') else model.forecast(steps=len(val_idx), exog=X_val)
            val_scores.append(r2_score(Y_val, Y_pred))
        
        return np.mean(val_scores), np.std(val_scores)

    def linear_regression(self, X, Y, do_cv=True, k_folds=5):
        """
        Performs linear regression with optional cross validation.
        Inputs:
        X       : yearly time series data
        Y       : yearly time series data
        do_cv   : whether to perform cross validation
        k_folds : number of folds if doing cross validation
        
        Outputs:
        dict    : {
            'lag'        : optimal lag value,
            'prediction' : predicted value for next period,
            'r2'        : R-squared value,
            'plot_data' : pandas DataFrame with X, Y_pred, Y_data columns,
            'cv_score'  : mean validation score (if do_cv=True),
            'cv_error'  : std of validation scores (if do_cv=True)
        }
        """
        best_lag, _ = self.max_lag(X, Y)
        X_aligned, Y_aligned = self.align_with_lag(X, Y, best_lag)
        
        X_const = add_constant(X_aligned)
        model = OLS(Y_aligned, X_const).fit()
        
        current_X = X[2024 if 2024 in X.index else X.index.max()]
        next_year_pred = model.params[0] + model.params[1] * current_X
        
        plot_data = pd.DataFrame({
            'X': X_aligned,
            'Y_data': Y_aligned,
            'Y_pred': model.fittedvalues
        })
        
        results = {
            "lag": best_lag,
            "prediction": next_year_pred,
            "r2": model.rsquared,
            "plot_data": plot_data
        }
        
        if do_cv:
            cv_score, cv_error = self._do_cross_validation(
                X_aligned, Y_aligned,
                OLS, {'endog': Y_aligned},
                k_folds
            )
            results.update({
                'cv_score': cv_score,
                'cv_error': cv_error
            })
        
        return results
    
    def arima_regression(self, X, Y, max_p=3, max_d=2, max_q=3, do_cv=True, k_folds=5):
        """
        Performs ARIMA regression with parameters optimized by AIC.
        Inputs:
        X       : yearly time series data
        Y       : yearly time series data
        max_p   : maximum AR order to test
        max_d   : maximum difference order to test
        max_q   : maximum MA order to test
        do_cv   : whether to perform cross validation
        k_folds : number of folds if doing cross validation
        
        Outputs:
        dict    : {
            'lag'        : optimal lag value,
            'prediction' : predicted value for next period,
            'aic'       : model AIC score,
            'rmse'      : root mean square error,
            'params'    : optimal (p,d,q) parameters,
            'plot_data' : pandas DataFrame with X, Y_pred, Y_data columns,
            'cv_score'  : mean validation score (if do_cv=True),
            'cv_error'  : std of validation scores (if do_cv=True)
        }
        """
        best_lag, _ = self.max_lag(X, Y)
        X_aligned, Y_aligned = self.align_with_lag(X, Y, best_lag)
        
        best_aic = float('inf')
        best_model = None
        best_params = None
        d_min = 0 if adfuller(Y_aligned)[1] < 0.05 else 1
        
        for p, d, q in itertools.product(
            range(max_p + 1),
            range(d_min, max_d + 1),
            range(max_q + 1)
        ):
            try:
                model = ARIMA(Y_aligned, exog=X_aligned, order=(p,d,q)).fit()
                if model.aic < best_aic:
                    best_aic = model.aic
                    best_model = model
                    best_params = (p,d,q)
            except:
                continue
        
        current_X = X[2024 if 2024 in X.index else X.index.max()]
        next_year_pred = best_model.forecast(steps=1, exog=[current_X])[0]
        
        plot_data = pd.DataFrame({
            'X': X_aligned,
            'Y_data': Y_aligned,
            'Y_pred': best_model.fittedvalues
        })
        
        results = {
            "lag": best_lag,
            "prediction": next_year_pred,
            "aic": best_aic,
            "rmse": np.sqrt(mean_squared_error(Y_aligned, best_model.fittedvalues)),
            "params": best_params,
            "plot_data": plot_data
        }
        
        if do_cv:
            cv_score, cv_error = self._do_cross_validation(
                X_aligned, Y_aligned,
                lambda x, y: ARIMA(y, exog=x, order=best_params).fit(),
                {},
                k_folds
            )
            results.update({
                'cv_score': cv_score,
                'cv_error': cv_error
            })
        
        return results

    def polynomial_regression(self, X, Y, degree=2, do_cv=True, k_folds=5):
        """
        Performs polynomial regression.
        Inputs:
        X       : yearly time series data
        Y       : yearly time series data
        degree  : polynomial degree
        do_cv   : whether to perform cross validation
        k_folds : number of folds for CV
        
        Outputs:
        dict    : {
            'lag'        : optimal lag value,
            'prediction' : predicted value for next period,
            'r2'        : R-squared value,
            'rmse'      : root mean square error,
            'plot_data' : pandas DataFrame with X, Y_pred, Y_data columns,
            'cv_score'  : mean validation score (if do_cv=True),
            'cv_error'  : std of validation scores (if do_cv=True)
        }
        """
        best_lag, _ = self.max_lag(X, Y)
        X_aligned, Y_aligned = self.align_with_lag(X, Y, best_lag)
        
        poly = PolynomialFeatures(degree)
        X_poly = poly.fit_transform(X_aligned.values.reshape(-1, 1))
        model = OLS(Y_aligned, X_poly).fit()
        
        current_X = X[2024 if 2024 in X.index else X.index.max()]
        next_year_pred = model.predict(poly.transform([[current_X]]))[0]
        
        plot_data = pd.DataFrame({
            'X': X_aligned,
            'Y_data': Y_aligned,
            'Y_pred': model.fittedvalues
        })
        
        results = {
            "lag": best_lag,
            "prediction": next_year_pred,
            "r2": model.rsquared,
            "rmse": np.sqrt(mean_squared_error(Y_aligned, model.fittedvalues)),
            "plot_data": plot_data
        }
        
        if do_cv:
            cv_score, cv_error = self._do_cross_validation(
                X_aligned, Y_aligned,
                lambda x, y: OLS(y, poly.fit_transform(x.values.reshape(-1, 1))).fit(),
                {},
                k_folds
            )
            results.update({
                'cv_score': cv_score,
                'cv_error': cv_error
            })
        
        return results
    
    def lowess_regression(self, X, Y, frac=0.3, do_cv=True, k_folds=5):
        """
        Performs LOWESS regression.
        Inputs:
        X       : yearly time series data
        Y       : yearly time series data
        frac    : smoothing fraction
        do_cv   : whether to perform cross validation
        k_folds : number of folds for CV
        
        Outputs:
        dict    : {
            'lag'        : optimal lag value,
            'prediction' : predicted value for next period,
            'r2'        : R-squared value,
            'rmse'      : root mean square error,
            'plot_data' : pandas DataFrame with X, Y_pred, Y_data columns,
            'cv_score'  : mean validation score (if do_cv=True),
            'cv_error'  : std of validation scores (if do_cv=True)
        }
        """
        # Find optimal lag and align series
        best_lag, _ = self.max_lag(X, Y)
        X_aligned, Y_aligned = self.align_with_lag(X, Y, best_lag)
        
        # Fit LOWESS model
        smoothed = lowess(
            Y_aligned,
            X_aligned,
            frac=frac,
            return_sorted=True
        )
        
        # Get prediction
        current_year = 2024 if 2024 in X.index else X.index.max()
        current_X = X[current_year]
        next_year_pred = np.interp(current_X, smoothed[:, 0], smoothed[:, 1])
        
        # Create plot data
        plot_data = pd.DataFrame({
            'X': X_aligned,
            'Y_data': Y_aligned,
            'Y_pred': smoothed[:, 1]
        })
        
        # Calculate metrics
        results = {
            "lag": best_lag,
            "prediction": next_year_pred,
            "r2": r2_score(Y_aligned, smoothed[:, 1]),
            "rmse": np.sqrt(mean_squared_error(Y_aligned, smoothed[:, 1])),
            "plot_data": plot_data
        }
        
        # Add cross validation if requested
        if do_cv:
            def lowess_model(x, y):
                return lambda z: np.interp(
                    z, 
                    *lowess(y, x, frac=frac, return_sorted=True).T
                )
            
            cv_score, cv_error = self._do_cross_validation(
                X_aligned,
                Y_aligned,
                lowess_model,
                {},
                k_folds
            )
            results.update({
                'cv_score': cv_score,
                'cv_error': cv_error
            })
        
        return results
    
    def gaussian_process_regression(self, X, Y, length_scale=1.0, do_cv=True, k_folds=5):
        """
        Performs Gaussian Process Regression with RBF kernel.
        Inputs:
        X             : yearly time series data
        Y             : yearly time series data
        length_scale  : RBF kernel length scale parameter
        do_cv         : whether to perform cross validation
        k_folds       : number of folds for CV
        
        Outputs:
        dict          : {
            'lag'        : optimal lag value,
            'prediction' : predicted value for next period,
            'r2'        : R-squared value,
            'rmse'      : root mean square error,
            'std'       : prediction standard deviation,
            'plot_data' : pandas DataFrame with X, Y_pred, Y_data columns,
            'cv_score'  : mean validation score (if do_cv=True),
            'cv_error'  : std of validation scores (if do_cv=True)
        }
        """
        # Find optimal lag and align series
        best_lag, _ = self.max_lag(X, Y)
        X_aligned, Y_aligned = self.align_with_lag(X, Y, best_lag)
        
        # Define kernel and initialize model
        kernel = RBF(length_scale=length_scale) + WhiteKernel(noise_level=0.1)
        gpr = GaussianProcessRegressor(
            kernel=kernel,
            random_state=0,
            n_restarts_optimizer=5
        )
        
        # Fit model
        X_fit = X_aligned.values.reshape(-1, 1)
        gpr.fit(X_fit, Y_aligned)
        
        # Get prediction with uncertainty
        current_year = 2024 if 2024 in X.index else X.index.max()
        current_X = np.array([[X[current_year]]])
        next_year_pred, next_year_std = gpr.predict(current_X, return_std=True)
        
        # Get predictions for plot data
        Y_pred, Y_std = gpr.predict(X_fit, return_std=True)
        
        # Create plot data
        plot_data = pd.DataFrame({
            'X': X_aligned,
            'Y_data': Y_aligned,
            'Y_pred': Y_pred,
            'Y_std': Y_std
        })
        
        # Calculate metrics
        results = {
            "lag": best_lag,
            "prediction": next_year_pred[0],
            "r2": r2_score(Y_aligned, Y_pred),
            "rmse": np.sqrt(mean_squared_error(Y_aligned, Y_pred)),
            "std": next_year_std[0],
            "plot_data": plot_data
        }
        
        # Add cross validation if requested
        if do_cv:
            def gpr_model(x, y):
                x_reshape = x.values.reshape(-1, 1)
                model = GaussianProcessRegressor(
                    kernel=kernel,
                    random_state=0,
                    n_restarts_optimizer=5
                )
                model.fit(x_reshape, y)
                return model
            
            cv_score, cv_error = self._do_cross_validation(
                X_aligned,
                Y_aligned,
                gpr_model,
                {},
                k_folds
            )
            results.update({
                'cv_score': cv_score,
                'cv_error': cv_error
            })
        
        return results

In [None]:
class TimeSeriesRegression:
    def clean_series(self, X, Y):
        """
        Aligns two time series based on the first valid index.
        Inputs:
        X           : yearly time series data
        Y           : yearly time series data
        Outputs:
        tuple       : (X_aligned, Y_aligned)
        X_aligned   : time series data aligned based on first valid index
        Y_aligned   : time series data aligned based on first valid index
        """
        if not isinstance(X, pd.Series):
            X = pd.Series(X)
        if not isinstance(Y, pd.Series):
            Y = pd.Series(Y)
            
        first_index = max(X.first_valid_index(), Y.first_valid_index())
        last_index = min(X.last_valid_index(), Y.last_valid_index())
        
        return X[first_index:last_index], Y[first_index:last_index]

    def align_with_lag(self, X, Y, lag):
        """
        Aligns two time series based on a given lag value.
        Inputs:
        X    : yearly time series data
        Y    : yearly time series data
        lag  : integer lag value to shift X backwards
        Outputs:
        tuple: (X_aligned, Y_aligned) where X is shifted back by lag periods
        """
        if not isinstance(X, pd.Series):
            X = pd.Series(X)
        if not isinstance(Y, pd.Series):
            Y = pd.Series(Y)
        
        y_start = Y.first_valid_index()
        y_end = Y.last_valid_index()
        
        X_lagged = X.loc[y_start - lag:y_end - lag]
        
        X_aligned = X_lagged.reset_index(drop=True)
        Y_aligned = Y.loc[y_start:y_end].reset_index(drop=True)
        
        return X_aligned, Y_aligned

    def max_lag(self, X, Y, max_lag_years=10):
        """
        Finds the lag that maximizes correlation between two time series.
        Inputs:
        X             : yearly time series data
        Y             : yearly time series data
        max_lag_years : optional integer that specifies 
                        maximum number of years to check for lag
        Outputs:
        tuple           : (optimal_lag, max_correlation)
        optimal_lag     : maximizes the correlation of the 2 series
        max_correlation : the value of the correlation
        """
        X_clean, Y_clean = self.clean_series(X, Y)
        
        X_arr = np.array(X_clean)
        Y_arr = np.array(Y_clean)
        
        X_norm = (X_arr - np.mean(X_arr)) / np.std(X_arr)
        Y_norm = (Y_arr - np.mean(Y_arr)) / np.std(Y_arr)
        
        correlations = signal.correlate(Y_norm, X_norm, mode='full')
        lags = signal.correlation_lags(len(X_norm), len(Y_norm))
        
        valid_indices = (lags >= 0) & (lags <= max_lag_years)
        valid_correlations = correlations[valid_indices]
        valid_lags = lags[valid_indices]
        
        max_corr_idx = np.argmax(valid_correlations)
        optimal_lag = valid_lags[max_corr_idx]
        max_correlation = valid_correlations[max_corr_idx] / len(X_arr)
        
        return optimal_lag, max_correlation

    def _do_cross_validation(self, X, Y, model_func, params, k_folds=5):
        """
        Helper function to perform cross validation for any model
        Inputs:
        X           : aligned X data for validation
        Y           : aligned Y data for validation
        model_func  : model fitting function
        params      : dictionary of model parameters
        k_folds     : number of folds for CV
        
        Outputs:
        tuple       : (cv_score, cv_error)
        """
        tscv = TimeSeriesSplit(n_splits=k_folds)
        val_scores = []
        
        for train_idx, val_idx in tscv.split(X):
            X_train = X.iloc[train_idx]
            Y_train = Y.iloc[train_idx]
            X_val = X.iloc[val_idx]
            Y_val = Y.iloc[val_idx]
            
            model = model_func(**params, X=X_train, Y=Y_train)
            Y_pred = model.predict(X_val) if hasattr(model, 'predict') else model.forecast(steps=len(val_idx), exog=X_val)
            val_scores.append(r2_score(Y_val, Y_pred))
        
        return np.mean(val_scores), np.std(val_scores)

    def linear_regression(self, X, Y, do_cv=True, k_folds=5):
        """
        Performs linear regression with optional cross validation.
        Inputs:
        X       : yearly time series data
        Y       : yearly time series data
        do_cv   : whether to perform cross validation
        k_folds : number of folds if doing cross validation
        
        Outputs:
        dict    : {
            'lag'        : optimal lag value,
            'prediction' : predicted value for next period,
            'r2'        : R-squared value,
            'plot_data' : pandas DataFrame with X, Y_pred, Y_data columns,
            'cv_score'  : mean validation score (if do_cv=True),
            'cv_error'  : std of validation scores (if do_cv=True)
        }
        """
        best_lag, _ = self.max_lag(X, Y)
        X_aligned, Y_aligned = self.align_with_lag(X, Y, best_lag)
        
        X_const = add_constant(X_aligned)
        model = OLS(Y_aligned, X_const).fit()
        
        current_X = X[2024 if 2024 in X.index else X.index.max()]
        next_year_pred = model.params[0] + model.params[1] * current_X
        
        plot_data = pd.DataFrame({
            'X': X_aligned,
            'Y_data': Y_aligned,
            'Y_pred': model.fittedvalues
        })
        
        results = {
            "lag": best_lag,
            "prediction": next_year_pred,
            "r2": model.rsquared,
            "plot_data": plot_data
        }
        
        if do_cv:
            def ols_model(X, Y):
                X_const = add_constant(X)
                return OLS(Y, X_const).fit()
            
            cv_score, cv_error = self._do_cross_validation(
                X_aligned, Y_aligned,
                ols_model,
                {},
                k_folds
            )
            results.update({
                'cv_score': cv_score,
                'cv_error': cv_error
            })
        
        return results

    def polynomial_regression(self, X, Y, degree=2, do_cv=True, k_folds=5):
        """
        Performs polynomial regression.
        Inputs:
        X       : yearly time series data
        Y       : yearly time series data
        degree  : polynomial degree
        do_cv   : whether to perform cross validation
        k_folds : number of folds for CV
        
        Outputs:
        dict    : {
            'lag'        : optimal lag value,
            'prediction' : predicted value for next period,
            'r2'        : R-squared value,
            'rmse'      : root mean square error,
            'plot_data' : pandas DataFrame with X, Y_pred, Y_data columns,
            'cv_score'  : mean validation score (if do_cv=True),
            'cv_error'  : std of validation scores (if do_cv=True)
        }
        """
        best_lag, _ = self.max_lag(X, Y)
        X_aligned, Y_aligned = self.align_with_lag(X, Y, best_lag)
        
        X_array = X_aligned.values.reshape(-1, 1)
        poly = PolynomialFeatures(degree)
        X_poly = poly.fit_transform(X_array)
        model = OLS(Y_aligned, X_poly).fit()
        
        current_X = X[2024 if 2024 in X.index else X.index.max()]
        next_year_pred = model.predict(poly.transform([[current_X]]))[0]
        
        plot_data = pd.DataFrame({
            'X': X_aligned,
            'Y_data': Y_aligned,
            'Y_pred': model.fittedvalues
        })
        
        results = {
            "lag": best_lag,
            "prediction": next_year_pred,
            "r2": model.rsquared,
            "rmse": np.sqrt(mean_squared_error(Y_aligned, model.fittedvalues)),
            "plot_data": plot_data
        }
        
        if do_cv:
            def poly_model(X, Y):
                X_array = X.values.reshape(-1, 1)
                X_poly = poly.fit_transform(X_array)
                return OLS(Y, X_poly).fit()
            
            cv_score, cv_error = self._do_cross_validation(
                X_aligned, Y_aligned,
                poly_model,
                {},
                k_folds
            )
            results.update({
                'cv_score': cv_score,
                'cv_error': cv_error
            })
        
        return results
    
    def gaussian_process_regression(self, X, Y, length_scale=1.0, do_cv=True, k_folds=5):
        """
        Performs Gaussian Process Regression with RBF kernel.
        Inputs:
        X             : yearly time series data
        Y             : yearly time series data
        length_scale  : RBF kernel length scale parameter
        do_cv         : whether to perform cross validation
        k_folds       : number of folds for CV
        
        Outputs:
        dict          : {
            'lag'        : optimal lag value,
            'prediction' : predicted value for next period,
            'r2'        : R-squared value,
            'rmse'      : root mean square error,
            'std'       : prediction standard deviation,
            'plot_data' : pandas DataFrame with X, Y_pred, Y_data columns,
            'cv_score'  : mean validation score (if do_cv=True),
            'cv_error'  : std of validation scores (if do_cv=True)
        }
        """
        best_lag, _ = self.max_lag(X, Y)
        X_aligned, Y_aligned = self.align_with_lag(X, Y, best_lag)
        
        # Reshape X data properly for GPR
        X_fit = X_aligned.values.reshape(-1, 1)
        
        kernel = RBF(length_scale=length_scale) + WhiteKernel(noise_level=0.1)
        gpr = GaussianProcessRegressor(
            kernel=kernel,
            random_state=0,
            n_restarts_optimizer=5
        )
        
        gpr.fit(X_fit, Y_aligned)
        
        current_year = 2024 if 2024 in X.index else X.index.max()
        current_X = np.array([[X[current_year]]])
        next_year_pred, next_year_std = gpr.predict(current_X, return_std=True)
        
        Y_pred, Y_std = gpr.predict(X_fit, return_std=True)
        
        plot_data = pd.DataFrame({
            'X': X_aligned,
            'Y_data': Y_aligned,
            'Y_pred': Y_pred,
            'Y_std': Y_std
        })
        
        results = {
            "lag": best_lag,
            "prediction": next_year_pred[0],
            "r2": r2_score(Y_aligned, Y_pred),
            "rmse": np.sqrt(mean_squared_error(Y_aligned, Y_pred)),
            "std": next_year_std[0],
            "plot_data": plot_data
        }
        
        if do_cv:
            def gpr_model(X, Y):
                x_reshape = X.values.reshape(-1, 1)
                model = GaussianProcessRegressor(
                    kernel=kernel,
                    random_state=0,
                    n_restarts_optimizer=5
                )
                model.fit(x_reshape, Y)
                return model
            
            cv_score, cv_error = self._do_cross_validation(
                X_aligned,
                Y_aligned,
                gpr_model,
                {},
                k_folds
            )
            results.update({
                'cv_score': cv_score,
                'cv_error': cv_error
            })
        
        return results
    

    #Add in gradient boosted method lightGBM for time series prediction

In [13]:
tsr = TimeSeriesRegression()
X = pd.read_csv('../../data-collection/data_interpolated.csv')['GDP']
Y = pd.read_csv('../../data-collection/data_interpolated.csv')['faculty']

len(X), len(Y)

(112, 112)

In [15]:
tsr.polynomial_regression(X, Y)

ValueError: shapes (1,8) and (3,) not aligned: 8 (dim 1) != 3 (dim 0)