Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PDU Updates #30

Merged
merged 7 commits into from
Aug 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/tmtccmd/ccsds/spacepacket.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import enum
from collections import deque
from typing import Tuple, Deque, List


Expand Down
26 changes: 15 additions & 11 deletions src/tmtccmd/cfdp/lv.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,39 @@
from __future__ import annotations
from tmtccmd.ccsds.log import LOGGER


class CfdpLv:
def __init__(self,serialize: bool, value: bytearray = None):
def __init__(self, value: bytearray):
"""This class encapsulates CFDP LV fields
:raise ValueError: If value is invalid and serilization is enabled or if length of bytearray
is too large
:param serialize:
:param value:
"""
if serialize and value is None:
LOGGER.warning('All parameter have to be valid for serialization')
raise ValueError
if len > 255:
if len(value) > 255:
LOGGER.warning('Length too large for LV field')
raise ValueError
self.len = len(value)
self.value = value

def get_len(self):
"""Returns length of full LV packet"""
return self.len + 1

def pack(self):
packet = bytearray()
packet.append(self.len)
packet.extend(self.value)

def unpack(self, raw_bytes: bytearray):
@classmethod
def unpack(cls, raw_bytes: bytearray) -> CfdpLv:
"""Parses LV field at the start of the given bytearray
:raise ValueError: Invalid lenght found
:raise ValueError: Invalid length found
"""
self.len = raw_bytes[0]
if self.len > 255:
detected_len = raw_bytes[0]
if detected_len > 255:
LOGGER.warning('Length too large for LV field')
raise ValueError
elif self.len > 0:
self.value = raw_bytes[1: 1 + self.len]
return cls(
value=raw_bytes[1:]
)
30 changes: 21 additions & 9 deletions src/tmtccmd/cfdp/pdu/ack.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from __future__ import annotations
import enum
import struct

Expand All @@ -20,7 +21,6 @@ class TransactionStatus(enum.IntEnum):
class AckPdu():
def __init__(
self,
serialize: bool,
directive_code_of_acked_pdu: DirectiveCodes,
condition_code_of_acked_pdu: ConditionCode,
transaction_status: TransactionStatus,
Expand All @@ -31,7 +31,6 @@ def __init__(
len_transaction_seq_num=LenInBytes.NONE,
):
self.pdu_file_directive = FileDirectivePduBase(
serialize=serialize,
directive_code=DirectiveCodes.ACK_PDU,
direction=direction,
trans_mode=trans_mode,
Expand All @@ -48,16 +47,29 @@ def __init__(
self.condition_code_of_acked_pdu = condition_code_of_acked_pdu
self.transaction_status = transaction_status

@classmethod
def __empty(cls) -> AckPdu:
return cls(
directive_code_of_acked_pdu=None,
condition_code_of_acked_pdu=None,
transaction_status=None,
direction=None,
trans_mode=None
)

def pack(self):
packet = self.pdu_file_directive.pack()
packet.append((self.directive_code_of_acked_pdu << 4) | self.directive_subtype_code)
packet.append((self.condition_code_of_acked_pdu << 4) | self.transaction_status)

def unpack(self, raw_packet: bytearray):
self.pdu_file_directive.unpack(raw_packet=raw_packet)
current_idx = self.pdu_file_directive.get_len()
self.directive_code_of_acked_pdu = raw_packet[current_idx] & 0xf0
self.directive_subtype_code = raw_packet[current_idx] & 0x0f
@classmethod
def unpack(cls, raw_packet: bytearray) -> AckPdu:
ack_packet = cls.__empty()
ack_packet.pdu_file_directive = FileDirectivePduBase.unpack(raw_packet=raw_packet)
current_idx = ack_packet.pdu_file_directive.get_len()
ack_packet.directive_code_of_acked_pdu = raw_packet[current_idx] & 0xf0
ack_packet.directive_subtype_code = raw_packet[current_idx] & 0x0f
current_idx += 1
self.condition_code_of_acked_pdu = raw_packet[current_idx] & 0xf0
self.transaction_status = raw_packet[current_idx] & 0x03
ack_packet.condition_code_of_acked_pdu = raw_packet[current_idx] & 0xf0
ack_packet.transaction_status = raw_packet[current_idx] & 0x03
return ack_packet
48 changes: 25 additions & 23 deletions src/tmtccmd/cfdp/pdu/eof.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from __future__ import annotations
import struct

from tmtccmd.cfdp.pdu.file_directive import FileDirectivePduBase, DirectiveCodes, \
Expand All @@ -13,7 +14,6 @@ class EofPdu():

def __init__(
self,
serialize: bool,
file_checksum: int,
file_size: int,
direction: Direction,
Expand All @@ -25,7 +25,6 @@ def __init__(
len_transaction_seq_num=LenInBytes.NONE,
):
self.pdu_file_directive = FileDirectivePduBase(
serialize=serialize,
directive_code=DirectiveCodes.EOF_PDU,
direction=direction,
trans_mode=trans_mode,
Expand All @@ -38,6 +37,15 @@ def __init__(
self.file_size = file_size
self.fault_location = fault_location

@classmethod
def __empty(cls) -> EofPdu:
cls(
file_checksum=None,
file_size=None,
direction=None,
trans_mode=None
)

def pack(self) -> bytearray:
eof_pdu = bytearray()
eof_pdu.extend(self.pdu_file_directive.pack())
Expand All @@ -51,35 +59,29 @@ def pack(self) -> bytearray:
eof_pdu.extend(self.fault_location.pack())
return eof_pdu

def unpack(self, raw_packet: bytearray):
@classmethod
def unpack(cls, raw_packet: bytearray) -> EofPdu:
"""Deserialize raw EOF PDU packet
:param raw_packet:
:raise ValueError: If raw packet is too short
:return:
"""
self.pdu_file_directive.unpack(raw_packet=raw_packet)
expected_min_len = self.MINIMAL_LENGTH
eof_pdu = cls.__empty()
eof_pdu.pdu_file_directive = FileDirectivePduBase.unpack(raw_packet=raw_packet)
expected_min_len = cls.MINIMAL_LENGTH
if not check_packet_length(raw_packet_len=len(raw_packet), min_len=expected_min_len):
raise ValueError
current_idx = self.pdu_file_directive.get_len()
self.condition_code = raw_packet[current_idx] & 0xf0
expected_min_len = self.pdu_file_directive.get_len() + 5
current_idx = eof_pdu.pdu_file_directive.get_len()
eof_pdu.condition_code = raw_packet[current_idx] & 0xf0
expected_min_len = eof_pdu.pdu_file_directive.get_len() + 5
current_idx += 1
checksum_raw = raw_packet[current_idx: current_idx + 4]
self.file_checksum = struct.unpack('!I', checksum_raw)[0]
eof_pdu.file_checksum = struct.unpack('!I', checksum_raw)[0]
current_idx += 4
if self.pdu_file_directive.pdu_header.large_file:
if not check_packet_length(raw_packet_len=len(raw_packet), min_len=current_idx + 8):
raise ValueError
filesize_raw = checksum_raw = \
raw_packet[current_idx: current_idx + 8]
current_idx += 8
self.file_checksum = struct.unpack('!Q', filesize_raw)[0]
else:
filesize_raw = checksum_raw = \
raw_packet[current_idx: current_idx + 4]
current_idx += 4
self.file_checksum = struct.unpack('!I', filesize_raw)[0]
current_idx, eof_pdu.file_size = eof_pdu.pdu_file_directive.parse_fss_field(
raw_packet=raw_packet, current_idx=current_idx
)
if len(raw_packet) > current_idx:
self.fault_location = CfdpTlv(serialize=False)
self.fault_location.unpack(raw_bytes=raw_packet[current_idx:])
eof_pdu.fault_location = CfdpTlv(serialize=False)
eof_pdu.fault_location.unpack(raw_bytes=raw_packet[current_idx:])
return eof_pdu
58 changes: 44 additions & 14 deletions src/tmtccmd/cfdp/pdu/file_directive.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from __future__ import annotations
import enum
import struct

from tmtccmd.cfdp.pdu.header import PduHeader, PduType, Direction, CrcFlag, TransmissionModes
from tmtccmd.cfdp.conf import check_packet_length
from tmtccmd.cfdp.definitions import LenInBytes
Expand Down Expand Up @@ -39,26 +42,33 @@ class FileDirectivePduBase:
"""
def __init__(
self,
serialize: bool,
directive_code: DirectiveCodes = None,
directive_code: DirectiveCodes,
# PDU Header parameters
direction: Direction = None,
trans_mode: TransmissionModes = None,
crc_flag: CrcFlag = None,
direction: Direction,
trans_mode: TransmissionModes,
crc_flag: CrcFlag,
len_entity_id: LenInBytes = LenInBytes.NONE,
len_transaction_seq_num: LenInBytes = LenInBytes.NONE,
):
if serialize:
if directive_code is None:
LOGGER.warning('Some mandatory fields were not specified for serialization')
raise ValueError
self.pdu_header = PduHeader(
serialize=serialize, pdu_type=PduType.FILE_DIRECTIVE, direction=direction,
trans_mode=trans_mode, crc_flag=crc_flag, len_entity_id=len_entity_id,
pdu_type=PduType.FILE_DIRECTIVE,
direction=direction,
trans_mode=trans_mode,
crc_flag=crc_flag,
len_entity_id=len_entity_id,
len_transaction_seq_num=len_transaction_seq_num
)
self.directive_code = directive_code

@classmethod
def __empty(cls) -> FileDirectivePduBase:
return cls(
directive_code=None,
direction=None,
trans_mode=None,
crc_flag=None
)

def get_len(self) -> int:
return self.FILE_DIRECTIVE_PDU_LEN

Expand All @@ -68,16 +78,19 @@ def pack(self) -> bytearray:
data.append(self.directive_code)
return data

def unpack(self, raw_packet: bytearray):
@classmethod
def unpack(cls, raw_packet: bytearray) -> FileDirectivePduBase:
"""Unpack a raw bytearray into the File Directive PDU object representation
:param raw_bytes:
:raise ValueError: Passed bytearray is too short.
:return:
"""
self.pdu_header.unpack(raw_bytes=raw_packet)
file_directive = cls.__empty()
file_directive.pdu_header = PduHeader.unpack(raw_packet=raw_packet)
if not check_packet_length(raw_packet_len=len(raw_packet), min_len=5):
raise ValueError
self.directive_code = raw_packet[4]
file_directive.directive_code = raw_packet[4]
return file_directive

def verify_file_len(self, file_size: int) -> bool:
if self.pdu_file_directive.pdu_header.large_file and file_size > pow(2, 64):
Expand All @@ -87,3 +100,20 @@ def verify_file_len(self, file_size: int) -> bool:
LOGGER.warning(f'File size {file_size} larger than 32 bit field')
raise False
return True

def parse_fss_field(self, raw_packet: bytearray, current_idx: int) -> (int, int):
"""Parse the FSS field, which has different size depending on the large file flag being
set or not. Returns the current index incremented and the parsed file size
:raise ValueError: Packet not large enough
"""
if self.pdu_header.large_file:
if not check_packet_length(len(raw_packet), current_idx + 8 + 1):
raise ValueError
file_size = struct.unpack('!I', raw_packet[current_idx: current_idx + 8])
current_idx += 8
else:
if not check_packet_length(len(raw_packet), current_idx + 4 + 1):
raise ValueError
file_size = struct.unpack('!I', raw_packet[current_idx: current_idx + 4])
current_idx += 4
return current_idx, file_size
36 changes: 24 additions & 12 deletions src/tmtccmd/cfdp/pdu/finished.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from __future__ import annotations
import enum

from tmtccmd.cfdp.pdu.file_directive import FileDirectivePduBase, DirectiveCodes, Direction, \
Expand Down Expand Up @@ -25,7 +26,6 @@ class FinishedPdu():

def __init__(
self,
serialize: bool,
direction: Direction,
delivery_code: DeliveryCode,
file_delivery_status: FileDeliveryStatus,
Expand All @@ -38,7 +38,6 @@ def __init__(
len_transaction_seq_num: LenInBytes = LenInBytes.NONE
):
self.pdu_file_directive = FileDirectivePduBase(
serialize=serialize,
directive_code=DirectiveCodes.FINISHED_PDU,
direction=direction,
trans_mode=trans_mode,
Expand All @@ -53,9 +52,19 @@ def __init__(
self.file_delivery_status = file_delivery_status
self.might_have_fault_location = False
if self.condition_code != ConditionCode.NO_ERROR and \
self.condition_code != ConditionCode.UNSUPPORTED_CHECKSUM_TYPE:
self.condition_code != ConditionCode.UNSUPPORTED_CHECKSUM_TYPE:
self.might_have_fault_location = True

@classmethod
def __empty(cls) -> FinishedPdu:
cls(
direction=None,
delivery_code=None,
file_delivery_status=None,
trans_mode=None,
condition_code=None
)

def pack(self) -> bytearray:
packet = self.pdu_file_directive.pack()
packet[len(packet)] |= \
Expand All @@ -66,23 +75,26 @@ def pack(self) -> bytearray:
packet.extend(self.fault_location.pack())
return packet

def unpack(self, raw_packet: bytearray):
@classmethod
def unpack(cls, raw_packet: bytearray) -> FinishedPdu:
"""Unpack a raw packet into a PDU object
:param raw_packet:
:raise ValueError: If packet is too short
:return:
"""
self.pdu_file_directive.unpack(raw_bytes=raw_packet)
if not check_packet_length(raw_packet_len=len(raw_packet), min_len=self.MINIMAL_LEN):
finished_pdu = cls.__empty()
finished_pdu.pdu_file_directive = FileDirectivePduBase.unpack(raw_packet=raw_packet)
if not check_packet_length(raw_packet_len=len(raw_packet), min_len=cls.MINIMAL_LEN):
raise ValueError
current_idx = self.pdu_file_directive.get_len()
current_idx = finished_pdu.pdu_file_directive.get_len()
first_param_byte = raw_packet[current_idx]
self.condition_code = first_param_byte & 0xf0
self.delivery_code = first_param_byte & 0x04
self.file_delivery_status = first_param_byte & 0b11
finished_pdu.condition_code = first_param_byte & 0xf0
finished_pdu.delivery_code = first_param_byte & 0x04
finished_pdu.file_delivery_status = first_param_byte & 0b11
current_idx += 1
if len(raw_packet) > current_idx:
self.unpack_tlvs(raw_packet=raw_packet, start_idx=current_idx)
finished_pdu.unpack_tlvs(raw_packet=raw_packet, start_idx=current_idx)
return finished_pdu

def unpack_tlvs(self, raw_packet: bytearray, start_idx: int) -> int:
current_idx = start_idx
Expand All @@ -91,7 +103,7 @@ def unpack_tlvs(self, raw_packet: bytearray, start_idx: int) -> int:
current_tlv.unpack(raw_bytes=raw_packet[current_idx])
# This will always increment at least two, so we can't get stuck in the loop
current_idx += current_tlv.get_total_length()
if start_idx > len(raw_packet) or current_idx == len(raw_packet):
if current_idx > len(raw_packet) or current_idx == len(raw_packet):
if current_idx > len(raw_packet):
LOGGER.warning(
'Parser Error when parsing TLVs in Finished PDU. Possibly invalid packet'
Expand Down
Loading