diff --git a/.gitignore b/.gitignore index a19142be..b1f2e29b 100644 --- a/.gitignore +++ b/.gitignore @@ -13,7 +13,7 @@ build /example/log /example/tmtc_config.json -/log/ +/src/log /src/tests/log .idea diff --git a/example/config/hook_implementation.py b/example/config/hook_implementation.py index e9d2b598..33b9be52 100644 --- a/example/config/hook_implementation.py +++ b/example/config/hook_implementation.py @@ -18,7 +18,7 @@ class ExampleHookClass(TmTcHookBase): def add_globals_pre_args_parsing(self, gui: bool = False): from tmtccmd.config.globals import set_default_globals_pre_args_parsing - set_default_globals_pre_args_parsing(gui=gui, apid=APID) + set_default_globals_pre_args_parsing(gui=gui, tc_apid=APID, tm_apid=APID) def add_globals_post_args_parsing(self, args: argparse.Namespace): from tmtccmd.config.globals import set_default_globals_post_args_parsing diff --git a/setup.cfg b/setup.cfg index db2a6278..b697bc6f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -32,7 +32,7 @@ install_requires = colorama>=0.4.4 colorlog>=5.0.0 dle-encoder>=0.2 - spacepackets>=0.2 + spacepackets>=0.4 package_dir = = src packages = find: diff --git a/src/tests/test_global_manager.py b/src/tests/test_global_manager.py index c251af0a..4406866e 100644 --- a/src/tests/test_global_manager.py +++ b/src/tests/test_global_manager.py @@ -1,5 +1,5 @@ from unittest import TestCase -from tmtccmd.config.globals import update_global, get_global, get_global_apid, set_default_apid, \ +from tmtccmd.config.globals import update_global, get_global, \ set_json_cfg_path, get_json_cfg_path, set_glob_com_if_dict, get_glob_com_if_dict, \ set_default_globals_pre_args_parsing, check_and_set_core_mode_arg, CoreModeList, \ CoreGlobalIds @@ -11,11 +11,6 @@ def test_global_module(self): update_global(global_param_id=30, parameter='hello') self.assertTrue(get_global(global_param_id=30) == 'hello') - current_apid = get_global_apid() - set_default_apid(default_apid=0x01) - self.assertTrue(get_global_apid() == 0x01) - set_default_apid(current_apid) - current_path = get_json_cfg_path() set_json_cfg_path('.') self.assertTrue(get_json_cfg_path() == '.') @@ -29,10 +24,8 @@ def test_global_module(self): self.assertTrue(com_if_dict["test"][0] == "Test Interface") set_default_globals_pre_args_parsing( - gui=False, apid=0x02 + gui=False, tc_apid=0x02, tm_apid=0x03 ) - self.assertTrue(get_global_apid() == 0x02) - set_default_apid(current_apid) result = check_and_set_core_mode_arg(mode_arg='udp') self.assertTrue(result == CoreModeList.SEQUENTIAL_CMD_MODE) diff --git a/src/tests/test_printer.py b/src/tests/test_printer.py index 86e034e9..bdf8fd6b 100644 --- a/src/tests/test_printer.py +++ b/src/tests/test_printer.py @@ -1,10 +1,11 @@ from unittest import TestCase from tmtccmd.runner import initialize_tmtc_commander -from tmtccmd.tm import Service1TM, Service5TM +from tmtccmd.tm import Service5TM +from tmtccmd.pus.service_1_verification import Service1TMExtended from tmtccmd.tm.service_5_event import Srv5Subservices from spacepackets.ccsds.time import CdsShortTimestamp -from tmtccmd.tc.service_17_test import pack_service17_ping_command +from tmtccmd.pus.service_17_test import pack_service_17_ping_command from tmtccmd.utility.tmtc_printer import TmTcPrinter, DisplayMode from tmtccmd.utility.logger import get_console_logger, set_tmtc_console_logger from tmtccmd.config.globals import update_global, CoreGlobalIds @@ -25,7 +26,9 @@ def test_print_functions(self): self.assertTrue(self.tmtc_printer.get_display_mode() == DisplayMode.SHORT) self.tmtc_printer.set_display_mode(DisplayMode.LONG) - service_1_tm = Service1TM(subservice_id=1, time=CdsShortTimestamp.init_from_current_time()) + service_1_tm = Service1TMExtended( + subservice=1, time=CdsShortTimestamp.init_from_current_time() + ) service_1_packed = service_1_tm.pack() self.tmtc_printer.print_telemetry(packet_if=service_1_tm, info_if=service_1_tm) # Should not crash and emit warning @@ -33,7 +36,9 @@ def test_print_functions(self): self.tmtc_printer.set_display_mode(DisplayMode.SHORT) self.tmtc_printer.print_telemetry(packet_if=service_1_tm, info_if=service_1_tm) - service_1_tm = Service1TM(subservice_id=2, time=CdsShortTimestamp.init_from_current_time()) + service_1_tm = Service1TMExtended( + subservice=2, time=CdsShortTimestamp.init_from_current_time() + ) service_1_packed = service_1_tm.pack() self.tmtc_printer.print_telemetry( packet_if=service_1_tm, info_if=service_1_tm, print_raw_tm=True @@ -41,7 +46,7 @@ def test_print_functions(self): self.tmtc_printer.set_display_mode(DisplayMode.LONG) service_5_tm = Service5TM( - subservice_id=Srv5Subservices.INFO_EVENT, object_id=bytearray([0x01, 0x02, 0x03, 0x04]), + subservice=Srv5Subservices.INFO_EVENT, object_id=bytearray([0x01, 0x02, 0x03, 0x04]), event_id=22, param_1=32, param_2=82452, time=CdsShortTimestamp.init_from_current_time() ) hook_base = create_hook_mock_with_srv_handlers() @@ -54,7 +59,7 @@ def test_print_functions(self): object_id=bytes([0x01, 0x02, 0x03, 0x04]), event_id=22, param_1=32, param_2=82452 ) - service_17_command = pack_service17_ping_command(ssc=0, apid=42) + service_17_command = pack_service_17_ping_command(ssc=0, apid=42) self.tmtc_printer.print_telecommand( tc_packet_obj=service_17_command, tc_packet_raw=service_17_command.pack() ) diff --git a/src/tests/test_pus.py b/src/tests/test_pus.py index 31a48886..8796ba65 100755 --- a/src/tests/test_pus.py +++ b/src/tests/test_pus.py @@ -3,56 +3,54 @@ from spacepackets.ccsds.spacepacket import get_space_packet_sequence_control from spacepackets.ccsds.time import CdsShortTimestamp -from spacepackets.ecss.conf import get_pus_tm_version, PusVersion +from spacepackets.ecss.conf import get_pus_tm_version, PusVersion, set_default_tm_apid +from spacepackets.util import PrintFormats -from tmtccmd.tm.service_17_test import Service17TM +from tmtccmd.pus.service_17_test import Service17TMExtended class TestTelemetry(TestCase): - def test_space_packet_functions(self): - psc = get_space_packet_sequence_control(sequence_flags=0b111, source_sequence_count=42) - self.assertTrue(psc & 0xc000 == 0xc000) def test_generic_pus_c(self): - pus_17_telemetry = Service17TM( - subservice_id=1, ssc=36, time=CdsShortTimestamp.init_from_current_time() + pus_17_telemetry = Service17TMExtended( + subservice=1, ssc=36, time=CdsShortTimestamp.init_from_current_time(), + apid=0xef ) pus_17_raw = pus_17_telemetry.pack() pus_17_telemetry = None def tm_func(raw_telemetry: bytearray): - return Service17TM.unpack(raw_telemetry=raw_telemetry) + return Service17TMExtended.unpack(raw_telemetry=raw_telemetry) self.assertRaises(ValueError, tm_func, bytearray()) self.assertRaises(ValueError, tm_func, None) - pus_17_telemetry = Service17TM.unpack(raw_telemetry=pus_17_raw) - + pus_17_telemetry = Service17TMExtended.unpack(raw_telemetry=pus_17_raw) self.assertTrue(get_pus_tm_version() == PusVersion.PUS_C) - self.assertTrue(pus_17_telemetry.get_service() == 17) - self.assertTrue(pus_17_telemetry.get_apid() == 0xef) - self.assertTrue(pus_17_telemetry.get_subservice() == 1) - self.assertTrue(pus_17_telemetry.get_ssc() == 36) - self.assertTrue(pus_17_telemetry.get_tm_data() == bytearray()) - self.assertTrue(pus_17_telemetry.pus_tm.is_valid()) + self.assertTrue(pus_17_telemetry.service == 17) + self.assertTrue(pus_17_telemetry.apid == 0xef) + self.assertTrue(pus_17_telemetry.subservice == 1) + self.assertTrue(pus_17_telemetry.ssc == 36) + self.assertTrue(pus_17_telemetry.tm_data == bytearray()) + self.assertTrue(pus_17_telemetry.pus_tm.valid) self.assertTrue(pus_17_telemetry.get_custom_printout() == "") - self.assertTrue(pus_17_telemetry.return_source_data_string() == "[]") - pus_17_telemetry.pus_tm.print_source_data() - pus_17_telemetry.pus_tm.print_full_packet_string() + self.assertTrue(pus_17_telemetry.get_source_data_string() == "hex []") + pus_17_telemetry.pus_tm.print_source_data(print_format=PrintFormats.HEX) + pus_17_telemetry.pus_tm.print_full_packet_string(print_format=PrintFormats.HEX) # This string changes depending on system time, so its complicated to test its validity - full_string = pus_17_telemetry.pus_tm.return_full_packet_string() + full_string = pus_17_telemetry.pus_tm.get_full_packet_string(print_format=PrintFormats.HEX) print(full_string) print(pus_17_telemetry) print(repr(pus_17_telemetry)) - self.assertTrue(pus_17_telemetry.pus_tm.get_packet_id() == 0x8 << 8 | 0xef) + self.assertTrue(pus_17_telemetry.pus_tm.packet_id == 0x8 << 8 | 0xef) def test_list_functionality(self): - pus_17_telecommand = Service17TM( - subservice_id=1, ssc=36, time=CdsShortTimestamp.init_from_current_time() + pus_17_telecommand = Service17TMExtended( + subservice=1, ssc=36, time=CdsShortTimestamp.init_from_current_time() ) pus_17_raw = pus_17_telecommand.pack() - pus_17_telemetry = Service17TM.unpack(raw_telemetry=pus_17_raw) + pus_17_telemetry = Service17TMExtended.unpack(raw_telemetry=pus_17_raw) header_list = [] content_list = [] diff --git a/src/tmtccmd/__init__.py b/src/tmtccmd/__init__.py index ffd2be5e..d43b5fbf 100644 --- a/src/tmtccmd/__init__.py +++ b/src/tmtccmd/__init__.py @@ -1,8 +1,8 @@ VERSION_NAME = "tmtccmd" VERSION_MAJOR = 1 -VERSION_MINOR = 9 -VERSION_REVISION = 1 +VERSION_MINOR = 10 +VERSION_REVISION = 0 # I think this needs to be in string representation to be parsed so we can't # use a formatted string here. -__version__ = "1.9.1" +__version__ = "1.10.0" diff --git a/src/tmtccmd/com_if/dummy_com_if.py b/src/tmtccmd/com_if/dummy_com_if.py index 9446fb9e..55e156d0 100644 --- a/src/tmtccmd/com_if/dummy_com_if.py +++ b/src/tmtccmd/com_if/dummy_com_if.py @@ -1,9 +1,12 @@ """Dummy Communication Interface. Currently serves to provide an example without external hardware """ -from tmtccmd.com_if.com_interface_base import CommunicationInterface from spacepackets.ecss.tc import PusTelecommand -from tmtccmd.tm import TelemetryListT, Service1TM, Service17TM -from tmtccmd.pus.service_17_test import Srv17Subservices +from spacepackets.ccsds.spacepacket import get_space_packet_sequence_control, SequenceFlags + +from tmtccmd.com_if.com_interface_base import CommunicationInterface +from tmtccmd.tm import TelemetryListT +from tmtccmd.pus.service_1_verification import Service1TMExtended +from tmtccmd.pus.service_17_test import Srv17Subservices, Service17TMExtended from tmtccmd.utility.logger import get_console_logger from tmtccmd.utility.tmtc_printer import TmTcPrinter @@ -70,23 +73,27 @@ def generate_reply_package(self): """ if self.last_service == 17: if self.last_subservice == 1: - tm_packer = Service1TM( + tc_psc = get_space_packet_sequence_control( + sequence_flags=SequenceFlags.UNSEGMENTED, + source_sequence_count=self.last_tc_ssc + ) + tm_packer = Service1TMExtended( subservice=1, ssc=self.current_ssc, tc_packet_id=self.last_tc_packet_id, - tc_ssc=self.last_tc_ssc + tc_psc=tc_psc ) self.current_ssc += 1 tm_packet_raw = tm_packer.pack() self.next_telemetry_package.append(tm_packet_raw) - tm_packer = Service1TM( + tm_packer = Service1TMExtended( subservice=7, ssc=self.current_ssc, tc_packet_id=self.last_tc_packet_id, - tc_ssc=self.last_tc_ssc + tc_psc=tc_psc ) tm_packet_raw = tm_packer.pack() self.next_telemetry_package.append(tm_packet_raw) self.current_ssc += 1 - tm_packer = Service17TM(subservice=Srv17Subservices.PING_REPLY) + tm_packer = Service17TMExtended(subservice=Srv17Subservices.PING_REPLY) tm_packet_raw = tm_packer.pack() self.next_telemetry_package.append(tm_packet_raw) self.current_ssc += 1 diff --git a/src/tmtccmd/com_if/tcpip_tcp_com_if.py b/src/tmtccmd/com_if/tcpip_tcp_com_if.py index 71d7b744..19aa0737 100644 --- a/src/tmtccmd/com_if/tcpip_tcp_com_if.py +++ b/src/tmtccmd/com_if/tcpip_tcp_com_if.py @@ -10,7 +10,7 @@ import threading import select from collections import deque -from typing import Union, Optional +from typing import Union, Optional, Tuple from spacepackets.ccsds.spacepacket import parse_space_packets @@ -42,7 +42,7 @@ class TcpIpTcpComIF(CommunicationInterface): TM_LOOP_DELAY = 0.2 def __init__( - self, com_if_key: str, com_type: TcpCommunicationType, space_packet_id: int, + self, com_if_key: str, com_type: TcpCommunicationType, space_packet_ids: Tuple[int], tm_polling_freqency: float, tm_timeout: float, tc_timeout_factor: float, send_address: EthernetAddressT, max_recv_size: int, max_packets_stored: int = 50, @@ -52,8 +52,8 @@ def __init__( :param com_if_key: :param com_type: Communication Type. By default, it is assumed that space packets are sent via TCP - :param space_packet_id: 16 bit packet header for space packet headers. It is used - to detect the start of a header. + :param space_packet_ids: 16 bit packet header for space packet headers. Used to + detect the start of PUS packets :param tm_polling_freqency: Polling frequency in seconds :param tm_timeout: Timeout in seconds :param tmtc_printer: Printer instance, can be passed optionally to allow packet debugging @@ -61,7 +61,7 @@ def __init__( super().__init__(com_if_key=com_if_key, tmtc_printer=tmtc_printer) self.tm_timeout = tm_timeout self.com_type = com_type - self.space_packet_id = space_packet_id + self.space_packet_ids = space_packet_ids self.tc_timeout_factor = tc_timeout_factor self.tm_polling_frequency = tm_polling_freqency self.send_address = send_address @@ -122,8 +122,7 @@ def receive(self, poll_timeout: float = 0) -> TelemetryListT: # call. We parse the space packets contained in the stream here if self.com_type == TcpCommunicationType.SPACE_PACKETS: tm_packet_list = parse_space_packets( - analysis_queue=self.__analysis_queue, packet_id=self.space_packet_id, - max_len=self.max_recv_size + analysis_queue=self.__analysis_queue, packet_ids=self.space_packet_ids ) else: while self.__analysis_queue: diff --git a/src/tmtccmd/com_if/tcpip_udp_com_if.py b/src/tmtccmd/com_if/tcpip_udp_com_if.py index a67bc93a..1bc9b82e 100644 --- a/src/tmtccmd/com_if/tcpip_udp_com_if.py +++ b/src/tmtccmd/com_if/tcpip_udp_com_if.py @@ -69,7 +69,7 @@ def open(self, args: any = None): # Set non-blocking because we use select. self.udp_socket.setblocking(False) if self.init_mode == CoreModeList.LISTENER_MODE: - from tmtccmd.tc.service_17_test import pack_service17_ping_command + from tmtccmd.pus.service_17_test import pack_service17_ping_command # Send ping command immediately so the reception address is known for UDP ping_cmd = pack_service17_ping_command(ssc=0) self.send(ping_cmd.pack()) diff --git a/src/tmtccmd/config/args.py b/src/tmtccmd/config/args.py index bb5ef3e7..e2696bf8 100644 --- a/src/tmtccmd/config/args.py +++ b/src/tmtccmd/config/args.py @@ -242,10 +242,10 @@ def prompt_op_code(service_op_code_dict: ServiceOpCodeDictT, service: str) -> st info_adjustment = 34 horz_line_num = op_code_adjustment + info_adjustment + 3 horiz_line = horz_line_num * "-" - op_code_string = "Operation Code".ljust(op_code_adjustment) + op_code_info_str = "Operation Code".ljust(op_code_adjustment) info_string = "Information".ljust(info_adjustment) while True: - LOGGER.info(f"{op_code_string} | {info_string}") + LOGGER.info(f"{op_code_info_str} | {info_string}") LOGGER.info(horiz_line) if service in service_op_code_dict: op_code_dict = service_op_code_dict[service][1] diff --git a/src/tmtccmd/config/com_if.py b/src/tmtccmd/config/com_if.py index ff69db19..44efc3f3 100644 --- a/src/tmtccmd/config/com_if.py +++ b/src/tmtccmd/config/com_if.py @@ -1,5 +1,5 @@ import sys -from typing import Optional +from typing import Optional, Tuple from tmtccmd.config.definitions import CoreGlobalIds, CoreComInterfaces from tmtccmd.core.globals_manager import get_global, update_global @@ -17,7 +17,7 @@ def create_communication_interface_default( com_if_key: str, tmtc_printer: TmTcPrinter, json_cfg_path: str, - space_packet_id: int = 0 + space_packet_ids: Tuple[int] = (0,) ) -> Optional[CommunicationInterface]: """Return the desired communication interface object @@ -36,7 +36,7 @@ def create_communication_interface_default( com_if_key == CoreComInterfaces.TCPIP_TCP.value: communication_interface = create_default_tcpip_interface( com_if_key=com_if_key, json_cfg_path=json_cfg_path, tmtc_printer=tmtc_printer, - space_packet_id=space_packet_id + space_packet_ids=space_packet_ids ) elif com_if_key == CoreComInterfaces.SERIAL_DLE.value or \ com_if_key == CoreComInterfaces.SERIAL_FIXED_FRAME.value: @@ -63,13 +63,21 @@ def create_communication_interface_default( return None communication_interface.initialize() return communication_interface - except (IOError, OSError) as e: - LOGGER.error("Error setting up communication interface") - print(e) + except ConnectionRefusedError: + LOGGER.exception(f'TCP/IP connection refused') + if com_if_key == CoreComInterfaces.TCPIP_UDP.value: + LOGGER.warning('Make sure that a UDP server is running') + if com_if_key == CoreComInterfaces.TCPIP_TCP.value: + LOGGER.warning('Make sure that a TCP server is running') + sys.exit(1) + except (IOError, OSError): + LOGGER.exception(f'Error setting up communication interface') sys.exit(1) -def default_tcpip_cfg_setup(tcpip_type: TcpIpType, json_cfg_path: str, space_packet_id: int = 0): +def default_tcpip_cfg_setup( + tcpip_type: TcpIpType, json_cfg_path: str, space_packet_ids: Tuple[int] = (0,) +): """Default setup for TCP/IP communication interfaces. This intantiates all required data in the globals manager so a TCP/IP communication interface can be built with :func:`create_default_tcpip_interface` @@ -93,11 +101,11 @@ def default_tcpip_cfg_setup(tcpip_type: TcpIpType, json_cfg_path: str, space_pac ethernet_cfg_dict.update({TcpIpConfigIds.SEND_ADDRESS: send_tuple}) ethernet_cfg_dict.update({TcpIpConfigIds.RECV_ADDRESS: recv_tuple}) ethernet_cfg_dict.update({TcpIpConfigIds.RECV_MAX_SIZE: max_recv_buf_size}) - if space_packet_id == 0 and tcpip_type == TcpIpType.TCP: + if space_packet_ids == (0,) and tcpip_type == TcpIpType.TCP: LOGGER.warning( 'TCP communication interface without any specified space packet ID might not work!' ) - ethernet_cfg_dict.update({TcpIpConfigIds.SPACE_PACKET_ID: space_packet_id}) + ethernet_cfg_dict.update({TcpIpConfigIds.SPACE_PACKET_ID: space_packet_ids}) update_global(CoreGlobalIds.ETHERNET_CONFIG, ethernet_cfg_dict) @@ -117,7 +125,8 @@ def default_serial_cfg_setup(com_if_key: str, json_cfg_path: str): def create_default_tcpip_interface( - com_if_key: str, tmtc_printer: TmTcPrinter, json_cfg_path: str, space_packet_id: int = 0 + com_if_key: str, tmtc_printer: TmTcPrinter, json_cfg_path: str, + space_packet_ids: Tuple[int] = (0,) ) -> Optional[CommunicationInterface]: """Create a default serial interface. Requires a certain set of global variables set up. See :func:`default_tcpip_cfg_setup` for more details. @@ -125,7 +134,7 @@ def create_default_tcpip_interface( :param com_if_key: :param tmtc_printer: :param json_cfg_path: - :param space_packet_id: Two byte packet ID which is used by TCP to parse space packets + :param space_packet_ids: Two byte packet IDs which is used by TCP to parse space packets :return: """ communication_interface = None @@ -136,7 +145,7 @@ def create_default_tcpip_interface( elif com_if_key == CoreComInterfaces.TCPIP_TCP.value: default_tcpip_cfg_setup( tcpip_type=TcpIpType.TCP, json_cfg_path=json_cfg_path, - space_packet_id=space_packet_id + space_packet_ids=space_packet_ids ) ethernet_cfg_dict = get_global(CoreGlobalIds.ETHERNET_CONFIG) send_addr = ethernet_cfg_dict[TcpIpConfigIds.SEND_ADDRESS] @@ -154,7 +163,7 @@ def create_default_tcpip_interface( elif com_if_key == CoreComInterfaces.TCPIP_TCP.value: communication_interface = TcpIpTcpComIF( com_if_key=com_if_key, com_type=TcpCommunicationType.SPACE_PACKETS, - space_packet_id=space_packet_id, tm_timeout=get_global(CoreGlobalIds.TM_TIMEOUT), + space_packet_ids=space_packet_ids, tm_timeout=get_global(CoreGlobalIds.TM_TIMEOUT), tc_timeout_factor=get_global(CoreGlobalIds.TC_SEND_TIMEOUT_FACTOR), tm_polling_freqency=0.5, send_address=send_addr, max_recv_size=max_recv_size, tmtc_printer=tmtc_printer, init_mode=init_mode diff --git a/src/tmtccmd/config/definitions.py b/src/tmtccmd/config/definitions.py index b1a58963..2acacd76 100644 --- a/src/tmtccmd/config/definitions.py +++ b/src/tmtccmd/config/definitions.py @@ -18,7 +18,6 @@ class CoreGlobalIds(enum.IntEnum): # Parameters JSON_CFG_PATH = 139 - APID = 140 MODE = 141 CURRENT_SERVICE = 142 COM_IF = 144 diff --git a/src/tmtccmd/config/globals.py b/src/tmtccmd/config/globals.py index 6fbe7ca6..c8768b3a 100644 --- a/src/tmtccmd/config/globals.py +++ b/src/tmtccmd/config/globals.py @@ -5,7 +5,7 @@ from tmtccmd.utility.logger import get_console_logger from tmtccmd.utility.conf_util import check_args_in_dict, print_core_globals -from spacepackets.ecss.conf import PusVersion, set_default_apid, get_default_apid, \ +from spacepackets.ecss.conf import PusVersion, set_default_tc_apid, set_default_tm_apid, \ set_pus_tc_version, set_pus_tm_version from tmtccmd.core.globals_manager import update_global, get_global from tmtccmd.config.definitions import CoreGlobalIds, CoreModeList, CoreServiceList, \ @@ -17,10 +17,6 @@ SERVICE_OP_CODE_DICT = dict() -def get_global_apid() -> int: - return get_default_apid() - - def set_json_cfg_path(json_cfg_path: str): update_global(CoreGlobalIds.JSON_CFG_PATH, json_cfg_path) @@ -39,7 +35,7 @@ def get_glob_com_if_dict() -> ComIFDictT: def set_default_globals_pre_args_parsing( - gui: bool, apid: int, pus_tc_version: PusVersion = PusVersion.PUS_C, + gui: bool, tc_apid: int, tm_apid: int, pus_tc_version: PusVersion = PusVersion.PUS_C, pus_tm_version: PusVersion = PusVersion.PUS_C, com_if_id: str = CoreComInterfaces.DUMMY.value, custom_com_if_dict=None, display_mode="long", tm_timeout: float = 4.0, print_to_file: bool = True, @@ -47,8 +43,8 @@ def set_default_globals_pre_args_parsing( ): if custom_com_if_dict is None: custom_com_if_dict = dict() - update_global(CoreGlobalIds.APID, apid) - set_default_apid(default_apid=apid) + set_default_tc_apid(tc_apid=tc_apid) + set_default_tm_apid(tm_apid=tm_apid) set_pus_tc_version(pus_tc_version) set_pus_tm_version(pus_tm_version) update_global(CoreGlobalIds.COM_IF, com_if_id) diff --git a/src/tmtccmd/pus/definitions.py b/src/tmtccmd/pus/definitions.py new file mode 100644 index 00000000..71983444 --- /dev/null +++ b/src/tmtccmd/pus/definitions.py @@ -0,0 +1,5 @@ +from enum import IntEnum + + +class CustomPusServices(IntEnum): + SERVICE_200_MODE = 200 diff --git a/src/tmtccmd/pus/obj_id.py b/src/tmtccmd/pus/obj_id.py index aa4b0a36..b232e6bc 100644 --- a/src/tmtccmd/pus/obj_id.py +++ b/src/tmtccmd/pus/obj_id.py @@ -1,4 +1,5 @@ from __future__ import annotations +from typing import Union import struct from tmtccmd.utility.logger import get_console_logger @@ -7,32 +8,40 @@ class ObjectId: def __init__(self, object_id: int): - self.object_id = object_id - self.id_as_bytes = bytearray() - self.set(object_id=object_id) - - def set(self, object_id: int): - self.object_id = object_id - self.id_as_bytes = struct.pack('!I', self.object_id) - - def set_from_bytes(self, obj_id_as_bytes: bytearray): - if len(obj_id_as_bytes) != 4: - LOGGER.warning(f'Invalid object ID length {len(obj_id_as_bytes)}') - raise ValueError - self.id_as_bytes = obj_id_as_bytes - self.object_id = struct.unpack('!I', self.id_as_bytes[:])[0] + self.id = object_id @classmethod def from_bytes(cls, obj_id_as_bytes: bytearray) -> ObjectId: obj_id = ObjectId(object_id=0) - obj_id.set_from_bytes(obj_id_as_bytes=obj_id_as_bytes) + obj_id.id=obj_id_as_bytes return obj_id - def get_id(self) -> int: - return self.object_id + @property + def id(self) -> int: + return self._object_id + + @id.setter + def id(self, new_id: Union[int, bytearray]): + if isinstance(new_id, int): + self._object_id = new_id + self._id_as_bytes = struct.pack('!I', self._object_id) + elif isinstance(new_id, bytearray): + if len(new_id) != 4: + LOGGER.warning(f'Invalid object ID length {len(new_id)}') + raise ValueError + self._id_as_bytes = new_id + self._object_id = struct.unpack('!I', self._id_as_bytes[:])[0] + else: + raise ValueError + + @property + def as_int(self) -> int: + return self._object_id + @property def as_bytes(self) -> bytes: - return self.id_as_bytes + return self._id_as_bytes + @property def as_string(self) -> str: - return f'0x{self.object_id:08x}' + return f'0x{self._object_id:08x}' diff --git a/src/tmtccmd/pus/service_17_test.py b/src/tmtccmd/pus/service_17_test.py index b9e94251..f0462c2a 100644 --- a/src/tmtccmd/pus/service_17_test.py +++ b/src/tmtccmd/pus/service_17_test.py @@ -1,7 +1,96 @@ +from __future__ import annotations import enum +import sys + +from spacepackets.ecss.tm import CdsShortTimestamp, PusVersion, PusTelemetry +from spacepackets.ecss.service_17_test import Service17TM +from spacepackets.ecss.definitions import PusServices +from spacepackets.ecss.conf import get_default_tc_apid + +from tmtccmd.config.definitions import QueueCommands +from tmtccmd.tc.definitions import PusTelecommand, TcQueueT +from tmtccmd.tm.base import PusTmInfoBase, PusTmBase class Srv17Subservices(enum.IntEnum): PING_CMD = 1, PING_REPLY = 2, GEN_EVENT = 128 + + +class Service17TMExtended(PusTmBase, PusTmInfoBase, Service17TM): + def __init__( + self, subservice: int, time: CdsShortTimestamp = None, ssc: int = 0, + source_data: bytearray = bytearray([]), apid: int = -1, packet_version: int = 0b000, + pus_version: PusVersion = PusVersion.GLOBAL_CONFIG, secondary_header_flag: bool = True, + space_time_ref: int = 0b0000, destination_id: int = 0 + ): + Service17TM.__init__( + self, + subservice=subservice, + time=time, + ssc=ssc, + source_data=source_data, + apid=apid, + packet_version=packet_version, + pus_version=pus_version, + secondary_header_flag=secondary_header_flag, + space_time_ref=space_time_ref, + destination_id=destination_id + ) + PusTmBase.__init__(self, pus_tm=self.pus_tm) + PusTmInfoBase.__init__(self, pus_tm=self.pus_tm) + if self.subservice == Srv17Subservices.PING_REPLY: + self.set_packet_info("Ping Reply") + + @classmethod + def __empty(cls) -> Service17TMExtended: + return cls( + subservice=0 + ) + + @classmethod + def unpack( + cls, raw_telemetry: bytearray, pus_version: PusVersion = PusVersion.GLOBAL_CONFIG + ) -> Service17TMExtended: + service_17_tm = cls.__empty() + service_17_tm.pus_tm = PusTelemetry.unpack( + raw_telemetry=raw_telemetry, pus_version=pus_version + ) + return service_17_tm + + +def pack_service_17_ping_command(ssc: int, apid: int = -1) -> PusTelecommand: + """Generate a simple ping PUS telecommand packet""" + if apid == -1: + apid = get_default_tc_apid() + return PusTelecommand(service=17, subservice=Srv17Subservices.PING_CMD, ssc=ssc, apid=apid) + + +def pack_generic_service17_test(init_ssc: int, tc_queue: TcQueueT, apid: int = -1) -> int: + if apid == -1: + apid = get_default_tc_apid() + new_ssc = init_ssc + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 17")) + # ping test + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 17: Ping Test")) + tc_queue.appendleft(pack_service_17_ping_command(ssc=new_ssc).pack_command_tuple()) + new_ssc += 1 + # enable event + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 17: Enable Event")) + command = PusTelecommand(service=5, subservice=5, ssc=new_ssc, apid=apid) + tc_queue.appendleft(command.pack_command_tuple()) + new_ssc += 1 + # test event + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 17: Trigger event")) + command = PusTelecommand( + service=17, subservice=Srv17Subservices.GEN_EVENT, ssc=new_ssc, apid=apid + ) + tc_queue.appendleft(command.pack_command_tuple()) + new_ssc += 1 + # invalid subservice + tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 17: Invalid subservice")) + command = PusTelecommand(service=17, subservice=243, ssc=new_ssc, apid=apid) + tc_queue.appendleft(command.pack_command_tuple()) + new_ssc += 1 + return new_ssc diff --git a/src/tmtccmd/tm/service_1_verification.py b/src/tmtccmd/pus/service_1_verification.py similarity index 50% rename from src/tmtccmd/tm/service_1_verification.py rename to src/tmtccmd/pus/service_1_verification.py index 0bace147..afcfa637 100644 --- a/src/tmtccmd/tm/service_1_verification.py +++ b/src/tmtccmd/pus/service_1_verification.py @@ -7,7 +7,8 @@ from typing import Deque from spacepackets.ccsds.time import CdsShortTimestamp -from spacepackets.ecss.tm import PusTelemetry, PusVersion +from spacepackets.ecss.tm import PusVersion, PusTelemetry +from spacepackets.ecss.service_1_verification import Service1TM from tmtccmd.tm.base import PusTmInfoBase, PusTmBase from tmtccmd.utility.logger import get_console_logger @@ -15,55 +16,43 @@ LOGGER = get_console_logger() -class Service1TM(PusTmBase, PusTmInfoBase): +class Service1TMExtended(PusTmBase, PusTmInfoBase, Service1TM): """Service 1 TM class representation. Can be used to deserialize raw service 1 packets. """ def __init__( - self, subservice_id: int, time: CdsShortTimestamp = None, + self, subservice: int, time: CdsShortTimestamp = None, tc_packet_id: int = 0, tc_psc: int = 0, ssc: int = 0, source_data: bytearray = bytearray([]), apid: int = -1, packet_version: int = 0b000, - pus_version: PusVersion = PusVersion.UNKNOWN, pus_tm_version: int = 0b0001, - ack: int = 0b1111, secondary_header_flag: bool = True, space_time_ref: int = 0b0000, + pus_version: PusVersion = PusVersion.GLOBAL_CONFIG, + secondary_header_flag: bool = True, space_time_ref: int = 0b0000, destination_id: int = 0 ): - pus_tm = PusTelemetry( - service_id=1, - subservice_id=subservice_id, + Service1TM.__init__( + self, + subservice=subservice, time=time, ssc=ssc, source_data=source_data, apid=apid, packet_version=packet_version, pus_version=pus_version, - pus_tm_version=pus_tm_version, - ack=ack, secondary_header_flag=secondary_header_flag, space_time_ref=space_time_ref, destination_id=destination_id ) - PusTmBase.__init__(self, pus_tm=pus_tm) - PusTmInfoBase.__init__(self, pus_tm=pus_tm) - self.has_tc_error_code = False - self.is_step_reply = False - # Failure Reports with error code - self.err_code = 0 - self.step_number = 0 - self.error_param1 = 0 - self.error_param2 = 0 - self.tc_packet_id = tc_packet_id - self.tc_psc = tc_psc - self.tc_ssc = tc_psc & 0x3fff + PusTmBase.__init__(self, pus_tm=self.pus_tm) + PusTmInfoBase.__init__(self, pus_tm=self.pus_tm) @classmethod - def __empty(cls): + def __empty(cls) -> Service1TMExtended: return cls( - subservice_id=0 + subservice=0 ) @classmethod def unpack( - cls, raw_telemetry: bytearray, pus_version: PusVersion = PusVersion.UNKNOWN - ) -> Service1TM: + cls, raw_telemetry: bytearray, pus_version: PusVersion = PusVersion.GLOBAL_CONFIG + ) -> Service1TMExtended: """Parse a service 1 telemetry packet :param raw_telemetry: @@ -75,17 +64,17 @@ def unpack( service_1_tm.pus_tm = PusTelemetry.unpack( raw_telemetry=raw_telemetry, pus_version=pus_version ) - tm_data = service_1_tm.get_tm_data() + tm_data = service_1_tm.tm_data if len(tm_data) < 4: LOGGER.warning("TM data less than 4 bytes!") raise ValueError service_1_tm.tc_packet_id = tm_data[0] << 8 | tm_data[1] service_1_tm.tc_psc = tm_data[2] << 8 | tm_data[3] service_1_tm.tc_ssc = service_1_tm.tc_psc & 0x3fff - if service_1_tm.get_subservice() % 2 == 0: - service_1_tm.__handle_failure_verification() + if service_1_tm.subservice % 2 == 0: + service_1_tm._handle_failure_verification() else: - service_1_tm.__handle_success_verification() + service_1_tm._handle_success_verification() return service_1_tm @abstractmethod @@ -116,70 +105,32 @@ def append_telemetry_column_headers(self, header_list: list): elif self.is_step_reply: header_list.append("Step Number") - def __handle_failure_verification(self): + def _handle_failure_verification(self): """Handle parsing a verification failure packet, subservice ID 2, 4, 6 or 8 """ - self.specify_packet_info("Failure Verficiation") - self.has_tc_error_code = True - tm_data = self.get_tm_data() - subservice = self.get_subservice() - expected_len = 14 + super()._handle_failure_verification() + self.set_packet_info("Failure Verficiation") + subservice = self.pus_tm.subservice if subservice == 2: self.append_packet_info(" : Acceptance failure") elif subservice == 4: self.append_packet_info(" : Start failure") elif subservice == 6: - self.is_step_reply = True - expected_len = 15 self.append_packet_info(" : Step Failure") elif subservice == 8: self.append_packet_info(" : Completion Failure") - else: - LOGGER.error("Service1TM: Invalid subservice") - if len(tm_data) < expected_len: - LOGGER.warning(f'PUS TM[1,{subservice}] source data smaller than expected 15 bytes') - raise ValueError - current_idx = 4 - if self.is_step_reply: - self.step_number = struct.unpack('>B', tm_data[current_idx: current_idx + 1])[0] - current_idx += 1 - self.err_code = struct.unpack('>H', tm_data[current_idx: current_idx + 2])[0] - current_idx += 2 - self.error_param1 = struct.unpack('>I', tm_data[current_idx: current_idx + 4])[0] - current_idx += 2 - self.error_param2 = struct.unpack('>I', tm_data[current_idx: current_idx + 4])[0] - - def __handle_success_verification(self): - self.pus_tm.specify_packet_info("Success Verification") - if self.get_subservice() == 1: - self.pus_tm.append_packet_info(" : Acceptance success") - elif self.get_subservice() == 3: - self.pus_tm.append_packet_info(" : Start success") - elif self.get_subservice() == 5: - self.is_step_reply = True - self.pus_tm.append_packet_info(" : Step Success") - self.step_number = struct.unpack('>B', self.get_tm_data()[4:5])[0] - elif self.get_subservice() == 7: - self.pus_tm.append_packet_info(" : Completion success") - else: - LOGGER.error("Service1TM: Invalid subservice") - - def get_tc_ssc(self): - return self.tc_ssc - def get_error_code(self): - if self.has_tc_error_code: - return self.err_code - else: - LOGGER.warning("Service1Tm: get_error_code: This is not a failure packet, returning 0") - return 0 - - def get_step_number(self): - if self.is_step_reply: - return self.step_number - else: - LOGGER.warning("Service1Tm: get_step_number: This is not a step reply, returning 0") - return 0 + def _handle_success_verification(self): + super()._handle_success_verification() + self.set_packet_info('Success Verification') + if self.subservice == 1: + self.append_packet_info(" : Acceptance success") + elif self.subservice == 3: + self.append_packet_info(" : Start success") + elif self.subservice == 5: + self.append_packet_info(" : Step Success") + elif self.subservice == 7: + self.append_packet_info(" : Completion success") PusVerifQueue = Deque[Service1TM] diff --git a/src/tmtccmd/pus/service_list.py b/src/tmtccmd/pus/service_list.py index 5f8158e3..e69de29b 100644 --- a/src/tmtccmd/pus/service_list.py +++ b/src/tmtccmd/pus/service_list.py @@ -1,15 +0,0 @@ -from enum import IntEnum - - -class PusServices(IntEnum): - SERVICE_1_VERIFICATION = 1 - SERVICE_2_RAW_CMD = 2 - SERVICE_3_HOUSEKEEPING = 3 - SERVICE_5_EVENT = 5 - SERVICE_6_MEMORY_MGMT = 6 - SERVICE_8_FUNC_CMD = 8 - SERVICE_9_TIME_MGMT = 9 - SERVICE_17_TEST = 17 - SERVICE_20_PARAMETER = 20 - SERVICE_23_FILE_MGMT = 23 - SERVICE_200_MODE = 200 diff --git a/src/tmtccmd/runner.py b/src/tmtccmd/runner.py index a937097e..a413d02c 100644 --- a/src/tmtccmd/runner.py +++ b/src/tmtccmd/runner.py @@ -7,6 +7,8 @@ import os from typing import Union +from spacepackets.ecss.conf import get_default_tc_apid + from tmtccmd import __version__ from tmtccmd.config.hook import TmTcHookBase from tmtccmd.core.backend import BackendBase @@ -247,7 +249,7 @@ def get_default_tmtc_backend(hook_obj: TmTcHookBase, tm_handler: TmHandler, json if tm_handler.get_type() == TmTypes.CCSDS_SPACE_PACKETS: tm_handler = cast(CcsdsTmHandler, tm_handler) tm_handler.initialize(tmtc_printer=tmtc_printer) - apid = get_global(CoreGlobalIds.APID) + apid = get_default_tc_apid() if json_cfg_path: pass diff --git a/src/tmtccmd/tc/packer.py b/src/tmtccmd/tc/packer.py index 27062a34..03f7995b 100644 --- a/src/tmtccmd/tc/packer.py +++ b/src/tmtccmd/tc/packer.py @@ -10,7 +10,7 @@ from tmtccmd.tc.definitions import TcQueueT from spacepackets.ecss.tc import PusTelecommand from tmtccmd.utility.logger import get_console_logger -from tmtccmd.tc.service_17_test import pack_service17_ping_command +from tmtccmd.pus.service_17_test import pack_service_17_ping_command from tmtccmd.tc.service_5_event import pack_generic_service5_test_into LOGGER = get_console_logger() @@ -39,15 +39,15 @@ def pack_service_queue_core(service: int, op_code: str, service_queue: TcQueueT) def default_single_packet_preparation() -> PusTelecommand: - return pack_service17_ping_command(ssc=1700) + return pack_service_17_ping_command(ssc=1700) def default_service_queue_preparation(service: Union[str, int], op_code: str, service_queue: TcQueueT): from tmtccmd.config.definitions import CoreServiceList, QueueCommands if service == CoreServiceList.SERVICE_5.value: pack_generic_service5_test_into(service_queue) - if service == CoreServiceList.SERVICE_17.value: + elif service == CoreServiceList.SERVICE_17.value: service_queue.appendleft((QueueCommands.PRINT, "Sending ping command PUS TC[17,1]")) - service_queue.appendleft(pack_service17_ping_command(ssc=1700).pack_command_tuple()) + service_queue.appendleft(pack_service_17_ping_command(ssc=1700).pack_command_tuple()) else: - LOGGER.warning("Invalid Service !") + LOGGER.warning("Invalid Service!") diff --git a/src/tmtccmd/tc/service_17_test.py b/src/tmtccmd/tc/service_17_test.py deleted file mode 100644 index 08307da2..00000000 --- a/src/tmtccmd/tc/service_17_test.py +++ /dev/null @@ -1,43 +0,0 @@ -""" -Contains definitions and functions related to PUS Service 17 Telecommands -""" -from tmtccmd.config.definitions import QueueCommands -from tmtccmd.tc.definitions import PusTelecommand, TcQueueT -from tmtccmd.config.globals import get_global_apid -from tmtccmd.pus.service_17_test import Srv17Subservices - - -def pack_service17_ping_command(ssc: int, apid: int = -1) -> PusTelecommand: - """Generate a simple ping PUS telecommand packet""" - if apid == -1: - apid = get_global_apid() - return PusTelecommand(service=17, subservice=Srv17Subservices.PING_CMD, ssc=ssc, apid=apid) - - -def pack_generic_service17_test(init_ssc: int, tc_queue: TcQueueT, apid: int = -1) -> int: - if apid == -1: - apid = get_global_apid() - new_ssc = init_ssc - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 17")) - # ping test - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 17: Ping Test")) - tc_queue.appendleft(pack_service17_ping_command(ssc=new_ssc).pack_command_tuple()) - new_ssc += 1 - # enable event - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 17: Enable Event")) - command = PusTelecommand(service=5, subservice=5, ssc=new_ssc, apid=apid) - tc_queue.appendleft(command.pack_command_tuple()) - new_ssc += 1 - # test event - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 17: Trigger event")) - command = PusTelecommand( - service=17, subservice=Srv17Subservices.GEN_EVENT, ssc=new_ssc, apid=apid - ) - tc_queue.appendleft(command.pack_command_tuple()) - new_ssc += 1 - # invalid subservice - tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 17: Invalid subservice")) - command = PusTelecommand(service=17, subservice=243, ssc=new_ssc, apid=apid) - tc_queue.appendleft(command.pack_command_tuple()) - new_ssc += 1 - return new_ssc diff --git a/src/tmtccmd/tc/service_20_parameter.py b/src/tmtccmd/tc/service_20_parameter.py index a6ab75a0..235fd86f 100644 --- a/src/tmtccmd/tc/service_20_parameter.py +++ b/src/tmtccmd/tc/service_20_parameter.py @@ -2,10 +2,11 @@ """ from typing import Union +from spacepackets.ecss.conf import get_default_tc_apid + from spacepackets.ecss.tc import PusTelecommand from tmtccmd.pus.service_20_parameter import EcssPtc, EcssPfcUnsigned from tmtccmd.utility.logger import get_console_logger -from tmtccmd.config.globals import get_global_apid logger = get_console_logger() @@ -24,7 +25,7 @@ def pack_boolean_parameter_command( @return: """ if apid == -1: - apid = get_global_apid() + apid = get_default_tc_apid() parameter_id = bytearray(4) parameter_id[0] = domain_id diff --git a/src/tmtccmd/tc/service_5_event.py b/src/tmtccmd/tc/service_5_event.py index 1b30d9b4..e5d74e2b 100644 --- a/src/tmtccmd/tc/service_5_event.py +++ b/src/tmtccmd/tc/service_5_event.py @@ -1,6 +1,7 @@ """Contains definitions and functions related to PUS Service 5 Telecommands. """ -from tmtccmd.config.globals import get_global_apid +from spacepackets.ecss.conf import get_default_tc_apid + from tmtccmd.config.definitions import QueueCommands from tmtccmd.tc.definitions import PusTelecommand, TcQueueT from tmtccmd.pus.service_5_event import Srv5Subservices @@ -8,7 +9,7 @@ def pack_enable_event_reporting_command(ssc: int, apid: int = -1): if apid == -1: - apid = get_global_apid() + apid = get_default_tc_apid() return PusTelecommand( service=5, subservice=Srv5Subservices.ENABLE_EVENT_REPORTING, ssc=ssc, apid=apid ) @@ -16,7 +17,7 @@ def pack_enable_event_reporting_command(ssc: int, apid: int = -1): def pack_disable_event_reporting_command(ssc: int, apid: int = -1): if apid == -1: - apid = get_global_apid() + apid = get_default_tc_apid() return PusTelecommand( service=5, subservice=Srv5Subservices.DISABLE_EVENT_REPORTING, ssc=ssc, apid=apid ) @@ -24,7 +25,7 @@ def pack_disable_event_reporting_command(ssc: int, apid: int = -1): def pack_generic_service5_test_into(tc_queue: TcQueueT, apid: int = -1): if apid == -1: - apid = get_global_apid() + apid = get_default_tc_apid() tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 5")) # invalid subservice tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 5: Invalid subservice")) diff --git a/src/tmtccmd/tc/service_8_functional_cmd.py b/src/tmtccmd/tc/service_8_functional_cmd.py index fc9fb323..a6a6bae1 100644 --- a/src/tmtccmd/tc/service_8_functional_cmd.py +++ b/src/tmtccmd/tc/service_8_functional_cmd.py @@ -2,7 +2,7 @@ import struct from tmtccmd.tc.definitions import PusTelecommand -from tmtccmd.config.globals import get_global_apid +from spacepackets.ecss.conf import get_default_tc_apid class Srv8Subservices(enum.IntEnum): @@ -15,7 +15,7 @@ def generate_action_command( ssc: int = 0, apid: int = -1 ) -> PusTelecommand: if apid == -1: - apid = get_global_apid() + apid = get_default_tc_apid() data_to_pack = bytearray(object_id) data_to_pack += make_action_id(action_id) + app_data return PusTelecommand( diff --git a/src/tmtccmd/tm/__init__.py b/src/tmtccmd/tm/__init__.py index 494799c5..a023724d 100644 --- a/src/tmtccmd/tm/__init__.py +++ b/src/tmtccmd/tm/__init__.py @@ -1,9 +1,5 @@ -from tmtccmd.tm.definitions import TelemetryListT, TelemetryQueueT +from tmtccmd.tm.service_3_housekeeping import Service3TM -from tmtccmd.tm.service_1_verification import Service1TM +from tmtccmd.tm.definitions import TelemetryListT, TelemetryQueueT from tmtccmd.tm.service_5_event import Service5TM -from tmtccmd.tm.service_3_base import Service3Base -from tmtccmd.tm.service_3_housekeeping import Service3TM -from tmtccmd.tm.service_17_test import Service17TM from tmtccmd.tm.service_8_functional_cmd import Service8TM -from tmtccmd.tm.service_23_file_mgmt import Service23TM diff --git a/src/tmtccmd/tm/base.py b/src/tmtccmd/tm/base.py index 310bd874..1de22cac 100644 --- a/src/tmtccmd/tm/base.py +++ b/src/tmtccmd/tm/base.py @@ -1,38 +1,46 @@ -from abc import abstractmethod +from abc import abstractmethod, ABC from typing import Optional -from spacepackets.ecss.tm import PusTelemetry +from spacepackets.ecss.tm import PusTelemetry, PusVersion +from spacepackets.ccsds.time import CdsShortTimestamp +from spacepackets.util import PrintFormats class PusTmInterface: @abstractmethod def pack(self) -> bytearray: - return bytearray() + raise NotImplementedError + @property @abstractmethod - def get_tm_data(self) -> bytearray: - return bytearray() + def tm_data(self) -> bytearray: + raise NotImplementedError + @property @abstractmethod - def is_valid(self) -> bool: - return False + def valid(self) -> bool: + raise NotImplementedError + @property @abstractmethod - def get_ssc(self) -> int: - return 0 + def ssc(self) -> int: + raise NotImplementedError + @property @abstractmethod - def get_apid(self) -> int: - return 0 + def apid(self) -> int: + raise NotImplementedError + @property @abstractmethod - def get_service(self) -> int: - return -1 + def service(self) -> int: + raise NotImplementedError + @property @abstractmethod - def get_subservice(self) -> int: - return -1 + def subservice(self) -> int: + raise NotImplementedError class PusTmInfoInterface: @@ -45,11 +53,11 @@ def get_custom_printout(self) -> str: return '' @abstractmethod - def return_source_data_string(self) -> str: + def get_source_data_string(self) -> str: return '' @abstractmethod - def specify_packet_info(self, print_info: str): + def set_packet_info(self, print_info: str): pass @abstractmethod @@ -72,46 +80,89 @@ def __init__(self, pus_tm: PusTelemetry): def pack(self) -> bytearray: return self.pus_tm.pack() - def get_tm_data(self) -> bytearray: - return self.pus_tm.get_tm_data() + @property + def tm_data(self) -> bytearray: + return self.pus_tm.tm_data - def get_ssc(self) -> int: - return self.pus_tm.get_ssc() + @property + def ssc(self) -> int: + return self.pus_tm.ssc - def is_valid(self): - return self.pus_tm.is_valid() + @property + def valid(self): + return self.pus_tm.valid - def get_apid(self) -> int: - return self.pus_tm.get_apid() + @property + def apid(self) -> int: + return self.pus_tm.apid - def get_service(self) -> int: - return self.pus_tm.get_service() + @property + def service(self) -> int: + return self.pus_tm.service - def get_subservice(self) -> int: - return self.pus_tm.get_subservice() + @property + def subservice(self) -> int: + return self.pus_tm.subservice class PusTmInfoBase(PusTmInfoInterface): def __init__(self, pus_tm: PusTelemetry): self.pus_tm = pus_tm + self._custom_printout = '' + self._print_info = '' def get_print_info(self) -> str: - return self.pus_tm.print_info + return self._print_info def get_custom_printout(self) -> str: - return self.pus_tm.get_custom_printout() + return self._custom_printout - def return_source_data_string(self) -> str: - return self.pus_tm.return_source_data_string() + def set_custom_printout(self, custom_string: str): + self._custom_printout = custom_string - def specify_packet_info(self, print_info: str): - self.pus_tm.print_info = print_info + def get_source_data_string(self, print_format: PrintFormats = PrintFormats.HEX) -> str: + return self.pus_tm.get_source_data_string(print_format=print_format) - def append_packet_info(self, info: str): - self.pus_tm.print_info += info + def set_packet_info(self, print_info: str): + self._print_info = print_info - def append_telemetry_column_headers(self, header_list: list): - self.pus_tm.append_telemetry_column_headers(header_list=header_list) + def append_packet_info(self, info: str): + self._print_info += info def append_telemetry_content(self, content_list: list): - self.pus_tm.append_telemetry_content(content_list=content_list) + """Default implementation adds the PUS header content to the list which can then be + printed with a simple print() command. To add additional content, override this method. + Any child class should call this function as well if header information is required. + + :param content_list: Header content will be appended to this list + :return: + """ + content_list.append(f'{self.pus_tm.service}') + content_list.append(f'{self.pus_tm.subservice}') + content_list.append(f'{self.pus_tm.secondary_packet_header.message_counter}') + content_list.append(f'{self.pus_tm.secondary_packet_header.time.return_unix_seconds()}') + content_list.append(f'{self.pus_tm.secondary_packet_header.time.return_time_string()}') + content_list.append(f'0x{self.pus_tm.space_packet_header.apid:02x}') + content_list.append(f'{self.pus_tm.space_packet_header.ssc}') + if self.pus_tm.valid: + content_list.append("Yes") + else: + content_list.append("No") + + def append_telemetry_column_headers(self, header_list: list): + """Default implementation adds the PUS header content header (confusing, I know) + to the list which can then be printed with a simple print() command. + To add additional headers, override this method. Any child class should + call this function as well if header information is required. + + :param header_list: Header content will be appended to this list + :return: + """ + header_list.append("Service") + header_list.append("Subservice") + header_list.append("MSG Counter") + header_list.append("Time (Unix Seconds)") + header_list.append("Time") + header_list.append("APID") + header_list.append("SSC") + header_list.append("Packet valid") diff --git a/src/tmtccmd/tm/handler.py b/src/tmtccmd/tm/handler.py index b4a058ad..443a50e8 100644 --- a/src/tmtccmd/tm/handler.py +++ b/src/tmtccmd/tm/handler.py @@ -1,8 +1,8 @@ from tmtccmd.tm.definitions import TmTypes from spacepackets.ecss.tm import PusTelemetry from tmtccmd.tm.service_5_event import Service5TM -from tmtccmd.tm.service_1_verification import Service1TM -from tmtccmd.tm.service_17_test import Service17TM +from tmtccmd.pus.service_1_verification import Service1TM +from tmtccmd.pus.service_17_test import Service17TMExtended from tmtccmd.utility.logger import get_console_logger from tmtccmd.utility.tmtc_printer import TmTcPrinter @@ -26,11 +26,11 @@ def default_factory_hook(raw_tm_packet: bytearray, tmtc_printer: TmTcPrinter): service_type = raw_tm_packet[7] tm_packet = None if service_type == 1: - tm_packet = Service1TM.unpack(raw_telemetry=raw_tm_packet) + tm_packet = Service17TMExtended.unpack(raw_telemetry=raw_tm_packet) if service_type == 5: tm_packet = Service5TM.unpack(raw_telemetry=raw_tm_packet) if service_type == 17: - tm_packet = Service17TM.unpack(raw_telemetry=raw_tm_packet) + tm_packet = Service17TMExtended.unpack(raw_telemetry=raw_tm_packet) if tm_packet is None: LOGGER.info(f'The service {service_type} is not implemented in Telemetry Factory') tm_packet = PusTelemetry.unpack(raw_telemetry=raw_tm_packet) diff --git a/src/tmtccmd/tm/service_17_test.py b/src/tmtccmd/tm/service_17_test.py deleted file mode 100644 index 24aacd13..00000000 --- a/src/tmtccmd/tm/service_17_test.py +++ /dev/null @@ -1,48 +0,0 @@ -from __future__ import annotations -from tmtccmd.pus.service_list import PusServices -from spacepackets.ecss.tm import CdsShortTimestamp, PusVersion, PusTelemetry -from tmtccmd.tm.base import PusTmInfoBase, PusTmBase - - -class Service17TM(PusTmBase, PusTmInfoBase): - def __init__( - self, subservice_id: int, time: CdsShortTimestamp = None, ssc: int = 0, - source_data: bytearray = bytearray([]), apid: int = -1, packet_version: int = 0b000, - pus_version: PusVersion = PusVersion.UNKNOWN, pus_tm_version: int = 0b0001, - ack: int = 0b1111, secondary_header_flag: bool = True, space_time_ref: int = 0b0000, - destination_id: int = 0 - ): - pus_tm = PusTelemetry( - service_id=PusServices.SERVICE_17_TEST, - subservice_id=subservice_id, - time=time, - ssc=ssc, - source_data=source_data, - apid=apid, - packet_version=packet_version, - pus_version=pus_version, - pus_tm_version=pus_tm_version, - ack=ack, - secondary_header_flag=secondary_header_flag, - space_time_ref=space_time_ref, - destination_id=destination_id - ) - PusTmBase.__init__(self, pus_tm=pus_tm) - PusTmInfoBase.__init__(self, pus_tm=pus_tm) - self.pus_tm.specify_packet_info("Ping Reply") - - @classmethod - def __empty(cls) -> Service17TM: - return cls( - subservice_id=0 - ) - - @classmethod - def unpack( - cls, raw_telemetry: bytearray, pus_version: PusVersion = PusVersion.UNKNOWN - ) -> Service17TM: - service_17_tm = cls.__empty() - service_17_tm.pus_tm = PusTelemetry.unpack( - raw_telemetry=raw_telemetry, pus_version=pus_version - ) - return service_17_tm diff --git a/src/tmtccmd/tm/service_200_mode.py b/src/tmtccmd/tm/service_200_mode.py index 86b5e075..6e63a3b0 100644 --- a/src/tmtccmd/tm/service_200_mode.py +++ b/src/tmtccmd/tm/service_200_mode.py @@ -2,9 +2,9 @@ """ from __future__ import annotations import struct - -from tmtccmd.pus.service_list import PusServices from spacepackets.ecss.tm import CdsShortTimestamp, PusVersion, PusTelemetry + +from tmtccmd.pus.definitions import CustomPusServices from tmtccmd.tm.base import PusTmInfoBase, PusTmBase @@ -26,8 +26,8 @@ def __init__( source_data.extend(struct.pack('!I', mode)) source_data.append(submode) pus_tm = PusTelemetry( - service_id=PusServices.SERVICE_200_MODE, - subservice_id=subservice_id, + service_=CustomPusServices.SERVICE_200_MODE, + subservice=subservice_id, time=time, ssc=ssc, source_data=source_data, @@ -77,7 +77,7 @@ def __empty(cls) -> Service200TM: @classmethod def unpack( - cls, raw_telemetry: bytearray, pus_version: PusVersion = PusVersion.UNKNOWN + cls, raw_telemetry: bytearray, pus_version: PusVersion = PusVersion.GLOBAL_CONFIG ) -> Service200TM: service_200_tm = cls.__empty() service_200_tm.pus_tm = PusTelemetry.unpack( diff --git a/src/tmtccmd/tm/service_20_parameters.py b/src/tmtccmd/tm/service_20_parameters.py index b26b6d06..30eb376b 100644 --- a/src/tmtccmd/tm/service_20_parameters.py +++ b/src/tmtccmd/tm/service_20_parameters.py @@ -2,9 +2,10 @@ import os import struct -from tmtccmd.pus.service_list import PusServices -from tmtccmd.pus.obj_id import ObjectId from spacepackets.ecss.tm import CdsShortTimestamp, PusVersion, PusTelemetry +from spacepackets.ecss.definitions import PusServices + +from tmtccmd.pus.obj_id import ObjectId from tmtccmd.tm.base import PusTmInfoBase, PusTmBase from tmtccmd.utility.logger import get_console_logger @@ -118,7 +119,7 @@ def __empty(cls) -> Service20TM: @classmethod def unpack( - cls, raw_telemetry: bytearray, pus_version: PusVersion = PusVersion.UNKNOWN + cls, raw_telemetry: bytearray, pus_version: PusVersion = PusVersion.GLOBAL_CONFIG ) -> Service20TM: service_20_tm = cls.__empty() service_20_tm.pus_tm = PusTelemetry.unpack( diff --git a/src/tmtccmd/tm/service_23_file_mgmt.py b/src/tmtccmd/tm/service_23_file_mgmt.py index e055fb61..51d34b4f 100644 --- a/src/tmtccmd/tm/service_23_file_mgmt.py +++ b/src/tmtccmd/tm/service_23_file_mgmt.py @@ -1,8 +1,9 @@ from __future__ import annotations import struct -from tmtccmd.pus.service_list import PusServices +from spacepackets.ecss.definitions import PusServices from spacepackets.ecss.tm import CdsShortTimestamp, PusVersion, PusTelemetry + from tmtccmd.tm.base import PusTmInfoBase, PusTmBase from tmtccmd.utility.logger import get_console_logger @@ -28,7 +29,7 @@ def __init__( object_id: bytearray, repo_path: str, file_name: str, time: CdsShortTimestamp = None, ssc: int = 0, source_data: bytearray = bytearray([]), apid: int = -1, packet_version: int = 0b000, - pus_version: PusVersion = PusVersion.UNKNOWN, pus_tm_version: int = 0b0001, + pus_version: PusVersion = PusVersion.GLOBAL_CONFIG, pus_tm_version: int = 0b0001, ack: int = 0b1111, secondary_header_flag: bool = True, space_time_ref: int = 0b0000, destination_id: int = 0 ): @@ -79,7 +80,7 @@ def __empty(cls) -> Service23TM: @classmethod def unpack( - cls, raw_telemetry: bytearray, pus_version: PusVersion = PusVersion.UNKNOWN + cls, raw_telemetry: bytearray, pus_version: PusVersion = PusVersion.GLOBAL_CONFIG ) -> Service23TM: service_23_tm = cls.__empty() service_23_tm.pus_tm = PusTelemetry.unpack( diff --git a/src/tmtccmd/tm/service_2_raw_cmd.py b/src/tmtccmd/tm/service_2_raw_cmd.py index b2a6595e..1f1d1090 100644 --- a/src/tmtccmd/tm/service_2_raw_cmd.py +++ b/src/tmtccmd/tm/service_2_raw_cmd.py @@ -2,6 +2,7 @@ """ from __future__ import annotations from spacepackets.ecss.tm import CdsShortTimestamp, PusVersion, PusTelemetry + from tmtccmd.tm.base import PusTmInfoBase, PusTmBase @@ -40,7 +41,7 @@ def __empty(cls) -> Service2TM: @classmethod def unpack( - cls, raw_telemetry: bytearray, pus_version: PusVersion = PusVersion.UNKNOWN + cls, raw_telemetry: bytearray, pus_version: PusVersion = PusVersion.GLOBAL_CONFIG ) -> Service2TM: service_2_tm = cls.__empty() service_2_tm.pus_tm = PusTelemetry.unpack( diff --git a/src/tmtccmd/tm/service_3_base.py b/src/tmtccmd/tm/service_3_base.py index 361590f2..434fc167 100644 --- a/src/tmtccmd/tm/service_3_base.py +++ b/src/tmtccmd/tm/service_3_base.py @@ -23,19 +23,24 @@ def __init__(self, object_id: int, custom_hk_handling: bool = False): self._param_length = 0 self._custom_hk_handling = custom_hk_handling - def get_object_id(self) -> ObjectId: + @property + def object_id(self) -> ObjectId: return self._object_id - def get_set_id(self) -> int: + @property + def set_id(self) -> int: return self._set_id + @property def has_custom_hk_handling(self) -> bool: return self._custom_hk_handling - def set_custom_hk_handling(self, custom_hk_handling: bool): - self._custom_hk_handling = custom_hk_handling + @has_custom_hk_handling.setter + def has_custom_hk_handling(self, has_custom_hk_handling: bool): + self._custom_hk_handling = has_custom_hk_handling - def get_hk_definitions_list(self) -> Tuple[List, List]: + @property + def hk_definitions_list(self) -> Tuple[List, List]: """Can be implemented by a child class to print definitions lists. The first list should contain a header with parameter names, and the second list shall contain the corresponding set IDs""" diff --git a/src/tmtccmd/tm/service_3_housekeeping.py b/src/tmtccmd/tm/service_3_housekeeping.py index 87189455..ad626c54 100644 --- a/src/tmtccmd/tm/service_3_housekeeping.py +++ b/src/tmtccmd/tm/service_3_housekeeping.py @@ -7,7 +7,7 @@ from spacepackets.ecss.tm import CdsShortTimestamp, PusVersion, PusTelemetry from tmtccmd.tm.base import PusTmInfoBase, PusTmBase -from tmtccmd.tm import Service3Base +from tmtccmd.tm.service_3_base import Service3Base from tmtccmd.utility.logger import get_console_logger from typing import Tuple, List @@ -32,8 +32,8 @@ def __init__( custom_hk_handling: bool = False, ssc: int = 0, apid: int = -1, minimum_reply_size: int = DEFAULT_MINIMAL_PACKET_SIZE, minimum_structure_report_header_size: int = STRUCTURE_REPORT_FIXED_HEADER_SIZE, - packet_version: int = 0b000, pus_version: PusVersion = PusVersion.UNKNOWN, - pus_tm_version: int = 0b0001, ack: int = 0b1111, secondary_header_flag: bool = True, + packet_version: int = 0b000, pus_version: PusVersion = PusVersion.GLOBAL_CONFIG, + pus_tm_version: int = 0b0001, secondary_header_flag: bool = True, space_time_ref: int = 0b0000, destination_id: int = 0 ): """Service 3 packet class representation which can be built from a raw bytearray @@ -47,21 +47,19 @@ def __init__( """ Service3Base.__init__(self, object_id=0, custom_hk_handling=custom_hk_handling) source_data = bytearray() - source_data.extend(struct.pack('!I', self.get_object_id().get_id())) - source_data.extend(struct.pack('!I', self._set_id)) + source_data.extend(struct.pack('!I', self.object_id.id)) + source_data.extend(struct.pack('!I', self.set_id)) if subservice_id == 25 or subservice_id == 26: source_data.extend(hk_data) pus_tm = PusTelemetry( - service_id=3, - subservice_id=subservice_id, + service=3, + subservice=subservice_id, time=time, ssc=ssc, source_data=source_data, apid=apid, packet_version=packet_version, pus_version=pus_version, - pus_tm_version=pus_tm_version, - ack=ack, secondary_header_flag=secondary_header_flag, space_time_ref=space_time_ref, destination_id=destination_id @@ -82,10 +80,10 @@ def __init_without_base( minimum_reply_size: int = DEFAULT_MINIMAL_PACKET_SIZE, minimum_structure_report_header_size: int = STRUCTURE_REPORT_FIXED_HEADER_SIZE ): - instance.set_custom_hk_handling(custom_hk_handling=custom_hk_handling) - if instance.has_custom_hk_handling(): + instance.custom_hk_handling=custom_hk_handling + if instance.custom_hk_handling: return - tm_data = instance.get_tm_data() + tm_data = instance.tm_data if len(tm_data) < 8: LOGGER.warning( f'Invalid Service 3 packet, is too short. Detected TM data length: {len(tm_data)}' @@ -93,12 +91,12 @@ def __init_without_base( raise ValueError instance.min_hk_reply_size = minimum_reply_size instance.hk_structure_report_header_size = minimum_structure_report_header_size - instance.get_object_id().set_from_bytes(obj_id_as_bytes=tm_data[0:4]) - instance._set_id = struct.unpack('!I', tm_data[4:8])[0] - if instance.get_subservice() == 25 or instance.get_subservice() == 26: + instance.object_id.id = tm_data[0:4] + instanceset_id = struct.unpack('!I', tm_data[4:8])[0] + if instance.subservice == 25 or instance.subservice == 26: if len(tm_data) > 8: instance._param_length = len(tm_data[8:]) - instance.specify_packet_info("Housekeeping Packet") + instance.set_packet_info("Housekeeping Packet") @classmethod def __empty(cls) -> Service3TM: @@ -111,7 +109,7 @@ def __empty(cls) -> Service3TM: @classmethod def unpack( cls, raw_telemetry: bytearray, custom_hk_handling: bool, - pus_version: PusVersion = PusVersion.UNKNOWN, + pus_version: PusVersion = PusVersion.GLOBAL_CONFIG, ) -> Service3TM: service_3_tm = cls.__empty() service_3_tm.pus_tm = PusTelemetry.unpack( @@ -126,8 +124,8 @@ def unpack( @abstractmethod def append_telemetry_content(self, content_list: list): super().append_telemetry_content(content_list=content_list) - content_list.append(self.get_object_id().as_string()) - content_list.append(hex(self._set_id)) + content_list.append(self.object_id.as_string) + content_list.append(hex(self.set_id)) content_list.append(int(self._param_length)) @abstractmethod @@ -138,11 +136,11 @@ def append_telemetry_column_headers(self, header_list: list): header_list.append("HK data size") def get_hk_definitions_list(self) -> Tuple[List, List]: - tm_data = self.get_tm_data() + tm_data = self.tm_data if len(tm_data) < self.hk_structure_report_header_size: LOGGER.warning( f'Service3TM: handle_filling_definition_arrays: Invalid structure report ' - f'from {self.get_object_id().as_string()}, is shorter ' + f'from {self.object_id.as_string}, is shorter ' f'than {self.hk_structure_report_header_size}' ) return [], [] @@ -157,7 +155,7 @@ def get_hk_definitions_list(self) -> Tuple[List, List]: if len(tm_data) < self.hk_structure_report_header_size + num_params * 4: LOGGER.warning( f'Service3TM: handle_filling_definition_arrays: Invalid structure report ' - f'from {self.get_object_id().as_string()}, is shorter than ' + f'from {self.object_id.as_string}, is shorter than ' f'{self.hk_structure_report_header_size + num_params * 4}' ) return [], [] @@ -181,7 +179,7 @@ def get_hk_definitions_list(self) -> Tuple[List, List]: else: valid_string = "No" definitions_content = [ - self.get_object_id().as_string(), self._set_id, status_string, valid_string, + self.object_id.as_string, self._set_id, status_string, valid_string, collection_interval_seconds, num_params ] definitions_content.extend(parameters) diff --git a/src/tmtccmd/tm/service_5_event.py b/src/tmtccmd/tm/service_5_event.py index d30287f6..49d841cd 100644 --- a/src/tmtccmd/tm/service_5_event.py +++ b/src/tmtccmd/tm/service_5_event.py @@ -5,9 +5,9 @@ from abc import abstractmethod import struct -from tmtccmd.pus.service_list import PusServices -from spacepackets.ecss.tm import CdsShortTimestamp, PusVersion, PusTelemetry -from tmtccmd.tm.base import PusTmInfoBase, PusTmBase +from spacepackets.ecss.definitions import PusServices +from spacepackets.ecss.tm import CdsShortTimestamp, PusVersion +from tmtccmd.tm.base import PusTmInfoBase, PusTmBase, PusTelemetry from tmtccmd.pus.service_5_event import Srv5Subservices, Srv5Severity from tmtccmd.pus.obj_id import ObjectId from tmtccmd.utility.logger import get_console_logger @@ -18,12 +18,12 @@ class Service5TM(PusTmBase, PusTmInfoBase): def __init__( - self, subservice_id: Srv5Subservices, event_id: int, object_id: bytearray, + self, subservice: Srv5Subservices, event_id: int, object_id: bytearray, param_1: int, param_2: int, time: CdsShortTimestamp = None, ssc: int = 0, apid: int = -1, packet_version: int = 0b000, - pus_version: PusVersion = PusVersion.UNKNOWN, - pus_tm_version: int = 0b0001, ack: int = 0b1111, secondary_header_flag: bool = True, - space_time_ref: int = 0b0000, destination_id: int = 0 + pus_version: PusVersion = PusVersion.GLOBAL_CONFIG, + secondary_header_flag: bool = True, space_time_ref: int = 0b0000, + destination_id: int = 0 ): """Create a Service 5 telemetry instance. Use the unpack function to create an instance from a raw bytestream instead. @@ -45,16 +45,14 @@ def __init__( source_data.extend(struct.pack('!I', self._param_1)) source_data.extend(struct.pack('!I', self._param_2)) pus_tm = PusTelemetry( - service_id=PusServices.SERVICE_5_EVENT, - subservice_id=subservice_id, + service=PusServices.SERVICE_5_EVENT, + subservice=subservice, time=time, ssc=ssc, source_data=source_data, apid=apid, packet_version=packet_version, pus_version=pus_version, - pus_tm_version=pus_tm_version, - ack=ack, secondary_header_flag=secondary_header_flag, space_time_ref=space_time_ref, destination_id=destination_id @@ -66,7 +64,7 @@ def __init__( @classmethod def __empty(cls) -> Service5TM: return cls( - subservice_id=Srv5Subservices.INFO_EVENT, + subservice=Srv5Subservices.INFO_EVENT, event_id=0, object_id=bytearray(4), param_1=0, @@ -75,7 +73,7 @@ def __empty(cls) -> Service5TM: @classmethod def unpack( - cls, raw_telemetry: bytearray, pus_version: PusVersion = PusVersion.UNKNOWN + cls, raw_telemetry: bytearray, pus_version: PusVersion = PusVersion.GLOBAL_CONFIG ) -> Service5TM: service_5_tm = cls.__empty() service_5_tm.pus_tm = PusTelemetry.unpack( @@ -88,7 +86,7 @@ def unpack( def append_telemetry_content(self, content_list: list): super().append_telemetry_content(content_list=content_list) content_list.append(str(self._event_id)) - content_list.append(self._object_id.as_string()) + content_list.append(self._object_id.as_string) content_list.append(str(hex(self._param_1)) + ", " + str(self._param_1)) content_list.append(str(hex(self._param_2)) + ", " + str(self._param_2)) @@ -100,35 +98,40 @@ def append_telemetry_column_headers(self, header_list: list): header_list.append("Parameter 1") header_list.append("Parameter 2") - def get_reporter_id_as_bytes(self) -> bytes: - return self._object_id.as_bytes() + @property + def reporter_id(self) -> int: + return self._object_id.id - def get_reporter_id(self) -> int: - return self._object_id.get_id() + @property + def reporter_id_as_bytes(self) -> bytes: + return self._object_id.as_bytes - def get_event_id(self): + @property + def event_id(self): return self._event_id - def get_param_1(self): + @property + def param_1(self): return self._param_1 - def get_param_2(self): + @property + def param_2(self): return self._param_2 @staticmethod def __init_without_base(instance: Service5TM, set_attrs_from_tm_data: bool = False): - if instance.get_service() != 5: + if instance.service != 5: LOGGER.warning("This packet is not an event service packet!") - instance.specify_packet_info("Event") - if instance.get_subservice() == Srv5Subservices.INFO_EVENT: + instance.set_packet_info("Event") + if instance.subservice == Srv5Subservices.INFO_EVENT: instance.append_packet_info(" Info") - elif instance.get_subservice() == Srv5Subservices.LOW_SEVERITY_EVENT: + elif instance.subservice == Srv5Subservices.LOW_SEVERITY_EVENT: instance.append_packet_info(" Error Low Severity") - elif instance.get_subservice() == Srv5Subservices.MEDIUM_SEVERITY_EVENT: + elif instance.subservice == Srv5Subservices.MEDIUM_SEVERITY_EVENT: instance.append_packet_info(" Error Med Severity") - elif instance.get_subservice() == Srv5Subservices.HIGH_SEVERITY_EVENT: + elif instance.subservice == Srv5Subservices.HIGH_SEVERITY_EVENT: instance.append_packet_info(" Error High Severity") - tm_data = instance.get_tm_data() + tm_data = instance.tm_data if len(tm_data) < 14: LOGGER.warning(f'Length of TM data field {len(tm_data)} shorter than expected 14 bytes') raise ValueError diff --git a/src/tmtccmd/tm/service_8_functional_cmd.py b/src/tmtccmd/tm/service_8_functional_cmd.py index ee2f54e1..7e21e3f8 100644 --- a/src/tmtccmd/tm/service_8_functional_cmd.py +++ b/src/tmtccmd/tm/service_8_functional_cmd.py @@ -72,7 +72,7 @@ def __empty(cls) -> Service8TM: ) @classmethod - def unpack(cls, raw_telemetry: bytearray, pus_version: PusVersion = PusVersion.UNKNOWN): + def unpack(cls, raw_telemetry: bytearray, pus_version: PusVersion = PusVersion.GLOBAL_CONFIG): service_8_tm = cls.__empty() service_8_tm.pus_tm = PusTelemetry.unpack( raw_telemetry=raw_telemetry, pus_version=pus_version @@ -90,14 +90,18 @@ def append_telemetry_column_headers(self, header_list: list): header_list.append("Object ID") header_list.append("Action ID") - def get_source_object_id_as_bytes(self) -> bytes: - return self._object_id.as_bytes() + @property + def source_object_id_as_bytes(self) -> bytes: + return self._object_id.as_bytes - def get_source_object_id(self) -> int: - return self._object_id.get_id() + @property + def source_object_id(self) -> int: + return self._object_id.as_int - def get_action_id(self) -> int: + @property + def action_id(self) -> int: return self._action_id - def get_custom_data(self) -> bytearray: + @property + def custom_data(self) -> bytearray: return self._custom_data diff --git a/src/tmtccmd/utility/tmtc_printer.py b/src/tmtccmd/utility/tmtc_printer.py index 62a24cb0..bea71d10 100644 --- a/src/tmtccmd/utility/tmtc_printer.py +++ b/src/tmtccmd/utility/tmtc_printer.py @@ -5,8 +5,11 @@ from typing import cast from spacepackets.ecss.tc import PusTelecommand -from tmtccmd.tm import Service8TM, Service5TM -from tmtccmd.pus.service_list import PusServices +from spacepackets.util import get_printable_data_string, PrintFormats + +from tmtccmd.tm.service_8_functional_cmd import Service8TM +from tmtccmd.tm.service_5_event import Service5TM +from spacepackets.ecss.definitions import PusServices from tmtccmd.tm.base import PusTmInfoInterface, PusTmInterface from tmtccmd.pus.service_8_func_cmd import Srv8Subservices from tmtccmd.tm.definitions import PusIFQueueT @@ -23,7 +26,8 @@ class DisplayMode(enum.Enum): class TmTcPrinter: - """This class handles printing to the command line and to files.""" + """This class handles printing to the command line and to files. + TODO: Introduce file logger""" def __init__( self, display_mode: DisplayMode = DisplayMode.LONG, do_print_to_file: bool = True, print_tc: bool = True @@ -69,7 +73,7 @@ def print_telemetry( LOGGER.warning("Passed packet does not implement necessary interfaces!") return - if packet_if.get_service() == PusServices.SERVICE_5_EVENT: + if packet_if.service == PusServices.SERVICE_5_EVENT: self.__handle_event_packet(cast(Service5TM, packet_if)) if self._display_mode == DisplayMode.SHORT: @@ -79,22 +83,25 @@ def print_telemetry( self.__handle_wiretapping_packet(packet_if=packet_if, info_if=info_if) # Handle special packet types - if packet_if.get_service() == PusServices.SERVICE_3_HOUSEKEEPING: + if packet_if.service == PusServices.SERVICE_3_HOUSEKEEPING: self.handle_service_3_packet(packet_if=packet_if) - if packet_if.get_service() == PusServices.SERVICE_5_EVENT: + if packet_if.service == PusServices.SERVICE_5_EVENT: self.handle_service_5_packet(packet_if=packet_if) - if packet_if.get_service() == PusServices.SERVICE_8_FUNC_CMD and \ - packet_if.get_subservice() == Srv8Subservices.DATA_REPLY: + if packet_if.service == PusServices.SERVICE_8_FUNC_CMD and \ + packet_if.subservice == Srv8Subservices.DATA_REPLY: self.handle_service_8_packet(packet_if=packet_if) if print_raw_tm: - self.__print_buffer = f"TM Data:\n{self.return_data_string(packet_if.pack())}" + tm_data_string = get_printable_data_string( + print_format= PrintFormats.HEX, data=packet_if.pack() + ) + self.__print_buffer = f"TM Data:\n{tm_data_string}" LOGGER.info(self.__print_buffer) self.add_print_buffer_to_file_buffer() def handle_service_3_packet(self, packet_if: PusTmInterface): from tmtccmd.config.hook import get_global_hook_obj - if packet_if.get_service() != 3: + if packet_if.service != 3: LOGGER.warning('This packet is not a service 3 packet!') return hook_obj = get_global_hook_obj() @@ -102,33 +109,33 @@ def handle_service_3_packet(self, packet_if: PusTmInterface): LOGGER.warning('Hook object not set') return srv3_packet = cast(Service3Base, packet_if) - if srv3_packet.has_custom_hk_handling(): + if srv3_packet.custom_hk_handling: (hk_header, hk_content, validity_buffer, num_vars) = \ hook_obj.handle_service_3_housekeeping( - object_id=bytes(), set_id=srv3_packet.get_set_id(), - hk_data=packet_if.get_tm_data(), service3_packet=srv3_packet + object_id=bytes(), set_id=srv3_packet.set_id, + hk_data=packet_if.tm_data, service3_packet=srv3_packet ) else: (hk_header, hk_content, validity_buffer, num_vars) = \ hook_obj.handle_service_3_housekeeping( - object_id=srv3_packet.get_object_id().as_bytes(), set_id=srv3_packet.get_set_id(), - hk_data=packet_if.get_tm_data()[8:], service3_packet=srv3_packet + object_id=srv3_packet.object_id.as_bytes, set_id=srv3_packet.set_id, + hk_data=packet_if.tm_data[8:], service3_packet=srv3_packet ) - if packet_if.get_subservice() == 25 or packet_if.get_subservice() == 26: + if packet_if.subservice == 25 or packet_if.subservice == 26: self.handle_hk_print( - object_id=srv3_packet.get_object_id().get_id(), set_id=srv3_packet.get_set_id(), + object_id=srv3_packet.object_id.as_int, set_id=srv3_packet.set_id, hk_header=hk_header, hk_content=hk_content, validity_buffer=validity_buffer, num_vars=num_vars ) - if packet_if.get_subservice() == 10 or packet_if.get_subservice() == 12: + if packet_if.subservice == 10 or packet_if.subservice == 12: self.handle_hk_definition_print( - object_id=srv3_packet.get_object_id().get_id(), set_id=srv3_packet.get_set_id(), + object_id=srv3_packet.object_id.id, set_id=srv3_packet.set_id, srv3_packet=srv3_packet ) def handle_service_5_packet(self, packet_if: PusTmInterface): from tmtccmd.config.hook import get_global_hook_obj - if packet_if.get_service() != 5: + if packet_if.service != 5: LOGGER.warning('This packet is not a service 5 packet!') return hook_obj = get_global_hook_obj() @@ -137,8 +144,8 @@ def handle_service_5_packet(self, packet_if: PusTmInterface): return srv5_packet = cast(Service5TM, packet_if) custom_string = hook_obj.handle_service_5_event( - object_id=srv5_packet.get_reporter_id_as_bytes(), event_id=srv5_packet.get_event_id(), - param_1=srv5_packet.get_param_1(), param_2=srv5_packet.get_param_2() + object_id=srv5_packet.reporter_id_as_bytes, event_id=srv5_packet.event_id, + param_1=srv5_packet.param_1, param_2=srv5_packet.param_2 ) self.__print_buffer = custom_string LOGGER.info(self.__print_buffer) @@ -146,10 +153,10 @@ def handle_service_5_packet(self, packet_if: PusTmInterface): def handle_service_8_packet(self, packet_if: PusTmInterface): from tmtccmd.config.hook import get_global_hook_obj - if packet_if.get_service() != PusServices.SERVICE_8_FUNC_CMD: + if packet_if.service != PusServices.SERVICE_8_FUNC_CMD: LOGGER.warning('This packet is not a service 8 packet!') return - if packet_if.get_subservice() != Srv8Subservices.DATA_REPLY: + if packet_if.subservice != Srv8Subservices.DATA_REPLY: LOGGER.warning( f'This packet is not data reply packet with ' f'subservice {Srv8Subservices.DATA_REPLY}!' @@ -163,11 +170,11 @@ def handle_service_8_packet(self, packet_if: PusTmInterface): if srv8_packet is None: LOGGER.warning('Service 8 object is not instance of Service8TM') return - obj_id = srv8_packet.get_source_object_id_as_bytes() - action_id = srv8_packet.get_action_id() + obj_id = srv8_packet.source_object_id_as_bytes + action_id = srv8_packet.action_id header_list, content_list = hook_obj.handle_service_8_telemetry( object_id=obj_id, action_id=action_id, - custom_data=srv8_packet.get_custom_data() + custom_data=srv8_packet.custom_data ) obj_id_dict = hook_obj.get_object_ids() rep_str = obj_id_dict.get(bytes(obj_id)) @@ -220,8 +227,8 @@ def handle_hk_definition_print(self, object_id: int, set_id: int, srv3_packet: S ) def __handle_short_print(self, packet_if: PusTmInterface): - self.__print_buffer = "Received TM[" + str(packet_if.get_service()) + "," + str( - packet_if.get_subservice()) + "]" + self.__print_buffer = "Received TM[" + str(packet_if.service) + "," + str( + packet_if.subservice) + "]" LOGGER.info(self.__print_buffer) self.add_print_buffer_to_file_buffer() @@ -239,10 +246,10 @@ def __handle_long_tm_print(self, packet_if: PusTmInterface, info_if: PusTmInfoIn self.__handle_tm_content_print(info_if=info_if) self.__handle_additional_printout(info_if=info_if) except TypeError as error: - LOGGER.warning( + LOGGER.exception( f"Type Error when trying to print TM Packet " - f"[{packet_if.get_service()} , {packet_if.get_subservice()}]") - LOGGER.warning(error) + f"[{packet_if.service} , {packet_if.subservice}]" + ) def __handle_column_header_print(self, info_if: PusTmInfoInterface): header_list = [] @@ -340,14 +347,14 @@ def __handle_wiretapping_packet(self, packet_if: PusTmInterface, info_if: PusTmI :param info_if: Information interface :return: """ - if packet_if.get_service() == 2 and \ - (packet_if.get_subservice() == 131 - or packet_if.get_subservice() == 130): + if packet_if.service == 2 and \ + (packet_if.subservice == 131 + or packet_if.subservice == 130): self.__print_buffer = \ - f"Wiretapping Packet or Raw Reply from TM [{packet_if.get_service()}," \ - f"{packet_if.get_subservice()}]: " + f"Wiretapping Packet or Raw Reply from TM [{packet_if.service}," \ + f"{packet_if.subservice}]: " self.__print_buffer = \ - self.__print_buffer + info_if.return_source_data_string() + self.__print_buffer + info_if.get_source_data_string() LOGGER.info(self.__print_buffer) self.add_print_buffer_to_file_buffer() @@ -446,9 +453,12 @@ def bit_extractor(byte: int, position: int): shift_number = position + (6 - 2 * (position - 1)) return (byte >> shift_number) & 1 - def print_telecommand(self, tc_packet_obj: PusTelecommand, tc_packet_raw: bytearray = bytearray()): + def print_telecommand( + self, tc_packet_obj: PusTelecommand, tc_packet_raw: bytearray = bytes() + ): """ - This function handles the printing of Telecommands. It assumed the packets are sent shortly before or after. + This function handles the printing of Telecommands. It assumed the packets are sent + shortly before or after. :param tc_packet_obj: :param tc_packet_raw: :return: @@ -469,8 +479,8 @@ def _handle_short_tc_print(self, tc_packet_obj: PusTelecommand): :return: """ self.__print_buffer = \ - f"Sent TC[{tc_packet_obj.get_service()}, {tc_packet_obj.get_subservice()}] with SSC " \ - f"{tc_packet_obj.get_ssc()}" + f"Sent TC[{tc_packet_obj.service}, {tc_packet_obj.subservice}] with SSC " \ + f"{tc_packet_obj.ssc}" LOGGER.info(self.__print_buffer) self.add_print_buffer_to_file_buffer() @@ -480,35 +490,24 @@ def _handle_long_tc_print(self, tc_packet_obj: PusTelecommand): :param tc_packet_obj: :return: """ + data_print = get_printable_data_string( + print_format=PrintFormats.HEX, data=tc_packet_obj.app_data + ) try: self.__print_buffer = \ - f"Telecommand TC[{tc_packet_obj.get_service()}, {tc_packet_obj.get_subservice()}] " \ - f"with SSC {tc_packet_obj.get_ssc()} sent with data " \ - f"{self.return_data_string(tc_packet_obj.get_app_data())}" + f"Telecommand TC[{tc_packet_obj.service}, {tc_packet_obj.subservice}] " \ + f"with SSC {tc_packet_obj.ssc} sent with data " \ + f"{data_print}" LOGGER.info(self.__print_buffer) self.add_print_buffer_to_file_buffer() except TypeError as error: LOGGER.error("TMTC Printer: Type Error! Traceback: %s", error) - def print_data(self, byte_array: bytearray): + @staticmethod + def print_data(data: bytes): """ - :param byte_array: + :param data: Data to print :return: None """ - string = self.return_data_string(byte_array) + string = get_printable_data_string(print_format=PrintFormats.HEX, data=data) LOGGER.info(string) - - @staticmethod - def return_data_string(byte_array: bytearray) -> str: - """ - Converts a bytearray to string format for printing - :param byte_array: - :return: - """ - str_to_print = "[" - for byte in byte_array: - str_to_print += str(hex(byte)) + " , " - str_to_print = str_to_print.rstrip(' ') - str_to_print = str_to_print.rstrip(',') - str_to_print += ']' - return str_to_print