In [2]:
#! python3

import argparse

import gymnasium as gym
import matplotlib.pyplot as plt
import numpy as np # NOTE only imported because https://github.com/pytorch/pytorch/issues/13918
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim



class PolicyGradient(nn.Module):
    def __init__(self, state_size, action_size, lr_actor=1e-3, lr_critic=1e-3, mode='REINFORCE', n=128, gamma=0.99, device='cpu'):
        super(PolicyGradient, self).__init__()

        self.state_size = state_size
        self.action_size = action_size

        self.mode = mode
        self.n = n
        self.gamma = gamma

        self.device = device

        hidden_layer_size = 256

        # actor
        self.actor = nn.Sequential(
            nn.Linear(state_size, hidden_layer_size),
            nn.ReLU(),
            nn.Linear(hidden_layer_size, action_size),
            # BEGIN STUDENT SOLUTION
            nn.Softmax(dim=-1)
            # END STUDENT SOLUTION
        )

        # critic
        self.critic = nn.Sequential(
            nn.Linear(state_size, hidden_layer_size),
            nn.ReLU(),
            # BEGIN STUDENT SOLUTION
            # END STUDENT SOLUTION
        )

        # initialize networks, optimizers, move networks to device
        # BEGIN STUDENT SOLUTION
        self.reinforce = nn.Sequential(
            nn.Linear(state_size, hidden_layer_size),
            nn.ReLU(),
            nn.Linear(hidden_layer_size, action_size),
            nn.Softmax(dim=-1)
        )


        #move to device
        self.reinforce.to(device)
        self.optimizer = optim.Adam(self.reinforce.parameters(), lr=lr_actor)
        self.reinforce.to(device)
        # END STUDENT SOLUTION


    def forward(self, state):
        if self.mode == 'REINFORCE':
            return(self.reinforce(state))
        return(self.actor(state), self.critic(state))


    def get_action(self, state, stochastic):
        # if stochastic, sample using the action probabilities, else get the argmax
        # BEGIN STUDENT SOLUTION
        state_tensor = torch.from_numpy(state).float().unsqueeze(0).to(self.device)

        probs = self.reinforce(state_tensor)
        if stochastic:
            distribution = torch.distributions.Categorical(probs)
            action = distribution.sample()
            return (probs, action.item())
        else:
            return (probs, torch.argmax(probs).item())
        # END STUDENT SOLUTION


    def calculate_n_step_bootstrap(self, rewards_tensor, values):
        # calculate n step bootstrap
        # BEGIN STUDENT SOLUTION

        # END STUDENT SOLUTION
        pass

    def train(self, states, actions, rewards):
        # BEGIN STUDENT SOLUTION
        #actions = torch.tensor(actions, dtype=torch.int64, device=self.device)

        # Calculate returns
        returns = []
        G = 0
        for r in rewards[::-1]:
            G = r + self.gamma * G
            returns.insert(0, G)
        returns = torch.tensor(returns, dtype=torch.float32, device=self.device)

        # Convert states to a tensor
        states = torch.tensor(states, dtype=torch.float32, device=self.device)

        action_probs_tensor = torch.stack(actions)
        log_probs = torch.log(action_probs_tensor)

        loss = -torch.sum(log_probs * returns)
        print(loss)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        # END STUDENT SOLUTION
        pass

    def run(self, env, max_steps, num_episodes, train):
        total_rewards = []

        # run the agent through the environment num_episodes times for at most max steps
        # BEGIN STUDENT SOLUTION
        for i in range(num_episodes):
            state = env.reset()[0]
            rewards = []
            action_probs = []
            for t in range(max_steps):
                probs, action = self.get_action(state, True)
                state, reward, done, done2, _ = env.step(action)
                done = done or done2
                rewards.append(reward)
                action_prob = probs[0, action]
                action_probs.append(action_prob)
                if (done):
                    max_steps = t
                    break

            if train:
                self.train(state, action_probs, rewards)
        # END STUDENT SOLUTION
        return(total_rewards)



def graph_agents(graph_name, agents, env, max_steps, num_episodes):
    print(f'Starting: {graph_name}')

    # graph the data mentioned in the homework pdf
    # BEGIN STUDENT SOLUTION
    total_episode_rewards = []
    for i in range(5):
      total_reward = agent.run(env, args.max_steps, args.num_episodes, True)
      total_episode_rewards.append(total_reward)

    transposed_array = list(zip(total_episode_rewards))
    avgerage_total_rewards = avg_array = [sum(column) / len(column) for column in transposed_array]
    min_total_rewards = [min(r) for r in zip(total_episode_rewards)]
    max_total_rewards = [max(r) for r in zip(total_episode_rewards)]

    graph_every = 100
    # END STUDENT SOLUTION

    # plot the total rewards
    xs = [i * graph_every for i in range(len(average_total_rewards))]
    fig, ax = plt.subplots()
    plt.fill_between(xs, min_total_rewards, max_total_rewards, alpha=0.1)
    ax.plot(xs, average_total_rewards)
    ax.set_ylim(-max_steps * 0.01, max_steps * 1.1)
    ax.set_title(graph_name, fontsize=10)
    ax.set_xlabel('Episode')
    ax.set_ylabel('Average Total Reward')
    fig.savefig(f'./graphs/{graph_name}.png')
    plt.close(fig)
    print(f'Finished: {graph_name}')



def parse_args():
    mode_choices = ['REINFORCE', 'REINFORCE_WITH_BASELINE', 'A2C']

    parser = argparse.ArgumentParser(description='Train an agent.')
    parser.add_argument('--mode', type=str, default='REINFORCE', choices=mode_choices, help='Mode to run the agent in')
    parser.add_argument('--n', type=int, default=64, help='The n to use for n step A2C')
    parser.add_argument('--num_runs', type=int, default=5, help='Number of runs to average over for graph')
    parser.add_argument('--num_episodes', type=int, default=3500, help='Number of episodes to train for')
    parser.add_argument('--max_steps', type=int, default=200, help='Maximum number of steps in the environment')
    parser.add_argument('--env_name', type=str, default='CartPole-v1', help='Environment name')
    return parser.parse_args()



def main():
    #args = parse_args()

    # init args, agents, and call graph_agents on the initialized agents
    # BEGIN STUDENT SOLUTION
    #init agent
    # get states and action sizes
    env = gym.make('CartPole-v1')
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.n
    agent = PolicyGradient(state_size, action_size, mode='REINFORCE', n=64)
    graph_agents('reinfoce', agent, env, 200, 3500)
    #agent.run(env, args.max_steps, args.num_episodes, True)

    return agent
    # END STUDENT SOLUTION


#if '__main__' == __name__:
agent = main()



usage: ipykernel_launcher.py [-h]
                             [--mode {REINFORCE,REINFORCE_WITH_BASELINE,A2C}]
                             [--n N] [--num_runs NUM_RUNS]
                             [--num_episodes NUM_EPISODES]
                             [--max_steps MAX_STEPS] [--env_name ENV_NAME]
ipykernel_launcher.py: error: unrecognized arguments: --f=/Users/ram/Library/Jupyter/runtime/kernel-v2-34746VzymrkLiwIyG.json


SystemExit: 2