In [1]:
import numpy as np
from numpy.polynomial import Polynomial, Laguerre
import scipy.stats as stats
from scipy.interpolate import CubicSpline
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import Ridge
import QuantLib as ql
from multiprocess import Pool
from tqdm import tqdm
import matplotlib.pyplot as plt

In [2]:
# Flat Yield Curve
flat_yields = [
    (0.5, .03),
    (2.0, .03),
    (5.0, .03),
    (10.0, .03),
]

In [5]:
def create_yield_curve(yield_curve):
    # Extract durations and yields
    durations = [item[0] for item in yield_curve]
    yields = [item[1] for item in yield_curve]

    # Fit cubic spline
    yield_curve = CubicSpline(durations, yields)
    return yield_curve

In [6]:
yield_curve = create_yield_curve(flat_yields)

In [7]:
def get_short_rates(simulations, steps, dt, r0, a, sigma, theta, yield_curve):
    def simulate_path(params):

        _, steps, dt, r0, a, sigma, theta, yield_curve = params

        import numpy as np
        np.random.seed(_)       # So that values in each processor is unique

        theta = lambda t: yield_curve.derivative(nu=1)(t) + (a*yield_curve(t)) + ((sigma**2)/(2*a))*(1-np.exp(-2*a*t))
        Z = np.random.normal(0, 1, steps)
        dr = np.zeros(steps)
        r = np.zeros(steps)
        r[0] = r0  # Initialize with the initial rate from the market
        for t in range(1, steps):
            dr[t] = (theta(t * dt) - a * r[t-1]) * dt + sigma * np.sqrt(dt) * Z[t]
            r[t] = r[t-1] + dr[t]
        return r

    # Use multiprocess to run simulations in parallel
    simulation_params = [(np.random.randint((i+10)*100), steps, dt, r0, a, sigma, theta, yield_curve) for i in range(simulations)]     # PseudoRandom Seed

    # Run in Parallel
    with Pool() as pool:
        short_rate_paths = pool.map(simulate_path, simulation_params)

    return np.array(short_rate_paths)

In [8]:
def get_forward_rates(simulations, days_per_year, dt, T_m, tenor, short_rate_paths):
    # Finding forward rates based on the simulated short rates
    forward_rate_paths_regression = []
    for simulation in range(simulations):
        forward_rates_temp = []
        for t in np.arange(0.0, T_m, tenor):
            # To be clear what is counted where, made variables as explicit as possible
            start_id = int(days_per_year*t)
            finish_id = int(days_per_year*(t+tenor))
            forward_rate = np.sum(short_rate_paths[simulation, start_id:finish_id]*dt)  # Using Reimann sum to approximate the integral for rates for correct time period
            forward_rates_temp.append(forward_rate)
        forward_rate_paths_regression.append(forward_rates_temp)
    return (np.array(forward_rate_paths_regression))

In [84]:
# Parameters
T = 10                      # Years of Simulation
days_per_year = 365         # Days in the Year
steps = int(T*days_per_year)    # Number of steps
dt = 1.0 * T / steps            # Time step size
simulations = 10000        # Amount of Monte Carlo runs
r0 = 0.03

# Parameters for the Hull-White model
a = 0.01
sigma = 0.01
theta_normal = lambda t: yield_curve.derivative(nu=1)(t) + (a*yield_curve(t)) + ((sigma**2)/(2*a))*(1-np.exp(-2*a*t))

# Swaption parameters
T_m = 6.0              # Maximum duration of swaption (Maturity)
tenor = 1.0            # Semi-Annual settlements (First settlement is assumed to be possible after this period too)
N = 1                   # Notional Amount
fixed_rate = 0.03

In [85]:
normal_rates = get_short_rates(simulations, steps, dt, r0, a, sigma, theta_normal, yield_curve)

In [86]:
forward_rate_normal = get_forward_rates(simulations, days_per_year, dt, T_m, tenor, normal_rates)

In [72]:
def get_simulations_swap_values(simulations, days_per_year, T_m, tenor, N, fixed_rate, short_rate_paths, yield_curve, a, sigma):
    # Swap Values calculator
    def sim_swap_eval(params):
        _, days_per_year, T_m, tenor, N, fixed_rate, short_rate_path, yield_curve, a, sigma = params

        import numpy as np

        # Bond Evaluation
        def P(s, t, r_s, a, sigma, yield_curve):

            def A(s, t, a, sigma, yield_curve):
                P_0_t = np.exp(-yield_curve(t)*t)     # Value of Zero Coupon Bond (0,t)
                P_0_s = np.exp(-yield_curve(s)*s)     # Value of Zero Coupon Bond (0,s)
                term1 = P_0_t/P_0_s
                term2 = B(s, t, a) * yield_curve(s) # B * Instantenious Forward Rate
                term3 = (sigma**2/(4*a)) * B(s,t,a)**2 * (1-np.exp(-2*a*s))
                return term1*np.exp(term2-term3)

            def B(s, t, a):
                return 1/a * (1 - np.exp(a * (s - t)))

            return A(s, t, a, sigma, yield_curve) * np.exp(-B(s, t, a) * r_s)     # Short rate and initial rate are the same here
        # Swap Evaluation
        def V(days_per_year, t, T_0, T_m, N, fixed_rate, tenor, rates, a, sigma, yield_curve):
            term1 = P(t, T_0, rates[int(days_per_year*t)], a, sigma, yield_curve)
            term2 = P(t, T_m, rates[int(days_per_year*t)], a, sigma, yield_curve)
            term3 = 0
            for T_i in np.arange(T_0+tenor, T_m+tenor, tenor):       # For all intermediate and last one payout dates (np.arange does not include m)
                term3 += P(t, T_i, rates[int(days_per_year*t)],  a, sigma, yield_curve) * tenor
            return -N*(term1 - term2 - fixed_rate*term3)     # (we are buying swaption) Only put makes sense, otherwise pointless to exercise early

        # Finding Swap Value for each possible exercise point
        swap_values = []
        for t in np.arange(tenor, T_m, tenor):     # Because of delay on starting of payments, we have less entries (Last one must be a tenor before Maturity)
            swap_values.append(V(days_per_year, t, t, T_m, N, fixed_rate, tenor, short_rate_path, a, sigma, yield_curve))
        return (swap_values)

    # Use multiprocess to run simulations in parallel
    simulation_params = [(i, days_per_year, T_m, tenor, N, fixed_rate, short_rate_paths[i], yield_curve, a, sigma) for i in range(simulations)]
    # Run in Parallel
    with Pool() as pool:
        simulations_swap_values = pool.map(sim_swap_eval, simulation_params)
    return np.array(simulations_swap_values)

In [60]:
def LSM(simulations_swap_values, forward_rate_paths_regression, short_rate_paths, days_per_year, tenor, simulations, ridge_a=1.0, method="Laguerre", degree=3, visualize_regression=True):
    # Independent Variable for Regression (For first step it is just a discounted value of swap at the maturity (Discounted Realized Cashflow))
    expected_cashflow = simulations_swap_values[:, -1] * np.exp(-forward_rate_paths_regression[:, -1])
    expected_cashflow[np.where(simulations_swap_values[:, -1] < 0)] = 0.0      # If swap is not In Money, we just let it expire

    # Matrix of points where we exercise
    realization_matrix = np.zeros(simulations_swap_values.shape)
    realization_matrix[np.where(simulations_swap_values[:, -1] > 0), -1] = 1.0      # At last point we exercise if in money

    for i in range(simulations_swap_values.shape[1]-2, -1, -1):              # Starting from the back
        # For last 2 points the value is 0, no possible legs left
        # On last point we do not regress because the value is same as just swap
        # On 0th regression, the short rate at t=0 is the same for all paths
        ids_to_fit = np.where(simulations_swap_values[:, i] > 0)[0]         # Creating mask to regress only "in money" swaps
        if len(ids_to_fit) > 2:     # So that we can fit a line for prediction
            if method == "Ridge":
                Y = expected_cashflow[ids_to_fit].reshape(-1, 1)  # It is already discounted
                X_pred = short_rate_paths[ids_to_fit, int(days_per_year * (tenor * i + tenor))].reshape(-1, 1)  # Dependent variable is our short rate
                # Fit the Linear Regression model to the polynomial features
                poly = PolynomialFeatures(degree=degree)
                X_poly = poly.fit_transform(X_pred)
                model = Ridge(alpha=ridge_a).fit(X_poly, Y)
                # Predict using the polynomial features
                continuation_values = model.predict(poly.transform(short_rate_paths[ids_to_fit, int(days_per_year * (tenor * i + tenor))].reshape(-1, 1)))

            else:
                # Getting continuation Values
                Y = expected_cashflow[ids_to_fit].flatten()             # It is already discounted
                X_pred = short_rate_paths[ids_to_fit, int(days_per_year * (tenor * i + tenor))].flatten()      # Dependent variable is our short rate
                if method == "Polynomial":
                    model = Polynomial.fit(X_pred, Y, deg=degree)
                if method == "Laguerre":
                    model = Laguerre.fit(X_pred, Y, deg=degree)
                X_test = short_rate_paths[ids_to_fit, int(days_per_year * (tenor * i + tenor))].flatten()       # We need + tenor so that we offshift values to match T_0
                continuation_values = model(X_test)

        # Creating independent variable for the next iteration
        expected_cashflow_temp = np.zeros(simulations)
        for k in range(simulations):   # For each path we should find out if it is better to exercise or not
            if k in ids_to_fit:             # If there is any update to value during regression
                if simulations_swap_values[k, i] > continuation_values[np.where(ids_to_fit==k)] and simulations_swap_values[k, i] > 0:
                    expected_cashflow_temp[k] = simulations_swap_values[k, i] * np.exp(-forward_rate_paths_regression[k, i+1])        # We exercise and keep this value, i+1 because we have 1 more value in the beginnign for forward rates
                    realization_matrix[k,i] = 1.0           # For future reference
                    realization_matrix[k,i+1:] = 0.0        # Delete all older subsequent realizations, we keep only the earliest
                else:                       # keep the value same otherwise, discounted
                    expected_cashflow_temp[k] = expected_cashflow[k] * np.exp(-forward_rate_paths_regression[k, i+1])
            else:                           # keep the value same otherwise, discounted
                expected_cashflow_temp[k] = expected_cashflow[k] * np.exp(-forward_rate_paths_regression[k, i+1])
        expected_cashflow = expected_cashflow_temp

    if visualize_regression: print("Final Realization Matrix\n", realization_matrix)

    return realization_matrix

In [61]:
def get_fair_value(realization_matrix, simulations_swap_values, forward_rate_paths_regression, simulations):

    # Arrays to hold values of best exercise periods
    max_indexes = []
    max_values = []
    sim_id = []

    fair_values_t = []
    for sim in range(simulations):
        # Finding max value in array

        # If path was never exercized, i.e. its value is less than zero, we just let it expire worthless
        if realization_matrix[sim][0] == 1:

            # max_index = np.argmax(realization_matrix[sim])
            # max_value = simulations_swap_values[sim][max_index]

            # Used for later visualization
            # sim_id.append(sim)
            # max_indexes.append(max_index)
            max_values.append(realization_matrix[sim][0])

            # Discounting swaps values at optimal execution points
            #disc_rate = np.sum(forward_rate_paths_regression[sim, :(max_index+1)])
            disc_rate = np.sum(forward_rate_paths_regression[sim, :(0+2)])

            max_value_fair = realization_matrix[sim][0]*np.exp(-disc_rate)
            fair_values_t.append(max_value_fair)

    fair_value_t = np.sum(fair_values_t)/simulations

    #return fair_value_t*np.exp(-r0*tenor), max_indexes, max_values, sim_id
    return fair_value_t, max_indexes, max_values, sim_id

In [62]:
simulations_swap_values = get_simulations_swap_values(simulations, days_per_year, T_m, tenor, N, fixed_rate, normal_rates, yield_curve, a, sigma)
realization_matrix = LSM(simulations_swap_values, forward_rate_normal, normal_rates, days_per_year, tenor, simulations, method="Laguerre", degree=3, visualize_regression=False)                 # Wiggly, predicts well
european, _, _, _ = get_fair_value(realization_matrix, simulations_swap_values, forward_rate_normal, simulations)

In [63]:
realization_matrix

array([[0., 0., 0., 0., 0.],
       [0., 1., 0., 0., 0.],
       [0., 0., 0., 0., 0.],
       ...,
       [1., 0., 0., 0., 0.],
       [0., 1., 0., 0., 0.],
       [0., 0., 0., 0., 0.]])

In [64]:
european

0.15292275009767842

In [96]:
0.03 * 1.2

0.036

In [109]:
swap_values = get_simulations_swap_values(simulations, days_per_year, T_m, tenor, N, 0.036, normal_rates, yield_curve, a, sigma)[:,0]

In [76]:
forward_rate_normal[0, :1]

array([0.03425175])

In [88]:
def price_european_MC(swap_values, T_n, forward_rates):
    positive_swaps = list()
    for i in range(len(swap_values)):
        if swap_values[i] > 0:
            disc_rate = np.sum(forward_rates[i, int(T_n)])
            positive_swaps.append(swap_values[i]* np.exp(- disc_rate))
        else:
            positive_swaps.append(0)
    return sum(positive_swaps) / len(swap_values)

In [110]:
price_european_MC(swap_values, tenor, forward_rate_normal)

0.032587536941098

In [66]:
def get_fair_value_ql(T_m, tenor, days_per_year, a, sigma, fixed_rate, r0):
    # Setup the market and yield term structure
    calendar = ql.TARGET()
    day_count = ql.Actual365Fixed()
    todays_date = ql.Date.todaysDate()
    ql.Settings.instance().evaluationDate = todays_date
    flat_forward = ql.FlatForward(todays_date, r0, day_count)     # Flat rate curve
    yield_curve_handle = ql.YieldTermStructureHandle(flat_forward)

    # Swaption characteristics
    # Define the fixed-rate leg
    start_date = todays_date + ql.Period(int(tenor*days_per_year), ql.Days)
    maturity_date = todays_date + ql.Period(int(T_m*days_per_year), ql.Days) + ql.Period(int(tenor*days_per_year), ql.Days)
    fixed_leg_tenor = ql.Period(int(tenor*days_per_year), ql.Days)
    fixed_leg_schedule = ql.Schedule(start_date, maturity_date, fixed_leg_tenor, calendar,
                                    ql.ModifiedFollowing, ql.ModifiedFollowing,
                                    ql.DateGeneration.Forward, False)

    # Define the floating-rate leg
    index = ql.IborIndex("CustomEuriborM", ql.Period(int(tenor*days_per_year), ql.Days), 0, ql.EURCurrency(), calendar, ql.ModifiedFollowing, False, day_count, yield_curve_handle)
    floating_leg_schedule = ql.Schedule(start_date, maturity_date, index.tenor(), calendar,
                                        ql.ModifiedFollowing, ql.ModifiedFollowing,
                                        ql.DateGeneration.Forward, False)

    # Define the swaption
    hull_white_model = ql.HullWhite(yield_curve_handle, a, sigma)

    # Setup the pricing engine
    engine = ql.TreeSwaptionEngine(hull_white_model, 200)

    # # Define the European swaption    (Outputs same value as swap)
    exercise = ql.EuropeanExercise(start_date)       # Very small period is necessary, otherwise output is zero
    european_swaption = ql.Swaption(ql.VanillaSwap(ql.VanillaSwap.Receiver, N, fixed_leg_schedule,
                                          fixed_rate, day_count, floating_leg_schedule,
                                          index, 0.0, index.dayCounter()), exercise)

    # # Setup the pricing engine for European swaption
    european_swaption.setPricingEngine(engine)
    european_swaption_npv = european_swaption.NPV()
    # print("QuantLib European Swaption price is: ", european_swaption_npv)
    return european_swaption_npv

In [105]:
get_fair_value_ql(5, tenor, days_per_year, a, sigma, 0.036, r0)

0.03274836883975357

In [102]:
price_european_MC(swap_values, tenor, forward_rate_normal) - get_fair_value_ql(5, tenor, days_per_year, a, sigma, 0.024, r0)

-3.7042353236458464e-05