## Step 1: Physical System

In [None]:
import numpy as np

# ========== Constants (from the paper) ========== #

# Simulation parameter
dt = 0.05  # s

# Pendulum parameters
omega = 4.882          # rad/s
kv = 0.07              # viscous friction
g = 9.81
l = g / omega**2       # pendulum length

# Cart / actuator parameters
tau = 0.0482
kU = 0.051
fc = 1.166
fd = -0.097

# Noise parameter (measured from the physical system in the paper)
sigma_theta = 0.0026     # rad (angle sensor noise)


In [None]:
# ========== Class to simulate the physical system ========== #

class CartPoleSimulator:
    def __init__(self):
        # True physical state
        self.theta = np.pi      # upright position
        self.theta_dot = 0.0
        self.x = 0.0
        self.x_dot = 0.0

    def step(self, U):

        # function to simulate one step as a function of the control voltage U
        # it returns only noisy observations (check paper and report)

        # ---------- Eq. (3): commanded velocity ---------- #
        x_dot_c = kU * U

        # ---------- Eq. (2): cart acceleration ---------- #
        x_ddot = (
            (x_dot_c - self.x_dot) / tau
            - fc * np.sign(self.x_dot)
            - fd
        )

        # ---------- Eq. (1): pendulum angular acceleration ---------- #
        theta_ddot = (
            - kv * self.theta_dot
            - omega**2 * np.sin(self.theta)
            - (x_ddot / l) * np.cos(self.theta)
        )

        # ---------- Integrate (semi-implicit Euler) ---------- #
        self.theta_dot += theta_ddot * dt
        self.theta += self.theta_dot * dt

        self.x_dot += x_ddot * dt
        self.x += self.x_dot * dt

        # ---------- Measurement noise (as in the paper) ---------- #
        # Noisy angle measurement
        theta_m = self.theta + np.random.normal(0.0, sigma_theta)

        # Induced angular velocity noise
        sigma_theta_dot = sigma_theta / dt
        theta_dot_m = self.theta_dot + np.random.normal(0.0, sigma_theta_dot)

        # ---------- Observation ---------- #
        # use sin and cos in the state and not θ (Why? -> paper - report)
        observation = np.array([
            np.sin(theta_m),
            np.cos(theta_m),
            theta_dot_m,
            self.x,
            self.x_dot
        ])

        return observation


    def reset(self):
      # ----- True physical state -----
      self.theta = np.pi          # upright position
      self.theta_dot = 0.0
      self.x = 0.0
      self.x_dot = 0.0

      # ----- Small random perturbations -----
      # Helps exploration and avoids overfitting a single IC
      self.theta += np.random.uniform(-0.05, 0.05)   # ~ +-3 degrees
      self.x += np.random.uniform(-0.02, 0.02)

      return self._make_observation()

    def _make_observation(self):
        # Noisy angle measurement
        theta_m = self.theta + np.random.normal(0.0, sigma_theta)

        # Noisy angular velocity measurement
        theta_dot_m = self.theta_dot + np.random.normal(0.0, sigma_theta / dt)

        obs = np.array([
            np.sin(theta_m),
            np.cos(theta_m),
            theta_dot_m,
            self.x,
            self.x_dot
        ], dtype=np.float32)

        return obs
    
    def reset_downward(self):
        self.theta = 0.0
        self.theta_dot = 0.0
        self.x = 0.0
        self.x_dot = 0.0
        return self._make_observation()






## Step 2: Reward function, Episode Termination Logic

In [None]:
# ========== Constants (from the paper) ========== #

# Episode / environment limits
x_max = 0.35           # m
theta_dot_max = 14.0   # rad/s
max_steps = 800        # steps per episode (max real system time is 40s)

# Reward scaling
x0 = x_max

In [None]:
# ========== reward function ========== #
def compute_reward(theta, x):
    return 0.5 * (1 - np.cos(theta)) - (x / x0)**2

In [None]:
# ========== function to follow the paper's termination rules ========== #
def check_termination(x, theta_dot, step_count):
    if abs(x) > x_max:
        return True, -400.0
    if abs(theta_dot) > theta_dot_max:
        return True, 0.0
    if step_count >= max_steps:
        return True, 0.0
    return False, 0.0


## Step 3: Q-learning algorithm

In [None]:
# ========== Constants check and move ========== #
U0 = 12                 # [V] constant voltage magnitude

actions = np.array([-U0, 0.0, U0])
N_ACTIONS = len(actions)


# ========== Q-learning hyperparameters (check paper file S1) ========== #

alpha = 0.01      # learning rate
gamma = 0.99      # discount factor
epsilon_min = 0.1




In [None]:
# ========== Discredization Process ========== #

n_bins = {
    "sin_theta": 10,
    "cos_theta": 10,
    "theta_dot": 10,
    "x": 10,
    "x_dot": 10
}

state_ranges = {
    "sin_theta": (-1.0, 1.0),
    "cos_theta": (-1.0, 1.0),
    "theta_dot": (-14.0, 14.0),   # from termination condition
    "x": (-0.35, 0.35),
    "x_dot": (-2.0, 2.0)          # conservative bound
}

def discretize(value, min_val, max_val, n_bins):
    value = np.clip(value, min_val, max_val)
    bin_width = (max_val - min_val) / n_bins
    idx = int((value - min_val) / bin_width)
    return min(idx, n_bins - 1)



def discretize_state(obs):
    sin_t, cos_t, theta_dot, x, x_dot = obs

    return (
        discretize(sin_t, *state_ranges["sin_theta"], n_bins["sin_theta"]),
        discretize(cos_t, *state_ranges["cos_theta"], n_bins["cos_theta"]),
        discretize(theta_dot, *state_ranges["theta_dot"], n_bins["theta_dot"]),
        discretize(x, *state_ranges["x"], n_bins["x"]),
        discretize(x_dot, *state_ranges["x_dot"], n_bins["x_dot"]),
    )


In [None]:
# ========== Q-table initialization ========== #

Q = np.zeros((
    n_bins["sin_theta"],
    n_bins["cos_theta"],
    n_bins["theta_dot"],
    n_bins["x"],
    n_bins["x_dot"],
    N_ACTIONS
), dtype=np.float32)


In [None]:
# ========== Decay Law for ε ========== #
def epsilon_schedule(n, d):
    return max(
        epsilon_min,
        min(1.0, 1.0 - np.log10(n + 1) / d)
    )


In [None]:
# ========== Greedy action selection ========== #
def select_action(state_idx, epsilon):
    if np.random.rand() < epsilon:
        return np.random.randint(N_ACTIONS)
    return np.argmax(Q[state_idx])


In [None]:
# ========== Q-learning update rule ========== #
def update_q(state, action, reward, next_state):
    best_next = np.max(Q[next_state])
    Q[state + (action,)] += alpha * (
        reward + gamma * best_next - Q[state + (action,)]
    )


In [None]:
# ========== moving average function (check paper) ========== #
def moving_average(data, window):
    if len(data) < window:
        return None
    return float(np.mean(data[-window:]))

In [None]:
def evaluate_q_policy(env, Q, n_eval=10, return_best_traj=False):
    returns = []

    best_return = -np.inf
    best_cos_traj = None

    for _ in range(n_eval):
        obs = env.reset_downward()
        state = discretize_state(obs)

        total_reward = 0.0
        step_count = 0
        done = False

        cos_traj = []

        while not done and step_count < max_steps:
            cos_traj.append(np.cos(env.theta))

            action = np.argmax(Q[state])   # greedy
            U = actions[action]

            obs = env.step(U)
            reward = compute_reward(env.theta, env.x)

            done, terminal_penalty = check_termination(
                env.x, env.theta_dot, step_count
            )
            reward += terminal_penalty

            total_reward += reward
            state = discretize_state(obs)
            step_count += 1

        norm_ret = total_reward / max_steps
        returns.append(norm_ret)

        if return_best_traj and norm_ret > best_return:
            best_return = norm_ret
            best_cos_traj = cos_traj

    mean_ret = float(np.mean(returns))
    std_ret  = float(np.std(returns))

    if return_best_traj:
        return mean_ret, std_ret, best_cos_traj
    else:
        return mean_ret, std_ret


In [None]:
def train_q_learning(env, N_T):
    global_step = 0
    d = N_T / 10   # epsilon decay parameter

    best_eval_return = -np.inf
    best_cos_traj = None

    episode_returns = []
    step_returns = []
    eval_history = []   # (global_step, mean_return, std)

    for episode in range(N_T):
        obs = env.reset()        # upright reset (training)
        state = discretize_state(obs)

        epsilon = epsilon_schedule(episode, d)
        total_reward = 0.0
        done = False
        step_count = 0

        # ===== TRAINING EPISODE =====
        while not done and step_count < max_steps:
            action_idx = select_action(state, epsilon)
            U = actions[action_idx]

            obs = env.step(U)
            reward = compute_reward(env.theta, env.x)

            done, terminal_penalty = check_termination(
                env.x, env.theta_dot, step_count
            )
            reward += terminal_penalty
            total_reward += reward

            next_state = discretize_state(obs)
            update_q(state, action_idx, reward, next_state)

            state = next_state
            step_count += 1
            global_step += 1

            # ===== PAPER EVALUATION =====
            if global_step % 5000 == 0:
                mean_ret, std_ret, cos_traj = evaluate_q_policy(
                    env, Q, n_eval=10, return_best_traj=True
                )

                eval_history.append((global_step, mean_ret, std_ret))

                if mean_ret > best_eval_return:
                    best_eval_return = mean_ret
                    best_cos_traj = cos_traj

        # ----- training diagnostics -----
        norm_ret = total_reward / max_steps
        episode_returns.append(norm_ret)
        step_returns.append((global_step, norm_ret))

        if episode % 1000 == 0 and episode > 0:
            ma300 = moving_average(episode_returns, 300)
            ma_str = f"{ma300:.3f}" if ma300 is not None else "N/A"
            print(
                f"Episode {episode:6d}/{N_T} | "
                f"Steps {global_step:9d} | "
                f"ε={epsilon:.4f} | "
                f"Return={norm_ret:.3f} | "
                f"MA(300)={ma_str}"
            )

    return step_returns, episode_returns, eval_history, best_cos_traj


In [None]:
# ========== Plot Normalized return vs time steps ========== #

import matplotlib.pyplot as plt

def plot_learning_curve(step_returns, title, window=300):
    steps = [s for s, r in step_returns]
    returns = [r for s, r in step_returns]

    plt.figure(figsize=(8, 4))

    # raw returns
    plt.plot(steps, returns, alpha=0.3, label="Normalized return")

    # moving average curve (array)
    if len(returns) >= window:
        ma_curve = np.convolve(
            returns,
            np.ones(window) / window,
            mode="valid"
        )
        ma_steps = steps[window - 1:]
        plt.plot(ma_steps, ma_curve, linewidth=2, label=f"MA({window})")

    plt.xlabel("Time steps")
    plt.ylabel("Normalized return")
    plt.title(title)
    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.show()


In [None]:
# ========== Plot Normalized return vs time steps like the paper ========== #

def plot_multiple_learning_curves(curves, window=300):

    plt.figure(figsize=(8, 4))

    for step_returns, label in curves:
        steps = [s for s, r in step_returns]
        returns = [r for s, r in step_returns]

        # compute MA curve locally (array)
        if len(returns) >= window:
            ma_curve = np.convolve(
                returns,
                np.ones(window) / window,
                mode="valid"
            )
            ma_steps = steps[window - 1:]

            plt.plot(
                ma_steps,
                ma_curve,
                linewidth=2,
                label=label
            )

    plt.xlabel("Time steps")
    plt.ylabel("Normalized return")
    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.show()

In [None]:
# ========== Plot cos(θ) vs time steps like the paper ========== #

def plot_cos_theta_q(env, Q, steps=800):
    obs = env.reset_downward()   # θ = 0
    state = discretize_state(obs)

    cos_vals = []

    for t in range(steps):
        cos_vals.append(np.cos(env.theta))

        action = np.argmax(Q[state])   # greedy
        U = actions[action]

        obs = env.step(U)
        state = discretize_state(obs)

    plt.figure(figsize=(6,4))
    plt.plot(cos_vals)
    plt.xlabel("Time steps")
    plt.ylabel(r"$\cos(\theta)$")
    plt.title("Pendulum stabilization (one episode, 800 steps)")
    plt.grid(True)
    plt.tight_layout()
    plt.show()



In [None]:
def plot_best_cos_theta(best_cos_traj):
    plt.figure(figsize=(6,4))
    plt.plot(best_cos_traj)
    plt.xlabel("Time steps")
    plt.ylabel(r"$\cos(\theta)$")
    plt.title("Best greedy episode (800 steps)")
    plt.grid(True)
    plt.tight_layout()
    plt.show()


In [None]:
def plot_q_eval_curve(eval_history, label):
    steps = [s for s, _, _ in eval_history]
    means = [m for _, m, _ in eval_history]

    plt.plot(steps, means, label=label)
    plt.xscale("log")
    plt.xlabel("Time step")
    plt.ylabel("Normalized return")
    plt.grid(True)
    plt.legend()


In [None]:
def smooth_curve(y, window=5):
    if len(y) < window:
        return y
    return np.convolve(y, np.ones(window)/window, mode="valid")


In [None]:
def plot_q_eval_curves(curves):
    plt.figure(figsize=(6, 4))

    for eval_hist, label in curves:
        steps = np.array([s for s, _, _ in eval_hist])
        means = np.array([m for _, m, _ in eval_hist])

        smooth_means = smooth_curve(means, window=30)
        smooth_steps = steps[len(steps) - len(smooth_means):]
        plt.plot(smooth_steps, smooth_means, label=label)

    plt.xscale("log")
    plt.xlabel("Time step")
    plt.ylabel("Normalized return")
    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.show()


In [None]:
# ========== Run and Plot ========== #
env = CartPoleSimulator()

_, _, eval_hist, cos_tr = train_q_learning(env, N_T=10000)

# Paper-style learning curve
plt.figure(figsize=(6,4))
steps = [s for s, _, _ in eval_hist]
means = [m for _, m, _ in eval_hist]
plt.plot(steps, means)
plt.xscale("log")
plt.xlabel("Time step")
plt.ylabel("Normalized return")
plt.grid(True)
plt.tight_layout()
plt.show()

# Paper-style cos(theta) plot
plot_best_cos_theta(cos_tr)



In [None]:
env = CartPoleSimulator()

_, _, eval_hist5, cos_tr = train_q_learning(env, N_T=100000)

# Paper-style learning curve
plt.figure(figsize=(6,4))
steps = [s for s, _, _ in eval_hist5]
means = [m for _, m, _ in eval_hist5]
plt.plot(steps, means)
plt.xscale("log")
plt.xlabel("Time step")
plt.ylabel("Normalized return")
plt.grid(True)
plt.tight_layout()
plt.show()

# Paper-style cos(theta) plot
# plot_cos_theta_q(env, Q)
plot_best_cos_theta(cos_tr)

In [None]:
env = CartPoleSimulator()

_, _, eval_hist6, cos_tr = train_q_learning(env, N_T=1000000)

# Paper-style learning curve
plt.figure(figsize=(6,4))
steps = [s for s, _, _ in eval_hist6]
means = [m for _, m, _ in eval_hist6]
plt.plot(steps, means)
plt.xscale("log")
plt.xlabel("Time step")
plt.ylabel("Normalized return")
plt.grid(True)
plt.tight_layout()
plt.show()

# Paper-style cos(theta) plot
plot_best_cos_theta(cos_tr)

In [None]:
curves = [
    (eval_hist,  r"Q-learning ($N_T = 10^4$)"),
    (eval_hist5, r"Q-learning ($N_T = 10^5$)"),
    (eval_hist6, r"Q-learning ($N_T = 10^6$)")
]

plot_q_eval_curves(curves)


In [None]:
env = CartPoleSimulator()

_, _, eval_hist67, cos_tr2 = train_q_learning(env, N_T=1000000)

# Paper-style learning curve
plt.figure(figsize=(6,4))
steps = [s for s, _, _ in eval_hist67]
means = [m for _, m, _ in eval_hist67]
plt.plot(steps, means)
plt.xscale("log")
plt.xlabel("Time step")
plt.ylabel("Normalized return")
plt.grid(True)
plt.tight_layout()
plt.show()

# Paper-style cos(theta) plot
# plot_cos_theta_q(env, Q)
plot_best_cos_theta(cos_tr2)

In [None]:
plot_q_eval_curves(curves)

In [None]:
curves = [
    (eval_hist,  r"Q-learning ($N_T = 10^4$)"),
    (eval_hist5, r"Q-learning ($N_T = 10^5$)"),
    (eval_hist67, r"Q-learning ($N_T = 10^6$)")
]

plot_q_eval_curves(curves)


## Step 4: DQN

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random


In [None]:
# ========== DQN hyperparameters ========== #
LR = 3e-4
GAMMA = 0.995
EPSILON = 0.178
BUFFER_SIZE = 50_000
BATCH_SIZE = 1024
TARGET_UPDATE_INTERVAL = 1000  # time steps
EVAL_EVERY_STEPS = 5000        # time steps
MAX_GLOBAL_STEPS = 150_000   # total environment steps (paper-style)

U0 = 2.4                 # [V] constant voltage magnitude

actions = np.array([-U0, 0.0, U0])
N_ACTIONS = len(actions)


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [None]:
def reset_with_ic(env, theta0, theta_dot0=0.0, x0_=0.0, x_dot0=0.0):
  # function to set the true internal state and return the initial observation
  env.theta = theta0
  env.theta_dot = theta_dot0
  env.x = x0
  env.x_dot = x_dot0

  return env._make_observation()

In [None]:
# ========== Network to use ========== #

class QNetwork(nn.Module):
    def __init__(self, input_dim=5, hidden_dim=256, output_dim=3):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, output_dim)  # linear output for Q-values
        )

    def forward(self, x):
        return self.net(x)


In [None]:
# ========== Replay Buffer with 50000 transitions ========== #

class ReplayBuffer:
    def __init__(self, capacity=50_000):
        self.buffer = deque(maxlen=capacity)

    def push(self, s, a, r, s2, done):
        # store as numpy arrays / python primitives
        self.buffer.append((s, a, r, s2, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        s, a, r, s2, done = map(np.array, zip(*batch))

        # Convert to torch tensors
        s = torch.tensor(s, dtype=torch.float32)
        a = torch.tensor(a, dtype=torch.int64).unsqueeze(1)     # (B,1)
        r = torch.tensor(r, dtype=torch.float32).unsqueeze(1)   # (B,1)
        s2 = torch.tensor(s2, dtype=torch.float32)
        done = torch.tensor(done, dtype=torch.float32).unsqueeze(1)  # (B,1)
        return s, a, r, s2, done

    def __len__(self):
        return len(self.buffer)


In [None]:
# ========== Action Mapping ========== #

actions = np.array([-U0, 0.0, U0], dtype=np.float32)

def select_action_dqn(q_net, obs, epsilon=EPSILON):
    # obs: numpy array shape (5,)
    if np.random.rand() < epsilon:
        return np.random.randint(3)
    with torch.no_grad():
        x = torch.tensor(obs, dtype=torch.float32, device=device).unsqueeze(0)
        q = q_net(x)  # (1,3)
        return int(torch.argmax(q, dim=1).item())


In [None]:
# ========== One DQN step ========== #

huber = nn.SmoothL1Loss()  # Huber loss (PyTorch SmoothL1Loss)

def train_step(q_net, target_net, optimizer, replay_buffer):
    if len(replay_buffer) < BATCH_SIZE:
        return None

    s, a, r, s2, done = replay_buffer.sample(BATCH_SIZE)
    s = s.to(device); a = a.to(device); r = r.to(device); s2 = s2.to(device); done = done.to(device)

    # Q(s,a) from online network
    q_values = q_net(s).gather(1, a)  # (B,1)

    # target: r + gamma * (1-done) * max_a' Q_target(s',a')
    with torch.no_grad():
        q_next = target_net(s2).max(dim=1, keepdim=True)[0]  # (B,1)
        y = r + GAMMA * (1.0 - done) * q_next

    loss = huber(q_values, y)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    return float(loss.item())


In [None]:
# ========== Evaluation Routine ========== #

def evaluate_policy(env, q_net, n_eval=10):
    q_net.eval()
    returns = []

    for _ in range(n_eval):
        theta0 = np.deg2rad(np.random.uniform(-10.0, 10.0))
        obs = reset_with_ic(env, theta0, 0.0, 0.0, 0.0)

        total = 0.0
        done = False
        step_count = 0

        while not done:
            # greedy action
            with torch.no_grad():
                x = torch.tensor(obs, dtype=torch.float32, device=device).unsqueeze(0)
                a_idx = int(torch.argmax(q_net(x), dim=1).item())
            U = float(actions[a_idx])

            obs2 = env.step(U)

            r = compute_reward(env.theta, env.x)
            done, terminal_penalty = check_termination(env.x, env.theta_dot, step_count)
            r += terminal_penalty
            total += r

            obs = obs2
            step_count += 1

        returns.append(total / max_steps)  # normalized return

    q_net.train()
    return float(np.mean(returns)), float(np.std(returns))


In [None]:
# ========== Full Training (STEP-LIMITED) ========== #

def train_dqn(env):
    q_net = QNetwork().to(device)
    target_net = QNetwork().to(device)
    target_net.load_state_dict(q_net.state_dict())
    target_net.eval()

    optimizer = optim.Adam(q_net.parameters(), lr=LR)
    replay = ReplayBuffer(capacity=BUFFER_SIZE)

    global_step = 0
    eval_history = []          # (global_step, mean_norm_return, std)
    episode_returns = []

    ep = 0
    while global_step < MAX_GLOBAL_STEPS:
        obs = env.reset()
        done = False
        step_count = 0
        ep_steps = 0
        ep_return = 0.0

        # ----- Run one episode (collect transitions) -----
        while not done and global_step < MAX_GLOBAL_STEPS:
            a_idx = select_action_dqn(q_net, obs, epsilon=EPSILON)
            U = float(actions[a_idx])

            obs2 = env.step(U)

            r = compute_reward(env.theta, env.x)
            done, terminal_penalty = check_termination(
                env.x, env.theta_dot, step_count
            )
            r += terminal_penalty

            replay.push(obs, a_idx, r, obs2, done)

            obs = obs2
            step_count += 1
            ep_steps += 1
            global_step += 1
            ep_return += r

            # ----- Target network update -----
            if global_step % TARGET_UPDATE_INTERVAL == 0:
                target_net.load_state_dict(q_net.state_dict())

            # ----- Evaluation -----
            if global_step % EVAL_EVERY_STEPS == 0:
                mean_ret, std_ret = evaluate_policy(env, q_net, n_eval=10)
                eval_history.append((global_step, mean_ret, std_ret))
                print(
                    f"[Eval @ step {global_step}] "
                    f"mean norm return = {mean_ret:.3f} ± {std_ret:.3f}"
                )

        # ----- Training after episode -----
        losses = []
        for _ in range(ep_steps):
            loss = train_step(q_net, target_net, optimizer, replay)
            if loss is not None:
                losses.append(loss)

        # ----- Normalize & store return -----
        norm_ret = ep_return / max_steps
        episode_returns.append(norm_ret)

        # ----- Logging with MA(30) -----
        if ep % 50 == 0:
            ma30 = moving_average(episode_returns, window=30)
            ma30_str = f"{ma30:.3f}" if ma30 is not None else "N/A"
            avg_loss = float(np.mean(losses)) if losses else None

            print(
                f"Episode {ep:6d} | "
                f"steps={ep_steps:4d} | "
                f"norm return={norm_ret:.3f} | "
                f"MA(30)={ma30_str} | "
                f"loss={avg_loss}"
            )

        ep += 1

    print(f"Training stopped at global_step = {global_step}, episodes = {ep}")
    return q_net, eval_history


In [None]:
def run_dqn_inference(env, q_net, steps=800):
    obs = env.reset_downward()   # same IC as paper
    
    traj = {
        "theta": [],
        "theta_dot": [],
        "x": [],
        "x_dot": [],
        "U": []
    }

    for t in range(steps):
        # Log state
        traj["theta"].append(env.theta)
        traj["theta_dot"].append(env.theta_dot)
        traj["x"].append(env.x)
        traj["x_dot"].append(env.x_dot)

        # Greedy action
        with torch.no_grad():
            x = torch.tensor(obs, dtype=torch.float32, device=device).unsqueeze(0)
            q_vals = q_net(x)
            action_idx = torch.argmax(q_vals).item()

        U = float(actions[action_idx])
        traj["U"].append(U)

        obs = env.step(U)

    return traj


In [None]:
def plot_x_time(traj, label):
    plt.plot(traj["x"], label=label)
    plt.xlabel("Time step")
    plt.ylabel("x [m]")
    plt.grid(True)


In [None]:
def plot_theta_time(traj, label):
    plt.plot(traj["theta"], label=label)
    plt.xlabel("Time step")
    plt.ylabel(r"$\theta$ [rad]")
    plt.grid(True)


In [None]:
def plot_cart_phase(traj, label):
    plt.plot(traj["x"], traj["x_dot"], label=label)
    plt.xlabel("x [m]")
    plt.ylabel(r"$\dot{x}$ [m/s]")
    plt.grid(True)


In [None]:
def plot_pendulum_phase(traj, label):
    plt.plot(traj["theta"], traj["theta_dot"], label=label)
    plt.xlabel(r"$\theta$ [rad]")
    plt.ylabel(r"$\dot{\theta}$ [rad/s]")
    plt.grid(True)


In [None]:
def plot_voltage(traj, label, steps=200):
    plt.step(range(steps), traj["U"][:steps], where="post", label=label)
    plt.xlabel("Time step")
    plt.ylabel("Applied voltage [V]")
    plt.grid(True)


In [None]:
env = CartPoleSimulator()
q_net, eval_hist = train_dqn(env)

In [None]:
steps = [s for s, _, _ in eval_hist]
means = [m for _, m, _ in eval_hist]

plt.figure(figsize=(6,4))
plt.plot(steps, means)
plt.xscale("log")
plt.xlabel("Time step")
plt.ylabel("Normalized return")
plt.grid(True)
plt.tight_layout()
plt.show()


In [None]:
traj = run_dqn_inference(env, q_net, steps=800)


In [None]:
env2 = CartPoleSimulator()
q_net2, eval_hist2 = train_dqn(env2)

In [None]:
traj2 = run_dqn_inference(env2, q_net2, steps=800)

In [None]:
plot_x_time(traj, "DQN")




In [None]:
plot_cart_phase(traj, "DQN")

In [None]:
plot_theta_time(traj, "DQN")


In [None]:
plot_pendulum_phase(traj, "DQN")


In [None]:
plot_voltage(traj, "DQN")

In [None]:
steps = [s for s, _, _ in eval_hist2]
means = [m for _, m, _ in eval_hist2]

plt.figure(figsize=(6,4))
plt.plot(steps, means)
plt.xscale("log")
plt.xlabel("Time step")
plt.ylabel("Normalized return")
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
def plot_x_time_two(traj_green, traj_blue, label_green="7.1 V", label_blue="2.4 V"):
    plt.figure(figsize=(6,4))
    plt.plot(traj_green["x"], color="green", label=label_green)
    plt.plot(traj_blue["x"], color="blue", label=label_blue)
    plt.xlabel("Time step")
    plt.ylabel("x [m]")
    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.show()


In [None]:
def plot_cart_phase_two(traj_green, traj_blue, label_green="7.1 V", label_blue="2.4 V"):
    plt.figure(figsize=(6,4))
    plt.plot(traj_green["x"], traj_green["x_dot"], color="green", label=label_green)
    plt.plot(traj_blue["x"], traj_blue["x_dot"], color="blue", label=label_blue)
    plt.xlabel("x [m]")
    plt.ylabel(r"$\dot{x}$ [m/s]")
    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.show()


In [None]:
def plot_theta_time_two(traj_green, traj_blue, label_green="7.1 V", label_blue="2.4 V"):
    plt.figure(figsize=(6,4))
    plt.plot(traj_green["theta"], color="green", label=label_green)
    plt.plot(traj_blue["theta"], color="blue", label=label_blue)
    plt.xlabel("Time step")
    plt.ylabel(r"$\theta$ [rad]")
    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.show()


In [None]:
def plot_pendulum_phase_two(traj_green, traj_blue, label_green="7.1 V", label_blue="2.4 V"):
    plt.figure(figsize=(6,4))
    plt.plot(traj_green["theta"], traj_green["theta_dot"], color="green", label=label_green)
    plt.plot(traj_blue["theta"], traj_blue["theta_dot"], color="blue", label=label_blue)
    plt.xlabel(r"$\theta$ [rad]")
    plt.ylabel(r"$\dot{\theta}$ [rad/s]")
    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.show()


In [None]:
def plot_voltage_two(traj_green, traj_blue, steps=200, label_green="7.1 V", label_blue="2.4 V"):
    plt.figure(figsize=(6,4))
    plt.step(range(steps), traj_green["U"][:steps], where="post",
             color="green", label=label_green)
    plt.step(range(steps), traj_blue["U"][:steps], where="post",
             color="blue", label=label_blue)
    plt.xlabel("Time step")
    plt.ylabel("Applied voltage [V]")
    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.show()


In [None]:
plot_x_time_two(traj, traj2)

In [None]:
plot_cart_phase_two(traj, traj2)

In [None]:
plot_theta_time_two(traj, traj2)

In [None]:
plot_pendulum_phase_two(traj, traj2)

In [None]:
plot_voltage_two(traj, traj2)