From 2ca158908ec0d623cd686e621e82b7c4060bf8f0 Mon Sep 17 00:00:00 2001 From: Shaun Crampton Date: Tue, 10 Nov 2015 16:46:23 +0000 Subject: [PATCH] Rework driver tests to handle concurrent requests from old/new watcher. Improve shutdown mechanism. --- calico/etcddriver/test/stubs.py | 211 ++++++++++++++++++-------- calico/etcddriver/test/test_driver.py | 205 +++++++++++++++++-------- 2 files changed, 289 insertions(+), 127 deletions(-) diff --git a/calico/etcddriver/test/stubs.py b/calico/etcddriver/test/stubs.py index d886931164d..ec95a2be2d7 100644 --- a/calico/etcddriver/test/stubs.py +++ b/calico/etcddriver/test/stubs.py @@ -19,6 +19,7 @@ Stub objects used for testing driver/protocol code. """ import json +import threading import logging from Queue import Queue, Empty @@ -31,7 +32,15 @@ # Singleton representing a flush in the stream of writes. -FLUSH = object() +class Sigil(object): + def __init__(self, name): + self.name = name + + def __str__(self): + return "<%s>" % self.name + + +FLUSH = Sigil("FLUSH") class StubMessageReader(MessageReader): @@ -93,6 +102,9 @@ def send_message(self, msg_type, fields=None, flush=True): if flush: self.flush() + def next_msg(self): + return self.queue.get(timeout=1) + def flush(self): self.queue.put(FLUSH) @@ -128,65 +140,32 @@ def __del__(self): assert self._finished, "PipeFile wasn't correctly finished." -class StubEtcd(object): - """ - A fake connection to etcd. We hook the driver's _issue_etcd_request - method and block the relevant thread until the test calls one of the - respond_... methods. - """ - def __init__(self): - self.request_queue = Queue() - self.response_queue = Queue() - self.headers = { - "x-etcd-cluster-id": "abcdefg" - } - - def request(self, key, **kwargs): - """ - Called from the driver to make a request. Blocks until the - test thread sends a response. 
- """ - self.request_queue.put((key, kwargs)) - response = self.response_queue.get(30) - if isinstance(response, BaseException): - raise response - else: - return response - - def get_next_request(self): - """ - Called from the test to get the next request from the driver. - """ - return self.request_queue.get(timeout=1) +class StubRequest(object): + def __init__(self, stub_etcd, key, kwargs): + self.stub_etcd = stub_etcd + self.thread = threading.current_thread() + self.key = key + self.kwargs = kwargs + self.response = None + self.response_available = threading.Event() + self.pipe_file = None - def assert_request(self, expected_key, **expected_args): - """ - Asserts the properies of the next request. - """ - _log.info("Waiting for request for key %s, %s", - expected_key, expected_args) - key, args = self.get_next_request() - default_args = {'wait_index': None, - 'preload_content': None, - 'recursive': False, - 'timeout': 5} - _log.info("Got request for key %s") - for k, v in default_args.iteritems(): - if k in args and args[k] == v: - del args[k] - if expected_key != key: - raise AssertionError("Expected request for %s but got %s" % - (expected_key, key)) - if expected_args != args: - raise AssertionError("Expected request args %s for %s but got %s" % - (expected_args, key, args)) + def __str__(self): + return "Request<key=%s,args=%s,thread=%s>" % (self.key, + self.kwargs, + self.thread) def respond_with_exception(self, exc): """ Called from the test to raise an exception from the current/next request. """ - self.response_queue.put(exc) + self.response = exc + self.on_response_avail() + + def on_response_avail(self): + self.response_available.set() + self.stub_etcd.on_req_closed(self) def respond_with_value(self, key, value, dir=False, mod_index=None, etcd_index=None, status=200, action="get"): @@ -234,24 +213,136 @@ def respond_with_data(self, data, etcd_index, status): Called from the test to return a raw response (e.g. to send malformed JSON). 
""" - headers = self.headers.copy() + headers = self.stub_etcd.headers.copy() if etcd_index is not None: headers["x-etcd-index"] = str(etcd_index) resp = MockResponse(status, data, headers) - self.response_queue.put(resp) + self.response = resp + self.on_response_avail() def respond_with_stream(self, etcd_index, status=200): """ Called from the test to respond with a stream, allowing the test to send chunks of data in response. """ - headers = self.headers.copy() + headers = self.stub_etcd.headers.copy() if etcd_index is not None: headers["x-etcd-index"] = str(etcd_index) - f = PipeFile() - resp = MockResponse(status, f, headers) - self.response_queue.put(resp) - return f + self.pipe_file = PipeFile() + resp = MockResponse(status, self.pipe_file, headers) + self.response = resp + self.response_available.set() # We leave the req open in StubEtcd. + return self.pipe_file + + def get_response(self): + if self.response_available.wait(30): + return self.response + else: + raise AssertionError("No response") + + def assert_request(self, expected_key, **expected_args): + """ + Asserts the properies of the next request. + """ + default_args = {'wait_index': None, + 'preload_content': None, + 'recursive': False, + 'timeout': 5} + key = self.key + args = self.kwargs + for k, v in default_args.iteritems(): + if k in args and args[k] == v: + del args[k] + if expected_key != key: + raise AssertionError("Expected request for %s but got %s" % + (expected_key, key)) + if expected_args != args: + raise AssertionError("Expected request args %s for %s but got %s" % + (expected_args, key, args)) + + def stop(self): + if self.response_available.is_set(): + if self.pipe_file: + self.pipe_file.write(SystemExit()) + else: + self.respond_with_exception(SystemExit()) + + +class StubEtcd(object): + """ + A fake connection to etcd. We hook the driver's _issue_etcd_request + method and block the relevant thread until the test calls one of the + respond_... methods. 
+ """ + def __init__(self): + self.request_queue = Queue() + self.response_queue = Queue() + self.headers = { + "x-etcd-cluster-id": "abcdefg" + } + self.lock = threading.Lock() + self.open_reqs = set() + + def request(self, key, **kwargs): + """ + Called from the driver to make a request. Blocks until the + test thread sends a response. + """ + _log.info("New request on thread %s: %s, %s", + threading.current_thread(), + key, kwargs) + request = StubRequest(self, key, kwargs) + with self.lock: + self.open_reqs.add(request) + rq = self.request_queue + if rq is None: + _log.warn("Request after shutdown: %s, %s", key, kwargs) + raise SystemExit() + else: + rq.put(request) + response = request.get_response() + if isinstance(response, BaseException): + raise response + else: + return response + + def get_next_request(self): + """ + Called from the test to get the next request from the driver. + """ + _log.info("Waiting for next request") + req = self.request_queue.get(timeout=1) + _log.info("Got request %s", req) + return req + + def assert_request(self, expected_key, **expected_args): + """ + Asserts the properies of the next request. 
+ """ + req = self.request_queue.get(timeout=1) + req.assert_request(expected_key, **expected_args) + return req + + def on_req_closed(self, req): + with self.lock: + self.open_reqs.remove(req) + + def stop(self): + _log.info("Stopping stub etcd") + with self.lock: + _log.info("stop() got rq_lock") + while True: + try: + req = self.request_queue.get_nowait() + except Empty: + break + else: + self.open_reqs.add(req) + self.request_queue = None + for req in list(self.open_reqs): + _log.info("Aborting request %s", req) + req.stop() + _log.info("Stub etcd stopped; future requests should self-abort") class MockResponse(object): diff --git a/calico/etcddriver/test/test_driver.py b/calico/etcddriver/test/test_driver.py index 984e4a4a4c0..a58e45da963 100644 --- a/calico/etcddriver/test/test_driver.py +++ b/calico/etcddriver/test/test_driver.py @@ -19,12 +19,13 @@ Tests for the etcd driver module. """ import json +import traceback from Queue import Empty -import time from StringIO import StringIO from unittest import TestCase +import sys from mock import Mock, patch, call from urllib3 import HTTPConnectionPool from urllib3.exceptions import TimeoutError, HTTPError @@ -80,10 +81,10 @@ def test_mainline_resync(self): # Initial handshake. self.start_driver_and_handshake() # Check for etcd request and start the response. - snap_stream = self.start_snapshot_response() + snap_stream, watcher_req = self.start_snapshot_response() # Respond to the watcher, this should get merged into the event # stream at some point later. - self.watcher_etcd.respond_with_value( + watcher_req.respond_with_value( "/calico/v1/adir/bkey", "b", mod_index=12, @@ -92,7 +93,7 @@ def test_mainline_resync(self): # Wait until the watcher makes its next request (with revved # wait_index) to make sure it has queued its event to the resync # thread. 
- self.watcher_etcd.assert_request( + watcher_req = self.watcher_etcd.assert_request( VERSION_DIR, recursive=True, timeout=90, wait_index=13 ) # Write some more data to the resync thread, it should process that @@ -113,7 +114,7 @@ def test_mainline_resync(self): MSG_KEY_VALUE: "b", }) # Respond to the watcher with another event. - self.watcher_etcd.respond_with_value( + watcher_req.respond_with_value( "/calico/v1/adir2/dkey", "d", mod_index=13, @@ -122,7 +123,7 @@ def test_mainline_resync(self): # Wait until the watcher makes its next request (with revved # wait_index) to make sure it has queued its event to the resync # thread. - self.watcher_etcd.assert_request( + watcher_req = self.watcher_etcd.assert_request( VERSION_DIR, recursive=True, timeout=90, wait_index=14 ) # Send the resync thread some data that should be ignored due to the @@ -156,7 +157,7 @@ def test_mainline_resync(self): # HWM. self.assert_status_message(STATUS_IN_SYNC) # Now send a watcher event, which should go straight through. - self.send_watcher_event_and_assert_felix_msg(14) + self.send_watcher_event_and_assert_felix_msg(14, req=watcher_req) # Check the contents of the trie. keys = set(self.driver._hwms._hwms.keys()) @@ -167,10 +168,9 @@ def test_mainline_resync(self): u'/calico/v1/adir2/dkey/', u'/calico/v1/adir/ekey/'])) - def test_many_events_during_resync(self): """ - Test of the mainline resync-and-merge processing. + Test many events during resync * Does the initial config handshake with Felix. * Interleaves the snapshot response with updates via the watcher. @@ -180,18 +180,18 @@ def test_many_events_during_resync(self): self.start_driver_and_handshake() # Check for etcd request and start the response. - snap_stream = self.start_snapshot_response() + snap_stream, watcher_req = self.start_snapshot_response() # Respond to the watcher, this should get merged into the event # stream at some point later. 
for ii in xrange(200): - self.watcher_etcd.respond_with_value( + watcher_req.respond_with_value( "/calico/v1/adir/bkey", "watch", mod_index=11 + ii, action="set" ) - self.watcher_etcd.assert_request( + watcher_req = self.watcher_etcd.assert_request( VERSION_DIR, recursive=True, timeout=90, wait_index=12 + ii ) snap_stream.write(''' @@ -225,39 +225,99 @@ def test_many_events_during_resync(self): def test_felix_triggers_resync(self): self._run_initial_resync() - # Send a resync request from Felix. - self.send_resync_and_wait_for_flag() - # Wait for the watcher to make its request. - self.watcher_etcd.assert_request( + watcher_req = self.watcher_etcd.assert_request( VERSION_DIR, recursive=True, timeout=90, wait_index=15 ) - # Then for determinism, force it to die before it polls again. - self.driver._watcher_stop_event.set() - # The event from the watcher triggers the resync. - self.send_watcher_event_and_assert_felix_msg(15) - # Back into wait-for-ready mode. + # Take a copy of the watcher stop event so that we don't race to read + # it. + watcher_stop_event = self.driver._watcher_stop_event + + # Send a resync request from Felix. + self.msg_reader.send_msg(MSG_TYPE_RESYNC, {}) + + # Respond to the watcher, this should trigger the resync. + watcher_req.respond_with_value( + "/calico/v1/adir/ekey", + "e", + mod_index=15, + action="set" + ) + + # Resync thread should tell the watcher to die. + watcher_stop_event.wait(timeout=1) + + self.assert_msg_to_felix(MSG_TYPE_UPDATE, { + MSG_KEY_KEY: "/calico/v1/adir/ekey", + MSG_KEY_VALUE: "e", + }) + self.assert_flush_to_felix() self.assert_status_message(STATUS_WAIT_FOR_READY) + # Re-do the config handshake. self.do_handshake() - # Check for etcd request and start the response. - snap_stream = self.start_snapshot_response(etcd_index=100) + # We should get a request to load the full snapshot. 
+ watcher_req = self.resync_etcd.assert_request( + VERSION_DIR, recursive=True, timeout=120, preload_content=False + ) + snap_stream = watcher_req.respond_with_stream( + etcd_index=100 + ) + + # There could be more than one watcher now so we need to be careful + # to respond to the right one... + watcher_req = self.watcher_etcd.get_next_request() + if watcher_req.kwargs["wait_index"] == 16: + # Old watcher thread + watcher_req.respond_with_value("/calico/v1/adir/ekey", "e", + mod_index=99) + watcher_req = self.watcher_etcd.get_next_request() + # watcher_req should be from the new watcher thread + self.assertEqual(watcher_req.kwargs["wait_index"], 101) + + # Start sending the snapshot response: + snap_stream.write('''{ + "action": "get", + "node": { + "key": "/calico/v1", + "dir": true, + "nodes": [ + { + "key": "/calico/v1/adir", + "dir": true, + "nodes": [ + { + "key": "/calico/v1/adir/akey", + "value": "akey's value", + "modifiedIndex": 98 + }, + ''') + # Should generate a message to felix even though it's only seen part + # of the response... + self.assert_msg_to_felix(MSG_TYPE_UPDATE, { + MSG_KEY_KEY: "/calico/v1/adir/akey", + MSG_KEY_VALUE: "akey's value", + }) + # Respond to the watcher, this should get merged into the event # stream at some point later. - self.watcher_etcd.respond_with_value( + watcher_req.respond_with_value( "/calico/v1/adir/bkey", "b", mod_index=102, action="set" ) + # Wait until the watcher makes its next request (with revved # wait_index) to make sure it has queued its event to the resync - # thread. - self.watcher_etcd.assert_request( - VERSION_DIR, recursive=True, timeout=90, wait_index=103 - ) + # thread. Skip any events from the old watcher. 
+ watcher_req = self.watcher_etcd.get_next_request() + if watcher_req.kwargs["wait_index"] in (16, 100): + watcher_req = self.watcher_etcd.get_next_request() + self.assertFalse(watcher_req.kwargs["wait_index"] in (16, 100)) + # Write some data for an unchanged key to the resync thread, which # should be ignored. snap_stream.write(''' @@ -299,28 +359,18 @@ def test_felix_triggers_resync(self): # HWM. self.assert_status_message(STATUS_IN_SYNC) # Now send a watcher event, which should go straight through. - self.send_watcher_event_and_assert_felix_msg(104) - - def send_resync_and_wait_for_flag(self): - # Felix sends a resync message. - self.msg_reader.send_msg(MSG_TYPE_RESYNC, {}) - - # For determinism, wait for the message to be processed. - for _ in xrange(100): - if self.driver._resync_requested: - break - time.sleep(0.01) - else: - self.fail("Resync flag never got set.") + self.send_watcher_event_and_assert_felix_msg(104, req=watcher_req) def test_directory_deletion(self): self._run_initial_resync() # For coverage: Nothing happens for a while, poll times out. - self.watcher_etcd.respond_with_exception( + watcher_req = self.watcher_etcd.get_next_request() + watcher_req.respond_with_exception( driver.ReadTimeoutError(Mock(), "", "") ) # For coverage: Then a set to a dir, which should be ignored. - self.watcher_etcd.respond_with_data(json.dumps({ + watcher_req = self.watcher_etcd.get_next_request() + watcher_req.respond_with_data(json.dumps({ "action": "create", "node": { "key": "/calico/v1/foo", @@ -328,7 +378,8 @@ def test_directory_deletion(self): } }), 100, 200) # Then a whole directory is deleted. - self.watcher_etcd.respond_with_value( + watcher_req = self.watcher_etcd.get_next_request() + watcher_req.respond_with_value( "/calico/v1/adir", dir=True, value=None, @@ -364,7 +415,7 @@ def test_directory_deletion(self): def _run_initial_resync(self): try: # Start by going through the first resync. 
- self.test_mainline_resync() + self.test_mainline_resync() # Returns open watcher req. except AssertionError: _log.exception("Mainline resync test failed, aborting test %s", self.id()) @@ -374,7 +425,8 @@ def _run_initial_resync(self): def test_root_directory_deletion(self): self._run_initial_resync() # Delete the whole /calico/v1 dir. - self.watcher_etcd.respond_with_data(json.dumps({ + watcher_req = self.watcher_etcd.get_next_request() + watcher_req.respond_with_data(json.dumps({ "action": "delete", "node": { "key": "/calico/v1/", @@ -388,13 +440,16 @@ def test_root_directory_deletion(self): def test_garbage_watcher_response(self): self._run_initial_resync() # Delete the whole /calico/v1 dir. - self.watcher_etcd.respond_with_data("{foobar", 100, 200) + watcher_req = self.watcher_etcd.get_next_request() + watcher_req.respond_with_data("{foobar", 100, 200) # Should trigger a resync. self.assert_status_message(STATUS_WAIT_FOR_READY) - def send_watcher_event_and_assert_felix_msg(self, etcd_index): - self.watcher_etcd.respond_with_value( + def send_watcher_event_and_assert_felix_msg(self, etcd_index, req=None): + if req is None: + req = self.watcher_etcd.get_next_request() + req.respond_with_value( "/calico/v1/adir/ekey", "e", mod_index=etcd_index, @@ -434,7 +489,7 @@ def test_resync_etcd_read_fail(self, m_sleep): # Initial handshake. self.start_driver_and_handshake() # Start streaming some data. - snap_stream = self.start_snapshot_response() + snap_stream, watcher_req = self.start_snapshot_response() # But then the read times out... snap_stream.write(TimeoutError()) # Triggering a restart of the resync loop. 
@@ -444,8 +499,8 @@ def test_resync_etcd_read_fail(self, m_sleep): def test_bad_ready_key_retry(self, m_sleep): self.start_driver_and_init() # Respond to etcd request with a bad response - self.resync_etcd.assert_request(READY_KEY) - self.resync_etcd.respond_with_data("foobar", 123, 500) + req = self.resync_etcd.assert_request(READY_KEY) + req.respond_with_data("foobar", 123, 500) # Then it should retry. self.resync_etcd.assert_request(READY_KEY) m_sleep.assert_called_once_with(1) @@ -464,18 +519,19 @@ def start_driver_and_handshake(self): def do_handshake(self): # Respond to etcd request with ready == true. - self.resync_etcd.assert_request(READY_KEY) - self.resync_etcd.respond_with_value(READY_KEY, "true", mod_index=10) + req = self.resync_etcd.assert_request(READY_KEY) + req.respond_with_value(READY_KEY, "true", mod_index=10) # Then etcd should get the global config request. - self.resync_etcd.assert_request(CONFIG_DIR, recursive=True) - self.resync_etcd.respond_with_dir(CONFIG_DIR, { + req = self.resync_etcd.assert_request(CONFIG_DIR, recursive=True) + req.respond_with_dir(CONFIG_DIR, { CONFIG_DIR + "/InterfacePrefix": "tap", CONFIG_DIR + "/Foo": None, # Directory }) # Followed by the per-host one... - self.resync_etcd.assert_request("/calico/v1/host/thehostname/config", - recursive=True) - self.resync_etcd.respond_with_data('{"errorCode": 100}', + req = self.resync_etcd.assert_request( + "/calico/v1/host/thehostname/config", recursive=True + ) + req.respond_with_data('{"errorCode": 100}', 10, 404) # Then the driver should send the config to Felix. self.assert_msg_to_felix( @@ -501,16 +557,16 @@ def do_handshake(self): def start_snapshot_response(self, etcd_index=10): # We should get a request to load the full snapshot. 
- self.resync_etcd.assert_request( + req = self.resync_etcd.assert_request( VERSION_DIR, recursive=True, timeout=120, preload_content=False ) - snap_stream = self.resync_etcd.respond_with_stream( + snap_stream = req.respond_with_stream( etcd_index=etcd_index ) # And then the headers should trigger a request from the watcher # including the etcd_index we sent even though we haven't sent a # response body to the resync thread. - self.watcher_etcd.assert_request( + req = self.watcher_etcd.assert_request( VERSION_DIR, recursive=True, timeout=90, wait_index=etcd_index+1 ) # Start sending the snapshot response: @@ -536,7 +592,7 @@ def start_snapshot_response(self, etcd_index=10): MSG_KEY_KEY: "/calico/v1/adir/akey", MSG_KEY_VALUE: "akey's value", }) - return snap_stream + return snap_stream, req def assert_status_message(self, status): _log.info("Expecting %s status from driver...", status) @@ -557,7 +613,7 @@ def send_init_msg(self): def assert_msg_to_felix(self, msg_type, fields=None): try: - mt, fs = self.msg_writer.queue.get(timeout=2) + mt, fs = self.msg_writer.next_msg() except Empty: self.fail("Expected %s message to felix but no message was sent" % msg_type) @@ -609,10 +665,12 @@ def tearDown(self): self.msg_reader.send_timeout() # SystemExit kills (only) the thread silently. - self.resync_etcd.respond_with_exception(SystemExit()) - self.watcher_etcd.respond_with_exception(SystemExit()) + self.resync_etcd.stop() + self.watcher_etcd.stop() # Wait for it to stop. 
- self.assertTrue(self.driver.join(0.1), "Driver failed to stop") + if not self.driver.join(1): + dump_all_thread_stacks() + self.fail("Driver failed to stop") finally: # Now the driver is stopped, it's safe to remove our patch of # complete_logging() @@ -827,3 +885,16 @@ def test_process_events_stopped(self): self.driver._process_events_only() +def dump_all_thread_stacks(): + print >> sys.stderr, "\n*** STACKTRACE - START ***\n" + code = [] + for threadId, stack in sys._current_frames().items(): + code.append("\n# ThreadID: %s" % threadId) + for filename, lineno, name, line in traceback.extract_stack(stack): + code.append('File: "%s", line %d, in %s' % (filename, + lineno, name)) + if line: + code.append(" %s" % (line.strip())) + for line in code: + print >> sys.stderr, line + print >> sys.stderr, "\n*** STACKTRACE - END ***\n" \ No newline at end of file