Skip to content

Commit

Permalink
Merge pull request #42 from robamu-org/develop
Browse files Browse the repository at this point in the history
v1.10.0
  • Loading branch information
robamu committed Oct 13, 2021
2 parents 45a62b5 + 2e29897 commit 81be253
Show file tree
Hide file tree
Showing 38 changed files with 512 additions and 495 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Expand Up @@ -13,7 +13,7 @@ build

/example/log
/example/tmtc_config.json
/log/
/src/log
/src/tests/log

.idea
Expand Down
2 changes: 1 addition & 1 deletion example/config/hook_implementation.py
Expand Up @@ -18,7 +18,7 @@ class ExampleHookClass(TmTcHookBase):

def add_globals_pre_args_parsing(self, gui: bool = False):
from tmtccmd.config.globals import set_default_globals_pre_args_parsing
set_default_globals_pre_args_parsing(gui=gui, apid=APID)
set_default_globals_pre_args_parsing(gui=gui, tc_apid=APID, tm_apid=APID)

def add_globals_post_args_parsing(self, args: argparse.Namespace):
from tmtccmd.config.globals import set_default_globals_post_args_parsing
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Expand Up @@ -32,7 +32,7 @@ install_requires =
colorama>=0.4.4
colorlog>=5.0.0
dle-encoder>=0.2
spacepackets>=0.2
spacepackets>=0.4
package_dir =
= src
packages = find:
Expand Down
11 changes: 2 additions & 9 deletions src/tests/test_global_manager.py
@@ -1,5 +1,5 @@
from unittest import TestCase
from tmtccmd.config.globals import update_global, get_global, get_global_apid, set_default_apid, \
from tmtccmd.config.globals import update_global, get_global, \
set_json_cfg_path, get_json_cfg_path, set_glob_com_if_dict, get_glob_com_if_dict, \
set_default_globals_pre_args_parsing, check_and_set_core_mode_arg, CoreModeList, \
CoreGlobalIds
Expand All @@ -11,11 +11,6 @@ def test_global_module(self):
update_global(global_param_id=30, parameter='hello')
self.assertTrue(get_global(global_param_id=30) == 'hello')

current_apid = get_global_apid()
set_default_apid(default_apid=0x01)
self.assertTrue(get_global_apid() == 0x01)
set_default_apid(current_apid)

current_path = get_json_cfg_path()
set_json_cfg_path('.')
self.assertTrue(get_json_cfg_path() == '.')
Expand All @@ -29,10 +24,8 @@ def test_global_module(self):
self.assertTrue(com_if_dict["test"][0] == "Test Interface")

set_default_globals_pre_args_parsing(
gui=False, apid=0x02
gui=False, tc_apid=0x02, tm_apid=0x03
)
self.assertTrue(get_global_apid() == 0x02)
set_default_apid(current_apid)

result = check_and_set_core_mode_arg(mode_arg='udp')
self.assertTrue(result == CoreModeList.SEQUENTIAL_CMD_MODE)
Expand Down
17 changes: 11 additions & 6 deletions src/tests/test_printer.py
@@ -1,10 +1,11 @@
from unittest import TestCase

from tmtccmd.runner import initialize_tmtc_commander
from tmtccmd.tm import Service1TM, Service5TM
from tmtccmd.tm import Service5TM
from tmtccmd.pus.service_1_verification import Service1TMExtended
from tmtccmd.tm.service_5_event import Srv5Subservices
from spacepackets.ccsds.time import CdsShortTimestamp
from tmtccmd.tc.service_17_test import pack_service17_ping_command
from tmtccmd.pus.service_17_test import pack_service_17_ping_command
from tmtccmd.utility.tmtc_printer import TmTcPrinter, DisplayMode
from tmtccmd.utility.logger import get_console_logger, set_tmtc_console_logger
from tmtccmd.config.globals import update_global, CoreGlobalIds
Expand All @@ -25,23 +26,27 @@ def test_print_functions(self):
self.assertTrue(self.tmtc_printer.get_display_mode() == DisplayMode.SHORT)
self.tmtc_printer.set_display_mode(DisplayMode.LONG)

service_1_tm = Service1TM(subservice_id=1, time=CdsShortTimestamp.init_from_current_time())
service_1_tm = Service1TMExtended(
subservice=1, time=CdsShortTimestamp.init_from_current_time()
)
service_1_packed = service_1_tm.pack()
self.tmtc_printer.print_telemetry(packet_if=service_1_tm, info_if=service_1_tm)
# Should not crash and emit warning
self.tmtc_printer.print_telemetry(packet_if=None, info_if=None)

self.tmtc_printer.set_display_mode(DisplayMode.SHORT)
self.tmtc_printer.print_telemetry(packet_if=service_1_tm, info_if=service_1_tm)
service_1_tm = Service1TM(subservice_id=2, time=CdsShortTimestamp.init_from_current_time())
service_1_tm = Service1TMExtended(
subservice=2, time=CdsShortTimestamp.init_from_current_time()
)
service_1_packed = service_1_tm.pack()
self.tmtc_printer.print_telemetry(
packet_if=service_1_tm, info_if=service_1_tm, print_raw_tm=True
)

self.tmtc_printer.set_display_mode(DisplayMode.LONG)
service_5_tm = Service5TM(
subservice_id=Srv5Subservices.INFO_EVENT, object_id=bytearray([0x01, 0x02, 0x03, 0x04]),
subservice=Srv5Subservices.INFO_EVENT, object_id=bytearray([0x01, 0x02, 0x03, 0x04]),
event_id=22, param_1=32, param_2=82452, time=CdsShortTimestamp.init_from_current_time()
)
hook_base = create_hook_mock_with_srv_handlers()
Expand All @@ -54,7 +59,7 @@ def test_print_functions(self):
object_id=bytes([0x01, 0x02, 0x03, 0x04]), event_id=22, param_1=32, param_2=82452
)

service_17_command = pack_service17_ping_command(ssc=0, apid=42)
service_17_command = pack_service_17_ping_command(ssc=0, apid=42)
self.tmtc_printer.print_telecommand(
tc_packet_obj=service_17_command, tc_packet_raw=service_17_command.pack()
)
Expand Down
46 changes: 22 additions & 24 deletions src/tests/test_pus.py
Expand Up @@ -3,56 +3,54 @@

from spacepackets.ccsds.spacepacket import get_space_packet_sequence_control
from spacepackets.ccsds.time import CdsShortTimestamp
from spacepackets.ecss.conf import get_pus_tm_version, PusVersion
from spacepackets.ecss.conf import get_pus_tm_version, PusVersion, set_default_tm_apid
from spacepackets.util import PrintFormats

from tmtccmd.tm.service_17_test import Service17TM
from tmtccmd.pus.service_17_test import Service17TMExtended


class TestTelemetry(TestCase):
def test_space_packet_functions(self):
psc = get_space_packet_sequence_control(sequence_flags=0b111, source_sequence_count=42)
self.assertTrue(psc & 0xc000 == 0xc000)

def test_generic_pus_c(self):
pus_17_telemetry = Service17TM(
subservice_id=1, ssc=36, time=CdsShortTimestamp.init_from_current_time()
pus_17_telemetry = Service17TMExtended(
subservice=1, ssc=36, time=CdsShortTimestamp.init_from_current_time(),
apid=0xef
)
pus_17_raw = pus_17_telemetry.pack()

pus_17_telemetry = None

def tm_func(raw_telemetry: bytearray):
return Service17TM.unpack(raw_telemetry=raw_telemetry)
return Service17TMExtended.unpack(raw_telemetry=raw_telemetry)

self.assertRaises(ValueError, tm_func, bytearray())
self.assertRaises(ValueError, tm_func, None)

pus_17_telemetry = Service17TM.unpack(raw_telemetry=pus_17_raw)

pus_17_telemetry = Service17TMExtended.unpack(raw_telemetry=pus_17_raw)
self.assertTrue(get_pus_tm_version() == PusVersion.PUS_C)
self.assertTrue(pus_17_telemetry.get_service() == 17)
self.assertTrue(pus_17_telemetry.get_apid() == 0xef)
self.assertTrue(pus_17_telemetry.get_subservice() == 1)
self.assertTrue(pus_17_telemetry.get_ssc() == 36)
self.assertTrue(pus_17_telemetry.get_tm_data() == bytearray())
self.assertTrue(pus_17_telemetry.pus_tm.is_valid())
self.assertTrue(pus_17_telemetry.service == 17)
self.assertTrue(pus_17_telemetry.apid == 0xef)
self.assertTrue(pus_17_telemetry.subservice == 1)
self.assertTrue(pus_17_telemetry.ssc == 36)
self.assertTrue(pus_17_telemetry.tm_data == bytearray())
self.assertTrue(pus_17_telemetry.pus_tm.valid)
self.assertTrue(pus_17_telemetry.get_custom_printout() == "")
self.assertTrue(pus_17_telemetry.return_source_data_string() == "[]")
pus_17_telemetry.pus_tm.print_source_data()
pus_17_telemetry.pus_tm.print_full_packet_string()
self.assertTrue(pus_17_telemetry.get_source_data_string() == "hex []")
pus_17_telemetry.pus_tm.print_source_data(print_format=PrintFormats.HEX)
pus_17_telemetry.pus_tm.print_full_packet_string(print_format=PrintFormats.HEX)
# This string changes depending on system time, so its complicated to test its validity
full_string = pus_17_telemetry.pus_tm.return_full_packet_string()
full_string = pus_17_telemetry.pus_tm.get_full_packet_string(print_format=PrintFormats.HEX)
print(full_string)
print(pus_17_telemetry)
print(repr(pus_17_telemetry))
self.assertTrue(pus_17_telemetry.pus_tm.get_packet_id() == 0x8 << 8 | 0xef)
self.assertTrue(pus_17_telemetry.pus_tm.packet_id == 0x8 << 8 | 0xef)

def test_list_functionality(self):
pus_17_telecommand = Service17TM(
subservice_id=1, ssc=36, time=CdsShortTimestamp.init_from_current_time()
pus_17_telecommand = Service17TMExtended(
subservice=1, ssc=36, time=CdsShortTimestamp.init_from_current_time()
)
pus_17_raw = pus_17_telecommand.pack()
pus_17_telemetry = Service17TM.unpack(raw_telemetry=pus_17_raw)
pus_17_telemetry = Service17TMExtended.unpack(raw_telemetry=pus_17_raw)

header_list = []
content_list = []
Expand Down
6 changes: 3 additions & 3 deletions src/tmtccmd/__init__.py
@@ -1,8 +1,8 @@
VERSION_NAME = "tmtccmd"
VERSION_MAJOR = 1
VERSION_MINOR = 9
VERSION_REVISION = 1
VERSION_MINOR = 10
VERSION_REVISION = 0

# I think this needs to be in string representation to be parsed so we can't
# use a formatted string here.
__version__ = "1.9.1"
__version__ = "1.10.0"
23 changes: 15 additions & 8 deletions src/tmtccmd/com_if/dummy_com_if.py
@@ -1,9 +1,12 @@
"""Dummy Communication Interface. Currently serves to provide an example without external hardware
"""
from tmtccmd.com_if.com_interface_base import CommunicationInterface
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tm import TelemetryListT, Service1TM, Service17TM
from tmtccmd.pus.service_17_test import Srv17Subservices
from spacepackets.ccsds.spacepacket import get_space_packet_sequence_control, SequenceFlags

from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.tm import TelemetryListT
from tmtccmd.pus.service_1_verification import Service1TMExtended
from tmtccmd.pus.service_17_test import Srv17Subservices, Service17TMExtended
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.utility.tmtc_printer import TmTcPrinter

Expand Down Expand Up @@ -70,23 +73,27 @@ def generate_reply_package(self):
"""
if self.last_service == 17:
if self.last_subservice == 1:
tm_packer = Service1TM(
tc_psc = get_space_packet_sequence_control(
sequence_flags=SequenceFlags.UNSEGMENTED,
source_sequence_count=self.last_tc_ssc
)
tm_packer = Service1TMExtended(
subservice=1, ssc=self.current_ssc, tc_packet_id=self.last_tc_packet_id,
tc_ssc=self.last_tc_ssc
tc_psc=tc_psc
)

self.current_ssc += 1
tm_packet_raw = tm_packer.pack()
self.next_telemetry_package.append(tm_packet_raw)
tm_packer = Service1TM(
tm_packer = Service1TMExtended(
subservice=7, ssc=self.current_ssc, tc_packet_id=self.last_tc_packet_id,
tc_ssc=self.last_tc_ssc
tc_psc=tc_psc
)
tm_packet_raw = tm_packer.pack()
self.next_telemetry_package.append(tm_packet_raw)
self.current_ssc += 1

tm_packer = Service17TM(subservice=Srv17Subservices.PING_REPLY)
tm_packer = Service17TMExtended(subservice=Srv17Subservices.PING_REPLY)
tm_packet_raw = tm_packer.pack()
self.next_telemetry_package.append(tm_packet_raw)
self.current_ssc += 1
Expand Down
13 changes: 6 additions & 7 deletions src/tmtccmd/com_if/tcpip_tcp_com_if.py
Expand Up @@ -10,7 +10,7 @@
import threading
import select
from collections import deque
from typing import Union, Optional
from typing import Union, Optional, Tuple

from spacepackets.ccsds.spacepacket import parse_space_packets

Expand Down Expand Up @@ -42,7 +42,7 @@ class TcpIpTcpComIF(CommunicationInterface):
TM_LOOP_DELAY = 0.2

def __init__(
self, com_if_key: str, com_type: TcpCommunicationType, space_packet_id: int,
self, com_if_key: str, com_type: TcpCommunicationType, space_packet_ids: Tuple[int],
tm_polling_freqency: float, tm_timeout: float, tc_timeout_factor: float,
send_address: EthernetAddressT, max_recv_size: int,
max_packets_stored: int = 50,
Expand All @@ -52,16 +52,16 @@ def __init__(
:param com_if_key:
:param com_type: Communication Type. By default, it is assumed that
space packets are sent via TCP
:param space_packet_id: 16 bit packet header for space packet headers. It is used
to detect the start of a header.
:param space_packet_ids: 16 bit packet header for space packet headers. Used to
detect the start of PUS packets
:param tm_polling_freqency: Polling frequency in seconds
:param tm_timeout: Timeout in seconds
:param tmtc_printer: Printer instance, can be passed optionally to allow packet debugging
"""
super().__init__(com_if_key=com_if_key, tmtc_printer=tmtc_printer)
self.tm_timeout = tm_timeout
self.com_type = com_type
self.space_packet_id = space_packet_id
self.space_packet_ids = space_packet_ids
self.tc_timeout_factor = tc_timeout_factor
self.tm_polling_frequency = tm_polling_freqency
self.send_address = send_address
Expand Down Expand Up @@ -122,8 +122,7 @@ def receive(self, poll_timeout: float = 0) -> TelemetryListT:
# call. We parse the space packets contained in the stream here
if self.com_type == TcpCommunicationType.SPACE_PACKETS:
tm_packet_list = parse_space_packets(
analysis_queue=self.__analysis_queue, packet_id=self.space_packet_id,
max_len=self.max_recv_size
analysis_queue=self.__analysis_queue, packet_ids=self.space_packet_ids
)
else:
while self.__analysis_queue:
Expand Down
2 changes: 1 addition & 1 deletion src/tmtccmd/com_if/tcpip_udp_com_if.py
Expand Up @@ -69,7 +69,7 @@ def open(self, args: any = None):
# Set non-blocking because we use select.
self.udp_socket.setblocking(False)
if self.init_mode == CoreModeList.LISTENER_MODE:
from tmtccmd.tc.service_17_test import pack_service17_ping_command
from tmtccmd.pus.service_17_test import pack_service17_ping_command
# Send ping command immediately so the reception address is known for UDP
ping_cmd = pack_service17_ping_command(ssc=0)
self.send(ping_cmd.pack())
Expand Down
4 changes: 2 additions & 2 deletions src/tmtccmd/config/args.py
Expand Up @@ -242,10 +242,10 @@ def prompt_op_code(service_op_code_dict: ServiceOpCodeDictT, service: str) -> st
info_adjustment = 34
horz_line_num = op_code_adjustment + info_adjustment + 3
horiz_line = horz_line_num * "-"
op_code_string = "Operation Code".ljust(op_code_adjustment)
op_code_info_str = "Operation Code".ljust(op_code_adjustment)
info_string = "Information".ljust(info_adjustment)
while True:
LOGGER.info(f"{op_code_string} | {info_string}")
LOGGER.info(f"{op_code_info_str} | {info_string}")
LOGGER.info(horiz_line)
if service in service_op_code_dict:
op_code_dict = service_op_code_dict[service][1]
Expand Down

0 comments on commit 81be253

Please sign in to comment.