From 55b99902d5929d8fc0eb363c1a8b8831fd67a7e3 Mon Sep 17 00:00:00 2001 From: Connor Kneebone Date: Mon, 21 Jun 2021 20:17:20 +1000 Subject: [PATCH] Added an experimental roboclaw driver --- src/motors/RoboClaw/pyroboclaw/__init__.py | 1 + src/motors/RoboClaw/pyroboclaw/roboclaw.py | 271 ++++++++++++++++++ .../RoboClaw/pyroboclaw/roboclaw_cmd.py | 91 ++++++ src/motors/RoboClaw/roboclaw.py | 35 +++ 4 files changed, 398 insertions(+) create mode 100644 src/motors/RoboClaw/pyroboclaw/__init__.py create mode 100644 src/motors/RoboClaw/pyroboclaw/roboclaw.py create mode 100644 src/motors/RoboClaw/pyroboclaw/roboclaw_cmd.py create mode 100644 src/motors/RoboClaw/roboclaw.py diff --git a/src/motors/RoboClaw/pyroboclaw/__init__.py b/src/motors/RoboClaw/pyroboclaw/__init__.py new file mode 100644 index 0000000..e7c6a98 --- /dev/null +++ b/src/motors/RoboClaw/pyroboclaw/__init__.py @@ -0,0 +1 @@ +from .roboclaw import RoboClaw \ No newline at end of file diff --git a/src/motors/RoboClaw/pyroboclaw/roboclaw.py b/src/motors/RoboClaw/pyroboclaw/roboclaw.py new file mode 100644 index 0000000..eac2bc9 --- /dev/null +++ b/src/motors/RoboClaw/pyroboclaw/roboclaw.py @@ -0,0 +1,271 @@ +import logging +import time +from threading import Lock + +import serial +import struct +#from pycrc.CRCCCITT import CRCCCITT + +from roboclaw.roboclaw_cmd import Cmd + +logger = logging.getLogger('pyroboclaw') +logger.setLevel(logging.WARNING) + +def CRCCITT(crc, data): + msb = crc >> 8 + lsb = crc & 255 + for c in data: + x = ord(c) ^ msb + x ^= (x >> 4) + msb = (lsb ^ (x >> 3) ^ (x << 4)) & 255 + lsb = (x ^ (x << 5)) & 255 + return (msb << 8) + lsb + +class RoboClaw: + def __init__(self, port, address, auto_recover=False, **kwargs): + self.port = serial.Serial(baudrate=115200, timeout=0.1, interCharTimeout=0.01) + self.port.port = port + self.address = address + self.serial_lock = Lock() + self.auto_recover = auto_recover + try: + self.port.close() + self.port.open() + except serial.serialutil.SerialException: + logger.exception('roboclaw serial') + if auto_recover: + self.recover_serial() + else: + raise + + def _read(self, cmd, fmt): + cmd_bytes = struct.pack('>BB', self.address, cmd) + try: + # self.port.reset_input_buffer() # TODO: potential bug? + with self.serial_lock: + self.port.write(cmd_bytes) + return_bytes = self.port.read(struct.calcsize(fmt) + 2) + crc_actual = CRCCCITT(cmd_bytes + return_bytes[:-2]) + crc_expect = struct.unpack('>H', return_bytes[-2:])[0] + if crc_actual != crc_expect: + logger.error('read crc failed') + raise CRCException('CRC failed') + return struct.unpack(fmt, return_bytes[:-2]) + except serial.serialutil.SerialException: + if self.auto_recover: + self.recover_serial() + else: + logger.exception('roboclaw serial') + raise + + def _write(self, cmd, fmt, *data): + cmd_bytes = struct.pack('>BB', self.address, cmd) + data_bytes = struct.pack(fmt, *data) + write_crc = CRCCCITT(cmd_bytes + data_bytes) + crc_bytes = struct.pack('>H', write_crc) + try: + with self.serial_lock: + self.port.write(cmd_bytes + data_bytes + crc_bytes) + self.port.flush() + verification = self.port.read(1) + if 0xff != struct.unpack('>B', verification)[0]: + logger.error('write crc failed') + raise CRCException('CRC failed') + except serial.serialutil.SerialException: + if self.auto_recover: + self.recover_serial() + else: + logger.exception('roboclaw serial') + raise + + def set_speed(self, motor, speed): + if motor == 1: + cmd = Cmd.M1SPEED + else: + cmd = Cmd.M2SPEED + self._write(cmd, '>i', speed) + + def recover_serial(self): + self.port.close() + while not self.port.isOpen(): + try: + self.port.close() + self.port.open() + except serial.serialutil.SerialException as e: + time.sleep(0.2) + logger.warning('failed to recover serial. retrying.') + + def drive_to_position_raw(self, motor, accel, speed, deccel, position, buffer): + # drive to a position expressed as a percentage of the full range of the motor + if motor == 1: + cmd = Cmd.M1SPEEDACCELDECCELPOS + else: + cmd = Cmd.M2SPEEDACCELDECCELPOS + self._write(cmd, '>IiIiB', accel, speed, deccel, position, buffer) + + def drive_to_position(self, motor, accel, speed, deccel, position, buffer): + range = self.read_range(motor) + max_speed = self.read_max_speed(motor) + set_speed = (speed / 100.) * max_speed + set_position = (position / 100.) * (range[1] - range[0]) + range[0] + self.drive_to_position_raw(motor, + accel, + round(set_speed), + deccel, + round(set_position), + buffer) + + def drive_motor(self, motor, speed): + # assert -64 <= speed <= 63 + write_speed = speed + 64 + if motor == 1: + cmd = Cmd.M17BIT + else: + cmd = Cmd.M27BIT + self._write(cmd, '>B', write_speed) + + def stop_motor(self, motor): + if motor == 1: + self._write(Cmd.M1FORWARD, '>B', 0) + else: + self._write(Cmd.M2FORWARD, '>B', 0) + + def stop_all(self): + self._write(Cmd.M1FORWARD, '>B', 0) + self._write(Cmd.M2FORWARD, '>B', 0) + + def read_encoder(self, motor): + # Currently, this function doesn't check over/underflow, which is fine since we're using pots. + if motor == 1: + cmd = Cmd.GETM1ENC + else: + cmd = Cmd.GETM2ENC + return self._read(cmd, '>IB')[0] + + def reset_quad_encoders(self): + self._write(Cmd.SETM1ENCCOUNT, '>I', 0) + self._write(Cmd.SETM2ENCCOUNT, '>I', 0) + + def read_range(self, motor): + if motor == 1: + cmd = Cmd.READM1POSPID + else: + cmd = Cmd.READM2POSPID + pid_vals = self._read(cmd, '>IIIIIii') + return pid_vals[5], pid_vals[6] + + def read_position(self, motor): + # returns position as a percentage across the full set range of the motor + encoder = self.read_encoder(motor) + range = self.read_range(motor) + return ((encoder - range[0]) / float(range[1] - range[0])) * 100. + + def read_status(self): + cmd = Cmd.GETERROR + status = self._read(cmd, '>B')[0] + return { + 0x0000: 'Normal', + 0x0001: 'Warning: High Current - Motor 1', + 0x0002: 'Warning: High Current - Motor 2', + 0x0004: 'Emergency Stop Triggered', + 0x0008: 'Error: High Temperature - Sensor 1', + 0x0010: 'Error: High Temperature - Sensor 2', + 0x0020: 'Error: High Voltage - Main Battery', + 0x0040: 'Error: High Voltage - Logic Battery', + 0x0080: 'Error: Low Voltage - Logic Battery', + 0x0100: 'Driver Fault - Motor 1 Driver', + 0x0200: 'Driver Fault - Motor 2 Driver', + 0x0400: 'Warning: High Voltage - Main Battery', + 0x0800: 'Warning: Low Voltage - Main Battery', + 0x1000: 'Warning: High Temperature - Sensor 1', + 0x2000: 'Warning: High Temperature - Sensor 2', + 0x4000: 'Home - Motor 1', + 0x8000: 'Home - Motor 2' + }.get(status, 'Unknown Error') + + def read_temp_sensor(self, sensor): + if sensor == 1: + cmd = Cmd.GETTEMP + else: + cmd = Cmd.GETTEMP2 + return self._read(cmd, '>H')[0] / 10 + + def read_batt_voltage(self, battery): + if battery in ['logic', 'Logic', 'L', 'l']: + cmd = Cmd.GETLBATT + else: + cmd = Cmd.GETMBATT + return self._read(cmd, '>H')[0] / 10 + + def read_voltages(self): + mainbatt = self._read(Cmd.GETMBATT, '>H')[0] / 10. + logicbatt = self._read(Cmd.GETLBATT, '>H')[0] / 10. + return mainbatt, logicbatt + + def read_currents(self): + currents = self._read(Cmd.GETCURRENTS, '>hh') + return tuple([c / 100. for c in currents]) + + def read_motor_current(self, motor): + if motor == 1: + return self.read_currents()[0] + else: + return self.read_currents()[1] + + def read_motor_pwms(self): + pwms = self._read(Cmd.GETPWMS, '>hh') + return tuple([c / 327.67 for c in pwms]) + + def read_motor_pwm(self, motor): + if motor == 1: + return self.read_motor_pwms()[0] + else: + return self.read_motor_pwms()[1] + + def read_input_pin_modes(self): + modes = self._read(Cmd.GETPINFUNCTIONS, '>BBB') + s3_mode = { + 0: 'Default', + 1: 'Emergency Stop (require restart)', + 2: 'Emergency Stop', + 3: 'Voltage Clamp' + }.get(modes[0], 'Unknown Mode') + s4_mode = { + 0: 'Disabled', + 1: 'Emergency Stop (require restart)', + 2: 'Emergency Stop', + 3: 'Voltage Clamp', + 4: 'Home Motor 1' + }.get(modes[0], 'Unknown Mode') + s5_mode = { + 0: 'Disabled', + 1: 'Emergency Stop (require restart)', + 2: 'Emergency Stop', + 3: 'Voltage Clamp', + 4: 'Home Motor 2' + }.get(modes[0], 'Unknown Mode') + return s3_mode, s4_mode, s5_mode + + def read_max_speed(self, motor): + if motor == 1: + cmd = Cmd.READM1PID + else: + cmd = Cmd.READM2PID + return self._read(cmd, '>IIII')[3] + + def read_speed(self, motor): + # returns velocity as a percentage of max speed + if motor == 1: + cmd = Cmd.GETM1SPEED + else: + cmd = Cmd.GETM2SPEED + max_speed = self.read_max_speed(motor) + speed_vals = self._read(cmd, '>IB') + speed = (speed_vals[0] / max_speed) * 100. + if speed_vals[1]: + speed *= -1 + return speed + + +class CRCException(Exception): + pass diff --git a/src/motors/RoboClaw/pyroboclaw/roboclaw_cmd.py b/src/motors/RoboClaw/pyroboclaw/roboclaw_cmd.py new file mode 100644 index 0000000..973860a --- /dev/null +++ b/src/motors/RoboClaw/pyroboclaw/roboclaw_cmd.py @@ -0,0 +1,91 @@ +class Cmd(): + M1FORWARD = 0 + M1BACKWARD = 1 + SETMINMB = 2 + SETMAXMB = 3 + M2FORWARD = 4 + M2BACKWARD = 5 + M17BIT = 6 + M27BIT = 7 + MIXEDFORWARD = 8 + MIXEDBACKWARD = 9 + MIXEDRIGHT = 10 + MIXEDLEFT = 11 + MIXEDFB = 12 + MIXEDLR = 13 + GETM1ENC = 16 + GETM2ENC = 17 + GETM1SPEED = 18 + GETM2SPEED = 19 + RESETENC = 20 + GETVERSION = 21 + SETM1ENCCOUNT = 22 + SETM2ENCCOUNT = 23 + GETMBATT = 24 + GETLBATT = 25 + SETMINLB = 26 + SETMAXLB = 27 + SETM1PID = 28 + SETM2PID = 29 + GETM1ISPEED = 30 + GETM2ISPEED = 31 + M1DUTY = 32 + M2DUTY = 33 + MIXEDDUTY = 34 + M1SPEED = 35 + M2SPEED = 36 + MIXEDSPEED = 37 + M1SPEEDACCEL = 38 + M2SPEEDACCEL = 39 + MIXEDSPEEDACCEL = 40 + M1SPEEDDIST = 41 + M2SPEEDDIST = 42 + MIXEDSPEEDDIST = 43 + M1SPEEDACCELDIST = 44 + M2SPEEDACCELDIST = 45 + MIXEDSPEEDACCELDIST = 46 + GETBUFFERS = 47 + GETPWMS = 48 + GETCURRENTS = 49 + MIXEDSPEED2ACCEL = 50 + MIXEDSPEED2ACCELDIST = 51 + M1DUTYACCEL = 52 + M2DUTYACCEL = 53 + MIXEDDUTYACCEL = 54 + READM1PID = 55 + READM2PID = 56 + SETMAINVOLTAGES = 57 + SETLOGICVOLTAGES = 58 + GETMINMAXMAINVOLTAGES = 59 + GETMINMAXLOGICVOLTAGES = 60 + SETM1POSPID = 61 + SETM2POSPID = 62 + READM1POSPID = 63 + READM2POSPID = 64 + M1SPEEDACCELDECCELPOS = 65 + M2SPEEDACCELDECCELPOS = 66 + MIXEDSPEEDACCELDECCELPOS = 67 + SETM1DEFAULTACCEL = 68 + SETM2DEFAULTACCEL = 69 + SETPINFUNCTIONS = 74 + GETPINFUNCTIONS = 75 + SETDEADBAND = 76 + GETDEADBAND = 77 + RESTOREDEFAULTS = 80 + GETTEMP = 82 + GETTEMP2 = 83 + GETERROR = 90 + GETENCODERMODE = 91 + SETM1ENCODERMODE = 92 + SETM2ENCODERMODE = 93 + WRITENVM = 94 + READNVM = 95 + SETCONFIG = 98 + GETCONFIG = 99 + SETM1MAXCURRENT = 133 + SETM2MAXCURRENT = 134 + GETM1MAXCURRENT = 135 + GETM2MAXCURRENT = 136 + SETPWMMODE = 148 + GETPWMMODE = 149 + FLAGBOOTLOADER = 255 \ No newline at end of file diff --git a/src/motors/RoboClaw/roboclaw.py b/src/motors/RoboClaw/roboclaw.py new file mode 100644 index 0000000..edefbe7 --- /dev/null +++ b/src/motors/RoboClaw/roboclaw.py @@ -0,0 +1,35 @@ +from motor_wrapper import MotorWrapper +import serial +import logging +from pyroboclaw import RoboClaw + +class RoboClawConnection(MotorWrapper): + # What type of motor this wrapper handles + type_ = 'roboclaw' + + def __init__(self, config): + MotorWrapper.__init__(self, config) + self.port = config.get('port') + self.address = 0x80 + self.channels = config.get('channels') + self.connection = RoboClaw(port=self.port, address=self.address) + try: + self.channels.get('left') + self.channels.get('right') + except AttributeError: + self.channels['left'] = 1 + self.channels['right'] = 0 + + def move_raw(self, left=None, right=None): + # Left side + if left is not None: + self.connection.drive_motor(self.channels.get('left'), round(62 / 1000 * left)) + # Right side + if right is not None: + self.connection.drive_motor(self.channels.get('right'), round(62 / 1000 * right)) + + def stop(self): + self.connection.stop_all() + + def close(self): + self.connection = None