Skip to content

Commit

Permalink
Merge pull request #30 from robamu-org/eive
Browse files Browse the repository at this point in the history
PDU Updates
  • Loading branch information
robamu committed Aug 18, 2021
2 parents 7e24d33 + 46b4b39 commit e7df305
Show file tree
Hide file tree
Showing 13 changed files with 372 additions and 172 deletions.
1 change: 0 additions & 1 deletion src/tmtccmd/ccsds/spacepacket.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import enum
from collections import deque
from typing import Tuple, Deque, List
from tmtccmd.ccsds.log import LOGGER

Expand Down
26 changes: 15 additions & 11 deletions src/tmtccmd/cfdp/lv.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,39 @@
from __future__ import annotations
from tmtccmd.ccsds.log import LOGGER


class CfdpLv:
def __init__(self,serialize: bool, value: bytearray = None):
def __init__(self, value: bytearray):
"""This class encapsulates CFDP LV fields
:raise ValueError: If value is invalid and serilization is enabled or if length of bytearray
is too large
:param serialize:
:param value:
"""
if serialize and value is None:
LOGGER.warning('All parameter have to be valid for serialization')
raise ValueError
if len > 255:
if len(value) > 255:
LOGGER.warning('Length too large for LV field')
raise ValueError
self.len = len(value)
self.value = value

def get_len(self):
"""Returns length of full LV packet"""
return self.len + 1

def pack(self):
packet = bytearray()
packet.append(self.len)
packet.extend(self.value)

def unpack(self, raw_bytes: bytearray):
@classmethod
def unpack(cls, raw_bytes: bytearray) -> CfdpLv:
"""Parses LV field at the start of the given bytearray
:raise ValueError: Invalid lenght found
:raise ValueError: Invalid length found
"""
self.len = raw_bytes[0]
if self.len > 255:
detected_len = raw_bytes[0]
if detected_len > 255:
LOGGER.warning('Length too large for LV field')
raise ValueError
elif self.len > 0:
self.value = raw_bytes[1: 1 + self.len]
return cls(
value=raw_bytes[1:]
)
30 changes: 21 additions & 9 deletions src/tmtccmd/cfdp/pdu/ack.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from __future__ import annotations
import enum
import struct

Expand All @@ -20,7 +21,6 @@ class TransactionStatus(enum.IntEnum):
class AckPdu():
def __init__(
self,
serialize: bool,
directive_code_of_acked_pdu: DirectiveCodes,
condition_code_of_acked_pdu: ConditionCode,
transaction_status: TransactionStatus,
Expand All @@ -31,7 +31,6 @@ def __init__(
len_transaction_seq_num=LenInBytes.NONE,
):
self.pdu_file_directive = FileDirectivePduBase(
serialize=serialize,
directive_code=DirectiveCodes.ACK_PDU,
direction=direction,
trans_mode=trans_mode,
Expand All @@ -48,16 +47,29 @@ def __init__(
self.condition_code_of_acked_pdu = condition_code_of_acked_pdu
self.transaction_status = transaction_status

@classmethod
def __empty(cls) -> AckPdu:
return cls(
directive_code_of_acked_pdu=None,
condition_code_of_acked_pdu=None,
transaction_status=None,
direction=None,
trans_mode=None
)

def pack(self):
packet = self.pdu_file_directive.pack()
packet.append((self.directive_code_of_acked_pdu << 4) | self.directive_subtype_code)
packet.append((self.condition_code_of_acked_pdu << 4) | self.transaction_status)

def unpack(self, raw_packet: bytearray):
self.pdu_file_directive.unpack(raw_packet=raw_packet)
current_idx = self.pdu_file_directive.get_len()
self.directive_code_of_acked_pdu = raw_packet[current_idx] & 0xf0
self.directive_subtype_code = raw_packet[current_idx] & 0x0f
@classmethod
def unpack(cls, raw_packet: bytearray) -> AckPdu:
ack_packet = cls.__empty()
ack_packet.pdu_file_directive = FileDirectivePduBase.unpack(raw_packet=raw_packet)
current_idx = ack_packet.pdu_file_directive.get_len()
ack_packet.directive_code_of_acked_pdu = raw_packet[current_idx] & 0xf0
ack_packet.directive_subtype_code = raw_packet[current_idx] & 0x0f
current_idx += 1
self.condition_code_of_acked_pdu = raw_packet[current_idx] & 0xf0
self.transaction_status = raw_packet[current_idx] & 0x03
ack_packet.condition_code_of_acked_pdu = raw_packet[current_idx] & 0xf0
ack_packet.transaction_status = raw_packet[current_idx] & 0x03
return ack_packet
48 changes: 25 additions & 23 deletions src/tmtccmd/cfdp/pdu/eof.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from __future__ import annotations
import struct

from tmtccmd.cfdp.pdu.file_directive import FileDirectivePduBase, DirectiveCodes, \
Expand All @@ -13,7 +14,6 @@ class EofPdu():

def __init__(
self,
serialize: bool,
file_checksum: int,
file_size: int,
direction: Direction,
Expand All @@ -25,7 +25,6 @@ def __init__(
len_transaction_seq_num=LenInBytes.NONE,
):
self.pdu_file_directive = FileDirectivePduBase(
serialize=serialize,
directive_code=DirectiveCodes.EOF_PDU,
direction=direction,
trans_mode=trans_mode,
Expand All @@ -38,6 +37,15 @@ def __init__(
self.file_size = file_size
self.fault_location = fault_location

@classmethod
def __empty(cls) -> EofPdu:
cls(
file_checksum=None,
file_size=None,
direction=None,
trans_mode=None
)

def pack(self) -> bytearray:
eof_pdu = bytearray()
eof_pdu.extend(self.pdu_file_directive.pack())
Expand All @@ -51,35 +59,29 @@ def pack(self) -> bytearray:
eof_pdu.extend(self.fault_location.pack())
return eof_pdu

def unpack(self, raw_packet: bytearray):
@classmethod
def unpack(cls, raw_packet: bytearray) -> EofPdu:
"""Deserialize raw EOF PDU packet
:param raw_packet:
:raise ValueError: If raw packet is too short
:return:
"""
self.pdu_file_directive.unpack(raw_packet=raw_packet)
expected_min_len = self.MINIMAL_LENGTH
eof_pdu = cls.__empty()
eof_pdu.pdu_file_directive = FileDirectivePduBase.unpack(raw_packet=raw_packet)
expected_min_len = cls.MINIMAL_LENGTH
if not check_packet_length(raw_packet_len=len(raw_packet), min_len=expected_min_len):
raise ValueError
current_idx = self.pdu_file_directive.get_len()
self.condition_code = raw_packet[current_idx] & 0xf0
expected_min_len = self.pdu_file_directive.get_len() + 5
current_idx = eof_pdu.pdu_file_directive.get_len()
eof_pdu.condition_code = raw_packet[current_idx] & 0xf0
expected_min_len = eof_pdu.pdu_file_directive.get_len() + 5
current_idx += 1
checksum_raw = raw_packet[current_idx: current_idx + 4]
self.file_checksum = struct.unpack('!I', checksum_raw)[0]
eof_pdu.file_checksum = struct.unpack('!I', checksum_raw)[0]
current_idx += 4
if self.pdu_file_directive.pdu_header.large_file:
if not check_packet_length(raw_packet_len=len(raw_packet), min_len=current_idx + 8):
raise ValueError
filesize_raw = checksum_raw = \
raw_packet[current_idx: current_idx + 8]
current_idx += 8
self.file_checksum = struct.unpack('!Q', filesize_raw)[0]
else:
filesize_raw = checksum_raw = \
raw_packet[current_idx: current_idx + 4]
current_idx += 4
self.file_checksum = struct.unpack('!I', filesize_raw)[0]
current_idx, eof_pdu.file_size = eof_pdu.pdu_file_directive.parse_fss_field(
raw_packet=raw_packet, current_idx=current_idx
)
if len(raw_packet) > current_idx:
self.fault_location = CfdpTlv(serialize=False)
self.fault_location.unpack(raw_bytes=raw_packet[current_idx:])
eof_pdu.fault_location = CfdpTlv(serialize=False)
eof_pdu.fault_location.unpack(raw_bytes=raw_packet[current_idx:])
return eof_pdu
58 changes: 44 additions & 14 deletions src/tmtccmd/cfdp/pdu/file_directive.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from __future__ import annotations
import enum
import struct

from tmtccmd.cfdp.pdu.header import PduHeader, PduType, Direction, CrcFlag, TransmissionModes
from tmtccmd.cfdp.conf import check_packet_length
from tmtccmd.cfdp.definitions import LenInBytes
Expand Down Expand Up @@ -39,26 +42,33 @@ class FileDirectivePduBase:
"""
def __init__(
self,
serialize: bool,
directive_code: DirectiveCodes = None,
directive_code: DirectiveCodes,
# PDU Header parameters
direction: Direction = None,
trans_mode: TransmissionModes = None,
crc_flag: CrcFlag = None,
direction: Direction,
trans_mode: TransmissionModes,
crc_flag: CrcFlag,
len_entity_id: LenInBytes = LenInBytes.NONE,
len_transaction_seq_num: LenInBytes = LenInBytes.NONE,
):
if serialize:
if directive_code is None:
LOGGER.warning('Some mandatory fields were not specified for serialization')
raise ValueError
self.pdu_header = PduHeader(
serialize=serialize, pdu_type=PduType.FILE_DIRECTIVE, direction=direction,
trans_mode=trans_mode, crc_flag=crc_flag, len_entity_id=len_entity_id,
pdu_type=PduType.FILE_DIRECTIVE,
direction=direction,
trans_mode=trans_mode,
crc_flag=crc_flag,
len_entity_id=len_entity_id,
len_transaction_seq_num=len_transaction_seq_num
)
self.directive_code = directive_code

@classmethod
def __empty(cls) -> FileDirectivePduBase:
return cls(
directive_code=None,
direction=None,
trans_mode=None,
crc_flag=None
)

def get_len(self) -> int:
return self.FILE_DIRECTIVE_PDU_LEN

Expand All @@ -68,16 +78,19 @@ def pack(self) -> bytearray:
data.append(self.directive_code)
return data

def unpack(self, raw_packet: bytearray):
@classmethod
def unpack(cls, raw_packet: bytearray) -> FileDirectivePduBase:
"""Unpack a raw bytearray into the File Directive PDU object representation
:param raw_bytes:
:raise ValueError: Passed bytearray is too short.
:return:
"""
self.pdu_header.unpack(raw_bytes=raw_packet)
file_directive = cls.__empty()
file_directive.pdu_header = PduHeader.unpack(raw_packet=raw_packet)
if not check_packet_length(raw_packet_len=len(raw_packet), min_len=5):
raise ValueError
self.directive_code = raw_packet[4]
file_directive.directive_code = raw_packet[4]
return file_directive

def verify_file_len(self, file_size: int) -> bool:
if self.pdu_file_directive.pdu_header.large_file and file_size > pow(2, 64):
Expand All @@ -87,3 +100,20 @@ def verify_file_len(self, file_size: int) -> bool:
LOGGER.warning(f'File size {file_size} larger than 32 bit field')
raise False
return True

def parse_fss_field(self, raw_packet: bytearray, current_idx: int) -> (int, int):
"""Parse the FSS field, which has different size depending on the large file flag being
set or not. Returns the current index incremented and the parsed file size
:raise ValueError: Packet not large enough
"""
if self.pdu_header.large_file:
if not check_packet_length(len(raw_packet), current_idx + 8 + 1):
raise ValueError
file_size = struct.unpack('!I', raw_packet[current_idx: current_idx + 8])
current_idx += 8
else:
if not check_packet_length(len(raw_packet), current_idx + 4 + 1):
raise ValueError
file_size = struct.unpack('!I', raw_packet[current_idx: current_idx + 4])
current_idx += 4
return current_idx, file_size
36 changes: 24 additions & 12 deletions src/tmtccmd/cfdp/pdu/finished.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from __future__ import annotations
import enum

from tmtccmd.cfdp.pdu.file_directive import FileDirectivePduBase, DirectiveCodes, Direction, \
Expand Down Expand Up @@ -25,7 +26,6 @@ class FinishedPdu():

def __init__(
self,
serialize: bool,
direction: Direction,
delivery_code: DeliveryCode,
file_delivery_status: FileDeliveryStatus,
Expand All @@ -38,7 +38,6 @@ def __init__(
len_transaction_seq_num: LenInBytes = LenInBytes.NONE
):
self.pdu_file_directive = FileDirectivePduBase(
serialize=serialize,
directive_code=DirectiveCodes.FINISHED_PDU,
direction=direction,
trans_mode=trans_mode,
Expand All @@ -53,9 +52,19 @@ def __init__(
self.file_delivery_status = file_delivery_status
self.might_have_fault_location = False
if self.condition_code != ConditionCode.NO_ERROR and \
self.condition_code != ConditionCode.UNSUPPORTED_CHECKSUM_TYPE:
self.condition_code != ConditionCode.UNSUPPORTED_CHECKSUM_TYPE:
self.might_have_fault_location = True

@classmethod
def __empty(cls) -> FinishedPdu:
cls(
direction=None,
delivery_code=None,
file_delivery_status=None,
trans_mode=None,
condition_code=None
)

def pack(self) -> bytearray:
packet = self.pdu_file_directive.pack()
packet[len(packet)] |= \
Expand All @@ -66,23 +75,26 @@ def pack(self) -> bytearray:
packet.extend(self.fault_location.pack())
return packet

def unpack(self, raw_packet: bytearray):
@classmethod
def unpack(cls, raw_packet: bytearray) -> FinishedPdu:
"""Unpack a raw packet into a PDU object
:param raw_packet:
:raise ValueError: If packet is too short
:return:
"""
self.pdu_file_directive.unpack(raw_bytes=raw_packet)
if not check_packet_length(raw_packet_len=len(raw_packet), min_len=self.MINIMAL_LEN):
finished_pdu = cls.__empty()
finished_pdu.pdu_file_directive = FileDirectivePduBase.unpack(raw_packet=raw_packet)
if not check_packet_length(raw_packet_len=len(raw_packet), min_len=cls.MINIMAL_LEN):
raise ValueError
current_idx = self.pdu_file_directive.get_len()
current_idx = finished_pdu.pdu_file_directive.get_len()
first_param_byte = raw_packet[current_idx]
self.condition_code = first_param_byte & 0xf0
self.delivery_code = first_param_byte & 0x04
self.file_delivery_status = first_param_byte & 0b11
finished_pdu.condition_code = first_param_byte & 0xf0
finished_pdu.delivery_code = first_param_byte & 0x04
finished_pdu.file_delivery_status = first_param_byte & 0b11
current_idx += 1
if len(raw_packet) > current_idx:
self.unpack_tlvs(raw_packet=raw_packet, start_idx=current_idx)
finished_pdu.unpack_tlvs(raw_packet=raw_packet, start_idx=current_idx)
return finished_pdu

def unpack_tlvs(self, raw_packet: bytearray, start_idx: int) -> int:
current_idx = start_idx
Expand All @@ -91,7 +103,7 @@ def unpack_tlvs(self, raw_packet: bytearray, start_idx: int) -> int:
current_tlv.unpack(raw_bytes=raw_packet[current_idx])
# This will always increment at least two, so we can't get stuck in the loop
current_idx += current_tlv.get_total_length()
if start_idx > len(raw_packet) or current_idx == len(raw_packet):
if current_idx > len(raw_packet) or current_idx == len(raw_packet):
if current_idx > len(raw_packet):
LOGGER.warning(
'Parser Error when parsing TLVs in Finished PDU. Possibly invalid packet'
Expand Down
Loading

0 comments on commit e7df305

Please sign in to comment.