<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Reinforcement-Learning-Example" data-toc-modified-id="Reinforcement-Learning-Example-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Reinforcement Learning Example</a></span><ul class="toc-item"><li><span><a href="#Model,-network-and-memory-definitions" data-toc-modified-id="Model,-network-and-memory-definitions-1.1"><span class="toc-item-num">1.1&nbsp;&nbsp;</span>Model, network and memory definitions</a></span></li><li><span><a href="#Example-with-defaults" data-toc-modified-id="Example-with-defaults-1.2"><span class="toc-item-num">1.2&nbsp;&nbsp;</span>Example with defaults</a></span><ul class="toc-item"><li><span><a href="#Example-comparing-hyperparameters" data-toc-modified-id="Example-comparing-hyperparameters-1.2.1"><span class="toc-item-num">1.2.1&nbsp;&nbsp;</span>Example comparing hyperparameters</a></span></li></ul></li></ul></li></ul></div>

# Reinforcement Learning Example

The only things that I have changed from my exploration of RL 
are that I have added a little bit of a docstring to my base class CPSolver
and removed the many, many cells of hyperparameter searching.

If you want to read more, do some googling. But also check out this paper https://arxiv.org/abs/1312.5602

[Disclaimer] It was never meant to see the light of day

L

In [None]:
# uses Python 3.7
# quick and dirty way to make sure you have the right packages 
# uncomment the next line and run this cell
# ! pip install jupyter gym torch matplotlib

In [None]:
from collections import namedtuple
import gym
import attr
import random
import math
from itertools import count

import torch
from torch import optim
import torch.nn as nn
import torch.nn.functional as F

import matplotlib.pyplot as plt

import pdb
from time import time

## Model, network and memory definitions

In [None]:
# Record type for a single transition: the state the agent was in, the action
# it took, the reward it received and the state it ended up in.
# NOTE(review): nothing in this section instantiates Experience; CPSolver
# pushes plain tuples in this same field order into the replay memory.
Experience = namedtuple("Experience", ("state", "action", "reward", "next_state"))


# repr=False: a plain @attr.s generates its own __repr__ and silently replaces
# the hand-written one defined below.
@attr.s(repr=False)
class ReplayMemory(object):
    """Fixed-capacity FIFO buffer of transitions used for experience replay.

    Once ``capacity`` transitions are stored, pushing a new one evicts the
    oldest. Each instance owns its own buffer.

    Args:
        capacity (int): Maximum number of transitions kept in the buffer.
    """

    capacity = attr.ib()
    # Not used by any method of this class; kept so that existing attribute
    # access (``ReplayMemory.position`` / ``instance.position``) keeps working.
    position = 0

    def __attrs_post_init__(self):
        # The buffer must be created per instance. A class-level ``memory = []``
        # is a single list shared by every ReplayMemory, so a new solver would
        # start with (and train on) the transitions of all previous solvers.
        self.memory = []

    def push(self, transition):
        """Append ``transition`` and drop the oldest entry if over capacity."""
        self.memory.append(transition)
        if len(self.memory) > self.capacity:
            del self.memory[0]

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored transitions, chosen at random.

        Raises:
            ValueError: If fewer than ``batch_size`` transitions are stored
                (raised by ``random.sample``).
        """
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Return the number of transitions currently stored."""
        return len(self.memory)

    def __repr__(self):
        """Return the capacity as a string (handy for plot labels)."""
        return str(self.capacity)


class CPNet(nn.Module):
    """Two-layer fully connected Q-network: observation in, one value per action out.

    The defaults match CartPole (4 observation values, 2 actions).

    Args:
        n_inputs (int): Number of values in one observation.
        n_hidden (int): Width of the single hidden layer.
        n_actions (int): Number of discrete actions (size of the output).
    """

    def __init__(self, n_inputs=4, n_hidden=200, n_actions=2):
        super().__init__()
        # The layers keep the names fc1 / fc3 so that state_dict keys are
        # unchanged for any previously saved weights.
        self.fc1 = nn.Linear(n_inputs, n_hidden)
        self.fc3 = nn.Linear(n_hidden, n_actions)

    def forward(self, x):
        """Return the network output for every observation in ``x``.

        Args:
            x (torch.Tensor): Any tensor whose element count is a multiple of
                ``n_inputs``; it is flattened to ``(-1, n_inputs)``.

        Returns:
            torch.Tensor: Tensor of shape ``(batch, n_actions)``.
        """
        # reshape (rather than view) also accepts non-contiguous input.
        xb = x.reshape(-1, self.fc1.in_features)
        xb = F.relu(self.fc1(xb))
        return self.fc3(xb)
    



class CPSolver(object):
    """Train a small Q-network on gym's ``CartPole-v1`` with experience replay.

    The solver owns the environment, the replay memory, the network and the
    optimizer, and drives the whole loop: pick an action (epsilon-greedy),
    step the environment, store the transition, then sample a mini-batch and
    take one gradient step.

    Args:
        episodes (int): Number of games (each played until ``done``) to run.
        memory (int): Capacity of the replay memory, in transitions.
        gamma (float): Discount applied to the estimated value of the next
            state when building the training target. Between 0 and 1.
        lr (float): Learning rate handed to the optimizer.
        batch_size (int): Number of transitions sampled for each gradient step.
        eps_start (float): Initial probability of taking a random action.
        eps_end (float): Probability of a random action the schedule decays to.
        eps_decay (int): Decay constant, in environment steps, of the
            exponential epsilon schedule. See ``eps_threshold``.
        optimizer (callable): Optimizer class, called as
            ``optimizer(model.parameters(), lr=lr)``. Defaults to SGD.
        loss_fn (callable): Loss class, instantiated without arguments.
            Defaults to mean squared error.
        render (bool): Whether to show the game on screen.
        render_step (int): Render (and, with ``output``, record) every
            ``render_step``-th episode.
        output (bool): Wrap the environment in ``gym.wrappers.Monitor`` so that
            videos are written under ``./RL_vids/<timestamp>/``. Requires ffmpeg.
    """

    def __init__(
        self,
        episodes=300,
        memory=10000,
        gamma=0.8,
        lr=0.01,
        batch_size=32,
        eps_start=0.9,
        eps_end=0.01,
        eps_decay=100,
        optimizer=optim.SGD,
        loss_fn=nn.MSELoss,
        render=True,
        render_step=100,
        output=False,
    ):
        self.eps_start = eps_start
        self.eps_end = eps_end
        self.eps_decay = eps_decay
        self.episodes = episodes
        self.gamma = gamma
        self.lr = lr
        self.batch_size = batch_size
        self.memory_size = memory

        self.memory = ReplayMemory(memory)
        self.env = gym.make("CartPole-v1")
        self.model = CPNet()
        self.optimizer = optimizer(self.model.parameters(), lr=lr)
        self.loss_fn = loss_fn()

        self.render = render
        self.render_step = render_step
        if output:
            # Enables video file output; render_check decides which episodes
            # get recorded.
            self.env = gym.wrappers.Monitor(
                self.env, f'./RL_vids/{time()}/', video_callable=self.render_check
            )

    def render_check(self, step):
        """Return True for index 0 and for every ``render_step``-th index.

        Used as the ``video_callable`` of ``gym.wrappers.Monitor``.
        NOTE(review): presumably Monitor passes the episode index here;
        confirm against the installed gym version.
        """
        if step == 0:
            return True
        return (step + 1) % self.render_step == 0

    def eps_threshold(self, steps_done):
        """Return the probability of a random action after ``steps_done`` steps.

        Decays exponentially from ``eps_start`` towards ``eps_end`` with decay
        constant ``eps_decay``.
        """
        return self.eps_end + (self.eps_start - self.eps_end) * math.exp(
            -1.0 * steps_done / self.eps_decay
        )

    def select_action(self, state, steps_done):
        """Return the action the model currently values most for ``state``.

        Args:
            state (torch.Tensor): A single observation, shape ``(1, n_obs)``.
            steps_done (int): Unused; kept for signature compatibility.

        Returns:
            torch.Tensor: Integer tensor of shape ``(1, 1)`` holding the action.
        """
        with torch.no_grad():
            # One estimated value per action; take the index of the largest.
            q_values = self.model(state)
            return q_values.argmax(dim=1).view(1, 1)

    def eps_greedy(self, state, steps_done):
        """Pick an action: random with probability ``eps_threshold(steps_done)``,
        otherwise the model's best action.

        Returns:
            torch.Tensor: Integer tensor of shape ``(1, 1)`` holding the action.
        """
        if random.random() > self.eps_threshold(steps_done):
            return self.select_action(state, steps_done)
        return torch.tensor([[random.choice([0, 1])]])

    def optimize_model(self):
        """Sample one mini-batch from memory and take a single gradient step.

        Each stored transition is ``(state, action, reward, next_state)``.
        ``next_state`` is ``None`` for a transition that ended the episode; such
        a transition's target is just its reward, because there is no future
        to bootstrap from. Otherwise the target is
        ``reward + gamma * max_a Q(next_state, a)``.
        """
        transitions = self.memory.sample(self.batch_size)
        batch_state, batch_action, batch_reward, batch_next_state = zip(*transitions)

        # reshape(1, -1) gives every observation an explicit batch dimension,
        # whether it was stored flat or already as a single row.
        states = torch.cat([s.reshape(1, -1) for s in batch_state])
        actions = torch.cat(batch_action)
        rewards = torch.cat(batch_reward)

        # Value of the best action in the next state; stays 0 for terminal
        # transitions. Computed without gradients: the target is a constant.
        max_next_q_values = torch.zeros(len(transitions))
        live_next = [s.reshape(1, -1) for s in batch_next_state if s is not None]
        if live_next:
            live_mask = torch.tensor([s is not None for s in batch_next_state])
            with torch.no_grad():
                max_next_q_values[live_mask] = self.model(torch.cat(live_next)).max(1)[0]

        expected_q_values = rewards + (self.gamma * max_next_q_values)

        self.optimizer.zero_grad()
        # Model output for each state, restricted to the action actually taken.
        current_q_values = self.model(states).gather(1, actions).view(-1)
        # Loss is the error between the current and the expected Q values.
        loss = self.loss_fn(current_q_values, expected_q_values)
        loss.backward()
        self.optimizer.step()

    def learn(self):
        """Play ``episodes`` games, training after every step.

        Returns:
            tuple: ``(ep_count, step_count)``, two lists of equal length.
            ``ep_count`` holds the 0-based episode indices and ``step_count``
            the 0-based index of the last step of each episode (its length
            minus one).
        """
        steps_done = 0
        ep_count = []
        step_count = []

        for episode in range(self.episodes):
            state = self.env.reset()
            show = self.render and (episode + 1) % self.render_step == 0
            for i in count():
                if show:
                    if i == 0:
                        print(f'lr:{self.lr}_mem:{self.memory_size}_Episode:{episode+1}')
                    self.env.render()

                # Convert the observation once and reuse it for acting and
                # for the replay memory.
                state_t = torch.tensor(state, dtype=torch.float32)
                action = self.eps_greedy(state_t.unsqueeze(0), steps_done)
                steps_done += 1
                next_state, reward, done, _ = self.env.step(action[0, 0].item())
                if done:
                    # Ending the episode is penalised instead of rewarded.
                    reward = -1

                # A terminal transition stores no next state, so that
                # optimize_model does not bootstrap from it.
                next_state_t = (
                    None if done else torch.tensor(next_state, dtype=torch.float32)
                )
                self.memory.push(
                    (
                        state_t,
                        action,
                        torch.tensor([reward], dtype=torch.float32),
                        next_state_t,
                    )
                )
                # Only train if there are enough memories to produce a full batch
                if len(self.memory) >= self.batch_size:
                    self.optimize_model()

                state = next_state
                if done:
                    ep_count.append(episode)
                    step_count.append(i)
                    break
        return ep_count, step_count

    def close(self):
        """Close the underlying gym environment (and its render window)."""
        self.env.close()


## Example with defaults

In [None]:
# Train one solver with the default hyperparameters and plot, per episode,
# the index of the last step reached (longer episodes = better balancing).
fig, ax = plt.subplots()
cartpole_solver = CPSolver()
try:
    ep_count, steps = cartpole_solver.learn()
finally:
    # Release the environment (and its window) even if training is
    # interrupted or fails part-way through.
    cartpole_solver.close()
# memory_size is the numeric capacity; .memory is the ReplayMemory object.
ax.plot(steps, label=f"lr={cartpole_solver.lr} mem={cartpole_solver.memory_size}")
ax.legend()
plt.show()

### Example comparing hyperparameters

In [None]:
# Train one fresh solver per learning rate and overlay the per-episode
# curves (index of the last step reached) on a single set of axes.
fig, ax = plt.subplots()
lr_space = [1, 0.1, 0.01]
for lr in lr_space:
    cartpole_solver = CPSolver(lr=lr)
    try:
        ep_count, steps = cartpole_solver.learn()
    finally:
        # Release the environment (and its window) even if training is
        # interrupted or fails part-way through.
        cartpole_solver.close()
    # memory_size is the numeric capacity; .memory is the ReplayMemory object.
    ax.plot(steps, label=f"lr={cartpole_solver.lr} mem={cartpole_solver.memory_size}")
ax.legend()
plt.show()