In [1]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
from numpy import exp

In [2]:
class BlackProcess:
    def __init__(self, S0, r, sigma, n):
        self.S0 = S0
        self.r = r
        self.sigma = sigma
        self.n = n

    def generate(self):
        S0, r, sigma, n = self.S0, self.r, self.sigma, self.n
        dt = 1 / 365
        dW = np.random.normal(0, dt ** 0.5, n)
        chg = np.ones(n + 1)
        chg[1:] += r * dt + sigma * dW
        accum_chg = chg.cumprod()
        return S0 * accum_chg

In [3]:
class VanillaEnv():
    n_observation = 5

    def __init__(self, process: BlackProcess, tenor, strike):
        self.process = process
        self.tenor = tenor
        self.strike = strike
        self.t = 0
        self.path = None
        self.observations = None
        self.reset()

    def df(self):
        return exp(-self.process.r / 365)

    def mu(self):
        return exp(self.process.r / 365) - 1

    def reset(self):
        self.path = self.process.generate()
        self.t = 0
        self.observations = np.stack([self.observation(t) for t in range(self.tenor + 1)], 0)
        return self.observations[0]

    def St(self, t=None) -> np.float32:
        t = self.t if t is None else t
        return self.path[t]

    def observation(self, t=None):
        S_K = self.St(t) / self.strike
        moneyness = max(0, S_K)

        t = self.t if t is None else t
        tenor = (self.tenor - t) / 365

        obs = np.array([moneyness, moneyness ** 2, tenor, tenor ** 2, moneyness * tenor])
        assert len(obs) == self.n_observation
        return obs

    def step(self, action):
        """
        :param action: hedge ratio, i.e. delta
        :return: S_t0, S_t1, reward, terminated, can_early_exercise, payoff, dS
        """
        S_t0 = self.observations[self.t]
        self.t = self.t + 1
        dS = self.St() - self.St(self.t - 1)
        reward = dS * action
        S_t1 = self.observations[self.t]
        terminated = True if self.t >= self.tenor else False
        can_early_exercise = False
        payoff = self.payoff()
        return S_t0, S_t1, reward, terminated, can_early_exercise, payoff, dS

    def payoff(self, t=None) -> np.float32:
        """
        :return: option payoff if exercise now, regardless it can be exercised, equivalent to moneyless
        """
        return max(0, self.St(t) - self.strike)

In [4]:
def get_actor(n_hidden, N_OBSERVATION):
    inputs = layers.Input(shape=(N_OBSERVATION))
    x = inputs
    for l in n_hidden:
        x = layers.Dense(l, activation='relu')(x)
    x = layers.Dense(1, activation='tanh')(x)
    return tf.keras.Model(inputs, x)


def get_critic(n_hidden, N_OBSERVATION):
    inputs = layers.Input(shape=(N_OBSERVATION))
    x = inputs
    for l in n_hidden:
        x = layers.Dense(l, activation='relu')(x)
    x = layers.Dense(1, activation='sigmoid')(x)
    return tf.keras.Model(inputs, x)


def get_pretrain_model(n_hidden, days, N_OBSERVATION):
    obs_input = layers.Input((days, N_OBSERVATION))
    dS_input = layers.Input((days, 1))
    x = obs_input
    for l in n_hidden:
        x = layers.Dense(l, activation='relu')(x)
    x = layers.Dense(1, activation='tanh')(x)
    pretrain_actor = tf.keras.Model(obs_input, x)
    sum_hedge_pl = tf.reduce_sum(dS_input * x, axis=(1, 2))
    pretrainer = tf.keras.Model((obs_input, dS_input), sum_hedge_pl)
    return pretrain_actor, pretrainer

In [5]:
class Buffer():
    """
    S_t0, S_t1, reward, terminated, can_early_exercise, payoff, dS
    """

    def __init__(self, size, N_OBSERVATION):
        self.size = size

        def blank_array(dim):
            assert dim <= 2
            dim = N_OBSERVATION if dim == 2 else 1
            return np.zeros((size, dim), dtype=np.float32)

        self.storage = [blank_array(2), blank_array(2),
                        blank_array(1), blank_array(1), blank_array(1), blank_array(1), blank_array(1)]
        # order: S_t0, S_t1, reward, terminated, can_early_exercise, payoff, dS
        self.count = 0

    def store(self, values):
        index = self.count % self.size
        for storage, value in zip(self.storage, values):
            storage[index, :] = value
        self.count = self.count + 1

    def sample(self, batch_size):
        indexes = np.random.choice(self.size, batch_size, False)
        return [v[indexes] for v in self.storage]

In [6]:
class EpisodeBuffer():
    def __init__(self, capacity, ep_length, N_OBSERVATION):
        self.capacity = capacity
        self.ep_count = 0

        def blank_array(dim):
            assert dim <= 2
            dim = N_OBSERVATION if dim == 2 else 1
            return np.zeros((capacity, ep_length, dim), dtype=np.float32)

        self.storage = [blank_array(2), blank_array(2),
                        blank_array(1), blank_array(1), blank_array(1), blank_array(1), blank_array(1)]
        # order: S_t0, S_t1, reward, terminated, can_early_exercise, payoff, dS

    def store(self, values, t):
        ep_index = self.ep_count % self.capacity
        for storage, value in zip(self.storage, values):
            storage[ep_index, t, :] = value
        done = values[3]
        if done:
            self.ep_count = self.ep_count + 1

    def sample(self, batch_size):
        indexes = np.random.choice(self.capacity, batch_size, False)
        return [v[indexes] for v in self.storage]

In [7]:
def gather_episode_wise(env, buffer: EpisodeBuffer, episodes, action=0.5):
    for i in range(episodes):
        env.reset()
        while True:
            data = env.step(action)  # actual delta still doesn't matter, avoid calling actor to save time
            done = data[3]
            buffer.store(data, env.t - 1)
            if done:
                break

In [8]:
def get_pretrain_actor(env, n_hidden, n_samples, epoch=12):
    N_OBSERVATION = env.n_observation
    days = env.tenor
    buffer = EpisodeBuffer(n_samples, days, VanillaEnv.n_observation)
    gather_episode_wise(env, buffer, n_samples)
    observations, dS, payoff = buffer.storage[0], buffer.storage[-1], buffer.storage[-2][:, -1, 0]
    pretrain_actor, pretrainer = get_pretrain_model(n_hidden, days, N_OBSERVATION)  #
    pretrainer.compile(loss=tf.keras.losses.mse, optimizer="Adam")
    pretrainer.fit((observations, dS), payoff, 64, epoch)
    actor = get_actor(n_hidden, N_OBSERVATION)
    actor.set_weights(pretrain_actor.get_weights())
    return actor

In [9]:
def gather_data_pretrain_critic(pretrain_actor, env, n_samples):
    df = env.df()
    days = env.tenor
    buffer = EpisodeBuffer(n_samples, days, VanillaEnv.n_observation)
    gather_episode_wise(env, buffer, n_samples)
    S_t0, S_t1, reward, terminated, can_early_exercise, payoff, dS = buffer.storage
    y_t1 = payoff[:, -1, :]
    Y_hedge = np.zeros_like(payoff, dtype=np.float32)
    for i in reversed(range(days)):
        def reshape(arr):
            return arr[:, i, :]

        hedge_pl = reshape(dS) * pretrain_actor(reshape(S_t0)).numpy()
        y_t0 = y_t1 * df - hedge_pl
        y_t0 = np.maximum(0, y_t0)
        Y_hedge[:, i, :] = y_t0
        y_t1 = y_t0
    return Y_hedge, buffer

In [10]:
def get_pretrain_critic(env, actor, n_hidden, N_OBSERVATION, n_samples, epoc=10):
    Y_hedge, buffer = gather_data_pretrain_critic(actor, env, n_samples)
    S_t0, S_t1, reward, terminated, can_early_exercise, payoff, dS = buffer.storage

    def reshape(arr):
        return arr.reshape((-1, arr.shape[-1]))

    critic = get_critic(n_hidden, N_OBSERVATION)
    critic.compile(loss=tf.keras.losses.mse, optimizer="Adam")
    critic.fit(reshape(S_t0), reshape(Y_hedge), 64, epoc)
    return critic, buffer

In [11]:
def next_valuation(terminated, model_valuation, payoff):
    return terminated * payoff + (1 - terminated) * model_valuation


def get_critic_loss(model, target_model, S_t0, S_t1, reward, terminated, payoff, df):
    learn_target = next_valuation(terminated, target_model(S_t1), payoff) * df - reward
    return tf.math.reduce_mean(tf.math.square(learn_target - model(S_t0)))


def get_actor_loss(actor, critic, S_t0, S_t1, dS, df, mu):
    delta = actor(S_t0)
    hedge_pl = delta * (dS - mu)
    learn_target = critic(S_t1) * df - critic(S_t0)
    return tf.math.reduce_mean(tf.math.square(learn_target - hedge_pl))

In [12]:
@tf.function
def learn(train_data, actor, critic, critic_target, optimizer_critic, optimizer_actor, df, mu):
    S_t0, S_t1, reward, terminated, can_early_exercise, payoff, dS = train_data
    with tf.GradientTape() as tape:
        # todo: dcf shold apply interest rate
        critic_loss = get_critic_loss(critic, critic_target, S_t0, S_t1, reward, terminated, payoff, df)
        critic_gradient = tape.gradient(critic_loss, critic.trainable_variables)
        optimizer_critic.apply_gradients(zip(critic_gradient, critic.trainable_variables))
    with tf.GradientTape() as tape:
        actor_loss = get_actor_loss(actor, critic, S_t0, S_t1, dS, df, mu)
        actor_gradient = tape.gradient(actor_loss, actor.trainable_variables)
        optimizer_actor.apply_gradients(zip(actor_gradient, actor.trainable_variables))


@tf.function
def soft_update(target_weights, weights, tau):
    for (old_value, new_value) in zip(target_weights, weights):
        old_value.assign(new_value * tau + old_value * (1 - tau))

In [13]:
S0, r, vol, days, strike = 1, 0.01, 0.3, 30, 1.1
n_samples = 2 ** 12
n_hidden = [64, 64]
process = BlackProcess(S0, r, vol, days)
N_OBSERVATION = VanillaEnv.n_observation
env = VanillaEnv(process, days, strike)
print("pretrain actor")
actor = get_pretrain_actor(env, n_hidden, n_samples)
env = VanillaEnv(process, days, S0)
print("pretrain critic")
critic, _ = get_pretrain_critic(env, actor, n_hidden, N_OBSERVATION, n_samples, epoc=5)




pretrain actor


2022-04-04 16:48:34.240635: I tensorflow/core/platform/cpu_feature_guard.cc:145] This TensorFlow binary is optimized with Intel(R) MKL-DNN to use the following CPU instructions in performance critical operations:  SSE4.1 SSE4.2 AVX AVX2 FMA
To enable them in non-MKL-DNN operations, rebuild TensorFlow with the appropriate compiler flags.
2022-04-04 16:48:34.241034: I tensorflow/core/common_runtime/process_util.cc:115] Creating new thread pool with default inter op setting: 4. Tune using inter_op_parallelism_threads for best performance.


Train on 4096 samples
Epoch 1/12
Epoch 2/12
Epoch 3/12
Epoch 4/12
Epoch 5/12
Epoch 6/12
Epoch 7/12
Epoch 8/12
Epoch 9/12
Epoch 10/12
Epoch 11/12
Epoch 12/12
pretrain critic
Train on 122880 samples
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


In [14]:
critic_target = get_critic(n_hidden, N_OBSERVATION)
critic_target.set_weights(critic.get_weights())
optimizer_critic = tf.keras.optimizers.Adam(0.002)
optimizer_actor = tf.keras.optimizers.Adam(0.002)
learn_per_step = 1
buffer = Buffer(1024, N_OBSERVATION)
batch_size = 32
tau = 0.1
df = env.df()
mu = env.mu()
print("train like actor-critic")
for eps in range(40):
    print("episode {}".format(eps))
    epsode_reward = 0
    S_t0 = env.reset()[np.newaxis, :]
    epsode_reward += critic(S_t0)
    while True:
        action = actor(S_t0)
        step_result = env.step(action)
        _, S_t1, reward, terminated, can_early_exercise, payoff, dS = step_result
        epsode_reward += reward

        buffer.store(step_result)
        if buffer.count > batch_size:
            train_data = buffer.sample(batch_size)
            for _ in range(learn_per_step):
                learn(train_data, actor, critic, critic_target, optimizer_critic, optimizer_actor, df, mu)
            soft_update(critic_target.variables, critic.variables, tau)
        if terminated:
            print("total hedge P/L: {:5.4f}, option payoff: {:5.4f}".format(epsode_reward[0, 0], payoff))
            break
        S_t0 = S_t1[np.newaxis, :]



train like actor-critic
episode 0


To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.



To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

total hedge P/L: 0.0054, option payoff: 0.0000
episode 1
total hedge P/L: 0.1381, option payoff: 0.1774
episode 2
total hedge P/L: -0.0078, option payoff: 0.0000
episode 3
total hedge P/L: 0.0601, option payoff: 0.0438
episode 4
total hedge P/L: 0.0156, option payoff: 0.0161
episode 5
total hedge P/L: 0.0057, option payoff: 0.0000
episode 6
total hedge P/L: 0.02

In [15]:
from numpy.lib.scimath import log, sqrt
from scipy import stats

cdf = stats.norm.cdf
pdf = stats.norm.pdf

def d1(s, k, t, r, q, v):
    return (log(s / k) + (r - q + 0.5 * v ** 2) * t) / (v * sqrt(t))

def d2(s, k, t, r, q, v):
    return d1(s, k, t, r, q, v) - v * sqrt(t)

def call(s, k, t, r, q, v):
    d1_ = d1(s, k, t, r, q, v)
    d2_ = d2(s, k, t, r, q, v)
    return s * exp(-q * t) * cdf(d1_) - k * exp(-r * t) * cdf(d2_)

def delta(s, k, t, r, q, v, isCall):
    isPut = np.bitwise_not(isCall)
    d1_ = d1(s, k, t, r, q, v)
    return exp(-q * t) * cdf(d1_) * isCall + -exp(-q * t) * cdf(-d1_) * isPut

In [16]:
S_t0 = env.reset()[np.newaxis, :]
print("by RL model: \n option value: {:5.4f}, delta: {:5.4f}".format(critic(S_t0)[0, 0], actor(S_t0)[0, 0]))
bs_call = call(S0, S0, days / 365, r, 0, vol)
bs_delta = delta(S0, S0, days / 365, r, 0, vol, True)
print("by black-scholes model: \n option value: {:5.4f}, delta: {:5.4f}".format(bs_call, bs_delta))

by RL model: 
 option value: 0.0352, delta: 0.5162
by black-scholes model: 
 option value: 0.0347, delta: 0.5210
