In [12]:
!python -m pip install -q pyserial

from MathTools import get_rotate_offset_position
from MathTools import RotateAxis
from MathTools import normalize_coordinates

from DDPGAgent import DDPGAgent

import numpy as np
import warnings
import serial, struct, time
warnings.filterwarnings('ignore')

You should consider upgrading via the 'C:\Users\vika9\AppData\Local\Programs\Python\Python38\python.exe -m pip install --upgrade pip' command.


## Classes

In [13]:
class Servo_lx16a:
    
    def __init__(self, ID, Port, Interlock = False):
        self.ID = ID # unique servo ID
        self.Port = Port # COM port features for later use 
        self.Interlock = Interlock # While Interlock = True, Servo will not move
        
    @staticmethod   
    def checksum(msg):
        #Checksum=~(ID+Length+Cmd+Prm1+...+PrmN)
        #If the numbers in the brackets exceeds 255,
        #Take the lowest one byte, "~" means Negation.
        s = sum(msg)
        s = ~s&255
        chksum = s.to_bytes(1, byteorder ='little')
        return chksum
    
    @staticmethod  
    def display_msg(msg):
        #dispalys bytes in python console not as ASCII symbols but as bytes
        print(''.join(r'\x'+hex(letter)[2:] for letter in msg))
      
    @staticmethod     
    def time(current_position, target_position):
        #currently not used
        #range of time for Servo movement is 0~30000ms
        #3000ms for 1000 steps
        #y ms for N steps
        #y = N*3000/1000
        return abs(target_position-current_position)*3000/1000
    
    @staticmethod
    def to_angle(position):
        #returns Servo angle from Servo position
        #Range of position is 0..1000 which is equal to 0..240 deg
        #240 degree equals to 1000 
        #x degree equals to N
        #x= (N*240/1000)
        return position*240/1000
    
    @staticmethod
    def to_position(angle):
        #return position from angle
        return int(angle*1000/240)
    
    Header = b'\x55\x55' #every message transfered and recieved starts with this Header
    
    def ReadPosition(self):
        #Reads current position of Servo 
        temp = self.ID+b'\x03'+b'\x1c' 
        msg = self.Header + temp + self.checksum(temp) #message to read Servo position
        print(f'Reading position of servo {int.from_bytes(self.ID, "little")}, sent:')
        self.display_msg(msg)

        wait = True
        data_in = b'' # receive answer in data_in while wait True
        ser.write(msg) #send message to COM
        while wait:
            while ser.in_waiting:
                data_in += ser.readline()
                print('Received: ')
                self.display_msg(data_in)
                if len(data_in)>=8: wait = False
        received_chksum = self.checksum(data_in[-6:-1]) 
        if data_in.startswith(self.Header): #check that received data_in startswith Header and that checksum is correct
            print('Message Header correct')
            if  (received_chksum == data_in[-1].to_bytes(1, 'big')):
                print('Checksum correct')
                position = struct.unpack('<h', data_in[-3:-1])[0]
                angle = self.to_angle(position)
                print(f'Current position {position}, {angle}')
                print('---------------------------------------')
                return angle
            else:
                print('Bad checksum')
        else:
            print('Bad message header')



    def MoveServo(self, target_angle):
        #Moves servo to traget angle
        if self.Interlock:
            print('Servo is Interlocked') #if Servo is interlocked, it will not move
        else:
            target_position = self.to_position(target_angle) 
            temp = self.ID+b'\x07'+b'\x01'+struct.pack('<h', target_position)+struct.pack('<h', 1500) #set time temporary to 1500 ms for any movement
            msg = self.Header+temp+self.checksum(temp) # message to move servo to desired position
            print(f'Move servo {int.from_bytes(self.ID, "little")} to {target_position}, sent:')
            self.display_msg(msg)
            self.Interlock = True #Interlock servo
            ser.write(msg) #sending message to COM
            #This while cycle switches Interlock to False, when Servo reaches target position
            while self.Interlock:
                current_angle = self.ReadPosition()
                angle_diff = abs(current_angle-target_angle)
                print('Angle_difference:', angle_diff)
                print('---------------------------------------')
                if angle_diff<0.8:
                    self.Interlock=False
                    print('Reached target angle +- 1 deg')
                    print('---------------------------------------')

In [14]:
test_result = get_rotate_offset_position(RotateAxis.OY, 0, RotateAxis.OX, 5, RotateAxis.OZ, 2,
                                             RotateAxis.OY, 0.4048796, [0, 0.8378, 0])
print(test_result)

[-0.0151087040722454, 0.8343491981978997, 0.07299609626426051]


## Servo settings

In [65]:
Servo1_ID = b'\x01'
Servo2_ID = b'\x02'
Servo3_ID = b'\x03'

com = 'COM9' #COM port id
baudrate = 115200 #COM speed

ser = serial.Serial(com, baudrate, timeout=0.2)
Servo1 = Servo_lx16a(Servo1_ID, ser)
Servo2 = Servo_lx16a(Servo2_ID, ser)
Servo3 = Servo_lx16a(Servo3_ID, ser)

## Testing

In [47]:
state_size = 12
action_size = 3
agent = DDPGAgent(state_size = state_size - 3, action_size = action_size, goal_size = 3, 
                  action_high = 5, action_low = -5, 
                  actor_learning_rate = 1e-3, critic_learning_rate = 1e-3,
                  tau = 0.1)

In [48]:
class Servo():
    def __init__(self):
        self.angle = 0

    def rotate(self, delta_angle):
        self.angle += delta_angle


In [49]:
def read_state_test(servo1, servo2, servo3):
    # текущие углы роборуки
    firstAngle = servo1.angle;
    secondAngle = servo2.angle;
    thirdAngle = servo3.angle;
    
    # длины двух сочленений роборуки
    firstHandLength = 0.4048796
    secondHandLength = 0.4048796
    
    # общая длина руки в вытянутом состоянии
    allHandLength = firstHandLength + secondHandLength
    
    # позиция конца роборуки в вытянутом состоянии
    defaultEndPosition = [0, allHandLength, 0]
    
    # поиск позиции конца роборуки по её углам и осям вращения
    endPosition = get_rotate_offset_position(RotateAxis.OY, firstAngle, 
                                             RotateAxis.OX, secondAngle, 
                                             RotateAxis.OX, thirdAngle,
                                             RotateAxis.OY, firstHandLength, 
                                             defaultEndPosition)
    
    cubePosition = [0, 0, 0.5]
    
    # векторное расстояние между концом руки и целью
    vectorDistance = [cubePosition[0] - endPosition[0], 
                      cubePosition[1] - endPosition[1], 
                      cubePosition[2] - endPosition[2]]
    
    state = [vectorDistance[0], vectorDistance[1], vectorDistance[2], 
              endPosition[0], endPosition[1], endPosition[2],
              firstAngle, secondAngle, thirdAngle]
    
    goal = [cubePosition[0], cubePosition[1], cubePosition[2]]
    return state, goal

In [63]:
def read_state():
    # текущие углы роборуки
    # вычитаем 90, так как настоящая рука в вытянутом состоянии находится на 90 градусах у двух серв
    # вытянутое состояние руки по углам = (90, 90, 0)
    firstAngle = Servo1.ReadPosition() - 90;
    secondAngle = Servo2.ReadPosition() - 90;
    thirdAngle = Servo3.ReadPosition();
    
    # длины двух сочленений роборуки
    firstHandLength = 12
    secondHandLength = 9
    
    # общая длина руки в вытянутом состоянии
    allHandLength = firstHandLength + secondHandLength
    
    # позиция конца роборуки в вытянутом состоянии
    defaultEndPosition = [0, allHandLength, 0]
    
    # поиск позиции конца роборуки по её углам и осям вращения
    endPosition = get_rotate_offset_position(RotateAxis.OY, firstAngle, 
                                             RotateAxis.OX, secondAngle, 
                                             RotateAxis.OX, thirdAngle,
                                             RotateAxis.OY, firstHandLength, 
                                             defaultEndPosition)
    
    endPosition = normalize_coordinates(allHandLength, 1, endPosition)
    
    cubePosition = [0, 0, 0.5]
    
    # векторное расстояние между концом руки и целью
    vectorDistance = [cubePosition[0] - endPosition[0], 
                      cubePosition[1] - endPosition[1], 
                      cubePosition[2] - endPosition[2]]
    
    firstAngle = firstAngle/360
    secondAngle = secondAngle/360
    thirdAngle = thirdAngle/360
    
    state = [vectorDistance[0], vectorDistance[1], vectorDistance[2], 
              endPosition[0], endPosition[1], endPosition[2],
              firstAngle, secondAngle, thirdAngle]
    
    goal = [cubePosition[0], cubePosition[1], cubePosition[2]]
    return state, goal

In [64]:
def set_action(action):
    # вычитаем 90, так как настоящая рука в вытянутом состоянии находится на 90 градусах у двух серв
    # вытянутое состояние руки п углам = (90, 90, 0)
    angle1 = Servo1.ReadPosition()
    angle2 = Servo2.ReadPosition()
    angle3 = Servo3.ReadPosition()
    Servo1.MoveServo(angle1 + action[0])
    Servo2.MoveServo(angle2 + action[1])
    Servo3.MoveServo(angle3 + action[2])
    
def set_action_test(action, servo1, servo2, servo3):
    servo1.rotate(action[0])
    servo2.rotate(action[1])    
    servo3.rotate(action[2])
    return servo1, servo2, servo3

def check_if_done(hand, goal):
    hand = np.array(hand)
    goal = np.array(goal)
    dist = np.linalg.norm(hand - goal)
    if dist <= 0.05:
        return True, dist
    return False, dist

In [66]:
n_episodes = 1

servo1 = Servo()
servo2 = Servo()
servo3 = Servo()

test_rewards = []
agent.saver.restore(agent.sess, "model/without_her_rotate_scaled.ckpt")
for i_episode in range(n_episodes):
    
    # вытянутое состояние руки п углам = (90, 90, 0)
    Servo1.MoveServo(90)
    Servo2.MoveServo(90)
    Servo3.MoveServo(0)
    
    state, goal = read_state()
    
    r = 0
    while True:
        action = agent.choose_action([state], [goal], 0)
        print("\n\nACTION = ", action, "\n\n")
        print("\n\nSTATE = ", state, "\n\n")
        # применяем углы к сервам
        #servo1, servo2, servo3 = set_action_test(action, servo1, servo2, servo3)
        
        set_action(action)
        
        # как-то ждем, пока сервы повернутся
        #time.sleep(5)
        
        # получаем обновленный state
        next_state, next_goal = read_state()
        
        done, distance = check_if_done(next_state[3:6], goal)
        
        reward = -1
        
        if done:
            reward += 10
        
        r += reward
        
        print("\n old state = ", state, "\n action = ", action, "\n new state = ", next_state, "\n distance = ", distance,
              "\n current reward = ", reward, "\n cumulative reward = ", r, "\n")
        
        state = next_state
        if done:
            print("episode:", i_episode+1, "rewards: %.2f" % r, end="\r")
            test_rewards += [r]
            break

INFO:tensorflow:Restoring parameters from model/without_her_rotate_scaled.ckpt


INFO:tensorflow:Restoring parameters from model/without_her_rotate_scaled.ckpt


Move servo 1 to 375, sent:
\x55\x55\x1\x7\x1\x77\x1\xdc\x5\x9d
Reading position of servo 1, sent:
\x55\x55\x1\x3\x1c\xdf
Received: 
\x55\x55\x1\x5\x1c\xe6\x3\xf4
Message Header correct
Checksum correct
Current position 998, 239.52
---------------------------------------
Angle_difference: 149.52
---------------------------------------
Reading position of servo 1, sent:
\x55\x55\x1\x3\x1c\xdf
Received: 
\x55\x55\x1\x5\x1c\x9c\x3\x3e
Message Header correct
Checksum correct
Current position 924, 221.76
---------------------------------------
Angle_difference: 131.76
---------------------------------------
Reading position of servo 1, sent:
\x55\x55\x1\x3\x1c\xdf
Received: 
\x55\x55\x1\x5\x1c\x41\x3\x99
Message Header correct
Checksum correct
Current position 833, 199.92
---------------------------------------
Angle_difference: 109.91999999999999
---------------------------------------
Reading position of servo 1, sent:
\x55\x55\x1\x3\x1c\xdf
Received: 
\x55\x55\x1\x5\x1c\xe8\x2\xf3
Message

KeyboardInterrupt: 

In [46]:
#Servo1.MoveServo(0)
#Servo2.MoveServo(0)
angle = Servo3.ReadPosition()  + 10
print("ReadPosition 1 ", Servo1.ReadPosition(), "\n")
print("ReadPosition 2 ", Servo2.ReadPosition(), "\n")
print("ReadPosition 3 ", Servo3.ReadPosition(), "\n")
Servo3.MoveServo(0)

Reading position of servo 3, sent:
\x55\x55\x3\x3\x1c\xdd
Received: 
\x55\x55\x3\x5\x1c\x76\x1\x64
Message Header correct
Checksum correct
Current position 374, 89.76
---------------------------------------
Reading position of servo 1, sent:
\x55\x55\x1\x3\x1c\xdf
Received: 
\x55\x55\x1\x5\x1c\x77\x1\x65
Message Header correct
Checksum correct
Current position 375, 90.0
---------------------------------------
ReadPosition 1  90.0 

Reading position of servo 2, sent:
\x55\x55\x2\x3\x1c\xde
Received: 
\x55\x55\x2\x5\x1c\x76\x1\x65
Message Header correct
Checksum correct
Current position 374, 89.76
---------------------------------------
ReadPosition 2  89.76 

Reading position of servo 3, sent:
\x55\x55\x3\x3\x1c\xdd
Received: 
\x55\x55\x3\x5\x1c\x77\x1\x63
Message Header correct
Checksum correct
Current position 375, 90.0
---------------------------------------
ReadPosition 3  90.0 

Move servo 3 to 0, sent:
\x55\x55\x3\x7\x1\x0\x0\xdc\x5\x13
Reading position of servo 3, sent:
\x55\x55\

In [67]:
ser.close()