# Solve Trading Bot using Rainbow DQN


In [None]:
%%bash

pip install numpy pandas pytorch-lightning tensorboard

#### Import the necessary code libraries

In [None]:
import copy
import torch
import random

import pandas as pd
import numpy as np

import torch.nn.functional as F

from collections import deque
from IPython.display import HTML
from base64 import b64encode

from torch import nn
from torch.utils.data import DataLoader
from torch.utils.data.dataset import IterableDataset
from torch.optim import AdamW

from pytorch_lightning import LightningModule, Trainer


device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
num_gpus = torch.cuda.device_count()

print(f'device {device} num_gpus {num_gpus}')

#### Create the Deep Q-Network

In [None]:
import math 
from torch.nn.init import kaiming_uniform_, zeros_

# Noisey DQN
# Add w_sigma and b_sigma components to the linear equation to add random noise
# These components help determine the significance of individual paramters efficiently
# Eliminates the need for "epsilon greedy" policy
class NoisyLinear(nn.Module):

  def __init__(self, in_features, out_features, sigma):
    super(NoisyLinear, self).__init__()
    self.w_mu = nn.Parameter(torch.empty((out_features, in_features)))
    self.w_sigma = nn.Parameter(torch.empty((out_features, in_features)))
    self.b_mu = nn.Parameter(torch.empty((out_features)))
    self.b_sigma = nn.Parameter(torch.empty((out_features)))

    kaiming_uniform_(self.w_mu, a=math.sqrt(5))
    kaiming_uniform_(self.w_sigma, a=math.sqrt(5))
    zeros_(self.b_mu)
    zeros_(self.b_sigma)
    

    self.sigma = sigma
  def forward(self, x):
    if self.training:
      w_noise = torch.normal(0, self.sigma, size=self.w_mu.size()).to(device)
      b_noise = torch.normal(0, self.sigma, size=self.b_mu.size()).to(device)
      return F.linear(x, self.w_mu + self.w_sigma * w_noise, self.b_mu + self.b_sigma * b_noise)
    else:
      return F.linear(x, self.w_mu, self.b_mu)

In [None]:
class DQN(nn.Module):

  def __init__(self, hidden_size, obs_shape, n_actions, atoms=51, sigma=0.5):
    """ Deep Q-Network NN Module

    Args:
        hidden_size (int): The # of features in our hidden layers
        obs_shape (int): The # of features in our state tensor
        n_actions (int): The # of q values our network produces
        atoms (int, optional): The # of atomic segments in each q value distribution. Defaults to 51.
        sigma (float, optional): Represents the std dev of the noise in each parameter. Defaults to 0.5.
    """
    super().__init__()
    # Distributional DQN
    # We break our output layers into "atoms" and output a distrbution across a range v_min - v_vmax
    self.atoms = atoms
    self.n_actions = n_actions
    
    self.net = nn.Sequential(
      # Noisey DQN
      # Add w_sigma and b_sigma components to the linear equation to add random noise
      # These components help determine the significance of individual paramters efficiently
      # Eliminates the need for "epsilon greedy" policy
      NoisyLinear(obs_shape, hidden_size, sigma=sigma),
      nn.ReLU(),
      NoisyLinear(hidden_size, hidden_size, sigma=sigma),
      nn.ReLU(),
    )

    # Dueling DQN
    # We take the output from the previous hidden layers and create 2 parallel output layers
    # One layer to represent the action rewards (advantage) and another for the state value (value)
    # This algorithm helps to determine the states where taking no action is optimal
    self.fc_adv = NoisyLinear(hidden_size, self.n_actions * self.atoms, sigma=sigma) # Distributional DQN atoms
    self.fc_value = NoisyLinear(hidden_size, self.atoms, sigma=sigma) # Distributional DQN atoms
  
  def forward(self, x):
    x = self.net(x)
    adv = self.fc_adv(x).view(-1, self.n_actions, self.atoms)  # (B, A, N)
    value = self.fc_value(x).view(-1, 1, self.atoms)  # (B, 1, N)
    # Dueling DQN uses recomposes the V(s) and Q(s, a) components subtracting the mean advantage
    q_logits = value + adv - adv.mean(dim=1, keepdim=True)  # (B, A, N)
    # Distributional DQN - estimate Q probability distribution
    q_probs = F.softmax(q_logits, dim=-1)  # (B, A, N)
    return q_probs

#### Create the policy

In [None]:
def greedy(state, net, support):
  """ Policy function for evaluating a given state and model

  Args:
      state (Tensor): The input state
      net (nn.Module): The model
      support (Tensor): The support tensor used to transform probabilities into q values

  Returns:
      int: The action chosen by the model for the given state
  """
  state = torch.tensor(np.array([state])).to(device).to(torch.float32)
  q_value_probs = net(state)  # (1, A, N) 
  q_values = (support * q_value_probs).sum(dim=-1)  # (1, A)
  action = torch.argmax(q_values, dim=-1)  # (1, 1)
  action = int(action.item())  # ()
  return action

#### Create the replay buffer

In [None]:
class ReplayBuffer:

  def __init__(self, capacity):
    """ A buffer for storing evaluated environment experiences

    Args:
        capacity (int): Max buffer size
    """
    self.buffer = deque(maxlen=capacity)
    self.priorities = deque(maxlen=capacity)
    self.capacity = capacity
    # Prioritized Experience Replay
    # alpha: high -> low will choose batches with the most loss vs batches with the highest probability
    # beta: low -> high corrects for the distribution bias of batches being chosen more frequently
    self.alpha = 0.0  # null
    self.beta = 1.0  # null
    self.max_priority = 0.0

  def __len__(self):
    return len(self.buffer)
  
  def append(self, experience):
    self.buffer.append(experience)
    self.priorities.append(self.max_priority)
  
  def update(self, index, priority):
    if priority > self.max_priority:
      self.max_priority = priority
    self.priorities[index] = priority

  def sample(self, batch_size):
    """ Return a sample from the buffer

    Args:
        batch_size (int): Size of the sample

    Returns:
        list: The returned sample
    """
    prios = np.array(self.priorities, dtype=np.float64) + 1e-4 # Stability constant.
    prios = prios ** self.alpha
    probs = prios / prios.sum()

    weights = (self.__len__() * probs) ** -self.beta
    weights = weights / weights.max()

    idx = random.choices(range(self.__len__()), weights=probs, k=batch_size)
    sample = [(i, weights[i], *self.buffer[i]) for i in idx]

    return sample

In [None]:
class RLDataset(IterableDataset):

  def __init__(self, buffer, sample_size=400):
    self.buffer = buffer
    self.sample_size = sample_size
  
  def __iter__(self):
    for experience in self.buffer.sample(self.sample_size):
      yield experience

#### Create and verify the environment

In [None]:
class MarketEnvironment:

  def __init__(self, file, window):
    self.data = list(pd.read_csv(file)['Adj Close'])
    self.states = []

    print(f'loaded {len(self.data)} data points')

    # prepopulate the states to avoid needless loops per step
    for idx, price in enumerate(self.data):
      if idx >= window:
        start = idx - window
        block = self.data[start: idx + 1] 
        state = []
        for i in range(window):
          state.append(self._sigmoid(block[i+1] - block[i]))
        self.states.append((price, state))
    
    print(f'generated {len(self.states)} states')

  def __len__(self):
    return len(self.states)
        
  def _sigmoid(self, x):
    """Performs sigmoid operation
    """
    try:
        if x < 0:
            return 1 - 1 / (1 + math.exp(x))
        return 1 / (1 + math.exp(-x))
    except Exception as err:
        print("Error in sigmoid: " + err)

  def __iter__(self):
    for state in self.states:
      yield state

In [None]:
env = MarketEnvironment('data/GOOG_2010_SAMPLE.csv', 10)

# verfy
for i, state in enumerate(env):
  print(f'state {i}: {state}')

In [None]:
class TradingAgent:
  
  def __init__(self):
    self.num_actions = 3
    self.reset()

  def sample_action(self):
    return random.randrange(3)

  def take_action(self, action, price):
    reward = 0

    if action == 1:
      self.inventory.append(price)
    elif action == 2 and len(self.inventory) > 0:
      buy_price = self.inventory.pop(0)
      delta = price - buy_price
      reward = delta
      self.total_profit += reward
    else:
      pass

    return reward

  def reset(self):
    self.inventory = []
    self.total_profit = 0

#### Create the Deep Q-Learning algorithm

In [None]:
class DeepQLearning(LightningModule):

  # Initialize.
  def __init__(self, env_name, window=10, policy=greedy, capacity=100_000, 
               batch_size=256, lr=1e-3, hidden_size=128, gamma=0.99, optim=AdamW, 
               samples_per_epoch=10_000, sync_rate=10, sigma=0.5, a_start=0.5, 
               a_end=0.0, a_last_episode=100, b_start=0.4, b_end=1.0, b_last_episode=100, 
               n_steps=3, v_min=-10.0, v_max=10.0, atoms=51):
    """ Rainbow Deep Q-Network 

    Args:
        env_name (string): The market data file
        window (int): The n period state size of each time t
        policy (function, optional): Our policy function. Defaults to greedy.
        capacity (int, optional): The max size of the replay buffer. Defaults to 100_000.
        batch_size (int, optional): The size of each replay batch. Defaults to 256.
        lr (float, optional): The network learning rate. Defaults to 1e-3.
        hidden_size (int, optional): # of features in the hidden layers. Defaults to 128.
        gamma (float, optional): Discount to future returns. Defaults to 0.99.
        optim (class, optional): Gradient descent optimization class. Defaults to AdamW.
        samples_per_epoch (_type_, optional): Buffer size needed for training to begin. Defaults to 10_000.
        sync_rate (int, optional): The # of epochs for target net to sync. Defaults to 10.
        sigma (float, optional): Noisey DQN std dev. Defaults to 0.5.
        a_start (float, optional): Initial alpha for prioritized experience replay. Defaults to 0.5.
        a_end (float, optional): Final alpha for prioritized experience replay. Defaults to 0.0.
        a_last_episode (int, optional): Ending epoch for alpha. Defaults to 100.
        b_start (float, optional): Initial beta for prioritized experience replay. Defaults to 0.4.
        b_end (float, optional): Final alpha for prioritized experience replay. Defaults to 1.0.
        b_last_episode (int, optional): Ending epoch for beta. Defaults to 100.
        n_steps (int, optional): Number of steps for N-Step DQN. Defaults to 3.
        v_min (float, optional): The minimum support value for Distributional DQN. Defaults to -10.0.
        v_max (float, optional): The maximum support value for Distributional DQN. Defaults to 10.0.
        atoms (int, optional): The number of atoms for Distributional DQN. Defaults to 51.
    """
    super().__init__()

    self.support = torch.linspace(v_min, v_max, atoms, device=device)  # (N)
    self.delta = (v_max - v_min) / (atoms - 1)

    self.env = MarketEnvironment(env_name, window)
    self.agent = TradingAgent()

    self.q_net = DQN(hidden_size, window, self.agent.num_actions, atoms=atoms, sigma=sigma)

    self.target_q_net = copy.deepcopy(self.q_net)

    self.policy = policy
    self.buffer = ReplayBuffer(capacity=capacity)

    self.save_hyperparameters()

    while len(self.buffer) < self.hparams.samples_per_epoch:
      print(f"{len(self.buffer)} samples in experience buffer. Filling...")
      self.play_episode()
    
  @torch.no_grad()
  def play_episode(self, policy=None):
    done = False
    transitions = []
    self.agent.reset()

    for i, (price, state) in enumerate(self.env):
      if policy:
        action = policy(state, self.q_net, self.support)
      else:
        action = self.agent.sample_action()
      reward = self.agent.take_action(action, price)
      done = i == len(self.env) - 1
      next_state = state if done else self.env.states[i + 1][1]
      exp = (state, action, reward, done, next_state)
      transitions.append(exp)

    # N-Step DQN
    # For each state t, caclulate the reward as the sum of discounted returns n -1 steps into the future
    for i, (s, a, r, d, ns) in enumerate(transitions):
      batch = transitions[i:i+self.hparams.n_steps]
      ret = sum([t[2] * self.hparams.gamma**j for j, t in enumerate(batch)])
      # The "next state" for each transition is the n future state used to calculate the training target
      _, _, _, ld, ls = batch[-1]
      self.buffer.append((s, a, ret, ld, ls))

  # Forward.
  def forward(self, x):
    return self.q_net(x)

  def custom_collate(self, data):
    indices, weights, states, actions, returns, dones, next_states = zip(*data)

    indices = torch.tensor(indices)
    weights = torch.tensor(weights)
    states = torch.tensor(states).to(torch.float32)
    returns = torch.tensor(returns).to(torch.double)
    dones = torch.tensor(dones)
    next_states = torch.tensor(next_states).to(torch.float32)

    return indices, weights, states, actions, returns, dones, next_states

  # Configure optimizers.
  def configure_optimizers(self):
    q_net_optimizer = self.hparams.optim(self.q_net.parameters(), lr=self.hparams.lr)
    return [q_net_optimizer]

  # Create dataloader.
  def train_dataloader(self):
    dataset = RLDataset(self.buffer, self.hparams.samples_per_epoch)
    dataloader = DataLoader(
        dataset=dataset,
        batch_size=self.hparams.batch_size,
        num_workers=36,
        collate_fn=self.custom_collate,
        drop_last=True
    )
    return dataloader

  # Training step
  # Double DQN
  # Target reward is the actual reward plus the discounted estimated reward in the next state
  # We predict the discounted rewards from the target model, but choose the action predicted by the current model
  # Then we update the target reward for the chosen action
  # Finally we train the current model towards the target values by computing the loss between actual and target rewards
  def training_step(self, batch, batch_idx):
    indices, weights, states, actions, returns, dones, next_states = batch
    returns = returns.unsqueeze(1)
    dones = dones.unsqueeze(1)
    batch_size = len(indices)

    q_value_probs = self.q_net(states)  # (B, A, N)

    action_value_probs = q_value_probs[range(batch_size), actions, :]  # (B, N)
    log_action_value_probs = torch.log(action_value_probs + 1e-6)  # (B, N)

    with torch.no_grad():
      next_q_value_probs = self.q_net(next_states)  # (B, A, N)
      next_q_values = (next_q_value_probs * self.support).sum(dim=-1)  # (B, A)
      next_actions = next_q_values.argmax(dim=-1)  # (B,)

      next_q_value_probs = self.target_q_net(next_states)  # (B, A, N)
      next_action_value_probs = next_q_value_probs[range(batch_size), next_actions, :]  # (B, N)

    # Distributional DQN
    # We must estimate m which is the target distrbution across n atoms from v_min to v_vmax for each action
    m = torch.zeros(batch_size * self.hparams.atoms, device=device, dtype=torch.float64)  # (B * N)

    # N-Step DQN discounts the future reward by gamma ** n, since rewards is the sum of 0 to n -1 discounted rewards
    Tz = returns + ~dones * self.hparams.gamma**self.hparams.n_steps * self.support.unsqueeze(0)  # (B, N)

    # Distributional DQN
    # Account for the skew and the offset in Tz distribution which occurs from adding discounted rewards
    Tz.clamp_(min=self.hparams.v_min, max=self.hparams.v_max)  # (B, N)
    b = (Tz - self.hparams.v_min) / self.delta  # (B, N)
    # Distribute fractional m values
    l, u = b.floor().long(), b.ceil().long()  # (B, N)

    offset = torch.arange(batch_size, device=device).view(-1, 1) * self.hparams.atoms  # (B, 1)

    l_idx = (l + offset).flatten()  # (B * N)
    u_idx = (u + offset).flatten()  # (B * N)
    
    upper_probs = (next_action_value_probs * (u - b)).flatten()  # (B * N)
    lower_probs = (next_action_value_probs * (b - l)).flatten()  # (B * N)

    m.index_add_(dim=0, index=l_idx, source=upper_probs)
    m.index_add_(dim=0, index=u_idx, source=lower_probs)

    m = m.reshape(batch_size, self.hparams.atoms)  # (B, N)

    cross_entropies = - (m * log_action_value_probs).sum(dim=-1)  # (B,)

    # Prioritized Experince Replay 
    # Update each state priority based on the loss
    # Weight the calculated losses to account for distribution bias
    for idx, e in zip(indices, cross_entropies):
      self.buffer.update(idx, e.detach().item())

    loss = (weights * cross_entropies).mean()

    self.log('episode/Q-Error', loss)
    return loss

  # Training epoch end.
  def training_epoch_end(self, training_step_outputs):
    # Prioritized Experience Replay
    # Update the buffer alpha and beta parameters over time
    alpha = max(
        self.hparams.a_end,
        self.hparams.a_start - self.current_epoch / self.hparams.a_last_episode
    )
    beta = min(
        self.hparams.b_end,
        self.hparams.b_start + self.current_epoch / self.hparams.b_last_episode
    )
    self.buffer.alpha = alpha
    self.buffer.beta = beta

    self.play_episode(policy=self.policy)
    self.log('episode/Return', self.agent.total_profit)

    if self.current_epoch % self.hparams.sync_rate == 0:
      self.target_q_net.load_state_dict(self.q_net.state_dict())

#### Purge logs and run the visualization tool (Tensorboard)

In [None]:
!rm -r ./lightning_logs/
!rm -r ./videos/
%load_ext tensorboard
%tensorboard --logdir ./lightning_logs/

#### Train the policy

In [None]:
algo = DeepQLearning(
  'data/GOOG_2010_SAMPLE.csv',
  lr=0.0001,
  sigma=0.5,
  hidden_size=512,
  a_last_episode=2_000,
  b_last_episode=2_000,
  n_steps=8,
)

trainer = Trainer(
  strategy='dp',
  accelerator='gpu',
  devices=1,
  max_epochs=2_400,
  log_every_n_steps=1
)

trainer.fit(algo)