*Imports*

In [None]:
""" 
pip install pandas
pip install numpy
pip install matplotlib
pip install plotly
"""

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import skew, kurtosis
from datetime import date
import plotly.graph_objects as go
from collections import defaultdict

### *Data*

In [23]:
# Yields/Price/Duration
df = pd.read_csv('CRSP_UST.csv')

### *Pre-processing*

In [25]:
# Filter
df = df[df['TIDXFAM'] == 'FIXEDTERM'][['CALDT','TTERMLBL','TDNOMPRC','TDYTM','TDDURATN']]

# Multi Index
df = (df
 .sort_values(['CALDT','TTERMLBL'],ascending=True)
 .pivot(index = 'CALDT',columns='TTERMLBL',values = ['TDNOMPRC','TDYTM','TDDURATN'])
 )

new_column_names = {
    'CRSP Fixed Term Index - 1-Year (Nominal)': '1yr', 'CRSP Fixed Term Index - 2-Year (Nominal)': '2yr',
    'CRSP Fixed Term Index - 5-Year (Nominal)': '5yr', 'CRSP Fixed Term Index - 7-Year (Nominal)': '7yr',
    'CRSP Fixed Term Index - 10-Year (Nominal)': '10yr', 'CRSP Fixed Term Index - 20-Year (Nominal)': '20yr',
    'CRSP Fixed Term Index - 30-Year (Nominal)': '30yr'
}

# Rename columns
df.columns = pd.MultiIndex.from_tuples(
    [(level_0, new_column_names.get(level_1, level_1)) for level_0, level_1 in df.columns],
    names=df.columns.names
)

# Reorder columns
column_order = ['1yr', '2yr', '5yr', '7yr', '10yr', '20yr', '30yr']
ordered_columns = pd.MultiIndex.from_tuples(
    [(level_0, col) for level_0 in df.columns.get_level_values(0).unique() for col in column_order],
    names=df.columns.names
)

df = df[ordered_columns]

## Strategy Backtest

###  *Backtest code*

In [27]:
class Position:

    def __init__(self, entry_date, bond, entry_price, quantity,
                 direction, duration, trade_id, maturities,is_hedge = False):
        
        self.entry_date = entry_date
        self.bond = bond
        self.entry_price = entry_price  
        self.quantity = quantity 
        self.direction = direction 
        self.duration = duration  
        self.trade_id = trade_id  
        self.maturities = maturities  # List of maturities associated with this trade
        self.is_hedge = is_hedge
        self.closed = False
        self.exit_date = None
        self.exit_price = None

    def close(self,exit_date,exit_price):
        """ 
        Closes a position
        """
        
        self.exit_date = exit_date
        self.exit_price = exit_price
        self.closed = True
    
    def __str__(self):
        return ''

In [28]:
class YCA:
    def __init__(self, data, start_date, end_date, window_length, z_enter, z_exit, universe, txc=0.0005,hedge_threshold=0.5):
        """
        Initialize the strategy.

        Parameters:
        - data (pd.DataFrame): Market data with yields, prices, and durations.
        - start_date, end_date (datetime): Strategy start and end dates.
        - window_length (int): Look-back window for signal generation.
        - z_enter, z_exit (float): Z-score thresholds for entry and exit.
        - universe (list of tuples): List of traded tuples (low, mid, high) for maturities.
        - txc (float): Transaction cost percentage.
        - hedge_threshold (float): Maximum allowable portfolio duration 


        TODO:
            - Additional duration hedge trades not implemented
            - Currently uses fixed amount per trade, should be changed. 
            - Currently does not include transaction costs in trades
        """
        self.data = data
        self.start_date = start_date
        self.end_date = end_date
        self.window_length = window_length
        self.z_enter = z_enter
        self.z_exit = z_exit
        self.universe = universe
        self.txc = txc
        self.hedge_threshold = hedge_threshold

        # State tracking
        self.signals = defaultdict(list)
        self.positions = []
        self.daily_returns = pd.Series(dtype='float64')
        self.daily_duration = pd.Series(dtype='float64')
        self.daily_dollar_exposure = pd.Series(dtype='float64')

    def run(self):
        """Run the strategy over the specified date range."""

        for idx in range(self.window_length, len(self.data)):

            # 1. Generate Signals
            self._generate_signals(idx)

            # 2. Close Positions
            self._close_on_reversion(idx)

            # 3. Construct Portfolio
            self._construct_portfolio(idx)

            # 4. Hedge Decision (Not yet completed)
            # if self._needs_duration_hedge(idx):
            #    self._hedge_duration(idx)

            # 5. Compute + Store Statistics
            self._log_daily_returns(idx)
            self._log_daily_duration(idx)
            self._log_daily_dollar_exposure(idx)

    def _generate_signals(self, idx):
        """Generate entry signals based on z-scores."""

        dt = self.data.index[idx]
        for maturities in self.universe:
            low, mid, high = maturities
            
            # Observations for the current maturities
            x1 = self.data.loc[(dt, 'TDYTM')][low]
            x2 = self.data.loc[(dt, 'TDYTM')][mid]
            x3 = self.data.loc[(dt, 'TDYTM')][high]

            # Past window values
            y1 = self.data.iloc[idx - self.window_length: idx, :]['TDYTM'][low].T.values
            y2 = self.data.iloc[idx - self.window_length: idx, :]['TDYTM'][mid].T.values
            y3 = self.data.iloc[idx - self.window_length: idx, :]['TDYTM'][high].T.values

            # Calculate observed
            observed = x1 - x2 + x3

            # Calculate z-score
            z = self._calculate_zscore(y1, y2, y3, observed)

            # Store the signal if it exceeds the threshold
            if abs(z) > self.z_enter:
                self.signals[dt].append([z, maturities])

    def _calculate_zscore(self,y1,y2,y3,obs):
        """
        Returns z-score for a given observation
        """

        spread = y1 - y2 + y3
        mu = spread.mean()
        sigma = spread.std()
        return (obs - mu) / sigma
    
    def _close_on_reversion(self, idx):
        """Close positions if z-score reverts below exit threshold."""
        dt = self.data.index[idx]
        
        # Track the trade_ids that are associated with mean reversion
        trades_to_close = set()
        
        for pos in self.positions:
            if not pos.closed and not pos.is_hedge:

                # Get associate maturities
                low, mid, high = pos.maturities
                
                # Current
                x1 = self.data.loc[(dt, 'TDYTM')][low]
                x2 = self.data.loc[(dt, 'TDYTM')][mid]
                x3 = self.data.loc[(dt, 'TDYTM')][high]
                
                # Window
                y1 = self.data.iloc[idx - self.window_length: idx, :]['TDYTM'][low].T.values
                y2 = self.data.iloc[idx - self.window_length: idx, :]['TDYTM'][mid].T.values
                y3 = self.data.iloc[idx - self.window_length: idx, :]['TDYTM'][high].T.values
                
                observed = x1 - x2 + x3
                zscore = self._calculate_zscore(y1, y2, y3, observed)

                if abs(zscore) < self.z_exit:  
                    trades_to_close.add(pos.trade_id)
    
        # Close required positions
        for pos in self.positions:
            if pos.trade_id in trades_to_close and not pos.closed:
                exit_price = self.data.loc[(dt, 'TDNOMPRC')][pos.bond]
                pos.close(exit_date = dt,exit_price = exit_price)

    def _construct_portfolio(self, idx):
        """Open new positions based on signals.
        
        TODO: 
            - Not sure if we should open fixed amount each time?
        """
        dt = self.data.index[idx] 

        # If no signal for the current date, skip
        if dt not in self.signals:
            return  

        for zscore, maturities in self.signals[dt]:
            low, mid, high = maturities 
            trade_id = f"trade_{dt}_{low}_{mid}_{high}"

            # Get prices for the bonds
            p_s = self.data.loc[(dt, 'TDNOMPRC')][low]
            p_m = self.data.loc[(dt, 'TDNOMPRC')][mid]
            p_l = self.data.loc[(dt, 'TDNOMPRC')][high]

            # Get durations for the bonds
            d_s = self.data.loc[(dt, 'TDDURATN')][low]
            d_m = self.data.loc[(dt, 'TDDURATN')][mid]
            d_l = self.data.loc[(dt, 'TDDURATN')][high]

            # Amount allocated to middle bond
            q_m = 100 

            # Solve for quantities based on dollar-duration neutrality
            q_s, q_l = self._solve_dollar_duration_neutral(p_s, p_m, p_l, d_s, d_m, d_l, q_m)

            # Place Orders
            if zscore < 0:  # Yield Curve Butterfly Trough [Short Body, Long Wings]
                self._place_order(dt, low, p_s, int(q_s), 'long', d_s, trade_id, maturities,is_hedge=False)
                self._place_order(dt, mid, p_m, int(q_m), 'short', d_m, trade_id, maturities, is_hedge=False)
                self._place_order(dt, high, p_l, int(q_l), 'long', d_l, trade_id, maturities, is_hedge=False)

            if zscore > 0:  # Yield Curve Butterfly Hump [Long Body, Short Wings]
                self._place_order(dt, low, p_s, int(q_s), 'short', d_s, trade_id, maturities, is_hedge=False)
                self._place_order(dt, mid, p_m, int(q_m), 'long', d_m, trade_id, maturities, is_hedge=False)
                self._place_order(dt, high, p_l, int(q_l), 'short', d_l, trade_id, maturities, is_hedge=False)
    
    def _solve_dollar_duration_neutral(self,P_s, P_m, P_l, D_s, D_m, D_l, q_m):

        """ 
        Solves the dollar-duration neutral required positions given inputs.

        A [q_s,q_l]       = b  
        p_s*q_s + p_l*q_l = q_m*p_m
        d_s*q_s + d_l*q_l = q_m*d_m
        """

        A = np.array([
            [P_s, P_l],
            [D_s, D_l]
        ])
        
        b = np.array([
            q_m * P_m,
            q_m * D_m
        ])
        
        # Solve
        q_s, q_l = np.linalg.solve(A, b)
        
        return q_s, q_l
    

    def _calculate_portfolio_duration(self, idx):
        """Calculate net portfolio duration in days."""
        total_market_value = 0
        weighted_duration = 0

        for position in self.positions:
            if not position.closed:  # Only open positions
        
                current_price = self.data.loc[self.data.index[idx], ('TDNOMPRC', position.bond)]
                market_value = current_price * position.quantity
                
                # Adjust for short positions
                effective_duration = position.duration * position.quantity
                if position.direction == 'short':
                    effective_duration *= -1
                
                weighted_duration += effective_duration * current_price
                total_market_value += market_value

        # No Positions
        if total_market_value == 0:
            return 0  
        else:
            daily_duration = weighted_duration / total_market_value
            return daily_duration

    def _needs_duration_hedge(self,idx):
        """Returns boolean value if duration hedge is required"""

        duration = self._calculate_portfolio_duration(idx)
        return abs(duration) > self.hedge_threshold

    def _hedge_duration(self, idx):
        """
        Hedge duration risk using the best available bond.
        
        TODO: 
            - Not implemented.
        """
        pass


    def _place_order(self, dt, bond, price, qty, direction, duration,trade_id, maturities, is_hedge=False):
        """Helper to create and store a new position."""
        pos = Position(
            entry_date=dt, bond=bond, entry_price=price, duration=duration,
            quantity=qty, direction=direction, trade_id=trade_id,
            maturities=maturities, is_hedge=is_hedge
        )
        self.positions.append(pos)

    def _log_daily_returns(self, idx):
        """Log the daily portfolio returns."""
        dt = self.data.index[idx]
        total_value = 0
        weighted_daily_return = 0

        for pos in self.positions:
            if not pos.closed:  # Returns only on open positions
                
                # Get last and current prices
                last_price = self.data.iloc[idx - 1]['TDNOMPRC'][pos.bond]
                curr_price = self.data.iloc[idx]['TDNOMPRC'][pos.bond]

                # Direction adjustment for short positions
                if pos.direction == 'short':
                    d = -1
                else:
                    d = 1

                # Calculate the return for this position
                position_value_last = last_price * pos.quantity
                position_value_current = curr_price * pos.quantity
                position_return = (position_value_current / position_value_last - 1) * d

                total_value += position_value_last
                weighted_daily_return += position_value_last * position_return
        
        # Final weighted daily return for the portfolio
        if total_value == 0:
            portfolio_return = 0
        else:
            portfolio_return = weighted_daily_return / total_value

        self.daily_returns[dt] = portfolio_return

    def _log_daily_duration(self, idx):
        """Log the daily portfolio duration."""

        dt = self.data.index[idx]
        duration = self._calculate_portfolio_duration(idx)
        self.daily_duration[dt] = duration

    def _log_daily_dollar_exposure(self,idx):
        """Log the daily portfolio dollar exposure"""
        total_market_value = 0
        dt = self.data.index[idx]
        for position in self.positions:
            if not position.closed:  # Only open positions
        
                current_price = self.data.loc[self.data.index[idx], ('TDNOMPRC', position.bond)]
                market_value = current_price * position.quantity
                
                if position.direction == 'short':
                    market_value *= -1
                
                total_market_value += market_value

        # No Positions
        self.daily_dollar_exposure[dt] = total_market_value

    def _get_clean_returns(self):

        """Cleans the daily returns """

        df = pd.DataFrame(self.daily_returns, columns=['daily_return'])
        df['cum_return'] = (1+df['daily_return']).cumprod()

        return df
    
    def _get_clean_duration(self):
        """Cleans the daily duration series"""
        return pd.DataFrame(self.daily_duration,columns=['duration'])
    
    def _get_clean_exposure(self):
        """Cleans the daily exposure series"""
        return pd.DataFrame(self.daily_dollar_exposure,columns=['Dollar Exposure'])

## *Results*

### 1. Inputs

In [35]:
# In-Sample Dates
start_date = date(1990,1,1)
end_date = date(2010,1,1)

# Esimtation Window Length for z-scores
window_length = 100

# z-score threshold
z_enter = 4
z_exit = 1

# Traded Maturities (Triplet of Low, Mid, High)
universe = [['5yr','7yr','10yr']]

# txc
txc = 0.0005

# Hedge Threshold
hedge_threshold = 0.5

# Filter for In-Sample Data
df.index = pd.to_datetime(df.index).date
df = df[(df.index <= end_date) & (df.index >= start_date)]

### 2. Run Strategy

In [36]:
strategy = YCA(data = df, 
               start_date = start_date, end_date = end_date,
               window_length= window_length, z_enter = z_enter,z_exit = z_exit,universe = universe,
               txc = txc, hedge_threshold = hedge_threshold)
strategy.run()

### 3. Plots

In [37]:
returns = strategy._get_clean_returns()
d = strategy._get_clean_duration()
exp = strategy._get_clean_exposure()

*Cumulative Return*

In [38]:
fig = go.Figure()

fig.add_trace(
    go.Scatter(
        x = returns.index,
        y = returns['cum_return'],
    )
)

fig.update_layout(title = 'Cumulative Returns (In Sample)')
fig.update_xaxes(title= 'Date')
fig.update_yaxes(title = 'Cumulative Return')
fig.show()

*Daily Returns*

In [10]:
fig = go.Figure()

fig.add_trace(
    go.Scatter(
        x = returns.index,
        y = returns['daily_return'],
    )
)

fig.update_layout(title = 'Daily Returns (In Sample)')
fig.update_xaxes(title= 'Date')
fig.update_yaxes(title = 'Return')
fig.show()

*Hedge Effectiveness (Duration)*

In [34]:
fig = go.Figure()

fig.add_trace(
    go.Scatter(
        x = d.index,
        y = d['duration'] / 365,
    )
)

fig.update_layout(title = 'Duration Exposure (In Sample)')
fig.update_xaxes(title= 'Date')
fig.update_yaxes(title = 'Duration (Years)')
fig.show()

*Hedge Effectiveness (Dollar Exposure)*

In [12]:
fig = go.Figure()

fig.add_trace(
    go.Scatter(
        x = exp.index,
        y = exp['Dollar Exposure'],
    )
)

fig.update_layout(title = 'Dollar Exposure (In Sample)')
fig.update_xaxes(title= 'Date')
fig.update_yaxes(title = 'Dollars')
fig.show()

### 4. Statistics

**Sharpe Ratio**

In [13]:
print(f'Annualize Sharpe Ratio {returns['daily_return'].mean()/returns['daily_return'].std() * np.sqrt(252)}')

Annualize Sharpe Ratio 0.33152970081696076


**Return Statistics**

In [14]:
print(f'Mean Returns: {returns['daily_return'].mean()*100*252}')
print(f'Standard Deviation of Returns: {returns['daily_return'].std()*100*np.sqrt(252)}')
print(f'Return Skewness {skew(returns['daily_return'])}') # low sample points
print(f'Return Kurtosis {kurtosis(returns['daily_return'])}') # low sample points

Mean Returns: 1.5204916270536013
Standard Deviation of Returns: 4.586290830977683
Return Skewness 53.158655370226576
Return Kurtosis 2985.4334744894654


**Drawdown**

In [15]:
running_max = returns['cum_return'].cummax()
drawdown = (returns['cum_return'] - running_max) / running_max  
max_drawdown = drawdown.min() 
print(f'Maximum Drawdown: {max_drawdown*100}')

Maximum Drawdown: -1.127235970036967


**Sortino Ratio**

In [16]:
mu = returns['daily_return'].mean()
dsd = returns[returns['daily_return'] <= 0]['daily_return'].std()
print(f'Annualized Sortino Ratio {mu/dsd * np.sqrt(252)}') # not many negative returns

Annualized Sortino Ratio 7.099562952216958


**Long vs Short Returns**

In [17]:
long = []
short = []
for p in strategy.positions:
    if not p.is_hedge:
        
        if p.direction == 'long':
            pos_ret = (p.exit_price / p.entry_price - 1)*100
            long.append(pos_ret)
        else:
            pos_ret = -(p.exit_price / p.entry_price - 1)*100
            short.append(pos_ret)
print(f'Average Long Return: {np.mean(long)}')
print(f'Average Short Return: {np.mean(short)}')

Average Long Return: 4.962769913999542
Average Short Return: 4.876228786472715


**Number of Trades Placed**


In [18]:
uid = set()
for p in strategy.positions:
    if not p.is_hedge:
        uid.add(p.trade_id)
print(f'Number of Positions Taken: {len(uid)}')

Number of Positions Taken: 3


**Average Time in Market**

In [19]:
market_days = set()
for p in strategy.positions:
    if not p.is_hedge:
        market_days.add((p.exit_date - p.entry_date).days)
print(f'Average Time in Market per Trade {round(np.mean(list(market_days)),2)}')

Average Time in Market per Trade 49.67


### **Extra: Strategy Z-scores**

In [39]:
spread = df['TDYTM']['10yr'] + df['TDYTM']['30yr'] - df['TDYTM']['20yr']

# Calculate rolling mean and rolling standard deviation for a 100-day window
rolling_mean = spread.rolling(window=100).mean()
rolling_std = spread.rolling(window=100).std()

# Compute the z-score for each point
z_scores = (spread - rolling_mean) / rolling_std
z_scores = z_scores.dropna()

z_scores = pd.DataFrame(z_scores,columns=['z'])

fig = go.Figure()

fig.add_trace(
    go.Scatter(
        x = z_scores.index,
        y = z_scores['z']
    )
)

fig.update_yaxes(title = 'z-score')
fig.update_xaxes(title = 'Date')
fig.show()