Skip to content

Commit

Permalink
Merge pull request #75 from us-irs/ccsds-ecss-additions
Browse files Browse the repository at this point in the history
additions for ECSS and CCSDS modules
  • Loading branch information
robamu committed Apr 22, 2024
2 parents 1023c3a + f6a2642 commit b122ab6
Show file tree
Hide file tree
Showing 18 changed files with 124 additions and 116 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

# [unreleased]

# [v0.23.1] 2024-04-22

## Added

- PUS TC app data setter method.
- New `PusTelecommand` alias/shorthand: `PusTc`.
- New `SpacePacketHeader` alias/shorthand: `SpHeader`.
- New `PusTelemetry` alias/shorthand: `PusTm`.

# [v0.23.0] 2024-01-24

## Changed
Expand Down
12 changes: 6 additions & 6 deletions docs/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ PUS ping telemetry reply without a timestamp.

.. testcode:: pus

from spacepackets.ecss.tc import PusTelecommand
from spacepackets.ecss.tm import PusTelemetry
from spacepackets.ecss.tc import PusTc
from spacepackets.ecss.tm import PusTm

ping_cmd = PusTelecommand(service=17, subservice=1, apid=0x01)
ping_cmd = PusTc(service=17, subservice=1, apid=0x01)
cmd_as_bytes = ping_cmd.pack()
print(f"Ping telecommand [17,1] (hex): [{cmd_as_bytes.hex(sep=',')}]")

ping_reply = PusTelemetry(service=17, subservice=2, apid=0x01, time_provider=None)
ping_reply = PusTm(service=17, subservice=2, apid=0x01, time_provider=None)
tm_as_bytes = ping_reply.pack()
print(f"Ping reply [17,2] (hex): [{tm_as_bytes.hex(sep=',')}]")

Expand All @@ -34,9 +34,9 @@ The following example shows how to generate a space packet header:

.. testcode:: ccsds

from spacepackets.ccsds.spacepacket import SpacePacketHeader, PacketType
from spacepackets.ccsds.spacepacket import SpHeader, PacketType

spacepacket_header = SpacePacketHeader(
spacepacket_header = SpHeader(
packet_type=PacketType.TC, apid=0x01, seq_count=0, data_len=0
)
header_as_bytes = spacepacket_header.pack()
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta"
name = "spacepackets"
description = "Various CCSDS and ECSS packet implementations"
readme = "README.md"
version = "0.23.0"
version = "0.23.1"
requires-python = ">=3.8"
license = {text = "Apache-2.0"}
authors = [
Expand Down
1 change: 1 addition & 0 deletions spacepackets/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
from spacepackets.ccsds import (
SpacePacketHeader,
SpHeader,
SpacePacket,
PacketType,
SequenceFlags,
Expand Down
1 change: 1 addition & 0 deletions spacepackets/ccsds/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""This package contains all CCSDS related components"""
from .spacepacket import (
SpHeader,
SpacePacketHeader,
SpacePacket,
PacketType,
Expand Down
3 changes: 3 additions & 0 deletions spacepackets/ccsds/spacepacket.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,9 @@ def __eq__(self, other: object):
return False


SpHeader = SpacePacketHeader


class SpacePacket:
"""Generic CCSDS space packet which consists of the primary header and can optionally include
a secondary header and a user data field.
Expand Down
4 changes: 2 additions & 2 deletions spacepackets/ecss/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from crcmod.predefined import mkPredefinedCrcFun

from .tc import PusVersion, PusTelecommand, PusTcDataFieldHeader
from .tm import PusTelemetry, PusTmSecondaryHeader
from .tc import PusVersion, PusTelecommand, PusTc, PusTcDataFieldHeader
from .tm import PusTm, PusTelemetry, PusTmSecondaryHeader
from .fields import (
PacketFieldEnum,
PacketFieldBase,
Expand Down
6 changes: 3 additions & 3 deletions spacepackets/ecss/pus_17_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from spacepackets.ccsds.time import CcsdsTimeProvider
from spacepackets.ecss.conf import FETCH_GLOBAL_APID
from spacepackets.ecss.defs import PusService
from spacepackets.ecss.tm import PusTelemetry, AbstractPusTm
from spacepackets.ecss.tm import PusTm, AbstractPusTm


class Subservice(enum.IntEnum):
Expand All @@ -27,7 +27,7 @@ def __init__(
space_time_ref: int = 0b0000,
destination_id: int = 0,
):
self.pus_tm = PusTelemetry(
self.pus_tm = PusTm(
service=PusService.S17_TEST,
subservice=subservice,
time_provider=time_provider,
Expand Down Expand Up @@ -89,5 +89,5 @@ def unpack(
:raises InvalidTmCrc16: Invalid CRC16.
"""
service_17_tm = cls.__empty(time_provider=time_reader)
service_17_tm.pus_tm = PusTelemetry.unpack(data=data, time_reader=time_reader)
service_17_tm.pus_tm = PusTm.unpack(data=data, time_reader=time_reader)
return service_17_tm
28 changes: 13 additions & 15 deletions spacepackets/ecss/pus_1_verification.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
from spacepackets.ccsds import SpacePacketHeader
from spacepackets.ccsds.spacepacket import PacketId, PacketSeqCtrl
from spacepackets.ccsds.time import CcsdsTimeProvider
from spacepackets.ecss import PusTelecommand
from spacepackets.ecss import PusTc
from spacepackets.ecss.conf import FETCH_GLOBAL_APID
from spacepackets.ecss.defs import PusService
from spacepackets.ecss.fields import PacketFieldEnum
from spacepackets.ecss.tm import PusTelemetry, AbstractPusTm
from spacepackets.ecss.tm import PusTm, AbstractPusTm
from .exceptions import TmSrcDataTooShortError

from .req_id import RequestId
Expand Down Expand Up @@ -136,7 +136,7 @@ def __init__(
self._verif_params = VerificationParams(RequestId.empty())
else:
self._verif_params = verif_params
self.pus_tm = PusTelemetry(
self.pus_tm = PusTm(
service=PusService.S1_VERIFICATION,
subservice=subservice,
time_provider=time_provider,
Expand All @@ -158,7 +158,7 @@ def __empty(cls, time_provider: Optional[CcsdsTimeProvider]) -> Service1Tm:
return cls(subservice=Subservice.INVALID, time_provider=time_provider)

@classmethod
def from_tm(cls, tm: PusTelemetry, params: UnpackParams) -> Service1Tm:
def from_tm(cls, tm: PusTm, params: UnpackParams) -> Service1Tm:
service_1_tm = cls.__empty(params.time_reader)
service_1_tm.pus_tm = tm
cls._unpack_raw_tm(service_1_tm, params)
Expand All @@ -176,9 +176,7 @@ def unpack(cls, data: bytes, params: UnpackParams) -> Service1Tm:
:return:
"""
service_1_tm = cls.__empty(params.time_reader)
service_1_tm.pus_tm = PusTelemetry.unpack(
data=data, time_reader=params.time_reader
)
service_1_tm.pus_tm = PusTm.unpack(data=data, time_reader=params.time_reader)
cls._unpack_raw_tm(service_1_tm, params)
return service_1_tm

Expand Down Expand Up @@ -302,7 +300,7 @@ def __eq__(self, other: Service1Tm):


def create_acceptance_success_tm(
pus_tc: PusTelecommand, time_provider: Optional[CcsdsTimeProvider]
pus_tc: PusTc, time_provider: Optional[CcsdsTimeProvider]
) -> Service1Tm:
return Service1Tm(
subservice=Subservice.TM_ACCEPTANCE_SUCCESS,
Expand All @@ -312,7 +310,7 @@ def create_acceptance_success_tm(


def create_acceptance_failure_tm(
pus_tc: PusTelecommand,
pus_tc: PusTc,
failure_notice: FailureNotice,
time_provider: Optional[CcsdsTimeProvider],
) -> Service1Tm:
Expand All @@ -327,7 +325,7 @@ def create_acceptance_failure_tm(


def create_start_success_tm(
pus_tc: PusTelecommand, time_provider: Optional[CcsdsTimeProvider]
pus_tc: PusTc, time_provider: Optional[CcsdsTimeProvider]
) -> Service1Tm:
return Service1Tm(
subservice=Subservice.TM_START_SUCCESS,
Expand All @@ -337,7 +335,7 @@ def create_start_success_tm(


def create_start_failure_tm(
pus_tc: PusTelecommand,
pus_tc: PusTc,
failure_notice: FailureNotice,
time_provider: Optional[CcsdsTimeProvider],
) -> Service1Tm:
Expand All @@ -352,7 +350,7 @@ def create_start_failure_tm(


def create_step_success_tm(
pus_tc: PusTelecommand,
pus_tc: PusTc,
step_id: PacketFieldEnum,
time_provider: Optional[CcsdsTimeProvider],
) -> Service1Tm:
Expand All @@ -366,7 +364,7 @@ def create_step_success_tm(


def create_step_failure_tm(
pus_tc: PusTelecommand,
pus_tc: PusTc,
step_id: PacketFieldEnum,
failure_notice: FailureNotice,
time_provider: Optional[CcsdsTimeProvider],
Expand All @@ -383,7 +381,7 @@ def create_step_failure_tm(


def create_completion_success_tm(
pus_tc: PusTelecommand, time_provider: Optional[CcsdsTimeProvider]
pus_tc: PusTc, time_provider: Optional[CcsdsTimeProvider]
) -> Service1Tm:
return Service1Tm(
subservice=Subservice.TM_COMPLETION_SUCCESS,
Expand All @@ -393,7 +391,7 @@ def create_completion_success_tm(


def create_completion_failure_tm(
pus_tc: PusTelecommand,
pus_tc: PusTc,
failure_notice: FailureNotice,
time_provider: Optional[CcsdsTimeProvider],
) -> Service1Tm:
Expand Down
4 changes: 2 additions & 2 deletions spacepackets/ecss/pus_verificator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from dataclasses import dataclass, field
from typing import Dict, Optional, List

from spacepackets.ecss import PusTelecommand
from spacepackets.ecss import PusTc
from spacepackets.ecss.pus_1_verification import RequestId, Service1Tm, Subservice


Expand Down Expand Up @@ -57,7 +57,7 @@ def __init__(self):
self._verif_dict: VerifDictT = dict()
pass

def add_tc(self, tc: PusTelecommand) -> bool:
def add_tc(self, tc: PusTc) -> bool:
req_id = RequestId.from_sp_header(tc.sp_header)
if req_id in self._verif_dict:
return False
Expand Down
4 changes: 2 additions & 2 deletions spacepackets/ecss/req_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from spacepackets import BytesTooShortError
from spacepackets.ccsds.spacepacket import PacketId, PacketSeqCtrl, SpacePacketHeader
from spacepackets.ecss.tc import PusTelecommand
from spacepackets.ecss.tc import PusTc


class RequestId:
Expand Down Expand Up @@ -47,7 +47,7 @@ def unpack(cls, tm_data: bytes) -> RequestId:
)

@classmethod
def from_pus_tc(cls, pus_tc: PusTelecommand):
def from_pus_tc(cls, pus_tc: PusTc):
return cls.from_sp_header(pus_tc.sp_header)

@classmethod
Expand Down
27 changes: 17 additions & 10 deletions spacepackets/ecss/tc.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,15 @@ def get_header_size(cls):


class InvalidTcCrc16(Exception):
def __init__(self, tc: PusTelecommand):
def __init__(self, tc: PusTc):
self.tc = tc


class PusTelecommand(AbstractSpacePacket):
class PusTc(AbstractSpacePacket):
"""Class representation of a PUS telecommand. Can be converted to the raw byte representation
but also unpacked from a raw byte stream. Only PUS C telecommands are supported.
>>> ping_tc = PusTelecommand(service=17, subservice=1, seq_count=22, apid=0x01)
>>> ping_tc = PusTc(service=17, subservice=1, seq_count=22, apid=0x01)
>>> ping_tc.service
17
>>> ping_tc.subservice
Expand Down Expand Up @@ -181,7 +181,7 @@ def from_sp_header(
pus_tc = cls.empty()
sp_header.packet_type = PacketType.TC
sp_header.sec_header_flag = True
sp_header.data_len = PusTelecommand.get_data_length(
sp_header.data_len = PusTc.get_data_length(
secondary_header_len=PusTcDataFieldHeader.get_header_size(),
app_data_len=len(app_data),
)
Expand All @@ -201,7 +201,7 @@ def from_composite_fields(
sp_header: SpacePacketHeader,
sec_header: PusTcDataFieldHeader,
app_data: bytes = bytes([]),
) -> PusTelecommand:
) -> PusTc:
pus_tc = cls.empty()
if sp_header.packet_type == PacketType.TM:
raise ValueError(
Expand All @@ -213,8 +213,8 @@ def from_composite_fields(
return pus_tc

@classmethod
def empty(cls) -> PusTelecommand:
return PusTelecommand(service=0, subservice=0)
def empty(cls) -> PusTc:
return PusTc(service=0, subservice=0)

def __repr__(self):
"""Returns the representation of a class instance."""
Expand All @@ -235,7 +235,7 @@ def __str__(self):
)

def __eq__(self, other: object):
if isinstance(other, PusTelecommand):
if isinstance(other, PusTc):
return (
self.sp_header == other.sp_header
and self.pus_tc_sec_header == other.pus_tc_sec_header
Expand Down Expand Up @@ -276,7 +276,7 @@ def pack(self, recalc_crc: bool = True) -> bytearray:
return packed_data

@classmethod
def unpack(cls, data: bytes) -> PusTelecommand:
def unpack(cls, data: bytes) -> PusTc:
"""Create an instance from a raw bytestream.
:raises BytesTooShortError: Passed bytestream too short.
Expand Down Expand Up @@ -324,7 +324,7 @@ def get_data_length(app_data_len: int, secondary_header_len: int) -> int:
current_version=get_version(),
details="use pack and the class itself to build this instead",
)
def pack_command_tuple(self) -> Tuple[bytearray, PusTelecommand]:
def pack_command_tuple(self) -> Tuple[bytearray, PusTc]:
"""Pack a tuple consisting of the raw packet as the first entry and the class representation
as the second entry
"""
Expand Down Expand Up @@ -371,6 +371,10 @@ def packet_seq_control(self) -> PacketSeqCtrl:
def app_data(self) -> bytes:
return self._app_data

@app_data.setter
def app_data(self, app_data: bytes):
self._app_data = app_data

@property
def crc16(self) -> Optional[bytes]:
"""Will be the raw CRC16 if the telecommand was created using :py:meth:`unpack`,
Expand All @@ -387,6 +391,9 @@ def apid(self, apid):
self.sp_header.apid = apid


PusTelecommand = PusTc


def generate_packet_crc(tc_packet: bytearray) -> bytes:
"""Removes current Packet Error Control, calculates new
CRC16 checksum and adds it as correct Packet Error Control Code.
Expand Down

0 comments on commit b122ab6

Please sign in to comment.