Skip to content

Commit

Permalink
Merge pull request #156 from robamu-org/improve-hook-module
Browse files Browse the repository at this point in the history
Improve hook module and naming
  • Loading branch information
robamu committed May 14, 2024
2 parents cd10f7b + a15dc52 commit 1c2965b
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 96 deletions.
20 changes: 7 additions & 13 deletions examples/app/tmtcc.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
TcProcedureType,
TcQueueEntryType,
)
from tmtccmd.util import ObjectIdDictT
from spacepackets.seqcount import FileSeqCountProvider, PusFileSeqCountProvider

_LOGGER = logging.getLogger()
Expand All @@ -58,7 +57,7 @@

class ExampleHookClass(HookBase):
def __init__(self, json_cfg_path: str):
super().__init__(json_cfg_path=json_cfg_path)
super().__init__(cfg_file_path=json_cfg_path)

def get_communication_interface(self, com_if_key: str) -> Optional[ComInterface]:
assert self.cfg_path is not None
Expand Down Expand Up @@ -124,11 +123,6 @@ def perform_mode_operation(self, _tmtc_backend: CcsdsTmtcBackend, _mode: int):
_LOGGER.info("Mode operation hook was called")
pass

def get_object_ids(self) -> ObjectIdDictT:
from tmtccmd.config.objects import get_core_object_ids

return get_core_object_ids()


class PusTmHandler(SpecificApidHandlerBase):
def __init__(
Expand Down Expand Up @@ -238,15 +232,15 @@ def send_cb(self, send_params: SendCbParams):
log_entry = entry_helper.to_log_entry()
_LOGGER.info(log_entry.log_str)

def queue_finished_cb(self, helper: ProcedureWrapper):
if helper.proc_type == TcProcedureType.TREE_COMMANDING:
def_proc = helper.to_tree_commanding_procedure()
def queue_finished_cb(self, info: ProcedureWrapper):
if info.proc_type == TcProcedureType.TREE_COMMANDING:
def_proc = info.to_tree_commanding_procedure()
_LOGGER.info(f"Queue handling finished for command {def_proc.cmd_path}")

def feed_cb(self, helper: ProcedureWrapper, wrapper: FeedWrapper):
def feed_cb(self, info: ProcedureWrapper, wrapper: FeedWrapper):
self.queue_helper.queue_wrapper = wrapper.queue_wrapper
if helper.proc_type == TcProcedureType.TREE_COMMANDING:
def_proc = helper.to_tree_commanding_procedure()
if info.proc_type == TcProcedureType.TREE_COMMANDING:
def_proc = info.to_tree_commanding_procedure()
cmd_path = def_proc.cmd_path
assert cmd_path is not None
# Path starts with / so the first entry of the list will be an empty string. We cut
Expand Down
31 changes: 17 additions & 14 deletions tests/test_util.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,51 @@
from unittest import TestCase

from tmtccmd.util import ObjectIdU32
from tmtccmd.util.obj_id import ObjectIdU8, ObjectIdU16
from tmtccmd.util.obj_id import (
ComponentIdU16,
ComponentIdU32,
ComponentIdU8,
)


class TestObjectId(TestCase):
def test_basic(self):
obj_id0 = ObjectIdU32(1, "Some Name")
obj_id0 = ComponentIdU32(1, "Some Name")
self.assertEqual(str(obj_id0), "Object ID Some Name with ID 0x00000001")
self.assertEqual(
obj_id0.__repr__(), "ObjectIdU32(object_id=1, name='Some Name')"
obj_id0.__repr__(), "ComponentIdU32(object_id=1, name='Some Name')"
)
self.assertEqual(obj_id0.as_bytes, bytes([0x00, 0x00, 0x00, 0x01]))
self.assertEqual(obj_id0.as_hex_string, "0x00000001")
self.assertEqual(int(obj_id0), 1)
obj_id1 = ObjectIdU32(1, "Other Name")
obj_id1 = ComponentIdU32(1, "Other Name")
self.assertEqual(obj_id0, obj_id1)
obj_from_raw = ObjectIdU32.from_bytes(obj_id0.as_bytes)
obj_from_raw = ComponentIdU32.from_bytes(obj_id0.as_bytes)
self.assertEqual(obj_from_raw, obj_id0)
with self.assertRaises(ValueError):
ObjectIdU32.from_bytes(bytes())
ComponentIdU32.from_bytes(bytes())
with self.assertRaises(ValueError):
ObjectIdU32.from_bytes(bytes([0, 1, 2]))
ComponentIdU32.from_bytes(bytes([0, 1, 2]))
with self.assertRaises(ValueError):
obj_id1.obj_id = -1

def test_diff_types(self):
obj_id_u8 = ObjectIdU8(1, "U8 ID 0")
obj_id_u8 = ComponentIdU8(1, "U8 ID 0")
self.assertEqual(obj_id_u8.as_bytes, bytes([1]))
self.assertEqual(obj_id_u8.as_hex_string, "0x01")
self.assertEqual(obj_id_u8.byte_len, 1)
obj_id_u16 = ObjectIdU16(2, "U16 ID 2")
obj_id_u16 = ComponentIdU16(2, "U16 ID 2")
self.assertEqual(obj_id_u16.as_bytes, bytes([0, 2]))
self.assertEqual(obj_id_u16.as_hex_string, "0x0002")
self.assertEqual(obj_id_u16.byte_len, 2)
obj_id_u32 = ObjectIdU32(1, "U32 ID 1")
obj_id_u32 = ComponentIdU32(1, "U32 ID 1")
test_dict = dict()
test_dict.update({obj_id_u8: obj_id_u8.name})
test_dict.update({obj_id_u16: obj_id_u16.name})
test_dict.update({obj_id_u32: obj_id_u32.name})
self.assertEqual(len(test_dict), 3)
obj_id_u8_from_raw = ObjectIdU8.from_bytes(obj_id_u8.as_bytes)
obj_id_u8_from_raw = ComponentIdU8.from_bytes(obj_id_u8.as_bytes)
self.assertEqual(obj_id_u8_from_raw, obj_id_u8)
obj_id_u16_from_raw = ObjectIdU16.from_bytes(obj_id_u16.as_bytes)
obj_id_u16_from_raw = ComponentIdU16.from_bytes(obj_id_u16.as_bytes)
self.assertEqual(obj_id_u16_from_raw, obj_id_u16)
obj_id_u32_from_raw = ObjectIdU32.from_bytes(obj_id_u32.as_bytes)
obj_id_u32_from_raw = ComponentIdU32.from_bytes(obj_id_u32.as_bytes)
self.assertEqual(obj_id_u32_from_raw, obj_id_u32)
13 changes: 9 additions & 4 deletions tmtccmd/config/defs.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import enum
from dataclasses import dataclass
from typing import Any, Tuple, Dict, Union, Optional
from typing import Any, Tuple, Mapping, Union, Optional

from spacepackets.cfdp import TransmissionMode

# Com Interface Types
ComIfValueT = Tuple[str, Any]
ComIfDictT = Dict[str, ComIfValueT]
ComIfValue = Tuple[str, Any]
ComIfMapping = Mapping[str, ComIfValue]
ComIfDictT = ComIfMapping


@dataclass
Expand All @@ -30,6 +31,10 @@ def default_json_path() -> str:
return "tmtc_conf.json"


def default_toml_path() -> str:
return "tmtc_conf.toml"


class CoreComInterfaces(str, enum.Enum):
DUMMY = "dummy"
UDP = "udp"
Expand All @@ -40,7 +45,7 @@ class CoreComInterfaces(str, enum.Enum):
UNSPECIFIED = "unspec"


CORE_COM_IF_DICT: ComIfDictT = {
CORE_COM_IF_DICT: ComIfMapping = {
CoreComInterfaces.DUMMY: ("Dummy Interface", None),
CoreComInterfaces.UDP: ("TCP/IP with UDP datagrams", None),
CoreComInterfaces.TCP: ("TCP/IP with TCP", None),
Expand Down
84 changes: 42 additions & 42 deletions tmtccmd/config/hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from .com import ComCfgBase, ComInterface
from .tmtc import TmtcDefinitionWrapper
from .defs import default_json_path, CORE_COM_IF_DICT, ComIfDictT
from .defs import default_json_path, CORE_COM_IF_DICT, ComIfMapping


_LOGGER = logging.getLogger(__name__)
Expand All @@ -24,33 +24,11 @@ class HookBase(ABC):
TMTC commander core.
"""

def __init__(self, json_cfg_path: Optional[str] = None):
self.cfg_path = json_cfg_path
def __init__(self, cfg_file_path: Optional[str] = None):
self.cfg_path = cfg_file_path
if self.cfg_path is None:
self.cfg_path = default_json_path()

@abstractmethod
def get_object_ids(self) -> ObjectIdDictT:
from tmtccmd.config.objects import get_core_object_ids

"""The user can specify an object ID dictionary here mapping object ID bytearrays to a
list. This list could contain containing the string representation or additional
information about that object ID.
"""
return get_core_object_ids()

@deprecated(
version="8.0.0rc0",
reason="implement and use get_communication_interface instead",
)
def assign_communication_interface(self, com_if_key: str) -> Optional[ComInterface]:
"""Assign the communication interface used by the TMTC commander to send and receive
TMTC with.
:param com_if_key: String key of the communication interface to be created.
"""
return self.get_communication_interface(com_if_key)

@abstractmethod
def get_communication_interface(self, com_if_key: str) -> Optional[ComInterface]:
from tmtccmd.config.com import create_com_interface_default
Expand All @@ -59,28 +37,14 @@ def get_communication_interface(self, com_if_key: str) -> Optional[ComInterface]
cfg_base = ComCfgBase(com_if_key=com_if_key, json_cfg_path=self.cfg_path)
return create_com_interface_default(cfg_base)

def get_com_if_dict(self) -> ComIfDictT:
return CORE_COM_IF_DICT

@deprecated(
version="8.0.0rc0",
reason="implement and use get_command_definitions instead",
)
def get_tmtc_definitions(self) -> TmtcDefinitionWrapper:
"""This is a dicitonary mapping services represented by strings to an operation code
dictionary.
:return:
"""
from tmtccmd.config.globals import get_default_tmtc_defs

return get_default_tmtc_defs()

@abstractmethod
def get_command_definitions(self) -> CmdTreeNode:
"""This function should return the root node of the command definition tree."""
pass

def get_com_if_dict(self) -> ComIfMapping:
return CORE_COM_IF_DICT

def get_cmd_history(self) -> Optional[History]:
"""Optionlly return a history class for the past command paths which will be used
when prompting a command path from the user in CLI mode."""
Expand All @@ -94,3 +58,39 @@ def perform_mode_operation(self, tmtc_backend: BackendBase, mode: int):
:return:
"""
_LOGGER.warning("No custom mode operation implemented")

@deprecated(version="8.0.0", reason="application specific code")
def get_object_ids(self) -> ObjectIdDictT:
from tmtccmd.config.objects import get_base_component_id_mapping

"""The user can specify an object ID dictionary here mapping object ID bytearrays to a
list. This list could contain containing the string representation or additional
information about that object ID.
"""
return get_base_component_id_mapping()

@deprecated(
version="8.0.0rc0",
reason="implement and use get_command_definitions instead",
)
def get_tmtc_definitions(self) -> TmtcDefinitionWrapper:
"""This is a dicitonary mapping services represented by strings to an operation code
dictionary.
:return:
"""
from tmtccmd.config.globals import get_default_tmtc_defs

return get_default_tmtc_defs()

@deprecated(
version="8.0.0rc0",
reason="implement and use get_communication_interface instead",
)
def assign_communication_interface(self, com_if_key: str) -> Optional[ComInterface]:
"""Assign the communication interface used by the TMTC commander to send and receive
TMTC with.
:param com_if_key: String key of the communication interface to be created.
"""
return self.get_communication_interface(com_if_key)
10 changes: 7 additions & 3 deletions tmtccmd/config/objects.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
from tmtccmd.util.obj_id import ObjectIdDictT, ObjectIdU32
from tmtccmd.util.obj_id import ComponentIdMapping, ObjectIdU32


INVALID_ID = bytes([0xFF, 0xFF, 0xFF, 0xFF])


def get_core_object_ids() -> ObjectIdDictT:
def get_base_component_id_mapping() -> ComponentIdMapping:
"""These are the object IDs for the tmtccmd core. The core will usually take care of
inserting these into the object manager during the program initialization.
:return Dictionary of the core object IDs
"""
invalid_id = ObjectIdU32.from_bytes(obj_id_as_bytes=INVALID_ID)
invalid_id = ObjectIdU32.from_bytes(raw=INVALID_ID)
invalid_id.name = "Invalid ID"
object_id_dict = {INVALID_ID: invalid_id}
return object_id_dict


def get_core_object_ids() -> ComponentIdMapping:
return get_base_component_id_mapping()
14 changes: 13 additions & 1 deletion tmtccmd/util/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,14 @@
from .obj_id import ObjectIdU32, ObjectIdU16, ObjectIdU8, ObjectIdBase, ObjectIdDictT
from .obj_id import (
ComponentIdU32,
ComponentIdU16,
ComponentIdU8,
ComponentIdBase,
ComponentIdMapping,
ObjectIdU32,
ObjectIdU16,
ObjectIdU8,
ObjectIdBase,
ObjectIdMapping,
ObjectIdDictT,
)
from .retval import RetvalDictT
8 changes: 4 additions & 4 deletions tmtccmd/util/conf_util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import collections.abc
import logging
from typing import Tuple, Union
from typing import Any, Tuple, Union
from contextlib import contextmanager


Expand Down Expand Up @@ -62,11 +62,11 @@ def check_args_in_dict(


def __handle_iterable_non_dict(
param: any,
param: Any,
iterable: collections.abc.Iterable,
might_be_integer: bool,
init_res_tuple: Tuple[bool, any],
) -> (bool, any):
init_res_tuple: Tuple[bool, Any],
) -> Tuple[bool, Any]:
param_list = list()
for idx, enum_value in enumerate(iterable):
if isinstance(enum_value.value, str):
Expand Down
3 changes: 2 additions & 1 deletion tmtccmd/util/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import os
import enum
from typing import Any

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -57,7 +58,7 @@ def check_json_file(json_cfg_path: str) -> bool:


def save_to_json_with_prompt(
key: str, value: any, name: str, json_cfg_path: str, json_obj: any
key: str, value: Any, name: str, json_cfg_path: str, json_obj: Any
) -> bool:
save_to_json = input(
f"Do you want to store the {name} to the configuration file? (y/n): "
Expand Down

0 comments on commit 1c2965b

Please sign in to comment.