# PCA Evaluation Lab

In [4]:
import numpy as np
import pandas as pd
from factor_analyzer.rotator import Rotator
import matplotlib.pyplot as plt

In [1]:
stock_sample = 50
n_factors = [5, 4, 4, 5]

## Walk-forward Import

Import done, now continue with estimation and then finish with eval. Then copy what you've done here over to the other notebooks. 

In [2]:
import os
import re
import pandas as pd

directory = f'Final_Data/Walkforward_Sets/{stock_sample}_stocks_seed42'
files = os.listdir(directory)

pattern_corr = re.compile(r'^(df\d{2}[a-z])_varresid_(is|os)_corr_(is|os)\.csv$')
pattern_resid = re.compile(r'^(df\d{2}[a-z])_varresid_(is|os)\.csv$')

for fname in files:
    # Skip covariance matrices (those with 'cov_' in the filename)
    if 'cov_' in fname:
        continue
    full_path = os.path.join(directory, fname)
    
    m = pattern_corr.match(fname)
    if m:
        # File: df00a_varresid_is_corr_is.csv -> Key: corr00a_is 
        base = m.group(1)    # e.g. "df00a"
        inout = m.group(2)   # e.g. "is" or "os"
        new_key = f"corr{base[2:]}_{inout}"
        df = pd.read_csv(full_path, index_col=0)
        globals()[new_key] = df
        continue

    m = pattern_resid.match(fname)
    if m:
        # File: df00a_varresid_is.csv -> Key: df00a_is
        base = m.group(1)
        inout = m.group(2)
        new_key = f"{base}_{inout}"
        df = pd.read_csv(full_path, index_col=0)
        globals()[new_key] = df
        continue

## Evaluation

No need for rotation here yet because it doesn't change the explained variance

In [3]:
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from datetime import datetime
import os, re
import matplotlib.pyplot as plt
import scipy.stats as stats

# --- Utility Functions ---

def congruence_coefficient(v1, v2):
    """
    Computes Tucker's coefficient of congruence between two vectors.
    """
    return np.sum(v1 * v2) / (np.sqrt(np.sum(v1**2)) * np.sqrt(np.sum(v2**2)))

def compute_sparsity_index(loadings):
    """
    Computes the normalized sparsity index for a given loadings DataFrame.
    For each factor vector v, we compute the ratio:
      ratio = ||v||_1 / ||v||_2,
    then normalize over n (number of assets):
      normalized = (sqrt(n) - ratio) / (sqrt(n)-1),
    so that 1 means only one asset carries weight and 0 means all assets share equally.
    Finally, return the average over factors.
    """
    sparsity_scores = []
    n_assets = loadings.shape[0]
    for col in loadings.columns:
        v = loadings[col].values
        l1_norm = np.linalg.norm(v, 1)
        l2_norm = np.linalg.norm(v, 2)
        ratio = l1_norm / l2_norm if l2_norm != 0 else np.nan
        normalized = (np.sqrt(n_assets) - ratio) / (np.sqrt(n_assets) - 1)
        sparsity_scores.append(normalized)
    return np.mean(sparsity_scores)

def compute_turnover(loadings_dict):
    """
    Given a dictionary mapping time labels to factor loading DataFrames, compute turnover 
    between adjacent periods.
    Turnover = ||loadings_current - loadings_prev||_F / ||loadings_prev||_F.
    Returns a list of dictionaries with keys 'transition' and 'turnover'.
    """
    keys = list(loadings_dict.keys())
    keys.sort(key=lambda x: (len(x), x))
    turnover_results = []
    for idx in range(1, len(keys)):
        prev = loadings_dict[keys[idx-1]]
        curr = loadings_dict[keys[idx]]
        common = prev.index.intersection(curr.index)
        if len(common) == 0:
            continue
        prev_aligned = prev.loc[common]
        curr_aligned = curr.loc[common]
        frob_diff = np.linalg.norm(curr_aligned.values - prev_aligned.values, 'fro')
        frob_prev = np.linalg.norm(prev_aligned.values, 'fro')
        turnover = frob_diff / frob_prev if frob_prev != 0 else np.nan
        turnover_results.append({'transition': f"{keys[idx-1]} -> {keys[idx]}", 'turnover': turnover})
    return turnover_results

def oblimin_rotation(loadings_df, gamma=0.0, tol=1e-6, max_iter=1000):
    """
    Applies an oblimin (quartimin when gamma=0) rotation to the factor loadings.
    Returns the rotated loadings DataFrame and the rotation matrix.
    """
    loadings = loadings_df.to_numpy()    # (n_assets, n_factors)
    p, m = loadings.shape
    T = np.eye(m)
    step = 1e-4

    for iteration in range(max_iter):
        L_rot = loadings.dot(T)
        mean_sq = np.mean(L_rot**2, axis=0, keepdims=True)
        gradient = 4 * loadings.T.dot(L_rot * (L_rot**2 - mean_sq))
        T_new = T - step * gradient
        if np.linalg.norm(T_new - T) < tol:
            T = T_new
            break
        T = T_new

    rotated = loadings.dot(T)
    rotated_df = pd.DataFrame(rotated, index=loadings_df.index, columns=loadings_df.columns)
    return rotated_df, T

def calc_proj_ev(df_std, loadings):
    """
    Given standardized returns (df_std) and factor loadings,
    compute predicted returns using regression-based factor score estimation,
    and return the explained variance defined as 1 - (residual variance / total variance).
    """
    inv_LL = np.linalg.inv(loadings.T.dot(loadings))
    factor_scores = (inv_LL.dot(loadings.T.dot(df_std.T))).T
    projected = factor_scores.dot(loadings.T)
    residuals = df_std - projected
    total_var = df_std.var(axis=0).mean()
    ev = 1 - residuals.var(axis=0).mean() / total_var
    return ev

def joreskog(cov, n_factors=None, max_iter=100000, tol=1e-6, min_communal=1e-6):
    """
    Robust Jöreskog's factor analysis with proper handling of communalities
    and convergence checks.
    """
    original_index = cov.index.tolist() if isinstance(cov, pd.DataFrame) else None
    cov = cov.to_numpy() if isinstance(cov, pd.DataFrame) else cov

    if not isinstance(cov, np.ndarray):
        raise ValueError("Covariance matrix must be numpy array or DataFrame")
    if cov.shape[0] != cov.shape[1]:
        raise ValueError("Covariance matrix must be square")
    if not np.allclose(cov, cov.T, atol=1e-9):
        raise ValueError("Covariance matrix must be symmetric")
    
    n_vars = cov.shape[0]
    is_correlation = np.allclose(np.diag(cov), 1.0, atol=1e-5)
    
    eigvals_full, _ = np.linalg.eigh(cov)
    tol_eigen = 1e-9 * np.max(eigvals_full)
    if n_factors is None:
        n_factors = int(np.sum(eigvals_full > tol_eigen))
    else:
        if not isinstance(n_factors, int) or n_factors <= 0:
            raise ValueError("n_factors must be a positive integer")
        n_factors = min(n_factors, n_vars)
    
    eigenvals, eigenvecs = np.linalg.eigh(cov)
    idx = np.argsort(eigenvals)[::-1]
    eigenvals = eigenvals[idx][:n_factors]
    eigenvecs = eigenvecs[:, idx][:, :n_factors]
    
    beta = eigenvecs @ np.diag(np.sqrt(np.maximum(eigenvals, 0)))
    communalities = np.sum(beta**2, axis=1)
    if is_correlation:
        communalities = np.clip(communalities, 0, 0.999)
    psi = np.diag(np.maximum(np.diag(cov) - communalities, min_communal))
    
    iter_num = 0
    beta_change = np.inf
    psi_change = np.inf
    
    while iter_num < max_iter and (beta_change > tol or psi_change > tol):
        sigma = beta @ beta.T + psi
        try:
            sigma_inv = np.linalg.inv(sigma)
        except np.linalg.LinAlgError:
            raise ValueError("Singular sigma matrix - reduce n_factors or increase min_communal")
        
        middle = np.linalg.inv(np.eye(n_factors) + beta.T @ sigma_inv @ beta)
        beta_new = cov @ sigma_inv @ beta @ middle
        
        communalities_new = np.sum(beta_new**2, axis=1)
        if is_correlation:
            communalities_new = np.clip(communalities_new, 0, 0.999)
        psi_new_diag = np.maximum(np.diag(cov) - communalities_new, min_communal)
        psi_new = np.diag(psi_new_diag)
        
        if np.any(psi_new_diag <= min_communal + 1e-6):
            print("Warning: Some uniquenesses at lower bound")
        
        beta_change = np.linalg.norm(beta_new - beta) / (np.linalg.norm(beta) + np.finfo(float).eps)
        psi_change = np.linalg.norm(psi_new_diag - np.diag(psi)) / (np.linalg.norm(np.diag(psi)) + np.finfo(float).eps)
        
        beta = beta_new
        psi = psi_new
        iter_num += 1
    
    if iter_num == max_iter:
        print("Warning: Maximum iterations reached without convergence")
    
    factor_variances = np.sum(beta**2, axis=0)
    total_variance = np.sum(np.diag(cov))
    explained_var = factor_variances / total_variance
    explained_cumulative = np.cumsum(explained_var)
    
    factor_columns = [f'PC{i+1}' for i in range(n_factors)]
    betas_df = pd.DataFrame(beta, 
                            index=original_index if original_index is not None 
                                  else [f'var_{i}' for i in range(n_vars)],
                            columns=factor_columns)
    
    return betas_df, factor_variances, explained_var, explained_cumulative

# --- Revised Evaluation Function for PCA-based Model ---

def evaluate_pca_os(base_corr, df_os, n_factors):
    """
    Computes PCA on the in-sample correlation matrix (base_corr) and on the 
    out-of-sample returns (df_os). Also projects the OOS standardized returns
    onto the in-sample loadings to compute residuals.
    Returns:
        metrics: a dictionary of evaluation metrics (explained variances, residual first PC EV,
                 average and individual factor loading congruence, sparsity index).
        loadings_is: the in-sample PCA loadings (used as factor weights).
        residuals: a DataFrame of residuals computed as (standardized OOS returns - projected returns),
                   with asset names as columns.
    """
    common_assets = df_os.columns.intersection(base_corr.index)
    if len(common_assets) == 0:
        raise ValueError("No common assets between in-sample and out-of-sample data")
    
    # Align data
    base_corr_aligned = base_corr.loc[common_assets, common_assets]
    df_os_aligned = df_os[common_assets]
    
    # In-sample PCA on correlation matrix
    pca_is = PCA(n_components=n_factors)
    pca_is.fit(base_corr_aligned)
    loadings_is = pd.DataFrame(
        pca_is.components_.T,
        index=common_assets,
        columns=[f'PC{i+1}' for i in range(n_factors)]
    )
    explained_variance_is = pca_is.explained_variance_ratio_.sum()
    
    sparsity_index = compute_sparsity_index(loadings_is)
    
    # Out-of-sample: standardize returns and compute correlation
    df_os_std = df_os_aligned.apply(lambda x: (x - x.mean())/x.std())
    corr_os = df_os_std.corr()
    pca_os = PCA(n_components=n_factors)
    pca_os.fit(corr_os)
    loadings_os = pd.DataFrame(
        pca_os.components_.T,
        index=common_assets,
        columns=[f'PC{i+1}' for i in range(n_factors)]
    )
    
    # Compute loading correlations and congruence per factor
    correlations = []
    congruences = []
    for i in range(n_factors):
        loading_is = loadings_is.iloc[:, i]
        loading_os = loadings_os.iloc[:, i]
        corr_val = abs(np.corrcoef(loading_is, loading_os)[0, 1])
        cong_val = abs(congruence_coefficient(loading_is, loading_os))
        correlations.append(corr_val)
        congruences.append(cong_val)
    avg_loading_correlation = np.mean(correlations)
    avg_congruence = np.mean(congruences)
    
    # Projection: compute factor returns and project them back to asset space
    factor_returns_os = df_os_std @ loadings_is
    projected_returns_os = factor_returns_os @ loadings_is.T
    residuals = df_os_std - projected_returns_os
    explained_variance_os = 1 - residuals.var(axis=0).mean() / df_os_std.var(axis=0).mean()
    
    pca_residuals = PCA(n_components=1)
    pca_residuals.fit(residuals)
    residual_first_pc_ev = pca_residuals.explained_variance_ratio_[0]
    
    metrics = {
        'n_factors': n_factors,
        'explained_variance_is': explained_variance_is,
        'explained_variance_os': explained_variance_os,
        'residual_first_pc_ev': residual_first_pc_ev,
        'avg_loading_correlation': avg_loading_correlation,
        'avg_congruence': avg_congruence,
        'sparsity_index': sparsity_index
    }
    for i, corr_val in enumerate(correlations):
        metrics[f'factor_{i+1}_correlation'] = corr_val
    for i, cong_val in enumerate(congruences):
        metrics[f'factor_{i+1}_congruence'] = cong_val
    
    return metrics, loadings_is, residuals

# --- Data Import Section ---

directory = f'Final_Data/Walkforward_Sets/{stock_sample}_stocks_seed42'
files = os.listdir(directory)
pattern_corr = re.compile(r'^(df\d{2}[a-z])_varresid_(is|os)_corr_(is|os)\.csv$')
pattern_resid = re.compile(r'^(df\d{2}[a-z])_varresid_(is|os)\.csv$')

for fname in files:
    if 'cov_' in fname:
        continue
    full_path = os.path.join(directory, fname)
    
    m = pattern_corr.match(fname)
    if m:
        base = m.group(1)    # e.g. "df00a"
        inout = m.group(2)   # "is" or "os"
        new_key = f"corr{base[2:]}_{inout}"
        df = pd.read_csv(full_path, index_col=0)
        globals()[new_key] = df
        continue

    m = pattern_resid.match(fname)
    if m:
        base = m.group(1)
        inout = m.group(2)
        new_key = f"{base}_{inout}"
        df = pd.read_csv(full_path, index_col=0)
        globals()[new_key] = df
        continue

# --- Main Loop for Walk-Forward Evaluation --- 

periods = ["00", "05", "10", "15"]
n_factors_dict = dict(zip(periods, n_factors))  # n_factors should be defined externally
results_list = []
factor_weights_list = []  # storing in-sample loadings (weights) per pairing
residuals_dict = {}       # storing residuals per pairing

# Define pairings for in-sample/out-of-sample: a-b, b-c, c-d, d-e
pairings = [("a", "b"), ("b", "c"), ("c", "d"), ("d", "e")]

print("Using the following number of factors per period:")
for period, n in n_factors_dict.items():
    print(f"Period {period}: {n} factors")
print("\n")

for period in periods:
    for in_sample, oos_sample in pairings:
        base_corr_key = f"corr{period}{in_sample}_is"   # in-sample correlation matrix
        df_os_key = f"df{period}{oos_sample}_os"         # out-of-sample returns
        if base_corr_key in globals() and df_os_key in globals():
            try:
                metrics, loadings_is, residuals = evaluate_pca_os(
                    globals()[base_corr_key],
                    globals()[df_os_key],
                    n_factors=n_factors_dict.get(period, list(n_factors_dict.values())[0])
                )
                metrics['period'] = period
                metrics['in_sample'] = in_sample
                metrics['oos_sample'] = oos_sample
                results_list.append(metrics)
                # Save loadings with asset identifiers
                loadings_is_reset = loadings_is.reset_index().rename(columns={'index': 'Asset'})
                loadings_is_reset['period'] = period
                loadings_is_reset['in_sample'] = in_sample
                factor_weights_list.append(loadings_is_reset)
                # Store residuals with a label for later aggregation
                label = f"{period}_{in_sample}_vs_{oos_sample}"
                residuals_dict[label] = residuals
                print(f"Completed {period}: in-sample '{in_sample}' vs out-of-sample '{oos_sample}'")
            except Exception as e:
                print(f"Error in {period} for pairing {in_sample}-{oos_sample}: {str(e)}")
        else:
            missing = [key for key in [base_corr_key, df_os_key] if key not in globals()]
            print(f"Skipping {period} pairing {in_sample}-{oos_sample} – missing data: {missing}")

if results_list:
    results_df = pd.DataFrame(results_list)
    column_order = ['period', 'in_sample', 'oos_sample', 'n_factors', 
                    'explained_variance_is', 'explained_variance_os', 'residual_first_pc_ev', 
                    'avg_loading_correlation', 'avg_congruence', 'sparsity_index']
    factor_corr_cols = sorted([col for col in results_df.columns if 'factor_' in col])
    column_order.extend(factor_corr_cols)
    results_df = results_df[column_order]
    
    numeric_columns = results_df.select_dtypes(include=[np.number]).columns
    results_df[numeric_columns] = results_df[numeric_columns].round(3)
    results_df = results_df.sort_values(['period', 'in_sample', 'oos_sample'])
    
    if factor_weights_list:
        factor_weights_df = pd.concat(factor_weights_list, ignore_index=True)
        factor_weight_cols = [col for col in factor_weights_df.columns if col.startswith('PC')]
        factor_weights_df = factor_weights_df[['period', 'in_sample', 'Asset'] + factor_weight_cols]
    
    # Optionally, compute factor turnover here (omitted for brevity)
    
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    excel_filename = f"Final_Data/Final_Results/pca_{stock_sample}.xlsx"
    
    with pd.ExcelWriter(excel_filename) as writer:
        results_df.to_excel(writer, sheet_name='Results', index=False)
        summary_stats = results_df.groupby('period').agg({
            'explained_variance_is': ['mean', 'std'],
            'explained_variance_os': ['mean', 'std'],
            'residual_first_pc_ev': ['mean', 'std'],
            'avg_loading_correlation': ['mean', 'std'],
            'avg_congruence': ['mean', 'std'],
            'sparsity_index': ['mean', 'std']
        }).round(3)
        summary_stats.columns = ['IS_EV_mean', 'IS_EV_std',
                                 'OS_EV_mean', 'OS_EV_std',
                                 'Residual_PC1_mean', 'Residual_PC1_std',
                                 'Corr_mean', 'Corr_std',
                                 'Congruence_mean', 'Congruence_std',
                                 'Sparsity_Index_mean', 'Sparsity_Index_std']
        summary_stats.to_excel(writer, sheet_name='Summary_Stats')
        factor_summary = results_df[['period'] + factor_corr_cols].groupby('period').agg(['mean', 'std']).round(3)
        factor_summary.to_excel(writer, sheet_name='Factor_Correlations')
        if factor_weights_list:
            factor_weights_df.to_excel(writer, sheet_name='Factor_Weights', index=False)
        
        # --- Write Residuals Sheet ---
        if residuals_dict:
            all_residuals = None
            for label, resid in residuals_dict.items():
                resid_copy = resid.copy()
                # Rename columns to include the pairing label
                resid_copy.columns = [f"{label}_{col}" for col in resid_copy.columns]
                if all_residuals is None:
                    all_residuals = resid_copy
                else:
                    all_residuals = all_residuals.join(resid_copy, how='outer')
            all_residuals.to_excel(writer, sheet_name='Residuals', index=True)
        
        # --- New Aggregated Results Sheet ---
        agg_metrics = {}
        metric_columns = [col for col in results_df.columns if col not in ['period', 'in_sample', 'oos_sample'] and np.issubdtype(results_df[col].dtype, np.number)]
        for col in metric_columns:
            data = results_df[col].dropna()
            n = len(data)
            if n > 1:
                mean_val = data.mean()
                std_val = data.std()
                sem = std_val / np.sqrt(n)
                t_stat = stats.t.ppf(1-0.025, df=n-1)
                ci_lower = mean_val - t_stat * sem
                ci_upper = mean_val + t_stat * sem
            else:
                mean_val = data.mean()
                std_val = np.nan
                ci_lower = np.nan
                ci_upper = np.nan
            agg_metrics[col] = {
                "mean": round(mean_val, 3),
                "std": round(std_val, 3) if pd.notnull(std_val) else std_val,
                "count": n,
                "CI_lower": round(ci_lower, 3) if pd.notnull(ci_lower) else ci_lower,
                "CI_upper": round(ci_upper, 3) if pd.notnull(ci_upper) else ci_upper
            }
        agg_df = pd.DataFrame(agg_metrics).T.reset_index().rename(columns={'index':'Metric'})
        agg_df = agg_df[['Metric', 'mean', 'std', 'count', 'CI_lower', 'CI_upper']]
        agg_df.to_excel(writer, sheet_name='Results_Aggregated', index=False)
    
    print(f"\nResults saved to: {excel_filename}")
    print("\nResults DataFrame:")
    print(results_df)
    print("\nSummary Statistics by Period:")
    print(summary_stats)
    if residuals_dict:
        print("\nResiduals sheet saved with columns:")
        print(list(all_residuals.columns))
else:
    print("No results were generated. Check your input data.")


Using the following number of factors per period:
Period 00: 5 factors
Period 05: 4 factors
Period 10: 4 factors
Period 15: 5 factors


Completed 00: in-sample 'a' vs out-of-sample 'b'
Completed 00: in-sample 'b' vs out-of-sample 'c'
Completed 00: in-sample 'c' vs out-of-sample 'd'
Completed 00: in-sample 'd' vs out-of-sample 'e'
Completed 05: in-sample 'a' vs out-of-sample 'b'
Completed 05: in-sample 'b' vs out-of-sample 'c'
Completed 05: in-sample 'c' vs out-of-sample 'd'
Completed 05: in-sample 'd' vs out-of-sample 'e'
Completed 10: in-sample 'a' vs out-of-sample 'b'
Completed 10: in-sample 'b' vs out-of-sample 'c'
Completed 10: in-sample 'c' vs out-of-sample 'd'
Completed 10: in-sample 'd' vs out-of-sample 'e'
Completed 15: in-sample 'a' vs out-of-sample 'b'
Completed 15: in-sample 'b' vs out-of-sample 'c'
Completed 15: in-sample 'c' vs out-of-sample 'd'
Completed 15: in-sample 'd' vs out-of-sample 'e'

Results saved to: Final_Data/Final_Results/pca_50.xlsx

Results DataFrame:
   p