# Setup

In [1]:
import time
import serial
# ser = serial.Serial(port='/dev/serial0',baudrate=9600,parity=serial.PARITY_NONE,stopbits=serial.STOPBITS_ONE,bytesize=serial.EIGHTBITS,timeout=1)
ser = serial.Serial(port='COM9',baudrate=9600,parity=serial.PARITY_NONE,stopbits=serial.STOPBITS_ONE,bytesize=serial.EIGHTBITS,timeout=1)

In [8]:
from enum import Enum
import time
import serial
import struct


class CvCmdHandler:
    # misc constants
    DATA_PACKAGE_SIZE = 15
    DATA_PAYLOAD_INDEX = 2

    class eMsgType(Enum):
        MSG_MODE_CONTROL = b'\x10'
        MSG_CV_CMD = b'\x20'
        MSG_ACK = b'\x40'

    class eSepChar(Enum):  # start and ending hexes, acknowledgement bit
        CHAR_STX = b'\x02'
        CHAR_ETX = b'\x03'
        ACK_ASCII = b'ACK'
        CHAR_UNUSED = b'\xFF'

    class eCvCmdState(Enum):
        CMD_STATE_INIT = 0
        CMD_STATE_WAIT_FOR_STX = 1
        CMD_STATE_READ_PAYLOAD = 2

    class eModeControlBits(Enum):
        MODE_AUTO_AIM_BIT = 0b00000001
        MODE_AUTO_MOVE_BIT = 0b00000010
        MODE_ENEMY_DETECTED_BIT = 0b00000100

    def __init__(self):
        self.Heartbeat_State = self.eCvCmdState.CMD_STATE_INIT
        self.AutoAimSwitch = False
        self.AutoMoveSwitch = False
        self.EnemySwitch = False
        self.rxSwitchBuffer = 0

        self.ser = serial.Serial(port='/dev/serial0', baudrate=9600, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout=1)
        # self.ser = serial.Serial(port='COM9', baudrate=9600, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout=1)

        self.txCvCmdMsg = bytearray(self.eSepChar.CHAR_STX.value + self.eMsgType.MSG_CV_CMD.value + self.CHAR_UNUSED.value*12 + self.eSepChar.CHAR_ETX.value)
        # txAckMsg is always the same, so use the immutable bytes object
        self.txAckMsg = b''.join([self.eSepChar.CHAR_STX.value, self.eMsgType.ACK_ASCII.value, self.CHAR_UNUSED.value*11, self.eSepChar.CHAR_ETX.value])
        assert (len(self.txCvCmdMsg) == self.DATA_PACKAGE_SIZE)
        assert (len(self.txAckMsg) == self.DATA_PACKAGE_SIZE)

    def get_AutoAimMode(self):
        return self.AutoAimSwitch

    def get_AutoMoveMode(self):
        return self.AutoMoveSwitch

    def get_EnemyMode(self):
        return self.EnemySwitch

    # @param[in] gimbal_coordinate_x and gimbal_coordinate_y: type is int; must be positive; will be converted to uint16_t
    # @param[in] chassis_speed_x and chassis_speed_y: type is float; can be positive/negative; will be converted to float (32 bits)
    def sendCmd(self, gimbal_coordinate_x, gimbal_coordinate_y, chassis_speed_x, chassis_speed_y):
        # print("Printing CvCmd data")
        self.txCvCmdMsg[self.DATA_PAYLOAD_INDEX:self.DATA_PAYLOAD_INDEX+12] = b''.join([gimbal_coordinate_x.to_bytes(2, 'little'), gimbal_coordinate_y.to_bytes(2, 'little'), struct.pack('<f', chassis_speed_x), struct.pack('<f', chassis_speed_y)])
        ser.write(self.txCvCmdMsg)

    def CvCmd_Heartbeat(self, gimbal_coordinate_x, gimbal_coordinate_y, chassis_speed_x, chassis_speed_y):
        if self.AutoAimSwitch or self.AutoMoveSwitch:
            self.sendCmd(gimbal_coordinate_x, gimbal_coordinate_y, chassis_speed_x, chassis_speed_y)
        self.CvCmd_RxHeartbeat()
        return (self.AutoAimSwitch, self.AutoMoveSwitch, self.EnemySwitch)

    def CvCmd_RxHeartbeat(self):
        if self.Heartbeat_State == self.eCvCmdState.CMD_STATE_INIT:
            self.AutoAimSwitch = False
            self.AutoMoveSwitch = False
            self.EnemySwitch = False

            if ~self.ser.is_open:
                self.ser.open()
            ser.reset_input_buffer()
            ser.reset_output_buffer()

            print("Reactor online. Sensors online. Weapons online. All systems nominal.\n")
            self.Heartbeat_State = self.eCvCmdState.CMD_STATE_WAIT_FOR_STX

        elif self.Heartbeat_State == self.eCvCmdState.CMD_STATE_WAIT_FOR_STX:
            # polling for control msg, if any msg received, ACK back
            if ser.in_waiting >= self.DATA_PACKAGE_SIZE:
                # read_until returns b'' or b'\x...\x02' or '\x02'. The point is it contains bytes up to b'\x02'
                bytesUpToStx = ser.read_until(self.CHAR_STX.value, size=1)
                if bytesUpToStx and (bytesUpToStx[-1] == int.from_bytes(self.CHAR_STX.value, 'little')):
                    self.Heartbeat_State = self.eCvCmdState.CMD_STATE_READ_PAYLOAD
                else:
                    ser.reset_input_buffer()
        elif self.Heartbeat_State == self.eCvCmdState.CMD_STATE_READ_PAYLOAD:
            # CHAR_STX may be the last char in the buffer when switching from CMD_STATE_WAIT_FOR_STX, so need to take care of in_waiting size here
            if ser.in_waiting >= self.DATA_PACKAGE_SIZE-1:
                byteRead = ser.read(1)
                fInvalid = True
                if byteRead == self.eMsgType.MODE_CONTROL.value:
                    self.rxSwitchBuffer = int.from_bytes(ser.read(1), 'little')

                    # check remaining payload
                    bytesUpToEtx = ser.read_until(self.CHAR_ETX.value, size=1)
                    if bytesUpToEtx and (bytesUpToEtx == self.CHAR_UNUSED.value*9 + self.CHAR_ETX.value):
                        self.AutoAimSwitch = self.rxSwitchBuffer & self.MODE_AUTO_AIM_BIT.value
                        self.AutoMoveSwitch = self.rxSwitchBuffer & self.MODE_AUTO_MOVE_BIT.value
                        self.EnemySwitch = self.rxSwitchBuffer & self.MODE_ENEMY_DETECTED_BIT.value
                        ser.write(self.txAckMsg)
                        self.Heartbeat_State = self.eCvCmdState.CMD_STATE_WAIT_FOR_STX
                        fInvalid = False

                if fInvalid:
                    # maybe reader cursor derailed; immediately look for STX again to save looping time
                    bytesUpToStx = ser.read_until(self.CHAR_STX.value, size=1)
                    if bytesUpToStx and (bytesUpToStx[-1] == int.from_bytes(self.CHAR_STX.value, 'little')):
                        pass  # stay in this state; read payload of next msg
                    else:
                        self.Heartbeat_State = self.eCvCmdState.CMD_STATE_WAIT_FOR_STX


In [None]:
CvCmder = CvCmdHandler()
while True:
    CvCmder.CvCmd_Heartbeat(1,2,3,4)

## Unit test

In [None]:
#AutoAim_Heartbeat(10,10,1)
from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))