<a href="https://colab.research.google.com/github/rpjena/random_matrix/blob/main/stock_factor_beta_signal.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Factor Beta Signal: Long/Short Portfolio

Given:
- `returns`: a `(T x N)` DataFrame of stock returns
- `F`: a `(T,)` Series of factor returns

Strategy:
1. Estimate rolling beta of each stock to `F` at each time point
2. Cross-sectionally rank stocks by beta each period
3. Go long top decile(s), short bottom decile(s)
4. Compute strategy performance

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import warnings
warnings.filterwarnings('ignore')

## 1. Synthetic Data Generation

We simulate a factor model:
```
r_i(t) = beta_i(t) * F(t) + epsilon_i(t)
```
where betas are slowly time-varying (random walk) so the signal has realistic dynamics.

In [None]:
# Seed NumPy's global RNG so the synthetic panel is reproducible.
# NOTE: the numbers produced below depend on the exact order of the
# np.random calls in this cell; reordering them changes the data.
np.random.seed(42)

T = 300   # number of time periods (e.g. monthly: ~25 years)
N = 100   # number of stocks

# One timestamp per period, using the 'ME' (month-end) frequency alias.
dates = pd.date_range('2000-01-31', periods=T, freq='ME')

# --- Factor returns: mean-zero, std ~3% per month ---
F = pd.Series(np.random.normal(0.0, 0.03, T), index=dates, name='factor')

# --- True betas: slow random walk per stock, centred around 1 ---
# Row 0 is drawn uniformly from [0.2, 1.8); every later row adds an
# independent N(0, 0.02) shock per stock to the previous row.
true_beta = np.zeros((T, N))
true_beta[0] = np.random.uniform(0.2, 1.8, N)
for t in range(1, T):
    true_beta[t] = true_beta[t-1] + np.random.normal(0, 0.02, N)

# --- Stock returns = beta * F + idiosyncratic noise ---
idio_vol = np.random.uniform(0.02, 0.06, N)          # per-stock idio vol
epsilon = np.random.normal(0, 1, (T, N)) * idio_vol  # (T x N) noise
# F.values[:, None] turns the factor into a (T, 1) column so it broadcasts
# across the N stock columns of true_beta.
raw_returns = true_beta * F.values[:, None] + epsilon

# Wrap the simulated panel in a DataFrame with zero-padded stock labels.
returns = pd.DataFrame(raw_returns, index=dates,
                       columns=[f'Stock_{i:03d}' for i in range(N)])

# Quick sanity check of the generated inputs.
print(f'returns shape : {returns.shape}   (T x N)')
print(f'F shape       : {F.shape}   (T,)')
print(returns.head())

## 2. Rolling Beta Estimation

At each time `t` with rolling window `W`:
```
beta_i(t) = Cov(r_i[t-W+1..t], F[t-W+1..t]) / Var(F[t-W+1..t])     (window of W observations, inclusive of t)
```
Returns a `(T x N)` DataFrame; first `W-1` rows are `NaN`.

In [None]:
def compute_rolling_betas(returns, F, window=60):
    """
    Rolling univariate OLS slope of every stock on the factor.

    For each end row ``t >= window - 1`` the trailing ``window`` observations
    (rows ``t - window + 1`` through ``t``, inclusive) of the factor and of
    each stock are demeaned, and the slope ``sum(f * r) / sum(f * f)`` is
    written to row ``t``.

    Parameters
    ----------
    returns : pd.DataFrame, shape (T, N)
        Stock return panel.
    F : pd.Series, shape (T,)
        Factor return series. It is matched to ``returns`` by position
        (via ``.values``), not by index label.
    window : int
        Number of observations in each estimation window.

    Returns
    -------
    beta_df : pd.DataFrame, shape (T, N)
        Beta estimates with the index and columns of ``returns``. Rows before
        ``window - 1`` are NaN, as is any row whose window has a demeaned
        factor sum of squares below 1e-12.
    """
    n_obs, n_assets = returns.shape
    factor = F.values
    panel = returns.values

    out = np.full((n_obs, n_assets), np.nan)

    # `end` is the exclusive slice bound, so the estimate belongs to row end - 1.
    for end in range(window, n_obs + 1):
        start = end - window
        f_slice = factor[start:end]            # (W,)
        r_slice = panel[start:end, :]          # (W, N)

        f_centered = f_slice - f_slice.mean()
        r_centered = r_slice - r_slice.mean(axis=0)

        # Unnormalised variance; the 1/W factors cancel in the cov/var ratio.
        ss_factor = np.dot(f_centered, f_centered)
        if ss_factor < 1e-12:
            # Factor is (numerically) constant in this window: leave NaN.
            continue

        out[end - 1, :] = (f_centered @ r_centered) / ss_factor

    return pd.DataFrame(out, index=returns.index, columns=returns.columns)


WINDOW = 60  # rolling estimation window, in periods
beta_df = compute_rolling_betas(returns, F, window=WINDOW)

# Count the burn-in using the first stock's column as a representative.
first_stock_betas = beta_df.iloc[:, 0]

print(f'beta_df shape : {beta_df.shape}')
print(f'NaN rows (burn-in): {first_stock_betas.isna().sum()}')
# Show the first rows that have an estimate for every stock.
print(beta_df.dropna().head())

### Beta diagnostics: cross-sectional spread over time

In [None]:
# Cross-sectional quantiles of beta over time (10th / 50th / 90th percentile)
q10, q50, q90 = (beta_df.quantile(level, axis=1) for level in (0.1, 0.5, 0.9))

# Betas in the last period; stocks without an estimate are dropped
last_betas = beta_df.iloc[-1].dropna()

fig, axes = plt.subplots(1, 2, figsize=(14, 4))
band_ax, hist_ax = axes

# Left panel: median beta with a shaded 10-90th percentile band
band_ax.fill_between(beta_df.index, q10, q90, alpha=0.3, label='10-90th pct')
band_ax.plot(beta_df.index, q50, color='navy', lw=1.5, label='Median beta')
band_ax.set_title('Cross-sectional Beta Distribution Over Time')
band_ax.set_ylabel('Beta')
band_ax.legend()
band_ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y'))

# Right panel: histogram of the last period's betas
hist_ax.hist(last_betas, bins=20, color='steelblue', edgecolor='white')
hist_ax.set_title(f'Beta Distribution at t={beta_df.index[-1].date()}')
hist_ax.set_xlabel('Beta')
hist_ax.set_ylabel('Count')

plt.tight_layout()
plt.show()

## 3. Cross-Sectional Signal Construction

At each time `t`, rank all stocks by estimated beta.
- **Long**: top `k` fraction (highest beta exposure to factor)
- **Short**: bottom `k` fraction (lowest beta exposure)
- Within each leg: equal-weight (weights sum to 1 / -1)

In [None]:
def compute_signal(beta_df, k=0.2):
    """
    Create equal-weighted long/short portfolio weights from cross-sectional beta rank.

    Each row of ``beta_df`` is percentile-ranked across stocks (NaNs stay
    NaN and therefore belong to neither leg). Stocks with percentile rank
    ``> 1 - k`` form the long leg, those with rank ``<= k`` the short leg.

    Parameters
    ----------
    beta_df : pd.DataFrame, shape (T, N)
        Rolling beta estimates.
    k : float
        Fraction of stocks in each leg (0.2 = top/bottom 2 deciles,
        0.5 = top/bottom 5 deciles). If ``k > 0.5`` the legs overlap and an
        overlapping stock receives the short weight.

    Returns
    -------
    weights : pd.DataFrame, shape (T, N)
        Portfolio weights: ``+1 / n_long`` for longs, ``-1 / n_short`` for
        shorts, 0 otherwise. Rows with no ranked stocks are all zero.
    long_mask : pd.DataFrame, shape (T, N), bool
    short_mask : pd.DataFrame, shape (T, N), bool
    """
    pct_rank = beta_df.rank(axis=1, pct=True, na_option='keep')  # (T, N) in (0, 1]

    long_mask  = pct_rank > (1 - k)   # top k fraction
    short_mask = pct_rank <= k         # bottom k fraction

    # Work on the raw boolean arrays so the whole panel is filled in one shot
    # and duplicate index labels cannot break row look-ups.
    long_arr  = long_mask.to_numpy()
    short_arr = short_mask.to_numpy()

    n_long  = long_arr.sum(axis=1, keepdims=True)    # (T, 1) longs per period
    n_short = short_arr.sum(axis=1, keepdims=True)   # (T, 1) shorts per period

    # np.maximum(..., 1) only prevents 0-division on rows with an empty leg;
    # those rows have no True entries in the mask, so the value is never used.
    long_w  =  1.0 / np.maximum(n_long, 1)
    short_w = -1.0 / np.maximum(n_short, 1)

    w = np.where(long_arr, long_w, 0.0)
    # Shorts are written last so they win if the legs overlap (k > 0.5).
    w = np.where(short_arr, short_w, w)

    weights = pd.DataFrame(w, index=beta_df.index, columns=beta_df.columns)
    return weights, long_mask, short_mask


K = 0.2   # fraction per leg: 0.2 = top/bottom 2 deciles, 0.5 = top/bottom 5 deciles
weights, long_mask, short_mask = compute_signal(beta_df, k=K)

# Stocks held in each leg, per period.
n_long_per_period, n_short_per_period = (
    mask.sum(axis=1) for mask in (long_mask, short_mask)
)

# Non-zero positions in the most recent period.
last_weights = weights.iloc[-1]

print(f'Avg stocks in long leg : {n_long_per_period.mean():.1f}')
print(f'Avg stocks in short leg: {n_short_per_period.mean():.1f}')
print(f'Sample weights (last period):\n{last_weights[last_weights != 0].head(6)}')

## 4. Portfolio Returns

**Lookahead-safe**: signal formed at end of period `t` (using returns through `t`) 
is applied to returns at `t+1`.

```
strat_return(t+1) = long_return(t+1) - short_return(t+1)
```

In [None]:
def compute_portfolio_returns(weights, returns):
    """
    Compute forward-looking L/S portfolio returns.

    Weights are lagged by one row so that the signal formed at ``t`` earns
    the return realised at ``t + 1``.

    Parameters
    ----------
    weights : pd.DataFrame, shape (T, N)
        Portfolio weights at each period (signal formed at t, applied to t+1).
    returns : pd.DataFrame, shape (T, N)
        Stock return panel.

    Returns
    -------
    strat_ret  : pd.Series  L/S strategy returns (long minus short)
    long_ret   : pd.Series  long-leg returns
    short_ret  : pd.Series  short-leg returns (of the shorted basket itself)

    All three series cover only the periods in which the lagged weights hold
    at least one non-zero position. Periods whose realised return happens to
    be exactly zero are kept.
    """
    # Shift weights by 1: signal at t -> return at t+1
    w_lagged = weights.shift(1)

    # Long leg: only positive weights
    w_long  = w_lagged.clip(lower=0)
    # Short leg: only negative weights (absolute values)
    w_short = (-w_lagged).clip(lower=0)

    long_ret  = (w_long  * returns).sum(axis=1)   # weighted avg return of longs
    short_ret = (w_short * returns).sum(axis=1)   # weighted avg return of shorts
    strat_ret = long_ret - short_ret

    # Drop burn-in periods by checking for an actual position rather than a
    # non-zero return: a genuinely flat period must stay in the sample.
    valid = w_lagged.abs().sum(axis=1) > 0
    if not valid.index.equals(strat_ret.index):
        # `weights * returns` aligns on the union of both indexes; rows that
        # exist only in `returns` carry no position and are therefore invalid.
        valid = valid.reindex(strat_ret.index, fill_value=False)

    return strat_ret[valid], long_ret[valid], short_ret[valid]


strat_ret, long_ret, short_ret = compute_portfolio_returns(weights, returns)

print(f'Strategy return series: {len(strat_ret)} periods')

# First and last dates on which the strategy has a return.
first_date = strat_ret.index[0].date()
last_date = strat_ret.index[-1].date()
print(f'Date range: {first_date} -> {last_date}')

# Distribution summary of the per-period strategy returns.
print(strat_ret.describe())

## 5. Performance Metrics

In [None]:
def compute_performance(ret, freq=12, label='Strategy'):
    """
    Compute and print key performance statistics.

    Parameters
    ----------
    ret : pd.Series
        Return series. May be empty, in which case every statistic is NaN.
    freq : int
        Periods per year (12 for monthly, 252 for daily).
    label : str
        Label for display.

    Returns
    -------
    stats : dict
        Keys: ``ann_return`` (geometric, annualised), ``ann_vol``
        (sample std times ``sqrt(freq)``), ``sharpe``
        (``ann_return / ann_vol``, NaN unless ``ann_vol > 0``),
        ``max_drawdown`` (most negative peak-to-trough move of the wealth
        index), ``hit_rate`` (share of periods with ``ret > 0``) and
        ``n_periods``.
    cum_ret : pd.Series
        Cumulative wealth index, ``(1 + ret).cumprod()``.
    """
    n_periods = len(ret)
    cum_ret = (1 + ret).cumprod()

    if n_periods == 0:
        # Nothing to measure: report NaNs instead of failing on iloc[-1]
        # and on the division by zero years.
        ann_ret = ann_vol = sharpe = max_dd = hit_rate = np.nan
    else:
        total_ret = cum_ret.iloc[-1] - 1
        n_years = n_periods / freq
        ann_ret = (1 + total_ret) ** (1 / n_years) - 1
        ann_vol = ret.std() * np.sqrt(freq)
        # A single observation has NaN std, which also lands in the NaN branch.
        sharpe  = ann_ret / ann_vol if ann_vol > 0 else np.nan

        # Maximum drawdown
        rolling_max = cum_ret.cummax()
        drawdown    = (cum_ret - rolling_max) / rolling_max
        max_dd      = drawdown.min()

        # Hit rate
        hit_rate = (ret > 0).mean()

    stats = dict(
        ann_return=ann_ret,
        ann_vol=ann_vol,
        sharpe=sharpe,
        max_drawdown=max_dd,
        hit_rate=hit_rate,
        n_periods=n_periods,
    )

    print(f'\n--- {label} ---')
    print(f'  Ann. Return  : {ann_ret*100:+.2f}%')
    print(f'  Ann. Vol     : {ann_vol*100:.2f}%')
    print(f'  Sharpe Ratio : {sharpe:.3f}')
    print(f'  Max Drawdown : {max_dd*100:.2f}%')
    print(f'  Hit Rate     : {hit_rate*100:.1f}%')
    print(f'  Periods      : {n_periods}')

    return stats, cum_ret


FREQ = 12  # periods per year (monthly data)

# Evaluate the combined strategy and each leg. The short leg is passed with
# its sign flipped, i.e. as the P&L of being short that basket.
(stats_strat, cum_strat), (stats_long, cum_long), (stats_short, cum_short) = [
    compute_performance(series, freq=FREQ, label=title)
    for series, title in (
        (strat_ret, f'L/S Strategy (k={K})'),
        (long_ret, 'Long Leg'),
        (-short_ret, 'Short Leg (inverted)'),
    )
]

## 6. Turnover Analysis

Turnover = fraction of the portfolio that changes each period.
High turnover erodes returns in practice.

In [None]:
def compute_turnover(weights):
    """
    Compute one-way turnover per period.

    Turnover at ``t`` is half the sum of absolute weight changes between
    ``t - 1`` and ``t``.

    Parameters
    ----------
    weights : pd.DataFrame, shape (T, N)
        Portfolio weights (+long, -short, 0 neutral).

    Returns
    -------
    turnover : pd.Series, shape (T,)
        One-way turnover in weight units. With legs that sum to +1 / -1,
        building the book from zero gives 1.0 and swapping both legs
        completely gives 2.0. The first row is 0 because it has no
        predecessor (its NaN differences are skipped by the sum).
    """
    abs_change = weights.diff().abs()
    # Halve the two-way traded amount (buys + sells) to get one-way turnover.
    return abs_change.sum(axis=1) / 2


turnover = compute_turnover(weights)

# Summary statistics over all periods (burn-in rows with no trades included).
mean_turnover = turnover.mean()
median_turnover = turnover.median()
print(f'Average monthly turnover : {mean_turnover*100:.1f}%')
print(f'Median monthly turnover  : {median_turnover*100:.1f}%')

## 7. Performance Plots

In [None]:
def _as_percent(y, _pos):
    """Tick formatter: render a fraction as a whole-number percentage."""
    return f'{y*100:.0f}%'


fig, axes = plt.subplots(2, 2, figsize=(16, 10))
fig.suptitle(f'Factor Beta Signal: L/S Portfolio  (k={K}, window={WINDOW})', fontsize=14)
(ax_cum, ax_dd), (ax_sharpe, ax_turn) = axes

# --- 1. Cumulative returns: strategy and both legs on one panel ---
for curve, line_style in (
    (cum_strat, dict(color='navy',  lw=2,   label='L/S Strategy')),
    (cum_long,  dict(color='green', lw=1.5, label='Long leg',  alpha=0.8)),
    (cum_short, dict(color='red',   lw=1.5, label='Short leg (inv)', alpha=0.8)),
):
    curve.plot(ax=ax_cum, **line_style)
ax_cum.axhline(1, color='black', lw=0.8, ls='--')
ax_cum.set_title('Cumulative Returns')
ax_cum.set_ylabel('Wealth Index')
ax_cum.legend()

# --- 2. Strategy drawdown: distance of the wealth index from its running peak ---
rolling_max = cum_strat.cummax()
drawdown = (cum_strat - rolling_max) / rolling_max
drawdown.plot(ax=ax_dd, color='crimson', lw=1.5)
ax_dd.fill_between(drawdown.index, drawdown, 0, alpha=0.3, color='crimson')
ax_dd.set_title('Strategy Drawdown')
ax_dd.set_ylabel('Drawdown')
ax_dd.yaxis.set_major_formatter(plt.FuncFormatter(_as_percent))

# --- 3. Rolling Sharpe (36-period window), annualised with sqrt(FREQ) ---
window_36 = strat_ret.rolling(36)
roll_sharpe = (window_36.mean() / window_36.std()) * np.sqrt(FREQ)
roll_sharpe.plot(ax=ax_sharpe, color='darkgreen', lw=1.5)
ax_sharpe.axhline(0, color='black', lw=0.8, ls='--')
ax_sharpe.set_title('Rolling 36-period Sharpe Ratio')
ax_sharpe.set_ylabel('Sharpe')

# --- 4. Turnover over time, restricted to periods with any trading ---
active_turnover = turnover[turnover > 0]
active_turnover.plot(ax=ax_turn, color='steelblue', lw=1, alpha=0.7)
active_turnover.rolling(12).mean().plot(ax=ax_turn, color='navy', lw=2, label='12-period MA')
ax_turn.set_title('One-Way Portfolio Turnover')
ax_turn.set_ylabel('Fraction of portfolio')
ax_turn.legend()

# Year-only tick labels on every panel; applied after plotting so the
# formatter installed by the plotting calls is overridden.
for panel in axes.flat:
    panel.xaxis.set_major_formatter(mdates.DateFormatter('%Y'))

plt.tight_layout()
plt.show()

## 8. Sensitivity Analysis: Decile Threshold `k` and Rolling Window `W`

Sweep over `k` in {0.1, 0.2, 0.3, 0.5} and `window` in {36, 60, 120}.

In [None]:
k_vals      = [0.1, 0.2, 0.3, 0.5]
window_vals = [36, 60, 120]

results = []

for win in window_vals:
    # Betas depend only on the window, so estimate them once per window.
    betas_w = compute_rolling_betas(returns, F, window=win)
    for frac in k_vals:
        wts = compute_signal(betas_w, k=frac)[0]
        s_ret = compute_portfolio_returns(wts, returns)[0]
        if len(s_ret) < 12:
            # Too few live periods for annualised statistics.
            continue

        years = len(s_ret) / FREQ
        ann_r = (1 + s_ret).prod() ** (1 / years) - 1   # geometric annual return
        ann_v = s_ret.std() * np.sqrt(FREQ)

        # Max drawdown of the wealth index relative to its running peak.
        wealth = (1 + s_ret).cumprod()
        peak = wealth.cummax()

        results.append({
            'window': win,
            'k': frac,
            'ann_ret': ann_r,
            'ann_vol': ann_v,
            'sharpe': ann_r / ann_v if ann_v > 0 else np.nan,
            'max_dd': ((wealth - peak) / peak).min(),
        })

# Tabulate and convert the metric columns to display strings.
results_df = pd.DataFrame(results)
for col, fmt in (
    ('ann_ret', '{:.2%}'),
    ('ann_vol', '{:.2%}'),
    ('sharpe',  '{:.3f}'),
    ('max_dd',  '{:.2%}'),
):
    results_df[col] = results_df[col].map(fmt.format)
print(results_df.to_string(index=False))

### Sharpe heatmap: window × k

In [None]:
import seaborn as sns

# The `results` list built by the sensitivity sweep still holds the raw
# numeric metrics (only `results_df` was converted to display strings), so the
# Sharpe values are taken from it instead of re-running every backtest.
results_num = [
    dict(window=row['window'], k=row['k'], sharpe=row['sharpe'])
    for row in results
]

# Rows: rolling window; columns: leg fraction k; cells: Sharpe ratio.
pivot = pd.DataFrame(results_num).pivot(index='window', columns='k', values='sharpe')

fig, ax = plt.subplots(figsize=(7, 4))
# Diverging colour map centred on 0 so negative Sharpe ratios stand out.
sns.heatmap(pivot, annot=True, fmt='.3f', cmap='RdYlGn', center=0, ax=ax,
            linewidths=0.5, linecolor='grey')
ax.set_title('Sharpe Ratio: Rolling Window vs Decile Threshold k')
ax.set_xlabel('k (fraction in each leg)')
ax.set_ylabel('Rolling window')
plt.tight_layout()
plt.show()