In [161]:
import pandas as pd
import numpy as np
import statsmodels.api as sm
import seaborn as sns
import matplotlib.pyplot as plt
import sys
from functools import partial




def calc_univariate_regression(y, X, intercept=True, adj=12):
    """
    Calculate a univariate regression of y on X. Note that both X and y
    need to be one-dimensional.

    Args:
        y : target variable
        X : independent variable
        intercept (bool, optional): Fit the regression with an intercept or not. Defaults to True.
        adj (int, optional): What to adjust the returns by. Defaults to 12.

    Returns:
        DataFrame: Summary of regression results
    """
    X_down = X[X.iloc[:, 0] < 0].copy()
    y_down = y[X.iloc[:, 0] < 0].copy()
    if intercept:
        X = sm.add_constant(X)
        X_down = sm.add_constant(X_down)

    model = sm.OLS(y, X, missing="drop")
    results = model.fit()

    inter = results.params.iloc[0] if intercept else 0
    beta = results.params.iloc[1] if intercept else results.params.iloc[0]

    summary = dict()

    summary["Alpha"] = inter * adj
    summary["Beta"] = beta

    down_mod = sm.OLS(y_down, X_down, missing="drop").fit()
    summary["Downside Beta"] = down_mod.params.iloc[1] if intercept else down_mod.params.iloc[0]
    summary["R-Squared"] = results.rsquared
    summary["Treynor Ratio"] = (y.mean() / beta) * adj
    summary["Information Ratio"] = (inter / results.resid.std()) * np.sqrt(adj)
    summary["Tracking Error"] = (
        inter / summary["Information Ratio"]
        if intercept
        else results.resid.std() * np.sqrt(adj)
    )
    
    if isinstance(y, pd.Series):
        return pd.DataFrame(summary, index=[y.name])
    else:
        return pd.DataFrame(summary, index=y.columns)

def calc_multivariate_regression(y, X, intercept=True, adj=12):
    """
    Calculate a multivariate regression of y on X. Adds useful metrics such
    as the Information Ratio and Tracking Error. Note that we can't calculate
    Treynor Ratio or Downside Beta here.

    Args:
        y : target variable
        X : independent variables
        intercept (bool, optional): Defaults to True.
        adj (int, optional): Annualization factor. Defaults to 12.

    Returns:
        DataFrame: Summary of regression results
    """
    if intercept:
        X = sm.add_constant(X)

    model = sm.OLS(y, X, missing="drop")
    results = model.fit()
    summary = dict()

    inter = results.params.iloc[0] if intercept else 0
    betas = results.params.iloc[1:] if intercept else results.params

    summary["Alpha"] = inter * adj
    summary["R-Squared"] = results.rsquared

    X_cols = X.columns[1:] if intercept else X.columns

    for i, col in enumerate(X_cols):
        summary[f"{col} Beta"] = betas[i]

    summary["Information Ratio"] = (inter / results.resid.std()) * np.sqrt(adj)
    summary["Tracking Error"] = results.resid.std() * np.sqrt(adj)
    
    if isinstance(y, pd.Series):
        return pd.DataFrame(summary, index=[y.name])
    else:
        return pd.DataFrame(summary, index=y.columns)


def calc_iterative_regression(y, X, intercept=True, one_to_many=False, adj=12):
    """
    Iterative regression for checking one X column against many different y columns,
    or vice versa. "one_to_many=True" means that we are checking one X column against many
    y columns, and "one_to_many=False" means that we are checking many X columns against a
    single y column.

    To enforce dynamic behavior in terms of regressors and regressands, we
    check that BOTH X and y are DataFrames

    Args:
        y : Target variable(s)
        X : Independent variable(s)
        intercept (bool, optional): Defaults to True.
        one_to_many (bool, optional): Which way to run the regression. Defaults to False.
        adj (int, optional): Annualization. Defaults to 12.

    Returns:
        DataFrame : Summary of regression results.
    """

    if not isinstance(X, pd.DataFrame) or not isinstance(y, pd.DataFrame):
        raise TypeError("X and y must both be DataFrames.")

    if one_to_many:
        if X.shape[1] > 1:
            summary = pd.concat(
                [
                    calc_multivariate_regression(y[col], X, intercept, adj)
                    for col in y.columns
                ],
                axis=0,
            )
        else:
            summary = pd.concat(
                [
                    calc_univariate_regression(y[col], X, intercept, adj)
                    for col in y.columns
                ],
                axis=0,
            )
        summary.index = y.columns
        return summary
    else:
        summary = pd.concat(
            [
                calc_univariate_regression(y, X[col], intercept, adj)
                for col in X.columns
            ],
            axis=0,
        )
        summary.index = X.columns
        return summary


# -----------------------
# RISK & RETURN FUNCTIONS
# -----------------------


def calc_return_metrics(data, as_df=False, adj=12):
    """
    Calculate return metrics for a DataFrame of assets.

    Args:
        data (pd.DataFrame): DataFrame of asset returns.
        as_df (bool, optional): Return a DF or a dict. Defaults to False (return a dict).
        adj (int, optional): Annualization. Defaults to 12.

    Returns:
        Union[dict, DataFrame]: Dict or DataFrame of return metrics.
    """
    summary = dict()
    summary["Annualized Return"] = data.mean() * adj
    summary["Annualized Volatility"] = data.std() * np.sqrt(adj)
    summary["Annualized Sharpe Ratio"] = (
        summary["Annualized Return"] / summary["Annualized Volatility"]
    )
    summary["Annualized Sortino Ratio"] = summary["Annualized Return"] / (
        data[data < 0].std() * np.sqrt(adj)
    )
    return pd.DataFrame(summary, index=data.columns) if as_df else summary


def calc_risk_metrics(data, as_df=False, var=0.05):
    """
    Calculate risk metrics for a DataFrame of assets.

    Args:
        data (pd.DataFrame): DataFrame of asset returns.
        as_df (bool, optional): Return a DF or a dict. Defaults to False.
        adj (int, optional): Annualizatin. Defaults to 12.
        var (float, optional): VaR level. Defaults to 0.05.

    Returns:
        Union[dict, DataFrame]: Dict or DataFrame of risk metrics.
    """
    summary = dict()
    summary["Skewness"] = data.skew()
    summary["Excess Kurtosis"] = data.kurtosis()
    summary[f"VaR ({var})"] = data.quantile(var, axis=0)
    summary[f"CVaR ({var})"] = data[data <= data.quantile(var, axis=0)].mean()
    summary["Min"] = data.min()
    summary["Max"] = data.max()

    wealth_index = 1000 * (1 + data).cumprod()
    previous_peaks = wealth_index.cummax()
    drawdowns = (wealth_index - previous_peaks) / previous_peaks

    summary["Max Drawdown"] = drawdowns.min()

    summary["Bottom"] = drawdowns.idxmin()
    summary["Peak"] = previous_peaks.idxmax()

    recovery_date = []
    for col in wealth_index.columns:
        prev_max = previous_peaks[col][: drawdowns[col].idxmin()].max()
        recovery_wealth = pd.DataFrame([wealth_index[col][drawdowns[col].idxmin() :]]).T
        recovery_date.append(
            recovery_wealth[recovery_wealth[col] >= prev_max].index.min()
        )
    summary["Recovery"] = ["-" if pd.isnull(i) else i for i in recovery_date]

    summary["Duration (days)"] = [
        (i - j).days if i != "-" else "-"
        for i, j in zip(summary["Recovery"], summary["Bottom"])
    ]

    return pd.DataFrame(summary, index=data.columns) if as_df else summary


def calc_performance_metrics(data, adj=12, var=0.05):
    """
    Aggregating function for calculating performance metrics. Returns both
    risk and performance metrics.

    Args:
        data (pd.DataFrame): DataFrame of asset returns.
        adj (int, optional): Annualization. Defaults to 12.
        var (float, optional): VaR level. Defaults to 0.05.

    Returns:
        DataFrame: DataFrame of performance metrics.
    """
    summary = {
        **calc_return_metrics(data=data, adj=adj),
        **calc_risk_metrics(data=data, var=var),
    }
    summary["Calmar Ratio"] = summary["Annualized Return"] / abs(
        summary["Max Drawdown"]
    )
    return pd.DataFrame(summary, index=data.columns)


In [175]:
all_data = partial(
    pd.read_excel, "gmo_analysis_data.xlsx", index_col=0, parse_dates=[0]
)
signals = all_data(sheet_name="signals")
returns = all_data(sheet_name="total_returns")
risk_free = all_data(sheet_name="risk-free rate")

spy = returns['SPY']

In [176]:
signals_lag = signals.shift(1).dropna()
spy = spy.loc[signals_lag.index]
forecast_df = spy.expanding().mean().shift(1).dropna()
forecast_df.columns = ['Mean']

dp_regr = sm.OLS(spy, sm.add_constant(signals_lag['SPX D/P'])).fit()
forecast_df['SPX D/P'] = dp_regr.params[0] + dp_regr.params[1] * signals['SPX D/P']
ep_regr = sm.OLS(spy, sm.add_constant(signals_lag['SPX E/P'])).fit()
forecast_df['SPX E/P'] = ep_regr.params[0] + ep_regr.params[1] * signals['SPX E/P']

print(f'D/P Regression: Alpha: {dp_regr.params[0]:.4f}, Beta: {dp_regr.params[1]:.4f}, R^2: {dp_regr.rsquared:.4f}')
print(f'E/P Regression: Alpha: {ep_regr.params[0]:.4f}, Beta: {ep_regr.params[1]:.4f}, R^2: {ep_regr.rsquared:.4f}')

multi_regr = sm.OLS(spy, sm.add_constant(signals_lag)).fit()
forecast_df['Multi'] = multi_regr.params[0] + multi_regr.params[1] * signals['SPX D/P'] + multi_regr.params[2] * signals['SPX E/P'] + multi_regr.params[3] * signals['T-Note 10YR']

display(multi_regr.summary())
display(multi_regr.params)


rets_2011 = returns.loc[:"2011"]
rets_2012 = returns.loc["2012":]

forecast_df.dropna(inplace=True)
forecast_df['SPY'] = returns['SPY']
forecast_df['Risk-Free'] = risk_free['TBill 3M']

D/P Regression: Alpha: -0.0098, Beta: 1.0243, R^2: 0.0086
E/P Regression: Alpha: -0.0029, Beta: 0.2220, R^2: 0.0032


  forecast_df['SPX D/P'] = dp_regr.params[0] + dp_regr.params[1] * signals['SPX D/P']
  forecast_df['SPX E/P'] = ep_regr.params[0] + ep_regr.params[1] * signals['SPX E/P']
  print(f'D/P Regression: Alpha: {dp_regr.params[0]:.4f}, Beta: {dp_regr.params[1]:.4f}, R^2: {dp_regr.rsquared:.4f}')
  print(f'E/P Regression: Alpha: {ep_regr.params[0]:.4f}, Beta: {ep_regr.params[1]:.4f}, R^2: {ep_regr.rsquared:.4f}')
  forecast_df['Multi'] = multi_regr.params[0] + multi_regr.params[1] * signals['SPX D/P'] + multi_regr.params[2] * signals['SPX E/P'] + multi_regr.params[3] * signals['T-Note 10YR']


0,1,2,3
Dep. Variable:,SPY,R-squared:,0.01
Model:,OLS,Adj. R-squared:,0.001
Method:,Least Squares,F-statistic:,1.166
Date:,"Sat, 12 Jul 2025",Prob (F-statistic):,0.323
Time:,13:04:08,Log-Likelihood:,581.2
No. Observations:,342,AIC:,-1154.0
Df Residuals:,338,BIC:,-1139.0
Df Model:,3,,
Covariance Type:,nonrobust,,

0,1,2,3,4,5,6
,coef,std err,t,P>|t|,[0.025,0.975]
const,-0.0036,0.016,-0.220,0.826,-0.036,0.029
SPX D/P,1.4845,1.147,1.294,0.196,-0.772,3.741
SPX E/P,-0.2392,0.383,-0.625,0.533,-0.993,0.514
T-Note 10YR,-0.0572,0.186,-0.307,0.759,-0.424,0.309

0,1,2,3
Omnibus:,25.643,Durbin-Watson:,1.968
Prob(Omnibus):,0.0,Jarque-Bera (JB):,31.838
Skew:,-0.6,Prob(JB):,1.22e-07
Kurtosis:,3.891,Cond. No.,496.0


const         -0.003614
SPX D/P        1.484509
SPX E/P       -0.239207
T-Note 10YR   -0.057215
dtype: float64

In [177]:
forecast_wt = forecast_df * 100

multi_regr = sm.OLS(spy, sm.add_constant(signals_lag)).fit()


display(multi_regr.summary())
display(multi_regr.params)

print('R Squared: ', multi_regr.rsquared)

0,1,2,3
Dep. Variable:,SPY,R-squared:,0.01
Model:,OLS,Adj. R-squared:,0.001
Method:,Least Squares,F-statistic:,1.166
Date:,"Sat, 12 Jul 2025",Prob (F-statistic):,0.323
Time:,13:04:08,Log-Likelihood:,581.2
No. Observations:,342,AIC:,-1154.0
Df Residuals:,338,BIC:,-1139.0
Df Model:,3,,
Covariance Type:,nonrobust,,

0,1,2,3,4,5,6
,coef,std err,t,P>|t|,[0.025,0.975]
const,-0.0036,0.016,-0.220,0.826,-0.036,0.029
SPX D/P,1.4845,1.147,1.294,0.196,-0.772,3.741
SPX E/P,-0.2392,0.383,-0.625,0.533,-0.993,0.514
T-Note 10YR,-0.0572,0.186,-0.307,0.759,-0.424,0.309

0,1,2,3
Omnibus:,25.643,Durbin-Watson:,1.968
Prob(Omnibus):,0.0,Jarque-Bera (JB):,31.838
Skew:,-0.6,Prob(JB):,1.22e-07
Kurtosis:,3.891,Cond. No.,496.0


const         -0.003614
SPX D/P        1.484509
SPX E/P       -0.239207
T-Note 10YR   -0.057215
dtype: float64

R Squared:  0.010242493642037331


In [178]:
summary1 = dict()
ret = (forecast_wt["Multi"].mean() * 12)
vol = (forecast_wt["Multi"].std() * np.sqrt(12))
sharpe = ret / vol
summary1["Annualized Return"] = ret
summary1["Annualized Volatility"] = vol
summary1["Annualized Sharpe Ratio"] = sharpe

summary2 = dict()
ret = (forecast_wt["SPX D/P"].mean() * 12)
vol = (forecast_wt["SPX D/P"].std() * np.sqrt(12))
sharpe = ret / vol
summary2["Annualized Return"] = ret
summary2["Annualized Volatility"] = vol
summary2["Annualized Sharpe Ratio"] = sharpe

summary3 = dict()
ret = (forecast_wt["SPX E/P"].mean() * 12)
vol = (forecast_wt["SPX E/P"].std() * np.sqrt(12))
sharpe = ret / vol
summary3["Annualized Return"] = ret
summary3["Annualized Volatility"] = vol
summary3["Annualized Sharpe Ratio"] = sharpe

In [179]:
display(summary1)
display(summary2)
display(summary3)

{'Annualized Return': 10.409688789889849,
 'Annualized Volatility': 1.5618520783534433,
 'Annualized Sharpe Ratio': 6.664964585419696}

{'Annualized Return': 10.408286972292466,
 'Annualized Volatility': 1.4352227367651202,
 'Annualized Sharpe Ratio': 7.252036012021333}

{'Annualized Return': 10.417411457724333,
 'Annualized Volatility': 0.8710322002832781,
 'Annualized Sharpe Ratio': 11.959846552557265}

## 3.3

The short term bonds are outperforming the 

In [181]:
forecast_wt = forecast_df * 100
forecast_model = forecast_wt[['SPX D/P', 'SPX E/P', 'Multi']]

display(calc_performance_metrics(forecast_model).T)

calc_iterative_regression(forecast_model, forecast_model[['Actuals']], one_to_many=True).iloc[:-1, :]

TypeError: cannot convert the series to <class 'float'>