In [None]:
### Basic Imports ###
import builtins, csv, datetime, os, subprocess, time, types, pprint
from time import sleep
from pprint import pprint
now = time.time

### Special Imports ###
import numpy as np
from numpy import sqrt

import py_trees
from py_trees.behaviour import Behaviour
from py_trees.common import Status
from py_trees.composites import Selector, Sequence
from py_trees.decorators import FailureIsSuccess
import py_trees.console as console

### Local Imports ###
from magpie.poses import pose_error

In [None]:
########## BASE CLASS ##############################################################################

class BasicBehavior( Behaviour ):
    """ Abstract class for repetitive housekeeping """
    
    
    def __init__( self, name = None, ctrl = None ):
        """ Set name to the child class name unless otherwise specified """
        if name is None:
            super().__init__( name = str( self.__class__.__name__  ) )
        else:
            super().__init__( name = name )
        self.ctrl = ctrl
        self.logger.debug( f"[{self.name}::__init__()]" )
        if self.ctrl is None:
            self.logger.warn( f"{self.name} is NOT conntected to a robot controller!" )

        
    def setup(self):
        self.logger.debug( f"[{self.name}::setup()]" )          
        
        
    def initialise( self ):
        """ Run first time behaviour is ticked or not RUNNING.  Will be run again after SUCCESS/FAILURE. """
        self.status = Status.RUNNING # Do not let the behavior idle in INVALID
        self.logger.debug( f"[{self.name}::initialise()]" )          

        
    def terminate( self, new_status ):
        """ Log how the behavior terminated """
        self.status = new_status
        self.logger.debug( f"[{self.name}::terminate().terminate()][{self.status}->{new_status}]" )
        
        
    def update( self ):
        """ Return true in all cases """
        self.status = py_trees.common.Status.SUCCESS
        return self.status

In [None]:
##### Move_Q #####################################

class Move_Q( BasicBehavior ):
    """ Move the joint config `qPos` """
    
    def __init__( self, qPos, name = None, ctrl = None, rotSpeed = 1.05, rotAccel = 1.4, asynch = True ):
        """ Set the target """
        # NOTE: Asynchronous motion is closest to the Behavior Tree paradigm, Avoid blocking!
        super().__init__( name, ctrl )
        self.qPos     = qPos
        self.rotSpeed = rotSpeed
        self.rotAccel = rotAccel
        self.asynch   = asynch
    
    
    def initialise( self ):
        """ Actually Move """
        super().initialise()
        self.ctrl.moveJ( self.qPos, self.rotSpeed, self.rotAccel, self.asynch )
    
    
    def update( self ):
        """ Return SUCCESS if the target reached """
        if self.ctrl.p_moving():
            self.status = Status.RUNNING
        else:
            error = np.subtract( self.qPos, self.ctrl.get_joint_angles() )
            error = error.dot( error )
            if( error > 0.1 ):
                self.status = Status.FAILURE
            else:
                self.status = Status.SUCCESS 
        return self.status

In [None]:
##### Open_Hand ##################################
    
    
class Open_Gripper( BasicBehavior ):
    """ Open fingers to max extent """
    
    def __init__( self, name = None, ctrl = None  ):
        """ Set the target """
        super().__init__( name, ctrl )
        self.wait_s = 0.5
        
        
    def initialise( self ):
        """ Actually Move """
        super().initialise()
        self.ctrl.open_gripper()
        sleep( self.wait_s )
        
        
    def update( self ):
        """ Return true if the target reached """
        self.status = Status.SUCCESS
        return self.status
        

In [None]:
##### Set_Fingers ##################################
    
    
class Set_Gripper( BasicBehavior ):
    """ Open fingers to max extent """
    
    def __init__( self, width_m, name = None, ctrl = None  ):
        """ Set the target """
        super().__init__( name, ctrl )
        self.width_m = width_m
        self.wait_s = 0.5
        
        
    def initialise( self ):
        """ Actually Move """
        super().initialise()
        self.ctrl.set_gripper( self.width_m )
        sleep( self.wait_s )
        
    
    def update( self ):
        """ Return true if the target reached """
        self.status = Status.SUCCESS
        return self.status

In [None]:
class Set_Gripper_Angles( BasicBehavior ):
    """ Open fingers to max extent """
    
    def __init__( self, angle1, angle2, name = None, ctrl = None  ):
        """ Set the target """
        super().__init__( name, ctrl )
        self.angle1 = angle1
        self.angle2 = angle2
        self.wait_s = 0.5
        
        
    def initialise( self ):
        """ Actually Move """
        super().initialise()
        self.ctrl.set_gripper_angles( self.angle1, self.angle2 )
        sleep( self.wait_s )
        
    
    def update( self ):
        """ Return true if the target reached """
        self.status = Status.SUCCESS
        return self.status

In [None]:
class Set_Gripper_Angles_After_Delay( BasicBehavior ):
    """ Set gripper angles after a time delay """
    
    def __init__( self, angle1, angle2, name = None, ctrl = None  ):
        """ Set the target """
        super().__init__( name, ctrl )
        self.angle1 = angle1
        self.angle2 = angle2
        self.wait_s = 0.5
        
        
    def initialise( self ):
        """ Actually Move """
        super().initialise()

        sleep( self.wait_s )
        self.ctrl.set_gripper_angles( self.angle1, self.angle2 )
        sleep( self.wait_s )
        
    
    def update( self ):
        """ Return true if the target reached """
        self.status = Status.SUCCESS
        return self.status

In [None]:
########## CONSTANTS & COMPONENTS ##################################################################

### Init data structs & Keys ###
builtins._DUMMYPOSE     = np.eye(4)
builtins.MP2BB = dict()  # Hack the BB object into the built-in namespace
builtins.SCAN_POSE_KEY  = "scanPoses"
builtins.HAND_OBJ_KEY   = "handHas"
PROTO_PICK_ROT = np.array( [[ 0.0,  1.0,  0.0, ],
                            [ 1.0,  0.0,  0.0, ],
                            [ 0.0,  0.0, -1.0, ]] )

### Set important BB items ###
MP2BB[ SCAN_POSE_KEY ] = dict()

In [None]:
########## MOVEMENT BEHAVIORS ######################################################################

### Constants ###
LIBBT_TS_S       = 0.25
DEFAULT_TRAN_ERR = 0.002
DEFAULT_ORNT_ERR = 3*np.pi/180.0

In [None]:
##### Move_Arm ###################################
    
    
class Move_Arm( BasicBehavior ):
    """ Move linearly in task space to the designated pose """
    
    def __init__( self, pose, name = None, ctrl = None, linSpeed = 0.25, linAccel = 0.5, asynch = True ):
        """ Set the target """
        # NOTE: Asynchronous motion is closest to the Behavior Tree paradigm, Avoid blocking!
        super().__init__( name, ctrl )
        self.pose     = pose
        self.linSpeed = linSpeed
        self.linAccel = linAccel
        self.asynch   = asynch
        
        
    def initialise( self ):
        """ Actually Move """
        super().initialise()
        self.ctrl.moveL( self.pose, self.linSpeed, self.linAccel, self.asynch )
        
        
    def update( self ):
        """ Return true if the target reached """
        if self.ctrl.p_moving():
            self.status = Status.RUNNING
        else:
            pM = self.ctrl.get_tcp_pose()
            pD = self.pose
            [errT, errO] = pose_error( pM, pD )
            if (errT <= DEFAULT_TRAN_ERR) and (errO <= DEFAULT_ORNT_ERR):
                self.status = Status.SUCCESS
            else:
                print( self.name, ", POSE ERROR:", [errT, errO] )
                self.status = Status.FAILURE
        return self.status

In [None]:
########## EXECUTION ###############################################################################


class HeartRate: 
    """ Sleeps for a time such that the period between calls to sleep results in a frequency not greater than the specified 'Hz' """
    # NOTE: This fulfills a purpose similar to the rospy rate
    
    def __init__( self , Hz ):
        """ Create a rate object with a Do-Not-Exceed frequency in 'Hz' """
        self.period = 1.0 / Hz; # Set the period as the inverse of the frequency , hearbeat will not exceed 'Hz' , but can be lower
        self.last = time.time()
        
    def check_elapsed( self, reset = True ):
        """ Check if the period has elapsed, Optionally `reset` the clock """
        elapsed = time.time() - self.last
        update  = elapsed >= self.period
        if( update and reset ):
            self.last = time.time()
        return update
    
    def sleep( self ):
        """ Sleep for a time so that the frequency is not exceeded """
        elapsed = time.time() - self.last
        if elapsed < self.period:
            time.sleep( self.period - elapsed )
        self.last = time.time()


""" Return a formatted timestamp string, useful for logging and debugging """
def nowTimeStamp(): return datetime.datetime.now().strftime(
    '%Y-%m-%d_%H-%M-%S')  # http://stackoverflow.com/a/5215012/893511



class StopBeetle:
    """Invasive Beetle: Kills (stops) all branches of the tree"""

    def __init__(self, killStatus):
        """Set the status that will be assigned to all branches"""
        self.status = killStatus

    def run(self, behav):
        """Kill all subtrees"""
        for chld in behav.children:
            self.run(chld)
        behav.stop(self.status)


        
def run_BT_until_done(
    rootNode,
    N              = 10000,
    tickPause      =     0.25,
    Nverb          =    50,
    breakOnFailure = True,
    breakOnSuccess = True,
    treeUpdate     = 0,
    failureTree    = 1,
    successTree    = 0,
):
    """Tick root until `maxIter` is reached while printing to terminal"""

    if Nverb:
        print(
            "About to run",
            type( rootNode ),
            "named",
            rootNode.name,
            "at",
            nowTimeStamp(),
            "with",
            1 / tickPause,
            "Hz update frequency ...",
        )

    # 0. Setup
    # rootNode.setup_subtree( childrenFirst = 0 )
    rootNode.setup_with_descendants()
    pacer = HeartRate(Hz=1 / tickPause)  # metronome

    if Nverb:
        print("Running ...\n")

    # 1. Run
    for i in range(1, N + 1):
        try:
            rootNode.tick_once()

            if Nverb > 0 and i % Nverb == 0:
                print("\n--------- Tick {0} ---------\n".format(i))
                print("Root node, Name:", rootNode.name, ", Status:", rootNode.status)
                print("\n")
                if treeUpdate:
                    print(
                        py_trees.display.unicode_tree(root=rootNode, show_status=True)
                    )

            if breakOnFailure and (rootNode.status == Status.FAILURE):
                print("Root node", rootNode.name, "failed!\n")
                if failureTree:
                    print(
                        py_trees.display.unicode_tree(root=rootNode, show_status=True)
                    )
                break
            elif breakOnSuccess and (rootNode.status == Status.SUCCESS):
                print("Root node", rootNode.name, "succeeded!\n")
                if successTree:
                    print(
                        py_trees.display.unicode_tree(root=rootNode, show_status=True)
                    )
                break
            else:
                pacer.sleep()

        except KeyboardInterrupt:
            print("\nSimulation HALTED by user at", nowTimeStamp())
            break

    print("\nRun completed! with status:", rootNode.status, "\n\n")

    insect = StopBeetle(rootNode.status)

    for i in range(3):
        rootNode.visit(insect)  # HACK required coz tree doesn't complete sometimes
        sleep(0.5)

    print("Root node", rootNode.name, "was killed by the running script!")

In [None]:
import numpy as np
import open3d as o3d
import time
import threading
from RealSense import RealSense
from magpie.ur5 import UR5_Interface
from magpie import poses
import os

In [None]:
# def filter_color_by_depth(depth_image, color_image, depth_threshold):
#     # Ensure the depth image and color image have compatible shapes
#     if depth_image.shape != color_image.shape[:2]:
#         raise ValueError("Depth image and color image must have the same dimensions")

#     # Create a mask where depth values are greater than the threshold
#     mask = depth_image > depth_threshold

#     # Convert mask to a 3-channel format to match the color image
#     mask_3channel = np.repeat(mask[:, :, np.newaxis], 3, axis=2)

#     # Apply the mask to the color image, setting those pixels to black
#     color_image_filtered = color_image.copy()
#     color_image_filtered[mask_3channel] = 0

#     return color_image_filtered

def filter_color_by_depth(depth_image, color_image, depth_threshold, zero_threshold):
    # Convert Open3D Image to NumPy array
    depth_np = np.asarray(depth_image)
    color_np = np.asarray(color_image)
    
    # Calculate the percentage of zero depth values
    zero_percentage = np.mean(depth_np == 0)

    if zero_percentage > zero_threshold:
        return color_np
    else:
        # Create a mask where depth values are greater than the threshold
        # mask = depth_np > depth_threshold
        mask = np.logical_or(depth_np == 0, depth_np > depth_threshold)

        # Convert mask to a 3-channel format to match the color image
        mask_3channel = np.repeat(mask[:, :, np.newaxis], 3, axis=2)

        # Apply the mask to the color image, setting those pixels to black
        color_np_filtered = color_np.copy()
        color_np_filtered[mask_3channel] = 0

        return color_np_filtered

In [None]:
TRAJ = 1

def save_data(color_image, depth_image, robot_pose, gripper_pose, traj, index):
    try:
        os.makedirs(f"./stream_data/{traj}", exist_ok = True)
    except Exception as e:
        print(f"Error: {e}")
    
    # print(type(depth_image))

    # import matplotlib.pyplot as plt
    # # Display the original and filtered images using matplotlib
    # plt.figure(figsize=(12, 6))
    # plt.subplot(1, 2, 1)
    # plt.imshow(depth_image)
    # plt.title('Depth Image')
    # plt.axis('off')
    
    o3d.io.write_image(f"./stream_data/{traj}/color_image_{index}.jpg", color_image)
    np.save(f"./stream_data/{traj}/depth_image_{index}.npy", np.asarray(depth_image))
    np.save(f"./stream_data/{traj}/pose_{index}.npy", np.append(np.array(poses.pose_mtrx_to_vec(robot_pose)), gripper_pose))

    # with open(f"./stream_data/{traj}/pose_{index}.txt", "w") as file:
        # file.write(str())

def capture_loop(robot, real, stop_event):
    index = 0
    while not stop_event.is_set():
        pcd, rgbd_image = real.getPCD()
        depth_image, color_image = rgbd_image.depth, rgbd_image.color

        # average_depth = np.mean(depth_image)
        # filtered_image = filter_color_by_depth(depth_image, color_image, depth_threshold = 0.7, zero_threshold = 0.7)
        
        # # Convert filtered NumPy array back to Open3D Image for visualization or further processing
        # filtered_image_o3d = o3d.geometry.Image(filtered_image)

        # import matplotlib.pyplot as plt
        # # Display the original and filtered images using matplotlib
        # plt.figure(figsize=(12, 6))
        # plt.subplot(1, 2, 1)
        # plt.imshow(depth_image)
        # plt.colorbar()
        # plt.title('Depth Image')
        # plt.axis('off')

        # plt.subplot(1, 2, 2)
        # plt.imshow(color_image)
        # plt.title('Color Image')
        # plt.axis('off')

        # plt.show()

        # # Create a heatmap of the data
        # plt.imshow(depth_image)
        # plt.colorbar()  # Add a colorbar to a plot
        # plt.show()

        robot_pose = robot.get_tcp_pose()
        gripper_pose = robot.get_gripper_angles()

        save_data(color_image, depth_image, robot_pose, gripper_pose, TRAJ, index)
        index += 1
        time.sleep(0.25)

In [None]:
robot = UR5_Interface()
robot.start()

real = RealSense(4) # 10
real.initConnection()

In [None]:
approach_pose = [[ 0.99789511,  0.0122757,  -0.06367615, -0.11642388],
 [-0.06399342,  0.02746575, -0.99757229, -0.67249229],
 [-0.01049699,  0.99954737,  0.0281935,   0.22991059],
 [ 0.,          0.,          0.,          1.        ]]

In [None]:
# ##### Tip_Over_Box ###################################

# from magpie import poses
    
# class Tip_Over_Box( Sequence ):
#     """ Tip over and pick up the box from a dynamically calculated sequence of poses based on a starting pose. """

#     def __init__(self, approach_pose, name = "Tip_Over_Box", ctrl = None):
#         """ Construct the subtree. """
#         super().__init__(name = name, memory = True)
#         self.ctrl = ctrl
#         self.approach_pose = approach_pose

#         self.mvTrgt = Open_Gripper(ctrl = ctrl)
#         self.add_child(self.mvTrgt)

#         # POSITION GRIPPER TO TOP OF BOX
#         self.mvTrgt = Move_Arm(self.approach_pose, ctrl = ctrl)
#         self.add_child(self.mvTrgt)

#         # POSITION GRIPPER ABOVE BOX
#         above_box_pose = poses.rotate_pose(poses.translate_pose(np.array(approach_pose), [0.0, -0.02, 0.0]), [np.pi / 4, 0.0, np.pi / 2])
#         self.mvTrgt = Move_Arm(above_box_pose, ctrl = ctrl)
#         self.add_child(self.mvTrgt)
        
#         # MOVE TOP GRIPPER INTO POSITION
#         self.mvTrgt = Set_Gripper_Angles( 120, 216, ctrl = ctrl )
#         self.add_child( self.mvTrgt )

#         # # MOVE ARM DOWNWARD UNTIL CONTACT
#         contact_pose = poses.translate_pose(np.array(above_box_pose), [-0.03, 0.0, 0.03])
#         self.mvTrgt = Move_Arm(contact_pose, ctrl = ctrl)
#         self.add_child(self.mvTrgt)

#         # ADJUST GRIPPER POSITION
#         self.mvTrgt = Set_Gripper_Angles(120, 195, ctrl = ctrl)
#         self.add_child(self.mvTrgt)

#         # # MOVE BACKWARD TO TIP BOX
#         backward_pose = poses.translate_pose(np.array(contact_pose), [-0.08, 0.0, -0.08])
#         self.mvTrgt = Move_Arm(backward_pose, ctrl = ctrl)
#         self.add_child(self.mvTrgt)

#         # OPEN TOP GRIPPER TO ALLOW BOX TO FALL
#         self.mvTrgt = Set_Gripper_Angles( 85, 195, ctrl=ctrl )
#         self.add_child( self.mvTrgt )

#         # OPEN BOTTOM GRIPPER TO ADJUST BOX FURTHER
#         self.mvTrgt = Set_Gripper_Angles_After_Delay( 85, 216, ctrl=ctrl )
#         self.add_child( self.mvTrgt )

#         # MOVE FORWARD
#         forward_pose = poses.translate_pose(np.array(backward_pose), [-0.05, 0, 0.05])
#         self.mvTrgt = Move_Arm(forward_pose, ctrl = ctrl)
#         self.add_child(self.mvTrgt)

#         # CLOSE TOP GRIPPER TO GRAB BOX
#         self.mvTrgt = Set_Gripper_Angles_After_Delay(140, 216, ctrl = ctrl)
#         self.add_child(self.mvTrgt)

#         self.mvTrgt = Set_Gripper(0.056, ctrl = ctrl)
#         self.add_child(self.mvTrgt)

#         # MOVE UP & OUT
#         final_pose = poses.translate_pose(np.array(forward_pose), [0.1, 0, -0.1])
#         self.mvTrgt = Move_Arm(final_pose, ctrl = ctrl)
#         self.add_child(self.mvTrgt)

In [None]:
##### Tip_Over_Box ###################################

from magpie import poses

def tip_over_box(approach_pose, ctrl):
    """ Tip over and pick up the box from a dynamically calculated sequence of poses based on a starting pose. """
    
    action = Open_Gripper(name = 'open gripper', ctrl = ctrl)
    run_BT_until_done(action)

    # POSITION GRIPPER TO TOP OF BOX
    # action = Move_Arm(approach_pose, ctrl = ctrl)
    # run_BT_until_done(action)

    # POSITION GRIPPER ABOVE BOX
    above_box_pose = poses.rotate_pose(poses.translate_pose(np.array(approach_pose), [0.0, -0.02, 0.0]), [np.pi / 4, 0.0, np.pi / 2])
    action = Move_Arm(above_box_pose, ctrl = ctrl)
    run_BT_until_done(action)

    # MOVE TOP GRIPPER INTO POSITION
    action = Set_Gripper_Angles(120, 216, ctrl = ctrl)
    run_BT_until_done(action)

    # MOVE ARM DOWNWARD UNTIL CONTACT
    contact_pose = poses.translate_pose(np.array(above_box_pose), [-0.03, 0.0, 0.03])
    action = Move_Arm(contact_pose, ctrl = ctrl)
    run_BT_until_done(action)

    # ADJUST GRIPPER POSITION
    action = Set_Gripper_Angles(120, 195, ctrl = ctrl)
    run_BT_until_done(action)

    # MOVE BACKWARD TO TIP BOX
    backward_pose = poses.translate_pose(np.array(contact_pose), [-0.08, 0.0, -0.08])
    action = Move_Arm(backward_pose, ctrl = ctrl)
    run_BT_until_done(action)

    # OPEN TOP GRIPPER TO ALLOW BOX TO FALL
    action = Set_Gripper_Angles(85, 195, ctrl = ctrl)
    run_BT_until_done(action)

    # OPEN BOTTOM GRIPPER TO ADJUST BOX FURTHER
    action = Set_Gripper_Angles(85, 216, ctrl = ctrl)
    run_BT_until_done(action)

    # MOVE FORWARD
    forward_pose = poses.translate_pose(np.array(backward_pose), [-0.05, 0, 0.05])
    action = Move_Arm(forward_pose, ctrl = ctrl)
    run_BT_until_done(action)

    # # CLOSE TOP GRIPPER TO GRAB BOX
    action = Set_Gripper_Angles(140, 216, ctrl = ctrl)
    run_BT_until_done(action)
    
    action = Set_Gripper(0.056, ctrl = ctrl)
    run_BT_until_done(action)

    # MOVE UP & OUT
    final_pose = poses.translate_pose(np.array(forward_pose), [0.1, 0, -0.1])
    action = Move_Arm(final_pose, ctrl = ctrl)
    run_BT_until_done(action)

In [None]:
# def execute_tip_over_box(robot):
#     current_pose = robot.get_tcp_pose()
#     tip_over_box = Tip_Over_Box(current_pose, ctrl=robot)
#     run_BT_until_done(tip_over_box)

# def keypress_handler(stop_event, robot):
#     while not stop_event.is_set():
#         user_input = input("Press 't' to tip over the box or 'q' to quit: ").strip().lower()
#         if user_input == 't':
#             execute_tip_over_box(robot)
#         elif user_input == 'q':
#             stop_event.set()

# def control_thread(robot, stop_event):
#     while not stop_event.is_set():
#         command = input("Enter 't' to tip over box or 'q' to quit: ").strip().lower()
#         if command == 't':
#             print(robot.get_tcp_pose())
#             # tip_over_box(robot.get_tcp_pose(), robot)
#             tip_over_box(approach_pose, robot)

#         elif command == 'q':
#             stop_event.set()

# import keyboard  # Import the keyboard library

# def control_thread(robot, stop_event):
#     while not stop_event.is_set():
#         if keyboard.is_pressed('t'):  # Check if 't' is pressed
#             print("Tipping over the box...")
#             approach_pose = robot.get_tcp_pose()  # You need to determine how to get this pose
#             tip_over_box(approach_pose, robot)
#             while keyboard.is_pressed('t'):  # Wait for 't' to be released
#                 continue
        
#         elif keyboard.is_pressed('q'):  # Check if 'q' is pressed
#             print("Quitting...")
#             stop_event.set()
#             break

#         time.sleep(0.1)  # Add a small delay to prevent high CPU usage

from pynput.keyboard import Key, Listener

def on_press(key, robot, stop_event):
    try:
        if key.char == 't':  # Respond to 't'
            print("Tipping over the box...")
            approach_pose = robot.get_tcp_pose()
            tip_over_box(approach_pose, robot)
        elif key.char == 'w':
            new_pose = poses.translate_pose(np.array(robot.get_tcp_pose()), [0.0, 0.0, 0.005])
            robot.moveL( new_pose, 0.25, 0.5, True )
        elif key.char == 'x':
            new_pose = poses.translate_pose(np.array(robot.get_tcp_pose()), [0.0, 0.0, -0.005])
            robot.moveL( new_pose, 0.25, 0.5, True )
        elif key.char == 'a':
            new_pose = poses.translate_pose(np.array(robot.get_tcp_pose()), [0.005, 0.0, 0.0])
            robot.moveL( new_pose, 0.25, 0.5, True )
        elif key.char == 'd':
            new_pose = poses.translate_pose(np.array(robot.get_tcp_pose()), [-0.005, 0.0, 0.0])
            robot.moveL( new_pose, 0.25, 0.5, True )
        elif key.char == 'e':
            new_pose = poses.translate_pose(np.array(robot.get_tcp_pose()), [0.0, 0.005, 0.0])
            robot.moveL( new_pose, 0.25, 0.5, True )
        elif key.char == 'c':
            new_pose = poses.translate_pose(np.array(robot.get_tcp_pose()), [0.0, -0.005, 0.0])
            robot.moveL( new_pose, 0.25, 0.5, True )

        elif key.char == 'i':
            new_pose = poses.rotate_pose(np.array(robot.get_tcp_pose()), [-0.03, 0.0, 0.0])
            robot.moveL( new_pose, 0.25, 0.5, True )
        elif key.char == 'k':
            new_pose = poses.rotate_pose(np.array(robot.get_tcp_pose()), [0.03, 0.0, 0.0])
            robot.moveL( new_pose, 0.25, 0.5, True )
        elif key.char == 'j':
            new_pose = poses.rotate_pose(np.array(robot.get_tcp_pose()), [0.0, 0.03, 0.0])
            robot.moveL( new_pose, 0.25, 0.5, True )
        elif key.char == 'l':
            new_pose = poses.rotate_pose(np.array(robot.get_tcp_pose()), [0.0, -0.03, 0.0])
            robot.moveL( new_pose, 0.25, 0.5, True )

        elif key.char == 'q':  # Respond to 'q'
            print("Quitting...")
            stop_event.set()
            return False  # Stop the listener
    except AttributeError:
        pass

def control_thread(robot, stop_event):
    with Listener(on_press=lambda key: on_press(key, robot, stop_event)) as listener:
        listener.join()

In [None]:
# import ipywidgets as widgets
# from IPython.display import display
# import time
# import threading

# def move_robot(direction, duration=0.1):
#     if direction == 'Forward':
#         new_pose = poses.translate_pose(np.array(robot.get_tcp_pose()), [0.0, 0.0, 0.01])
#         robot.moveL( new_pose, 0.25, 0.5, True )
#     if direction == 'Backward':
#         new_pose = poses.translate_pose(np.array(robot.get_tcp_pose()), [0.0, 0.0, -0.01])
#         robot.moveL( new_pose, 0.25, 0.5, True )

#     print(f"Moving {direction}")

# def continuous_movement(direction):
#     while button_states[direction]:
#         move_robot(direction)
#         time.sleep(0.30) # adjust for finer control

# def on_button_clicked(b):
#     direction = b.owner.description
#     if b.new:
#         button_states[direction] = True
#         threading.Thread(target=continuous_movement, args=(direction,)).start()
#     else:
#         button_states[direction] = False


# button_states = {
#     'Forward': False,
#     'Backward': False,
#     'Left': False,
#     'Right': False,
#     'Up': False,
#     'Down': False,
#     'Tilt Up': False,
#     'Tilt Down': False,
#     'Rotate Left': False,
#     'Rotate Right': False
# }

# buttons = {
#     direction: widgets.ToggleButton(description=direction)
#     for direction in button_states.keys()
# }

# for button in buttons.values():
#     button.observe(on_button_clicked, 'value')
#     display(button)

In [None]:
run_BT_until_done(Open_Gripper(ctrl = robot))

initial = np.array([-np.pi / 2.0, -np.pi / 2.6, -np.pi / 2.2, -np.pi * 4.0 / 2.8, -np.pi / 2.0, 0])
start = Move_Q(initial, name = None, ctrl = robot, rotSpeed = 1.05, rotAccel = 1.4, asynch = True)
run_BT_until_done(start)

# tip_over_box = Tip_Over_Box(robot.get_tcp_pose(), name = "Tip Over Box", ctrl = robot)
# run_BT_until_done(tip_over_box)

stop_event = threading.Event()
capture_thread = threading.Thread(target=capture_loop, args=(robot, real, stop_event))
capture_thread.start()

control_thread = threading.Thread(target=control_thread, args=(robot, stop_event))
control_thread.start()

# # # # #

try:
    capture_thread.join()
    control_thread.join()
finally:
    real.disconnect()
    # robot.stop()

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Load the data from the .npy file
depth = np.load('./stream_data/1/depth_image_14.npy')

# Create a heatmap of the data
plt.imshow(depth)
plt.colorbar()  # Add a colorbar to a plot
plt.show()

print(depth.shape)

# import required module 
from PIL import Image 
  
# get image 
filepath = "./stream_data/1/color_image_14.jpg"
img = Image.open(filepath) 
  
# get width and height 
width = img.width 
height = img.height 
  
# display width and height 
print("The height of the image is: ", height) 
print("The width of the image is: ", width) 


In [None]:
arr = np.load('./stream_data/1/depth_image_14.npy')
print(np.max(arr))
print(np.min(arr))

In [None]:
arr = np.load('./stream_data/1/pose_14.npy')
print(arr)