In [20]:
import torch

import numpy as np
import gymnasium as gym

from stable_baselines3 import SAC, HER

import gymnasium_robotics

In [21]:
# Gymnasium id of the task used by the environment-construction cell below
# (passed to make_env / gym.make).
env_id = "FetchPushDense-v4"

In [22]:
# Pick the best available torch backend: Apple-silicon MPS first, then CUDA,
# and finally the CPU so the notebook still runs on machines without a GPU.
# (Previously the fallback was unconditionally "cuda", which fails at model
# construction time on CPU-only machines.)
if torch.backends.mps.is_available():
    device = "mps"
elif torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

In [23]:
# The environment is built differently depending on the selected device:
# every non-MPS machine goes through plain gymnasium + SB3's Monitor, while
# MPS machines use the project-local `schemas.make_env` helper.
if device != "mps":
    from stable_baselines3.common.monitor import Monitor

    # Register the gymnasium-robotics environments so `env_id` can be resolved.
    gym.register_envs(gymnasium_robotics)
    env = Monitor(gym.make(env_id, render_mode="human"))
else:
    # NOTE(review): `make_env` is defined outside this notebook; it presumably
    # applies its own wrappers (e.g. Monitor) - confirm in schemas.py.
    from schemas import make_env

    env = make_env(env_id)


# Training SAC with optimised hyperparams from Exercise 1
First, we'll train a baseline SAC agent with the optimised hyperparameters from Exercise 1 (except for noise_type and noise_std, which are used only by DDPG and TD3, not by SAC). We'll use the recommended 1,000,000 training steps.

In addition to our earlier parameters, we will set gamma (the discount factor) and tau (the soft-update coefficient). We used gamma=0.95 because FetchPush has short episodes (50 steps). A lower discount factor encourages the agent to complete the task quickly rather than taking unnecessary steps. We used τ=0.005 for soft target updates, which provides stable Q-value targets while still allowing the target network to track improvements in the main network. This is a standard value for continuous control tasks.

In [25]:
from stable_baselines3 import SAC
from stable_baselines3.common.callbacks import EvalCallback

def create_and_train_sac_model(
    env,
    eval_env=None,
    total_timesteps=1_000_000,
    log_path="logs/reward_shaping",
):
    """Build a SAC agent with the tuned hyperparameters and train it.

    Args:
        env: Training environment with a dict observation space (the policy
            is "MultiInputPolicy").
        eval_env: Environment used by the periodic evaluation callback.
            Pass a *separate* instance, wrapped the same way as ``env``:
            evaluation episodes reset and step the environment they run on,
            so evaluating on the training environment interrupts the episode
            the agent is currently collecting. When ``None`` the training
            environment is reused, which keeps the original behaviour.
        total_timesteps: Number of environment steps to train for.
        log_path: Folder where the evaluation callback stores its results.
            Use a different folder per experiment (e.g. baseline vs. shaped
            reward) so that runs do not overwrite each other's logs.

    Returns:
        The trained ``SAC`` model.
    """
    model = SAC(
        policy="MultiInputPolicy",
        env=env,
        learning_rate=0.0010108124085550568,
        batch_size=256,
        buffer_size=1_000_000,
        tau=0.005,
        gamma=0.95,
        train_freq=1,
        gradient_steps=1,
        learning_starts=1000,
        policy_kwargs=dict(net_arch=[400, 300]),
        device=device,
        verbose=1,
    )

    # Backward-compatible fallback: older callers only pass `env`.
    if eval_env is None:
        eval_env = env

    eval_callback = EvalCallback(
        eval_env,
        eval_freq=10_000,
        n_eval_episodes=10,
        log_path=log_path,
        verbose=1,
    )

    model.learn(total_timesteps=total_timesteps, callback=eval_callback)
    return model

In [26]:
# model = create_and_train_sac_model(env=env)

# Designing Wrappers to Facilitate Learning
Since SAC by itself is not enough, we need to facilitate learning by adding some constraints or rules that assist the agent during training. For this purpose, we will use reward shaping.

- **Reward Shaping:** adding extra rewards for...
    - ... gripper getting close to the object.
    - ... object moving toward the goal.


In [27]:
class RewardShapingWrapper(gym.Wrapper):
    """Add a potential-based shaping term to the reward of a goal-conditioned env.

    The potential of an observation is the negated, weighted sum of two
    Euclidean distances:

    * ``observation["observation"][0:3]`` to ``observation["achieved_goal"]``
      (treated as gripper-to-block distance; the first three observation
      entries are the gripper position in the Fetch environments), and
    * ``observation["achieved_goal"]`` to ``observation["desired_goal"]``
      (block-to-goal distance).

    On every step the wrapper adds ``gamma * potential(s') - potential(s)`` to
    the reward returned by the wrapped environment.

    Args:
        env: Environment with a dict observation containing the keys
            "observation", "achieved_goal" and "desired_goal".
        w_gripper: Weight of the gripper-to-block distance in the potential.
        w_goal: Weight of the block-to-goal distance in the potential.
        gamma: Discount applied to the next-state potential. For the shaping
            to leave the optimal policy unchanged this should equal the
            agent's discount factor. The default of 1.0 reproduces the
            original undiscounted difference ``potential(s') - potential(s)``.
    """

    def __init__(self, env, w_gripper=0.5, w_goal=1.0, gamma=1.0):
        super().__init__(env)
        self.w_gripper = w_gripper
        self.w_goal = w_goal
        self.gamma = gamma
        # Potential of the most recent observation; None until the first reset().
        self.prev_potential = None

        # Running sum of the shaping term for the current episode and the
        # number of finished episodes (used only for progress printing).
        self.episode_shaped_reward = 0.0
        self.episode_count = 0

    def compute_potential(self, observation):
        """Return the (non-positive) potential of a dict observation."""
        gripper_pos = observation["observation"][0:3]
        block_pos = observation["achieved_goal"]
        goal_pos = observation["desired_goal"]

        d_grip_block = np.linalg.norm(gripper_pos - block_pos)
        d_block_goal = np.linalg.norm(block_pos - goal_pos)

        # Negated so that smaller distances give a higher potential.
        return -(self.w_gripper * d_grip_block + self.w_goal * d_block_goal)

    def reset(self, **kwargs):
        """Reset the wrapped env and initialise the potential for the new episode."""
        # A non-None potential means an episode was running before this reset,
        # so count it and report its accumulated shaping every 100 episodes.
        if self.prev_potential is not None:
            self.episode_count += 1
            if self.episode_count % 100 == 0:
                print(f"Episode {self.episode_count}, Shaped reward: {self.episode_shaped_reward:.2f}")

        self.episode_shaped_reward = 0.0

        observation, info = self.env.reset(**kwargs)
        self.prev_potential = self.compute_potential(observation)
        return observation, info

    def step(self, action):
        """Step the wrapped env and add the shaping term to its reward."""
        observation, reward, terminated, truncated, info = self.env.step(action)

        potential = self.compute_potential(observation)
        # Potential-based shaping: gamma * phi(s') - phi(s).
        shaping_reward = self.gamma * potential - self.prev_potential
        self.prev_potential = potential

        self.episode_shaped_reward += shaping_reward

        reward += shaping_reward
        return observation, reward, terminated, truncated, info

Now we wrap the environment with the reward-shaping wrapper and check whether performance improves.

In [28]:
# Wrap the base environment with the default shaping weights.
updated_env = RewardShapingWrapper(env=env)
# Bare expression: shows the resulting wrapper chain as the cell output.
updated_env

<RewardShapingWrapper<Float32Wrapper<Monitor<TimeLimit<OrderEnforcing<PassiveEnvChecker<MujocoFetchPushEnv<FetchPushDense-v4>>>>>>>>

In [29]:
# model = create_and_train_sac_model(env=updated_env)