In [1]:
!pip3 install pyserial
!pip3 install numpy



In [37]:
#!/usr/bin/env python3
import serial
import serial.rs485
import struct
import time
import numpy as np
import queue

import serial.tools.list_ports
ports = serial.tools.list_ports.comports()
for port, desc, hwid in sorted(ports):
        print("{}: {} [{}]".format(port, desc, hwid))

COM1: Communications Port (COM1) [ACPI\PNP0501\0]
COM3: Silicon Labs CP210x USB to UART Bridge (COM3) [USB VID:PID=10C4:EA60 SER=0001 LOCATION=1-2.4]


In [38]:
ser = serial.Serial(
               port="COM3",
               baudrate = 115200,
               parity=serial.PARITY_NONE,
               stopbits=serial.STOPBITS_ONE,
               bytesize=serial.EIGHTBITS,
               timeout=1)
ser.rs485_mode = serial.rs485.RS485Settings()
print(ser.isOpen())

True


In [77]:
class RS485command:
    
    header = '3E'
    id = '00'
    
    def __init__(self, ser, id):
        self.id = id
        self.ser = ser
        
    def parseHeader(self, ser_bytes):
        msg = {}
        msg['header'] = hex(ser_bytes[0])
        msg['command'] = hex(ser_bytes[1])
        msg['id'] = ser_bytes[2]
        msg['data_length'] = ser_bytes[3]
        msg['head_check_byte'] = hex(ser_bytes[4]) 
        chksum = sum(ser_bytes[0:4])%256
        return msg        
        
    def msgBuilder(self, header, data):
        hexAsBAhead = bytearray.fromhex(header)
        chksumhead = sum(hexAsBAhead)%256
        if(data == ''):
            return hexAsBAhead + chksumhead.to_bytes(1,'little')
        chksumbody = sum(data)%256
        return hexAsBAhead + chksumhead.to_bytes(1,'little') + data + chksumbody.to_bytes(1,'little')  
    
    def speed(self, speed):
        # 0.01 DPS increments
        header = self.header + 'A2' + self.id + '04'
        data = speed.to_bytes(4, 'little', signed=True)
        return self.msgBuilder(header, data)
    
    def angle(self, angle):
        # -36000 - 36000 in 0.01 deg increments
        header = self.header + 'A3' + self.id + '08'
        data = angle.to_bytes(8, 'little', signed=True)
        return self.msgBuilder(header, data)
    
    def angleInc(self, angleInc):
        # -36000 - 36000 in 0.01 deg increments
        header = self.header + 'A7' + self.id + '04'
        data = angleInc.to_bytes(4, 'little', signed=True)
        return self.msgBuilder(header, data)
    

class RMDS28MotorDriver(RS485command):
    
    
    def torque(self, power):        # -1000 - 1000
        header = self.header + 'A0' + self.id + '02'
        data = power.to_bytes(2, 'little', signed=True)
        self.ser.write(self.msgBuilder(header, data))
        ser_bytes = self.ser.read(13)
        msg = self.parseHeader(ser_bytes)
        msg['temperature'] = ser_bytes[5]
        msg['power'] = int.from_bytes(ser_bytes[5:7], 'little')
        msg['speed'] = int.from_bytes(ser_bytes[7:9], 'little')
        msg['position'] = int.from_bytes(ser_bytes[9:11], 'little')
        msg['data_check_byte'] = hex(ser_bytes[11])
        return (msg)   
    
    def speed(self, speed):         # 0.01 DPS
        header = self.header + 'A2' + self.id + '04'
        data = speed.to_bytes(4, 'little', signed=True)
        self.ser.write(self.msgBuilder(header, data))
        ser_bytes = self.ser.read(13)
        msg = self.parseHeader(ser_bytes)
        msg['temperature'] = ser_bytes[5]
        msg['power'] = int.from_bytes(ser_bytes[5:7], 'little')
        msg['speed'] = int.from_bytes(ser_bytes[7:9], 'little')
        msg['position'] = int.from_bytes(ser_bytes[9:11], 'little')
        msg['data_check_byte'] = hex(ser_bytes[11])
        return (msg) 
    
    def angle(self, direction, angle, speed):
        # 36000 DPS
        header = self.header + 'A6' + self.id + '08'
        data = direction.to_bytes(1, 'little', signed=False)
        data = data + angle.to_bytes(2, 'little', signed=False)
        data = data + b'\x00'
        data = data + speed.to_bytes(4, 'little', signed=False)
        self.ser.write(self.msgBuilder(header, data))
        ser_bytes = self.ser.read(13)
        msg = self.parseHeader(ser_bytes)
        msg['temperature'] = ser_bytes[5]
        msg['power'] = int.from_bytes(ser_bytes[5:7], 'little')
        msg['speed'] = int.from_bytes(ser_bytes[7:9], 'little')
        msg['position'] = int.from_bytes(ser_bytes[9:11], 'little')
        msg['data_check_byte'] = hex(ser_bytes[11])
        return (msg)      
    
    def incAngle(self, angle):
        # 36000 DPS
        header = self.header + 'A7' + self.id + '04'
        data = angle.to_bytes(4, 'little', signed=True)
        self.ser.write(self.msgBuilder(header, data))
        ser_bytes = self.ser.read(13)
        msg = self.parseHeader(ser_bytes)
        msg['temperature'] = ser_bytes[5]
        msg['power'] = int.from_bytes(ser_bytes[5:7], 'little')
        msg['speed'] = int.from_bytes(ser_bytes[7:9], 'little')
        msg['position'] = int.from_bytes(ser_bytes[9:11], 'little')
        msg['data_check_byte'] = hex(ser_bytes[11])
        return (msg)     
    
    
    def readPID(self):
        header = self.header + '30' + self.id + '00'
        self.ser.write(self.msgBuilder(header, ''))
        ser_bytes = self.ser.read(12)
        msg = self.parseHeader(ser_bytes)
        msg['anglePidKp'] = ser_bytes[5]
        msg['anglePidKi'] = ser_bytes[6]
        msg['speedPidKp'] = ser_bytes[7]
        msg['speedPidKi'] = ser_bytes[8]
        msg['iqPidKp'] = ser_bytes[9]
        msg['iqPidKi'] = ser_bytes[10]
        msg['data_check_byte'] = hex(ser_bytes[11])
        return (msg)
    
    def readEncoder(self):
        header = self.header + '90' + self.id + '00'
        self.ser.write(self.msgBuilder(header, ''))
        ser_bytes = self.ser.read(12)
        msg = self.parseHeader(ser_bytes)
        msg['data'] = int.from_bytes(ser_bytes[5:7], 'little')  # 0~16383
        msg['original'] = int.from_bytes(ser_bytes[7:9], 'little')
        msg['zero'] = int.from_bytes(ser_bytes[7:9], 'little')
        msg['data_check_byte'] = ser_bytes[9]
        return (msg)   
    
    def readAngle(self):
        header = self.header + '94' + self.id + '00'
        self.ser.write(self.msgBuilder(header, ''))
        ser_bytes = self.ser.read(8)
        msg = self.parseHeader(ser_bytes)
        msg['angle'] = int.from_bytes(ser_bytes[5:7], 'little')*0.01  # 0-35999
        msg['data_check_byte'] = ser_bytes[7]
        return (msg)      
    
    def readState(self):
        header = self.header + '9A' + self.id + '00'
        self.ser.write(self.msgBuilder(header, ''))
        ser_bytes = self.ser.read(13)
        msg = self.parseHeader(ser_bytes)
        msg['temperature'] = ser_bytes[5] 
        msg['voltage'] = int.from_bytes(ser_bytes[7:9], 'little')*0.1
        msg['error_status'] = ser_bytes[11]
        msg['data_check_byte'] = hex(ser_bytes[12])
        return (msg)     
    
    def clearError(self):
        header = self.header + '9B' + self.id + '00'
        self.ser.write(self.msgBuilder(header, ''))
        ser_bytes = self.ser.read(13)
        msg = self.parseHeader(ser_bytes)
        msg['temperature'] = ser_bytes[5] 
        msg['voltage'] = int.from_bytes(ser_bytes[7:9], 'little')*0.1
        msg['error_status'] = ser_bytes[11]
        msg['data_check_byte'] = hex(ser_bytes[12])
        return (msg)      
    
    def version(self):
        header = self.header + '12' + self.id + '00'
        self.ser.write(self.msgBuilder(header, ''))
        ser_bytes = self.ser.read(48)
        msg = self.parseHeader(ser_bytes)
        msg['driver_name'] = ser_bytes[5:24].decode("ascii").strip('\x00')
        msg['motor_name'] = ser_bytes[24:43].decode("ascii").strip('\x00')
        msg['hardware_version'] = ser_bytes[45]*0.1
        msg['firmware_version'] = ser_bytes[46]*0.1
        msg['data_check_byte'] = hex(ser_bytes[47])
        return (msg)   
    
    def stop(self):
        header = self.header + '81' + self.id + '00'
        self.ser.write(self.msgBuilder(header, ''))
        ser_bytes = self.ser.read(5)
        
        msg = self.parseHeader(ser_bytes)
        return (msg)           

In [78]:

version = RMDS28MotorDriver(ser, '01').version()
print(version)

{'header': '0x3e', 'command': '0x12', 'id': 1, 'data_length': 42, 'head_check_byte': '0x7b', 'driver_name': 'DRD01-S1', 'motor_name': '28XX', 'hardware_version': 2.0, 'firmware_version': 1.7000000000000002, 'data_check_byte': '0x2b'}


In [79]:
msg = RMDS28MotorDriver(ser, '01').readPID()
print(msg)

{'header': '0x3e', 'command': '0x30', 'id': 1, 'data_length': 6, 'head_check_byte': '0x75', 'anglePidKp': 100, 'anglePidKi': 100, 'speedPidKp': 30, 'speedPidKi': 8, 'iqPidKp': 0, 'iqPidKi': 0, 'data_check_byte': '0xee'}


In [80]:
msg = RMDS28MotorDriver(ser, '01').readState()
print(msg)

{'header': '0x3e', 'command': '0x9a', 'id': 1, 'data_length': 7, 'head_check_byte': '0xe0', 'temperature': 34, 'voltage': 12.0, 'error_status': 0, 'data_check_byte': '0x9a'}


In [81]:
msg = RMDS28MotorDriver(ser, '01').clearError()
print(msg)

{'header': '0x3e', 'command': '0x9b', 'id': 1, 'data_length': 7, 'head_check_byte': '0xe1', 'temperature': 34, 'voltage': 12.100000000000001, 'error_status': 0, 'data_check_byte': '0x9b'}


In [82]:
msg = RMDS28MotorDriver(ser, '01').stop()
print(msg)

{'header': '0x3e', 'command': '0x81', 'id': 1, 'data_length': 0, 'head_check_byte': '0xc0'}


In [83]:
msg = RMDS28MotorDriver(ser, '01').readEncoder()
print(msg)

{'header': '0x3e', 'command': '0x90', 'id': 1, 'data_length': 6, 'head_check_byte': '0xd5', 'data': 3782, 'original': 3550, 'zero': 3550, 'data_check_byte': 24}


In [84]:
msg = RMDS28MotorDriver(ser, '01').readAngle()
print(msg)

{'header': '0x3e', 'command': '0x94', 'id': 1, 'data_length': 2, 'head_check_byte': '0xd5', 'angle': 332.40000000000003, 'data_check_byte': 89}


In [85]:
msg = RMDS28MotorDriver(ser, '01').torque(100)
print(msg)

{'header': '0x3e', 'command': '0xa0', 'id': 1, 'data_length': 7, 'head_check_byte': '0xe6', 'temperature': 34, 'power': 34, 'speed': 0, 'position': 50688, 'data_check_byte': '0xe'}


In [86]:
msg = RMDS28MotorDriver(ser, '01').speed(-1000)
print(msg)

{'header': '0x3e', 'command': '0xa2', 'id': 1, 'data_length': 7, 'head_check_byte': '0xe8', 'temperature': 34, 'power': 25634, 'speed': 16640, 'position': 7172, 'data_check_byte': '0xe'}


In [87]:
msg = RMDS28MotorDriver(ser, '01').angle(0, 100, 10000)
print(msg)

{'header': '0x3e', 'command': '0xa6', 'id': 1, 'data_length': 7, 'head_check_byte': '0xec', 'temperature': 34, 'power': 59170, 'speed': 12799, 'position': 51968, 'data_check_byte': '0xd'}


In [88]:
msg = RMDS28MotorDriver(ser, '01').incAngle(-3000)
print(msg)

{'header': '0x3e', 'command': '0xa7', 'id': 1, 'data_length': 7, 'head_check_byte': '0xed', 'temperature': 34, 'power': 1314, 'speed': 52736, 'position': 3327, 'data_check_byte': '0x0'}


In [34]:
ser.close()