In [None]:
"""
This is a script to verify the camera observations of the real robot. We assume that you are using deoxys_vision for capturing images. If you are using a different vision pipeline, please modify the code accordingly. 
"""
import argparse
import os
import cv2
import plotly.graph_objs as go
import matplotlib.pyplot as plt

from easydict import EasyDict
from deoxys import config_root
from deoxys.franka_interface import FrankaInterface
from deoxys.utils import YamlConfig
from deoxys.utils.input_utils import input2action
from deoxys.utils.io_devices import SpaceMouse
from deoxys.utils.log_utils import get_deoxys_example_logger

from deoxys_vision.networking.camera_redis_interface import \
    CameraRedisSubInterface
from deoxys_vision.utils.calibration_utils import load_default_extrinsics, load_default_intrinsics
from deoxys_vision.utils.camera_utils import assert_camera_ref_convention, get_camera_info
from deoxys_vision.utils.img_utils import save_depth_in_rgb

from orion.algos.grounded_sam_wrapper import GroundedSamWrapper
from PIL import Image
import numpy as np
from orion.utils.misc_utils import depth_to_rgb, save_depth
from orion.utils.o3d_utils import O3DPointCloud

from orion.utils.real_robot_utils import RealRobotObsProcessor

wrapper = GroundedSamWrapper()

def capture_image():
    # Make sure that you've launched camera nodes somewhere else
    observation_cfg = YamlConfig("./configs/real_robot_observation_cfg.yml").as_easydict()

    observation_cfg.cameras = []
    for camera_ref in observation_cfg.camera_refs:
        assert_camera_ref_convention(camera_ref)
        camera_info = get_camera_info(camera_ref)

        observation_cfg.cameras.append(camera_info)

    obs_processor = RealRobotObsProcessor(observation_cfg,
                                            processor_name="ImageProcessor")
    obs_processor.load_intrinsic_matrix(resize=False)
    obs_processor.load_extrinsic_matrix()
    extrinsic_matrix = obs_processor.get_extrinsic_matrix("agentview")
    intrinsic_matrix = obs_processor.get_intrinsic_matrix("agentview")

    color_imgs, depth_imgs = obs_processor.get_original_imgs()
    return color_imgs[0], depth_imgs[0]

In [None]:
import os
import json
import yaml

from orion.utils.misc_utils import get_palette

def load_from_runtime_config():
    with open("experiments/run_time.json") as f:
        run_time_config = json.load(f)
    return run_time_config


with open("experiments/experiment_cfg.yaml", "r") as f:
    cfg = yaml.load(f, Loader=yaml.FullLoader)
task_mapping = cfg["task_mapping"]
task_id_mapping = cfg["task_id_mapping"]
model_mapping = cfg["model_mapping"]
demo_mapping = cfg["demo_mapping"]
result_mapping = cfg["result_mapping"]

runtime_config = load_from_runtime_config()
# print(runtime_config)
runtime_folder = runtime_config["runtime_folder"]

task_id = runtime_folder.split("/")[-3]
model_id = runtime_folder.split("/")[-2]
# captialize all letters in model_id
model_id = model_id.upper()
run_id = runtime_folder.split("/")[-1]

task_name_str = task_id_mapping[task_id]

rollout_name = f"{task_name_str}_{model_id}_{run_id}"

rollout_prefix = "annotations/rollout"
rollout_folder = f"{rollout_prefix}/{rollout_name}"

human_video_annotation_folder = f"annotations/human_demo/{demo_mapping[task_id]}"
os.path.exists(human_video_annotation_folder)

with open(os.path.join(human_video_annotation_folder, "config.json")) as f:
    config_info = json.load(f)

args = EasyDict({
    "rollout_name": rollout_name,
    "text": config_info["text"]
})

print(f"Annotations to {rollout_name} with text: {args.text}")

args = EasyDict(args)
color_img, depth_img = capture_image()

annotation_path = os.path.join("annotations", "rollout", args.rollout_name)
os.makedirs(annotation_path, exist_ok=True)

captured_rgb_name = os.path.join(annotation_path, f"{args.rollout_name}_rgb.jpg")
captured_depth_name = os.path.join(annotation_path, f"depth.png")
frame_path = os.path.join(annotation_path, f"frame.jpg")

cv2.imwrite(captured_rgb_name, color_img)
cv2.imwrite(frame_path, color_img)
save_depth_in_rgb(captured_depth_name, depth_img)
image_source = Image.fromarray(color_img).convert("RGB")
image = np.asarray(image_source)
final_mask_image = wrapper.segment(image, args.text, 
                                #    box_threshold=0.3, 
                                #    text_threshold=0.25, 
                                #    filter_threshold=50
                                   )

from scipy.ndimage import label, binary_erosion, binary_dilation
final_mask_image_array = np.array(final_mask_image)
separated_mask, _ = label(final_mask_image_array)


final_mask_image = Image.fromarray(separated_mask).convert("L")
segmentaiton_annotation_path = os.path.join(annotation_path, "frame_annotation.png")
final_mask_image.putpalette(get_palette())
final_mask_image.save(segmentaiton_annotation_path)
print(f"Save to {segmentaiton_annotation_path}")
plt.imshow(np.array(final_mask_image))
plt.show()
