# Classic Control: Control theory problems from the classic RL literature

<br><br>

In this notebook we will present some classic environments in Reinforcement Learning research. These environments have continuous states spaces (i.e., infinite possible states) and therefore tabular methods cannot solve them. To tackle these environments (and more complex ones) we will have two tools:

- Extend the tabular methods with the techniques of discretization and tile coding
- Use function approximators (Neural Networks)

<br>

In [1]:
# Install gymnasium
!pip install -q gymnasium

In [2]:

import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import HTML, display

%matplotlib inline

In [3]:

def display_video(frames):
    if not frames:
        raise ValueError("No frames to display")

    # Create figure without displaying it immediately
    fig = plt.figure(figsize=(5, 5))
    ax = fig.add_subplot(111)
    ax.set_axis_off()
    im = ax.imshow(frames[0])

    def update(frame):
        im.set_data(frame)
        return [im]

    # Create animation
    anim = animation.FuncAnimation(fig, update, frames=frames, interval=50, blit=True, repeat=False)

    # Convert to HTML5 video and close figure to avoid static display
    html_video = anim.to_html5_video()
    plt.close(fig)  # Prevent static image from showing
    return HTML(html_video)

def test_env(environment, episodes=1):
    frames = []

    for episode in range(episodes):
        # Handle reset() return value
        result = environment.reset()
        if isinstance(result, tuple):
            state, info = result
        else:
            state = result
            info = {}

        done = False
        frame = environment.render()
        if frame is not None:
            frames.append(frame)
        else:
            print("Warning: Render returned None")

        while not done:
            action = environment.action_space.sample()
            next_state, reward, terminated, truncated, info = environment.step(action)
            done = terminated or truncated

            frame = environment.render()
            if frame is not None:
                frames.append(frame)
            state = next_state

    return display_video(frames)


In [4]:

# Create environment with render_mode
env = gym.make('CartPole-v1', render_mode='rgb_array')

# Run simulation and display animation only
html_output = test_env(env, 1)
display(html_output)  # Display only the video

env.close()

# The state

* The states of the cartpole task will be represented by a vector of four real numbers:

        Num     Observation               Min                     Max
        0       Cart Position             -4.8                    4.8
        1       Cart Velocity             -Inf                    Inf
        2       Pole Angle                -0.418 rad (-24 deg)    0.418 rad (24 deg)
        3       Pole Angular Velocity     -Inf                    Inf


In [5]:
env.observation_space

Box([-4.8               -inf -0.41887903        -inf], [4.8               inf 0.41887903        inf], (4,), float32)

# The actions available

**We can perform two actions in this environment:**

        0     Push cart to the left.
        1     Push cart to the right.



In [6]:
env.action_space

Discrete(2)

In [7]:

def display_video(frames, actions, states):
    if not frames:
        raise ValueError("No frames to display")

    fig = plt.figure(figsize=(6, 6))
    ax = fig.add_subplot(111)
    ax.set_axis_off()
    im = ax.imshow(frames[0])
    # Add text for state and action
    state_text = ax.text(0.05, 0.95, '', transform=ax.transAxes, color='white', fontsize=10, bbox=dict(facecolor='black', alpha=0.5))
    action_text = ax.text(0.05, 0.90, '', transform=ax.transAxes, color='white', fontsize=10, bbox=dict(facecolor='black', alpha=0.5))

    def update(idx):
        im.set_data(frames[idx])
        # Update text with state and action
        state = states[idx]
        action = actions[idx]
        state_text.set_text(f"Pos: {state[0]:.2f}, Vel: {state[1]:.2f}\nAngle: {state[2]:.2f}, AngVel: {state[3]:.2f}")
        action_text.set_text(f"Action: {'Left' if action == 0 else 'Right'}")
        return [im, state_text, action_text]

    anim = animation.FuncAnimation(fig, update, frames=len(frames), interval=50, blit=True, repeat=False)
    html_video = anim.to_html5_video()
    plt.close(fig)
    return HTML(html_video)

def test_env(environment, episodes=1):
    frames = []
    states = []
    actions = []

    for episode in range(episodes):
        result = environment.reset()
        if isinstance(result, tuple):
            state, info = result
        else:
            state = result
            info = {}

        done = False
        frame = environment.render()
        if frame is not None:
            frames.append(frame)
            states.append(state)
            actions.append(None)  # No action for initial state

        while not done:
            action = environment.action_space.sample()
            next_state, reward, terminated, truncated, info = environment.step(action)
            done = terminated or truncated

            frame = environment.render()
            if frame is not None:
                frames.append(frame)
                states.append(next_state)
                actions.append(action)
            state = next_state

    return display_video(frames, actions, states)

# Create environment
env = gym.make('CartPole-v1', render_mode='rgb_array')
print("Observation Space:", env.observation_space)
print("Action Space:", env.action_space)

# Run and display animation
html_output = test_env(env, 1)
display(html_output)

env.close()

Observation Space: Box([-4.8               -inf -0.41887903        -inf], [4.8               inf 0.41887903        inf], (4,), float32)
Action Space: Discrete(2)


# Acrobot: Swing the bar up to a certain height.

In [8]:
# Create environment with render_mode
env = gym.make('Acrobot-v1', render_mode='rgb_array')

# Run simulation and display animation only
html_output = test_env(env, 1)
display(html_output)  # Display only the video

env.close()

# The state

The states of the cartpole task will be represented by a vector of six real numbers. The first two are the cosine and sine of the first joint. The next two are the cosine and sine of the other joint. The last two are the angular velocities of each joint.
    
$\cos(\theta_1), \sin(\theta_1), \cos(\theta_2), \sin(\theta_2), \dot\theta_1, \dot\theta_2$

In [9]:
env.observation_space

Box([ -1.        -1.        -1.        -1.       -12.566371 -28.274334], [ 1.        1.        1.        1.       12.566371 28.274334], (6,), float32)

# The actions available

We can perform two actions in this environment:

    0    Apply +1 torque on the joint between the links.
    1    Apply -1 torque on the joint between the links.

In [10]:
env.action_space

Discrete(3)

# MountainCar: Reach the goal from the bottom of the valley.

In [13]:
import matplotlib

def display_video(frames):
    if not frames:
        raise ValueError("No frames to display")

    orig_backend = matplotlib.get_backend()
    matplotlib.use('Agg')  # Use non-interactive backend for video generation
    fig, ax = plt.subplots(1, 1, figsize=(5, 5))
    matplotlib.use(orig_backend)  # Restore original backend
    ax.set_axis_off()
    ax.set_aspect('equal')
    ax.set_position([0, 0, 1, 1])
    im = ax.imshow(frames[0])

    def update(frame):
        im.set_data(frame)
        return [im]

    anim = animation.FuncAnimation(fig=fig, func=update, frames=frames,
                                   interval=50, blit=True, repeat=False)
    html_video = anim.to_html5_video()
    plt.close(fig)  # Prevent static image
    return HTML(html_video)

def test_env(environment, episodes=1):  # Changed to 1 episode as per your call
    frames = []
    for episode in range(episodes):
        # Handle reset() return value (observation, info)
        result = environment.reset()
        if isinstance(result, tuple):
            state, info = result
        else:
            state = result
            info = {}

        done = False
        frame = environment.render()  # No mode argument
        if frame is not None:
            frames.append(frame)
        else:
            print("Warning: Render returned None")

        while not done:
            action = environment.action_space.sample()
            next_state, reward, terminated, truncated, info = environment.step(action)
            done = terminated or truncated
            frame = environment.render()  # No mode argument
            if frame is not None:
                frames.append(frame)
            state = next_state

    return display_video(frames)



In [14]:
# Create environment with render_mode
env = gym.make('MountainCar-v0', render_mode='rgb_array')

# Run simulation and display animation
html_output = test_env(env, 1)
display(html_output)

env.close()

# The state

The observation space consists of the car position $\in [-1.2, 0.6]$ and car velocity $\in [-0.07, 0.07]$

In [16]:
env.observation_space

Box([-1.2  -0.07], [0.6  0.07], (2,), float32)

# The actions available


The actions available three:

    0    Accelerate to the left.
    1    Don't accelerate.
    2    Accelerate to the right.

In [18]:
env.action_space

Discrete(3)

In [19]:


def display_video(frames, actions, positions):
    if not frames:
        raise ValueError("No frames to display")
    orig_backend = matplotlib.get_backend()
    matplotlib.use('Agg')
    fig, ax = plt.subplots(1, 1, figsize=(5, 5))
    matplotlib.use(orig_backend)
    ax.set_axis_off()
    ax.set_aspect('equal')
    ax.set_position([0, 0, 1, 1])
    im = ax.imshow(frames[0])
    action_text = ax.text(0.05, 0.95, '', transform=ax.transAxes, color='white', fontsize=10, bbox=dict(facecolor='black', alpha=0.5))
    pos_text = ax.text(0.05, 0.85, '', transform=ax.transAxes, color='white', fontsize=10, bbox=dict(facecolor='black', alpha=0.5))

    def update(idx):
        im.set_data(frames[idx])
        action = actions[idx]
        position = positions[idx]
        action_text.set_text(f"Action: {['Left', 'None', 'Right'][action if action is not None else 1]}")
        pos_text.set_text(f"Pos: {position:.2f}")
        return [im, action_text, pos_text]

    anim = animation.FuncAnimation(fig=fig, func=update, frames=len(frames),
                                   interval=50, blit=True, repeat=False)
    html_video = anim.to_html5_video()
    plt.close(fig)
    return HTML(html_video)

def test_env(environment, episodes=1):
    frames = []
    actions = []
    positions = []

    for episode in range(episodes):
        result = environment.reset()
        if isinstance(result, tuple):
            state, info = result
        else:
            state = result
            info = {}

        done = False
        frame = environment.render()
        if frame is not None:
            frames.append(frame)
            actions.append(None)
            positions.append(state[0])

        while not done:
            position, velocity = state
            # Optimized heuristic to reach x >= 0.5
            if velocity < 0:  # Moving left
                action = 2 if position < -0.8 else 0  # Push right if far left, else swing back
            else:  # Moving right
                action = 0 if position > -0.2 else 2  # Push left if near right, else push right

            next_state, reward, terminated, truncated, info = environment.step(action)
            done = terminated or truncated
            frame = environment.render()
            if frame is not None:
                frames.append(frame)
                actions.append(action)
                positions.append(next_state[0])
            state = next_state

            # Stop when goal is reached
            if next_state[0] >= 0.5:
                print(f"Goal reached at position {next_state[0]:.2f}!")
                break  # End episode cleanly

    return display_video(frames, actions, positions)

# Create environment
env = gym.make('MountainCar-v0', render_mode='rgb_array')
print("Observation Space:", env.observation_space)  # Confirm Box([-1.2 -0.07], [0.6 0.07], (2,), float32)
print("Action Space:", env.action_space)  # Discrete(3)
html_output = test_env(env, 1)
display(html_output)

env.close()

Observation Space: Box([-1.2  -0.07], [0.6  0.07], (2,), float32)
Action Space: Discrete(3)


# Pendulum: swing it and keep it upright

In [20]:

def display_video(frames, actions):
    if not frames:
        raise ValueError("No frames to display")
    orig_backend = matplotlib.get_backend()
    matplotlib.use('Agg')
    fig, ax = plt.subplots(1, 1, figsize=(5, 5))
    matplotlib.use(orig_backend)
    ax.set_axis_off()
    ax.set_aspect('equal')
    ax.set_position([0, 0, 1, 1])
    im = ax.imshow(frames[0])
    action_text = ax.text(0.05, 0.95, '', transform=ax.transAxes, color='white', fontsize=10, bbox=dict(facecolor='black', alpha=0.5))

    def update(idx):
        im.set_data(frames[idx])
        action = actions[idx]
        action_text.set_text(f"Torque: {action[0]:.2f}" if action is not None else "Torque: N/A")
        return [im, action_text]

    anim = animation.FuncAnimation(fig=fig, func=update, frames=len(frames),
                                   interval=50, blit=True, repeat=False)
    html_video = anim.to_html5_video()
    plt.close(fig)
    return HTML(html_video)

def test_env(environment, episodes=1):
    frames = []
    actions = []

    for episode in range(episodes):
        result = environment.reset()
        if isinstance(result, tuple):
            state, info = result
        else:
            state = result
            info = {}

        done = False
        frame = environment.render()
        if frame is not None:
            frames.append(frame)
            actions.append(None)

        while not done:
            action = environment.action_space.sample()  # Random torque [-2, 2]
            next_state, reward, terminated, truncated, info = environment.step(action)
            done = terminated or truncated
            frame = environment.render()
            if frame is not None:
                frames.append(frame)
                actions.append(action)
            state = next_state

    return display_video(frames, actions)

# Create and test environment
env = gym.make('Pendulum-v1', render_mode='rgb_array')
print("Observation Space:", env.observation_space)
print("Action Space:", env.action_space)
html_output = test_env(env, 1)
display(html_output)

env.close()

Observation Space: Box([-1. -1. -8.], [1. 1. 8.], (3,), float32)
Action Space: Box(-2.0, 2.0, (1,), float32)


In [21]:
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import HTML, display
from collections import deque
import random

# Neural network for Actor
class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, max_action):
        super(Actor, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, action_dim),
            nn.Tanh()
        )
        self.max_action = max_action

    def forward(self, state):
        return self.max_action * self.net(state)

# Neural network for Critic
class Critic(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Critic, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim + action_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )

    def forward(self, state, action):
        return self.net(torch.cat([state, action], dim=-1))

# DDPG Agent
class DDPG:
    def __init__(self, state_dim, action_dim, max_action):
        self.actor = Actor(state_dim, action_dim, max_action).float()
        self.actor_target = Actor(state_dim, action_dim, max_action).float()
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=1e-4)

        self.critic = Critic(state_dim, action_dim).float()
        self.critic_target = Critic(state_dim, action_dim).float()
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=1e-3)

        self.max_action = max_action
        self.memory = deque(maxlen=10000)
        self.batch_size = 64
        self.gamma = 0.99
        self.tau = 0.005

    def act(self, state, noise=0.1):
        state = torch.FloatTensor(state).unsqueeze(0)
        action = self.actor(state).detach().numpy()[0]
        if noise > 0:
            action += np.random.normal(0, noise * self.max_action, size=action.shape)
        return np.clip(action, -self.max_action, self.max_action)

    def train(self):
        if len(self.memory) < self.batch_size:
            return
        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states = torch.FloatTensor(states)
        actions = torch.FloatTensor(actions)
        rewards = torch.FloatTensor(rewards).unsqueeze(1)
        next_states = torch.FloatTensor(next_states)
        dones = torch.FloatTensor(dones).unsqueeze(1)

        # Critic update
        next_actions = self.actor_target(next_states)
        target_q = self.critic_target(next_states, next_actions)
        target_q = rewards + (1 - dones) * self.gamma * target_q
        current_q = self.critic(states, actions)
        critic_loss = nn.MSELoss()(current_q, target_q.detach())
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # Actor update
        actor_actions = self.actor(states)
        actor_loss = -self.critic(states, actor_actions).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # Soft update target networks
        for target_param, param in zip(self.actor_target.parameters(), self.actor.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
        for target_param, param in zip(self.critic_target.parameters(), self.critic.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)

    def store(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

# Training loop
def train_pendulum(env, agent, episodes=200):
    for episode in range(episodes):
        state, _ = env.reset()
        episode_reward = 0
        done = False
        step_count = 0

        while not done and step_count < 500:  # Max steps per episode
            action = agent.act(state, noise=0.1 if episode < 150 else 0)  # Reduce noise after 150 episodes
            next_state, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            agent.store(state, action, reward, next_state, done)
            agent.train()
            state = next_state
            episode_reward += reward
            step_count += 1

        print(f"Episode {episode + 1}/{episodes}, Reward: {episode_reward:.2f}")

# Visualization
def display_video(frames, actions):
    if not frames:
        raise ValueError("No frames to display")
    orig_backend = matplotlib.get_backend()
    matplotlib.use('Agg')
    fig, ax = plt.subplots(1, 1, figsize=(5, 5))
    matplotlib.use(orig_backend)
    ax.set_axis_off()
    ax.set_aspect('equal')
    ax.set_position([0, 0, 1, 1])
    im = ax.imshow(frames[0])
    action_text = ax.text(0.05, 0.95, '', transform=ax.transAxes, color='white', fontsize=10, bbox=dict(facecolor='black', alpha=0.5))

    def update(idx):
        im.set_data(frames[idx])
        action = actions[idx]
        action_text.set_text(f"Torque: {action[0]:.2f}" if action is not None else "Torque: N/A")
        return [im, action_text]

    anim = animation.FuncAnimation(fig=fig, func=update, frames=len(frames),
                                   interval=50, blit=True, repeat=False)
    html_video = anim.to_html5_video()
    plt.close(fig)
    return HTML(html_video)

def test_env(environment, agent, episodes=1):
    frames = []
    actions = []

    for episode in range(episodes):
        state, _ = environment.reset()
        done = False
        frame = environment.render()
        if frame is not None:
            frames.append(frame)
            actions.append(None)

        while not done:
            action = agent.act(state, noise=0)  # No noise for testing
            next_state, reward, terminated, truncated, info = environment.step(action)
            done = terminated or truncated
            frame = environment.render()
            if frame is not None:
                frames.append(frame)
                actions.append(action)
            state = next_state

    return display_video(frames, actions)

# Create environment and train
env = gym.make('Pendulum-v1', render_mode='rgb_array')
agent = DDPG(state_dim=3, action_dim=1, max_action=2.0)
train_pendulum(env, agent, episodes=200)  # Train for 200 episodes
html_output = test_env(env, agent, 1)
display(html_output)

env.close()

  states = torch.FloatTensor(states)


Episode 1/200, Reward: -1319.11
Episode 2/200, Reward: -1351.08
Episode 3/200, Reward: -1572.50
Episode 4/200, Reward: -1722.58
Episode 5/200, Reward: -1567.59
Episode 6/200, Reward: -1556.12
Episode 7/200, Reward: -1556.13
Episode 8/200, Reward: -1481.03
Episode 9/200, Reward: -1549.17
Episode 10/200, Reward: -1012.75
Episode 11/200, Reward: -1469.10
Episode 12/200, Reward: -1117.16
Episode 13/200, Reward: -1128.65
Episode 14/200, Reward: -1527.75
Episode 15/200, Reward: -1342.05
Episode 16/200, Reward: -878.59
Episode 17/200, Reward: -1137.15
Episode 18/200, Reward: -1161.31
Episode 19/200, Reward: -1220.15
Episode 20/200, Reward: -1119.93
Episode 21/200, Reward: -1167.48
Episode 22/200, Reward: -904.56
Episode 23/200, Reward: -1029.55
Episode 24/200, Reward: -1104.45
Episode 25/200, Reward: -1465.75
Episode 26/200, Reward: -1200.99
Episode 27/200, Reward: -1188.57
Episode 28/200, Reward: -1042.28
Episode 29/200, Reward: -1088.53
Episode 30/200, Reward: -1003.30
Episode 31/200, Rewar