From 209faa70b676b957c15510e94008f96f0a518c86 Mon Sep 17 00:00:00 2001 From: Joachim Wiberg Date: Tue, 25 Nov 2025 09:11:13 +0100 Subject: [PATCH] test: attempt to stabilize syslog hostname filtering - Refactor to use a common function for all messages - Use until() to retry check_log() Signed-off-by: Joachim Wiberg --- test/case/syslog/hostname_filter/test.py | 79 +++++++++++++----------- 1 file changed, 44 insertions(+), 35 deletions(-) diff --git a/test/case/syslog/hostname_filter/test.py b/test/case/syslog/hostname_filter/test.py index 37911a542..78ebf480a 100755 --- a/test/case/syslog/hostname_filter/test.py +++ b/test/case/syslog/hostname_filter/test.py @@ -7,7 +7,8 @@ """ import infamy -import time +import infamy.iface as iface +from infamy import until TEST_MESSAGES = [ ("router1", "Message from router1"), @@ -16,6 +17,27 @@ ("other", "Message from a different host"), ] +def verify_log_content(ssh, logfile, expected_hosts, test): + """Verify log file contains only messages from expected hosts.""" + def check_log(): + rc = ssh.runsh(f"cat /var/log/{logfile} 2>/dev/null") + log_content = rc.stdout if rc.returncode == 0 else "" + + expected_messages = [msg for host, msg in TEST_MESSAGES if host in expected_hosts] + missing = [msg for msg in expected_messages if msg not in log_content] + if missing: + return False + + if expected_hosts: # Only check for unwanted if filtering by hostname + unwanted_messages = [msg for host, msg in TEST_MESSAGES if host not in expected_hosts] + found = [msg for msg in unwanted_messages if msg in log_content] + if found: + test.fail(f"Found unwanted messages in /var/log/{logfile}: {found}") + + return True + + until(check_log, attempts=20) + with infamy.Test() as test: with test.step("Set up topology and attach to DUTs"): env = infamy.Env() @@ -91,6 +113,15 @@ } }) + with test.step("Wait for server interface to be operational"): + until(lambda: iface.get_param(server, server_link, "oper-status") == "up", attempts=20) + + with test.step("Verify server IP address is configured"): + until(lambda: iface.address_exist(server, server_link, "10.0.0.1", proto="static"), attempts=20) + + with test.step("Verify syslog server is listening on UDP port 514"): + until(lambda: "10.0.0.1:514" in serverssh.runsh("ss -ulnp 2>/dev/null | grep :514 || true").stdout, attempts=20) + with test.step("Configure client to forward logs to server"): _, client_link = env.ltop.xlate("client", "link") @@ -131,49 +162,27 @@ } }) - time.sleep(2) + with test.step("Wait for client interface to be operational"): + until(lambda: iface.get_param(client, client_link, "oper-status") == "up", attempts=20) + + with test.step("Verify client IP address is configured"): + until(lambda: iface.address_exist(client, client_link, "10.0.0.2", proto="static"), attempts=20) + + with test.step("Verify network connectivity between client and server"): + until(lambda: clientssh.runsh("ping -c 1 -W 1 10.0.0.1 >/dev/null 2>&1").returncode == 0, attempts=10) with test.step("Send log messages with different hostnames"): for hostname, message in TEST_MESSAGES: clientssh.runsh(f"logger -t test -p daemon.info -H {hostname} -h 10.0.0.1 '{message}'") - time.sleep(2) with test.step("Verify router1 log contains only router1 messages"): - rc = serverssh.runsh("cat /var/log/router1 2>/dev/null") - log_content = rc.stdout if rc.returncode == 0 else "" - - router1_messages = [msg for host, msg in TEST_MESSAGES if host == "router1"] - missing = [msg for msg in router1_messages if msg not in log_content] - if missing: - test.fail(f"Missing router1 messages in /var/log/router1: {missing}") - - unwanted_messages = [msg for host, msg in TEST_MESSAGES if host != "router1"] - found = [msg for msg in unwanted_messages if msg in log_content] - if found: - test.fail(f"Found unwanted messages in /var/log/router1: {found}") + verify_log_content(serverssh, "router1", ["router1"], test) with test.step("Verify router2 log contains only router2 messages"): - rc = serverssh.runsh("cat /var/log/router2 2>/dev/null") - log_content = rc.stdout if rc.returncode == 0 else "" - - router2_messages = [msg for host, msg in TEST_MESSAGES if host == "router2"] - missing = [msg for msg in router2_messages if msg not in log_content] - if missing: - test.fail(f"Missing router2 messages in /var/log/router2: {missing}") - - unwanted_messages = [msg for host, msg in TEST_MESSAGES if host != "router2"] - found = [msg for msg in unwanted_messages if msg in log_content] - if found: - test.fail(f"Found unwanted messages in /var/log/router2: {found}") + verify_log_content(serverssh, "router2", ["router2"], test) with test.step("Verify all-hosts log contains all messages"): - rc = serverssh.runsh("cat /var/log/all-hosts 2>/dev/null") - log_content = rc.stdout if rc.returncode == 0 else "" - - expected_messages = [msg for _, msg in TEST_MESSAGES] - missing = [msg for msg in expected_messages if msg not in log_content] - - if missing: - test.fail(f"Missing messages in /var/log/all-hosts: {missing}") + all_hosts = [host for host, _ in TEST_MESSAGES] + verify_log_content(serverssh, "all-hosts", all_hosts, test) test.succeed()