Skip to content

Commit

Permalink
Tag services with the registration ID and clean old services
Browse files Browse the repository at this point in the history
* Tag services with Consular's registration ID so that the services
  are "owned" by Consular and can be trimmed when they no longer
  exist.
  • Loading branch information
JayH5 committed Aug 19, 2015
1 parent ff445e1 commit 9373edf
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 27 deletions.
7 changes: 5 additions & 2 deletions consular/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
help='The Marathon HTTP API')
@click.option('--registration-id',
help=('Auto register for Marathon event callbacks with the '
'registration-id. Must be unique for each consular '
'process.'), type=str)
'registration-id. Also used to identify which services in '
'Consul should be maintained by consular. Must be unique '
'for each consular process.'),
type=str)
@click.option('--sync-interval',
help=('Automatically sync the apps in Marathon with what\'s '
'in Consul every _n_ seconds. Defaults to 0 (disabled).'),
Expand Down Expand Up @@ -64,6 +66,7 @@ def main(scheme, host, port,
consular.timeout = timeout
consular.fallback_timeout = fallback_timeout
if registration_id:
consular.registration_id = registration_id
events_url = "%s://%s:%s/events?%s" % (
scheme, host, port,
urlencode({
Expand Down
81 changes: 56 additions & 25 deletions consular/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class Consular(object):
clock = reactor
timeout = 5
fallback_timeout = 2
registration_id = None
requester = lambda self, *a, **kw: treq.request(*a, **kw)

def __init__(self, consul_endpoint, marathon_endpoint, enable_fallback):
Expand Down Expand Up @@ -182,38 +183,49 @@ def handle_unknown_event(self, request, event):
'error': 'Event type %s not supported.' % (event_type,)
})

def _registration_tag(self):
"""
Get the Consul service tag used to mark a service as created by this
instance of Consular.
"""
return 'consular-reg-id:%s' % (self.registration_id,)

def _create_service_registration(self, app_id, service_id, address, port):
"""
Create the request body for registering a service with Consul.
"""
registration = {
'Name': app_id,
'ID': service_id,
'Address': address,
'Port': port
}
if self.registration_id:
registration['Tags'] = [self._registration_tag()]
return registration

def register_service(self, agent_endpoint,
app_id, service_id, address, port):
log.msg('Registering %s at %s with %s at %s:%s.' % (
app_id, agent_endpoint, service_id, address, port))
registration = self._create_service_registration(app_id, service_id,
address, port)

d = self.consul_request(
'PUT',
'%s/v1/agent/service/register' % (agent_endpoint,),
{
'Name': app_id,
'ID': service_id,
'Address': address,
'Port': port,
})
registration)
if self.enable_fallback:
d.addErrback(
self.register_service_fallback, app_id, service_id,
address, port)
d.addErrback(self.register_service_fallback, registration)
return d

def register_service_fallback(self, failure,
app_id, service_id, address, port):
log.msg('Falling back for %s at %s with %s at %s:%s.' % (
app_id, self.consul_endpoint, service_id, address, port))
def register_service_fallback(self, failure, registration):
log.msg('Falling back for %s at %s.' % (
registration['Name'], self.consul_endpoint))
return self.consul_request(
'PUT',
'%s/v1/agent/service/register' % (self.consul_endpoint,),
{
'Name': app_id,
'ID': service_id,
'Address': address,
'Port': port,
})
registration)

def deregister_service(self, agent_endpoint, app_id, service_id):
log.msg('Deregistering %s at %s with %s' % (
Expand Down Expand Up @@ -288,22 +300,41 @@ def purge_dead_agent_services(self, agent_endpoint):
# collect the task ids for the service name
services = {}
for service_id, service in data.items():
services.setdefault(service['Service'], set([])).add(service_id)
# If we have a registration ID, check the service for a tag that
# matches our registration ID
if (not self.registration_id
or self._is_registration_in_tags(service['Tags'])):
services.setdefault(service['Service'], set([])).add(
service_id)

for app_id, task_ids in services.items():
yield self.purge_service_if_dead(agent_endpoint, app_id, task_ids)

def _is_registration_in_tags(self, tags):
"""
Check if the Consul service was tagged with our registration ID.
"""
if not tags:
return False

return self._registration_tag() in tags

@inlineCallbacks
def purge_service_if_dead(self, agent_endpoint, app_id, consul_task_ids):
response = yield self.marathon_request(
'GET', '/v2/apps/%s/tasks' % (app_id,))
data = yield response.json()
tasks_to_be_purged = set(consul_task_ids)
if 'tasks' not in data:
log.msg(('App %s does not look like a Marathon application, '
'skipping') % (str(app_id),))
return
# If there are no matching tasks in Marathon and we haven't matched
# the service by registration ID, then skip it.
if not self.registration_id:
log.msg(('App %s does not look like a Marathon application, '
'skipping') % (str(app_id),))
return
else:
marathon_task_ids = set([task['id'] for task in data['tasks']])
tasks_to_be_purged -= marathon_task_ids

marathon_task_ids = set([task['id'] for task in data['tasks']])
tasks_to_be_purged = consul_task_ids - marathon_task_ids
for task_id in tasks_to_be_purged:
yield self.deregister_service(agent_endpoint, app_id, task_id)
77 changes: 77 additions & 0 deletions consular/tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,83 @@ def test_purge_dead_services(self):
FakeResponse(200, [], json.dumps({})))
yield d

@inlineCallbacks
def test_purge_old_services(self):
"""
Services previously registered with Consul by Consular but that no
longer exist in Marathon should be purged if a registration ID is set.
"""
self.consular.registration_id = "test"
d = self.consular.purge_dead_services()
consul_request = yield self.requests.get()
self.assertEqual(
consul_request['url'],
'http://localhost:8500/v1/catalog/nodes')
consul_request['deferred'].callback(
FakeResponse(200, [], json.dumps([{
'Node': 'consul-node',
'Address': '1.2.3.4',
}]))
)
agent_request = yield self.requests.get()
# Expecting a request to list of all services in Consul, returning 3
# services - one tagged with our registration ID, one tagged with a
# different registration ID, and one with no tags.
self.assertEqual(
agent_request['url'],
'http://1.2.3.4:8500/v1/agent/services')
self.assertEqual(agent_request['method'], 'GET')
agent_request['deferred'].callback(
FakeResponse(200, [], json.dumps({
"testingapp.someid1": {
"ID": "testingapp.someid1",
"Service": "testingapp",
"Tags": [
"consular-reg-id:test"
],
"Address": "machine-1",
"Port": 8102
},
"testingapp.someid2": {
"ID": "testingapp.someid2",
"Service": "testingapp",
"Tags": [
"consular-reg-id:blah"
],
"Address": "machine-2",
"Port": 8103
},
"testingapp.someid3": {
"ID": "testingapp.someid2",
"Service": "testingapp",
"Tags": None,
"Address": "machine-2",
"Port": 8104
}
}))
)

# Expecting a request for the tasks for a given app, returning no tasks
testingapp_request = yield self.requests.get()
self.assertEqual(testingapp_request['url'],
'http://localhost:8080/v2/apps/testingapp/tasks')
self.assertEqual(testingapp_request['method'], 'GET')
testingapp_request['deferred'].callback(
FakeResponse(200, [], json.dumps({}))
)

# Expecting a service deregistering in Consul as a result. Only the
# task with the correct tag is returned.
deregister_request = yield self.requests.get()
self.assertEqual(
deregister_request['url'],
('http://1.2.3.4:8500/v1/agent/service/deregister/'
'testingapp.someid1'))
self.assertEqual(deregister_request['method'], 'PUT')
deregister_request['deferred'].callback(
FakeResponse(200, [], json.dumps({})))
yield d

@inlineCallbacks
def test_fallback_to_main_consul(self):
self.consular.enable_fallback = True
Expand Down

0 comments on commit 9373edf

Please sign in to comment.