In [0]:
# Project: IMPLEMENTING A LIBOR YIELD CURVE OBJECT

# By Snehal Vikas Kenjale (svk304)

In [0]:
from dateutil.relativedelta import relativedelta
import math
import logging
import datetime

In [0]:
# This class handles dates for Libor Yield Curve

class USDYieldCurveDate:
    """Date handling for the Libor yield curve.

    Holds the trade date, the holiday calendar and the spot date derived from
    them, and provides the business-day helpers (next/previous business day,
    modified-following roll, third Wednesday of a month).
    """

    def __init__(self, *args):
        """Build the date context from two file paths.

        Args:
            *args: exactly two positional values:
                args[0] - path of the file holding the trade date,
                args[1] - path of the file holding the holiday calendar.

        Raises:
            Exception: if the number of arguments is not 2, or if either file
                contains an invalid date, or if the trade date file is empty.
        """
        if len(args) != 2:
            logging.error("Cannot build curve from given inputs. 2 parameters should be given.")
            raise Exception("Cannot build curve from given inputs.")
        self._trade_date = self.read_trade_date(args[0])
        self._holiday_list = self.read_holiday_calendar(args[1])
        self._spot_date = self.calculate_spot_date(self.trade_date)

    # This function returns the trade date
    @property
    def trade_date(self):
        return self._trade_date

    # This function returns the spot date
    @property
    def spot_date(self):
        return self._spot_date

    # This function returns a list containing holidays
    @property
    def holiday_list(self):
        return self._holiday_list

    @staticmethod
    def _non_blank_rows(path):
        """Return the whitespace-stripped, non-empty lines of the text file at `path`."""
        with open(path, "r") as fp:
            return [line.strip() for line in fp if line.strip()]

    def date_object(self, date_str):
        """Convert a 'YYYY-MM-DD' string into a `datetime.date`.

        Args:
            date_str: date text; surrounding whitespace is ignored and the
                month/day do not need to be zero padded.

        Returns:
            datetime.date: the parsed date.

        Raises:
            Exception: if the string cannot be parsed as a valid date.
        """
        try:
            date_ymd = date_str.strip().split("-")
            return datetime.date(int(date_ymd[0]), int(date_ymd[1]), int(date_ymd[2]))
        except Exception as e:
            logging.error('Invalid date string. Should be in format: YYYY-MM-DD')
            # Chain the original error so the root cause stays visible in tracebacks.
            raise Exception('Invalid date string. Should be in format: YYYY-MM-DD') from e

    def read_trade_date(self, trade_date_file):
        """Read the trade date from `trade_date_file`.

        Every non-blank line is parsed as a date; when the file holds several
        lines the last one is returned.

        Raises:
            Exception: if a line is not a valid date or the file has no
                non-blank line at all.
        """
        trade_date = None
        for row in self._non_blank_rows(trade_date_file):
            trade_date = self.date_object(row)
        if trade_date is None:
            # Without this guard the failure would surface later as an opaque
            # TypeError while computing the spot date.
            logging.error('Trade date file %s contains no date.', trade_date_file)
            raise Exception('Trade date file contains no date.')
        logging.debug('Trade date : %s', trade_date)
        return trade_date

    def calculate_spot_date(self, trade_date):
        """Return the spot date: two business days after `trade_date`."""
        spot_date = self.following_date(trade_date)
        spot_date = self.following_date(spot_date)
        logging.debug('Spot date : %s', spot_date)
        return spot_date

    def following_date(self, date):
        """Return the first business day strictly after `date`."""
        one_day = datetime.timedelta(days=1)
        date = date + one_day
        while self.is_holiday(date):
            date = date + one_day
        return date

    def previous_date(self, date):
        """Return the last business day strictly before `date`."""
        one_day = datetime.timedelta(days=1)
        date = date - one_day
        while self.is_holiday(date):
            date = date - one_day
        return date

    def is_holiday(self, date):
        """Return True when `date` is a weekend day or is listed in the holiday calendar."""
        return self.is_weekend(date) or date in self.holiday_list

    @staticmethod
    def is_weekend(date):
        """Return True when `date` falls on Saturday or Sunday."""
        week_day = date.weekday()
        return week_day >= 5         # 0-4 weekday and 5-6 is weekend

    def read_holiday_calendar(self, holiday_calendar):
        """Read the holiday calendar file and return its dates as a list.

        Each non-blank line must be a 'YYYY-MM-DD' date.

        Raises:
            Exception: if a line is not a valid date.
        """
        holiday_list = [self.date_object(row) for row in self._non_blank_rows(holiday_calendar)]
        logging.debug('Holiday list : %s', holiday_list)
        return holiday_list

    def modified_following(self, date):
        """Return the payment date for `date` under the modified-following rule.

        A business day is returned unchanged. A non-business day rolls forward
        to the next business day, unless that lands in a different calendar
        month, in which case it rolls backward to the previous business day.
        """
        payment_date = date
        if self.is_holiday(payment_date):
            payment_date = self.following_date(payment_date)
            # Compare (year, month) rather than month alone: a roll from
            # December into January has a *smaller* month number, so a plain
            # "month >" test would miss the year-end case.
            if (payment_date.year, payment_date.month) != (date.year, date.month):
                payment_date = self.previous_date(date)
        logging.debug('Payment date : %s', payment_date)
        return payment_date

    @staticmethod
    def third_wednesday(year, month):
        """Return the third Wednesday of the given month as a `datetime.date`."""
        # The 15th is the earliest possible third Wednesday; step forward to
        # the next Wednesday (weekday 2) when the 15th is another day.
        wd = datetime.date(year, month, 15).weekday()
        third_day = datetime.date(year, month, 15 + (2 - wd) % 7)
        logging.debug('Third wednesday : %s', third_day)
        return third_day


In [0]:
# This class handles computations for Libor Yield Curve

class USDYieldCurve(USDYieldCurveDate):
    """Discount curve built from Libor cash deposit rates and futures prices.

    The curve nodes are the spot date (discount factor 1.0), the deposit
    maturity dates and the futures expiry dates. Discount factors between
    nodes are obtained by linear interpolation of their logarithms.
    """

    def __init__(self, *args):
        """Build the curve from four file paths.

        Args:
            *args: exactly four positional values:
                args[0] - deposit rates file,
                args[1] - futures prices file,
                args[2] - trade date file,
                args[3] - holiday calendar file.

        Raises:
            Exception: if the number of arguments is not 4 or an input file
                cannot be parsed.
        """
        if len(args) != 4:
            logging.error("Cannot build curve from given inputs. 4 parameters should be given.")
            raise Exception('Cannot build curve from given inputs.')
        super(USDYieldCurve, self).__init__(args[2], args[3])
        self._deposit_rates = self.read_deposit_rates(args[0])
        self._future_price_rates = self.read_futures_prices(args[1])

    # This function returns deposit rates
    @property
    def deposit_rates(self):
        return self._deposit_rates

    # This function returns future price rates
    @property
    def future_prices_rates(self):
        return self._future_price_rates

    @staticmethod
    def _tenor_count(instrument):
        """Return the integer that directly precedes the trailing unit letter of `instrument`.

        All consecutive digits are used, so 'XYZ12M' yields 12 (reading only
        the single character before the unit would yield 2).
        """
        body = instrument[:-1]
        start = len(body)
        while start > 0 and body[start - 1].isdigit():
            start -= 1
        return int(body[start:])

    def _deposit_df(self, mature_date, rate):
        """Return the simple-interest discount factor from spot to `mature_date`.

        The 36000 denominator combines a 360-day year with a division of
        `rate` by 100.
        """
        return 1 / (1 + rate * (mature_date - self.spot_date).days / 36000.0)

    @staticmethod
    def _interpolate_df(date, date1, df_date1, date2, df_date2):
        """Interpolate the discount factor at `date` between two curve nodes.

        Interpolation is linear in log(discount factor). When `date` coincides
        with a node, that node's discount factor is returned as is (which also
        avoids a zero division if both nodes share the same date).
        """
        if date == date1:
            return df_date1
        if date == date2:
            return df_date2
        weight = (date - date1) / (date2 - date1)
        return math.exp(math.log(df_date1) + weight * (math.log(df_date2) - math.log(df_date1)))

    def read_deposit_rates(self, deposit_rates_file):
        """Read deposit rates from a tab-separated file.

        Each non-blank line holds an instrument code and a rate. The code ends
        with a count followed by a unit letter: 'D' for days, 'W' for weeks,
        anything else is treated as months. The maturity date is the spot date
        plus that period, adjusted with the modified-following rule.

        Returns:
            list of (maturity_date, rate) tuples in file order.
        """
        deposit_rates = []
        with open(deposit_rates_file, "r") as fp:
            rows = (line.rstrip() for line in fp)
            rows = (line for line in rows if line)
            for row in rows:
                item = row.strip().split("\t")
                count = self._tenor_count(item[0])
                unit = item[0][-1]
                if unit == "D":
                    date_to_mature = relativedelta(days=count)
                elif unit == "W":
                    date_to_mature = relativedelta(weeks=count)
                else:
                    date_to_mature = relativedelta(months=count)
                mature_date = self.modified_following(self.spot_date + date_to_mature)
                deposit_rates.append((mature_date, float(item[1])))
        logging.debug('Deposit rates : %s', deposit_rates)
        return deposit_rates

    def read_futures_prices(self, futures_prices_file):
        """Read futures prices from a tab-separated file.

        Each non-blank line holds a contract code and a price. The last two
        characters of the code are a month letter ('H' = March, 'M' = June,
        'U' = September, anything else = December) and the last digit of the
        expiry year. The expiry date is the third Wednesday of that month, and
        the implied rate is (100 - price) / 100.

        Returns:
            list of (expiry_date, price, rate) tuples in file order.
        """
        month_codes = {"H": 3, "M": 6, "U": 9}
        future_prices = []
        with open(futures_prices_file, "r") as fp:
            rows = (line.rstrip() for line in fp)
            rows = (line for line in rows if line)
            for row in rows:
                item = row.strip().split("\t")
                month = month_codes.get(item[0][-2], 12)
                year_digit = int(item[0][-1])
                decade, trade_year_digit = divmod(self.trade_date.year, 10)
                # The contract belongs to the current decade when its year digit
                # is later than the trade year's, or equal with a month that has
                # not passed yet. Otherwise it can only refer to the next decade.
                # (A ">=" on the year digit would wrongly date an already-passed
                # month of the trade year in the past.)
                if year_digit > trade_year_digit or (
                        year_digit == trade_year_digit and month >= self.trade_date.month):
                    year = decade * 10 + year_digit
                else:
                    year = decade * 10 + year_digit + 10
                date = self.third_wednesday(year, month)
                price = float(item[1])
                rate = (100 - price) / 100.0
                future_prices.append((date, price, rate))
        logging.debug('Future prices : %s', future_prices)
        return future_prices

    def df_mature_dates(self):
        """Return [(date, discount_factor)] for the spot date and every deposit maturity."""
        df_mature_dates = [(self.spot_date, 1.0)]
        for mature_date, rate in self.deposit_rates:
            df_mature_dates.append((mature_date, self._deposit_df(mature_date, rate)))
        logging.debug('Mature dates : %s', df_mature_dates)
        return df_mature_dates

    def df_future_expiry(self):
        """Return [(date, discount_factor)] for every futures expiry date.

        The first future is anchored by interpolating between the two deposit
        maturities that bracket its expiry. Each later discount factor is the
        previous one divided by (1 + previous future's rate * days / 360).

        Returns:
            list of (expiry_date, discount_factor); empty when there are no
            futures.

        Raises:
            Exception: if the first futures expiry is not strictly between the
                first and last deposit maturity dates, or cannot be bracketed.
        """
        deposits = self.deposit_rates
        futures = self.future_prices_rates
        if not futures:
            return []
        if not deposits:
            logging.error('Insufficient LIBOR cash rate data.')
            raise Exception('Insufficient LIBOR cash rate data.')

        date_first_future = futures[0][0]
        if date_first_future <= deposits[0][0] or date_first_future >= deposits[-1][0]:
            logging.error('Insufficient LIBOR cash rate data.')
            raise Exception('Insufficient LIBOR cash rate data.')

        # Inclusive bounds: an expiry that falls exactly on an interior deposit
        # maturity must still be anchored (strict bounds would skip it and
        # leave the chain below without a starting point).
        first_df = None
        for (date1, rate1), (date2, rate2) in zip(deposits, deposits[1:]):
            if date1 <= date_first_future <= date2:
                first_df = self._interpolate_df(
                    date_first_future,
                    date1, self._deposit_df(date1, rate1),
                    date2, self._deposit_df(date2, rate2))
                break
        if first_df is None:
            # Only reachable when the deposit maturities are not in ascending order.
            logging.error('Insufficient LIBOR cash rate data.')
            raise Exception('Insufficient LIBOR cash rate data.')

        df_future_expiry = [(date_first_future, first_df)]
        for (prev_date, _, prev_rate), (date, _, _) in zip(futures, futures[1:]):
            df = df_future_expiry[-1][1] / (1 + prev_rate * (date - prev_date).days / 360.0)
            df_future_expiry.append((date, df))
        return df_future_expiry

    def get_dfs_dates(self):
        """Return all curve nodes as (date, discount_factor) sorted by date, or None if there are none."""
        df_mature_dates = self.df_mature_dates()
        df_future_expiry = self.df_future_expiry()
        if not (df_mature_dates or df_future_expiry):
            return None
        return sorted(df_mature_dates + df_future_expiry, key=lambda tup: tup[0])

    def get_df_date(self, date):
        """Return the discount factor for `date` (a `datetime.date`).

        A warning is logged when `date` is a holiday or weekend; the value is
        still computed.

        Returns:
            float discount factor, or None when the curve has no nodes.

        Raises:
            Exception: if `date` lies before the first or after the last node.
        """
        dfs_dates = self.get_dfs_dates()
        if not dfs_dates:
            return None
        date_max = dfs_dates[-1][0]
        date_min = dfs_dates[0][0]
        if date < self.spot_date or date < date_min or date > date_max:
            logging.error('Input date should be between {first} and {last}.'.format(first=str(date_min), last=str(date_max)))
            raise Exception('Input date should be between {first} and {last}.'.format(first=str(date_min), last=str(date_max)))
        if self.is_holiday(date):
            logging.warning("{date} is a holiday".format(date=date))
        for (date1, df_date1), (date2, df_date2) in zip(dfs_dates, dfs_dates[1:]):
            if date1 <= date <= date2:
                return self._interpolate_df(date, date1, df_date1, date2, df_date2)
        # Only reached when the curve has a single node; the range check above
        # guarantees `date` equals that node.
        return dfs_dates[-1][1]

    def getDfToDate(self, date_str):
        """Return the discount factor for a 'YYYY-MM-DD' string, rounded to 9 decimals.

        Returns None when no discount factor is available.
        """
        dft = self.get_df_date(self.date_object(date_str.strip()))
        if dft is not None:
            return self.round_up(dft)
        return None

    def getFwdRate(self, date1_str, date2_str):
        """Return the simple forward rate between two 'YYYY-MM-DD' dates.

        Computed as 360 / days * (df(date1) / df(date2) - 1) and rounded to
        9 decimals.

        Returns:
            float forward rate, or None when either discount factor is
            unavailable.

        Raises:
            Exception: if the first date is not strictly before the second, or
                a date is invalid or outside the curve range.
        """
        date1 = self.date_object(date1_str.strip())
        date2 = self.date_object(date2_str.strip())
        if date1 >= date2:
            # The message now matches the check: date1 must precede date2.
            logging.error('First date parameter should be earlier than the second one.')
            raise Exception('First date parameter should be earlier than the second one.')
        if self.is_holiday(date1) or self.is_holiday(date2):
            logging.warning("Input date is a holiday.")
        df_date1 = self.get_df_date(date1)
        df_date2 = self.get_df_date(date2)
        # Either missing value makes the ratio undefined, so test each one.
        if df_date1 is None or df_date2 is None:
            return None
        fwd_rate = 360.0 / (date2 - date1).days * (df_date1 / df_date2 - 1.0)
        return self.round_up(fwd_rate)

    @staticmethod
    def round_up(value):
        """Round `value` to 9 decimal places (nearest, via the built-in `round`)."""
        return round(value * 1000000000) / 1000000000.0


In [0]:
def main():
    """Run the two bundled test cases and print their results.

    For each case a curve is built from its four input files, the discount
    factor is printed for every listed curve date, and finally the discount
    factors and the forward rate are printed for one pair of dates. Log
    records go to 'myProject.log'.
    """
    logging.basicConfig(filename='myProject.log', level=logging.DEBUG)

    # (title, input files, curve dates, (start, end) of the forward period)
    test_cases = (
        (
            '\nTESTCASE - YCtestcase',
            ('depoRates.txt', 'futurePrices.txt', 'tradeDate.txt', 'holidayCalendar.txt'),
            (
                '2015-5-25', '2015-6-17', '2015-6-24', '2015-9-16', '2015-12-16',
                '2016-3-16', '2016-6-15', '2016-9-21', '2016-12-21', '2017-3-15',
                '2017-6-21', '2017-9-20', '2017-12-20',
            ),
            ('2015-12-1', '2016-2-1'),
        ),
        (
            '\n\nTESTCASE - YCtestcaseWith2019Spot',
            ('depoRates1.txt', 'futurePrices1.txt', 'tradeDate1.txt', 'holidayCalendar.txt'),
            (
                '2019-5-28', '2019-6-19', '2019-6-25', '2019-9-18', '2019-12-18',
                '2020-3-18', '2020-6-17',
            ),
            ('2019-12-2', '2020-2-3'),
        ),
    )

    for title, input_files, curve_dates, (period_start, period_end) in test_cases:
        print(title)
        usd_curve = USDYieldCurve(*input_files)

        print('\nDiscount Curve')
        for curve_date in curve_dates:
            print('getDfToDate : ', usd_curve.getDfToDate(curve_date))

        print('\nOutputs')
        for period_date in (period_start, period_end):
            print('getDfToDate : ', usd_curve.getDfToDate(period_date))
        print('getFwdRate  : ', usd_curve.getFwdRate(period_start, period_end))

In [6]:
if __name__ == '__main__':
    # Run the test cases; any failure is written to the log and echoed on
    # stdout instead of surfacing as a traceback.
    try:
        main()
    except Exception as error:
        logging.error(error)
        print('Error encountered : ', error)


TESTCASE - YCtestcase

Discount Curve
getDfToDate :  0.999846746
getDfToDate :  0.999674522
getDfToDate :  0.999622112
getDfToDate :  0.998828709
getDfToDate :  0.997593079
getDfToDate :  0.995906401
getDfToDate :  0.993746107
getDfToDate :  0.990886848
getDfToDate :  0.987740893
getDfToDate :  0.983792605
getDfToDate :  0.978783409
getDfToDate :  0.973786532
getDfToDate :  0.968522879

Outputs
getDfToDate :  0.997796649
getDfToDate :  0.996721581
getFwdRate  :  0.006262862


TESTCASE - YCtestcaseWith2019Spot

Discount Curve
getDfToDate :  0.99983686
getDfToDate :  0.999668126
getDfToDate :  0.999622112
getDfToDate :  0.998570115
getDfToDate :  0.997083076
getDfToDate :  0.995146135
getDfToDate :  0.992737093

Outputs
getDfToDate :  0.997344373
getDfToDate :  0.996082208
getFwdRate  :  0.007240741
