In [11]:
import os
import sys
import time
import serial
import math
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import colors
import cv2
import A_star

# Adding the src folder in the current directory as it contains the script
# with the Thymio class
sys.path.insert(0, os.path.join(os.getcwd(), 'utils'))
from Thymio import Thymio

In [None]:
from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

In [12]:
class robot():
    def __init__(self, th):
        self.curr_pos = [0,0,0]
        self.speed = 100
        self.th = th
        self.rt = RepeatedTimer(Ts, self.test_saw_wall) # it auto-starts, no need of rt.start()

    def get_position(self):
        return self.curr_pos
        
    def move_to_target(self, target_pos):
        if target_pos == self.curr_pos[0:2]: return False #if the robot is already at the position or doesn't move
       
        #distance from current position to target
        distance = math.sqrt(((target_pos[0]-self.curr_pos[0])**2)+((target_pos[1]-self.curr_pos[1])**2))
        
        #absolute angle from current position to target (this angle will always be returned between ]-pi;pi])
        path_angle = math.atan2(target_pos[1]-self.curr_pos[1],target_pos[0]-self.curr_pos[0])
        
        #turn angle to get to target relative to Thymio frame
        turn_angle = path_angle - self.curr_pos[2]
        
        print("distance:{}".format(distance))
        print("turn_angle:{}".format(math.degrees(turn_angle)),"\n")
        
        #give commands
        self.turn(turn_angle)
        self.forward(distance)

        #update position and angle of the robot
        self.curr_pos = [target_pos[0],target_pos[1],path_angle]
        
    def turn(self, turn_angle):
        if turn_angle > 0: #turn_angle to the left (180 case will also turn left)
            target_time = (abs(math.degrees(turn_angle)))/38.558 #linear fit model from rad to s for v=100 (change to Kalman)
            left_speed = -self.speed 
            right_speed = self.speed
            
        elif turn_angle < 0: #turn_angle to the right
            target_time = (abs(math.degrees(turn_angle)))/38.558 
            left_speed = self.speed
            right_speed = -self.speed
            
        else: #if turn_angle = 0, do not waste time
            return False
            
        print("target_time_turn:{}".format(target_time))
        t_0 = time.time()
        
        self.move(l_speed=left_speed, r_speed=right_speed)
        
        time.sleep(target_time)
        t_end = time.time()
        print("actual_time_turn:{}".format(t_end-t_0))
        
        #time.sleep(0.1)

    def forward(self, distance): 
        target_time = distance/31.573 #linear fit model from mm to s for v=100 (change to Kalman)
        
        print("target_time_forward:{}".format(target_time))
        t_0 = time.time()
        
        self.move(l_speed=self.speed, r_speed=self.speed)
        
        time.sleep(target_time)
        t_end = time.time()
        print("actual_time_forward:{}".format(t_end-t_0), "\n")
        
        #time.sleep(0.1)
        
    def stop(self):
        move(l_speed=0, r_speed=0)
        
        #time.sleep(0.1)

    def test_saw_wall(self, thread=True, wall_threshold=500, verbose=False):
        
            if any([x>wall_threshold for x in serial['prox.horizontal'][:-2]]):
                if verbose: print("\t\t Saw a wall")
                    if thread: 
                        self.rt.stop() #we stop the thread to not execute test_saw_wall another time
                        # Start following wall of obstacle
                        wall_following(verbose=verbose)
                        self.rt.start(self)
                    else: #we also use test_saw_wall to check if there is STILL a wall (in the wall_folowing function), so we put thread false
                        return True
            return False #to test, not sure we can return smg with the timer, if not, just change the function to return only when thread is false

    def wall_following(self, motor_speed=100, wall_threshold=500, verbose=False):
    """
    Wall following behaviour of the FSM
    param motor_speed: the Thymio's motor speed
    param wall_threshold: threshold starting which it is considered that the sensor saw a wall
    param white_threshold: threshold starting which it is considered that the ground sensor saw white
    param verbose: whether to print status messages or not
    """
    
        if verbose: print("Starting wall following behaviour")
        found_path = False
        
        if verbose: print("\t Moving forward")
        move(l_speed=motor_speed, r_speed=motor_speed)
            
        prev_state="forward"
        
        while not found_path:
            
            if test_saw_wall(thread= False, wall_threshold, verbose=False):
                if prev_state=="forward": 
                    if verbose: print("\tSaw wall, turning clockwise")
                    self.move(l_speed=motor_speed, r_speed=-motor_speed)
                    prev_state="turning"
            
            else:
                if prev_state=="turning": 
                    if verbose: print("\t Moving forward")
                    self.move(l_speed=motor_speed, r_speed=motor_speed)
                    prev_state="forward"

            if test_found_path(verbose): 
                found_path = True

    def move(self, l_speed=100, r_speed=100, verbose=False):
    """
    Sets the motor speeds of the Thymio 
    param l_speed: left motor speed
    param r_speed: right motor speed
    param verbose: whether to print status messages or not
    param th: thymio serial connection handle
    """
        # Printing the speeds if requested
        if verbose: print("\t\t Setting speed : ", l_speed, r_speed)
        
        # Changing negative values to the expected ones with the bitwise complement
        l_speed = l_speed if l_speed>=0 else 2**16+l_speed
        r_speed = r_speed if r_speed>=0 else 2**16+r_speed

        # Setting the motor speeds
        self.th.set_var("motor.left.target", l_speed)
        self.th.set_var("motor.right.target", r_speed)

    def test_found_path(self, verbose=False):
    """
    Test if the robot has returned to its original planned path
    Parameters
    ----------
    verbose : TYPE, optional
        DESCRIPTION. The default is False.

    Returns
    -------
    None.

    """
    
    return False

    def path(self, goal):
        # Define the start and end goal
        start = (0,0)

        resize_factor = 6 # Resize occupancy grid
        occupancy_grid = A_star.get_map('map.jpg',resize_factor)
        max_x, max_y = occupancy_grid.shape # Size of the map
        max_val = [max_x,max_y]


        # List of all coordinates in the grid
        x,y = np.mgrid[0:max_x:1, 0:max_y:1]
        pos = np.empty(x.shape + (2,))
        pos[:, :, 0] = x; pos[:, :, 1] = y
        pos = np.reshape(pos, (x.shape[0]*x.shape[1], 2))
        coords = list([(int(x[0]), int(x[1])) for x in pos])

        # Define the heuristic, here = distance to goal ignoring obstacles
        h = np.linalg.norm(pos - goal, axis=-1)
        h = dict(zip(coords, h))

        # Run the A* algorithm
        path, visitedNodes = A_star.A_Star(start, goal, h, coords, occupancy_grid, max_val, movement_type="8N")
        path = np.array(path).reshape(-1, 2).transpose()
        visitedNodes = np.array(visitedNodes).reshape(-1, 2).transpose()

        return path #joachim: should return a list of coordinates?

    def display_map(self):
        # Displaying the map
        cmap = colors.ListedColormap(['white', 'red']) # Select the colors with which to display obstacles and free cells
        fig_astar, ax_astar = A_star.create_empty_plot(max_val)
        ax_astar.imshow(occupancy_grid, cmap=cmap)
        plt.title("Map : free cells in white, occupied cells in red") #joachim: I erased the ; 

        # Plot the best path found and the list of visited nodes
        ax_astar.scatter(visitedNodes[0], visitedNodes[1], marker="o", color = 'orange')
        ax_astar.plot(path[0], path[1], marker="o", color = 'blue')
        ax_astar.scatter(start[0], start[1], marker="o", color = 'green', s=200)
        ax_astar.scatter(goal[0], goal[1], marker="o", color = 'purple', s=200)

In [14]:
th = Thymio.serial(port="/dev/ttyACM3", refreshing_rate=0.1) #/dev/ttyACM0 for linux #/dev/cu.usbmodem141401 for mac
th_m = robot(th)

time.sleep(3) # To make sure the Thymio has had time to connect

variables = th.variable_description()
for var in variables : #If this does not print, reconnect USB!
    print(var)

{'name': '_id', 'offset': 0, 'size': 1}
{'name': 'event.source', 'offset': 1, 'size': 1}
{'name': 'event.args', 'offset': 2, 'size': 32}
{'name': '_fwversion', 'offset': 34, 'size': 2}
{'name': '_productId', 'offset': 36, 'size': 1}
{'name': 'buttons._raw', 'offset': 37, 'size': 5}
{'name': 'button.backward', 'offset': 42, 'size': 1}
{'name': 'button.left', 'offset': 43, 'size': 1}
{'name': 'button.center', 'offset': 44, 'size': 1}
{'name': 'button.forward', 'offset': 45, 'size': 1}
{'name': 'button.right', 'offset': 46, 'size': 1}
{'name': 'buttons._mean', 'offset': 47, 'size': 5}
{'name': 'buttons._noise', 'offset': 52, 'size': 5}
{'name': 'prox.horizontal', 'offset': 57, 'size': 7}
{'name': 'prox.comm.rx._payloads', 'offset': 64, 'size': 7}
{'name': 'prox.comm.rx._intensities', 'offset': 71, 'size': 7}
{'name': 'prox.comm.rx', 'offset': 78, 'size': 1}
{'name': 'prox.comm.tx', 'offset': 79, 'size': 1}
{'name': 'prox.ground.ambiant', 'offset': 80, 'size': 2}
{'name': 'prox.ground.refl

In [16]:
my_th = robot(th)

path = th_m.path(goal)

pos_th = my_th.get_position()
print("Thymio position: {}".format(pos_th))
my_th.move_to_target([0,100])

pos_th = my_th.get_position()
print("Thymio position: {}".format(pos_th))
my_th.move_to_target([50,150])

pos_th = my_th.get_position()
print("Thymio position: {}".format(pos_th))
my_th.move_to_target([50,300])

pos_th = my_th.get_position()
print("Thymio position: {}".format(pos_th))
my_th.move_to_target([70,400])

pos_th = my_th.get_position()
print("Thymio position: {}".format(pos_th))
    
my_th.stop()
        

Thymio position: [0, 0, 0]
distance:100.0
turn_angle:90.0 

target_time_turn:2.334145961927486
actual_time_turn:2.335474729537964
target_time_forward:3.1672631678966203
actual_time_forward:3.1702733039855957 

Thymio position: [0, 100, 1.5707963267948966]
distance:70.71067811865476
turn_angle:-45.0 

target_time_turn:1.167072980963743
actual_time_turn:1.1687095165252686
target_time_forward:2.239593263822087
actual_time_forward:2.2427167892456055 

Thymio position: [50, 150, 0.7853981633974483]
distance:150.0
turn_angle:45.0 

target_time_turn:1.167072980963743
actual_time_turn:1.1695530414581299
target_time_forward:4.750894751844931
actual_time_forward:4.75424337387085 

Thymio position: [50, 300, 1.5707963267948966]
distance:101.9803902718557
turn_angle:-11.309932474020208 

target_time_turn:0.2933225912656312
actual_time_turn:0.2952730655670166
target_time_forward:3.229987339557714
actual_time_forward:3.233283042907715 

Thymio position: [70, 400, 1.373400766945016]


In [None]:
#     def measurement_model(self):
#         #distance
#         x_d = np.linspace(0.5, 5, 10)
#         y_d = [16,32.5,46.5,61,79.5,95,109,127.5,143,159.5]
        
#         diff_y_d = np.diff(y_d) # Calculating difference list 
#         mean_d = np.mean(diff_y_d) #15.94444
#         sigma_d = np.std(diff_y_d) #6.877318
#         distance_p0 = [1, mean_d, sigma_d]

#         #angle
#         x_a = np.linspace(0, 9, 10)
#         y_a = [0,42,78,114,150,194,231,270,310,347]

#         diff_y_a = np.diff(y_a) # Calculating difference list 
#         mean_a = np.mean(diff_y_a) 
#         sigma_a = np.std(diff_y_a) 
#         angle_p0 = [1, mean_a, sigma_a]
#         return distance_p0, angle_p0
    
    
    
    
# distance_p0, angle_p0 = th_m.measurement_model()
# print("distance measurement model: {}\n".format(distance_p0))
# print("angle measurement model: {}".format(angle_p0))

