In [117]:
import serial
import struct
from dataclasses import dataclass, asdict

In [2]:
PORT = '/dev/ttyACM0'

In [3]:
serial_params = dict(
    port = PORT,
    baudrate = 9600,
    bytesize = serial.EIGHTBITS,
    stopbits = serial.STOPBITS_ONE,
    timeout = 0.5,
)

In [83]:
def checksum_bytes(ba):
    ushorts = [i[0] for i in struct.iter_unpack('<H', ba)]
    cs = (0xffff - sum(ushorts)) % 0xffff
    return struct.pack('<H', cs)

In [292]:
class ChecksumError(Exception):
    pass

In [313]:
@dataclass
class RequestPing:
    packet_number: int
    prefix: int = 0xff7b
    command: int = 0x0020
    extension_length: int = 0x0000
    
    def __iter__(self):
        for i in asdict(self).items():
            yield i
        
    def __bytes__(self):
        base_command = struct.pack('<HHHBxxx', self.prefix,self.command,self.extension_length,self.packet_number)
        base_checksum = checksum_bytes(base_command)
        return base_command + base_checksum

In [317]:
@dataclass
class ResponsePing:
    # Base
    packet_number: int = None # whatever
    prefix: int = None # 0x7b
    command: int = None # 0x0020
    extension_length: int = None # 0x01
    
    cs1: int = None
    
    def parse_bytes(self, res):
        base_command = res[:12]
        if not checksum_bytes(base_command[:-2]) == base_command[-2:]:
            raise ChecksumError("Basecommand checksum failed", res)
            
        (
            self.prefix,
            self.command,
            self.extension_length,
            self.packet_number,
            self.cs1,
        ) = struct.unpack("<HHHBxxxH", res)
        return self
        
    def __iter__(self):
        for i in asdict(self).items():
            yield i
        

In [273]:
@dataclass
class RequestSerial:
    packet_number: int
    prefix: int = 0xff7b
    command: int = 0x0020
    extension_length: int = 0x0006
    subcommand: int = 0x0001
    reserved1: int = 0x000c
    
    def __iter__(self):
        for i in asdict(self).items():
            yield i
        
    def __bytes__(self):
        base_command = struct.pack('<HHHBxxx', self.prefix,self.command,self.extension_length,self.packet_number)
        base_checksum = checksum_bytes(base_command)
        sub_command = struct.pack('<HH', self.subcommand, self.reserved1)
        sub_checksum = checksum_bytes(sub_command)
        
        return base_command + base_checksum + sub_command + sub_checksum

In [302]:
# e.g.
# 7AFF20801E00230000002380010000001400000016A40000CE130000E407090F000F480328000000A91E

@dataclass
class ResponseSerial:
    # Base
    packet_number: int = None # whatever
    prefix: int = None # 0x7b
    command: int = None # 0x0020
    extension_length: int = None # 0x01
    
    # Sub
    subcommand: int = None # 0x0001
    SN: int = None
    YEAR: int = None
    MONTH: int = None
    DAY: int = None
    MAJ: int = None
    MIN: int = None
        
    cs1: int = None
    cs2: int = None
        
    @property
    def VERSION(self):
        return f"{self.MAJ}.{self.MIN}"

    @property
    def SERIAL(self):
        return f"{self.SN:05}-{self.YEAR:02d}{self.MONTH}{self.DAY}"
    
    def parse_bytes(self, res):
        base_command = res[:12]
        if not checksum_bytes(base_command[:-2]) == base_command[-2:]:
            raise ChecksumError("Basecommand checksum failed", res)

        sub_command = res[12:]
        if not checksum_bytes(sub_command[:-2]) == sub_command[-2:]:
            raise ChecksumError("Subcommand checksum failed", res)
            
        (
            self.prefix,
            self.command,
            self.extension_length,
            self.packet_number,
            self.cs1,
            self.subcommand,
            self.SN,
            self.YEAR,
            self.MONTH,
            self.DAY,
            self.MAJ,
            self.MIN,
            self.cs2
        ) = struct.unpack("<HHHBxxxH    H 10x H 2x HBB BB 6x H", res)
        return self
        
    def __iter__(self):
        for i in asdict(self).items():
            yield i
        

In [307]:
with serial.Serial(**serial_params) as port:
    port.write(bytes(RequestSerial(0x69)))
    res = port.read(128)

In [321]:
# Possibly we want to parse the BaseCommand portion, get the extension length,
# Then read the subcommand based on that length.
with serial.Serial(**serial_params) as port:
    port.write(bytes(RequestPing(1)))
    ping_response = ResponsePing().parse_bytes(port.read(64))
    print(ping_response)
    
    port.write(bytes(RequestSerial(2)))
    serial_response = ResponseSerial().parse_bytes(port.read(64))
    print(serial_response)

ResponsePing(packet_number=1, prefix=65402, command=32800, extension_length=0, cs1=32867)
ResponseSerial(packet_number=2, prefix=65402, command=32800, extension_length=30, subcommand=1, SN=5070, YEAR=2020, MONTH=9, DAY=15, MAJ=0, MIN=15, cs1=32836, cs2=7849)


In [309]:
r.VERSION

'0.15'