Skip to content

Commit

Permalink
new crc calculator module
Browse files Browse the repository at this point in the history
  • Loading branch information
robamu committed Jul 14, 2022
1 parent 2ec00a5 commit 7ba0dd3
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 76 deletions.
62 changes: 62 additions & 0 deletions tmtccmd/cfdp/handler/crc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from pathlib import Path
from typing import Optional

from crcmod.predefined import PredefinedCrc

from spacepackets.cfdp import ChecksumTypes, NULL_CHECKSUM_U32
from tmtccmd.cfdp.filestore import VirtualFilestore
from tmtccmd.cfdp.handler.defs import ChecksumNotImplemented, SourceFileDoesNotExist


class Crc32Helper:
def __init__(self, init_type: ChecksumTypes, vfs: VirtualFilestore):
self.checksum_type = init_type
self.vfs = vfs

def _verify_checksum(self):
if self.checksum_type not in [
ChecksumTypes.NULL_CHECKSUM,
ChecksumTypes.CRC_32,
ChecksumTypes.CRC_32C,
]:
raise ChecksumNotImplemented(self.checksum_type)

def checksum_type_to_crcmod_str(self) -> Optional[str]:
if self.checksum_type == ChecksumTypes.NULL_CHECKSUM:
return None
if self.checksum_type == ChecksumTypes.CRC_32:
return "crc32"
elif self.checksum_type == ChecksumTypes.CRC_32C:
return "crc32c"

def generate_crc_calculator(self) -> PredefinedCrc:
self._verify_checksum()
return PredefinedCrc(self.checksum_type_to_crcmod_str())

def calc_for_file(self, file: Path, file_sz: int, segment_len: int) -> bytes:
if self.checksum_type == ChecksumTypes.NULL_CHECKSUM:
return NULL_CHECKSUM_U32
crc_obj = self.generate_crc_calculator()
if not file.exists():
# TODO: Handle this exception in the handler, reset CFDP state machine
raise SourceFileDoesNotExist(file)
current_offset = 0
# Calculate the file CRC
with open(file, "rb") as of:
while True:
if current_offset == file_sz:
break
if file_sz < segment_len:
read_len = file_sz
else:
next_offset = current_offset + segment_len
if next_offset > file_sz:
read_len = next_offset % file_sz
else:
read_len = segment_len
if read_len > 0:
crc_obj.update(
self.vfs.read_from_opened_file(of, current_offset, read_len)
)
current_offset += read_len
return crc_obj.digest()
63 changes: 40 additions & 23 deletions tmtccmd/cfdp/handler/dest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from __future__ import annotations

import dataclasses
import enum
from collections import deque
from dataclasses import dataclass
Expand All @@ -20,8 +22,9 @@
EofPdu,
)
from spacepackets.cfdp.pdu.helper import GenericPduPacket, PduHolder
from tmtccmd.cfdp import RemoteEntityCfg, CfdpUserBase, LocalEntityCfg
from tmtccmd.cfdp import CfdpUserBase, LocalEntityCfg
from tmtccmd.cfdp.defs import CfdpStates, TransactionId
from tmtccmd.cfdp.handler.crc import Crc32Helper
from tmtccmd.cfdp.handler.defs import FileParamsBase
from tmtccmd.cfdp.user import MetadataRecvParams, FileSegmentRecvParams

Expand Down Expand Up @@ -57,6 +60,17 @@ class DestStateWrapper:
packet_ready: bool = False


@dataclass
class DestFieldWrapper:
transaction_id: Optional[TransactionId] = None
closure_requested = False
fp = DestFileParams.empty()
file_directives_dict: Dict[
DirectiveType, List[AbstractFileDirectiveBase]
] = dataclasses.field(default_factory=lambda: dict())
file_data_deque: Deque[FileDataPdu] = deque()


class FsmResult:
def __init__(self, states: DestStateWrapper, pdu_holder: PduHolder):
self.states = states
Expand All @@ -69,14 +83,10 @@ def __init__(self, cfg: LocalEntityCfg, user: CfdpUserBase):
self.states = DestStateWrapper()
self.user = user
self._pdu_holder = PduHolder(None)
self._transaction_id: Optional[TransactionId] = None
self._checksum_type = ChecksumTypes.NULL_CHECKSUM
self._closure_requested = False
self._fp = DestFileParams.empty()
self._file_directives_dict: Dict[
DirectiveType, List[AbstractFileDirectiveBase]
] = dict()
self._file_data_deque: Deque[FileDataPdu] = deque()
self._params = DestFieldWrapper()
self._crc_helper: Crc32Helper = Crc32Helper(
ChecksumTypes.NULL_CHECKSUM, user.vfs
)

def _start_transaction(self, metadata_pdu: MetadataPdu) -> bool:
if self.states.state != CfdpStates.IDLE:
Expand All @@ -86,10 +96,10 @@ def _start_transaction(self, metadata_pdu: MetadataPdu) -> bool:
self.states.state = CfdpStates.BUSY_CLASS_1_NACKED
elif metadata_pdu.pdu_header.trans_mode == TransmissionModes.ACKNOWLEDGED:
self.states.state = CfdpStates.BUSY_CLASS_2_ACKED
self._checksum_type = metadata_pdu.checksum_type
self._crc_helper.checksum_type = metadata_pdu.checksum_type
self._closure_requested = metadata_pdu.closure_requested
self._fp.file_name = Path(metadata_pdu.dest_file_name)
self._fp.size = metadata_pdu.file_size
self._params.fp.file_name = Path(metadata_pdu.dest_file_name)
self._params.fp.size = metadata_pdu.file_size
self._transaction_id = TransactionId(
source_entity_id=metadata_pdu.source_entity_id,
transaction_seq_num=metadata_pdu.transaction_seq_num,
Expand All @@ -115,8 +125,10 @@ def _start_transaction(self, metadata_pdu: MetadataPdu) -> bool:
def state_machine(self) -> FsmResult:
if self.states.state == CfdpStates.IDLE:
transaction_was_started = False
if DirectiveType.METADATA_PDU in self._file_directives_dict:
for pdu in self._file_directives_dict.get(DirectiveType.METADATA_PDU):
if DirectiveType.METADATA_PDU in self._params.file_directives_dict:
for pdu in self._params.file_directives_dict.get(
DirectiveType.METADATA_PDU
):
metadata_pdu = PduHolder(pdu).to_metadata_pdu()
transaction_was_started = self._start_transaction(metadata_pdu)
if transaction_was_started:
Expand All @@ -126,7 +138,7 @@ def state_machine(self) -> FsmResult:
elif self.states.state == CfdpStates.BUSY_CLASS_1_NACKED:
if self.states.transaction == TransactionStep.RECEIVING_FILE_DATA:
# TODO: Sequence count check
for file_data_pdu in self._file_data_deque:
for file_data_pdu in self._params.file_data_deque:
data = file_data_pdu.file_data
offset = file_data_pdu.offset
if self.cfg.indication_cfg.file_segment_recvd_indication_required:
Expand All @@ -140,8 +152,8 @@ def state_machine(self) -> FsmResult:
self.user.file_segment_recv_indication(
params=file_segment_indic_params
)
self.user.vfs.write_data(self._fp.file_name, data, offset)
eof_pdus = self._file_directives_dict.get(DirectiveType.EOF_PDU)
self.user.vfs.write_data(self._params.fp.file_name, data, offset)
eof_pdus = self._params.file_directives_dict.get(DirectiveType.EOF_PDU)
if eof_pdus is not None:
for pdu in eof_pdus:
eof_pdu = PduHolder(pdu).to_eof_pdu()
Expand All @@ -157,12 +169,16 @@ def state_machine(self) -> FsmResult:
def pass_packet(self, packet: GenericPduPacket):
# TODO: Sanity checks
if packet.pdu_type == PduType.FILE_DATA:
self._file_data_deque.append(cast(FileDataPdu, packet))
self._params.file_data_deque.append(cast(FileDataPdu, packet))
else:
if packet.directive_type in self._file_directives_dict:
self._file_directives_dict.get(packet.directive_type).append(packet)
if packet.directive_type in self._params.file_directives_dict:
self._params.file_directives_dict.get(packet.directive_type).append(
packet
)
else:
self._file_directives_dict.update({packet.directive_type: [packet]})
self._params.file_directives_dict.update(
{packet.directive_type: [packet]}
)

def confirm_packet_sent_advance_fsm(self):
"""Helper method which performs both :py:meth:`confirm_packet_sent` and
Expand All @@ -181,8 +197,8 @@ def advance_fsm(self):
def _handle_eof_pdu(self, eof_pdu: EofPdu):
# TODO: Error handling
if eof_pdu.condition_code == ConditionCode.NO_ERROR:
self._fp.crc32 = eof_pdu.file_checksum
self._fp.size = eof_pdu.file_size
self._params.fp.crc32 = eof_pdu.file_checksum
self._params.fp.size = eof_pdu.file_size
if self.cfg.indication_cfg.eof_recv_indication_required:
self.user.eof_recv_indication(self._transaction_id)
if self.states.transaction == TransactionStep.RECEIVING_FILE_DATA:
Expand All @@ -192,6 +208,7 @@ def _handle_eof_pdu(self, eof_pdu: EofPdu):
self.states.transaction = TransactionStep.SENDING_ACK_PDU

def _checksum_verify(self):
# self._crc_helper.calc_for_file(self._params.fp.f)
pass

def _notice_of_completion(self):
Expand Down
62 changes: 9 additions & 53 deletions tmtccmd/cfdp/handler/source.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
import enum
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Dict, List
from crcmod.predefined import PredefinedCrc

from spacepackets.cfdp import (
TransmissionModes,
NULL_CHECKSUM_U32,
ConditionCode,
ChecksumTypes,
Direction,
PduConfig,
ChecksumTypes,
)
from spacepackets.cfdp.pdu import (
PduHolder,
Expand All @@ -32,10 +30,11 @@
RemoteEntityCfg,
)
from tmtccmd.cfdp.defs import CfdpRequestType, CfdpStates
from tmtccmd.cfdp.filestore import VirtualFilestore
from tmtccmd.cfdp.handler.crc import Crc32Helper
from tmtccmd.cfdp.handler.defs import (
FileParamsBase,
PacketSendNotConfirmed,
ChecksumNotImplemented,
SourceFileDoesNotExist,
InvalidPduDirection,
InvalidSourceId,
Expand Down Expand Up @@ -70,7 +69,8 @@ class SourceStateWrapper:


class TransferFieldWrapper:
def __init__(self, local_entity_id: UnsignedByteField):
def __init__(self, local_entity_id: UnsignedByteField, vfs: VirtualFilestore):
self.crc_helper = Crc32Helper(ChecksumTypes.NULL_CHECKSUM, vfs)
self.transaction: Optional[TransactionId] = None
self.fp = FileParamsBase.empty()
self.remote_cfg: Optional[RemoteEntityCfg] = None
Expand Down Expand Up @@ -167,7 +167,7 @@ def __init__(
self.cfg = cfg
self.user = user
self.seq_num_provider = seq_num_provider
self._params = TransferFieldWrapper(cfg.local_entity_id)
self._params = TransferFieldWrapper(cfg.local_entity_id, self.user.vfs)
self._current_req = CfdpRequestWrapper(None)
self._rec_dict: Dict[DirectiveType, List[AbstractFileDirectiveBase]] = dict()

Expand Down Expand Up @@ -273,8 +273,7 @@ def state_machine(self) -> FsmResult:
# Empty file, use null checksum
self._params.fp.crc32 = NULL_CHECKSUM_U32
else:
self._params.fp.crc32 = self.calc_cfdp_file_crc(
crc_type=self._params.remote_cfg.crc_type,
self._params.fp.crc32 = self._params.crc_helper.calc_for_file(
file=put_req.cfg.source_file,
file_sz=self._params.fp.size,
segment_len=self._params.fp.segment_len,
Expand Down Expand Up @@ -356,49 +355,6 @@ def reset(self):
self.states.state = CfdpStates.IDLE
self._params.reset()

def calc_cfdp_file_crc(
self, crc_type: ChecksumTypes, file: Path, file_sz: int, segment_len: int
) -> bytes:
if crc_type == ChecksumTypes.CRC_32:
return self.calc_crc_for_file_crcmod(
PredefinedCrc("crc32"), file, file_sz, segment_len
)
elif crc_type == ChecksumTypes.CRC_32C:
return self.calc_crc_for_file_crcmod(
PredefinedCrc("crc32c"), file, file_sz, segment_len
)
else:
raise ChecksumNotImplemented(crc_type)

def calc_crc_for_file_crcmod(
self, crc_obj: PredefinedCrc, file: Path, file_sz: int, segment_len: int
):
if not file.exists():
# TODO: Handle this exception in the handler, reset CFDP state machine
raise SourceFileDoesNotExist(file)
current_offset = 0
# Calculate the file CRC
with open(file, "rb") as of:
while True:
if current_offset == file_sz:
break
if file_sz < segment_len:
read_len = file_sz
else:
next_offset = current_offset + segment_len
if next_offset > file_sz:
read_len = next_offset % file_sz
else:
read_len = segment_len
if read_len > 0:
crc_obj.update(
self.user.vfs.read_from_opened_file(
of, current_offset, read_len
)
)
current_offset += read_len
return crc_obj.digest()

def _handle_wait_for_ack(self):
if self.states.state != CfdpStates.BUSY_CLASS_2_ACKED:
LOGGER.error(
Expand Down Expand Up @@ -456,6 +412,7 @@ def _setup_transmission_mode(self):
closure_req_to_set = put_req.cfg.closure_requested
else:
closure_req_to_set = self._params.remote_cfg.closure_requested
self._params.crc_helper.checksum_type = self._params.remote_cfg.crc_type
self._params.closure_requested = closure_req_to_set

def _transaction_start(self, put_req: PutRequest):
Expand Down Expand Up @@ -484,8 +441,7 @@ def _prepare_metadata_pdu(self, put_req: PutRequest):
params = MetadataParams(
dest_file_name=put_req.cfg.dest_file,
source_file_name=put_req.cfg.source_file.as_posix(),
# TODO: Checksum type can be overriden by Put Request
checksum_type=self._params.remote_cfg.crc_type,
checksum_type=self._params.crc_helper.checksum_type,
closure_requested=self._params.closure_requested,
file_size=self._params.fp.size,
)
Expand Down

0 comments on commit 7ba0dd3

Please sign in to comment.