Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes for 1.4.4 #14

Merged
merged 10 commits into from May 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 6 additions & 1 deletion example/config/hook_implementation.py
Expand Up @@ -10,6 +10,9 @@

class ExampleHookClass(TmTcHookBase):

def set_json_config_file_path(self) -> str:
return "tmtc_config.json"

def get_version(self) -> str:
return "My Version String"

Expand All @@ -25,7 +28,9 @@ def assign_communication_interface(self, com_if: int, tmtc_printer: TmTcPrinter)
Union[CommunicationInterface, None]:
from tmtccmd.defaults.com_setup import create_communication_interface_default
LOGGER.info("Communication interface assignment function was called")
return create_communication_interface_default(com_if=com_if, tmtc_printer=tmtc_printer)
return create_communication_interface_default(
com_if=com_if, tmtc_printer=tmtc_printer, json_cfg_path=self.set_json_config_file_path()
)

def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int):
LOGGER.info("Mode operation hook was called")
Expand Down
2 changes: 1 addition & 1 deletion src/tests/test_obj_id_manager.py
@@ -1,5 +1,5 @@
from unittest import TestCase
from tmtccmd.core.object_id_manager import insert_object_id, get_object_id_info, insert_object_ids
from tmtccmd.core.object_id_manager import insert_object_id, get_object_id_info

TEST_ID_0 = bytearray([0x00, 0x01, 0x02, 0x03])

Expand Down
2 changes: 1 addition & 1 deletion src/tests/test_pus.py
Expand Up @@ -7,7 +7,7 @@
from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.ecss.tc import generate_crc, generate_packet_crc
from tmtccmd.ccsds.spacepacket import get_sp_packet_sequence_control
from tmtccmd.ecss.conf import set_default_apid, get_default_apid, PusVersion, set_pus_tm_version, get_pus_tm_version
from tmtccmd.ecss.conf import set_default_apid, get_default_apid, PusVersion, get_pus_tm_version
from tmtccmd.pus_tm.service_17_test import Service17TM, Service17TmPacked
from tmtccmd.ecss.tm import PusTelemetry

Expand Down
4 changes: 2 additions & 2 deletions src/tmtccmd/__init__.py
@@ -1,9 +1,9 @@
VERSION_NAME = "tmtccmd"
VERSION_MAJOR = 1
VERSION_MINOR = 4
VERSION_REVISION = 3
VERSION_REVISION = 4

# I think this needs to be in string representation to be parsed so we can't
# use a formatted string here.
__version__ = "1.4.3"
__version__ = "1.4.4"

1 change: 0 additions & 1 deletion src/tmtccmd/com_if/com_interface_base.py
Expand Up @@ -9,7 +9,6 @@
:author: R. Mueller
"""
from abc import abstractmethod
from typing import Tuple

from tmtccmd.ecss.tc import PusTelecommand
from tmtccmd.pus_tm.factory import PusTmListT
Expand Down
8 changes: 4 additions & 4 deletions src/tmtccmd/com_if/dummy_com_if.py
Expand Up @@ -3,7 +3,7 @@
:date: 09.03.2020
:brief: Dummy Communication Interface. Currently serves to provide an example without external hardware
"""
from typing import Tuple, cast
from typing import cast

from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.ecss.tc import PusTelecommand
Expand All @@ -24,13 +24,13 @@ def __init__(self, tmtc_printer):
self.tc_ssc = 0
self.tc_packet_id = 0

def initialize(self) -> any:
def initialize(self, args: any = None) -> any:
pass

def open(self):
def open(self, args: any = None) -> None:
pass

def close(self) -> None:
def close(self, args: any = None) -> None:
pass

def data_available(self, parameters):
Expand Down
6 changes: 3 additions & 3 deletions src/tmtccmd/com_if/qemu_com_if.py
Expand Up @@ -98,15 +98,15 @@ def set_dle_settings(self, dle_queue_len: int, dle_max_frame: int, dle_timeout:
self.dle_max_frame = dle_max_frame
self.dle_timeout = dle_timeout

def initialize(self) -> None:
def initialize(self, args: any = None) -> any:
"""
Needs to be called by application code once for DLE mode!
"""
if not self.loop.is_running():
self.background_loop_thread = Thread(
target=start_background_loop, args=(self.loop,), daemon=True)

def open(self) -> None:
def open(self, args: any = None) -> None:
self.background_loop_thread.start()
try:
self.usart = asyncio.run_coroutine_threadsafe(
Expand All @@ -119,7 +119,7 @@ def open(self) -> None:
self.reception_buffer = deque(maxlen=self.dle_queue_len)
asyncio.run_coroutine_threadsafe(self.start_dle_polling(), self.loop)

def close(self):
def close(self, args: any = None) -> None:
if self.loop.is_closed():
return

Expand Down
6 changes: 3 additions & 3 deletions src/tmtccmd/com_if/serial_com_if.py
Expand Up @@ -98,12 +98,12 @@ def set_dle_settings(self, dle_queue_len: int, dle_max_frame: int, dle_timeout:
self.dle_max_frame = dle_max_frame
self.dle_timeout = dle_timeout

def initialize(self):
def initialize(self, args: any = None) -> any:
if self.ser_com_type == SerialCommunicationType.DLE_ENCODING:
self.reception_buffer = deque(maxlen=self.dle_queue_len)
self.dle_polling_active_event = threading.Event()

def open(self) -> None:
def open(self, args: any = None) -> None:
try:
if self.ser_com_type == SerialCommunicationType.DLE_ENCODING:
self.dle_polling_active_event.set()
Expand All @@ -119,7 +119,7 @@ def open(self) -> None:
if self.ser_com_type == SerialCommunicationType.DLE_ENCODING:
self.reception_thread.start()

def close(self):
def close(self, args: any = None) -> None:
try:
if self.ser_com_type == SerialCommunicationType.DLE_ENCODING:
self.dle_polling_active_event.clear()
Expand Down
6 changes: 3 additions & 3 deletions src/tmtccmd/com_if/tcpip_udp_com_if.py
Expand Up @@ -7,7 +7,7 @@
"""
import select
import socket
from typing import Tuple, Union
from typing import Union

from tmtccmd.utility.tmtcc_logger import get_logger
from tmtccmd.core.definitions import CoreModeList
Expand Down Expand Up @@ -59,7 +59,7 @@ def __del__(self):
except IOError:
LOGGER.warning("Could not close UDP communication interface!")

def initialize(self) -> None:
def initialize(self, args: any = None) -> any:
pass

def open(self, args: any = None):
Expand All @@ -78,7 +78,7 @@ def open(self, args: any = None):
ping_cmd = pack_service17_ping_command(ssc=0)
self.send_telecommand(ping_cmd.pack(), ping_cmd)

def close(self) -> None:
def close(self, args: any = None) -> None:
if self.udp_socket is not None:
self.udp_socket.close()

Expand Down
2 changes: 1 addition & 1 deletion src/tmtccmd/config/globals.py
@@ -1,4 +1,4 @@
import collections
import collections.abc
from typing import List, Union

from tmtccmd.utility.tmtcc_logger import get_logger
Expand Down
8 changes: 5 additions & 3 deletions src/tmtccmd/config/objects.py
@@ -1,14 +1,16 @@
from typing import Dict
from tmtccmd.core.definitions import CoreObjectIds


def get_core_object_ids() -> Dict[int, bytearray]:
INVALID_ID = bytes([0xff, 0xff, 0xff, 0xff])


def get_core_object_ids() -> Dict[bytes, list]:
"""
These are the object IDs for the tmtccmd core. The core will usually take care of
inserting these into the object manager during the program initialization.
:return: Dictionary of the core object IDs
"""
object_id_dict = {
CoreObjectIds.INVALID: bytearray([0xff, 0xff, 0xff, 0xff]),
INVALID_ID: ["Invalid ID"],
}
return object_id_dict
6 changes: 0 additions & 6 deletions src/tmtccmd/core/definitions.py
@@ -1,7 +1,6 @@
"""
@brief Definitions for the TMTC commander core
"""

import enum
from typing import Tuple

Expand Down Expand Up @@ -35,10 +34,6 @@ class QueueCommands(enum.Enum):
SET_TIMEOUT = enum.auto()


class CoreObjectIds(enum.IntEnum):
INVALID = 999


# Mode options, set by args parser
class CoreModeList(enum.IntEnum):
SINGLE_CMD_MODE = 0
Expand Down Expand Up @@ -110,4 +105,3 @@ class CoreGlobalIds(enum.IntEnum):

DEFAULT_APID = 0xef
DEBUG_MODE = False

2 changes: 1 addition & 1 deletion src/tmtccmd/core/frontend_base.py
@@ -1,7 +1,7 @@
from abc import abstractmethod


class FrontendBase():
class FrontendBase:

@abstractmethod
def start(self, args: any):
Expand Down
27 changes: 16 additions & 11 deletions src/tmtccmd/core/hook_base.py
Expand Up @@ -44,13 +44,21 @@ def add_globals_pre_args_parsing(self, gui: bool = False):
set_default_globals_pre_args_parsing(gui=gui, apid=DEFAULT_APID)

@abstractmethod
def add_globals_post_args_parsing(self, args: argparse.Namespace, json_cfg_path: str = ""):
def set_json_config_file_path(self) -> str:
"""
The user can specify a path and filename for the JSON configuration file by overriding this function.
:return:
"""
return "tmtc_config.json"

@abstractmethod
def add_globals_post_args_parsing(self, args: argparse.Namespace):
"""
Add global variables prior after parsing the CLI arguments.
:param gui: Specify whether a GUI is used
:param args: Specify whether a GUI is used
"""
from tmtccmd.defaults.globals_setup import set_default_globals_post_args_parsing
set_default_globals_post_args_parsing(args=args)
set_default_globals_post_args_parsing(args=args, json_cfg_path=self.set_json_config_file_path())

@abstractmethod
def assign_communication_interface(
Expand All @@ -62,7 +70,9 @@ def assign_communication_interface(
:param tmtc_printer: Printer utility instance.
"""
from tmtccmd.defaults.com_setup import create_communication_interface_default
return create_communication_interface_default(com_if=com_if, tmtc_printer=tmtc_printer)
return create_communication_interface_default(
com_if=com_if, tmtc_printer=tmtc_printer, json_cfg_path=self.set_json_config_file_path()
)

@abstractmethod
def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int):
Expand Down Expand Up @@ -120,7 +130,7 @@ def handle_service_3_housekeeping(
) -> Tuple[list, list, bytearray, int]:
"""
This function is called when a Service 3 Housekeeping packet is received.
:param object_id_key: Integer representation of the found object ID. See
:param object_id: Integer representation of the found object ID. See
the :func:`~tmtccmd.core.hook_base.set_object_ids function for more information
:param set_id: Unique set ID of the HK reply
:param hk_data: HK data. For custom HK handling, whole HK data will be passed here.
Expand All @@ -146,16 +156,11 @@ def handle_service_5_event(
"""
This function is called when a Service 5 Event Packet is received. The user can specify a custom
string here which will be printed to display additional information related to an event.
:param object_id_key: Integer representation of the found object ID. See
:param object_id: Integer representation of the found object ID. See
the :func:`~tmtccmd.core.hook_base.set_object_ids function for more information
:param event_id: Two-byte event ID
:param param_1: Four-byte Parameter 1
:param param_2: Four-byte Parameter 2
:return: Custom information string which will be printed with the event
"""
return ""

@abstractmethod
def set_json_config_file_path(self) -> str:
return "tmtc_config.json"

7 changes: 3 additions & 4 deletions src/tmtccmd/core/object_id_manager.py
Expand Up @@ -2,7 +2,6 @@
from typing import Dict

from tmtccmd.utility.tmtcc_logger import get_logger
from tmtccmd.core.definitions import CoreObjectIds

LOGGER = get_logger()

Expand Down Expand Up @@ -40,12 +39,12 @@ def get_object_id_info(self, object_id: bytearray):

def get_object_id_info_from_int_id(self, object_id: int):
object_id_bytearray = struct.pack('!I', object_id)
return get_object_id_info(object_id=object_id_bytearray)
return self.get_object_id_info(object_id=bytearray(object_id_bytearray))

def insert_object_id(self, object_id: bytearray, object_id_info: list):
self.object_id_dict.update({bytes(object_id): object_id_info})

def insert_object_ids(self, object_id_dict: Dict[bytearray, list]):
def insert_object_ids(self, object_id_dict: Dict[bytes, list]):
self.object_id_dict.update(object_id_dict)


Expand All @@ -55,7 +54,7 @@ def insert_object_id(object_id: bytearray, object_id_info: list):
)


def insert_object_ids(object_id_dict: Dict[bytearray, list]):
def insert_object_ids(object_id_dict: Dict[bytes, list]):
if object_id_dict is not None:
return ObjectIdManager.get_manager().insert_object_ids(object_id_dict=object_id_dict)

Expand Down
22 changes: 12 additions & 10 deletions src/tmtccmd/defaults/com_setup.py
Expand Up @@ -6,7 +6,7 @@
from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.com_if.serial_com_if import SerialConfigIds, SerialCommunicationType
from tmtccmd.com_if.serial_utilities import determine_com_port, determine_baud_rate
from tmtccmd.com_if.tcpip_utilities import TcpIpConfigIds, ethernet_address_t
from tmtccmd.com_if.tcpip_utilities import TcpIpConfigIds
from tmtccmd.utility.tmtcc_logger import get_logger
from tmtccmd.utility.tmtc_printer import TmTcPrinter

Expand All @@ -15,7 +15,7 @@


def create_communication_interface_default(
com_if: int, tmtc_printer: TmTcPrinter
com_if: int, tmtc_printer: TmTcPrinter, json_cfg_path: str
) -> Union[CommunicationInterface, None]:
from tmtccmd.com_if.serial_com_if import SerialComIF
from tmtccmd.com_if.dummy_com_if import DummyComIF
Expand Down Expand Up @@ -45,7 +45,7 @@ def create_communication_interface_default(
serial_baudrate = serial_cfg[SerialConfigIds.SERIAL_BAUD_RATE]
serial_timeout = serial_cfg[SerialConfigIds.SERIAL_TIMEOUT]
# Determine COM port, either extract from JSON file or ask from user.
com_port = determine_com_port()
com_port = determine_com_port(json_cfg_path=json_cfg_path)
communication_interface = SerialComIF(
tmtc_printer=tmtc_printer, com_port=com_port, baud_rate=serial_baudrate,
serial_timeout=serial_timeout,
Expand All @@ -63,7 +63,8 @@ def create_communication_interface_default(
dle_max_queue_len = serial_cfg[SerialConfigIds.SERIAL_DLE_QUEUE_LEN]
dle_max_frame_size = serial_cfg[SerialConfigIds.SERIAL_DLE_MAX_FRAME_SIZE]
communication_interface.set_dle_settings(
dle_max_queue_len, dle_max_frame_size, serial_timeout)
dle_max_queue_len, dle_max_frame_size, serial_timeout
)
else:
communication_interface = DummyComIF(tmtc_printer=tmtc_printer)
if not communication_interface.valid:
Expand Down Expand Up @@ -92,23 +93,24 @@ def default_tcpip_udp_cfg_setup(json_cfg_path: str):
update_global(CoreGlobalIds.ETHERNET_CONFIG, ethernet_cfg_dict)


def default_serial_cfg_setup(com_if: int):
baud_rate = determine_baud_rate()
def default_serial_cfg_setup(json_cfg_path: str, com_if: int):
baud_rate = determine_baud_rate(json_cfg_path=json_cfg_path)
if com_if == CoreComInterfaces.SERIAL_DLE:
serial_port = determine_com_port()
serial_port = determine_com_port(json_cfg_path=json_cfg_path)
else:
serial_port = ""
set_up_serial_cfg(com_if=com_if, baud_rate=baud_rate, com_port=serial_port)
set_up_serial_cfg(json_cfg_path=json_cfg_path, com_if=com_if, baud_rate=baud_rate, com_port=serial_port)


def set_up_serial_cfg(
com_if: int, baud_rate: int, com_port: str = "", tm_timeout: float = 0.01,
json_cfg_path: str, com_if: int, baud_rate: int, com_port: str = "", tm_timeout: float = 0.01,
ser_com_type: SerialCommunicationType = SerialCommunicationType.DLE_ENCODING,
ser_frame_size: int = 256, dle_queue_len: int = 25, dle_frame_size: int = 1024
):
"""
Default configuration to set up serial communication. The serial port and the baud rate
will be determined from a JSON configuration file and prompted from the user
:param json_cfg_path:
:param com_if:
:param com_port:
:param baud_rate:
Expand All @@ -122,7 +124,7 @@ def set_up_serial_cfg(
update_global(CoreGlobalIds.USE_SERIAL, True)
if com_if == CoreComInterfaces.SERIAL_DLE and com_port == "":
LOGGER.warning("Invalid com port specified!")
com_port = determine_com_port()
com_port = determine_com_port(json_cfg_path=json_cfg_path)
serial_cfg_dict = get_global(CoreGlobalIds.SERIAL_CONFIG)
serial_cfg_dict.update({SerialConfigIds.SERIAL_PORT: com_port})
serial_cfg_dict.update({SerialConfigIds.SERIAL_BAUD_RATE: baud_rate})
Expand Down