Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Output unframed (garbage) data to file #151

Merged
merged 3 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions src/fprime_gds/common/communication/framing.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ def deframe(self, data, no_copy=False):
"""
Deframes the incoming data from the specified format. Produces exactly one packet, and leftover bytes. Users
wanting all packets to be deframed should call "deframe_all". If no full packet is available, this method
returns None. Expects incoming raw bytes to deframe, and returns a deframed packet or None, and the leftover
bytes that were unused. Will search and discard data up until a start token is found. Note: data will be
consumed up to the first start token found.
returns None. Expects incoming raw bytes to deframe, and returns a deframed packet or None, the leftover
bytes that were unused, and any bytes discarded from the existing data stream. Will search and discard data up
until a start token is found. Note: data will be consumed up to the first start token found.

:param data: framed data bytes
:param no_copy: (optional) will prevent extra copy if True, but "data" input will be destroyed.
:return: (packet as array of bytes or None, leftover bytes)
:return: (packet as array of bytes or None, leftover bytes, any discarded data)
"""

def deframe_all(self, data, no_copy):
Expand All @@ -56,16 +56,18 @@ def deframe_all(self, data, no_copy):

:param data: framed data bytes
:param no_copy: (optional) will prevent extra copy if True, but "data" input will be destroyed.
:return:
:return: list of packets, remaining data, discarded/unframed/garbage data
"""
packets = []
if not no_copy:
data = copy.copy(data)
discarded_aggregate = b""
while True:
# Deframe and return only on None
(packet, data) = self.deframe(data, no_copy=True)
(packet, data, discarded) = self.deframe(data, no_copy=True)
discarded_aggregate += discarded
if packet is None:
return packets, data
return packets, data, discarded_aggregate
packets.append(packet)


Expand Down Expand Up @@ -147,6 +149,7 @@ def deframe(self, data, no_copy=False):
:param no_copy: (optional) will prevent extra copy if True, but "data" input will be destroyed.
:return: (packet as array of bytes or None, leftover bytes)
"""
discarded = b""
if not no_copy:
data = copy.copy(data)
# Continue until there is not enough data for the header, or until a packet is found (return)
Expand All @@ -163,6 +166,7 @@ def deframe(self, data, no_copy=False):
start != FpFramerDeframer.START_TOKEN
or data_size >= FpFramerDeframer.MAXIMUM_DATA_SIZE
):
discarded += data[0:1]
data = data[1:]
continue
# If the pool is large enough to read the whole frame, then read it
Expand All @@ -175,17 +179,18 @@ def deframe(self, data, no_copy=False):
data[: data_size + FpFramerDeframer.HEADER_SIZE]
):
data = data[total_size:]
return deframed, data
return deframed, data, discarded
print(
"[WARNING] Checksum validation failed. Have you correctly set '--comm-checksum-type'",
file=sys.stderr,
)
# Bad checksum, rotate 1 and keep looking for non-garbage
discarded += data[0:1]
data = data[1:]
continue
# Case of not enough data for a full packet, return hoping for more later
return None, data
return None, data
return None, data, discarded
return None, data, discarded


class TcpServerFramerDeframer(FramerDeframer):
Expand Down Expand Up @@ -237,11 +242,11 @@ def deframe(self, data, no_copy=False):
data = data[1:]
# Break out of data when not enough
if len(data) < 8:
return None, data
return None, data, b""
# Read the length and break if not enough data
(data_len,) = struct.unpack_from(">I", data, 4)
if len(data) < data_len + 8:
return None, data
return None, data, b""
packet = data[8 : data_len + 8]
data = data[data_len + 8 :]
return packet, data
return packet, data, b""
2 changes: 1 addition & 1 deletion src/fprime_gds/common/communication/ground.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def receive_all(self):
:return: list deframed packets
"""
self.data += self.tcp.read()
(frames, self.data) = self.deframer.deframe_all(self.data, no_copy=True)
(frames, self.data, _) = self.deframer.deframe_all(self.data, no_copy=True)
return frames

def send_all(self, frames):
Expand Down
31 changes: 26 additions & 5 deletions src/fprime_gds/common/communication/updown.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,23 @@ class Downlinker:
"""

def __init__(
self, adapter: BaseAdapter, ground: GroundHandler, deframer: FramerDeframer
self,
adapter: BaseAdapter,
ground: GroundHandler,
deframer: FramerDeframer,
discarded=None,
):
"""Initialize the downlinker

Constructs a new downlinker object used to run the downlink and deframing operation.
Constructs a new downlinker object used to run the downlink and deframing operation. This downlinker will log
discarded (unframed) data when discarded is a writable data object. When discarded is None the discarded data is
dropped.

Args:
adapter: adapter used to read raw data from the hardware connection
ground: handles the ground side connection
deframer: deframer used to deframe data from the communication format
discarded: file to write discarded data to. None to drop the data.
"""
self.running = True
self.th_ground = None
Expand All @@ -54,13 +61,18 @@ def __init__(
self.ground = ground
self.deframer = deframer
self.outgoing = Queue()
self.discarded = discarded

def start(self):
"""Starts the downlink pipeline"""
self.th_ground = threading.Thread(target=self.sending, name="DownlinkTTSGroundThread")
self.th_ground = threading.Thread(
target=self.sending, name="DownlinkTTSGroundThread"
)
self.th_ground.daemon = True
self.th_ground.start()
self.th_data = threading.Thread(target=self.deframing, name="DownLinkDeframingThread")
self.th_data = threading.Thread(
target=self.deframing, name="DownLinkDeframingThread"
)
self.th_data.daemon = True
self.th_data.start()

Expand All @@ -74,12 +86,20 @@ def deframing(self):
while self.running:
# Blocks until data is available, but may still return b"" if timeout
pool += self.adapter.read()
frames, pool = self.deframer.deframe_all(pool, no_copy=True)
frames, pool, discarded_data = self.deframer.deframe_all(pool, no_copy=True)
try:
for frame in frames:
self.outgoing.put_nowait(frame)
except Full:
DW_LOGGER.warning("GDS ground queue full, dropping frame")
try:
if self.discarded is not None:
self.discarded.write(discarded_data)
self.discarded.flush()
# Failure to write discarded data should never stop the GDS. Log it and move on.
except Exception as exc:
DW_LOGGER.warning("Cannot write discarded data %s", exc)
self.discarded = None # Give up on logging further data

def sending(self):
"""Outgoing stage of downlink
Expand Down Expand Up @@ -107,6 +127,7 @@ def join(self):
for thread in [self.th_data, self.th_ground]:
if thread is not None:
thread.join()
self.discarded = None

def add_loopback_frame(self, frame):
"""Adds a frame to loopback to ground
Expand Down
13 changes: 11 additions & 2 deletions src/fprime_gds/common/logger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import os
import sys

INITIALIZED = False


def configure_py_log(directory=None, filename=sys.argv[0], mirror_to_stdout=False):
"""
Expand All @@ -21,7 +23,14 @@ def configure_py_log(directory=None, filename=sys.argv[0], mirror_to_stdout=Fals
:param mode: of file to write
:param mirror_to_stdout: mirror the log output to standard our
"""
handlers = [logging.StreamHandler(sys.stdout)] if directory is None or mirror_to_stdout else []
global INITIALIZED
if INITIALIZED:
return
handlers = (
[logging.StreamHandler(sys.stdout)]
if directory is None or mirror_to_stdout
else []
)
if directory is not None:
log_file = os.path.join(directory, os.path.basename(filename))
log_file = log_file if log_file.endswith(".log") else f"{log_file}.log"
Expand All @@ -33,4 +42,4 @@ def configure_py_log(directory=None, filename=sys.argv[0], mirror_to_stdout=Fals
logging.getLogger().addHandler(handler)
logging.getLogger().setLevel(logging.INFO)
logging.info("Logging system initialized!")

INITIALIZED = True
26 changes: 18 additions & 8 deletions src/fprime_gds/executables/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,16 @@ def handle_arguments(self, args, **kwargs):
if likely_deployment.exists():
args.deployment = likely_deployment
return args
child_directories = [child for child in detected_toolchain.iterdir() if child.is_dir()]
child_directories = [
child for child in detected_toolchain.iterdir() if child.is_dir()
]
if not child_directories:
msg = f"No deployments found in {detected_toolchain}. Specify deployment with: --deployment"
raise Exception(msg)
# Works for the old structure where the bin, lib, and dict directories live immediately under the platform
elif len(child_directories) == 3 and set([path.name for path in child_directories]) == {"bin", "lib", "dict"}:
elif len(child_directories) == 3 and set(
[path.name for path in child_directories]
) == {"bin", "lib", "dict"}:
args.deployment = detected_toolchain
return args
elif len(child_directories) > 1:
Expand Down Expand Up @@ -310,8 +314,7 @@ def get_arguments(self) -> Dict[Tuple[str, ...], Dict[str, Any]]:
"action": "store",
"type": str,
"help": "Adapter for communicating to flight deployment. [default: %(default)s]",
"choices": ["none"]
+ list(adapter_definition_dictionaries),
"choices": ["none"] + list(adapter_definition_dictionaries),
"default": "ip",
},
("--comm-checksum-type",): {
Expand All @@ -326,6 +329,15 @@ def get_arguments(self) -> Dict[Tuple[str, ...], Dict[str, Any]]:
],
"default": fprime_gds.common.communication.checksum.CHECKSUM_SELECTION,
},
("--output-unframed-data",): {
"dest": "output_unframed_data",
"action": "store",
"nargs": "?",
"help": "Log unframed data to supplied file relative to log directory. Use '-' for standard out.",
"default": None,
"const": "unframed.log",
"required": False,
},
}
return {**adapter_arguments, **com_arguments}

Expand Down Expand Up @@ -699,9 +711,7 @@ def handle_arguments(self, args, **kwargs):
args.app = Path(args.app) if args.app else Path(find_app(args.deployment))
if not args.app.is_file():
msg = f"F prime binary '{args.app}' does not exist or is not a file"
raise ValueError(
msg
)
raise ValueError(msg)
return args


Expand All @@ -728,7 +738,7 @@ def get_arguments(self) -> Dict[Tuple[str, ...], Dict[str, Any]]:
"action": "store",
"required": False,
"type": int,
"nargs":'+',
"nargs": "+",
"help": f"only show {self.command_name} matching the given type ID(s) 'ID'; can provide multiple IDs to show all given types",
"metavar": "ID",
},
Expand Down
79 changes: 53 additions & 26 deletions src/fprime_gds/executables/comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import signal
import sys

from pathlib import Path

# Required adapters built on standard tools
try:
from fprime_gds.common.zmq_transport import ZmqGround
Expand Down Expand Up @@ -84,34 +86,59 @@
# Set the framing class used and pass it to the uplink and downlink component constructions giving each a separate
# instantiation
framer_class = FpFramerDeframer
LOGGER.info("Starting uplinker/downlinker connecting to FSW using %s with %s", adapter, framer_class.__name__)
downlinker = Downlinker(adapter, ground, framer_class())
uplinker = Uplinker(adapter, ground, framer_class(), downlinker)

# Open resources for the handlers on either side, this prepares the resources needed for reading/writing data
ground.open()
adapter.open()

# Finally start the processing of uplink and downlink
downlinker.start()
uplinker.start()
LOGGER.debug("Uplinker and downlinker running")

# Wait for shutdown event in the form of a KeyboardInterrupt then stop the processing, close resources, and wait for
# everything to terminate as expected.
def shutdown(*_):
"""Shutdown function for signals"""
uplinker.stop()
downlinker.stop()
LOGGER.info(
"Starting uplinker/downlinker connecting to FSW using %s with %s",
adapter,
framer_class.__name__,
)
discarded_file_handle = None
try:
if args.output_unframed_data == "-":
discarded_file_handle = sys.stdout.buffer
elif args.output_unframed_data is not None:
discarded_file_handle_path = (
Path(args.logs) / Path(args.output_unframed_data)
).resolve()
try:
discarded_file_handle = open(discarded_file_handle_path, "wb")
LeStarch marked this conversation as resolved.
Dismissed
Show resolved Hide resolved
LOGGER.info("Logging unframed data to %s", discarded_file_handle_path)
except OSError:
LOGGER.warning(
"Failed to open %s. Unframed data will be discarded.",
discarded_file_handle_path,
)
downlinker = Downlinker(
adapter, ground, framer_class(), discarded=discarded_file_handle
)
uplinker = Uplinker(adapter, ground, framer_class(), downlinker)

# Open resources for the handlers on either side, this prepares the resources needed for reading/writing data
ground.open()
adapter.open()

# Finally start the processing of uplink and downlink
downlinker.start()
uplinker.start()
LOGGER.debug("Uplinker and downlinker running")

# Wait for shutdown event in the form of a KeyboardInterrupt then stop the processing, close resources, and wait for
# everything to terminate as expected.
def shutdown(*_):
"""Shutdown function for signals"""
uplinker.stop()
downlinker.stop()
uplinker.join()
downlinker.join()
ground.close()
adapter.close()

signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGINT, shutdown)
uplinker.join()
downlinker.join()
ground.close()
adapter.close()

signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGINT, shutdown)
uplinker.join()
downlinker.join()
finally:
if discarded_file_handle is not None and args.output_unframed_data != "-":
discarded_file_handle.close()
return 0


Expand Down
13 changes: 12 additions & 1 deletion src/fprime_gds/flask/static/js/datastore.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,18 @@ class FullListHistory extends ListHistory {
* @param new_items: new items being to be process
*/
send(new_items) {
this.store.splice(0, this.store.length, ...new_items);
// When the lists are not the same, update the stored list otherwise keep the list to prevent unnecessary bound
// data re-rendering.
if (this.store.length !== new_items.length) {
this.store.splice(0, this.store.length, ...new_items);
return;
}
for (let i = 0; i < Math.min(this.store.length, new_items.length); i++) {
if (this.store[i] !== new_items[i]) {
this.store.splice(0, this.store.length, ...new_items);
return;
}
}
}
}

Expand Down
Loading
Loading