#### SWAP UTILS

In [1]:
import os
import sys
os.chdir(os.path.dirname(sys.argv[0]))

In [2]:
import numpy as np
from scipy.optimize import minimize
from numbers import Real

#### TIME UTILS

In [3]:
# TODO: rewrite function to accomodate arrays
def get_npdatetime64(year: int, month: int, day: int) -> np.datetime64:
    """ Returns a np.datetime64 object associated with the input year, date and day."""
    return np.datetime64(f'{year}-{str(month).zfill(2)}-{str(day).zfill(2)}')

In [4]:
def get_weekday(date: np.datetime64) -> int:
    """ Returns the weekday of a np.datetime64 object."""
    return ((date.astype('datetime64[D]').view('int64') - 4) % 7).view('int64')

In [5]:
def get_day(date: np.datetime64) -> int:
    """ Returns the day of a np.datetime64 object."""
    return (date - date.astype('datetime64[M]') + 1).view('int64')

In [6]:
def get_month(date: np.datetime64) -> int:
    """ Returns the month of a np.datetime64 object."""
    return date.astype('datetime64[M]').astype(int) % 12 + 1

In [7]:
def get_year(date: np.datetime64) -> int:
    """ Returns the year of a np.datetime64 object."""
    return date.astype('datetime64[Y]').astype(int) + 1970

In [8]:
def is_eom(date: np.datetime64) -> bool:
    """ Asserts if a date is eom."""
    return get_month(date + np.timedelta64(1, '1D')) == get_month(date) + 1

In [9]:
def is_leap(year: int) -> bool:
    """Asserts if a year is a leap year."""
    return np.logical_and(year % 4 == 0, np.logical_or(year % 100 != 0, year % 400 == 0))

In [10]:
def build_schedule(effective: np.datetime64, maturity: np.datetime64, freq:int, roll:str='forward',holidays:list=[],
                   gen_backward:bool=False, eom:bool=True, first:np.datetime64=None, next_to_last:np.datetime64=None,
                   bump_maturity:bool=False) -> np.array:
    """Returns a payment schedule for fixed income instruments."""
    
    if gen_backward and next_to_last is None:
        next_to_last = maturity.astype('datetime64[M]') - np.timedelta64(freq, 'M')
        next_to_last = next_to_last.astype('datetime64[D]') + np.timedelta64(get_day(maturity) - 1, 'D')
        next_to_last = np.busday_offset(next_to_last, 0, roll=roll, holidays = holidays)
    
    elif gen_backward is False and first is None:
        first = effective.astype('datetime64[M]') + np.timedelta64(freq, 'M')
        first = first.astype('datetime64[D]') + np.timedelta64(get_day(effective) - 1, 'D')
        first = np.busday_offset(first, 0, roll, holidays = holidays)
    
    if gen_backward:
        start, end = next_to_last, effective
        flag = -1
    
    else:
        start, end = first, maturity
        flag = 1
    
    eom_adj = (is_eom(start)*eom)
    schedule = np.arange(start.astype('datetime64[M]'), end.astype('datetime64[M]'), np.timedelta64(flag*freq, 'M'))
    schedule = (schedule + np.timedelta64(1*eom_adj, 'M')).astype('datetime64[D]')
    schedule += np.timedelta64((1-eom_adj)*get_day(effective)-eom_adj-1, 'D')
    schedule = np.busday_offset(schedule, 0, roll=roll, holidays = holidays)
    if bump_maturity:
        schedule = np.append(schedule, [effective, np.busday_offset(maturity, 0, roll=roll, holidays = holidays)])
    else:
        schedule = np.append(schedule, [effective, maturity])
    return np.sort(schedule)

In [11]:
# this function is deprecated
def build_calendar(year: np.array):
    """Returns the beginning weekday of each month and the number of days per month. Each output
    row represents a year (ascending order)."""
    leap_array = is_leap(year)
    days_per_month = np.tile(np.array([31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 31, 31]), (len(year), 1))
    days_per_month += np.tile(np.array([0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]), (len(year), 1))*leap_array[:, np.newaxis]
    leap_adj = np.tile(np.array([0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]), (len(year), 1))*leap_array[:, np.newaxis]
    bom1900 = np.tile(np.array([0, 3, 3, 6, 1, 4, 6, 2, 5, 0, 3, 5]), (len(year), 1)) # Weekday of each month of the year 1900
    offset_array = (year - 1900) + np.sum(is_leap(np.arange(1900, year[0]-1))) + np.pad(leap_array[:-1], (1, 0), 'constant')
    weekdays_offset = np.ones((len(year), 12))*offset_array[:, np.newaxis]
    bom_year = (weekdays_offset+leap_adj+bom1900)%7
    return bom_year, days_per_month

In [12]:
def holiday_from_rule(year: np.array, holiday_rules:np.array) -> np.array:
    """Returns holidays based on rule of the form nth week day of month."""  
    nth = holiday_rules[:, 0]
    week_day = holiday_rules[:, 1]
    months = holiday_rules[:, 2]
    month_adj = np.where(nth>=0, months-1, months)
    dates_array = np.arange(get_npdatetime64(year[0], 1, 1), get_npdatetime64(year[-1]+1, 1, 1), np.timedelta64(1, 'M'), dtype='datetime64[M]')
    dates_array = dates_array.reshape(len(year), 12).astype('datetime64[D]')
    bom_year = get_weekday(dates_array)
    adj = (7 - bom_year[:, month_adj] + week_day)
    dates_array = dates_array[:, month_adj] + np.array(adj+ np.maximum(nth-1-1*(adj>6), 0)*7 +7*nth*np.where(nth>=0, False, True), dtype='timedelta64[D]')
    return np.asarray(dates_array).ravel()

In [13]:
def holiday_from_dates(year:np.array, holiday_dates:np.array, sat_roll:bool=True, sun_roll:bool=True) -> np.array:
    """Return holidays based on an fixed dates. The funbction allows to roll holidays that fall on the week end."""
    months = np.tile(holiday_dates[:, 0], (len(year), 1))
    days = np.tile(holiday_dates[:, 1], (len(year), 1))
    years = np.repeat(year[:,np.newaxis], len(holiday_dates), 1)
    to_return = years.astype(str).astype('datetime64[Y]').astype('datetime64[M]')
    to_return += (months-1).astype('timedelta64[M]')
    to_return = to_return.astype('datetime64[D]')
    to_return += (days-1).astype('timedelta64[D]')
    week_day = get_weekday(to_return)
    if sat_roll:
        adj_sat = np.where(week_day==5, -1, 0)
        to_return += adj_sat.astype('timedelta64[D]')
    if sun_roll:
        adj_sun = np.where(week_day==6, +1, 0)
        to_return += adj_sun.astype('timedelta64[D]')
    return np.asarray(to_return).ravel()

In [14]:
def holidays_from_foo(year:np.array, holiday_foo:np.array, sat_roll:bool=True, sun_roll:bool=True) -> np.array:
    """Returns holidays based on holiday functions of the type foo(year) -> month, day."""
    to_return = np.array([foo(year) for foo in holiday_foo])
    week_day = get_weekday(to_return)
    if sat_roll:
        adj_sat = np.where(week_day==5, -1, 0)
        to_return += adj_sat.astype('timedelta64[D]')
    if sun_roll:
        adj_sun = np.where(week_day==6, +1, 0)
        to_return += adj_sun.astype('timedelta64[D]')
    return np.asarray(to_return).ravel()

In [15]:
def get_holidays(year:np.array, holiday_rules:np.array, holiday_dates:np.array, holiday_foo:np.array, sat_roll:bool=True, sun_roll:bool=True) -> np.array:
    """Returns an array of holidays."""
    to_return = np.concatenate([holiday_from_rule(year, holiday_rules), holiday_from_dates(year, holiday_dates, sat_roll, sun_roll),
                                holidays_from_foo(year, holiday_foo,  sat_roll, sun_roll)], dtype='datetime64[D]')
    return to_return

In [16]:
def easter(year: int):
    """Returns the month and day of easter of the given year."""
    a = year % 19
    b = year // 100
    c = year % 100
    d = (19 * a + b - b // 4 - ((b - (b + 8) // 25 + 1) // 3) + 15) % 30
    e = (32 + 2 * (b % 4) + 2 * (c // 4) - d - (c % 4)) % 7
    f = d + e - 7 * ((a + 11 * d + 22 * e) // 451) + 114
    month = f // 31
    day = f % 31 + 1
    return np.array([month, day]).T

In [17]:
def good_friday(year: np.array) -> np.array:
    """Returns the month and day of good friday of the given year."""
    myeaster = easter(year)
    months = myeaster[:, 0]
    days = myeaster[:, 1]
    to_return = year.astype(str).astype('datetime64[Y]').astype('datetime64[M]')
    to_return += (months-1).astype('timedelta64[M]')
    to_return = to_return.astype('datetime64[D]')
    to_return += (days-3).astype('timedelta64[D]')
    return np.asarray(to_return).ravel()

#### DAY COUNT

In [18]:
def act_360(start_date: np.datetime64, end_date: np.datetime64) -> float:
    """ACT 360 day count standard."""
    actual_days = (end_date - start_date).view('int64')
    return actual_days / 360.0

In [19]:
def act_365_fixed(start_date: np.datetime64, end_date: np.datetime64) -> float:
    """ ACT 365 FIXED day count standard."""
    actual_days = (end_date - start_date).view('int64')
    return actual_days / 365.0

In [20]:
def act_365_fixed(start_date: np.datetime64, end_date: np.datetime64) -> float:
    """ ACT 365 FIXED day count standard."""
    actual_days = (end_date - start_date).view('int64')
    return actual_days / 365.0

In [21]:
def thirty_e_360(start_date: np.datetime64, end_date: np.datetime64) -> float:
    """ THIRTY E 360 day count standard."""
    start_year = get_year(start_date)
    end_year = get_year(end_date)
    start_month = get_month(start_date)
    end_month = get_month(end_date)
    start_day = np.minimum(get_day(start_date), 30)
    end_day = np.minimum(get_day(end_date), 30)
    return (360.0*(end_year - start_year) + 30.0*(end_month - start_month) + (end_day - start_day)) / 360.0

In [22]:
# FIXME: function is not behaving properly with arrays 
def act_365_act(start_date: np.datetime64, end_date: np.datetime64) -> float:
    """ ACT 365 ACT day count standard."""
    actual_days = (end_date - start_date).view('int64')
    start_year = get_year(start_date)
    start_feb_eom = get_feb_eom(start_year)
    end_year = get_year(end_date)
    end_feb_eom = get_feb_eom(end_year)
    leap_flag = is_leap(start_year)*(start_date <= start_feb_eom) + is_leap(end_year)*(end_date >= end_feb_eom)
    return actual_days / np.maximum(365.0, 366.0*leap_flag)

In [23]:
# FIXME: function is not behaving properly with arrays 
def act_act_isda(start_date: np.datetime64, end_date: np.datetime64) -> float:
    """ THIRTY 360 ISDA day count standard."""
    start_year = get_year(start_date)
    end_year = get_year(end_date)
    if start_year == end_year:
        actual_days = (end_date - start_date).astype(object).days
        return actual_days / np.maximum(365, 366 * is_leap(start_year))
    else:
        a = (start_date - get_npdatetime64(start_year, 1, 1)).astype(object).days
        a /= np.maximum(365, 366*is_leap(start_year))
        b = (get_npdatetime64(end_year, 1, 1)-end_date).astype(object).days
        b /= np.maximum(365, 366 * is_leap(end_year))
        return a + b + end_year - start_year - 1

#### CURVE INTERPOLATION

In [24]:
# Nelson Seigel Svensson
# TODO: this section needs some checking because from time to time the optimization doesn't converge 
# there are some relevan issues with the actual fit. possible issue might be due to its current inverted shape.

In [25]:
def nss_factors(t:np.array, tau:np.array):
    """Factor loadings for times t, excluding constants."""
    if isinstance(t, Real) and t <= 0:
        return 1, 0, 0
    elif isinstance(t, np.ndarray):
            zero_idx = t <= 0
            t[zero_idx] = np.finfo(float).eps
    exp_tt1 = np.exp(-t / tau[0])
    exp_tt2 = np.exp(-t / tau[1])
    factor1 = (1 - exp_tt1) / (t / tau[0])
    factor2 = factor1 - exp_tt1
    factor3 = (1 - exp_tt2) / (t / tau[1]) - exp_tt2
    if isinstance(t, np.ndarray):
        t[zero_idx] = 0
        factor1[zero_idx] = 1
        factor2[zero_idx] = 0
        factor3[zero_idx] = 0
    return factor1, factor2, factor3

In [26]:
def nss_factors_matrix(t:np.array, tau: np.array):
    """Factor loadings for time(s) T as matrix columns, including constant column (=1.0)."""
    factor1, factor2, factor3 = nss_factors(t, tau)
    constant =  (np.ones(t.size) if isinstance(t, np.ndarray) else 1.0)
    return np.stack([constant, factor1, factor2, factor3]).transpose()

In [27]:
def betas_nss_ols(tau:np.array, t:np.array, y:np.array):
    """Calculate the best-fitting beta-values given tau (= array of tau_1
    and tau_2) for time-value pairs t and y and return a corresponding
    Nelson Siegel Svensson curve parameters."""
    factors =  nss_factors_matrix(t, tau)
    res = np.linalg.lstsq(factors, y, rcond=None)
    return res[0]

In [28]:
def betas_nss_ols(tau:np.array, t:np.array, y:np.array):
    """Calculate the best-fitting beta-values given tau (= array of tau_1
    and tau_2) for time-value pairs t and y and return a corresponding
    Nelson Siegel Svensson curve parameters."""
    factors =  nss_factors_matrix(t, tau)
    res = np.linalg.lstsq(factors, y, rcond=None)
    return res[0]

In [29]:
def betas_nss_ols(tau:np.array, t:np.array, y:np.array):
    """Calculate the best-fitting beta-values given tau (= array of tau_1
    and tau_2) for time-value pairs t and y and return a corresponding
    Nelson Siegel Svensson curve parameters."""
    factors =  nss_factors_matrix(t, tau)
    res = np.linalg.lstsq(factors, y, rcond=None)
    return res[0]

In [30]:
def nss_interpolate(t:np.array, beta:np.array, tau:np.array) -> np.array:
    """Nelson Seigel Svensson interpolation method for supplied points."""
    factor1, factor2, factor3 = nss_factors(t, tau)
    return beta[0] + beta[1]*factor1 + beta[2]*factor2+beta[3]*factor3             

In [31]:
def nss_interpolate(t:np.array, beta:np.array, tau:np.array) -> np.array:
    """Nelson Seigel Svensson interpolation method for supplied points."""
    factor1, factor2, factor3 = nss_factors(t, tau)
    return beta[0] + beta[1]*factor1 + beta[2]*factor2+beta[3]*factor3             

In [32]:
def errorfn_nss_ols(tau: np.array, t: np.ndarray, y: np.ndarray) -> float:
    """Sum of squares error function for a Nelson-Siegel-Svensson
    model and time-value pairs t and y. All betas are obtained
    by ordinary least squares given tau (= array of tau_1 and tau_2)."""
    beta = betas_nss_ols(tau, t, y)
    error = np.sum((nss_interpolate(t, beta, tau)-y)**2)
    return error 

In [33]:
def calibrate_nss_ols(t: np.ndarray, y: np.ndarray, tau_0:np.array = np.array([2.0, 5.0])):
    """Calibrate a Nelson-Siegel-Svensson curve to time-value pairs t and y,
    by optimizing tau1 and tau2 and chosing all betas using ordinary 
    least squares. This method does not work well regarding the recovery
    of true parameters."""
    opt_res = minimize(errorfn_nss_ols, x0=tau_0, args=(t, y))
    assert opt_res.success
    betas = betas_nss_ols(opt_res.x, t, y)
    return betas, opt_res.x

In [34]:
# TODO: implement other interpolation meyhods. refer to bbg docs for ideas.

#### DISCOUNT CURVE UTILS

#### SWAP CURVE BOOTSTRAP

In [35]:
# reference: https://github.com/google/tf-quant-finance
# reference: https://github.com/domokane/FinancePy
# reference: https://github.com/goldmansachs/gs-quant

In [36]:
# TODO: define dataclass to store results
# TODO: try using tf or numba jit to speed the calcs

#### CASH FLOW UTILS

#### SWAP UTILS

In [37]:
# TODO: implement dataclass to store the resul (handle camel case wrapper and implement calc method)
# TODO: implement xnpv, pv01, dv01, gamma for the swap

### EXAMPLE

In [38]:
# holiday rules for NYSE (nth, week_day, month)
holiday_rules = np.array([(3, 0, 1), (3, 0, 2), (-1, 0, 5), (1, 0, 9), (4, 3, 11)])

# holiday dates for NYSE (month, day)
holiday_dates = np.array([(1, 1), (6, 19), (7, 4), (12, 25)])

# holiday functions for NYSE
holiday_foo = np.array([good_friday])

In [39]:
# params
effective = np.datetime64('2023-02-20')
maturity = np.datetime64('2024-02-20')

In [40]:
year = np.arange(get_year(effective), get_year(maturity)+1)

In [41]:
holiday = get_holidays(year, holiday_rules, holiday_dates, holiday_foo)

In [42]:
schedule = build_schedule(effective, maturity, freq=2, holidays = holiday)

In [43]:
schedule

array(['2023-02-20', '2023-04-20', '2023-06-20', '2023-08-21',
       '2023-10-20', '2023-12-20', '2024-02-20'], dtype='datetime64[D]')