In [15]:
import pandas as pd
import numpy as np
import statsmodels.api as sm
from statsmodels.tsa.api import VAR
from statsmodels.stats.sandwich_covariance import cov_hac
from joblib import Parallel, delayed
import scipy.stats as stats
from statsmodels.stats import power
from scipy.special import expit
from sklearn.model_selection import GridSearchCV, LeaveOneOut

from nonparametric import *
from basis import *

In [16]:
class scaling:
    """
    A class used to scale data to the range [-1, 1] and inverse scale it back to the original range.
    
    Methods
    -------
    scale_to_minus_one_to_one(column)
        Scales a single column to the range [-1, 1].

    scale_to_minus_one_to_one_df(df)
        Scales all columns in a DataFrame to the range [-1, 1].

    inverse_scale(column, min_val, max_val)
        Inverse scales a single column back to its original range.

    inverse_scale_df(scaled_df)
        Inverse scales all columns in a scaled DataFrame back to their original ranges.
    """
    def __init__(self):
        """
        Initialize the Scaling class with attributes to store scaled data and min-max values.
        """
        # No initialization of variables is strictly needed here, but we define the __init__ method for potential future extensions.
        return None
    
    def scale_to_minus_one_to_one(self, column):
        """
        Scales a single column to the range [-1, 1] using min-max normalization.
        
        Parameters
        ----------
        column : pandas Series
            The column to scale.
        
        Returns
        -------
        scaled_column : pandas Series
            The scaled column with values in the range [-1, 1].
        min_val : float
            The minimum value of the original column.
        max_val : float
            The maximum value of the original column.
        """
        # Find the minimum and maximum values in the column
        min_val = column.min()
        max_val = column.max()
        # Scale the column to the range [-1, 1]
        scaled_column = 2 * (column - min_val) / (max_val - min_val) - 1
        # Return the scaled column along with its original min and max values
        return scaled_column, min_val, max_val
    
    def scale_to_minus_one_to_one_df(self, df):
        """
        Scales all columns in a DataFrame to the range [-1, 1] using min-max normalization.
        
        Parameters
        ----------
        df : pandas DataFrame
            The DataFrame to scale.
        
        Returns
        -------
        self.scaled_df : pandas DataFrame
            A new DataFrame with all columns scaled to the range [-1, 1].
        """
        # Initialize an empty DataFrame to store the scaled data
        self.scaled_df = pd.DataFrame()
        # Initialize a dictionary to store the min and max values for each column
        self.min_max_values = {}
        
        # Iterate through each column in the DataFrame
        for column in df.columns:
            # Scale the column and retrieve its min and max values     
            scaled_column, min_val, max_val = self.scale_to_minus_one_to_one(df[column])
            # Add the scaled column to the new DataFrame
            self.scaled_df[column] = scaled_column
            # Store the min and max values for this column
            self.min_max_values[column] = (min_val, max_val)
            
        # Return the DataFrame with all columns scaled to [-1, 1]   
        return self.scaled_df
    
    def inverse_scale(self, column, min_val, max_val):
        """
        Inverse scales a single column from the range [-1, 1] back to its original range.
        
        Parameters
        ----------
        column : pandas Series
            The scaled column to inverse scale.
        min_val : float
            The minimum value of the original column before scaling.
        max_val : float
            The maximum value of the original column before scaling.
        
        Returns
        -------
        original_column : pandas Series
            The column rescaled back to its original range.
        """
        # Reverse the scaling process to return the column to its original range
        original_column = (column + 1) / 2 * (max_val - min_val) + min_val
        # Return the rescaled column
        return original_column
    
    def inverse_scale_df(self, scaled_df):
        """
        Inverse scales all columns in a DataFrame from the range [-1, 1] back to their original ranges.
        
        Parameters
        ----------
        scaled_df : pandas DataFrame
            The DataFrame with scaled columns to inverse scale.
        
        Returns
        -------
        self.original_df : pandas DataFrame
            A new DataFrame with all columns rescaled back to their original ranges.
        """
        # Initialize an empty DataFrame to store the original data
        self.original_df = pd.DataFrame()
        
        # Iterate through each column in the scaled DataFrame
        for column in scaled_df.columns:
            # Retrieve the original min and max values for this column
            min_val, max_val = self.min_max_values[column]
            # Inverse scale the column and add it to the new DataFrame
            self.original_df[column] = self.inverse_scale(scaled_df[column], min_val, max_val)
            
        # Return the DataFrame with all columns rescaled to their original ranges
        return self.original_df

In [17]:
class scalingDataFrame:
    """
    A class used to scale a DataFrame's entire range to [-1, 1] and inverse scale it back to the original range.
    
    Methods
    -------
    scale_to_minus_one_to_one_df(df)
        Scales all values in a DataFrame to the range [-1, 1].
    
    inverse_scale_df(scaled_df)
        Inverse scales a scaled DataFrame back to its original range.
    """
    
    def __init__(self):
        """
        Initialize the Scaling class with attributes to store the original min and max values for the entire DataFrame.
        """
        # These will store the overall min and max values for the entire DataFrame
        self.min_val = None
        self.max_val = None
    
    def scale_to_minus_one_to_one_df(self, df):
        """
        Scales the entire DataFrame to the range [-1, 1] based on the global min and max of all values in the DataFrame.
        
        Parameters
        ----------
        df : pandas DataFrame
            The DataFrame to scale.
        
        Returns
        -------
        scaled_df : pandas DataFrame
            A new DataFrame with all values scaled to the range [-1, 1].
        """
        # Find the overall min and max values for the entire DataFrame
        self.min_val = df.min().min()
        self.max_val = df.max().max()
        
        # Scale the entire DataFrame to the range [-1, 1]
        scaled_df = 2 * (df - self.min_val) / (self.max_val - self.min_val) - 1
        
        # Return the scaled DataFrame
        return scaled_df
    
    def inverse_scale_df(self, scaled_df):
        """
        Inverse scales a DataFrame from the range [-1, 1] back to its original range.
        
        Parameters
        ----------
        scaled_df : pandas DataFrame
            The DataFrame with scaled values to inverse scale.
        
        Returns
        -------
        original_df : pandas DataFrame
            A new DataFrame with all values rescaled back to their original range.
        """
        # Reverse the scaling process to return the DataFrame to its original range
        original_df = (scaled_df + 1) / 2 * (self.max_val - self.min_val) + self.min_val
        
        # Return the rescaled DataFrame
        return original_df


In [18]:
class CalculateTheta:
    """
    Class to calculate theta values using policy estimation, reward estimation, and basis function expansion.
    
    Parameters
    ----------
    state_df : DataFrame
        DataFrame containing the state values.
    action_df : DataFrame
        DataFrame containing the action values.
    reward_df : DataFrame
        DataFrame containing the reward values.
    next_x_df : DataFrame
        DataFrame containing the next state values.
    """

    def __init__(self, state_df, action_df, reward_df, next_x_df):
        # Initialize dataframes for state, action, reward, and next state
        self.state_df = state_df
        self.action_df = action_df
        self.reward_df = reward_df
        self.next_x_df = next_x_df
        
        # Scale the state and next state dataframes to the range [-1, 1]
        
        state_next_state_df = pd.concat([self.state_df, self.next_x_df], axis=1)
        self.sc = scalingDataFrame()#scaling()
        self.scaled_state_next_state_df = self.sc.scale_to_minus_one_to_one_df(state_next_state_df)
        
        self.scaled_state_df = self.scaled_state_next_state_df.iloc[:,:-1]
        self.scaled_next_x_df = pd.DataFrame(self.scaled_state_next_state_df.iloc[:,-1])
        
        #self.sc_state = scaling()
        #self.scaled_state_df = self.sc_state.scale_to_minus_one_to_one_df(self.state_df)
        
        #self.sc_next_x = scaling()
        #self.scaled_next_x_df = self.sc_next_x.scale_to_minus_one_to_one_df(self.next_x_df)
        
        self.sc_reward = scaling()
        self.scaled_reward_df = self.sc_reward.scale_to_minus_one_to_one_df(self.reward_df)
        
        
        
    def fit(self, order: int = 3, search_interval=np.linspace(0.05, 0.06, 100), cv_k: int = 20):
        """
        Fit the model by searching for optimal bandwidth and computing policy, reward, and basis.

        Parameters
        ----------
        search_interval : array-like, optional
            Interval to search for the optimal bandwidth for kernel density estimation.
        cv_k : int, optional
            Number of folds for cross-validation in the search for the optimal bandwidth.
        order : int, optional
            The order of the Chebyshev basis function expansion.
        
        Returns
        -------
        int : 0
            Always returns 0 after fitting.
        """
        
        # Search for the optimal bandwidth for kernel density estimation
        self.h_x = self.search_optimal_bandwidth(self.scaled_state_df, search_interval, cv_k)
        
        # Estimate policy and compute policy data
        self.est_pi_data_all, self.est_pi_actual = self.estimation_policy(self.h_x, self.scaled_state_df, self.action_df)
        
        # Estimate the reward
        self.r_pi_est = self.estimation_reward(self.h_x, self.scaled_state_df, self.scaled_reward_df)
        
        # Compute the basis functions
        self.basis_df, self.basis_dict, self.hat_psi_next = self.compute_basis(self.h_x, self.scaled_state_df, self.scaled_next_x_df, order)
        
        return 0
    
    def calculate_theta_w(self, w: int = 10, gamma: float = 0.99):
        """
        Calculate theta_w based on time lag `w` and discount factor `gamma`.
        
        Parameters
        ----------
        w : int, optional
            The time lag used in the reward estimation.
        gamma : float, optional
            The discount factor for future rewards.
        
        Returns
        -------
        DataFrame
            DataFrame of theta estimates.
        DataFrame
            DataFrame of basis functions (adjusted for lag).
        dict
            Dictionary of basis function definitions.
        DataFrame
            Estimated policy data for all actions (adjusted for lag).
        DataFrame
            Estimated actual policy data (adjusted for lag).
        """
        
        # Estimate reward with time lag `w`
        self.r_pi_w_est = self.estimation_reward_w(self.h_x, self.scaled_state_df, self.scaled_reward_df, w)
        
        # Compute discounted reward difference
        R_pi = - self.r_pi_est[:-w] + (gamma**w) * self.r_pi_w_est[:-w]
        
        # Compute zeta_w (difference between next basis and current basis, scaled by gamma)
        self.zeta_w = (gamma * self.hat_psi_next - self.basis_df.values)[:-w]
        
        # Cross-products of zeta_w vectors
        zeta_cross = np.array([z @ z.T for z in self.zeta_w])
        
        # Element-wise product of zeta_w and reward difference
        zeta_R_pi = self.zeta_w * R_pi[:, np.newaxis]
        
        # Compute theta_hat_w by dividing zeta_R_pi by the cross-products
        self.theta_hat_w = zeta_R_pi / zeta_cross.reshape(-1, 1)
        
        # Store the results in a DataFrame
        self.theta_hat_w_df = pd.DataFrame(self.theta_hat_w, columns=self.basis_df.columns)
        
        return self.theta_hat_w_df, self.basis_df[:-w], self.basis_dict, self.est_pi_data_all[:-w], self.est_pi_actual[:-w]
    
    def calculate_theta_inf(self, gamma: float = 0.99):
        """
        Calculate theta in the infinite-horizon case using the discount factor `gamma`.
        
        Parameters
        ----------
        gamma : float, optional
            The discount factor for future rewards.
        
        Returns
        -------
        DataFrame
            DataFrame of theta estimates for the infinite-horizon case.
        DataFrame
            DataFrame of basis functions.
        dict
            Dictionary of basis function definitions.
        DataFrame
            Estimated policy data for all actions.
        DataFrame
            Estimated actual policy data.
        """
        
        # Compute the reward estimate without time lag
        R_pi = - self.r_pi_est
        
        # Compute zeta_inf (difference between next basis and current basis, scaled by gamma)
        self.zeta_inf = (gamma * self.hat_psi_next - self.basis_df.values)
        
        # Cross-products of zeta_inf vectors
        zeta_cross = np.array([z @ z.T for z in self.zeta_inf])
        
        # Element-wise product of zeta_inf and reward estimate
        zeta_R_pi = self.zeta_inf * R_pi[:, np.newaxis]
        
        # Compute theta_hat_inf by dividing zeta_R_pi by the cross-products
        self.theta_hat_inf = zeta_R_pi / zeta_cross.reshape(-1, 1)
        
        # Store the results in a DataFrame
        self.theta_hat_inf_df = pd.DataFrame(self.theta_hat_inf, columns=self.basis_df.columns)
        
        return self.theta_hat_inf_df, self.basis_df, self.basis_dict, self.est_pi_data_all, self.est_pi_actual
    
    def search_optimal_bandwidth(self, scaled_state_df, search_interval=np.linspace(0.05, 0.06, 100), cv_k=20):
        """
        Search for the optimal bandwidth for kernel density estimation using cross-validation.
        
        Parameters
        ----------
        scaled_state_df : DataFrame
            Scaled state data.
        search_interval : array-like, optional
            Range of bandwidth values to search.
        cv_k : int, optional
            Number of folds for cross-validation.
        
        Returns
        -------
        float
            The optimal bandwidth value.
        """
        
        # Perform a grid search for optimal bandwidth using cross-validation
        grid_search_custom = GridSearchCV(estimator=KDE(),  
                                          param_grid={'bandwidth': search_interval},
                                          cv=cv_k)
        grid_search_custom.fit(scaled_state_df.values)
        h_x = grid_search_custom.best_params_["bandwidth"]
        
        print(grid_search_custom.best_params_)  # Output the optimal bandwidth
        
        return h_x
    
    def extract_est_pi_actual(self, est_pi_data_all, action_df, unique_actions):
        """
        Extract actual policy estimates corresponding to each action.
        
        Parameters
        ----------
        est_pi_data_all : array-like, shape (n_samples, n_actions)
            Estimated policy data for all actions.
        action_df : DataFrame
            The action data corresponding to each sample.
        unique_actions : array-like
            Unique action values.
        
        Returns
        -------
        array-like, shape (n_samples,)
            The actual policy estimates corresponding to the action taken.
        """
        
        est_pi_actual = np.zeros(est_pi_data_all.shape[0])
        
        # For each action, apply a mask to extract the actual policy estimates
        for index, action in enumerate(unique_actions):
            mask = (action_df.squeeze() == action)  # Mask for each action
            est_pi_actual[mask] = est_pi_data_all[mask, index]
            
        return est_pi_actual
    
    def estimation_policy(self, h_x, scaled_state_df, action_df):
        """
        Estimate the policy probabilities using kernel density estimation.
        
        Parameters
        ----------
        h_x : float
            The bandwidth for kernel density estimation.
        scaled_state_df : DataFrame
            Scaled state data.
        action_df : DataFrame
            Action data corresponding to each state.
        
        Returns
        -------
        est_pi_data_all : array-like, shape (n_samples, n_actions)
            Estimated policy data for all actions.
        est_pi_actual : array-like, shape (n_samples,)
            Actual policy estimates corresponding to the action taken.
        """
        
        # Initialize policy estimator
        est_pi = est_policy(h_x)
        est_pi.fit(scaled_state_df.values, action_df.values.ravel())
        
        # Get the unique actions
        unique_actions = np.unique(action_df.values)
         
        # Estimate policy for each action and combine into a single matrix
        est_pi_data = [est_pi(scaled_state_df.values, action) for action in unique_actions]
        est_pi_data_all = np.concatenate([data.reshape(-1, 1) for data in est_pi_data], axis=1)
       
        # Extract the actual policy estimates
        est_pi_actual = self.extract_est_pi_actual(est_pi_data_all, action_df, unique_actions)
        
        return est_pi_data_all, est_pi_actual

    def estimation_reward(self, h_x, scaled_state_df, reward_df):
        """
        Estimate the expected reward for each state.

        Parameters
        ----------
        h_x : float
            The bandwidth parameter for kernel density estimation.
        scaled_state_df : DataFrame
            Scaled state data.
        reward_df : DataFrame
            DataFrame containing the reward values.

        Returns
        -------
        ndarray
            Estimated reward values for each state.
        """
        # Initialize reward estimator
        r_pi_hat = est_r_pi(h_x)  # Instantiate the reward estimator
        r_pi_hat.fit(scaled_state_df.values, reward_df.values.ravel())

        r_pi_est = r_pi_hat(scaled_state_df.values)  # Estimate rewards

        return r_pi_est

    def estimation_reward_w(self, h_x, scaled_state_df, reward_df, w):
        """
        Estimate the reward over a time window `w`.

        Parameters
        ----------
        h_x : float
            The bandwidth parameter for kernel density estimation.
        scaled_state_df : DataFrame
            Scaled state data.
        reward_df : DataFrame
            DataFrame containing the reward values.
        w : int
            The time window over which to estimate the reward.

        Returns
        -------
        ndarray
            Estimated reward values over the time window.
        """
        
        # Initialize time-windowed reward estimator
        r_pi_w_hat = est_r_pi_w(h_x)  
        r_pi_w_hat.fit(scaled_state_df.values, reward_df.values.ravel(), w)

        r_pi_w_est = r_pi_w_hat(scaled_state_df.values)  # Estimate rewards over time window `w`

        return r_pi_w_est

    def compute_basis(self, h_x, scaled_state_df, scaled_next_x_df, order: int = 3):
        """
        Compute the Chebyshev basis functions and their expectations for the next state.

        Parameters
        ----------
        h_x : float
            The bandwidth parameter for kernel density estimation.
        scaled_state_df : DataFrame
            Scaled state data.
        scaled_next_x_df : DataFrame
            Scaled next state data.
        order : int, optional
            The order of the Chebyshev basis function expansion.

        Returns
        -------
        DataFrame
            DataFrame containing the basis functions.
        dict
            Dictionary of basis function definitions.
        ndarray
            Estimated expectations of the next state's basis functions.
        """

        c_basis = ChebyshevBasis(order)  # Instantiate the Chebyshev basis object
        basis_df, basis_dict = c_basis(scaled_state_df.values)  # Compute the basis functions

        next_psi = BasisNextExpect(h_x)  # Instantiate the next-state basis expectation estimator
        next_psi.fit(scaled_state_df.values, scaled_next_x_df.values)

        # Estimate expectations for the next state
        hat_psi_next = next_psi(scaled_state_df.values, c_basis)

        return basis_df, basis_dict, hat_psi_next

In [19]:
class HACTest:
    """
    A class to perform statistical testing with HAC (Heteroscedasticity and Autocorrelation Consistent) standard errors.
    
    Parameters
    ----------
    theta_df : pandas.DataFrame
        The input DataFrame containing the data for testing.
    """
    
    def __init__(self, theta_df):
        """
        Initializes the HACTest with the input data.
        
        Parameters
        ----------
        theta_df : pandas.DataFrame
            The data on which to perform the HAC test.
        """
        self.theta_df = theta_df
        
    def __call__(self):
        """
        Perform HAC testing on the data with an automatically computed maximum lag.
        
        Returns
        -------
        test_result : pandas.DataFrame
            DataFrame containing formatted test statistics and results.
        latex_result : str
            LaTeX formatted table of test results for presentation.
        original_result : tuple
            Tuple of raw result dictionaries with numerical statistics.
        """
        # Calculate the maximum lag using a rule of thumb based on the sample size
        max_lag = int(4*((self.theta_df.shape[0]/100)** (1/3)))
        
        # Perform the HAC test and return the results
        test_result, latex_result, original_result = self.test_hac(self.theta_df, max_lag)
        
        return test_result, latex_result, original_result
    
    def significance_stars(self, p_value):
        """
        Assign significance stars based on p-value.
        
        Parameters
        ----------
        p_value : float
            The p-value for statistical significance.
            
        Returns
        -------
        str
            Stars representing the level of significance ('***' for p < 0.01, '**' for p < 0.05, '*' for p < 0.1).
        """
        if p_value < 0.01:
            return "***"
        elif p_value < 0.05:
            return "**"
        elif p_value < 0.1:
            return "*"
        else:
            return ""
        
    def compute_statistics(self, column_data: np.ndarray, max_lag: int):
        """
        Fit a regression model and compute HAC-based statistics for a single column of data.
        
        Parameters
        ----------
        column_data : np.ndarray
            Array containing the data for a single variable/column.
        max_lag : int
            Maximum lag to use for HAC standard errors.
        
        Returns
        -------
        formatted_results : tuple
            A tuple containing formatted strings for coefficient estimate, standard error, t-value, p-value, and significance stars.
        original_results : dict
            A dictionary containing the original numeric statistics.
        """
        X = np.ones((len(column_data), 1))  # Constant term as the only explanatory variable
        y = column_data  # Response variable
        
        # Fit the OLS model with HAC standard errors
        model = sm.OLS(y, X).fit(cov_type='HAC', cov_kwds={'maxlags': max_lag})
        
        coef_estimate = model.params[0]  # Coefficient estimate
        hac_std_error = model.bse[0]  # HAC standard error
        
        # Calculate the t-value and p-value
        t_value = coef_estimate / hac_std_error
        p_value = 2 * (1 - stats.norm.cdf(np.abs(t_value)))  # Two-sided p-value
        
        # Assign significance stars based on the p-value
        stars = self.significance_stars(p_value)

        # Format the results into a tuple for display
        formatted_results = (
            f"{coef_estimate:.4g}",
            f"{hac_std_error:.4g}",
            f"{t_value:.4g}",
            f"{p_value:.4g}",
            stars
        )

        # Store original numeric results in a dictionary
        original_results = {
            "coef_estimate": coef_estimate,
            "hac_std_error": hac_std_error,
            "t_value": t_value,
            "p_value": p_value,
            "stars": stars
        }

        return formatted_results, original_results
    
    def scientific_to_latex(self, value):
        """
        Convert a numerical value to a LaTeX string in scientific notation.
        
        Parameters
        ----------
        value : float
            The value to convert.
            
        Returns
        -------
        str
            The LaTeX formatted string.
        """
        if value == 0:
            return "$0$"
        else:
            exp = int(np.floor(np.log10(np.abs(value))))
            base = value / 10**exp
            return f"${base:.4f} \\times 10^{{{exp}}}$"
            
    def test_hac(self, data, max_lag: int):
        """
        Compute HAC standard errors and statistics for each column of data.
        
        Parameters
        ----------
        data : pandas.DataFrame
            The input data with columns as variables.
        max_lag : int
            Maximum lag for HAC standard errors.
            
        Returns
        -------
        df : pandas.DataFrame
            DataFrame containing the formatted test results.
        latex_table : str
            LaTeX formatted table for easy insertion into documents.
        original_results : list
            List of dictionaries with original numeric results.
        """
        # Compute statistics for each column in parallel
        results = Parallel(n_jobs=-1)(delayed(self.compute_statistics)(data.values[:, i], max_lag) for i in range(data.shape[1]))

        # Unpack the results into formatted and original results
        formatted_results, original_results = zip(*results)

        # Convert results to a NumPy array
        results_array = np.array(formatted_results)

        # Create DataFrame of results
        column_names = ["Coefficient Estimate", "HAC Standard Error", "t-Value", "p-Value", "Significance"]
        df = pd.DataFrame(results_array, columns=column_names, index=data.columns)

        # Convert numerical columns to LaTeX scientific notation
        for col in ["Coefficient Estimate", "HAC Standard Error", "t-Value", "p-Value"]:
            df[col] = df[col].astype(float).apply(self.scientific_to_latex)

        # Create LaTeX table format
        latex_table = df.to_latex(index=True, escape=False).replace(r'\toprule', r'\hline').replace(r'\midrule', r'\hline').replace(r'\bottomrule', r'\hline')

        print("LaTeX formatted table:")
        print(latex_table)

        return df, latex_table, original_results
