In [3]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import itertools
import time
import torch
import pylab as plt
# %matplotlib inline
# %matplotlib notebook
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm

import memory as mem   
from feedforward import Feedforward

## Helper Functions

In [4]:
def running_mean(x, N):
    cumsum = np.cumsum(np.insert(x, 0, 0)) 
    return (cumsum[N:] - cumsum[:-N]) / float(N)

In [5]:
class DiscreteActionWrapper(gym.ActionWrapper):
    def __init__(self, env: gym.Env, bins = 5):
        """A wrapper for converting a 1D continuous actions into discrete ones.
        Args:
            env: The environment to apply the wrapper
            bins: number of discrete actions
        """
        assert isinstance(env.action_space, spaces.Box)
        super().__init__(env)
        self.bins = bins
        self.orig_action_space = env.action_space
        self.action_space = spaces.Discrete(self.bins)

    def action(self, action):
        """ discrete actions from low to high in 'bins'
        Args:
            action: The discrete action
        Returns:
            continuous action
        """
        return self.orig_action_space.low + action/(self.bins-1.0)*(self.orig_action_space.high-self.orig_action_space.low)  

# DQN

Complete the implemenation of DQN with a main Q-network and a target Q-network

In [6]:
""" Q Network, input: observations, output: q-values for all actions """
class QFunction(Feedforward):
    def __init__(self, observation_dim, action_dim, 
                 hidden_sizes=[100,100], learning_rate = 0.0002):
        super().__init__(input_size=observation_dim, 
                         hidden_sizes=hidden_sizes, 
                         output_size=action_dim)
        self.optimizer=torch.optim.Adam(self.parameters(), 
                                        lr=learning_rate, 
                                        eps=0.000001)
        # The L1 loss is often easier for choosing learning rates etc than for L2 (MSELoss)
        #  Imagine larger q-values (in the hundreds) then an squared error can quickly be 10000!, 
        #  whereas the L1 (absolute) error is simply in the order of 100. 
        self.loss = torch.nn.SmoothL1Loss()
        
    def fit(self, observations, actions, targets):
        # TODO: complete this
        self.optimizer.zero_grad()

        #calculate q(s,a) values for the actions chosen

        q_sa = self.Q_value(observations, actions)     # (B,)
        loss = self.loss(q_sa, targets)

        loss.backward()
        self.optimizer.step()
        return loss.item()
        
       
    def Q_value(self, observations, actions):
        # compute the Q value for the give actions
        # Hint: use the torch.gather function select the right outputs 
        # Complete this
        #gather q(s,a) for chosen actions
        q = self.forward(observations)                 # (B,n_actions)
        actions = actions.unsqueeze(1)        # (B,1)
        return q.gather(1, actions).squeeze(1)
    
    def maxQ(self, observations):
        # compute the maximal Q-value
        # Complete this
        q = self.forward(observations)
        return q.max(dim=1)[0]

    
    def greedyAction(self, observations):
        # this computes the greedy action
        return np.argmax(self.predict(observations), axis=-1)

In [7]:
class DQNAgent(object):
    """
    Agent implementing Q-learning with NN function approximation.    
    """
    def __init__(self, observation_space, action_space, **userconfig):
        
        if not isinstance(observation_space, spaces.box.Box):
            raise UnsupportedSpace('Observation space {} incompatible ' \
                                   'with {}. (Require: Box)'.format(observation_space, self))
        if not isinstance(action_space, spaces.discrete.Discrete):
            raise UnsupportedSpace('Action space {} incompatible with {}.' \
                                   ' (Reqire Discrete.)'.format(action_space, self))
        
        self._observation_space = observation_space
        self._action_space = action_space
        self._action_n = action_space.n
        self._config = {
            "eps": 0.05,            # Epsilon in epsilon greedy policies                        
            "discount": 0.95,
            "buffer_size": int(1e5),
            "batch_size": 128,
            "learning_rate": 0.0002, 
            # add additional parameters here  
            "use_target" : True,
            "target_update_interval" : 1,
            "hidden_sizes" : [100,100],     
        }
        self._config.update(userconfig)        
        self._eps = self._config['eps']
        self.buffer = mem.Memory(max_size=self._config["buffer_size"])
        
        obs_dim = observation_space.shape[0]
        act_dim = action_space.n

        #main q network
        self.Q = QFunction(observation_dim=obs_dim, 
                           action_dim=act_dim,
                           hidden_sizes=self._config["hidden_sizes"],
                           learning_rate=self._config["learning_rate"])
        #target q network
        self.Q_target = QFunction(observation_dim=obs_dim, 
                                  action_dim=act_dim,
                                  hidden_sizes=self._config["hidden_sizes"],
                                  learning_rate=self._config["learning_rate"])
        self._update_target_net()
        # complete here
        self.train_iter = 0
        self._action_space = action_space
            
    def _update_target_net(self):        
        # complete here
        # Hint: use load_state_dict() and state_dict() functions
        self.Q_target.load_state_dict(self.Q.state_dict())
        # pass 
    
    def act(self, observation, eps=None):
        #epsilon greedy
        if eps is None:
            eps = self._eps

        if np.random.random() > eps:
            #always call greedyAction with batch dimension
            obs_batch = np.array(observation, dtype=np.float32)[None, :]
            action = self.Q.greedyAction(obs_batch)[0]   #get scalar
        else: 
            action = self._action_space.sample()

        return int(action)
    
    def store_transition(self, transition):
        self.buffer.add_transition(transition)
            
    def train(self, iter_fit=32):
        losses = []
        # complete this! 
        if self.buffer.size < self._config["batch_size"]:
            return losses #not enough data in replay buffer yet
        
        #update tgt network per train() call
        if self._config.get("use_target", True):
            k = self._config.get("target_update_interval", 1)
            if self.train_iter % k == 0:
                self._update_target_net()
            
        batch_size= self._config["batch_size"]
        gamma= self._config["discount"]

        # Hint: look at last exercise's solution
        # Hint: while developing print the shape of !all! tensors/arrays to make sure 
        #  they have the right shape: (batchsize, X)  
        
        # Hint: for the target network, update its parameters at the beginning of this function 
        # every k  train calls. 
        
        # Hint:
        for i in range(iter_fit):
            # ....
            batch = self.buffer.sample(batch_size)

            #batch shape: (B,5) with [s,a,r, s_t+1, done]
            states = torch.tensor(np.vstack([t[0] for t in batch]), dtype=torch.float32) 
            actions = torch.tensor([t[1] for t in batch], dtype=torch.long) 
            rewards = torch.tensor([t[2] for t in batch], dtype=torch.float32) 
            next_states = torch.tensor(np.vstack([t[3] for t in batch]), dtype=torch.float32) 
            dones = torch.tensor([t[4] for t in batch], dtype=torch.float32)

            #calculate bootstrapped targets
            with torch.no_grad():
                if self._config["use_target"]:
                    max_next = self.Q_target.maxQ(next_states)
                else:
                    max_next = self.Q.maxQ(next_states)

                targets = rewards + gamma * (1 - dones) * max_next

            loss = self.Q.fit(states, actions, targets)
            losses.append(loss)

        self.train_iter += 1
        return losses

## Test in Env

In [8]:
env_name = 'Pendulum-v1'
# env_name = 'CartPole-v1'

env = gym.make(env_name)
if isinstance(env.action_space, spaces.Box):
    env = DiscreteActionWrapper(env,5)

ac_space = env.action_space
o_space = env.observation_space
print(ac_space)
print(o_space)
print(list(zip(env.observation_space.low, env.observation_space.high)))

Discrete(5)
Box([-1. -1. -8.], [1. 1. 8.], (3,), float32)
[(np.float32(-1.0), np.float32(1.0)), (np.float32(-1.0), np.float32(1.0)), (np.float32(-8.0), np.float32(8.0))]


In [9]:
q_agent = DQNAgent(o_space, ac_space, discount=0.95, eps=0.2)

In [10]:
ob,_info = env.reset()
q_agent.Q.predict(ob)

array([ 0.05592456, -0.1854772 , -0.06763475,  0.3149558 , -0.27281252],
      dtype=float32)

Train the agent!

In [11]:
stats = []
losses = []

In [12]:
max_episodes=600
max_steps=500 
for i in range(max_episodes):
    # print("Starting a new episode")    
    total_reward = 0
    ob, _info = env.reset()
    for t in range(max_steps):
        done = False        
        a = q_agent.act(ob)
        (ob_new, reward, done, trunc, _info) = env.step(a)
        total_reward+= reward
        q_agent.store_transition((ob, a, reward, ob_new, done))            
        ob=ob_new        
        if done: break    
    losses.extend(q_agent.train(32))
    stats.append([i,total_reward,t+1])    
    
    if ((i-1)%20==0):
        print("{}: Done after {} steps. Reward: {}".format(i, t+1, total_reward))

1: Done after 500 steps. Reward: -4227.648745897821
21: Done after 500 steps. Reward: -3117.6128748174615
41: Done after 500 steps. Reward: -2789.287687171019
61: Done after 500 steps. Reward: -3946.4837508666305
81: Done after 500 steps. Reward: -3168.8187647762056
101: Done after 500 steps. Reward: -4197.359526677847
121: Done after 500 steps. Reward: -3480.950575703372
141: Done after 500 steps. Reward: -3411.2193377169465
161: Done after 500 steps. Reward: -3558.4348877802713
181: Done after 500 steps. Reward: -3598.9923955063837
201: Done after 500 steps. Reward: -3106.537816695146
221: Done after 500 steps. Reward: -2301.202542899437
241: Done after 500 steps. Reward: -1928.038323418165
261: Done after 500 steps. Reward: -2866.035913072736
281: Done after 500 steps. Reward: -2989.4373453640037
301: Done after 500 steps. Reward: -2549.1891755315714
321: Done after 500 steps. Reward: -1665.8785209272817
341: Done after 500 steps. Reward: -647.7360428248404
361: Done after 500 steps

Plot the training reward over time. Use the running_mean(array, window_size) to plot a smooth version 

In [None]:
def running_mean(array, window_size):
    if window_size > len(array):
        window_size = len(array)
    window = np.ones(int(window_size)) / float(window_size)
    return np.convolve(array, window, 'valid')

def plot_training_progress(stats_np, losses_np, window_size=100):
    rewards = stats_np[:, 1]

    plt.figure(figsize=(12, 5))
    
    #rewards plot
    plt.subplot(1, 2, 1)
    plt.plot(rewards, alpha=0.3, color='gray', label='raw rewards')
    
    if len(rewards) >= window_size:
        smoothed = running_mean(rewards, window_size)
        x_smooth = np.arange(window_size-1, len(rewards))
        plt.plot(x_smooth, smoothed, color='blue', linewidth=2,
                 label=f'smoothed (window={window_size})')
    
    plt.xlabel('episode')
    plt.ylabel('episode reward')
    plt.title('training rewards')
    plt.legend()
    plt.grid(True, alpha=0.3)
    
    #loss plot
    plt.subplot(1, 2, 2)
    plt.plot(losses_np, color='orange', alpha=0.8)
    plt.xlabel('training step')
    plt.ylabel('loss')
    plt.title('training loss')
    plt.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()

In [None]:
stats_np = np.asarray(stats)
losses_np = np.asarray(losses)

plot_training_progress(stats_np, losses_np)
# pass

In [None]:
q_agent.buffer.size

## Evaluate (without exploration)

Please look at the behavior for a small number of episodes

In [None]:
env_eval = gym.make(env_name, render_mode="human")
if isinstance(env.action_space, spaces.Box):
    env_eval = DiscreteActionWrapper(env_eval,5)

In [None]:
test_stats = []
episodes=50
env_ = env    # without rendering
#env_ = env_eval # with rendering

for i in range(episodes):
    total_reward = 0
    ob, _info = env_.reset()
    for t in range(max_steps):
        done = False        
        a = q_agent.act(ob, eps=0.0)
        (ob_new, reward, done, trunc, _info) = env_.step(a)
        total_reward+= reward
        ob=ob_new        
        if done: break    
    test_stats.append([i,total_reward,t+1])        

Evaluate mean and standard deviation of performance 

(for the Pendulum: an average return around -30 or better should be achieved)

(for the CartPendulum it is possible to get 200)

In [None]:
pass

# Visualize

Visualization of the value function.

In [None]:
Adapt the value_function plotting from last time to plot the maxQ value

## Pendulum Env

Observation space:

0 angle

1 angular velocity

Do that for the pendulum function. Does it look like you expect?

Do that for the pendulum function. Does it look like you expect

## Cartpole Env

Observation space:
 
0       Cart Position             -4.8                    4.8

1       Cart Velocity             -Inf                    Inf

2       Pole Angle                -0.418 rad (-24 deg)    0.418 rad (24 deg)

3       Pole Angular Velocity     -Inf                    Inf

Try to adapt the plotting function that it also works in higher input spaces where all other inputs are 0

In [None]:
figQ = plot_Q_function_generic(q_agent.Q, input_dims=o_space.shape[0], plot_dim1=0, plot_dim2=2, 
                       label_dim1="Cart Pos", label_dim2="Pole Angle")

In [None]:
figQ = plot_Q_function_generic(q_agent.Q, input_dims=o_space.shape[0], plot_dim1=0, plot_dim2=1, 
                       label_dim1="Cart Pos", label_dim2="Cart Vel")

In [None]:
figQ = plot_Q_function_generic(q_agent.Q, input_dims=o_space.shape[0], plot_dim1=2, plot_dim2=3, 
                       label_dim1="Pol Angle", label_dim2="Pole Vel")

In [None]:
%matplotlib notebook

In [None]:
# env_name = 'Acrobot-v1'
# env_name = 'MountainCar-v0'
# env_name = 'LunarLander-v2'