diff --git a/sbws/core/scanner.py b/sbws/core/scanner.py index 7246f12f..7ab1e6eb 100644 --- a/sbws/core/scanner.py +++ b/sbws/core/scanner.py @@ -1,4 +1,5 @@ ''' Measure the relays. ''' +import queue from ..lib.circuitbuilder import GapsCircuitBuilder as CB from ..lib.resultdump import ResultDump @@ -315,8 +316,21 @@ def _next_expected_amount(expected_amount, result_time, download_times, def result_putter(result_dump): ''' Create a function that takes a single argument -- the measurement result -- and return that function so it can be used by someone else ''' + def closure(measurement_result): - return result_dump.queue.put(measurement_result) + # Since result_dump thread is calling queue.get() every second, + # the queue should be full for only 1 second. + # This call blocks at maximum timeout seconds. + try: + result_dump.queue.put(measurement_result, timeout=3) + except queue.Full: + # The result would be lost, the scanner will continue working. + log.warning( + "The queue with measurements is full, when adding %s.\n" + "It is possible that the thread that get them to " + "write them to the disk (ResultDump.enter) is stalled.", + measurement_result + ) return closure diff --git a/sbws/lib/resultdump.py b/sbws/lib/resultdump.py index 52a0b7cc..384bd42b 100644 --- a/sbws/lib/resultdump.py +++ b/sbws/lib/resultdump.py @@ -582,7 +582,22 @@ def handle_result(self, result): log.info(msg) def enter(self): - ''' Main loop for the ResultDump thread ''' + """Main loop for the ResultDump thread. + + When there are results in the queue, queue.get will get them until + there are not anymore or timeout happen. + + For every result it gets, it process it and store in the filesystem, + which takes ~1 millisecond and will not trigger the timeout. + It can then store in the filesystem ~1000 results per second. + + I does not accept any other data type than Results or list of Results, + therefore is not possible to put big data types in the queue. + + If there are not any results in the queue, it waits 1 second and checks + again. + + """ with self.data_lock: self.data = load_recent_results_in_datadir( self.fresh_days, self.datadir) diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 4dd442b7..ada1b9b9 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -267,3 +267,23 @@ def sbwshome_success_result_two_relays(sbwshome_only_datadir, conf): write_result_to_datadir(RESULT_SUCCESS2, dd) write_result_to_datadir(RESULT_SUCCESS2, dd) return sbwshome_only_datadir + + +@pytest.fixture(scope='function') +def end_event(): + import threading + return threading.Event() + + +@pytest.fixture(scope='function') +def rd(args, conf, end_event): + from sbws.lib.resultdump import ResultDump + # in Travis the next line gives the error: + # TypeError: __init__() takes 3 positional arguments but 4 were given + # No idea why. + # Returning None to disable the test in case ResultDump can not be + # initialized. + try: + return ResultDump(args, conf, end_event) + except TypeError: + return None diff --git a/tests/unit/core/test_scanner.py b/tests/unit/core/test_scanner.py new file mode 100644 index 00000000..3f84472f --- /dev/null +++ b/tests/unit/core/test_scanner.py @@ -0,0 +1,28 @@ +"""Unit tests for scanner.py.""" +import pytest + +from sbws.core.scanner import result_putter + + +def test_result_putter(sbwshome_only_datadir, result_success, rd, end_event): + if rd is None: + pytest.skip("ResultDump is None") + # Put one item in the queue + callback = result_putter(rd) + callback(result_success) + assert rd.queue.qsize() == 1 + + # Make queue maxsize 1, so that it'll be full after the first callback. + # The second callback will wait 1 second, then the queue will be empty + # again. + rd.queue.maxsize = 1 + callback(result_success) + # after putting 1 result, the queue will be full + assert rd.queue.qsize() == 1 + assert rd.queue.full() + # it's still possible to put another results, because the callback will + # wait 1 second and the queue will be empty again. + callback(result_success) + assert rd.queue.qsize() == 1 + assert rd.queue.full() + end_event.set()