---

## Mahalanobis Projection Decomposition of Active Exposures

We decompose an active exposure vector:

$$ \mathbf{a} $$

into two parts:
- a component aligned with a reference direction \( \mathbf{b}_{\text{raw}} \), and
- a residual component orthogonal to that direction under a covariance metric \( \Sigma \).

---

### Step 1: Normalize the Reference Vector

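$$
\mathbf{b} = \frac{\mathbf{b}_{\text{raw}}}{\|\mathbf{b}_{\text{raw}}\|}
$$

Note that the projection in Step 2 is invariant to any non-zero rescaling of \( \mathbf{b} \): a scalar factor \( c \) appears as \( c \) in the numerator, \( c^2 \) in the denominator, and \( c \) again in the direction, so it cancels. The normalization is therefore a convention (the choice of norm does not affect the result), and working with \( \mathbf{b}_{\text{raw}} \) directly gives the same \( \mathbf{a}_{\parallel} \).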

---

### Step 2: Mahalanobis Projection

$$
\mathbf{a}_{\parallel} = \frac{\mathbf{b}^\top \Sigma \mathbf{a}}{\mathbf{b}^\top \Sigma \mathbf{b}} \cdot \mathbf{b}
$$

---

### Step 3: Orthogonal Residual

$$
\mathbf{a}_{\perp} = \mathbf{a} - \mathbf{a}_{\parallel}
$$

---

### Step 4: Variance Contributions

Projected volatility (the square root of the projected variance):

$$
\sigma_{\parallel} = \sqrt{ \mathbf{a}_{\parallel}^\top \Sigma \mathbf{a}_{\parallel} }
$$

Residual volatility (the square root of the residual variance):

$$
\sigma_{\perp} = \sqrt{ \mathbf{a}_{\perp}^\top \Sigma \mathbf{a}_{\perp} }
$$

Total volatility (the square root of the total variance):

$$
\sigma_{\text{total}} = \sqrt{ \mathbf{a}^\top \Sigma \mathbf{a} }
$$

---

### Step 5: Additivity from Orthogonality

$$
\mathbf{a}_{\parallel}^\top \Sigma \mathbf{a}_{\perp} = 0 \quad \Rightarrow \quad
\sigma_{\text{total}}^2 = \sigma_{\parallel}^2 + \sigma_{\perp}^2
$$


In [5]:
import numpy as np

def decompose_credit_te(Sigma, rel_exp, bench_exp, verbose=False, enforce_symmetry=True):
    """
    Decompose total tracking error into beta and residual components using
    Mahalanobis projection with respect to the factor return covariance matrix Σ.

    The active exposure ``x`` is split into ``x_parallel = c * b`` with
    ``c = (bᵀ Σ x) / (bᵀ Σ b)`` and ``x_residual = x - x_parallel``. For a
    symmetric Σ the two parts are Σ-orthogonal, so their variances add up to
    the total variance ``xᵀ Σ x``.

    Parameters
    ----------
    Sigma : array-like of shape (K, K)
        Covariance matrix of factor returns.
    rel_exp : array-like of shape (K,) or (K, 1)
        Active factor exposure (portfolio - benchmark), e.g. in DTS.
    bench_exp : array-like of shape (K,) or (K, 1)
        Benchmark factor exposure (DTS weights per sector).
    verbose : bool, default=False
        Print detailed logs if True.
    enforce_symmetry : bool, default=True
        If True, forcibly symmetrizes Σ; if False, warns if Σ is not symmetric.

    Returns
    -------
    dict
        {
            'total_te': float,
            'beta_te': float,
            'residual_te': float,
            'beta_fraction': float,
            'residual_fraction': float
        }
        The ``*_te`` entries are square roots of the corresponding variances;
        the ``*_fraction`` entries are variance shares of the total (0.0 when
        the total variance is not positive).

    Raises
    ------
    ValueError
        If Σ is not (K, K), if ``bench_exp`` does not have K entries, or if
        ``bᵀ Σ b`` is exactly zero.
    """
    import warnings

    def log(msg, *args):
        if verbose:
            print("[TE Decomp] " + msg.format(*args))

    # Accept lists / pandas objects as well as ndarrays.
    Sigma = np.asarray(Sigma, dtype=float)

    # Convert exposures to column vectors
    x = np.asarray(rel_exp, dtype=float).reshape(-1, 1)
    b_raw = np.asarray(bench_exp, dtype=float).reshape(-1, 1)
    K = x.shape[0]

    if Sigma.shape != (K, K):
        raise ValueError(f"Sigma shape {Sigma.shape} must match ({K}, {K})")
    if b_raw.shape[0] != K:
        raise ValueError(
            f"bench_exp has {b_raw.shape[0]} entries but rel_exp has {K}"
        )

    # Handle symmetry
    if enforce_symmetry:
        Sigma = 0.5 * (Sigma + Sigma.T)
    elif not np.allclose(Sigma, Sigma.T, rtol=1e-10):
        warnings.warn(
            "⚠️ WARNING: Covariance matrix Σ is not symmetric. Projections may be invalid.",
            RuntimeWarning,
            stacklevel=2,
        )

    def quad(u, v):
        # uᵀ Σ v is a (1, 1) array; .item() extracts the scalar without relying
        # on the deprecated implicit float() conversion of non-0-d arrays.
        return (u.T @ Sigma @ v).item()

    log("Input rel_exp shape: {}, bench_exp shape: {}", x.shape, b_raw.shape)

    # Mahalanobis projection
    proj_num = quad(b_raw, x)
    proj_den = quad(b_raw, b_raw)
    if proj_den == 0:
        raise ValueError("Benchmark has zero Mahalanobis norm under Σ.")

    proj_scalar = proj_num / proj_den
    x_parallel = proj_scalar * b_raw
    x_residual = x - x_parallel

    log("Mahalanobis projection scalar: {:.6f}", proj_scalar)
    log("First 5 of x_parallel: {}", x_parallel[:5].flatten())
    log("First 5 of x_residual: {}", x_residual[:5].flatten())

    # Variances
    beta_var = quad(x_parallel, x_parallel)
    resid_var = quad(x_residual, x_residual)
    total_var = quad(x, x)

    # Round-off can leave a component at e.g. -1e-33 when the active vector is
    # (almost) parallel or orthogonal to the benchmark; snap such values to
    # zero so the square root is 0 instead of NaN. Genuinely negative values
    # (non-PSD Σ) are left untouched and still surface as NaN.
    scale = max(abs(beta_var), abs(resid_var), abs(total_var))

    def snap(var):
        return 0.0 if (var < 0 and -var <= 1e-12 * scale) else var

    beta_var = snap(beta_var)
    resid_var = snap(resid_var)
    total_var = snap(total_var)

    beta_te = np.sqrt(beta_var)
    resid_te = np.sqrt(resid_var)
    total_te = np.sqrt(total_var)

    log("Variance(beta): {:.10f}", beta_var)
    log("Variance(resid): {:.10f}", resid_var)
    log("Variance(total): {:.10f}", total_var)
    log("Sum of components: {:.10f}", beta_var + resid_var)

    if not np.isclose(beta_var + resid_var, total_var, rtol=1e-6):
        log("⚠️ Variance components do not sum to total. Δ = {:.10f}",
            beta_var + resid_var - total_var)

    return {
        "total_te": total_te,
        "beta_te": beta_te,
        "residual_te": resid_te,
        "beta_fraction": beta_var / total_var if total_var > 0 else 0.0,
        "residual_fraction": resid_var / total_var if total_var > 0 else 0.0
    }

if __name__ == "__main__":
    import pandas as pd
    import numpy as np

    # Demo on synthetic data. The seed is fixed and the draws happen in the
    # order: factor returns, active exposures, benchmark exposures.
    np.random.seed(42)
    K = 10

    # 1000 simulated observations of K factors, labelled F0..F{K-1}.
    factor_names = [f"F{i}" for i in range(K)]
    simulated_returns = np.random.randn(1000, K)
    factor_returns = pd.DataFrame(simulated_returns, columns=factor_names)

    # Sample covariance of the simulated factor returns as a plain ndarray.
    Sigma = factor_returns.cov().to_numpy()

    # Active exposures may have either sign; benchmark exposures are made
    # non-negative by taking absolute values.
    rel_exp = pd.Series(np.random.randn(K))
    bench_exp = pd.Series(np.abs(np.random.randn(K)))

    # Run the decomposition with logging enabled and show the result dict.
    result = decompose_credit_te(Sigma, rel_exp, bench_exp, verbose=True)
    print(result)

ImportError: dlopen(/Users/peterballaro/opt/anaconda3/envs/ml4t/lib/python3.10/site-packages/pandas/_libs/pandas_parser.cpython-310-darwin.so, 0x0002): tried: '/Users/peterballaro/opt/anaconda3/envs/ml4t/lib/python3.10/site-packages/pandas/_libs/pandas_parser.cpython-310-darwin.so' (mach-o file, but is an incompatible architecture (have (x86_64), need (arm64e)))

In [3]:
import numpy as np

def pca_variance_explained(cov_matrix: np.ndarray, weights: np.ndarray = None):
    """
    Perform PCA on a covariance matrix and return the (optionally weighted) variance
    explained by the first principal component (PC1).

    Parameters
    ----------
    cov_matrix : array-like of shape (K, K)
        Covariance matrix of K factors. The decomposition uses
        ``np.linalg.eigh``, which assumes the matrix is symmetric.
    weights : array-like of shape (K,) or (K, 1), optional
        Optional vector of benchmark factor weights (e.g., DTS).
        If provided, computes the percent of benchmark risk explained by PC1.

    Returns
    -------
    dict
        {
            'explained_variance_pc1': float,  # unweighted ratio
            'explained_weighted_risk_pc1': float or None,  # only if weights provided
            'eigenvalues': np.ndarray,   # sorted in descending order
            'eigenvectors': np.ndarray,  # columns ordered like 'eigenvalues'
        }
        Either ratio is reported as 0.0 when its denominator (sum of
        eigenvalues, or ``wᵀ Σ w``) is exactly zero.

    Raises
    ------
    ValueError
        If ``weights`` does not have K entries.
    """
    cov_matrix = np.asarray(cov_matrix, dtype=float)

    # Eigen decomposition; eigh returns ascending eigenvalues, so reorder to
    # put the largest (PC1) first.
    eigenvals, eigenvecs = np.linalg.eigh(cov_matrix)
    idx = np.argsort(eigenvals)[::-1]
    eigenvals_sorted = eigenvals[idx]
    eigenvecs_sorted = eigenvecs[:, idx]

    # Unweighted total variance
    total_var = np.sum(eigenvals_sorted)
    if total_var != 0:
        explained_var_pc1 = eigenvals_sorted[0] / total_var
    else:
        explained_var_pc1 = 0.0

    result = {
        'explained_variance_pc1': explained_var_pc1,
        'explained_weighted_risk_pc1': None,
        'eigenvalues': eigenvals_sorted,
        'eigenvectors': eigenvecs_sorted
    }

    if weights is not None:
        # Column vector, regardless of whether a list, Series or array came in.
        w = np.asarray(weights, dtype=float).reshape(-1, 1)
        n_factors = cov_matrix.shape[0]
        if w.shape[0] != n_factors:
            raise ValueError(
                f"weights has {w.shape[0]} entries but cov_matrix is "
                f"{n_factors} x {n_factors}"
            )

        total_risk = (w.T @ cov_matrix @ w).item()

        # Risk of the weights projected onto PC1:
        #   wᵀ p pᵀ Σ p pᵀ w = (wᵀ p)² · (pᵀ Σ p)
        # Both factors are scalars, so compute them separately instead of
        # chaining seven matrix products.
        pc1 = eigenvecs_sorted[:, 0].reshape(-1, 1)
        loading = (w.T @ pc1).item()
        pc1_variance = (pc1.T @ cov_matrix @ pc1).item()
        pc1_proj = loading ** 2 * pc1_variance

        result['explained_weighted_risk_pc1'] = (
            pc1_proj / total_risk if total_risk != 0 else 0.0
        )

    return result
