diff --git a/DEPLOY.rst b/DEPLOY.rst index e62e17f6..9629c75e 100644 --- a/DEPLOY.rst +++ b/DEPLOY.rst @@ -30,6 +30,8 @@ or ``_ (local build or Read the Docs). To run the ``scanner`` it is mandatory to create a configuration file with at least one ``destination``. +It is recommended to set several ``destination``s so that the ``scanner`` can +continue if one fails. If ``sbws`` is installed from the Debian package, then create a file in ``/etc/sbws/sbws.ini`` like in the following example: diff --git a/docs/source/examples/sbws.example.ini b/docs/source/examples/sbws.example.ini index 0130bdb6..2a620108 100644 --- a/docs/source/examples/sbws.example.ini +++ b/docs/source/examples/sbws.example.ini @@ -4,7 +4,12 @@ nickname = sbws_default [destinations] -# a destination can be disabled changing `on` by `off` +# With several destinations, the scanner can continue even if some of them +# fail, which can be caused by a network problem on their side. +# If all of them fail, the scanner will stop, which +# will happen if there is network problem on the scanner side. + +# A destination can be disabled changing `on` by `off` foo = on [destinations.foo] diff --git a/docs/source/man_sbws.ini.rst b/docs/source/man_sbws.ini.rst index 0ea98b51..de603d67 100644 --- a/docs/source/man_sbws.ini.rst +++ b/docs/source/man_sbws.ini.rst @@ -58,6 +58,11 @@ paths (Default ~/.sbws/log) destinations + + It is required to set at least one destination for the scanner to run. + It is recommended to set several destinations so that the scanner can + continue if one fails. + STR = {on, off} Name of destination. It is a name for the Web server from where to download files in order to measure bandwidths. diff --git a/sbws/core/scanner.py b/sbws/core/scanner.py index 80251ec2..be8680cb 100644 --- a/sbws/core/scanner.py +++ b/sbws/core/scanner.py @@ -6,7 +6,8 @@ from ..lib.resultdump import ResultErrorStream from ..lib.relaylist import RelayList from ..lib.relayprioritizer import RelayPrioritizer -from ..lib.destination import DestinationList +from ..lib.destination import (DestinationList, + connect_to_destination_over_circuit) from ..util.timestamp import now_isodt_str from ..util.state import State from sbws.globals import fail_hard @@ -169,10 +170,13 @@ def measure_relay(args, conf, destinations, cb, rl, relay): cb.controller, conf.getfloat('general', 'http_timeout')) # Pick a destionation dest = destinations.next() + # If there is no any destination at this point, it can not continue. if not dest: - log.warning('Unable to get destination to measure %s %s', - relay.nickname, relay.fingerprint[0:8]) - return None + log.critical("There are not any functional destinations.\n" + "It is recommended to set several destinations so that " + "the scanner can continue if one fails.") + # This should raise an error so that the caller can close the pool. + exit(1) # Pick a relay to help us measure the given relay. If the given relay is an # exit, then pick a non-exit. Otherwise pick an exit. helper = None @@ -206,9 +210,9 @@ def measure_relay(args, conf, destinations, cb, rl, relay): log.debug('Built circ %s %s for relay %s %s', circ_id, stem_utils.circuit_str(cb.controller, circ_id), relay.nickname, relay.fingerprint[0:8]) - # Make a connection to the destionation webserver and make sure it can - # still help us measure - is_usable, usable_data = dest.is_usable(circ_id, s, cb.controller) + # Make a connection to the destination + is_usable, usable_data = connect_to_destination_over_circuit( + dest, circ_id, s, cb.controller, dest._max_dl) if not is_usable: log.warning('When measuring %s %s the destination seemed to have ' 'stopped being usable: %s', relay.nickname, diff --git a/sbws/globals.py b/sbws/globals.py index 2277850d..8a068cc0 100644 --- a/sbws/globals.py +++ b/sbws/globals.py @@ -53,6 +53,12 @@ BW_LINE_SIZE = 510 +# This number might need adjusted depending on the percentage of circuits and +# HTTP requests failures. +# While the scanner can not recover from some/all failing destionations, +# set a big number so that it continues trying. +MAXIMUM_NUMBER_DESTINATION_FAILURES = 100 + def fail_hard(*a, **kw): ''' Log something ... and then exit as fast as possible ''' diff --git a/sbws/lib/destination.py b/sbws/lib/destination.py index 1b4c1927..1b4779cb 100644 --- a/sbws/lib/destination.py +++ b/sbws/lib/destination.py @@ -1,13 +1,11 @@ import logging import random -import time -import os -from threading import RLock import requests from urllib.parse import urlparse from stem.control import EventType import sbws.util.stem as stem_utils -import sbws.util.requests as requests_utils + +from ..globals import MAXIMUM_NUMBER_DESTINATION_FAILURES log = logging.getLogger(__name__) @@ -76,21 +74,32 @@ def connect_to_destination_over_circuit(dest, circ_id, session, cont, max_dl): head = requests_utils.head(session, dest.url, verify=dest.verify) except (requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout) as e: + dest.set_failure() return False, 'Could not connect to {} over circ {} {}: {}'.format( dest.url, circ_id, stem_utils.circuit_str(cont, circ_id), e) finally: stem_utils.remove_event_listener(cont, listener) if head.status_code != requests.codes.ok: + dest.set_failure() return False, error_prefix + 'we expected HTTP code '\ '{} not {}'.format(requests.codes.ok, head.status_code) if 'content-length' not in head.headers: + dest.set_failure() return False, error_prefix + 'we except the header Content-Length '\ - 'to exist in the response' + 'to exist in the response' content_length = int(head.headers['content-length']) if max_dl > content_length: + dest.set_failure() return False, error_prefix + 'our maximum configured download size '\ 'is {} but the content is only {}'.format(max_dl, content_length) log.debug('Connected to %s over circuit %s', dest.url, circ_id) + # Any failure connecting to the destination will call set_failure, + # which will set `failed` to True and count consecutives failures. + # It can not be set at the start, to be able to know if it failed a + # a previous time, which is checked by set_failure. + # Future improvement: use a list to count consecutive failures + # or calculate it from the results. + dest.failed = False return True, {'content_length': content_length} @@ -103,17 +112,38 @@ def __init__(self, url, max_dl, verify): assert u.netloc self._url = u self._verify = verify + # Flag to record whether this destination failed in the last + # measurement. + # Failures can happen if: + # - an HTTPS request can not be made over Tor + # (which might be the relays fault, not the destination being + # unreachable) + # - the destination does not support HTTP Range requests. + self.failed = False + self.consecutive_failures = 0 - def is_usable(self, circ_id, session, cont): - ''' Use **connect_to_destination_over_circuit** to determine if this - destination is usable and return what it returns. Just a small wrapper. - ''' - if not isinstance(self.verify, bool): - if not os.path.isfile(self.verify): - return False, '{} is believed to be a CA bundle file on disk '\ - 'but it does not exist'.format(self.verify) - return connect_to_destination_over_circuit( - self, circ_id, session, cont, self._max_dl) + @property + def is_functional(self): + """ + Returns True if there has not been a number consecutive measurements. + Otherwise warn about it and return False. + + """ + if self.consecutive_failures > MAXIMUM_NUMBER_DESTINATION_FAILURES: + log.warning("Destination %s is not functional. Please check that " + "it is correct.", self._url) + return False + return True + + def set_failure(self): + """Set failed to True and increase the number of consecutive failures. + Only if it also failed in the previous measuremnt. + + """ + # if it failed in the last measurement + if self.failed: + self.consecutive_failures += 1 + self.failed = True @property def url(self): @@ -159,62 +189,10 @@ def __init__(self, conf, dests, circuit_builder, relay_list, controller): self._cb = circuit_builder self._rl = relay_list self._all_dests = dests - self._usable_dests = [] - self._last_usability_test = 0 - self._usability_test_interval = \ - conf.getint('destinations', 'usability_test_interval') - self._usability_test_timeout = \ - conf.getfloat('general', 'http_timeout') - self._usability_lock = RLock() - def _should_perform_usability_test(self): - return self._last_usability_test + self._usability_test_interval <\ - time.time() - - def _perform_usability_test(self): - self._usability_lock.acquire() - log.debug('Perform usability tests') - cont = self._cont - timeout = self._usability_test_timeout - session = requests_utils.make_session(cont, timeout) - usable_dests = [] - for dest in self._all_dests: - possible_exits = self._rl.exits_not_bad_allowing_port(dest.port) - # Keep the fastest 10% of exits, or 3, whichever is larger - num_keep = int(max(3, len(possible_exits) * 0.1)) - possible_exits = sorted( - possible_exits, key=lambda e: e.consensus_bandwidth, - reverse=True) - exits = possible_exits[0:num_keep] - if len(exits) < 1: - log.warning("There are no exits to perform usability tests.") - continue - # Try three times to build a circuit to test this destination - circ_id = None - for _ in range(0, 3): - # Pick a random exit - exit = self._rng.choice(exits) - circ_id = self._cb.build_circuit([None, exit.fingerprint]) - if circ_id: - break - if not circ_id: - log.warning('Unable to build a circuit to test the usability ' - 'of %s. Assuming it isn\'t usable.', dest.url) - continue - log.debug('Built circ %s %s to test usability of %s', circ_id, - stem_utils.circuit_str(cont, circ_id), dest.url) - is_usable, data = dest.is_usable(circ_id, session, cont) - if not is_usable: - log.warning(data) - self._cb.close_circuit(circ_id) - continue - assert is_usable - log.debug('%s seems usable so we will keep it', dest.url) - usable_dests.append(dest) - self._cb.close_circuit(circ_id) - self._usable_dests = usable_dests - self._last_usability_test = time.time() - self._usability_lock.release() + @property + def functional_destinations(self): + return [d for d in self._all_dests if d.is_functional] @staticmethod def from_config(conf, circuit_builder, relay_list, controller): @@ -245,23 +223,10 @@ def next(self): ''' Returns the next destination that should be used in a measurement ''' - with self._usability_lock: - while True: - if self._should_perform_usability_test(): - self._perform_usability_test() - log.debug('%s/%s of our configured destinations are ' - 'usable at this time', len(self._usable_dests), - len(self._all_dests)) - if len(self._usable_dests) > 0: - break - time_till_next_check = self._usability_test_interval + 0.0001 - log.warning( - 'Of our %d configured destinations, none are usable at ' - 'this time. Sleeping %f seconds on this blocking call ' - 'to DestinationList.next() until we can check for a ' - 'usable destination again.', len(self._all_dests), - time_till_next_check) - time.sleep(time_till_next_check) - - self._rng.shuffle(self._usable_dests) - return self._usable_dests[0] + # Do not perform usability tests since a destination is already proven + # usable or not in every measurement, and it should depend on a X + # number of failures. + # This removes the need for an extra lock for every measurement. + # Do not change the order of the destinations, just return a + # destination. + return self._rng.choice(self.functional_destinations) diff --git a/tests/integration/lib/test_destination.py b/tests/integration/lib/test_destination.py new file mode 100644 index 00000000..cc8c7943 --- /dev/null +++ b/tests/integration/lib/test_destination.py @@ -0,0 +1,95 @@ +"""Integration tests for destination.py""" +from sbws.globals import MAXIMUM_NUMBER_DESTINATION_FAILURES +import sbws.util.requests as requests_utils +from sbws.lib.destination import (DestinationList, Destination, + connect_to_destination_over_circuit) + + +def test_destination_list_no_usability_test_success( + conf, persistent_launch_tor, cb, rl + ): + # In a future refactor, if DestionationList is not initialized with the + # controller, this test should be an unit test. + destination_list, error_msg = DestinationList.from_config( + conf, cb, rl, persistent_launch_tor + ) + # Because there's only 1 destination in conftest, random should return + # the same one. + assert destination_list.next() == \ + destination_list._all_dests[0] + + +def test_connect_to_destination_over_circuit_success(persistent_launch_tor, + dests, cb, rl): + destination = dests.next() + session = requests_utils.make_session(persistent_launch_tor, 10) + # Choose a relay that is not an exit + relay = [r for r in rl.relays + if r.nickname == 'relay1mbyteMAB'][0] + # Choose an exit, for this test it does not matter the bandwidth + helper = rl.exits_not_bad_allowing_port(destination.port)[0] + circuit_path = [relay.fingerprint, helper.fingerprint] + # build a circuit + circuit_id = cb.build_circuit(circuit_path) + # Perform "usability test" + is_usable, response = connect_to_destination_over_circuit( + destination, circuit_id, session, persistent_launch_tor, 1024) + assert is_usable is True + assert 'content_length' in response + assert not destination.failed + assert destination.consecutive_failures == 0 + assert destination.is_functional + + +def test_connect_to_destination_over_circuit_fail(persistent_launch_tor, + dests, cb, rl): + bad_destination = Destination('https://example.example', 1024, False) + # dests._all_dests.append(bad_destination) + # dests._usable_dests.append(bad_destination) + session = requests_utils.make_session(persistent_launch_tor, 10) + # Choose a relay that is not an exit + relay = [r for r in rl.relays + if r.nickname == 'relay1mbyteMAB'][0] + # Choose an exit, for this test it does not matter the bandwidth + helper = rl.exits_not_bad_allowing_port(bad_destination.port)[0] + circuit_path = [relay.fingerprint, helper.fingerprint] + # Build a circuit. + circuit_id = cb.build_circuit(circuit_path) + # Perform "usability test" + is_usable, response = connect_to_destination_over_circuit( + bad_destination, circuit_id, session, persistent_launch_tor, 1024) + assert is_usable is False + + # because it is the first time it fails, failures aren't count + assert bad_destination.failed + assert bad_destination.consecutive_failures == 0 + assert bad_destination.is_functional + + # fail twice in a row + is_usable, response = connect_to_destination_over_circuit( + bad_destination, circuit_id, session, persistent_launch_tor, 1024) + assert bad_destination.failed + assert bad_destination.consecutive_failures == 1 + assert bad_destination.is_functional + + +def test_functional_destinations(conf, cb, rl, persistent_launch_tor): + good_destination = Destination('https://127.0.0.1:28888', 1024, False) + # Mock that it failed before and just now, but it's still considered + # functional. + good_destination.consecutive_failures = 3 + good_destination.failed = True + bad_destination = Destination('https://example.example', 1024, False) + # Mock that it didn't fail now, but it already failed 11 consecutive + # times. + bad_destination.consecutive_failures = \ + MAXIMUM_NUMBER_DESTINATION_FAILURES + 1 + bad_destination.failed = False + # None of the arguments are used, move to unit tests when this get + # refactored + destination_list = DestinationList( + conf, [good_destination, bad_destination], cb, rl, + persistent_launch_tor) + expected_functional_destinations = [good_destination] + functional_destinations = destination_list.functional_destinations + assert expected_functional_destinations == functional_destinations