In [2]:
import re
import cv2
import matplotlib.pyplot as plt
import time
import numpy as np
import torch
import matplotlib.pyplot as plt
import pandas as pd
from datetime import datetime
import json
import PIL
import google.generativeai as genai

from scripts.robocontrol import RobotController
from lerobot.common.robot_devices.cameras.configs import OpenCVCameraConfig
from lerobot.common.robot_devices.control_utils import busy_wait
from scripts.image_utils import tensor_to_pil, display_images

In [4]:
class MockPolicy:
    """Mock policy that generates smooth movements for a specified joint within its range"""
    def __init__(self, joint='s5', cycle_steps=100):
        self.cycle_steps = cycle_steps
        self.step_counter = 0
        self.joint = joint
        
        # Joint ranges and indices
        self.joint_ranges = {
            's0': {'idx': 0, 'min': -50, 'max': 50, 'name': 'shoulder_pan'},
            's1': {'idx': 1, 'min': 40, 'max': 180, 'name': 'shoulder_lift'},
            's2': {'idx': 2, 'min': 30, 'max': 160, 'name': 'elbow_flex'},
            's3': {'idx': 3, 'min': -30, 'max': 90, 'name': 'wrist_flex'},
            's4': {'idx': 4, 'min': -90, 'max': 90, 'name': 'wrist_roll'},
            's5': {'idx': 5, 'min': 0, 'max': 100, 'name': 'gripper'}
        }
        
        # Get range for selected joint
        self.joint_info = self.joint_ranges[joint]
        self.joint_idx = self.joint_info['idx']
        
        # Calculate movement parameters
        self.mid_value = (self.joint_info['max'] + self.joint_info['min']) / 2
        self.amplitude = (self.joint_info['max'] - self.joint_info['min']) / 2
        
        print(f"Testing {self.joint_info['name']} (joint {joint})")
        print(f"Range: {self.joint_info['min']} to {self.joint_info['max']}")
        
    def select_action(self, observation):
        """Generate smooth sinusoidal movements for selected joint"""
        # Get current state
        for key, value in observation.items():
            if isinstance(value, torch.Tensor) and len(value.shape) == 1 and value.shape[0] == 6:
                current_state = value
                break
        
        # Create action: copy current state but modify selected joint
        action = current_state.clone()
        
        # Calculate smooth sinusoidal movement
        phase = 2 * np.pi * (self.step_counter % self.cycle_steps) / self.cycle_steps
        joint_value = self.mid_value + self.amplitude * np.sin(phase)
        
        # Set joint value
        action[self.joint_idx] = joint_value
        
        self.step_counter += 1
        return action.unsqueeze(0)

In [5]:
def process_observation(observation, action=None, save_path=None, step=None):
    """
    Process, display and optionally save robot observation data.
    
    Args:
        observation: Robot observation dictionary containing state and images
        action: Optional action tensor
        save_path: Optional path to save images and data
        step: Optional step number for saving files
    """

    
    # Get state
    state = None
    for key, value in observation.items():
        if isinstance(value, torch.Tensor) and len(value.shape) == 1 and value.shape[0] == 6:
            state = value.numpy()
            break
    
    # Print state and action first
    print(f"\nStep {step}")
    print(f"State: {state}")
    if action is not None:
        print(f"Action: {action.numpy()}")
    
    # Count valid images
    image_items = [(k, v) for k, v in observation.items() if 'image' in k]
    num_cameras = len(image_items)
    
    # Only process images if we have any
    if num_cameras > 0:
        plt.figure(figsize=(15, 5))
        
        for i, (key, value) in enumerate(image_items):
            # Get camera name
            cam_name = key.split('.')[-1]
            
            # Display image
            plt.subplot(1, num_cameras, i+1)
            img = value.numpy()
            plt.imshow(img)
            plt.title(f"Camera: {cam_name}\nGripper: {state[-2]:.2f}")
            plt.axis('off')
            
            # Save image if path provided
            if save_path is not None:
                # Create directories if they don't exist
                img_dir = os.path.join(save_path, 'images')
                os.makedirs(img_dir, exist_ok=True)
                
                # Save image
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                img_filename = f"step_{step:04d}_{cam_name}_{timestamp}.jpg"
                img_path = os.path.join(img_dir, img_filename)
                plt.imsave(img_path, img)
                
                # Save state and action data
                data_dict = {
                    'step': step,
                    'timestamp': timestamp,
                    'camera': cam_name,
                    'image_path': img_filename,
                }
                
                # Add state values
                for j, s in enumerate(state):
                    data_dict[f'state_{j}'] = s
                
                # Add action values if present
                if action is not None:
                    for j, a in enumerate(action):
                        data_dict[f'action_{j}'] = a.item()
                
                # Save to CSV
                csv_path = os.path.join(save_path, 'observation_data.csv')
                df = pd.DataFrame([data_dict])
                if os.path.exists(csv_path):
                    df.to_csv(csv_path, mode='a', header=False, index=False)
                else:
                    df.to_csv(csv_path, index=False)
        
        plt.tight_layout()
        plt.show()



In [7]:
# Create camera config using proper config objects
cameras = {
    "laptop": OpenCVCameraConfig(
        camera_index=0,  # Built-in webcam
        fps=30,
        width=640,
        height=480
    ),
    "phone": OpenCVCameraConfig(
        camera_index=1,  # iPhone camera
        fps=30,
        width=640,
        height=480
    )
}

In [None]:
# Get test observation from robot
controller = RobotController(
    robot_type='so100',
    device='cpu',
    fps=30,
    cameras=cameras
)

In [46]:
controller.disconnect()