Skip to content

Commit

Permalink
Rework driver tests to handle concurrent requests from old/new watcher.
Browse files Browse the repository at this point in the history
Improve shutdown mechanism.
  • Loading branch information
Shaun Crampton committed Nov 10, 2015
1 parent c7cc1be commit 2ca1589
Show file tree
Hide file tree
Showing 2 changed files with 289 additions and 127 deletions.
211 changes: 151 additions & 60 deletions calico/etcddriver/test/stubs.py
Expand Up @@ -19,6 +19,7 @@
Stub objects used for testing driver/protocol code.
"""
import json
import threading

import logging
from Queue import Queue, Empty
Expand All @@ -31,7 +32,15 @@


# Singleton representing a flush in the stream of writes.
FLUSH = object()
class Sigil(object):
def __init__(self, name):
self.name = name

def __str__(self):
return "<%s>" % self.name


FLUSH = Sigil("FLUSH")


class StubMessageReader(MessageReader):
Expand Down Expand Up @@ -93,6 +102,9 @@ def send_message(self, msg_type, fields=None, flush=True):
if flush:
self.flush()

def next_msg(self):
return self.queue.get(timeout=1)

def flush(self):
self.queue.put(FLUSH)

Expand Down Expand Up @@ -128,65 +140,32 @@ def __del__(self):
assert self._finished, "PipeFile wasn't correctly finished."


class StubEtcd(object):
"""
A fake connection to etcd. We hook the driver's _issue_etcd_request
method and block the relevant thread until the test calls one of the
respond_... methods.
"""
def __init__(self):
self.request_queue = Queue()
self.response_queue = Queue()
self.headers = {
"x-etcd-cluster-id": "abcdefg"
}

def request(self, key, **kwargs):
"""
Called from the driver to make a request. Blocks until the
test thread sends a response.
"""
self.request_queue.put((key, kwargs))
response = self.response_queue.get(30)
if isinstance(response, BaseException):
raise response
else:
return response

def get_next_request(self):
"""
Called from the test to get the next request from the driver.
"""
return self.request_queue.get(timeout=1)
class StubRequest(object):
def __init__(self, stub_etcd, key, kwargs):
self.stub_etcd = stub_etcd
self.thread = threading.current_thread()
self.key = key
self.kwargs = kwargs
self.response = None
self.response_available = threading.Event()
self.pipe_file = None

def assert_request(self, expected_key, **expected_args):
"""
Asserts the properies of the next request.
"""
_log.info("Waiting for request for key %s, %s",
expected_key, expected_args)
key, args = self.get_next_request()
default_args = {'wait_index': None,
'preload_content': None,
'recursive': False,
'timeout': 5}
_log.info("Got request for key %s")
for k, v in default_args.iteritems():
if k in args and args[k] == v:
del args[k]
if expected_key != key:
raise AssertionError("Expected request for %s but got %s" %
(expected_key, key))
if expected_args != args:
raise AssertionError("Expected request args %s for %s but got %s" %
(expected_args, key, args))
def __str__(self):
return "Request<key=%s,args=%s,thread=%s>" % (self.key,
self.kwargs,
self.thread)

def respond_with_exception(self, exc):
"""
Called from the test to raise an exception from the current/next
request.
"""
self.response_queue.put(exc)
self.response = exc
self.on_response_avail()

def on_response_avail(self):
self.response_available.set()
self.stub_etcd.on_req_closed(self)

def respond_with_value(self, key, value, dir=False, mod_index=None,
etcd_index=None, status=200, action="get"):
Expand Down Expand Up @@ -234,24 +213,136 @@ def respond_with_data(self, data, etcd_index, status):
Called from the test to return a raw response (e.g. to send
malformed JSON).
"""
headers = self.headers.copy()
headers = self.stub_etcd.headers.copy()
if etcd_index is not None:
headers["x-etcd-index"] = str(etcd_index)
resp = MockResponse(status, data, headers)
self.response_queue.put(resp)
self.response = resp
self.on_response_avail()

def respond_with_stream(self, etcd_index, status=200):
"""
Called from the test to respond with a stream, allowing the test to
send chunks of data in response.
"""
headers = self.headers.copy()
headers = self.stub_etcd.headers.copy()
if etcd_index is not None:
headers["x-etcd-index"] = str(etcd_index)
f = PipeFile()
resp = MockResponse(status, f, headers)
self.response_queue.put(resp)
return f
self.pipe_file = PipeFile()
resp = MockResponse(status, self.pipe_file, headers)
self.response = resp
self.response_available.set() # We leave the req open in StubEtcd.
return self.pipe_file

def get_response(self):
if self.response_available.wait(30):
return self.response
else:
raise AssertionError("No response")

def assert_request(self, expected_key, **expected_args):
"""
Asserts the properies of the next request.
"""
default_args = {'wait_index': None,
'preload_content': None,
'recursive': False,
'timeout': 5}
key = self.key
args = self.kwargs
for k, v in default_args.iteritems():
if k in args and args[k] == v:
del args[k]
if expected_key != key:
raise AssertionError("Expected request for %s but got %s" %
(expected_key, key))
if expected_args != args:
raise AssertionError("Expected request args %s for %s but got %s" %
(expected_args, key, args))

def stop(self):
if self.response_available.is_set():
if self.pipe_file:
self.pipe_file.write(SystemExit())
else:
self.respond_with_exception(SystemExit())


class StubEtcd(object):
"""
A fake connection to etcd. We hook the driver's _issue_etcd_request
method and block the relevant thread until the test calls one of the
respond_... methods.
"""
def __init__(self):
self.request_queue = Queue()
self.response_queue = Queue()
self.headers = {
"x-etcd-cluster-id": "abcdefg"
}
self.lock = threading.Lock()
self.open_reqs = set()

def request(self, key, **kwargs):
"""
Called from the driver to make a request. Blocks until the
test thread sends a response.
"""
_log.info("New request on thread %s: %s, %s",
threading.current_thread(),
key, kwargs)
request = StubRequest(self, key, kwargs)
with self.lock:
self.open_reqs.add(request)
rq = self.request_queue
if rq is None:
_log.warn("Request after shutdown: %s, %s", key, kwargs)
raise SystemExit()
else:
rq.put(request)
response = request.get_response()
if isinstance(response, BaseException):
raise response
else:
return response

def get_next_request(self):
"""
Called from the test to get the next request from the driver.
"""
_log.info("Waiting for next request")
req = self.request_queue.get(timeout=1)
_log.info("Got request %s", req)
return req

def assert_request(self, expected_key, **expected_args):
"""
Asserts the properies of the next request.
"""
req = self.request_queue.get(timeout=1)
req.assert_request(expected_key, **expected_args)
return req

def on_req_closed(self, req):
with self.lock:
self.open_reqs.remove(req)

def stop(self):
_log.info("Stopping stub etcd")
with self.lock:
_log.info("stop() got rq_lock")
while True:
try:
req = self.request_queue.get_nowait()
except Empty:
break
else:
self.open_reqs.add(req)
self.request_queue = None
for req in list(self.open_reqs):
_log.info("Aborting request %s", req)
req.stop()
_log.info("Stub etcd stopped; future requests should self-abort")


class MockResponse(object):
Expand Down

0 comments on commit 2ca1589

Please sign in to comment.