In [None]:
!pip install num2words

In [None]:
import random
import numpy as np
import os
import torch
import json
from PIL import Image
from src.env.env import RILAB_OMY_ENV

from lerobot.common.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
from lerobot.common.datasets.utils import dataset_to_policy_features
from lerobot.configs.types import FeatureType
import glfw
from lerobot.common.policies.smolvla.configuration_smolvla import SmolVLAConfig
from lerobot.common.policies.smolvla.modeling_smolvla import SmolVLAPolicy

In [None]:
'''
Configuration
'''
ROOT = "./dataset/transformed_data2"
device = 'cuda'

# Environment Configuration
action_type = 'joint'  # Options: 'joint','delta_joint, 'delta_eef_pose', 'eef_pose'
proprio_type = 'joint_pos' # Options: 'joint_pos', 'eef_pose'
observation_type = 'image' # options: 'image', 'object_pose'

# Evaluation Configuration
TEST_EPISODES = 20
MAX_EPISODE_STEPS = 10_000
IMAGE_NOISE = True

dataset_metadata = LeRobotDatasetMetadata("transformed_data", root=ROOT)
features = dataset_to_policy_features(dataset_metadata.features)
output_features = {key: ft for key, ft in features.items() if ft.type is FeatureType.ACTION}
input_features = {key: ft for key, ft in features.items() if key not in output_features}

kwargs = {}
# kwargs['force_download'] = True
kwargs["dataset_stats"] = dataset_metadata.stats

In [None]:
policy = SmolVLAPolicy.from_pretrained("Jeongeun/smolvla_deep_learning_2025_joint", **kwargs)
policy.to(device)

In [None]:
'''
Load environment configuration and initialize environments
'''


config_file_path = './configs/train2.json'
with open(config_file_path) as f:
    env_conf = json.load(f)
omy_env = RILAB_OMY_ENV(cfg=env_conf, seed=0, 
                        action_type=action_type, 
                        obs_type=proprio_type,
                        vis_mode = 'teleop')

In [None]:
from torchvision import transforms
# Approach 1: Using torchvision.transforms
def get_default_transform(image_size: int = 256):
    """
    Returns a torchvision transform that:
     Converts to a FloatTensor and scales pixel values [0,255] -> [0.0,1.0]
    """
    return transforms.Compose([
        transforms.ToTensor(),  # PIL [0–255] -> FloatTensor [0.0–1.0], shape C×H×W
    ])
IMG_TRANSFORM = get_default_transform()

In [None]:
def run_one_episode():
    omy_env.reset()
    policy.reset()
    if IMAGE_NOISE:
        omy_env.agument_object_random_color()
    observation = omy_env.get_observation()
    omy_env.env.tick = 0
    while omy_env.env.is_viewer_alive() and omy_env.env.tick < MAX_EPISODE_STEPS:
        omy_env.step_env()
        if omy_env.env.loop_every(HZ = 20):
            success = omy_env.check_success()
            if success: break
            if omy_env.env.is_key_pressed_once(glfw.KEY_Z):
                break  # for debugging: press 'z' to end the episode
            agent_image, wrist_image = omy_env.grab_image(return_side=False)
            # # resize to 256x256
            frame = {
                "observation.state": observation,
            }
            if observation_type == 'image':
                agent_image = Image.fromarray(agent_image)
                wrist_image = Image.fromarray(wrist_image)
                agent_image = agent_image.resize((256, 256))
                wrist_image = wrist_image.resize((256, 256))
                agent_image = IMG_TRANSFORM(agent_image)
                wrist_image = IMG_TRANSFORM(wrist_image)
                frame["observation.image"] = agent_image
                frame["observation.wrist_image"] = wrist_image
            else:
                obj_states, recp_q_poses = omy_env.get_object_pose(pad=10)
                # extract position and rpy
                # sort the object names in alphabetical order to maintain consistency
                obj_states_sorted = np.zeros((24,), dtype=np.float32)
                sorted_indices = np.argsort(obj_states['names'])
                for i, idx in enumerate(sorted_indices):
                    if 'pad' in obj_states['names'][idx]:
                        continue
                    obj_state = obj_states['poses'][idx]
                    obj_states_sorted[i*6:(i+1)*6] = obj_state
                # sort the receptacle q states in alphabetical order
                recp_q_poses_sorted = np.zeros((3,), dtype=np.float32)
                sorted_indices = np.argsort(recp_q_poses['names'])
                i = 0
                for _, idx in enumerate(sorted_indices):
                    if 'pad' in recp_q_poses['names'][idx]:
                        continue
                    recp_q_pose = recp_q_poses['poses'][idx]
                    recp_q_poses_sorted[i] = recp_q_pose
                    i += 1
                obj_states_final = np.concatenate([obj_states_sorted, recp_q_poses_sorted])
                frame["observation.environment_state"] = obj_states_final
            # numpy to torch
            frame = {k: torch.tensor(v, dtype=torch.float32).unsqueeze(0).to(device) for k, v in frame.items()}
            frame['task'] = [env_conf['language_instruction']]
            action = policy.select_action(frame)
            action = action.squeeze(0).cpu().numpy()
            observation = omy_env.step(action, gripper_mode='continuous')
            omy_env.render()
    return success

In [None]:
import pandas as pd
results = []
for episode in range(TEST_EPISODES):
    success = run_one_episode()
    results.append(success)
    print(f"Episode {episode+1}/{TEST_EPISODES} - Success: {success}")
omy_env.env.close_viewer()
# log average success rate
avg_success = sum(results) / len(results)
print(f"Average Success Rate over {TEST_EPISODES} episodes: {avg_success*100:.2f}%")
