Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update from EIVE #40

Merged
merged 17 commits into from
Oct 5, 2021
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ build

/example/log
/example/tmtc_config.json
/log/
/src/log
/src/tests/log

.idea
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ install_requires =
colorama>=0.4.4
colorlog>=5.0.0
dle-encoder>=0.2
spacepackets>=0.2
spacepackets>=0.3
package_dir =
= src
packages = find:
Expand Down
11 changes: 2 additions & 9 deletions src/tests/test_global_manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from unittest import TestCase
from tmtccmd.config.globals import update_global, get_global, get_global_apid, set_default_apid, \
from tmtccmd.config.globals import update_global, get_global, \
set_json_cfg_path, get_json_cfg_path, set_glob_com_if_dict, get_glob_com_if_dict, \
set_default_globals_pre_args_parsing, check_and_set_core_mode_arg, CoreModeList, \
CoreGlobalIds
Expand All @@ -11,11 +11,6 @@ def test_global_module(self):
update_global(global_param_id=30, parameter='hello')
self.assertTrue(get_global(global_param_id=30) == 'hello')

current_apid = get_global_apid()
set_default_apid(default_apid=0x01)
self.assertTrue(get_global_apid() == 0x01)
set_default_apid(current_apid)

current_path = get_json_cfg_path()
set_json_cfg_path('.')
self.assertTrue(get_json_cfg_path() == '.')
Expand All @@ -29,10 +24,8 @@ def test_global_module(self):
self.assertTrue(com_if_dict["test"][0] == "Test Interface")

set_default_globals_pre_args_parsing(
gui=False, apid=0x02
gui=False, tc_apid=0x02, tm_apid=0x03
)
self.assertTrue(get_global_apid() == 0x02)
set_default_apid(current_apid)

result = check_and_set_core_mode_arg(mode_arg='udp')
self.assertTrue(result == CoreModeList.SEQUENTIAL_CMD_MODE)
Expand Down
15 changes: 10 additions & 5 deletions src/tests/test_printer.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from unittest import TestCase

from tmtccmd.runner import initialize_tmtc_commander
from tmtccmd.tm import Service1TM, Service5TM
from tmtccmd.tm import Service5TM
from tmtccmd.pus.service_1_verification import Service1TMExtended
from tmtccmd.tm.service_5_event import Srv5Subservices
from spacepackets.ccsds.time import CdsShortTimestamp
from tmtccmd.tc.service_17_test import pack_service17_ping_command
from tmtccmd.pus.service_17_test import pack_service_17_ping_command
from tmtccmd.utility.tmtc_printer import TmTcPrinter, DisplayMode
from tmtccmd.utility.logger import get_console_logger, set_tmtc_console_logger
from tmtccmd.config.globals import update_global, CoreGlobalIds
Expand All @@ -25,15 +26,19 @@ def test_print_functions(self):
self.assertTrue(self.tmtc_printer.get_display_mode() == DisplayMode.SHORT)
self.tmtc_printer.set_display_mode(DisplayMode.LONG)

service_1_tm = Service1TM(subservice_id=1, time=CdsShortTimestamp.init_from_current_time())
service_1_tm = Service1TMExtended(
subservice_id=1, time=CdsShortTimestamp.init_from_current_time()
)
service_1_packed = service_1_tm.pack()
self.tmtc_printer.print_telemetry(packet_if=service_1_tm, info_if=service_1_tm)
# Should not crash and emit warning
self.tmtc_printer.print_telemetry(packet_if=None, info_if=None)

self.tmtc_printer.set_display_mode(DisplayMode.SHORT)
self.tmtc_printer.print_telemetry(packet_if=service_1_tm, info_if=service_1_tm)
service_1_tm = Service1TM(subservice_id=2, time=CdsShortTimestamp.init_from_current_time())
service_1_tm = Service1TMExtended(
subservice_id=2, time=CdsShortTimestamp.init_from_current_time()
)
service_1_packed = service_1_tm.pack()
self.tmtc_printer.print_telemetry(
packet_if=service_1_tm, info_if=service_1_tm, print_raw_tm=True
Expand All @@ -54,7 +59,7 @@ def test_print_functions(self):
object_id=bytes([0x01, 0x02, 0x03, 0x04]), event_id=22, param_1=32, param_2=82452
)

service_17_command = pack_service17_ping_command(ssc=0, apid=42)
service_17_command = pack_service_17_ping_command(ssc=0, apid=42)
self.tmtc_printer.print_telecommand(
tc_packet_obj=service_17_command, tc_packet_raw=service_17_command.pack()
)
Expand Down
30 changes: 14 additions & 16 deletions src/tests/test_pus.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,30 @@

from spacepackets.ccsds.spacepacket import get_space_packet_sequence_control
from spacepackets.ccsds.time import CdsShortTimestamp
from spacepackets.ecss.conf import get_pus_tm_version, PusVersion
from spacepackets.ecss.conf import get_pus_tm_version, PusVersion, set_default_tm_apid
from spacepackets.util import PrintFormats

from tmtccmd.tm.service_17_test import Service17TM
from tmtccmd.pus.service_17_test import Service17TMExtended


class TestTelemetry(TestCase):
def test_space_packet_functions(self):
psc = get_space_packet_sequence_control(sequence_flags=0b111, source_sequence_count=42)
self.assertTrue(psc & 0xc000 == 0xc000)

def test_generic_pus_c(self):
pus_17_telemetry = Service17TM(
subservice_id=1, ssc=36, time=CdsShortTimestamp.init_from_current_time()
pus_17_telemetry = Service17TMExtended(
subservice_id=1, ssc=36, time=CdsShortTimestamp.init_from_current_time(),
apid=0xef
)
pus_17_raw = pus_17_telemetry.pack()

pus_17_telemetry = None

def tm_func(raw_telemetry: bytearray):
return Service17TM.unpack(raw_telemetry=raw_telemetry)
return Service17TMExtended.unpack(raw_telemetry=raw_telemetry)

self.assertRaises(ValueError, tm_func, bytearray())
self.assertRaises(ValueError, tm_func, None)

pus_17_telemetry = Service17TM.unpack(raw_telemetry=pus_17_raw)

pus_17_telemetry = Service17TMExtended.unpack(raw_telemetry=pus_17_raw)
self.assertTrue(get_pus_tm_version() == PusVersion.PUS_C)
self.assertTrue(pus_17_telemetry.get_service() == 17)
self.assertTrue(pus_17_telemetry.get_apid() == 0xef)
Expand All @@ -37,22 +35,22 @@ def tm_func(raw_telemetry: bytearray):
self.assertTrue(pus_17_telemetry.get_tm_data() == bytearray())
self.assertTrue(pus_17_telemetry.pus_tm.is_valid())
self.assertTrue(pus_17_telemetry.get_custom_printout() == "")
self.assertTrue(pus_17_telemetry.return_source_data_string() == "[]")
pus_17_telemetry.pus_tm.print_source_data()
pus_17_telemetry.pus_tm.print_full_packet_string()
self.assertTrue(pus_17_telemetry.get_source_data_string() == "hex []")
pus_17_telemetry.pus_tm.print_source_data(print_format=PrintFormats.HEX)
pus_17_telemetry.pus_tm.print_full_packet_string(print_format=PrintFormats.HEX)
# This string changes depending on system time, so its complicated to test its validity
full_string = pus_17_telemetry.pus_tm.return_full_packet_string()
full_string = pus_17_telemetry.pus_tm.get_full_packet_string(print_format=PrintFormats.HEX)
print(full_string)
print(pus_17_telemetry)
print(repr(pus_17_telemetry))
self.assertTrue(pus_17_telemetry.pus_tm.get_packet_id() == 0x8 << 8 | 0xef)

def test_list_functionality(self):
pus_17_telecommand = Service17TM(
pus_17_telecommand = Service17TMExtended(
subservice_id=1, ssc=36, time=CdsShortTimestamp.init_from_current_time()
)
pus_17_raw = pus_17_telecommand.pack()
pus_17_telemetry = Service17TM.unpack(raw_telemetry=pus_17_raw)
pus_17_telemetry = Service17TMExtended.unpack(raw_telemetry=pus_17_raw)

header_list = []
content_list = []
Expand Down
11 changes: 6 additions & 5 deletions src/tmtccmd/com_if/dummy_com_if.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
"""
from tmtccmd.com_if.com_interface_base import CommunicationInterface
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tm import TelemetryListT, Service1TM, Service17TM
from tmtccmd.pus.service_17_test import Srv17Subservices
from tmtccmd.tm import TelemetryListT
from tmtccmd.pus.service_1_verification import Service1TMExtended
from tmtccmd.pus.service_17_test import Srv17Subservices, Service17TMExtended
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.utility.tmtc_printer import TmTcPrinter

Expand Down Expand Up @@ -70,23 +71,23 @@ def generate_reply_package(self):
"""
if self.last_service == 17:
if self.last_subservice == 1:
tm_packer = Service1TM(
tm_packer = Service1TMExtended(
subservice=1, ssc=self.current_ssc, tc_packet_id=self.last_tc_packet_id,
tc_ssc=self.last_tc_ssc
)

self.current_ssc += 1
tm_packet_raw = tm_packer.pack()
self.next_telemetry_package.append(tm_packet_raw)
tm_packer = Service1TM(
tm_packer = Service1TMExtended(
subservice=7, ssc=self.current_ssc, tc_packet_id=self.last_tc_packet_id,
tc_ssc=self.last_tc_ssc
)
tm_packet_raw = tm_packer.pack()
self.next_telemetry_package.append(tm_packet_raw)
self.current_ssc += 1

tm_packer = Service17TM(subservice=Srv17Subservices.PING_REPLY)
tm_packer = Service17TMExtended(subservice=Srv17Subservices.PING_REPLY)
tm_packet_raw = tm_packer.pack()
self.next_telemetry_package.append(tm_packet_raw)
self.current_ssc += 1
Expand Down
13 changes: 6 additions & 7 deletions src/tmtccmd/com_if/tcpip_tcp_com_if.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import threading
import select
from collections import deque
from typing import Union, Optional
from typing import Union, Optional, Tuple

from spacepackets.ccsds.spacepacket import parse_space_packets

Expand Down Expand Up @@ -42,7 +42,7 @@ class TcpIpTcpComIF(CommunicationInterface):
TM_LOOP_DELAY = 0.2

def __init__(
self, com_if_key: str, com_type: TcpCommunicationType, space_packet_id: int,
self, com_if_key: str, com_type: TcpCommunicationType, space_packet_ids: Tuple[int],
tm_polling_freqency: float, tm_timeout: float, tc_timeout_factor: float,
send_address: EthernetAddressT, max_recv_size: int,
max_packets_stored: int = 50,
Expand All @@ -52,16 +52,16 @@ def __init__(
:param com_if_key:
:param com_type: Communication Type. By default, it is assumed that
space packets are sent via TCP
:param space_packet_id: 16 bit packet header for space packet headers. It is used
to detect the start of a header.
:param space_packet_ids: 16 bit packet header for space packet headers. Used to
detect the start of PUS packets
:param tm_polling_freqency: Polling frequency in seconds
:param tm_timeout: Timeout in seconds
:param tmtc_printer: Printer instance, can be passed optionally to allow packet debugging
"""
super().__init__(com_if_key=com_if_key, tmtc_printer=tmtc_printer)
self.tm_timeout = tm_timeout
self.com_type = com_type
self.space_packet_id = space_packet_id
self.space_packet_ids = space_packet_ids
self.tc_timeout_factor = tc_timeout_factor
self.tm_polling_frequency = tm_polling_freqency
self.send_address = send_address
Expand Down Expand Up @@ -122,8 +122,7 @@ def receive(self, poll_timeout: float = 0) -> TelemetryListT:
# call. We parse the space packets contained in the stream here
if self.com_type == TcpCommunicationType.SPACE_PACKETS:
tm_packet_list = parse_space_packets(
analysis_queue=self.__analysis_queue, packet_id=self.space_packet_id,
max_len=self.max_recv_size
analysis_queue=self.__analysis_queue, packet_ids=self.space_packet_ids
)
else:
while self.__analysis_queue:
Expand Down
2 changes: 1 addition & 1 deletion src/tmtccmd/com_if/tcpip_udp_com_if.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def open(self, args: any = None):
# Set non-blocking because we use select.
self.udp_socket.setblocking(False)
if self.init_mode == CoreModeList.LISTENER_MODE:
from tmtccmd.tc.service_17_test import pack_service17_ping_command
from tmtccmd.pus.service_17_test import pack_service17_ping_command
# Send ping command immediately so the reception address is known for UDP
ping_cmd = pack_service17_ping_command(ssc=0)
self.send(ping_cmd.pack())
Expand Down
23 changes: 13 additions & 10 deletions src/tmtccmd/config/com_if.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import sys
from typing import Optional
from typing import Optional, Tuple

from tmtccmd.config.definitions import CoreGlobalIds, CoreComInterfaces
from tmtccmd.core.globals_manager import get_global, update_global
Expand All @@ -17,7 +17,7 @@

def create_communication_interface_default(
com_if_key: str, tmtc_printer: TmTcPrinter, json_cfg_path: str,
space_packet_id: int = 0
space_packet_ids: Tuple[int] = (0,)
) -> Optional[CommunicationInterface]:
"""Return the desired communication interface object

Expand All @@ -36,7 +36,7 @@ def create_communication_interface_default(
com_if_key == CoreComInterfaces.TCPIP_TCP.value:
communication_interface = create_default_tcpip_interface(
com_if_key=com_if_key, json_cfg_path=json_cfg_path, tmtc_printer=tmtc_printer,
space_packet_id=space_packet_id
space_packet_ids=space_packet_ids
)
elif com_if_key == CoreComInterfaces.SERIAL_DLE.value or \
com_if_key == CoreComInterfaces.SERIAL_FIXED_FRAME.value:
Expand Down Expand Up @@ -69,7 +69,9 @@ def create_communication_interface_default(
sys.exit(1)


def default_tcpip_cfg_setup(tcpip_type: TcpIpType, json_cfg_path: str, space_packet_id: int = 0):
def default_tcpip_cfg_setup(
tcpip_type: TcpIpType, json_cfg_path: str, space_packet_ids: Tuple[int] = (0,)
):
"""Default setup for TCP/IP communication interfaces. This intantiates all required data in the
globals manager so a TCP/IP communication interface can be built with
:func:`create_default_tcpip_interface`
Expand All @@ -93,11 +95,11 @@ def default_tcpip_cfg_setup(tcpip_type: TcpIpType, json_cfg_path: str, space_pac
ethernet_cfg_dict.update({TcpIpConfigIds.SEND_ADDRESS: send_tuple})
ethernet_cfg_dict.update({TcpIpConfigIds.RECV_ADDRESS: recv_tuple})
ethernet_cfg_dict.update({TcpIpConfigIds.RECV_MAX_SIZE: max_recv_buf_size})
if space_packet_id == 0 and tcpip_type == TcpIpType.TCP:
if space_packet_ids == (0,) and tcpip_type == TcpIpType.TCP:
LOGGER.warning(
'TCP communication interface without any specified space packet ID might not work!'
)
ethernet_cfg_dict.update({TcpIpConfigIds.SPACE_PACKET_ID: space_packet_id})
ethernet_cfg_dict.update({TcpIpConfigIds.SPACE_PACKET_ID: space_packet_ids})
update_global(CoreGlobalIds.ETHERNET_CONFIG, ethernet_cfg_dict)


Expand All @@ -117,15 +119,16 @@ def default_serial_cfg_setup(com_if_key: str, json_cfg_path: str):


def create_default_tcpip_interface(
com_if_key: str, tmtc_printer: TmTcPrinter, json_cfg_path: str, space_packet_id: int = 0
com_if_key: str, tmtc_printer: TmTcPrinter, json_cfg_path: str,
space_packet_ids: Tuple[int] = (0,)
) -> Optional[CommunicationInterface]:
"""Create a default serial interface. Requires a certain set of global variables set up. See
:func:`default_tcpip_cfg_setup` for more details.

:param com_if_key:
:param tmtc_printer:
:param json_cfg_path:
:param space_packet_id: Two byte packet ID which is used by TCP to parse space packets
:param space_packet_ids: Two byte packet IDs which is used by TCP to parse space packets
:return:
"""
communication_interface = None
Expand All @@ -136,7 +139,7 @@ def create_default_tcpip_interface(
elif com_if_key == CoreComInterfaces.TCPIP_TCP.value:
default_tcpip_cfg_setup(
tcpip_type=TcpIpType.TCP, json_cfg_path=json_cfg_path,
space_packet_id=space_packet_id
space_packet_ids=space_packet_ids
)
ethernet_cfg_dict = get_global(CoreGlobalIds.ETHERNET_CONFIG)
send_addr = ethernet_cfg_dict[TcpIpConfigIds.SEND_ADDRESS]
Expand All @@ -154,7 +157,7 @@ def create_default_tcpip_interface(
elif com_if_key == CoreComInterfaces.TCPIP_TCP.value:
communication_interface = TcpIpTcpComIF(
com_if_key=com_if_key, com_type=TcpCommunicationType.SPACE_PACKETS,
space_packet_id=space_packet_id, tm_timeout=get_global(CoreGlobalIds.TM_TIMEOUT),
space_packet_ids=space_packet_ids, tm_timeout=get_global(CoreGlobalIds.TM_TIMEOUT),
tc_timeout_factor=get_global(CoreGlobalIds.TC_SEND_TIMEOUT_FACTOR),
tm_polling_freqency=0.5, send_address=send_addr, max_recv_size=max_recv_size,
tmtc_printer=tmtc_printer, init_mode=init_mode
Expand Down
1 change: 0 additions & 1 deletion src/tmtccmd/config/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ class CoreGlobalIds(enum.IntEnum):

# Parameters
JSON_CFG_PATH = 139
APID = 140
MODE = 141
CURRENT_SERVICE = 142
COM_IF = 144
Expand Down
12 changes: 4 additions & 8 deletions src/tmtccmd/config/globals.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from tmtccmd.utility.logger import get_console_logger
from tmtccmd.utility.conf_util import check_args_in_dict, print_core_globals
from spacepackets.ecss.conf import PusVersion, set_default_apid, get_default_apid, \
from spacepackets.ecss.conf import PusVersion, set_default_tc_apid, set_default_tm_apid, \
set_pus_tc_version, set_pus_tm_version
from tmtccmd.core.globals_manager import update_global, get_global
from tmtccmd.config.definitions import CoreGlobalIds, CoreModeList, CoreServiceList, \
Expand All @@ -17,10 +17,6 @@
SERVICE_OP_CODE_DICT = dict()


def get_global_apid() -> int:
return get_default_apid()


def set_json_cfg_path(json_cfg_path: str):
update_global(CoreGlobalIds.JSON_CFG_PATH, json_cfg_path)

Expand All @@ -39,16 +35,16 @@ def get_glob_com_if_dict() -> ComIFDictT:


def set_default_globals_pre_args_parsing(
gui: bool, apid: int, pus_tc_version: PusVersion = PusVersion.PUS_C,
gui: bool, tc_apid: int, tm_apid: int, pus_tc_version: PusVersion = PusVersion.PUS_C,
pus_tm_version: PusVersion = PusVersion.PUS_C,
com_if_id: str = CoreComInterfaces.DUMMY.value, custom_com_if_dict=None,
display_mode="long", tm_timeout: float = 4.0, print_to_file: bool = True,
tc_send_timeout_factor: float = 2.0
):
if custom_com_if_dict is None:
custom_com_if_dict = dict()
update_global(CoreGlobalIds.APID, apid)
set_default_apid(default_apid=apid)
set_default_tc_apid(tc_apid=tc_apid)
set_default_tm_apid(tm_apid=tm_apid)
set_pus_tc_version(pus_tc_version)
set_pus_tm_version(pus_tm_version)
update_global(CoreGlobalIds.COM_IF, com_if_id)
Expand Down