In [1]:
import preamble
from virtual_dynamics import SimpleVirtualDynamics
from robot import Robot
from app_loop import AppLoop
from force_guider import ForceGuider
from language_generator import DirectionalLanguageGenerator
from vocalizer import Vocalizer
from data_management import TabularDataStore
from rgbd_stream import RGBDStream_iOS
from camera_feed import CameraFeed
import numpy as np
import random

In [2]:
class HandGuidingGame(AppLoop):

    MASS = 20
    AXES = Robot.TRANSLATION
    PHRASES = [
        [""],
        ["move <direction>"],
        ["move <direction> more"],
        ["You need to move <direction>"]
    ]
    DIRECTION_PAIRS = [("right", "left"), ("forward", "backward"), ("down", "up")]

    def __init__(self, randomize_goal: bool = False, use_guiding_force: bool = True, use_language: bool = True) -> None:
        super().__init__()
        self.use_guiding_force = use_guiding_force
        self.use_language = use_language
        self.randomize_goal = randomize_goal
        self.data_store = TabularDataStore(
            column_names=["t", "dt", "F_human", "velocity", "delta_x", "F_trajectory", "F_error", "F_guide", "utterance", "emphasis", "modulation"])

    def startup(self) -> None:
        self.robot = Robot("169.254.9.43",
                           translational_force_deadband=6.0,
                           rotational_force_deadband=0.5)
        self.robot.control.zeroFtSensor()

        self.dynamics = SimpleVirtualDynamics(M=self.MASS, B=50, K=0)

        if self.randomize_goal:
            self.goal_point = np.array(
                [random.uniform(-1.0, -0.5), random.uniform(-0.6, 0.6), random.uniform(0.0, 0.6)])
        else:
            self.goal_point = self.robot.get_pose(self.AXES)

        self.force_guider = ForceGuider(ramp=lambda x: np.sin(np.pi * x) ** 2)
        self.language_generator = DirectionalLanguageGenerator(self.PHRASES, self.DIRECTION_PAIRS, magnitude_interval=12)
        self.vocalizer = Vocalizer(buffer_period=2.0)

    def update(self, t: float, dt: float) -> None:
        period_start = self.robot.control.initPeriod()

        F_human = self.robot.get_force(self.AXES)
        velocity = self.robot.get_velocity(self.AXES)
        delta_x = self.robot.get_pose(self.AXES) - self.goal_point
        F_trajectory = -self.MASS * (2.0 * velocity + 2.0 * delta_x)
        F_trajectory = 0.0 if np.linalg.norm(F_trajectory) == 0.0 else F_trajectory / np.linalg.norm(F_trajectory) * np.linalg.norm(F_human)
        F_error = F_trajectory - F_human
        
        if self.use_guiding_force:
            if np.linalg.norm(F_error) > 10:
                self.force_guider.initiate_guiding_force(guide_time=2.5)

            F_guide = self.force_guider.modulate_force(F_error)
        else:
            F_guide = 0.0

        if self.use_language:
            utterance = self.language_generator.generate(F_error)
        else:
            utterance = ""
        
        stop_condition = np.linalg.norm(delta_x) < 0.1 and np.linalg.norm(velocity) < 0.02
        if stop_condition:
            self.stop()
            utterance = "You are at the goal. The experiment has completed."
        
        if not self.vocalizer.utter(utterance, interupt=stop_condition):
            utterance = ""
        
        self.dynamics.apply_force(F_human + F_guide, dt)
        self.robot.set_velocity(self.dynamics.get_velocity(),
                                Robot.TRANSLATION, acceleration=150)

        modulation = self.force_guider.get_modulation()
        emphasis = self.language_generator.get_emphasis(F_error)
        self.data_store.append_row([t, dt, F_human, velocity, delta_x, F_trajectory, F_error, F_guide, utterance, emphasis, modulation])
        
        self.robot.control.waitPeriod(period_start)

    def shutdown(self) -> None:
        self.robot.set_velocity(Robot.zeroed_translation_rotation())
    
    def get_data(self) -> TabularDataStore:
        return self.data_store

In [None]:

app = HandGuidingGame(randomize_goal=True, use_guiding_force=True, use_language=True)
app.run_threaded()

use_camera = True

if use_camera:
    stream = RGBDStream_iOS()
    calibration_matrix = np.load('calibration_matrix.npy')
    camera_feed = CameraFeed("Camera Feed", stream, calibration_matrix)

try:
    while app.is_running():
        if use_camera:
            camera_feed.draw_world_point(app.goal_point, radius=8, color=(0, 1, 0))
            camera_feed.update_window()
finally:
    app.stop()