In [1]:
import QuantLib as ql
import pandas as pd
import numpy as np
from scipy.stats import ncx2
from dataclasses import dataclass
from scipy.optimize import fsolve
from scipy.optimize import least_squares

# Implementation of CIR++ model
This is based on (Section 3.9 of Brigo and Mercurio book)

In [2]:
today = ql.Date(20, 10, 2022)

In [3]:
ql.Settings.instance().evaluationDate = today

In [4]:
curve = ql.YieldTermStructureHandle(ql.FlatForward(0, ql.NullCalendar(), 0.05, ql.SimpleDayCounter()))

In [5]:
@dataclass
class Quote:
    Talpha: float
    Tbeta: float
    volatility: float
    voltype: str
    shift: float = 0.0

quotes = [
    Quote(Talpha = 1, Tbeta = 6, volatility = 0.3, voltype = 'lognormal', shift = 0.00),
    Quote(Talpha = 1, Tbeta = 7, volatility = 0.31, voltype = 'lognormal', shift = 0.00),
    Quote(Talpha = 1, Tbeta = 8, volatility = 0.315, voltype = 'lognormal', shift = 0.00),
    Quote(Talpha = 1, Tbeta = 9, volatility = 0.316, voltype = 'lognormal', shift = 0.00),
    Quote(Talpha = 1, Tbeta = 9, volatility = 0.317, voltype = 'lognormal', shift = 0.00),
    Quote(Talpha = 1, Tbeta = 10, volatility = 0.316, voltype = 'lognormal', shift = 0.00),
    Quote(Talpha = 2, Tbeta = 6, volatility = 0.306, voltype = 'lognormal', shift = 0.00),
    Quote(Talpha = 2, Tbeta = 7, volatility = 0.31, voltype = 'lognormal', shift = 0.00),
    Quote(Talpha = 2, Tbeta = 8, volatility = 0.32, voltype = 'lognormal', shift = 0.00),
    Quote(Talpha = 2, Tbeta = 9, volatility = 0.33, voltype = 'lognormal', shift = 0.00),
    Quote(Talpha = 2, Tbeta = 10, volatility = 0.335, voltype = 'lognormal', shift = 0.00)
]
swaps = [ql.MakeVanillaSwap(ql.Period(quote.Tbeta - quote.Talpha, ql.Years), ql.Euribor6M(curve), ql.nullDouble(), ql.Period(quote.Talpha)) for quote in quotes]

In [6]:
class CIRpp:
    
    h = lambda self, k, sigma: np.sqrt(k**2 + 2*sigma**2)
    
    d_to_t = lambda self, date: ql.SimpleDayCounter().yearFraction(today, date) # conversion of any date d into a time (yearfrac) t where t: yearfrac(date - today)
    to_time = lambda self, t: self.d_to_t(t) if type(t) is ql.QuantLib.Date else t # input date or time, return time
    
    def __init__(self, curve: ql.YieldTermStructureHandle, k = None, theta = None, sigma = None, x0 = None):
        self.k = k
        self.theta = theta
        self.sigma = sigma
        self.x0 = x0
        self.curve = curve
        
    
        
    def selfload(self, k, theta, sigma, x0):
        if (k is None) or (theta is None) or (sigma is None) or (x0 is None):
            return self.k, self.theta, self.sigma, self.x0
        else:
            return k, theta, sigma, x0
    
    def A(self, t, T, k = None, theta = None, sigma = None, x0 = None):
        '''
            A(t,T) term based on (3.25) equation in Brigo
        '''
        k, theta, sigma, x0 = self.selfload(k, theta, sigma, x0)
        h = self.h(k, sigma)
        
        term = 2*h*np.exp((k+h)*(T-t)/2)/(2*h + (k+h)*(np.exp((T-t)*h) - 1))
        return term ** (2*k*theta/sigma**2)

    
    def B(self, t, T, k = None, theta = None, sigma = None, x0 = None):
        '''
            B(t,T) term based on (3.25) equation in Brigo
        '''
        k, theta, sigma, x0 = self.selfload(k, theta, sigma, x0)
        h = self.h(k, sigma)
        
        term = 2*(np.exp((T-t)*h)-1)/(2*h + (k+h)*(np.exp((T-t)*h) - 1))
        return term
    
    def phi(self, t, k = None, theta = None, sigma = None, x0 = None):
        '''
            
            phi(t) = f(0,t)-f_CIR(0,t)
            which is needed for r(t)=x(t)+phi(t)
            Implemented based on (3.77)
        '''
        
        k, theta, sigma, x0 = self.selfload(k, theta, sigma, x0)
        return self.curve.forwardRate(t,t,ql.Continuous).rate() - self.fCIR(t, k, theta, sigma, x0)
    
    
    def PIx(self, t, T, k = None, theta = None, sigma = None, x0 = None):
        '''
            This is x-'bond' price. I.e. a fictive bond prices that would be driven by a process x instead of r.
            This is based on equation (3.24), where r is replaced by x
        '''
        k, theta, sigma, x0 = self.selfload(k, theta, sigma, x0)
        return self.A(t, T, k, theta, sigma, x0)*np.exp(-self.B(t, T, k, theta, sigma, x0)*x0)
        
    def Abar(self, t, T, k = None, theta = None, sigma = None, x0 = None):
        '''
            Modified A(t,T) term of a standard CIR; needed for bond price in CIR++
            Implemented based on p. 103
        '''
        k, theta, sigma, x0 = self.selfload(k, theta, sigma, x0)
        
        Pt = self.curve.discount(t)
        PT = self.curve.discount(T)
        numer = PT*self.A(0, t, k, theta, sigma, x0)*np.exp(-self.B(0, t, k, theta, sigma, x0)*x0)
        denom = Pt*self.A(0, T, k, theta, sigma, x0)*np.exp(-self.B(0, T, k, theta, sigma, x0)*x0)
        thirdTerm = self.A(t, T, k, theta, sigma, x0)*np.exp(self.B(t, T, k, theta, sigma, x0)*self.phi(t, k, theta, sigma, x0))
        return numer/denom*thirdTerm
    
    def bond_price(self, t, T, rt, k = None, theta = None, sigma = None, x0 = None):
        
        t = self.to_time(t)
        T = self.to_time(T)
        
        rt = rt if t>0 else self.curve.forwardRate(0,0,ql.Continuous).rate() # at time=0 r(0) can be obtained from the observed curve
        
        k, theta, sigma, x0 = self.selfload(k, theta, sigma, x0)
        
        return self.Abar(t, T, k, theta, sigma, x0)*np.exp(-self.B(t, T, k, theta, sigma, x0)*rt)
    
    def bond_option(self, t, T, s, rt, K, call_put, k, theta, sigma, x0):
        '''
            CIR++ Bond option formula
            We are at time t, pricing an T-expiry option with strike K on a zero bond that matures at s>T
            
            t = pricing time
            T = option expiry
            s > T = underlying bond maturity
            
            This page mainly follows p. 103 in Brigo, Mercurio; formula (3.78)
        '''
        
        t = self.to_time(t)
        T = self.to_time(T)
        s = self.to_time(s)
        
        k, theta, sigma, x0 = self.selfload(k, theta, sigma, x0)
        rt = rt if t>0 else self.curve.forwardRate(0,0,ql.Continuous).rate() # at time=0 r(0) can be obtained from the observed curve
        
        Ps = self.curve.discount(s) #P(0,s) - this is obtainable from the current t=0 curve
        PT = self.curve.discount(T) #P(0,T) - this is obtainable from the current t=0 curve
        
        Pts = self.bond_price(t, s, rt, k, theta, sigma, x0) #P(t,s) - for t>0 this is NOT obtainable from the current t=0 curve and depends on r(t)
        PtT = self.bond_price(t, T, rt, k, theta, sigma, x0) #P(t,T) - for t>0 this is NOT obtainable from the current t=0 curve and depends on r(t)
        
        AT = self.A(0, T, k, theta, sigma, x0)
        As = self.A(0, s, k, theta, sigma, x0)
        
        BT = self.B(0, T, k, theta, sigma, x0)
        Bs = self.B(0, s, k, theta, sigma, x0)
        
        phit = self.phi(t, k, theta, sigma, x0) # function phi(t)
        h = self.h(k, sigma)
        psi = (k+h)/sigma**2
        rho = 2*h/(sigma**2 * (np.exp(h*(T-t))-1) )
        
        
        
        rhat = 1/self.B(T, s, k, theta, sigma, x0) * (np.log(self.A(T, s, k, theta, sigma, x0)/K) - np.log(PT/Ps * As/AT * np.exp(-Bs*x0) / np.exp(-BT*x0)))
        
        v = 4*k*theta/sigma**2
        lambda1 = 2*rho**2 * (rt - phit) * np.exp(h*(T-t)) / (rho + psi + self.B(T, s, k, theta, sigma, x0))
        lambda2 = 2*rho**2 * (rt - phit) * np.exp(h*(T-t)) / (rho + psi)
        
        first_term = Pts*ncx2(v, lambda1).cdf( 2*rhat*(rho+psi+self.B(T, s, k, theta, sigma, x0)) )
        second_term = K*PtT*ncx2(v, lambda2).cdf( 2*rhat*(rho+psi) )
        
        call_price = first_term - second_term
        
        put_price = call_price - Pts + K*PtT
        
        if 'call' in call_put.lower():
            return call_price
        
        if 'put' in call_put.lower():
            return put_price
        
        
    def fCIR(self, t, k = None, theta = None, sigma = None, x0 = None):
        '''
            Instantaneous forward rate of a process x that is driven by a CIR model eq (3.77)
        '''
        k, theta, sigma, x0 = self.selfload(k, theta, sigma, x0)
        
        h = self.h(k, sigma)
        return 2*k*theta*(np.exp(t*h) - 1)/(2*h + (k+h)*(np.exp(t*h) - 1))+x0*(4*h**2 * np.exp(t*h))/(2*h + (k+h)*(np.exp(t*h) - 1))**2
    
    def get_jamshidian_coupons(self, swap):
        '''
            Function returns a list of dataclasses where each instance is a coupon based on Jamshidian method.
            In a swap underlying swaption, fixed coupons are paid at times {ti}, i = 1,..., n
            Coupon ci paid at ti is defined as 
                ci = K*taui,  i = 1, ..., n-1; and
                ci = 1+K*tau, i = n 
                
            This function is model-free, i.e. it does NOT depend on parameters (k, theta, sigma, x0)
        '''
        @dataclass
        class Jamshidian_coupon:
            ti: float
            ci: float

        jamshidian_coupons = []
        for i, date in enumerate(swap.fixedSchedule()):
            if date>swap.fixedSchedule()[0]:
                yearfrac = ql.SimpleDayCounter().yearFraction(swap.fixedSchedule()[i-1],date)
                ti = self.to_time(date)
                ci = swap.fixedRate()*yearfrac if date < max(swap.fixedSchedule()) else 1+swap.fixedRate()*yearfrac
                jamshidian_coupons.append(Jamshidian_coupon(ti = ti, ci = ci))
        
        return jamshidian_coupons
    
    def calibrate(self, quotes: list[Quote], swaps: list):
        
        def fzero_error_fun(r_critical, k, theta, sigma, x0, jamshidian_coupons, T):
            '''
                Given that jamshidian coupons ci were computed via get_jamshidian_coupons
                we want to compute the critical rate r* that determines the strikes {Xi} for each zero bond option
                
                This is based on the middle equality between (3.81 and 3.82) on p. 104
            ''' 
            
            T = self.to_time(T)
            
            lhs = []
            for coupon in jamshidian_coupons:
                lhs.append(coupon.ci*self.Abar(t = T, T = coupon.ti, k = k, theta = theta, sigma = sigma, x0 = x0)*np.exp(-self.B(t = T, T = coupon.ti, k = k, theta = theta, sigma = sigma, x0 = x0)*r_critical))

            return sum(lhs)-1.0
        
        def swaption_price(t, T, jamshidian_coupons, r_critical, k, theta, sigma, x0):
            
            '''
                Given the critical rate r* and jamshidian coupons are known, we compute swaption
                as sum of options on zero bonds that have strikes {Xi(r*)} and notionals {ci}
                
                The implementation below is based on (3.82) and (3.83)
            '''
            
            T = self.to_time(T)
            
            r0 = self.curve.forwardRate(t,t,ql.Continuous).rate()
            cumsum = 0
            for coupon in jamshidian_coupons:
                Xi = self.Abar(t = T, T = coupon.ti, k = k, theta = theta, sigma = sigma, x0 = x0)*np.exp(-self.B(t = T, T = coupon.ti, k = k, theta = theta, sigma = sigma, x0 = x0)*r_critical)
                cumsum += coupon.ci*self.bond_option(0, T, coupon.ti, r0, Xi, 'call', k, theta, sigma, x0)
            return cumsum
        
        def get_market_prices(swaps, quotes) -> list:
            
            '''
                Compute market prices of swaptions used as a calibration sample, via standard market models
            '''
            
            prices_market = []
            for swap, quote in zip(swaps, quotes):
                swaption = ql.Swaption(swap, ql.EuropeanExercise(swap.startDate()))
                if quote.voltype == 'lognormal':
                    swaption.setPricingEngine(ql.BlackSwaptionEngine(self.curve, ql.QuoteHandle(ql.SimpleQuote(quote.volatility)), ql.SimpleDayCounter(), quote.shift))
                elif quote.voltype == 'normal':
                    swaption.setPricingEngine(ql.BachelierSwaptionEngine(self.curve, ql.QuoteHandle(ql.SimpleQuote(quote.volatility))))
                prices_market.append(swaption.NPV())
            return prices_market
                
        def calib_error(params, prices_market, swaps, return_prices = False):
            
            '''
                Function that returns a list of errors {price_model(i) - price_market(i)}, i = 1, ....
            '''
            
            k, theta, sigma, x0 = params
            
            prices_model = []
            for market_price, swap in zip(market_prices, swaps):
                jamshidian_coupons = self.get_jamshidian_coupons(swap) # obtain 'bond notionals', this is model-free calculation
                expiry_date = swaps[0].startDate() # T date of the swaption
                r_critical = fsolve(lambda r_critical: fzero_error_fun(r_critical, k, theta, sigma, x0, jamshidian_coupons, expiry_date), 0.02)[0]
                prices_model.append(swaption_price(0, expiry_date, jamshidian_coupons, r_critical, k, theta, sigma, x0))
            
            if return_prices:
                return prices_model
            return np.array(prices_model) - np.array(prices_market)
        
        market_prices = get_market_prices(swaps, quotes)
        
        res = least_squares(lambda params: calib_error(params, market_prices, swaps), (0.02, 0.05, 0.04, 0.0))
        model_prices = calib_error(res.x, market_prices, swaps, True)
        return res, model_prices, market_prices

# Calibration

In [7]:
model = CIRpp(curve)
res, model_prices, market_prices = model.calibrate(quotes = quotes, swaps = swaps)

In [8]:
for val, name in zip(list(res.x)+[res.cost], ['k', 'theta', 'sigma', 'x0', 'ERROR']):
    print(f'{name} = {val:.4f}')

k = 0.0000
theta = 0.0507
sigma = 0.0494
x0 = 0.0606
ERROR = 0.0007
