<a href="https://colab.research.google.com/github/johnli100/Quant-Finance/blob/main/Derivative_Pricing_American_put_using_Reinforcement_Learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Derivative Pricing: American options using (Policy Gradient) Reinforcement Learning


In [None]:
import numpy as np
import pandas as pd
from scipy.stats import norm
from sklearn.tree import DecisionTreeRegressor
from sklearn.linear_model import LinearRegression
import torch
from torch.utils.data import TensorDataset, DataLoader, Dataset

import matplotlib.pyplot as plt
import plotly.express as px

In [None]:
# Parameters for the opton
S0 = 100
K = 100
T = 1
r = 0.05
sigma = 0.3
steps = 100

#### 1. Benchmarks - LSMC/MLMC for American options

In [None]:
# Least Square Monte Carlo/Machine learning Monte Carlo for American options
def american_option_mc(S0, K, T, r, sigma, steps=100, optionType="P",method="LSMC",shock=0.5, N=10000):
    # Generate Monte Carlo paths
    dt = T / steps
    S = np.zeros((N, steps + 1))
    S[:, 0] = S0
    delta = np.zeros((N, steps + 1))
    payoffs = np.zeros((N, steps + 1))

    for t in range(1, steps + 1):
        Z = np.random.normal(size=N)
        S[:, t] = S[:, t - 1] * np.exp((r - 0.5 * sigma**2) * dt + sigma * np.sqrt(dt) * Z)

    # Initialize payoff at maturity
    if optionType == "P":
      payoff = np.maximum(K - S[:, -1], 0)
      payoff_up = np.maximum(K - (S[:, -1] + shock), 0)
      payoff_down = np.maximum(K - (S[:, -1] - shock), 0)
    elif optionType == "C":
      payoff = np.maximum(S[:, -1]-K, 0)
      payoff_up = np.maximum((S[:, -1] + shock) - K, 0)
      payoff_down = np.maximum((S[:, -1] - shock) - K, 0)
    else:
        raise ValueError("Invalid option type: please choose eithre 'P' or 'C'.")

    # Backward induction
    for t in range(steps - 1, 0, -1):
        # Features: current stock price and time to maturity
        X = S[:, t].reshape(-1, 1)

        if method == "LSMC":
          if optionType == "P":
            itm = S[:, t] < K
            exercise_value = np.maximum(K - S[itm, t], 0)
          else:
            itm = S[:, t] > K
            exercise_value = np.maximum(S[itm, t] - K, 0)

          X = X[itm]
          X = np.hstack((X, X **2))

          # Target: discounted future cash flows
          Y = np.exp(-r * dt) * payoff[itm]

          model = LinearRegression()
          model.fit(X, Y)

          # Estimate continuation value
          continuation_value = model.predict(X)

          # update payoff
          payoff[itm] = np.where(exercise_value > continuation_value, exercise_value, Y)

        elif method == "MLMC":
          # Immediate exercise value
          if optionType == "P":
            exercise_value = np.maximum(K - S[:, t], 0)
            exercise_value_up = np.maximum(K - (S[:, t] + shock), 0)
            exercise_value_down = np.maximum(K - (S[:, t] - shock), 0)
          else:
            exercise_value = np.maximum(S[:, t] - K, 0)
            exercise_value_up = np.maximum((S[:, t] + shock - K), 0)
            exercise_value_down = np.maximum((S[:, t] - shock - K), 0)

          # Target: discounted future cash flows
          Y = np.exp(-r * dt) * payoff
          Y_up = np.exp(-r * dt) * payoff_up
          Y_down = np.exp(-r * dt) * payoff_down

          model = DecisionTreeRegressor(max_depth=5, min_samples_leaf=100)
          model.fit(X, Y)

          # Estimate continuation value
          continuation_value = model.predict(X)
          continuation_value_up = model.predict(X+shock)
          continuation_value_down = model.predict(X-shock)

          # update payoff
          payoff = np.where(exercise_value > continuation_value, exercise_value, Y)
          payoffs[:,t]=payoff
          payoff_up = np.where(exercise_value_up > continuation_value_up, exercise_value_up, Y_up)
          payoff_down = np.where(exercise_value_down > continuation_value_down, exercise_value_down, Y_down)

          # delta
          delta[:,t] = (payoff_up - payoff_down) / shock * 0.5
        else:
          raise ValueError("Invalid method: please choose either 'LSMC' or 'MLMC'.")

    # Discount and average
    option_price = np.exp(-r * dt) * np.mean(payoff)
    return option_price, S, payoffs, delta

In [None]:
# Black-Scholes as lower bound
def european_bs(S, K, T, r, sigma,optionType="P"):
    d1 = (np.log(S / K) + (r + 0.5 * sigma ** 2) * T) / (sigma * np.sqrt(T))
    d2 = d1 - sigma * np.sqrt(T)

    if optionType == "P":
      return K * np.exp(-r * T) * norm.cdf(-d2) - S * norm.cdf(-d1)
    elif optionType == "C":
      return S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)

In [None]:
put_price_European_BS=european_bs(S0, K, T, r, sigma,optionType="P")
print(f"European Put Option Price using Black-Scholes model as lower bound of American option: ${put_price_European_BS:.2f}")
price, S, payoffs, delta = american_option_mc(S0, K, T, r, sigma, steps, optionType="P", method="MLMC", N=100_000)
print(f'American Put Option Price using MLMC: ${price:.2f}')
price, S, payoffs, delta = american_option_mc(S0, K, T, r, sigma, steps, optionType="P", method="LSMC", N=100_000)
print(f'American Put Option Price using LSMC: ${price:.2f}')

European Put Option Price using Black-Scholes model as lower bound of American option: $9.35
American Put Option Price using MLMC: $9.96
American Put Option Price using LSMC: $9.93


#### 2. Reinforcement Learning in American options

##### 2.1 Defining American option class

In [None]:
# define class for Amercian options
class AmericanOption:
  def __init__(self, S0, K, T, r, sigma, steps, optionType="P"):
     self.S0 = S0
     self.K = K
     self.T = T
     self.r = r
     self.sigma = sigma
     self.N = steps
     self.dt = T/steps
     self.optionType = optionType

     self.S = S0
     self.t2mat = T

  def reset(self):
    self.S = self.S0
    self.t2mat = self.T
    return [self.S, self.t2mat]

  def step(self, action, pre_train=False):
  # generate one path/simulation: stock price, payoff and if done (early excerise or expire)
    if (action == 1) or (np.abs(self.t2mat) < 0.00000001) :
      done = True
      if self.optionType == "P":
        reward = max(self.K - self.S, -0.1) * np.exp(-self.r * (self.T - self.t2mat))
      elif self.optionType == "C":
        reward = max(self.S - self.K, -0.1) * np.exp(-self.r * (self.T - self.t2mat))
      else:
        raise ValueError("Invalid option type. Must be 'P' or 'C'.")
    else:
      reward = 0.
      done = False

    self.t2mat -= self.dt
    self.S = self.S * np.exp((self.r - 0.5 * self.sigma ** 2) * self.dt + self.sigma * np.sqrt(self.dt) * np.random.normal())
    next_state = [self.S, self.t2mat]

    return done, reward, next_state

  def mc(self, num_paths=1):
    # step = np.random.randint(1, self.N+1)
    step = self.N
    S = np.zeros(step + 1)
    t2mat = np.zeros(step + 1)
    payoffs = np.zeros(step + 1)
    S[ 0] = self.S0
    for i in range(1, step + 1):
        t2mat[ i] = self.T - i * self.dt
        S[i] = S[i - 1] * np.exp((self.r - 0.5 * self.sigma**2) * self.dt + self.sigma * np.sqrt(self.dt) * np.random.normal())
        if self.optionType == "P":
            payoffs[i] = np.maximum(self.K - S[i], 0) * np.exp(-self.r * i * self.dt)
        elif self.optionType == "C":
            payoffs[i] = np.maximum(S[i] - self.K, 0) * np.exp(-self.r * i * self.dt)

    payoffs_final = np.copy(payoffs)
    for j in range(payoffs_final.shape[0] - 1, 0, -1):
      payoffs_final[j-1] = np.maximum(payoffs_final[j], payoffs_final[j-1])

    return payoffs_final, np.stack((S, t2mat),axis=-1)

In [None]:
# define put option
put = AmericanOption(S0=S0, K=K, T=T, r=r, sigma=sigma, steps=steps, optionType="P")

In [None]:
# define pre-training samples for Value model
vs, states = [],[]
put.reset()
for i in range(1000):
  v, s = put.mc()
  vs.append(v)
  states.append(s)

Y = []
for v in vs:
  Y.append(np.array(v))
Y=np.array(Y).flatten()
X = []
for s in states:
  X.append(np.array(s))
X=np.array(X).reshape(-1,2)

# Training data for torch
X_train=torch.tensor(X, dtype=torch.float32)
Y_train=torch.tensor(Y, dtype=torch.float32).unsqueeze(1)
dataset = TensorDataset(X_train, Y_train)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

In [None]:
# visualizing the relationship between option value and stock price, time to maturity
fig = px.scatter_3d(x=np.array(X_train.data.squeeze().numpy()[:,0]),
                    y=np.array(X_train.data.squeeze().numpy()[:,1]),
                    z=np.array(Y_train.data.squeeze().numpy()),
                    size = np.ones(len(Y_train))*5,
                    size_max=5,
                    opacity=1,
                    color_discrete_sequence=['orange'],
                    title = "Fitted Option Price vs. Stock price and time to maturity"
                    )
fig.update_layout(
    scene=dict(
        xaxis_title='Stock price',
        yaxis_title='Time to maturity',
        zaxis_title='Put option value',
    ),
)
fig.show()

##### 2.2 Defining Neural Network for Value Model

In [None]:
# Define Value model using Neural Network
class ValueNet(torch.nn.Module):
    def __init__(self, state_dim=2, hidden_dim=[16,16],output_dim=1):
        super(ValueNet, self).__init__()
        self.state_dim = state_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        state_hidden_dim = [state_dim] + hidden_dim
        self.layers = torch.nn.ModuleList(
            [torch.nn.Linear(state_hidden_dim[l],state_hidden_dim[l+1])
            for l in range(len(state_hidden_dim)-1)])
        self.output_layer = torch.nn.Linear(state_hidden_dim[-1],output_dim)
        self.activation = torch.nn.ReLU()

  # Value Network forward
    def forward(self, states):
        x = states.view(-1,self.state_dim)
        for layer in self.layers:
          x = self.activation(layer(x))
        x = self.output_layer(x)

        return x

In [None]:
# set up Value model and test
value_model = ValueNet()
value_model(torch.tensor([[100.,1.0],[80,1.0],[120,1.0],[100,.1],[80,.1],[120,.1]]))

tensor([[-0.1765],
        [-0.1433],
        [-0.2096],
        [-0.1791],
        [-0.1459],
        [-0.2122]], grad_fn=<AddmmBackward0>)

In [None]:
# pre-train value function
criterion = torch.nn.MSELoss()
value_optimizer = torch.optim.Adam(value_model.parameters(), lr=0.0005)
value_model.train()
num_epochs=5
for epoch in range(num_epochs):
  epoch_loss = 0.
  for X,Y in dataloader:
    # Loss function for Value model

    loss = criterion(value_model(X), Y)
    value_optimizer.zero_grad()
    loss.backward()
    value_optimizer.step()

    epoch_loss += loss.item()

  avg_epoch_loss = epoch_loss / len(dataloader)
  print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {avg_epoch_loss:.4f}")

Epoch [1/5], Loss: 206.4428
Epoch [2/5], Loss: 120.0555
Epoch [3/5], Loss: 89.4891
Epoch [4/5], Loss: 78.5558
Epoch [5/5], Loss: 74.4541


In [None]:
# checking the pre-trained Value model
value_model(torch.tensor([[100.,1.0],[80,1.0],[120,1.0],[100,.1],[80,.1],[120,.1]]))

tensor([[19.7721],
        [35.2927],
        [ 4.8168],
        [ 9.8964],
        [21.6939],
        [ 5.4656]], grad_fn=<AddmmBackward0>)

##### 2.3 Defining Neural Network for Policy Model

In [None]:
# Define Policy model using Neural Network
class PolicyNet(torch.nn.Module):
    def __init__(self, state_dim=2, hidden_dim=[32,32],action_dim=2, steps=100):
        super(PolicyNet, self).__init__()
        self.N = steps
        self.state_dim = state_dim
        self.hidden_dim = hidden_dim
        self.action_dim = action_dim
        state_hidden_dim = [state_dim] + hidden_dim
        self.layers = torch.nn.ModuleList(
            [torch.nn.Linear(state_hidden_dim[l],state_hidden_dim[l+1])
            for l in range(len(state_hidden_dim)-1)])
        self.output_layer = torch.nn.Linear(state_hidden_dim[-1],action_dim)
        self.activation = torch.nn.Tanh()
        self.softmax = torch.nn.Softmax(dim=1)

  # Policy Network forward
    def forward(self, states, explore_mode=False):
        x = states.view(-1,self.state_dim)
        for layer in self.layers:
          x = self.activation(layer(x))
        x = self.softmax(self.output_layer(x))

        # explore_mode enables the exploration of exercise action to be uniformly distributed across all time steps
        # otherwise, the policy will select exercise action step by step based on policy model, which leads to
        # very early exercise of the option at the early stage of the training
        if explore_mode:
          dist = torch.distributions.Categorical(torch.stack([torch.clamp(self.N * states[:,1] /(self.N * states[:,1]+1),0.,1.),
                                                              torch.clamp(1/(self.N * states[:,1] + 1),0.,1.)]).T)
        else:
          dist = torch.distributions.Categorical(x)

        actions = dist.sample()
        log_probs = dist.log_prob(actions)

        return actions, log_probs

In [None]:
# Set up Policy model and test
policy_model = PolicyNet()
a,p=policy_model(torch.tensor([[100.,0.9],[80,0.9],[120,0.9],[100,.1],[80,.1],[120,.1]]),explore_mode=False)
print(f'Action: {a}')
print(f'Probability: {torch.exp(p)}')

Action: tensor([0, 0, 0, 1, 0, 0])
Probability: tensor([0.5876, 0.5885, 0.5871, 0.4143, 0.5866, 0.5852],
       grad_fn=<ExpBackward0>)


##### 2.4 Defining Simulation (1 path) function

In [None]:
def simulate(option, policy_model, value_model, explore_mode=False, q_learning=True):
  # simulation action for 1 path; pre_train means sampling actions so that early
  # exercising the option is uniformly distributed for each time step, which allows
  # exploration generating more long paths samples for efficient training

  states = []
  actions = []
  rewards = []
  Gs = []
  Vs= []
  Ads = []
  log_probs = []

  state = option.reset()
  # first action is always not exercise option (why own the option if exercise right away)
  action = torch.tensor([0])
  done = False

  # Turn off training for Policy model - only for simulation
  policy_model.eval()
  value_model.eval()
  with torch.no_grad():
    G, V = 0., 0.
    while not done:
      # current state, current action
      states.append(torch.tensor(state, dtype=torch.float32))
      actions.append(action)
      V = value_model(torch.tensor(state, dtype=torch.float32).unsqueeze(0)).detach()
      Vs.append(V)

      # current reward and next state
      done, reward, state = option.step(action.data.numpy().squeeze())
      rewards.append(reward)
      # next action
      action, _ = policy_model(torch.tensor(state, dtype=torch.float32).unsqueeze(0),explore_mode=explore_mode)

      if q_learning:
        q_next = value_model(torch.tensor(state, dtype=torch.float32).unsqueeze(0)).detach()
        G = reward + q_next * (1 - done)
        Gs.append(G.squeeze())
        Ads.append((G - V).squeeze())
      else:
        pass

    if q_learning:
      pass
    else:
      # collect rewards, advantages corresponding to the states
      for r, V in zip(rewards[::-1], Vs[::-1]):
        G = r + G
        Gs.append(torch.tensor(G, dtype=torch.float32))
        Ads.append((G - V).squeeze())
      Gs = Gs[::-1]
      Ads = Ads[::-1]

  return torch.stack(states), torch.stack(Gs).unsqueeze(1), torch.stack(Ads).unsqueeze(1)

##### 2.5 Training Policy model and Value model using simulated samples

In [None]:
# define optimization algorithm for Policy model and Value model
value_optimizer = torch.optim.Adam(value_model.parameters(), lr=0.0001)

In [None]:
# Training Value model with explore_mode
def train_value(option, value_model,value_optimizer, num_epochs=5000):
  episodes = []
  losses_value = []

  for epoch in range(num_epochs):
    # train using samples whose actions are based on the Policy model
    states, Gs, Ads = simulate(option, policy_model, value_model, explore_mode=True)

    # train Value model; turn on traning model
    value_model.train()
    value_optimizer.zero_grad()
    # Loss function for Value model
    loss_value = ((Gs - value_model(states))**2).sum()
    loss_value.backward()
    value_optimizer.step()

    # print(f'states:{states.shape}\n{states};\n Gs: {Gs.shape} \n {Gs} \n; Ads: {Ads.shape} \n {Ads};\n log_prob: {log_probs.shape}\n{log_probs} ')
    # keep track of the value for a proxy option
    if epoch % 100 == 0:
      losses_value.append(loss_value.item())
      episodes.append(epoch)
      print(f"Epoch [{epoch + 1}/{num_epochs}]: Value Loss: {np.array(losses_value).mean():.2f}")

In [None]:
train_value(put, value_model,value_optimizer, num_epochs=1000)

Epoch [1/1000]: Value Loss: 377.04
Epoch [101/1000]: Value Loss: 249.77
Epoch [201/1000]: Value Loss: 200.68
Epoch [301/1000]: Value Loss: 174.68
Epoch [401/1000]: Value Loss: 149.64
Epoch [501/1000]: Value Loss: 138.95
Epoch [601/1000]: Value Loss: 138.11
Epoch [701/1000]: Value Loss: 136.14
Epoch [801/1000]: Value Loss: 132.05
Epoch [901/1000]: Value Loss: 125.83


In [None]:
# checking the trained Value model behaves
value_model(torch.tensor([[100.,1.0],[80,1.0],[120,1.0],[100,.1],[80,.1],[120,.1]]))

tensor([[ 6.2489],
        [21.3295],
        [ 1.2644],
        [ 6.9770],
        [11.9614],
        [ 1.9925]], grad_fn=<AddmmBackward0>)

In [None]:
# define optimization algorithm for Policy model and Value model
policy_optimizer = torch.optim.Adam(policy_model.parameters(), lr=0.0001)

In [None]:
# Training Policy model and Value model
def train(option, policy_model, policy_optimizer, value_model,value_optimizer, num_epochs=5000):
  episodes = []
  losses_value = []
  losses_policy = []

  for epoch in range(num_epochs):
    value_model.train()
    policy_model.train()
    if epoch % 2 == 0:
      # train using samples whose actions are based on the Policy model
      states, Gs, Ads = simulate(option, policy_model, value_model, explore_mode=False)
    else:
      # pre-train using samples whose actions are Not based on the Policy model
      states, Gs, Ads = simulate(option, policy_model, value_model, explore_mode=True)

    # train Value model; turn on traning model
    value_optimizer.zero_grad()
    # Loss function for Value model
    loss_value = ((Gs - value_model(states))**2).sum()
    loss_value.backward()
    value_optimizer.step()

    # train Policy model: turn on traning model
    policy_optimizer.zero_grad()
    _, log_probs = policy_model(states, explore_mode=False)
    log_probs = log_probs.unsqueeze(1)
    # Loss function for Policy model
    loss_policy = (-log_probs * Ads).sum()
    loss_policy.backward()
    policy_optimizer.step()

    # print(f'states:{states.shape}\n{states};\n Gs: {Gs.shape} \n {Gs} \n; Ads: {Ads.shape} \n {Ads};\n log_prob: {log_probs.shape}\n{log_probs} ')
    # keep track of the value for a proxy option
    if epoch % 100 == 0:
      losses_value.append(loss_value.item())
      losses_policy.append(loss_policy.item())
      episodes.append(epoch)
      print(f"Epoch [{epoch + 1}/{num_epochs}]: Value Loss: {np.array(losses_value).mean():.2f}, Policy Loss: {np.array(losses_policy).mean():.6f}")

In [None]:
train(put, policy_model, policy_optimizer, value_model, value_optimizer, num_epochs=3000)

Epoch [1/3000]: Value Loss: 30.02, Policy Loss: -3.434110
Epoch [101/3000]: Value Loss: 18.62, Policy Loss: -0.776818
Epoch [201/3000]: Value Loss: 15.49, Policy Loss: -1.131539
Epoch [301/3000]: Value Loss: 15.83, Policy Loss: -0.931058
Epoch [401/3000]: Value Loss: 14.05, Policy Loss: -0.569410
Epoch [501/3000]: Value Loss: 13.91, Policy Loss: -0.748911
Epoch [601/3000]: Value Loss: 12.62, Policy Loss: -0.961384
Epoch [701/3000]: Value Loss: 12.05, Policy Loss: -1.096224
Epoch [801/3000]: Value Loss: 12.57, Policy Loss: -1.514335
Epoch [901/3000]: Value Loss: 12.45, Policy Loss: -1.641816
Epoch [1001/3000]: Value Loss: 12.26, Policy Loss: -1.693274
Epoch [1101/3000]: Value Loss: 12.46, Policy Loss: -1.815291
Epoch [1201/3000]: Value Loss: 14.22, Policy Loss: -1.224356
Epoch [1301/3000]: Value Loss: 15.93, Policy Loss: -0.740874
Epoch [1401/3000]: Value Loss: 15.33, Policy Loss: -0.769126
Epoch [1501/3000]: Value Loss: 15.12, Policy Loss: -0.847667
Epoch [1601/3000]: Value Loss: 14.91

In [None]:
a,p=policy_model(torch.tensor([[100.,1.0],[80,1.0],[120,1.0],[100,.1],[80,.1],[120,.1]]),explore_mode=False)
print(f'Action: {a}')
print(f'Probability: {np.exp(p.data.numpy())}')

Action: tensor([1, 0, 0, 1, 1, 0])
Probability: [0.28224844 0.71810156 0.71787477 0.2837541  0.28343385 0.71639735]


In [None]:
value_model(torch.tensor([[100.,0.9],[80,0.9],[120,0.9],[100,.1],[80,.1],[120,.1]]))

tensor([[ 5.0625],
        [20.9712],
        [-0.1575],
        [ 5.7489],
        [10.9690],
        [ 0.5289]], grad_fn=<AddmmBackward0>)