diff --git a/src/tmtccmd/ccsds/spacepacket.py b/src/tmtccmd/ccsds/spacepacket.py index 1a8753c9..336f0551 100644 --- a/src/tmtccmd/ccsds/spacepacket.py +++ b/src/tmtccmd/ccsds/spacepacket.py @@ -1,5 +1,4 @@ import enum -from collections import deque from typing import Tuple, Deque, List from tmtccmd.ccsds.log import LOGGER diff --git a/src/tmtccmd/cfdp/lv.py b/src/tmtccmd/cfdp/lv.py index ff6d48a3..5e37da91 100644 --- a/src/tmtccmd/cfdp/lv.py +++ b/src/tmtccmd/cfdp/lv.py @@ -1,35 +1,39 @@ +from __future__ import annotations from tmtccmd.ccsds.log import LOGGER class CfdpLv: - def __init__(self,serialize: bool, value: bytearray = None): + def __init__(self, value: bytearray): """This class encapsulates CFDP LV fields :raise ValueError: If value is invalid and serilization is enabled or if length of bytearray is too large :param serialize: :param value: """ - if serialize and value is None: - LOGGER.warning('All parameter have to be valid for serialization') - raise ValueError - if len > 255: + if len(value) > 255: LOGGER.warning('Length too large for LV field') raise ValueError self.len = len(value) self.value = value + def get_len(self): + """Returns length of full LV packet""" + return self.len + 1 + def pack(self): packet = bytearray() packet.append(self.len) packet.extend(self.value) - def unpack(self, raw_bytes: bytearray): + @classmethod + def unpack(cls, raw_bytes: bytearray) -> CfdpLv: """Parses LV field at the start of the given bytearray - :raise ValueError: Invalid lenght found + :raise ValueError: Invalid length found """ - self.len = raw_bytes[0] - if self.len > 255: + detected_len = raw_bytes[0] + if detected_len > 255: LOGGER.warning('Length too large for LV field') raise ValueError - elif self.len > 0: - self.value = raw_bytes[1: 1 + self.len] + return cls( + value=raw_bytes[1:] + ) diff --git a/src/tmtccmd/cfdp/pdu/ack.py b/src/tmtccmd/cfdp/pdu/ack.py index b6bef03a..cd9d8e30 100644 --- a/src/tmtccmd/cfdp/pdu/ack.py +++ b/src/tmtccmd/cfdp/pdu/ack.py @@ -1,3 +1,4 @@ +from __future__ import annotations import enum import struct @@ -20,7 +21,6 @@ class TransactionStatus(enum.IntEnum): class AckPdu(): def __init__( self, - serialize: bool, directive_code_of_acked_pdu: DirectiveCodes, condition_code_of_acked_pdu: ConditionCode, transaction_status: TransactionStatus, @@ -31,7 +31,6 @@ def __init__( len_transaction_seq_num=LenInBytes.NONE, ): self.pdu_file_directive = FileDirectivePduBase( - serialize=serialize, directive_code=DirectiveCodes.ACK_PDU, direction=direction, trans_mode=trans_mode, @@ -48,16 +47,29 @@ def __init__( self.condition_code_of_acked_pdu = condition_code_of_acked_pdu self.transaction_status = transaction_status + @classmethod + def __empty(cls) -> AckPdu: + return cls( + directive_code_of_acked_pdu=None, + condition_code_of_acked_pdu=None, + transaction_status=None, + direction=None, + trans_mode=None + ) + def pack(self): packet = self.pdu_file_directive.pack() packet.append((self.directive_code_of_acked_pdu << 4) | self.directive_subtype_code) packet.append((self.condition_code_of_acked_pdu << 4) | self.transaction_status) - def unpack(self, raw_packet: bytearray): - self.pdu_file_directive.unpack(raw_packet=raw_packet) - current_idx = self.pdu_file_directive.get_len() - self.directive_code_of_acked_pdu = raw_packet[current_idx] & 0xf0 - self.directive_subtype_code = raw_packet[current_idx] & 0x0f + @classmethod + def unpack(cls, raw_packet: bytearray) -> AckPdu: + ack_packet = cls.__empty() + ack_packet.pdu_file_directive = FileDirectivePduBase.unpack(raw_packet=raw_packet) + current_idx = ack_packet.pdu_file_directive.get_len() + ack_packet.directive_code_of_acked_pdu = raw_packet[current_idx] & 0xf0 + ack_packet.directive_subtype_code = raw_packet[current_idx] & 0x0f current_idx += 1 - self.condition_code_of_acked_pdu = raw_packet[current_idx] & 0xf0 - self.transaction_status = raw_packet[current_idx] & 0x03 + ack_packet.condition_code_of_acked_pdu = raw_packet[current_idx] & 0xf0 + ack_packet.transaction_status = raw_packet[current_idx] & 0x03 + return ack_packet diff --git a/src/tmtccmd/cfdp/pdu/eof.py b/src/tmtccmd/cfdp/pdu/eof.py index 1a59403a..036fcd7e 100644 --- a/src/tmtccmd/cfdp/pdu/eof.py +++ b/src/tmtccmd/cfdp/pdu/eof.py @@ -1,3 +1,4 @@ +from __future__ import annotations import struct from tmtccmd.cfdp.pdu.file_directive import FileDirectivePduBase, DirectiveCodes, \ @@ -13,7 +14,6 @@ class EofPdu(): def __init__( self, - serialize: bool, file_checksum: int, file_size: int, direction: Direction, @@ -25,7 +25,6 @@ def __init__( len_transaction_seq_num=LenInBytes.NONE, ): self.pdu_file_directive = FileDirectivePduBase( - serialize=serialize, directive_code=DirectiveCodes.EOF_PDU, direction=direction, trans_mode=trans_mode, @@ -38,6 +37,15 @@ def __init__( self.file_size = file_size self.fault_location = fault_location + @classmethod + def __empty(cls) -> EofPdu: + cls( + file_checksum=None, + file_size=None, + direction=None, + trans_mode=None + ) + def pack(self) -> bytearray: eof_pdu = bytearray() eof_pdu.extend(self.pdu_file_directive.pack()) @@ -51,35 +59,29 @@ def pack(self) -> bytearray: eof_pdu.extend(self.fault_location.pack()) return eof_pdu - def unpack(self, raw_packet: bytearray): + @classmethod + def unpack(cls, raw_packet: bytearray) -> EofPdu: """Deserialize raw EOF PDU packet :param raw_packet: :raise ValueError: If raw packet is too short :return: """ - self.pdu_file_directive.unpack(raw_packet=raw_packet) - expected_min_len = self.MINIMAL_LENGTH + eof_pdu = cls.__empty() + eof_pdu.pdu_file_directive = FileDirectivePduBase.unpack(raw_packet=raw_packet) + expected_min_len = cls.MINIMAL_LENGTH if not check_packet_length(raw_packet_len=len(raw_packet), min_len=expected_min_len): raise ValueError - current_idx = self.pdu_file_directive.get_len() - self.condition_code = raw_packet[current_idx] & 0xf0 - expected_min_len = self.pdu_file_directive.get_len() + 5 + current_idx = eof_pdu.pdu_file_directive.get_len() + eof_pdu.condition_code = raw_packet[current_idx] & 0xf0 + expected_min_len = eof_pdu.pdu_file_directive.get_len() + 5 current_idx += 1 checksum_raw = raw_packet[current_idx: current_idx + 4] - self.file_checksum = struct.unpack('!I', checksum_raw)[0] + eof_pdu.file_checksum = struct.unpack('!I', checksum_raw)[0] current_idx += 4 - if self.pdu_file_directive.pdu_header.large_file: - if not check_packet_length(raw_packet_len=len(raw_packet), min_len=current_idx + 8): - raise ValueError - filesize_raw = checksum_raw = \ - raw_packet[current_idx: current_idx + 8] - current_idx += 8 - self.file_checksum = struct.unpack('!Q', filesize_raw)[0] - else: - filesize_raw = checksum_raw = \ - raw_packet[current_idx: current_idx + 4] - current_idx += 4 - self.file_checksum = struct.unpack('!I', filesize_raw)[0] + current_idx, eof_pdu.file_size = eof_pdu.pdu_file_directive.parse_fss_field( + raw_packet=raw_packet, current_idx=current_idx + ) if len(raw_packet) > current_idx: - self.fault_location = CfdpTlv(serialize=False) - self.fault_location.unpack(raw_bytes=raw_packet[current_idx:]) + eof_pdu.fault_location = CfdpTlv(serialize=False) + eof_pdu.fault_location.unpack(raw_bytes=raw_packet[current_idx:]) + return eof_pdu diff --git a/src/tmtccmd/cfdp/pdu/file_directive.py b/src/tmtccmd/cfdp/pdu/file_directive.py index 8033ee12..4cb3ec9b 100644 --- a/src/tmtccmd/cfdp/pdu/file_directive.py +++ b/src/tmtccmd/cfdp/pdu/file_directive.py @@ -1,4 +1,7 @@ +from __future__ import annotations import enum +import struct + from tmtccmd.cfdp.pdu.header import PduHeader, PduType, Direction, CrcFlag, TransmissionModes from tmtccmd.cfdp.conf import check_packet_length from tmtccmd.cfdp.definitions import LenInBytes @@ -39,26 +42,33 @@ class FileDirectivePduBase: """ def __init__( self, - serialize: bool, - directive_code: DirectiveCodes = None, + directive_code: DirectiveCodes, # PDU Header parameters - direction: Direction = None, - trans_mode: TransmissionModes = None, - crc_flag: CrcFlag = None, + direction: Direction, + trans_mode: TransmissionModes, + crc_flag: CrcFlag, len_entity_id: LenInBytes = LenInBytes.NONE, len_transaction_seq_num: LenInBytes = LenInBytes.NONE, ): - if serialize: - if directive_code is None: - LOGGER.warning('Some mandatory fields were not specified for serialization') - raise ValueError self.pdu_header = PduHeader( - serialize=serialize, pdu_type=PduType.FILE_DIRECTIVE, direction=direction, - trans_mode=trans_mode, crc_flag=crc_flag, len_entity_id=len_entity_id, + pdu_type=PduType.FILE_DIRECTIVE, + direction=direction, + trans_mode=trans_mode, + crc_flag=crc_flag, + len_entity_id=len_entity_id, len_transaction_seq_num=len_transaction_seq_num ) self.directive_code = directive_code + @classmethod + def __empty(cls) -> FileDirectivePduBase: + return cls( + directive_code=None, + direction=None, + trans_mode=None, + crc_flag=None + ) + def get_len(self) -> int: return self.FILE_DIRECTIVE_PDU_LEN @@ -68,16 +78,19 @@ def pack(self) -> bytearray: data.append(self.directive_code) return data - def unpack(self, raw_packet: bytearray): + @classmethod + def unpack(cls, raw_packet: bytearray) -> FileDirectivePduBase: """Unpack a raw bytearray into the File Directive PDU object representation :param raw_bytes: :raise ValueError: Passed bytearray is too short. :return: """ - self.pdu_header.unpack(raw_bytes=raw_packet) + file_directive = cls.__empty() + file_directive.pdu_header = PduHeader.unpack(raw_packet=raw_packet) if not check_packet_length(raw_packet_len=len(raw_packet), min_len=5): raise ValueError - self.directive_code = raw_packet[4] + file_directive.directive_code = raw_packet[4] + return file_directive def verify_file_len(self, file_size: int) -> bool: if self.pdu_file_directive.pdu_header.large_file and file_size > pow(2, 64): @@ -87,3 +100,20 @@ def verify_file_len(self, file_size: int) -> bool: LOGGER.warning(f'File size {file_size} larger than 32 bit field') raise False return True + + def parse_fss_field(self, raw_packet: bytearray, current_idx: int) -> (int, int): + """Parse the FSS field, which has different size depending on the large file flag being + set or not. Returns the current index incremented and the parsed file size + :raise ValueError: Packet not large enough + """ + if self.pdu_header.large_file: + if not check_packet_length(len(raw_packet), current_idx + 8 + 1): + raise ValueError + file_size = struct.unpack('!I', raw_packet[current_idx: current_idx + 8]) + current_idx += 8 + else: + if not check_packet_length(len(raw_packet), current_idx + 4 + 1): + raise ValueError + file_size = struct.unpack('!I', raw_packet[current_idx: current_idx + 4]) + current_idx += 4 + return current_idx, file_size diff --git a/src/tmtccmd/cfdp/pdu/finished.py b/src/tmtccmd/cfdp/pdu/finished.py index d934bc9a..36831bc1 100644 --- a/src/tmtccmd/cfdp/pdu/finished.py +++ b/src/tmtccmd/cfdp/pdu/finished.py @@ -1,3 +1,4 @@ +from __future__ import annotations import enum from tmtccmd.cfdp.pdu.file_directive import FileDirectivePduBase, DirectiveCodes, Direction, \ @@ -25,7 +26,6 @@ class FinishedPdu(): def __init__( self, - serialize: bool, direction: Direction, delivery_code: DeliveryCode, file_delivery_status: FileDeliveryStatus, @@ -38,7 +38,6 @@ def __init__( len_transaction_seq_num: LenInBytes = LenInBytes.NONE ): self.pdu_file_directive = FileDirectivePduBase( - serialize=serialize, directive_code=DirectiveCodes.FINISHED_PDU, direction=direction, trans_mode=trans_mode, @@ -53,9 +52,19 @@ def __init__( self.file_delivery_status = file_delivery_status self.might_have_fault_location = False if self.condition_code != ConditionCode.NO_ERROR and \ - self.condition_code != ConditionCode.UNSUPPORTED_CHECKSUM_TYPE: + self.condition_code != ConditionCode.UNSUPPORTED_CHECKSUM_TYPE: self.might_have_fault_location = True + @classmethod + def __empty(cls) -> FinishedPdu: + cls( + direction=None, + delivery_code=None, + file_delivery_status=None, + trans_mode=None, + condition_code=None + ) + def pack(self) -> bytearray: packet = self.pdu_file_directive.pack() packet[len(packet)] |= \ @@ -66,23 +75,26 @@ def pack(self) -> bytearray: packet.extend(self.fault_location.pack()) return packet - def unpack(self, raw_packet: bytearray): + @classmethod + def unpack(cls, raw_packet: bytearray) -> FinishedPdu: """Unpack a raw packet into a PDU object :param raw_packet: :raise ValueError: If packet is too short :return: """ - self.pdu_file_directive.unpack(raw_bytes=raw_packet) - if not check_packet_length(raw_packet_len=len(raw_packet), min_len=self.MINIMAL_LEN): + finished_pdu = cls.__empty() + finished_pdu.pdu_file_directive = FileDirectivePduBase.unpack(raw_packet=raw_packet) + if not check_packet_length(raw_packet_len=len(raw_packet), min_len=cls.MINIMAL_LEN): raise ValueError - current_idx = self.pdu_file_directive.get_len() + current_idx = finished_pdu.pdu_file_directive.get_len() first_param_byte = raw_packet[current_idx] - self.condition_code = first_param_byte & 0xf0 - self.delivery_code = first_param_byte & 0x04 - self.file_delivery_status = first_param_byte & 0b11 + finished_pdu.condition_code = first_param_byte & 0xf0 + finished_pdu.delivery_code = first_param_byte & 0x04 + finished_pdu.file_delivery_status = first_param_byte & 0b11 current_idx += 1 if len(raw_packet) > current_idx: - self.unpack_tlvs(raw_packet=raw_packet, start_idx=current_idx) + finished_pdu.unpack_tlvs(raw_packet=raw_packet, start_idx=current_idx) + return finished_pdu def unpack_tlvs(self, raw_packet: bytearray, start_idx: int) -> int: current_idx = start_idx @@ -91,7 +103,7 @@ def unpack_tlvs(self, raw_packet: bytearray, start_idx: int) -> int: current_tlv.unpack(raw_bytes=raw_packet[current_idx]) # This will always increment at least two, so we can't get stuck in the loop current_idx += current_tlv.get_total_length() - if start_idx > len(raw_packet) or current_idx == len(raw_packet): + if current_idx > len(raw_packet) or current_idx == len(raw_packet): if current_idx > len(raw_packet): LOGGER.warning( 'Parser Error when parsing TLVs in Finished PDU. Possibly invalid packet' diff --git a/src/tmtccmd/cfdp/pdu/header.py b/src/tmtccmd/cfdp/pdu/header.py index 7a6adff7..920b1e0e 100644 --- a/src/tmtccmd/cfdp/pdu/header.py +++ b/src/tmtccmd/cfdp/pdu/header.py @@ -1,4 +1,6 @@ +from __future__ import annotations import enum + from tmtccmd.ccsds.handler import CcsdsTmHandler from tmtccmd.utility.logger import get_console_logger from tmtccmd.cfdp.definitions import LenInBytes @@ -49,10 +51,9 @@ class PduHeader: For more, information, refer to CCSDS 727.0-B-5 p.75""" def __init__( self, - serialize: bool, - pdu_type: PduType = None, - direction: Direction = None, - trans_mode: TransmissionModes = None, + pdu_type: PduType, + direction: Direction, + trans_mode: TransmissionModes, crc_flag: CrcFlag = CrcFlag.GLOBAL_CONFIG, len_entity_id: LenInBytes = LenInBytes.NONE, len_transaction_seq_num: LenInBytes = LenInBytes.NONE, @@ -71,10 +72,6 @@ def __init__( :param segment_metadata_flag: :raise ValueError: If some field were not specified with serialize == True """ - if serialize: - if pdu_type is None or direction is None or trans_mode is None or crc_flag is None: - LOGGER.warning('Some mandatory fields were not specified for serialization') - raise ValueError self.pdu_type = pdu_type self.direction = direction self.trans_mode = trans_mode @@ -115,7 +112,8 @@ def pack(self) -> bytearray: ) return header - def unpack(self, raw_packet: bytearray): + @classmethod + def unpack(cls, raw_packet: bytearray) -> PduHeader: """Unpack a raw bytearray into the PDU header oject representation :param raw_packet: :raise ValueError: Passed bytearray is too short. @@ -124,13 +122,26 @@ def unpack(self, raw_packet: bytearray): if len(raw_packet) < 4: LOGGER.warning('Can not unpack less than four bytes into PDU header') raise ValueError - self.pdu_type = raw_packet[0] & 0x10 - self.direction = raw_packet[0] & 0x08 - self.trans_mode = raw_packet[0] & 0x04 - self.crc_flag = raw_packet[0] & 0x02 - self.large_file = raw_packet[0] & 0x01 - self.pdu_data_field_length = raw_packet[1] << 8 | raw_packet[2] - self.segmentation_control = raw_packet[3] & 0x80 - self.len_entity_id = raw_packet[3] & 0x70 - self.segment_metadata_flag = raw_packet[3] & 0x08 - self.len_transaction_seq_num = raw_packet[3] & 0x01 + pdu_type = raw_packet[0] & 0x10 + direction = raw_packet[0] & 0x08 + trans_mode = raw_packet[0] & 0x04 + crc_flag = raw_packet[0] & 0x02 + large_file = raw_packet[0] & 0x01 + pdu_data_field_length = raw_packet[1] << 8 | raw_packet[2] + segmentation_control = raw_packet[3] & 0x80 + len_entity_id = raw_packet[3] & 0x70 + segment_metadata_flag = raw_packet[3] & 0x08 + len_transaction_seq_num = raw_packet[3] & 0x01 + + return cls( + pdu_type=pdu_type, + direction=direction, + trans_mode=trans_mode, + crc_flag=crc_flag, + large_file=large_file, + pdu_data_field_length=pdu_data_field_length, + segmentation_control=segmentation_control, + len_entity_id=len_entity_id, + segment_metadata_flag=segment_metadata_flag, + len_transaction_seq_num=len_transaction_seq_num + ) diff --git a/src/tmtccmd/cfdp/pdu/metadata.py b/src/tmtccmd/cfdp/pdu/metadata.py index bc56dfe5..ec733859 100644 --- a/src/tmtccmd/cfdp/pdu/metadata.py +++ b/src/tmtccmd/cfdp/pdu/metadata.py @@ -1,3 +1,4 @@ +from __future__ import annotations import struct from typing import List @@ -14,7 +15,6 @@ class MetadataPdu(): def __init__( self, - serialize: bool, closure_requested: bool, checksum_type: ChecksumTypes, file_size: int, @@ -28,7 +28,6 @@ def __init__( len_transaction_seq_num=LenInBytes.NONE, ): self.pdu_file_directive = FileDirectivePduBase( - serialize=serialize, directive_code=DirectiveCodes.METADATA_PDU, direction=direction, trans_mode=trans_mode, @@ -39,20 +38,28 @@ def __init__( self.closure_requested = closure_requested self.checksum_type = checksum_type self.file_size = file_size - source_file_name_as_bytes = source_file_name - if serialize: - source_file_name_as_bytes = source_file_name.encode('utf-8') + source_file_name_as_bytes = source_file_name.encode('utf-8') self.source_file_name_lv = CfdpLv( - serialize=serialize, value=source_file_name_as_bytes + value=source_file_name_as_bytes ) - dest_file_name_as_bytes = dest_file_name - if serialize: - dest_file_name_as_bytes = dest_file_name.encode('utf-8') + dest_file_name_as_bytes = dest_file_name.encode('utf-8') self.dest_file_name_lv = CfdpLv( - serialize=serialize, value=dest_file_name_as_bytes + value=dest_file_name_as_bytes ) self.options = options + @classmethod + def __empty(cls) -> MetadataPdu: + cls( + closure_requested=None, + checksum_type=None, + file_size=None, + source_file_name="", + dest_file_name="", + direction=None, + trans_mode=None + ) + def pack(self): if not self.pdu_file_directive.verify_file_len(self.file_size): raise ValueError @@ -68,5 +75,42 @@ def pack(self): for option in self.options: packet.extend(option.pack()) - def unpack(self, raw_packet: bytearray): - pass + @classmethod + def unpack(cls, raw_packet: bytearray) -> MetadataPdu: + metadata_pdu = cls.__empty() + metadata_pdu.pdu_file_directive = FileDirectivePduBase.unpack(raw_packet=raw_packet) + current_idx = metadata_pdu.pdu_file_directive.get_len() + # Minimal length: 1 byte + FSS (4 byte) + 2 empty LV (1 byte) + if not check_packet_length(len(raw_packet), metadata_pdu.pdu_file_directive.get_len() + 7): + raise ValueError + metadata_pdu.closure_requested = raw_packet[current_idx] & 0x40 + metadata_pdu.checksum_type = raw_packet[current_idx] & 0x0f + current_idx += 1 + current_idx, metadata_pdu.file_size = metadata_pdu.pdu_file_directive.parse_fss_field( + raw_packet=raw_packet, current_idx=current_idx + ) + metadata_pdu.source_file_name_lv.unpack(raw_bytes=raw_packet[current_idx:]) + current_idx += metadata_pdu.source_file_name_lv.get_len() + metadata_pdu.dest_file_name_lv.unpack(raw_bytes=raw_packet[current_idx:]) + current_idx += metadata_pdu.dest_file_name_lv.get_len() + if current_idx < len(raw_packet): + metadata_pdu.parse_options(raw_packet=raw_packet, start_idx=current_idx) + return metadata_pdu + + def parse_options(self, raw_packet: bytearray, start_idx: int): + self.options = [] + current_idx = start_idx + while True: + current_tlv = CfdpTlv(serialize=False) + current_tlv.unpack(raw_bytes=raw_packet[current_idx]) + self.options.append(current_tlv) + # This will always increment at least two, so we can't get stuck in the loop + current_idx += current_tlv.get_total_length() + if current_idx > len(raw_packet): + LOGGER.warning( + 'Parser Error when parsing TLVs in Finished PDU. ' + 'Possibly invalid packet' + ) + break + elif current_idx == len(raw_packet): + break diff --git a/src/tmtccmd/cfdp/pdu/nak.py b/src/tmtccmd/cfdp/pdu/nak.py new file mode 100644 index 00000000..27640e29 --- /dev/null +++ b/src/tmtccmd/cfdp/pdu/nak.py @@ -0,0 +1,66 @@ +from __future__ import annotations +import enum +import struct +from typing import List, Tuple + +from tmtccmd.cfdp.pdu.file_directive import FileDirectivePduBase, DirectiveCodes, \ + ConditionCode +from tmtccmd.cfdp.pdu.header import Direction, TransmissionModes, CrcFlag +from tmtccmd.cfdp.conf import LenInBytes + + +class NakPdu(): + def __init__( + self, + direction: Direction, + trans_mode: TransmissionModes, + start_of_scope: int, + end_of_scope: int, + segment_requests: List[Tuple[int,int]], + crc_flag: CrcFlag = CrcFlag.GLOBAL_CONFIG, + len_entity_id: LenInBytes = LenInBytes.NONE, + len_transaction_seq_num=LenInBytes.NONE, + ): + self.pdu_file_directive = FileDirectivePduBase( + directive_code=DirectiveCodes.ACK_PDU, + direction=direction, + trans_mode=trans_mode, + crc_flag=crc_flag, + len_entity_id=len_entity_id, + len_transaction_seq_num=len_transaction_seq_num + ) + self.start_of_scope = start_of_scope + self.end_of_scope = end_of_scope + self.segment_requests = segment_requests + + @classmethod + def __empty(cls) -> NakPdu: + return cls( + direction=None, + trans_mode=None, + start_of_scope=None, + end_of_scope=None, + segment_requests=None + ) + + def pack(self) -> bytearray: + nak_pdu = self.pdu_file_directive.pack() + if not self.pdu_file_directive.pdu_header.large_file: + nak_pdu.extend(struct.pack('!I', self.start_of_scope)) + nak_pdu.extend(struct.pack('!I', self.end_of_scope)) + else: + nak_pdu.extend(struct.pack('!Q', self.start_of_scope)) + nak_pdu.extend(struct.pack('!Q', self.end_of_scope)) + for segment_request in self.segment_requests: + if not self.pdu_file_directive.pdu_header.large_file: + nak_pdu.extend(struct.pack('!I', segment_request[0])) + nak_pdu.extend(struct.pack('!I', segment_request[1])) + else: + nak_pdu.extend(struct.pack('!Q', segment_request[0])) + nak_pdu.extend(struct.pack('!Q', segment_request[1])) + + @classmethod + def unpack(cls, raw_packet: bytearray) -> NakPdu: + nak_pdu = cls.__empty() + nak_pdu.pdu_file_directive = FileDirectivePduBase.unpack(raw_packet=raw_packet) + current_idx = nak_pdu.pdu_file_directive.get_len() diff --git a/src/tmtccmd/cfdp/tlv.py b/src/tmtccmd/cfdp/tlv.py index 9de4d3c3..501978c8 100644 --- a/src/tmtccmd/cfdp/tlv.py +++ b/src/tmtccmd/cfdp/tlv.py @@ -1,3 +1,4 @@ +from __future__ import annotations import enum from tmtccmd.ccsds.log import LOGGER @@ -18,8 +19,10 @@ class CfdpTlv: For more information, refer to CCSDS 727.0-B-5 p.77 """ def __init__( - self, serialize: bool, type: TlvTypes = None, length: int = None, - value: bytearray = None + self, + type: TlvTypes, + length: int, + value: bytearray ): """Constructor for TLV field. :param serialize: Specfiy whether a packet is serialized or deserialized. For serialize, @@ -30,14 +33,10 @@ def __init__( :param value: :raise ValueError: Length invalid or value length not equal to specified length """ - if serialize: - if type is None or length is None or value is None: - LOGGER.warning('All parameter have to be valid for serialization') - raise ValueError - if length > 255 or length < 0: - raise ValueError - if len(value) != length: - raise ValueError + if length > 255 or length < 0: + raise ValueError + if len(value) != length: + raise ValueError self.type = type self.length = length self.value = value @@ -49,7 +48,8 @@ def pack(self) -> bytearray: tlv_data.extend(self.value) return tlv_data - def unpack(self, raw_bytes: bytearray): + @classmethod + def unpack(cls, raw_bytes: bytearray) -> CfdpTlv: """Parses LV field at the start of the given bytearray :param raw_bytes: :raise ValueError: Invalid format of the raw bytearray or type field invalid @@ -59,15 +59,20 @@ def unpack(self, raw_bytes: bytearray): LOGGER.warning('Invalid length for TLV field, less than 2') raise ValueError try: - self.type = TlvTypes(raw_bytes[0]) + type = TlvTypes(raw_bytes[0]) except ValueError: LOGGER.warning( - f'TLV field invalid, found value {self.type} is not a possible TLV parameter' + f'TLV field invalid, found value {type} is not a possible TLV parameter' ) raise ValueError - self.length = raw_bytes[1] + value = bytearray() if len(raw_bytes) > 2: - self.value = raw_bytes[2:] + value.extend(raw_bytes[2:]) + return cls( + type=raw_bytes[0], + length=raw_bytes[1], + value=value + ) def get_total_length(self) -> int: return self.MINIMAL_LEN + len(self.value) diff --git a/src/tmtccmd/com_if/dummy_com_if.py b/src/tmtccmd/com_if/dummy_com_if.py index 5411f59e..5be73ddb 100644 --- a/src/tmtccmd/com_if/dummy_com_if.py +++ b/src/tmtccmd/com_if/dummy_com_if.py @@ -1,10 +1,5 @@ +"""Dummy Communication Interface. Currently serves to provide an example without external hardware """ -:file: dummy_com_if.py -:date: 09.03.2020 -:brief: Dummy Communication Interface. Currently serves to provide an example without external hardware -""" -from typing import cast - from tmtccmd.com_if.com_interface_base import CommunicationInterface from tmtccmd.ecss.tc import PusTelecommand from tmtccmd.tm.definitions import TelemetryListT diff --git a/src/tmtccmd/com_if/qemu_com_if.py b/src/tmtccmd/com_if/qemu_com_if.py index 126399d4..a50d1821 100644 --- a/src/tmtccmd/com_if/qemu_com_if.py +++ b/src/tmtccmd/com_if/qemu_com_if.py @@ -170,35 +170,42 @@ def receive(self, parameters=0) -> TelemetryListT: return packet_list - def data_available(self, timeout: any = 0, parameters: any = 0) -> int: - elapsed_time = 0 - start_time = time.time() - sleep_time = timeout / 3.0 - + def data_available(self, timeout: float = 0, parameters: any = 0) -> int: if self.ser_com_type == SerialCommunicationType.FIXED_FRAME_BASED: - if timeout > 0: - while elapsed_time < timeout: - if self.usart.new_data_available(): - return self.usart.get_data_in_waiting() - - time.sleep(sleep_time) - elapsed_time = time.time() - start_time - - if self.usart.new_data_available(): - return self.usart.get_data_in_waiting() - + return self.data_available_fixed_frame(timeout=timeout) elif self.ser_com_type == SerialCommunicationType.DLE_ENCODING: - if timeout > 0: - while elapsed_time < timeout: - if self.reception_buffer: - return len(self.reception_buffer) - - time.sleep(sleep_time) - elapsed_time = time.time() - start_time + return self.data_available_dle(timeout=timeout) + return 0 - if self.reception_buffer: - return len(self.reception_buffer) + def data_available_fixed_frame(self, timeout: float = 0) -> int: + elapsed_time = 0 + start_time = time.time() + sleep_time = timeout / 3.0 + if timeout > 0: + while elapsed_time < timeout: + if self.usart.new_data_available(): + return self.usart.get_data_in_waiting() + + time.sleep(sleep_time) + elapsed_time = time.time() - start_time + return 0 + if self.usart.new_data_available(): + return self.usart.get_data_in_waiting() + return 0 + def data_availanle_dle(self, timeout: float = 0) -> int: + elapsed_time = 0 + start_time = time.time() + sleep_time = timeout / 3.0 + if timeout > 0: + while elapsed_time < timeout: + if self.reception_buffer: + return len(self.reception_buffer) + time.sleep(sleep_time) + elapsed_time = time.time() - start_time + return 0 + if self.reception_buffer: + return len(self.reception_buffer) return 0 async def start_dle_polling(self): diff --git a/src/tmtccmd/config/globals.py b/src/tmtccmd/config/globals.py index a4ba6946..7fdec3e5 100644 --- a/src/tmtccmd/config/globals.py +++ b/src/tmtccmd/config/globals.py @@ -87,31 +87,8 @@ def set_default_globals_post_args_parsing( :return: """ - # Determine communication interface from arguments. Must be contained in core modes list - try: - mode_param = args.mode - except AttributeError: - LOGGER.warning("Passed namespace does not contain the mode (-m) argument") - mode_param = CoreModeList.LISTENER_MODE - mode_param = check_and_set_core_mode_arg( - mode_arg=mode_param, custom_modes_list=custom_modes_list - ) - all_com_ifs = CoreComInterfacesDict - if custom_com_if_dict is not None: - all_com_ifs = CoreComInterfacesDict.update(custom_com_if_dict) - try: - com_if_key = str(args.com_if) - except AttributeError: - LOGGER.warning("No communication interface specified") - LOGGER.warning("Trying to set from existing configuration..") - com_if_key = determine_com_if(com_if_dict=all_com_ifs, json_cfg_path=json_cfg_path) - if com_if_key == CoreComInterfaces.UNSPECIFIED.value: - com_if_key = determine_com_if(com_if_dict=all_com_ifs, json_cfg_path=json_cfg_path) - update_global(CoreGlobalIds.COM_IF, com_if_key) - try: - LOGGER.info(f"Communication interface: {all_com_ifs[com_if_key]}") - except KeyError as e: - LOGGER.error(f'Invalid communication interface key {com_if_key}') + handle_mode_arg(args=args, custom_modes_list=custom_modes_list) + handle_com_if_arg(args=args, json_cfg_path=json_cfg_path, custom_com_if_dict=custom_com_if_dict) display_mode_param = "long" if args.short_display_mode is not None: @@ -149,6 +126,42 @@ def set_default_globals_post_args_parsing( print_core_globals() +def handle_mode_arg( + args, custom_modes_list: Union[None, List[Union[collections.abc.Iterable, dict]]] = None +) -> int: + # Determine communication interface from arguments. Must be contained in core modes list + try: + mode_param = args.mode + except AttributeError: + LOGGER.warning("Passed namespace does not contain the mode (-m) argument") + mode_param = CoreModeList.LISTENER_MODE + mode_param = check_and_set_core_mode_arg( + mode_arg=mode_param, custom_modes_list=custom_modes_list + ) + update_global(CoreGlobalIds.MODE, mode_param) + + +def handle_com_if_arg( + args, json_cfg_path: str, custom_com_if_dict: Dict[str, any] = None +): + all_com_ifs = CoreComInterfacesDict + if custom_com_if_dict is not None: + all_com_ifs = CoreComInterfacesDict.update(custom_com_if_dict) + try: + com_if_key = str(args.com_if) + except AttributeError: + LOGGER.warning("No communication interface specified") + LOGGER.warning("Trying to set from existing configuration..") + com_if_key = determine_com_if(com_if_dict=all_com_ifs, json_cfg_path=json_cfg_path) + if com_if_key == CoreComInterfaces.UNSPECIFIED.value: + com_if_key = determine_com_if(com_if_dict=all_com_ifs, json_cfg_path=json_cfg_path) + update_global(CoreGlobalIds.COM_IF, com_if_key) + try: + LOGGER.info(f"Communication interface: {all_com_ifs[com_if_key]}") + except KeyError as e: + LOGGER.error(f'Invalid communication interface key {com_if_key}, error {e}') + + def check_and_set_other_args(args): if args.listener is not None: update_global(CoreGlobalIds.USE_LISTENER_AFTER_OP, args.listener)