Skip to content
This repository has been archived by the owner. It is now read-only.

Ticket28652 rebased #354

Closed
wants to merge 5 commits into from
Closed
Changes from 1 commit
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Bake more details into the heartbeat module and out of the main loop.
  • Loading branch information
asn-d6 committed Mar 21, 2019
commit 944bad5561dc92d04569ed06eceadd139315e0d4
@@ -479,12 +479,7 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
measured.
"""
# Variable to count total progress in the last days:
# In case it is needed to see which relays are not being measured,
# store their fingerprint, not only their number.
measured_fp_set = set()
measured_percent = 0
main_loop_tstart = time.monotonic()
hbeat = heartbeat(conf.getpath('paths', 'state_fname'))

# Set the time to wait for a thread to finish as the half of an HTTP
# request timeout.
@@ -496,6 +491,10 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
# long, set it here and not outside the loop.
pending_results = []
loop_tstart = time.time()

# Register relay fingerprints to the heartbeat module
hbeat.register_consensus_fpr(relay_list.relays_fingerprints)

for target in relay_prioritizer.best_priority():
# Don't start measuring a relay if sbws is stopping.
if settings.end_event.is_set():
@@ -511,18 +510,19 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
[args, conf, destinations, circuit_builder, relay_list,
target], {}, callback, callback_err)
pending_results.append(async_result)
measured_fp_set.add(async_result)

# Register this measurement to the heartbeat module
hbeat.register_measured_fpr(async_result)

# After the for has finished, the pool has queued all the relays
# and pending_results has the list of all the AsyncResults.
# It could also be obtained with pool._cache, which contains
# a dictionary with AsyncResults as items.
num_relays_to_measure = len(pending_results)
wait_for_results(num_relays_to_measure, pending_results)

measured_percent = heartbeat.total_measured_percent(
measured_percent, relay_list.relays_fingerprints, measured_fp_set,
main_loop_tstart, conf.getpath('paths', 'state_fname')
)
# Print the heartbeat message
hbeat.print_heartbeat_message()

loop_tstop = time.time()
loop_tdelta = (loop_tstop - loop_tstart) / 60
@@ -9,38 +9,54 @@

log = logging.getLogger(__name__)

# NOTE tech-debt: this could go be tracked globally as a singleton
consensus_fp_set = set()
class Heartbeat(object):
"""
Tracks current status of sbws and is capable of printing periodic
information about the current state
"""

def __init__(self, state_path):
# Variable to count total progress in the last days:
# In case it is needed to see which relays are not being measured,
# store their fingerprint, not only their number.
self.measured_fp_set = set()
self.consensus_fp_set = set()
self.measured_percent = 0
self.main_loop_tstart = time.monotonic()

def total_measured_percent(measured_percent, relays_fingerprints,
measured_fp_set, main_loop_tstart, state_path):
"""Returns the new percentage of the different relays that were measured.
self.state_dict = State(state_path)

This way it can be known whether the scanner is making progress measuring
all the Network.
self.previous_measurement_percent

Log the percentage, the number of relays measured and not measured,
the number of loops and the time elapsed since it started measuring.
"""
global consensus_fp_set
# NOTE: in a future refactor make State a singleton in __init__.py
state_dict = State(state_path)
loops_count = state_dict.get('recent_priority_list_count', 0)

# Store all the relays seen in all the consensuses.
[consensus_fp_set.add(r) for r in relays_fingerprints]

not_measured_fp_set = consensus_fp_set.difference(measured_fp_set)
main_loop_tdelta = (time.monotonic() - main_loop_tstart) / 60
new_measured_percent = round(
len(measured_fp_set) / len(consensus_fp_set) * 100)
log.info("Run %s main loops.", loops_count)
log.info("Measured in total %s (%s%%) unique relays in %s minutes",
len(measured_fp_set), new_measured_percent, main_loop_tdelta)
log.info("%s relays still not measured.", len(not_measured_fp_set))
# The case when it is equal will only happen when all the relays have been
# measured.
if (new_measured_percent <= measured_percent):
log.warning("There is no progress measuring relays!.")
return new_measured_percent
def register_measured_fpr(self, async_result):
measured_fp_set.add(async_result)

def register_consensus_fprs(self, relay_fprs):
for r in relay_fprs:
self.cosnensus_fp_set.add(r)

def print_heartbeat_message(self):
"""Print the new percentage of the different relays that were measured.
This way it can be known whether the scanner is making progress measuring
all the Network.
Log the percentage, the number of relays measured and not measured,
the number of loops and the time elapsed since it started measuring.
"""
loops_count = self.state_dict.get('recent_priority_list_count', 0)

not_measured_fp_set = consensus_fp_set.difference(measured_fp_set)
main_loop_tdelta = (time.monotonic() - main_loop_tstart) / 60
new_measured_percent = round(len(measured_fp_set) / len(consensus_fp_set) * 100)

log.info("Run %s main loops.", loops_count)
log.info("Measured in total %s (%s%%) unique relays in %s minutes",
len(measured_fp_set), new_measured_percent, main_loop_tdelta)
log.info("%s relays still not measured.", len(not_measured_fp_set))

# The case when it is equal will only happen when all the relays have been
# measured.
if (new_measured_percent <= self.previous_measured_percent):
log.warning("There is no progress measuring relays!.")
self.previous_measurement_percent = new_measured_percent
@@ -4,18 +4,18 @@

from sbws.lib import heartbeat


def test_total_measured_percent(conf, caplog):
measured_percent = 0
measured_fp_set = set(['A', 'B'])
main_loop_tstart = time.monotonic()
relays_fingerprints = set(['A', 'B', 'C'])
heartbeat = heartbeat.Heartbeat(conf.getpath('paths', 'state_fname'))

heartbeat.register_consensus_fpr(['A', 'B', 'C'])

haertbeat.register_measured_fpr('A')
haertbeat.register_measured_fpr('B')

caplog.set_level(logging.INFO)
new_measured_percent = heartbeat.total_measured_percent(
measured_percent, relays_fingerprints, measured_fp_set,
main_loop_tstart, conf.getpath('paths', 'state_fname')
)
assert new_measured_percent == 67

heartbeat.print_heartbeat_message()

assert heartbeat.previous_measured_percent == 67
caplog.records[1].getMessage().find("Measured in total 2 (67%)")
caplog.records[2].getMessage().find("1 relays still not measured")