Skip to content

Commit

Permalink
Delete Consul k/v entries for non-existant apps
Browse files Browse the repository at this point in the history
* With `--purge`, clean out old k/v entries for apps during every
  sync.
  • Loading branch information
JayH5 committed Sep 15, 2015
1 parent e053177 commit 8590ddc
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 28 deletions.
106 changes: 82 additions & 24 deletions consular/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,27 +369,38 @@ def sync_apps(self, purge=False):
:param bool purge:
To purge or not to purge.
"""
d = self.get_marathon_apps()
return d.addCallback(self.sync_and_purge_apps, purge)

def get_marathon_apps(self):
""" Get a list of running apps from the Marathon API. """
d = self.marathon_request('GET', '/v2/apps')
d.addCallback(lambda response: response.json())
d.addCallback(
lambda data: gatherResults(
[self.sync_app(app) for app in data['apps']]))
return d.addCallback(lambda data: data['apps'])

def sync_and_purge_apps(self, apps, purge=False):
deferreds = [gatherResults([self.sync_app(app) for app in apps])]
if purge:
d.addCallback(lambda _: self.purge_dead_services())
return d
deferreds.append(self.purge_dead_apps(apps))
return gatherResults(deferreds)

def get_app(self, app_id):
d = self.marathon_request('GET', '/v2/apps%s' % (app_id,))
d.addCallback(lambda response: response.json())
d.addCallback(lambda data: data['app'])
return d
d = d.addCallback(lambda response: response.json())
return d.addCallback(lambda data: data['app'])

def sync_app(self, app):
return gatherResults([
self.sync_app_labels(app),
self.sync_app_tasks(app),
])

def purge_dead_apps(self, apps):
return gatherResults([
self.purge_dead_services(),
self.purge_dead_app_labels(apps)
])

def sync_app_labels(self, app):
"""
Sync the app labels for the given app by pushing its labels to the
Expand Down Expand Up @@ -438,29 +449,38 @@ def clean_consul_app_labels(self, app_name, labels):
d.addCallback(self._filter_marathon_labels, labels)

# Delete the non-existant keys
d.addCallback(self.delete_consul_kv_keys)

return d
return d.addCallback(self.delete_consul_kv_keys)

def get_consul_app_keys(self, app_name):
""" Get the Consul k/v keys for the app with the given name. """
return self.get_consul_kv_keys('consular/%s' % (app_name,))

def get_consul_kv_keys(self, key_path):
""" Get the Consul k/v keys present at the given key path. """
d = self.consul_request('GET', '%s/v1/kv/%s?keys' % (
self.consul_endpoint, quote(key_path),))
d.addCallback(lambda response: response.json())
return d
def get_consul_consular_keys(self):
"""
Get the next level of Consul k/v keys at 'consular/', i.e. will
return 'consular/my-app' but not 'consular/my-app/my-label'.
"""
return self.get_consul_kv_keys('consular/', separator='/')

def delete_consul_kv_keys(self, keys):
def get_consul_kv_keys(self, key_path, separator=None):
""" Get the Consul k/v keys present at the given key path. """
params = {'keys': ''}
if separator:
params['separator'] = separator
d = self.consul_request('GET', '%s/v1/kv/%s?%s' % (
self.consul_endpoint, quote(key_path), urlencode(params)))
return d.addCallback(lambda response: response.json())

def delete_consul_kv_keys(self, keys, recurse=False):
""" Delete a sequence of Consul k/v keys. """
return gatherResults([self.delete_consul_kv_key(key) for key in keys])
return gatherResults([self.delete_consul_kv_key(key, recurse)
for key in keys])

def delete_consul_kv_key(self, key):
def delete_consul_kv_key(self, key, recurse=False):
""" Delete the Consul k/v entry associated with the given key. """
return self.consul_request('DELETE', '%s/v1/kv/%s' % (
self.consul_endpoint, quote(key),))
return self.consul_request('DELETE', '%s/v1/kv/%s%s' % (
self.consul_endpoint, quote(key),
'?recurse' if recurse else '',))

def _filter_marathon_labels(self, consul_keys, marathon_labels):
"""
Expand All @@ -487,15 +507,53 @@ def _consul_key_to_marathon_label_key(self, consul_key):
def sync_app_tasks(self, app):
d = self.marathon_request('GET', '/v2/apps%(id)s/tasks' % app)
d.addCallback(lambda response: response.json())
d.addCallback(lambda data: gatherResults(
return d.addCallback(lambda data: gatherResults(
self.sync_app_task(app, task) for task in data['tasks']))
return d

def sync_app_task(self, app, task):
return self.register_service(
get_agent_endpoint(task['host']), app['id'], task['id'],
task['host'], task['ports'][0])

def purge_dead_app_labels(self, apps):
"""
Delete any keys stored in the Consul k/v store that belong to apps that
no longer exist.
:param: apps:
The list of apps as returned by the Marathon API.
"""
# Get the existing keys
d = self.get_consul_consular_keys()

# Filter the present apps out
d.addCallback(self._filter_marathon_apps, apps)

# Delete the remaining keys
return d.addCallback(self.delete_consul_kv_keys, recurse=True)

def _filter_marathon_apps(self, consul_keys, marathon_apps):
"""
Takes a list of Consul keys and removes those with keys not found in
the given list of Marathon apps.
:param: consul_keys:
The list of Consul keys as returned by the Consul API.
:param: marathon_apps:
The list of apps as returned by the Marathon API.
"""
app_name_set = set([get_app_name(app['id']) for app in marathon_apps])
return [key for key in consul_keys
if (self._consul_key_to_marathon_app_name(key)
not in app_name_set)]

def _consul_key_to_marathon_app_name(self, consul_key):
"""
Trims the 'consular/' from the front of the key path to get the
Marathon app name.
"""
return consul_key.split('/', 1)[-1]

def purge_dead_services(self):
d = self.consul_request(
'GET', '%s/v1/catalog/nodes' % (self.consul_endpoint,))
Expand Down
42 changes: 38 additions & 4 deletions consular/tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def test_TASK_RUNNING(self):
consul_kv_request = yield self.requests.get()
self.assertEqual(consul_kv_request['method'], 'GET')
self.assertEqual(consul_kv_request['url'],
'http://localhost:8500/v1/kv/consular/my-app?keys')
'http://localhost:8500/v1/kv/consular/my-app?keys=')
consul_kv_request['deferred'].callback(
FakeResponse(200, [], json.dumps([])))

Expand Down Expand Up @@ -398,7 +398,7 @@ def test_sync_app_labels(self):
consul_request = yield self.requests.get()
self.assertEqual(consul_request['method'], 'GET')
self.assertEqual(consul_request['url'],
'http://localhost:8500/v1/kv/consular/my-app?keys')
'http://localhost:8500/v1/kv/consular/my-app?keys=')
consul_request['deferred'].callback(
FakeResponse(200, [], json.dumps([])))

Expand Down Expand Up @@ -427,7 +427,7 @@ def test_sync_app_labels_cleanup(self):
get_request = yield self.requests.get()
self.assertEqual(get_request['method'], 'GET')
self.assertEqual(get_request['url'],
'http://localhost:8500/v1/kv/consular/my-app?keys')
'http://localhost:8500/v1/kv/consular/my-app?keys=')
consul_labels = [
'consular/my-app/foo',
'consular/my-app/oldfoo',
Expand Down Expand Up @@ -467,7 +467,7 @@ def test_sync_app(self):
self.assertEqual(consul_request['method'], 'GET')
self.assertEqual(
consul_request['url'],
'http://localhost:8500/v1/kv/consular/my-app?keys')
'http://localhost:8500/v1/kv/consular/my-app?keys=')
consul_request['deferred'].callback(
FakeResponse(200, [], json.dumps([])))

Expand Down Expand Up @@ -691,6 +691,40 @@ def test_purge_old_service_no_app_id(self):
# Expecting no action to be taken as there is no app ID.
yield d

@inlineCallbacks
def test_purge_dead_app_labels(self):
"""
Services previously registered with Consul by Consular but that no
longer exist in Marathon should have their labels removed from the k/v
store.
"""
d = self.consular.purge_dead_app_labels([{
'id': 'my-app'
}])
consul_request = yield self.requests.get()
self.assertEqual(consul_request['method'], 'GET')
self.assertEqual(
consul_request['url'],
'http://localhost:8500/v1/kv/consular/?keys=&separator=%2F')
# Return one existing app and one non-existing app
consul_request['deferred'].callback(
FakeResponse(200, [], json.dumps([
'consular/my-app',
'consular/my-app2',
]))
)

# Consular should delete the app that doesn't exist
consul_request = yield self.requests.get()
self.assertEqual(consul_request['method'], 'DELETE')
self.assertEqual(
consul_request['url'],
'http://localhost:8500/v1/kv/consular/my-app2?recurse')
consul_request['deferred'].callback(
FakeResponse(200, [], json.dumps({})))

yield d

@inlineCallbacks
def test_fallback_to_main_consul(self):
self.consular.enable_fallback = True
Expand Down

0 comments on commit 8590ddc

Please sign in to comment.