Skip to content

Commit

Permalink
additions for ECSS and CCSDS modules
Browse files Browse the repository at this point in the history
  • Loading branch information
robamu committed Apr 22, 2024
1 parent 1023c3a commit b1a4664
Show file tree
Hide file tree
Showing 17 changed files with 116 additions and 108 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

# [unreleased]

# [v0.23.1] 2024-04-22

## Added

- PUS TC app data setter method.
- New `PusTelecommand` alias/shorthand: `PusTc`.
- New `SpacePacketHeader` alias/shorthand: `SpHeader`.
- New `PusTelemetry` alias/shorthand: `PusTm`.

# [v0.23.0] 2024-01-24

## Changed
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta"
name = "spacepackets"
description = "Various CCSDS and ECSS packet implementations"
readme = "README.md"
version = "0.23.0"
version = "0.23.1"
requires-python = ">=3.8"
license = {text = "Apache-2.0"}
authors = [
Expand Down
1 change: 1 addition & 0 deletions spacepackets/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
from spacepackets.ccsds import (
SpacePacketHeader,
SpHeader,
SpacePacket,
PacketType,
SequenceFlags,
Expand Down
1 change: 1 addition & 0 deletions spacepackets/ccsds/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""This package contains all CCSDS related components"""
from .spacepacket import (
SpHeader,
SpacePacketHeader,
SpacePacket,
PacketType,
Expand Down
3 changes: 3 additions & 0 deletions spacepackets/ccsds/spacepacket.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,9 @@ def __eq__(self, other: object):
return False


SpHeader = SpacePacketHeader


class SpacePacket:
"""Generic CCSDS space packet which consists of the primary header and can optionally include
a secondary header and a user data field.
Expand Down
4 changes: 2 additions & 2 deletions spacepackets/ecss/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from crcmod.predefined import mkPredefinedCrcFun

from .tc import PusVersion, PusTelecommand, PusTcDataFieldHeader
from .tm import PusTelemetry, PusTmSecondaryHeader
from .tc import PusVersion, PusTelecommand, PusTc, PusTcDataFieldHeader
from .tm import PusTm, PusTelemetry, PusTmSecondaryHeader
from .fields import (
PacketFieldEnum,
PacketFieldBase,
Expand Down
6 changes: 3 additions & 3 deletions spacepackets/ecss/pus_17_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from spacepackets.ccsds.time import CcsdsTimeProvider
from spacepackets.ecss.conf import FETCH_GLOBAL_APID
from spacepackets.ecss.defs import PusService
from spacepackets.ecss.tm import PusTelemetry, AbstractPusTm
from spacepackets.ecss.tm import PusTm, AbstractPusTm


class Subservice(enum.IntEnum):
Expand All @@ -27,7 +27,7 @@ def __init__(
space_time_ref: int = 0b0000,
destination_id: int = 0,
):
self.pus_tm = PusTelemetry(
self.pus_tm = PusTm(
service=PusService.S17_TEST,
subservice=subservice,
time_provider=time_provider,
Expand Down Expand Up @@ -89,5 +89,5 @@ def unpack(
:raises InvalidTmCrc16: Invalid CRC16.
"""
service_17_tm = cls.__empty(time_provider=time_reader)
service_17_tm.pus_tm = PusTelemetry.unpack(data=data, time_reader=time_reader)
service_17_tm.pus_tm = PusTm.unpack(data=data, time_reader=time_reader)
return service_17_tm
28 changes: 13 additions & 15 deletions spacepackets/ecss/pus_1_verification.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
from spacepackets.ccsds import SpacePacketHeader
from spacepackets.ccsds.spacepacket import PacketId, PacketSeqCtrl
from spacepackets.ccsds.time import CcsdsTimeProvider
from spacepackets.ecss import PusTelecommand
from spacepackets.ecss import PusTc
from spacepackets.ecss.conf import FETCH_GLOBAL_APID
from spacepackets.ecss.defs import PusService
from spacepackets.ecss.fields import PacketFieldEnum
from spacepackets.ecss.tm import PusTelemetry, AbstractPusTm
from spacepackets.ecss.tm import PusTm, AbstractPusTm
from .exceptions import TmSrcDataTooShortError

from .req_id import RequestId
Expand Down Expand Up @@ -136,7 +136,7 @@ def __init__(
self._verif_params = VerificationParams(RequestId.empty())
else:
self._verif_params = verif_params
self.pus_tm = PusTelemetry(
self.pus_tm = PusTm(
service=PusService.S1_VERIFICATION,
subservice=subservice,
time_provider=time_provider,
Expand All @@ -158,7 +158,7 @@ def __empty(cls, time_provider: Optional[CcsdsTimeProvider]) -> Service1Tm:
return cls(subservice=Subservice.INVALID, time_provider=time_provider)

@classmethod
def from_tm(cls, tm: PusTelemetry, params: UnpackParams) -> Service1Tm:
def from_tm(cls, tm: PusTm, params: UnpackParams) -> Service1Tm:
service_1_tm = cls.__empty(params.time_reader)
service_1_tm.pus_tm = tm
cls._unpack_raw_tm(service_1_tm, params)
Expand All @@ -176,9 +176,7 @@ def unpack(cls, data: bytes, params: UnpackParams) -> Service1Tm:
:return:
"""
service_1_tm = cls.__empty(params.time_reader)
service_1_tm.pus_tm = PusTelemetry.unpack(
data=data, time_reader=params.time_reader
)
service_1_tm.pus_tm = PusTm.unpack(data=data, time_reader=params.time_reader)
cls._unpack_raw_tm(service_1_tm, params)
return service_1_tm

Expand Down Expand Up @@ -302,7 +300,7 @@ def __eq__(self, other: Service1Tm):


def create_acceptance_success_tm(
pus_tc: PusTelecommand, time_provider: Optional[CcsdsTimeProvider]
pus_tc: PusTc, time_provider: Optional[CcsdsTimeProvider]
) -> Service1Tm:
return Service1Tm(
subservice=Subservice.TM_ACCEPTANCE_SUCCESS,
Expand All @@ -312,7 +310,7 @@ def create_acceptance_success_tm(


def create_acceptance_failure_tm(
pus_tc: PusTelecommand,
pus_tc: PusTc,
failure_notice: FailureNotice,
time_provider: Optional[CcsdsTimeProvider],
) -> Service1Tm:
Expand All @@ -327,7 +325,7 @@ def create_acceptance_failure_tm(


def create_start_success_tm(
pus_tc: PusTelecommand, time_provider: Optional[CcsdsTimeProvider]
pus_tc: PusTc, time_provider: Optional[CcsdsTimeProvider]
) -> Service1Tm:
return Service1Tm(
subservice=Subservice.TM_START_SUCCESS,
Expand All @@ -337,7 +335,7 @@ def create_start_success_tm(


def create_start_failure_tm(
pus_tc: PusTelecommand,
pus_tc: PusTc,
failure_notice: FailureNotice,
time_provider: Optional[CcsdsTimeProvider],
) -> Service1Tm:
Expand All @@ -352,7 +350,7 @@ def create_start_failure_tm(


def create_step_success_tm(
pus_tc: PusTelecommand,
pus_tc: PusTc,
step_id: PacketFieldEnum,
time_provider: Optional[CcsdsTimeProvider],
) -> Service1Tm:
Expand All @@ -366,7 +364,7 @@ def create_step_success_tm(


def create_step_failure_tm(
pus_tc: PusTelecommand,
pus_tc: PusTc,
step_id: PacketFieldEnum,
failure_notice: FailureNotice,
time_provider: Optional[CcsdsTimeProvider],
Expand All @@ -383,7 +381,7 @@ def create_step_failure_tm(


def create_completion_success_tm(
pus_tc: PusTelecommand, time_provider: Optional[CcsdsTimeProvider]
pus_tc: PusTc, time_provider: Optional[CcsdsTimeProvider]
) -> Service1Tm:
return Service1Tm(
subservice=Subservice.TM_COMPLETION_SUCCESS,
Expand All @@ -393,7 +391,7 @@ def create_completion_success_tm(


def create_completion_failure_tm(
pus_tc: PusTelecommand,
pus_tc: PusTc,
failure_notice: FailureNotice,
time_provider: Optional[CcsdsTimeProvider],
) -> Service1Tm:
Expand Down
4 changes: 2 additions & 2 deletions spacepackets/ecss/pus_verificator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from dataclasses import dataclass, field
from typing import Dict, Optional, List

from spacepackets.ecss import PusTelecommand
from spacepackets.ecss import PusTc
from spacepackets.ecss.pus_1_verification import RequestId, Service1Tm, Subservice


Expand Down Expand Up @@ -57,7 +57,7 @@ def __init__(self):
self._verif_dict: VerifDictT = dict()
pass

def add_tc(self, tc: PusTelecommand) -> bool:
def add_tc(self, tc: PusTc) -> bool:
req_id = RequestId.from_sp_header(tc.sp_header)
if req_id in self._verif_dict:
return False
Expand Down
4 changes: 2 additions & 2 deletions spacepackets/ecss/req_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from spacepackets import BytesTooShortError
from spacepackets.ccsds.spacepacket import PacketId, PacketSeqCtrl, SpacePacketHeader
from spacepackets.ecss.tc import PusTelecommand
from spacepackets.ecss.tc import PusTc


class RequestId:
Expand Down Expand Up @@ -47,7 +47,7 @@ def unpack(cls, tm_data: bytes) -> RequestId:
)

@classmethod
def from_pus_tc(cls, pus_tc: PusTelecommand):
def from_pus_tc(cls, pus_tc: PusTc):
return cls.from_sp_header(pus_tc.sp_header)

@classmethod
Expand Down
25 changes: 16 additions & 9 deletions spacepackets/ecss/tc.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ def get_header_size(cls):


class InvalidTcCrc16(Exception):
def __init__(self, tc: PusTelecommand):
def __init__(self, tc: PusTc):
self.tc = tc


class PusTelecommand(AbstractSpacePacket):
class PusTc(AbstractSpacePacket):
"""Class representation of a PUS telecommand. Can be converted to the raw byte representation
but also unpacked from a raw byte stream. Only PUS C telecommands are supported.
Expand Down Expand Up @@ -181,7 +181,7 @@ def from_sp_header(
pus_tc = cls.empty()
sp_header.packet_type = PacketType.TC
sp_header.sec_header_flag = True
sp_header.data_len = PusTelecommand.get_data_length(
sp_header.data_len = PusTc.get_data_length(
secondary_header_len=PusTcDataFieldHeader.get_header_size(),
app_data_len=len(app_data),
)
Expand All @@ -201,7 +201,7 @@ def from_composite_fields(
sp_header: SpacePacketHeader,
sec_header: PusTcDataFieldHeader,
app_data: bytes = bytes([]),
) -> PusTelecommand:
) -> PusTc:
pus_tc = cls.empty()
if sp_header.packet_type == PacketType.TM:
raise ValueError(
Expand All @@ -213,8 +213,8 @@ def from_composite_fields(
return pus_tc

@classmethod
def empty(cls) -> PusTelecommand:
return PusTelecommand(service=0, subservice=0)
def empty(cls) -> PusTc:
return PusTc(service=0, subservice=0)

def __repr__(self):
"""Returns the representation of a class instance."""
Expand All @@ -235,7 +235,7 @@ def __str__(self):
)

def __eq__(self, other: object):
if isinstance(other, PusTelecommand):
if isinstance(other, PusTc):
return (
self.sp_header == other.sp_header
and self.pus_tc_sec_header == other.pus_tc_sec_header
Expand Down Expand Up @@ -276,7 +276,7 @@ def pack(self, recalc_crc: bool = True) -> bytearray:
return packed_data

@classmethod
def unpack(cls, data: bytes) -> PusTelecommand:
def unpack(cls, data: bytes) -> PusTc:
"""Create an instance from a raw bytestream.
:raises BytesTooShortError: Passed bytestream too short.
Expand Down Expand Up @@ -324,7 +324,7 @@ def get_data_length(app_data_len: int, secondary_header_len: int) -> int:
current_version=get_version(),
details="use pack and the class itself to build this instead",
)
def pack_command_tuple(self) -> Tuple[bytearray, PusTelecommand]:
def pack_command_tuple(self) -> Tuple[bytearray, PusTc]:
"""Pack a tuple consisting of the raw packet as the first entry and the class representation
as the second entry
"""
Expand Down Expand Up @@ -371,6 +371,10 @@ def packet_seq_control(self) -> PacketSeqCtrl:
def app_data(self) -> bytes:
return self._app_data

@app_data.setter
def app_data(self, app_data: bytes):
self._app_data = app_data

@property
def crc16(self) -> Optional[bytes]:
"""Will be the raw CRC16 if the telecommand was created using :py:meth:`unpack`,
Expand All @@ -387,6 +391,9 @@ def apid(self, apid):
self.sp_header.apid = apid


PusTelecommand = PusTc


def generate_packet_crc(tc_packet: bytearray) -> bytes:
"""Removes current Packet Error Control, calculates new
CRC16 checksum and adds it as correct Packet Error Control Code.
Expand Down
21 changes: 10 additions & 11 deletions spacepackets/ecss/tm.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,11 @@ def header_size(self) -> int:


class InvalidTmCrc16(Exception):
def __init__(self, tm: PusTelemetry):
def __init__(self, tm: PusTm):
self.tm = tm


class PusTelemetry(AbstractPusTm):
class PusTm(AbstractPusTm):
"""Generic PUS telemetry class representation.
Can be used to generate TM packets using a high level interface with the default constructor,
Expand Down Expand Up @@ -275,10 +275,8 @@ def __init__(
self._crc16: Optional[bytes] = None

@classmethod
def empty(cls) -> PusTelemetry:
return PusTelemetry(
service=0, subservice=0, time_provider=CdsShortTimestamp.empty()
)
def empty(cls) -> PusTm:
return PusTm(service=0, subservice=0, time_provider=CdsShortTimestamp.empty())

def pack(self, recalc_crc: bool = True) -> bytearray:
"""Serializes the packet into a raw bytearray.
Expand All @@ -305,9 +303,7 @@ def calc_crc(self):
self._crc16 = struct.pack("!H", crc.crcValue)

@classmethod
def unpack(
cls, data: bytes, time_reader: Optional[CcsdsTimeProvider]
) -> PusTelemetry:
def unpack(cls, data: bytes, time_reader: Optional[CcsdsTimeProvider]) -> PusTm:
"""Attempts to construct a generic PusTelemetry class given a raw bytearray.
:param data: Raw bytes containing the PUS telemetry packet.
Expand Down Expand Up @@ -364,7 +360,7 @@ def from_composite_fields(
sp_header: SpacePacketHeader,
sec_header: PusTmSecondaryHeader,
tm_data: bytes,
) -> PusTelemetry:
) -> PusTm:
pus_tm = cls.empty()
if sp_header.packet_type == PacketType.TC:
raise ValueError(
Expand Down Expand Up @@ -400,7 +396,7 @@ def __repr__(self):
)

def __eq__(self, other: object):
if isinstance(other, PusTelemetry):
if isinstance(other, PusTm):
return (
self.space_packet_header == other.space_packet_header
and self.pus_tm_sec_header == other.pus_tm_sec_header
Expand Down Expand Up @@ -566,3 +562,6 @@ def get_source_data_string(
return get_printable_data_string(
print_format=print_format, data=self._source_data
)


PusTelemetry = PusTm

0 comments on commit b1a4664

Please sign in to comment.