diff --git a/CHANGELOG.md b/CHANGELOG.md index 1dca2a8..4e1d40c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,8 +14,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - ECSS PUS telemetry time handling is now more generic and low level: Constructors expect a simple `bytes` type while unpackers/readers expect the length of the timestamp. Determining - or knowing the size of the timestamp is now the task of the user. + or knowing the size of the timestamp is now the task of the user, but a helper constant + is expose which can help with this. - `CdsShortTimestamp.from_now` renamed to `now`. +- The ECSS TMTC APID field must not be set explicitely in the class constructors. ## Added @@ -24,6 +26,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/). determine the timestamp length from a raw PUS TM packet. - `spacepackets.ccsds.CCSDS_HEADER_LEN` constant. +## Removed + +- Global configuration module for TC and TM APID was removed. + # [v0.23.1] 2024-04-22 ## Added diff --git a/docs/api/ecss.rst b/docs/api/ecss.rst index cc1662c..5b1dbf0 100644 --- a/docs/api/ecss.rst +++ b/docs/api/ecss.rst @@ -6,8 +6,13 @@ ECSS Package :undoc-members: :show-inheritance: +.. automodule:: spacepackets.ecss.defs + :members: + :undoc-members: + :show-inheritance: + ECSS PUS Telecommand Module --------------------------------- +---------------------------- :ref:`api/ecss_tc:ECSS Telecommand Module` @@ -16,17 +21,6 @@ ECSS PUS Telemetry Module :ref:`api/ecss_tm:ECSS Telemetry Module` -ECSS Configuration Submodule ----------------------------------- - -This module can be used to configure common default values so these don't have to be specified -each time anymore when creating ECSS packets - -.. automodule:: spacepackets.ecss.conf - :members: - :undoc-members: - :show-inheritance: - ECSS Fields Submodule ---------------------------------- diff --git a/pyproject.toml b/pyproject.toml index 7f8c9ce..fbbc5ef 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" name = "spacepackets" description = "Various CCSDS and ECSS packet implementations" readme = "README.md" -version = "0.23.1" +version = "0.24.0" requires-python = ">=3.8" license = {text = "Apache-2.0"} authors = [ diff --git a/spacepackets/ccsds/spacepacket.py b/spacepackets/ccsds/spacepacket.py index be299b8..979c851 100644 --- a/spacepackets/ccsds/spacepacket.py +++ b/spacepackets/ccsds/spacepacket.py @@ -10,11 +10,13 @@ from spacepackets.exceptions import BytesTooShortError -SPACE_PACKET_HEADER_SIZE: Final = 6 -CCSDS_HEADER_LEN: Final = SPACE_PACKET_HEADER_SIZE -SEQ_FLAG_MASK = 0xC000 -APID_MASK = 0x7FF -PACKET_ID_MASK = 0x1FFF +CCSDS_HEADER_LEN: Final[int] = 6 +SPACE_PACKET_HEADER_SIZE: Final[int] = CCSDS_HEADER_LEN +SEQ_FLAG_MASK: Final[int] = 0xC000 +APID_MASK: Final[int] = 0x7FF +PACKET_ID_MASK: Final[int] = 0x1FFF +MAX_SEQ_COUNT: Final[int] = pow(2, 14) - 1 +MAX_APID: Final[int] = pow(2, 11) - 1 class PacketType(enum.IntEnum): @@ -35,7 +37,7 @@ class PacketSeqCtrl: """ def __init__(self, seq_flags: SequenceFlags, seq_count: int): - if seq_count > pow(2, 14) - 1 or seq_count < 0: + if seq_count > MAX_SEQ_COUNT or seq_count < 0: raise ValueError( f"Sequence count larger than allowed {pow(2, 14) - 1} or negative" ) @@ -206,7 +208,7 @@ def __init__( :param data_len: Contains a length count C that equals one fewer than the length of the packet data field. Should not be larger than 65535 bytes :param ccsds_version: - :param sec_header_flag: Secondary header flag, 1 or True by default + :param sec_header_flag: Secondary header flag, or False by default. :param seq_flags: :raises ValueError: On invalid parameters """ @@ -300,7 +302,7 @@ def seq_flags(self, value): @property def header_len(self) -> int: - return SPACE_PACKET_HEADER_SIZE + return CCSDS_HEADER_LEN @apid.setter def apid(self, apid): @@ -312,7 +314,7 @@ def packet_len(self) -> int: :return: Size of the TM packet based on the space packet header data length field. """ - return SPACE_PACKET_HEADER_SIZE + self.data_len + 1 + return CCSDS_HEADER_LEN + self.data_len + 1 @classmethod def unpack(cls, data: bytes) -> SpacePacketHeader: @@ -501,7 +503,7 @@ def parse_space_packets( # Packet ID detected while True: # Can't even parse CCSDS header. Wait for more data to arrive. - if current_idx + SPACE_PACKET_HEADER_SIZE >= len(concatenated_packets): + if current_idx + CCSDS_HEADER_LEN >= len(concatenated_packets): break current_packet_id = ( struct.unpack("!H", concatenated_packets[current_idx : current_idx + 2])[0] diff --git a/spacepackets/ecss/__init__.py b/spacepackets/ecss/__init__.py index 643cadb..aa34f70 100644 --- a/spacepackets/ecss/__init__.py +++ b/spacepackets/ecss/__init__.py @@ -1,6 +1,6 @@ from crcmod.predefined import mkPredefinedCrcFun -from .tc import PusVersion, PusTelecommand, PusTc, PusTcDataFieldHeader +from .tc import PusTc, PusTelecommand, PusTcDataFieldHeader from .tm import PusTm, PusTelemetry, PusTmSecondaryHeader from .fields import ( PacketFieldEnum, @@ -13,7 +13,7 @@ PfcSigned, PfcUnsigned, ) -from .defs import PusService +from .defs import PusService, PusVersion from .req_id import RequestId from .pus_verificator import PusVerificator diff --git a/spacepackets/ecss/conf.py b/spacepackets/ecss/conf.py deleted file mode 100644 index 0a0c15c..0000000 --- a/spacepackets/ecss/conf.py +++ /dev/null @@ -1,46 +0,0 @@ -import enum - - -#: Can be passed as an APID value to automatically fetch an APID value configured globally for -#: the library -FETCH_GLOBAL_APID = -1 - - -class PusVersion(enum.IntEnum): - # ESA PSS-07-101. Not supported by this package! - ESA_PUS = 0 - # ECSS-E-70-41A - PUS_A = 1 - # ECSS-E-ST-70-41C - PUS_C = 2 - GLOBAL_CONFIG = 98 - UNKNOWN = 99 - - -class EcssConfKeys(enum.IntEnum): - ECSS_TC_APID = 0 - ECSS_TM_APID = 1 - PUS_TM_TYPE = 2 - PUS_TC_TYPE = 3 - - -__ECSS_DICT = { - EcssConfKeys.ECSS_TM_APID: 0x00, - EcssConfKeys.ECSS_TC_APID: 0x00, -} - - -def set_default_tm_apid(tm_apid: int): - __ECSS_DICT[EcssConfKeys.ECSS_TM_APID] = tm_apid - - -def get_default_tm_apid() -> int: - return __ECSS_DICT[EcssConfKeys.ECSS_TM_APID] - - -def set_default_tc_apid(tc_apid: int): - __ECSS_DICT[EcssConfKeys.ECSS_TC_APID] = tc_apid - - -def get_default_tc_apid() -> int: - return __ECSS_DICT[EcssConfKeys.ECSS_TC_APID] diff --git a/spacepackets/ecss/defs.py b/spacepackets/ecss/defs.py index 19cc1c7..958c863 100644 --- a/spacepackets/ecss/defs.py +++ b/spacepackets/ecss/defs.py @@ -1,6 +1,15 @@ import enum +class PusVersion(enum.IntEnum): + # ESA PSS-07-101. Not supported by this package! + ESA_PUS = 0 + # ECSS-E-70-41A + PUS_A = 1 + # ECSS-E-ST-70-41C + PUS_C = 2 + + class PusService(enum.IntEnum): S1_VERIFICATION = 1 S2_RAW_CMD = 2 diff --git a/spacepackets/ecss/pus_17_test.py b/spacepackets/ecss/pus_17_test.py index febc762..252039f 100644 --- a/spacepackets/ecss/pus_17_test.py +++ b/spacepackets/ecss/pus_17_test.py @@ -3,7 +3,6 @@ from spacepackets import SpacePacketHeader from spacepackets.ccsds.spacepacket import PacketId, PacketSeqCtrl -from spacepackets.ecss.conf import FETCH_GLOBAL_APID from spacepackets.ecss.defs import PusService from spacepackets.ecss.tm import PusTm, AbstractPusTm @@ -16,11 +15,11 @@ class Subservice(enum.IntEnum): class Service17Tm(AbstractPusTm): def __init__( self, + apid: int, subservice: int, timestamp: bytes, ssc: int = 0, source_data: bytes = bytes(), - apid: int = FETCH_GLOBAL_APID, packet_version: int = 0b000, space_time_ref: int = 0b0000, destination_id: int = 0, @@ -74,7 +73,7 @@ def pack(self) -> bytearray: @classmethod def __empty(cls) -> Service17Tm: - return cls(subservice=0, timestamp=bytes()) + return cls(apid=0, subservice=0, timestamp=bytes()) @classmethod def unpack(cls, data: bytes, timestamp_len: int) -> Service17Tm: diff --git a/spacepackets/ecss/pus_1_verification.py b/spacepackets/ecss/pus_1_verification.py index 01d0759..73250be 100644 --- a/spacepackets/ecss/pus_1_verification.py +++ b/spacepackets/ecss/pus_1_verification.py @@ -9,7 +9,6 @@ from spacepackets.ccsds import SpacePacketHeader from spacepackets.ccsds.spacepacket import PacketId, PacketSeqCtrl from spacepackets.ecss import PusTc -from spacepackets.ecss.conf import FETCH_GLOBAL_APID from spacepackets.ecss.defs import PusService from spacepackets.ecss.fields import PacketFieldEnum from spacepackets.ecss.tm import PusTm, AbstractPusTm @@ -122,11 +121,11 @@ class Service1Tm(AbstractPusTm): def __init__( self, + apid: int, subservice: Subservice, timestamp: bytes, verif_params: Optional[VerificationParams] = None, seq_count: int = 0, - apid: int = FETCH_GLOBAL_APID, packet_version: int = 0b000, space_time_ref: int = 0b0000, destination_id: int = 0, @@ -154,7 +153,7 @@ def pack(self) -> bytearray: @classmethod def __empty(cls) -> Service1Tm: - return cls(subservice=Subservice.INVALID, timestamp=bytes()) + return cls(apid=0, subservice=Subservice.INVALID, timestamp=bytes()) @classmethod def from_tm(cls, tm: PusTm, params: UnpackParams) -> Service1Tm: @@ -303,8 +302,11 @@ def __eq__(self, other: object): return False -def create_acceptance_success_tm(pus_tc: PusTc, timestamp: bytes) -> Service1Tm: +def create_acceptance_success_tm( + apid: int, pus_tc: PusTc, timestamp: bytes +) -> Service1Tm: return Service1Tm( + apid=apid, subservice=Subservice.TM_ACCEPTANCE_SUCCESS, verif_params=VerificationParams(RequestId.from_sp_header(pus_tc.sp_header)), timestamp=timestamp, @@ -312,11 +314,13 @@ def create_acceptance_success_tm(pus_tc: PusTc, timestamp: bytes) -> Service1Tm: def create_acceptance_failure_tm( + apid: int, pus_tc: PusTc, failure_notice: FailureNotice, timestamp: bytes, ) -> Service1Tm: return Service1Tm( + apid=apid, subservice=Subservice.TM_ACCEPTANCE_FAILURE, verif_params=VerificationParams( req_id=RequestId.from_sp_header(pus_tc.sp_header), @@ -326,8 +330,9 @@ def create_acceptance_failure_tm( ) -def create_start_success_tm(pus_tc: PusTc, timestamp: bytes) -> Service1Tm: +def create_start_success_tm(apid: int, pus_tc: PusTc, timestamp: bytes) -> Service1Tm: return Service1Tm( + apid=apid, subservice=Subservice.TM_START_SUCCESS, verif_params=VerificationParams(RequestId.from_sp_header(pus_tc.sp_header)), timestamp=timestamp, @@ -335,11 +340,13 @@ def create_start_success_tm(pus_tc: PusTc, timestamp: bytes) -> Service1Tm: def create_start_failure_tm( + apid: int, pus_tc: PusTc, failure_notice: FailureNotice, timestamp: bytes, ) -> Service1Tm: return Service1Tm( + apid=apid, subservice=Subservice.TM_START_FAILURE, verif_params=VerificationParams( req_id=RequestId.from_sp_header(pus_tc.sp_header), @@ -350,11 +357,13 @@ def create_start_failure_tm( def create_step_success_tm( + apid: int, pus_tc: PusTc, step_id: PacketFieldEnum, timestamp: bytes, ) -> Service1Tm: return Service1Tm( + apid=apid, subservice=Subservice.TM_STEP_SUCCESS, verif_params=VerificationParams( req_id=RequestId.from_sp_header(pus_tc.sp_header), step_id=step_id @@ -364,12 +373,14 @@ def create_step_success_tm( def create_step_failure_tm( + apid: int, pus_tc: PusTc, step_id: PacketFieldEnum, failure_notice: FailureNotice, timestamp: bytes, ) -> Service1Tm: return Service1Tm( + apid=apid, subservice=Subservice.TM_STEP_FAILURE, verif_params=VerificationParams( req_id=RequestId.from_sp_header(pus_tc.sp_header), @@ -380,8 +391,11 @@ def create_step_failure_tm( ) -def create_completion_success_tm(pus_tc: PusTc, timestamp: bytes) -> Service1Tm: +def create_completion_success_tm( + apid: int, pus_tc: PusTc, timestamp: bytes +) -> Service1Tm: return Service1Tm( + apid=apid, subservice=Subservice.TM_COMPLETION_SUCCESS, verif_params=VerificationParams(RequestId.from_sp_header(pus_tc.sp_header)), timestamp=timestamp, @@ -389,11 +403,13 @@ def create_completion_success_tm(pus_tc: PusTc, timestamp: bytes) -> Service1Tm: def create_completion_failure_tm( + apid: int, pus_tc: PusTc, failure_notice: FailureNotice, timestamp: bytes, ) -> Service1Tm: return Service1Tm( + apid=apid, subservice=Subservice.TM_COMPLETION_FAILURE, verif_params=VerificationParams( req_id=RequestId.from_sp_header(pus_tc.sp_header), diff --git a/spacepackets/ecss/tc.py b/spacepackets/ecss/tc.py index fab2d66..9c9d322 100644 --- a/spacepackets/ecss/tc.py +++ b/spacepackets/ecss/tc.py @@ -15,18 +15,14 @@ from spacepackets.ccsds.spacepacket import ( SpacePacketHeader, PacketType, - SPACE_PACKET_HEADER_SIZE, + CCSDS_HEADER_LEN, SpacePacket, AbstractSpacePacket, PacketId, PacketSeqCtrl, SequenceFlags, ) -from spacepackets.ecss.conf import ( - get_default_tc_apid, - PusVersion, - FETCH_GLOBAL_APID, -) +from spacepackets.ecss.defs import PusVersion class PusTcDataFieldHeader: @@ -123,10 +119,10 @@ class PusTc(AbstractSpacePacket): def __init__( self, + apid: int, service: int, subservice: int, - app_data: bytes = bytes([]), - apid: int = FETCH_GLOBAL_APID, + app_data: bytes = bytes(), seq_count: int = 0, source_id: int = 0, ack_flags: int = 0b1111, @@ -144,8 +140,6 @@ def __init__( different packet sources (e.g. different ground stations) :raises ValueError: Invalid input parameters """ - if apid == FETCH_GLOBAL_APID: - apid = get_default_tc_apid() self.pus_tc_sec_header = PusTcDataFieldHeader( service=service, subservice=subservice, @@ -214,7 +208,7 @@ def from_composite_fields( @classmethod def empty(cls) -> PusTc: - return PusTc(service=0, subservice=0) + return PusTc(apid=0, service=0, subservice=0) def __repr__(self): """Returns the representation of a class instance.""" @@ -286,11 +280,9 @@ def unpack(cls, data: bytes) -> PusTc: tc_unpacked = cls.empty() tc_unpacked.sp_header = SpacePacketHeader.unpack(data=data) tc_unpacked.pus_tc_sec_header = PusTcDataFieldHeader.unpack( - data=data[SPACE_PACKET_HEADER_SIZE:] - ) - header_len = ( - SPACE_PACKET_HEADER_SIZE + tc_unpacked.pus_tc_sec_header.get_header_size() + data=data[CCSDS_HEADER_LEN:] ) + header_len = CCSDS_HEADER_LEN + tc_unpacked.pus_tc_sec_header.get_header_size() expected_packet_len = tc_unpacked.packet_len if len(data) < expected_packet_len: raise BytesTooShortError(expected_packet_len, len(data)) diff --git a/spacepackets/ecss/tm.py b/spacepackets/ecss/tm.py index a96ea8c..10b5186 100644 --- a/spacepackets/ecss/tm.py +++ b/spacepackets/ecss/tm.py @@ -25,12 +25,8 @@ AbstractSpacePacket, SequenceFlags, ) +from spacepackets.ecss.defs import PusVersion from spacepackets.ccsds.time import CdsShortTimestamp -from spacepackets.ecss.conf import ( - PusVersion, - get_default_tm_apid, - FETCH_GLOBAL_APID, -) from spacepackets.crc import CRC16_CCITT_FUNC @@ -226,22 +222,19 @@ class PusTm(AbstractPusTm): CDS_SHORT_SIZE = 7 PUS_TIMESTAMP_SIZE = CDS_SHORT_SIZE - # TODO: Supply this constructor as a classmethod in reduced form def __init__( self, + apid: int, service: int, subservice: int, timestamp: bytes, source_data: bytes = bytes(), seq_count: int = 0, - apid: int = FETCH_GLOBAL_APID, message_counter: int = 0, space_time_ref: int = 0b0000, destination_id: int = 0, packet_version: int = 0b000, ): - if apid == FETCH_GLOBAL_APID: - apid = get_default_tm_apid() self._source_data = source_data len_stamp = len(timestamp) data_length = self.data_len_from_src_len_timestamp_len( @@ -270,7 +263,7 @@ def __init__( @classmethod def empty(cls) -> PusTm: return PusTm( - service=0, subservice=0, timestamp=CdsShortTimestamp.empty().pack() + apid=0, service=0, subservice=0, timestamp=CdsShortTimestamp.empty().pack() ) def pack(self, recalc_crc: bool = True) -> bytearray: @@ -443,9 +436,7 @@ def tm_data(self) -> bytes: @tm_data.setter def tm_data(self, data: bytes): self._source_data = data - stamp_len = 0 - if self.pus_tm_sec_header.timestamp: - stamp_len += len(self.pus_tm_sec_header.timestamp) + stamp_len = len(self.pus_tm_sec_header.timestamp) self.space_packet_header.data_len = self.data_len_from_src_len_timestamp_len( stamp_len, len(data) ) diff --git a/tests/ccsds/test_sp_parser.py b/tests/ccsds/test_sp_parser.py index 19831b5..2e0dfe1 100644 --- a/tests/ccsds/test_sp_parser.py +++ b/tests/ccsds/test_sp_parser.py @@ -8,8 +8,12 @@ class TestSpParser(TestCase): def setUp(self) -> None: + self.def_apid = 0x03 self.tm_packet = PusTm( - service=17, subservice=2, timestamp=CdsShortTimestamp.empty().pack() + apid=self.def_apid, + service=17, + subservice=2, + timestamp=CdsShortTimestamp.empty().pack(), ) self.packet_ids = (self.tm_packet.packet_id,) self.tm_packet_raw = self.tm_packet.pack() @@ -27,6 +31,7 @@ def test_sp_parser(self): def test_sp_parser_crap_data_is_skipped(self): other_larger_packet = PusTm( + apid=self.def_apid, service=8, subservice=128, source_data=bytearray(64), diff --git a/tests/ecss/test_pus_tc.py b/tests/ecss/test_pus_tc.py index 9068a2b..944d243 100644 --- a/tests/ecss/test_pus_tc.py +++ b/tests/ecss/test_pus_tc.py @@ -4,8 +4,7 @@ from spacepackets import SpacePacketHeader, PacketType from spacepackets.ccsds.spacepacket import SequenceFlags -from spacepackets.ecss import PusTc, PusTcDataFieldHeader, check_pus_crc -from spacepackets.ecss.conf import get_default_tc_apid, set_default_tc_apid, PusVersion +from spacepackets.ecss import PusTc, PusTcDataFieldHeader, check_pus_crc, PusVersion from spacepackets.ecss.tc import generate_crc, generate_packet_crc, InvalidTcCrc16 @@ -92,13 +91,10 @@ def test_print(self): print(repr(self.ping_tc)) print(self.ping_tc) - set_default_tc_apid(42) - self.assertTrue(get_default_tc_apid() == 42) - def test_with_app_data(self): test_app_data = bytearray([1, 2, 3]) ping_with_app_data = PusTc( - service=17, subservice=32, seq_count=52, app_data=test_app_data + apid=0, service=17, subservice=32, seq_count=52, app_data=test_app_data ) # 6 bytes CCSDS header, 5 bytes secondary header, 2 bytes CRC, 3 bytes app data self.assertEqual(ping_with_app_data.packet_len, 16) @@ -114,7 +110,7 @@ def test_with_app_data(self): def test_invalid_seq_count(self): with self.assertRaises(ValueError): - PusTc(service=493, subservice=5252, seq_count=99432942) + PusTc(apid=55, service=493, subservice=5252, seq_count=99432942) def test_unpack(self): pus_17_unpacked = PusTc.unpack(data=self.ping_tc_raw) @@ -147,15 +143,15 @@ def test_sec_header(self): self.assertEqual(tc_header_pus_c_raw[2], 2) def test_calc_crc(self): - new_ping_tc = PusTc(service=17, subservice=1) + new_ping_tc = PusTc(apid=27, service=17, subservice=1) self.assertIsNone(new_ping_tc.crc16) new_ping_tc.calc_crc() - self.assertIsNotNone(new_ping_tc.crc16) + assert new_ping_tc.crc16 is not None self.assertTrue(isinstance(new_ping_tc.crc16, bytes)) self.assertEqual(len(new_ping_tc.crc16), 2) def test_crc_always_calced_if_none(self): - new_ping_tc = PusTc(service=17, subservice=1) + new_ping_tc = PusTc(apid=28, service=17, subservice=1) self.assertIsNone(new_ping_tc.crc16) # Should still calculate CRC tc_raw = new_ping_tc.pack(recalc_crc=False) @@ -172,7 +168,7 @@ def test_from_composite_fields(self): self.assertEqual(pus_17_from_composite_fields.pack(), self.ping_tc.pack()) def test_crc_16(self): - pus_17_telecommand = PusTc(service=17, subservice=1, seq_count=25) + pus_17_telecommand = PusTc(apid=25, service=17, subservice=1, seq_count=25) crc_func = crcmod.mkCrcFun(0x11021, rev=False, initCrc=0xFFFF, xorOut=0x0000) crc = crc_func(pus_17_telecommand.pack()) self.assertTrue(crc == 0) @@ -189,7 +185,7 @@ def test_crc_16(self): self.assertTrue(crc_func(packet_raw) == 0) def test_getter_functions(self): - pus_17_telecommand = PusTc(service=17, subservice=1, seq_count=25) + pus_17_telecommand = PusTc(apid=26, service=17, subservice=1, seq_count=25) self.assertTrue(pus_17_telecommand.seq_count == 25) self.assertTrue(pus_17_telecommand.service == 17) self.assertEqual(pus_17_telecommand.subservice, 1) diff --git a/tests/ecss/test_pus_tm.py b/tests/ecss/test_pus_tm.py index 3b3f84d..7fa6a13 100755 --- a/tests/ecss/test_pus_tm.py +++ b/tests/ecss/test_pus_tm.py @@ -11,13 +11,11 @@ SequenceFlags, SpacePacketHeader, ) -from spacepackets.ecss import check_pus_crc -from spacepackets.ecss.conf import set_default_tm_apid +from spacepackets.ecss import check_pus_crc, PusVersion from spacepackets.util import PrintFormats, get_printable_data_string from spacepackets.ecss.tm import ( PusTm, CdsShortTimestamp, - PusVersion, PusTmSecondaryHeader, InvalidTmCrc16, ) @@ -129,7 +127,6 @@ def test_service_from_raw_invalid(self): self.assertRaises(ValueError, PusTm.service_from_bytes, bytearray()) def test_source_data_string_getters(self): - set_default_tm_apid(0x22) source_data = bytearray([0x42, 0x38]) self.ping_reply.tm_data = source_data self.assertEqual( @@ -226,15 +223,15 @@ def test_unpack(self): PusTm.unpack(data=self.ping_reply_raw, timestamp_len=len(TEST_STAMP)) def test_calc_crc(self): - new_ping_tm = PusTm(service=17, subservice=2, timestamp=TEST_STAMP) + new_ping_tm = PusTm(apid=0, service=17, subservice=2, timestamp=TEST_STAMP) self.assertIsNone(new_ping_tm.crc16) new_ping_tm.calc_crc() - self.assertIsNotNone(new_ping_tm.crc16) + assert new_ping_tm.crc16 is not None self.assertTrue(isinstance(new_ping_tm.crc16, bytes)) self.assertEqual(len(new_ping_tm.crc16), 2) def test_crc_always_calced_if_none(self): - new_ping_tm = PusTm(service=17, subservice=2, timestamp=TEST_STAMP) + new_ping_tm = PusTm(apid=0, service=17, subservice=2, timestamp=TEST_STAMP) self.assertIsNone(new_ping_tm.crc16) # Should still calculate CRC tc_raw = new_ping_tm.pack(recalc_crc=False) diff --git a/tests/ecss/test_srv1.py b/tests/ecss/test_srv1.py index cd0b6f7..9e0e60f 100644 --- a/tests/ecss/test_srv1.py +++ b/tests/ecss/test_srv1.py @@ -26,8 +26,11 @@ class Service1TmTest(TestCase): def setUp(self) -> None: - ping_tc = PusTc(service=17, subservice=1) - self.srv1_tm = create_start_success_tm(ping_tc, TEST_STAMP) + self.def_apid = 0x02 + ping_tc = PusTc(apid=self.def_apid, service=17, subservice=1) + self.srv1_tm = create_start_success_tm( + apid=self.def_apid, pus_tc=ping_tc, timestamp=TEST_STAMP + ) def test_failure_notice_invalid_creation(self): with self.assertRaises(ValueError): @@ -92,21 +95,28 @@ def test_service_1_tm_completion_success(self): self._generic_test_srv_1_success(Subservice.TM_COMPLETION_SUCCESS) def _generic_test_srv_1_success(self, subservice: Subservice): - pus_tc = PusTc(service=17, subservice=1) + pus_tc = PusTc(apid=self.def_apid, service=17, subservice=1) helper_created = None step_id = None if subservice == Subservice.TM_ACCEPTANCE_SUCCESS: - helper_created = create_acceptance_success_tm(pus_tc, TEST_STAMP) + helper_created = create_acceptance_success_tm( + apid=self.def_apid, pus_tc=pus_tc, timestamp=TEST_STAMP + ) elif subservice == Subservice.TM_START_SUCCESS: - helper_created = create_start_success_tm(pus_tc, TEST_STAMP) + helper_created = create_start_success_tm(self.def_apid, pus_tc, TEST_STAMP) elif subservice == Subservice.TM_STEP_SUCCESS: step_id = PacketFieldEnum.with_byte_size(1, 4) - helper_created = create_step_success_tm(pus_tc, step_id, TEST_STAMP) + helper_created = create_step_success_tm( + self.def_apid, pus_tc, step_id, TEST_STAMP + ) elif subservice == Subservice.TM_COMPLETION_SUCCESS: - helper_created = create_completion_success_tm(pus_tc, timestamp=TEST_STAMP) + helper_created = create_completion_success_tm( + self.def_apid, pus_tc, timestamp=TEST_STAMP + ) self._test_srv_1_success_tm( pus_tc, Service1Tm( + apid=self.def_apid, subservice=subservice, verif_params=VerificationParams( req_id=RequestId(pus_tc.packet_id, pus_tc.packet_seq_control), @@ -155,7 +165,7 @@ def _test_srv_1_success_tm( self.assertEqual(srv_1_tm_unpacked.step_id, step_id) def _generic_test_srv_1_failure(self, subservice: Subservice): - pus_tc = PusTc(service=17, subservice=1) + pus_tc = PusTc(apid=self.def_apid, service=17, subservice=1) failure_notice = FailureNotice( code=PacketFieldEnum.with_byte_size(1, 8), data=bytes([2, 4]) ) @@ -163,13 +173,16 @@ def _generic_test_srv_1_failure(self, subservice: Subservice): step_id = None if subservice == Subservice.TM_ACCEPTANCE_FAILURE: helper_created = create_acceptance_failure_tm( - pus_tc, failure_notice, TEST_STAMP + self.def_apid, pus_tc, failure_notice, TEST_STAMP ) elif subservice == Subservice.TM_START_FAILURE: - helper_created = create_start_failure_tm(pus_tc, failure_notice, TEST_STAMP) + helper_created = create_start_failure_tm( + self.def_apid, pus_tc, failure_notice, TEST_STAMP + ) elif subservice == Subservice.TM_STEP_FAILURE: step_id = PacketFieldEnum.with_byte_size(2, 12) helper_created = create_step_failure_tm( + self.def_apid, pus_tc, failure_notice=failure_notice, step_id=step_id, @@ -177,11 +190,12 @@ def _generic_test_srv_1_failure(self, subservice: Subservice): ) elif subservice == Subservice.TM_COMPLETION_FAILURE: helper_created = create_completion_failure_tm( - pus_tc, failure_notice, TEST_STAMP + self.def_apid, pus_tc, failure_notice, TEST_STAMP ) self._test_srv_1_failure_comparison_helper( pus_tc, Service1Tm( + apid=self.def_apid, subservice=subservice, verif_params=VerificationParams( req_id=RequestId(pus_tc.packet_id, pus_tc.packet_seq_control), diff --git a/tests/ecss/test_srv17.py b/tests/ecss/test_srv17.py index 34414bb..a6ca240 100644 --- a/tests/ecss/test_srv17.py +++ b/tests/ecss/test_srv17.py @@ -9,7 +9,8 @@ class TestSrv17Tm(TestCase): def setUp(self) -> None: - self.srv17_tm = Service17Tm(subservice=1, timestamp=bytes()) + self.def_apid = 0x05 + self.srv17_tm = Service17Tm(apid=self.def_apid, subservice=1, timestamp=bytes()) self.srv17_tm.pus_tm.apid = 0x72 self.time_stamp_provider = generic_time_provider_mock(TEST_STAMP) @@ -29,6 +30,7 @@ def test_state(self): def test_other_state(self): srv17_with_data = Service17Tm( + apid=self.def_apid, subservice=128, timestamp=CdsShortTimestamp(0, 0).pack(), source_data=bytes([0, 1, 2]), @@ -40,7 +42,7 @@ def test_other_state(self): ) def test_service_17_tm(self): - srv_17_tm = Service17Tm(subservice=2, timestamp=TEST_STAMP) + srv_17_tm = Service17Tm(apid=self.def_apid, subservice=2, timestamp=TEST_STAMP) self.assertEqual(srv_17_tm.pus_tm.subservice, 2) srv_17_tm_raw = srv_17_tm.pack() srv_17_tm_unpacked = Service17Tm.unpack( diff --git a/tests/test_pus_verificator.py b/tests/test_pus_verificator.py index 08a6c24..a868885 100644 --- a/tests/test_pus_verificator.py +++ b/tests/test_pus_verificator.py @@ -25,37 +25,44 @@ class SuccessSet: - def __init__(self, pus_tc: PusTc): + def __init__(self, apid: int, pus_tc: PusTc): self.pus_tc = pus_tc self.req_id = RequestId.from_pus_tc(pus_tc) self.empty_stamp = CdsShortTimestamp.empty() - self.acc_suc_tm = create_acceptance_success_tm(pus_tc, self.empty_stamp.pack()) - self.sta_suc_tm = create_start_success_tm(pus_tc, self.empty_stamp.pack()) + self.acc_suc_tm = create_acceptance_success_tm( + apid, pus_tc, self.empty_stamp.pack() + ) + self.sta_suc_tm = create_start_success_tm(apid, pus_tc, self.empty_stamp.pack()) self.ste_suc_tm = create_step_success_tm( + apid=apid, pus_tc=pus_tc, step_id=StepId.with_byte_size(1, 1), timestamp=self.empty_stamp.pack(), ) - self.fin_suc_tm = create_completion_success_tm(pus_tc, self.empty_stamp.pack()) + self.fin_suc_tm = create_completion_success_tm( + apid, pus_tc, self.empty_stamp.pack() + ) class FailureSet: - def __init__(self, pus_tc: PusTc, failure_notice: FailureNotice): - self.suc_set = SuccessSet(pus_tc) + def __init__(self, apid: int, pus_tc: PusTc, failure_notice: FailureNotice): + self.suc_set = SuccessSet(apid, pus_tc) self.failure_notice = failure_notice self.acc_fail_tm = create_acceptance_failure_tm( - pus_tc, self.failure_notice, CdsShortTimestamp.empty().pack() + apid, pus_tc, self.failure_notice, CdsShortTimestamp.empty().pack() ) self.sta_fail_tm = create_start_failure_tm( - pus_tc, self.failure_notice, CdsShortTimestamp.empty().pack() + apid, pus_tc, self.failure_notice, CdsShortTimestamp.empty().pack() ) self.ste_fail_tm = create_step_failure_tm( + apid, pus_tc, failure_notice=self.failure_notice, step_id=StepId.with_byte_size(1, 1), timestamp=CdsShortTimestamp.empty().pack(), ) self.fin_fail_tm = create_completion_failure_tm( + apid, failure_notice=self.failure_notice, pus_tc=self.suc_set.pus_tc, timestamp=CdsShortTimestamp.empty().pack(), @@ -72,10 +79,13 @@ def req_id(self): class TestPusVerificator(TestCase): def setUp(self) -> None: + self.def_apid = 0x06 self.pus_verificator = PusVerificator() def test_basic(self): - suc_set = SuccessSet(PusTc(service=17, subservice=1)) + suc_set = SuccessSet( + self.def_apid, PusTc(apid=self.def_apid, service=17, subservice=1) + ) self.pus_verificator.add_tc(suc_set.pus_tc) check_res = self.pus_verificator.add_tm(suc_set.acc_suc_tm) self.assertEqual(check_res.completed, False) @@ -104,30 +114,48 @@ def test_basic(self): ) def test_complete_verification_clear_completed(self): - self._regular_success_seq(SuccessSet(PusTc(service=17, subservice=1))) + self._regular_success_seq( + SuccessSet( + self.def_apid, PusTc(apid=self.def_apid, service=17, subservice=1) + ), + ) self.pus_verificator.remove_completed_entries() self.assertEqual(len(self.pus_verificator.verif_dict), 0) def test_complete_verification_clear_completed_multi(self): self._regular_success_seq( - SuccessSet(PusTc(service=17, subservice=1, seq_count=0)) + SuccessSet( + self.def_apid, + PusTc(apid=self.def_apid, service=17, subservice=1, seq_count=0), + ) ) self._regular_success_seq( - SuccessSet(PusTc(service=5, subservice=4, seq_count=1)) + SuccessSet( + self.def_apid, + PusTc(apid=self.def_apid, service=5, subservice=4, seq_count=1), + ) ) self.pus_verificator.remove_completed_entries() self.assertEqual(len(self.pus_verificator.verif_dict), 0) def test_complete_verification_remove_manually(self): - suc_set = SuccessSet(PusTc(service=17, subservice=1)) + suc_set = SuccessSet( + self.def_apid, PusTc(apid=self.def_apid, service=17, subservice=1) + ) self._regular_success_seq(suc_set) self.assertTrue(self.pus_verificator.remove_entry(suc_set.req_id)) self.assertEqual(len(self.pus_verificator.verif_dict), 0) def test_complete_verification_multi_remove_manually(self): - set_0 = SuccessSet(PusTc(service=17, subservice=1, seq_count=0)) + set_0 = SuccessSet( + self.def_apid, + PusTc(apid=self.def_apid, service=17, subservice=1, seq_count=0), + ) self._regular_success_seq(set_0) - set_1 = SuccessSet(PusTc(service=5, subservice=4, seq_count=1)) + set_1 = SuccessSet( + self.def_apid, + PusTc(apid=self.def_apid, service=5, subservice=4, seq_count=1), + ) self._regular_success_seq(set_1) self.assertTrue(self.pus_verificator.remove_entry(set_0.req_id)) self.assertEqual(len(self.pus_verificator.verif_dict), 1) @@ -136,7 +164,11 @@ def test_complete_verification_multi_remove_manually(self): def test_acceptance_failure(self): notice = FailureNotice(ErrorCode.with_byte_size(1, 8), data=bytes([0, 1])) - fail_set = FailureSet(PusTc(service=17, subservice=1, seq_count=0), notice) + fail_set = FailureSet( + self.def_apid, + PusTc(apid=self.def_apid, service=17, subservice=1, seq_count=0), + notice, + ) self.assertTrue(self.pus_verificator.add_tc(fail_set.pus_tc)) status = self.pus_verificator.add_tm(fail_set.acc_fail_tm) self.assertIsNotNone(status) @@ -153,7 +185,11 @@ def test_acceptance_failure(self): def test_step_failure(self): notice = FailureNotice(ErrorCode.with_byte_size(1, 8), data=bytes([0, 1])) - fail_set = FailureSet(PusTc(service=17, subservice=1, seq_count=0), notice) + fail_set = FailureSet( + self.def_apid, + PusTc(apid=self.def_apid, service=17, subservice=1, seq_count=0), + notice, + ) self.assertTrue(self.pus_verificator.add_tc(fail_set.pus_tc)) status = self.pus_verificator.add_tm(fail_set.suc_set.acc_suc_tm) self.assertIsNotNone(status)