In [1]:
from reachy_sdk import ReachySDK
import numpy as np

import time
import asyncio

from reachy_face_tracking.detection import Detection

In [2]:
reachy = ReachySDK(host='localhost')

detect = Detection(reachy=reachy, detection_model_path='/home/pierre/dev/reachy-face-tracking/models/ssd_mobilenet_v2_face_quant_postprocess_edgetpu.tflite')

In [3]:
detect.start()

In [4]:
# cough_head = np.load('cough_head.npy')
# cough = np.load('cough.npy')

# behavior_actions = {}

# behavior_actions['cough'] = [cough, cough_head]
# behavior_actions['look_at'] = [reachy.head.look_at(x=0.5, y=-0.5, z=0.1, duration=1.0),
#                                 time.sleep(0.1),
#                                reachy.head.look_at(x=0.5, y=0, z=-0.4, duration=1.0),
#                                 time.sleep(0.1),
#                                reachy.head.look_at(x=0.5, y=0.3, z=-0.3, duration=1.0),
#                                 time.sleep(0.1),
#                                reachy.head.look_at(x=0.5, y=0, z=0, duration=1.0)]

In [5]:
class Behavior:
    def __init__(self, name: str, sub_behavior: bool = False) -> None:
        self.name = name
        self._task = None
        self.sub_behavior = sub_behavior

    async def start(self):
        print(f'Start-begin behavior {self.name}')
        await self.setup()
        self._task = asyncio.create_task(self._run(), name=f'behavior_{self.name}')
        print(f'Start-end behavior {self.name}')
        return self._task

    async def stop(self):
        print(f'Stop-begin behavior {self.name}')

        if self._task is not None:
            self._task.cancel()
            await self._task

        print(f'Stop-end behavior {self.name}')

    async def setup(self):
        print(f'Setup behavior {self.name}')

    async def run(self):
        pass

    async def _run(self):
        try:
            await self.run()
        except asyncio.CancelledError:
            print(f'{self.name} got cancelled')
            if self.sub_behavior:
                raise
        await self.teardown()

    async def teardown(self):
        print(f'Teardown behavior {self.name}')
        
    def is_running(self):
        return self._task is not None and not self._task.done()

In [6]:
class Popote(Behavior):
    cough_head = np.load('cough_head.npy')
    popote = np.load('popote.npy')

    sampling_frequency = 100

    recorded_joints_head = [
        reachy.joints.neck_disk_top,
        reachy.joints.neck_disk_middle,
        reachy.joints.neck_disk_bottom,
    ]

    recorded_joints_right = [
        reachy.r_arm.r_shoulder_pitch,
        reachy.r_arm.r_shoulder_roll,
        reachy.r_arm.r_arm_yaw,
        reachy.r_arm.r_elbow_pitch,
        reachy.r_arm.r_forearm_yaw,
        reachy.r_arm.r_wrist_pitch,
        reachy.r_arm.r_wrist_roll,
    ]
    async def run(self):
        for (jp_head, jp_r_arm) in zip(Popote.cough_head, Popote.popote):
            for joint, pos in zip(Popote.recorded_joints_head, jp_head):
                joint.goal_position = pos
            for joint, pos in zip(Popote.recorded_joints_right, jp_r_arm):
                joint.goal_position = pos

            await asyncio.sleep(1 / Popote.sampling_frequency)        

popote = Popote(name='popote')

In [7]:
class LookAt(Behavior):
    async def run(self):
        await reachy.head.look_at_async(x=0.5, y=-0.5, z=0.1, duration=1.1)
        await asyncio.sleep(0.2)
        await reachy.head.look_at_async(x=0.5, y=0, z=-0.4, duration=1.1)
        await asyncio.sleep(0.2)
        await reachy.head.look_at_async(x=0.5, y=0.3, z=-0.3, duration=1.1)
        await asyncio.sleep(0.2)
        await reachy.head.look_at_async(x=0.5, y=0, z=0, duration=1.1)

lookat = LookAt(name='look_at')

In [8]:
class Idle(Behavior):
    behaviors = {
        'look_at': LookAt(name='look_at', sub_behavior=True),
        'popote': Popote(name='popote', sub_behavior=True),
    }
    async def run(self):
        while True:
            await Idle.behaviors[np.random.choice(list(Idle.behaviors.keys()))]._run()


idle = Idle(name='idle', sub_behavior=False)

In [9]:
class HeadController(Behavior):
    def __init__(self, name, sub_behavior, init_x=0.5, init_y=0.0, init_z=0.0):
        Behavior.__init__(self, name=name, sub_behavior=sub_behavior)
        self.x, self.y, self.z = init_x, init_y, init_z

    async def run(self):
#         self.x, self.y, self.z = init_x, init_y, init_z
        while True:
            try:
                gp_dic = reachy.head._look_at(self.x, self.y, self.z)
                reachy.head.neck_disk_bottom.goal_position = gp_dic[reachy.head.neck_disk_bottom]
                reachy.head.neck_disk_middle.goal_position = gp_dic[reachy.head.neck_disk_middle]
                reachy.head.neck_disk_top.goal_position = gp_dic[reachy.head.neck_disk_top]
            except ValueError:
                pass
            await asyncio.sleep(0.01)


head_controller = HeadController(name='head_controller', sub_behavior=False, init_x=0.5, init_y=0.0, init_z=0.0)

In [10]:
# class FaceTracking(Behavior):
#     def __init__(self, name: str, sub_behavior: bool = False) -> None:
#         super().__init__(name, sub_behavior=sub_behavior)

#         self.idle = Idle(name='idle', sub_behavior=False)
#         self.hc = HeadController(name='hc', sub_behavior=False)

#     async def run(self):
#         center = np.array([160, 160])

#         prev_y, prev_z = 0, 0
#         cmd_y, cmd_z = prev_y, prev_z
#         xM, yM = 0, 0

#         Kpy, Kpz, Kiy, Kiz, Kdy, Kdz = [0.00006, 0.00005, 0, 0, 0.017, 0.002]

#         nobody_index = 0
        
#         tic = time.time()
#         i = 0

#         try:
#             while True:
#                 if np.random.rand() > 0.5:
#                     await self.idle.start()
#                     await asyncio.sleep(5)
#                     await self.idle.stop()
#                 else:
#                     await self.hc.start()
#                     while i < 500:
#                         i += 1

# #                         nobody_index = 0

#                         x_target, y_target, _ = detect._face_target

#                         target = np.array([y_target, x_target]) - center

#                         cmd_z += np.round(-target[0] * Kpz, 3)
#                         cmd_y += np.round(-target[1] * Kpy, 3)

#                         self.hc.y = cmd_y
#                         self.hc.z = cmd_z
#                         await asyncio.sleep(0.01)

#                     i = 0
#                     await self.hc.stop()
#         except asyncio.CancelledError:
#             await self.idle.stop()
#             await self.hc.stop()
#             raise

In [38]:
class FaceTracking(Behavior):
    def __init__(self, name: str, sub_behavior: bool = False) -> None:
        super().__init__(name, sub_behavior=sub_behavior)

        self.idle = Idle(name='idle', sub_behavior=False)
        self.hc = HeadController(name='hc', sub_behavior=False)

    async def run(self):
        center = np.array([160, 160])

        prev_y, prev_z = 0, 0
        cmd_y, cmd_z = prev_y, prev_z
        xM, yM = 0, 0

        Kpy, Kpz, Kiy, Kiz, Kdy, Kdz = [0.00006, 0.00005, 0, 0, 0.017, 0.002]

        tic = time.time()
        i = 0

        try:
            while True:
                if not detect.somebody_detected():
                    if not self.idle.is_running():
                        await self.idle.start()
                        await self.hc.stop()
                        
                else:
                    if not self.hc.is_running():
                        await self.idle.stop()
                        await self.hc.start()

                    x_target, y_target, _ = detect._face_target

                    target = np.array([y_target, x_target]) - center

                    cmd_z += np.round(-target[0] * Kpz, 3)
                    cmd_y += np.round(-target[1] * Kpy, 3)

                    self.hc.y = cmd_y
                    self.hc.z = cmd_z
                await asyncio.sleep(0.01)

        except asyncio.CancelledError:
            await self.idle.stop()
            await self.hc.stop()
            raise

In [18]:
# for _ in range(100):
#     print(detect.somebody_detected())
#     time.sleep(0.1)

In [41]:
ft = FaceTracking(name='nft')

In [42]:
await ft.start()

Start-begin behavior nft
Setup behavior nft
Start-end behavior nft


<Task pending name='behavior_nft' coro=<Behavior._run() running at /tmp/ipykernel_145514/550224226.py:29>>

jl
Stop-begin behavior idle
Stop-end behavior idle
Start-begin behavior hc
Setup behavior hc
Start-end behavior hc
bob
Start-begin behavior idle
Setup behavior idle
Start-end behavior idle
Stop-begin behavior hc
hc got cancelled
Teardown behavior hc
Stop-end behavior hc
jl
Stop-begin behavior idle
look_at got cancelled
idle got cancelled
Teardown behavior idle
Stop-end behavior idle
Start-begin behavior hc
Setup behavior hc
Start-end behavior hc
bob
Start-begin behavior idle
Setup behavior idle
Start-end behavior idle
Stop-begin behavior hc
hc got cancelled
Teardown behavior hc
Stop-end behavior hc
jl
Stop-begin behavior idle
look_at got cancelled
idle got cancelled
Teardown behavior idle
Stop-end behavior idle
Start-begin behavior hc
Setup behavior hc
Start-end behavior hc
bob
Start-begin behavior idle
Setup behavior idle
Start-end behavior idle
Stop-begin behavior hc
hc got cancelled
Teardown behavior hc
Stop-end behavior hc
jl
Stop-begin behavior idle
popote got cancelled
idle got c

bob
Start-begin behavior idle
Setup behavior idle
Start-end behavior idle
Stop-begin behavior hc
hc got cancelled
Teardown behavior hc
Stop-end behavior hc


In [43]:
await ft.stop()

Stop-begin behavior nft
Stop-begin behavior idle
popote got cancelled
idle got cancelled
Teardown behavior idle
Stop-end behavior idle
Stop-begin behavior hc
Stop-end behavior hc
nft got cancelled
Teardown behavior nft
Stop-end behavior nft


In [14]:
reachy.turn_on('reachy')

In [34]:
await idle.stop()

Teardown behavior popote
Stop-begin behavior idle
popote got cancelled
idle got cancelled
Teardown behavior idle
Stop-end behavior idle


In [44]:
reachy.turn_off_smoothly('reachy')

In [36]:
a = [False, True, True]

In [38]:
np.mean(a) > 0.5

True