Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions xbot_service_interface_py/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ testpaths = ["tests"]

[project.scripts]
xbot-shell = "xbot_service_interface.shell:main"
xbot-log = "xbot_service_interface.log_receiver:main"

[project.optional-dependencies]
dev = ["pytest>=8"]
Expand Down
128 changes: 128 additions & 0 deletions xbot_service_interface_py/xbot_service_interface/log_receiver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
"""
xbot-log — receive and display xBot remote log messages.

Services call startRemoteLogging() which multicasts XbotHeader+text to
LOG_MULTICAST_ADDR:MULTICAST_PORT. arg1 carries the ulog severity level.
"""
import argparse
import socket
import struct
import sys
from datetime import datetime

from .datatypes import (
HEADER_FORMAT, HEADER_SIZE, MessageType, LogLevel,
LOG_MULTICAST_ADDR, MULTICAST_PORT,
)

# ANSI colours keyed by LogLevel
_RESET = '\033[0m'
_LEVEL_FMT: dict[int, tuple[str, str]] = {
LogLevel.TRACE: ('\033[2m', 'TRC'),
LogLevel.DEBUG: ('\033[36m', 'DBG'),
LogLevel.INFO: ('\033[32m', 'INF'),
LogLevel.WARNING: ('\033[33m', 'WRN'),
LogLevel.ERROR: ('\033[31m', 'ERR'),
LogLevel.CRITICAL: ('\033[1;31m', 'CRT'),
LogLevel.ALWAYS: ('\033[1m', 'ALW'),
}


def _colour(level: int) -> tuple[str, str]:
return _LEVEL_FMT.get(level, ('', f'L{level:02d}'))


def _make_socket(bind_ip: str) -> socket.socket:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except AttributeError:
pass
sock.bind((LOG_MULTICAST_ADDR, MULTICAST_PORT))
iface = bind_ip if bind_ip != '0.0.0.0' else '0.0.0.0'
mreq = struct.pack('4s4s',
socket.inet_aton(LOG_MULTICAST_ADDR),
socket.inet_aton(iface))
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
sock.settimeout(1.0)
return sock


def _recv_loop(sock: socket.socket, min_level: int, no_colour: bool) -> None:
while True:
try:
data, (src_ip, _) = sock.recvfrom(4096)
except socket.timeout:
continue
except OSError:
break

if len(data) < HEADER_SIZE:
continue

fields = struct.unpack_from(HEADER_FORMAT, data)
msg_type = fields[1]
level = fields[5] # arg1
service_id = fields[4]
pay_size = fields[10]

if msg_type != MessageType.LOG:
continue
if level < min_level:
continue

payload = data[HEADER_SIZE: HEADER_SIZE + pay_size]
try:
text = payload.decode('utf-8', errors='replace').rstrip('\0')
except Exception:
text = repr(payload)

ts = datetime.now().strftime('%H:%M:%S.%f')[:-3]
ansi, tag = _colour(level)

if no_colour:
print(f"{ts} [{tag}] {src_ip} | {text}", flush=True)
else:
print(f"{ansi}{ts} [{tag}]{_RESET} \033[2m{src_ip}\033[0m | {ansi}{text}{_RESET}",
flush=True)


def main() -> None:
parser = argparse.ArgumentParser(
description='Receive and display xBot remote log messages')
parser.add_argument('--bind', '-b', default='0.0.0.0', metavar='IP',
help='local interface IP to join multicast on (default: 0.0.0.0)')
parser.add_argument('--level', '-l',
choices=['trace', 'debug', 'info', 'warning', 'error', 'critical'],
default='debug',
help='minimum log level to display (default: debug)')
parser.add_argument('--no-colour', '--no-color', action='store_true',
help='disable ANSI colour output')
args = parser.parse_args()

level_map = {
'trace': LogLevel.TRACE,
'debug': LogLevel.DEBUG,
'info': LogLevel.INFO,
'warning': LogLevel.WARNING,
'error': LogLevel.ERROR,
'critical': LogLevel.CRITICAL,
}
min_level = level_map[args.level]
no_colour = args.no_colour or not sys.stdout.isatty()

sock = _make_socket(args.bind)
print(f"Listening for xBot log messages on {LOG_MULTICAST_ADDR}:{MULTICAST_PORT}"
f" (bind={args.bind}, min_level={args.level.upper()})",
flush=True)
try:
_recv_loop(sock, min_level, no_colour)
except KeyboardInterrupt:
pass
finally:
sock.close()


if __name__ == '__main__':
main()
Loading