Skip to content

Commit

Permalink
Code review markups for concurrent resync function.
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaun Crampton committed Nov 12, 2015
1 parent 2ca1589 commit d61de4a
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 43 deletions.
54 changes: 38 additions & 16 deletions calico/etcddriver/driver.py
Expand Up @@ -151,19 +151,20 @@ def stop(self):

def _read_from_socket(self):
"""
Thread: reader thread. Reads messages from Felix.
So far, this means reading the init message and then dealing
with the exception if Felix dies.
Thread: reader thread. Reads messages from Felix and fans them out.
"""
try:
while not self._stop_event.is_set():
for msg_type, msg in self._msg_reader.new_messages(timeout=1):
if msg_type == MSG_TYPE_INIT:
# Init message, received at start of day.
self._handle_init(msg)
elif msg_type == MSG_TYPE_CONFIG:
# Config message, expected after we send the raw
# config to Felix.
self._handle_config(msg)
elif msg_type == MSG_TYPE_RESYNC:
# Request to do a resync.
self._handle_resync(msg)
else:
_log.error("Unexpected message from Felix: %s", msg)
Expand Down Expand Up @@ -227,8 +228,7 @@ def _resync_and_merge(self):
self._send_status(STATUS_WAIT_FOR_READY)
self._wait_for_ready()
self._preload_config()
# Now (on the first run through) wait for Felix to process the
# config.
# Wait for config if we have not already received it.
self._wait_for_config()
# Kick off the snapshot request as far as the headers.
self._send_status(STATUS_RESYNC)
Expand All @@ -252,7 +252,7 @@ def _resync_and_merge(self):
socket.error) as e:
_log.error("Request to etcd failed: %r; resyncing.", e)
if monotonic_time() - loop_start < 1:
_log.debug("May be tight looping, sleeping...")
_log.warning("May be tight looping, sleeping...")
time.sleep(1)
except DriverShutdown:
_log.info("Driver shut down.")
Expand All @@ -268,8 +268,7 @@ def _resync_and_merge(self):
def _wait_for_config(self):
while not self._config_received.is_set():
_log.info("Waiting for Felix to process the config...")
if self._stop_event.is_set():
raise DriverShutdown()
self._check_stop_event()
self._config_received.wait(1)
_log.info("Felix sent us the config, continuing.")

Expand All @@ -292,6 +291,12 @@ def _wait_for_ready(self):
else:
_log.info("Ready flag set to %s", etcd_resp["node"]["value"])
self._hwms.update_hwm(READY_KEY, mod_idx)
self._check_stop_event()

def _check_stop_event(self):
if self._stop_event.is_set():
_log.info("Told to stop, raising DriverShutdown.")
raise DriverShutdown()

def _preload_config(self):
"""
Expand Down Expand Up @@ -447,16 +452,29 @@ def _process_snapshot_and_events(self, etcd_response, snapshot_index):

def _handle_etcd_node(self, snap_mod, snap_key, snap_value,
snapshot_index=None):
"""
Callback for use with parse_snapshot. Called once for each key/value
pair that is found.
Handles the key/value itself and then checks for work from the
watcher.
:param snap_mod: Modified index of the key.
:param snap_key: The key itself.
:param snap_value: The value attached to the key.
:param snapshot_index: Index of the snapshot as a whole.
"""
assert snapshot_index is not None
old_hwm = self._hwms.update_hwm(snap_key, snapshot_index)
if snap_mod > old_hwm:
# This specific key's HWM is newer than the previous
# version we've seen, send an update.
self._on_key_updated(snap_key, snap_value)
# After we process an update from the snapshot, process
# several updates from the watcher queue (if there are
# any). We limit the number to ensure that we always
# finish the snapshot eventually.
# After we process an update from the snapshot, process several
# updates from the watcher queue (if there are any). We limit the
# number to ensure that we always finish the snapshot eventually.
# The limit isn't too sensitive but values much lower than 100 seemed
# to starve the watcher in testing.
for _ in xrange(100):
if not self._watcher_queue or self._watcher_queue.empty():
# Don't block on the watcher if there's nothing to do.
Expand All @@ -469,9 +487,7 @@ def _handle_etcd_node(self, snap_mod, snap_key, snap_value,
_log.warning("Watcher thread died, continuing "
"with snapshot")
break
if self._stop_event.is_set():
_log.error("Stop event set, exiting")
raise DriverShutdown()
self._check_stop_event()

def _process_events_only(self):
"""
Expand All @@ -485,6 +501,7 @@ def _process_events_only(self):
while not self._stop_event.is_set():
self._handle_next_watcher_event(resync_in_progress=False)
self._msg_writer.flush()
self._check_stop_event()

def _scan_for_deletions(self, snapshot_index):
"""
Expand Down Expand Up @@ -552,6 +569,8 @@ def _start_watcher(self, snapshot_index):
"""
Starts the watcher thread, creating its queue and event in the process.
"""
# Defensive: stop the watcher if it's already running.
self._stop_watcher()
self._watcher_queue = Queue()
self._watcher_stop_event = Event()
# Note: we pass the queue and event in as arguments so that the thread
Expand Down Expand Up @@ -590,6 +609,9 @@ def _on_key_updated(self, key, value):
deletion).
"""
if key == READY_KEY and value != "true":
# Special case: the global Ready flag has been unset, trigger a
# resync, which will poll the Ready flag until it is set to true
# again.
_log.warning("Ready key no longer set to true, triggering resync.")
raise ResyncRequired()
self._msg_writer.send_message(
Expand Down
39 changes: 25 additions & 14 deletions calico/etcddriver/hwm.py
Expand Up @@ -30,8 +30,14 @@

_log = logging.getLogger(__name__)

# The trie implementation that we use requires us to specify the character set
# in advance...
# Symbols that are allowed in our etcd keys.
TRIE_SYMBOLS = "/_-:."
# Chars we allow in the trie. In addition to alphanumerics and our
# white-listed symbols, we also use % for %-encoding of unexpected symbols.
TRIE_CHARS = string.ascii_letters + string.digits + TRIE_SYMBOLS + "%"
# Regex that matches chars that are allowed in the trie.
TRIE_CHARS_MATCH = re.compile(r'^[%s]+$' % re.escape(TRIE_CHARS))


Expand Down Expand Up @@ -100,7 +106,7 @@ def stop_tracking_deletions(self):
self._deletion_hwms = None
self._latest_deletion = None

def update_hwm(self, key, hwm):
def update_hwm(self, key, new_mod_idx):
"""
Updates the HWM for a key if the new value is greater than the old.
If deletion tracking is enabled, resolves deletions so that updates
Expand All @@ -110,40 +116,40 @@ def update_hwm(self, key, hwm):
:return int|NoneType: the old HWM of the key (or the HWM at which it
was deleted) or None if it did not previously exist.
"""
_log.debug("Updating HWM for %s to %s", key, hwm)
_log.debug("Updating HWM for %s to %s", key, new_mod_idx)
key = encode_key(key)
if (self._deletion_hwms is not None and
# Optimization: avoid expensive lookup if this update comes
# after all deletions.
hwm < self._latest_deletion):
new_mod_idx < self._latest_deletion):
# We're tracking deletions, check that this key hasn't been
# deleted.
del_hwm = self._deletion_hwms.longest_prefix_value(key, None)
if hwm < del_hwm:
if new_mod_idx < del_hwm:
_log.debug("Key %s previously deleted, skipping", key)
return del_hwm
try:
old_hwm = self._hwms[key] # Trie doesn't have get().
except KeyError:
old_hwm = None
if old_hwm < hwm: # Works for None too.
if old_hwm < new_mod_idx: # Works for None too.
_log.debug("Key %s HWM updated to %s, previous %s",
key, hwm, old_hwm)
self._hwms[key] = hwm
key, new_mod_idx, old_hwm)
self._hwms[key] = new_mod_idx
return old_hwm

def store_deletion(self, key, hwm):
def store_deletion(self, key, deletion_mod_idx):
"""
Store that a given key (or directory) was deleted at a given HWM.
:return: List of known keys that were deleted. This will be the
leaves only when a subtree is being deleted.
"""
_log.debug("Key %s deleted", key)
key = encode_key(key)
self._latest_deletion = max(hwm, self._latest_deletion)
self._latest_deletion = max(deletion_mod_idx, self._latest_deletion)
if self._deletion_hwms is not None:
_log.debug("Tracking deletion in deletions trie")
self._deletion_hwms[key] = hwm
self._deletion_hwms[key] = deletion_mod_idx
deleted_keys = []
for child_key, child_mod in self._hwms.items(key):
del self._hwms[child_key]
Expand Down Expand Up @@ -193,10 +199,15 @@ def encode_key(key):
here than to blow up.
"""
if key[-1] != "/":
key += "/"
key = unicode(urllib.quote(key.encode("utf8"), safe=TRIE_SYMBOLS))
assert TRIE_CHARS_MATCH.match(key)
return key
suffixed_key = key + "/"
else:
suffixed_key = key
encoded_key = unicode(urllib.quote(suffixed_key.encode("utf8"),
safe=TRIE_SYMBOLS))
assert TRIE_CHARS_MATCH.match(encoded_key), (
"Key %r encoded to %r contained invalid chars" % (key, encoded_key)
)
return encoded_key


def decode_key(key):
Expand Down
4 changes: 4 additions & 0 deletions calico/etcddriver/protocol.py
Expand Up @@ -138,10 +138,14 @@ def new_messages(self, timeout=1):
Generator: generates 0 or more tuples containing message type and
message body (as a dict).
May generate 0 events in certain conditions even if there are
events available. (If the socket returns EAGAIN, for example.)
:param timeout: Maximum time to block waiting on the socket before
giving up. No exception is raised upon timeout but 0 events
are generated.
:raises SocketClosed if the socket is closed.
:raises socket.error if an unexpected socket error occurs.
"""
if timeout is not None:
read_ready, _, _ = select.select([self._sck], [], [], timeout)
Expand Down
3 changes: 2 additions & 1 deletion calico/etcddriver/test/stubs.py
Expand Up @@ -235,7 +235,8 @@ def respond_with_stream(self, etcd_index, status=200):
return self.pipe_file

def get_response(self):
if self.response_available.wait(30):
self.response_available.wait(timeout=30) # returns None in Python 2.6
if self.response_available.is_set():
return self.response
else:
raise AssertionError("No response")
Expand Down
6 changes: 5 additions & 1 deletion calico/etcddriver/test/test_driver.py
Expand Up @@ -697,6 +697,10 @@ def test_shutdown_before_config(self):
self.driver._stop_event.set()
self.assertRaises(DriverShutdown, self.driver._wait_for_config)

def test_shutdown_before_ready(self):
self.driver._stop_event.set()
self.assertRaises(DriverShutdown, self.driver._wait_for_ready)

def test_issue_etcd_request_basic_get(self):
# Initialise the etcd URL.
self.driver._handle_init({
Expand Down Expand Up @@ -882,7 +886,7 @@ def test_join_not_stopped(self):

def test_process_events_stopped(self):
self.driver._stop_event.set()
self.driver._process_events_only()
self.assertRaises(DriverShutdown, self.driver._process_events_only)


def dump_all_thread_stacks():
Expand Down
2 changes: 1 addition & 1 deletion calico/felix/felix.py
Expand Up @@ -214,7 +214,7 @@ def main():

try:
config = Config(options.config_file)
except Exception as e:
except Exception:
# Config loading error, and not just invalid parameters (from optparse)
# as they generate a SystemExit. Attempt to open a log file, ignoring
# any errors it gets, before we raise the exception.
Expand Down
22 changes: 12 additions & 10 deletions calico/felix/fetcd.py
Expand Up @@ -239,7 +239,9 @@ def start_watch(self, splitter):
Starts watching etcd for changes. load_config() must already have
been called; this is asserted rather than loading the config implicitly.
"""
self._watcher.load_config.set()
assert self._watcher.load_config.is_set(), (
"load_config() should be called before start_watch()."
)
self._watcher.splitter = splitter
self._watcher.begin_polling.set()

Expand Down Expand Up @@ -271,7 +273,7 @@ class _FelixEtcdWatcher(gevent.Greenlet):
"""
Greenlet that communicates with the etcd driver over a socket.
* Does the initial handshake with the driver, sening it the init
* Does the initial handshake with the driver, sending it the init
message.
* Receives the pre-loaded config from the driver and uses that
to do Felix's one-off configuration.
Expand Down Expand Up @@ -490,14 +492,14 @@ def _on_status_from_driver(self, msg):
The driver sends us status messages whenever its status changes.
It moves through these states:
* wait-for-ready (waiting for the global ready flag to become set)
* resync (resyncing with etcd, processing a snapshot and any
concurrent events)
* in-sync (snapshot processsing complete, now processing only events
from etcd)
(1) wait-for-ready (waiting for the global ready flag to become set)
(2) resync (resyncing with etcd, processing a snapshot and any
concurrent events)
(3) in-sync (snapshot processing complete, now processing only events
from etcd)
If it falls out of sync with etcd then it moves back into
wait-for-ready state and starts again.
If the driver falls out of sync with etcd then it will start again
from (1).
If the status is in-sync, triggers the relevant processing.
"""
Expand Down Expand Up @@ -789,7 +791,7 @@ def _finish_msg_batch(self, batch, results):
self._cleanup_pending):
# Schedule a timer to stop our rate limiting or retry cleanup.
timeout = self._config.ENDPOINT_REPORT_DELAY
timeout *= 0.9 + (random.random() * 0.2) # Jitter by +/- 10%.
timeout *= (0.9 + (random.random() * 0.2)) # Jitter by +/- 10%.
gevent.spawn_later(timeout,
self._on_timer_pop,
async=True)
Expand Down
1 change: 1 addition & 0 deletions calico/felix/test/test_fetcd.py
Expand Up @@ -136,6 +136,7 @@ def test_load_config(self):

def test_start_watch(self):
m_splitter = Mock()
self.api.load_config(async=True)
result = self.api.start_watch(m_splitter, async=True)
self.step_actor(self.api)
self.m_etcd_watcher.load_config.set.assert_called_once_with()
Expand Down

0 comments on commit d61de4a

Please sign in to comment.