From 8a8a71fd37b28e9e0b6762a6ec1b3010080135f6 Mon Sep 17 00:00:00 2001 From: juga0 Date: Mon, 18 Feb 2019 14:15:29 +0000 Subject: [PATCH 1/5] scanner: Warn when there is no progress measuring unique relays. Create new module heartbeat. Closes: #28652 --- sbws/core/scanner.py | 21 ++++++++++++++++++++- sbws/lib/heartbeat.py | 32 ++++++++++++++++++++++++++++++++ sbws/lib/relaylist.py | 7 +++++++ 3 files changed, 59 insertions(+), 1 deletion(-) create mode 100644 sbws/lib/heartbeat.py diff --git a/sbws/core/scanner.py b/sbws/core/scanner.py index 24a975b4..21767e05 100644 --- a/sbws/core/scanner.py +++ b/sbws/core/scanner.py @@ -33,6 +33,7 @@ import random from .. import settings +from .lib import heartbeat rng = random.SystemRandom() log = logging.getLogger(__name__) @@ -478,6 +479,14 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump, measured. """ + # Variable to count total progress in the last days: + # In case it is needed to see which relays are not being measured, + # store their fingerprint, not only their number. + consensus_fp_set = set() + measured_fp_set = set() + measured_percent = 0 + main_loop_tstart = time.monotonic() + # Set the time to wait for a thread to finish as the half of an HTTP # request timeout. # Do not start a new loop if sbws is stopping. @@ -488,6 +497,8 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump, # long, set it here and not outside the loop. pending_results = [] loop_tstart = time.time() + # Store all the relays seen in all the consensuses. + [consensus_fp_set.add(r) for r in relay_list.relays_fingerprints] for target in relay_prioritizer.best_priority(): # Don't start measuring a relay if sbws is stopping. if settings.end_event.is_set(): @@ -503,13 +514,21 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump, [args, conf, destinations, circuit_builder, relay_list, target], {}, callback, callback_err) pending_results.append(async_result) - + measured_fp_set.add(async_result) # After the for has finished, the pool has queued all the relays # and pending_results has the list of all the AsyncResults. # It could also be obtained with pool._cache, which contains # a dictionary with AsyncResults as items. num_relays_to_measure = len(pending_results) wait_for_results(num_relays_to_measure, pending_results) + # NOTE: in a future refactor make State a singleton in __init__.py + state_dict = State(conf.getpath('paths', 'state_fname')) + num_loops = state_dict['recent_priority_list_count'] + + measured_percent = heartbeat.total_measured_percent( + measured_percent, consensus_fp_set, measured_fp_set, + main_loop_tstart, num_loops + ) loop_tstop = time.time() loop_tdelta = (loop_tstop - loop_tstart) / 60 diff --git a/sbws/lib/heartbeat.py b/sbws/lib/heartbeat.py new file mode 100644 index 00000000..5cb80930 --- /dev/null +++ b/sbws/lib/heartbeat.py @@ -0,0 +1,32 @@ +""" +Classes and functions to implement a heartbeat system to monitor the progress. +""" +import logging +import time + +log = logging.getLogger(__name__) + + +def total_measured_percent(measured_percent, consensus_fp_set, + measured_fp_set, main_loop_tstart, loops_count): + """Returns the new percentage of the different relays that were measured. + + This way it can be known whether the scanner is making progress measuring + all the Network. + + Log the percentage, the number of relays measured and not measured, + the number of loops and the time elapsed since it started measuring. + """ + not_measured_fp_set = consensus_fp_set.difference(measured_fp_set) + main_loop_tdelta = (time.monotonic() - main_loop_tstart) / 60 + new_measured_percent = round( + len(measured_fp_set) / len(consensus_fp_set) * 100) + log.info("Run %s main loops.", loops_count) + log.info("Measured in total %s (%s%%) unique relays in %s minutes", + len(measured_fp_set), new_measured_percent, main_loop_tdelta) + log.info("%s relays still not measured.", len(not_measured_fp_set)) + # The case when it is equal will only happen when all the relays have been + # measured. + if (new_measured_percent <= measured_percent): + log.warning("There is no progress measuring relays!.") + return new_measured_percent diff --git a/sbws/lib/relaylist.py b/sbws/lib/relaylist.py index 6a66069f..fcfbdea1 100644 --- a/sbws/lib/relaylist.py +++ b/sbws/lib/relaylist.py @@ -355,6 +355,13 @@ def guards(self): def authorities(self): return self._relays_with_flag(Flag.AUTHORITY) + @property + def relays_fingerprints(self): + # Using relays instead of _relays, so that the list get updated if + # needed, since this method is used to know which fingerprints are in + # the consensus. + return [r.fingerprint for r in self.relays] + def random_relay(self): return self.rng.choice(self.relays) From 22b11e35d3807ba067c9bd2f3ae8b0ec84776ed2 Mon Sep 17 00:00:00 2001 From: juga0 Date: Thu, 21 Mar 2019 10:19:54 +0000 Subject: [PATCH 2/5] fixup! scanner: Warn when there is no progress --- sbws/core/scanner.py | 12 +++--------- sbws/lib/heartbeat.py | 18 ++++++++++++++++-- tests/unit/lib/test_heartbeat.py | 21 +++++++++++++++++++++ 3 files changed, 40 insertions(+), 11 deletions(-) create mode 100644 tests/unit/lib/test_heartbeat.py diff --git a/sbws/core/scanner.py b/sbws/core/scanner.py index 21767e05..f6443e5a 100644 --- a/sbws/core/scanner.py +++ b/sbws/core/scanner.py @@ -33,7 +33,7 @@ import random from .. import settings -from .lib import heartbeat +from ..lib import heartbeat rng = random.SystemRandom() log = logging.getLogger(__name__) @@ -482,7 +482,6 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump, # Variable to count total progress in the last days: # In case it is needed to see which relays are not being measured, # store their fingerprint, not only their number. - consensus_fp_set = set() measured_fp_set = set() measured_percent = 0 main_loop_tstart = time.monotonic() @@ -497,8 +496,6 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump, # long, set it here and not outside the loop. pending_results = [] loop_tstart = time.time() - # Store all the relays seen in all the consensuses. - [consensus_fp_set.add(r) for r in relay_list.relays_fingerprints] for target in relay_prioritizer.best_priority(): # Don't start measuring a relay if sbws is stopping. if settings.end_event.is_set(): @@ -521,13 +518,10 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump, # a dictionary with AsyncResults as items. num_relays_to_measure = len(pending_results) wait_for_results(num_relays_to_measure, pending_results) - # NOTE: in a future refactor make State a singleton in __init__.py - state_dict = State(conf.getpath('paths', 'state_fname')) - num_loops = state_dict['recent_priority_list_count'] measured_percent = heartbeat.total_measured_percent( - measured_percent, consensus_fp_set, measured_fp_set, - main_loop_tstart, num_loops + measured_percent, relay_list.relays_fingerprints, measured_fp_set, + main_loop_tstart, conf.getpath('paths', 'state_fname') ) loop_tstop = time.time() diff --git a/sbws/lib/heartbeat.py b/sbws/lib/heartbeat.py index 5cb80930..7dfa7166 100644 --- a/sbws/lib/heartbeat.py +++ b/sbws/lib/heartbeat.py @@ -4,11 +4,17 @@ import logging import time +from ..util.state import State + + log = logging.getLogger(__name__) +# NOTE tech-debt: this could go be tracked globally as a singleton +consensus_fp_set = set() + -def total_measured_percent(measured_percent, consensus_fp_set, - measured_fp_set, main_loop_tstart, loops_count): +def total_measured_percent(measured_percent, relays_fingerprints, + measured_fp_set, main_loop_tstart, state_path): """Returns the new percentage of the different relays that were measured. This way it can be known whether the scanner is making progress measuring @@ -17,6 +23,14 @@ def total_measured_percent(measured_percent, consensus_fp_set, Log the percentage, the number of relays measured and not measured, the number of loops and the time elapsed since it started measuring. """ + global consensus_fp_set + # NOTE: in a future refactor make State a singleton in __init__.py + state_dict = State(state_path) + loops_count = state_dict.get('recent_priority_list_count', 0) + + # Store all the relays seen in all the consensuses. + [consensus_fp_set.add(r) for r in relays_fingerprints] + not_measured_fp_set = consensus_fp_set.difference(measured_fp_set) main_loop_tdelta = (time.monotonic() - main_loop_tstart) / 60 new_measured_percent = round( diff --git a/tests/unit/lib/test_heartbeat.py b/tests/unit/lib/test_heartbeat.py new file mode 100644 index 00000000..55573a82 --- /dev/null +++ b/tests/unit/lib/test_heartbeat.py @@ -0,0 +1,21 @@ +"""Unit tests for heartbeat""" +import logging +import time + +from sbws.lib import heartbeat + + +def test_total_measured_percent(conf, caplog): + measured_percent = 0 + measured_fp_set = set(['A', 'B']) + main_loop_tstart = time.monotonic() + relays_fingerprints = set(['A', 'B', 'C']) + + caplog.set_level(logging.INFO) + new_measured_percent = heartbeat.total_measured_percent( + measured_percent, relays_fingerprints, measured_fp_set, + main_loop_tstart, conf.getpath('paths', 'state_fname') + ) + assert new_measured_percent == 67 + caplog.records[1].getMessage().find("Measured in total 2 (67%)") + caplog.records[2].getMessage().find("1 relays still not measured") From 944bad5561dc92d04569ed06eceadd139315e0d4 Mon Sep 17 00:00:00 2001 From: George Kadianakis Date: Thu, 21 Mar 2019 19:48:44 +0200 Subject: [PATCH 3/5] Bake more details into the heartbeat module and out of the main loop. --- sbws/core/scanner.py | 22 ++++----- sbws/lib/heartbeat.py | 78 +++++++++++++++++++------------- tests/unit/lib/test_heartbeat.py | 20 ++++---- 3 files changed, 68 insertions(+), 52 deletions(-) diff --git a/sbws/core/scanner.py b/sbws/core/scanner.py index f6443e5a..29d77520 100644 --- a/sbws/core/scanner.py +++ b/sbws/core/scanner.py @@ -479,12 +479,7 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump, measured. """ - # Variable to count total progress in the last days: - # In case it is needed to see which relays are not being measured, - # store their fingerprint, not only their number. - measured_fp_set = set() - measured_percent = 0 - main_loop_tstart = time.monotonic() + hbeat = heartbeat(conf.getpath('paths', 'state_fname')) # Set the time to wait for a thread to finish as the half of an HTTP # request timeout. @@ -496,6 +491,10 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump, # long, set it here and not outside the loop. pending_results = [] loop_tstart = time.time() + + # Register relay fingerprints to the heartbeat module + hbeat.register_consensus_fpr(relay_list.relays_fingerprints) + for target in relay_prioritizer.best_priority(): # Don't start measuring a relay if sbws is stopping. if settings.end_event.is_set(): @@ -511,7 +510,10 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump, [args, conf, destinations, circuit_builder, relay_list, target], {}, callback, callback_err) pending_results.append(async_result) - measured_fp_set.add(async_result) + + # Register this measurement to the heartbeat module + hbeat.register_measured_fpr(async_result) + # After the for has finished, the pool has queued all the relays # and pending_results has the list of all the AsyncResults. # It could also be obtained with pool._cache, which contains @@ -519,10 +521,8 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump, num_relays_to_measure = len(pending_results) wait_for_results(num_relays_to_measure, pending_results) - measured_percent = heartbeat.total_measured_percent( - measured_percent, relay_list.relays_fingerprints, measured_fp_set, - main_loop_tstart, conf.getpath('paths', 'state_fname') - ) + # Print the heartbeat message + hbeat.print_heartbeat_message() loop_tstop = time.time() loop_tdelta = (loop_tstop - loop_tstart) / 60 diff --git a/sbws/lib/heartbeat.py b/sbws/lib/heartbeat.py index 7dfa7166..b8545bbb 100644 --- a/sbws/lib/heartbeat.py +++ b/sbws/lib/heartbeat.py @@ -9,38 +9,54 @@ log = logging.getLogger(__name__) -# NOTE tech-debt: this could go be tracked globally as a singleton -consensus_fp_set = set() +class Heartbeat(object): + """ + Tracks current status of sbws and is capable of printing periodic + information about the current state + """ + def __init__(self, state_path): + # Variable to count total progress in the last days: + # In case it is needed to see which relays are not being measured, + # store their fingerprint, not only their number. + self.measured_fp_set = set() + self.consensus_fp_set = set() + self.measured_percent = 0 + self.main_loop_tstart = time.monotonic() -def total_measured_percent(measured_percent, relays_fingerprints, - measured_fp_set, main_loop_tstart, state_path): - """Returns the new percentage of the different relays that were measured. + self.state_dict = State(state_path) - This way it can be known whether the scanner is making progress measuring - all the Network. + self.previous_measurement_percent - Log the percentage, the number of relays measured and not measured, - the number of loops and the time elapsed since it started measuring. - """ - global consensus_fp_set - # NOTE: in a future refactor make State a singleton in __init__.py - state_dict = State(state_path) - loops_count = state_dict.get('recent_priority_list_count', 0) - - # Store all the relays seen in all the consensuses. - [consensus_fp_set.add(r) for r in relays_fingerprints] - - not_measured_fp_set = consensus_fp_set.difference(measured_fp_set) - main_loop_tdelta = (time.monotonic() - main_loop_tstart) / 60 - new_measured_percent = round( - len(measured_fp_set) / len(consensus_fp_set) * 100) - log.info("Run %s main loops.", loops_count) - log.info("Measured in total %s (%s%%) unique relays in %s minutes", - len(measured_fp_set), new_measured_percent, main_loop_tdelta) - log.info("%s relays still not measured.", len(not_measured_fp_set)) - # The case when it is equal will only happen when all the relays have been - # measured. - if (new_measured_percent <= measured_percent): - log.warning("There is no progress measuring relays!.") - return new_measured_percent + def register_measured_fpr(self, async_result): + measured_fp_set.add(async_result) + + def register_consensus_fprs(self, relay_fprs): + for r in relay_fprs: + self.cosnensus_fp_set.add(r) + + def print_heartbeat_message(self): + """Print the new percentage of the different relays that were measured. + + This way it can be known whether the scanner is making progress measuring + all the Network. + + Log the percentage, the number of relays measured and not measured, + the number of loops and the time elapsed since it started measuring. + """ + loops_count = self.state_dict.get('recent_priority_list_count', 0) + + not_measured_fp_set = consensus_fp_set.difference(measured_fp_set) + main_loop_tdelta = (time.monotonic() - main_loop_tstart) / 60 + new_measured_percent = round(len(measured_fp_set) / len(consensus_fp_set) * 100) + + log.info("Run %s main loops.", loops_count) + log.info("Measured in total %s (%s%%) unique relays in %s minutes", + len(measured_fp_set), new_measured_percent, main_loop_tdelta) + log.info("%s relays still not measured.", len(not_measured_fp_set)) + + # The case when it is equal will only happen when all the relays have been + # measured. + if (new_measured_percent <= self.previous_measured_percent): + log.warning("There is no progress measuring relays!.") + self.previous_measurement_percent = new_measured_percent diff --git a/tests/unit/lib/test_heartbeat.py b/tests/unit/lib/test_heartbeat.py index 55573a82..e01db82c 100644 --- a/tests/unit/lib/test_heartbeat.py +++ b/tests/unit/lib/test_heartbeat.py @@ -4,18 +4,18 @@ from sbws.lib import heartbeat - def test_total_measured_percent(conf, caplog): - measured_percent = 0 - measured_fp_set = set(['A', 'B']) - main_loop_tstart = time.monotonic() - relays_fingerprints = set(['A', 'B', 'C']) + heartbeat = heartbeat.Heartbeat(conf.getpath('paths', 'state_fname')) + + heartbeat.register_consensus_fpr(['A', 'B', 'C']) + + haertbeat.register_measured_fpr('A') + haertbeat.register_measured_fpr('B') caplog.set_level(logging.INFO) - new_measured_percent = heartbeat.total_measured_percent( - measured_percent, relays_fingerprints, measured_fp_set, - main_loop_tstart, conf.getpath('paths', 'state_fname') - ) - assert new_measured_percent == 67 + + heartbeat.print_heartbeat_message() + + assert heartbeat.previous_measured_percent == 67 caplog.records[1].getMessage().find("Measured in total 2 (67%)") caplog.records[2].getMessage().find("1 relays still not measured") From 49e2d04db01506a13c7ecf7d12e2f076d827eee3 Mon Sep 17 00:00:00 2001 From: George Kadianakis Date: Thu, 21 Mar 2019 20:06:32 +0200 Subject: [PATCH 4/5] fixup! Bake more details into the heartbeat module and out of the main loop. --- sbws/lib/heartbeat.py | 17 +++++++++-------- tests/unit/lib/test_heartbeat.py | 14 ++++++++------ 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/sbws/lib/heartbeat.py b/sbws/lib/heartbeat.py index b8545bbb..edd4c360 100644 --- a/sbws/lib/heartbeat.py +++ b/sbws/lib/heartbeat.py @@ -26,14 +26,14 @@ def __init__(self, state_path): self.state_dict = State(state_path) - self.previous_measurement_percent + self.previous_measurement_percent = 0 def register_measured_fpr(self, async_result): - measured_fp_set.add(async_result) + self.measured_fp_set.add(async_result) def register_consensus_fprs(self, relay_fprs): for r in relay_fprs: - self.cosnensus_fp_set.add(r) + self.consensus_fp_set.add(r) def print_heartbeat_message(self): """Print the new percentage of the different relays that were measured. @@ -46,17 +46,18 @@ def print_heartbeat_message(self): """ loops_count = self.state_dict.get('recent_priority_list_count', 0) - not_measured_fp_set = consensus_fp_set.difference(measured_fp_set) - main_loop_tdelta = (time.monotonic() - main_loop_tstart) / 60 - new_measured_percent = round(len(measured_fp_set) / len(consensus_fp_set) * 100) + not_measured_fp_set = self.consensus_fp_set.difference(self.measured_fp_set) + main_loop_tdelta = (time.monotonic() - self.main_loop_tstart) / 60 + new_measured_percent = round(len(self.measured_fp_set) / len(self.consensus_fp_set) * 100) log.info("Run %s main loops.", loops_count) log.info("Measured in total %s (%s%%) unique relays in %s minutes", - len(measured_fp_set), new_measured_percent, main_loop_tdelta) + len(self.measured_fp_set), new_measured_percent, main_loop_tdelta) log.info("%s relays still not measured.", len(not_measured_fp_set)) # The case when it is equal will only happen when all the relays have been # measured. - if (new_measured_percent <= self.previous_measured_percent): + if (new_measured_percent <= self.previous_measurement_percent): log.warning("There is no progress measuring relays!.") + self.previous_measurement_percent = new_measured_percent diff --git a/tests/unit/lib/test_heartbeat.py b/tests/unit/lib/test_heartbeat.py index e01db82c..ca11f5e7 100644 --- a/tests/unit/lib/test_heartbeat.py +++ b/tests/unit/lib/test_heartbeat.py @@ -5,17 +5,19 @@ from sbws.lib import heartbeat def test_total_measured_percent(conf, caplog): - heartbeat = heartbeat.Heartbeat(conf.getpath('paths', 'state_fname')) + hbeat = heartbeat.Heartbeat(conf.getpath('paths', 'state_fname')) - heartbeat.register_consensus_fpr(['A', 'B', 'C']) + hbeat.register_consensus_fprs(['A', 'B', 'C']) - haertbeat.register_measured_fpr('A') - haertbeat.register_measured_fpr('B') + hbeat.register_measured_fpr('A') + hbeat.register_measured_fpr('B') caplog.set_level(logging.INFO) - heartbeat.print_heartbeat_message() + assert hbeat.previous_measurement_percent == 0 - assert heartbeat.previous_measured_percent == 67 + hbeat.print_heartbeat_message() + + assert hbeat.previous_measurement_percent == 67 caplog.records[1].getMessage().find("Measured in total 2 (67%)") caplog.records[2].getMessage().find("1 relays still not measured") From eba5e0bfa08a29ec9f1f5dda5f2af5f583a18d34 Mon Sep 17 00:00:00 2001 From: George Kadianakis Date: Thu, 21 Mar 2019 20:17:49 +0200 Subject: [PATCH 5/5] fixup! Bake more details into the heartbeat module and out of the main loop. --- sbws/core/scanner.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sbws/core/scanner.py b/sbws/core/scanner.py index 29d77520..822c6121 100644 --- a/sbws/core/scanner.py +++ b/sbws/core/scanner.py @@ -33,7 +33,7 @@ import random from .. import settings -from ..lib import heartbeat +from ..lib.heartbeat import Heartbeat rng = random.SystemRandom() log = logging.getLogger(__name__) @@ -479,7 +479,7 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump, measured. """ - hbeat = heartbeat(conf.getpath('paths', 'state_fname')) + hbeat = Heartbeat(conf.getpath('paths', 'state_fname')) # Set the time to wait for a thread to finish as the half of an HTTP # request timeout. @@ -493,7 +493,7 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump, loop_tstart = time.time() # Register relay fingerprints to the heartbeat module - hbeat.register_consensus_fpr(relay_list.relays_fingerprints) + hbeat.register_consensus_fprs(relay_list.relays_fingerprints) for target in relay_prioritizer.best_priority(): # Don't start measuring a relay if sbws is stopping.