In [None]:
import preamble
from virtual_dynamics import SimpleVirtualDynamics
from robot import Robot
from app_loop import AppLoop
from vocalizer import Vocalizer
from plot_client import PlotClient
from phrase_mapping import PHRASE_MAPPING, WORDS_PER_PHRASE
from rgbd_stream import RGBDStream_iOS
from camera_feed import CameraFeed
import numpy as np
from numpy.typing import NDArray
import pandas as pd
from typing import Tuple
from model import DualAutoencoderModel
from globals import FORCE_SAMPLE_COUNT, FORCE_CURVE_DURATION
import pickle
import time

In [None]:
class RobotApp(AppLoop):
    MASS = 10.0
    DAMPENING = 30.0
    AXES = Robot.TRANSLATION

    def __init__(self, use_plotter: bool = False, use_camera: bool = False, record_data: bool = False):
        super().__init__()
        self.use_plotter = use_plotter
        self.use_camera = use_camera
        self.record_data = record_data

    def startup(self) -> None:
        self.robot = Robot("169.254.9.43",
                           translational_force_deadband=5.0,
                           rotational_force_deadband=0.5)
        self.robot.control.zeroFtSensor()
        self.p0 = self.robot.INIT_POSE[:3]
        self.F_ext = np.zeros_like(self.p0)
        self.F_r = np.zeros_like(self.F_ext)
        self.F_v = np.zeros_like(self.F_ext)

        self.dynamics = SimpleVirtualDynamics(self.MASS, B=self.DAMPENING, K=0.0)

        self.path, self.v_path, self.a_path = self.load_reference_path('../data/reference_path.csv')
        self.forward_state = True

        self.P = 0.5
        self.V = 0.5
        self.k = 120.0 * 10.0 * 0.5 * 0.8
        self.b = 40.0 * 10.0 * 0.8

        self.speaking_start_t = -1000.0
        self.speaking_start_V = 0.5

        self.vocalizer = Vocalizer()
        with open('../models/svm_knn.pkl', 'rb') as file:
            self.model = pickle.load(file)

        if self.use_plotter:
            self.plotter = PlotClient()
            self.plotter.create_plot("A", title="Estimated Compliance", xlabel="Time (s)", ylabel="Compliance Level")
            self.plotter.create_line("A", "x", plot_style='-', color='#DF2935', label='P')
            self.plotter.create_line("A", "y", plot_style='-', color='#4DA167', label='V')

        if self.use_camera:
            rgbd_stream = RGBDStream_iOS()
            calibration_matrix = np.load('calibration_matrix.npy')
            self.camera_feed = CameraFeed('Camera Feed', rgbd_stream, calibration_matrix)

        if self.record_data:
            self.data = []
            time.sleep(7)

    def load_reference_path(self, file_path: str) -> NDArray:
        data = pd.read_csv(file_path)

        path = np.stack([data['p_x'].values, data['p_y'].values, data['p_z'].values], axis=1)
        path += self.p0 - path[0]

        v_path = np.stack([data['v_x'].values, data['v_y'].values, data['v_z'].values], axis=1)
        a_path = np.stack([data['a_x'].values, data['a_y'].values, data['a_z'].values], axis=1)

        return path, v_path, a_path

    def update_path(self) -> None:
        p = self.robot.get_pose(self.AXES)

        if p[0] <= self.path[-1, 0] if self.forward_state else p[0] >= self.path[0, 0]:
            self.forward_state = not self.forward_state
            self.v_path *= -1.0

    def get_target(self, p: NDArray) -> NDArray:
        distances = np.linalg.norm(self.path - p, axis=1)
        index = np.argmin(distances)
        return self.path[index], self.v_path[index]

    def update_F_ext(self, dt: float) -> None:
        alpha = 1.0 - np.exp(-dt * 32.0 * 0.25)
        F_ext = self.robot.get_force(self.AXES)
        self.F_ext = alpha * F_ext + (1.0 - alpha) * self.F_ext

    def compute_costs(self) -> Tuple[float, float]:
        return (self.P + self.V) * 0.5, 1.0 - self.V

    def compute_F_ref_curve(self, duration: float = FORCE_CURVE_DURATION, n_samples: int = FORCE_SAMPLE_COUNT) -> NDArray:
        F_ref_curve = []

        p = self.robot.get_pose(self.AXES)
        v = self.robot.get_velocity(self.AXES)
        p_ref, v_ref = self.get_target(p)
        dt = duration / n_samples

        for i in range(n_samples):
            p_ref, v_ref = self.get_target(p)
            F_ref = -self.k * (p - p_ref) - self.b * (v - v_ref)
            v += (F_ref - self.DAMPENING * v) / self.MASS * dt
            p += v * dt
            F_ref_curve.append(F_ref)

        return np.linspace(0.0, duration, n_samples), np.array(F_ref_curve)
    
    def split_F_ref(self, F_ref: float | NDArray, c_p: float, c_v: float) -> Tuple[float | NDArray, float | NDArray]:
        return c_v / (c_p + c_v) * F_ref, c_p / (c_p + c_v) * F_ref
    
    def generate_phrase(self, F_v_curve: NDArray) -> str:
        impulse_curve = F_v_curve.cumsum(axis=0) * FORCE_CURVE_DURATION / FORCE_SAMPLE_COUNT
        phrase = self.model.force_to_phrase(impulse_curve[None, :])[0]
        phrase_text = ' '.join(phrase).strip()
        return PHRASE_MAPPING[phrase_text]

    def speaking_period(self) -> float:
        return WORDS_PER_PHRASE / (0.36 * 2.2 ** (2 - self.P - self.V))
    
    def log_normal(self, x: float, mu: float, sigma: float) -> float:
        return np.exp(-(np.log(x) - mu) ** 2 / (2.0 * sigma ** 2)) / (x * sigma * np.sqrt(2.0 * np.pi))

    def normal(self, x: float, mu: float, sigma: float) -> float:
        return np.exp(-0.5 * ((x - mu) / sigma) ** 2) / np.sqrt(2.0 * np.pi * sigma ** 2)

    def estimate_state(self, F_r: NDArray, F_v: NDArray, t: float, dt: float) -> None:
        alpha = 1.0 - np.exp(-0.025 * dt)
        uncertainty = np.sqrt(dt / 0.004)

        z_p = np.sum(F_r * self.F_ext) / np.linalg.norm(F_r)
        P_pred = alpha * (1.0 - self.P) + (1.0 - alpha) * self.P
        self.P = self.normal(z_p, mu=0.0, sigma=200.0 / uncertainty) * P_pred
        self.P = self.P / (self.P + self.normal(z_p, mu=-15.0, sigma=200.0 / uncertainty) * (1.0 - P_pred))

        # P_pred = Pr(P=1|P=0) * (1 - P_hat) + Pr(P=1|P=1) * P_hat
        # P_hat = Pr(z_p|P=1) * P_pred / (Pr(z_p|P=0) * (1 - P_pred) + Pr(z_p|P=1) * P_pred)

        decay = np.exp(-4.0 * (np.log(max(t - self.speaking_start_t, 1e-6)) - np.log(2.5)) ** 2)
        if decay > 0.01:
            z_v = np.sum(F_v * self.F_ext) / np.linalg.norm(F_v) * decay
            V_pred = alpha * (1.0 - self.V) + (1.0 - alpha) * self.V
            self.V = self.normal(z_v, mu=0.0, sigma=200.0 / uncertainty) * V_pred
            self.V = self.V / (self.V + self.normal(z_v, mu=-10.0, sigma=200.0 / uncertainty) * (1.0 - V_pred))

    def update(self, t: float, dt: float) -> None:
        period_start = self.robot.control.initPeriod()
        self.update_path()
        self.update_F_ext(dt)

        c_p, c_v = self.compute_costs()
        t_curve, F_ref_curve = self.compute_F_ref_curve()
        F_r_curve, F_v_curve = self.split_F_ref(F_ref_curve, c_p, c_v)
        F_r, F_v = F_r_curve[0], F_v_curve[0]

        phrase = self.generate_phrase(F_v_curve)
        if t - self.speaking_start_t > 4.0 and t - self.speaking_start_t < 4.5 and self.V - self.speaking_start_V > 0.05:
            spoke_encouraging = self.vocalizer.utter("good job")
        else:
            spoke_encouraging = False

        if t - self.speaking_start_t > self.speaking_period() and self.vocalizer.utter(phrase):
            self.speaking_start_t = t
            self.speaking_start_V = self.V
            spoke_instructional = True
        else:
            spoke_instructional = False

        self.estimate_state(F_r, F_v, t, dt)

        self.F_r = 0.25 * F_r + 0.75 * self.F_r
        self.F_v = 0.25 * F_v + 0.75 * self.F_v
        self.dynamics.apply_force(self.F_ext + self.F_r, dt)
        self.robot.set_velocity(self.dynamics.get_velocity(),
                                Robot.TRANSLATION, acceleration=10)

        self.robot.control.waitPeriod(period_start)

        if self.use_plotter:
            self.plotter.update_line("A", "x", (t, self.P))
            self.plotter.update_line("A", "y", (t, self.V))
            self.plotter.config_plot("A", xlim=(t - 30, t), ylim=(0, 1))

        if self.use_camera:
            p = self.robot.get_pose(self.AXES)
            p_r = p + self.F_r / (np.linalg.norm(self.F_r) + 1e-6) * 0.075 * np.log(np.linalg.norm(self.F_r) + 1.0)
            p_v = p + self.F_v / (np.linalg.norm(self.F_v) + 1e-6) * 0.075 * np.log(np.linalg.norm(self.F_v) + 1.0)
            p_h = p + self.F_ext / (np.linalg.norm(self.F_ext) + 1e-6) * 0.075 * np.log(np.linalg.norm(self.F_ext) + 1.0)
            self.camera_feed.draw_world_arrow(p - np.array([0.0, 0.0, 0.025]), p_r - np.array([0.0, 0.0, 0.025]), thickness=6, color=(0x3C / 0xFF, 0x91 / 0xFF, 0xE6 / 0xFF))
            self.camera_feed.draw_world_arrow(p + np.array([0.0, 0.0, 0.025]), p_v + np.array([0.0, 0.0, 0.025]), thickness=6, color=(0xDF / 0xFF, 0x29 / 0xFF, 0x35 / 0xFF))
            self.camera_feed.draw_world_arrow(p, p_h, thickness=6, color=(0xFF / 0xFF, 0x87 / 0xFF, 0x1F / 0xFF))
            self.camera_feed.update_window()

        if self.record_data:                
            p = self.robot.get_pose(self.AXES)
            v = self.robot.get_velocity(self.AXES)
            p_ref, v_ref = self.get_target(p)
            F_ref = F_ref_curve[0]
            F_h = self.robot.get_force(self.AXES)
            self.data.append((t, dt, p[0], p[1], p[2], v[0], v[1], v[2], p_ref[0], p_ref[1], p_ref[2], v_ref[0], v_ref[1], v_ref[2], self.forward_state, F_h[0], F_h[1], F_h[2], self.P, self.V, c_p, c_v, F_ref[0], F_ref[1], F_ref[2], F_r[0], F_r[1], F_r[2], F_v[0], F_v[1], F_v[2], phrase, self.speaking_start_t, self.speaking_start_V, spoke_encouraging, spoke_instructional))

    def shutdown(self) -> None:
        self.robot.set_velocity(Robot.zeroed_translation_rotation())
        if self.use_plotter:
            self.plotter.close()

        if self.record_data:
            column_names = ["t", "dt", "p_x", "p_y", "p_z", "v_x", "v_y", "v_z", "p_ref_x", "p_ref_y", "p_ref_z", "v_ref_x", "v_ref_y", "v_ref_z", "path_direction", "F_h_x", "F_h_y", "F_h_z", "P_hat", "V_hat", "c_p", "c_v", "F_ref_x", "F_ref_y", "F_ref_z", "F_r_x", "F_r_y", "F_r_z", "F_v_x", "F_v_y", "F_v_z", "phrase", "speaking_start_t", "speaking_start_V", "encouraging", "instructional"]
            pd.DataFrame(self.data, columns=column_names).to_csv(f'{time.time()}.csv')

In [None]:
app = RobotApp(use_plotter=True, use_camera=False, record_data=False)
app.run()