In [1]:
import pandas as pd
import numpy as np
import datetime
import holidays
import seaborn as sns

import matplotlib.pyplot as plt
import matplotlib.dates as mdates

from sklearn.linear_model import LinearRegression

from scipy.optimize import minimize
from scipy import interpolate
from scipy.optimize import fsolve

from pandas.tseries.holiday import USFederalHolidayCalendar
from pandas.tseries.offsets import CustomBusinessDay
# The code is written with the help of OpenAI's ChatGPT.com

In [None]:
# Define a function to calculate interest rates from discount factors
def discount_to_intrate(discount, maturity, n_compound=None):

    # If the compounding frequency is not provided, assume continuous compounding
    if n_compound is None:
        # Calculate the continuously compounded interest rate
        intrate = - np.log(discount) / maturity
    else:
        # Calculate the interest rate with discrete compounding
        intrate = n_compound * (1 / discount**(1 / (n_compound * maturity)) - 1)
    
    # Return the computed interest rate
    return intrate

# Convert the Interest/Discount Rate to Discount Factor
def intrate_to_discount(intrate, maturity, n_compound=None):
    # Converts interest rates into discount factors for time value of money calculations.
    # If compounding frequency is not specified, continuous compounding is assumed.
    
    if n_compound is None:
        # Compute the discount factor using the continuous compounding formula:
        # discount = e^(-rate * maturity)
        discount = np.exp(-intrate * maturity)
    else:
        # Compute the discount factor using periodic compounding:
        # discount = 1 / (1 + rate/n_compound)^(n_compound * maturity)
        discount = 1 / (1 + (intrate / n_compound))**(n_compound * maturity)
    
    return discount  # Return the computed discount factor.

def get_coupon_dates(quote_date, maturity_date):
    # Check if quote_date is a string; if yes, convert it to a datetime object
    if isinstance(quote_date, str):
        quote_date = datetime.datetime.strptime(quote_date, '%Y-%m-%d')
        
    # Check if maturity_date is a string; if yes, convert it to a datetime object
    if isinstance(maturity_date, str):
        maturity_date = datetime.datetime.strptime(maturity_date, '%Y-%m-%d')
    
    # Calculate the number of semi-annual periods between quote_date and maturity_date
    # Use ceil to ensure we cover the full period (even if partial)
    num_periods = int(np.ceil((maturity_date - quote_date).days / 180))  # Convert to int

    # Generate semi-annual dates going backward from the maturity date
    temp = pd.date_range(
        end=maturity_date, 
        periods=num_periods, 
        freq=pd.DateOffset(months=6)
    )
    
    # Keep only the dates that are after the quote date
    temp = pd.DataFrame(data=temp[temp > quote_date])
    
    # Extract the first column of the DataFrame and return it as the result
    out = temp[0]
    return out


def filter_treasury_cashflows(CF, filter_maturity_dates=False, filter_benchmark_dates=False, filter_CF_strict=True):
    # Initialize an empty list to store the benchmark dates
    mask_benchmark_dts = []
    
    # Loop through each column (date) in the cash flow DataFrame
    for col in CF.columns:
        # If filtering by benchmark dates is enabled
        if filter_benchmark_dates:
            # Check if the date is one of the benchmark dates (Feb 15, May 15, Aug 15, Nov 15)
            if col.month in [2, 5, 8, 11] and col.day == 15:
                mask_benchmark_dts.append(col)  # Add it to the list of benchmark dates
        else:
            # If not filtering by benchmark dates, include all dates
            mask_benchmark_dts.append(col)

    # If filtering by maturity dates is enabled
    if filter_maturity_dates:
        # Select columns where at least one cash flow >= 100 (indicating maturity)
        mask_maturity_dts = CF.columns[(CF >= 100).any()]
    else:
        # If not filtering by maturity dates, include all columns
        mask_maturity_dts = CF.columns

    # Combine the two filters: Keep only dates present in both lists
    mask = [i for i in mask_benchmark_dts if i in mask_maturity_dts]

    # Filter the cash flow DataFrame to include only the selected dates
    CF_filtered = CF[mask]
          
    if filter_CF_strict:
        # Strict filtering: Keep only rows where cash flows on included dates match the original cash flows
        mask_bnds = CF_filtered.sum(axis=1) == CF.sum(axis=1)
        CF_filtered = CF_filtered[mask_bnds]
    else:
        # Lenient filtering: Keep rows with at least one non-zero cash flow on included dates
        mask_bnds = CF_filtered.sum(axis=1) > 0
        CF_filtered = CF_filtered[mask_bnds]
        
    # Remove columns (dates) where all cash flows are zero
    CF_filtered = CF_filtered.loc[:, (CF_filtered > 0).any()]
    
    # Return the filtered cash flow DataFrame
    return CF_filtered


def filter_treasuries(data, t_date=None, filter_maturity = None, filter_maturity_min=None, drop_duplicate_maturities = False, filter_tips=True, filter_yld=True):
    outdata = data.copy()
    
    if t_date is None:
        t_date = outdata['quote date'].values[-1]
    
    outdata = outdata[outdata['quote date']==t_date]
    
    # Filter out redundant maturity
    if drop_duplicate_maturities:
        outdata = outdata.drop_duplicates(subset=['maturity date'])
    
    # Filter by max maturity
    if filter_maturity is not None:
        mask_truncate = outdata['maturity date'] < (t_date + np.timedelta64(365*filter_maturity+1,'D'))
        outdata = outdata[mask_truncate]

    # Filter by min maturity
    if filter_maturity_min is not None:
        mask_truncate = outdata['maturity date'] > (t_date + np.timedelta64(365*filter_maturity_min-1,'D'))
        outdata = outdata[mask_truncate]

    outdata = outdata[outdata['type'].isin([11,12]) == (not filter_tips)]
        
    if filter_yld:
        outdata = outdata[outdata['ytm']>0]
        
    return outdata


def calc_cashflows(quote_data, filter_maturity_dates=False):
    # Create an empty DataFrame to store cash flows
    # Rows represent individual securities (indexed by quote_data.index)
    # Columns represent unique maturity dates from the data
    CF = pd.DataFrame(data=0.0, index=quote_data.index, columns=quote_data['maturity date'].unique())

    # Iterate over each security in the dataset
    for i in quote_data.index:
        # Get the semi-annual coupon payment dates for the security
        coupon_dates = get_coupon_dates(quote_data.loc[i, 'quote date'], quote_data.loc[i, 'maturity date'])

        # If coupon dates exist (not None), set coupon payment amounts
        if coupon_dates is not None:
            # Add semi-annual coupon payments (coupon rate divided by 2)
            CF.loc[i, coupon_dates] = quote_data.loc[i, 'cpn rate'] / 2

        # Add the face value (100) at maturity
        CF.loc[i, quote_data.loc[i, 'maturity date']] += 100

    # Replace any NaN values with 0 and sort columns by date
    CF = CF.fillna(0).sort_index(axis=1)
    
    # Remove columns (dates) where all cash flows are zero
    CF.drop(columns=CF.columns[(CF == 0).all()], inplace=True)

    # If filtering by maturity dates is requested, apply the filter function
    if filter_maturity_dates:
        CF = filter_treasury_cashflows(CF, filter_maturity_dates=True)
        
    # Return the final cash flow DataFrame
    return CF


def get_maturity_delta(t_maturity, t_current):
    # Calculates the time-to-maturity in years based on the difference between maturity date and current date.
    # Time is expressed as a fraction of a year (365.25 days to account for leap years).
    
    maturity_delta = (t_maturity - t_current) / pd.Timedelta('365.25 days')  # Convert timedelta to years.
    
    return maturity_delta  # Return the time-to-maturity in years.

def bootstrap(params, maturity):
    # Extract estimated maturities from the first element of params
    estimated_maturities = params[0]
    # Extract corresponding discount factors (betas) from the second element of params
    betas = params[1]
    # Convert discount factors to interest rates using the discount_to_intrate function
    estimated_rates = discount_to_intrate(betas, estimated_maturities)
    
    # Create an interpolation function for the rates based on maturities
    f = interpolate.interp1d(estimated_maturities, estimated_rates, bounds_error=False, fill_value='extrapolate')
    
    # Use the interpolation function to estimate the rate at the given maturity
    rate = f(maturity)
    
    # Return the interpolated rate
    return rate

def bootstrap_semiannual(params, maturity):
    # Extract estimated maturities from the first element of params
    estimated_maturities = params[0]
    # Extract corresponding discount factors (betas) from the second element of params
    betas = params[1]
    # Convert discount factors to interest rates using the discount_to_intrate function
    estimated_rates = discount_to_intrate(betas, estimated_maturities, 2)
    
    # Create an interpolation function for the rates based on maturities
    f = interpolate.interp1d(estimated_maturities, estimated_rates, bounds_error=False, fill_value='extrapolate')
    
    # Use the interpolation function to estimate the rate at the given maturity
    rate = f(maturity)
    
    # Return the interpolated rate
    return rate

def price_with_rate_model(params, CF, t_current, fun_model, convert_to_discount=True, price_coupons=False):
    # Prices future cash flows using an interest rate model and optionally converts rates to discount factors.
    
    # Compute the time-to-maturity for all cash flow dates.
    maturity = get_maturity_delta(CF.columns, t_current)
    
    if convert_to_discount:
        # Initialize an array to store discount factors.
        disc = np.zeros(maturity.shape)
        for i, mat in enumerate(maturity):
            # Convert rates (from model) to discount factors using the specified model.
            disc[i] = intrate_to_discount(fun_model(params, mat), mat)
    else:
        # If no discounting is required, use rates directly from the model.
        disc = fun_model(params, maturity)
        
    if price_coupons:
        price = CF * disc
    else:
        # price = cash flow matrix multiply discount factors
        price = CF @ disc
    
    return price

def pricing_errors(params, CF, t_current, fun_model, observed_prices):
    # Computes the difference (error) between observed market prices and model-derived prices.
    # The objective is to minimize this error during model calibration.

    price_modeled = price_with_rate_model(params, CF, t_current, fun_model)  # Get model-derived prices.
    
    if isinstance(observed_prices, (pd.DataFrame, pd.Series)):
        # Convert observed prices to NumPy array if they are in Pandas format.
        observed_prices = observed_prices.values
    
    error = sum((observed_prices - price_modeled)**2)  # Compute the sum of squared differences.
    
    return error  # Return the sum of squared errors.

def nelson_siegel(params, maturity):
    # Nelson-Siegel model is used to fit yield curves with a combination of level, slope, and curvature components.
    # params: [beta_0, beta_1, beta_2, tau], where beta's control curve shape and tau is the decay factor.
    
    rate = (
        params[0] +  # Level component (long-term interest rate).
        (params[1] + params[2]) *  # Combined slope and curvature adjustment.
        (1 - np.exp(-maturity / params[3])) / (maturity / params[3]) -  # Weighted adjustment factor.
        params[2] * np.exp(-maturity / params[3])  # Curvature component decay.
    )
    return rate  # Return the modeled interest rate for the given maturity.

def nelson_siegel_extended(params, maturity):
    # Implements the extended Nelson-Siegel model for yield curve fitting.
    # params: A list of parameters [beta_0, beta_1, beta_2, tau_1, beta_3, tau_2]
    # - beta_0: Level parameter (long-term rate).
    # - beta_1: Slope parameter (short-term rate adjustment).
    # - beta_2: Curvature parameter (medium-term adjustment).
    # - tau_1: Decay factor for the slope and curvature components.
    # - beta_3: Second curvature parameter for additional flexibility.
    # - tau_2: Decay factor for the second curvature component.
    # maturity: Array or scalar of times to maturity (in years).

    rate = (
        params[0] +  # Long-term level component (constant for all maturities).
        (params[1] + params[2]) *  # Combined slope and curvature adjustment.
        (1 - np.exp(-maturity / params[3])) / (maturity / params[3]) -  # Slope/curvature weighted adjustment.
        params[2] * np.exp(-maturity / params[3]) +  # Exponential decay of the first curvature.
        params[4] * (  # Second curvature adjustment with separate parameters.
            (1 - np.exp(-maturity / params[5])) / (maturity / params[5]) -  # Weighted adjustment for second curvature.
            np.exp(-maturity / params[5])  # Exponential decay of the second curvature.
        )
    )
    
    return rate  # Returns the modeled interest rate for the given maturity.

def estimate_curve_ols(CF, prices, interpolate=False):
    # Estimates discount factors using linear regression (ordinary least squares).
    
    if isinstance(prices, (pd.DataFrame, pd.Series)):
        # Ensure the alignment between cash flows and observed prices.
        prices = prices[CF.index].values
    
    # Fit a regression model where cash flows explain observed prices (no intercept).
    mod = LinearRegression(fit_intercept=False).fit(CF.values, prices)
    
    if interpolate:
        # Interpolate discount factors to obtain a smooth curve.
        matgrid = get_maturity_delta(CF.columns, CF.columns.min())  # Maturity grid for interpolation.
        
        # Identify valid discount factors within a reasonable range.
        dts_valid = np.logical_and(mod.coef_ < 1.25, mod.coef_ > 0)
        
        xold = matgrid[dts_valid]  # Valid maturities.
        yold = mod.coef_[dts_valid]  # Valid discount factors.
        xnew = matgrid  # Full maturity grid.
        
        # Create an interpolation function to estimate discount factors for all maturities.
        f = interpolate.interp1d(xold, yold, bounds_error=False, fill_value='extrapolate')
        discounts = f(xnew)  # Compute interpolated discount factors.
    else:
        discounts = mod.coef_  # Use OLS coefficients directly as discount factors.
        
    return discounts  # Return discount factors (direct or interpolated).

def estimate_rate_curve(model, CF, t_current, prices, x0=None):
    # Estimates parameters for an interest rate curve model by minimizing pricing errors.
    
    if model is bootstrap:
        # Use bootstrap method to estimate discount factors first.
        params = estimate_curve_ols(CF, prices, interpolate=False)
        
        # Calculate maturities for each cash flow.
        CF_intervals = get_maturity_delta(CF.columns.to_series(), t_current=t_current).values
        
        # Combine maturities and corresponding discount factors as model parameters.
        params_optimized = [CF_intervals, params]
    else:
        # Provide default initial guess if none is specified.
        if x0 is None:
            if model is nelson_siegel:
                x0 = np.ones(4) / 10  # Default guess for Nelson-Siegel model parameters.
            elif model is nelson_siegel_extended:
                x0 = np.ones(6)  # Default guess for extended model.
            else:
                x0 = 1  # Generic default initial guess.
        
        # Optimize the model parameters to minimize pricing errors.
        mod = minimize(pricing_errors, x0, args=(CF, t_current, model, prices))
        params_optimized = mod.x  # Extract optimized parameters.
    
    return params_optimized  # Return the estimated model parameters.


def price_treasury_ytm(yield_to_maturity, coupon_rate, face_value, time_to_maturity, payments_per_year=2):
    """
    Prices a Treasury bond based on its yield to maturity, coupon rate, face value, and time to maturity.

    Parameters:
        yield_to_maturity (float): The annual yield to maturity (as a decimal, e.g., 0.03 for 3%).
        coupon_rate (float): The annual coupon rate (as a decimal, e.g., 0.02 for 2%).
        face_value (float): The face value (par value) of the bond.
        time_to_maturity (float): The time to maturity of the bond in years.
        payments_per_year (int): The number of coupon payments per year (default is 2 for semi-annual payments).

    Returns:
        float: The price of the Treasury bond.
    """
    # Calculate the coupon payment per period
    coupon_payment = coupon_rate * face_value / payments_per_year

    # Total number of periods
    total_periods = int(time_to_maturity * payments_per_year)

    # Periodic yield to maturity
    periodic_yield = yield_to_maturity / payments_per_year

    # Price calculation: Sum of discounted coupon payments + discounted face value
    price = 0

    for t in range(1, total_periods + 1):
        price += coupon_payment / (1 + periodic_yield) ** t

    # Add the present value of the face value (paid at maturity)
    price += face_value / (1 + periodic_yield) ** total_periods

    return price

# Duration for a fixed rate bond
# freq = frequency of compounding in a year
# tau = years-to-maturity

def duration_closed_formula(tau, ytm, coupon_rate=None, freq=2):

    if coupon_rate is None:
        coupon_rate = ytm
        
    # y_tilde = ytm/frequency
    y = ytm/freq
    # c_tilde = coupon rate/ frequency
    c = coupon_rate/freq
    # tau_tilde= tau * frequency
    T = tau * freq
        
    if coupon_rate==ytm:
        duration = (1+y)/y  * (1 - 1/(1+y)**T)
        
    else:
        duration = (1+y)/y - (1+y+T*(c-y)) / (c*((1+y)**T-1)+y)

    duration /= freq
    
    return duration

def bootstrap_spot_rates(df):
    """
    Bootstraps spot rates from a dataframe of bond information.
    
    :param df: Pandas DataFrame with columns 'dirty price', 'cpn rate', and 'ttm'
    :return: Pandas Series of spot rates indexed by TTM
    """
    # Ensure the DataFrame is sorted by TTM
    df = df.sort_values(by='ttm')
    
    # Initialize a dictionary to store spot rates
    spot_rates = {}

    # Iterate over each bond
    for index, row in df.iterrows():
        ttm, coupon_rate, price = row['ttm'], row['cpn rate'], row['dirty price']
        cash_flows = [coupon_rate / 2] * round(ttm * 2)  # Semi-annual coupons
        cash_flows[-1] += 100  # Add the face value to the last cash flow

        # Function to calculate the present value of cash flows
        def pv_of_cash_flows(spot_rate):
            pv = 0
            for t in range(1, len(cash_flows) + 1):
                if t/2 in spot_rates:
                    rate = spot_rates[t/2]
                else:
                    rate = spot_rate
                pv += cash_flows[t - 1] / ((1 + rate / 2) ** t)
            return pv

        # Solve for the spot rate that sets the present value of cash flows equal to the bond price
        spot_rate_guess = (cash_flows[-1] / price) ** (1/(ttm*2)) - 1
        spot_rate = fsolve(lambda r: pv_of_cash_flows(r) - price, x0=spot_rate_guess)[0]

        # Store the calculated spot rate
        spot_rates[ttm] = spot_rate

    return pd.Series(spot_rates)

def summary_statistics_annualized(returns, annual_factor = 12):
    summary_statistics = pd.DataFrame(index=returns.columns)
    summary_statistics['Mean'] = returns.mean() * annual_factor
    summary_statistics['Vol'] = returns.std() * np.sqrt(annual_factor)
    return summary_statistics

