In [1]:
import re
import datetime

from qfin_calendar import *

## Libor Calculation

### Libor Scheduler

 This class contains some utility methods for LIBOR rate time handling on a generic time interval between two dates (conventionally called start and end). 
 Constructor parameters:
 - param fixing_leg: days between fixing date and start accrual date
 - type fixing_leg:  datetime.date
 - param day_count:  convention for year fraction calculation
 - type day_count:   day_count_basis
 - param start_adj:  day adjustment convention for start date
 - type start_adj:   AdjustmentType  
 - param end_adj:    day adjustment convention for end date
 - type end_adj:     AdjustmentType 
 - param end_to_end: day adjustment convention for end month dates
 - type end_to_end:  boolean 


In [2]:
class LiborScheduler():
   
    def __init__(self, fixing_lag, day_count, start_adj, end_adj, end_to_end = False):
        self.__fixing_lag        = fixing_lag
        self.__delta_fixing_lag  = datetime.timedelta(days=fixing_lag)
        self.__day_count         = day_count
        self.__start_adj         = start_adj
        self.__end_adj           = end_adj
        self.__end_to_end        = end_to_end
    
    def dayCount(self):
        return self.__day_count
    
    def fixingDate(self, startDate):
        fixing = startDate
        modifier = DayAdjustmentFactory.create_istance(self.__start_adj)
        for i in range(self.__fixing_lag):
            fixing = modifier(fixing - datetime.timedelta(days=1))
        return fixing
    
    def startDate(self, fixing):
        start    = fixing
        modifier = DayAdjustmentFactory.create_istance(self.__start_adj)
        for i in range(self.__fixing_lag):
            start = modifier(start + datetime.timedelta(days=1))
        return start
        
    def endDate(self, startDate, tenor):
        modifier    = DayAdjustmentFactory.create_istance(self.__end_adj)
        m           = re.match("([0-9]+)([a-zA-Z]+)", tenor)
        value, unit = int(m.group(1)), m.group(2) 
        end         = modifier(startDate + addTimeInterval(value, unit))
        
        if self.__end_to_end and startDate.day == giorni_mese[startDate.month - 1]:
            end = datetime.date(end.year, end.month, last_month_day(end))
        return end     


## Forward Rate Calculator

In [3]:
class LiborForwardCalculator():
    
    def __init__(self, discount, libor_schedule):
        self.__discount = discount
        self.__schedule = libor_schedule
        self.__scheduleDates = {}
        self.__year_fraction   = YearFractionFactory.create_istance(self.__schedule.dayCount())
        
    def discount(self):
        return self.__discount
    
    def daycount_convention(self):
        return self.__schedule.dayCount()
     
    def __call__(self, fixing, tenor):
        if fixing not in self.__scheduleDates:
            startDate       = self.__schedule.startDate(fixing)
            endDate         = self.__schedule.endDate(startDate, tenor)
            tau             = self.__year_fraction(startDate, endDate)
            self.__scheduleDates[fixing] = (startDate, endDate, tau)
        else:
            startDate, endDate, tau = self.__scheduleDates[fixing]
        
        dfStart         = self.__discount.df(startDate)
        dfEnd           = self.__discount.df(endDate  ) 
        forward         = (dfStart/dfEnd - 1)/tau 
               
        return forward

## Libor Rate

In [4]:
class Libor(object):
    def __init__(self, obsdate, spot, tenor, forward_calc = None, label = None):
        self.__obsdate        = obsdate
        self.__spot           = spot
        self.__tenor          = tenor
        self.__forward_calc   = forward_calc
        self.__label          = label
    
    def daycount_convention(self):
        if self.__forward_calc:
            return self.__forward_calc.daycount_convention()
    
    def label(self):
        return self.__label
        
    def spot(self):
        """
        Return the spot of the Libor
        """
        return self.__spot

    def forwardCalc(self):
        return self.__forward_calc

    def forward(self, date):
        if self.__forward_calc:
            return self.__forward_calc(date, self.__tenor)
        else:
            return None   


## Discount Curve Class

We want to create the DiscountCurve class with that will compute $df(t, T)$ where:
- t is the "today" (the so called observation date) and T a generic maturity
- obsdate: the date at which the curve refers to (i.e. today)
- pillars: a list of dates at which the discount factor is known
- dfs: the known discount factors

In [5]:
# numpy is a numerical package
# (package is the pythonic name for library, which actually is nothing else than a directory containing python files)
import numpy

# to represent dates we use the date class from the package datetime
from datetime import date

# math is mathematical package
import math

class DiscountCurve:
    def __init__(self, obsdate, pillars, dfs):
        # the following generates an error that will block the program
        if pillars[0] < obsdate:
            raise "today is greater than the first pillar date"
        
        # we want to make sure that the first pillar is the observation date and its discount factor is 1.0
        # therefore we add it if not present in the original lists
        if pillars[0] > obsdate:
            pillars.insert(0, obsdate)
            dfs.insert(0, 1.0)

        # store the input variables
        self.today = obsdate
        self.pillars = pillars
        self.dfs = dfs

        # dates must be converted to numbers, otherwise the interpolation function will not work
        self.pillars_number = [aDate.toordinal() for aDate in pillars]

        # we will linearly interpolate on the logarithm of the discount factors
        self.logdfs = [math.log(df) for df in dfs]

    def df(self, aDate):
        # we convert the date to a number
        date_number = aDate.toordinal()

        # we use the linear interpolator of the numpy library
        log_df = numpy.interp(date_number, self.pillars_number, self.logdfs)

        #we  will have to the take the exponential beacuse we interpolated the logarithms
        df = math.exp(log_df)

        # return the resulting discount factor
        return df
    
    def obsdate(self):
        return self.today

In [None]:
def extends(klass):
    def decorator(func):
        setattr(klass, func.__name__, func)
        return func
    return decorator

### IR Stream Class

In [None]:
from enum import Enum

class flow_type(Enum):
    fixed       = "fixed"
    floating    = "floating"
    notional    = "notional"

class flow(object):
    def __init__(self
             , this_type
             , index                    #None for fixed flows 
             , leverage
             , coupon                   #the margin in case of a floating flow
             , payment_date
             , fixing_date        
             , start_date
             , end_date
             , notional                 # the notional on which the flow amount is computed
             , daycount_convention
             ):
        self.__type                     = this_type
        self.__index                    = index
        self.__leverage                 = leverage
        self.__coupon                   = coupon
        self.__payment_date             = payment_date
        self.__fixing_date              = fixing_date
        self.__start_date               = start_date
        self.__end_date                 = end_date
        self.__notional                 = notional
        self.__daycount_convention      = daycount_convention
 
        if self.__type == flow_type.fixed:
            year_fraction = YearFractionFactory.create_istance(self.__daycount_convention)
        if self.__type == flow_type.floating:
            year_fraction = YearFractionFactory.create_istance(self.__index.daycount_convention())
        if self.__type == flow_type.notional:
            year_fraction = YearFractionFactory.create_istance(day_count_basis.basis_lin_30360)    
        self.__tau = year_fraction(self.__start_date, self.__end_date)
        self.__npv  = 0.0
    
    def index(self):                    return self.__index
    def fixing_date(self):              return self.__fixing_date
    def payment_date(self):             return self.__payment_date
    def this_type(self):                return self.__type
    def leverage(self):                 return self.__leverage
    def start_date(self):               return self.__start_date
    def end_date(self):                 return self.__end_date
    def tau(self):                      return self.__tau
    def notional(self):                 return self.__notional
    
    def value(self, obsdate, fixed):
        if self.__type == flow_type.notional:
            return self.__importo
        else:    
            return self.__notional * self.__tau * self.rate(obsdate, fixed)
    
    def rate(self, obsdate, fixed = None):
        try:
            table = fixed[self.__index.label()]
        except:                
            table = None
        
        floater, coupon  = 0, 0
        if self.__index:
            if self.__fixing_date > obsdate:
                floater = self.__index.forward(self.__fixing_date)
            else:
                if self.__fixing_date == obsdate and not fixed:
                    floater = self.__index.forward(self.__fixing_date)
                else:
                    if table:
                        floater = table(self.__fixing_date)
                    else:
                        floater = 0.0       
        else:
            coupon = self.__coupon   
        return self.__leverage * floater + coupon 


### Leg Class

In [None]:
class leg(object):
    def __init__(self
                , flows
                , pay_or_receive
                , currency
                , payment_frequency     = None
                , calculation_frequency = None
                , payoff                = None
               ):
        
        self.__flows                    = flows
        self.__pay_or_receive           = pay_or_receive
        self.__currency                 = currency
        self.__payment_frequency        = payment_frequency
        self.__calculation_frequency    = calculation_frequency
        self.__payoff                   = payoff
       
    def flow(self, index):              return self.__flows[index]
    def flows(self):                    return self.__flows
    def pay_receive(self):              return self.__pay_or_receive
    def has_payoff(self):               return self.__payoff <> None
    def currency(self):                 return self.__currency
    
    def flowsCount(self):
        return len(self.__flows)    

    def payoff(self):
        if self.__payoff == None:
            raise RuntimeError, "Null payoff"
        return self.__payoff


### Swap Class

In [None]:
class Swap():
    def __init__(self, 
                 today, 
                 underlying, 
                 maturity, 
                 coupon, 
                 notional = 1000000, 
                 pay_fix = +1, 
                 pay_float = -1, 
                 float_tenor = '6m', 
                 fixed_tenor='1y', 
                 floatscheduler = None, 
                 fixscheduler = None):
        
        if floatscheduler == None:
            self.__floatscheduler = SimpleScheduler(fixing_lag = 2, 
                                                    tenor = float_tenor, 
                                                    dayadjustment = AdjustmentType.MODFOLLOWING, 
                                                    daycount = day_count_basis.basis_lin_act_360)
        else:
            self.__floatscheduler = floatscheduler
        if fixscheduler == None:
            self.__fixscheduler = SimpleScheduler(fixing_lag =2, 
                                                  tenor = fixed_tenor, 
                                                  dayadjustment = AdjustmentType.MODFOLLOWING, 
                                                  daycount = day_count_basis.basis_lin_s30360)
        else:
            self.__fixscheduler = fixscheduler
        
        floatscheduler      = self.__floatscheduler
        fixscheduler        = self.__fixscheduler
        
        floatstartdate      = floatscheduler.scheduleDate(today)
        floatdates          = floatscheduler.scheduleDates(floatstartdate, maturity)
        floatstartdates     = floatdates[0:len(floatdates)-1]
        floatenddates       = floatdates[1:len(floatdates)]
        floatfixingdates    = []
        
        for startflowdate in floatstartdates:
            floatfixingdates.append(floatscheduler.fixingDate(startflowdate))
        floatpaymentdates = floatenddates

        fixstartdate    = fixscheduler.scheduleDate(today)
        fixdates        = fixscheduler.scheduleDates(fixstartdate, maturity)
        fixstartdates   = fixdates[0:len(fixdates)-1]
        fixenddates     = fixdates[1:len(fixdates)]
        fixpaymentdates = fixenddates
        
        floatflows = []
        for i in range(len(floatstartdates)):
            f = flow(this_type              = flow_type.floating
                    , index                 = underlying 
                    , leverage              = 1.
                    , coupon                = 0.
                    , payment_date          = floatpaymentdates[i]
                    , fixing_date           = floatfixingdates[i]        
                    , start_date            = floatstartdates[i]
                    , end_date              = floatenddates[i]
                    , notional              = notional
                    , daycount_convention   = day_count_basis.basis_lin_act_360
                    )
            floatflows.append(f)

        fixflows = []
        for i in range(len(fixstartdates)):
            f = flow(this_type              = flow_type.fixed
                    , index                 = None 
                    , leverage              = 1.
                    , coupon                = coupon
                    , payment_date          = fixpaymentdates[i]
                    , fixing_date           = None        
                    , start_date            = fixstartdates[i]
                    , end_date              = fixenddates[i]
                    , notional              = notional
                    , daycount_convention   = day_count_basis.basis_lin_30360
                    )
            fixflows.append(f)
        
        floatleg    = leg(floatflows, pay_float, None)
        fixleg      = leg(fixflows, pay_fix, None)
      
        self.__leg_1 = floatleg
        self.__leg_2 = fixleg
        return None
       
    def legs(self):
        return self.__leg_1, self.__leg_2   
    


In [None]:
@extends(Swap)

def price(self, fixed, discount_curve, evaluation_date = None):
    res             = {}
    flows           = []
    npv, annuity    = 0.0, 0.0
    legvalue        = [0, 0] 
    j, k            = 0, 0
    try:
        dc       = discount_curve
        df       = dc.df
        obsdate  = evaluation_date
        if obsdate == None: obsdate  = dc.obsdate()      
        k = 0
        for l in self.legs():
            leg_npv, f_value, j = 0.0, 0.0, 0
            for f in l.flows():
                flow  = {}
                payment_date = f.payment_date()
                #print '-----> leg #%s (%s)- flow #%s, date %s - obs date %s'%(k, str(f.this_type()), str(j), payment_date, obsdate)
                if payment_date > obsdate:
                    f_discf = df(payment_date)
                    f_flow  = f.value(obsdate, fixed)    
                    f_value = f_discf * f_flow * f.leverage()    
                    leg_npv += f_value
                    if f.index() == None: annuity += f_discf * f.tau() * f.notional()
                    j+=1
                    flow['leg']          = k    
                    flow['payment date'] = payment_date
                    flow['start date']   = f.start_date()
                    flow['end date']     = f.end_date()
                    flow['fixing date']  = f.fixing_date()
                    flow['flow']         = f_flow
                    flow['npv']          = f_value
                    flow['discount']     = f_discf
                    flows.append(flow)
                        
            legvalue[k] = l.pay_receive() * leg_npv
            npv += legvalue[k]
            k+=1

    except Exception, e:
        print '-----> ERRORE - Flusso %s - Leg %s - Error %s' % (str(j), str(k), e.__str__())
            
    finally:    
        res['npv']      = npv
        res['legs']     = legvalue
        res['flows']    = flows    
        res['annuity']  = annuity
            
    return res

In [8]:
print "module qfin_ir_classes imported"

module qfin_ir_classes imported
