Skip to content

Commit

Permalink
purge every once in a while
Browse files Browse the repository at this point in the history
  • Loading branch information
smn committed Jul 14, 2015
1 parent 505b3c9 commit 671ea4f
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 16 deletions.
12 changes: 9 additions & 3 deletions consular/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,14 @@
help=('Automatically sync the apps in Marathon with what\'s '
'in Consul every _n_ seconds. Defaults to 0 (disabled).'),
type=int)
@click.option('--purge/--no-purge',
help=('Automatically purge dead services from Consul if they '
'are not known in Marathon '
'(needs sync-interval enabled).'),
default=False)
def main(scheme, host, port,
consul, marathon, registration_id, sync_interval): # pragma: no cover
consul, marathon, registration_id,
sync_interval, purge): # pragma: no cover
from consular.main import Consular
from twisted.internet.task import LoopingCall

Expand All @@ -37,7 +43,7 @@ def main(scheme, host, port,
consular.register_marathon_event_callback(events_url)

if sync_interval > 0:
lc = LoopingCall(consular.sync_apps)
lc.start(sync_interval, now=True)
sync_lc = LoopingCall(consular.sync_apps, purge)
sync_lc.start(sync_interval, now=True)

consular.app.run(host, port)
67 changes: 55 additions & 12 deletions consular/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

from urllib import quote, urlencode
from twisted.internet import reactor
from twisted.web.client import HTTPConnectionPool
from twisted.web import client
# Twisted'de fault HTTP11 client factory is way too verbose
client._HTTP11ClientFactory.noisy = False
from twisted.internet.defer import (
succeed, inlineCallbacks, returnValue, gatherResults)
from twisted.python import log


import treq
from klein import Klein

Expand All @@ -22,7 +25,7 @@ class Consular(object):
def __init__(self, consul_endpoint, marathon_endpoint):
self.consul_endpoint = consul_endpoint
self.marathon_endpoint = marathon_endpoint
self.pool = HTTPConnectionPool(reactor, persistent=False)
self.pool = client.HTTPConnectionPool(reactor, persistent=False)
self.event_dispatch = {
'status_update_event': self.handle_status_update_event,
}
Expand Down Expand Up @@ -119,8 +122,7 @@ def update_task_running(self, request, event):
return d

def update_task_killed(self, request, event):
d = self.consul_request('PUT', '/v1/agent/service/deregister/%s' % (
event['taskId'],))
d = self.deregister_service(event['taskId'])
d.addCallback(lambda _: json.dumps({'status': 'ok'}))
return d

Expand All @@ -133,12 +135,26 @@ def handle_unknown_event(self, request, event):
'error': 'Event type %s not supported.' % (event_type,)
})

def sync_apps(self):
def register_service(self, name, id, address, port):
return self.consul_request('PUT', '/v1/agent/service/register', {
'Name': name,
'ID': id,
'Address': address,
'Port': port,
})

def deregister_service(self, service_id):
return self.consul_request('PUT', '/v1/agent/service/deregister/%s' % (
service_id,))

def sync_apps(self, purge=False):
d = self.marathon_request('GET', '/v2/apps')
d.addCallback(lambda response: response.json())
d.addCallback(
lambda data: gatherResults(
[self.sync_app(app) for app in data['apps']]))
if purge:
d.addCallback(lambda _: self.purge_dead_services())
return d

def get_app(self, app_id):
Expand All @@ -151,7 +167,7 @@ def sync_app(self, app):
return gatherResults([
self.sync_app_labels(app),
self.sync_app_tasks(app),
])
])

def sync_app_labels(self, app):
labels = app.get('labels', {})
Expand All @@ -170,9 +186,36 @@ def sync_app_tasks(self, app):
return d

def sync_app_task(self, app, task):
return self.consul_request('PUT', '/v1/agent/service/register', {
'Name': get_appid(app['id']),
'ID': task['id'],
'Address': task['host'],
'Port': task['ports'][0]
})
return self.register_service(
get_appid(app['id']), task['id'], task['host'], task['ports'][0])

@inlineCallbacks
def purge_dead_services(self):
response = yield self.consul_request('GET', '/v1/agent/services')
data = yield response.json()

# collect the task ids for the service name
services = {}
for service_id, service in data.items():
services.setdefault(service['Service'], set([])).add(service_id)

for app_id, task_ids in services.items():
yield self.purge_service_if_dead(app_id, task_ids)

@inlineCallbacks
def purge_service_if_dead(self, app_id, consul_task_ids):
response = yield self.marathon_request(
'GET', '/v2/apps/%s/tasks' % (app_id,))
data = yield response.json()
if 'tasks' not in data:
log.err('%s does not look like a Marathon application: %s' % (
app_id, data))
return

marathon_task_ids = set([task['id'] for task in data['tasks']])

tasks_to_be_purged = consul_task_ids - marathon_task_ids
if tasks_to_be_purged:
for task_id in tasks_to_be_purged:
yield self.deregister_service(task_id)
log.msg('Deleted: %s -> %s' % (app_id, task_id,))
60 changes: 59 additions & 1 deletion consular/tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,10 +286,68 @@ def test_sync_app(self):

@inlineCallbacks
def test_sync_apps(self):
d = self.consular.sync_apps()
d = self.consular.sync_apps(purge=False)
marathon_request = yield self.marathon_requests.get()
self.assertEqual(marathon_request['path'], '/v2/apps')
self.assertEqual(marathon_request['method'], 'GET')
marathon_request['deferred'].callback(
FakeResponse(200, [], json.dumps({'apps': []})))
yield d

@inlineCallbacks
def test_purge_dead_services(self):
d = self.consular.purge_dead_services()
consul_request = yield self.consul_requests.get()

# Expecting a request to list of all services in Consul,
# returning 2
self.assertEqual(consul_request['path'], '/v1/agent/services')
self.assertEqual(consul_request['method'], 'GET')
consul_request['deferred'].callback(
FakeResponse(200, [], json.dumps({
"testingapp.someid1": {
"ID": "testingapp.someid1",
"Service": "testingapp",
"Tags": None,
"Address": "machine-1",
"Port": 8102
},
"testingapp.someid2": {
"ID": "testingapp.someid2",
"Service": "testingapp",
"Tags": None,
"Address": "machine-2",
"Port": 8103
}
}))
)

# Expecting a request for the tasks for a given app, returning
# 1 less than Consul thinks exists.
testingapp_request = yield self.marathon_requests.get()
self.assertEqual(testingapp_request['path'],
'/v2/apps/testingapp/tasks')
self.assertEqual(testingapp_request['method'], 'GET')
testingapp_request['deferred'].callback(
FakeResponse(200, [], json.dumps({
"tasks": [{
"appId": "/testingapp",
"id": "testingapp.someid2",
"host": "machine-2",
"ports": [8103],
"startedAt": "2015-07-14T14:54:31.934Z",
"stagedAt": "2015-07-14T14:54:31.544Z",
"version": "2015-07-14T13:07:32.095Z"
}]
}))
)

# Expecting a service registering in Consul as a result for one
# of these services
deregister_request = yield self.consul_requests.get()
self.assertEqual(deregister_request['path'],
'/v1/agent/service/deregister/testingapp.someid1')
self.assertEqual(deregister_request['method'], 'PUT')
deregister_request['deferred'].callback(
FakeResponse(200, [], json.dumps({})))
yield d

0 comments on commit 671ea4f

Please sign in to comment.