diff --git a/pros/cli/v5_utils.py b/pros/cli/v5_utils.py index 6e9565ef..7a23a1f6 100644 --- a/pros/cli/v5_utils.py +++ b/pros/cli/v5_utils.py @@ -290,3 +290,34 @@ def capture(file_name: str, port: str, force: bool = False): w.write(file_, i_data) print(f'Saved screen capture to {file_name}') + +@v5.command(aliases=['sv', 'set'], short_help='Set a kernel variable on a connected V5 device') +@click.argument('variable', type=click.Choice(['teamnumber', 'robotname']), required=True) +@click.argument('value', required=True, type=click.STRING, nargs=1) +@default_options +def set_variable(variable, value): + import pros.serial.devices.vex as vex + from pros.serial.ports import DirectPort + + # Get the connected v5 device + port = resolve_v5_port(None, 'system')[0] + if port == None: + return + device = vex.V5Device(DirectPort(port)) + actual_value = device.kv_write(variable, value).decode() + print(f'Value of \'{variable}\' set to : {actual_value}') + +@v5.command(aliases=['rv', 'get'], short_help='Read a kernel variable from a connected V5 device') +@click.argument('variable', type=click.Choice(['teamnumber', 'robotname']), required=True) +@default_options +def read_variable(variable): + import pros.serial.devices.vex as vex + from pros.serial.ports import DirectPort + + # Get the connected v5 device + port = resolve_v5_port(None, 'system')[0] + if port == None: + return + device = vex.V5Device(DirectPort(port)) + value = device.kv_read(variable).decode() + print(f'Value of \'{variable}\' is : {value}') diff --git a/pros/serial/devices/vex/v5_device.py b/pros/serial/devices/vex/v5_device.py index c65861e5..b3139001 100644 --- a/pros/serial/devices/vex/v5_device.py +++ b/pros/serial/devices/vex/v5_device.py @@ -892,6 +892,36 @@ def sc_init(self) -> None: self._txrx_ext_struct(0x28, [], '') logger(__name__).debug('Completed ext 0x28 command') + @retries + def kv_read(self, kv: str) -> bytearray: + logger(__name__).debug('Sending ext 0x2e command') + encoded_kv = f'{kv}\0'.encode(encoding='ascii') + tx_payload = struct.pack(f'<{len(encoded_kv)}s', encoded_kv) + # Because the length of the kernel variables is not known, use None to indicate we are recieving an unknown length. + ret = self._txrx_ext_packet(0x2e, tx_payload, 1, check_length=False, check_ack=True) + logger(__name__).debug('Completed ext 0x2e command') + return ret + + @retries + def kv_write(self, kv: str, payload: Union[Iterable, bytes, bytearray, str]): + logger(__name__).debug('Sending ext 0x2f command') + encoded_kv = f'{kv}\0'.encode(encoding='ascii') + kv_to_max_bytes = { + 'teamnumber': 7, + 'robotname': 16 + } + if len(payload) > kv_to_max_bytes.get(kv, 254): + print(f'Truncating input to meet maximum value length ({kv_to_max_bytes[kv]} characters).') + # Trim down size of payload to fit within the 255 byte limit and add null terminator. + payload = payload[:kv_to_max_bytes.get(kv, 254)] + "\0" + if isinstance(payload, str): + payload = payload.encode(encoding='ascii') + tx_fmt =f'<{len(encoded_kv)}s{len(payload)}s' + tx_payload = struct.pack(tx_fmt, encoded_kv, payload) + ret = self._txrx_ext_packet(0x2f, tx_payload, 1, check_length=False, check_ack=True) + logger(__name__).debug('Completed ext 0x2f command') + return payload + def _txrx_ext_struct(self, command: int, tx_data: Union[Iterable, bytes, bytearray], unpack_fmt: str, check_length: bool = True, check_ack: bool = True, timeout: Optional[float] = None) -> Tuple: