Skip to content

Commit

Permalink
remove some other TM base classes
Browse files Browse the repository at this point in the history
  • Loading branch information
robamu committed May 5, 2024
1 parent 703f243 commit 7d726c4
Show file tree
Hide file tree
Showing 4 changed files with 1 addition and 317 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Starting from v4.0.0, this project adheres to [Semantic Versioning](http://semve

## Removed

- Broken FSFW PUS3 TM unpacket.
- Broken FSFW PUS 3, PUS 20, PUS 8 TM unpackers.

# [v8.0.0rc2] 2024-04-23

Expand Down
138 changes: 0 additions & 138 deletions tmtccmd/pus/tm/s23_filemgmt.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,5 @@
from __future__ import annotations

import logging
import struct
from typing import Optional

from spacepackets.ccsds.time import CcsdsTimeProvider
from spacepackets.ecss.defs import PusService
from spacepackets.ecss.tm import CdsShortTimestamp, PusTelemetry

from tmtccmd.tmtc.base import PusTmInfoBase, PusTmBase


class FileInfo:
def __init__(
Expand All @@ -23,131 +13,3 @@ def __init__(
self.repo_path = repo_path
self.file_size = file_size
self.lock_status = lock_status


class Service23Tm(PusTmInfoBase, PusTmBase):
MAX_REPOSITORY_LENGTH = 64
MAX_FILENAME_LENGTH = 12

def __init__(
self,
subservice_id: int,
object_id: bytearray,
repo_path: str,
file_name: str,
time: Optional[CdsShortTimestamp],
ssc: int = 0,
source_data: bytearray = bytearray([]),
apid: int = -1,
packet_version: int = 0b000,
space_time_ref: int = 0b0000,
destination_id: int = 0,
):
self.object_id = object_id
self.data_start_idx = 0
self.file_info = FileInfo(file_name=file_name, repo_path=repo_path)
pus_tm = PusTelemetry(
service=PusService.S23_FILE_MGMT,
subservice=subservice_id,
time_provider=time,
seq_count=ssc,
source_data=source_data,
apid=apid,
packet_version=packet_version,
space_time_ref=space_time_ref,
destination_id=destination_id,
)
PusTmBase.__init__(self, pus_tm=pus_tm)
PusTmInfoBase.__init__(self, pus_tm=pus_tm)

self.set_packet_info("File Service Reply")

@staticmethod
def __init_without_base(instance: Service23Tm):
tm_data = instance.tm_data
if len(tm_data) < 4:
logging.getLogger(__name__).error("Service23TM: Invalid packet format!")
return
instance.object_id = struct.unpack("!I", tm_data[0:4])[0]
instance.file_info.file_size = 0
instance.file_info.lock_status = False
instance.data_start_idx = 0
if instance.subservice == 4:
instance.unpack_repo_and_filename()
instance.unpack_file_attributes()
elif instance.subservice == 132:
instance.unpack_repo_and_filename()

@classmethod
def __empty(cls) -> Service23Tm:
return cls(
subservice_id=0,
object_id=bytearray(4),
repo_path="",
file_name="",
time=CdsShortTimestamp.empty(),
)

@classmethod
def unpack(
cls, raw_telemetry: bytes, time_reader: Optional[CcsdsTimeProvider]
) -> Service23Tm:
service_23_tm = cls.__empty()
service_23_tm.pus_tm = PusTelemetry.unpack(
data=raw_telemetry, time_reader=time_reader
)
service_23_tm.__init_without_base(instance=service_23_tm)
return service_23_tm

def unpack_repo_and_filename(self):
tm_data = self.tm_data
repo_path_found = False
path_idx_start = 0
max_len_to_scan = len(self.tm_data) - 4
for idx in range(4, max_len_to_scan):
if not repo_path_found and tm_data[idx] == 0:
repo_bytes = tm_data[4:idx]
self.file_info.repo_path = repo_bytes.decode("utf-8")
path_idx_start = idx + 1
idx += 1
repo_path_found = True
if repo_path_found:
if tm_data[idx] == 0:
filename_bytes = tm_data[path_idx_start:idx]
self.file_info.file_name = filename_bytes.decode("utf-8")
self.data_start_idx = idx + 1
break

def unpack_file_attributes(self):
# Size of file length (4) + lock status (1), adapt if more field are added!
print(len(self.tm_data) - self.data_start_idx)
if len(self.tm_data) - self.data_start_idx != 5:
logging.getLogger(__name__).error(
"Service23TM: Invalid lenght of file attributes data"
)
return
self.file_info.file_size = struct.unpack(
"!I", self.tm_data[self.data_start_idx : self.data_start_idx + 4]
)[0]
self.file_info.lock_status = self.tm_data[self.data_start_idx + 4]

def append_telemetry_content(self, content_list: list):
super().append_telemetry_content(content_list)
content_list.append(self.object_id)
content_list.append(self.file_info.repo_path)
content_list.append(self.file_info.file_name)
if self.subservice == 4:
content_list.append(self.file_info.file_size)
if self.file_info.lock_status == 0:
content_list.append("No")
else:
content_list.append("Yes")

def append_telemetry_column_headers(self, header_list: list):
super().append_telemetry_column_headers(header_list)
header_list.append("Object ID")
header_list.append("Repo Path")
header_list.append("File Name")
if self.subservice == 4:
header_list.append("File Size")
header_list.append("Locked")
63 changes: 0 additions & 63 deletions tmtccmd/pus/tm/s2_rawcmd.py
Original file line number Diff line number Diff line change
@@ -1,64 +1 @@
"""Base class for implementation of PUS Service 2 handling.
"""

from __future__ import annotations

from typing import Optional

from deprecated.sphinx import deprecated
from spacepackets.ccsds.time import CcsdsTimeProvider
from spacepackets.ecss.tm import CdsShortTimestamp, PusTelemetry

from tmtccmd.tmtc.tm_base import PusTmBase, PusTmInfoBase


class Service2Tm(PusTmInfoBase, PusTmBase):
@deprecated(
version="v4.0.0a1",
reason="use a custom wrapper type instead",
)
def __init__(
self,
subservice: int,
time: Optional[CdsShortTimestamp] = None,
ssc: int = 0,
source_data: bytearray = bytearray([]),
apid: int = -1,
packet_version: int = 0b000,
space_time_ref: int = 0b0000,
destination_id: int = 0,
):
pus_tm = PusTelemetry(
service=2,
subservice=subservice,
time_provider=time,
seq_count=ssc,
source_data=source_data,
apid=apid,
packet_version=packet_version,
space_time_ref=space_time_ref,
destination_id=destination_id,
)
PusTmInfoBase.__init__(self, pus_tm=pus_tm)
PusTmBase.__init__(self, pus_tm=pus_tm)
self.set_packet_info("Raw Commanding Reply")

@classmethod
def __empty(cls) -> Service2Tm:
return cls(subservice=0)

@classmethod
def unpack(
cls, raw_telemetry: bytes, time_reader: Optional[CcsdsTimeProvider]
) -> Service2Tm:
service_2_tm = cls.__empty()
service_2_tm.pus_tm = PusTelemetry.unpack(
data=raw_telemetry, time_reader=time_reader
)
return service_2_tm

def append_telemetry_content(self, content_list: list):
super().append_telemetry_content(content_list=content_list)

def append_telemetry_column_headers(self, header_list: list):
super().append_telemetry_column_headers(header_list=header_list)
115 changes: 0 additions & 115 deletions tmtccmd/pus/tm/s8_fsfw_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,118 +2,3 @@
"""

from __future__ import annotations
import struct
from typing import Optional
from deprecated.sphinx import deprecated

from spacepackets.ccsds.time import CcsdsTimeProvider
from spacepackets.ecss import PusService
from spacepackets.ecss.tm import PusTelemetry
from spacepackets.util import UnsignedByteField
from tmtccmd.tmtc.tm_base import PusTmInfoBase, PusTmBase
from tmtccmd.util.obj_id import ObjectIdU32


# TODO: Implement custom reader class, unittest it.
class Service8FsfwTm(PusTmBase, PusTmInfoBase):
"""Custom Action Service Telemetry handler tailored towards Flight Software Framework (FSFW)
TM service 8 packets
"""

@deprecated(version="8.0.0", reason="deprecated TM API")
def __init__(
self,
subservice: int,
object_id: bytearray,
action_id: int,
custom_data: bytearray,
time: Optional[CcsdsTimeProvider] = None,
ssc: int = 0,
apid: int = -1,
packet_version: int = 0b000,
space_time_ref: int = 0b0000,
destination_id: int = 0,
):
"""This class can be used to deserialize service 8 packets.
:raises ValueError: If the length of the passed bytearray is too short.
"""
self._object_id = ObjectIdU32.from_bytes(obj_id_as_bytes=object_id)
self._action_id = action_id
self._custom_data = custom_data
source_data = bytearray()
source_data.extend(object_id)
source_data.extend(struct.pack("!I", self._action_id))
source_data.extend(self._custom_data)
pus_tm = PusTelemetry(
service=PusService.S8_FUNC_CMD,
subservice=subservice,
time_provider=time,
seq_count=ssc,
source_data=source_data,
apid=apid,
packet_version=packet_version,
space_time_ref=space_time_ref,
destination_id=destination_id,
)
PusTmBase.__init__(self, pus_tm=pus_tm)
PusTmInfoBase.__init__(self, pus_tm=pus_tm)
self.__init_without_base(instance=self)

@staticmethod
def __init_without_base(instance: Service8FsfwTm):
if instance.subservice == 130:
tm_data = instance.tm_data
if len(tm_data) < 8:
raise ValueError(
f"Length of Service 8 TM data field {len(tm_data)} short than 8"
)
instance.set_packet_info("Functional Data Reply")
instance._object_id = ObjectIdU32.from_bytes(obj_id_as_bytes=tm_data[0:4])
instance._action_id = struct.unpack("!I", tm_data[4:8])[0]
instance._custom_data = tm_data[8:]
else:
instance.set_packet_info("Unknown functional commanding reply")

@classmethod
def __empty(cls) -> Service8FsfwTm:
return cls(
subservice=0,
object_id=bytearray(4),
action_id=0,
custom_data=bytearray(),
)

@classmethod
def unpack(cls, raw_telemetry: bytes, time_reader: Optional[CcsdsTimeProvider]):
service_8_tm = cls.__empty()
service_8_tm.pus_tm = PusTelemetry.unpack(
data=raw_telemetry, time_reader=time_reader
)
service_8_tm.__init_without_base(instance=service_8_tm)
return service_8_tm

def append_telemetry_content(self, content_list: list):
super().append_telemetry_content(content_list=content_list)
content_list.append(self._object_id.as_hex_string)
content_list.append(self._action_id)

def append_telemetry_column_headers(self, header_list: list):
super().append_telemetry_column_headers(header_list=header_list)
header_list.append("Object ID")
header_list.append("Action ID")

@property
def source_object_id_as_bytes(self) -> bytes:
return bytes(self._object_id.as_bytes)

@property
def source_object_id(self) -> UnsignedByteField:
return self._object_id

@property
def action_id(self) -> int:
return self._action_id

@property
def custom_data(self) -> bytearray:
return self._custom_data

0 comments on commit 7d726c4

Please sign in to comment.