In [1]:
from framework import StandardScaler
from dql import DQLAgent, DQLTrainer
from ddpg import DDPGAgent, DDPGTrainer, LichtenbergAgent

from PIL import Image
import gym
import numpy as np

def to_gif(matrices, filepath, duration=25):
    frames = []
    for matrix in matrices:
        image = Image.fromarray(matrix)
        frames.append(image)
    frames[0].save(filepath, save_all=True, append_images=frames[1:], duration=duration, loop=0)

## Submarine

In [2]:
from environments.submarine import ContinuousSubmarine
import time

episodes = []
time_taken = []
env = ContinuousSubmarine(delta_t=1, randomize=False) 
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
featurizer = StandardScaler(state_dim, learn=False)
featurizer.mean = np.array([ 7.73739964, 11.54841878,  1.6379028 ,  1.92430292])
featurizer.var = np.array([ 580.12531577, 1460.05771442,   36.57866531,   37.08183991])
# for training comparison stability, ensure that the featurizer scaling is constant

In [56]:
trainer.env.delta_t = 0.05

In [84]:
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
from mpl_toolkits.mplot3d import Axes3D

def to_gif(matrices, filepath, duration=25):
    frames = []
    for matrix in matrices:
        image = Image.fromarray(matrix)
        frames.append(image)
    frames[0].save(filepath, save_all=True, append_images=frames[1:], duration=duration, loop=0)

# Initialize simulation
state, info = trainer.env.reset()
state = trainer.featurizer.transform_state(state, info)
frames = []
q_frames = []
actions = []
total_reward = 0.0

done = False
while not done:
    action = trainer.agent.act(state, explore=False)
    a = trainer.featurizer.transform_action(action, trainer.env.action_space.low, trainer.env.action_space.high)
    actions.append(a)
    next_state, reward, terminated, truncated, info = trainer.env.step(a)
    done = terminated or truncated
    total_reward += reward
    state = trainer.featurizer.transform_state(next_state, info)
    
    # Render environment frame
    env_frame = trainer.env.render()

    trainer.agent.q_approximator.state = trainer.featurizer.transform_state(trainer.env.state)
    
    # Generate Q-approximator plot
    x = np.linspace(trainer.agent.q_approximator.lower_bound[0], trainer.agent.q_approximator.upper_bound[0], 50)
    y = np.linspace(trainer.agent.q_approximator.lower_bound[1], trainer.agent.q_approximator.upper_bound[1], 50)
    X, Y = np.meshgrid(x, y)
    Z = np.fromiter((trainer.agent.q_approximator.evaluate(np.array([x, y])) for x, y in zip(np.ravel(X), np.ravel(Y))), dtype=float, count=X.size).reshape(X.shape)
    
    fig = plt.figure()
    ax = fig.add_subplot(111, projection='3d')
    ax.plot_surface(X, Y, Z, rstride=1, cstride=1, cmap='viridis', alpha=0.7)
    ax.set_xlabel('$a_0$ (angle)')
    ax.set_ylabel('$a_1$ (magnitude)')
    ax.set_title(f'$Q(s_t, a)$ for timestep')
    
    # Convert Q-approximator plot to RGB array
    fig.canvas.draw()
    q_frame = np.array(fig.canvas.renderer.buffer_rgba())
    plt.close(fig)
    
    # Resize Q-approximator plot to be smaller
    scale = 0.45
    f1, f0 = env_frame.shape[1] * scale, env_frame.shape[0] * scale
    q_frame = Image.fromarray(q_frame).resize((int(f1), int(f0)))
    q_frame = np.array(q_frame)
    q_frames.append(q_frame)
    
    # Convert environment frame to PIL image
    env_img = Image.fromarray(env_frame)
    
    # Create blank canvas
    combined_img = Image.new("RGB", (env_img.width, env_img.height), (255, 255, 255))
    combined_img.paste(env_img, (0, 0))
    q_frame = q_frame[20:-30, 80:-60, :]
    combined_img.paste(Image.fromarray(q_frame), (130, env_img.height - q_frame.shape[0] - 90))  # Place Q-approximator in top right
    
    frames.append(np.array(combined_img))
    
    # Stop when x threshold is passed
    if trainer.env.state[1] > 15:
        break

gif_path = 'simulation_q_function.gif'
to_gif(frames, gif_path, duration=25)


In [89]:
import math

# Generate continuation GIF with smoother translation and scaling of the Q-approximator animation
last_frame = frames[-1]
paused_frames = [last_frame] * 30  # Pause effect

# Define the starting and ending positions and scales
start_scale = 0.45
end_scale = 1.0
start_position = (130, env_img.height - q_frames[-1].shape[0] - 90)  # Top-right corner
end_position = ((env_img.width - int(env_frame.shape[1] * end_scale)) // 2,
                (env_img.height - int(env_frame.shape[0] * end_scale)) // 2)  # Center

# Number of frames for the transition
num_transition_frames = 40

# Easing function: Slow at the start, fast in the middle, slow at the end
def ease_in_out_sine(t):
    return -(math.cos(math.pi * t) - 1) / 2

# Generate smooth scaling and translation frames with easing
scaling_frames = []
for i in range(1, num_transition_frames + 1):
    # Calculate eased progress (t) using the easing function
    t = ease_in_out_sine(i / num_transition_frames)

    # Interpolate scale
    scale_factor = start_scale + (end_scale - start_scale) * t

    # Interpolate position
    x_pos = start_position[0] + (end_position[0] - start_position[0]) * t
    y_pos = start_position[1] + (end_position[1] - start_position[1]) * t

    # Resize Q-approximator plot
    resized_q_frame = Image.fromarray(q_frames[-1]).resize(
        (int(env_frame.shape[1] * scale_factor), int(env_frame.shape[0] * scale_factor))
    )
    resized_q_frame = np.array(resized_q_frame)

    # Create new frame with translated and scaled Q-approximator
    new_frame = Image.new("RGB", (env_img.width, env_img.height), (255, 255, 255))
    new_frame.paste(env_img, (0, 0))
    new_frame.paste(Image.fromarray(resized_q_frame), (int(x_pos), int(y_pos)))

    scaling_frames.append(np.array(new_frame))

# Save the continuation GIF
gif_cont_path = 'q_function_transition.gif'
to_gif(paused_frames + scaling_frames + 30 * [scaling_frames[-1]], gif_cont_path, duration=25)

In [91]:
class LichtenbergFigure:
    def __init__(self, grid, center, bounds):
        self.grid = grid
        self.Rc = grid.shape[0] // 2
        self.center = center
        self.bounds = bounds

        self.trigger = self.center
        self.lower, self.upper = self.bounds

        self.scale = 1
        self.rot = 0

    def sample(self, n):
        flat = self.grid.flatten()
        indices = np.random.choice(len(flat), p=flat/np.sum(flat), size=n)
        coords = []
        factor = (self.upper - self.lower) / (self.Rc * 2)
        for idx in indices:
            coord = np.unravel_index(idx, self.grid.shape)
            # 2d rotate the coord about the center (Rc, Rc)
            coord = np.array(coord) - self.Rc
            coord = np.dot(coord, [[np.cos(self.rot), -np.sin(self.rot)], [np.sin(self.rot), np.cos(self.rot)]]) + self.Rc # 2d rotation
            # scale the coord about (Rc, Rc)
            coord = (coord - self.Rc) * self.scale + self.Rc
            coord = factor * coord + self.lower + self.trigger - self.center
            coords.append(coord)
        
        return coords
    
    def rand_transform(self, trigger):
        self.scale = np.random.uniform(0.01, 1)
        self.rot = np.random.uniform(0, 2*np.pi)
        self.trigger = trigger

    def copy(self, ref=1.):
        copy = LichtenbergFigure(self.grid.copy(), self.center, self.bounds)
        copy.scale = self.scale * ref
        copy.rot = self.rot
        copy.trigger = self.trigger
        return copy

In [115]:
# Initialize the optimization process
J = trainer.agent.q_approximator  # Replace with your Q-approximator
n_iter = 3
pop = 15

figures = []
max_samples = []
sampled_points = []

trigger = J.center() + np.random.normal(0, 0.1, 2)
best_coords = trigger
best_fitness = J.evaluate(trigger)

lf = LichtenbergFigure(np.load('figure2d.npy'), trigger, J.bounds())

# Define constant z_lim
x = np.linspace(J.lower_bound[0], J.upper_bound[0], 200)
y = np.linspace(J.lower_bound[1], J.upper_bound[1], 200)
X, Y = np.meshgrid(x, y)
Z = np.array([J.evaluate(np.array([x, y])) for x, y in zip(X.flatten(), Y.flatten())]).reshape(X.shape)
min_z = np.min(Z.flatten())
max_z = np.max(Z.flatten())

frames = []
best_coords_history = []  # To store all best coordinates

for it in range(n_iter):
    lf.rand_transform(trigger)
    samples = lf.sample(pop)
    sampled_points.extend(samples)
    figures.append(lf.sample(10000))  # For figure display

    # Animate the Lichtenberg figure by expanding radius
    radius_steps = 20
    for r in np.linspace(0, 1, radius_steps):
        fig = plt.figure(figsize=(10, 8), dpi=200)
        ax = fig.add_subplot(111, projection='3d')

        # Plot the Q-approximator surface
        ax.plot_surface(X, Y, Z, rstride=1, cstride=1, cmap='viridis', alpha=0.7, edgecolor='none')

        # Plot the Lichtenberg figure with expanding radius
        figure = np.array(figures[-1])
        mask = np.linalg.norm(figure - trigger, axis=1) <= r * np.max(np.linalg.norm(figure - trigger, axis=1))
        figure_x = figure[mask, 0]
        figure_y = figure[mask, 1]
        figure_z = [min_z for _ in figure[mask]]
        ax.scatter(figure_x, figure_y, figure_z, color='black', alpha=0.5, s=0.1)

        # Plot all previous best points in red
        if best_coords_history:
            best_x = [p[0] for p in best_coords_history]
            best_y = [p[1] for p in best_coords_history]
            best_z = [J.evaluate(p) for p in best_coords_history]
            ax.scatter(best_x, best_y, best_z, color='red', s=100, edgecolors='black', label='Best Points')
            ax.plot(best_x, best_y, best_z, color='red', linestyle='--', label='Best Path')

        # Set labels, title, and z_lim
        ax.set_xlabel('$a_0$ (angle)')
        ax.set_ylabel('$a_1$ (magnitude)')
        ax.set_title(f'$Q(s_t, a)$ for timestep')
        ax.set_zlim(min_z, max_z)

        # Convert plot to image
        fig.canvas.draw()
        frame = np.array(fig.canvas.renderer.buffer_rgba())
        frames.append(frame)
        plt.close(fig)

    # Animate the sampled points rising from the bottom
    rise_steps = 20
    for t in np.linspace(0, 1, rise_steps):
        fig = plt.figure(figsize=(10, 8), dpi=200)
        ax = fig.add_subplot(111, projection='3d')

        # Plot the Q-approximator surface
        ax.plot_surface(X, Y, Z, rstride=1, cstride=1, cmap='viridis', alpha=0.7, edgecolor='none')

        # Plot the Lichtenberg figure
        figure = np.array(figures[-1])
        figure_x = figure[:, 0]
        figure_y = figure[:, 1]
        figure_z = [min_z for _ in figure]
        ax.scatter(figure_x, figure_y, figure_z, color='black', alpha=0.5, s=0.1)

        # Animate sampled points rising
        sampled_x = [p[0] for p in samples]
        sampled_y = [p[1] for p in samples]
        sampled_z = [(1 - t) * min_z + t * J.evaluate(p) for p in samples]
        ax.scatter(sampled_x, sampled_y, sampled_z, color='blue', alpha=0.8, s=50)

        # Plot all previous best points in red
        if best_coords_history:
            best_x = [p[0] for p in best_coords_history]
            best_y = [p[1] for p in best_coords_history]
            best_z = [J.evaluate(p) for p in best_coords_history]
            ax.scatter(best_x, best_y, best_z, color='red', s=100, edgecolors='black', label='Best Points')
            ax.plot(best_x, best_y, best_z, color='red', linestyle='--', label='Best Path')

        # Set labels, title, and z_lim
        ax.set_xlabel('$a_0$ (angle)')
        ax.set_ylabel('$a_1$ (magnitude)')
        ax.set_title(f'$Q(s_t, a)$ for timestep')
        ax.set_zlim(min_z, max_z)

        # Convert plot to image
        fig.canvas.draw()
        frame = np.array(fig.canvas.renderer.buffer_rgba())
        frames.append(frame)
        plt.close(fig)

    # Highlight the best point
    sample_fitnesses = [(J.evaluate(s), s) for s in samples]
    max_fitness, max_sample = max(sample_fitnesses, key=lambda x: x[0])
    max_samples.append(max_sample)

    if max_fitness > best_fitness:
        best_fitness = max_fitness
        best_coords = max_sample

    best_coords_history.append(best_coords)  # Add to history
    trigger = best_coords

    # Plot the final frame with the best point highlighted
    fig = plt.figure(figsize=(10, 8), dpi=200)
    ax = fig.add_subplot(111, projection='3d')

    # Plot the Q-approximator surface
    ax.plot_surface(X, Y, Z, rstride=1, cstride=1, cmap='viridis', alpha=0.7, edgecolor='none')

    # Plot the Lichtenberg figure
    figure = np.array(figures[-1])
    figure_x = figure[:, 0]
    figure_y = figure[:, 1]
    figure_z = [min_z for _ in figure]
    ax.scatter(figure_x, figure_y, figure_z, color='black', alpha=0.5, s=0.1)

    # Plot sampled points
    sampled_x = [p[0] for p in samples]
    sampled_y = [p[1] for p in samples]
    sampled_z = [J.evaluate(p) for p in samples]
    ax.scatter(sampled_x, sampled_y, sampled_z, color='blue', alpha=0.8, s=50)

    # Highlight the best point in green
    ax.scatter(best_coords[0], best_coords[1], best_fitness, color='green', s=100, edgecolors='black', label='Selected Point')

    # Plot all previous best points in red
    if best_coords_history:
        best_x = [p[0] for p in best_coords_history]
        best_y = [p[1] for p in best_coords_history]
        best_z = [J.evaluate(p) for p in best_coords_history]
        ax.scatter(best_x, best_y, best_z, color='red', s=100, edgecolors='black', label='Best Points')
        ax.plot(best_x, best_y, best_z, color='red', linestyle='--', label='Best Path')

    # Set labels, title, and z_lim
    ax.set_xlabel('$a_0$ (angle)')
    ax.set_ylabel('$a_1$ (magnitude)')
    ax.set_title(f'$Q(s_t, a)$ for timestep')
    ax.set_zlim(min_z, max_z)

    # Convert plot to image
    fig.canvas.draw()
    frame = np.array(fig.canvas.renderer.buffer_rgba())
    frames.append(frame)
    plt.close(fig)

    # Add a pause after the points are sampled and the best point is highlighted
    for _ in range(20):  # Add 20 frames for pause
        frames.append(frame)

fig = plt.figure(figsize=(10, 8), dpi=200)
ax = fig.add_subplot(111, projection='3d')
ax.plot_surface(X, Y, Z, rstride=1, cstride=1, cmap='viridis', alpha=0.7, edgecolor='none')
ax.scatter(best_coords[0], best_coords[1], best_fitness, color='red', s=150, edgecolors='black', label='Selected Point')
ax.set_xlabel('$a_0$ (angle)')
ax.set_ylabel('$a_1$ (magnitude)')
ax.set_title(f'$Q(s_t, a)$ for timestep')
ax.set_zlim(min_z, max_z)
fig.canvas.draw()
frame = np.array(fig.canvas.renderer.buffer_rgba())
frames.append(frame)
plt.close(fig)
for _ in range(20):  # Add 20 frames for pause
    frames.append(frame)

to_gif(frames, 'optimization_process.gif', duration=25)

In [122]:
import imageio.v2 as imageio
import cv2
import numpy as np

# Function to read a GIF and get its frames
def read_gif(filename):
    reader = imageio.get_reader(filename)
    frames = [cv2.cvtColor(np.array(frame), cv2.COLOR_RGBA2BGR) for frame in reader]
    return frames

# Load GIFs
gif1_frames = read_gif("simulation_q_function.gif")
gif2_frames = read_gif("q_function_transition.gif")
gif3_frames = read_gif("optimize1.gif")

# Find the GIF with the lowest resolution
min_height = min(frame.shape[0] for gif in [gif1_frames, gif2_frames, gif3_frames] for frame in gif)
min_width = min(frame.shape[1] for gif in [gif1_frames, gif2_frames, gif3_frames] for frame in gif)

# Function to resize frames to the smallest resolution while keeping the aspect ratio
def resize_frame(frame, target_size):
    h, w, _ = frame.shape
    scale = min(target_size[0] / h, target_size[1] / w)
    new_size = (int(w * scale), int(h * scale))
    resized_frame = cv2.resize(frame, new_size, interpolation=cv2.INTER_AREA)
    
    # Centering the frame
    pad_top = (target_size[0] - resized_frame.shape[0]) // 2
    pad_bottom = target_size[0] - resized_frame.shape[0] - pad_top
    pad_left = (target_size[1] - resized_frame.shape[1]) // 2
    pad_right = target_size[1] - resized_frame.shape[1] - pad_left
    
    padded_frame = cv2.copyMakeBorder(resized_frame, pad_top, pad_bottom, pad_left, pad_right, cv2.BORDER_CONSTANT, value=(0, 0, 0))
    
    return padded_frame

# Resize all frames
target_size = (min_height, min_width)
all_frames = []
for gif in [gif1_frames, gif2_frames, gif3_frames]:
    for frame in gif:
        all_frames.append(resize_frame(frame, target_size))

# Define video writer
output_filename = "output.mp4"
fps = 30  # Increased frame rate
height, width = target_size
fourcc = cv2.VideoWriter_fourcc(*'mp4v')

video_writer = cv2.VideoWriter(output_filename, fourcc, fps, (width, height))

# Write frames to video
for frame in all_frames:
    video_writer.write(frame)

video_writer.release()
print("Video saved as", output_filename)


Video saved as output.mp4


In [27]:
for _ in range(1):
    # agent = DDPGAgent(state_dim, action_dim, batch_size=32, tau=0.05)
    agent = LichtenbergAgent(state_dim, action_dim, "figure2d.npy", batch_size=32, tau=0.1, n_iter=3, pop=15)

    trainer = DDPGTrainer(env, agent, featurizer, until_convergence=True, convergence_reward=100)
    start = time.perf_counter()
    trainer.train(episodes=10000)
    while not trainer.has_converged(100):
        trainer.train(episodes=100)
    episodes.append(len(trainer.episode_rewards))
    time_taken.append(int(round(time.perf_counter() - start)))
    # print(episodes[-1], time_taken[-1])
    # print(trainer.episode_rewards, trainer.actor_losses, trainer.critic_losses)
    # trainer.plot_losses()
    # trainer.plot_rewards()

 20%|██        | 2026/10000 [02:17<09:02, 14.69it/s]


KeyboardInterrupt: 

## Parking

In [None]:
import torch
class ParkingFeaturizer(StandardScaler):
    def __init__(self):
        super().__init__(19)
    
    def transform_state(self, state, info=None):
        speed = info['speed'] if info else 0.0
        state = np.concat([state['observation'], state['achieved_goal'], state['desired_goal'], np.array([speed])])
        return torch.tensor(state, dtype=torch.float32).unsqueeze(0)


In [None]:
import highway_env.envs.parking_env as parking_env

env = parking_env.ParkingEnv({
    "observation": {
        "type": "KinematicsGoal",
        "features": ['x', 'y', 'vx', 'vy', 'cos_h', 'sin_h'],
        "scales": [100, 100, 5, 5, 1, 1],
        "normalize": True
    },
    "action": {
        "type": "ContinuousAction"
    },
    "simulation_frequency": 15,
    "policy_frequency": 3,
    "screen_width": 600,
    "screen_height": 300,
    "centering_position": [0.5, 0.5],
    "scaling": 7,
    "show_trajectories": False,
    "render_agent": True,
    "offscreen_rendering": True
})
env.render_mode = 'rgb_array'
featurizer = ParkingFeaturizer()
agent = LichtenbergAgent(19, 2, "figure2d.npy", hidden_layers=3, tau=0.05, batch_size=64, n_iter=3, pop=15)
trainer = DDPGTrainer(env, agent, featurizer)

In [None]:
trainer.train(episodes=10000)
# trainer.save()

## Inverted Pendulum

In [None]:
env = gym.make('InvertedDoublePendulum-v4', render_mode='rgb_array')
env.action_space = np.linspace(env.action_space.low, env.action_space.high, 21) # discretize action space
state_dim = env.observation_space.shape[0]

featurizer = StandardScaler(state_dim) # apply adaptive scaling to state vectors
agent = DQLAgent( 
                 input_dim=state_dim, # neural net params
                 output_dim=env.action_space.shape[0],
                 hidden_dim=128,
                 hidden_layers=5,
                 batch_size=256,
                 gamma=0.99, # discount factor
                 min_epsilon=0.1, epsilon_decay=0.999, # exploration rate and decay
                 tau=0.005 # update rate of target net
                )

trainer = DQLTrainer(env, agent, featurizer)
trainer.train(episodes=300)
info = trainer.run_episode(False)
print(f"cumulative reward: {info['reward']:.2f}, steps: {info['steps']}")
trainer.plot_losses()

In [None]:
env = gym.make('InvertedDoublePendulum-v4', render_mode='rgb_array')
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
featurizer = StandardScaler(state_dim)
agent = DDPGAgent(state_dim, action_dim, hidden_layers=2, tau=0.01, batch_size=256)
featurizer = StandardScaler(state_dim)
trainer = DDPGTrainer(env, agent, featurizer)
trainer.train(episodes=10000)
trainer.plot_losses()

## Ball and Beam Problem

In [None]:
import ballbeam_gym.envs
import warnings
warnings.filterwarnings('ignore')

env = ballbeam_gym.envs.BallBeamSetpointEnv(timestep=0.02, setpoint=-0.8, beam_length=2.0, max_angle=0.5, max_timesteps=500, action_mode='discrete')
env.action_space = np.arange(3)
state_dim = env.observation_space.shape[0]

featurizer = StandardScaler(state_dim)
agent = DQLAgent(state_dim, env.action_space.shape[0], batch_size=128, epsilon_decay=0.9995)

In [None]:
trainer = DQLTrainer(env, agent, featurizer)
trainer.train(episodes=1000)
trainer.plot_losses()

In [None]:
info = trainer.run_episode()
to_gif(info['rgb_arrays'], 'ball_and_beam.gif', duration=25)

## Panda Reach

In [3]:
import torch
class PandaFeaturizer(StandardScaler):
    def __init__(self):
        super().__init__(12)
    
    def transform_state(self, state, info=None):
        state = np.concatenate([state['achieved_goal'], state['desired_goal'], state['observation']])
        return torch.tensor(state, dtype=torch.float32).unsqueeze(0)

In [110]:
import gymnasium 
import panda_gym
env = gymnasium.make('PandaReach-v3', render_mode="rgb_array")
featurizer = PandaFeaturizer()
agent = DDPGAgent(12, env.action_space.shape[0])
trainer = DDPGTrainer(env, agent, featurizer)


argv[0]=--background_color_red=0.8745098114013672
argv[1]=--background_color_green=0.21176470816135406
argv[2]=--background_color_blue=0.1764705926179886
