# Example of API use to record and replay movements

**Recording**

Record present and goal position of all joints (both arms and head)

**Replay**

Replay present or goal positions of all joints (both arms and head)

## Import utils

In [None]:
import grpc
import numpy as np
import time

from reachy_sdk_api import joint_pb2, joint_pb2_grpc
from joint_pb2 import JointsCommand, JointCommand, JointField, JointId, JointsStateRequest
from google.protobuf.wrappers_pb2 import BoolValue, FloatValue

# Create channel to connect to server
channel = grpc.insecure_channel(f'localhost:50055')

In [None]:
allbody = [
        JointId(name='l_shoulder_pitch'),
        JointId(name='l_shoulder_roll'),
        JointId(name='l_arm_yaw'),
        JointId(name='l_elbow_pitch'),
        JointId(name='l_forearm_yaw'),
        JointId(name='l_wrist_pitch'),
        JointId(name='l_wrist_roll'),
        JointId(name='l_gripper'),
        JointId(name='r_shoulder_pitch'),
        JointId(name='r_shoulder_roll'),
        JointId(name='r_arm_yaw'),
        JointId(name='r_elbow_pitch'),
        JointId(name='r_forearm_yaw'),
        JointId(name='r_wrist_pitch'),
        JointId(name='r_wrist_roll'),
        JointId(name='r_gripper'),
        JointId(name='neck_disk_top'),
        JointId(name='neck_disk_middle'),
        JointId(name='neck_disk_bottom'),
    ]

## Record movements

#### Create stub to access requested service

In [None]:
state_stub = joint_pb2_grpc.JointServiceStub(channel)

#### Declare empty arrays to save goal and present positions, plus times

In [None]:
goal_pos = []
present_pos = []
time_array = []

In [None]:
alive = True

def get_pos():
    
    i=0
    duration = 0
    pres_pos = JointsStateRequest(
        ids=allbody,
        requested_fields=[JointField.GOAL_POSITION, JointField.PRESENT_POSITION]
    )

    while(duration < 10 and alive):
        tic = time.time()
        single_present_pos = []
        single_goal_pos = []

        state = state_stub.GetJointsState(pres_pos)

        for motor in state.states:
            x = motor.goal_position.value
            single_goal_pos.append(x)
            y = motor.present_position.value
            single_present_pos.append(y)

        goal_pos.append(np.array(single_goal_pos))
        present_pos.append(np.array(single_present_pos))
        time_array.append(time.time())

        time.sleep(0.01)
        duration += time.time() - tic
    print("stop")
        
        
from threading import Thread

t = Thread(target=get_pos)
time.sleep(5)
print("start")
t.start()

In [None]:
alive = False
t.join()

#### Save recordings

In [None]:
np.save('present_pos.npy', np.array(present_pos))

In [None]:
np.save('goal_pos.npy', np.array(goal_pos))

In [None]:
np.save('time_array.npy', np.array(time_array))

## Replay movements

#### Create stub to access requested service

In [None]:
cmd_stub = joint_pb2_grpc.JointServiceStub(channel)

#### Load recordings

In [None]:
present_pos = np.load('present_pos.npy')
goal_pos = np.load('goal_pos.npy')

In [None]:
all_ids = np.array([allbody for i in range(len(goal_pos))])

#### Set motors stiff

In [None]:
compliancy = False

request = JointsCommand(
            commands=[
                JointCommand(id=name, compliant=BoolValue(value=compliancy))
                for name in allbody
            ]
        )
cmd_stub.SendJointsCommands(request)

#### Replay goal positions

In [None]:
### GOAL_POSITION

send_alive = True

def send_pos():
    i = 0
    tic = time.time()
    while ((i < len(goal_pos)) and (send_alive)):
        toc = time.time()
        command = JointsCommand(
        commands=[
                JointCommand(id=name, goal_position=FloatValue(value=pos))
                for name, pos in zip(all_ids[i], goal_pos[i])
            ]
        )

        cmd_stub.SendJointsCommands(command)
        i += 1
        time.sleep(0.01)

        
from threading import Thread

t2 = Thread(target=send_pos)
t2.start()

#### Replay present positions

In [None]:
### PRESENT_POSITION

send_alive = True

def send_pos():
    i = 0
    tic = time.time()
    while ((i < len(goal_pos)) and (send_alive)):
        toc = time.time()
        command = JointsCommand(
        commands=[
                JointCommand(id=name, goal_position=FloatValue(value=pos))
                for name, pos in zip(all_ids[i], present_pos[i])
            ]
        )

        cmd_stub.SendJointsCommands(command)
        i += 1
        time.sleep(0.01)

        
from threading import Thread

t2 = Thread(target=send_pos)
t2.start()

In [None]:
send_alive = False
t2.join()

#### Set motors compliant

In [None]:
compliancy = True

request = JointsCommand(
    commands=[
        JointCommand(id=name, compliant=BoolValue(value=compliancy))
        for name in allbody
    ]
)

cmd_stub.SendJointsCommands(request)