## Entorno `CRLPusherEnv`

In [1]:
import numpy as np
from gymnasium.envs.mujoco.pusher_v5 import PusherEnv
from gymnasium.spaces import Box


class CRLPusherEnv(PusherEnv):
    """Pusher with material-driven non-stationarity (dynamics + reward) for SAC.

    Each entry of ``MATERIALS`` bundles a set of physics scales and a set of
    reward parameters. Switching material (``set_material`` or
    ``reset(options={"material": name})``) changes both at once.

    reward = base_reward (PusherEnv reward with the material's weights)
             + progress_weight * (previous - current object-to-goal distance)
             - stagnation_penalty (tip close to the object but no progress)
             + success_bonus (object within success_threshold of the goal)

    Observation = PusherEnv observation (23-dim) + one_hot(material)
    (one entry per material, 2 with the default table) = 25-dim.
    """

    MATERIALS = {
        "rigid": dict(
            physics=dict(
                damping_scale=1.0,
                frictionloss_scale=1.0,
                object_mass_scale=1.2,
                object_sliding_friction_scale=1.1,
            ),
            reward=dict(
                success_bonus=150.0,
                success_threshold=0.07,
                dist_weight=1.25,
                near_weight=0.50,
                control_weight=0.0,
                progress_weight=50.0,
                stagnation_penalty=0.30,
                stagnation_contact_threshold=0.10,
                stagnation_progress_tolerance=3e-4,
            ),
        ),
        "delicate": dict(
            physics=dict(
                damping_scale=0.7,
                frictionloss_scale=0.5,
                object_mass_scale=0.5,
                object_sliding_friction_scale=0.4,
            ),
            reward=dict(
                success_bonus=150.0,        # previously 120
                success_threshold=0.07,     # previously 0.05
                dist_weight=1.15,
                near_weight=0.40,           # previously 0.25
                control_weight=0.0,         # previously 0.12
                progress_weight=55.0,       # previously 35
                stagnation_penalty=0.35,    # previously 0.25
                stagnation_contact_threshold=0.10,  # previously 0.06
                stagnation_progress_tolerance=3e-4,
            ),
        ),
    }

    def __init__(self, **kwargs):
        """Build the base Pusher, snapshot its physics and extend the obs space.

        ``kwargs`` are forwarded unchanged to ``PusherEnv.__init__``.
        """
        super().__init__(**kwargs)

        self.material = "rigid"
        self.obj_body_id = self.model.body("object").id
        self.obj_geom_id = self.model.body_geomadr[self.obj_body_id]

        # Pristine physics parameters; set_physics always scales from these so
        # repeated material switches never compound.
        self._base_dof_damping = self.model.dof_damping.copy()
        self._base_dof_frictionloss = self.model.dof_frictionloss.copy()
        self._base_body_mass = self.model.body_mass.copy()
        self._base_geom_friction = self.model.geom_friction.copy()

        # Per-episode bookkeeping. The reward parameters themselves are
        # installed by set_material("rigid") at the end of this constructor.
        self.episode_steps = 0
        self._prev_obj_goal_dist = np.nan

        # Extend the observation with one_hot(material): one slot per material.
        n_materials = len(self.MATERIALS)
        base_low = self.observation_space.low
        base_high = self.observation_space.high
        self.observation_space = Box(
            low=np.concatenate(
                [base_low, np.zeros(n_materials, dtype=base_low.dtype)]
            ),
            high=np.concatenate(
                [base_high, np.ones(n_materials, dtype=base_high.dtype)]
            ),
            dtype=base_low.dtype,
        )
        self.set_material("rigid")

    # Observations

    def _material_one_hot(self):
        """Return the one-hot encoding of the active material.

        The slot order follows the insertion order of ``MATERIALS``.
        """
        idx = list(self.MATERIALS).index(self.material)
        oh = np.zeros(len(self.MATERIALS), dtype=self.observation_space.dtype)
        oh[idx] = 1.0
        return oh

    def _get_obs(self):
        """Return the base Pusher observation followed by the material one-hot."""
        return np.concatenate(
            [super()._get_obs(), self._material_one_hot()]
        ).astype(self.observation_space.dtype)

    # Distances

    def _get_obj_goal_distance(self):
        """Euclidean distance between the "object" and "goal" body positions."""
        return float(np.linalg.norm(
            self.get_body_com("object") - self.get_body_com("goal")
        ))

    def _get_tip_obj_distance(self):
        """Euclidean distance between the "tips_arm" and "object" body positions."""
        return float(np.linalg.norm(
            self.get_body_com("tips_arm") - self.get_body_com("object")
        ))

    # Physics

    def set_physics(self, *, damping_scale=1.0, frictionloss_scale=1.0,
                    object_mass_scale=1.0, object_sliding_friction_scale=1.0):
        """Rescale the MuJoCo model parameters relative to their base values.

        ``damping_scale`` and ``frictionloss_scale`` apply to every DoF.
        ``object_mass_scale`` applies to the "object" body only, and
        ``object_sliding_friction_scale`` only to column 0 of that body's
        first geom friction row.
        """
        self.model.dof_damping[:] = self._base_dof_damping * damping_scale
        self.model.dof_frictionloss[:] = self._base_dof_frictionloss * frictionloss_scale

        # Restore every mass first so only the object ends up scaled.
        # NOTE(review): body_inertia is left untouched - confirm that scaling
        # the mass alone is the intended model of a heavier/lighter object.
        self.model.body_mass[:] = self._base_body_mass
        self.model.body_mass[self.obj_body_id] = (
            self._base_body_mass[self.obj_body_id] * object_mass_scale
        )

        self.model.geom_friction[:] = self._base_geom_friction
        self.model.geom_friction[self.obj_geom_id, 0] = (
            self._base_geom_friction[self.obj_geom_id, 0] * object_sliding_friction_scale
        )

    def set_material(self, name: str):
        """Activate material ``name``: apply its physics and reward parameters.

        Raises:
            ValueError: if ``name`` is not a key of ``MATERIALS``.
        """
        if name not in self.MATERIALS:
            raise ValueError(f"Material '{name}' no existe: {list(self.MATERIALS)}")

        cfg = self.MATERIALS[name]
        self.set_physics(**cfg["physics"])

        rw = cfg["reward"]
        self.success_bonus = rw["success_bonus"]
        self.success_threshold = rw["success_threshold"]
        self.progress_weight = rw["progress_weight"]
        self.stagnation_penalty = rw["stagnation_penalty"]
        self.stagnation_contact_threshold = rw["stagnation_contact_threshold"]
        self.stagnation_progress_tolerance = rw["stagnation_progress_tolerance"]

        # Gymnasium's PusherEnv (v5) computes its reward from the
        # underscore-prefixed weights, so these are the attributes that must
        # be overwritten for the material weights to affect base_reward.
        self._reward_dist_weight = rw["dist_weight"]
        self._reward_near_weight = rw["near_weight"]
        self._reward_control_weight = rw["control_weight"]

        # Public mirrors, kept for callers that read the un-prefixed names.
        self.reward_dist_weight = rw["dist_weight"]
        self.reward_near_weight = rw["near_weight"]
        self.reward_control_weight = rw["control_weight"]

        self.material = name

    # Reset

    def reset(self, *, seed=None, options=None):
        """Reset the episode, optionally switching material first.

        ``options["material"]``, when present, is passed to ``set_material``
        before the simulation is reset. The returned info carries
        ``material``, ``distance_to_goal`` and ``tip_to_object``.
        """
        if options and "material" in options:
            self.set_material(options["material"])

        obs, info = super().reset(seed=seed, options=options)
        self.episode_steps = 0
        self._prev_obj_goal_dist = self._get_obj_goal_distance()

        obs = self._get_obs()
        info["material"] = self.material
        info["distance_to_goal"] = self._prev_obj_goal_dist
        info["tip_to_object"] = self._get_tip_obj_distance()
        return obs, info

    # Step

    def step(self, action):
        """Advance one step and add the progress/stagnation/success shaping.

        Returns the usual ``(obs, reward, terminated, truncated, info)``
        tuple; ``terminated`` is forced to ``True`` when the object is within
        ``success_threshold`` of the goal.
        """
        obs, base_reward, terminated, truncated, info = super().step(action)
        self.episode_steps += 1

        obj_goal_dist = self._get_obj_goal_distance()
        tip_obj_dist = self._get_tip_obj_distance()

        # Before the first reset the previous distance is NaN; treat that as
        # zero progress instead of propagating NaN into the reward.
        if np.isfinite(self._prev_obj_goal_dist):
            prev_dist = self._prev_obj_goal_dist
        else:
            prev_dist = obj_goal_dist
        progress = prev_dist - obj_goal_dist

        is_success = obj_goal_dist <= self.success_threshold

        # Stagnation: the tip is near the object, the goal is not reached,
        # and the object did not get meaningfully closer to the goal.
        is_stagnating = (
            tip_obj_dist <= self.stagnation_contact_threshold
            and obj_goal_dist > self.success_threshold
            and progress <= self.stagnation_progress_tolerance
        )
        stagnation_cost = self.stagnation_penalty if is_stagnating else 0.0

        reward_progress = self.progress_weight * progress
        reward = base_reward + reward_progress - stagnation_cost
        if is_success:
            reward += self.success_bonus
            terminated = True

        self._prev_obj_goal_dist = obj_goal_dist

        obs = self._get_obs()
        info.update({
            "material": self.material,
            "distance_to_goal": obj_goal_dist,
            "tip_to_object": tip_obj_dist,
            "distance_progress": float(progress),
            "is_success": float(is_success),
            "episode_steps": self.episode_steps,
            "reward_base": float(base_reward),
            "reward_progress": float(reward_progress),
            "stagnation_cost": float(stagnation_cost),
        })
        return obs, reward, terminated, truncated, info

## Hiperparámetros SAC

In [None]:
%matplotlib inline

import os, shutil, time
import numpy as np
import matplotlib.pyplot as plt
import gymnasium as gym
from IPython.display import display, clear_output

os.environ["CUDA_VISIBLE_DEVICES"] = ""

from stable_baselines3 import SAC
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.callbacks import BaseCallback, CallbackList
import torch as th

# ═══════════════════════════════════════════════════════════════════
# General configuration
# ═══════════════════════════════════════════════════════════════════

LOG_DIR = "./logs/pusher_crl_sac"
MAX_EPISODE_STEPS = 200
TOTAL_STEPS = 1_500_000


EXPERIMENT_MODE = "baseline"

# Every execution of this cell starts from an empty log directory.
# NOTE: this deletes everything below LOG_DIR, including the checkpoints
# that a later cell stores in LOG_DIR/checkpoints.
if os.path.exists(LOG_DIR):
    shutil.rmtree(LOG_DIR)
os.makedirs(LOG_DIR, exist_ok=True)

# (timestep, material) switch points for each supported experiment mode.
_SCHEDULES = {
    "baseline": [(0, "rigid")],
    "non_stationary": [
        (0, "rigid"),
        (250_000, "delicate"),
        (500_000, "rigid"),
        (750_000, "delicate"),
    ],
}

if EXPERIMENT_MODE not in _SCHEDULES:
    raise ValueError("EXPERIMENT_MODE debe ser 'baseline' o 'non_stationary'")
SCHEDULE = _SCHEDULES[EXPERIMENT_MODE]


# ═══════════════════════════════════════════════════════════════════
# Callbacks
# ═══════════════════════════════════════════════════════════════════

class NonStationaryMaterialCallback(BaseCallback):
    """Switch the environment material following a timestep schedule.

    ``schedule`` is an iterable of ``(timestep, material)`` pairs. It is
    sorted by timestep and consumed front to back; every applied switch is
    recorded in ``applied_changes`` as ``(timestep, material)``.
    """

    def __init__(self, schedule, verbose=1):
        super().__init__(verbose)
        self.schedule = sorted(schedule, key=lambda entry: entry[0])
        self._next_idx = 0  # index of the first schedule entry not yet applied
        self.applied_changes = []

    def _apply_material(self, t_step, material):
        """Call ``set_material`` on the training env(s) and log the switch."""
        self.training_env.env_method("set_material", material)
        self.applied_changes.append((int(t_step), material))
        if not self.verbose:
            return
        bar = "=" * 60
        print(f"\n{bar}\n  [CAMBIO MATERIAL] t={t_step:,} -> {material}\n{bar}\n")

    def _due_entries(self, now):
        """Return the pending schedule entries whose timestep is <= ``now``."""
        due = []
        for entry in self.schedule[self._next_idx:]:
            if entry[0] > now:
                break
            due.append(entry)
        return due

    def _on_training_start(self) -> None:
        # Entries scheduled at (or before) step 0 are applied before the first
        # environment step and recorded with their scheduled timestep.
        for t_step, material in self._due_entries(0):
            self._apply_material(t_step, material)
            self._next_idx += 1

    def _on_step(self) -> bool:
        # Later entries are recorded with the timestep at which they were
        # actually applied, not the scheduled one.
        current = self.num_timesteps
        for _, material in self._due_entries(current):
            self._apply_material(current, material)
            self._next_idx += 1
        return True


class LivePlotCallback(BaseCallback):
    """Live charts plus a text heartbeat while SAC trains.

    Every ``heartbeat_freq`` timesteps a one-line status is printed. Every
    ``plot_freq`` timesteps a four-panel figure (rolling return, success
    rate, mean final distance, entropy coefficient) is redrawn in the
    notebook output. ``material_changes`` holds ``(timestep, material)``
    pairs drawn as vertical lines; this class only reads it, another
    component is expected to fill it.
    """

    def __init__(self, plot_freq=20_000, heartbeat_freq=5_000,
                 total_steps=1_000_000, verbose=1):
        super().__init__(verbose)
        self.plot_freq = plot_freq
        self.heartbeat_freq = heartbeat_freq
        self.total_steps = total_steps
        self.last_plot_step = 0
        self.last_hb_step = 0

        # One entry per plotted point.
        self.ts = []
        self.returns = []
        self.success_rates = []
        self.final_distances = []
        self.alphas = []

        # One entry per finished episode.
        self._successes = []
        self._final_dists = []
        self.material_changes = []

    def _read_alpha(self):
        """Return the current entropy coefficient, or NaN if it cannot be read.

        Tries ``log_ent_coef``, then ``ent_coef_tensor``, then a numeric
        ``ent_coef`` on the model, to cope with different SAC configurations.
        """
        try:
            log_ent_coef = getattr(self.model, "log_ent_coef", None)
            if log_ent_coef is not None:
                if isinstance(log_ent_coef, th.Tensor):
                    return float(th.exp(log_ent_coef.detach()).mean().cpu().item())
                return float(np.exp(float(log_ent_coef)))

            ent_coef_tensor = getattr(self.model, "ent_coef_tensor", None)
            if ent_coef_tensor is not None:
                if isinstance(ent_coef_tensor, th.Tensor):
                    return float(ent_coef_tensor.detach().mean().cpu().item())
                return float(ent_coef_tensor)

            ent_coef = getattr(self.model, "ent_coef", None)
            if isinstance(ent_coef, (int, float)):
                return float(ent_coef)
        except Exception:
            # Deliberately best-effort: a logging helper must never abort
            # training, so any failure just yields NaN below.
            pass

        return float("nan")

    def _current_material(self):
        """Return the material of the first training env, or "?" on failure."""
        try:
            return self.training_env.get_attr("material")[0]
        except Exception:
            return "?"

    def _progress_pct(self):
        """Return training progress in percent (NaN when total_steps is 0)."""
        if not self.total_steps:
            return float("nan")
        return 100.0 * self.num_timesteps / self.total_steps

    @staticmethod
    def _tail_mean(values, window, default):
        """Mean of the last ``window`` items, or ``default`` for an empty list."""
        return np.mean(values[-window:]) if values else default

    def _record_finished_episodes(self):
        """Store success flag and final distance of every episode that just ended."""
        dones = self.locals.get("dones", [])
        infos = self.locals.get("infos", [])

        for done, info in zip(dones, infos):
            if not done:
                continue
            # NOTE(review): falls back to the step info when no
            # "terminal_info" key is present - confirm any producer sets it.
            ti = info.get("terminal_info", info)
            self._successes.append(float(ti.get("is_success", 0.0)))
            d = ti.get("distance_to_goal", np.nan)
            if not np.isnan(d):
                self._final_dists.append(float(d))

    def _print_heartbeat(self):
        """Print a one-line status computed over the last 50 episodes."""
        pct = self._progress_pct()
        sr = self._tail_mean(self._successes, 50, 0.0)
        md = self._tail_mean(self._final_dists, 50, np.nan)
        alpha_val = self._read_alpha()
        mat = self._current_material()
        print(
            f"  [{pct:5.1f}%] t={self.num_timesteps:>8,}  "
            f"sr={sr:.2f}  dist={md:.3f}  alpha={alpha_val:.5f}  material={mat}"
        )

    def _record_point_and_draw(self):
        """Append one point to every series and redraw the four-panel figure."""
        mean_r = float(np.mean([e["r"] for e in self.model.ep_info_buffer]))
        sr = self._tail_mean(self._successes, 100, 0.0)
        mfd = self._tail_mean(self._final_dists, 100, np.nan)
        alpha_val = self._read_alpha()

        self.ts.append(self.num_timesteps)
        self.returns.append(mean_r)
        self.success_rates.append(float(sr))
        self.final_distances.append(float(mfd))
        self.alphas.append(alpha_val)

        clear_output(wait=True)
        fig, axes = plt.subplots(1, 4, figsize=(20, 4))

        axes[0].plot(self.ts, self.returns, "o-", ms=3)
        axes[0].set_title("Rolling mean return")
        axes[0].set_ylabel("return")

        axes[1].plot(self.ts, self.success_rates, "o-", ms=3, color="green")
        axes[1].set_title("Success rate (ult. 100 eps)")
        axes[1].set_ylim(-0.05, 1.05)

        axes[2].plot(self.ts, self.final_distances, "o-", ms=3, color="red")
        axes[2].set_title("Mean final distance")
        axes[2].set_ylabel("distance")

        # Only plot alpha when at least one reading was finite.
        alpha_arr = np.asarray(self.alphas, dtype=float)
        if np.isfinite(alpha_arr).any():
            axes[3].plot(self.ts, self.alphas, "o-", ms=3, color="darkorange")
        else:
            axes[3].text(0.5, 0.5, "alpha sin datos", ha="center", va="center")
        axes[3].set_title("Entropia alpha (SAC)")
        axes[3].set_ylabel("alpha")

        for ax in axes:
            ax.set_xlabel("timesteps")
            # Vertical markers: red for a switch to "delicate", blue otherwise.
            for ts_c, mat_c in self.material_changes:
                ax.axvline(
                    ts_c,
                    ls="--",
                    alpha=0.5,
                    color="red" if mat_c == "delicate" else "blue",
                )
            ax.grid(alpha=0.3)

        mat = self._current_material()
        pct = self._progress_pct()
        fig.suptitle(
            f"SAC - CRLPusherEnv  [{pct:.0f}%]  |  material: {mat}  |  "
            f"return: {mean_r:.1f}  success: {sr:.2f}  alpha: {alpha_val:.5f}"
        )
        plt.tight_layout()
        display(fig)
        plt.close(fig)

    def _on_step(self) -> bool:
        """Collect episode stats, then emit heartbeat and/or plot when due."""
        self._record_finished_episodes()

        if (self.num_timesteps - self.last_hb_step) >= self.heartbeat_freq:
            self.last_hb_step = self.num_timesteps
            self._print_heartbeat()

        if (self.num_timesteps - self.last_plot_step) < self.plot_freq:
            return True
        self.last_plot_step = self.num_timesteps

        # Nothing to plot until at least one episode has been logged.
        if not self.model.ep_info_buffer:
            return True

        self._record_point_and_draw()
        return True


# ═══════════════════════════════════════════════════════════════════
# Crear entornos
# ═══════════════════════════════════════════════════════════════════

def make_env(material="rigid", log_path=None):
    """Create a time-limited, monitored ``CRLPusherEnv`` set to ``material``.

    The Monitor writes to ``log_path`` when given, otherwise to ``LOG_DIR``.
    The environment is reset once so that the requested material is active
    before it is handed to the caller.
    """
    monitored_keys = (
        "is_success", "distance_to_goal", "tip_to_object",
        "distance_progress", "episode_steps", "material",
        "reward_base", "reward_progress", "stagnation_cost",
    )
    time_limited = gym.wrappers.TimeLimit(
        CRLPusherEnv(), max_episode_steps=MAX_EPISODE_STEPS
    )
    monitored = Monitor(
        time_limited, log_path or LOG_DIR, info_keywords=monitored_keys
    )
    monitored.reset(options={"material": material})
    return monitored


# Separate monitored environments for training and for evaluation.
train_env = make_env("rigid")
eval_env = make_env("rigid", log_path=os.path.join(LOG_DIR, "eval"))


# ═══════════════════════════════════════════════════════════════════
# SAC model
# ═══════════════════════════════════════════════════════════════════

# All SAC hyperparameters in one place; passed verbatim to the constructor.
_SAC_HYPERPARAMS = {
    "verbose": 0,
    "tensorboard_log": os.path.join(LOG_DIR, "tb"),
    # Optimisation
    "learning_rate": 3e-4,
    "batch_size": 256,
    "gamma": 0.99,
    "tau": 0.005,
    # Replay buffer
    "buffer_size": 500_000,
    "learning_starts": 10_000,
    # Update cadence: two gradient steps after every environment step.
    "train_freq": 1,
    "gradient_steps": 2,
    # Entropy temperature
    "ent_coef": "auto",
    "target_entropy": "auto",
    # Network
    "policy_kwargs": {"net_arch": [256, 256]},
    "device": "cpu",
}

model = SAC("MlpPolicy", train_env, **_SAC_HYPERPARAMS)

In [None]:
# ═══════════════════════════════════════════════════════════════════
# Evaluación antes del entrenamiento
# ═══════════════════════════════════════════════════════════════════

def evaluate(model, env, material, n_episodes=20):
    """Roll out ``model`` deterministically and summarise the episodes.

    Each episode starts with ``env.reset(options={"material": material})``
    and runs until the environment reports termination or truncation.

    Returns:
        dict with ``mean_return`` (mean summed reward), ``success_rate``
        (share of episodes whose last info has ``is_success`` > 0.5) and
        ``mean_dist`` (mean of the last ``distance_to_goal`` values, ignoring
        NaN; NaN if no episode reported one).

    Raises:
        ValueError: if ``n_episodes`` is smaller than 1.
    """
    if n_episodes < 1:
        raise ValueError("n_episodes debe ser >= 1")

    successes, rets, dists = 0, [], []
    for _ in range(n_episodes):
        obs, _ = env.reset(options={"material": material})
        done, ep_ret, last = False, 0.0, {}
        while not done:
            action, _ = model.predict(obs, deterministic=True)
            obs, r, term, trunc, last = env.step(action)
            done = term or trunc
            ep_ret += r
        rets.append(ep_ret)
        dists.append(last.get("distance_to_goal", np.nan))
        if last.get("is_success", 0.0) > 0.5:
            successes += 1

    # Average only the distances that were actually reported, so an all-NaN
    # list yields NaN without NumPy's "mean of empty slice" warning.
    reported = [d for d in dists if not np.isnan(d)]
    mean_dist = float(np.mean(reported)) if reported else float("nan")

    return dict(
        mean_return=float(np.mean(rets)),
        success_rate=successes / n_episodes,
        mean_dist=mean_dist,
    )


# Baseline check: score the policy on both materials (5 episodes each)
# before model.learn() is called, using the separate evaluation env.
for mat in ["rigid", "delicate"]:
    res = evaluate(model, eval_env, mat, n_episodes=5)
    print(f"  [{mat:>8s}]  return={res['mean_return']:.2f}  "
          f"success={res['success_rate']:.2f}  dist={res['mean_dist']:.4f}")
print()


# ═══════════════════════════════════════════════════════════════════
# SAC training with the non-stationary schedule
# ═══════════════════════════════════════════════════════════════════

# switch_cb changes the material at the timesteps listed in SCHEDULE;
# plot_cb prints a heartbeat every 5k steps and redraws charts every 15k.
switch_cb = NonStationaryMaterialCallback(SCHEDULE, verbose=1)
plot_cb   = LivePlotCallback(
    plot_freq=15_000,
    heartbeat_freq=5_000,
    total_steps=TOTAL_STEPS,
)

class _LinkCallbacks(BaseCallback):
    """Forward material switches recorded by the switch callback to the live plot."""

    def __init__(self, switch_cb, plot_cb):
        super().__init__(verbose=0)
        self._switch = switch_cb
        self._plot = plot_cb
        # Number of entries of switch_cb.applied_changes already forwarded.
        self._seen = 0

    def _on_step(self) -> bool:
        changes = self._switch.applied_changes
        total = len(changes)
        if total <= self._seen:
            return True
        # Copy only the entries added since the previous step.
        self._plot.material_changes.extend(changes[self._seen:])
        self._seen = total
        return True

# Order matters: the switch runs first, then the link copies any new switch
# into plot_cb.material_changes, and only then does the plot callback run.
link_cb   = _LinkCallbacks(switch_cb, plot_cb)
callbacks = CallbackList([switch_cb, link_cb, plot_cb])

print(f"Iniciando entrenamiento SAC: {TOTAL_STEPS:,} steps")
print(f"Schedule: {SCHEDULE}\n")

# Wall-clock timing of the whole training run, reported in minutes.
t0 = time.time()
model.learn(total_timesteps=TOTAL_STEPS, callback=callbacks, progress_bar=False)
elapsed = time.time() - t0
print(f"\n✓ Entrenamiento completado en {elapsed/60:.1f} min")


In [None]:
# ═══════════════════════════════════════════════════════════════════
# (Opcional) Visualización en ventana MuJoCo
# ═══════════════════════════════════════════════════════════════════
import time

# Apply the same TimeLimit that make_env uses for training/evaluation, so an
# unsuccessful episode is cut at MAX_EPISODE_STEPS instead of continuing for
# the rest of the 3000-step budget.
env_vis = gym.wrappers.TimeLimit(
    CRLPusherEnv(render_mode="human"),
    max_episode_steps=MAX_EPISODE_STEPS,
)

try:
    for material in ["rigid", "delicate"]:
        print(f"\n{'='*40}  material = {material}")
        obs, _ = env_vis.reset(options={"material": material})
        successes, episodes = 0, 0

        # Fixed budget of 3000 rendered steps per material, across episodes.
        for step in range(3000):
            action, _ = model.predict(obs, deterministic=True)
            obs, reward, terminated, truncated, info = env_vis.step(action)
            time.sleep(1 / 30)  # pace the viewer at roughly 30 steps per second
            if terminated or truncated:
                episodes += 1
                if info.get("is_success", 0.0) > 0.5:
                    successes += 1
                obs, _ = env_vis.reset(options={"material": material})

        # max(..., 1) avoids dividing by zero when no episode finished.
        sr = successes / max(episodes, 1)
        print(f"  Episodes={episodes}  Successes={successes}  Rate={sr:.2f}")
finally:
    # Always release the viewer window, even if the loop is interrupted.
    env_vis.close()


In [None]:
# ═══════════════════════════════════════════════════════════════════
# Save the final checkpoint (inference + resuming training)
# ═══════════════════════════════════════════════════════════════════

# NOTE: the checkpoints live below LOG_DIR, and the configuration cell
# removes LOG_DIR with shutil.rmtree whenever it is executed again.
CHECKPOINT_DIR = os.path.join(LOG_DIR, "checkpoints")
os.makedirs(CHECKPOINT_DIR, exist_ok=True)

# The file name encodes the experiment mode and the configured step budget.
checkpoint_name = f"sac_pusher_{EXPERIMENT_MODE}_{TOTAL_STEPS}_steps"
checkpoint_path = os.path.join(CHECKPOINT_DIR, checkpoint_name)
replay_buffer_path = checkpoint_path + "_replay_buffer.pkl"

# The replay buffer is stored separately from the model file.
model.save(checkpoint_path)
model.save_replay_buffer(replay_buffer_path)

print(f"Checkpoint guardado: {checkpoint_path}.zip")
print(f"Replay buffer guardado: {replay_buffer_path}")

Checkpoint guardado: ./logs/pusher_crl_sac/checkpoints/sac_pusher_baseline_1000000_steps.zip
Replay buffer guardado: ./logs/pusher_crl_sac/checkpoints/sac_pusher_baseline_1000000_steps_replay_buffer.pkl


In [6]:
# Load for inference only (no environment attached).
inference_model = SAC.load(checkpoint_path, device="cpu")
print("Modelo de inferencia cargado correctamente.")

# Load for continued training: needs an environment and, when the file is
# available, the saved replay buffer.
resume_env = make_env("rigid", log_path=os.path.join(LOG_DIR, "resume"))
resume_model = SAC.load(checkpoint_path, env=resume_env, device="cpu")
if os.path.exists(replay_buffer_path):
    resume_model.load_replay_buffer(replay_buffer_path)
print("Modelo listo para continuar entrenamiento.")

# To get the heartbeat and live charts while resuming, the callbacks have to
# be created again and passed to learn().
RESUME_EXTRA_STEPS = 200_000

# NOTE(review): this fresh switch callback starts at schedule index 0 while
# the timestep counter is kept (reset_num_timesteps=False), so every schedule
# entry at or below the loaded step count is applied on the first step -
# confirm that this is the intended material on resume.
resume_switch_cb = NonStationaryMaterialCallback(SCHEDULE, verbose=1)
resume_plot_cb = LivePlotCallback(
    plot_freq=15_000,
    heartbeat_freq=5_000,
    total_steps=resume_model.num_timesteps + RESUME_EXTRA_STEPS,
)
resume_link_cb = _LinkCallbacks(resume_switch_cb, resume_plot_cb)
resume_callbacks = CallbackList([resume_switch_cb, resume_link_cb, resume_plot_cb])

print(f"Reanudando entrenamiento por {RESUME_EXTRA_STEPS:,} steps...")
resume_model.learn(
    total_timesteps=RESUME_EXTRA_STEPS,
    callback=resume_callbacks,
    progress_bar=False,
    reset_num_timesteps=False,
)
print("Resume completado.")


FileNotFoundError: [Errno 2] No such file or directory: 'logs/pusher_crl_sac/checkpoints/sac_pusher_baseline_1000000_steps.zip'