Skip to content

Commit

Permalink
Merge pull request #14 from spacefisch/develop
Browse files Browse the repository at this point in the history
Changes for 1.4.4
  • Loading branch information
robamu committed May 9, 2021
2 parents f80d00e + 275b675 commit d0168b4
Show file tree
Hide file tree
Showing 24 changed files with 83 additions and 85 deletions.
7 changes: 6 additions & 1 deletion example/config/hook_implementation.py
Expand Up @@ -10,6 +10,9 @@

class ExampleHookClass(TmTcHookBase):

def set_json_config_file_path(self) -> str:
return "tmtc_config.json"

def get_version(self) -> str:
return "My Version String"

Expand All @@ -25,7 +28,9 @@ def assign_communication_interface(self, com_if: int, tmtc_printer: TmTcPrinter)
Union[CommunicationInterface, None]:
from tmtccmd.defaults.com_setup import create_communication_interface_default
LOGGER.info("Communication interface assignment function was called")
return create_communication_interface_default(com_if=com_if, tmtc_printer=tmtc_printer)
return create_communication_interface_default(
com_if=com_if, tmtc_printer=tmtc_printer, json_cfg_path=self.set_json_config_file_path()
)

def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int):
LOGGER.info("Mode operation hook was called")
Expand Down
2 changes: 1 addition & 1 deletion src/tests/test_obj_id_manager.py
@@ -1,5 +1,5 @@
from unittest import TestCase
from tmtccmd.core.object_id_manager import insert_object_id, get_object_id_info, insert_object_ids
from tmtccmd.core.object_id_manager import insert_object_id, get_object_id_info

TEST_ID_0 = bytearray([0x00, 0x01, 0x02, 0x03])

Expand Down
2 changes: 1 addition & 1 deletion src/tests/test_pus.py
Expand Up @@ -7,7 +7,7 @@
from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.ecss.tc import generate_crc, generate_packet_crc
from tmtccmd.ccsds.spacepacket import get_sp_packet_sequence_control
from tmtccmd.ecss.conf import set_default_apid, get_default_apid, PusVersion, set_pus_tm_version, get_pus_tm_version
from tmtccmd.ecss.conf import set_default_apid, get_default_apid, PusVersion, get_pus_tm_version
from tmtccmd.pus_tm.service_17_test import Service17TM, Service17TmPacked
from tmtccmd.ecss.tm import PusTelemetry

Expand Down
4 changes: 2 additions & 2 deletions src/tmtccmd/__init__.py
@@ -1,9 +1,9 @@
VERSION_NAME = "tmtccmd"
VERSION_MAJOR = 1
VERSION_MINOR = 4
VERSION_REVISION = 3
VERSION_REVISION = 4

# I think this needs to be in string representation to be parsed so we can't
# use a formatted string here.
__version__ = "1.4.3"
__version__ = "1.4.4"

1 change: 0 additions & 1 deletion src/tmtccmd/com_if/com_interface_base.py
Expand Up @@ -9,7 +9,6 @@
:author: R. Mueller
"""
from abc import abstractmethod
from typing import Tuple

from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.pus_tm.factory import PusTmListT
Expand Down
8 changes: 4 additions & 4 deletions src/tmtccmd/com_if/dummy_com_if.py
Expand Up @@ -3,7 +3,7 @@
:date: 09.03.2020
:brief: Dummy Communication Interface. Currently serves to provide an example without external hardware
"""
from typing import Tuple, cast
from typing import cast

from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.ecss.tc import PusTelecommand
Expand All @@ -24,13 +24,13 @@ def __init__(self, tmtc_printer):
self.tc_ssc = 0
self.tc_packet_id = 0

def initialize(self) -> any:
def initialize(self, args: any = None) -> any:
pass

def open(self):
def open(self, args: any = None) -> None:
pass

def close(self) -> None:
def close(self, args: any = None) -> None:
pass

def data_available(self, parameters):
Expand Down
6 changes: 3 additions & 3 deletions src/tmtccmd/com_if/qemu_com_if.py
Expand Up @@ -98,15 +98,15 @@ def set_dle_settings(self, dle_queue_len: int, dle_max_frame: int, dle_timeout:
self.dle_max_frame = dle_max_frame
self.dle_timeout = dle_timeout

def initialize(self) -> None:
def initialize(self, args: any = None) -> any:
"""
Needs to be called by application code once for DLE mode!
"""
if not self.loop.is_running():
self.background_loop_thread = Thread(
target=start_background_loop, args=(self.loop,), daemon=True)

def open(self) -> None:
def open(self, args: any = None) -> None:
self.background_loop_thread.start()
try:
self.usart = asyncio.run_coroutine_threadsafe(
Expand All @@ -119,7 +119,7 @@ def open(self) -> None:
self.reception_buffer = deque(maxlen=self.dle_queue_len)
asyncio.run_coroutine_threadsafe(self.start_dle_polling(), self.loop)

def close(self):
def close(self, args: any = None) -> None:
if self.loop.is_closed():
return

Expand Down
6 changes: 3 additions & 3 deletions src/tmtccmd/com_if/serial_com_if.py
Expand Up @@ -98,12 +98,12 @@ def set_dle_settings(self, dle_queue_len: int, dle_max_frame: int, dle_timeout:
self.dle_max_frame = dle_max_frame
self.dle_timeout = dle_timeout

def initialize(self):
def initialize(self, args: any = None) -> any:
if self.ser_com_type == SerialCommunicationType.DLE_ENCODING:
self.reception_buffer = deque(maxlen=self.dle_queue_len)
self.dle_polling_active_event = threading.Event()

def open(self) -> None:
def open(self, args: any = None) -> None:
try:
if self.ser_com_type == SerialCommunicationType.DLE_ENCODING:
self.dle_polling_active_event.set()
Expand All @@ -119,7 +119,7 @@ def open(self) -> None:
if self.ser_com_type == SerialCommunicationType.DLE_ENCODING:
self.reception_thread.start()

def close(self):
def close(self, args: any = None) -> None:
try:
if self.ser_com_type == SerialCommunicationType.DLE_ENCODING:
self.dle_polling_active_event.clear()
Expand Down
6 changes: 3 additions & 3 deletions src/tmtccmd/com_if/tcpip_udp_com_if.py
Expand Up @@ -7,7 +7,7 @@
"""
import select
import socket
from typing import Tuple, Union
from typing import Union

from tmtccmd.utility.tmtcc_logger import get_logger
from tmtccmd.core.definitions import CoreModeList
Expand Down Expand Up @@ -59,7 +59,7 @@ def __del__(self):
except IOError:
LOGGER.warning("Could not close UDP communication interface!")

def initialize(self) -> None:
def initialize(self, args: any = None) -> any:
pass

def open(self, args: any = None):
Expand All @@ -78,7 +78,7 @@ def open(self, args: any = None):
ping_cmd = pack_service17_ping_command(ssc=0)
self.send_telecommand(ping_cmd.pack(), ping_cmd)

def close(self) -> None:
def close(self, args: any = None) -> None:
if self.udp_socket is not None:
self.udp_socket.close()

Expand Down
2 changes: 1 addition & 1 deletion src/tmtccmd/config/globals.py
@@ -1,4 +1,4 @@
import collections
import collections.abc
from typing import List, Union

from tmtccmd.utility.tmtcc_logger import get_logger
Expand Down
8 changes: 5 additions & 3 deletions src/tmtccmd/config/objects.py
@@ -1,14 +1,16 @@
from typing import Dict
from tmtccmd.core.definitions import CoreObjectIds


def get_core_object_ids() -> Dict[int, bytearray]:
INVALID_ID = bytes([0xff, 0xff, 0xff, 0xff])


def get_core_object_ids() -> Dict[bytes, list]:
"""
These are the object IDs for the tmtccmd core. The core will usually take care of
inserting these into the object manager during the program initialization.
:return: Dictionary of the core object IDs
"""
object_id_dict = {
CoreObjectIds.INVALID: bytearray([0xff, 0xff, 0xff, 0xff]),
INVALID_ID: ["Invalid ID"],
}
return object_id_dict
6 changes: 0 additions & 6 deletions src/tmtccmd/core/definitions.py
@@ -1,7 +1,6 @@
"""
@brief Definitions for the TMTC commander core
"""

import enum
from typing import Tuple

Expand Down Expand Up @@ -35,10 +34,6 @@ class QueueCommands(enum.Enum):
SET_TIMEOUT = enum.auto()


class CoreObjectIds(enum.IntEnum):
INVALID = 999


# Mode options, set by args parser
class CoreModeList(enum.IntEnum):
SINGLE_CMD_MODE = 0
Expand Down Expand Up @@ -110,4 +105,3 @@ class CoreGlobalIds(enum.IntEnum):

DEFAULT_APID = 0xef
DEBUG_MODE = False

2 changes: 1 addition & 1 deletion src/tmtccmd/core/frontend_base.py
@@ -1,7 +1,7 @@
from abc import abstractmethod


class FrontendBase():
class FrontendBase:

@abstractmethod
def start(self, args: any):
Expand Down
27 changes: 16 additions & 11 deletions src/tmtccmd/core/hook_base.py
Expand Up @@ -44,13 +44,21 @@ def add_globals_pre_args_parsing(self, gui: bool = False):
set_default_globals_pre_args_parsing(gui=gui, apid=DEFAULT_APID)

@abstractmethod
def add_globals_post_args_parsing(self, args: argparse.Namespace, json_cfg_path: str = ""):
def set_json_config_file_path(self) -> str:
"""
The user can specify a path and filename for the JSON configuration file by overriding this function.
:return:
"""
return "tmtc_config.json"

@abstractmethod
def add_globals_post_args_parsing(self, args: argparse.Namespace):
"""
Add global variables prior after parsing the CLI arguments.
:param gui: Specify whether a GUI is used
:param args: Specify whether a GUI is used
"""
from tmtccmd.defaults.globals_setup import set_default_globals_post_args_parsing
set_default_globals_post_args_parsing(args=args)
set_default_globals_post_args_parsing(args=args, json_cfg_path=self.set_json_config_file_path())

@abstractmethod
def assign_communication_interface(
Expand All @@ -62,7 +70,9 @@ def assign_communication_interface(
:param tmtc_printer: Printer utility instance.
"""
from tmtccmd.defaults.com_setup import create_communication_interface_default
return create_communication_interface_default(com_if=com_if, tmtc_printer=tmtc_printer)
return create_communication_interface_default(
com_if=com_if, tmtc_printer=tmtc_printer, json_cfg_path=self.set_json_config_file_path()
)

@abstractmethod
def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int):
Expand Down Expand Up @@ -120,7 +130,7 @@ def handle_service_3_housekeeping(
) -> Tuple[list, list, bytearray, int]:
"""
This function is called when a Service 3 Housekeeping packet is received.
:param object_id_key: Integer representation of the found object ID. See
:param object_id: Integer representation of the found object ID. See
the :func:`~tmtccmd.core.hook_base.set_object_ids function for more information
:param set_id: Unique set ID of the HK reply
:param hk_data: HK data. For custom HK handling, whole HK data will be passed here.
Expand All @@ -146,16 +156,11 @@ def handle_service_5_event(
"""
This function is called when a Service 5 Event Packet is received. The user can specify a custom
string here which will be printed to display additional information related to an event.
:param object_id_key: Integer representation of the found object ID. See
:param object_id: Integer representation of the found object ID. See
the :func:`~tmtccmd.core.hook_base.set_object_ids function for more information
:param event_id: Two-byte event ID
:param param_1: Four-byte Parameter 1
:param param_2: Four-byte Parameter 2
:return: Custom information string which will be printed with the event
"""
return ""

@abstractmethod
def set_json_config_file_path(self) -> str:
return "tmtc_config.json"

7 changes: 3 additions & 4 deletions src/tmtccmd/core/object_id_manager.py
Expand Up @@ -2,7 +2,6 @@
from typing import Dict

from tmtccmd.utility.tmtcc_logger import get_logger
from tmtccmd.core.definitions import CoreObjectIds

LOGGER = get_logger()

Expand Down Expand Up @@ -40,12 +39,12 @@ def get_object_id_info(self, object_id: bytearray):

def get_object_id_info_from_int_id(self, object_id: int):
object_id_bytearray = struct.pack('!I', object_id)
return get_object_id_info(object_id=object_id_bytearray)
return self.get_object_id_info(object_id=bytearray(object_id_bytearray))

def insert_object_id(self, object_id: bytearray, object_id_info: list):
self.object_id_dict.update({bytes(object_id): object_id_info})

def insert_object_ids(self, object_id_dict: Dict[bytearray, list]):
def insert_object_ids(self, object_id_dict: Dict[bytes, list]):
self.object_id_dict.update(object_id_dict)


Expand All @@ -55,7 +54,7 @@ def insert_object_id(object_id: bytearray, object_id_info: list):
)


def insert_object_ids(object_id_dict: Dict[bytearray, list]):
def insert_object_ids(object_id_dict: Dict[bytes, list]):
if object_id_dict is not None:
return ObjectIdManager.get_manager().insert_object_ids(object_id_dict=object_id_dict)

Expand Down
22 changes: 12 additions & 10 deletions src/tmtccmd/defaults/com_setup.py
Expand Up @@ -6,7 +6,7 @@
from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.com_if.serial_com_if import SerialConfigIds, SerialCommunicationType
from tmtccmd.com_if.serial_utilities import determine_com_port, determine_baud_rate
from tmtccmd.com_if.tcpip_utilities import TcpIpConfigIds, ethernet_address_t
from tmtccmd.com_if.tcpip_utilities import TcpIpConfigIds
from tmtccmd.utility.tmtcc_logger import get_logger
from tmtccmd.utility.tmtc_printer import TmTcPrinter

Expand All @@ -15,7 +15,7 @@


def create_communication_interface_default(
com_if: int, tmtc_printer: TmTcPrinter
com_if: int, tmtc_printer: TmTcPrinter, json_cfg_path: str
) -> Union[CommunicationInterface, None]:
from tmtccmd.com_if.serial_com_if import SerialComIF
from tmtccmd.com_if.dummy_com_if import DummyComIF
Expand Down Expand Up @@ -45,7 +45,7 @@ def create_communication_interface_default(
serial_baudrate = serial_cfg[SerialConfigIds.SERIAL_BAUD_RATE]
serial_timeout = serial_cfg[SerialConfigIds.SERIAL_TIMEOUT]
# Determine COM port, either extract from JSON file or ask from user.
com_port = determine_com_port()
com_port = determine_com_port(json_cfg_path=json_cfg_path)
communication_interface = SerialComIF(
tmtc_printer=tmtc_printer, com_port=com_port, baud_rate=serial_baudrate,
serial_timeout=serial_timeout,
Expand All @@ -63,7 +63,8 @@ def create_communication_interface_default(
dle_max_queue_len = serial_cfg[SerialConfigIds.SERIAL_DLE_QUEUE_LEN]
dle_max_frame_size = serial_cfg[SerialConfigIds.SERIAL_DLE_MAX_FRAME_SIZE]
communication_interface.set_dle_settings(
dle_max_queue_len, dle_max_frame_size, serial_timeout)
dle_max_queue_len, dle_max_frame_size, serial_timeout
)
else:
communication_interface = DummyComIF(tmtc_printer=tmtc_printer)
if not communication_interface.valid:
Expand Down Expand Up @@ -92,23 +93,24 @@ def default_tcpip_udp_cfg_setup(json_cfg_path: str):
update_global(CoreGlobalIds.ETHERNET_CONFIG, ethernet_cfg_dict)


def default_serial_cfg_setup(com_if: int):
baud_rate = determine_baud_rate()
def default_serial_cfg_setup(json_cfg_path: str, com_if: int):
baud_rate = determine_baud_rate(json_cfg_path=json_cfg_path)
if com_if == CoreComInterfaces.SERIAL_DLE:
serial_port = determine_com_port()
serial_port = determine_com_port(json_cfg_path=json_cfg_path)
else:
serial_port = ""
set_up_serial_cfg(com_if=com_if, baud_rate=baud_rate, com_port=serial_port)
set_up_serial_cfg(json_cfg_path=json_cfg_path, com_if=com_if, baud_rate=baud_rate, com_port=serial_port)


def set_up_serial_cfg(
com_if: int, baud_rate: int, com_port: str = "", tm_timeout: float = 0.01,
json_cfg_path: str, com_if: int, baud_rate: int, com_port: str = "", tm_timeout: float = 0.01,
ser_com_type: SerialCommunicationType = SerialCommunicationType.DLE_ENCODING,
ser_frame_size: int = 256, dle_queue_len: int = 25, dle_frame_size: int = 1024
):
"""
Default configuration to set up serial communication. The serial port and the baud rate
will be determined from a JSON configuration file and prompted from the user
:param json_cfg_path:
:param com_if:
:param com_port:
:param baud_rate:
Expand All @@ -122,7 +124,7 @@ def set_up_serial_cfg(
update_global(CoreGlobalIds.USE_SERIAL, True)
if com_if == CoreComInterfaces.SERIAL_DLE and com_port == "":
LOGGER.warning("Invalid com port specified!")
com_port = determine_com_port()
com_port = determine_com_port(json_cfg_path=json_cfg_path)
serial_cfg_dict = get_global(CoreGlobalIds.SERIAL_CONFIG)
serial_cfg_dict.update({SerialConfigIds.SERIAL_PORT: com_port})
serial_cfg_dict.update({SerialConfigIds.SERIAL_BAUD_RATE: baud_rate})
Expand Down

0 comments on commit d0168b4

Please sign in to comment.