### `Soft Actor-Critic` using `Snapbot`

In [1]:
import sys,mujoco
import numpy as np
import matplotlib.pyplot as plt
sys.path.append('KU-DATA403-simulator-tutorials/package/helper/')
sys.path.append('KU-DATA403-simulator-tutorials/package/mujoco_usage/')
sys.path.append('KU-DATA403-simulator-tutorials/package/gym/')
sys.path.append('KU-DATA403-simulator-tutorials/package/rl/')
from mujoco_parser import *
from slider import *
from utility import *
from snapbot_env import *
from sac import *
np.set_printoptions(precision=2,suppress=True,linewidth=100)
plt.rc('xtick',labelsize=6); plt.rc('ytick',labelsize=6)
%config InlineBackend.figure_format = 'retina'
%matplotlib inline
print ("Ready.")

Ready.


#### Parse `Snapbot` gym

In [2]:
xml_path = 'KU-DATA403-simulator-tutorials/asset/snapbot/scene_snapbot.xml'
env = MuJoCoParserClass(name='Snapbot',rel_xml_path=xml_path,verbose=True)
gym = SnapbotGymClass(
    env = env,
    HZ  = 50,
    history_total_sec = 0.2,
    history_intv_sec  = 0.1,
    VERBOSE =True,
)
print ("Ready.")

name:[Snapbot] dt:[0.002] HZ:[500]
n_qpos:[25] n_qvel:[24] n_qacc:[24] n_ctrl:[8]

n_body:[24]
 [0/24] [world] mass:[0.00]kg
 [1/24] [torso] mass:[0.24]kg
 [2/24] [Camera_module_1] mass:[0.06]kg
 [3/24] [Camera_module_2] mass:[0.06]kg
 [4/24] [Leg_module_1_1] mass:[0.06]kg
 [5/24] [Leg_module_1_2] mass:[0.08]kg
 [6/24] [Leg_module_1_3] mass:[0.02]kg
 [7/24] [Leg_module_1_4] mass:[0.01]kg
 [8/24] [Leg_module_1_4bar] mass:[0.01]kg
 [9/24] [Leg_module_2_1] mass:[0.06]kg
 [10/24] [Leg_module_2_2] mass:[0.08]kg
 [11/24] [Leg_module_2_3] mass:[0.02]kg
 [12/24] [Leg_module_2_4] mass:[0.01]kg
 [13/24] [Leg_module_2_4bar] mass:[0.01]kg
 [14/24] [Leg_module_4_1] mass:[0.06]kg
 [15/24] [Leg_module_4_2] mass:[0.08]kg
 [16/24] [Leg_module_4_3] mass:[0.02]kg
 [17/24] [Leg_module_4_4] mass:[0.01]kg
 [18/24] [Leg_module_4_4bar] mass:[0.01]kg
 [19/24] [Leg_module_5_1] mass:[0.06]kg
 [20/24] [Leg_module_5_2] mass:[0.08]kg
 [21/24] [Leg_module_5_3] mass:[0.02]kg
 [22/24] [Leg_module_5_4] mass:[0.01]kg
 [

#### `SAC` hyperparameters

In [None]:
n_episode         = 700 # number of total episodes (rollouts)
max_epi_sec       = 3.0 # maximum episode length in second (IMPORTANT)
max_epi_tick      = int(max_epi_sec*gym.HZ) # maximum episode length in tick
n_warmup_epi      = 10 # number of warm-up episodes
buffer_limit      = 50000 # 50000
buffer_warmup     = buffer_limit // 5
init_alpha        = 0.1
max_torque        = 2.0
# Update
lr_actor          = 0.0004 # 0.0002 # 0.0005
lr_alpha          = 0.0003 # 0.0003
lr_critic         = 0.0001
n_update_per_tick = 1 # number of updates per tick
batch_size        = 256
gamma             = 0.95
tau               = 0.005
# Debug
print_every       = 1
eval_every        = 1
save_every        = 50
RENDER_EVAL       = False # False
print ("n_episode:[%d], max_epi_sec:[%.2f], max_epi_tick:[%d]"%
       (n_episode,max_epi_sec,max_epi_tick))
print ("n_warmup_epi:[%d], buffer_limit:[%.d], buffer_warmup:[%d]"%
       (n_warmup_epi,buffer_limit,buffer_warmup))

n_episode:[1000], max_epi_sec:[3.00], max_epi_tick:[150]
n_warmup_epi:[10], buffer_limit:[50000], buffer_warmup:[10000]


#### Initialize networks

In [4]:
device = 'cpu' # cpu / mps / cuda
replay_buffer = ReplayBufferClass(buffer_limit, device=device)
actor_arg = {'obs_dim':gym.o_dim,'h_dims':[256,256],'out_dim':gym.a_dim,
             'max_out':max_torque,'init_alpha':init_alpha,'lr_actor':lr_actor,
             'lr_alpha':lr_alpha,'device':device}
critic_arg = {'obs_dim':gym.o_dim,'a_dim':gym.a_dim,'h_dims':[256,256],'out_dim':1,
              'lr_critic':lr_critic,'device':device}
actor           = ActorClass(**actor_arg).to(device)
critic_one      = CriticClass(**critic_arg).to(device)
critic_two      = CriticClass(**critic_arg).to(device)
critic_one_trgt = CriticClass(**critic_arg).to(device)
critic_two_trgt = CriticClass(**critic_arg).to(device)
print ("Ready.")

Ready.


In [5]:
# Modify floor friction priority
env.model.geom('floor').priority = 1 # 0=>1
print ("Floor priority:%s"%(env.model.geom('floor').priority))
gym.env.ctrl_ranges[:,0] = -max_torque
gym.env.ctrl_ranges[:,1] = +max_torque
print ("gym.env.ctrl_ranges:\n",gym.env.ctrl_ranges)

Floor priority:[1]
gym.env.ctrl_ranges:
 [[-2.  2.]
 [-2.  2.]
 [-2.  2.]
 [-2.  2.]
 [-2.  2.]
 [-2.  2.]
 [-2.  2.]
 [-2.  2.]]


In [None]:
def reward_long_jump(info):
    x_pos = gym.env.get_p_body('torso')[0]  # Horizontal distance (primary reward)
    z_pos = gym.env.get_p_body('torso')[2]  # Height (secondary, to ensure proper jump)
    
    x_vel = gym.env.get_qvel()[0]  # Forward velocity
    z_vel = gym.env.get_qvel()[2]  # Vertical velocity
    
    qvel = gym.env.get_qvel()  # Joint velocities
    qpos = gym.env.get_qpos()  # Joint positions
    
    # Penalize excessive rotation (keep torso stable)
    orientation_penalty = np.linalg.norm(qpos[3:6])  # Penalize pitch/roll/yaw
    
    # Check if agent is airborne (no ground contact)
    contact = gym.env.get_contact_info()
    airborne = not any(contact)
    
    reward = 0.0

   
    reward += 50.0 * x_pos  # Main reward for forward progress
    
    #upward thrust
    if airborne and z_pos < 0.5:  # Early jump phase
        reward += 20.0 * max(0, z_vel)  # Reward upward thrust
    
    #forward velocity and yaw
    reward += 30.0 * max(0, x_vel)  
    if airborne:
        target_yaw = 45.0
        yaw_diff = abs(gym.env.get_qpos()[6] - np.radians(target_yaw))
        reward += 25.0 * max(0, 1 - yaw_diff / np.radians(30)) 
    
    #decent stability
    reward -= 7.50 * orientation_penalty  
    
    # bonus landing after long jump
    if not airborne and x_pos > 1.0:  # Only reward if jumped far
        reward += 100.0  # Big bonus for successful long jump
    
    #make sure height is reasonable)
    if z_pos < 0.4 and z_pos > 0.2:  # Too high = inefficient for long jump
        reward += 25.0 * z_pos
    
    # 6. Leg drive efficiency (encourage synchronized rear-leg push)
    rear_joint_vel = np.array([qvel[21], qvel[22]])  # Rear leg joints
    rear_actuators = np.array(gym.env.get_ctrl(['actuator_5_2', 'actuator_5_3']))
    spring_effort = np.dot(np.abs(rear_joint_vel), np.abs(rear_actuators))
    reward += 15.0 * spring_effort  # Reward coordinated leg drive
    
    # Clip reward for training stability
    reward = np.clip(reward, -10.0, 500.0)  
    
    return reward

#### Train using `SAC`

In [None]:
REMOVE_PREV_FILES = True # remove previous files
TRACK_BEST_EPISODE = True
best_longest_jump = -np.inf
best_episode_data = None
best_episode_idx = 0


# Loop
np.random.seed(seed=0) # fix seed
print ("Start training.")
for epi_idx in range(n_episode+1): # for each episode
    zero_to_one = epi_idx/n_episode
    one_to_zero = 1-zero_to_one

    if TRACK_BEST_EPISODE:
        current_episode_actions = []
    # Reset gym
    s = gym.reset()

    # Loop
    USE_RANDOM_POLICY = (np.random.rand()<(0.1*one_to_zero)) or (epi_idx < n_warmup_epi)
    reward_total,reward_forward = 0.0,0.0
    longest_jump = 0.0
    for tick in range(max_epi_tick): # for each tick in an episode
        if USE_RANDOM_POLICY:
            a_np = gym.sample_action()
        else:
            a,log_prob = actor(np2torch(s,device=device))
            a_np = torch2np(a)
        # Step
        if TRACK_BEST_EPISODE:
            current_episode_actions.append(a_np.copy())
            
        s_prime,_,done,info = gym.step(a_np,max_time=max_epi_sec)
        reward = reward_long_jump(info)  #using long jump reward            
        replay_buffer.put((s,a_np,reward,s_prime,done))
        reward_total += reward 
        reward_forward += info['r_forward']
        s = s_prime

        # compute z_diff
        x_diff = gym.env.get_p_body('torso')[0]
        if x_diff > longest_jump:
            longest_jump = x_diff
        if done is True: break # terminate condition
        
        # Replay buffer
        if replay_buffer.size() > buffer_warmup:
             for _ in range(n_update_per_tick): 
                mini_batch = replay_buffer.sample(batch_size)
                # Update critics
                td_target = get_target(
                    actor,
                    critic_one_trgt,
                    critic_two_trgt,
                    gamma      = gamma,
                    mini_batch = mini_batch,
                    device     = device,
                )
                critic_one.train(td_target,mini_batch)
                critic_two.train(td_target,mini_batch)
                # Update actor
                actor.train(
                    critic_one,
                    critic_two,
                    target_entropy = -gym.a_dim,
                    mini_batch     = mini_batch,
                )
                # Soft update of critics
                critic_one.soft_update(tau=tau,net_target=critic_one_trgt)
                critic_two.soft_update(tau=tau,net_target=critic_two_trgt)

    # Compute x_diff
    x_diff = gym.env.get_p_body('torso')[0]

    # Print
    if (epi_idx%print_every)==0:
        epi_tick = tick
        print ("[%d/%d][%.1f%%]"%(epi_idx,n_episode,100.0*(epi_idx/n_episode)))
        print ("  reward:[%.1f] x_diff:[%.3f] epi_len:[%d/%d]"%
               (reward_total,x_diff,epi_tick,max_epi_tick))
               # replay_buffer.size(),actor.log_alpha.exp(),z_diff))
    
    # Evaluation
    if (epi_idx%eval_every)==0:
        if RENDER_EVAL: gym.init_viewer()
        s = gym.reset()
        reward_total = 0.0
        for tick in range(max_epi_tick):
            a,_ = actor(np2torch(s,device=device),SAMPLE_ACTION=False)
            s_prime,reward,done,info = gym.step(torch2np(a),max_time=max_epi_sec)
            reward_total += reward
            if RENDER_EVAL and ((tick%5) == 0):
                gym.render(
                    TRACK_TORSO      = True,
                    PLOT_WORLD_COORD = True,
                    PLOT_TORSO_COORD = True,
                    PLOT_SENSOR      = True,
                    PLOT_CONTACT     = True,
                    PLOT_TIME        = True,
                )
            s = s_prime
            if RENDER_EVAL:
                if not gym.is_viewer_alive(): break
        if RENDER_EVAL: gym.close_viewer()
        x_diff = gym.env.get_p_body('torso')[0]
        z_diff = gym.env.get_p_body('torso')[2]
        if x_diff > longest_jump:
            longest_jump = x_diff
        print ("  [Eval] reward:[%.3f] x_diff:[%.3f] epi_len:[%d/%d] longest_jump:[%.3f]"%
               (reward_total,x_diff,tick,max_epi_tick,longest_jump))
        
    if TRACK_BEST_EPISODE and longest_jump > best_longest_jump:
        best_longest_jump = longest_jump
        best_episode_actions = current_episode_actions.copy()
        best_episode_idx = epi_idx
        best_pth_path = './result/weights/sac_%s/longjump/best_longjump%d.pth'%(gym.name.lower(),epi_idx)
        torch.save(actor.state_dict(),best_pth_path)
        print(f" NEW BEST JUMP: {best_longest_jump:.3f} (Episode {epi_idx})")

        # Modify the training loop section
    if TRACK_BEST_EPISODE and longest_jump > best_longest_jump:
        best_longest_jump = longest_jump
        best_episode_actions = current_episode_actions.copy()
        best_episode_idx = epi_idx
        best_pth_path = './result/weights/sac_%s/longjump/best_longjump%d.pth'%(gym.name.lower(),epi_idx)
        torch.save(actor.state_dict(),best_pth_path)
    
    # Save video of the best episode
        gym.init_viewer()
        s = gym.reset()
        frames = []
        for action in best_episode_actions:
            s_prime, _, _, _ = gym.step(action)
            frame = gym.render(
                TRACK_TORSO=True,
                PLOT_WORLD_COORD=True,
                PLOT_TORSO_COORD=True,
                PLOT_SENSOR=True,
                PLOT_CONTACT=True,
                PLOT_TIME=True,
                RETURN_FRAME=True  # Add this parameter to your render function
            )
            frames.append(frame)
            if not gym.is_viewer_alive():
                break
        gym.close_viewer()
    
    # Save frames as MP4
        video_path = f'./result/videos/sac_{gym.name.lower()}/longjump/best_jump_{epi_idx}.mp4'
        os.makedirs(os.path.dirname(video_path), exist_ok=True)
    
    # Using imageio to save as MP4
        with imageio.get_writer(video_path, fps=30) as writer:
            for frame in frames:
                writer.append_data(frame)
    
        print(f" NEW BEST JUMP: {best_longest_jump:.3f} (Episode {epi_idx})")
        print(f"  [Save] Best jump video saved to {video_path}")

    # Save network
if (epi_idx%save_every)==0:
    pth_path = './result/weights/sac_%s/longjump/episode_%d.pth'%(gym.name.lower(),epi_idx)
    dir_path = os.path.dirname(pth_path)
    if not os.path.exists(dir_path): os.makedirs(dir_path)
    if (epi_idx == 0) and REMOVE_PREV_FILES: # remove all existing files
        files = os.listdir(path=dir_path)
        print ("  [Save] Remove existing [%d] pth files."%(len(files)))
        for file in files: os.remove(os.path.join(dir_path,file))
    torch.save(actor.state_dict(),pth_path)
    print ("  [Save] [%s] saved."%(pth_path))
    
    # Save best episode
if TRACK_BEST_EPISODE and best_episode_actions:
        best_action_path = './result/weights/sac_%s/longjump/best_jump_actions.pth'%gym.name.lower()
        torch.save({
            'episode_idx': best_episode_idx,
            'longest_jump': best_longest_jump,
            'actions': best_episode_actions
        }, best_pth_path)
        print ("  [Save] Best jump actions saved to [%s]."%(best_action_path))

#def replay_best_episode():
  #  best_pth_path = './result/weights/sac_%s/longjump/best_jump_actions.pth' % gym.name.lower()
   # if not os.path.exists(best_pth_path):
    #    print("No best episode found")
     #   return
    
    #best_data = torch.load(best_pth_path)
    #print(f"Replaying best jump (Length: {best_data['longest_jump']:.3f} from Episode {best_data['episode_idx']})")
    
    #gym.init_viewer()
    #s = gym.reset()
    
    #for action in best_data['actions']:
    #    s_prime, _, _, _ = gym.step(action)
     #   gym.render(
     #       TRACK_TORSO=True,
     #       PLOT_WORLD_COORD=True,
      #      PLOT_TORSO_COORD=True,
      ##      PLOT_SENSOR=True,
      #      PLOT_CONTACT=True,
         #   PLOT_TIME=True,
       # )
        #if not gym.is_viewer_alive():
        #    break
    
    #gym.close_viewer()

# After training completes
if TRACK_BEST_EPISODE:
    print(f"\nTraining complete! Best jump: {best_longest_jump:.3f} (Episode {best_episode_idx})")
    #replay_best_episode()  # Automatically play the best episode

    
print ("Done.")



Start training.
[0/1000][0.0%]
  reward:[2114.6] x_diff:[0.014] epi_len:[149/150]
  [Eval] reward:[0.257] x_diff:[-0.021] epi_len:[149/150] longest_jump:[0.092]
 NEW BEST JUMP: 0.092 (Episode 0)
  [Save] Remove existing [37] pth files.
  [Save] [./result/weights/sac_snapbot/longjump/episode_0.pth] saved.
  [Save] Best jump actions saved to [./result/weights/sac_snapbot/longjump/best_jump_actions.pth].
[1/1000][0.1%]
  reward:[2371.5] x_diff:[0.016] epi_len:[149/150]
  [Eval] reward:[0.257] x_diff:[-0.021] epi_len:[149/150] longest_jump:[0.024]
[2/1000][0.2%]
  reward:[3125.1] x_diff:[-0.045] epi_len:[149/150]
  [Eval] reward:[0.257] x_diff:[-0.021] epi_len:[149/150] longest_jump:[0.063]
[3/1000][0.3%]
  reward:[1733.1] x_diff:[-0.406] epi_len:[149/150]
  [Eval] reward:[0.257] x_diff:[-0.021] epi_len:[149/150] longest_jump:[0.043]
[4/1000][0.4%]
  reward:[2264.3] x_diff:[0.068] epi_len:[149/150]
  [Eval] reward:[0.257] x_diff:[-0.021] epi_len:[149/150] longest_jump:[0.176]
 NEW BEST JUM

KeyboardInterrupt: 

In [None]:
print(best_longest_jump)
print(best_episode_idx)
print(best_episode_actions)

1.12235968622594
1652
[array([-2.  , -2.  , -2.  ,  0.99,  2.  ,  1.99,  2.  , -2.  ], dtype=float32), array([-2.  , -2.  , -1.54,  1.99, -1.97, -2.  ,  2.  , -2.  ], dtype=float32), array([-2.  , -2.  , -1.98,  0.54,  1.99,  2.  ,  2.  , -2.  ], dtype=float32), array([-2.  , -2.  , -2.  , -0.18,  2.  ,  2.  ,  2.  , -2.  ], dtype=float32), array([-2.  , -2.  , -2.  , -1.54,  2.  ,  1.99,  2.  , -2.  ], dtype=float32), array([-2.  , -2.  , -2.  , -1.99,  2.  ,  2.  ,  2.  , -2.  ], dtype=float32), array([-2.  , -2.  , -2.  , -1.66,  2.  ,  2.  ,  2.  , -2.  ], dtype=float32), array([-2.  , -2.  , -2.  , -1.76,  1.89,  2.  ,  2.  , -2.  ], dtype=float32), array([-1.95, -2.  , -2.  , -1.99,  2.  ,  2.  ,  2.  , -2.  ], dtype=float32), array([-2.  , -2.  ,  1.97,  2.  , -1.98, -2.  ,  2.  ,  2.  ], dtype=float32), array([-2., -2.,  2.,  2., -2., -2.,  2.,  2.], dtype=float32), array([-2.  , -2.  ,  2.  ,  2.  , -1.97, -2.  ,  2.  ,  2.  ], dtype=float32), array([-2.  , -2.  ,  2.  ,  2.  

In [None]:
# Configuration
max_epi_sec  = 15.0 # maximum episode length in second
max_epi_tick = int(max_epi_sec*gym.HZ) # maximum episode length in tick
# Actor
device     = 'cpu' # cpu / mps / cuda
max_torque = 2.0
init_alpha = 0.1
lr_actor   = 0.0004
lr_alpha   = 0.0003
actor = ActorClass(
    obs_dim    = gym.o_dim,
    h_dims     = [256,256],
    out_dim    = gym.a_dim,
    max_out    = max_torque,
    init_alpha = init_alpha,
    lr_actor   = lr_actor,
    lr_alpha   = lr_alpha,
    device     = device,
).to(device)

max_length = 0.0 
# Load pth
pth_path = './result/weights/sac_%s/longjump/best_longjump%d.pth'%(gym.name.lower(),epi_idx)#'./result/weights/sac_%s/longjump/best_longjump%d.pth'%(gym.name.lower(),epi_idx)
actor.load_state_dict(torch.load(pth_path,map_location=device))
# Run
gym.init_viewer()
s = gym.reset()
gym.viewer_pause() # pause
print ("   Viewer paused. Press [space] to resume.")
reward_total = 0.0
for tick in range(max_epi_tick):
    a,_ = actor(np2torch(s,device=device),SAMPLE_ACTION=False)
    s_prime,reward,done, info = gym.step(torch2np(a),max_time=max_epi_sec)
    gym.render(
        TRACK_TORSO      = True,
        PLOT_WORLD_COORD = True,
        PLOT_TORSO_COORD = True,
        PLOT_SENSOR      = True,
        PLOT_CONTACT     = True,
        PLOT_TIME        = True,
    )
    reward_total += reward
    s = s_prime
    if not gym.is_viewer_alive(): break
gym.close_viewer()
x_diff = gym.env.get_p_body('torso')[0]
z_diff = gym.env.get_p_body('torso')[2]
if x_diff > max_length:
        max_height = x_diff
print ("  [Eval] reward:[%.3f] x_diff:[%.3f] epi_len:[%d/%d]"%
       (reward_total,x_diff,tick,max_epi_tick))