Skip to content

Commit

Permalink
Merge pull request #40 from robamu-org/eive
Browse files Browse the repository at this point in the history
Update from EIVE
  • Loading branch information
robamu committed Oct 5, 2021
2 parents 45a62b5 + 48995b4 commit a7299e7
Show file tree
Hide file tree
Showing 32 changed files with 273 additions and 271 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ build

/example/log
/example/tmtc_config.json
/log/
/src/log
/src/tests/log

.idea
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ install_requires =
colorama>=0.4.4
colorlog>=5.0.0
dle-encoder>=0.2
spacepackets>=0.2
spacepackets>=0.3
package_dir =
= src
packages = find:
Expand Down
11 changes: 2 additions & 9 deletions src/tests/test_global_manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from unittest import TestCase
from tmtccmd.config.globals import update_global, get_global, get_global_apid, set_default_apid, \
from tmtccmd.config.globals import update_global, get_global, \
set_json_cfg_path, get_json_cfg_path, set_glob_com_if_dict, get_glob_com_if_dict, \
set_default_globals_pre_args_parsing, check_and_set_core_mode_arg, CoreModeList, \
CoreGlobalIds
Expand All @@ -11,11 +11,6 @@ def test_global_module(self):
update_global(global_param_id=30, parameter='hello')
self.assertTrue(get_global(global_param_id=30) == 'hello')

current_apid = get_global_apid()
set_default_apid(default_apid=0x01)
self.assertTrue(get_global_apid() == 0x01)
set_default_apid(current_apid)

current_path = get_json_cfg_path()
set_json_cfg_path('.')
self.assertTrue(get_json_cfg_path() == '.')
Expand All @@ -29,10 +24,8 @@ def test_global_module(self):
self.assertTrue(com_if_dict["test"][0] == "Test Interface")

set_default_globals_pre_args_parsing(
gui=False, apid=0x02
gui=False, tc_apid=0x02, tm_apid=0x03
)
self.assertTrue(get_global_apid() == 0x02)
set_default_apid(current_apid)

result = check_and_set_core_mode_arg(mode_arg='udp')
self.assertTrue(result == CoreModeList.SEQUENTIAL_CMD_MODE)
Expand Down
15 changes: 10 additions & 5 deletions src/tests/test_printer.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from unittest import TestCase

from tmtccmd.runner import initialize_tmtc_commander
from tmtccmd.tm import Service1TM, Service5TM
from tmtccmd.tm import Service5TM
from tmtccmd.pus.service_1_verification import Service1TMExtended
from tmtccmd.tm.service_5_event import Srv5Subservices
from spacepackets.ccsds.time import CdsShortTimestamp
from tmtccmd.tc.service_17_test import pack_service17_ping_command
from tmtccmd.pus.service_17_test import pack_service_17_ping_command
from tmtccmd.utility.tmtc_printer import TmTcPrinter, DisplayMode
from tmtccmd.utility.logger import get_console_logger, set_tmtc_console_logger
from tmtccmd.config.globals import update_global, CoreGlobalIds
Expand All @@ -25,15 +26,19 @@ def test_print_functions(self):
self.assertTrue(self.tmtc_printer.get_display_mode() == DisplayMode.SHORT)
self.tmtc_printer.set_display_mode(DisplayMode.LONG)

service_1_tm = Service1TM(subservice_id=1, time=CdsShortTimestamp.init_from_current_time())
service_1_tm = Service1TMExtended(
subservice_id=1, time=CdsShortTimestamp.init_from_current_time()
)
service_1_packed = service_1_tm.pack()
self.tmtc_printer.print_telemetry(packet_if=service_1_tm, info_if=service_1_tm)
# Should not crash and emit warning
self.tmtc_printer.print_telemetry(packet_if=None, info_if=None)

self.tmtc_printer.set_display_mode(DisplayMode.SHORT)
self.tmtc_printer.print_telemetry(packet_if=service_1_tm, info_if=service_1_tm)
service_1_tm = Service1TM(subservice_id=2, time=CdsShortTimestamp.init_from_current_time())
service_1_tm = Service1TMExtended(
subservice_id=2, time=CdsShortTimestamp.init_from_current_time()
)
service_1_packed = service_1_tm.pack()
self.tmtc_printer.print_telemetry(
packet_if=service_1_tm, info_if=service_1_tm, print_raw_tm=True
Expand All @@ -54,7 +59,7 @@ def test_print_functions(self):
object_id=bytes([0x01, 0x02, 0x03, 0x04]), event_id=22, param_1=32, param_2=82452
)

service_17_command = pack_service17_ping_command(ssc=0, apid=42)
service_17_command = pack_service_17_ping_command(ssc=0, apid=42)
self.tmtc_printer.print_telecommand(
tc_packet_obj=service_17_command, tc_packet_raw=service_17_command.pack()
)
Expand Down
30 changes: 14 additions & 16 deletions src/tests/test_pus.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,30 @@

from spacepackets.ccsds.spacepacket import get_space_packet_sequence_control
from spacepackets.ccsds.time import CdsShortTimestamp
from spacepackets.ecss.conf import get_pus_tm_version, PusVersion
from spacepackets.ecss.conf import get_pus_tm_version, PusVersion, set_default_tm_apid
from spacepackets.util import PrintFormats

from tmtccmd.tm.service_17_test import Service17TM
from tmtccmd.pus.service_17_test import Service17TMExtended


class TestTelemetry(TestCase):
def test_space_packet_functions(self):
psc = get_space_packet_sequence_control(sequence_flags=0b111, source_sequence_count=42)
self.assertTrue(psc & 0xc000 == 0xc000)

def test_generic_pus_c(self):
pus_17_telemetry = Service17TM(
subservice_id=1, ssc=36, time=CdsShortTimestamp.init_from_current_time()
pus_17_telemetry = Service17TMExtended(
subservice_id=1, ssc=36, time=CdsShortTimestamp.init_from_current_time(),
apid=0xef
)
pus_17_raw = pus_17_telemetry.pack()

pus_17_telemetry = None

def tm_func(raw_telemetry: bytearray):
return Service17TM.unpack(raw_telemetry=raw_telemetry)
return Service17TMExtended.unpack(raw_telemetry=raw_telemetry)

self.assertRaises(ValueError, tm_func, bytearray())
self.assertRaises(ValueError, tm_func, None)

pus_17_telemetry = Service17TM.unpack(raw_telemetry=pus_17_raw)

pus_17_telemetry = Service17TMExtended.unpack(raw_telemetry=pus_17_raw)
self.assertTrue(get_pus_tm_version() == PusVersion.PUS_C)
self.assertTrue(pus_17_telemetry.get_service() == 17)
self.assertTrue(pus_17_telemetry.get_apid() == 0xef)
Expand All @@ -37,22 +35,22 @@ def tm_func(raw_telemetry: bytearray):
self.assertTrue(pus_17_telemetry.get_tm_data() == bytearray())
self.assertTrue(pus_17_telemetry.pus_tm.is_valid())
self.assertTrue(pus_17_telemetry.get_custom_printout() == "")
self.assertTrue(pus_17_telemetry.return_source_data_string() == "[]")
pus_17_telemetry.pus_tm.print_source_data()
pus_17_telemetry.pus_tm.print_full_packet_string()
self.assertTrue(pus_17_telemetry.get_source_data_string() == "hex []")
pus_17_telemetry.pus_tm.print_source_data(print_format=PrintFormats.HEX)
pus_17_telemetry.pus_tm.print_full_packet_string(print_format=PrintFormats.HEX)
# This string changes depending on system time, so its complicated to test its validity
full_string = pus_17_telemetry.pus_tm.return_full_packet_string()
full_string = pus_17_telemetry.pus_tm.get_full_packet_string(print_format=PrintFormats.HEX)
print(full_string)
print(pus_17_telemetry)
print(repr(pus_17_telemetry))
self.assertTrue(pus_17_telemetry.pus_tm.get_packet_id() == 0x8 << 8 | 0xef)

def test_list_functionality(self):
pus_17_telecommand = Service17TM(
pus_17_telecommand = Service17TMExtended(
subservice_id=1, ssc=36, time=CdsShortTimestamp.init_from_current_time()
)
pus_17_raw = pus_17_telecommand.pack()
pus_17_telemetry = Service17TM.unpack(raw_telemetry=pus_17_raw)
pus_17_telemetry = Service17TMExtended.unpack(raw_telemetry=pus_17_raw)

header_list = []
content_list = []
Expand Down
11 changes: 6 additions & 5 deletions src/tmtccmd/com_if/dummy_com_if.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
"""
from tmtccmd.com_if.com_interface_base import CommunicationInterface
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tm import TelemetryListT, Service1TM, Service17TM
from tmtccmd.pus.service_17_test import Srv17Subservices
from tmtccmd.tm import TelemetryListT
from tmtccmd.pus.service_1_verification import Service1TMExtended
from tmtccmd.pus.service_17_test import Srv17Subservices, Service17TMExtended
from tmtccmd.utility.logger import get_console_logger
from tmtccmd.utility.tmtc_printer import TmTcPrinter

Expand Down Expand Up @@ -70,23 +71,23 @@ def generate_reply_package(self):
"""
if self.last_service == 17:
if self.last_subservice == 1:
tm_packer = Service1TM(
tm_packer = Service1TMExtended(
subservice=1, ssc=self.current_ssc, tc_packet_id=self.last_tc_packet_id,
tc_ssc=self.last_tc_ssc
)

self.current_ssc += 1
tm_packet_raw = tm_packer.pack()
self.next_telemetry_package.append(tm_packet_raw)
tm_packer = Service1TM(
tm_packer = Service1TMExtended(
subservice=7, ssc=self.current_ssc, tc_packet_id=self.last_tc_packet_id,
tc_ssc=self.last_tc_ssc
)
tm_packet_raw = tm_packer.pack()
self.next_telemetry_package.append(tm_packet_raw)
self.current_ssc += 1

tm_packer = Service17TM(subservice=Srv17Subservices.PING_REPLY)
tm_packer = Service17TMExtended(subservice=Srv17Subservices.PING_REPLY)
tm_packet_raw = tm_packer.pack()
self.next_telemetry_package.append(tm_packet_raw)
self.current_ssc += 1
Expand Down
13 changes: 6 additions & 7 deletions src/tmtccmd/com_if/tcpip_tcp_com_if.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import threading
import select
from collections import deque
from typing import Union, Optional
from typing import Union, Optional, Tuple

from spacepackets.ccsds.spacepacket import parse_space_packets

Expand Down Expand Up @@ -42,7 +42,7 @@ class TcpIpTcpComIF(CommunicationInterface):
TM_LOOP_DELAY = 0.2

def __init__(
self, com_if_key: str, com_type: TcpCommunicationType, space_packet_id: int,
self, com_if_key: str, com_type: TcpCommunicationType, space_packet_ids: Tuple[int],
tm_polling_freqency: float, tm_timeout: float, tc_timeout_factor: float,
send_address: EthernetAddressT, max_recv_size: int,
max_packets_stored: int = 50,
Expand All @@ -52,16 +52,16 @@ def __init__(
:param com_if_key:
:param com_type: Communication Type. By default, it is assumed that
space packets are sent via TCP
:param space_packet_id: 16 bit packet header for space packet headers. It is used
to detect the start of a header.
:param space_packet_ids: 16 bit packet header for space packet headers. Used to
detect the start of PUS packets
:param tm_polling_freqency: Polling frequency in seconds
:param tm_timeout: Timeout in seconds
:param tmtc_printer: Printer instance, can be passed optionally to allow packet debugging
"""
super().__init__(com_if_key=com_if_key, tmtc_printer=tmtc_printer)
self.tm_timeout = tm_timeout
self.com_type = com_type
self.space_packet_id = space_packet_id
self.space_packet_ids = space_packet_ids
self.tc_timeout_factor = tc_timeout_factor
self.tm_polling_frequency = tm_polling_freqency
self.send_address = send_address
Expand Down Expand Up @@ -122,8 +122,7 @@ def receive(self, poll_timeout: float = 0) -> TelemetryListT:
# call. We parse the space packets contained in the stream here
if self.com_type == TcpCommunicationType.SPACE_PACKETS:
tm_packet_list = parse_space_packets(
analysis_queue=self.__analysis_queue, packet_id=self.space_packet_id,
max_len=self.max_recv_size
analysis_queue=self.__analysis_queue, packet_ids=self.space_packet_ids
)
else:
while self.__analysis_queue:
Expand Down
2 changes: 1 addition & 1 deletion src/tmtccmd/com_if/tcpip_udp_com_if.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def open(self, args: any = None):
# Set non-blocking because we use select.
self.udp_socket.setblocking(False)
if self.init_mode == CoreModeList.LISTENER_MODE:
from tmtccmd.tc.service_17_test import pack_service17_ping_command
from tmtccmd.pus.service_17_test import pack_service17_ping_command
# Send ping command immediately so the reception address is known for UDP
ping_cmd = pack_service17_ping_command(ssc=0)
self.send(ping_cmd.pack())
Expand Down
23 changes: 13 additions & 10 deletions src/tmtccmd/config/com_if.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import sys
from typing import Optional
from typing import Optional, Tuple

from tmtccmd.config.definitions import CoreGlobalIds, CoreComInterfaces
from tmtccmd.core.globals_manager import get_global, update_global
Expand All @@ -17,7 +17,7 @@

def create_communication_interface_default(
com_if_key: str, tmtc_printer: TmTcPrinter, json_cfg_path: str,
space_packet_id: int = 0
space_packet_ids: Tuple[int] = (0,)
) -> Optional[CommunicationInterface]:
"""Return the desired communication interface object
Expand All @@ -36,7 +36,7 @@ def create_communication_interface_default(
com_if_key == CoreComInterfaces.TCPIP_TCP.value:
communication_interface = create_default_tcpip_interface(
com_if_key=com_if_key, json_cfg_path=json_cfg_path, tmtc_printer=tmtc_printer,
space_packet_id=space_packet_id
space_packet_ids=space_packet_ids
)
elif com_if_key == CoreComInterfaces.SERIAL_DLE.value or \
com_if_key == CoreComInterfaces.SERIAL_FIXED_FRAME.value:
Expand Down Expand Up @@ -69,7 +69,9 @@ def create_communication_interface_default(
sys.exit(1)


def default_tcpip_cfg_setup(tcpip_type: TcpIpType, json_cfg_path: str, space_packet_id: int = 0):
def default_tcpip_cfg_setup(
tcpip_type: TcpIpType, json_cfg_path: str, space_packet_ids: Tuple[int] = (0,)
):
"""Default setup for TCP/IP communication interfaces. This intantiates all required data in the
globals manager so a TCP/IP communication interface can be built with
:func:`create_default_tcpip_interface`
Expand All @@ -93,11 +95,11 @@ def default_tcpip_cfg_setup(tcpip_type: TcpIpType, json_cfg_path: str, space_pac
ethernet_cfg_dict.update({TcpIpConfigIds.SEND_ADDRESS: send_tuple})
ethernet_cfg_dict.update({TcpIpConfigIds.RECV_ADDRESS: recv_tuple})
ethernet_cfg_dict.update({TcpIpConfigIds.RECV_MAX_SIZE: max_recv_buf_size})
if space_packet_id == 0 and tcpip_type == TcpIpType.TCP:
if space_packet_ids == (0,) and tcpip_type == TcpIpType.TCP:
LOGGER.warning(
'TCP communication interface without any specified space packet ID might not work!'
)
ethernet_cfg_dict.update({TcpIpConfigIds.SPACE_PACKET_ID: space_packet_id})
ethernet_cfg_dict.update({TcpIpConfigIds.SPACE_PACKET_ID: space_packet_ids})
update_global(CoreGlobalIds.ETHERNET_CONFIG, ethernet_cfg_dict)


Expand All @@ -117,15 +119,16 @@ def default_serial_cfg_setup(com_if_key: str, json_cfg_path: str):


def create_default_tcpip_interface(
com_if_key: str, tmtc_printer: TmTcPrinter, json_cfg_path: str, space_packet_id: int = 0
com_if_key: str, tmtc_printer: TmTcPrinter, json_cfg_path: str,
space_packet_ids: Tuple[int] = (0,)
) -> Optional[CommunicationInterface]:
"""Create a default serial interface. Requires a certain set of global variables set up. See
:func:`default_tcpip_cfg_setup` for more details.
:param com_if_key:
:param tmtc_printer:
:param json_cfg_path:
:param space_packet_id: Two byte packet ID which is used by TCP to parse space packets
:param space_packet_ids: Two byte packet IDs which is used by TCP to parse space packets
:return:
"""
communication_interface = None
Expand All @@ -136,7 +139,7 @@ def create_default_tcpip_interface(
elif com_if_key == CoreComInterfaces.TCPIP_TCP.value:
default_tcpip_cfg_setup(
tcpip_type=TcpIpType.TCP, json_cfg_path=json_cfg_path,
space_packet_id=space_packet_id
space_packet_ids=space_packet_ids
)
ethernet_cfg_dict = get_global(CoreGlobalIds.ETHERNET_CONFIG)
send_addr = ethernet_cfg_dict[TcpIpConfigIds.SEND_ADDRESS]
Expand All @@ -154,7 +157,7 @@ def create_default_tcpip_interface(
elif com_if_key == CoreComInterfaces.TCPIP_TCP.value:
communication_interface = TcpIpTcpComIF(
com_if_key=com_if_key, com_type=TcpCommunicationType.SPACE_PACKETS,
space_packet_id=space_packet_id, tm_timeout=get_global(CoreGlobalIds.TM_TIMEOUT),
space_packet_ids=space_packet_ids, tm_timeout=get_global(CoreGlobalIds.TM_TIMEOUT),
tc_timeout_factor=get_global(CoreGlobalIds.TC_SEND_TIMEOUT_FACTOR),
tm_polling_freqency=0.5, send_address=send_addr, max_recv_size=max_recv_size,
tmtc_printer=tmtc_printer, init_mode=init_mode
Expand Down
1 change: 0 additions & 1 deletion src/tmtccmd/config/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ class CoreGlobalIds(enum.IntEnum):

# Parameters
JSON_CFG_PATH = 139
APID = 140
MODE = 141
CURRENT_SERVICE = 142
COM_IF = 144
Expand Down
12 changes: 4 additions & 8 deletions src/tmtccmd/config/globals.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from tmtccmd.utility.logger import get_console_logger
from tmtccmd.utility.conf_util import check_args_in_dict, print_core_globals
from spacepackets.ecss.conf import PusVersion, set_default_apid, get_default_apid, \
from spacepackets.ecss.conf import PusVersion, set_default_tc_apid, set_default_tm_apid, \
set_pus_tc_version, set_pus_tm_version
from tmtccmd.core.globals_manager import update_global, get_global
from tmtccmd.config.definitions import CoreGlobalIds, CoreModeList, CoreServiceList, \
Expand All @@ -17,10 +17,6 @@
SERVICE_OP_CODE_DICT = dict()


def get_global_apid() -> int:
return get_default_apid()


def set_json_cfg_path(json_cfg_path: str):
update_global(CoreGlobalIds.JSON_CFG_PATH, json_cfg_path)

Expand All @@ -39,16 +35,16 @@ def get_glob_com_if_dict() -> ComIFDictT:


def set_default_globals_pre_args_parsing(
gui: bool, apid: int, pus_tc_version: PusVersion = PusVersion.PUS_C,
gui: bool, tc_apid: int, tm_apid: int, pus_tc_version: PusVersion = PusVersion.PUS_C,
pus_tm_version: PusVersion = PusVersion.PUS_C,
com_if_id: str = CoreComInterfaces.DUMMY.value, custom_com_if_dict=None,
display_mode="long", tm_timeout: float = 4.0, print_to_file: bool = True,
tc_send_timeout_factor: float = 2.0
):
if custom_com_if_dict is None:
custom_com_if_dict = dict()
update_global(CoreGlobalIds.APID, apid)
set_default_apid(default_apid=apid)
set_default_tc_apid(tc_apid=tc_apid)
set_default_tm_apid(tm_apid=tm_apid)
set_pus_tc_version(pus_tc_version)
set_pus_tm_version(pus_tm_version)
update_global(CoreGlobalIds.COM_IF, com_if_id)
Expand Down

0 comments on commit a7299e7

Please sign in to comment.