FSM

In [None]:
import GlobalNavigation as globnav
import ExtendedKalmanFilter as EKF
import camera as cam
import cv2
import numpy as np
import math
from tdmclient import ClientAsync, aw
import constants
import time

creating the node

In [2]:
client = ClientAsync()
node = await client.wait_for_node()
await node.lock()

Node fee58e48-a333-47f7-8e98-fbfeb0cf054d

In [3]:
PATH_FOLLOWING = 0
LOCAL_AVOIDANCE = 1

state = PATH_FOLLOWING
prev_state = PATH_FOLLOWING

In [4]:
def distance_between(p1, p2):
    delta_x = p1[0] - p2[0]
    delta_y = p1[1] - p2[1]
    return math.sqrt(delta_x**2 + delta_y**2)

In [None]:
def motors(l_speed=100, r_speed=100, verbose=False):
    """defining the motor speeds and printing them if necessary for debugging puposes

    Args:
        l_speed (int, optional): left speed. Defaults to 100.
        r_speed (int, optional): right. Defaults to 100.
        verbose (bool, optional): debugging. Defaults to False.

    """
    if verbose:
        print("\t\t Setting speed : ", l_speed, r_speed)
    return {
        "motor.left.target": [l_speed],
        "motor.right.target": [r_speed],
    }
    




In [6]:
async def test_obst(obst_threshold, verbose=False):
    """is there any obstacle, independantly on which sensor is sensing it?

    Args:
        obst_threshold (_type_): obstacle threshold
        verbose (bool, optional): debugging. Defaults to False.

    """
    
    await node.wait_for_variables({'prox.horizontal'})
    prox_horizontal = list(node['prox.horizontal'])

    if verbose :
        if prox_horizontal is None:
            print("Failed to fetch sensor data.")
        else:
            print("Sensor data fetched successfully:", prox_horizontal)

    for value in prox_horizontal:
        if value>obst_threshold :
            if verbose: print("\t\t Saw an obstacle")
            return True
    
    return False

In [7]:
async def check_state ():
    """
    returns the state it is in
    
    """
    if await test_obst(0, False) :
        state = LOCAL_AVOIDANCE
    else:
        state = PATH_FOLLOWING
    return state

In [8]:
def PID(Kp =10, Ki=3, Kd=1, maxerr=40, minuserr=0, prev_err=0, err_sum=0, angle_diff=0): 
    """PID control to minimise the angle error

    Args:
        Kp (int, optional): Proportional. Defaults to 10.
        Ki (int, optional): Integrative. Defaults to 3.
        Kd (int, optional): Derivative. Defaults to 1.
        maxerr (int, optional): maximum sum error to xhich we set a max value. Defaults to 40.
        minuserr (int, optional): sub err. Defaults to 0.
        prev_err (int, optional): _description_. Defaults to 0.
        err_sum (int, optional): _description_. Defaults to 0.
        angle_diff (int, optional): _description_. Defaults to 0.

    Returns:
        returns the PID output that we can directly apply to the motors speed
    """
    ## enft on a tester un PD, test PID now
    err_sum += angle_diff
    minuserr = angle_diff-prev_err
    
    dt = 0.02

    #tester sans
    if err_sum> maxerr:
        err_sum = maxerr
    if err_sum < -maxerr:
        err_sum = -maxerr

    PID_eff = Kp*angle_diff + Ki * err_sum * dt + Kd*minuserr / dt
    prev_err=angle_diff

    
    
    return PID_eff, maxerr, minuserr, prev_err, err_sum

In [9]:

async def path_following(motor_speed, path, loc, angle, next_idx, Kp, Ki, Kd, maxerr, minuserr, prev_err, err_sum):
    
    current_x, current_y = loc
    current_angle = angle
    goal_x, goal_y = path[next_idx]

    # Calculate heading to the waypoint
    delta_x = current_x - goal_x
    delta_y = current_y - goal_y
    
    target_angle = math.atan2(delta_y, delta_x) # Angle to current waypoint
    angle_diff = (current_angle - target_angle ) + np.pi # Normalize angle
    
    #to do the smallest turn  
    if angle_diff > math.pi:
        angle_diff = angle_diff - 2*math.pi
    elif angle_diff < -math.pi:
        angle_diff = angle_diff + 2*math.pi
    
    PID_eff, maxerr, minuserr, prev_err, err_sum = PID(Kp, Ki, Kd, maxerr, minuserr, prev_err, err_sum, angle_diff)
    
    prev_err=angle_diff

    await node.set_variables(motors(int(motor_speed - PID_eff), int(motor_speed + PID_eff) ))

    prev_state = PATH_FOLLOWING

    return prev_state

        

In [10]:
async def straight(motor_speed, t):
    await node.set_variables(motors(motor_speed, motor_speed))
    await client.sleep(t)

async def turn_right(motor_speed, t):
    await node.set_variables(motors(motor_speed, 0))
    await client.sleep(t)
    
async def turn_left(motor_speed, t):
    await node.set_variables(motors(0, motor_speed))
    await client.sleep(t)
    
async def piv_right(motor_speed, t):
    await node.set_variables(motors(motor_speed, -motor_speed))
    await client.sleep(t)
    
async def piv_left(motor_speed, t):
    await node.set_variables(motors(-motor_speed, motor_speed))
    await client.sleep(t)
    
async def go_back(motor_speed, t):
    await node.set_variables(motors(-motor_speed, -motor_speed))
    await client.sleep(t)


In [11]:
async def obst_avoidance(motor_speed, obst_threshold, verbose):
    """
    obstacle avoidance behaviour of the FSM
    param motor_speed: the Thymio's motor speed
    param obst_threshold: threshold starting which it is considered that the sensor saw an obstacle
    param verbose: whether to print status messages or not
    """
      
    prox_horizontal = list(node['prox.horizontal'])

    ### here faire pour chaque sensor
    left = prox_horizontal[0]
    centeredleft = prox_horizontal[1]
    center = prox_horizontal[2]
    centeredright = prox_horizontal[3]
    right = prox_horizontal[4]
    
    # detect in the middle
    if center != 0:
        await piv_left(motor_speed, 0.1)
    
    # detect on the LEFT
    if left != 0: 
        await piv_right(motor_speed, 0.1)
        await straight(motor_speed, 1)
        
    # detect on the RIGHT
    if right != 0: 
        await piv_left(motor_speed, 0.1)
        await straight(motor_speed, 1)

    # in LEFTCENTER
    if centeredleft != 0: 
        await piv_right(motor_speed, 0.1)

    # detect on the RIGHT
    if centeredright != 0: 
        await piv_left(motor_speed, 0.1)
        
    if centeredleft > centeredright: 
        await piv_right(motor_speed, 0.1)
        
    if centeredright > centeredleft: 
        await piv_left(motor_speed, 0.1)

    await client.sleep(0.1) #otherwise, variables would not be updated
    
    prev_state = LOCAL_AVOIDANCE
    
    return prev_state

In [None]:
async def main_FSM(speed, obst_threshold, verbose=False):

    #DEFINING GLOBAL VARIABLES
    global_path_found = False  #to read the path only when needed
    next_idx = 1 #path 
    minuserr, prev_err, err_sum =0,0,0 # for PID, to accumulate error
    loc = None 
    goal = None
    angle = None
    dir_vec = None
    
    
    #VISION AND PLOTTING
    vision = cv2.VideoCapture(0)

    cv2.namedWindow("test", cv2.WINDOW_AUTOSIZE)
    cv2.namedWindow("drawing", cv2.WINDOW_AUTOSIZE)
    cv2.startWindowThread()
    
    for i in range(60):
        ret, bin_frame = vision.read()
        
    #first reading global obstacles
    _, obstacles_frame = vision.read()
    obstacles = cam.find_obstacles(obstacles_frame)

    frame_width = int(vision.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(vision.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Define the codec and create VideoWriter object (for video)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    output = cv2.VideoWriter('output.mp4', fourcc, 30.0, (frame_width, frame_height))

    # detector for arucode
    aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_100)
    aruco_params = cv2.aruco.DetectorParameters()
    detector = cv2.aruco.ArucoDetector(aruco_dict, aruco_params)

    # bound not to go out of frame
    bounds = cam.get_bounds(obstacles_frame, detector)
    
    #initialisation for Kalman filter
    loc, dir_vec, angle, thymio_corners, goal = cam.live_pos(obstacles_frame, detector, constants.THYMIO_ID, constants.GOAL_ID)  # position given by the localization function
    x_est = [np.array([[loc[0]], [loc[1]], [0], [0], [0]])]   
    P_est = [1000 * np.ones(5)]                                                
    camera_available = False                                  
  
    while True:
        
        start_time = time.time()  #to synchronize the kalman filter and data sampling
        
        _, frame = vision.read()

        blank_frame = np.zeros(frame.shape)

        new_loc, new_dir_vec, new_angle, thymio_corners, new_goal = cam.live_pos(frame, detector, constants.THYMIO_ID, constants.GOAL_ID)  # position given by the localization function
        
        if new_loc is None:
            camera_available = False 
        else:
            camera_available = True 
        
        aw(client.sleep(0.1))
        await node.wait_for_variables() # wait for Thymio variables values
        
        new_x_est, new_P_est = EKF.extended_kalman_filter(node["motor.left.target"], node["motor.right.target"], 
                                                          new_loc, new_angle, x_est[-1], P_est[-1], camera_available) # starting to use the kalman filter
        
        #adding the new estimation to the list of estimations
        x_est.append(new_x_est)
        P_est.append(new_P_est)
        
        new_x_est = tuple(new_x_est.flatten())
        new_P_est = tuple(new_P_est.flatten())

        loc = (int(new_x_est[0]),int(new_x_est[1])) # location estimated by kalman filtering

        angle = (new_x_est[3]) # angle estimated by kalman filtering
        dir_vec = EKF.get_vector_from_angle(angle) #director vector estimated by kalman filtering

        if new_goal is not None:
            goal = new_goal

        #getting global path once
        if global_path_found == False:
            path = cam.live_path(obstacles, loc, goal, constants.THYMIO_SIZE, bounds) ## here using filtering?
            global_path_found= True

        #plotting
        cam.live_drawing(frame, blank_frame, thymio_corners, obstacles, path)

        distance_to_goal = distance_between(loc, path[next_idx])

        if distance_to_goal < 100:  # Threshold for reaching next point
            next_idx += 1  # next point
            if next_idx >= len(path) and distance_to_goal >= constants.THRESHOLD_TO_GOAL:
                    await node.set_variables(motors(speed,speed))
                    next_idx=next_idx-1
            if distance_to_goal < constants.THRESHOLD_TO_GOAL:
                await node.set_variables(motors(0, 0)) # if reached final goal stop the thymio and go out of function
                vision.release()
                cv2.destroyAllWindows()
                output.release()
                return

        # Step 1: path following (for now going straight) global nav
        if await check_state()== PATH_FOLLOWING:
            prev_state = await path_following(speed, path, loc, angle, next_idx, 10, 3, 1, 40, minuserr, prev_err, err_sum) #maxSum et Kp par tatonement 

        # Step 2: obst following local nav
        if await check_state() == LOCAL_AVOIDANCE:
            prev_state = await obst_avoidance(speed, obst_threshold, verbose)

        #recalculating path after local avoidance
        if prev_state == LOCAL_AVOIDANCE and await check_state()==PATH_FOLLOWING:
            path = cam.live_path(obstacles, loc, goal, constants.THYMIO_SIZE, bounds)
            next_idx = 1
        
        #plotting
        goal_dir = (int(loc[0] + 50*dir_vec[0]) , int(loc[1] + 50*dir_vec[1]))

        cv2.circle(frame, loc, 5, [255,0,0], 5)
        cv2.line(frame, loc, goal_dir, [0,255,0], 5)
        cv2.line(frame, loc, path[next_idx], [0,0,255], 5)
        cv2.putText(frame, "{} radians".format(angle), (50,50), cv2.FONT_HERSHEY_SIMPLEX, 1, [255,255,255], 3)

        output.write(frame)
        cv2.imshow("test", frame)

        if cv2.waitKey(1) == ord('q'):
            break
        
        elapsed_time = time.time() - start_time  ## duration of the loop for filtering purposes
        time.sleep(max(0, constants.Ts - elapsed_time))  # Allows a constanst sampling frequency (kalman)


In [None]:
await main_FSM(100, 0, False)



Goal seen
Thymio seen
Goal seen
Thymio seen
going straight
(933.5100569087883, 679.6481941751747)
(868.0242632853335, 772.4907538312334)
(890.8626888895599, 879.8313541710976)
(893.6068355654367, 889.2065933239161)
(950.9911015615459, 1041.7279318925223)
(1040.237199627458, 1087.5582189283818)
(1214.9307088943347, 1040.6892286372686)
(1268.9879470388732, 966.7192388317585)
(1264.4673672244567, 869.5267728218028)
(1261.5080065715267, 852.9034124977904)
(1238.7304908435772, 780.7746126926173)
(1202.6418777703295, 739.6747021376884)
(1059.9944743073595, 673.2942272588805)
(1023.2438641600601, 667.6836865416718)
(981.0, 590.6371758929737)
(966.934282463426, 602.6987598322128)
(913.6847700692721, 685.5313346675632)
(954.2394774090801, 804.4134330127677)
(996.5805630079013, 819.4839889038735)
(1000.3749353395614, 850.9230739376284)
(1084.8238341201882, 917.3878759812178)
(1184.2966925128958, 905.6851867585464)
(1230.817081469768, 876.48113531)
(1270.1306538954664, 882.1879442105046)
(1360.0,

2024-12-05 17:36:47.990 python[67054:9773497] +[IMKClient subclass]: chose IMKClient_Legacy
2024-12-05 17:36:47.990 python[67054:9773497] +[IMKInputSession subclass]: chose IMKInputSession_Legacy


Goal seen
Thymio seen
turning left
Goal seen
Thymio seen
turning left
Goal seen
Thymio seen
turning left
Thymio seen
Goal seen
turning left
Goal seen
Thymio seen
turning left
(933.5100569087883, 679.6481941751747)
(868.0242632853335, 772.4907538312334)
(890.8626888895599, 879.8313541710976)
(893.6068355654367, 889.2065933239161)
(950.9911015615459, 1041.7279318925223)
(1040.237199627458, 1087.5582189283818)
(1214.9307088943347, 1040.6892286372686)
(1268.9879470388732, 966.7192388317585)
(1264.4673672244567, 869.5267728218028)
(1261.5080065715267, 852.9034124977904)
(1238.7304908435772, 780.7746126926173)
(1202.6418777703295, 739.6747021376884)
(1059.9944743073595, 673.2942272588805)
(1023.2438641600601, 667.6836865416718)
(981.0, 590.6371758929737)
(966.934282463426, 602.6987598322128)
(913.6847700692721, 685.5313346675632)
(954.2394774090801, 804.4134330127677)
(996.5805630079013, 819.4839889038735)
(1000.3749353395614, 850.9230739376284)
(1084.8238341201882, 917.3878759812178)
(1184.

In [None]:
await node.unlock()

{'error_code': 2}