In [None]:
import cv2 as cv
import threading
import random
from time import sleep
import ipywidgets as widgets
from IPython.display import display
from color_follow import color_follow
from face_follow import face_follow

In [None]:
# Connect to the arm controller and move it to the starting pose.
import Arm_Lib
Arm = Arm_Lib.Arm_Device()
# One target value per servo (six in total) for the starting pose.
joints_0 = [90, 135, 20, 25, 90, 30]
# Second argument is presumably the move duration in milliseconds - TODO confirm against Arm_Lib.
Arm.Arm_serial_servo_write6_array(joints_0, 1000)

In [None]:
import smbus
import time
# I2C bus number 1 is shared by every helper in this notebook.
bus = smbus.SMBus(1)

# --- Speech recognition module: I2C address and register map ---
i2c_addr = 0x0f   # Speech recognition module address
asr_add_word_addr  = 0x01   # Register used to add a phrase entry
# Recognition mode register. Values 0-2: 0 = cyclic recognition, 1 = password mode,
# 2 = button mode. The default is cyclic recognition.
asr_mode_addr  = 0x02
# RGB lamp register. The original note describes two bytes: lamp number
# (1: blue, 2: red, 3: green) followed by brightness 0-255.
# NOTE(review): RGBSet in this notebook writes three values (R, G, B) to this
# register - confirm which layout the module firmware expects.
asr_rgb_addr = 0x03
# Recognition sensitivity register, 0x00-0x7f. Higher values detect more easily
# but also misjudge more easily. Recommended 0x40-0x55; default 0x40.
asr_rec_gain_addr  = 0x04
# Clears the power-off cache; clear it before writing new entries.
asr_clear_addr = 0x05
asr_key_flag = 0x06  # Used in button mode to start recognition
asr_voice_flag = 0x07   # Turns the recognition-result prompt sound on or off
asr_result = 0x08  # Recognition result register
asr_buzzer = 0x09  # Buzzer control register: 1 = on, 0 = off
asr_num_cleck = 0x0a # Number of stored entries
asr_vession = 0x0b # Firmware version number
asr_busy = 0x0c # Busy flag (Busy_Wait polls this until it reads 0)

# --- Speech synthesis module ---
i2c_speech_addr = 0x30   # I2C address written to by I2C_WriteBytes and read by GetChipStatus
speech_date_head = 0xfd  # First byte of every packet built by Speech_text / SetBase

In [None]:
def I2C_WriteBytes(str_):
    """Send a sequence of byte values to the speech synthesis module, one at a time.

    Args:
        str_: Iterable of integers (a list of ints or a ``bytes`` object).

    Each value is written with ``bus.write_byte`` to ``i2c_speech_addr`` and
    followed by a 10 ms pause. A failed write is reported with a message and
    the remaining bytes are still attempted (best effort).
    """
    for ch in str_:
        try:
            bus.write_byte(i2c_speech_addr, ch)
            time.sleep(0.01)
        except Exception:
            # Catch Exception rather than using a bare ``except`` so that
            # KeyboardInterrupt / SystemExit still stop the loop when the
            # kernel is interrupted.
            print("write I2C error")


# Text-encoding selector values. Speech_text places the chosen value in the
# fifth byte of the packet header.
EncodingFormat_Type = {
    'GB2312': 0x00,
    'GBK': 0X01,
    'BIG5': 0x02,
    'UNICODE': 0x03
}


def Speech_text(str_, encoding_format):
    """Send text to the speech synthesis module.

    Args:
        str_: Text to speak.
        encoding_format: One of the ``EncodingFormat_Type`` values. It is put
            in the packet header and selects the codec used for the payload.

    The packet is a five-byte header (head byte, 16-bit length big-endian,
    command 0x01, encoding byte) followed by the encoded text. The length
    field is the encoded text length plus 2, which matches the two header
    bytes (command, encoding) that follow it.

    Raises:
        UnicodeEncodeError: If ``str_`` cannot be represented in the codec.
    """
    # The payload must be encoded in the format the header announces. The
    # previous code always used gb2312, so GBK/BIG5 requests sent mismatched bytes.
    # NOTE(review): the byte order the module expects for 'UNICODE' (0x03) is
    # not established anywhere in this notebook, so that value (and any
    # unknown one) keeps the previous gb2312 behaviour - confirm against the
    # module datasheet before relying on it.
    codecs_by_format = {0x00: 'gb2312', 0x01: 'gbk', 0x02: 'big5'}
    payload = str_.encode(codecs_by_format.get(encoding_format, 'gb2312'))

    size = len(payload) + 2
    Data_Pack = [
        speech_date_head,   # packet head
        size >> 8,          # length, high byte
        size & 0x00ff,      # length, low byte
        0x01,               # command byte
        encoding_format,    # encoding selector
    ]

    I2C_WriteBytes(Data_Pack)
    I2C_WriteBytes(payload)


def SetBase(str_):
    """Send ``str_`` to the speech module as a gb2312 packet with encoding byte 0x00.

    The header is: head byte, length high byte, length low byte, command
    0x01, encoding 0x00. The length field is the encoded text length plus 2.
    TextCtrl uses this to send bracketed control tags.
    """
    payload = str_.encode('gb2312')
    length = len(payload) + 2
    header = [speech_date_head, length >> 8, length & 0x00ff, 0x01, 0x00]

    I2C_WriteBytes(header)
    I2C_WriteBytes(payload)


def TextCtrl(ch, num):
    """Send a bracketed control tag through SetBase.

    Args:
        ch: Tag letter, e.g. ``'v'``.
        num: Tag value; ``-1`` means the tag has no value.

    Sends ``[<ch><num>]``, or just ``[<ch>]`` when ``num`` is -1.
    """
    value_text = '' if num == -1 else str(num)
    SetBase('[' + ch + value_text + ']')


# Status bytes compared against the value returned by GetChipStatus.
# The setters and the camera loop wait for 'ChipStatus_Idle'.
ChipStatus_Type = {
    'ChipStatus_InitSuccessful': 0x4A,  # Initialization successful response
    'ChipStatus_CorrectCommand': 0x41,  # Correct command frame received response
    'ChipStatus_ErrorCommand': 0x45,  # Unrecognized command frame received response
    'ChipStatus_Busy': 0x4E,  # Chip busy status response
    'ChipStatus_Idle': 0x4F  # Chip idle status response
}


def GetChipStatus():
    """Ask the speech synthesis module for its status and return the reply byte.

    Sends the four-byte query ``[0xfd, 0x00, 0x01, 0x21]``, waits 50 ms, then
    reads one byte from ``i2c_speech_addr``.

    Returns:
        The byte read (compare it with ``ChipStatus_Type`` values), or
        ``None`` if the read failed. Callers that poll for the idle status
        therefore keep polling after a failed read.
    """
    AskState = [0xfd, 0x00, 0x01, 0x21]
    # Catch Exception rather than using a bare ``except`` so that interrupting
    # the kernel (KeyboardInterrupt) can break out of callers' polling loops.
    try:
        I2C_WriteBytes(AskState)
        time.sleep(0.05)
    except Exception:
        print("I2CRead_Write error")

    try:
        return bus.read_byte(i2c_speech_addr)
    except Exception:
        print("I2CRead error")
        return None


def _wait_until_idle(poll_interval=0.002):
    """Block until GetChipStatus() reports ChipStatus_Idle.

    Shared by every setter below. There is no timeout: if the module never
    reports idle (or every status read fails) this keeps polling.
    """
    while GetChipStatus() != ChipStatus_Type['ChipStatus_Idle']:
        time.sleep(poll_interval)


Style_Type = {
    'Style_Single': 0,  # Single character style
    'Style_Continue': 1  # Continuous synthesis
}  # Synthesis style setting [f?]


def SetStyle(num):
    """Send the synthesis style tag ``[f<num>]`` (see Style_Type) and wait for idle."""
    TextCtrl('f', num)
    _wait_until_idle()


Language_Type = {
    'Language_Auto': 0,  # Automatic language detection
    'Language_Chinese': 1,  # Arabic numerals, units of measure, special symbols
    'Language_English': 2  # Arabic numerals, units of measure, special symbols
}  # Synthesis language setting [g?]


def SetLanguage(num):
    """Send the synthesis language tag ``[g<num>]`` (see Language_Type) and wait for idle."""
    TextCtrl('g', num)
    _wait_until_idle()


Articulation_Type = {
    'Articulation_Auto': 0,  # Automatic word pronunciation detection
    'Articulation_Letter': 1,  # Letter pronunciation
    'Articulation_Word': 2  # Word pronunciation
}  # Word pronunciation setting [h?]


def SetArticulation(num):
    """Send the word pronunciation tag ``[h<num>]`` (see Articulation_Type) and wait for idle."""
    TextCtrl('h', num)
    _wait_until_idle()


Spell_Type = {
    'Spell_Disable': 0,
    'Spell_Enable': 1
}  # Values for the [i?] tag sent by SetSpell


def SetSpell(num):
    """Send the tag ``[i<num>]`` (see Spell_Type) and wait for idle."""
    TextCtrl('i', num)
    _wait_until_idle()


Reader_Type = {
    'Reader_XiaoYan': 3,
    'Reader_XuJiu': 51,
    'Reader_XuDuo': 52,
    'Reader_XiaoPing': 53,
    'Reader_DonaldDuck': 54,
    'Reader_XuXiaoBao': 55
}  # Values for the [m?] tag sent by SetReader


def SetReader(num):
    """Send the reader tag ``[m<num>]`` (see Reader_Type) and wait for idle."""
    TextCtrl('m', num)
    _wait_until_idle()


NumberHandle_Type = {
    'NumberHandle_Auto': 0,  # Automatic detection
    'NumberHandle_Number': 1,  # Numbers processed as phone numbers
    'NumberHandle_Value': 2  # Numbers processed as values
}  # Number processing strategy setting [n?]


def SetNumberHandle(num):
    """Send the number-handling tag ``[n<num>]`` (see NumberHandle_Type) and wait for idle."""
    TextCtrl('n', num)
    _wait_until_idle()


ZeroPronunciation_Type = {
    'ZeroPronunciation_Zero': 0,  # Pronounced as "zero"
    'ZeroPronunciation_O': 1  # Pronounced as "O"
}  # Pronunciation of "0" in English/number [o?]


def SetZeroPronunciation(num):
    """Send the zero-pronunciation tag ``[o<num>]`` (see ZeroPronunciation_Type) and wait for idle."""
    TextCtrl('o', num)
    _wait_until_idle()


NamePronunciation_Type = {
    'NamePronunciation_Auto': 0,  # Automatic surname pronunciation detection
    'NamePronunciation_Constraint': 1  # Force use of surname pronunciation rules
}  # Name pronunciation strategy setting [r?]


def SetNamePronunciation(num):
    """Send the name-pronunciation tag ``[r<num>]`` (see NamePronunciation_Type) and wait for idle."""
    TextCtrl('r', num)
    _wait_until_idle()


# Set speech speed [s?] ? is the speed value, ranging from 0 to 10
def SetSpeed(speed):
    """Send the speed tag ``[s<speed>]`` and wait for idle."""
    TextCtrl('s', speed)
    _wait_until_idle()


# Set intonation [t?] ? is the intonation value, ranging from 0 to 10
def SetIntonation(intonation):
    """Send the intonation tag ``[t<intonation>]`` and wait for idle."""
    TextCtrl('t', intonation)
    _wait_until_idle()


# Set volume [v?] ? is the volume value, ranging from 0 to 10
def SetVolume(volume):
    """Send the volume tag ``[v<volume>]`` and wait for idle."""
    TextCtrl('v', volume)
    _wait_until_idle()


OnePronunciation_Type = {
    'OnePronunciation_Yao': 0,
    'OnePronunciation_Yi': 1
}  # Values for the [y?] tag sent by SetOnePronunciation


def SetOnePronunciation(num):
    """Send the tag ``[y<num>]`` (see OnePronunciation_Type) and wait for idle."""
    TextCtrl('y', num)
    _wait_until_idle()


Rhythm_Type = {
    'Rhythm_Disable': 0,  # " *" and "#" pronounced as symbols
    'Rhythm_Enable': 1  # Processed as rhythm, "*" for word breaks, "#" for pauses
}  # Use of rhythm marks "*" and "#" [z?]


def SetRhythm(num):
    """Send the rhythm-mark tag ``[z<num>]`` (see Rhythm_Type) and wait for idle."""
    TextCtrl('z', num)
    _wait_until_idle()


# Restore default synthesis parameters [d] All settings (except reader and language) revert to default values
def SetRestoreDefault():
    """Send the value-less tag ``[d]`` and wait for idle."""
    TextCtrl('d', -1)
    _wait_until_idle()


# ConfigureSpeech

In [None]:
#Write entry
def AsrAddWords(idnum,str):
    """Write one phrase entry to the speech recognition module.

    Args:
        idnum: Entry id stored with the phrase.
        str: Phrase text; each character is sent as its ``ord`` value.

    The byte sequence is: add-entry register, ``len(str) + 2``, ``idnum``,
    the character codes, then a terminating 0. The sequence is printed and
    then written one byte at a time with a 30 ms pause after each byte.
    """
    packet = [asr_add_word_addr, len(str) + 2, idnum]
    packet.extend(ord(letter) for letter in str)
    packet.append(0)
    print(packet)
    for value in packet:
        bus.write_byte(i2c_addr, value)
        time.sleep(0.03)

#Set RGB
def RGBSet(R,G,B):
    """Print ``[R, G, B]`` and write it as one block to the module's RGB register."""
    levels = [R, G, B]
    print(levels)
    bus.write_i2c_block_data(i2c_addr, asr_rgb_addr, levels)

#Read result
def I2CReadByte(reg):
    """Read one byte from a register of the speech recognition module.

    Writes the register number ``reg`` to the module, waits 50 ms, then
    reads and returns a single byte.
    """
    bus.write_byte(i2c_addr, reg)
    time.sleep(0.05)
    return bus.read_byte(i2c_addr)

#Wait busy
def Busy_Wait():
    """Block until the recognition module's busy register reads 0.

    Polls ``asr_busy`` through I2CReadByte and prints each value read so the
    progress is visible. There is no timeout.
    """
    busy = 255
    while busy != 0:
        busy = I2CReadByte(asr_busy)
        # Print the value just read; the previous code printed the constant
        # register address (asr_busy) on every iteration.
        print(busy)

### Create an instance and initialize parameters.

In [None]:
# Tracker instances used by the camera loop.
follow = color_follow()
follow2 = face_follow()
# Current operating mode; the button callbacks and the voice commands change it.
model = 'General'
# Result of the 'learning_color' mode; stays empty until a colour has been learned.
HSV_learning = ()
# Default (lower, upper) HSV bounds per selectable colour.
color_hsv = {"red": ((170, 124, 134), (229, 242, 255)),
             "green": ((54, 113, 64), (75, 255, 255)),
             "blue": ((102, 150, 124), (124, 253, 255)),
             "yellow": ((22, 125, 130), (47, 255, 255))}
# 255 random colour triples used for the text overlays.
color = [[random.randint(0, 255) for _ in range(3)] for _ in range(255)]
# NOTE(review): absolute, machine-specific path.
HSV_path="/home/jetson/dofbot_ws/src/dofbot_color_follow/HSV_config.txt"
# NOTE(review): read_HSV is not defined or imported in any cell visible in this
# notebook. If that is the case, the NameError is caught below, the error
# message is printed, and the defaults in color_hsv are used unchanged.
try: read_HSV(HSV_path,color_hsv)
except Exception: print("Read HSV_config Error !!!")

### Create a control

In [None]:
button_layout = widgets.Layout(width='200px', height='100px', align_self='center')
# Output control
output = widgets.Output()
# Color tracking
# NOTE: this rebinds the name `color_follow`, which the first cell imported as
# the tracker class. After this cell has run, `color_follow()` no longer
# constructs a tracker, so the instance-creation cell cannot be re-run without
# re-running the import cell first.
color_follow = widgets.Button(description='color_follow', button_style='success', layout=button_layout)
# Choose color (one tooltip per option; the previous list had three unrelated
# placeholder strings for four options)
choose_color = widgets.ToggleButtons(options=['red', 'green', 'blue', 'yellow'], button_style='success',
             tooltips=['Track red objects', 'Track green objects', 'Track blue objects', 'Track yellow objects'])
# Cancel tracking
follow_cancel = widgets.Button(description='follow_cancel', button_style='danger', layout=button_layout)
# Learn color
learning_color = widgets.Button(description='learning_color', button_style='primary', layout=button_layout)
# Learn color tracking
learning_follow = widgets.Button(description='learning_follow', button_style='success', layout=button_layout)
# Exit
exit_button = widgets.Button(description='Exit', button_style='danger', layout=button_layout)
# Image control
imgbox = widgets.Image(format='jpg', height=480, width=640, layout=widgets.Layout(align_self='auto'))
# Vertical layout: camera image above the colour selector
img_box = widgets.VBox([imgbox, choose_color], layout=widgets.Layout(align_self='auto'))
# Vertical layout: the mode buttons
Slider_box = widgets.VBox([color_follow, learning_color, learning_follow, follow_cancel, exit_button],
                          layout=widgets.Layout(align_self='auto'))
# Horizontal layout: image column next to the button column
controls_box = widgets.HBox([img_box, Slider_box], layout=widgets.Layout(align_self='auto'))
# ['auto', 'flex-start', 'flex-end', 'center', 'baseline', 'stretch', 'inherit', 'initial', 'unset']


### Mode switch

In [None]:
def _switch_model(new_model):
    """Set the global operating mode that the camera loop reads."""
    global model
    model = new_model


def color_follow_Callback(value):
    """Button handler: switch to colour tracking."""
    _switch_model('color_follow')


def learning_color_Callback(value):
    """Button handler: switch to colour learning."""
    _switch_model('learning_color')


def learning_follow_Callback(value):
    """Button handler: switch to tracking the learned colour."""
    _switch_model('learning_follow')


def follow_cancel_Callback(value):
    """Button handler: return to the idle 'General' mode."""
    _switch_model('General')


def exit_button_Callback(value):
    """Button handler: ask the camera loop to exit."""
    _switch_model('Exit')
# Attach each mode callback to its button. Here `color_follow` is the Button
# created in the widget cell, not the tracker class imported in the first cell.
color_follow.on_click(color_follow_Callback)
learning_color.on_click(learning_color_Callback)
learning_follow.on_click(learning_follow_Callback)
follow_cancel.on_click(follow_cancel_Callback)
exit_button.on_click(exit_button_Callback)

# Clear the phrase entries and the recognition mode stored in the module's power-off cache. This only has to be done on first use.
# If nothing needs to change afterwards, you can set 1 to 0 to skip it.
# Then set the module's sensitivity and turn the recognition prompt sound on.
# After that, light the module's RGB lamp white and sound the buzzer for 1 second,
# and announce "Initialization complete, please issue a command."


In [None]:
# The recognition mode and the phrase entries are kept by the module across
# power cycles. They only have to be written once; if nothing needs to change
# after the first run, change the 1 below to 0 to skip the write.
ASR_WRITE_ENTRIES = 1

# Phrases to register; the list index is the entry id. The camera loop acts on
# ids 1-6. Entry 0 is presumably the wake phrase for password mode - TODO confirm.
ASR_PHRASES = [
    "xiao ya",
    "kai shi zhui zong hong se",
    "kai shi zhui zong huang se",
    "kai shi zhui zong lv se",
    "kai shi zhui zong lan se",
    "kai shi zhui zong ren lian",
    "qu xiao zhui zong",
]

cleck = 1

if ASR_WRITE_ENTRIES:
    bus.write_byte_data(i2c_addr, asr_clear_addr, 0x40)  # Clear the power-off cache
    Busy_Wait()  # Wait for the module to be free
    print("Cache cleared")
    bus.write_byte_data(i2c_addr, asr_mode_addr, 1)  # 1 = password mode
    Busy_Wait()
    print("The mode is set")
    for entry_id, phrase in enumerate(ASR_PHRASES):
        AsrAddWords(entry_id, phrase)
        Busy_Wait()
    # Poll until the module reports as many entries as were written
    # (previously a hard-coded 7 that had to be kept in sync by hand).
    while cleck != len(ASR_PHRASES):
        cleck = I2CReadByte(asr_num_cleck)
        print(cleck)

bus.write_byte_data(i2c_addr, asr_rec_gain_addr, 0x40)  # Set the sensitivity, the recommended value is 0x40-0x55
bus.write_byte_data(i2c_addr, asr_voice_flag, 1)  # Turn the recognition prompt sound on
bus.write_byte_data(i2c_addr, asr_buzzer, 1)  # Buzzer on
RGBSet(255,255,255)
time.sleep(1)
RGBSet(0,0,0)
bus.write_byte_data(i2c_addr, asr_buzzer, 0)  # Buzzer off

SetReader(Reader_Type["Reader_XiaoPing"])
SetVolume(8)
Speech_text("初始化完成，请发布指令",EncodingFormat_Type["GB2312"])
# Wait for the announcement to finish before moving on.
while GetChipStatus() != ChipStatus_Type['ChipStatus_Idle']:
    time.sleep(0.1)

### main program

In [None]:
# Recognition result id -> (mode to switch to, colour to select or None,
# spoken confirmation). The ids match the entries written by AsrAddWords.
_VOICE_COMMANDS = {
    1: ('color_follow', 'red', "Okay, starting to track red"),
    2: ('color_follow', 'yellow', "Okay, starting to track yellow"),
    3: ('color_follow', 'green', "Okay, starting to track green"),
    4: ('color_follow', 'blue', "Okay, starting to track blue"),
    5: ('follow2', None, "Okay, starting to track faces"),
    6: ('General', None, "Okay, canceling tracking"),
}


def _handle_voice_command(result):
    """Apply the voice command for ``result`` (if any) and speak the confirmation."""
    global model
    command = _VOICE_COMMANDS.get(result)
    if command is None:
        return
    new_model, colour, reply = command
    if colour is not None:
        choose_color.value = colour
    model = new_model
    Speech_text(reply, EncodingFormat_Type["GB2312"])
    # Wait for the current speech to finish
    while GetChipStatus() != ChipStatus_Type['ChipStatus_Idle']:
        time.sleep(0.1)


def camera():
    """Camera loop: read voice commands, process frames for the current mode, show them.

    Runs until the capture closes or ``model`` becomes ``'Exit'``. On each
    iteration it reads the recognition result register, grabs a frame,
    applies any voice command, runs the tracker for the current ``model``
    and writes the JPEG-encoded frame into ``imgbox``.

    The capture is always released on the way out, including when an I2C or
    OpenCV call raises. Frames that fail to grab are skipped instead of
    crashing the loop.
    """
    global HSV_learning, model
    # Open the camera
    capture = cv.VideoCapture(0)
    capture.set(3, 640)  # Frame width
    capture.set(4, 480)  # Frame height
    capture.set(5, 30)  # Set frame rate
    try:
        # Loop while the camera is opened
        while capture.isOpened():
            result = I2CReadByte(asr_result)
            time.sleep(0.01)
            # Read each frame from the camera
            grabbed, img = capture.read()

            # Handle the command before looking at the frame so that a
            # dropped frame does not also drop a spoken command.
            _handle_voice_command(result)

            # The modes are mutually exclusive, so checking 'Exit' first is
            # equivalent to checking it after the tracking branches, and it
            # lets the loop stop even while frames are failing.
            if model == 'Exit':
                cv.destroyAllWindows()
                break

            if not grabbed or img is None:
                # capture.read() failed; try again on the next iteration.
                continue

            # Standardize image size
            img = cv.resize(img, (640, 480))

            if model == 'color_follow':
                img = follow.follow_function(img, color_hsv[choose_color.value])
                # Add text
                cv.putText(img, choose_color.value, (int(img.shape[0] / 2), 50), cv.FONT_HERSHEY_SIMPLEX, 2, color[random.randint(0, 254)], 2)
            if model == 'follow2':
                img = follow2.follow_function(img)
            if model == 'learning_color':
                img, HSV_learning = follow.get_hsv(img)
            if model == 'learning_follow':
                if len(HSV_learning) != 0:
                    print(HSV_learning)
                    img = follow.learning_follow(img, HSV_learning)
                    # Add text
                    cv.putText(img, 'LeColor', (240, 50), cv.FONT_HERSHEY_SIMPLEX, 1, color[random.randint(0, 254)], 1)
            imgbox.value = cv.imencode('.jpg', img)[1].tobytes()
    except KeyboardInterrupt:
        # As before: an interrupt stops the loop quietly.
        pass
    finally:
        capture.release()


### start

In [None]:
# Show the control panel, then run the camera loop on a background thread so
# the notebook stays responsive. The loop ends when the Exit button sets
# model to 'Exit'.
display(controls_box,output)
threading.Thread(target=camera, ).start()

##### 