In [None]:
import time
import RPi.GPIO as GPIO

import serial
import numpy as np

import os
import sys
sys.path.append('./lib/python3.11/site-packages')

#rom openai import OpenAI
import pyttsx3
import warnings
import speech_recognition as sr

warnings.filterwarnings("ignore", module='speech_recognition')
from concurrent import futures

# Uart tf luna distance detection
ser = serial.Serial("/dev/serial0", 115200, timeout=0)

if ser.isOpen() == False:
    ser.open()
    
# Setup GPIO pins for both motors
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)

Ena_M1, M1_In1, M1_In2 = 25, 23, 24
GPIO.setup(Ena_M1, GPIO.OUT)
GPIO.setup(M1_In1, GPIO.OUT)
GPIO.setup(M1_In2, GPIO.OUT)
pwm_1 = GPIO.PWM(Ena_M1, 100)
pwm_1.start(0)

Ena_M2, M2_In1, M2_In2 = 22, 27, 17
GPIO.setup(Ena_M2, GPIO.OUT)
GPIO.setup(M2_In1, GPIO.OUT)
GPIO.setup(M2_In2, GPIO.OUT)
pwm_2 = GPIO.PWM(Ena_M2, 100)
pwm_2.start(0)

# Bluetooth voice feedback
speak = pyttsx3.init()
speak.setProperty('rate', 120)

# Trying to disable ALSA lib warnings
def custom_warning_filter(message, category, filename, lineno, file=None, line = None):
    if "ALSA lib" in str(message):
        return
    return warnings.formatwarning(message, category, filename, lineno, line)
warnings.showwarning = custom_warning_filter

# Bluetooth Speech recognition
mic = sr.Microphone()
rec = sr.Recognizer()

# Voice recognition commands 1-forward, 2-reverse, 3-right, 4-left
commands = {
    'straight': 1,
    'forward' : 1,
    'reverse' : 2,
    'back' : 2,
    'backwards' : 2,
    'right' : 3,
    'write' : 3,
    'left' : 4
}

In [None]:

def read_tfluna_data():
    #counter = 10
    while True:
        time.sleep(1)
        counter = ser.in_waiting # count the number of bytes of the serial port
        if counter > 8:
            bytes_serial = ser.read(9) # read 9 bytes
            ser.reset_input_buffer() # reset buffer

            if bytes_serial[0] == 0x59 and bytes_serial[1] == 0x59: # check first two bytes
                distance = bytes_serial[2] + bytes_serial[3]*256 # distance in next two bytes
                strength = bytes_serial[4] + bytes_serial[5]*256 # signal strength in next two bytes
                temperature = bytes_serial[6] + bytes_serial[7]*256 # temp in next two bytes
                temperature = (temperature/8.0) - 256.0 # temp scaling and offset
                return distance/100.0,strength,temperature

def reverse(duration, speed = 70):
    t_end = time.time() + duration
    
    while time.time() < t_end:
        GPIO.output(M1_In1, GPIO.LOW)
        GPIO.output(M1_In2, GPIO.HIGH)
        pwm_1.ChangeDutyCycle(speed)
        GPIO.output(M2_In1, GPIO.HIGH)
        GPIO.output(M2_In2, GPIO.LOW)
        pwm_2.ChangeDutyCycle(speed)
        
    # Stop motors
    pwm_1.ChangeDutyCycle(0)
    pwm_2.ChangeDutyCycle(0)
    time.sleep(1)
        
def straight(duration, speed = 70):
    t_end = time.time() + duration
    
    while time.time() < t_end:
        GPIO.output(M1_In1, GPIO.HIGH)
        GPIO.output(M1_In2, GPIO.LOW)
        pwm_1.ChangeDutyCycle(speed)
        GPIO.output(M2_In1, GPIO.LOW)
        GPIO.output(M2_In2, GPIO.HIGH)
        pwm_2.ChangeDutyCycle(speed)
            
    # Stop motors
    pwm_1.ChangeDutyCycle(0)
    pwm_2.ChangeDutyCycle(0)
    time.sleep(1)
    
def right(angle = 90):
    
    # Set speed and duration to adjust perfect turns
    speed = 50
    duration = angle/190
    t_end = time.time() + duration
    
    while time.time() < t_end:
        GPIO.output(M1_In1, GPIO.LOW)
        GPIO.output(M1_In2, GPIO.HIGH)
        pwm_1.ChangeDutyCycle(speed)
        GPIO.output(M2_In1, GPIO.LOW)
        GPIO.output(M2_In2, GPIO.HIGH)
        pwm_2.ChangeDutyCycle(speed)
        
    # Stop motors
    pwm_1.ChangeDutyCycle(0)
    pwm_2.ChangeDutyCycle(0)
    time.sleep(1)
        
def left(angle = 90):
    # Set speed and duration to adjust perfect turns
    speed = 50
    duration = angle/200
    t_end = time.time() + duration
    
    while time.time() < t_end:
        GPIO.output(M1_In1, GPIO.HIGH)
        GPIO.output(M1_In2, GPIO.LOW)
        pwm_1.ChangeDutyCycle(speed)
        GPIO.output(M2_In1, GPIO.HIGH)
        GPIO.output(M2_In2, GPIO.LOW)
        pwm_2.ChangeDutyCycle(speed)
            
    # Stop motors
    pwm_1.ChangeDutyCycle(0)
    pwm_2.ChangeDutyCycle(0)
    time.sleep(1)
        

In [None]:
# This cell runs the voice recognition loop, say out loud the commands and say stop to terminate loop
listen = True
while listen:
    with mic as source:
        rec.adjust_for_ambient_noise(source)
        speak.say("Say something")
        speak.runAndWait()
        try:
            audio=rec.listen(source,timeout=8,phrase_time_limit=8)
            speech = str(rec.recognize_google(audio))# key = GOOGLE_API_KEY)
        except:
            speak.say('No audio detected, try again in two seconds')
            speak.runAndWait()
            time.sleep(2)
            continue
         
    #print(speech)
    words = speech.split(' ')
    for word in words:
        word = word.lower().strip()
        if word in ['break', 'shut', 'stop']:
            listen = False
            break
        elif word in commands.keys():
            val = commands[word]
            if val == 1:
                distance, strength, temperature = read_tfluna_data()
                tf_luna_distance_in_1_s = 0.45
                duration = distance/tf_luna_distance_in_1_s
                #speak.say('straight')
                straight(min(duration, 5))
            elif val == 2:
                #speak.say('reverse')
                reverse(2)
            elif val == 3:
                #speak.say('right')
                right()
            elif val == 4:
                #speak.say('left')
                left()
            else:
                speak.say(f"{word} not a valid command")
                speak.runAndWait()
        else:
            speak.say(f"{word} not a valid command")
            speak.runAndWait()
        time.sleep(1)
                    

In [None]:
def set_samp_rate(samp_rate=100):
    ##########################
    # change the sample rate
    samp_rate_packet = [0x5a,0x06,0x03,samp_rate, 00, 00] # sample rate byte array
    ser.write(samp_rate_packet) # send sample rate instruction
    time.sleep(0.1) # wait for change to take effect
    return
            
def get_version():
    ##########################
    # get version info
    info_packet = [0x5a,0x04,0x14,0x00]

    ser.write(info_packet)
    time.sleep(0.1)
    bytes_to_read = 30
    t0 = time.time()
    while (time.time()-t0)<5:
        counter = ser.in_waiting
        if counter > bytes_to_read:
            bytes_data = ser.read(bytes_to_read)
            ser.reset_input_buffer()
            if bytes_data[0] == 0x5a:
                version = bytes_data[3:-1].decode('utf-8')
                print('Version -'+version)
                return
            else:
                ser.write(info_packet)
                time.sleep(0.1)

def set_baudrate(baud_indx=4):
    ##########################
    # get version info
    baud_hex = [[0x80,0x25,0x00], # 9600
                [0x00,0x4b,0x00], # 19200
                [0x00,0x96,0x00], # 38400
                [0x00,0xe1,0x00], # 57600
                [0x00,0xc2,0x01], # 115200
                [0x00,0x84,0x03], # 230400
                [0x00,0x08,0x07], # 460800
                [0x00,0x10,0x0e]]  # 921600
    info_packet = [0x5a,0x08,0x06,baud_hex[baud_indx][0],baud_hex[baud_indx][1],
                   baud_hex[baud_indx][2],0x00,0x00] # instruction packet 

    prev_ser.write(info_packet) # change the baud rate
    time.sleep(0.1) # wait to settle
    prev_ser.close() # close old serial port
    time.sleep(0.1) # wait to settle
    ser_new =serial.Serial("/dev/serial0", baudrates[baud_indx],timeout=0) # new serial device
    if ser_new.isOpen() == False:
        ser_new.open() # open serial port if not open
    bytes_to_read = 8
    t0 = time.time()
    while (time.time()-t0)<5:
        counter = ser_new.in_waiting
        if counter > bytes_to_read:
            bytes_data = ser_new.read(bytes_to_read)
            ser_new.reset_input_buffer()
            if bytes_data[0] == 0x5a:
                indx = [ii for ii in range(0,len(baud_hex)) if \
                        baud_hex[ii][0]==bytes_data[3] and
                        baud_hex[ii][1]==bytes_data[4] and
                        baud_hex[ii][2]==bytes_data[5]]
                print('Baud Rate = {0:1d}'.format(baudrates[indx[0]]))
                time.sleep(0.1) 
                return ser_new
            else:
                ser_new.write(info_packet) # try again if wrong data received
                time.sleep(0.1) # wait 100ms
                continue

#
############################
# Configurations
############################
#
baudrates = [9600,19200,38400,57600,115200,230400,460800,921600] # baud rates
prev_indx = 4 # previous baud rate index (current TF-Luna baudrate)
prev_ser = serial.Serial("/dev/serial0", baudrates[prev_indx],timeout=0) # mini UART serial device
if prev_ser.isOpen() == False:
    prev_ser.open() # open serial port if not open
baud_indx = 4 # baud rate to be changed to (new baudrate for TF-Luna)
ser = set_baudrate(baud_indx) # set baudrate, get new serial at new baudrate
set_samp_rate(100) # set sample rate 1-250
get_version() # print version info for TF-Luna
time.sleep(0.1) # wait 100ms to settle

In [None]:
#
############################
# Testing the TF-Luna Output
############################
#

def right_loop_tf_luna(steps = 5):
    
    for step in range(steps):
        
        '''
        tot_pts = 10 # points for sample rate test
        t0 = time.time() # for timing
        dist_array = [] # for storing values
        while len(dist_array)<tot_pts:
            try:
                distance,strength,temperature = read_tfluna_data() # read values
                dist_array.append(distance) # append to array

            except:
                continue
        '''
        '''
        dist_array = []
        samples = 3
        for i in range(samples):
            distance,strength,temperature = read_tfluna_data() # read values
            dist_array.append(distance)
            
        distance = np.sum(dist_array)/len(dist_array)
        '''
        center_dist, avg_dist, min_dist = panorama(sample_size = 3)

        # In 1s bot travels 11cm, 0.40 tf luna distance
        tf_luna_distance_in_1_s = 0.45
        duration = center_dist/tf_luna_distance_in_1_s

        print(f'Straight {round(duration, 2)}')
        if duration > 0.75:
            straight(round(duration, 2))
            time.sleep(1)
            right()
        time.sleep(4)
        
def panorama(sample_size = 5):
    '''
    Gets a panoromic ditance reading at fixed angles from left and right
    Returns average and miniumum values
    '''
    dist_array = []
    angle = 25
    
    steps = sample_size//2
    directions = [0] + [1]*int(steps) + [-1]*int(steps)*2 + [0]
    #print(directions)
    
    counter = 0
    for i in directions:
        
        if i>0:
            right(angle)
            counter -= 1
        elif i<0:
            left(angle)
            counter +=1 
        
        distance,strength,temperature = read_tfluna_data() # read values
        dist_array.append(distance)
    
    
    for i in range(counter):
        right(angle)
        
    return (dist_array[0] + dist_array[-1])/2, np.sum(dist_array)/len(dist_array), min(dist_array)