In [1]:
import cv2
from cv2 import aruco
from matplotlib import pyplot as plt
import math
from math import atan2, cos, sin, sqrt, pi
import numpy as np

In [2]:
def convertWheelSpeeds(speed_l,speed_r,speed_factor=3,axle_track=94):
    '''
    Function that takes the raw values of each motor angular speed and converts it into
    translation speed (in mm/s) and rotation speed (rad/sec) with a pixel scaling factor.

    Inputs: - speed_l: speed of left motor (no units, from Thymio measurements)
            - speed_r: speed of right motor (no units, from Thymio measurements)
            - speed_factor: 3 (motor_target of 200 sent to Thymio corresponds to a wheel speed of 66.8 mm/s)
            - axle_track: 94mm (distance between both wheels)

    Output: - v  : The current forward velocity of the Thymio (in mm/s)
            - omega : The current angular velocity of the Thymio (in rad/s)(positive direction corresponding to that of the camera frame)
    '''
    # wheel_radius : 22mm
    # axle track : 94mm (distance between the 2 wheels)
    v = 0.5*(speed_l + speed_r) / speed_factor
    omega = (speed_l - speed_r) / (0.5*axle_track*speed_factor)
    
    return v, omega
    

In [3]:
def px2real(x_px, scalingFactor):
    '''
    Function that converts measurement in pixels to measurement in mm.

    Inputs: - x_px : value in pixels
            - scalingFactor : for image size (1920x1080) corresponding to real length and width (800mm x 450mm), the scalingFactor is 2.4 

    Output: - x_real : value in mm
            
    '''
    x_real = x_px / 2.4
    return x_real
    
    

In [4]:
def real2px(x_real, scalingFactor):
    '''
    Function that converts measurement in mm to measurement in pixels.

    Inputs: - x_real : value in mm
            - scalingFactor : for image size (1920x1080) corresponding to real length and width (800mm x 450mm), the scalingFactor is 2.4 

    Output: - x_px : value in pixels
            
    '''
    x_px = x_real * 2.4
    return x_px
    

In [5]:
'''
Extended Kalman Filter class

Code modified from https://github.com/L42Project/Tutoriels/blob/master/Divers/tutoriel36/KalmanFilter.py
Comments describing Kalman Filter taken from https://stackoverflow.com/questions/74318200/how-to-tune-extended-kalman-filter-on-pykalman

Example of instance: KF=KalmanFilter(0.03333, [0, 0, 0, 0, 0]) (dt = 0.033 is the time between each measurement, x_ini = [0, 0, 0, 0, 0] is the initial state)

For update() function : takes as input the measurement vector z
z[0:2] are the measurements from the camera (px, py, orientation)
z[3:4] are the measurements from the Thymio (forward speed v and angular speed omega)

When Thymio position not found by camera -> z becomes 2x1 vector, with z[0:1] being the forward speed v and angular speed omega
'''


class KalmanFilter():
    def __init__(self, dt, x_ini):
        self.dt = dt

        # Initial State Vector
        self.x=np.matrix([[x_ini[0]], [x_ini[1]], [x_ini[2]], [x_ini[3]], [x_ini[4]]])
    

        # Initialize State Transition Matrix (Warning : changes with time -> updateFk())
        self.Fk=np.eye(5)

        # Initialize Measurement matrix H
        # Used to convert the predicted state estimate into predicted sensor measurements at time k.
        # In this case, H will be the identity matrix since the estimated state maps directly to state measurements data
        # H has the same number of rows as sensor measurements and same number of columns as states.
        self.H=np.eye(5)

        
        # State model noise covariance matrix Q
        # When Q is large, the Kalman Filter tracks large changes in
        # the sensor measurements more closely than for smaller Q.
        # Q is a square matrix that has the same number of rows as states.
        self.Q=np.diag([1, 0.1, 0.01, 1.0, 1.0]) 
        

        # Sensor measurement noise covariance matrix R
        # Has the same number of rows and columns as sensor measurements.
        # If we are sure about the measurements, R will be near zero.
        self.coord_var = 0.01
        self.angle_var = 0.0001
        self.speed_var = 6.0
        self.omega_var = 0
        
        self.R=np.diag([self.coord_var, self.coord_var, self.angle_var, self.speed_var, self.omega_var])

        self.P=np.eye(5)
        
    def updateFk(self):
        '''
        Function that updates the state transition matrix Fk
        '''
        
        self.Fk=np.matrix([[1.0, 0, 0, self.dt*math.cos(self.x[2]), 0],
                           [0, 1.0,  0, self.dt*math.sin(self.x[2]), 0],
                           [0, 0, 1.0, 0, self.dt],
                           [0, 0, 0, 1.0, 0],
                           [0, 0, 0, 0, 1.0]])
        self.Fk = self.Fk.astype(float)

    def predict(self):
        # Update Fk
        self.updateFk()
        # Predict the state estimate (A Priori) at time k based on the state estimate at time k-1
        self.x=np.dot(self.Fk, self.x)
        # Predict the state covariance estimate based on the previous covariance and some noise
        self.P=np.dot(np.dot(self.Fk, self.P), self.Fk.T)+self.Q
        return self.x

    def update(self, z, CameraAccessible = True): # z[0:2] corrpesonds to measurement of camera, z[3:4] corresponds to measurements of wheels
        
        if CameraAccessible:
            self.H=np.eye(5)
            
            self.R=np.diag([self.coord_var, self.coord_var, self.angle_var, self.speed_var, self.omega_var])
        else: # CAUTION : IF CAMERAACCESSIBLE = FALSE, Z IS A 2x1 VECTOR (z[0:1] are now measrurements from wheels)
            # Measurement matrix H (now only v and omega can be observed)
            self.H=np.matrix([[0, 0, 0, 1, 0],
                              [0, 0, 0, 0, 1]])
            
            self.R=np.diag([self.speed_var, self.omega_var])
            
            
        # Compute Kalman gain
        S=np.dot(self.H, np.dot(self.P, self.H.T))+self.R
        inv_S = np.linalg.pinv(S.astype(float))
        K=np.dot(np.dot(self.P, self.H.T),inv_S)

        # Correction / innovation
        # Calculate an updated state estimate (A Posteriori) for time k
        self.x=self.x+np.dot(K, (z-np.dot(self.H, self.x)))
        # Update the state covariance estimate for time k
        I=np.eye(self.H.shape[1])
        self.P=(I-(K*self.H))*self.P


        return self.x

In [6]:
def dist(pt1, pt2):
    distance = ( (pt1[0]-pt2[0])**2 + (pt1[1]-pt2[1])**2 )**0.5
    return distance

In [7]:
def get_thymio_position(img, img_output):
    
    '''
    Function that outputs Thymio position, radius and orientation from camera frames using Aruco marker detection
    Parts of code taken from https://mecaruco2.readthedocs.io/en/latest/notebooks_rst/Aruco/aruco_basics.html

    Inputs: - img : image from camera in BGR format
            - img_output : copy of image img on which will be drawn Thymio's contours

    Output: - center : tuple (x,y) representing the center of the Thymio (in pixels)
            - radius : radius of the Thymio (in pixels)
            - angle : orientation of the Thymio (in radians)
            - thymio_detected : boolean set to True if Thymio was detected, False if not
    '''
    
    
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    aruco_dict = aruco.Dictionary_get(cv2.aruco.DICT_4X4_50)
    parameters =  aruco.DetectorParameters_create()
    corners, ids, rejectedImgPoints = aruco.detectMarkers(gray, aruco_dict, parameters=parameters)
    frame_markers = aruco.drawDetectedMarkers(img_output, corners, ids)
    
    
    thymio_detected = False
    center = (0,0)
    radius = 0
    angle = 0
    
    if np.all(ids != None):
        for i in range(len(ids)):
            if ids[i] == 0:
                thymio_detected = True
                c = corners[i][0]

                top_center = (c[0:2, 0].mean(),c[0:2, 1].mean())
                bottom_center = (c[2:4, 0].mean(), c[2:4, 1].mean())
                left_center = (c[0:4:3, 0].mean(), c[0:4:3, 1].mean())
                right_center = (c[1:3, 0].mean(), c[1:3, 1].mean())
                points = np.array([ [c[2,0],c[2,1]], [c[3,0],c[3,1]], [left_center[0],left_center[1]], [right_center[0],right_center[1]] ])

                square_center = (int(c[:, 0].mean()), int(c[:, 1].mean()))
                radius = int(max( dist(square_center,c[0]), dist(square_center,c[1]), dist(square_center,c[2]), dist(square_center,c[3])))
                angle = math.atan2(top_center[1]-bottom_center[1],top_center[0]-bottom_center[0])
                
                
                center = (int(points[:, 0].mean()), int(points[:, 1].mean()))
                cv2.arrowedLine(img_output, (center[0],center[1]), (int(top_center[0]),int(top_center[1])), (255,255,0), 10)
                cv2.circle(img_output,square_center,radius,(255,0,0),10)

                break

        
    return center, radius, angle, thymio_detected

In [8]:
'''
Testing Kalman Filter
'''

global center, radius, angle, thymio_detected

forward_speed = 0
angular_speed = 0

KF=KalmanFilter(0.033, [500, 300, math.pi/4, forward_speed, angular_speed])



### INITIALIZE CAMERA
cap = cv2.VideoCapture(1)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)
fps = int(cap.get(5))
print("fps:", fps)

success, img = cap.read()
imgResult = img.copy()
    
thymioLoc, thymioRadius, thymioAngle, thymioDetected = get_thymio_position(img, imgResult)


path = [[thymioLoc[0],thymioLoc[1]],[thymioLoc[0]+300,thymioLoc[1]+300],[thymioLoc[0]+500,thymioLoc[1]-300]]

### MAIN LOOP
count = 0

# Check if the webcam is opened correctly
if not cap.isOpened():
    raise IOError("Cannot open webcam")

    
# running infinite while loop so that
# program keep running until we close it
while True:
    success, img = cap.read()
    imgResult = img.copy()
    
    thymioLoc, thymioRadius, thymioAngle, thymioDetected = get_thymio_position(img, imgResult)
    
    # KALMAN -------------------------------------------------------------------
    KF.predict()
    if thymioDetected:
        z = np.array([thymioLoc[0],thymioLoc[1],thymioAngle, forward_speed, angular_speed])
        z = np.expand_dims(z, axis=-1)
    else:
        z = np.array([forward_speed, angular_speed]) 
        z = np.expand_dims(z, axis=-1)
    KF.update(z,thymioDetected)
    cv2.circle(imgResult, (int(KF.x[0]), int(KF.x[1])), 10, (255,0,255), 10)
    # KALMAN ----------------------------------------------------------------------
    count = count + 1
    #print(count)
    #print('Thymio center:',thymioLoc, '  Angle:',thymioAngle, '  Thymio Detected:' ,thymioDetected)
    #print('Kalman center:',KF.x[0:2],'  Kalman angle:',KF.x[2], 'Kalman speed and omega', KF.x[3:5])
    
    #Path tracking ----------------------------------------------------------------
    x = KF.x[0]
    y = KF.x[1]
    angle = KF.x[2]

    # displaying output on Screen
    imgResult = cv2.line(imgResult,tuple(path[0]),tuple(path[1]),(255,255,255),1)
    imgResult = cv2.line(imgResult,tuple(path[1]),tuple(path[2]),(255,255,255),1)
    cv2.imshow("Result", imgResult)
    
    c = cv2.waitKey(1)
    if c == 27:
        break

fps: 30


KeyboardInterrupt: 