In [1]:
from IPython.display import HTML

HTML('''<script>
code_show=true; 
function code_toggle() {
 if (code_show){
 $('div.input').hide();
 } else {
 $('div.input').show();
 }
 code_show = !code_show
} 
$( document ).ready(code_toggle);
</script>
<form action="javascript:code_toggle()"><input type="submit" value="Click here to toggle on/off the raw code."></form>''')


In [2]:
import math
from math import ceil
import pandas as pd
from xbbg import blp
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from dateutil.relativedelta import relativedelta as rd
import statsmodels.api as sm
import re
import matplotlib.image as image
from ipywidgets import interact, IntSlider, Checkbox, Dropdown, Output, HBox, VBox, interactive, interactive_output, ToggleButton,Text, Button, DatePicker, FloatText, IntText, ToggleButtons, RadioButtons,SelectMultiple
from IPython.display import display, clear_output
import itertools
from scipy import stats
from scipy.optimize import minimize, Bounds
from scipy.interpolate import BSpline
from datetime import datetime, timedelta
import math
from math import ceil

y, m, d  = datetime.today().year, datetime.today().month, datetime.today().day
DATES = pd.date_range(datetime(y,m,d), datetime(y+1,m+1,d))

MEETING_DATES = {'czk':[datetime(2023,11,2),datetime(2023,12,21),datetime(2024,2,1), datetime(2024,3,20), datetime(2024,6,19),datetime(2024,8,1), datetime(2024,9,25), datetime(2024,11,7),datetime(2024,12,19)]}
# need pln and huf meeting date estimates
ON_RATES = {'czk': 'PRIBOVN Index', 'pln': 'WIBRON Index', 'huf': 'BUBORON Index'}

In [3]:
def exp(x):
    if isinstance(x,Dual):
        return x.__exp__()
    return math.exp(x)

def log(x):
    if isinstance(x,Dual):
        return x.__log__()
    return math.log(x)

def interpolate(x, x_1,y_1,x_2,y_2,method):
    if method == 'linear':
        op = lambda x: x
        
    elif method == 'log_linear':
        op, y_1, y_2 = exp, log(y_1), log(y_2)
        
    return op(y_1 + (y_2 - y_1) * (x - x_1) / (x_2 - x_1) )

def imm_date(month, year):
    alp = {'H':3,'M':6,'U':9,'Z':12}
    if isinstance(month,str):
        month = alp[month.upper()]
    first_day = datetime(year,month,1)
    offset = 2- first_day.weekday()
    if offset<0:
        offset += 7
    
    return first_day + timedelta(14+offset)

def add_months(start: datetime, months: int, imm = False):
    
    year_roll = int((start.month + months -1) / 12)
    month = (start.month + months) % 12
    month = 12 if month == 0 else month
    
    try:
        end = datetime(start.year + year_roll, month, start.day)
    
    except ValueError:
        return add_months(datetime(start.year, start.month, start.day -1), months,imm)
    if imm:
        return imm_date(end.month, end.year)
    else:
        return end

    

In [4]:
class Dual:
    
    def __init__(self, real, dual= None):
        """
        A dual number is denoted in the form:
        z = x + be, such that e**2 = 0 and e != 0
        
        real: real number
        dual: dict (key=name_index and value=value)
        """
        self.real = real
        self.dual = {} if dual is None else dual.copy()
        
    def __str__(self):
        output = f" f= {self.real:.8f}\n"
        for k, v in self.dual.items():
            output +=f"df/d{k} = {v:.6f}\n"
        return output
        
    def __repr__(self):
        output = f"{self.real}"
        for key in self.dual.keys ():
            output += f"{self.dual[key]:+.3f}e_{key}"
        return output
        
    def __neg__(self):
        """-z = -x - b_0e_0 - b_1e_1"""
        dual = {}
        for key in self.dual:
            dual[key] = -self.dual[key]
        return Dual(-self.real, dual)
        
    def __eq__(self, argument):
        """"equality iff real and dual components the same"""
        if not isinstance(argument, Dual):
            return False
        return False if Self.real != argument.real else self.dual == argument.dual
        
    def conjugate(self):
        dual = {}
        for key in self.dual:
            dual[key] = -self.dual[key]
        return Dual(self.real, dual)
            
    def __ne__(self, argument):
        """"inequalty iff not equal"""
        return not (self.__eq__(argument))
            
    def __add__(self, argument):
        """"z_1 + z_2 = (x_1 + x_2) + (b_1_0 + b_2_0)e_0 + (b_1_1 + b_2_1)e_1"""
        if isinstance(argument, Dual):
            real = self.real + argument.real
            dual = self.dual.copy()
            for key in argument.dual:
                val = 0 if key not in dual else dual[key]
                dual[key] = val +argument .dual[key]
            return Dual(real, dual)
        else:
            return Dual (self.real +argument, self.dual)
                
    __radd__ = __add__
                
    def __sub__(self, argument):
        """"z_1 - z_2 = (x_1 - x_2) + (b_1_0 - b_2_0)e_0 + (b_1_1 - b_2_1)e_1"""
        if isinstance (argument, Dual):
            real = self.real - argument.real
            dual = self.dual.copy()
            for key in argument.dual:
                val = 0 if key not in dual else dual[key]
                dual[key] = val - argument.dual[key]
            return Dual (real, dual)
        else:
            return  Dual(self.real - argument, self.dual)
        
    def __rsub__(self, argument):
        """z_2 - z_1 = -(z_1 - z_2)"""
        return  -(self - argument)
        
    def __mul__(self, argument):
        """"z_1 * z_2 = x_1 * x_2 + (x_1 * b_2_0 +x_2 * b_1_0)e_0"""
        
        if isinstance (argument, Dual):
            real = self.real * argument.real
            dual = {}
            for key in self.dual:
                dual [key] = self.dual[key] *  argument.real
            for key in argument.dual:
                val = 0 if key not in dual else dual[key]
                dual[key] =  val + argument.dual[key] * self.real
            return Dual(real, dual)
        else:
            dual = {}
            for key in self.dual:
                dual[key] = self.dual[key] * argument
            return Dual(self.real * argument, dual)
        
    __rmul__ = __mul__
    
    def __truediv__(self, argument):
        """z_1 / z_2 = (z_1 * z^2) / (z_2 * ^z_2)"""
        if isinstance (argument, Dual):
            numerator = self * argument.conjugate()
            return numerator / argument.real**2
        else:
            dual = {}
            for key in self.dual:
                dual[key] = self.dual[key] / argument
            return Dual(self.real / argument, dual)
        
    
    def __rtruediv__(self,argument):
        numerator = Dual(argument, {})
        return numerator / self
    
    def __pow__(self, power):
        dual = {}
        for key in self.dual:
            dual[key] = power * self.dual[key] * (self.real ** (power-1))
        
        return Dual(self.real ** power, dual)
    
    def __exp__(self):
        real = exp(self.real)
        dual = {}
        for key in self.dual:
            dual[key] = real * self.dual[key]
        return Dual(real,dual)
    
    def __log__(self):
        real = log(self.real)
        dual = {}
        for key in self.dual:
            dual[key] = self.dual[key] / self.real
        return Dual(real,dual)
        
        
                
                
                
class Curve():
    
    def __init__(self, nodes:dict,interpolation: str):
        self.nodes = nodes.copy()
        self.node_dates = list(self.nodes.keys())
        self.interpolation = interpolation
        
    def __repr__(self):
        output = ""
        for k,v in self.nodes.items():
            output +=   f"{k.strftime('%Y-%b-%d')}: {v:.6f}\n"
        return output
    
    def __getitem__(self, date: datetime):
        for i , node_date_1 in enumerate(self.node_dates[1:]):
            if date < node_date_1 or i == len(self.node_dates) - 2:
                node_date_0 = self.node_dates[i]
                return interpolate(date,
                                   node_date_0, self.nodes[node_date_0],
                                   node_date_1, self.nodes[node_date_1],
                                   self.interpolation)
            

            
class Schedule:
    
    def __init__(self, start: datetime, tenor: int, period: int, d_count: int = 360, imm = False):
        self.imm = imm
        self.start = start
        self.end = add_months(start, tenor,imm)
        self.tenor = tenor
        self.period = period
        self.dcf_conv = timedelta(days = d_count)
        self.n_periods = ceil(tenor/period)
        
    def __repr__(self):
        output = "period start | period end  | period DCF\n"
        for period in self.data:
            output += f"{period[0].strftime('%Y-%b-%d')}  | {period[1].strftime('%Y-%b-%d')} | {period[2]:.6f}\n"
        return output
    
    @property
    def data(self):
        schedule = []
        period_start = self.start
        for i in range(self.n_periods - 1):
            period_end = add_months(period_start, self.period,self.imm)
            schedule.append([period_start, period_end, (period_end - period_start) / self.dcf_conv])
            period_start = period_end
            
        schedule.append([period_start, self.end, (self.end - period_start) / self.dcf_conv])
        return schedule    
    
    

class SolvedCurve(Curve):
    
    def __init__(self, nodes: dict, interpolation: str, swaps: list, obj_rates: list, algorithm: str = 'levenberg_marquardt', d_count: int = 360, imm = False):
        super().__init__(nodes=nodes, interpolation = interpolation)
        self.swaps, self.obj_rates, self.algo, self.d_count, self.imm = swaps, obj_rates, algorithm, d_count, imm 
        self.n, self.m = len(self.nodes.keys())-1, len(swaps)
        self.s = np.array([self.obj_rates]).transpose()
        self.lam = 1000
        
    def calculate_metrics(self):
        self.r = np.array([[swap.rate(self) for swap in self.swaps]]).transpose()
        self.v = np.array([[v for v in list(self.nodes.values())[1:]]]).transpose()
        x = self.r - self.s
        self.f = np.matmul(x.transpose(),x)[0][0]
        self.grad_v_f = np.array(
            [[self.f.dual.get(f"v{i+1}") for i in range(self.n)]]
        ).transpose()
        self.J = np.array([
            [rate.dual.get(f"v{j+1}",0) for rate in self.r[:,0]]
            for j in range(self.n)
        ])
    
    def update_step_gauss_newton(self):
        A = np.matmul(self.J, self.J.transpose()) + self.lam * np.eye(self.shape[0])
        b = -0.5 * self.grad_v_f
        delta = np.linalg.solve(A,b)
        v_1 = self.v +delta
        return v_1
        
    def update_step_levenberg_marquardt(self):
        self.lam *= 2 if self.f_prev < self.f.real else 0.5
        A = np.matmul(self.J, self.J.transpose()) + self.lam * np.eye(self.J.shape[0])
        b = -0.5 * self.grad_v_f
        delta = np.linalg.solve(A,b)
        v_1 = self.v +delta
        return v_1
    
    def iterate(self, max_i = 2000, tol = 1e-10):
        ret, self.f_prev, self.f_list = None, 1e10, []
        for i in range(max_i):
            self.calculate_metrics()
            self.f_list.append(self.f.real)
            if self.f.real < self.f_prev and (self.f_prev - self.f.real) < tol:
                ret = f"tolerance reached ({self.algo}) after {i} iterations"
                break
            v_1 = getattr(self,f"update_step_{self.algo}")()
            for i, (k,v)  in enumerate(self.nodes.items()):
                if i == 0:
                    continue
                self.nodes[k] = v_1[i-1,0]
            self.f_prev = self.f.real
        self.lam  = 1000
        return f"max iters ({self.algo}), f:{self.f.real}" if ret is None else ret
    
    @property
    def grad_s_v(self):
        if getattr(sefl,"grad_s_v_", None) is None:
            self.grad_s_v_numeric()
        return self.grad_s_v_
    
    def grad_s_v_numeric(self):
        grad_s_v = np.zeros(shape=(self.m,self.n))
        ds = 1e-3
        s_cv_fwd = SolvedCurve(nodes = self.nodes, interpolation = self.interpolation,swaps = self.swaps,obj_rates = self.obj_rates, algorithm = 'gauss_newton')
        for s in range(self.m):
            s_cv_fwd.nodes, s_cv_fwd.s = self.nodes, self.s.copy()
            s_cv_fwd.s[s,0] += ds
            s_cv_fwd.terate()
            dvds_fwd = np.array([v.real for v in (s_cv_fwd.v[:,0]-self.v[:,0]/ds)])
            grad_s_v[s,:] = dvds_fwd
        self.grad_s_v_ = grad_s_v
            
        
class Swap:
    
    def __init__(self, start:datetime, tenor: int, period_fix: int, period_float: int,d_count = 360 ,imm_roll = False):
        self.start = start
        self.end  = add_months(start, tenor,imm_roll)
        self.schedule_fix = Schedule(start,tenor,period_fix,d_count, imm_roll)
        self.schedule_float = Schedule(start,tenor,period_float,d_count, imm_roll)
        
    def __repr__(self):
        return f"Swap: {self.start.strftime('%Y-%m-%d')} -> " \
               f"{self.end.strftime('%Y-%m-%d')}"
    
    def analytic_delta(self, curve: Curve,leg:str = "fix",notional: float = 1e4 ):
        delta = 0
        for period in getattr(self, f"schedule_{leg}").data:
            delta += curve[period[1]] * period[2]
        return delta * notional / 10000
    
    def rate(self, curve: Curve):
        rate = (curve[self.start] - curve[self.end]) / self.analytic_delta(curve) 
        return rate * 100 
    
    def npv(self, curve: Curve, fixed_rate: float, notional: float = 1e6):
        npv = (self.rate(curve) - fixed_rate) * self.analytic_delta(curve)
        return npv * notional / 100
    
    def risk(self, curve: SolvedCurve, fixed_rate: float, notional: float = 1e6):
        grad_v_P = np.array([
            [self.npv(curve,fixed_rate,notional).dual.get(f"v{i+1}",0)]
            for i in range(curve.n)
        ])
        grad_s_P = np.matmul(curve.grad_s_v, grad_v_P)
        return grad_s_P / 100
    
        


In [5]:
def ce3_fra_ticker(ecy,axb):
    ecy = ecy.lower()
    axb = axb.lower().split('x')
    
    num_to_aph = 'ABCDEFGHIJK1'
    ce3 = {'czk': ['PRIB03M Index', 'CK'],
           'pln': ['WIBR3M Index', 'PZ'],
           'huf': ['BUBOR03M Index', 'HF']}
    
    if axb[0] == '0':
        return ce3[ecy][0]
    
    else:
        _st, _en = num_to_aph[int(axb[0])-1], num_to_aph[int(axb[1])-1]
        return f"{ce3[ecy][1]}FR0{_st}{_en} Curncy"
def t_plus_2(date):
    i = 2
    done = i == 0
    while not done:
        date += timedelta(1)
        if date.weekday() < 5:
            i -= 1
            done = i == 0
    
    return date

def t_minus_2(date):
    i = 2
    done = i == 0
    while not done:
        date -= timedelta(1)
        if date.weekday() < 5:
            i -= 1
            done = i == 0
    
    return date


def modf(date, same_month = False):
    new = date
    current_month = date.month
    
    while new.weekday() >= 5:
        new += timedelta(1)
        
    if same_month:
        while new.weekday() >= 5:  
            new -= timedelta(1)
        
    return new

def gen_fra_table(ecy, main_fras = False):
    _0x3_fix = t_plus_2(datetime(y,m,d))
    use = [f"{i}x{i+3}" for i in range(0,10,3)] if main_fras else [f"{i}x{i+3}" for i in range(10)]
    df = pd.DataFrame({}, index =use)
    df['rate'] =[blp.bdp(j, 'px_last').values[0][0] for j in [ce3_fra_ticker(ecy,i) for i in df.index]]
    df['start'] = [_0x3_fix] + [modf(_0x3_fix + rd(months = int(i.split('x')[0]))) for i in df.index[1:]]
    
    return df


def first_n_imm(current_year,current_month,current_day,n):
    
    first_imm = imm_date(3*ceil(current_month/3),current_year)
    if first_imm.month == current_month and current_day >= first_imm.day:
        first_imm = add_months(first_imm,3, imm = True)
                
    ans = [first_imm]
    for i in range(1,n):
        ans.append(add_months(first_imm,3*i, imm = True))
    return ans


In [6]:
s_cv = None
fra = None
meetings = None
nodes = None    
par_swaps = None
curve_done = False
dcf = None
latest_cal = None
ecy = None

In [7]:

ECY_W = Dropdown(options = ['-','CZK', 'PLN'], description = 'CE3')
output1 = Output()
CALIB_B = Button(description = 'Calibrate Curve')
IMM_B = Button(description = 'IMM FRA Mids')
MD_RUN_B = Button(description = 'Meeting Date Run')

display(ECY_W)

CALIB_B = Button(description = 'Calibrate Curve')
IMM_B = Button(description = 'IMM FRA Mids')
#MD_RUN_B = Button(description = 'Meeting Date Run')

display(HBox([CALIB_B,IMM_B]), output1)

def calib_click(b):
    global s_cv,fra,meetings,nodes, par_swaps,curve_done,dcf,latest_cal, ecy
    with output1:
        if ECY_W.value != '-':
            clear_output()
            ecy = ECY_W.value.lower()
            fra = gen_fra_table(ecy,False)
            dcf = 365 if ecy == 'pln' else 360
            try:
                meetings = [i for i in MEETING_DATES[ecy] if i < datetime(y+1,m,d)]
            except:
                meetings = None
            nodes = {datetime(y,m,d) :Dual(1,{"v0":1})}

            for i,dt in enumerate(fra['start']):
                nodes[datetime(dt.year,dt.month,dt.day)] = Dual(1,{f"v{i+1}":1})
    
            par_swaps = {Swap(fra.at[i,'start'],3,3,3,dcf): fra.at[i,'rate'] for i in fra.index}

            s_cv = SolvedCurve(nodes = nodes, swaps = list(par_swaps.keys()), obj_rates = list(par_swaps.values()),
                               interpolation = 'log_linear', algorithm = 'levenberg_marquardt')

            
            s_cv.iterate();
            curve_done = True
            latest_cal = f"{ECY_W.value} curve calibrated at {datetime.today().strftime('%H:%M:%S')}"
            print(latest_cal)
        
        
def imm_mid_click(b):
    with output1:
        clear_output()
        if curve_done:
            _3_dates = first_n_imm(y,m,d,3)
            _imm_fras = [Swap(i, 3,3,3,dcf,True) for i in _3_dates]
            to_show = pd.DataFrame({'IMM Indic Mids':[i.rate(s_cv).real for i in _imm_fras]}, index = [i.strftime('%b %y') for i in _3_dates]) 
            display(to_show.style.format('{:.2f}%'))
            print(latest_cal)
            
            
            
def cb_click(b):
    with output1:
        clear_output()
        if curve_done:
            try:
                cba = pd.DataFrame({}, columns = ['Implied', 'CB Action'], index = ['Current'] + [i.strftime('%b %y') for i in meetings][:-1])
                cba.loc['Current'] = [fra.at['0x3', 'rate'],0]
                for i in range(1, len(meetings)):
                    t_0, t_1 = meetings[i-1],meetings[i] 
                    implied_rate = dcf * (s_cv[t_0].real/s_cv[t_1].real -1) / (t_1-t_0).days 
                    cba.at[t_0.strftime('%b %y'), 'Implied'] = implied_rate * 100

                cba['CB Action'] = cba['Implied'].diff() * 100
                cba.fillna(0, inplace = True)

                display(cba.style.format({'Implied': '{:.4f}%', 'CB Action' : '{:.2f} bps' }))
            
            except:
                print(f"{ecy.upper()} meeting dates not available")
            print(latest_cal)



            
CALIB_B.on_click(calib_click)
IMM_B.on_click(imm_mid_click)
#MD_RUN_B.on_click(cb_click)


Dropdown(description='CE3', options=('-', 'CZK', 'PLN'), value='-')

HBox(children=(Button(description='Calibrate Curve', style=ButtonStyle()), Button(description='IMM FRA Mids', …

Output()

In [15]:
fra


Unnamed: 0,rate,start
0x3,5.15,2024-05-21
1x4,5.095,2024-06-21
2x5,4.85,2024-07-22
3x6,4.678,2024-08-21
4x7,4.63,2024-09-23
5x8,4.48,2024-10-21
6x9,4.33,2024-11-21
7x10,4.313,2024-12-23
8x11,4.22,2025-01-21
9x12,4.123,2025-02-21


In [9]:
#curve calibration
#fra = gen_fra_table(ECY,True)
#dcf = 365 if ECY == 'pln' else 360
#meetings = [i for i in MEETING_DATES[ECY] if i < datetime(y+1,m,d)]
#nodes = {datetime(y,m,d) :Dual(1,{"v0":1})}

#for i,dt in enumerate(fra['start']):
#    nodes[datetime(dt.year,dt.month,dt.day)] = Dual(1,{f"v{i+1}":1})
    
#par_swaps = {Swap(fra.at[i,'start'],3,3,3,dcf): fra.at[i,'rate'] for i in  fra.index}

#s_cv = SolvedCurve(nodes = nodes, swaps = list(par_swaps.keys()), obj_rates = list(par_swaps.values()),
#                   interpolation = 'log_linear', algorithm = 'levenberg_marquardt')

#s_cv.iterate();

In [10]:
# imm fra
#_3_dates = first_n_imm(y,m,d,3)
#_imm_fras = [Swap(i, 3,3,3,dcf,True) for i in _3_dates]
#to_show = pd.DataFrame({'IMM Indic Mids':[i.rate(s_cv).real for i in _imm_fras]}, index = [i.strftime('%b %y') for i in _3_dates]) 
#to_show.style.format('{:.4f}%')


In [11]:
#md_run

#cba = pd.DataFrame({}, columns = ['Implied', 'CB Action'], index = ['Current'] + [i.strftime('%b %y') for i in meetings][:-1])
#cba.loc['Current'] = [fra.at['0x3', 'rate'],0]
#for i in range(1, len(meetings)):
 #   t_0, t_1 = meetings[i-1],meetings[i] 
 #   implied_rate = dcf * (s_cv[t_0].real/s_cv[t_1].real -1) / (t_1-t_0).days 
 #   cba.at[t_0.strftime('%b %y'), 'Implied'] = implied_rate * 100

#cba['CB Action'] = cba['Implied'].diff() * 100
#cba.fillna(0, inplace = True)

#cba.style.format({'Implied': '{:.4f}%', 'CB Action' : '{:.2f} bps' })

In [12]:
# fixing trade
#on_meeting =  [Swap(i,3,3,3,dcf) for i in meetings ]
#next_day = [Swap(i + timedelta(1),3,3,3,dcf) for i in meetings]

#fixing = pd.DataFrame({}, index = [i.strftime('%b %y') for i in meetings])
#fixing['t'] = [i.rate(s_cv).real for i in on_meeting]
#fixing['t+1'] = [i.rate(s_cv).real for i in next_day]

In [13]:
#fixing['t'] - fixing['t+1'] 

In [14]:
#class AdvancedCurve(SolvedCurve):
    
#    def __init__(self, nodes: dict, interpolation: str, swaps: list, obj_rates: list,
#                 t:list, algorithm: str = 'gauss_newton', d_count: int = 360, imm = False):
    
#        super().__init__(nodes, interpolation, swaps, obj_rates, algorithm, d_count, imm)
#        self.t = t
#        self.bs = BSpline(4,t)