# Pricing and Backtesting of Fixed Coupon Notes 

## What is a Fixed Coupon Note?
Fixed Coupon Note (FCN) is a popular structured product among high networth investors. A simple version of FCN is defined by reference basket of stocks, coupon rate, autocall barrier, strike, coupon frequency and tenor. 

In [1]:
from dataclasses import dataclass


@dataclass
class FCN:
    strike: float  # % of initial price
    ko_barrier: float  # % of initial price
    coupon: float  # % per period
    periodlen: float  # coupon period in years, so 0.25 for quarterly
    numperiods: (
        int  # tenor in number of coupon periods, numperiods is 4 for 1Y quarterly
    )

This note pays a constant coupon amount periodically subject to early termination. At maturity, the note holder receives either 100% of the invested amount or shares of the worst performing stock in the reference basket depending on its performance. More precisely,

In [2]:
import numpy as np


def fcn_payout(path, fcn):
    payments = np.zeros_like(path)
    isKO = False
    for t, p in enumerate(path):
        payments[t] = fcn.coupon
        if p > fcn.ko_barrier:  # check for early termination
            isKO = True
            payments[t] += 1
            break
    if not isKO:
        payments[-1] += min(
            1.0, path[-1] / fcn.strike
        )  # delivery of shares of worst performing stock at strike price
    return payments

Let's take a concrete example of a 1Y FCN on basket of Apple, Tesla and Microsoft that pays quarterly a coupon of 20% p.a.

In [3]:
fcn = FCN(strike=0.8, ko_barrier=1.0, coupon=0.05, periodlen=0.25, numperiods=4)

Below are some examples of FCN payout for different trajectories of prices of the worst performing share. Initial price of all stocks is taken as 1.0 without loss of generality.

In [4]:
path = [0.98, 0.97, 0.99, 1.0]
fcn_payout(path, fcn)

array([0.05, 0.05, 0.05, 1.05])

In [5]:
path = [0.98, 0.99, 1.01, 1.0]
fcn_payout(path, fcn)

array([0.05, 0.05, 1.05, 0.  ])

In [6]:
path = [0.98, 0.92, 0.89, 0.72]
fcn_payout(path, fcn)

array([0.05, 0.05, 0.05, 0.95])

## Pricing a FCN

We use monte-carlo method to price FCNs. Many paths of worst performing share of the baset are simulated using Black-Scholes model for prices. FCN payout is computed for each the paths. Average of the discounted payouts is an estimate of the price.

Under Black-Scholes model, price evolve according to the stochastic differential equation:

$$ \log{S_{t+\Delta}} = \log{S_t} + (r_f - d - \frac{1}{2} \text{diag}(\Sigma)) \Delta + \sigma \sqrt{\Delta} \epsilon$$

where, $S_t$ is the vector of stock prices at time t, $r_f$ is the risk free rate, $d$ is the vector of dividend yields of the stocks, $\Sigma$ is the covariance matrix of stock returns and $\sigma$ is the Cholesky factor of $\Sigma$ i.e. $\Sigma = \sigma \sigma^T$.

Following function generates a path of the worst performing share in a basket.

In [7]:
def bsm_paths(numpaths, pathlen, dt, divs, cov, rf, rng):
    drift = (rf - divs - 0.5 * np.diag(cov)) * dt
    z = rng.multivariate_normal(drift, cov * dt, size=(numpaths, pathlen))
    np.cumsum(z, axis=1, out=z)
    np.exp(z, out=z)
    w = z.min(axis=-1)
    return w

Some sample paths of worst performing share among Apple, Microsoft and Tesla based the available data. 

In [8]:
rng = np.random.default_rng(seed=42)
divs = np.array([0.0054, 0.0071, 0.0])
rf = 0.03
cov = np.array(
    [
        [0.03751506, 0.02176969, 0.03418482],
        [0.02176969, 0.04947003, 0.02940194],
        [0.03418482, 0.02940194, 0.23212459],
    ]
)
paths = bsm_paths(5, fcn.numperiods, fcn.periodlen, divs, cov, rf, rng)
print(paths)

[[0.88642084 0.65257628 0.61394903 0.75759478]
 [0.90485803 0.87779186 0.84394758 0.76748896]
 [1.01119888 0.9465981  0.56566273 0.68735511]
 [0.97920183 0.83926    0.9698816  0.90575901]
 [0.83451584 0.67335333 0.72365794 0.48529013]]


And the corresponding FCN payouts

In [9]:
payouts = np.vstack([fcn_payout(path, fcn) for path in paths])
print(payouts)

[[0.05       0.05       0.05       0.99699348]
 [0.05       0.05       0.05       1.0093612 ]
 [1.05       0.         0.         0.        ]
 [0.05       0.05       0.05       1.05      ]
 [0.05       0.05       0.05       0.65661266]]


Note that the FCN early terminates at the end of first quarter in the third path above. 
Each of these payouts need to be discounted appropriately to compute PV. Since we know the risk free rate and the coupon period, we can pre compute the discount factors.

In [10]:
settlement_times = np.arange(1, fcn.numperiods + 1) * fcn.periodlen
discount_factors = np.exp(-rf * settlement_times)
print(discount_factors)

[0.99252805 0.98511194 0.97775124 0.97044553]


Price estimate is just the average of the PVs.

In [11]:
price = np.mean(payouts @ discount_factors)
print(price)

1.0472930537593481


The function below wraps the above calculations.

In [12]:
def calc_fcn_price_from_paths(paths, fcn, rf):
    payouts = np.vstack([fcn_payout(path, fcn) for path in paths])
    settlement_times = np.arange(1, fcn.numperiods + 1) * fcn.periodlen
    discount_factors = np.exp(-rf * settlement_times)
    price = np.mean(payouts @ discount_factors)
    return price


price = calc_fcn_price_from_paths(paths, fcn, 0.03)
print(price)

1.0472930537593481


## Backtesting a FCN

Backtesting evaluates payout statistics for a price trajectory observed in the past. An investor might be interested to know what the FCN would have paid, how frequently it terminated early, average duration, etc.

The function below returns the statistics of interest in addition to the payout as implemented in `fcn_payout` function above.

In [13]:
def fcn_payout_with_stats(path, fcn: FCN):
    payments = np.zeros_like(path)
    isKO = False
    duration = 0
    for t, p in enumerate(path):
        duration += 1
        payments[t] = fcn.coupon
        if p > fcn.ko_barrier:
            isKO = True
            payments[t] += 1
            break
    if not isKO:
        payments[-1] += min(1.0, path[-1] / fcn.strike)
    return {"payments": payments, "isKO": isKO, "duration": duration * fcn.periodlen}

Let us import historical prices of the stocks of interest.

In [14]:
import pandas as pd

prices = pd.read_csv("data/prices.csv", parse_dates=True, index_col="date")
prices = prices.dropna()
prices.describe()

Unnamed: 0,AAPL,MSFT,TSLA
count,3456.0,3456.0,3456.0
mean,61.662364,117.62456,72.527078
std,57.609164,108.442047,101.953699
min,7.25,17.58,1.05
25%,18.12,30.69,11.0875
50%,33.78,62.965,17.13
75%,114.3825,207.075,137.0175
max,197.86,429.37,409.97


We pick a 1Y trajectory sampled quarterly. We assume 252 trading days per year.

In [15]:
coupon_period_days = int(fcn.periodlen * 252)
tenor_days = fcn.numperiods * coupon_period_days
t = 500  # pick a point in history
path_idx = t + np.arange(fcn.numperiods + 1) * coupon_period_days
print(path_idx)

[500 563 626 689 752]


In [16]:
path = prices.iloc[path_idx]
print(path)

             AAPL   MSFT  TSLA
date                          
2012-06-21  17.46  24.22  2.15
2012-09-20  21.21  25.45  2.06
2012-12-21  15.84  22.40  2.27
2013-03-26  14.15  23.16  2.52
2013-06-25  12.43  27.89  6.83


We scale the paths such that the initial prices are 1.0 and compute the trajectory of the worst performing stock.

In [17]:
path = np.array(path)
path = path[1:] / path[0]
wo_path = path.min(axis=1)
print(wo_path)

[0.95813953 0.90721649 0.81042383 0.71191294]


And backtest the FCN over this path.

In [18]:
stats = fcn_payout_with_stats(wo_path, fcn)
print(stats)

{'payments': array([0.05      , 0.05      , 0.05      , 0.93989118]), 'isKO': False, 'duration': 1.0}


Since payouts at different points in time can have different durations, we use internal rate of return as a metric for comparison. A robust implementation would check for the status of the optimizaiton and exceptions.

In [19]:
from scipy.optimize import minimize_scalar


def loss_irr(ytm, payments, times, price):
    pv = np.dot(payments, np.exp(-ytm * times))
    err = pv - price
    return 0.5 * err * err


def compute_irr(payments, times, price):
    res = minimize_scalar(fun=loss_irr, args=(payments, times, price))
    return res.x

To compute IRR, we need the price of the FCN associated with this path. This is the price estimated as of time t = 500 based on data available at that time. We assume rf and divs as above and compute covariance matrix based on 1Y history available at time t = 500.

In [20]:
px = prices.iloc[(t - 252) : t]
cov = np.log(px).diff().dropna().cov() * 252
cov = np.asarray(cov)
print(cov)

[[0.08759867 0.03576015 0.06417788]
 [0.03576015 0.06166824 0.05267657]
 [0.06417788 0.05267657 0.30022503]]


In [21]:
paths = bsm_paths(5000, fcn.numperiods, fcn.periodlen, divs, cov, rf, rng)
price = calc_fcn_price_from_paths(paths, fcn, rf)
print(price)

0.9998274884469566


In [22]:
irr = compute_irr(stats["payments"], settlement_times, price)
print(f"{round(irr*100,2)}%")

9.28%


## Generic Method for Pricing and Backtesting FCNs

While the above set-up was pretty straight forward, one had to write contract specific functions for calculating payouts for a given trajectory of prices. The price model was also 'hard coded' into the monte-carlo calculations.

It would be nicer to have a framework, where one can specify a payment schedule i.e. a set of dates and what the contract pays on those dates and choose a model for generating paths to price the given payment schedule. Such a framework would be a powerful tool to design and evaluate new contracts requiring minimal code from the user.

qablet is a powerful framework that lets us specify payment schedule with commonly used contract components e.g. fixed payments, exchanging one asset for cash or another, digitals, etc. The user need only to specify the schedule and choose a pricing model. Given the generality, the framework can also be used for backtesting. Let's see the above example implemented with qablet.

### Set up the FCN contract.
At the heart of setting up a contract are events that define the payments under the contract.

In [23]:
from qablet_contracts.timetable import EventsMixin
from typing import List
from datetime import datetime


@dataclass
class FCNote(EventsMixin):
    ccy: str
    assets: List[str]
    initial_spot: List[float]
    strike: float  # % of initial spot
    barrier: float  # % of initial spot
    cpn_rate: float
    cpn_dates: List[datetime]
    track: str = ""

    def events(self):
        events = []
        # coupon and call events
        for cpn_date in self.cpn_dates:
            # coupon payment
            events.append(
                {
                    "track": self.track,
                    "time": cpn_date,
                    "op": "+",
                    "quantity": self.cpn_rate,
                    "unit": self.ccy,
                }
            )
            # call event
            events.append(
                {
                    "track": self.track,
                    "time": cpn_date,
                    "op": "CALL",
                    "quantity": 1.0,
                    "unit": self.ccy,
                }
            )

        # payoff at maturity
        events.append(
            {
                "track": "",
                "time": self.cpn_dates[-1],
                "op": "+",
                "quantity": 1.0,
                "unit": "PAYOFF",
            }
        )
        return events

    def expressions(self):
        # Define the autocall condition
        barriers = [s * self.barrier for s in self.initial_spot]

        def barrier_fn(inputs):
            a1 = inputs[0] > barriers[0]
            b1 = inputs[1] > barriers[1]
            c1 = inputs[2] > barriers[2]

            return [a1 * b1 * c1]  # 1 if all are above the barrier, 0 otherwise

        strikes = [s * self.strike for s in self.initial_spot]

        def pay_fn(inputs):
            # Calculate worst return
            worst = 1.0
            for strike, asset in zip(strikes, inputs):
                worst = np.minimum(worst, asset / strike)
            return [worst]

        return {
            "PAYOFF": {
                "type": "phrase",
                "inp": self.assets,
                "fn": pay_fn,
            },
            "CALL": {
                "type": "phrase",
                "inp": self.assets,
                "fn": barrier_fn,
            },
        }

Payment events, barrier event and payoff function at maturity completely characterize the FCN.
By modifying the pay_fn, barrier_fn, etc, one can easily customize the FCN or in fact define a different contract altogether.
For this timetable we are going to pick the same coupon dates as the above example.

In [24]:
cpn_dates = [
    datetime(2012, 9, 20),
    datetime(2012, 12, 21),
    datetime(2013, 3, 26),
    datetime(2013, 6, 25),
]
timetable = FCNote(
    ccy="USD",
    assets=["AAPL", "MSFT", "TSLA"],
    initial_spot=[17.46, 24.22, 2.15],
    strike=0.8,
    barrier=1.0,
    cpn_rate=0.05,
    cpn_dates=cpn_dates,
).timetable()
print(timetable["events"].to_pandas())

                       time    op  quantity    unit track
0 2012-09-20 00:00:00+00:00     +      0.05     USD      
1 2012-09-20 00:00:00+00:00  CALL      1.00     USD      
2 2012-12-21 00:00:00+00:00     +      0.05     USD      
3 2012-12-21 00:00:00+00:00  CALL      1.00     USD      
4 2013-03-26 00:00:00+00:00     +      0.05     USD      
5 2013-03-26 00:00:00+00:00  CALL      1.00     USD      
6 2013-06-25 00:00:00+00:00     +      0.05     USD      
7 2013-06-25 00:00:00+00:00  CALL      1.00     USD      
8 2013-06-25 00:00:00+00:00     +      1.00  PAYOFF      


### Backtesting Model
We will create a backtesting model that can parse and produce cashflows for any qablet timetable. For that we need to inherit from `CFModelPyBase` and implement the method `get_value`.

In [25]:
import polars as pl
from qablet.base.cf import CFModelPyBase

MS_IN_DAY = 1000 * 3600 * 24
TS_TO_YEARS = 1 / (365 * 24 * 3600 * 1e9)


class CFModelPyCSV(CFModelPyBase):
    """CFModel that uses data from a CSV."""

    def __init__(self, filename, base):
        """Read the csv file on initialization. Assumes that there is a date column and the
        data is sorted by the date column. Assumes that the other columns are the units.
        infer_schema_length is set to None, which causes the csv reader to read the whole file
        before inferring the schema. This is needed if the first row may have blanks. A faster
        read can be accomplished by supplying the schema directly."""

        self.data = pl.read_csv(
            filename, try_parse_dates=True, infer_schema_length=None
        ).set_sorted("date")
        self.ts = self.data["date"].cast(int)

        super().__init__(base)

    def get_value(self, unit, ts):
        """Return value for given unit, on given timestamp (ms).
        search_sorted is faster than filter by date. It picks the date on or before the ts.
        it appears that polars stores dates as a day timestamp, so a conversion is
        needed from qablet's milliseconds timestamp."""
        row = self.ts.search_sorted(ts // MS_IN_DAY)
        val = self.data.item(row, unit)
        return val

Run the model on the timetable and get the cashflow.
cashflow_by_ts returns a polars dataframe where

- index correponds to the index in the timetable events
- value is cashflow amount in base currency
- ts is the timestamp (ms) of the event

In [26]:
bk_model = CFModelPyCSV("data/prices.csv", "USD")
cf = bk_model.cashflow_by_ts(timetable)
print(cf)

shape: (9, 3)
┌───────┬──────────┬───────────────┐
│ index ┆ value    ┆ ts            │
│ ---   ┆ ---      ┆ ---           │
│ u64   ┆ f64      ┆ i64           │
╞═══════╪══════════╪═══════════════╡
│ 0     ┆ 0.05     ┆ 1348099200000 │
│ 1     ┆ 0.0      ┆ 1348099200000 │
│ 2     ┆ 0.05     ┆ 1356048000000 │
│ 3     ┆ 0.0      ┆ 1356048000000 │
│ 4     ┆ 0.05     ┆ 1364256000000 │
│ 5     ┆ 0.0      ┆ 1364256000000 │
│ 6     ┆ 0.05     ┆ 1372118400000 │
│ 7     ┆ 0.0      ┆ 1372118400000 │
│ 8     ┆ 0.889891 ┆ 1372118400000 │
└───────┴──────────┴───────────────┘


In [27]:
# net cashflows by timestamp
cf_agg = cf.group_by("ts").agg(pl.col("value").sum())
cf_agg

ts,value
i64,f64
1372118400000,0.939891
1348099200000,0.05
1364256000000,0.05
1356048000000,0.05


### Pricing Model
To define a Mnte-Carlo model, we will inherit from `finmc.models.base.MCFixedStep`, and implement the methods `reset`, `step`, `get_value`, and `get_df`.

In [28]:
from finmc.models.base import MCFixedStep
from finmc.utils.assets import Discounter, Forwards
from numpy.random import SFC64, Generator


class BSMC(MCFixedStep):
    """Class for the multi asset Black Scholes model."""

    def reset(self):
        """Fetch any information from the dataset or timetable, that needs to be stored into self,
        to facilitate the 'advance' method to run as quickly as possible."""
        # common MC parameters
        self.n = self.dataset["MC"]["PATHS"]
        self.timestep = self.dataset["MC"]["TIMESTEP"]
        
        # fetch the model parameters from the dataset
        modeldata = self.dataset["BSM"]
        self.cov = modeldata["COV"]

        # Create a dictionary of assets, so that we can easily access the idx and spot from string name.
        # Also create ordered lists of forwards and variances for the assets.
        self.assets = {}
        self.fwds = []
        self.vars = []
        for idx, asset in enumerate(modeldata["ASSETS"]):
            fwd = Forwards(self.dataset["ASSETS"][asset])
            self.assets[asset] = {"idx": idx, "spot": fwd.forward(0)}
            self.fwds.append(fwd)
            self.vars.append(self.cov[idx][idx])

        self.discounter = Discounter(self.dataset["ASSETS"][self.dataset["BASE"]])

        # Initialize the RNG and the x_vec (log process for each asset)

        self.rng = Generator(SFC64(self.dataset["MC"]["SEED"]))
        self.x_vec = np.zeros((len(self.assets), self.n))

        self.cur_time = 0

    def step(self, new_time):
        """Update x_vec in place when we move simulation by time dt."""

        dt = new_time - self.cur_time

        drifts = np.array([fwd.rate(new_time, self.cur_time) for fwd in self.fwds])
        drifts -= np.array(self.vars) / 2.0

        # generate the random numbers and advance the log stock process
        dz = self.rng.multivariate_normal(
            drifts * dt, self.cov * dt, self.n
        ).transpose()

        self.x_vec += dz

        self.cur_time = new_time

    def get_value(self, unit):
        """If this asset is part of the model, return its value using the simulated array.
        For any other asset that may exist in the timetable, return the default implementation in qablet base model.
        """

        info = self.assets.get(unit)
        if info:
            return info["spot"] * np.exp(self.x_vec[info["idx"]])

    def get_df(self):
        return self.discounter.discount(self.cur_time)

Construct the assets market data.

In [29]:
# Construct the asset data (forwards)
assets_data = {}
# Rates data
assets_data["USD"] = ("ZERO_RATES", np.array([[5.0, rf]]))

# Equities data
assets = ["AAPL", "MSFT", "TSLA"]
times = np.array([0.0, 5.0])
# These are current spots for pricing. Can be different from contract strikes.
initial_spots=[17.46, 24.22, 2.15]
for asset, spot, div in zip(assets, initial_spots, divs):
    fwds = spot * np.exp(times * (rf - div))
    assets_data[asset] = ("FORWARDS", np.column_stack((times, fwds)))


In [30]:
from qablet_contracts.timetable import py_to_ts

dataset = {
    "MC": {
        "PATHS": 100_000,
        "TIMESTEP": 100,  # Black-Scholes doesn't need small timesteps
        "SEED": 1,
    },
    "PRICING_TS": py_to_ts(datetime(2012, 6, 21)).value,
    "BASE": "USD",
    "ASSETS": assets_data,
    "BSM": {
        "ASSETS": assets,
        "COV": cov,
    },
}

In [31]:
from qablet.base.mc import MCPricer
pricing_model = MCPricer(BSMC)
price, _ = pricing_model.price(timetable, dataset)
print(price)


1.0006363146014094
