
# Inferring an RL API from a **Black-Box** MuJoCo Environment

**Goal:** Using only the **MuJoCo Python API**, you will *infer* a standard RL interface:
- `reset()` → returns an initial **state** and an **info** dict
- `step(action)` → returns **next_state, reward, terminated, truncated, info**

Along the way you will distinguish **state vs next_state**, log a **trajectory**, and understand **`terminated` vs `truncated`**.

> Treat the environment as a **sealed box**. You can inspect shapes, bounds, samples, and results,
> but you should not rely on its internal implementation.
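
The interface described above can be written down as a structural type. This is only a sketch: `RLEnv` and `ConstantEnv` are hypothetical names introduced here for illustration, and the type hints are assumptions about what the black box returns, to be confirmed by the probes later in the notebook.

```python
from typing import Any, Dict, Optional, Protocol, Tuple, runtime_checkable

import numpy as np


@runtime_checkable
class RLEnv(Protocol):
    """Hypothetical description of the two-method interface we want to infer."""

    def reset(
        self, seed: Optional[int] = None, options: Optional[dict] = None
    ) -> Tuple[np.ndarray, Dict[str, Any]]:
        ...

    def step(
        self, action: np.ndarray
    ) -> Tuple[np.ndarray, float, bool, bool, Dict[str, Any]]:
        ...


class ConstantEnv:
    """Trivial stand-in (not the MuJoCo env) that happens to satisfy the interface."""

    def reset(self, seed=None, options=None):
        return np.zeros(2), {}

    def step(self, action):
        # next_state, reward, terminated, truncated, info
        return np.zeros(2), 0.0, False, False, {}


# isinstance() on a runtime-checkable Protocol only checks that the methods
# exist, not that their return values have the right shape or type.
conforms = isinstance(ConstantEnv(), RLEnv)
print("ConstantEnv exposes reset/step:", conforms)
```

Because the check is structural, any object with `reset` and `step` methods passes it; the return values still have to be probed by hand.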


In [2]:

# --- Setup ---
# If you don't have mujoco installed, uncomment:
# !pip install -U mujoco matplotlib numpy pandas

import os, sys, math, random, tempfile, numpy as np #, pandas as pd
import matplotlib.pyplot as plt

import mujoco  # official MuJoCo Python bindings

print("Python:", sys.version.split()[0])
print("MuJoCo version:", mujoco.__version__)
print("Numpy:", np.__version__)
# print("Pandas:", pd.__version__)
print("Matplotlib:", plt.matplotlib.__version__)


Python: 3.12.3
MuJoCo version: 3.3.6
Numpy: 1.26.4
Matplotlib: 3.8.4



## 1) Our **black-box** MuJoCo world

We embed a tiny model: a **cart** that slides along the x-axis, driven by a single motor actuator.
You don’t need to understand the XML; it is only there so that we have something to drive through MuJoCo.

> In a real assignment you could swap in any MuJoCo XML you like. The *API* you infer stays the same.


In [3]:

# Write the embedded XML to a temp file so MuJoCo can load it.
import tempfile, pathlib
xml_text = r"""<mujoco model="cart1d">
  <compiler angle="degree" autolimits="true"/>
  <option timestep="0.01" gravity="0 0 -9.81"/>
  <default>
    <geom size="0.05" rgba="0.7 0.7 0.7 1"/>
  </default>

  <worldbody>
    <geom name="ground" type="plane" size="10 10 0.1" rgba="0.2 0.2 0.2 1" pos="0 0 0"/>
    <body name="rail" pos="0 0 0.1">
      <geom type="capsule" fromto="-3 0 0.1 3 0 0.1" size="0.02" rgba="0.5 0.5 0.5 1"/>
    </body>
    <body name="cart" pos="0 0 0.2">
      <joint name="x" type="slide" axis="1 0 0" limited="true" range="-3 3" damping="0.5"/>
      <geom type="box" size="0.1 0.05 0.05" rgba="0.1 0.4 0.8 1"/>
    </body>
  </worldbody>

  <actuator>
    <motor name="x_motor" joint="x" gear="1" ctrllimited="true" ctrlrange="-3 3"/>
  </actuator>
</mujoco>"""
tmp_dir = tempfile.mkdtemp(prefix="mujoco_blackbox_")
xml_path = str(pathlib.Path(tmp_dir) / "cart1d.xml")
with open(xml_path, "w") as f:
    f.write(xml_text)
print("Model XML written to:", xml_path)


Model XML written to: /tmp/mujoco_blackbox_p_yvxx6a/cart1d.xml



## 2) Black-box wrapper (opaque internals)

The class below defines `reset()` and `step()` **directly** on top of MuJoCo physics.
You should focus on using its API, not its internal rules.


In [4]:

import numpy as np
import mujoco

class MuJoCoBlackBoxEnv:
    def __init__(self, xml_path, frame_skip=5, max_episode_steps=200):
        self.model = mujoco.MjModel.from_xml_path(xml_path)
        self.data = mujoco.MjData(self.model)
        self.rng = np.random.default_rng()
        self.frame_skip = int(frame_skip)
        self.max_episode_steps = int(max_episode_steps)
        self._t = 0

        self.nq = self.model.nq
        self.nv = self.model.nv
        self.nu = self.model.nu

        # Take bounds from MuJoCo per actuator; fall back to [-1, 1] for any
        # actuator that has no control limit (its ctrlrange is not meaningful).
        limited = self.model.actuator_ctrllimited.astype(bool)
        low = np.where(limited, self.model.actuator_ctrlrange[:, 0], -1.0)
        high = np.where(limited, self.model.actuator_ctrlrange[:, 1], 1.0)

        self.action_low  = low.astype(float)
        self.action_high = high.astype(float)

        self.state_shape = (self.nq + self.nv,)

    def reset(self, seed=None, options=None):
        if seed is not None:
            self.rng = np.random.default_rng(seed)

        mujoco.mj_resetData(self.model, self.data)
        self._t = 0

        qpos = 0.01 * self.rng.standard_normal(self.nq)
        qvel = 0.01 * self.rng.standard_normal(self.nv)

        if isinstance(options, dict):
            if "qpos" in options and np.size(options["qpos"]) == self.nq:
                qpos = np.asarray(options["qpos"], dtype=float)
            if "qvel" in options and np.size(options["qvel"]) == self.nv:
                qvel = np.asarray(options["qvel"], dtype=float)

        self.data.qpos[:] = qpos
        self.data.qvel[:] = qvel
        self.data.ctrl[:] = 0.0

        mujoco.mj_forward(self.model, self.data)

        return self._get_state(), {"t": self._t}

    def step(self, action):
        a = np.asarray(action, dtype=float).reshape(self.nu)
        a = np.clip(a, self.action_low, self.action_high)
        self.data.ctrl[:] = a

        for _ in range(self.frame_skip):
            mujoco.mj_step(self.model, self.data)

        self._t += 1

        reward = self._compute_reward(a)
        terminated = self._check_terminated()
        truncated = self._t >= self.max_episode_steps

        info = {"t": self._t}
        return self._get_state(), float(reward), bool(terminated), bool(truncated), info

    def close(self):
        pass

    def sample_action(self):
        u = self.rng.uniform(0.0, 1.0, size=self.nu)
        return self.action_low + u * (self.action_high - self.action_low)

    def action_bounds(self):
        return self.action_low.copy(), self.action_high.copy()

    def _get_state(self):
        return np.concatenate([self.data.qpos, self.data.qvel], dtype=float)

    def _compute_reward(self, action):
        x = float(self.data.qpos[0]) if self.nq > 0 else 0.0
        ctrl_pen = 0.01 * float(np.dot(action, action))
        return -(x*x) - ctrl_pen

    def _check_terminated(self):
        x = float(self.data.qpos[0]) if self.nq > 0 else 0.0
        return abs(x) > 2.5
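
Since `step()` calls `mj_step` `frame_skip` times, one agent step covers `frame_skip` physics steps. The arithmetic below is a small sketch using the values that appear in this notebook (`timestep="0.01"` in the XML, `frame_skip=5`, and the default `max_episode_steps=200`); the helper names are hypothetical.

```python
def control_dt(timestep: float, frame_skip: int) -> float:
    """Simulated seconds that elapse during one call to step()."""
    return timestep * frame_skip


def episode_horizon_seconds(timestep: float, frame_skip: int, max_episode_steps: int) -> float:
    """Simulated seconds before the step limit triggers truncation."""
    return control_dt(timestep, frame_skip) * max_episode_steps


dt = control_dt(0.01, 5)
horizon = episode_horizon_seconds(0.01, 5, 200)
print(f"one step() = {dt:.2f} s of simulated time")
print(f"step limit reached after {horizon:.1f} s of simulated time")
```

In the notebook, the same inputs should be readable from the wrapper as `env.model.opt.timestep`, `env.frame_skip`, and `env.max_episode_steps`.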



## 3) Instantiate and probe shapes/bounds


In [5]:

env = MuJoCoBlackBoxEnv(xml_path, frame_skip=5, max_episode_steps=100)

state, info = env.reset(seed=123)
print("Initial state shape:", state.shape, "dtype:", state.dtype)
act_low, act_high = env.action_bounds()
print("Action bounds:", act_low, act_high)
print("Info keys:", list(info.keys()))
print("Sample state (first 4):", state[:4])


Initial state shape: (2,) dtype: float64
Action bounds: [-3.] [3.]
Info keys: ['t']
Sample state (first 4): [-0.00989121 -0.00367787]



## 4) Single-step probe


In [6]:

action = env.sample_action()
next_state, reward, terminated, truncated, info = env.step(action)
print("Action shape:", action.shape)
print("Next state shape:", next_state.shape)
print("Reward:", reward, type(reward))
print("terminated:", terminated, "truncated:", truncated)
print("Info:", info)


Action shape: (1,)
Next state shape: (2,)
Reward: -0.028249524384621895 <class 'float'>
terminated: False truncated: False
Info: {'t': 1}
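
The manual checks above can be folded into one reusable probe. This is a sketch, not part of the wrapper: `check_step_output` is a hypothetical helper, and the expected types (NumPy array, `float`, `bool`, `bool`, `dict`) are taken from the printed output of this cell.

```python
import numpy as np


def check_step_output(result, state_shape):
    """Return a list of problems with a step() result; an empty list means it looks fine."""
    if not (isinstance(result, tuple) and len(result) == 5):
        return ["step() should return a 5-tuple"]

    next_state, reward, terminated, truncated, info = result
    problems = []
    if not isinstance(next_state, np.ndarray) or next_state.shape != tuple(state_shape):
        problems.append(f"next_state should be an array of shape {tuple(state_shape)}")
    if not isinstance(reward, float):
        problems.append("reward should be a float")
    if not isinstance(terminated, bool) or not isinstance(truncated, bool):
        problems.append("terminated and truncated should be plain bools")
    if not isinstance(info, dict):
        problems.append("info should be a dict")
    return problems


# A result shaped like the one printed above passes the probe.
good = (np.zeros(2), -0.028, False, False, {"t": 1})
# A 4-tuple with a single `done` flag does not.
old_style = (np.zeros(2), -0.028, False, {"t": 1})

print(check_step_output(good, (2,)))
print(check_step_output(old_style, (2,)))
```

With the environment from this notebook, the call would look like `check_step_output(env.step(env.sample_action()), env.state_shape)`.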



## 5) Reset, seeding, and options


In [7]:

s1, _ = env.reset(seed=7)
s2, _ = env.reset(seed=7)
s3, _ = env.reset(seed=8)

print("Same seed gives the same state (7 vs 7):", np.allclose(s1, s2))
print("Different seeds give different states (7 vs 8):", not np.allclose(s1, s3))

custom = {"qpos": np.array([2.0]), "qvel": np.array([0.0])}
s_custom, info_custom = env.reset(options=custom)
print("Custom reset accepted; state[0] (x) ~", float(s_custom[0]))


Same seed gives the same state (7 vs 7): True
Different seeds give different states (7 vs 8): True
Custom reset accepted; state[0] (x) ~ 2.0
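
The seed behaviour follows from how the wrapper's `reset()` draws its noise: it builds a fresh `np.random.default_rng(seed)` and scales standard normal samples by 0.01 for `qpos` and `qvel`. The sketch below mirrors only that draw, without MuJoCo; `initial_state` is a hypothetical helper, and the defaults `nq=1`, `nv=1` are assumptions matching the cart model.

```python
import numpy as np


def initial_state(seed, nq=1, nv=1, scale=0.01):
    """Mimic the random part of reset(): small Gaussian noise on qpos and qvel."""
    rng = np.random.default_rng(seed)
    qpos = scale * rng.standard_normal(nq)
    qvel = scale * rng.standard_normal(nv)
    return np.concatenate([qpos, qvel])


same = np.allclose(initial_state(7), initial_state(7))
different = not np.allclose(initial_state(7), initial_state(8))
print("same seed, same state:", same)
print("different seed, different state:", different)
```

Note that calling `reset()` without a seed keeps drawing from the existing generator, so consecutive unseeded resets produce different initial states.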



## 6) `terminated` vs `truncated`


In [8]:

def run_one_episode_random(env, verbose=True):
    s, info = env.reset(seed=42)
    total_reward = 0.0
    t = 0
    while True:
        a = env.sample_action()
        s_next, r, terminated, truncated, info = env.step(a)
        total_reward += r
        t += 1
        if verbose:
            print(f"t={t:<3} r={r: .3f} term={terminated} trunc={truncated}")
        if terminated or truncated:
            reason = "terminated" if terminated else "truncated"
            if verbose:
                print(f"Episode ended by **{reason}** at t={t}; total_reward={total_reward:.2f}")
            break
        s = s_next
    return t, total_reward

_ = run_one_episode_random(env, verbose=False)
print("Ran a silent random episode.")


Ran a silent random episode.



### Force truncation with a short max length


In [9]:

env_short = MuJoCoBlackBoxEnv(xml_path, frame_skip=5, max_episode_steps=10)
steps, total_reward = run_one_episode_random(env_short, verbose=True)


t=1   r=-0.046 term=False trunc=False
t=2   r=-0.014 term=False trunc=False
t=3   r=-0.059 term=False trunc=False
t=4   r=-0.081 term=False trunc=False
t=5   r=-0.025 term=False trunc=False
t=6   r=-0.029 term=False trunc=False
t=7   r=-0.050 term=False trunc=False
t=8   r=-0.001 term=False trunc=False
t=9   r=-0.006 term=False trunc=False
t=10  r=-0.066 term=False trunc=True
Episode ended by **truncated** at t=10; total_reward=-0.38
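
In this wrapper, `terminated` is raised when the cart passes |x| > 2.5 and `truncated` when the step counter reaches `max_episode_steps`. One common reason to keep the two apart shows up in value-based learning: a terminal state has no future reward, whereas a time limit merely stops the recording. The sketch below assumes a one-step TD target and a value estimate `next_value` supplied by some learner; neither is part of this notebook, and the value `-1.0` is made up for illustration.

```python
def td_target(reward, next_value, terminated, gamma=0.99):
    """One-step TD target: bootstrap from the next state unless it is terminal.

    `truncated` is deliberately not an argument. Hitting the step limit does
    not mean the future reward is zero, so we still bootstrap in that case.
    """
    bootstrap = 0.0 if terminated else gamma * next_value
    return reward + bootstrap


# Same last reward as the truncated episode above, with a hypothetical value estimate.
target_if_terminated = td_target(-0.066, next_value=-1.0, terminated=True)
target_if_truncated = td_target(-0.066, next_value=-1.0, terminated=False)
print("terminated:", target_if_terminated)
print("truncated: ", target_if_truncated)
```

Treating the truncated step as terminal would drop the `gamma * next_value` term and bias the estimate for states near the time limit.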



## 7) Trajectory logging and the alternating sequence (s0, a0, s1, a1, ...)


In [10]:

import pandas as pd  # rollout_random returns a DataFrame, so pandas must be imported

def rollout_random(env, max_steps=200, seed=0):
    s, info = env.reset(seed=seed)
    traj_alt = [np.copy(s)]  # alternating list: s0, a0, s1, a1, ..., sT
    rows = []                # one row per transition
    terminated = truncated = False
    t = 0
    while not (terminated or truncated) and t < max_steps:
        a = env.sample_action()
        s_next, r, terminated, truncated, info = env.step(a)
        traj_alt.append(np.copy(a))
        traj_alt.append(np.copy(s_next))
        rows.append({
            "t": t,
            "reward": float(r),
            "terminated": bool(terminated),
            "truncated": bool(truncated),
            # First component of the state *before* the action, and of the action itself.
            "s0": float(s[0]) if s.size > 0 else np.nan,
            "a0": float(a[0]) if a.size > 0 else np.nan,
        })
        s = s_next
        t += 1
    return traj_alt, pd.DataFrame(rows)

traj_alt, traj_df = rollout_random(env, max_steps=100, seed=123)
print(f"Alternating list length = {len(traj_alt)} (expect 2*T + 1)")
print(traj_df.head())
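
An alternating list `[s0, a0, s1, a1, ..., sT]` has `2*T + 1` entries: `T + 1` states at the even indices and `T` actions at the odd ones. The sketch below shows one way to recover `(state, action, next_state)` transitions from such a list; `split_alternating` is a hypothetical helper and the toy trajectory is made up.

```python
import numpy as np


def split_alternating(traj_alt):
    """Split [s0, a0, s1, ..., sT] into states, actions, and (s, a, s_next) triples."""
    if len(traj_alt) % 2 != 1:
        raise ValueError("an alternating trajectory must have odd length 2*T + 1")
    states = traj_alt[0::2]   # s0, s1, ..., sT
    actions = traj_alt[1::2]  # a0, a1, ..., a_{T-1}
    transitions = [(states[i], actions[i], states[i + 1]) for i in range(len(actions))]
    return states, actions, transitions


# Toy trajectory with T = 2 steps: 2-dimensional states, 1-dimensional actions.
toy = [
    np.array([0.0, 0.0]), np.array([1.0]),
    np.array([0.1, 0.2]), np.array([-1.0]),
    np.array([0.3, 0.1]),
]
states, actions, transitions = split_alternating(toy)
print(len(states), "states,", len(actions), "actions,", len(transitions), "transitions")
```

The `next_state` of one transition is the `state` of the following one, which is the distinction between `state` and `next_state` in `step()`.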

In [None]:

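# `caas_jupyter_tools` is only available in some hosted sandboxes;
# everywhere else, fall back to printing the first rows.
try:
    from caas_jupyter_tools import display_dataframe_to_user
    display_dataframe_to_user("Trajectory (first components)", traj_df)
except Exception:
    print(traj_df.head(10))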



### Plots


In [None]:

plt.figure()
plt.plot(traj_df["t"], traj_df["reward"])
plt.xlabel("t")
plt.ylabel("reward")
plt.title("Reward per step")
plt.show()


In [None]:

plt.figure()
plt.plot(traj_df["t"], traj_df["s0"])
plt.xlabel("t")
plt.ylabel("state[0] (x)")
plt.title("First state component over time")
plt.show()



## 8) Summary of the inferred API

- `state, info = env.reset(seed=..., options=...)`
- `next_state, reward, terminated, truncated, info = env.step(action)`
- `done = terminated or truncated`
- State shape `(nq+nv,)`, action shape `(nu,)` with bounds from MuJoCo control ranges.



## 9) Exercises
1. Sample 1,000 actions and verify they respect the bounds.
2. With a fixed seed and a fixed action sequence, show identical first *k* steps across runs.
3. Increase `max_episode_steps` and intentionally cause termination by pushing the cart beyond |x| > 2.5.
4. Use `options` to start near the boundary; compare episode lengths.
5. Swap `sample_action()` for a proportional controller toward x=0; compare rewards.
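
As a starting point for exercise 5, a proportional controller can be a pure function of the state and the action bounds. This is a sketch under assumptions: `proportional_action` and the gain `kp=2.0` are hypothetical, and `state[0]` is taken to be the cart position x, as in the plots above.

```python
import numpy as np


def proportional_action(state, low, high, kp=2.0):
    """Push the cart toward x = 0 with a force proportional to its position."""
    x = state[0]
    raw = np.array([-kp * x])       # positive x gives a negative (leftward) command
    return np.clip(raw, low, high)  # respect the actuator bounds


print(proportional_action(np.array([1.0, 0.0]), low=-3.0, high=3.0))
print(proportional_action(np.array([-5.0, 0.0]), low=-3.0, high=3.0))
```

In the episode loop, `a = env.sample_action()` would become `a = proportional_action(s, *env.action_bounds())`. The joint has some damping, but a pure proportional term can still oscillate, so adding a term on the velocity `state[1]` is a natural follow-up.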
