From e941ae87a46a29d6efb702c5024351770ab3ef49 Mon Sep 17 00:00:00 2001 From: Jonathan Heathcote Date: Tue, 14 Jul 2015 11:14:31 +0100 Subject: [PATCH] Add rig-counters utility. This application reports router counter value changes (e.g. dropped packets) and may be used to discover if packets are being dropped in your application. --- README.rst | 2 + docs/source/utility_apps.rst | 67 +++++++ rig/scripts/rig_counters.py | 204 ++++++++++++++++++++ setup.py | 1 + tests/scripts/test_rig_counters.py | 296 +++++++++++++++++++++++++++++ 5 files changed, 570 insertions(+) create mode 100644 rig/scripts/rig_counters.py create mode 100644 tests/scripts/test_rig_counters.py diff --git a/README.rst b/README.rst index a7edc5b..f973649 100644 --- a/README.rst +++ b/README.rst @@ -89,6 +89,8 @@ The utilities provided by Rig can be broken down approximately as follows: from a SpiNNaker application. * ``rig-ps``: No-nonsense command line utility for listing all applications (and their locations) in a SpiNNaker machine. + * ``rig-counters``: No-nonsense command line utility which can + non-intrusively monitor a SpiNNaker system for dropped packets. Python Version Support ---------------------- diff --git a/docs/source/utility_apps.rst b/docs/source/utility_apps.rst index f07c284..cc5c82b 100644 --- a/docs/source/utility_apps.rst +++ b/docs/source/utility_apps.rst @@ -143,3 +143,70 @@ command):: X Y P State Application App ID --- --- --- ----------------- ---------------- ------ 0 0 3 sync0 network_tester 66 + + +``rig-counters`` +================ + +The ``rig-counters`` command reads the router diagnostic counters for all chips +in a SpiNNaker system and reports any changes in value. This can be useful, for +example, when checking if (and where) an application is dropping packets. + +In the simplest use case, simply call ``rig-counters`` with a SpiNNaker +hostname as an argument, run your application and then press enter to see how +many packets were dropped:: + + $ rig-counters HOSTNAME + time,dropped_multicast + + 8.7,234 + +In the example above, 234 packets were dropped. Note that the output is in the +form of a CSV file. You can give the `--multiple`` option to allow multiple +samples to be captured. In the example below we capture four samples:: + + $ rig-counters HOSTNAME --multiple > out.csv + + + + + ^C + $ cat out.csv + time,dropped_multicast + 1.0,12 + 1.4,34 + 2.3,23 + 2.7,11 + +Instead of manually pressing enter to trigger a sample, you can use the +``--command`` argument to report the number of dropped packets during the +execution of your program:: + + $ rig-counters HOSTNAME --command ./my_program my_args + time,dropped_multicast + 10.4,102 + +You can also report each router's counter values individually using the +``--detailed`` option:: + + $ rig-counters HOSTNAME --detailed + time,x,y,dropped_multicast + + 10.4,0,0,10 + 10.4,0,1,2 + 10.4,0,2,5 + ... + +Other router counter values can be reported too, see ``rig-counters --help`` +for more details. + +.. warning:: + + ``rig-counters`` works by polling the router in every chip in a SpiNNaker + machine. This process takes some time (i.e. it isn't monotonic) and also + results in P2P messages being sent through the SpiNNaker network. + + The system is polled once when the utility is started and then once more + for each sample requested (e.g. every time you press enter). As a result, + you should be careful to only start or trigger a poll when the machine is + otherwise idle, for example, before or after your application runs. diff --git a/rig/scripts/rig_counters.py b/rig/scripts/rig_counters.py new file mode 100644 index 0000000..27908c8 --- /dev/null +++ b/rig/scripts/rig_counters.py @@ -0,0 +1,204 @@ +"""A minimal command-line utility which samples counter values around the +machine. + +Installed as "rig-counters" by setuptools. +""" + +import sys +import argparse +import subprocess +import time + +import rig + +from six import iteritems +from six.moves import input + +from rig.machine_control import MachineController + +from rig.machine_control.machine_controller import RouterDiagnostics + +from rig.machine_control.scp_connection import TimeoutError + + +def sample_counters(mc, machine): + """Sample every router counter in the machine.""" + return { + (x, y): mc.get_router_diagnostics(x, y) for (x, y) in machine + } + + +def deltas(last, now): + """Return the change in counter values (accounting for wrap-around).""" + return { + xy: RouterDiagnostics(*((n - l) & 0xFFFFFFFF + for l, n in zip(last[xy], now[xy]))) + for xy in last + } + + +def monitor_counters(mc, output, counters, detailed, f): + """Monitor the counters on a specified machine, taking a snap-shot every + time the generator 'f' yields.""" + # Print CSV header + output.write("time,{}{}\n".format("x,y," if detailed else "", + ",".join(counters))) + + machine = mc.get_machine() + + # Make an initial sample of the counters + last_counter_values = sample_counters(mc, machine) + + start_time = time.time() + + for _ in f(): + # Snapshot the change in counter values + counter_values = sample_counters(mc, machine) + delta = deltas(last_counter_values, counter_values) + last_counter_values = counter_values + + now = time.time() - start_time + + # Output the changes + if detailed: + for x, y in machine: + output.write("{:0.1f},{},{},{}\n".format( + now, x, y, + ",".join(str(getattr(delta[(x, y)], c)) + for c in counters))) + else: + totals = [0 for _ in counters] + for xy in machine: + for i, counter in enumerate(counters): + totals[i] += getattr(delta[xy], counter) + output.write("{:0.1f},{}\n".format( + now, ",".join(map(str, totals)))) + + +def press_enter(multiple=False, silent=False): + """Return a generator function which yields every time the user presses + return.""" + + def f(): + try: + while True: + if silent: + yield input() + else: + sys.stderr.write(" ") + sys.stderr.flush() + yield input() + if not multiple: + break + except (EOFError, KeyboardInterrupt): + # User Ctrl+D or Ctrl+C'd + if not silent: + # Prevents the user's terminal getting clobbered + sys.stderr.write("\n") + sys.stderr.flush() + return + + return f + + +def run_command(command): + """Return a generator function which yields once when a supplied command + exits.""" + def f(): + try: + subprocess.call(command) + except KeyboardInterrupt: + # If the user interrupts the process, just continue + pass + yield "" + return f + + +def main(args=None): + parser = argparse.ArgumentParser( + description="Report changes in router diagnostic counters in a " + "SpiNNaker system.") + parser.add_argument("--version", "-V", action="version", + version="%(prog)s {}".format(rig.__version__)) + + parser.add_argument("hostname", type=str, + help="hostname or IP of SpiNNaker system") + + parser.add_argument("--detailed", "-d", action="store_true", + help="give counter values for each chip individually " + "by default, just a sum is given") + parser.add_argument("--silent", "-s", action="store_true", + help="do not produce informational messages on STDOUT") + parser.add_argument("--output", "-o", type=str, default="-", + metavar="FILENAME", + help="filename to write recorded counter values to " + "or - for stdout (default: %(default)s)") + + when_group = parser.add_mutually_exclusive_group() + when_group.add_argument("--command", "-c", nargs=argparse.REMAINDER, + help="report the difference in counter values " + "before and after executing the supplied " + "command") + when_group.add_argument("--multiple", "-m", action="store_true", + help="allow recording of multiple snapshots " + "(default: just one snapshot)") + + counter_group = parser.add_argument_group( + "counter selection arguments", + description="Any subset of these counters may be selected for output. " + "If none are specified, only dropped multicast packets " + "will be reported.") + abbreviations = { + "local": "loc", + "external": "ext", + "dropped": "drop", + "multicast": "mc", + "nearest-neighbour": "nn", + "fixed-route": "fr", + "counter": "c", + } + for counter in RouterDiagnostics._fields: + arg_name = "--{}".format(counter.replace("_", "-")) + short_name = arg_name + for full, abbrev in iteritems(abbreviations): + short_name = short_name.replace(full, abbrev) + counter_group.add_argument(arg_name, short_name, + dest="counters", + action="append_const", const=counter) + + args = parser.parse_args(args) + + try: + mc = MachineController(args.hostname) + info = mc.get_software_version(0, 0) + if "SpiNNaker" in info.version_string: + counters = args.counters or ["dropped_multicast"] + if args.output == "-": + output = sys.stdout + else: + output = open(args.output, "w") + + if args.command is None: + f = press_enter(args.multiple, args.silent) + else: + f = run_command(args.command) + + try: + monitor_counters(mc, output, counters, args.detailed, f) + finally: + if output is not sys.stdout: # pragma: no branch + output.close() # pragma: no branch + else: + sys.stderr.write("{}: error: unknown architecture '{}'\n".format( + parser.prog, info.version_string.strip("\x00"))) + return 2 + except TimeoutError: + sys.stderr.write("{}: error: command timed out\n".format( + parser.prog)) + return 1 + + return 0 + + +if __name__ == "__main__": # pragma: no cover + sys.exit(main()) diff --git a/setup.py b/setup.py index 86308ad..c42e48b 100644 --- a/setup.py +++ b/setup.py @@ -104,6 +104,7 @@ def get_new_url(url): "rig-discover = rig.scripts.rig_discover:main", "rig-iobuf = rig.scripts.rig_iobuf:main", "rig-ps = rig.scripts.rig_ps:main", + "rig-counters = rig.scripts.rig_counters:main", ], } ) diff --git a/tests/scripts/test_rig_counters.py b/tests/scripts/test_rig_counters.py new file mode 100644 index 0000000..f93a91d --- /dev/null +++ b/tests/scripts/test_rig_counters.py @@ -0,0 +1,296 @@ +"""Test the counters command produces the correct output.""" + +import pytest + +import os + +import mock + +import subprocess + +from tempfile import mkstemp + +from six import StringIO + +import rig.scripts.rig_counters as rig_counters + +from rig.machine_control.scp_connection import TimeoutError + +from rig.machine import Machine + +from rig.machine_control.machine_controller import RouterDiagnostics + + +def test_sample_counters(): + mc = mock.Mock() + mc.get_router_diagnostics.return_value = RouterDiagnostics( + *(0 for _ in RouterDiagnostics._fields)) + machine = Machine(2, 2) + + # All counters should have been sampled + counters = rig_counters.sample_counters(mc, machine) + assert set(counters) == set(machine) + assert all(all(v == 0 for v in counters[xy]) for xy in counters) + + +def test_deltas(): + # In this example we include both unchanging, positive and wrap-around + # changes + last = { + (0, 0): RouterDiagnostics(1, 0xFFFFFFFF, 0, 0, + 0, 0, 0, 0, + 0, 0, 0, 0, + 0, 0, 0, 0), + (1, 0): RouterDiagnostics(0xFFFFFFFF, 1, 0, 0, + 0, 0, 0, 0, + 0, 0, 0, 0, + 0, 0, 0, 0), + } + this = { + (0, 0): RouterDiagnostics(10, 1, 0, 0, + 0, 0, 0, 0, + 0, 0, 0, 0, + 0, 0, 0, 0), + (1, 0): RouterDiagnostics(2, 11, 0, 0, + 0, 0, 0, 0, + 0, 0, 0, 0, + 0, 0, 0, 0), + } + assert rig_counters.deltas(last, this) == { + (0, 0): RouterDiagnostics(9, 2, 0, 0, + 0, 0, 0, 0, + 0, 0, 0, 0, + 0, 0, 0, 0), + (1, 0): RouterDiagnostics(3, 10, 0, 0, + 0, 0, 0, 0, + 0, 0, 0, 0, + 0, 0, 0, 0), + } + + +@pytest.mark.parametrize("keyboard_interrupt", [False, True]) +@pytest.mark.parametrize("multiple", [False, True]) +@pytest.mark.parametrize("silent", [False, True]) +def test_press_enter(capsys, monkeypatch, keyboard_interrupt, multiple, + silent): + # Inputs should be safely terminated by EOF or ^C + mock_input = mock.Mock(side_effect=["", "", + KeyboardInterrupt + if keyboard_interrupt else + EOFError]) + monkeypatch.setattr(rig_counters, "input", mock_input) + + responses = list(rig_counters.press_enter(multiple, silent)()) + + # Should have the right number of responses + if multiple: + assert responses == ["", ""] + else: + assert responses == [""] + + # Sould only print instructions if not silent. Should add a newline when a + # ^C or EOF terminates the output + out, err = capsys.readouterr() + if silent: + assert out == "" + assert err == "" + else: + assert out == "" + if multiple: + assert err == " \n" + else: + assert err == " " + + +@pytest.mark.parametrize("keyboard_interrupt", [False, True]) +def test_run_command(capsys, monkeypatch, keyboard_interrupt): + # Commands should be safely terminated by ^C + mock_call = mock.Mock(side_effect=[KeyboardInterrupt + if keyboard_interrupt else 0]) + monkeypatch.setattr(subprocess, "call", mock_call) + + f = rig_counters.run_command(["foo", "--bar"]) + + # Shouldn't be called until we start the generator + assert not mock_call.called + + # Should not propagate a KeyboardInterrupt + assert list(f()) == [""] + + # Should now have been called + mock_call.assert_called_once_with(["foo", "--bar"]) + + # Should be silent on stderr/stdout + out, err = capsys.readouterr() + assert out == "" + assert err == "" + + +@pytest.mark.parametrize("multiple", [False, True]) +@pytest.mark.parametrize("detailed", [False, True]) +def test_monitor_counters(multiple, detailed): + mock_mc = mock.Mock() + mock_mc.get_machine.return_value = Machine(2, 2) + + cur_count = {xy: 0 for xy in mock_mc.get_machine()} + + def get_router_diagnostics(x, y): + counters = RouterDiagnostics(*(cur_count[(x, y)] * (i + 1) + for i in range(16))) + cur_count[(x, y)] += 1 + return counters + mock_mc.get_router_diagnostics.side_effect = get_router_diagnostics + + output = StringIO() + counters = ["dropped_multicast", "local_multicast"] + + f = mock.Mock(return_value=["", ""] if multiple else [""]) + rig_counters.monitor_counters(mock_mc, output, counters, detailed, f) + + # Should have a fully printed table + output = output.getvalue() + lines = output.rstrip("\n").split("\n") + + if detailed: + assert len(lines) >= 1 + 4 + + # Order of fields should be as specified + assert lines[0] == "time,x,y,dropped_multicast,local_multicast" + assert lines[1] == "0.0,0,0,9,1" + assert lines[2] == "0.0,0,1,9,1" + assert lines[3] == "0.0,1,0,9,1" + assert lines[4] == "0.0,1,1,9,1" + + if multiple: + assert len(lines) == 1 + 4 + 4 + assert lines[5] == "0.0,0,0,9,1" + assert lines[6] == "0.0,0,1,9,1" + assert lines[7] == "0.0,1,0,9,1" + assert lines[8] == "0.0,1,1,9,1" + else: + assert len(lines) == 1 + 4 + else: + assert len(lines) >= 2 + + # Order of fields should be as specified + assert lines[0] == "time,dropped_multicast,local_multicast" + assert lines[1] == "0.0,36,4" + + if multiple: + assert len(lines) == 3 + assert lines[2] == "0.0,36,4" + else: + assert len(lines) == 2 + + +def test_bad_args(): + # No hostname + with pytest.raises(SystemExit): + rig_counters.main([]) + + # Command and multiple specified at same time + with pytest.raises(SystemExit): + rig_counters.main(["localhost", "-m", "-c", "true"]) + + +def test_no_machine(monkeypatch): + # Should fail if nothing responds + mc = mock.Mock() + mc.get_software_version = mock.Mock(side_effect=TimeoutError) + + MC = mock.Mock() + MC.return_value = mc + monkeypatch.setattr(rig_counters, "MachineController", MC) + + assert rig_counters.main(["localhost"]) != 0 + + +def test_unknown_arch(monkeypatch): + # Should fail if system responds to initial sver with non-SpiNNaker/BMP + # machine type. + mc = mock.Mock() + info = mock.Mock() + info.version_string = "Mock/Tester" + info.version = 1.337 + mc.get_software_version.return_value = info + + MC = mock.Mock() + MC.return_value = mc + monkeypatch.setattr(rig_counters, "MachineController", MC) + + assert rig_counters.main(["localhost"]) != 0 + + +@pytest.mark.parametrize("counter_args,counters", [ + # Default to just dropped multicast + ([], ["dropped_multicast"]), + # Single counter + (["--drop-nn"], ["dropped_nearest_neighbour"]), + # Multiple counters + (["--drop-nn", "--local-p2p"], ["dropped_nearest_neighbour", "local_p2p"]), +]) +@pytest.mark.parametrize("output_to_file", [None, "-", "temp"]) +@pytest.mark.parametrize("use_command", [True, False]) +def test_command(monkeypatch, capsys, counter_args, counters, output_to_file, + use_command): + # Make sure the less-trivial commandline arguments are handled correctly + mock_mc = mock.Mock() + mock_mc.get_machine.return_value = Machine(2, 2) + mock_mc.get_router_diagnostics.return_value = RouterDiagnostics(*[0] * 16) + mock_info = mock.Mock() + mock_info.version_string = "SpiNNaker" + mock_mc.get_software_version.return_value = mock_info + + if use_command: + mock_call = mock.Mock(return_value=0) + monkeypatch.setattr(subprocess, "call", mock_call) + command_args = ["-c"] + else: + mock_input = mock.Mock(side_effect=["", EOFError]) + monkeypatch.setattr(rig_counters, "input", mock_input) + command_args = [] + + monkeypatch.setattr(rig_counters, "MachineController", + mock.Mock(return_value=mock_mc)) + + if output_to_file is None: + output_args = [] + uses_stdout = True + elif output_to_file is "-": + output_args = ["--output", "-"] + uses_stdout = True + else: + # Make a tempoary file to output into + _, tempfile = mkstemp() + output_args = ["--output", tempfile] + uses_stdout = False + + assert rig_counters.main(["localhost"] + + output_args + + counter_args + + command_args) == 0 + + # Make sure the right trigger was used + if use_command: + assert mock_call.called + else: + assert mock_input.called + + # Get the output from the selected file + if uses_stdout: + output, _ = capsys.readouterr() + else: + output, _ = capsys.readouterr() + assert output == "" + with open(tempfile, "r") as f: + output = f.read() + + # Make sure the output is correct + lines = output.rstrip("\n").split("\n") + assert len(lines) == 2 + assert lines[0] == "time,{}".format(",".join(counters)) + assert lines[1] == "0.0,{}".format(",".join("0" for _ in counters)) + + # Delete the temporary file + if not uses_stdout: + os.remove(tempfile)