diff --git a/.gitignore b/.gitignore index 70345d0..6e42562 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,5 @@ .coverage _trial_temp/ /docs/_build +htmlcov/ +.cache/ diff --git a/.travis.yml b/.travis.yml index 2c4f319..85b552a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,22 +1,31 @@ language: python python: - "2.7" - - "pypy" +matrix: + include: + - python: "pypy" + env: PYPY_VERSION="4.0.1" NO_COVERAGE=1 cache: directories: - $HOME/.cache/pip + - $HOME/downloads + +before_install: + # If necessary, set up an appropriate version of pypy. + - if [ ! -z "$PYPY_VERSION" ]; then source setup-pypy-travis.sh; fi + - if [ ! -z "$PYPY_VERSION" ]; then python --version 2>&1 | fgrep "PyPy $PYPY_VERSION"; fi install: - - pip install twine - - pip install coveralls - pip install --upgrade pip - - pip install flake8 - - pip install -r requirements-dev.txt - pip install -e . + - pip install -r requirements-dev.txt + - pip install coveralls + - pip install twine script: - flake8 consular - - py.test consular --cov consular + - if [ -z "$NO_COVERAGE" ]; then COVERAGE_OPT="--cov consular"; else COVERAGE_OPT=""; fi + - py.test consular $COVERAGE_OPT after_success: - - coveralls + - if [ -z "$NO_COVERAGE" ]; then coveralls; fi deploy: provider: pypi user: smn diff --git a/VERSION b/VERSION index 6d7de6e..f0bb29e 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.0.2 +1.3.0 diff --git a/consular/cli.py b/consular/cli.py index 21dbaa9..d973330 100644 --- a/consular/cli.py +++ b/consular/cli.py @@ -1,7 +1,7 @@ import click import sys -from urllib import urlencode +from uritools import uricompose @click.command() @@ -16,9 +16,10 @@ @click.option('--marathon', default='http://localhost:8080', help='The Marathon HTTP API') @click.option('--registration-id', - help=('Auto register for Marathon event callbacks with the ' - 'registration-id. Must be unique for each consular ' - 'process.'), type=str) + help=('Name used to register for event callbacks in Marathon as ' + 'well as to register services in Consul. Must be unique ' + 'for each consular process.'), + type=str, default='consular') @click.option('--sync-interval', help=('Automatically sync the apps in Marathon with what\'s ' 'in Consul every _n_ seconds. Defaults to 0 (disabled).'), @@ -53,27 +54,22 @@ def main(scheme, host, port, sync_interval, purge, logfile, debug, timeout, fallback, fallback_timeout): # pragma: no cover from consular.main import Consular - from twisted.internet.task import LoopingCall from twisted.internet import reactor from twisted.python import log log.startLogging(logfile) - consular = Consular(consul, marathon, fallback) - consular.debug = debug - consular.timeout = timeout + consular = Consular(consul, marathon, fallback, registration_id) + consular.set_debug(debug) + consular.set_timeout(timeout) consular.fallback_timeout = fallback_timeout - if registration_id: - events_url = "%s://%s:%s/events?%s" % ( - scheme, host, port, - urlencode({ - 'registration': registration_id, - })) - consular.register_marathon_event_callback(events_url) + events_url = uricompose( + scheme=scheme, host=host, port=port, path='/events', + query={'registration': registration_id}) + consular.register_marathon_event_callback(events_url) if sync_interval > 0: - lc = LoopingCall(consular.sync_apps, purge) - lc.start(sync_interval, now=True) + consular.schedule_sync(sync_interval, purge) consular.run(host, port) reactor.run() diff --git a/consular/clients.py b/consular/clients.py new file mode 100644 index 0000000..73d4163 --- /dev/null +++ b/consular/clients.py @@ -0,0 +1,299 @@ +import json +import treq + +from twisted.internet import reactor +from twisted.python import log +from twisted.web import client +from twisted.web.http import OK + +from uritools import uricompose, urisplit + +# Twisted's default HTTP11 client factory is way too verbose +client._HTTP11ClientFactory.noisy = False + + +class JsonClient(object): + debug = False + clock = reactor + timeout = 5 + agent = None + + def __init__(self, endpoint): + """ + Create a client with the specified default endpoint. + """ + self.endpoint = urisplit(endpoint) + self.pool = client.HTTPConnectionPool(self.clock, persistent=False) + + def requester(self, *args, **kwargs): + return treq.request(*args, **kwargs) + + def _log_http_response(self, response, method, path, data): + log.msg('%s %s with %s returned: %s' % ( + method, path, data, response.code)) + return response + + def _log_http_error(self, failure, url): + log.err(failure, 'Error performing request to %s' % (url,)) + return failure + + def request(self, method, path, query=None, endpoint=None, json_data=None, + **kwargs): + """ + Perform a request. A number of basic defaults are set on the request + that make using a JSON API easier. These defaults can be overridden by + setting the parameters in the keyword args. + + :param: method: + The HTTP method to use (example is `GET`). + :param: path: + The URL path (example is `/v2/apps`). + :param: query: + The URL query parameters as a dict. + :param: endpoint: + The URL endpoint to use. The default value is the endpoint this + client was created with (`self.endpoint`) (example is + `http://localhost:8080`) + :param: json_data: + A python data structure that will be converted to a JSON string + using `json.dumps` and used as the request body. + :param: kwargs: + Any other parameters that will be passed to `treq.request`, for + example headers or parameters. + """ + if endpoint is not None: + scheme, authority = urisplit(endpoint)[:2] + else: + scheme, authority = self.endpoint[:2] + url = uricompose(scheme, authority, path, query) + + data = json.dumps(json_data) if json_data else None + requester_kwargs = { + 'headers': { + 'Content-Type': 'application/json', + 'Accept': 'application/json', + }, + 'data': data, + 'pool': self.pool, + 'agent': self.agent, + 'timeout': self.timeout + } + requester_kwargs.update(kwargs) + + d = self.requester(method, url, **requester_kwargs) + + if self.debug: + d.addCallback(self._log_http_response, method, url, data) + + d.addErrback(self._log_http_error, url) + return d.addCallback(self._raise_for_status, url) + + def get_json(self, path, query=None, **kwargs): + """ + Perform a GET request to the given path and return the JSON response. + """ + d = self.request('GET', path, query, **kwargs) + return d.addCallback(lambda response: response.json()) + + def _raise_for_status(self, response, url): + """ + Raises an `HTTPError` if the response did not succeed. + Adapted from the Requests library: + https://github.com/kennethreitz/requests/blob/v2.8.1/requests/models.py#L825-L837 + """ + http_error_msg = '' + + if 400 <= response.code < 500: + http_error_msg = '%s Client Error for url: %s' % (response.code, + url) + + elif 500 <= response.code < 600: + http_error_msg = '%s Server Error for url: %s' % (response.code, + url) + + if http_error_msg: + raise HTTPError(http_error_msg, response) + + return response + + +class HTTPError(IOError): + """ + Error raised for 4xx and 5xx response codes. + """ + def __init__(self, message, response): + self.response = response + super(HTTPError, self).__init__(message) + + +class MarathonClient(JsonClient): + + def get_json_field(self, path, field): + """ + Perform a GET request and get the contents of the JSON response. + + Marathon's JSON responses tend to contain an object with a single key + which points to the actual data of the response. For example /v2/apps + returns something like {"apps": [ {"app1"}, {"app2"} ]}. We're + interested in the contents of "apps". + """ + return self.get_json(path).addCallback(self._get_json_field, field) + + def _get_json_field(self, response_json, field_name): + """ + Get a JSON field from the response JSON. + + :param: response_json: + The parsed JSON content of the response. + :param: field_name: + The name of the field in the JSON to get. + """ + if field_name not in response_json: + raise KeyError('Unable to get value for "%s" from Marathon ' + 'response: "%s"' % ( + field_name, json.dumps(response_json),)) + + return response_json[field_name] + + def get_event_subscriptions(self): + """ + Get the current Marathon event subscriptions, returning a list of + callback URLs. + """ + return self.get_json_field( + '/v2/eventSubscriptions', 'callbackUrls') + + def post_event_subscription(self, callback_url): + """ + Post a new Marathon event subscription with the given callback URL. + """ + d = self.request( + 'POST', '/v2/eventSubscriptions', {'callbackUrl': callback_url}) + return d.addCallback(lambda response: response.code == OK) + + def get_apps(self): + """ + Get the currently running Marathon apps, returning a list of app + definitions. + """ + return self.get_json_field('/v2/apps', 'apps') + + def get_app(self, app_id): + """ + Get information about the app with the given app ID. + """ + return self.get_json_field('/v2/apps%s' % (app_id,), 'app') + + def get_app_tasks(self, app_id): + """ + Get the currently running tasks for the app with the given app ID, + returning a list of task definitions. + """ + return self.get_json_field('/v2/apps%s/tasks' % (app_id,), 'tasks') + + +class ConsulClient(JsonClient): + + fallback_timeout = 2 + + def __init__(self, endpoint, enable_fallback=False): + """ + Create a Consul client. + + :param: endpoint: + The default Consul endpoint, usually on the same node as Consular + is running. + :param: enable_fallback: + Fall back to the default Consul endpoint when registering services + on an agent that cannot be reached. + """ + super(ConsulClient, self).__init__(endpoint) + self.enable_fallback = enable_fallback + + def _get_agent_endpoint(self, agent_address): + """ + Use the default endpoint to construct the agent endpoint from an + address, i.e. use the same scheme and port but swap in the address. + """ + return uricompose(scheme=self.endpoint.scheme, host=agent_address, + port=self.endpoint.port) + + def register_agent_service(self, agent_address, registration): + """ + Register a Consul service at the given agent address. + """ + agent_endpoint = self._get_agent_endpoint(agent_address) + d = self.request('PUT', '/v1/agent/service/register', + endpoint=agent_endpoint, json_data=registration) + + if self.enable_fallback: + d.addErrback(self._register_agent_service_fallback, registration) + + return d + + def _register_agent_service_fallback(self, failure, registration): + """ + Fallback to the default agent endpoint (`self.endpoint`) to register + a Consul service. + """ + log.msg('Falling back for %s at %s.' % ( + registration['Name'], self.endpoint)) + return self.request( + 'PUT', '/v1/agent/service/register', json_data=registration, + timeout=self.fallback_timeout) + + def deregister_agent_service(self, agent_address, service_id): + """ + Deregister a Consul service at the given agent address. + """ + agent_endpoint = self._get_agent_endpoint(agent_address) + return self.request('PUT', '/v1/agent/service/deregister/%s' % ( + service_id,), endpoint=agent_endpoint) + + def put_kv(self, key, value): + """ + Put a key/value in Consul's k/v store. + """ + return self.request( + 'PUT', '/v1/kv/%s' % (key,), json_data=value) + + def get_kv_keys(self, keys_path, separator=None): + """ + Get the stored keys for the given keys path from the Consul k/v store. + + :param: keys_path: + The path to some keys (example is `consular/my-app/`). + :param: separator: + Get all the keys up to some separator in the key path. Useful for + getting all the keys non-recursively for a path. For more + information see the Consul API documentation. + """ + query = {'keys': None} + if separator: + query['separator'] = separator + return self.get_json('/v1/kv/%s' % (keys_path,), query) + + def delete_kv_keys(self, key, recurse=False): + """ + Delete the store key(s) at the given path from the Consul k/v store. + + :param: key: + The key or key path to be deleted. + :param: recurse: + Whether or not to recursively delete all subpaths of the key. + """ + query = {'recurse': None} if recurse else None + return self.request('DELETE', '/v1/kv/%s' % (key,), query) + + def get_catalog_nodes(self): + """ + Get the list of active Consul nodes from the catalog. + """ + return self.get_json('/v1/catalog/nodes') + + def get_agent_services(self, agent_address): + """ + Get the list of running services for the given agent address. + """ + agent_endpoint = self._get_agent_endpoint(agent_address) + return self.get_json('/v1/agent/services', endpoint=agent_endpoint) diff --git a/consular/main.py b/consular/main.py index 67b7a19..824bfcc 100644 --- a/consular/main.py +++ b/consular/main.py @@ -1,25 +1,45 @@ import json -from urllib import quote, urlencode +from consular.clients import ConsulClient, MarathonClient, HTTPError + from twisted.internet import reactor -from twisted.web import client, server -# Twisted's fault HTTP11 client factory is way too verbose -client._HTTP11ClientFactory.noisy = False -from twisted.internet.defer import ( - succeed, inlineCallbacks, returnValue, gatherResults) +from twisted.web import server +from twisted.internet.defer import succeed, inlineCallbacks, returnValue +from twisted.internet.task import LoopingCall +from twisted.web.http import NOT_FOUND from twisted.python import log - -import treq from klein import Klein -def get_appid(app_id_string): - return app_id_string.rsplit('/', 1)[1] - - -def get_agent_endpoint(host): - return 'http://%s:8500' % (host,) +def get_app_name(app_id): + """ + Get the app name from the marathon app ID. Separators in the ID ('/') are + replaced with '-'s while the leading separator is removed. + """ + return app_id.lstrip('/').replace('/', '-') + + +@inlineCallbacks +def handle_not_found_error(f, *args, **kwargs): + """ + Perform a request and catch the not found (404) error if one occurs. + + :param: f: The function to call to perform the request. The function may + return a deferred. + :param: args: The arguments to call the function with. + :param: kwargs: The keyword arguments to call the function with. + :returns: The return value of the function call or None if there was a 404 + response code. + """ + try: + response = yield f(*args, **kwargs) + except HTTPError as e: + if e.response.code == NOT_FOUND: + response = None + else: + raise e + returnValue(response) class ConsularSite(server.Site): @@ -32,100 +52,117 @@ def log(self, request): class Consular(object): + """ + :param str consul_endpoint: + The HTTP endpoint for Consul (often http://example.org:8500). + :param str marathon_endpoint: + The HTTP endpoint for Marathon (often http://example.org:8080). + :param bool enable_fallback: + Fallback to the main Consul endpoint for registrations if unable + to reach Consul running on the machine running a specific Marathon + task. + :param str registration_id: + A unique parameter for this Consul server. It is used for house-keeping + purposes such as purging tasks that are no longer running in Marathon. + """ app = Klein() - debug = False + _debug = False clock = reactor - timeout = 5 - fallback_timeout = 2 - requester = lambda self, *a, **kw: treq.request(*a, **kw) - - def __init__(self, consul_endpoint, marathon_endpoint, enable_fallback): - self.consul_endpoint = consul_endpoint - self.marathon_endpoint = marathon_endpoint - self.pool = client.HTTPConnectionPool(self.clock, persistent=False) - self.enable_fallback = enable_fallback + + def __init__(self, consul_endpoint, marathon_endpoint, enable_fallback, + registration_id): + self.consul_client = ConsulClient(consul_endpoint, enable_fallback) + self.marathon_client = MarathonClient(marathon_endpoint) + self.registration_id = registration_id self.event_dispatch = { 'status_update_event': self.handle_status_update_event, } + def set_debug(self, debug): + self._debug = debug + self.consul_client.debug = debug + self.marathon_client.debug = debug + + def set_timeout(self, timeout): + self.consul_client.timeout = timeout + self.marathon_client.timeout = timeout + + def set_requester(self, requester): + self.consul_client.requester = requester + self.marathon_client.requester = requester + def run(self, host, port): + """ + Starts the HTTP server. + + :param str host: + The host to bind to (example is ``localhost``) + :param int port: + The port to listen on (example is ``7000``) + """ site = ConsularSite(self.app.resource()) - site.debug = self.debug + site.debug = self._debug self.clock.listenTCP(port, site, interface=host) - def get_marathon_event_callbacks(self): - d = self.marathon_request('GET', '/v2/eventSubscriptions') - d.addErrback(log.err) - d.addCallback(lambda response: response.json()) - d.addCallback(self.get_marathon_event_callbacks_from_json) - return d - - def get_marathon_event_callbacks_from_json(self, json): + def schedule_sync(self, interval, purge=False): """ - Marathon may return a bad response when we get the existing event - callbacks. A common cause for this is that Marathon is not properly - configured. Raise an exception with information from Marathon if this - is the case, else return the callback URLs from the JSON response. + Schedule a recurring sync of apps, starting after this method is + called. + + :param float interval: + The number of seconds between syncs. + :param bool purge: + Whether to purge old apps after each sync. + :return: + A tuple of the LoopingCall object and the deferred created when it + was started. """ - if 'callbackUrls' not in json: - raise RuntimeError('Unable to get existing event callbacks from ' + - 'Marathon: %r' % (str(json),)) - - return json['callbackUrls'] + lc = LoopingCall(self._try_sync_apps, purge) + lc.clock = self.clock + return (lc, lc.start(interval, now=True)) - def create_marathon_event_callback(self, url): - d = self.marathon_request( - 'POST', '/v2/eventSubscriptions?%s' % urlencode({ - 'callbackUrl': url, - })) - d.addErrback(log.err) - d.addCallback(lambda response: response.code == 200) - return d + @inlineCallbacks + def _try_sync_apps(self, purge=False): + """ + Sync the apps, catching and logging any exception that occurs. + """ + try: + yield self.sync_apps(purge) + except Exception as e: + # TODO: More specialised exception handling. + log.msg('Error syncing apps: %s' % e) @inlineCallbacks def register_marathon_event_callback(self, events_url): - existing_callbacks = yield self.get_marathon_event_callbacks() + """ + Register Consular with Marathon to receive HTTP event callbacks. + To use this ensure that `Marathon is configured`_ to send HTTP event + callbacks for state changes in tasks. + + :param str events_url: + The HTTP endpoint to register with Marathon for event callbacks. + + .. _`Marathon is configured`: + https://mesosphere.github.io/marathon/docs/event-bus.html + #configuration + """ + existing_callbacks = ( + yield self.marathon_client.get_event_subscriptions()) already_registered = any( [events_url == url for url in existing_callbacks]) if already_registered: log.msg('Consular event callback already registered.') returnValue(True) - registered = yield self.create_marathon_event_callback(events_url) + registered = ( + yield self.marathon_client.post_event_subscription(events_url)) if registered: log.msg('Consular event callback registered.') else: log.err('Consular event callback registration failed.') returnValue(registered) - def log_http_response(self, response, method, path, data): - log.msg('%s %s with %s returned: %s' % ( - method, path, data, response.code)) - return response - - def marathon_request(self, method, path, data=None): - return self._request( - method, '%s%s' % (self.marathon_endpoint, path), data) - - def consul_request(self, method, url, data=None): - return self._request(method, url, data, timeout=self.fallback_timeout) - - def _request(self, method, url, data, timeout=None): - d = self.requester( - method, - url.encode('utf-8'), - headers={ - 'Content-Type': 'application/json', - 'Accept': 'application/json', - }, - data=(json.dumps(data) if data is not None else None), - pool=self.pool, - timeout=timeout or self.timeout) - if self.debug: - d.addCallback(self.log_http_response, method, url, data) - return d - @app.route('/') def index(self, request): request.setHeader('Content-Type', 'application/json') @@ -133,6 +170,12 @@ def index(self, request): @app.route('/events') def events(self, request): + """ + Listens to incoming events from Marathon on ``/events``. + + :param klein.app.KleinRequest request: + The Klein HTTP request + """ request.setHeader('Content-Type', 'application/json') event = json.load(request.content) handler = self.event_dispatch.get( @@ -140,6 +183,24 @@ def events(self, request): return handler(request, event) def handle_status_update_event(self, request, event): + """ + Handles status updates from Marathon. + + The various task stages are handled as follows: + + TASK_STAGING: ignored + TASK_STARTING: ignored + TASK_RUNNING: task data updated on Consul + TASK_FINISHED: task data removed from Consul + TASK_FAILED: task data removed from Consul + TASK_KILLED: task data removed from Consul + TASK_LOST: task data removed from Consul + + :param klein.app.KleinRequest request: + The Klein HTTP request + :param dict event: + The Marathon event + """ dispatch = { 'TASK_STAGING': self.noop, 'TASK_STARTING': self.noop, @@ -157,19 +218,30 @@ def noop(self, request, event): 'status': 'ok' })) + @inlineCallbacks def update_task_running(self, request, event): - # NOTE: Marathon sends a list of ports, I don't know yet when & if - # there are multiple values in that list. - d = self.get_app(event['appId']) - d.addCallback(lambda app: self.sync_app(app)) - d.addCallback(lambda _: json.dumps({'status': 'ok'})) - return d + """ Use a running event to register a new Consul service. """ + # Register the task as a service + yield self.register_task_service( + event['appId'], event['taskId'], event['host'], event['ports']) + + # Sync the app labels in case they've changed or aren't stored yet + app = yield handle_not_found_error( + self.marathon_client.get_app, event['appId']) + + # The app could have disappeared in this time if it was destroyed. If + # it has been destroyed, do nothing and wait for the TASK_KILLED event + # to clear it. + if app is not None: + yield self.sync_app_labels(app) + else: + log.msg('Warning. App with ID "%s" could not be found for new ' + 'task with ID "%s"' % (event['appId'], event['taskId'],)) + + returnValue(json.dumps({'status': 'ok'})) def update_task_killed(self, request, event): - d = self.deregister_service( - get_agent_endpoint(event['host']), - get_appid(event['appId']), - event['taskId']) + d = self.deregister_task_service(event['taskId'], event['host']) d.addCallback(lambda _: json.dumps({'status': 'ok'})) return d @@ -182,128 +254,393 @@ def handle_unknown_event(self, request, event): 'error': 'Event type %s not supported.' % (event_type,) }) - def register_service(self, agent_endpoint, - app_id, service_id, address, port): + def reg_id_tag(self): + """ Get the registration ID tag for this instance of Consular. """ + return self._consular_tag('reg-id', self.registration_id) + + def app_id_tag(self, app_id): + """ Get the app ID tag for the given app ID. """ + return self._consular_tag('app-id', app_id) + + def _consular_tag(self, tag_name, value): + return self._consular_tag_key(tag_name) + value + + def get_app_id_from_tags(self, tags): + """ + Get the app ID from the app ID tag in the given tags, or None if the + tag could not be found. + """ + return self._find_consular_tag(tags, 'app-id') + + def _find_consular_tag(self, tags, tag_name): + pseudo_key = self._consular_tag_key(tag_name) + matches = [tag for tag in tags if tag.startswith(pseudo_key)] + + if not matches: + return None + if len(matches) > 1: + raise RuntimeError('Multiple (%d) Consular tags found for key ' + '"%s": %s' + % (len(matches), pseudo_key, matches,)) + + return matches[0].lstrip(pseudo_key) + + def _consular_tag_key(self, tag_name): + return 'consular-%s=' % (tag_name,) + + def _create_service_registration(self, app_id, service_id, address, port): + """ + Create the request body for registering a service with Consul. + """ + registration = { + 'Name': get_app_name(app_id), + 'ID': service_id, + 'Address': address, + 'Tags': [ + self.reg_id_tag(), + self.app_id_tag(app_id), + ] + } + if port is not None: + registration['Port'] = port + + return registration + + def register_task_service(self, app_id, task_id, host, ports): + """ + Register a Marathon task as a service in Consul. + + :param str app_id: + The ID of the Marathon app that the task belongs to. + :param str task_id: + The ID of the task, this will be used as the Consul service ID. + :param str host: + The host address of the machine the task is running on. + :param list ports: + The port numbers the task can be accessed on on the host machine. + """ + if not ports: + port = None + elif len(ports) == 1: + [port] = ports + else: + # TODO: Support multiple ports (issue #29) + port = min(ports) + log.msg('Warning. %d ports found for app "%s". Consular currently ' + 'only supports a single port. Only the lowest port (%s) ' + 'will be used.' % (len(ports), app_id, port,)) + log.msg('Registering %s at %s with %s at %s:%s.' % ( - app_id, agent_endpoint, service_id, address, port)) - d = self.consul_request( - 'PUT', - '%s/v1/agent/service/register' % (agent_endpoint,), - { - 'Name': app_id, - 'ID': service_id, - 'Address': address, - 'Port': port, - }) - if self.enable_fallback: - d.addErrback( - self.register_service_fallback, app_id, service_id, - address, port) - return d + app_id, host, task_id, host, port)) + registration = self._create_service_registration(app_id, task_id, + host, port) - def register_service_fallback(self, failure, - app_id, service_id, address, port): - log.msg('Falling back for %s at %s with %s at %s:%s.' % ( - app_id, self.consul_endpoint, service_id, address, port)) - return self.consul_request( - 'PUT', - '%s/v1/agent/service/register' % (self.consul_endpoint,), - { - 'Name': app_id, - 'ID': service_id, - 'Address': address, - 'Port': port, - }) - - def deregister_service(self, agent_endpoint, app_id, service_id): - log.msg('Deregistering %s at %s with %s' % ( - app_id, agent_endpoint, service_id,)) - return self.consul_request( - 'PUT', '%s/v1/agent/service/deregister/%s' % ( - agent_endpoint, service_id,)) + return self.consul_client.register_agent_service(host, registration) + + def deregister_task_service(self, task_id, host): + """ + Deregister a Marathon task's service from Consul. + + :param str task_id: + The ID of the task, this will be used as the Consul service ID. + :param str host: + The host address of the machine the task is running on. + """ + return self.deregister_consul_service(host, task_id) + def deregister_consul_service(self, agent_endpoint, service_id): + """ + Deregister a service from a Consul agent. + + :param str agent_endpoint: + The HTTP endpoint of the Consul agent. + :param str service_id: + The ID of the Consul service to be deregistered. + """ + log.msg('Deregistering service with ID "%s" at Consul endpoint %s ' % ( + service_id, agent_endpoint,)) + return self.consul_client.deregister_agent_service( + agent_endpoint, service_id) + + @inlineCallbacks def sync_apps(self, purge=False): - d = self.marathon_request('GET', '/v2/apps') - d.addCallback(lambda response: response.json()) - d.addCallback( - lambda data: gatherResults( - [self.sync_app(app) for app in data['apps']])) + """ + Ensure all the apps in Marathon are registered as services + in Consul. + + Set ``purge`` to ``True`` if you automatically want services in Consul + that aren't registered in Marathon to be purged. Consular only purges + services that have been registered with the same ``registration-id``. + + :param bool purge: + To purge or not to purge. + """ + log.msg('Syncing apps') + apps = yield self.marathon_client.get_apps() + + self.check_apps_namespace_clash(apps) + + for app in apps: + yield self.sync_app(app) + if purge: - d.addCallback(lambda _: self.purge_dead_services()) - return d + log.msg('Purging dead apps') + yield self.purge_dead_apps(apps) - def get_app(self, app_id): - d = self.marathon_request('GET', '/v2/apps%s' % (app_id,)) - d.addCallback(lambda response: response.json()) - d.addCallback(lambda data: data['app']) - return d + def check_apps_namespace_clash(self, apps): + """ + Checks if app names in Marathon will cause a namespace clash in Consul. + Throws an exception if there is a collision, else returns the apps. + + :param: apps: + The JSON list of apps from Marathon's API. + """ + # Collect the app name to app id(s) mapping. + name_ids = {} + for app in apps: + app_id = app['id'] + app_name = get_app_name(app_id) + name_ids.setdefault(app_name, []).append(app_id) + + # Check if any app names map to more than one app id. + collisions = {name: ids + for name, ids in name_ids.items() if len(ids) > 1} + + if collisions: + collisions_string = '\n'.join(sorted( + ['%s => %s' % (name, ', '.join(ids),) + for name, ids in collisions.items()])) + raise RuntimeError( + 'The following Consul service name(s) will resolve to ' + 'multiple Marathon app names: \n%s' % (collisions_string,)) + + return apps + @inlineCallbacks def sync_app(self, app): - return gatherResults([ - self.sync_app_labels(app), - self.sync_app_tasks(app), - ]) + yield self.sync_app_labels(app) + yield self.sync_app_tasks(app) + + @inlineCallbacks + def purge_dead_apps(self, apps): + yield self.purge_dead_services() + yield self.purge_dead_app_labels(apps) + @inlineCallbacks def sync_app_labels(self, app): - labels = app.get('labels', {}) + """ + Sync the app labels for the given app by pushing its labels to the + Consul k/v store and cleaning any labels there that are no longer + present. + + :param: app: + The app JSON as return by the Marathon HTTP API. + """ # NOTE: KV requests can go straight to the consul registry # we're already connected to, they're not local to the agents. - return gatherResults([ - self.consul_request( - 'PUT', '%s/v1/kv/consular/%s/%s' % ( - self.consul_endpoint, - quote(get_appid(app['id'])), quote(key)), value) - for key, value in labels.items() - ]) + app_name = get_app_name(app['id']) + labels = app.get('labels', {}) + yield self.put_consul_app_labels(app_name, labels) + yield self.clean_consul_app_labels(app_name, labels) + + def put_consul_app_labels(self, app_name, labels): + """ + Store the given set of labels under the given app name in the Consul + k/v store. + """ + return self.put_consul_kvs({'consular/%s/%s' % (app_name, key,): value + for key, value in labels.items()}) + + @inlineCallbacks + def put_consul_kvs(self, key_values): + """ Store the given key/value set in the Consul k/v store. """ + for key, value in key_values.items(): + yield self.consul_client.put_kv(key, value) + + @inlineCallbacks + def clean_consul_app_labels(self, app_name, labels): + """ + Delete app labels stored in the Consul k/v store under the given app + name that aren't present in the given set of labels. + """ + # Get the existing labels from Consul + if self._debug: + log.msg('Cleaning labels no longer in use by app "%s"' % app_name) + + keys = yield handle_not_found_error(self.get_consul_app_keys, app_name) + if keys is None: + log.msg('No keys found in Consul for service "%s"' % app_name) + return + + if self._debug: + log.msg('%d labels stored in Marathon, %d keys found in Consul ' + 'for app "%s"' % (len(labels), len(keys), app_name)) + + # Filter out the Marathon labels + keys = self._filter_marathon_labels(keys, labels) + + if self._debug: + log.msg('%d keys to be deleted from Consul for app %s' % ( + len(keys), app_name)) + + # Delete the non-existant keys + for key in keys: + yield self.consul_client.delete_kv_keys(key) + + def get_consul_app_keys(self, app_name): + """ Get the Consul k/v keys for the app with the given name. """ + return self.consul_client.get_kv_keys('consular/%s' % (app_name,)) + def get_consul_consular_keys(self): + """ + Get the next level of Consul k/v keys at 'consular/', i.e. will + return 'consular/my-app' but not 'consular/my-app/my-label'. + """ + return self.consul_client.get_kv_keys('consular/', separator='/') + + def _filter_marathon_labels(self, consul_keys, marathon_labels): + """ + Takes a list of Consul keys and removes those with keys not found in + the given dict of Marathon labels. + + :param: consul_keys: + The list of Consul keys as returned by the Consul API. + :param: marathon_labels: + The dict of Marathon labels as returned by the Marathon API. + """ + label_key_set = set(marathon_labels.keys()) + return [key for key in consul_keys + if (self._consul_key_to_marathon_label_key(key) + not in label_key_set)] + + def _consul_key_to_marathon_label_key(self, consul_key): + """ + Trims the 'consular//' from the front of the key path to get + the Marathon label key. + """ + return consul_key.split('/', 2)[-1] + + @inlineCallbacks def sync_app_tasks(self, app): - d = self.marathon_request('GET', '/v2/apps%(id)s/tasks' % app) - d.addCallback(lambda response: response.json()) - d.addCallback(lambda data: gatherResults( - self.sync_app_task(app, task) for task in data['tasks'])) - return d + tasks = yield handle_not_found_error( + self.marathon_client.get_app_tasks, app['id']) + if tasks is None: + # Certain versions of Marathon may return 404 when an app has no + # tasks. Other versions return an empty list. + # https://github.com/mesosphere/marathon/issues/3881 + log.msg('No tasks found in Marathon for app ID "%s"' % app['id']) + return + + for task in tasks: + if task['state'] == 'TASK_RUNNING': + yield self.register_task_service( + app['id'], task['id'], task['host'], task['ports']) + + @inlineCallbacks + def purge_dead_app_labels(self, apps): + """ + Delete any keys stored in the Consul k/v store that belong to apps that + no longer exist. - def sync_app_task(self, app, task): - return self.register_service( - get_agent_endpoint(task['host']), - get_appid(app['id']), task['id'], - task['host'], task['ports'][0]) + :param: apps: + The list of apps as returned by the Marathon API. + """ + log.msg('Purging dead app labels') + # Get the existing keys + keys = yield handle_not_found_error(self.get_consul_consular_keys) + if keys is None: + log.msg('No Consular keys found in Consul') + return + + if self._debug: + log.msg('Got %d keys from Consul' % len(keys)) + + # Filter the present apps out + keys = self._filter_marathon_apps(keys, apps) + + if self._debug: + log.msg('After filtering out running apps, %d Consul keys remain ' + 'to be purged' % len(keys)) + # Delete the remaining keys + for key in keys: + yield self.consul_client.delete_kv_keys(key, recurse=True) + + def _filter_marathon_apps(self, consul_keys, marathon_apps): + """ + Takes a list of Consul keys and removes those with keys not found in + the given list of Marathon apps. + + :param: consul_keys: + The list of Consul keys as returned by the Consul API. + :param: marathon_apps: + The list of apps as returned by the Marathon API. + """ + app_name_set = set([get_app_name(app['id']) for app in marathon_apps]) + return [key for key in consul_keys + if (self._consul_key_to_marathon_app_name(key) + not in app_name_set)] + + def _consul_key_to_marathon_app_name(self, consul_key): + """ + Trims the 'consular/' from the front of the key path to get the + Marathon app name. + """ + return consul_key.split('/', 1)[-1].strip('/') + + @inlineCallbacks def purge_dead_services(self): - d = self.consul_request( - 'GET', '%s/v1/catalog/nodes' % (self.consul_endpoint,)) - d.addCallback(lambda response: response.json()) - d.addCallback(lambda data: gatherResults([ - self.purge_dead_agent_services( - get_agent_endpoint(node['Address'])) for node in data - ])) - return d + nodes = yield self.consul_client.get_catalog_nodes() + for node in nodes: + self.purge_dead_agent_services(node['Address']) @inlineCallbacks def purge_dead_agent_services(self, agent_endpoint): - response = yield self.consul_request( - 'GET', '%s/v1/agent/services' % (agent_endpoint,)) - data = yield response.json() + data = yield self.consul_client.get_agent_services(agent_endpoint) # collect the task ids for the service name services = {} for service_id, service in data.items(): - services.setdefault(service['Service'], set([])).add(service_id) + # Check the service for a tag that matches our registration ID + tags = service['Tags'] + if tags and self.reg_id_tag() in tags: + app_id = self.get_app_id_from_tags(tags) + if app_id: + services.setdefault(app_id, set()).add(service_id) + else: + log.msg('Service "%s" does not have an app ID in its ' + 'tags, it cannot be purged.' + % (service['Service'],)) + elif self._debug: + log.msg('Service "%s" is not tagged with our registration ID, ' + 'not touching it.' % (service['Service'],)) for app_id, task_ids in services.items(): yield self.purge_service_if_dead(agent_endpoint, app_id, task_ids) @inlineCallbacks def purge_service_if_dead(self, agent_endpoint, app_id, consul_task_ids): - response = yield self.marathon_request( - 'GET', '/v2/apps/%s/tasks' % (app_id,)) - data = yield response.json() - if 'tasks' not in data: - log.msg(('App %s does not look like a Marathon application, ' - 'skipping') % (str(app_id),)) - return - - marathon_task_ids = set([task['id'] for task in data['tasks']]) - tasks_to_be_purged = consul_task_ids - marathon_task_ids - for task_id in tasks_to_be_purged: - yield self.deregister_service(agent_endpoint, app_id, task_id) + # Get the running tasks for the app (don't raise an error if the tasks + # are not found) + tasks = yield handle_not_found_error( + self.marathon_client.get_app_tasks, app_id) + if tasks is None: + log.msg('No tasks found in Marathon for app ID "%s"' % app_id) + tasks = [] + + # Remove the running tasks from the set of Consul services + service_ids = self._filter_marathon_tasks(tasks, consul_task_ids) + + # Deregister the remaining old services + for service_id in service_ids: + yield self.deregister_consul_service(agent_endpoint, service_id) + + def _filter_marathon_tasks(self, marathon_tasks, consul_service_ids): + if not marathon_tasks: + return consul_service_ids + + task_id_set = set([task['id'] + for task in marathon_tasks + if task['state'] == 'TASK_RUNNING']) + return [service_id for service_id in consul_service_ids + if service_id not in task_id_set] diff --git a/consular/tests/test_clients.py b/consular/tests/test_clients.py new file mode 100644 index 0000000..cdc6cc8 --- /dev/null +++ b/consular/tests/test_clients.py @@ -0,0 +1,750 @@ +import json + +from twisted.internet.defer import inlineCallbacks, DeferredQueue +from twisted.trial.unittest import TestCase +from twisted.web.server import NOT_DONE_YET + +from txfake import FakeHttpServer +from txfake.fake_connection import wait0 + +from uritools import urisplit + +from consular.clients import ( + ConsulClient, HTTPError, JsonClient, MarathonClient) + + +class JsonClientTestBase(TestCase): + def setUp(self): + self.client = self.get_client() + self.requests = DeferredQueue() + self.fake_server = FakeHttpServer(self.handle_request) + + self.client.agent = self.fake_server.get_agent() + + def handle_request(self, request): + self.requests.put(request) + return NOT_DONE_YET + + def get_client(self): + """To be implemented by subclass""" + raise NotImplementedError() + + def write_json_response(self, request, json_data, response_code=200, + headers={'Content-Type': 'application/json'}): + request.setResponseCode(response_code) + for name, value in headers.items(): + request.setHeader(name, value) + request.write(json.dumps(json_data)) + request.finish() + + def uri(self, path): + return '%s%s' % (self.client.endpoint.geturi(), path,) + + def parse_query(self, uri): + """ + When Twisted parses "args" from the URI, it leaves out query parameters + that have no value. In those cases we rather use uritools to parse the + query parameters. + """ + return urisplit(uri).getquerydict() + + +class JsonClientTest(JsonClientTestBase): + + def get_client(self): + return JsonClient('http://localhost:8000') + + @inlineCallbacks + def test_request(self): + """ + When a request is made, it should be made with the correct method, + address and headers, and should contain an empty body. The response + should be returned. + """ + d = self.client.request('GET', '/hello') + + request = yield self.requests.get() + self.assertEqual(request.method, 'GET') + self.assertEqual(request.uri, self.uri('/hello')) + self.assertEqual(request.getHeader('content-type'), 'application/json') + self.assertEqual(request.getHeader('accept'), 'application/json') + self.assertEqual(request.content.read(), '') + + request.setResponseCode(200) + request.write('hi\n') + request.finish() + + response = yield d + text = yield response.text() + self.assertEqual(text, 'hi\n') + + @inlineCallbacks + def test_request_json_data(self): + """ + When a request is made with the json_data parameter set, that data + should be sent as JSON. + """ + d = self.client.request('GET', '/hello', json_data={'test': 'hello'}) + + request = yield self.requests.get() + self.assertEqual(request.method, 'GET') + self.assertEqual(request.uri, self.uri('/hello')) + self.assertEqual(json.load(request.content), {'test': 'hello'}) + + request.setResponseCode(200) + request.finish() + + yield d + + @inlineCallbacks + def test_request_endpoint(self): + """ + When a request is made with the endpoint parameter set, that parameter + should be used as the endpoint. + """ + d = self.client.request('GET', '/hello', + endpoint='http://localhost:9000') + + request = yield self.requests.get() + self.assertEqual(request.method, 'GET') + self.assertEqual(request.uri, 'http://localhost:9000/hello') + + request.setResponseCode(200) + request.finish() + + yield d + + @inlineCallbacks + def test_get_json(self): + """ + When the get_json method is called, a GET request should be made and + the response should be deserialized from JSON. + """ + d = self.client.get_json('/hello') + + request = yield self.requests.get() + self.assertEqual(request.method, 'GET') + self.assertEqual(request.uri, self.uri('/hello')) + + request.setResponseCode(200) + request.write(json.dumps({'test': 'hello'})) + request.finish() + + res = yield d + self.assertEqual(res, {'test': 'hello'}) + + @inlineCallbacks + def test_client_error_response(self): + """ + When a request is made and a 4xx response code is returned, a HTTPError + should be raised to indicate a client error. + """ + d = self.client.request('GET', '/hello') + + request = yield self.requests.get() + self.assertEqual(request.method, 'GET') + self.assertEqual(request.uri, self.uri('/hello')) + + request.setResponseCode(403) + request.write('Unauthorized\n') + request.finish() + + yield wait0() + failure = self.failureResultOf(d, HTTPError) + self.assertEqual( + failure.getErrorMessage(), + '403 Client Error for url: %s' % self.uri('/hello')) + + @inlineCallbacks + def test_server_error_response(self): + """ + When a request is made and a 5xx response code is returned, a HTTPError + should be raised to indicate a server error. + """ + d = self.client.request('GET', '/hello') + + request = yield self.requests.get() + self.assertEqual(request.method, 'GET') + self.assertEqual(request.uri, self.uri('/hello')) + + request.setResponseCode(502) + request.write('Bad gateway\n') + request.finish() + + yield wait0() + failure = self.failureResultOf(d, HTTPError) + self.assertEqual( + failure.getErrorMessage(), + '502 Server Error for url: %s' % self.uri('/hello')) + + +class MarathonClientTest(JsonClientTestBase): + def get_client(self): + return MarathonClient('http://localhost:8080') + + @inlineCallbacks + def test_get_json_field(self): + """ + When get_json_field is used to make a request, the response is + deserialized from JSON and the value of the specified field is + returned. + """ + d = self.client.get_json_field('/my-path', 'field-key') + + request = yield self.requests.get() + self.assertEqual(request.method, 'GET') + self.assertEqual(request.uri, self.uri('/my-path')) + + self.write_json_response(request, { + 'field-key': 'field-value', + 'other-field-key': 'do-not-care' + }) + + res = yield d + self.assertEqual(res, 'field-value') + + @inlineCallbacks + def test_get_json_field_missing(self): + """ + When get_json_field is used to make a request, the response is + deserialized from JSON and if the specified field is missing, an error + is raised. + """ + d = self.client.get_json_field('/my-path', 'field-key') + + request = yield self.requests.get() + self.assertEqual(request.method, 'GET') + self.assertEqual(request.uri, self.uri('/my-path')) + + self.write_json_response(request, {'other-field-key': 'do-not-care'}) + + yield wait0() + failure = self.failureResultOf(d, KeyError) + self.assertEqual( + failure.getErrorMessage(), + '\'Unable to get value for "field-key" from Marathon response: ' + '"{"other-field-key": "do-not-care"}"\'') + + @inlineCallbacks + def test_get_event_subscription(self): + """ + When we request event subscriptions from Marathon, we should receive a + list of callback URLs. + """ + d = self.client.get_event_subscriptions() + + request = yield self.requests.get() + self.assertEqual(request.method, 'GET') + self.assertEqual(request.uri, self.uri('/v2/eventSubscriptions')) + + self.write_json_response(request, { + 'callbackUrls': [ + 'http://localhost:7000/events?registration=localhost' + ] + }) + + res = yield d + self.assertEqual(res, [ + 'http://localhost:7000/events?registration=localhost' + ]) + + @inlineCallbacks + def test_post_event_subscription(self): + """ + When we post an event subscription with a callback URL, we should + return True for a 200/OK response from Marathon. + """ + d = self.client.post_event_subscription( + 'http://localhost:7000/events?registration=localhost') + + request = yield self.requests.get() + self.assertEqual(request.method, 'POST') + self.assertEqual(request.path, self.uri('/v2/eventSubscriptions')) + self.assertEqual(request.args, { + 'callbackUrl': [ + 'http://localhost:7000/events?registration=localhost' + ] + }) + + self.write_json_response(request, { + # TODO: Add check that callbackUrl is correct + 'callbackUrl': + 'http://localhost:7000/events?registration=localhost', + 'clientIp': '0:0:0:0:0:0:0:1', + 'eventType': 'subscribe_event' + }) + + res = yield d + self.assertEqual(res, True) + + @inlineCallbacks + def test_post_event_subscription_not_ok(self): + """ + When we post an event subscription with a callback URL, we should + return False for a non-200/OK response from Marathon. + """ + d = self.client.post_event_subscription( + 'http://localhost:7000/events?registration=localhost') + + request = yield self.requests.get() + self.assertEqual(request.method, 'POST') + self.assertEqual(request.path, self.uri('/v2/eventSubscriptions')) + self.assertEqual(request.args, { + 'callbackUrl': [ + 'http://localhost:7000/events?registration=localhost' + ] + }) + + self.write_json_response(request, {}, response_code=201) + + res = yield d + self.assertEqual(res, False) + + @inlineCallbacks + def test_get_apps(self): + """ + When we request the list of apps from Marathon, we should receive the + list of apps with some information. + """ + d = self.client.get_apps() + + request = yield self.requests.get() + self.assertEqual(request.method, 'GET') + self.assertEqual(request.uri, self.uri('/v2/apps')) + + apps = { + 'apps': [ + { + 'id': '/product/us-east/service/myapp', + 'cmd': 'env && sleep 60', + 'constraints': [ + [ + 'hostname', + 'UNIQUE', + '' + ] + ], + 'container': None, + 'cpus': 0.1, + 'env': { + 'LD_LIBRARY_PATH': '/usr/local/lib/myLib' + }, + 'executor': '', + 'instances': 3, + 'mem': 5.0, + 'ports': [ + 15092, + 14566 + ], + 'tasksRunning': 0, + 'tasksStaged': 1, + 'uris': [ + 'https://raw.github.com/mesosphere/marathon/master/' + 'README.md' + ], + 'version': '2014-03-01T23:42:20.938Z' + } + ] + } + self.write_json_response(request, apps) + + res = yield d + self.assertEqual(res, apps['apps']) + + @inlineCallbacks + def test_get_app(self): + """ + When we request information on a specific app from Marathon, we should + receive information on that app. + """ + d = self.client.get_app('/my-app') + + request = yield self.requests.get() + self.assertEqual(request.method, 'GET') + self.assertEqual(request.uri, self.uri('/v2/apps/my-app')) + + app = { + 'app': { + 'args': None, + 'backoffFactor': 1.15, + 'backoffSeconds': 1, + 'maxLaunchDelaySeconds': 3600, + 'cmd': 'python toggle.py $PORT0', + 'constraints': [], + 'container': None, + 'cpus': 0.2, + 'dependencies': [], + 'deployments': [ + { + 'id': '44c4ed48-ee53-4e0f-82dc-4df8b2a69057' + } + ], + 'disk': 0.0, + 'env': {}, + 'executor': '', + 'healthChecks': [ + { + 'command': None, + 'gracePeriodSeconds': 5, + 'intervalSeconds': 10, + 'maxConsecutiveFailures': 3, + 'path': '/health', + 'portIndex': 0, + 'protocol': 'HTTP', + 'timeoutSeconds': 10 + }, + { + 'command': None, + 'gracePeriodSeconds': 5, + 'intervalSeconds': 10, + 'maxConsecutiveFailures': 6, + 'path': '/machinehealth', + 'overridePort': 3333, + 'protocol': 'HTTP', + 'timeoutSeconds': 10 + } + ], + 'id': '/my-app', + 'instances': 2, + 'mem': 32.0, + 'ports': [ + 10000 + ], + 'requirePorts': False, + 'storeUrls': [], + 'upgradeStrategy': { + 'minimumHealthCapacity': 1.0 + }, + 'uris': [ + 'http://downloads.mesosphere.com/misc/toggle.tgz' + ], + 'user': None, + 'version': '2014-09-12T23:28:21.737Z', + 'versionInfo': { + 'lastConfigChangeAt': '2014-09-11T02:26:01.135Z', + 'lastScalingAt': '2014-09-12T23:28:21.737Z' + } + } + } + self.write_json_response(request, app) + + res = yield d + self.assertEqual(res, app['app']) + + @inlineCallbacks + def test_get_app_tasks(self): + """ + When we request the list of tasks for an app from Marathon, we should + receive a list of app tasks. + """ + d = self.client.get_app_tasks('/my-app') + + request = yield self.requests.get() + self.assertEqual(request.method, 'GET') + self.assertEqual(request.uri, self.uri('/v2/apps/my-app/tasks')) + + tasks = { + 'tasks': [ + { + 'host': 'agouti.local', + 'id': 'my-app_1-1396592790353', + 'ports': [ + 31336, + 31337 + ], + 'stagedAt': '2014-04-04T06:26:30.355Z', + 'startedAt': '2014-04-04T06:26:30.860Z', + 'version': '2014-04-04T06:26:23.051Z' + }, + { + 'host': 'agouti.local', + 'id': 'my-app_0-1396592784349', + 'ports': [ + 31382, + 31383 + ], + 'stagedAt': '2014-04-04T06:26:24.351Z', + 'startedAt': '2014-04-04T06:26:24.919Z', + 'version': '2014-04-04T06:26:23.051Z' + } + ] + } + self.write_json_response(request, tasks) + + res = yield d + self.assertEqual(res, tasks['tasks']) + + +class ConsulClientTest(JsonClientTestBase): + def get_client(self): + return ConsulClient('http://localhost:8500') + + @inlineCallbacks + def test_register_agent_service(self): + """ + When a service is registered with an agent, the registration JSON is + PUT to the correct address. + """ + registration = { + 'ID': 'redis1', + 'Name': 'redis', + 'Tags': [ + 'master', + 'v1' + ], + 'Address': '127.0.0.1', + 'Port': 8000, + 'Check': { + 'Script': '/usr/local/bin/check_redis.py', + 'HTTP': 'http://localhost:5000/health', + 'Interval': '10s', + 'TTL': '15s' + } + } + d = self.client.register_agent_service('foo.example.com', registration) + + request = yield self.requests.get() + self.assertEqual(request.method, 'PUT') + self.assertEqual( + request.uri, + 'http://foo.example.com:8500/v1/agent/service/register') + self.assertEqual(json.load(request.content), registration) + + request.setResponseCode(200) + request.finish() + + yield d + + @inlineCallbacks + def test_register_agent_service_fallback(self): + """ + When a service is registered with an agent but the registration request + fails, the registration should fall back to the local Consul agent. + """ + self.client.enable_fallback = True + # First try and do a regular registration + registration = { + 'ID': 'redis1', + 'Name': 'redis', + 'Tags': [ + 'master', + 'v1' + ], + 'Address': '127.0.0.1', + 'Port': 8000, + 'Check': { + 'Script': '/usr/local/bin/check_redis.py', + 'HTTP': 'http://localhost:5000/health', + 'Interval': '10s', + 'TTL': '15s' + } + } + d = self.client.register_agent_service('foo.example.com', registration) + + request = yield self.requests.get() + # Fail the request + request.setResponseCode(503) + request.write("Service unavailable\n") + request.finish() + + # Expect the request to fallback to the regular endpoint + request = yield self.requests.get() + self.assertEqual(request.method, 'PUT') + self.assertEqual(request.uri, self.uri('/v1/agent/service/register')) + self.assertEqual(json.load(request.content), registration) + + request.setResponseCode(200) + request.finish() + + yield d + + @inlineCallbacks + def test_deregister_agent_service(self): + """ + When a service is deregistered, a PUT request is made to the correct + address. + """ + d = self.client.deregister_agent_service('foo.example.com', 'redis1') + + request = yield self.requests.get() + self.assertEqual(request.method, 'PUT') + self.assertEqual( + request.uri, + 'http://foo.example.com:8500/v1/agent/service/deregister/redis1') + + request.setResponseCode(200) + request.finish() + + yield d + + @inlineCallbacks + def test_put_kv(self): + """ + When a value is put in the key/value store, a PUT request is made to + the correct address with the JSON data in the payload. + """ + d = self.client.put_kv('foo', {'bar': 'baz'}) + + request = yield self.requests.get() + self.assertEqual(request.method, 'PUT') + self.assertEqual(request.uri, self.uri('/v1/kv/foo')) + self.assertEqual(json.load(request.content), {'bar': 'baz'}) + + request.setResponseCode(200) + request.write('true') + request.finish() + + res = yield d + json_res = yield res.json() + self.assertEqual(json_res, True) + + # TODO: Consul returns False. What should we do? + + @inlineCallbacks + def test_get_kv_keys(self): + """ + When we get keys from the key/value store, a request is made to the + correct address and a list of keys is returned. + """ + d = self.client.get_kv_keys('foo') + + request = yield self.requests.get() + self.assertEqual(request.method, 'GET') + self.assertEqual(request.path, self.uri('/v1/kv/foo')) + self.assertEqual(self.parse_query(request.uri), { + 'keys': [None] + }) + + keys = [ + '/foo/bar', + '/foo/baz/boo' + ] + request.setResponseCode(200) + request.write(json.dumps(keys)) + request.finish() + + res = yield d + self.assertEqual(res, keys) + + @inlineCallbacks + def test_get_kv_keys_separator(self): + """ + When we get keys from the key/value store and the "separator" parameter + is specified, a request is made to the correct address, the separator + is passed as a query parameter, and a list of keys is returned. + """ + d = self.client.get_kv_keys('foo', separator='/') + + request = yield self.requests.get() + self.assertEqual(request.method, 'GET') + self.assertEqual(request.path, self.uri('/v1/kv/foo')) + self.assertEqual(self.parse_query(request.uri), { + 'keys': [None], + 'separator': ['/'] + }) + + keys = [ + '/foo/bar', + '/foo/baz/' + ] + request.setResponseCode(200) + request.write(json.dumps(keys)) + request.finish() + + res = yield d + self.assertEqual(res, keys) + + @inlineCallbacks + def test_delete_kv_keys(self): + """ + When we delete keys from the key/value store, a request is made to the + correct address. + """ + d = self.client.delete_kv_keys('foo') + + request = yield self.requests.get() + self.assertEqual(request.method, 'DELETE') + self.assertEqual(request.uri, self.uri('/v1/kv/foo')) + + request.setResponseCode(200) + request.finish() + + yield d + + @inlineCallbacks + def test_delete_kv_keys_recursive(self): + """ + When we delete keys from the key/value store recursively, a request is + made to the correct address with the "recurse" query parameter set. + """ + d = self.client.delete_kv_keys('foo', recurse=True) + + request = yield self.requests.get() + self.assertEqual(request.method, 'DELETE') + self.assertEqual(request.path, self.uri('/v1/kv/foo')) + self.assertEqual(self.parse_query(request.uri), { + 'recurse': [None] + }) + + request.setResponseCode(200) + request.finish() + + yield d + + @inlineCallbacks + def test_get_catalog_nodes(self): + """ + When we get the list of nodes from the catalog, a request is made to + the correct address and a list of nodes is returned. + """ + d = self.client.get_catalog_nodes() + + request = yield self.requests.get() + self.assertEqual(request.method, 'GET') + self.assertEqual(request.uri, self.uri('/v1/catalog/nodes')) + + nodes = [ + { + 'Node': 'baz', + 'Address': '10.1.10.11' + }, + { + 'Node': 'foobar', + 'Address': '10.1.10.12' + } + ] + request.setResponseCode(200) + request.write(json.dumps(nodes)) + request.finish() + + res = yield d + self.assertEqual(res, nodes) + + @inlineCallbacks + def test_get_agent_services(self): + """ + When we get the list of services from an agent, a request is made to + the correct address and a list of services is returned. + """ + d = self.client.get_agent_services('foo.example.com') + + request = yield self.requests.get() + self.assertEqual(request.method, 'GET') + self.assertEqual(request.uri, + 'http://foo.example.com:8500/v1/agent/services') + + services = { + 'redis': { + 'ID': 'redis', + 'Service': 'redis', + 'Tags': None, + 'Address': 'http://foo.example.com', + 'Port': 8000 + } + } + request.setResponseCode(200) + request.write(json.dumps(services)) + request.finish() + + res = yield d + self.assertEqual(res, services) diff --git a/consular/tests/test_main.py b/consular/tests/test_main.py index 70e81b1..776069a 100644 --- a/consular/tests/test_main.py +++ b/consular/tests/test_main.py @@ -1,14 +1,15 @@ import json -from urllib import urlencode from twisted.trial.unittest import TestCase from twisted.web.server import Site from twisted.internet import reactor from twisted.internet.defer import ( inlineCallbacks, DeferredQueue, Deferred, succeed) +from twisted.internet.task import Clock from twisted.web.client import HTTPConnectionPool from twisted.python import log +from consular.clients import HTTPError from consular.main import Consular import treq @@ -30,6 +31,10 @@ def json(self): return d +class DummyConsularException(Exception): + pass + + class ConsularTest(TestCase): timeout = 1 @@ -38,9 +43,11 @@ def setUp(self): self.consular = Consular( 'http://localhost:8500', 'http://localhost:8080', - False + False, + 'test' ) - self.consular.debug = True + self.consular.set_debug(True) + self.consular.clock = Clock() # spin up a site so we can test it, pretty sure Klein has better # ways of doing this but they're not documented anywhere. @@ -56,30 +63,75 @@ def setUp(self): # We use this to mock requests going to Consul & Marathon self.requests = DeferredQueue() - def mock_requests(method, url, headers, data, pool, timeout): + def mock_requests(method, url, **kwargs): d = Deferred() self.requests.put({ 'method': method, 'url': url, - 'data': data, + 'data': kwargs.get('data'), 'deferred': d, }) return d - self.patch(self.consular, 'requester', mock_requests) + self.consular.set_requester(mock_requests) def request(self, method, path, data=None): return treq.request( method, 'http://localhost:%s%s' % ( self.listener_port, - path - ), + path), data=(json.dumps(data) if data is not None else None), pool=self.pool) def tearDown(self): pass + def test_reg_id_tag(self): + """ Consular's registration ID tag is properly formed. """ + self.assertEqual(self.consular.reg_id_tag(), 'consular-reg-id=test') + + def test_app_id_tag(self): + """ Consular's application ID tag is properly formed. """ + self.assertEqual(self.consular.app_id_tag('test'), + 'consular-app-id=test') + + def test_get_app_id_from_tags(self): + """ The app ID is successfully parsed from the Consul tags. """ + tags = [ + 'randomstuff', + 'consular-reg-id=test', + 'consular-app-id=/my-app', + ] + self.assertEqual(self.consular.get_app_id_from_tags(tags), '/my-app') + + def test_get_app_id_from_tags_not_found(self): + """ + None is returned when the app ID cannot be found in the Consul tags. + """ + tags = [ + 'randomstuff', + 'consular-reg-id=test', + ] + self.assertEqual(self.consular.get_app_id_from_tags(tags), None) + + def test_get_app_id_from_tags_multiple(self): + """ + An exception is raised when multiple app IDs are found in the Consul + tags. + """ + tags = [ + 'randomstuff', + 'consular-reg-id=test', + 'consular-app-id=/my-app', + 'consular-app-id=/my-app2', + ] + exception = self.assertRaises(RuntimeError, + self.consular.get_app_id_from_tags, tags) + self.assertEqual(str(exception), + 'Multiple (2) Consular tags found for key ' + '"consular-app-id=": [\'consular-app-id=/my-app\', ' + '\'consular-app-id=/my-app2\']') + @inlineCallbacks def test_service(self): response = yield self.request('GET', '/') @@ -133,6 +185,25 @@ def test_TASK_RUNNING(self): "version": "2014-04-04T06:26:23.051Z" }) + # Store the task as a service in Consul + consul_request = yield self.requests.get() + self.assertEqual(consul_request['method'], 'PUT') + self.assertEqual( + consul_request['url'], + 'http://slave-1234.acme.org:8500/v1/agent/service/register') + self.assertEqual(json.loads(consul_request['data']), { + 'Name': 'my-app', + 'ID': 'my-app_0-1396592784349', + 'Address': 'slave-1234.acme.org', + 'Port': 31372, + 'Tags': [ + 'consular-reg-id=test', + 'consular-app-id=/my-app', + ], + }) + consul_request['deferred'].callback( + FakeResponse(200, [], json.dumps({}))) + # We should get the app info for the event marathon_app_request = yield self.requests.get() self.assertEqual(marathon_app_request['method'], 'GET') @@ -145,33 +216,186 @@ def test_TASK_RUNNING(self): } }))) - # Then we collect the tasks for the app - marathon_tasks_request = yield self.requests.get() - self.assertEqual(marathon_tasks_request['method'], 'GET') - self.assertEqual(marathon_tasks_request['url'], - 'http://localhost:8080/v2/apps/my-app/tasks') - marathon_tasks_request['deferred'].callback( + # Check if any existing labels stored in Consul + consul_kv_request = yield self.requests.get() + self.assertEqual(consul_kv_request['method'], 'GET') + self.assertEqual(consul_kv_request['url'], + 'http://localhost:8500/v1/kv/consular/my-app?keys') + consul_kv_request['deferred'].callback( + FakeResponse(200, [], json.dumps([]))) + + response = yield d + self.assertEqual((yield response.json()), { + 'status': 'ok' + }) + + @inlineCallbacks + def test_TASK_RUNNING_app_not_found(self): + d = self.request('POST', '/events', { + "eventType": "status_update_event", + "timestamp": "2014-03-01T23:29:30.158Z", + "slaveId": "20140909-054127-177048842-5050-1494-0", + "taskId": "my-app_0-1396592784349", + "taskStatus": "TASK_RUNNING", + "appId": "/my-app", + "host": "slave-1234.acme.org", + "ports": [31372], + "version": "2014-04-04T06:26:23.051Z" + }) + + # Store the task as a service in Consul + consul_request = yield self.requests.get() + self.assertEqual(consul_request['method'], 'PUT') + self.assertEqual( + consul_request['url'], + 'http://slave-1234.acme.org:8500/v1/agent/service/register') + self.assertEqual(json.loads(consul_request['data']), { + 'Name': 'my-app', + 'ID': 'my-app_0-1396592784349', + 'Address': 'slave-1234.acme.org', + 'Port': 31372, + 'Tags': [ + 'consular-reg-id=test', + 'consular-app-id=/my-app', + ], + }) + consul_request['deferred'].callback( + FakeResponse(200, [], json.dumps({}))) + + # We try to get the app info for the event but the app is gone + marathon_app_request = yield self.requests.get() + self.assertEqual(marathon_app_request['method'], 'GET') + self.assertEqual(marathon_app_request['url'], + 'http://localhost:8080/v2/apps/my-app') + marathon_app_request['deferred'].callback( + FakeResponse(404, [], json.dumps({'message': 'Not found'}))) + + # So we do nothing... + + response = yield d + self.assertEqual((yield response.json()), { + 'status': 'ok' + }) + + @inlineCallbacks + def test_TASK_RUNNING_no_ports(self): + """ + When a TASK_RUNNING event is received from Marathon, and the task has + no ports, the task should be registered as a service in Consul. + """ + d = self.request('POST', '/events', { + "eventType": "status_update_event", + "timestamp": "2014-03-01T23:29:30.158Z", + "slaveId": "20140909-054127-177048842-5050-1494-0", + "taskId": "my-app_0-1396592784349", + "taskStatus": "TASK_RUNNING", + "appId": "/my-app", + "host": "slave-1234.acme.org", + "ports": [], + "version": "2014-04-04T06:26:23.051Z" + }) + + # Store the task as a service in Consul with no port + consul_request = yield self.requests.get() + self.assertEqual(consul_request['method'], 'PUT') + self.assertEqual( + consul_request['url'], + 'http://slave-1234.acme.org:8500/v1/agent/service/register') + self.assertEqual(json.loads(consul_request['data']), { + 'Name': 'my-app', + 'ID': 'my-app_0-1396592784349', + 'Address': 'slave-1234.acme.org', + 'Tags': [ + 'consular-reg-id=test', + 'consular-app-id=/my-app', + ], + }) + consul_request['deferred'].callback( + FakeResponse(200, [], json.dumps({}))) + + # We should get the app info for the event + marathon_app_request = yield self.requests.get() + self.assertEqual(marathon_app_request['method'], 'GET') + self.assertEqual(marathon_app_request['url'], + 'http://localhost:8080/v2/apps/my-app') + marathon_app_request['deferred'].callback( FakeResponse(200, [], json.dumps({ - 'tasks': [{ - 'id': 'my-app_0-1396592784349', - 'host': 'slave-1234.acme.org', - 'ports': [31372], - }] + 'app': { + 'id': '/my-app', + } }))) - request = yield self.requests.get() - self.assertEqual(request['method'], 'PUT') + # Check if any existing labels stored in Consul + consul_kv_request = yield self.requests.get() + self.assertEqual(consul_kv_request['method'], 'GET') + self.assertEqual(consul_kv_request['url'], + 'http://localhost:8500/v1/kv/consular/my-app?keys') + consul_kv_request['deferred'].callback( + FakeResponse(200, [], json.dumps([]))) + + response = yield d + self.assertEqual((yield response.json()), { + 'status': 'ok' + }) + + @inlineCallbacks + def test_TASK_RUNNING_multiple_ports(self): + """ + When a TASK_RUNNING event is received from Marathon, and the task has + multiple ports, the task should be registered as a service in Consul + with the lowest port. + """ + d = self.request('POST', '/events', { + "eventType": "status_update_event", + "timestamp": "2014-03-01T23:29:30.158Z", + "slaveId": "20140909-054127-177048842-5050-1494-0", + "taskId": "my-app_0-1396592784349", + "taskStatus": "TASK_RUNNING", + "appId": "/my-app", + "host": "slave-1234.acme.org", + "ports": [4567, 1234, 6789], + "version": "2014-04-04T06:26:23.051Z" + }) + + # Store the task as a service in Consul with the lowest port + consul_request = yield self.requests.get() + self.assertEqual(consul_request['method'], 'PUT') self.assertEqual( - request['url'], + consul_request['url'], 'http://slave-1234.acme.org:8500/v1/agent/service/register') - self.assertEqual(request['data'], json.dumps({ + self.assertEqual(json.loads(consul_request['data']), { 'Name': 'my-app', 'ID': 'my-app_0-1396592784349', 'Address': 'slave-1234.acme.org', - 'Port': 31372, - })) - request['deferred'].callback( + 'Port': 1234, + 'Tags': [ + 'consular-reg-id=test', + 'consular-app-id=/my-app', + ], + }) + consul_request['deferred'].callback( FakeResponse(200, [], json.dumps({}))) + + # We should get the app info for the event + marathon_app_request = yield self.requests.get() + self.assertEqual(marathon_app_request['method'], 'GET') + self.assertEqual(marathon_app_request['url'], + 'http://localhost:8080/v2/apps/my-app') + marathon_app_request['deferred'].callback( + FakeResponse(200, [], json.dumps({ + 'app': { + 'id': '/my-app', + } + }))) + + # Check if any existing labels stored in Consul + consul_kv_request = yield self.requests.get() + self.assertEqual(consul_kv_request['method'], 'GET') + self.assertEqual(consul_kv_request['url'], + 'http://localhost:8500/v1/kv/consular/my-app?keys') + consul_kv_request['deferred'].callback( + FakeResponse(200, [], json.dumps([]))) + response = yield d self.assertEqual((yield response.json()), { 'status': 'ok' @@ -203,6 +427,78 @@ def test_TASK_KILLED(self): 'status': 'ok' }) + @inlineCallbacks + def test_schedule_sync(self): + """ + When Consular is set to schedule syncs, a sync should occur right away + and further syncs should occur after the correct delay. + """ + lc, d = self.consular.schedule_sync(1) + + self.assertTrue(lc.running) + + # Consular should do the first sync right away + request = yield self.requests.get() + self.assertEqual(request['method'], 'GET') + self.assertEqual( + request['url'], + 'http://localhost:8080/v2/apps') + + # Return no apps... let's make this quick + request['deferred'].callback( + FakeResponse(200, [], json.dumps({'apps': []}))) + + # Advance the clock for the next sync + self.consular.clock.advance(1) + + request = yield self.requests.get() + self.assertEqual(request['method'], 'GET') + self.assertEqual( + request['url'], + 'http://localhost:8080/v2/apps') + + request['deferred'].callback( + FakeResponse(200, [], json.dumps({'apps': []}))) + + lc.stop() + yield d + + @inlineCallbacks + def test_schedule_sync_handles_server_errors(self): + """ + When Consular is set to schedule syncs, syncs should not be interrupted + due to errors in previously scheduled syncs. + """ + lc, d = self.consular.schedule_sync(1) + + self.assertTrue(lc.running) + + # Consular should do the first sync right away + request = yield self.requests.get() + self.assertEqual(request['method'], 'GET') + self.assertEqual( + request['url'], + 'http://localhost:8080/v2/apps') + + # Return a server error. + request['deferred'].callback(FakeResponse(500, [], 'Server error')) + + # Advance the clock for the next sync + self.consular.clock.advance(1) + + # The next sync should happen regardless of the previous server error + request = yield self.requests.get() + self.assertEqual(request['method'], 'GET') + self.assertEqual( + request['url'], + 'http://localhost:8080/v2/apps') + + request['deferred'].callback( + FakeResponse(200, [], json.dumps({'apps': []}))) + + lc.stop() + yield d + @inlineCallbacks def test_register_with_marathon(self): d = self.consular.register_marathon_event_callback( @@ -215,10 +511,8 @@ def test_register_with_marathon(self): create_callback_request = yield self.requests.get() self.assertEqual( create_callback_request['url'], - 'http://localhost:8080/v2/eventSubscriptions?%s' % (urlencode({ - 'callbackUrl': ('http://localhost:7000/' - 'events?registration=the-uuid') - }),)) + 'http://localhost:8080/v2/eventSubscriptions?' + 'callbackUrl=http://localhost:7000/events?registration=the-uuid') self.assertEqual(create_callback_request['method'], 'POST') create_callback_request['deferred'].callback(FakeResponse(200, [])) @@ -253,80 +547,514 @@ def test_register_with_marathon_unexpected_response(self): list_callbacks_request['deferred'].callback( FakeResponse(400, [], json.dumps({ 'message': - 'http event callback system is not running on this Marathon ' + - 'instance. Please re-start this instance with ' + + 'http event callback system is not running on this Marathon ' + 'instance. Please re-start this instance with ' '"--event_subscriber http_callback".'}))) - failure = self.failureResultOf(d, RuntimeError) + failure = self.failureResultOf(d, HTTPError) self.assertEqual( failure.getErrorMessage(), - 'Unable to get existing event callbacks from Marathon: ' + - '\'{u\\\'message\\\': u\\\'http event callback system is not ' + - 'running on this Marathon instance. Please re-start this ' + - 'instance with "--event_subscriber http_callback".\\\'}\'') + '400 Client Error for url: ' + 'http://localhost:8080/v2/eventSubscriptions') @inlineCallbacks - def test_sync_app_task(self): - app = {'id': '/my-app'} - task = {'id': 'my-task-id', 'host': '0.0.0.0', 'ports': [1234]} - d = self.consular.sync_app_task(app, task) + def test_sync_app_tasks(self): + """ + When syncing an app with a task, Consul is updated with a service entry + for the task. + """ + d = self.consular.sync_app_tasks({'id': '/my-app'}) + + # First Consular fetches the tasks for the app + marathon_request = yield self.requests.get() + self.assertEqual(marathon_request['method'], 'GET') + self.assertEqual( + marathon_request['url'], + 'http://localhost:8080/v2/apps/my-app/tasks') + + # Respond with one task + marathon_request['deferred'].callback( + FakeResponse(200, [], json.dumps({ + 'tasks': [{ + 'id': 'my-task-id', + 'host': '0.0.0.0', + 'ports': [1234], + 'state': 'TASK_RUNNING', + }]})) + ) + + # Consular should register the task in Consul consul_request = yield self.requests.get() self.assertEqual( consul_request['url'], 'http://0.0.0.0:8500/v1/agent/service/register') - self.assertEqual(consul_request['data'], json.dumps({ + self.assertEqual(json.loads(consul_request['data']), { 'Name': 'my-app', 'ID': 'my-task-id', 'Address': '0.0.0.0', 'Port': 1234, - })) + 'Tags': [ + 'consular-reg-id=test', + 'consular-app-id=/my-app', + ], + }) self.assertEqual(consul_request['method'], 'PUT') consul_request['deferred'].callback( FakeResponse(200, [], json.dumps({}))) yield d @inlineCallbacks - def test_sync_app_labels(self): - app = { - 'id': '/my-app', - 'labels': {'foo': 'bar'} - } - d = self.consular.sync_app_labels(app) + def test_sync_app_tasks_grouped(self): + """ + When syncing an app in a group with a task, Consul is updated with a + service entry for the task. + """ + d = self.consular.sync_app_tasks({'id': '/my-group/my-app'}) + + # First Consular fetches the tasks for the app + marathon_request = yield self.requests.get() + self.assertEqual(marathon_request['method'], 'GET') + self.assertEqual( + marathon_request['url'], + 'http://localhost:8080/v2/apps/my-group/my-app/tasks') + + # Respond with one task + marathon_request['deferred'].callback( + FakeResponse(200, [], json.dumps({ + 'tasks': [ + { + 'id': 'my-task-id', + 'host': '0.0.0.0', + 'ports': [1234], + 'state': 'TASK_RUNNING', + } + ]})) + ) + + # Consular should register the task in Consul consul_request = yield self.requests.get() + self.assertEqual( + consul_request['url'], + 'http://0.0.0.0:8500/v1/agent/service/register') + self.assertEqual(json.loads(consul_request['data']), { + 'Name': 'my-group-my-app', + 'ID': 'my-task-id', + 'Address': '0.0.0.0', + 'Port': 1234, + 'Tags': [ + 'consular-reg-id=test', + 'consular-app-id=/my-group/my-app', + ], + }) self.assertEqual(consul_request['method'], 'PUT') - self.assertEqual(consul_request['url'], - 'http://localhost:8500/v1/kv/consular/my-app/foo') - self.assertEqual(consul_request['data'], '"bar"') consul_request['deferred'].callback( FakeResponse(200, [], json.dumps({}))) yield d @inlineCallbacks - def test_sync_app(self): - app = { - 'id': '/my-app', - } - d = self.consular.sync_app(app) + def test_sync_app_tasks_task_lost(self): + """ + When syncing an app with a task that has the TASK_LOST state, + Consul should not be updated with a service entry + for the task. + """ + d = self.consular.sync_app_tasks({'id': '/my-app'}) + + # First Consular fetches the tasks for the app marathon_request = yield self.requests.get() + self.assertEqual(marathon_request['method'], 'GET') self.assertEqual( marathon_request['url'], 'http://localhost:8080/v2/apps/my-app/tasks') - self.assertEqual(marathon_request['method'], 'GET') - marathon_request['deferred'].callback( - FakeResponse(200, [], json.dumps({'tasks': []}))) - yield d - @inlineCallbacks - def test_sync_apps(self): - d = self.consular.sync_apps(purge=False) - marathon_request = yield self.requests.get() - self.assertEqual(marathon_request['url'], - 'http://localhost:8080/v2/apps') - self.assertEqual(marathon_request['method'], 'GET') + # Respond with one task marathon_request['deferred'].callback( - FakeResponse(200, [], json.dumps({'apps': []}))) + FakeResponse(200, [], json.dumps({ + 'tasks': [ + { + 'id': 'my-task-1', + 'host': '0.0.0.0', + 'ports': [1234], + 'state': 'TASK_LOST', + }, + { + 'id': 'my-task-2', + 'host': '0.0.0.0', + 'ports': [5678], + 'state': 'TASK_RUNNING', + } + ]})) + ) + + # Consular should register the task in Consul + consul_request = yield self.requests.get() + self.assertEqual( + consul_request['url'], + 'http://0.0.0.0:8500/v1/agent/service/register') + self.assertEqual(json.loads(consul_request['data']), { + 'Name': 'my-app', + 'ID': 'my-task-2', + 'Address': '0.0.0.0', + 'Port': 5678, + 'Tags': [ + 'consular-reg-id=test', + 'consular-app-id=/my-app', + ], + }) + self.assertEqual(consul_request['method'], 'PUT') + consul_request['deferred'].callback( + FakeResponse(200, [], json.dumps({}))) + yield d + + @inlineCallbacks + def test_sync_app_tasks_no_ports(self): + """ + When syncing an app with a task with no ports, Consul is updated with a + service entry for the task. + """ + d = self.consular.sync_app_tasks({'id': '/my-app'}) + + # First Consular fetches the tasks for the app + marathon_request = yield self.requests.get() + self.assertEqual(marathon_request['method'], 'GET') + self.assertEqual( + marathon_request['url'], + 'http://localhost:8080/v2/apps/my-app/tasks') + + # Respond with one task + marathon_request['deferred'].callback( + FakeResponse(200, [], json.dumps({ + 'tasks': [ + { + 'id': 'my-task-id', + 'host': '0.0.0.0', + 'ports': [], + 'state': 'TASK_RUNNING', + } + ]})) + ) + + # Consular should register the task in Consul with no port + consul_request = yield self.requests.get() + self.assertEqual( + consul_request['url'], + 'http://0.0.0.0:8500/v1/agent/service/register') + self.assertEqual(json.loads(consul_request['data']), { + 'Name': 'my-app', + 'ID': 'my-task-id', + 'Address': '0.0.0.0', + 'Tags': [ + 'consular-reg-id=test', + 'consular-app-id=/my-app', + ], + }) + self.assertEqual(consul_request['method'], 'PUT') + consul_request['deferred'].callback( + FakeResponse(200, [], json.dumps({}))) yield d + @inlineCallbacks + def test_sync_app_tasks_multiple_ports(self): + """ + When syncing an app with a task with multiple ports, Consul is updated + with a service entry for the task with the lowest port. + """ + d = self.consular.sync_app_tasks({'id': '/my-app'}) + + # First Consular fetches the tasks for the app + marathon_request = yield self.requests.get() + self.assertEqual(marathon_request['method'], 'GET') + self.assertEqual( + marathon_request['url'], + 'http://localhost:8080/v2/apps/my-app/tasks') + + # Respond with one task + marathon_request['deferred'].callback( + FakeResponse(200, [], json.dumps({ + 'tasks': [ + { + 'id': 'my-task-id', + 'host': '0.0.0.0', + 'ports': [4567, 1234, 6789], + 'state': 'TASK_RUNNING', + } + ]})) + ) + + # Consular should register the task in Consul with the lowest port + consul_request = yield self.requests.get() + self.assertEqual( + consul_request['url'], + 'http://0.0.0.0:8500/v1/agent/service/register') + self.assertEqual(json.loads(consul_request['data']), { + 'Name': 'my-app', + 'ID': 'my-task-id', + 'Address': '0.0.0.0', + 'Port': 1234, + 'Tags': [ + 'consular-reg-id=test', + 'consular-app-id=/my-app', + ], + }) + self.assertEqual(consul_request['method'], 'PUT') + consul_request['deferred'].callback( + FakeResponse(200, [], json.dumps({}))) + yield d + + @inlineCallbacks + def test_sync_app_tasks_not_found(self): + """ + When syncing an app with a task, and Marathon has no tasks for the app, + Consular should handle a 404 response from Marathon gracefully. + """ + d = self.consular.sync_app_tasks({'id': '/my-app'}) + + # First Consular fetches the tasks for the app + marathon_request = yield self.requests.get() + self.assertEqual(marathon_request['method'], 'GET') + self.assertEqual( + marathon_request['url'], + 'http://localhost:8080/v2/apps/my-app/tasks') + + # Respond with a 404 + marathon_request['deferred'].callback( + FakeResponse(404, [], json.dumps( + {"message": "App '/my-app' does not exist"})) + ) + + # Nothing much should happen -- there are no tasks + + yield d + + @inlineCallbacks + def test_sync_app_labels(self): + app = { + 'id': '/my-app', + 'labels': {'foo': 'bar'} + } + d = self.consular.sync_app_labels(app) + consul_request = yield self.requests.get() + self.assertEqual(consul_request['method'], 'PUT') + self.assertEqual(consul_request['url'], + 'http://localhost:8500/v1/kv/consular/my-app/foo') + self.assertEqual(consul_request['data'], '"bar"') + consul_request['deferred'].callback( + FakeResponse(200, [], json.dumps({}))) + + consul_request = yield self.requests.get() + self.assertEqual(consul_request['method'], 'GET') + self.assertEqual(consul_request['url'], + 'http://localhost:8500/v1/kv/consular/my-app?keys') + consul_request['deferred'].callback( + FakeResponse(200, [], json.dumps([]))) + + yield d + + @inlineCallbacks + def test_sync_app_labels_cleanup(self): + """ + When Consular syncs app labels, and labels are found in Consul which + aren't present in the Marathon app definition, those labels are deleted + from Consul. + """ + app = { + 'id': '/my-app', + 'labels': {'foo': 'bar'} + } + d = self.consular.sync_app_labels(app) + put_request = yield self.requests.get() + self.assertEqual(put_request['method'], 'PUT') + self.assertEqual(put_request['url'], + 'http://localhost:8500/v1/kv/consular/my-app/foo') + self.assertEqual(put_request['data'], '"bar"') + put_request['deferred'].callback( + FakeResponse(200, [], json.dumps({}))) + + get_request = yield self.requests.get() + self.assertEqual(get_request['method'], 'GET') + self.assertEqual(get_request['url'], + 'http://localhost:8500/v1/kv/consular/my-app?keys') + consul_labels = [ + 'consular/my-app/foo', + 'consular/my-app/oldfoo', + 'consular/my-app/misplaced/foo', + ] + get_request['deferred'].callback( + FakeResponse(200, [], json.dumps(consul_labels))) + + delete_request1 = yield self.requests.get() + self.assertEqual(delete_request1['method'], 'DELETE') + self.assertEqual(delete_request1['url'], + 'http://localhost:8500/v1/kv/consular/my-app/oldfoo') + delete_request1['deferred'].callback( + FakeResponse(200, [], json.dumps(True))) + + delete_request2 = yield self.requests.get() + self.assertEqual(delete_request2['method'], 'DELETE') + self.assertEqual( + delete_request2['url'], + 'http://localhost:8500/v1/kv/consular/my-app/misplaced/foo') + delete_request2['deferred'].callback( + FakeResponse(200, [], json.dumps(True))) + + yield d + + @inlineCallbacks + def test_sync_app_labels_cleanup_not_found(self): + """ + When Consular syncs app labels, and labels aren't found in Consul and + Consul returns a 404, we should fail gracefully. + """ + app = { + 'id': '/my-app', + 'labels': {'foo': 'bar'} + } + d = self.consular.sync_app_labels(app) + put_request = yield self.requests.get() + self.assertEqual(put_request['method'], 'PUT') + self.assertEqual(put_request['url'], + 'http://localhost:8500/v1/kv/consular/my-app/foo') + self.assertEqual(put_request['data'], '"bar"') + put_request['deferred'].callback( + FakeResponse(200, [], json.dumps({}))) + + get_request = yield self.requests.get() + self.assertEqual(get_request['method'], 'GET') + self.assertEqual(get_request['url'], + 'http://localhost:8500/v1/kv/consular/my-app?keys') + get_request['deferred'].callback(FakeResponse(404, [], None)) + + yield d + + @inlineCallbacks + def test_sync_app_labels_cleanup_forbidden(self): + """ + When Consular syncs app labels, and labels aren't found in Consul and + Consul returns a 403, an error should be raised. + """ + app = { + 'id': '/my-app', + 'labels': {'foo': 'bar'} + } + d = self.consular.sync_app_labels(app) + put_request = yield self.requests.get() + self.assertEqual(put_request['method'], 'PUT') + self.assertEqual(put_request['url'], + 'http://localhost:8500/v1/kv/consular/my-app/foo') + self.assertEqual(put_request['data'], '"bar"') + put_request['deferred'].callback( + FakeResponse(200, [], json.dumps({}))) + + get_request = yield self.requests.get() + self.assertEqual(get_request['method'], 'GET') + self.assertEqual(get_request['url'], + 'http://localhost:8500/v1/kv/consular/my-app?keys') + get_request['deferred'].callback(FakeResponse(403, [], None)) + + # Error is raised into a DeferredList, must get actual error + failure = self.failureResultOf(d, HTTPError) + self.assertEqual( + failure.getErrorMessage(), + '403 Client Error for url: ' + 'http://localhost:8500/v1/kv/consular/my-app?keys') + + @inlineCallbacks + def test_sync_app(self): + app = { + 'id': '/my-app', + } + d = self.consular.sync_app(app) + + # First Consular syncs app labels... + # There are no labels in this definition so Consular doesn't push any + # labels to Consul, it just tries to clean up any existing labels. + consul_request = yield self.requests.get() + self.assertEqual(consul_request['method'], 'GET') + self.assertEqual( + consul_request['url'], + 'http://localhost:8500/v1/kv/consular/my-app?keys') + consul_request['deferred'].callback( + FakeResponse(200, [], json.dumps([]))) + + # Next Consular syncs app tasks... + # It fetches a list of tasks for an app and gets an empty list so + # nothing is to be done. + marathon_request = yield self.requests.get() + self.assertEqual( + marathon_request['url'], + 'http://localhost:8080/v2/apps/my-app/tasks') + self.assertEqual(marathon_request['method'], 'GET') + marathon_request['deferred'].callback( + FakeResponse(200, [], json.dumps({'tasks': []}))) + yield d + + @inlineCallbacks + def test_sync_apps(self): + d = self.consular.sync_apps(purge=False) + marathon_request = yield self.requests.get() + self.assertEqual(marathon_request['url'], + 'http://localhost:8080/v2/apps') + self.assertEqual(marathon_request['method'], 'GET') + marathon_request['deferred'].callback( + FakeResponse(200, [], json.dumps({'apps': []}))) + yield d + + @inlineCallbacks + def test_sync_apps_field_not_found(self): + """ + When syncing apps, and Marathon returns a JSON response with an + unexpected structure (the "apps" field is missing). A KeyError should + be raised with a useful message. + """ + d = self.consular.sync_apps(purge=False) + marathon_request = yield self.requests.get() + self.assertEqual(marathon_request['url'], + 'http://localhost:8080/v2/apps') + self.assertEqual(marathon_request['method'], 'GET') + marathon_request['deferred'].callback( + FakeResponse(200, [], json.dumps({ + 'some field': 'that was unexpected' + }))) + + failure = self.failureResultOf(d, KeyError) + self.assertEqual( + failure.getErrorMessage(), + '\'Unable to get value for "apps" from Marathon response: "{"some ' + 'field": "that was unexpected"}"\'') + + def test_check_apps_namespace_clash_no_clash(self): + """ + When checking for app namespace clashes and there are no clashes, the + list of apps is returned. + """ + apps = [ + {'id': '/my-group/my-app'}, + {'id': '/my-app'}, + {'id': '/my-group/my-app2'}, + ] + apps_returned = self.consular.check_apps_namespace_clash(apps) + self.assertEqual(apps, apps_returned) + + def test_check_apps_namespace_clash_clashing(self): + """ + When checking for app namespace clashes and there are clashes, an + error is raised with an error message describing the clashes. + """ + apps = [ + {'id': '/my-group/my-subgroup/my-app'}, + {'id': '/my-group/my-subgroup-my-app'}, + {'id': '/my-group-my-subgroup-my-app'}, + {'id': '/my-app'}, + ] + exception = self.assertRaises( + RuntimeError, self.consular.check_apps_namespace_clash, apps) + + self.assertEqual('The following Consul service name(s) will resolve ' + 'to multiple Marathon app names: \nmy-group-my-subgro' + 'up-my-app => /my-group/my-subgroup/my-app, /my-group' + '/my-subgroup-my-app, /my-group-my-subgroup-my-app', + str(exception)) + @inlineCallbacks def test_purge_dead_services(self): d = self.consular.purge_dead_services() @@ -349,19 +1077,27 @@ def test_purge_dead_services(self): self.assertEqual(agent_request['method'], 'GET') agent_request['deferred'].callback( FakeResponse(200, [], json.dumps({ - "testingapp.someid1": { - "ID": "testingapp.someid1", + "testinggroup-someid1": { + "ID": "taskid1", "Service": "testingapp", "Tags": None, "Address": "machine-1", - "Port": 8102 + "Port": 8102, + "Tags": [ + "consular-reg-id=test", + "consular-app-id=/testinggroup/someid1", + ], }, - "testingapp.someid2": { - "ID": "testingapp.someid2", + "testinggroup-someid1": { + "ID": "taskid2", "Service": "testingapp", "Tags": None, "Address": "machine-2", - "Port": 8103 + "Port": 8103, + "Tags": [ + "consular-reg-id=test", + "consular-app-id=/testinggroup/someid1", + ], } })) ) @@ -370,15 +1106,17 @@ def test_purge_dead_services(self): # 1 less than Consul thinks exists. testingapp_request = yield self.requests.get() self.assertEqual(testingapp_request['url'], - 'http://localhost:8080/v2/apps/testingapp/tasks') + 'http://localhost:8080/v2/apps/testinggroup/someid1/' + 'tasks') self.assertEqual(testingapp_request['method'], 'GET') testingapp_request['deferred'].callback( FakeResponse(200, [], json.dumps({ "tasks": [{ - "appId": "/testingapp", - "id": "testingapp.someid2", + "appId": "/testinggroup/someid1", + "id": "taskid2", "host": "machine-2", "ports": [8103], + "state": "TASK_RUNNING", "startedAt": "2015-07-14T14:54:31.934Z", "stagedAt": "2015-07-14T14:54:31.544Z", "version": "2015-07-14T13:07:32.095Z" @@ -389,6 +1127,253 @@ def test_purge_dead_services(self): # Expecting a service registering in Consul as a result for one # of these services deregister_request = yield self.requests.get() + self.assertEqual( + deregister_request['url'], + ('http://1.2.3.4:8500/v1/agent/service/deregister/' + 'testinggroup-someid1')) + self.assertEqual(deregister_request['method'], 'PUT') + deregister_request['deferred'].callback( + FakeResponse(200, [], json.dumps({}))) + yield d + + @inlineCallbacks + def test_purge_task_lost_services(self): + """ + When a task has anything but the TASK_RUNNING state it should + be deregistered from Consul + """ + d = self.consular.purge_dead_services() + consul_request = yield self.requests.get() + consul_request['deferred'].callback( + FakeResponse(200, [], json.dumps([{ + 'Node': 'consul-node', + 'Address': '1.2.3.4', + }])) + ) + + agent_request = yield self.requests.get() + # Expecting a request to list of all services in Consul, + # returning 2 + self.assertEqual( + agent_request['url'], + 'http://1.2.3.4:8500/v1/agent/services') + self.assertEqual(agent_request['method'], 'GET') + agent_request['deferred'].callback( + FakeResponse(200, [], json.dumps({ + "testinggroup-someid1": { + "ID": "taskid1", + "Service": "testingapp", + "Tags": None, + "Address": "machine-1", + "Port": 8102, + "Tags": [ + "consular-reg-id=test", + "consular-app-id=/testinggroup/someid1", + ], + }, + "testinggroup-someid1": { + "ID": "taskid2", + "Service": "testingapp", + "Tags": None, + "Address": "machine-2", + "Port": 8103, + "Tags": [ + "consular-reg-id=test", + "consular-app-id=/testinggroup/someid1", + ], + } + })) + ) + + # Expecting a request for the tasks for a given app, + # returning only 1 task with state `TASK_RUNNING` + testingapp_request = yield self.requests.get() + self.assertEqual(testingapp_request['url'], + 'http://localhost:8080/v2/apps/testinggroup/someid1/' + 'tasks') + self.assertEqual(testingapp_request['method'], 'GET') + testingapp_request['deferred'].callback( + FakeResponse(200, [], json.dumps({ + "tasks": [{ + "appId": "/testinggroup/someid1", + "id": "taskid1", + "host": "machine-1", + "ports": [8103], + "state": "TASK_RUNNING", + "startedAt": "2015-07-14T14:54:31.934Z", + "stagedAt": "2015-07-14T14:54:31.544Z", + "version": "2015-07-14T13:07:32.095Z" + }, { + "appId": "/testinggroup/someid1", + "id": "taskid2", + "host": "machine-2", + "ports": [8103], + "state": "TASK_LOST", + "startedAt": "2015-07-14T14:54:31.934Z", + "stagedAt": "2015-07-14T14:54:31.544Z", + "version": "2015-07-14T13:07:32.095Z" + }] + })) + ) + + # Expecting a service registering in Consul as a result for one + # of these services + deregister_request = yield self.requests.get() + self.assertEqual( + deregister_request['url'], + ('http://1.2.3.4:8500/v1/agent/service/deregister/' + 'testinggroup-someid1')) + self.assertEqual(deregister_request['method'], 'PUT') + deregister_request['deferred'].callback( + FakeResponse(200, [], json.dumps({}))) + yield d + + @inlineCallbacks + def test_purge_old_services(self): + """ + Services previously registered with Consul by Consular but that no + longer exist in Marathon should be purged if a registration ID is set. + """ + d = self.consular.purge_dead_services() + consul_request = yield self.requests.get() + self.assertEqual( + consul_request['url'], + 'http://localhost:8500/v1/catalog/nodes') + consul_request['deferred'].callback( + FakeResponse(200, [], json.dumps([{ + 'Node': 'consul-node', + 'Address': '1.2.3.4', + }])) + ) + agent_request = yield self.requests.get() + # Expecting a request to list of all services in Consul, returning 3 + # services - one tagged with our registration ID, one tagged with a + # different registration ID, and one with no tags. + self.assertEqual( + agent_request['url'], + 'http://1.2.3.4:8500/v1/agent/services') + self.assertEqual(agent_request['method'], 'GET') + agent_request['deferred'].callback( + FakeResponse(200, [], json.dumps({ + "testingapp.someid1": { + "ID": "testingapp.someid1", + "Service": "testingapp", + "Tags": [ + "consular-reg-id=test", + "consular-app-id=/testingapp", + ], + "Address": "machine-1", + "Port": 8102 + }, + "testingapp.someid2": { + "ID": "testingapp.someid2", + "Service": "testingapp", + "Tags": [ + "consular-reg-id=blah", + "consular-app-id=/testingapp", + ], + "Address": "machine-2", + "Port": 8103 + }, + "testingapp.someid3": { + "ID": "testingapp.someid2", + "Service": "testingapp", + "Tags": None, + "Address": "machine-2", + "Port": 8104 + } + })) + ) + + # Expecting a request for the tasks for a given app, returning no tasks + testingapp_request = yield self.requests.get() + self.assertEqual(testingapp_request['url'], + 'http://localhost:8080/v2/apps/testingapp/tasks') + self.assertEqual(testingapp_request['method'], 'GET') + testingapp_request['deferred'].callback( + FakeResponse(200, [], json.dumps({'tasks': []}))) + + # Expecting a service deregistering in Consul as a result. Only the + # task with the correct tag is returned. + deregister_request = yield self.requests.get() + self.assertEqual( + deregister_request['url'], + ('http://1.2.3.4:8500/v1/agent/service/deregister/' + 'testingapp.someid1')) + self.assertEqual(deregister_request['method'], 'PUT') + deregister_request['deferred'].callback( + FakeResponse(200, [], json.dumps({}))) + yield d + + @inlineCallbacks + def test_purge_old_services_tasks_not_found(self): + """ + Services previously registered with Consul by Consular but that no + longer exist in Marathon should be purged if a registration ID is set, + even if the tasks are not found. + """ + d = self.consular.purge_dead_services() + consul_request = yield self.requests.get() + self.assertEqual( + consul_request['url'], + 'http://localhost:8500/v1/catalog/nodes') + consul_request['deferred'].callback( + FakeResponse(200, [], json.dumps([{ + 'Node': 'consul-node', + 'Address': '1.2.3.4', + }])) + ) + agent_request = yield self.requests.get() + # Expecting a request to list of all services in Consul, returning 3 + # services - one tagged with our registration ID, one tagged with a + # different registration ID, and one with no tags. + self.assertEqual( + agent_request['url'], + 'http://1.2.3.4:8500/v1/agent/services') + self.assertEqual(agent_request['method'], 'GET') + agent_request['deferred'].callback( + FakeResponse(200, [], json.dumps({ + "testingapp.someid1": { + "ID": "testingapp.someid1", + "Service": "testingapp", + "Tags": [ + "consular-reg-id=test", + "consular-app-id=/testingapp", + ], + "Address": "machine-1", + "Port": 8102 + }, + "testingapp.someid2": { + "ID": "testingapp.someid2", + "Service": "testingapp", + "Tags": [ + "consular-reg-id=blah", + "consular-app-id=/testingapp", + ], + "Address": "machine-2", + "Port": 8103 + }, + "testingapp.someid3": { + "ID": "testingapp.someid2", + "Service": "testingapp", + "Tags": None, + "Address": "machine-2", + "Port": 8104 + } + })) + ) + + # Expecting a request for the tasks for a given app, returning a 404 + testingapp_request = yield self.requests.get() + self.assertEqual(testingapp_request['url'], + 'http://localhost:8080/v2/apps/testingapp/tasks') + self.assertEqual(testingapp_request['method'], 'GET') + testingapp_request['deferred'].callback( + FakeResponse(404, [], None)) + + # Expecting a service deregistering in Consul as a result. Only the + # task with the correct tag is returned. + deregister_request = yield self.requests.get() self.assertEqual( deregister_request['url'], ('http://1.2.3.4:8500/v1/agent/service/deregister/' @@ -398,24 +1383,147 @@ def test_purge_dead_services(self): FakeResponse(200, [], json.dumps({}))) yield d + @inlineCallbacks + def test_purge_old_service_no_app_id(self): + """ + Services previously registered with Consul by Consular but without an + app ID tagged (for some reason) should not be purged. + """ + d = self.consular.purge_dead_services() + consul_request = yield self.requests.get() + self.assertEqual( + consul_request['url'], + 'http://localhost:8500/v1/catalog/nodes') + consul_request['deferred'].callback( + FakeResponse(200, [], json.dumps([{ + 'Node': 'consul-node', + 'Address': '1.2.3.4', + }])) + ) + agent_request = yield self.requests.get() + self.assertEqual( + agent_request['url'], + 'http://1.2.3.4:8500/v1/agent/services') + self.assertEqual(agent_request['method'], 'GET') + agent_request['deferred'].callback( + FakeResponse(200, [], json.dumps({ + "testingapp.someid1": { + "ID": "testingapp.someid1", + "Service": "testingapp", + "Tags": [ + "consular-reg-id=test", + ], + "Address": "machine-1", + "Port": 8102 + } + })) + ) + + # Expecting no action to be taken as there is no app ID. + yield d + + @inlineCallbacks + def test_purge_dead_app_labels(self): + """ + Services previously registered with Consul by Consular but that no + longer exist in Marathon should have their labels removed from the k/v + store. + """ + d = self.consular.purge_dead_app_labels([{ + 'id': 'my-app' + }]) + consul_request = yield self.requests.get() + self.assertEqual(consul_request['method'], 'GET') + self.assertEqual( + consul_request['url'], + 'http://localhost:8500/v1/kv/consular/?keys&separator=/') + # Return one existing app and one non-existing app + consul_request['deferred'].callback( + FakeResponse(200, [], json.dumps([ + 'consular/my-app/', + 'consular/my-app2/', + ])) + ) + + # Consular should delete the app that doesn't exist + consul_request = yield self.requests.get() + self.assertEqual(consul_request['method'], 'DELETE') + self.assertEqual( + consul_request['url'], + 'http://localhost:8500/v1/kv/consular/my-app2/?recurse') + consul_request['deferred'].callback( + FakeResponse(200, [], json.dumps({}))) + + yield d + + @inlineCallbacks + def test_purge_dead_app_labels_not_found(self): + """ + When purging labels from the Consul k/v store, if Consul can't find + a key and returns 404, we should fail gracefully. + """ + d = self.consular.purge_dead_app_labels([{ + 'id': 'my-app' + }]) + consul_request = yield self.requests.get() + self.assertEqual(consul_request['method'], 'GET') + self.assertEqual( + consul_request['url'], + 'http://localhost:8500/v1/kv/consular/?keys&separator=/') + # Return a 404 error + consul_request['deferred'].callback(FakeResponse(404, [], None)) + + # No keys exist in Consul so nothing to purge + yield d + + @inlineCallbacks + def test_purge_dead_app_labels_forbidden(self): + """ + When purging labels from the Consul k/v store, if Consul can't find + a key and returns 403, an error should be raised. + """ + d = self.consular.purge_dead_app_labels([{ + 'id': 'my-app' + }]) + consul_request = yield self.requests.get() + self.assertEqual(consul_request['method'], 'GET') + self.assertEqual( + consul_request['url'], + 'http://localhost:8500/v1/kv/consular/?keys&separator=/') + # Return a 403 error + consul_request['deferred'].callback(FakeResponse(403, [], None)) + + failure = self.failureResultOf(d, HTTPError) + self.assertEqual( + failure.getErrorMessage(), + '403 Client Error for url: ' + 'http://localhost:8500/v1/kv/consular/?keys&separator=/') + @inlineCallbacks def test_fallback_to_main_consul(self): - self.consular.enable_fallback = True - self.consular.register_service( - 'http://foo:8500', 'app_id', 'service_id', 'foo', 1234) + self.consular.consul_client.enable_fallback = True + self.consular.register_task_service( + '/app_id', 'service_id', 'foo', [1234]) request = yield self.requests.get() self.assertEqual( request['url'], 'http://foo:8500/v1/agent/service/register') - request['deferred'].errback(Exception('Something terrible')) + request['deferred'].errback( + DummyConsularException('Something terrible')) + [exc] = self.flushLoggedErrors(DummyConsularException) + self.assertEqual(str(exc.value), 'Something terrible') fallback_request = yield self.requests.get() self.assertEqual( fallback_request['url'], 'http://localhost:8500/v1/agent/service/register') - self.assertEqual(fallback_request['data'], json.dumps({ + self.assertEqual(json.loads(fallback_request['data']), { 'Name': 'app_id', 'ID': 'service_id', 'Address': 'foo', 'Port': 1234, - })) + 'Tags': [ + 'consular-reg-id=test', + 'consular-app-id=/app_id', + ], + }) diff --git a/docs/conf.py b/docs/conf.py index 4b22d4f..1a02c06 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -19,7 +19,7 @@ # If extensions (or modules to document with autodoc) are in another directory, # add these directories to sys.path here. If the directory is relative to the # documentation root, use os.path.abspath to make it absolute, like shown here. -#sys.path.insert(0, os.path.abspath('.')) +sys.path.insert(0, os.path.abspath('..')) # -- General configuration ------------------------------------------------ @@ -289,4 +289,7 @@ # Example configuration for intersphinx: refer to the Python standard library. -intersphinx_mapping = {'https://docs.python.org/': None} +intersphinx_mapping = { + 'https://docs.python.org/': None, + 'klein': ('https://klein.readthedocs.org/en/latest/', None), +} diff --git a/docs/index.rst b/docs/index.rst index 29f298e..9dfa853 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -1,8 +1,21 @@ Consular ======== -Receive events from Marathon_, update Consul_ with the relevant information -about services & tasks. +Consular is a micro-service that relays information between Marathon_ and +Consul_. It registers itself for HTTP event callbacks with Marathon_ and makes +the appropriate API calls to register applications that Marathon_ runs as +services in Consul_. De-registrations of applications happens in the same way. + +Marathon_ is always considered the source of truth. + +If Marathon application definitions contain labels_ (application metadata) +they will be added to the Consul_ key/value store. This can be especially +useful when Consul_ is combined with `Consul Template`_ to automatically +generate configuration files for proxies such as HAProxy_ and Nginx_. + +.. image:: http://www.websequencediagrams.com/cgi-bin/cdraw?lz=dGl0bGUgQ29uc3VsYXIsAAMHICYgTWFyYXRob24KCgACCCAtPgAgCTogTm90aWZpY2F0aW9uIG9mXG5uZXcgYXBwbAANBwoATAgALQo6IFJlZ2lzdHIAJw5zZXJ2aWNlABwVQWRkIEEASwogbWV0YWRhdGFcbihsYWJlbHMpIHRvAIE4B1xuSy9WIHN0b3IARAgAgSgKLVRlbXBsYXRlAIEmE2NoYW5nZXMAgSoHACEJIC0-IExvYWQtQmFsYW5jZXI6IEdlbmVyYXRlIG5ld1xubG9hZC1iABYHIGNvbmZpZwAiI1JlbG9hZAApB3UAggYG&s=napkin + :target: http://www.websequencediagrams.com/?lz=dGl0bGUgQ29uc3VsYXIsAAMHICYgTWFyYXRob24KCgACCCAtPgAgCTogTm90aWZpY2F0aW9uIG9mXG5uZXcgYXBwbAANBwoATAgALQo6IFJlZ2lzdHIAJw5zZXJ2aWNlABwVQWRkIEEASwogbWV0YWRhdGFcbihsYWJlbHMpIHRvAIE4B1xuSy9WIHN0b3IARAgAgSgKLVRlbXBsYXRlAIEmE2NoYW5nZXMAgSoHACEJIC0-IExvYWQtQmFsYW5jZXI6IEdlbmVyYXRlIG5ld1xubG9hZC1iABYHIGNvbmZpZwAiI1JlbG9hZAApB3UAggYG&s=napkin + :alt: consular sequence diagram .. image:: https://travis-ci.org/universalcore/consular.svg?branch=develop :target: https://travis-ci.org/universalcore/consular @@ -41,6 +54,24 @@ Installing for local dev (ve)$ pip install -e . (ve)$ pip install -r requirements-dev.txt +Running tests +~~~~~~~~~~~~~ + +:: + + $ source ve/bin/activate + (ve)$ py.test consular .. _Marathon: http://mesosphere.github.io/marathon/ .. _Consul: http://consul.io/ +.. _labels: https://mesosphere.github.io/marathon/docs/rest-api.html#labels-object-of-string-values +.. _HAProxy: http://www.haproxy.org/ +.. _Nginx: http://nginx.org/ +.. _`Consul Template`: https://github.com/hashicorp/consul-template + + +Consular Class Documentation +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. automodule:: consular.main + :members: diff --git a/requirements-dev.txt b/requirements-dev.txt index c3ab0d5..78bdd69 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,5 +1,6 @@ -pytest +pytest>=3.0.0 pytest-coverage pytest-xdist flake8 sphinx +txfake>=0.1.1 diff --git a/requirements.txt b/requirements.txt index 903109c..0b5bd7a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,2 @@ -Twisted -Klein -treq -click +# Our dependencies are all specified in setup.py. +-e . diff --git a/setup-pypy-travis.sh b/setup-pypy-travis.sh new file mode 100644 index 0000000..05caa67 --- /dev/null +++ b/setup-pypy-travis.sh @@ -0,0 +1,16 @@ +# NOTE: This script needs to be sourced so it can modify the environment. + +# Get out of the virtualenv we're in. +deactivate + +# Install pyenv. +curl -L https://raw.githubusercontent.com/yyuu/pyenv-installer/master/bin/pyenv-installer | bash +export PATH="$HOME/.pyenv/bin:$PATH" +eval "$(pyenv init -)" +eval "$(pyenv virtualenv-init -)" + +# Install pypy and make a virtualenv for it. +pyenv install -s pypy-$PYPY_VERSION +pyenv global pypy-$PYPY_VERSION +virtualenv -p $(which python) ~/env-pypy-$PYPY_VERSION +source ~/env-pypy-$PYPY_VERSION/bin/activate diff --git a/setup.cfg b/setup.cfg index 050d01b..28516f8 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,2 +1,2 @@ -[pytest] -addopts = --doctest-modules --verbose --ignore=ve/ consular +[tool:pytest] +addopts = --doctest-modules --verbose --ignore=ve/ diff --git a/setup.py b/setup.py index 43a96fb..f84670d 100644 --- a/setup.py +++ b/setup.py @@ -6,9 +6,6 @@ with open(os.path.join(here, 'README.rst')) as f: README = f.read() -with open(os.path.join(here, 'requirements.txt')) as f: - requires = filter(None, f.readlines()) - with open(os.path.join(here, 'VERSION')) as f: version = f.read().strip() @@ -29,8 +26,13 @@ packages=find_packages(exclude=['docs']), include_package_data=True, zip_safe=False, - install_requires=requires, - tests_require=requires, + install_requires=[ + 'click', + 'Klein', + 'treq', + 'Twisted', + 'uritools>=1.0.0', + ], entry_points={ 'console_scripts': ['consular = consular.cli:main'], }) diff --git a/templates/nginx.ctmpl b/templates/nginx.ctmpl index 88e3f67..7132da9 100644 --- a/templates/nginx.ctmpl +++ b/templates/nginx.ctmpl @@ -7,7 +7,7 @@ # Nginx. If the key does not exist the service isn't added to the # list of services in the Nginx config. -{{range services}}{{if key (print "consular/" .Name "/domain") }} +{{range services}}{{$labels := ls (print "consular/" .Name) | explode }}{{if $labels.domain}} upstream {{.Name}} { {{range service .Name }}server {{.Address}}:{{.Port}}; @@ -16,7 +16,7 @@ upstream {{.Name}} { server { listen 80; - server_name {{key (print "consular/" .Name "/domain") | parseJSON }}; + server_name {{$labels.domain | parseJSON }}; location / { proxy_pass http://{{.Name}};