Skip to content

Commit

Permalink
Merge pull request #15 from spacefisch/develop
Browse files Browse the repository at this point in the history
v1.5.0
  • Loading branch information
robamu committed May 13, 2021
2 parents d0168b4 + 77c358d commit d5f8c62
Show file tree
Hide file tree
Showing 57 changed files with 1,483 additions and 1,274 deletions.
48 changes: 24 additions & 24 deletions example/config/hook_implementation.py
@@ -1,64 +1,64 @@
import argparse
from typing import Union, Dict, Tuple
from tmtccmd.core.hook_base import \
TmTcHookBase, TmTcPrinter, CommunicationInterface, TmTcHandler, PusTelemetry, TcQueueT, \
PusTelecommand, Service3Base
from tmtccmd.utility.tmtcc_logger import get_logger
from tmtccmd.config.definitions import ServiceOpCodeDictT
from tmtccmd.config.hook import TmTcHookBase
from tmtccmd.utility.logger import get_logger

LOGGER = get_logger()


class ExampleHookClass(TmTcHookBase):
import argparse
from typing import Union, Dict, Tuple

def set_json_config_file_path(self) -> str:
from tmtccmd.core.backend import TmTcHandler
from tmtccmd.utility.tmtc_printer import TmTcPrinter
from tmtccmd.ecss.tm import PusTelemetry
from tmtccmd.pus_tc.definitions import TcQueueT
from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.pus_tm.service_3_base import Service3Base

def get_json_config_file_path(self) -> str:
return "tmtc_config.json"

def get_version(self) -> str:
return "My Version String"

def add_globals_pre_args_parsing(self, gui: bool = False):
from tmtccmd.defaults.globals_setup import set_default_globals_pre_args_parsing
from tmtccmd.config.globals import set_default_globals_pre_args_parsing
set_default_globals_pre_args_parsing(gui=gui, apid=0xef)

def add_globals_post_args_parsing(self, args: argparse.Namespace, json_cfg_path: str = ""):
from tmtccmd.defaults.globals_setup import set_default_globals_post_args_parsing
from tmtccmd.config.globals import set_default_globals_post_args_parsing
set_default_globals_post_args_parsing(args=args, json_cfg_path=json_cfg_path)

def assign_communication_interface(self, com_if: int, tmtc_printer: TmTcPrinter) -> \
def assign_communication_interface(self, com_if_key: str, tmtc_printer: TmTcPrinter) -> \
Union[CommunicationInterface, None]:
from tmtccmd.defaults.com_setup import create_communication_interface_default
from tmtccmd.config.com_if import create_communication_interface_default
LOGGER.info("Communication interface assignment function was called")
return create_communication_interface_default(
com_if=com_if, tmtc_printer=tmtc_printer, json_cfg_path=self.set_json_config_file_path()
com_if_key=com_if_key, tmtc_printer=tmtc_printer, json_cfg_path=self.get_json_config_file_path()
)

def perform_mode_operation(self, tmtc_backend: TmTcHandler, mode: int):
LOGGER.info("Mode operation hook was called")
pass

def pack_service_queue(self, service: int, op_code: str, service_queue: TcQueueT):
from tmtccmd.defaults.tc_packing import default_service_queue_preparation
def pack_service_queue(self, service: Union[str, int], op_code: str, service_queue: TcQueueT):
from tmtccmd.pus_tc.packer import default_service_queue_preparation
LOGGER.info("Service queue packer hook was called")
default_service_queue_preparation(
service=service, op_code=op_code, service_queue=service_queue
)

def tm_user_factory_hook(self, raw_tm_packet: bytearray) -> PusTelemetry:
from tmtccmd.defaults.tm_handling import default_factory_hook
from tmtccmd.pus_tm.factory import default_factory_hook
LOGGER.info("TM user factory hook was called")
return default_factory_hook(raw_tm_packet=raw_tm_packet)

def set_object_ids(self) -> Dict[bytes, list]:
def get_object_ids(self) -> Dict[bytes, list]:
pass

def pack_total_service_queue(self) -> Union[None, TcQueueT]:
from tmtccmd.defaults.tc_packing import default_total_queue_preparation
return default_total_queue_preparation()

def command_preparation_hook(self) -> Union[None, PusTelecommand]:
LOGGER.info("Single command hook was called")
from tmtccmd.defaults.tc_packing import default_single_packet_preparation
return default_single_packet_preparation()
def get_service_op_code_dictionary(self) -> ServiceOpCodeDictT:
pass

@staticmethod
def handle_service_8_telemetry(
Expand Down
2 changes: 2 additions & 0 deletions setup.cfg
Expand Up @@ -29,6 +29,8 @@ classifiers =
install_requires =
crcmod>=1.7
pyserial>=3.0
colorama>=0.4.4
colorlog>=5.0.0
package_dir =
= src
packages = find:
Expand Down
File renamed without changes.
22 changes: 16 additions & 6 deletions src/tests/backend_mock.py
@@ -1,9 +1,11 @@
from unittest.mock import MagicMock
from abc import abstractmethod
from argparse import Namespace
from tmtccmd.core.backend import TmTcHandler
from tmtccmd.core.hook_base import TmTcHookBase
from tmtccmd.core.definitions import CoreComInterfaces, CoreModeList, CoreServiceList
from tmtccmd.config.hook import TmTcHookBase
from tmtccmd.config.com_if import create_communication_interface_default
from tmtccmd.sendreceive.tm_listener import TmListener
from tmtccmd.utility.tmtc_printer import TmTcPrinter
from tmtccmd.config.definitions import CoreComInterfaces, CoreModeList, CoreServiceList
from tmtccmd.core.frontend_base import FrontendBase


Expand All @@ -18,11 +20,19 @@ def create_hook_mock() -> TmTcHookBase:


def create_backend_mock() -> TmTcHandler:
tmtc_printer = TmTcPrinter(display_mode="LONG", do_print_to_file=False, print_tc=True)
com_if = create_communication_interface_default(
com_if_key=CoreComInterfaces.DUMMY, json_cfg_path="tmtc_config.json", tmtc_printer=tmtc_printer
)
tm_listener = TmListener(
com_if=com_if, tm_timeout=3.0, tc_timeout_factor=3.0
)
# The global variables are set by the argument parser.
tmtc_backend = TmTcHandler(
init_com_if=CoreComInterfaces.DUMMY, init_mode=CoreModeList.IDLE,
init_service=CoreServiceList.SERVICE_17
com_if=com_if, tmtc_printer=tmtc_printer, tm_listener=tm_listener, init_mode=CoreModeList.IDLE,
init_service=17, init_opcode="0"
)
tmtc_backend.start = MagicMock(return_value=0)
tmtc_backend.start_listener = MagicMock(return_value=0)
tmtc_backend.initialize = MagicMock(return_value=0)
return tmtc_backend

Expand Down
5 changes: 5 additions & 0 deletions src/tests/test_global_manager.py
@@ -0,0 +1,5 @@
from unittest import TestCase


class TestGlobalManager(TestCase):
pass
2 changes: 1 addition & 1 deletion src/tests/test_printer.py
Expand Up @@ -4,7 +4,7 @@
from tmtccmd.pus_tm.service_5_event import Service5TM, Service5TmPacked, Severity
from tmtccmd.pus_tc.service_17_test import pack_service17_ping_command
from tmtccmd.utility.tmtc_printer import TmTcPrinter, DisplayMode
from tmtccmd.utility.tmtcc_logger import get_logger, set_tmtc_logger
from tmtccmd.utility.logger import get_logger, set_tmtc_logger


class TestPrinter(TestCase):
Expand Down
4 changes: 2 additions & 2 deletions src/tests/test_runner.py
Expand Up @@ -10,7 +10,7 @@ def test_tmtc_runner(self):
backend_mock = create_backend_mock()
initialize_tmtc_commander(hook_object=hook_base)
run_tmtc_commander(False, False, True, tmtc_backend=backend_mock)
backend_mock.start.assert_called_with()
backend_mock.start_listener.assert_called_with()
backend_mock.initialize.assert_called_with()

frontend_mock = create_frontend_mock()
Expand All @@ -20,7 +20,7 @@ def test_tmtc_runner(self):
frontend_mock.start.assert_called_once()
qt_app = frontend_mock.start.call_args[0][0]
self.assertTrue(qt_app is None)
default_backend = get_default_tmtc_backend()
default_backend = get_default_tmtc_backend(hook_obj=hook_base, json_cfg_path="tmtc_config.json")
self.assertTrue(default_backend is not None)

def test_errors(self):
Expand Down
7 changes: 3 additions & 4 deletions src/tmtccmd/__init__.py
@@ -1,9 +1,8 @@
VERSION_NAME = "tmtccmd"
VERSION_MAJOR = 1
VERSION_MINOR = 4
VERSION_REVISION = 4
VERSION_MINOR = 5
VERSION_REVISION = 0

# I think this needs to be in string representation to be parsed so we can't
# use a formatted string here.
__version__ = "1.4.4"

__version__ = "1.5.0"
89 changes: 44 additions & 45 deletions src/tmtccmd/com_if/com_if_utilities.py
@@ -1,60 +1,59 @@
import json

from tmtccmd.utility.tmtcc_logger import get_logger

from tmtccmd.utility.logger import get_logger
from tmtccmd.utility.json_handler import check_json_file, JsonKeyNames
from tmtccmd.config.definitions import ComIFDictT

LOGGER = get_logger()


def determine_com_if(integer_to_string_dict: dict, json_cfg_path: str) -> int:
prompt_com_if = False
def determine_com_if(com_if_dict: ComIFDictT, json_cfg_path: str) -> str:
do_prompt_com_if = False
if not check_json_file(json_cfg_path=json_cfg_path):
prompt_com_if = True

if not prompt_com_if:
do_prompt_com_if = True
if not do_prompt_com_if:
with open(json_cfg_path, "r") as read:
com_if_string = ""
try:
load_data = json.load(read)
com_if_string = load_data[JsonKeyNames.COM_IF.value]
except KeyError:
prompt_com_if = True
com_if_value = convert_com_if_string_to_integer(com_if_string)
if com_if_value != -1:
return com_if_value
else:
prompt_com_if = True
if prompt_com_if:
while True:
for integer_rep, string_rep in integer_to_string_dict.items():
print(f"{integer_rep}: {string_rep}")
com_if_key = input("Please enter the desired communication interface by key: ")
if not com_if_key.isdigit():
print("Key is not a digit, try again")
continue
com_if_key = int(com_if_key)
if com_if_key in integer_to_string_dict:
com_if_string = integer_to_string_dict[com_if_key]
break
else:
print("Invalid key, try again.")
do_prompt_com_if = True
com_if_string = str(com_if_string)
if do_prompt_com_if:
com_if_string = prompt_com_if(com_if_dict=com_if_dict)
save_to_json = input("Do you want to store the communication interface? (y/n): ")
if save_to_json.lower() in ['y', "yes", "1"]:
with open(json_cfg_path, "r+") as file:
data = json.load(file)
data[JsonKeyNames.COM_IF.value] = com_if_string
file.seek(0)
json.dump(data, file, indent=4)
LOGGER.info(
"Communication interface was stored in the JSON file config/tmtcc_config.json"
)
LOGGER.info("Delete this file or edit it manually to edit the communication interface")
return com_if_key


def convert_com_if_string_to_integer(com_if_string: str) -> int:
from tmtccmd.core.definitions import CoreComInterfacesString
for com_if_int, com_if_string_in_dict in CoreComInterfacesString.items():
if com_if_string == com_if_string_in_dict:
return com_if_int
return -1
store_com_if_json(com_if_string=com_if_string, json_cfg_path=json_cfg_path)
return com_if_string


def prompt_com_if(com_if_dict: dict) -> str:
while True:
com_if_list = []
for index, com_if_value in enumerate(com_if_dict.items()):
print(f"{index}: {com_if_value}")
com_if_list.append(com_if_value)
com_if_key = input("Please enter the desired communication interface by key: ")
if not com_if_key.isdigit():
print("Key is not a digit, try again")
continue
com_if_key = int(com_if_key)
if com_if_key >= len(com_if_list):
print("Key invalid, try again.")
continue
com_if_string = com_if_list[com_if_key][0]
break
return com_if_string


def store_com_if_json(com_if_string: str, json_cfg_path: str):
with open(json_cfg_path, "r+") as file:
data = json.load(file)
data[JsonKeyNames.COM_IF.value] = com_if_string
file.seek(0)
json.dump(data, file, indent=4)
LOGGER.info(
f"Communication interface was stored in the JSON file {json_cfg_path}"
)
LOGGER.info("Delete this file or edit it manually to edit the communication interface")
6 changes: 5 additions & 1 deletion src/tmtccmd/com_if/com_interface_base.py
Expand Up @@ -23,9 +23,13 @@ class CommunicationInterface:
Generic form of a communication interface to separate communication logic from
the underlying interface.
"""
def __init__(self, tmtc_printer: TmTcPrinter):
def __init__(self, tmtc_printer: TmTcPrinter, com_if_key: str):
self.tmtc_printer = tmtc_printer
self.valid = True
self.com_if_key = com_if_key

def get_id(self) -> str:
return self.com_if_key

@abstractmethod
def initialize(self, args: any = None) -> any:
Expand Down
7 changes: 4 additions & 3 deletions src/tmtccmd/com_if/dummy_com_if.py
Expand Up @@ -11,14 +11,15 @@
from tmtccmd.pus_tm.service_1_verification import Service1TmPacked
from tmtccmd.pus_tm.service_17_test import Service17TmPacked
from tmtccmd.pus.service_17_test import Srv17Subservices
from tmtccmd.utility.tmtcc_logger import get_logger
from tmtccmd.utility.logger import get_logger
from tmtccmd.utility.tmtc_printer import TmTcPrinter

LOGGER = get_logger()


class DummyComIF(CommunicationInterface):
def __init__(self, tmtc_printer):
super().__init__(tmtc_printer)
def __init__(self, com_if_key: str, tmtc_printer: TmTcPrinter):
super().__init__(com_if_key=com_if_key, tmtc_printer=tmtc_printer)
self.dummy_handler = DummyHandler()
self.service_sent = 0
self.tc_ssc = 0
Expand Down
4 changes: 2 additions & 2 deletions src/tmtccmd/com_if/qemu_com_if.py
Expand Up @@ -32,7 +32,7 @@
from tmtccmd.pus_tm.factory import PusTelemetryFactory
from tmtccmd.utility.tmtc_printer import TmTcPrinter
from tmtccmd.com_if.serial_com_if import SerialComIF, SerialCommunicationType
from tmtccmd.utility.tmtcc_logger import get_logger
from tmtccmd.utility.logger import get_logger
from tmtccmd.utility.dle_encoder import encode_dle, decode_dle, STX_CHAR, ETX_CHAR, DleErrorCodes

LOGGER = get_logger()
Expand Down Expand Up @@ -107,7 +107,7 @@ def initialize(self, args: any = None) -> any:
target=start_background_loop, args=(self.loop,), daemon=True)

def open(self, args: any = None) -> None:
self.background_loop_thread.start()
self.background_loop_thread.start_listener()
try:
self.usart = asyncio.run_coroutine_threadsafe(
Usart.create_async(QEMU_ADDR_AT91_USART0), self.loop).result()
Expand Down
13 changes: 7 additions & 6 deletions src/tmtccmd/com_if/serial_com_if.py
Expand Up @@ -17,7 +17,7 @@
from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.utility.tmtc_printer import TmTcPrinter
from tmtccmd.pus_tm.factory import PusTelemetryFactory, PusTmListT
from tmtccmd.utility.tmtcc_logger import get_logger
from tmtccmd.utility.logger import get_logger
from tmtccmd.utility.dle_encoder import encode_dle, decode_dle, STX_CHAR, ETX_CHAR, DleErrorCodes


Expand Down Expand Up @@ -54,9 +54,10 @@ class SerialComIF(CommunicationInterface):
"""
Communication Interface to use serial communication. This requires the PySerial library.
"""
def __init__(self, tmtc_printer: TmTcPrinter, com_port: str, baud_rate: int,
serial_timeout: float,
ser_com_type: SerialCommunicationType = SerialCommunicationType.FIXED_FRAME_BASED):
def __init__(
self, com_if_key: str, tmtc_printer: TmTcPrinter, com_port: str, baud_rate: int, serial_timeout: float,
ser_com_type: SerialCommunicationType = SerialCommunicationType.FIXED_FRAME_BASED
):
"""
Initiaze a serial communication handler.
:param tmtc_printer: TMTC printer object. Can be used for diagnostic purposes, but main
Expand All @@ -66,7 +67,7 @@ def __init__(self, tmtc_printer: TmTcPrinter, com_port: str, baud_rate: int,
:param serial_timeout: Specify serial timeout
:param ser_com_type: Specify how to handle serial reception
"""
super().__init__(tmtc_printer)
super().__init__(com_if_key=com_if_key, tmtc_printer=tmtc_printer)

self.com_port = com_port
self.baud_rate = baud_rate
Expand Down Expand Up @@ -111,7 +112,7 @@ def open(self, args: any = None) -> None:
self.serial = serial.Serial(
port=self.com_port, baudrate=self.baud_rate, timeout=self.serial_timeout)
except serial.SerialException:
LOGGER.error("SERIAL Port opening failure!")
LOGGER.error("Serial Port opening failure!")
raise IOError
"""
Needs to be called by application code once for DLE mode!
Expand Down

0 comments on commit d5f8c62

Please sign in to comment.