https://github.com/AI-Core/Reinforcement-Learning/blob/master/Policy%20gradients%20solutions.ipynb

In [1]:
import torch

class NN(torch.nn.Module):
    """Fully connected feed-forward network built from a list of layer widths.

    A ``Linear`` layer is created for every consecutive pair in ``layers``. A
    ``ReLU`` follows every ``Linear`` except the last one, and a ``Softmax``
    over the last dimension is appended when ``distribution`` is true.

    Args:
        layers: Sequence of layer widths, input width first and output width
            last, e.g. ``[4, 32, 2]``.
        embedding: Accepted for interface compatibility; not used by this class.
        distribution: If true, the output is normalised with a softmax so that
            it sums to one along the last dimension.
    """

    def __init__(self, layers, embedding=False, distribution=False):
        super().__init__()
        modules = []
        final_idx = len(layers) - 2  # index of the last Linear layer
        for idx, (n_in, n_out) in enumerate(zip(layers[:-1], layers[1:])):
            modules.append(torch.nn.Linear(n_in, n_out))
            if idx != final_idx:  # no activation after the output layer
                modules.append(torch.nn.ReLU())
        if distribution:
            # An explicit dim avoids the deprecated implicit-dimension choice and
            # normalises each sample independently for batched input as well.
            modules.append(torch.nn.Softmax(dim=-1))

        self.layers = torch.nn.Sequential(*modules)

    def forward(self, x):
        """Apply all layers to ``x`` in order and return the result."""
        return self.layers(x)
    
    

In [2]:
import gymnasium as gym
from time import sleep
import matplotlib.pyplot as plt
from torch.utils.tensorboard import SummaryWriter
import numpy as np


In [3]:
def train(env, optimiser, agent_tag, epochs=100, episodes=30, use_baseline=False, use_causality=False):
    """Train the module-level ``policy`` with a REINFORCE-style policy gradient.

    Every epoch samples ``episodes`` episodes with the current policy,
    accumulates ``sum_t log pi(a_t | s_t) * weight_t`` over all of them, divides
    by ``episodes``, negates the result and takes one optimiser step on it.
    After the update one extra episode is played with ``env.render()`` so the
    current behaviour can be watched. When training finishes, or is interrupted
    with Ctrl-C, the environment is closed and a checkpoint holding the policy
    module and its ``state_dict`` is written to
    ``reinforce_agents/trained-agent-{agent_tag}.pt``.

    ``policy`` (the network being trained) and ``writer`` (the TensorBoard
    ``SummaryWriter``) are not parameters: they are read from module scope and
    must exist before this function is called.

    Args:
        env: Environment exposing ``reset()``, ``render()``, ``close()`` and a
            ``step(action)`` that returns
            ``(observation, reward, terminated, truncated, info)``.
        optimiser: Optimiser stepped once per epoch; presumably built over
            ``policy.parameters()`` (verify at the call site).
        agent_tag: Label used in the TensorBoard tag and checkpoint file name.
        epochs: Number of policy updates.
        episodes: Number of episodes sampled per update.
        use_baseline: Subtract the running mean of all episode returns seen in
            this call from the episode return before weighting.
        use_causality: Weight each step by the sum of rewards from that step
            onwards instead of the whole-episode return.

    Raises:
        AssertionError: If both ``use_baseline`` and ``use_causality`` are set.
    """
    import os  # function-scope import: only needed to create the checkpoint directory

    assert not (use_baseline and use_causality), \
        'use_baseline and use_causality cannot be combined'

    def as_vector(observation):
        # reset() may hand back an (observation, info) tuple; step() returns the bare observation.
        if isinstance(observation, tuple):
            observation = observation[0]
        return torch.as_tensor(observation, dtype=torch.float32).flatten()

    baseline = 0
    try:
        for epoch in range(epochs):
            avg_reward = 0
            objective = 0
            for episode in range(episodes):
                state = env.reset()
                log_policy = []
                rewards = []
                done = False

                # RUN AN EPISODE
                while not done:
                    action_distribution = policy(as_vector(state))  # probabilities over actions
                    sampler = torch.distributions.Categorical(action_distribution)
                    action = sampler.sample()
                    log_policy.append(sampler.log_prob(action))

                    state, reward, terminated, truncated, _ = env.step(int(action))
                    rewards.append(reward)
                    # Stop on time-limit truncation too; otherwise a policy that
                    # never fails keeps the episode running indefinitely.
                    done = terminated or truncated

                episode_return = sum(rewards)
                avg_reward += (episode_return - avg_reward) / (episode + 1)  # running mean within the epoch
                writer.add_scalar(f'{agent_tag}/Reward/Train', avg_reward, epoch*episodes + episode)

                if use_baseline:
                    # running mean of the return over every episode seen so far in this call
                    baseline += (episode_return - baseline) / (epoch * episodes + episode + 1)

                if use_causality:
                    # Reward-to-go via one backwards pass instead of re-summing a slice per step.
                    weights = []
                    reward_to_go = 0
                    for step_reward in reversed(rewards):
                        reward_to_go += step_reward
                        weights.append(reward_to_go)
                    weights.reverse()
                else:
                    weights = [episode_return - baseline] * len(rewards)

                log_probs = torch.stack(log_policy)
                weights = torch.as_tensor(weights, dtype=log_probs.dtype, device=log_probs.device)
                objective = objective + (log_probs * weights).sum()

            # Average over episodes and negate: the optimiser minimises, we want to maximise return.
            objective = -objective / episodes

            # UPDATE POLICY
            print('EPOCH:', epoch, f'AVG REWARD: {avg_reward:.2f}')
            optimiser.zero_grad()   # clear any stale gradients before accumulating new ones
            objective.backward()
            optimiser.step()

            # VISUALISE AT END OF EPOCH AFTER UPDATING POLICY
            state = env.reset()
            done = False
            with torch.no_grad():   # pure rollout, no gradients needed
                while not done:
                    env.render()
                    action_distribution = policy(as_vector(state))
                    action = int(torch.distributions.Categorical(action_distribution).sample())
                    state, _, terminated, truncated, _ = env.step(action)
                    done = terminated or truncated
                    sleep(0.01)
    except KeyboardInterrupt:
        # Ctrl-C stops training early; the checkpoint below is still written.
        print('interrupted')
    finally:
        env.close()

    checkpoint = {
        'model': policy,
        'state_dict': policy.state_dict(),
    }
    checkpoint_path = f"reinforce_agents/trained-agent-{agent_tag}.pt"
    os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)  # torch.save does not create directories
    torch.save(checkpoint, checkpoint_path)



In [4]:
# Experiment set-up. `writer` and `policy` are read by train() from module scope,
# so they must be defined before train() is called.
writer = SummaryWriter()
env = gym.make('CartPole-v1')

# One hidden layer of 32 units; the output is a probability per discrete action.
observation_size = np.prod(env.observation_space.shape)
policy = NN([observation_size, 32, env.action_space.n], distribution=True)

lr = 0.001
weight_decay = 1
optimiser = torch.optim.SGD(policy.parameters(), weight_decay=weight_decay, lr=lr)
agent_tag = 'cartpole-improved'

# First training phase: whole-episode return minus a running-average baseline.
train(env, optimiser, agent_tag, epochs=30, episodes=30, use_baseline=True, use_causality=False)

  return self._call_impl(*args, **kwargs)
  gym.logger.warn(


EPOCH: 0 AVG REWARD: 23.00
EPOCH: 1 AVG REWARD: 25.20
EPOCH: 2 AVG REWARD: 25.73
EPOCH: 3 AVG REWARD: 25.60
EPOCH: 4 AVG REWARD: 24.70
EPOCH: 5 AVG REWARD: 30.07
EPOCH: 6 AVG REWARD: 29.53
EPOCH: 7 AVG REWARD: 24.17
EPOCH: 8 AVG REWARD: 27.73
EPOCH: 9 AVG REWARD: 27.83
EPOCH: 10 AVG REWARD: 25.00
EPOCH: 11 AVG REWARD: 30.87
EPOCH: 12 AVG REWARD: 31.67
EPOCH: 13 AVG REWARD: 32.03
EPOCH: 14 AVG REWARD: 26.83
EPOCH: 15 AVG REWARD: 28.70
EPOCH: 16 AVG REWARD: 25.83
EPOCH: 17 AVG REWARD: 36.83
EPOCH: 18 AVG REWARD: 29.53
EPOCH: 19 AVG REWARD: 31.93
EPOCH: 20 AVG REWARD: 30.77
EPOCH: 21 AVG REWARD: 35.43
EPOCH: 22 AVG REWARD: 28.67
EPOCH: 23 AVG REWARD: 39.40
EPOCH: 24 AVG REWARD: 34.90
EPOCH: 25 AVG REWARD: 43.67
EPOCH: 26 AVG REWARD: 36.30
EPOCH: 27 AVG REWARD: 35.37
EPOCH: 28 AVG REWARD: 37.67
EPOCH: 29 AVG REWARD: 45.90


In [5]:
# Second training phase on the same policy and optimiser, now weighting each
# step by its reward-to-go. The tag is unchanged, so train() writes to the same
# checkpoint path as the previous phase.
train(env, optimiser, agent_tag, epochs=30, episodes=30, use_causality=True, use_baseline=False)

EPOCH: 0 AVG REWARD: 43.17
EPOCH: 1 AVG REWARD: 46.93
EPOCH: 2 AVG REWARD: 49.53
EPOCH: 3 AVG REWARD: 38.33
EPOCH: 4 AVG REWARD: 48.13
EPOCH: 5 AVG REWARD: 50.97
EPOCH: 6 AVG REWARD: 48.53
EPOCH: 7 AVG REWARD: 52.70
EPOCH: 8 AVG REWARD: 52.30
EPOCH: 9 AVG REWARD: 55.83
EPOCH: 10 AVG REWARD: 52.17
EPOCH: 11 AVG REWARD: 48.30
EPOCH: 12 AVG REWARD: 64.60
EPOCH: 13 AVG REWARD: 71.97
EPOCH: 14 AVG REWARD: 67.97
EPOCH: 15 AVG REWARD: 68.87
EPOCH: 16 AVG REWARD: 46.20
EPOCH: 17 AVG REWARD: 67.30
EPOCH: 18 AVG REWARD: 77.00
EPOCH: 19 AVG REWARD: 66.73
EPOCH: 20 AVG REWARD: 25.50
EPOCH: 21 AVG REWARD: 29.17
EPOCH: 22 AVG REWARD: 33.20
EPOCH: 23 AVG REWARD: 40.50
EPOCH: 24 AVG REWARD: 55.23
EPOCH: 25 AVG REWARD: 121.17
EPOCH: 26 AVG REWARD: 67.13
EPOCH: 27 AVG REWARD: 109.23
EPOCH: 28 AVG REWARD: 64.87
EPOCH: 29 AVG REWARD: 127.60


In [8]:
def deploy(env, saved_model, episodes=100, frame_delay=0.01):
    """Run a saved policy in ``env`` and render every step.

    The policy module is taken from ``saved_model['model']`` (so the network
    architecture does not have to be rebuilt here), its weights are restored
    from ``saved_model['state_dict']`` and it is switched to evaluation mode.
    Actions are sampled from the policy's output distribution. The environment
    is always closed before returning, including after Ctrl-C.

    Args:
        env: Environment exposing ``reset()``, ``render()``, ``close()`` and a
            ``step(action)`` that returns
            ``(observation, reward, terminated, truncated, info)``.
        saved_model: Mapping with the keys ``'model'`` and ``'state_dict'``.
        episodes: Number of episodes to play.
        frame_delay: Seconds to sleep after each rendered step.
    """
    policy = saved_model['model']
    policy.load_state_dict(saved_model['state_dict'])  # load in our pre-trained weights
    policy.eval()  # put our model in evaluation mode
    try:
        with torch.no_grad():  # inference only, no autograd bookkeeping
            for _ in range(episodes):
                observation = env.reset()
                done = False
                while not done:
                    # reset() may return (observation, info); step() returns the bare observation
                    if isinstance(observation, tuple):
                        observation = observation[0]
                    observation = torch.as_tensor(observation, dtype=torch.float32).flatten()
                    action_distribution = policy(observation)  # probability of each action
                    action = int(torch.distributions.Categorical(action_distribution).sample())
                    observation, _, terminated, truncated, _ = env.step(action)
                    # End the episode on time-limit truncation as well as termination.
                    done = terminated or truncated
                    env.render()  # show us the environment
                    sleep(frame_delay)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the demonstration early.
        pass
    finally:
        env.close()
       

cartpole_env = gym.make('CartPole-v1', render_mode="human")
# The checkpoint written by train() pickles the whole policy module, not just
# tensors, so it cannot be restored with weights_only=True. Unpickling executes
# arbitrary code: only load checkpoint files you created or otherwise trust.
cartpole_agent_params = torch.load(
    'reinforce_agents/trained-agent-cartpole-improved.pt',
    weights_only=False,
)
deploy(cartpole_env, cartpole_agent_params)


  cartpole_agent_params = torch.load('reinforce_agents/trained-agent-cartpole-improved.pt')
  return self._call_impl(*args, **kwargs)


: 