In [2]:
#https://www.youtube.com/watch?v=8ZZDNd4eyVI&t=1s&ab_channel=Mr.PSolver
import numpy as np
import sympy as smp
import pickle
import neat
from math import sqrt
import os
filename = "variables.pickle"

Symbols Init

In [3]:
g = smp.symbols('g')
M1, M2 = smp.symbols('M1, M2')
L1, L2 = smp.symbols('L1, L2')
T1, T2 = smp.symbols('T1, T2')
Fx, Fy = smp.symbols('Fx, Fy')


KP, KD, desiredX, desiredY, desiredXdott, desiredYdott, currentX, currentY = smp.symbols('KP, KD, desiredX, desiredY, desiredXdott, desiredYdott, currentX, currentY')

th1, th2 = smp.symbols('th1, th2')
th1d, th2d = smp.symbols('th1d, th2d')
th1dd, th2dd = smp.symbols('th1dd, th2dd')

Dynamic Model

In [3]:

I1 = 1/12*L1*L1*M1
I2 = 1/12*L2*L2*M2

d1 = L1/2
d2 = L2/2

Fd = smp.Matrix([Fx, Fy]).T;

P1 = smp.Matrix([ - L2*smp.cos(th1 + th2) - L1*smp.cos(th1) ,
                   - L2*smp.sin(th1 + th2) - L1*smp.sin(th1) ])

P2 = smp.Matrix([ - L2*smp.cos(th1 + th2) ,
                   - L2*smp.sin(th1 + th2) ])

M = smp.Matrix([ [I2 + I1 + 2*M2*d2*L1*smp.cos(th2) + M2*L1*L1 + M2*d2*d2 + M1*d1*d1  , I2 + d2*d2 * M2 + M2*d2*L1*smp.cos(th2)] ,
                  [I2 + d2*M2*L1*smp.cos(th2) + M2*d2*d2                             , I2 + d2*d2 * M2                      ]])

B = smp.Matrix([ - d2*M2*L1*th2d*th2d*smp.sin(th2) - 2*M2*d2*L1*th1d*th2d*smp.sin(th2),
                    d2*M2*L1*smp.sin(th2)*th1d*th1d])

G = smp.Matrix([ - g*M2*d2*smp.sin(th1 + th2) - (M2*L1 + d1*M1)*g*smp.sin(th1) ,
                  - d2*g*M2*smp.sin(th1 + th2)                              ])

F = smp.Matrix([ Fd*P1 ,
      Fd*P2 ])

T = smp.Matrix([ T1 ,
                  T2 ])

thedd = M.inv() * ( - B - G + T + F)


Gravity Compensation

In [4]:
Tformulas = smp.solve([thedd[0], thedd[1]], (T1, T2), simplify=True, rational=True)
Tformulas = [Tformulas[T1], Tformulas[T2]]

Impedance Control

In [5]:
J = smp.Matrix([[- L2*smp.cos(th1 + th2) - L1*smp.cos(th1), -L2*smp.cos(th1 + th2)],
                [- L2*smp.sin(th1 + th2) - L1*smp.sin(th1), -L2*smp.sin(th1 + th2)],
                [                                0,                  0]])


desiredPosition = smp.Matrix([desiredX, desiredY, 0 ])
desiredVelocity = smp.Matrix([desiredXdott, desiredYdott, 0])

thed = smp.Matrix([th1d, th2d])

currentPosition = smp.Matrix([currentX,  currentY, 0])

impedanceControl = J.T*(KP*(desiredPosition - currentPosition) + KD*(desiredVelocity - J*thed))

impedanceControl1, impedanceControl2 = impedanceControl 

Save/Pickle variables

In [None]:
stuff = {
    "thdotdot1": thedd[0],
    "thdotdot2": thedd[1],
    "impedanceControl1": impedanceControl1,
    "impedanceControl2": impedanceControl2,
    "gravityCompensation1": Tformulas[0],
    "gravityCompensation2": Tformulas[1]
}

with open(filename, 'wb') as f:
    pickle.dump(stuff, f)

Load/Unpickle variables

In [4]:
with open(filename, 'rb') as f:
    stuff = pickle.load(f)
    
thedd = [ stuff['thdotdot1'], stuff['thdotdot2'] ]
impedanceControl1 = stuff['impedanceControl1']
impedanceControl2 = stuff['impedanceControl2']
Tformulas = [stuff['gravityCompensation1'], stuff['gravityCompensation2']]


Turn formulas into functions

In [5]:
thdotdot1_f = smp.lambdify( (Fx, Fy, T1, T2, g, M1, M2, L1, L2, th1, th2, th1d, th2d) , thedd[0])
thdotdot2_f = smp.lambdify( (Fx, Fy, T1, T2, g, M1, M2, L1, L2, th1, th2, th1d, th2d) , thedd[1])

gravityCompensation1_f = smp.lambdify((Fx, Fy, g, M1, M2, L1, L2, th1, th2, th1d, th2d), Tformulas[0])
gravityCompensation2_f = smp.lambdify((Fx, Fy, g, M1, M2, L1, L2, th1, th2, th1d, th2d), Tformulas[1])

impedanceControl1_f = smp.lambdify((KP, KD, desiredX, desiredY, desiredXdott, desiredYdott, currentX, currentY, L1, L2, th1, th2, th1d, th2d), impedanceControl1)
impedanceControl2_f = smp.lambdify((KP, KD, desiredX, desiredY, desiredXdott, desiredYdott, currentX, currentY, L1, L2, th1, th2, th1d, th2d), impedanceControl2)

Constants

In [6]:
m1 = 1
m2 = 1
l1 = 1
l2 = 1
g_ = 9.8
Kp = 0 #10
Kd = 0 #8
ogKf = 2 # gain of applied disturbance force
Kf = 0

Forward Kinematics

In [7]:
def forwardKin(the1, the2):
    x = - l2*np.sin(the1 + the2) - l1*np.sin(the1)
    y = l2*np.cos(the1 + the2) + l1*np.cos(the1)
    return x, y

def forwardElbowKin(the1):
    x = -l1*np.sin(the1)
    y = l1*np.cos(the1)
    return x, y

def center_me(x, y): # we have casual coordinates but we want zero to be the middle of screen with a range of -4 to 4 just bcz it works with our other values
    x = x*2/100 - 4
    y = - y*2/100 + 4
    return x, y

Physics Init

In [8]:
initial_theta1 = np.pi*1/4
initial_theta1dot = 0
initial_theta2 = np.pi*1/4
initial_theta2dot = 0

initial_desiredX = 0
initial_desiredY = 0
initial_desiredXdott = 0
initial_desiredYdott = 0
posFx_ = 0
posFy_ = 0

dt = 0.04

class Arm:
    def __init__(self, m1, m2, l1, l2, g, theta1, theta2, theta1dot, theta2dot, Kp, Kd, initial_desiredX, initial_desiredY, initial_desiredXdott, initial_desiredYdott):
        self.m1 = m1
        self.m2 = m2
        self.l1 = l1
        self.l2 = l2
        self.g = g
        self.theta1 = theta1
        self.theta2 = theta2
        self.theta1dot = theta1dot
        self.theta2dot = theta2dot
        self.Kp = Kp
        self.Kd = Kd
        self.desiredX = initial_desiredX
        self.desiredY = initial_desiredY
        self.desiredXdott = initial_desiredXdott
        self.desiredYdott = initial_desiredYdott
        
        self.initial_theta1 = theta1
        self.initial_theta2 = theta2
        self.initial_theta1dot = theta1dot
        self.initial_theta2dot = theta2dot
        self.success_printed = False
        self.previous_error = 0
        
        # some forward kinematics that'll be needed in reporting the current error and for drawing later on
        self.forwardKin()
        
    def forwardKin(self):
        # some forward kinematics that'll be needed in reporting the current error and for drawing later on
        xElbow, yElbow = forwardElbowKin(self.theta1)
        xEndEffector, yEndEffector = forwardKin(self.theta1, self.theta2)

        # this is just to center and scale the arm in frame instead of having its fixed point at 0, 0
        elbow = (200 + xElbow*100, 200 - yElbow*100)
        end_effector = (200 + xEndEffector*100, 200 - yEndEffector*100)

        # turn its values into integers because u can't draw with floats in opencv
        self.elbow = (int(elbow[0]), int(elbow[1]))
        self.end_effector = (int(end_effector[0]), int(end_effector[1]))
        
    def reset(self):
        self.theta1 = self.initial_theta1
        self.theta2 = self.initial_theta2
        self.theta1dot = self.initial_theta1dot
        self.theta2dot = self.initial_theta2dot
        
    def getError(self):
        return sqrt( (( (self.desiredX - self.end_effector[0]) / 100 ) **2) + (( (self.desiredY - self.end_effector[1]) / 100 )**2) )
    
    def getSpeeds(self):
        return self.theta1dot, self.theta2dot
        
    def updateGains(self, Kp, Kd):
        self.Kp = Kp
        self.Kd = Kd
        
    def getGains(self):
        return self.Kp, self.Kd
        
    def updateTheta(self):

        currentX, currentY = center_me(self.end_effector[0], self.end_effector[1])
        T1_, T2_ = self.impedenceControl(currentX, currentY)
        
        tempFx_, tempFy_ = center_me(posFx_, posFy_)
        
        Fx_ = Kf * (tempFx_ - currentX)
        Fy_ = Kf * (tempFy_ - currentY)
        
        theta1dotdot = thdotdot1_f(Fx_, Fy_, T1_, T2_, self.g, self.m1, self.m2, self.l1, self.l2, self.theta1, self.theta2, self.theta1dot, self.theta2dot)
        theta2dotdot = thdotdot2_f(Fx_, Fy_, T1_, T2_, self.g, self.m1, self.m2, self.l1, self.l2, self.theta1, self.theta2, self.theta1dot, self.theta2dot)

        self.theta1dot = theta1dotdot*dt + self.theta1dot
        self.theta2dot = theta2dotdot*dt + self.theta2dot

        self.theta1 = self.theta1dot*dt + self.theta1
        self.theta2 = self.theta2dot*dt + self.theta2

        self.forwardKin()
        
    def updateDesired(self, desiredX, desiredY, desiredXdott, desiredYdott):
        self.desiredX = desiredX
        self.desiredY = desiredY
        self.desiredXdott = desiredXdott
        self.desiredYdott = desiredYdott
        
    def impedenceControl(self, currentX, currentY):

        tempDesiredX, tempDesiredY = center_me(self.desiredX, self.desiredY)
        
        impedanceControl1 = impedanceControl1_f(self.Kp, self.Kd, tempDesiredX, tempDesiredY, self.desiredXdott, self.desiredYdott, currentX, currentY, self.l1, self.l2, self.theta1, self.theta2, self.theta1dot, self.theta2dot)
        impedanceControl2 = impedanceControl2_f(self.Kp, self.Kd, tempDesiredX, tempDesiredY, self.desiredXdott, self.desiredYdott, currentX, currentY, self.l1, self.l2, self.theta1, self.theta2, self.theta1dot, self.theta2dot)
        
        # gravity compensation
        gravityCompensation1 = gravityCompensation1_f(0, 0, self.g, self.m1, self.m2, self.l1, self.l2, self.theta1, self.theta2, self.theta1dot, self.theta2dot)
        gravityCompensation2 = gravityCompensation2_f(0, 0, self.g, self.m1, self.m2, self.l1, self.l2, self.theta1, self.theta2, self.theta1dot, self.theta2dot)
        T1_ = impedanceControl1 + gravityCompensation1
        T2_ = impedanceControl2 + gravityCompensation2

        return T1_, T2_
    
    
    def draw(self, window):

        # draw the arm
        window = cv2.line(window, (200, 200), self.elbow, (255, 0, 0), 6)
        window = cv2.line(window, self.elbow, self.end_effector, (255, 0, 0), 6)

        # draw the desired point
        window = cv2.circle(window, (self.desiredX, self.desiredY), 5, (255, 255, 255), 2)   # we have casual coordinates but we want zero to be the middle of screen with a range of -4 to 4 just bcz it works with our other values
        return window

Init OpenCV

In [9]:
import cv2
from IPython.display import clear_output
from time import time

def millis():
    return int(time()*1000)

width = 400
height = 400
last_desiredPoint_update = 0
desiredX = 250
desiredY = 250

desiredUpdating = False
forceBeingIntroduced = False
def forceIntroduced(event, x, y, flags, param):
    global desiredX, desiredY, last_desiredPoint_update, posFx_, posFy_, Kf, desiredUpdating, forceBeingIntroduced

    if event==cv2.EVENT_LBUTTONDOWN:
        desiredUpdating = True
        last_desiredPoint_update = millis()
        desiredX = x
        desiredY = y
        

    if event==cv2.EVENT_LBUTTONUP:
        desiredUpdating = False
            

    if event==cv2.EVENT_RBUTTONDOWN:
        forceBeingIntroduced = True
        posFx_ = x
        posFy_ = y
        Kf = ogKf

    if event==cv2.EVENT_RBUTTONUP:
        forceBeingIntroduced = False
        Kf = 0
        
    if event==cv2.EVENT_MOUSEMOVE:
        if desiredUpdating:
            last_desiredPoint_update = millis()
            desiredX = x
            desiredY = y
            
        if forceBeingIntroduced:
            forceBeingIntroduced = True
            posFx_ = x
            posFy_ = y


Ai (NEAT. Neuro-Evolution of Augmented Topologies)

In [10]:
from threading import Thread
GEN = 0
window = np.zeros((height, width, 3), dtype=np.uint8)
stopped = False

cv2.destroyAllWindows()

def animation_thread():
    global window, stopped
    
    cv2.namedWindow('output', cv2.WINDOW_AUTOSIZE)
    # allow the user to edit the desired point real time
    cv2.setMouseCallback('output', forceIntroduced)
    
    while True:
        # debugging: display all genoms to the screen
        cv2.imshow('output', window)
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            stopped = True
            break
    
            
    cv2.destroyAllWindows()
    

def fitness_function(genomes_, config):
    if stopped:
        return
    
    global GEN, window
    GEN += 1
    last_desiredPoint_update = millis()
    #clear_output(wait=True)
    #print("Generation", GEN)
    
    neural_networks = []
    genomes = []
    arms = []

    for _, ge in genomes_:
        net = neat.nn.FeedForwardNetwork.create(ge, config)
        neural_networks.append(net)
        
        genom = Arm(m1, m2, l1, l2, g_, initial_theta1, initial_theta2, initial_theta1dot, initial_theta2dot, Kp, Kd, initial_desiredX, initial_desiredY, initial_desiredXdott, initial_desiredYdott)
        arms.append(genom)
        ge.fitness = 0 # set initial fitness to 0
        genomes.append(ge)
    
    while True:
        if len(arms)==0:
            break
                
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
            
        window = np.zeros((height, width, 3), dtype=np.uint8)
        errors_to_print = ""
        gains_to_print = ""
        for i, arm in enumerate(arms):
            
            try:

                # update desired state
                arm.updateDesired(desiredX, desiredY, initial_desiredXdott, initial_desiredYdott)

                # physics simulation
                arm.updateTheta()


                # obtain the error
                error = arm.getError()

                #print(error - arm.previous_error)
                if error - arm.previous_error > 0.1:
                    genomes[i].fitness += 10
                elif arm.previous_error - error > 0.1:
                    genomes[i].fitness -= 10


                arm.previous_error = error

                # obtain the angular velocities to use in the policy function
                theta1dot, theta2dot = arm.getSpeeds()


                # the Ai will predict a value for Kp and Kd
                outputs = neural_networks[i].activate((error, 0))


                raise_or_lower_Kp = 0
                raise_or_lower_Kd = 0
                if outputs[0] < -0.25:
                    raise_or_lower_Kp = -0.1
                elif outputs[0] > 0.25:
                    raise_or_lower_Kp = 0.1

                if outputs[1] < -0.25:
                    raise_or_lower_Kd = -0.1
                elif outputs[1] > 0.25:
                    raise_or_lower_Kd = 0.1

                previous_Kp, previous_Kd = arm.getGains()
                new_Kp  = previous_Kp + raise_or_lower_Kp
                new_Kd  = previous_Kd + raise_or_lower_Kd

                if new_Kp<0:
                    new_Kp = 0

                if new_Kd<0:
                    new_Kd = 0

                if new_Kp>30:
                    new_Kp = 30

                if new_Kd>30:
                    new_Kd = 30

                arm.updateGains(new_Kp, new_Kd)
                
                # debugging
                gains_to_print += str(arm.getGains()) + " "
                errors_to_print += str(error) + " "

                #clear_output(wait=True)
                #print("GEN", GEN, "Predictions", outputs, "Gains", arm.getGains(), new_Kp, new_Kd)

                # debugging: display this genom to the screen
                window = arm.draw(window)

                #print(error, timed)
                timed = (millis() - last_desiredPoint_update ) / 1000
                if timed>5: # if it's been over 5 seconds and it did not converge, then kill it
                    #print("error", error)
                    if abs(error) >= 0.03:
                        #print("time out")
                        if len(genomes)==1:
                            print("Prediction", arm.getGains())
                        genomes[i].fitness -= 100
                        arms.pop(i) # issue removing birds while looping, will be fixed
                        neural_networks.pop(i)
                        genomes.pop(i)

                        #print("Prediction", arm.getGains())
                    else:
                        if not arm.success_printed:
                            genomes[i].fitness += 100
                            arm.success_printed = True
                            print("Prediction", arm.getGains())
            except ValueError as e:
                #arm.reset()
                # if it went too crazy till NaN, consider it dead
                
                #if len(genomes)==1:
                #    print("Prediction", arm.getGains())
                genomes[i].fitness -= 100
                arms.pop(i) # issue removing birds while looping, will be fixed
                neural_networks.pop(i)
                genomes.pop(i)
            except Exception as e2:
                print(e2)
                print("Error")
        
        clear_output(wait=True)
        print("Error", errors_to_print)
        print("Gains", gains_to_print)
                

def run(config_path):
    thread = Thread(target=animation_thread)
    thread.start()
    
    config = neat.config.Config(neat.DefaultGenome, neat.DefaultReproduction, neat.DefaultSpeciesSet, neat.DefaultStagnation, config_path)

    p = neat.Population(config)

    p.add_reporter(neat.StdOutReporter(True))
    stats = neat.StatisticsReporter()
    p.add_reporter(stats)

    winner = p.run(fitness_function ,200) # how many generations
    print("finished", winner)
     

config_path = os.path.join("impedenceConfig.txt")
run(config_path)

Error 0.0 
Gains (30, 7.899999999999988) 


KeyboardInterrupt: 

Manual

In [None]:
cv2.destroyAllWindows()

m1 = 1
m2 = 1
l1 = 1
l2 = 1
g_ = 9.8
Kp = 30 #10
Kd = 7.98 #8
ogKf = 30 # gain of applied disturbance force
Kf = 0
dt = 0.02

    
cv2.namedWindow('output', cv2.WINDOW_AUTOSIZE)
# allow the user to edit the desired point real time
cv2.setMouseCallback('output', forceIntroduced)

arm = Arm(m1, m2, l1, l2, g_, initial_theta1, initial_theta2, initial_theta1dot, initial_theta2dot, Kp, Kd, initial_desiredX, initial_desiredY, initial_desiredXdott, initial_desiredYdott)

while True:

    window = np.zeros((height, width, 3), dtype=np.uint8)

    try:
        clear_output(wait=True)
        # update desired state
        arm.updateDesired(desiredX, desiredY, initial_desiredXdott, initial_desiredYdott)

        # physics simulation
        arm.updateTheta()

        # obtain the error
        error = arm.getError()

        print("error", error)
        arm.previous_error = error

        # obtain the angular velocities to use in the policy function
        theta1dot, theta2dot = arm.getSpeeds()

        # debugging: display this genom to the screen
        window = arm.draw(window)

        cv2.imshow('output', window)

        #cv2.waitKey(1)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break


    except ValueError as e:
    #    genom.reset()
        print(e)
    except Exception as e2:
        print(e2)
        print("Error")


cv2.destroyAllWindows()