<a href="https://colab.research.google.com/github/newmantic/DQN_options_hedge/blob/main/DQN_options_hedge.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install gym numpy pandas tensorflow

In [None]:
import gym
import numpy as np

class OptionHedgingEnv(gym.Env):
    def __init__(self):
        super(OptionHedgingEnv, self).__init__()

        # Define action space: 0 (do nothing), 1 (buy call option), 2 (buy put option), 3 (sell call), 4 (sell put)
        self.action_space = gym.spaces.Discrete(5)

        # Define observation space (state): stock price, portfolio position, time to expiration
        self.observation_space = gym.spaces.Box(low=0, high=np.inf, shape=(3,), dtype=np.float32)

        # Simulate initial state
        self.stock_price = 100
        self.portfolio_value = 100000
        self.time_to_expiration = 30  # in days

    def step(self, action):
        # Apply action (buy/sell options)
        option_price = self._get_option_price(self.stock_price, self.time_to_expiration)
        if action == 1:  # buy call
            self.portfolio_value -= option_price
        elif action == 2:  # buy put
            self.portfolio_value -= option_price
        elif action == 3:  # sell call
            self.portfolio_value += option_price
        elif action == 4:  # sell put
            self.portfolio_value += option_price

        # Simulate next time step (stock price change)
        self.stock_price += np.random.normal(0, 1)
        self.time_to_expiration -= 1

        # Calculate reward (hedging effectiveness)
        reward = self._calculate_reward(self.portfolio_value)

        # Check if done (when options expire or max steps reached)
        done = self.time_to_expiration <= 0

        # Update state
        state = np.array([self.stock_price, self.portfolio_value, self.time_to_expiration], dtype=np.float32)

        return state, reward, done, {}

    def reset(self):
        self.stock_price = 100
        self.portfolio_value = 100000
        self.time_to_expiration = 30
        return np.array([self.stock_price, self.portfolio_value, self.time_to_expiration], dtype=np.float32)

    def _get_option_price(self, stock_price, time_to_expiration):
        # Simple option pricing model (e.g., Black-Scholes could be used here)
        return max(1, np.random.normal(10, 2))

    def _calculate_reward(self, portfolio_value):
        # Reward function based on portfolio protection and minimizing cost
        return portfolio_value / 1000


In [None]:
import tensorflow as tf
import random
from tensorflow.keras import layers

class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = []
        self.gamma = 0.95    # discount rate
        self.epsilon = 1.0   # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = 0.001
        self.model = self._build_model()

    def _build_model(self):
        # Neural network for deep Q-learning model
        model = tf.keras.Sequential()
        model.add(layers.Dense(24, input_dim=self.state_size, activation='relu'))
        model.add(layers.Dense(24, activation='relu'))
        model.add(layers.Dense(self.action_size, activation='linear'))

        # Use 'learning_rate' instead of 'lr'
        model.compile(loss='mse', optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate))
        return model

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        if np.random.rand() <= self.epsilon:
            return np.random.choice(self.action_size)  # Exploration
        q_values = self.model.predict(state)
        return np.argmax(q_values[0])  # Exploitation

    def replay(self, batch_size):
        # Use random.sample to get a random minibatch from memory
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                target = (reward + self.gamma * np.amax(self.model.predict(next_state)[0]))
            target_f = self.model.predict(state)
            target_f[0][action] = target
            self.model.fit(state, target_f, epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

In [None]:
env = OptionHedgingEnv()
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
agent = DQNAgent(state_size, action_size)
episodes = 1000
batch_size = 32

for e in range(episodes):
    state = env.reset()
    state = np.reshape(state, [1, state_size])
    for time in range(500):
        action = agent.act(state)
        next_state, reward, done, _ = env.step(action)
        next_state = np.reshape(next_state, [1, state_size])
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        if done:
            print(f"episode: {e}/{episodes}, score: {reward}, epsilon: {agent.epsilon:.2}")
            break
        if len(agent.memory) > batch_size:
            agent.replay(batch_size)