In [1]:
import os
from collections import defaultdict

from src.data_loading import load_equity_data, load_macro_data
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

In [2]:
UNK: float = -99.99

raw_data = load_equity_data(name="train")
R_t = raw_data[:, :, 0]
X = raw_data[:, :, 1:]

# Compute a mask to keep track of which values are available
mask = R_t != UNK

X_clean = X[mask]
y_clean = R_t[mask]

idxs = np.sum(mask, axis=1)
idxs = np.cumsum(idxs)

We start with a standard, cross-validated OLS regression to get some estimate of the baseline $R^2$ we can expect to achieve.

# Benchmark returns on features OLS

In [3]:
cross_cov = X_clean * y_clean[:, None]  # I_ti * R_t = F_tilde

# Now mean over assets in period
f_tilde = np.vsplit(cross_cov, idxs)
f_tilde = [
    np.mean(f, axis=0) for f in f_tilde if len(f) > 0
]  # mean over assets in period
f_tilde = np.row_stack(f_tilde)  # shape: (n_periods, n_factors)

cov = np.cov(f_tilde.T)
E_f_tilde = f_tilde.mean(axis=0)  # shape: (n_factors,), mean over periods

# yields: theta_LS = (Sigma)^-1 * E[F_tilde]
theta_LS = np.linalg.solve(cov, E_f_tilde)
theta_LS

In [4]:
# Now backtest it
w = X_clean @ theta_LS

monthly_portfolio_return = zip(
    np.split(y_clean, idxs, axis=0), np.split(w, idxs, axis=0)
)
monthly_portfolio_return = np.array(
    [
        np.sum(r * w) / np.sum(np.abs(w))
        for r, w in monthly_portfolio_return
        if len(r) > 0
    ]
)

equity = np.cumsum(monthly_portfolio_return)

plt.plot(equity)
plt.show()

In [5]:
from typing import Literal


def compute_portfolio_return_for_linear_portfolio_weights(
    theta: np.ndarray, raw_data: np.ndarray
) -> np.ndarray:
    R_t = raw_data[:, :, 0]
    X = raw_data[:, :, 1:]

    mask = R_t != UNK
    X_clean = X[mask]
    y_clean = R_t[mask]

    idxs = np.sum(R_t != UNK, axis=1)
    idxs = np.cumsum(idxs)

    w = X_clean @ theta

    monthly_portfolio_return = zip(
        np.split(y_clean, idxs, axis=0), np.split(w, idxs, axis=0)
    )
    monthly_portfolio_return = np.array(
        [
            np.sum(r * w) / np.sum(np.abs(w))
            for r, w in monthly_portfolio_return
            if len(r) > 0
        ]
    )

    return monthly_portfolio_return


dat = load_equity_data(name="valid")
equity_curve = compute_portfolio_return_for_linear_portfolio_weights(
    theta_LS, dat
)
equity_curve

In [6]:
eq_curves = []
stats = defaultdict(lambda: defaultdict(list))

for ds in ("train", "valid", "test"):
    dat = load_equity_data(name=ds)
    e = compute_portfolio_return_for_linear_portfolio_weights(theta_LS, dat)
    stats[ds]["mean"] = np.mean(e)
    stats[ds]["std"] = np.std(e)
    stats[ds]["sharpe"] = stats[ds]["mean"] / stats[ds]["std"]
    stats[ds]["VaR"] = np.percentile(e, 5)

    e = np.cumprod(1 + e)
    stats[ds]["total_return"] = e[-1]
    stats[ds]["max_drawdown"] = np.max(np.maximum.accumulate(e) - e)
    eq_curves.append(e)

eq_curves = np.concatenate(eq_curves)
# eq_curves = np.exp(eq_curves) * 100.0
# log scale on the y axis
fig, ax = plt.subplots()

ax.plot(eq_curves)
ax.set_yscale("log")

plt.show()

pd.DataFrame(stats)

# Betas and Fama Macbeth Regression for the one-factor model


In [7]:
from sklearn.linear_model import LinearRegression

F_t = compute_portfolio_return_for_linear_portfolio_weights(theta_LS, raw_data)

# Replace UNK with nan and take the nanmean then
R_t_nan = R_t.copy()
R_t_nan[R_t_nan == UNK] = np.nan


def compute_beta(
    r_t: np.ndarray, f_t: np.ndarray, estimate_alpha: bool = False
) -> float:
    """Returns a tuple, beta for a single asset."""
    reg = LinearRegression(fit_intercept=estimate_alpha)

    # Find the indexes that are not nan for the r_t
    clean_indexes = np.isfinite(r_t)
    reg.fit(f_t[clean_indexes, None], r_t[clean_indexes])

    beta: float = reg.coef_[0]

    return beta


beta = [compute_beta(r, F_t) for r in R_t_nan.T]
beta = np.array(beta)

In [8]:
# Have to be careful with nans in the asset returns
# Note: there is quite the difference whether we regress an intercept (alpha) or not to find the beta
# See above function to find the beta
# beta = np.nanmean(R_t_nan * F_t[:, None], axis=0)
stock_indexes = np.arange(len(beta))
sorted_indexes = np.argsort(beta)
sorted_R_t = R_t_nan[:, sorted_indexes]

decile_portfolios = [
    np.nanmean(s, axis=1)
    for s in np.array_split(sorted_R_t, 10, axis=1)
    if len(s) > 0
]
port_betas = [
    np.mean(b) for b in np.array_split(beta[sorted_indexes], 10) if len(b) > 0
]
port_betas = np.array(port_betas)

# And plot the cumulative returns of the decile portfolios
fig, axs = plt.subplots(1, 2, figsize=(20, 10))

# Put them all in one plot
for i, r in enumerate(decile_portfolios):
    # Replace nan return with 0.0
    cleaned_r = np.nan_to_num(r)
    cleaned_r = cleaned_r / np.std(cleaned_r)
    equity = np.cumsum(cleaned_r)

    axs[0].scatter(np.arange(len(equity)), equity, label=f"Decile {i + 1}")

    # label the decile
    axs[0].text(0, equity[-1], f"Decile {i + 1}")


# And make a scatter plot with expected portfolio return and portfolio beta for the decile portfolios
exp_portf_return = [np.nanmean(r) for r in decile_portfolios if len(r) > 0]
axs[1].scatter(port_betas, exp_portf_return)

# And of course add a regression line that shows the slope (market price of risk)
reg = LinearRegression(fit_intercept=True).fit(
    X=port_betas[:, None], y=exp_portf_return
)

# And draw that slope as a line in the right chart
p: float = 0.1
x = np.linspace(np.min(port_betas) * (1 + p), np.max(port_betas) * (1 + p), 100)
y = reg.predict(x[:, None])

axs[1].plot(
    x, y, label=f"Market price of risk: {reg.coef_[0]:.2f}", color="red"
)

# Set y axis scale to [-0.1, 0.1]
axs[1].set_ylim([-0.1, 0.1])

# Add legend
plt.show()

In [9]:
import statsmodels.api as sm

expected_returns = np.nanmean(R_t_nan, axis=0)
sm.OLS(endog=expected_returns, exog=sm.add_constant(beta)).fit().summary()

In [10]:
# And make a regression plot with seaborn
reg_df = pd.DataFrame(dict(portfolio_return=expected_returns, beta=beta))

# Make it a bit more transparent
sns.regplot(
    data=reg_df, x="beta", y="portfolio_return", scatter_kws={"alpha": 0.2}
)
plt.show()

# Time-Varying betas with another linear regression

$$\beta_{t, i} = \gamma ^T I_{t, i}$$

$\gamma$ is constant OLS regression coefficient obtained from *regression of $R_{t,i} F_t$ on $I_{t,i}$*.

In [11]:
F_t = []

w = X_clean @ theta_LS
w_splitted = np.split(w, idxs, axis=0)
returns_splitted = np.split(y_clean, idxs, axis=0)

for w, r in zip(w_splitted, returns_splitted):
    if len(w) == 0:
        continue

    # Compute the factor return
    f = np.sum(r * w) / np.sum(np.abs(w))
    # f = np.sum(r * w) / np.sqrt(w.dot(w))
    F_t.append(f)

F_t = np.array(F_t)
F_t = F_t / np.std(F_t)

plt.plot(np.cumsum(F_t))

In [12]:
# Regression to yield an estimate for the time-varying beta
regression_target = (R_t_nan * (F_t[:, None]))[mask]
reg = LinearRegression(fit_intercept=False).fit(X=X_clean, y=regression_target)

beta_ls = reg.predict(X_clean)
beta_splitted = np.split(beta_ls, idxs, axis=0)

n_quantiles: int = 10
decile_portfolios = [list() for _ in range(n_quantiles)]


for i, (b, r) in enumerate(zip(beta_splitted, returns_splitted)):
    if len(b) == 0:
        continue

    # Sort the betas
    sorted_indexes = np.argsort(b)
    sorted_R_t = r[sorted_indexes]
    sorted_R_t = sorted_R_t[~np.isnan(sorted_R_t)]

    # Drop the nans and split them into decile portfolios
    portf_returns = np.array_split(sorted_R_t, n_quantiles)
    portf_returns = [np.nanmean(r) for r in portf_returns if len(r) > 0]

    for j, r in enumerate(portf_returns):
        decile_portfolios[j].append(float(r))


decile_portfolios = np.row_stack(decile_portfolios)

In [13]:
# And plot the equity curves like before

fig, ax = plt.subplots(figsize=(20, 10))

for i, r in enumerate(decile_portfolios):
    equity = np.cumsum(r) / np.std(r)
    ax.scatter(np.arange(len(equity)), equity, label=f"Decile {i + 1}")

plt.show()

In [14]:
np.mean(F_t) / np.std(F_t)

In [15]:
# And now calculate R_hat from that
# beta * F_t / (b^T b)

eps = []

for i, (b, r, f) in enumerate(zip(beta_splitted, returns_splitted, F_t)):
    if len(b) == 0:
        continue

    # Compute the residuals
    # r_hat = b * f / np.sum(b**2)
    r_hat = b.dot(r) * b / (np.sum(np.square(b)))
    # r_hat = b.dot(r) * b / np.sqrt(np.sum(b**2))
    # r_hat = b.dot(r) * b / np.sum(np.absolute(b))
    e = r - r_hat
    eps.extend(e)


eps = np.array(eps)

In [16]:
# r2 = 1 - (np.sum(eps**2) / np.sum(y_clean**2))
r2 = 1 - (
    np.sum([np.mean(e**2) for e in np.split(eps, idxs, axis=0) if len(e) > 0])
    / np.sum(
        [np.mean(r**2) for r in np.split(y_clean, idxs, axis=0) if len(r) > 0]
    )
)

r2