diff --git a/calico/etcddriver/driver.py b/calico/etcddriver/driver.py index 3760be2bfee..114b5077c79 100644 --- a/calico/etcddriver/driver.py +++ b/calico/etcddriver/driver.py @@ -151,19 +151,20 @@ def stop(self): def _read_from_socket(self): """ - Thread: reader thread. Reads messages from Felix. - - So far, this means reading the init message and then dealing - with the exception if Felix dies. + Thread: reader thread. Reads messages from Felix and fans them out. """ try: while not self._stop_event.is_set(): for msg_type, msg in self._msg_reader.new_messages(timeout=1): if msg_type == MSG_TYPE_INIT: + # Init message, received at start of day. self._handle_init(msg) elif msg_type == MSG_TYPE_CONFIG: + # Config message, expected after we send the raw + # config to Felix. self._handle_config(msg) elif msg_type == MSG_TYPE_RESYNC: + # Request to do a resync. self._handle_resync(msg) else: _log.error("Unexpected message from Felix: %s", msg) @@ -227,8 +228,7 @@ def _resync_and_merge(self): self._send_status(STATUS_WAIT_FOR_READY) self._wait_for_ready() self._preload_config() - # Now (on the first run through) wait for Felix to process the - # config. + # Wait for config if we have not already received it. self._wait_for_config() # Kick off the snapshot request as far as the headers. 
self._send_status(STATUS_RESYNC) @@ -252,7 +252,7 @@ def _resync_and_merge(self): socket.error) as e: _log.error("Request to etcd failed: %r; resyncing.", e) if monotonic_time() - loop_start < 1: - _log.debug("May be tight looping, sleeping...") + _log.warning("May be tight looping, sleeping...") time.sleep(1) except DriverShutdown: _log.info("Driver shut down.") @@ -268,8 +268,7 @@ def _resync_and_merge(self): def _wait_for_config(self): while not self._config_received.is_set(): _log.info("Waiting for Felix to process the config...") - if self._stop_event.is_set(): - raise DriverShutdown() + self._check_stop_event() self._config_received.wait(1) _log.info("Felix sent us the config, continuing.") @@ -292,6 +291,12 @@ def _wait_for_ready(self): else: _log.info("Ready flag set to %s", etcd_resp["node"]["value"]) self._hwms.update_hwm(READY_KEY, mod_idx) + self._check_stop_event() + + def _check_stop_event(self): + if self._stop_event.is_set(): + _log.info("Told to stop, raising DriverShutdown.") + raise DriverShutdown() def _preload_config(self): """ @@ -447,16 +452,29 @@ def _process_snapshot_and_events(self, etcd_response, snapshot_index): def _handle_etcd_node(self, snap_mod, snap_key, snap_value, snapshot_index=None): + """ + Callback for use with parse_snapshot. Called once for each key/value + pair that is found. + + Handles the key/value itself and then checks for work from the + watcher. + + :param snap_mod: Modified index of the key. + :param snap_key: The key itself. + :param snap_value: The value attached to the key. + :param snapshot_index: Index of the snapshot as a whole. + """ assert snapshot_index is not None old_hwm = self._hwms.update_hwm(snap_key, snapshot_index) if snap_mod > old_hwm: # This specific key's HWM is newer than the previous # version we've seen, send an update. self._on_key_updated(snap_key, snap_value) - # After we process an update from the snapshot, process - # several updates from the watcher queue (if there are - # any). 
We limit the number to ensure that we always - # finish the snapshot eventually. + # After we process an update from the snapshot, process several + # updates from the watcher queue (if there are any). We limit the + # number to ensure that we always finish the snapshot eventually. + # The limit isn't too sensitive but values much lower than 100 seemed + # to starve the watcher in testing. for _ in xrange(100): if not self._watcher_queue or self._watcher_queue.empty(): # Don't block on the watcher if there's nothing to do. @@ -469,9 +487,7 @@ def _handle_etcd_node(self, snap_mod, snap_key, snap_value, _log.warning("Watcher thread died, continuing " "with snapshot") break - if self._stop_event.is_set(): - _log.error("Stop event set, exiting") - raise DriverShutdown() + self._check_stop_event() def _process_events_only(self): """ @@ -485,6 +501,7 @@ def _process_events_only(self): while not self._stop_event.is_set(): self._handle_next_watcher_event(resync_in_progress=False) self._msg_writer.flush() + self._check_stop_event() def _scan_for_deletions(self, snapshot_index): """ @@ -552,6 +569,8 @@ def _start_watcher(self, snapshot_index): """ Starts the watcher thread, creating its queue and event in the process. """ + # Defensive: stop the watcher if it's already running. + self._stop_watcher() self._watcher_queue = Queue() self._watcher_stop_event = Event() # Note: we pass the queue and event in as arguments so that the thread @@ -590,6 +609,9 @@ def _on_key_updated(self, key, value): deletion). """ if key == READY_KEY and value != "true": + # Special case: the global Ready flag has been unset, trigger a + # resync, which will poll the Ready flag until it is set to true + # again. 
_log.warning("Ready key no longer set to true, triggering resync.") raise ResyncRequired() self._msg_writer.send_message( diff --git a/calico/etcddriver/hwm.py b/calico/etcddriver/hwm.py index 5d3d8ecf2da..21da89bce24 100644 --- a/calico/etcddriver/hwm.py +++ b/calico/etcddriver/hwm.py @@ -30,8 +30,14 @@ _log = logging.getLogger(__name__) +# The trie implementation that we use requires us to specify the character set +# in advance... +# Symbols that are allowed in our etcd keys. TRIE_SYMBOLS = "/_-:." +# Chars we allow in the trie. In addition to alphanumerics and our +# white-listed symbols, we also use % for %-encoding of unexpected symbols. TRIE_CHARS = string.ascii_letters + string.digits + TRIE_SYMBOLS + "%" +# Regex that matches chars that are allowed in the trie. TRIE_CHARS_MATCH = re.compile(r'^[%s]+$' % re.escape(TRIE_CHARS)) @@ -100,7 +106,7 @@ def stop_tracking_deletions(self): self._deletion_hwms = None self._latest_deletion = None - def update_hwm(self, key, hwm): + def update_hwm(self, key, new_mod_idx): """ Updates the HWM for a key if the new value is greater than the old. If deletion tracking is enabled, resolves deletions so that updates @@ -110,29 +116,29 @@ def update_hwm(self, key, hwm): :return int|NoneType: the old HWM of the key (or the HWM at which it was deleted) or None if it did not previously exist. """ - _log.debug("Updating HWM for %s to %s", key, hwm) + _log.debug("Updating HWM for %s to %s", key, new_mod_idx) key = encode_key(key) if (self._deletion_hwms is not None and # Optimization: avoid expensive lookup if this update comes # after all deletions. - hwm < self._latest_deletion): + new_mod_idx < self._latest_deletion): # We're tracking deletions, check that this key hasn't been # deleted. del_hwm = self._deletion_hwms.longest_prefix_value(key, None) - if hwm < del_hwm: + if new_mod_idx < del_hwm: _log.debug("Key %s previously deleted, skipping", key) return del_hwm try: old_hwm = self._hwms[key] # Trie doesn't have get(). 
except KeyError: old_hwm = None - if old_hwm < hwm: # Works for None too. + if old_hwm < new_mod_idx: # Works for None too. _log.debug("Key %s HWM updated to %s, previous %s", - key, hwm, old_hwm) - self._hwms[key] = hwm + key, new_mod_idx, old_hwm) + self._hwms[key] = new_mod_idx return old_hwm - def store_deletion(self, key, hwm): + def store_deletion(self, key, deletion_mod_idx): """ Store that a given key (or directory) was deleted at a given HWM. :return: List of known keys that were deleted. This will be the @@ -140,10 +146,10 @@ def store_deletion(self, key, hwm): """ _log.debug("Key %s deleted", key) key = encode_key(key) - self._latest_deletion = max(hwm, self._latest_deletion) + self._latest_deletion = max(deletion_mod_idx, self._latest_deletion) if self._deletion_hwms is not None: _log.debug("Tracking deletion in deletions trie") - self._deletion_hwms[key] = hwm + self._deletion_hwms[key] = deletion_mod_idx deleted_keys = [] for child_key, child_mod in self._hwms.items(key): del self._hwms[child_key] @@ -193,10 +199,15 @@ def encode_key(key): here than to blow up. """ if key[-1] != "/": - key += "/" - key = unicode(urllib.quote(key.encode("utf8"), safe=TRIE_SYMBOLS)) - assert TRIE_CHARS_MATCH.match(key) - return key + suffixed_key = key + "/" + else: + suffixed_key = key + encoded_key = unicode(urllib.quote(suffixed_key.encode("utf8"), + safe=TRIE_SYMBOLS)) + assert TRIE_CHARS_MATCH.match(encoded_key), ( + "Key %r encoded to %r contained invalid chars" % (key, encoded_key) + ) + return encoded_key def decode_key(key): diff --git a/calico/etcddriver/protocol.py b/calico/etcddriver/protocol.py index c2204c478c4..5fa5a80b3e4 100644 --- a/calico/etcddriver/protocol.py +++ b/calico/etcddriver/protocol.py @@ -138,10 +138,14 @@ def new_messages(self, timeout=1): Generator: generates 0 or more tuples containing message type and message body (as a dict). + May generate 0 events in certain conditions even if there are + events available. 
(If the socket returns EAGAIN, for example.) + :param timeout: Maximum time to block waiting on the socket before giving up. No exception is raised upon timeout but 0 events are generated. :raises SocketClosed if the socket is closed. + :raises socket.error if an unexpected socket error occurs. """ if timeout is not None: read_ready, _, _ = select.select([self._sck], [], [], timeout) diff --git a/calico/etcddriver/test/stubs.py b/calico/etcddriver/test/stubs.py index ec95a2be2d7..f43575e71d5 100644 --- a/calico/etcddriver/test/stubs.py +++ b/calico/etcddriver/test/stubs.py @@ -235,7 +235,8 @@ def respond_with_stream(self, etcd_index, status=200): return self.pipe_file def get_response(self): - if self.response_available.wait(30): + self.response_available.wait(timeout=30) # returns None in Python 2.6 + if self.response_available.is_set(): return self.response else: raise AssertionError("No response") diff --git a/calico/etcddriver/test/test_driver.py b/calico/etcddriver/test/test_driver.py index a58e45da963..3b7dde2d1e0 100644 --- a/calico/etcddriver/test/test_driver.py +++ b/calico/etcddriver/test/test_driver.py @@ -697,6 +697,10 @@ def test_shutdown_before_config(self): self.driver._stop_event.set() self.assertRaises(DriverShutdown, self.driver._wait_for_config) + def test_shutdown_before_ready(self): + self.driver._stop_event.set() + self.assertRaises(DriverShutdown, self.driver._wait_for_ready) + def test_issue_etcd_request_basic_get(self): # Initialise the etcd URL. 
self.driver._handle_init({ @@ -882,7 +886,7 @@ def test_join_not_stopped(self): def test_process_events_stopped(self): self.driver._stop_event.set() - self.driver._process_events_only() + self.assertRaises(DriverShutdown, self.driver._process_events_only) def dump_all_thread_stacks(): diff --git a/calico/felix/felix.py b/calico/felix/felix.py index 7e109e9c50d..13aa1b53c75 100644 --- a/calico/felix/felix.py +++ b/calico/felix/felix.py @@ -214,7 +214,7 @@ def main(): try: config = Config(options.config_file) - except Exception as e: + except Exception: # Config loading error, and not just invalid parameters (from optparse) # as they generate a SystemExit. Attempt to open a log file, ignoring # any errors it gets, before we raise the exception. diff --git a/calico/felix/fetcd.py b/calico/felix/fetcd.py index bd90433c96a..dac2ee00961 100644 --- a/calico/felix/fetcd.py +++ b/calico/felix/fetcd.py @@ -239,7 +239,9 @@ def start_watch(self, splitter): Starts watching etcd for changes. Implicitly loads the config if it hasn't been loaded yet. """ - self._watcher.load_config.set() + assert self._watcher.load_config.is_set(), ( + "load_config() should be called before start_watch()." + ) self._watcher.splitter = splitter self._watcher.begin_polling.set() @@ -271,7 +273,7 @@ class _FelixEtcdWatcher(gevent.Greenlet): """ Greenlet that communicates with the etcd driver over a socket. - * Does the initial handshake with the driver, sening it the init + * Does the initial handshake with the driver, sending it the init message. * Receives the pre-loaded config from the driver and uses that to do Felix's one-off configuration. @@ -490,14 +492,14 @@ def _on_status_from_driver(self, msg): The driver sends us status messages whenever its status changes. 
It moves through these states: - * wait-for-ready (waiting for the global ready flag to become set) - * resync (resyncing with etcd, processing a snapshot and any - concurrent events) - * in-sync (snapshot processsing complete, now processing only events - from etcd) + (1) wait-for-ready (waiting for the global ready flag to become set) + (2) resync (resyncing with etcd, processing a snapshot and any + concurrent events) + (3) in-sync (snapshot processing complete, now processing only events + from etcd) - If it falls out of sync with etcd then it moves back into - wait-for-ready state and starts again. + If the driver falls out of sync with etcd then it will start again + from (1). If the status is in-sync, triggers the relevant processing. """ @@ -789,7 +791,7 @@ def _finish_msg_batch(self, batch, results): self._cleanup_pending): # Schedule a timer to stop our rate limiting or retry cleanup. timeout = self._config.ENDPOINT_REPORT_DELAY - timeout *= 0.9 + (random.random() * 0.2) # Jitter by +/- 10%. + timeout *= (0.9 + (random.random() * 0.2)) # Jitter by +/- 10%. gevent.spawn_later(timeout, self._on_timer_pop, async=True) diff --git a/calico/felix/test/test_fetcd.py b/calico/felix/test/test_fetcd.py index 24fb3cd8bab..4ee5279399a 100644 --- a/calico/felix/test/test_fetcd.py +++ b/calico/felix/test/test_fetcd.py @@ -136,6 +136,7 @@ def test_load_config(self): def test_start_watch(self): m_splitter = Mock() + self.api.load_config(async=True) result = self.api.start_watch(m_splitter, async=True) self.step_actor(self.api) self.m_etcd_watcher.load_config.set.assert_called_once_with()