# Introduction to the basics of RL algorithms

This notebook introduces some basic algorithms, before digging to more serious implementation using GPU-enabled simulators and baseline solver implementation in a second notebook.

## Setup

We need torch, gymnasium, and stable-baseline (just for the replay-buffer, I agree it is quite overkill). Plus we need a way to properly display the environments inside the notebook, which is tricky.

In [2]:
import torch
import gymnasium as gym
import matplotlib.pylab as plt
import stable_baselines3 as sb3

### Rendering
Gymnasium renders by default its environments in a X window. We need a bypass for rendering them in notebooks, using a canvas for pywidget. 
The solution below pre-load a canvas, and then use it with images generated by the environment. On some setups, it can blink. 

In [None]:
env = gym.make('CartPole-v1',render_mode='rgb_array')
env.reset()

from ipycanvas import Canvas
import time
canvas = Canvas(width=400, height=600)
display(canvas)

for j in range(200):
    env.step(j % 2)
    # Render in the matplotlib canvas
    canvas.put_image_data(env.render(), 0, 0)
    time.sleep(1e-2)

## Starting simple with a Q-Table

We first focus on a full-discrete environment, which allows to work with a Q-table.


### Code of the environment (execute but don't read it)

In [None]:
import numpy as np
import gymnasium as gym

class EnvMountainCarFullyDiscrete(gym.Wrapper):
    def __init__(self, n_bins=(51, 51),**kwargs):
        env = gym.make("MountainCar-v0",**kwargs).unwrapped
        super().__init__(env)
        self.n_bins = n_bins

        # Définition des bornes des états
        self.state_bounds = [
            [-1.2, 0.6],   # Position du chariot
            [-0.1, .1],   # Vitesse du chariot
       ]

        # Création des bins pour discrétiser l'état
        self.bins = [np.linspace(low-1e-6, high+1e-6, num=n, endpoint=True)
                     for (low, high), n in zip(self.state_bounds, self.n_bins)]
        # Basis value for representing the bin indexes as a single integer
        self.bin_base = [ int(np.prod(self.n_bins[:n])) for n in range(len(self.n_bins)) ]

        # Définition de l'espace d'observation discret
        #self.observation_space = gym.spaces.MultiDiscrete(self.n_bins)
        self.observation_space = gym.spaces.Discrete(np.prod(self.n_bins))

    def discrete_state_to_index(self,bins):
        return sum([ idx*base for (idx,base) in zip(bins,self.bin_base) ])
    def index_to_discrete_state(self,index):
        bins = []
        for n in self.n_bins:
            bins.append(index%n)
            index = index // n
        return bins

    def discretize_state(self, state):
        """Convertit un état continu en un index discret."""
        return [np.digitize(s, bins) for s, bins in zip(state, self.bins)]

    def undiscretize_state(self, discrete_state):
        """Transforme un état discret en état continu en prenant le centre du bin."""
        assert self.is_discrete_state_in_range(discrete_state)
        return np.array([
            (self.bins[i][d-1] + self.bins[i][d]) / 2 if 0 < d < len(self.bins[i]) else self.state_bounds[i][d == 0]
            for i, d in enumerate(discrete_state)
        ])

    def is_discrete_state_in_range(self,discrete_state):
        """Check that non of the bins is out of the state space"""
        res = 0 not in discrete_state
        res = res and  np.all([ d<n for d,n in zip(discrete_state,self.n_bins) ])
        return res

    def is_index_in_range(self,index):
        if index<0 or index>self.observation_space.n: return False
        return self.is_discrete_state_in_range(self.index_to_discrete_state(index))

    def step(self, action):
        """Exécute une action et retourne l'état sous forme discrète."""
        # Step 10 times in the continuous space to ensure enough movements
        # to pass a discrete bin.
        for i in range(10):
            next_state_cont, reward, done, trunc, info = self.env.step(action)
            if done: break
        next_state_discrete = self.discretize_state(next_state_cont)
        if not self.is_discrete_state_in_range(next_state_discrete):
            done = False
            trunc = True
            self.state = None
        else:
            # Reconvertir en état continu pour rester dans un espace propre
            self.env.state = self.undiscretize_state(next_state_discrete)
            self.discrete_state = next_state_discrete
            self.state = self.discrete_state_to_index(next_state_discrete)
        return self.state, reward, done, trunc, info

    def reset(self, **kwargs):
        """Réinitialise l'environnement avec un état strictement discret."""
        continuous_state, info = self.env.reset(**kwargs)
        self.discrete_state = self.discretize_state(continuous_state)
        assert self.is_discrete_state_in_range(self.discrete_state)
        self.state = self.discrete_state_to_index(self.discrete_state)
        # On force le reset sur un état discretisé
        self.env.state = self.undiscretize_state(self.discrete_state)
        return self.state,info



### Environment usage
You can play with it:

In [None]:
env = EnvMountainCarFullyDiscrete(render_mode='rgb_array')

# Configure the renderer
canvas = Canvas(width=400, height=600)
display(canvas)

env.reset()
canvas.put_image_data(env.render(), 0, 0)
time.sleep(0.5)

for _ in range(20):
    # Play the game
    action = env.action_space.sample()
    env.step(action)

    # Then render
    canvas.put_image_data(env.render(), 0, 0)
    time.sleep(1e-1)

### Q-Table algorithm

The algorithm represent the Q value as a simple table Q(x,u). The policy is an argmax on the possible u values: policy(x) = argmax(Q[x,:]). The optimization is done by applying the Belman operator.


In [None]:
### --- Random seed
RANDOM_SEED = 1188  # int((time.time()%10)*1000)
print("Seed = %d" % RANDOM_SEED)
np.random.seed(RANDOM_SEED)

### --- Environment
env = EnvMountainCarFullyDiscrete()

### --- Hyper paramaters
NEPISODES = 4000  # Number of training episodes
NSTEPS = 50  # Max episode length
LEARNING_RATE = 0.85  #
DECAY_RATE = 0.99  # Discount factor

assert isinstance(env.observation_space,gym.spaces.discrete.Discrete)
assert isinstance(env.action_space,gym.spaces.discrete.Discrete)
Q = np.zeros([env.observation_space.n, env.action_space.n])  # Q-table initialized to 0




h_rwd = []  # Learning history (for plot).
for episode in range(1, NEPISODES):
    x,_ = env.reset()
    rsum = 0.0
    for steps in range(NSTEPS):
        u = np.argmax(
            Q[x, :] + np.random.randn(1, env.action_space.n) / episode
        )  # Greedy action with noise
        x2, reward, done, trunc, info = env.step(u)

        # Compute reference Q-value at state x respecting HJB
        Qref = reward + DECAY_RATE * np.max(Q[x2, :])

        # Update Q-Table to better fit HJB
        Q[x, u] += LEARNING_RATE * (Qref - Q[x, u])
        x = x2
        rsum += reward
        if done or trunc: break

    h_rwd.append(rsum)
    if not episode % 200:
        print(
           "Episode #%d done with average cost %.2f" % (episode, sum(h_rwd[-20:]) / 20)
        )



We can plot some results.

First, the learning curve, showing cumulative rewards accross episods.

In [None]:

# ### Plot the learning curve
print("Total rate of success: %.3f" % (sum(h_rwd) / NEPISODES))
plt.plot(np.cumsum(h_rwd) / range(1, len(h_rwd)+1))


Then, let's rollout an episod of the resulting policy.

In [None]:
envrender = EnvMountainCarFullyDiscrete(render_mode = 'rgb_array')
canvas = Canvas(width=400, height=600)
display(canvas)

s,_ = envrender.reset()
traj = [s]
canvas.put_image_data(envrender.render(), 0, 0)
time.sleep(0.5)

for i in range(100):
    a = np.argmax(Q[s, :])
    s, r, done, trunc, info = envrender.step(a)
    traj.append(s) # traj is stored to later plot it

    if done: break # Will not display the last state, where the car is not visible anymore

    canvas.put_image_data(envrender.render(), 0, 0)
    time.sleep(0.1)

Finally, the value function. We display it as a color map along position and velocity of the car.

In [None]:
# ### Plot Q-Table as value function pos-vs-vel
fig,axs = plt.subplots(1,2,figsize=(10,5))

# Compute the (q,v)\in R^2 expression of each state
def indexes_to_continuous_states(indexes):
    return [ env.undiscretize_state(env.index_to_discrete_state(s))
             for s in indexes if env.is_index_in_range(s) ]
qvs = indexes_to_continuous_states(range(env.observation_space.n))
# Compute the value of each state
values = [ np.max(Q[s,:]) for s in range(env.observation_space.n)
           if env.is_index_in_range(s) ]
# Compute the policy of each state
policies = [ np.argmax(Q[s,:]) for s in range(env.observation_space.n)
           if env.is_index_in_range(s) ]

# Plot value on the left, policy and traj on the right ...
# ... value
vplot = axs[0].scatter([qv[0] for qv in qvs],[qv[1] for qv in qvs],c=values)
plt.colorbar(vplot,ax=axs[0])
axs[0].set_title('Value function')
# ... policy
piplot = axs[1].scatter([qv[0] for qv in qvs],[qv[1] for qv in qvs],c=policies)
axs[1].set_title('Policy function with traj')
plt.colorbar(piplot,ax=axs[1])
# ... traj for the displayed rollout.
qv_traj = indexes_to_continuous_states(traj)
plt.plot([qv[0] for qv in qv_traj],[qv[1] for qv in qv_traj],'r-', linewidth=3)



## Making the table deep

Next, we redo the same but considering the table as a simplistic neural network, and enforcing the Belman recursion through gradient descent. Much less efficient, but opens the door to deeper representation.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class QValueNetwork(nn.Module):
    def __init__(self, env, learning_rate=LEARNING_RATE):
        assert isinstance(env.observation_space,gym.spaces.discrete.Discrete)
        assert isinstance(env.action_space,gym.spaces.discrete.Discrete)

        super(QValueNetwork, self).__init__()

        # Linear layer from input size NX to output size NU
        self.fc = nn.Linear(env.observation_space.n,env.action_space.n)
        self.optimizer = torch.optim.SGD(self.parameters(), lr=learning_rate)

    def forward(self, x):
        qvalue = self.fc(x)
        return qvalue

    def predict_action(self, x, noise=0):
        qvalue = self.forward(x)
        if noise != 0:
            qvalue += torch.randn(qvalue.shape) * noise
        u = torch.argmax(qvalue,dim=1)
        return u

    def update(self, x, qref):
        self.optimizer.zero_grad()
        qvalue = self.forward(x)
        loss = F.mse_loss(qvalue, qref)
        loss.backward()
        self.optimizer.step()
        return loss.item()

    def plot(self, traj=None):
        '''
        Plot Q-Table as value function pos-vs-vel
        If <traj>  is given as a list of states, then it is also plot in the Pi (right)
        subplot.
        '''
        def indexes_to_continuous_states(indexes):
            return [ env.undiscretize_state(env.index_to_discrete_state(s))
                     for s in indexes if env.is_index_in_range(s) ]
        Q = self.fc.weight.T
        fig,axs = plt.subplots(1,2,figsize=(10,5))
        qvs = indexes_to_continuous_states(range(env.observation_space.n))
        values = [ float(torch.max(Q[s,:])) for s in range(env.observation_space.n)
                   if env.is_index_in_range(s) ]
        policies = [ int(torch.argmax(Q[s,:])) for s in range(env.observation_space.n)
                     if env.is_index_in_range(s) ]
        # Plot value on the left, policy and traj on the right ...
        # ... value
        vplot = axs[0].scatter([qv[0] for qv in qvs],[qv[1] for qv in qvs],c=values)
        plt.colorbar(vplot,ax=axs[0])
        axs[0].set_title('Value function')
        # ... policy
        piplot = axs[1].scatter([qv[0] for qv in qvs],[qv[1] for qv in qvs],c=policies)
        plt.colorbar(piplot,ax=axs[1])
        axs[1].set_title('Policy')
        # ... traj for the displayed rollout.
        if traj is not None:
            qv_traj = indexes_to_continuous_states(traj)
            plt.plot([qv[0] for qv in qv_traj],[qv[1] for qv in qv_traj],'r-', linewidth=3)


qvalue = QValueNetwork(env, learning_rate=LEARNING_RATE)

def one_hot(ix, env):
    """Return a one-hot encoded tensor.

    - ix: index or batch of indices
    - n: number of classes (size of the one-hot vector)
    """
    ix = torch.tensor(ix).long()
    if ix.dim() == 0:  # If a single index, add batch dimension
        ix = ix.unsqueeze(0)
    return F.one_hot(ix,num_classes=env.observation_space.n).to(torch.float32)

### --- History of search
h_rwd = []  # Learning history (for plot).

### --- Training
for episode in range(1, NEPISODES):
    x, _ = env.reset()
    rsum = 0.0

    for step in range(NSTEPS - 1):

        u = int(qvalue.predict_action(one_hot(x,env),noise=1/episode))  # ... with noise
        x2, reward, done, trunc, info = env.step(u)

        # Compute reference Q-value at state x respecting HJB
        # Q2 = sess.run(qvalue.qvalue, feed_dict={qvalue.x: onhot(x2)})
        qnext = qvalue(one_hot(x2,env))
        qref = qvalue(one_hot(x,env))
        qref[0, u] = reward + DECAY_RATE * torch.max(qnext)

        # Update Q-table to better fit HJB
        #sess.run(qvalue.optim, feed_dict={qvalue.x: onehot(x), qvalue.qref: Qref})
        qvalue.update(x=one_hot(x,env),qref=qref)

        rsum += reward
        x = x2
        if done or trunc: break

    h_rwd.append(rsum)
    if not episode % 200:
        print("Episode #%d done with %d sucess" % (episode, sum(h_rwd[-20:])))


Similarly, we can plot the results.

In [None]:
print("Total rate of success: %.3f" % (sum(h_rwd) / NEPISODES))
plt.plot(np.cumsum(h_rwd) / range(1, len(h_rwd)+1))

In [None]:
envrender = EnvMountainCarFullyDiscrete(render_mode = 'rgb_array')
canvas = Canvas(width=400, height=600)
display(canvas)

s,_ = envrender.reset()
traj = [s]

canvas.put_image_data(envrender.render(), 0, 0)
time.sleep(0.5)

for i in range(100):
    a = int(qvalue.predict_action(one_hot(s,env)))
    s, r, done, trunc, info = envrender.step(a)
    traj.append(s)

    if done: break
        
    canvas.put_image_data(envrender.render(), 0, 0)
    time.sleep(0.1)


In [None]:
qvalue.plot()

## DQN

Now that we can accept a neural representation of the Q function, any inputs can be taken. Let's switch to a continuous state. For now, we will keep the argmax policy decision, meaning we need to stay in an environment with discrete policy.

In [None]:

import random
from stable_baselines3.common.buffers import ReplayBuffer

# HYPERPARAM
SEED = 1
ENV_ID = "CartPole-v1"
NUM_ENVS = 1
LEARNING_RATE = 0.00025
BUFFER_SIZE = 10000
TOTAL_TIMESTEPS = 500000
TRAIN_FREQUENCY = 10
BATCH_SIZE = 128
GAMMA = 0.99
TARGET_NETWORK_FREQUENCY = 500
TAU = 1.0

# ALGO LOGIC: initialize agent here:
class QNetwork(nn.Module):
    def __init__(self, env):
        super().__init__()
        self.network = nn.Sequential(
            nn.Linear(np.array(env.single_observation_space.shape).prod(), 120),
            nn.ReLU(),
            nn.Linear(120, 84),
            nn.ReLU(),
            nn.Linear(84, env.single_action_space.n),
        )

    def forward(self, x):
        return self.network(x)

def linear_schedule(start_e: float, end_e: float, duration: int, t: int):
    slope = (end_e - start_e) / duration
    return max(slope * t + start_e, end_e)

# TRY NOT TO MODIFY: seeding
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# env setup
def make_env(env_id, seed, idx, capture_video, run_name):
    def thunk():
        if capture_video and idx == 0:
            env = gym.make(env_id, render_mode="rgb_array")
            env = gym.wrappers.RecordVideo(env, f"videos/{run_name}")
        else:
            env = gym.make(env_id)
        env = gym.wrappers.RecordEpisodeStatistics(env)
        env.action_space.seed(seed)

        return env

    return thunk

envs = gym.vector.SyncVectorEnv(
    [make_env(ENV_ID, SEED + i, i, False, "TP11_DQN") for i in range(NUM_ENVS)]
)
assert isinstance(envs.single_action_space, gym.spaces.Discrete), "only discrete action space is supported"

q_network = QNetwork(envs)
optimizer = torch.optim.Adam(q_network.parameters(), lr=LEARNING_RATE)
target_network = QNetwork(envs)
target_network.load_state_dict(q_network.state_dict())

rb = ReplayBuffer(
    BUFFER_SIZE,
    envs.single_observation_space,
    envs.single_action_space,
    "cpu",
    handle_timeout_termination=False,
)

# start the game
obs, _ = envs.reset(seed=SEED)
started = np.zeros(NUM_ENVS, dtype=bool)
h_rwd = []

for global_step in range(TOTAL_TIMESTEPS):
    # ALGO LOGIC: put action logic here
    epsilon = linear_schedule(1, 0.05, .5 * TOTAL_TIMESTEPS, global_step)
    if random.random() < epsilon:
        actions = np.array([envs.single_action_space.sample() for _ in range(envs.num_envs)])
    else:
        q_values = q_network(torch.Tensor(obs))
        actions = torch.argmax(q_values, dim=1).numpy()

    # execute the game and log data.
    next_obs, rewards, terminations, truncations, infos = envs.step(actions)

    # record rewards for plotting purposes
    if "episode" in infos:
        for idx in range(NUM_ENVS):
            if truncations[idx] or terminations[idx]:
                assert infos["episode"]["l"]>0
                h_rwd.append(infos["episode"]["r"][idx])
                if len(h_rwd) % 100 == 0:  # verbose one every ~200 episode
                    print(f"global_step={global_step}/{TOTAL_TIMESTEPS},"
                          + f" episodic_return={h_rwd[-1]}")
                
    # Add samples to replay buffer if env not starting
    if np.any(started):
        rb.add(obs[started], next_obs[started], actions[started],
                rewards[started], terminations[started], infos)
    obs = next_obs
    started = np.logical_not(np.logical_or(terminations, truncations))

    # CRUCIAL step easy to overlook
    obs = next_obs

    # ALGO LOGIC: training.
    if global_step > TOTAL_TIMESTEPS // 50 and global_step % TRAIN_FREQUENCY == 0:
        data = rb.sample(BATCH_SIZE)
        with torch.no_grad():
            target_max, _ = target_network(data.next_observations).max(dim=1)
            td_target = data.rewards.flatten() + GAMMA * target_max * (1 - data.dones.flatten())
        old_val = q_network(data.observations).gather(1, data.actions).squeeze()
        loss = F.mse_loss(td_target, old_val)

        if int(round((global_step/100) ** (1/3))) ** 3 ==  global_step/100:
            print(f'*** Learning goes ... loss = {loss},  q = {old_val.mean()}')

        # optimize the model
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # update target network
    if global_step % TARGET_NETWORK_FREQUENCY == 0:
        for target_network_param, q_network_param in zip(target_network.parameters(), q_network.parameters()):
            target_network_param.data.copy_(
                TAU * q_network_param.data + (1.0 - TAU) * target_network_param.data
            )

envs.close()

And the plots:

In [None]:
print(f"Total rate of success: {np.mean(h_rwd)}")
plt.plot(np.cumsum(h_rwd) / range(1, len(h_rwd)+1))


In [None]:
envrender = gym.make('CartPole-v1', render_mode = 'rgb_array')
canvas = Canvas(width=400, height=600)
display(canvas)

s,_ = envrender.reset()
traj = [s]

canvas.put_image_data(envrender.render(), 0, 0)
time.sleep(0.5)

for i in range(100):
    a = int(torch.argmax(q_network(torch.Tensor(s).unsqueeze(0))))
    s, r, done, trunc, info = envrender.step(a)
    traj.append(s)

    if done: break
        
    canvas.put_image_data(envrender.render(), 0, 0)
    time.sleep(0.01)

This environment has a state of dimension 4. We plot the value on the positions only, cuting at zero velocities.

In [None]:

hq = []
hp = []
ha = []
for pos in np.arange(envs.single_observation_space.low[0],
                     envs.single_observation_space.high[0],.1):
    for angle in np.arange(envs.single_observation_space.low[2],
                           envs.single_observation_space.high[2],.01):
        hp.append(pos)
        ha.append(angle)
        hq.append(float(torch.max(
            q_network(torch.tensor([[pos, 0, angle, 0]], dtype=torch.float32)))))

# Scatter plot
plt.figure(figsize=(8, 6))
plt.scatter(hp, ha, c=hq)
plt.colorbar(label='Max Q-value')
plt.xlabel('Cart Position')
plt.ylabel('Pole Angle')
plt.title('Q-value landscape (velocity = 0)')


## Deep deterministic policy gradient (DDPG)

Finally, let's consider the case where the actions also are living in a continuous space. In that case, we also need to train a policy network to locally optimize the value (i.e. greedy behavior on the value).

In [None]:

# HYPERPARAM
SEED = 1
ENV_ID = "Pendulum-v1"
NUM_ENVS = 1
LEARNING_RATE = 3e-4
BUFFER_SIZE = int(1e6)
TOTAL_TIMESTEPS = 30000
TRAIN_FREQUENCY = 2
BATCH_SIZE = 256
GAMMA = 0.99
TARGET_NETWORK_FREQUENCY = 2
TAU = 0.005
EXPLORATION_NOISE = 0.1

def make_env(env_id, seed, idx, capture_video, run_name):
    def thunk():
        if capture_video and idx == 0:
            env = gym.make(env_id, render_mode="rgb_array")
            env = gym.wrappers.RecordVideo(env, f"videos/{run_name}")
        else:
            env = gym.make(env_id)
        env = gym.wrappers.RecordEpisodeStatistics(env)
        env.action_space.seed(seed)
        return env

    return thunk

# ALGO LOGIC: initialize agent here:
class QNetwork(nn.Module):
    def __init__(self, env):
        super().__init__()
        self.fc1 = nn.Linear(np.array(env.single_observation_space.shape).prod() + np.prod(env.single_action_space.shape), 256)
        self.fc2 = nn.Linear(256, 256)
        self.fc3 = nn.Linear(256, 1)

    def forward(self, x, u):
        x = torch.cat([x, u], 1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

class Actor(nn.Module):
    def __init__(self, env):
        super().__init__()
        self.fc1 = nn.Linear(np.array(env.single_observation_space.shape).prod(), 256)
        self.fc2 = nn.Linear(256, 256)
        self.fc_mu = nn.Linear(256, np.prod(env.single_action_space.shape))
        # action rescaling
        self.register_buffer(
            "action_scale", torch.tensor((env.action_space.high - env.action_space.low) / 2.0, dtype=torch.float32)
        )
        self.register_buffer(
            "action_bias", torch.tensor((env.action_space.high + env.action_space.low) / 2.0, dtype=torch.float32)
        )

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = torch.tanh(self.fc_mu(x))
        return x * self.action_scale + self.action_bias


# TRY NOT TO MODIFY: seeding
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# env setup
envs = gym.vector.SyncVectorEnv([make_env(ENV_ID, SEED, 0, False, "TP11_DDPG")])
assert isinstance(envs.single_action_space, gym.spaces.Box), "only continuous action space is supported"

actor = Actor(envs)
qf1 = QNetwork(envs)
qf1_target = QNetwork(envs)
target_actor = Actor(envs)
target_actor.load_state_dict(actor.state_dict())
qf1_target.load_state_dict(qf1.state_dict())
q_optimizer = torch.optim.Adam(list(qf1.parameters()), lr=LEARNING_RATE)
actor_optimizer = torch.optim.Adam(list(actor.parameters()), lr=LEARNING_RATE)

envs.single_observation_space.dtype = np.float32
rb = ReplayBuffer(
    BUFFER_SIZE,
    envs.single_observation_space,
    envs.single_action_space,
    "cpu",
    handle_timeout_termination=False,
)

# start the game
obs, _ = envs.reset(seed=SEED)
h_rwd = []

for global_step in range(TOTAL_TIMESTEPS):
    # ALGO LOGIC: put action logic here
    if global_step < TOTAL_TIMESTEPS//40:
        actions = np.array([envs.single_action_space.sample() for _ in range(envs.num_envs)])
    else:
        with torch.no_grad():
            actions = actor(torch.Tensor(obs))
            actions += torch.normal(0, actor.action_scale * EXPLORATION_NOISE)
            actions = actions.numpy().clip(envs.single_action_space.low,
                                           envs.single_action_space.high)

    # execute the game
    next_obs, rewards, terminations, truncations, infos = envs.step(actions)

    # record rewards for plotting purposes
    if "episode" in infos:
        for idx in range(NUM_ENVS):
            if truncations[idx] or terminations[idx]:
                assert infos["episode"]["l"]>0
                h_rwd.append(infos["episode"]["r"][idx])
                if len(h_rwd) % 20 == 0:  # verbose one every ~20 episode
                    print(f"episode=#{len(h_rwd)} global_step={global_step}/{TOTAL_TIMESTEPS},"
                          + f" episodic_return={h_rwd[-1]} length={infos['episode']['l'][idx]}")
                
    # Add samples to replay buffer if env not starting
    if np.any(started):
        rb.add(obs[started], next_obs[started], actions[started],
                rewards[started], terminations[started], infos)
    obs = next_obs
    started = np.logical_not(np.logical_or(terminations, truncations))

    # ALGO LOGIC: training.
    if global_step > TOTAL_TIMESTEPS // 40:
        data = rb.sample(BATCH_SIZE)
        with torch.no_grad():
            next_state_actions = target_actor(data.next_observations)
            qf1_next_target = qf1_target(data.next_observations, next_state_actions)
            next_q_value = data.rewards.flatten() \
                + (1 - data.dones.flatten()) * GAMMA * (qf1_next_target).view(-1)

        qf1_a_values = qf1(data.observations, data.actions).view(-1)
        qf1_loss = F.mse_loss(qf1_a_values, next_q_value)

        # optimize the model
        q_optimizer.zero_grad()
        qf1_loss.backward()
        q_optimizer.step()

        if global_step % TRAIN_FREQUENCY == 0:
            actor_loss = -qf1(data.observations, actor(data.observations)).mean()
            actor_optimizer.zero_grad()
            actor_loss.backward()
            actor_optimizer.step()

        # update the target network
        if global_step % TARGET_NETWORK_FREQUENCY == 0:
            for param, target_param in zip(actor.parameters(), target_actor.parameters()):
                target_param.data.copy_(TAU * param.data + (1 - TAU) * target_param.data)
            for param, target_param in zip(qf1.parameters(), qf1_target.parameters()):
                target_param.data.copy_(TAU * param.data + (1 - TAU) * target_param.data)

envs.close()

Let's see the results.

In [None]:
print(f"Total rate of success: {np.mean(h_rwd)}")
plt.plot(np.cumsum(h_rwd) / range(1, len(h_rwd)+1))
plt.title('Learning rate')

Let's look at a roll-out.

In [None]:
envrender = gym.make(ENV_ID, render_mode = 'rgb_array')
canvas = Canvas(width=400, height=600)
display(canvas)

s,_ = envrender.reset()
traj = [s]

canvas.put_image_data(envrender.render(), 0, 0)
time.sleep(0.5)

for i in range(100):
    a = actor(torch.Tensor(s))
    a = a.detach().numpy()[0].clip(envrender.action_space.low,
                                envrender.action_space.high)
    s, r, done, trunc, info = envrender.step(a)
    traj.append(s)

    if done: break
        
    canvas.put_image_data(envrender.render(), 0, 0)
    time.sleep(0.01)



Finally, we can plot the value and policy landscapes.

In [None]:
hs = []
hpol = []
hval = []
for pos in np.arange(-np.pi,np.pi,.1):
    for vel in np.arange(envs.single_observation_space.low[2],
                           envs.single_observation_space.high[2],.1):
        s = torch.tensor([np.cos(pos), np.sin(pos),vel], dtype=torch.float32)
        a = actor(torch.Tensor(s)).clip(torch.tensor(envrender.action_space.low),torch.tensor(envrender.action_space.high)).detach()
        val = qf1(s.reshape(1,3),a).detach()
        hs.append(s.numpy())
        hpol.append(a.numpy())
        hval.append(val.numpy())
        

In [None]:
# Scatter plot
fig,axs = plt.subplots(1,2,figsize=(10,5))
# .. .value
vplot = axs[0].scatter([ np.arctan2(s[1],s[0]) for s in hs ], [ s[2] for s in hs ], c=hval)
plt.colorbar(vplot,label='Max Q-value',ax=axs[0])
axs[0].set_xlabel('Angle')
axs[0].set_ylabel('Vel')
axs[0].set_title('Q-value landscape ')
# ... policy
pyplot = axs[1].scatter([ np.arctan2(s[1],s[0]) for s in hs ], [ s[2] for s in hs ], c=hpol)
plt.colorbar(pyplot,label='Policy',ax=axs[1])
axs[1].set_xlabel('Angle')
axs[1].set_ylabel('Vel')
axs[1].set_title('Policy')
# ... roll-out
axs[0].plot([np.arctan2(qv[1],qv[0]) for qv in traj],[qv[2] for qv in traj],'r-', linewidth=3)
axs[1].plot([np.arctan2(qv[1],qv[0]) for qv in traj],[qv[2] for qv in traj],'r-', linewidth=3)
