
Commit

refactor cat strikes again
smn committed Jul 17, 2015
1 parent 1647afd commit af34ef4
Showing 3 changed files with 154 additions and 96 deletions.
15 changes: 13 additions & 2 deletions consular/cli.py
@@ -38,19 +38,30 @@
@click.option('--timeout',
help='HTTP API client timeout',
default=5, type=int)
@click.option('--fallback/--no-fallback',
help=('Fallback to the default Consul agent for service '
'registration if the host running the mesos tasks '
'is not running a consul agent.'),
default=True)
@click.option('--fallback-timeout',
help=('How long to wait until assuming there is no consul '
'agent running on a mesos-slave machine'),
default=2, type=int)
def main(scheme, host, port,
consul, marathon, registration_id,
sync_interval, purge, logfile, debug, timeout): # pragma: no cover
sync_interval, purge, logfile, debug, timeout,
fallback, fallback_timeout): # pragma: no cover
from consular.main import Consular
from twisted.internet.task import LoopingCall
from twisted.internet import reactor
from twisted.python import log

log.startLogging(logfile)

consular = Consular(consul, marathon)
consular = Consular(consul, marathon, fallback)
consular.debug = debug
consular.timeout = timeout
consular.fallback_timeout = fallback_timeout
if registration_id:
events_url = "%s://%s:%s/events?%s" % (
scheme, host, port,
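For reference, a minimal sketch of how the new cli.py options end up on the Consular instance once this change is applied. The endpoint URLs below are placeholder assumptions, not values from the commit; the attribute and argument names are taken from the diff above.

    from consular.main import Consular

    # --fallback/--no-fallback maps to the enable_fallback constructor argument;
    # --fallback-timeout is assigned afterwards, mirroring main() in cli.py above.
    consular = Consular('http://localhost:8500',   # assumed local Consul endpoint
                        'http://localhost:8080',   # assumed Marathon endpoint
                        True)                      # enable_fallback (default True)
    consular.debug = False
    consular.timeout = 5
    consular.fallback_timeout = 2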
100 changes: 65 additions & 35 deletions consular/main.py
@@ -3,7 +3,7 @@
from urllib import quote, urlencode
from twisted.internet import reactor
from twisted.web import client, server
# Twisted'de fault HTTP11 client factory is way too verbose
# Twisted's fault HTTP11 client factory is way too verbose
client._HTTP11ClientFactory.noisy = False
from twisted.internet.defer import (
succeed, inlineCallbacks, returnValue, gatherResults)
@@ -18,6 +18,10 @@ def get_appid(app_id_string):
return app_id_string.rsplit('/', 1)[1]


def get_agent_endpoint(host):
return 'http://%s:8500' % (host,)


class ConsularSite(server.Site):

debug = False
@@ -33,11 +37,14 @@ class Consular(object):
debug = False
clock = reactor
timeout = 5
fallback_timeout = 2
request = lambda *a, **kw: treq.request(*a, **kw)

def __init__(self, consul_endpoint, marathon_endpoint):
def __init__(self, consul_endpoint, marathon_endpoint, enable_fallback):
self.consul_endpoint = consul_endpoint
self.marathon_endpoint = marathon_endpoint
self.pool = client.HTTPConnectionPool(self.clock, persistent=False)
self.enable_fallback = enable_fallback
self.event_dispatch = {
'status_update_event': self.handle_status_update_event,
}
@@ -85,31 +92,36 @@ def log_http_response(self, response, method, path, data):
return response

def marathon_request(self, method, path, data=None):
d = treq.request(
method, ('%s%s' % (self.marathon_endpoint, path)).encode('utf-8'),
headers={
'Content-Type': 'application/json',
'Accept': 'application/json',
},
data=(json.dumps(data) if data is not None else None),
pool=self.pool,
timeout=self.timeout)
if self.debug:
d.addCallback(self.log_http_response, method, path, data)
return self._request(
method, '%s%s' % (self.marathon_endpoint, path), data)

def consul_request(self, agent_endpoint, method, path, data=None):
d = self._request(
method, '%s%s' % (agent_endpoint, path), data,
timeout=self.fallback_timeout)
d.addErrback(self.consul_request_error_handler)
return d

def consul_request(self, method, path, data=None):
d = treq.request(
method, ('%s%s' % (self.consul_endpoint, path)).encode('utf-8'),
def consul_request_error_handler(self, failure):
print 'fail!', failure

def fallback_consul_request(self, method, path, data=None):
return self._request(
method, '%s%s' % (self.consul_endpoint, path), data)

def _request(self, method, url, data, timeout=None):
d = self.request(
method,
url.encode('utf-8'),
headers={
'Content-Type': 'application/json',
'Accept': 'application/json',
},
data=(json.dumps(data) if data is not None else None),
pool=self.pool,
timeout=self.timeout)
timeout=timeout or self.timeout)
if self.debug:
d.addCallback(self.log_http_response, method, path, data)
d.addCallback(self.log_http_response, method, url, data)
return d

@app.route('/')
@@ -153,7 +165,9 @@ def update_task_running(self, request, event):

def update_task_killed(self, request, event):
d = self.deregister_service(
get_appid(event['appId']), event['taskId'])
get_agent_endpoint(event['host']),
get_appid(event['appId']),
event['taskId'])
d.addCallback(lambda _: json.dumps({'status': 'ok'}))
return d

@@ -166,20 +180,20 @@ def handle_unknown_event(self, request, event):
'error': 'Event type %s not supported.' % (event_type,)
})

def register_service(self, name, id, address, port):
log.msg('Registering %s with %s at %s:%s.' % (
name, id, address, port))
return self.consul_request('PUT', '/v1/agent/service/register', {
'Name': name,
'ID': id,
def register_service(self, node, app_id, service_id, address, port):
log.msg('Registering %s at %s with %s at %s:%s.' % (
app_id, node, service_id, address, port))
return self.consul_request(node, 'PUT', '/v1/agent/service/register', {
'Name': app_id,
'ID': service_id,
'Address': address,
'Port': port,
})

def deregister_service(self, app_id, service_id):
log.msg('Deregistering %s with %s' % (app_id, service_id,))
return self.consul_request('PUT', '/v1/agent/service/deregister/%s' % (
service_id,))
def deregister_service(self, node, app_id, service_id):
log.msg('Deregistering %s at %s with %s' % (app_id, node, service_id,))
return self.consul_request(
node, 'PUT', '/v1/agent/service/deregister/%s' % (service_id,))

def sync_apps(self, purge=False):
d = self.marathon_request('GET', '/v2/apps')
@@ -205,8 +219,11 @@ def sync_app(self, app):

def sync_app_labels(self, app):
labels = app.get('labels', {})
# NOTE: KV requests can go straight to the consul registry
# we're already connected to, they're not local to the agents.
return gatherResults([
self.consul_request(
self.consul_endpoint,
'PUT', '/v1/kv/consular/%s/%s' % (
quote(get_appid(app['id'])), quote(key)), value)
for key, value in labels.items()
@@ -221,11 +238,24 @@ def sync_app_tasks(self, app):

def sync_app_task(self, app, task):
return self.register_service(
get_appid(app['id']), task['id'], task['host'], task['ports'][0])
get_agent_endpoint(task['host']),
get_appid(app['id']), task['id'],
task['host'], task['ports'][0])

@inlineCallbacks
def purge_dead_services(self):
response = yield self.consul_request('GET', '/v1/agent/services')
d = self.consul_request(
self.consul_endpoint, 'GET', '/v1/catalog/nodes')
d.addCallback(lambda response: response.json())
d.addCallback(lambda data: gatherResults([
self.purge_dead_agent_services(node['Address']) for node in data
]))
return d

@inlineCallbacks
def purge_dead_agent_services(self, node):
agent_endpoint = get_agent_endpoint(node)
response = yield self.consul_request(
agent_endpoint, 'GET', '/v1/agent/services')
data = yield response.json()

# collect the task ids for the service name
@@ -234,10 +264,10 @@ def purge_dead_services(self):
services.setdefault(service['Service'], set([])).add(service_id)

for app_id, task_ids in services.items():
yield self.purge_service_if_dead(app_id, task_ids)
yield self.purge_service_if_dead(agent_endpoint, app_id, task_ids)

@inlineCallbacks
def purge_service_if_dead(self, app_id, consul_task_ids):
def purge_service_if_dead(self, node, app_id, consul_task_ids):
response = yield self.marathon_request(
'GET', '/v2/apps/%s/tasks' % (app_id,))
data = yield response.json()
@@ -249,4 +279,4 @@ def purge_service_if_dead(self, app_id, consul_task_ids):
marathon_task_ids = set([task['id'] for task in data['tasks']])
tasks_to_be_purged = consul_task_ids - marathon_task_ids
for task_id in tasks_to_be_purged:
yield self.deregister_service(app_id, task_id)
yield self.deregister_service(node, app_id, task_id)
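To summarise the routing change in main.py: service registration and deregistration now target the Consul agent on the host running the mesos task (get_agent_endpoint(host) -> 'http://<host>:8500'), while the KV writes in sync_app_labels still go to the centrally configured Consul endpoint. A rough sketch follows; the host, app name, task id and port are illustrative values rather than anything from the commit, and `consular` is an instance constructed as in the cli.py sketch above.

    from consular.main import get_agent_endpoint

    agent = get_agent_endpoint('10.0.0.5')   # -> 'http://10.0.0.5:8500'

    # sync_app_task() registers each running task against its host's agent ...
    consular.register_service(agent, 'my-app', 'my-app.task-1234',
                              '10.0.0.5', 31001)

    # ... and update_task_killed() / purge_service_if_dead() deregister there too.
    consular.deregister_service(agent, 'my-app', 'my-app.task-1234')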
