<br>

<h1 style="text-align:center;">Test RL in OT2</h1>

<br>

## Initial Setup

---

In [1]:
# Standard library imports
import os
import sys
import random
import time
import logging
import numpy as np
import argparse
from collections import deque

# Add custom module paths
sys.path.append("./../task10")
sys.path.append("./../task12")
sys.path.append("./..")

# Third-party imports
import pybullet as p
import gymnasium as gym
import wandb
import torch
from stable_baselines3 import SAC
from stable_baselines3.common.callbacks import EvalCallback, CheckpointCallback, BaseCallback
from stable_baselines3.common.vec_env import VecNormalize, SubprocVecEnv, DummyVecEnv
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.evaluation import evaluate_policy
from wandb.integration.sb3 import WandbCallback

# Local imports
from callbacks import *
from pid_controller import *  
from train_ot2_rl import *
from ot2_env_wrapper import *

<br>

## Setup Env

---

In [2]:
# Hyperparameter configuration
hyperparams = {
    # Learning parameters
    "learning_rate": 1e-4,          # Increased for faster learning
    "buffer_size": 100000,          # Reduced for faster updates
    "batch_size": 256,              # Keep this
    "tau": 0.005,                   # Slower target updates for stability
    "gamma": 0.995,                 # Slightly higher discount for long-term rewards
    "train_freq": (1, "step"),      # Update every step
    "n_envs": 1,                    # Keep number of environments
    "gradient_steps": 1,            # Match number of environments
    
    # Architecture
    "actor_layers": [512, 512],     # Standard network size
    "critic_layers": [512, 512],
    "activation_fn": "relu",    
    "net_arch": "default",      
    
    # Training
    "total_timesteps": 1_000_000,      # Keep this for now
    "learning_starts": 1000,         # Increase this to collect more initial data
    "ent_coef": "auto_1.0",        # Automatic entropy adjustment
    
    # Environment
    "max_episode_steps": 1_000,      
    "normalize_observations": True,
    "normalize_rewards": True,
    "reward_scale": 1.0,
    "clip_obs": 5.0,
    "clip_rewards": 5.0,
    
    # Evaluation
    "eval_freq": 50_000,         
    "n_eval_episodes": 10,     
    "eval_deterministic": True, 
}

In [3]:
# Create eval environment with normalization stats
eval_env = setup_eval_environment(render=True)

# Load normalization statistics from training
if os.path.exists("./best_model/vec_normalize.pkl"):
    eval_env = VecNormalize.load("./best_model/vec_normalize.pkl", eval_env)
    print("Loaded normalization stats from training")
else:
    print("Warning: Could not find normalization stats file")

# Make sure training=False for evaluation
eval_env.training = False
eval_env.norm_reward = False  # We don't need reward normalization for evaluation

env = eval_env

Loaded normalization stats from training


<br>

## Load Trained Model

---

In [4]:
model = SAC.load(
    "./best_model/best_model.zip",
    env=env,
    device='cpu',
    custom_objects={
        "learning_rate": hyperparams["learning_rate"],
        "buffer_size": hyperparams["buffer_size"],
        "learning_starts": hyperparams["learning_starts"],
        "batch_size": hyperparams["batch_size"],
        "tau": hyperparams["tau"],
        "gamma": hyperparams["gamma"],
        "train_freq": hyperparams["train_freq"],
        "gradient_steps": hyperparams["gradient_steps"],
        "ent_coef": hyperparams["ent_coef"],
    }
)

<br>

## Test RL

---

In [5]:
# Define test positions
goal_positions = [
    # Left position
    np.array([-0.0871, 0.0890, 0.1190]),
    
    # Top-left position
    np.array([-0.0571, 0.1590, 0.1190]),
    
    # Top position
    np.array([0.0730, 0.1790, 0.1190]),
    
    # Top-right position
    np.array([0.1830, 0.1590, 0.1190]),
    
    # Right position
    np.array([0.2130, 0.0890, 0.1190]),
    
    # Bottom-right position
    np.array([0.1830, -0.0810, 0.1190]),
    
    # Bottom position
    np.array([0.0730, -0.1105, 0.1190]),
    
    # Bottom-left position
    np.array([-0.0571, -0.0810, 0.1190])
]

In [6]:
# Define test positions and action smoothing parameters
previous_action = np.zeros(3)  # Initialize previous action
time_delay = 0.1  # 100ms between actions

for goal_pos in goal_positions:
    # Set goal position and reset environment
    env.goal_position = goal_pos
    obs = env.reset()
    previous_action = np.zeros(3)

    # Make simulation slower
    env.dt = 1/1000
    
    while True:
        # Get action from model and smooth it
        action, _ = model.predict(obs, deterministic=True)
        previous_action = action
        
        # Take step in environment
        obs, reward, terminated, truncated = env.step(action)
        time.sleep(time_delay)
        
        # Check if we reached goal or failed
        if terminated:
            obs, info = env.reset()
            print("Goal not reached.")
            break
            
        # Check if we're close enough to goal
        current_pos = obs[:3]
        goal_pos = obs[3:6]
        distance = np.linalg.norm(goal_pos - current_pos)
        if distance < 0.001:
            print("Goal reached!")
            break

Goal reached!
Goal reached!
Goal reached!
Goal reached!
Goal reached!
Goal reached!
Goal reached!
Goal reached!
