In [None]:
import requests
from pathlib import Path
from argparse import ArgumentParser
from pygame import mixer
from reachy_sdk import ReachySDK
from reachy_sdk.trajectory import goto
from reachy_sdk.trajectory import InterpolationMode
import time
import speech_recognition as sr
import winsound
import re
from twilio.rest import Client
from twilio.twiml.voice_response import VoiceResponse
import cv2 as cv
import os
import discord
import asyncio
import nest_asyncio

In [None]:
def play_sound(location:str): # use local .mp3 file for Reachy speak
    """Play an audio file from the local ``sound/`` directory and block until it ends.

    Args:
        location: File name relative to the ``sound/`` directory,
            e.g. ``"welcome.mp3"``.
    """
    file = Path(f"sound/{location}").absolute()
    mixer.init()
    # Pass a plain string: older pygame releases do not accept pathlib objects.
    mixer.music.load(str(file))
    mixer.music.play()
    # Sleep between polls so waiting for playback does not pin a CPU core.
    while mixer.music.get_busy():
        time.sleep(0.05)
    mixer.music.stop()

In [None]:
def create_sound(word:str,location:str): #create the .mp3 file in the local
    """Synthesize ``word`` with the Baidu TTS endpoint and save the audio to ``location``.

    Args:
        word: Text to be spoken.
        location: Destination path of the audio file; missing parent
            directories are created.

    Raises:
        requests.RequestException: If the request fails, times out, or the
            server answers with an HTTP error status.
    """
    # Let requests build the query string so characters such as '&', '#' or
    # '+' inside the text are escaped instead of corrupting the URL.
    # NOTE(review): 'cte' is presumably the Cantonese voice and 'spd' the
    # speaking speed - confirm against the service.
    params = {'lan': 'cte', 'text': word, 'spd': 5, 'source': 'web'}
    response = requests.get('https://fanyi.baidu.com/gettts', params=params, timeout=30)
    # Fail loudly rather than saving an error page as an .mp3 file.
    response.raise_for_status()

    # Download first, open the file second: a failed request no longer
    # truncates an audio file that already exists.
    file = Path(location).absolute()
    file.parent.mkdir(parents=True, exist_ok=True)
    with file.open('wb') as f:
        f.write(response.content)


In [None]:
def record(): # get the sound from user
    """Record one utterance from the microphone and transcribe it.

    A 1000 Hz beep lasting one second signals the user before listening starts.

    Returns:
        The recognized text, or an empty string when no audio could be
        captured or the audio could not be transcribed. The empty string
        matches no keyword, so callers fall through to their
        "not understood" handling.
    """
    recognizer = sr.Recognizer()
    microphone = sr.Microphone()
    ## get sound
    try:
        with microphone as mic:
            winsound.Beep(1000,1000)
            recognizer.adjust_for_ambient_noise(mic)
            sound = recognizer.listen(mic)
    except Exception:
        # Nothing was captured, so there is nothing to transcribe. Returning
        # here avoids touching the unbound `sound` variable below.
        play_sound("no_sound.mp3")
        return ""
    ## sound change to text
    try:
        # language choose Hong Kong language
        return recognizer.recognize_google(sound, language='zh-hk')
    except Exception:
        # Unintelligible speech or an unreachable recognition service:
        # keep the best-effort behaviour but return a defined value.
        return ""

In [None]:
def find_keyword(text:str,keyword):
    """Tell whether ``text`` contains a keyword.

    Args:
        text: The text to search.
        keyword: A regular-expression pattern, or a collection (list, tuple,
            set, frozenset) of patterns.

    Returns:
        True if the pattern, or at least one pattern of the collection, is
        found anywhere in ``text``; False otherwise, including for an empty
        collection or an unsupported ``keyword`` type.
    """
    if isinstance(keyword, str):
        return re.search(keyword, text) is not None
    if isinstance(keyword, (list, tuple, set, frozenset)):
        return any(re.search(term, text) for term in keyword)
    # Unsupported keyword type: answer with an explicit bool rather than
    # falling off the end of the function with None.
    return False


In [None]:
def get_action(location:str):
    """Load the Python expression stored on the first line of a text file.

    Args:
        location: Path of the file to read.

    Returns:
        The object obtained by evaluating the file's first line.

    Raises:
        IndexError: If the file is empty.
    """
    source = Path(location).absolute()
    with open(source, 'r') as handle:
        lines = list(handle)
    # NOTE(review): eval() executes whatever the file contains; only use
    # files you trust (ast.literal_eval would be safer if they hold literals).
    return eval(lines[0])

In [None]:
def record_do(name,action_time):
    """Replay a recorded two-arm trajectory on the robot held in the global ``reachy``.

    Args:
        name: Two-element sequence ``[left, right]``. Each element is a
            sequence of frames; a frame lists the goal positions of that
            arm's joints in the order shoulder_pitch, shoulder_roll, arm_yaw,
            elbow_pitch, forearm_yaw, wrist_pitch, wrist_roll, gripper.
        action_time: Kept for interface compatibility; it is not used. The
            playback length is the number of frames sent at
            ``sampling_frequency``.

    Raises:
        ValueError: If either arm's trajectory has no frame.
    """
    joint_names = (
        "shoulder_pitch",
        "shoulder_roll",
        "arm_yaw",
        "elbow_pitch",
        "forearm_yaw",
        "wrist_pitch",
        "wrist_roll",
        "gripper",
    )
    recorded_joints_right = [getattr(reachy.r_arm, f"r_{joint}") for joint in joint_names]
    recorded_joints_left = [getattr(reachy.l_arm, f"l_{joint}") for joint in joint_names]
    # NOTE(review): assumes the trajectory was recorded at this same rate - confirm.
    sampling_frequency = 100  # in Hz

    left = name[0]
    right = name[1]
    # Validate before stiffening the motors so a bad file cannot leave the
    # arms rigid.
    if len(left) == 0 or len(right) == 0:
        raise ValueError("trajectory is empty: both arms need at least one frame")

    try:
        # Set all used joint stiff
        for joint in recorded_joints_left + recorded_joints_right:
            joint.compliant = False

        # Create a dict associating a joint to its first recorded position
        first_point_right = dict(zip(recorded_joints_right, right[0]))
        first_point_left = dict(zip(recorded_joints_left, left[0]))
        first_point = {**first_point_left, **first_point_right}

        # Goes to the start of the trajectory in 3s
        goto(first_point, duration=3.0)

        for joints_positions_left, joints_positions_right in zip(left, right):
            for joint_left, pos_left, joint_right, pos_right in zip(
                recorded_joints_left,
                joints_positions_left,
                recorded_joints_right,
                joints_positions_right,
            ):
                joint_left.goal_position = pos_left
                joint_right.goal_position = pos_right

            time.sleep(1 / sampling_frequency)
    finally:
        # Always release the arms, even when playback is interrupted or
        # fails; otherwise the motors would stay stiff.
        reachy.turn_off_smoothly('r_arm')
        reachy.turn_off_smoothly('l_arm')

In [None]:
def get_file_name(file_name,time):
    """Load the left and right arm recordings of an action and replay them.

    Args:
        file_name: Base name of the action; the files read are
            ``user_control/action/<file_name>_left.txt`` and
            ``user_control/action/<file_name>_right.txt``.
        time: Value forwarded to ``record_do`` as its ``action_time`` argument.
    """
    left_path = f"user_control/action/{file_name}_left.txt"
    right_path = f"user_control/action/{file_name}_right.txt"
    # record_do expects the left arm first, then the right arm.
    record_do([get_action(left_path), get_action(right_path)], time)

In [None]:
def answer_yes_no(order,reachy_action):
    """Listen for a spoken confirmation and run the selected action if confirmed.

    Args:
        order: Index of the selected entry in ``reachy_action``.
        reachy_action: Action table; element 0 of an entry is passed to
            ``get_file_name`` as the file name and element 1 as the time.
    """
    heard = record()
    # Guard clause: without the confirmation word, report it and stop.
    if not find_keyword(heard, ['係']):
        play_sound("not_have_keyword.mp3")
        return
    selected = reachy_action[order]
    get_file_name(selected[0], selected[1])

In [None]:
def call_phone():
    """Place a Twilio voice call that reads out a fixed alert message.

    Settings are read from the environment so that secrets need not be
    written into the notebook; the original placeholders remain the fallback:

    * ``TWILIO_ACCOUNT_SID`` - Account SID from twilio.com/console
    * ``TWILIO_AUTH_TOKEN``  - Auth Token from twilio.com/console
    * ``TWILIO_FROM_NUMBER`` - number the call is placed from
    * ``TWILIO_TO_NUMBER``   - number to call
    """
    account_sid = os.environ.get('TWILIO_ACCOUNT_SID', 'your ID')
    auth_token  = os.environ.get('TWILIO_AUTH_TOKEN', 'your ID')
    from_number = os.environ.get('TWILIO_FROM_NUMBER', '+your phone')
    to_number = os.environ.get('TWILIO_TO_NUMBER', '+your phone')

    client = Client(account_sid, auth_token)

    twiml = VoiceResponse()
    # NOTE(review): no language/voice is passed to say(); confirm that Twilio
    # reads this Cantonese text with a suitable voice.
    twiml.say('你好，我係你屋企嘅智能機器人，依家陳伯身體覺得好唔舒服，請你睇一睇信息，我哋已經傳送咗一張陳伯嘅圖片畀你，請你判斷一下有冇需要報警。')

    client.calls.create(
        from_=from_number,
        to=to_number,
        twiml=str(twiml),
    )

In [None]:
def chatbot(reachy_action, text, type):
    """Dispatch a user request to a phone call or to one robot action.

    Args:
        reachy_action: Action table; element 2 of each entry holds the
            keyword(s) that select it, elements 0 and 1 are passed to
            ``get_file_name``.
        text: The user's request.
        type: ``"record"`` asks for a spoken confirmation before acting;
            ``"text"`` runs the action directly.

    Returns:
        True when the text asks for a phone call; otherwise None.
    """
    ##chatbot
    if find_keyword(text,'打電話'):
        return True

    matches = [find_keyword(text, entry[2]) for entry in reachy_action]
    print(matches)
    hit_count = matches.count(True)

    if hit_count == 0: #0 keyword
        play_sound("not_have_keyword.mp3")
    elif hit_count > 1: #more than 1 keyword
        play_sound("unsupport.mp3")
    else: #1 keyword
        index = matches.index(True)
        if type == "record":
            play_sound(f"repert{index}.mp3")
            answer_yes_no(index, reachy_action)
        if type == "text":
            get_file_name(reachy_action[index][0], reachy_action[index][1])

In [None]:
def read_reachy_action(location:str):
    """Load the action table stored as a Python expression on a file's first line.

    Other code in this file indexes each entry of the result as
    ``[action file name, time, keyword or list of keywords]``.

    Args:
        location: Path of the UTF-8 text file to read.

    Returns:
        The object obtained by evaluating the first line of the file.
    """
    # 'utf-8-sig' reads plain UTF-8 and also drops a leading byte-order mark,
    # which would otherwise make eval() fail on the first character.
    with open(location, "r", encoding="utf-8-sig") as f:
        data = f.readline()
    # NOTE(review): eval() executes whatever the file contains; only use
    # files you trust (ast.literal_eval would be safer if they hold literals).
    return eval(data)

In [None]:
def create_sound_list(reachy_action:list):
    """Build the list of ``[sentence, mp3 path]`` pairs the robot can speak.

    The label of an action is the first item of element 2 when that element
    is a list, or element 2 itself when it is a string; entries of any other
    type contribute no label.

    Args:
        reachy_action: Action table whose entries hold their keyword(s) at
            index 2.

    Returns:
        Five fixed prompts (welcome, unsupported, keyword not found, no
        sound, called) followed by one confirmation prompt per label, saved
        as ``sound/repert<index>.mp3``.
    """
    labels = []
    for entry in reachy_action:
        keyword = entry[2]
        if type(keyword) is list:
            labels.append(keyword[0])
        elif type(keyword) is str:
            labels.append(keyword)

    # Every label is followed by a comma, including the last one.
    offered = "".join(str(label) + "," for label in labels)

    fixed_prompts = [
        ["歡迎光臨,我哋呢度提供"+offered+"請問你要邊一個?","sound/welcome.mp3"],
        ["唔好意思我哋依家淨係支援一次叫一個動作.請重新再講下你嘅要求.","sound/unsupport.mp3"],
        ["唔好意思,我哋呢邊接收唔到你嘅指令.請重新再講過你嘅要求","sound/not_have_keyword.mp3"],
        ["呢度冇接收到任何嘅聲音.請檢查一下麥克風.","sound/no_sound.mp3"],
        ["已經幫你打咗電話啦","sound/called.mp3"],
    ]
    confirmations = [
        [f"呢邊再重複一下你嘅要求.你要{label}.如果係嘅話請講係.",f"sound/repert{index}.mp3"]
        for index, label in enumerate(labels)
    ]
    return fixed_prompts + confirmations

In [None]:
def get_ip(location:str):
    """Read the robot address from the first line of a text file.

    Args:
        location: Path of the file holding the address.

    Returns:
        The first line with surrounding whitespace removed, so the trailing
        newline is not passed on as part of the host name.

    Raises:
        ValueError: If the file is empty or its first line is blank.
    """
    file = Path(location).absolute()
    with file.open('r') as f:
        ip = f.readline().strip()
    if not ip:
        raise ValueError(f"no IP address found on the first line of {file}")
    return ip

In [None]:
def get_text(location:str):
    """Return the first line of a UTF-8 text file.

    Args:
        location: Path of the file to read.

    Returns:
        The first line including its line ending, or an empty string when
        the file is empty.
    """
    # The context manager closes the handle that was previously leaked.
    # 'utf-8-sig' reads plain UTF-8 and also drops a leading byte-order mark.
    with open(location,"r",encoding="utf-8-sig") as f:
        return f.readline()

In [None]:
# Load the action table, then synthesize every sentence the robot may speak
# into its .mp3 file.
reachy_action = read_reachy_action("user_control/reachy_action.txt")
sound_list = create_sound_list(reachy_action)
for phrase, mp3_path in sound_list:
    create_sound(phrase, mp3_path)

In [None]:
# Connect to the robot. The address comes from "user_control/reachy_ip.txt";
# `reachy` is the module-level handle that record_do and the cells below read.
reachy_ip = get_ip("user_control/reachy_ip.txt")
reachy = ReachySDK(host=reachy_ip) # replace with correct IP in file "user_control/reachy_ip.txt"

In [None]:
#record
# Voice entry point: greet the user, listen to the request and act on it.
play_sound("welcome.mp3")
text = record()  # for input in record
send = chatbot(reachy_action,text,"record")
if send:
    # A phone call was requested: save a camera frame, post it to Discord,
    # then place the call.
    img = reachy.right_camera.last_frame
    cv.imwrite('savedimage.jpg', img)
    # Allows client.run() to start its event loop inside the notebook's
    # already-running loop.
    nest_asyncio.apply()
    intents = discord.Intents.default()
    intents.members = True
    client = discord.Client(intents=intents)

    @client.event
    async def on_ready():
        print(f'Logged in as {client.user}')
        try:
            # Get the channel object for the test channel
            # Replace with the actual channel ID (an integer, not a string)
            test_channel = client.get_channel('your ID')
            if test_channel is None:
                print('Failed to send image: test channel not found')
            else:
                try:
                    # Build the File from the path so it is still open while
                    # send() uploads it; a handle from a finished `with`
                    # block would already be closed.
                    await test_channel.send(file=discord.File('savedimage.jpg'))
                    print('Image sent to test channel successfully')
                except Exception as error:
                    print(f'Failed to send image to test channel {test_channel.name}', error)
        finally:
            # Close the client connection so client.run() returns
            await client.close()
            print('Client connection closed successfully')

    # Bot token: taken from DISCORD_BOT_TOKEN when set, else the placeholder.
    client.run(os.environ.get('DISCORD_BOT_TOKEN', 'your ID'))
    call_phone()
    play_sound('called.mp3')

In [None]:
#text
# Text entry point: read the request from a file and act on it.
text = get_text("user_control/your_need.txt") # for input in text
send = chatbot(reachy_action,text,"text")
if send:
    # A phone call was requested: save a camera frame, post it to Discord,
    # then place the call.
    img = reachy.right_camera.last_frame
    cv.imwrite('savedimage.jpg', img)
    # Allows client.run() to start its event loop inside the notebook's
    # already-running loop.
    nest_asyncio.apply()
    intents = discord.Intents.default()
    intents.members = True
    client = discord.Client(intents=intents)

    @client.event
    async def on_ready():
        print(f'Logged in as {client.user}')
        try:
            # Get the channel object for the test channel
            # Replace with the actual channel ID (an integer, not a string)
            test_channel = client.get_channel('your ID')
            if test_channel is None:
                print('Failed to send image: test channel not found')
            else:
                try:
                    # Build the File from the path so it is still open while
                    # send() uploads it; a handle from a finished `with`
                    # block would already be closed.
                    await test_channel.send(file=discord.File('savedimage.jpg'))
                    print('Image sent to test channel successfully')
                except Exception as error:
                    print(f'Failed to send image to test channel {test_channel.name}', error)
        finally:
            # Close the client connection so client.run() returns
            await client.close()
            print('Client connection closed successfully')

    # Bot token: taken from DISCORD_BOT_TOKEN when set, else the placeholder.
    client.run(os.environ.get('DISCORD_BOT_TOKEN', 'your ID'))
    call_phone()
    play_sound('called.mp3')