Skip to content

Commit

Permalink
Merge pull request #48 from universalcore/feature/issue-48-running-ev…
Browse files Browse the repository at this point in the history
…ent-smarter

Be smarter about adding new Marathon tasks to Consul
  • Loading branch information
JayH5 committed Oct 27, 2015
2 parents 8ff0c8c + 0ecbd88 commit dabfdf7
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 66 deletions.
91 changes: 52 additions & 39 deletions consular/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,19 +222,30 @@ def noop(self, request, event):
'status': 'ok'
}))

@inlineCallbacks
def update_task_running(self, request, event):
# NOTE: Marathon sends a list of ports, I don't know yet when & if
# there are multiple values in that list.
d = self.marathon_client.get_app(event['appId'])
d.addCallback(lambda app: self.sync_app(app))
d.addCallback(lambda _: json.dumps({'status': 'ok'}))
return d
""" Use a running event to register a new Consul service. """
# Register the task as a service
yield self.register_task_service(
event['appId'], event['taskId'], event['host'], event['ports'][0])

# Sync the app labels in case they've changed or aren't stored yet
app = yield handle_not_found_error(
self.marathon_client.get_app, event['appId'])

# The app could have disappeared in this time if it was destroyed. If
# it has been destroyed, do nothing and wait for the TASK_KILLED event
# to clear it.
if app is not None:
yield self.sync_app_labels(app)
else:
log.msg('Warning. App with ID "%s" could not be found for new '
'task with ID "%s"' % (event['appId'], event['taskId'],))

returnValue(json.dumps({'status': 'ok'}))

def update_task_killed(self, request, event):
d = self.deregister_service(
get_agent_endpoint(event['host']),
get_app_name(event['appId']),
event['taskId'])
d = self.deregister_task_service(event['taskId'], event['host'])
d.addCallback(lambda _: json.dumps({'status': 'ok'}))
return d

Expand Down Expand Up @@ -297,45 +308,51 @@ def _create_service_registration(self, app_id, service_id, address, port):
}
return registration

def register_service(self, agent_endpoint,
app_id, service_id, address, port):
def register_task_service(self, app_id, task_id, host, port):
"""
Register a task in Marathon as a service in Consul
Register a Marathon task as a service in Consul.
:param str agent_endpoint:
The HTTP endpoint of where Consul on the Mesos worker machine
can be accessed.
:param str app_id:
Marathon's App-id for the task.
:param str service_id:
The service-id to register it as in Consul.
:param str address:
The ID of the Marathon app that the task belongs to.
:param str task_id:
The ID of the task, this will be used as the Consul service ID.
:param str host:
The host address of the machine the task is running on.
:param int port:
The port number the task can be accessed on on the host machine.
"""
agent_endpoint = get_agent_endpoint(host)
log.msg('Registering %s at %s with %s at %s:%s.' % (
app_id, agent_endpoint, service_id, address, port))
registration = self._create_service_registration(app_id, service_id,
address, port)
app_id, agent_endpoint, task_id, host, port))
registration = self._create_service_registration(app_id, task_id,
host, port)

return self.consul_client.register_agent_service(
agent_endpoint, registration)

def deregister_service(self, agent_endpoint, app_id, service_id):
def deregister_task_service(self, task_id, host):
"""
Deregister a service from Consul
Deregister a Marathon task's service from Consul.
:param str task_id:
The ID of the task, this will be used as the Consul service ID.
:param str host:
The host address of the machine the task is running on.
"""
return self.deregister_consul_service(
get_agent_endpoint(host), task_id)

def deregister_consul_service(self, agent_endpoint, service_id):
"""
Deregister a service from a Consul agent.
:param str agent_endpoint:
The HTTP endpoint of where Consul on the Mesos worker machine
can be accessed.
:param str app_id:
Marathon's App-id for the task.
The HTTP endpoint of the Consul agent.
:param str service_id:
The service-id to register it as in Consul.
The ID of the Consul service to be deregistered.
"""
log.msg('Deregistering %s at %s with %s' % (
app_id, agent_endpoint, service_id,))
log.msg('Deregistering service with ID "%s" at Consul endpoint %s ' % (
service_id, agent_endpoint,))
return self.consul_client.deregister_agent_service(
agent_endpoint, service_id)

Expand Down Expand Up @@ -501,12 +518,8 @@ def _consul_key_to_marathon_label_key(self, consul_key):
def sync_app_tasks(self, app):
tasks = yield self.marathon_client.get_app_tasks(app['id'])
for task in tasks:
yield self.sync_app_task(app, task)

def sync_app_task(self, app, task):
return self.register_service(
get_agent_endpoint(task['host']), app['id'], task['id'],
task['host'], task['ports'][0])
yield self.register_task_service(
app['id'], task['id'], task['host'], task['ports'][0])

@inlineCallbacks
def purge_dead_app_labels(self, apps):
Expand Down Expand Up @@ -606,7 +619,7 @@ def purge_service_if_dead(self, agent_endpoint, app_id, consul_task_ids):

# Deregister the remaining old services
for service_id in service_ids:
yield self.deregister_service(agent_endpoint, app_id, service_id)
yield self.deregister_consul_service(agent_endpoint, service_id)

def _filter_marathon_tasks(self, marathon_tasks, consul_service_ids):
if not marathon_tasks:
Expand Down
124 changes: 97 additions & 27 deletions consular/tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,25 @@ def test_TASK_RUNNING(self):
"version": "2014-04-04T06:26:23.051Z"
})

# Store the task as a service in Consul
consul_request = yield self.requests.get()
self.assertEqual(consul_request['method'], 'PUT')
self.assertEqual(
consul_request['url'],
'http://slave-1234.acme.org:8500/v1/agent/service/register')
self.assertEqual(consul_request['data'], json.dumps({
'Name': 'my-app',
'ID': 'my-app_0-1396592784349',
'Address': 'slave-1234.acme.org',
'Port': 31372,
'Tags': [
'consular-reg-id=test',
'consular-app-id=/my-app',
],
}))
consul_request['deferred'].callback(
FakeResponse(200, [], json.dumps({})))

# We should get the app info for the event
marathon_app_request = yield self.requests.get()
self.assertEqual(marathon_app_request['method'], 'GET')
Expand All @@ -207,26 +226,32 @@ def test_TASK_RUNNING(self):
consul_kv_request['deferred'].callback(
FakeResponse(200, [], json.dumps([])))

# Then we collect the tasks for the app
marathon_tasks_request = yield self.requests.get()
self.assertEqual(marathon_tasks_request['method'], 'GET')
self.assertEqual(marathon_tasks_request['url'],
'http://localhost:8080/v2/apps/my-app/tasks')
marathon_tasks_request['deferred'].callback(
FakeResponse(200, [], json.dumps({
'tasks': [{
'id': 'my-app_0-1396592784349',
'host': 'slave-1234.acme.org',
'ports': [31372],
}]
})))
response = yield d
self.assertEqual((yield response.json()), {
'status': 'ok'
})

request = yield self.requests.get()
self.assertEqual(request['method'], 'PUT')
@inlineCallbacks
def test_TASK_RUNNING_app_not_found(self):
d = self.request('POST', '/events', {
"eventType": "status_update_event",
"timestamp": "2014-03-01T23:29:30.158Z",
"slaveId": "20140909-054127-177048842-5050-1494-0",
"taskId": "my-app_0-1396592784349",
"taskStatus": "TASK_RUNNING",
"appId": "/my-app",
"host": "slave-1234.acme.org",
"ports": [31372],
"version": "2014-04-04T06:26:23.051Z"
})

# Store the task as a service in Consul
consul_request = yield self.requests.get()
self.assertEqual(consul_request['method'], 'PUT')
self.assertEqual(
request['url'],
consul_request['url'],
'http://slave-1234.acme.org:8500/v1/agent/service/register')
self.assertEqual(request['data'], json.dumps({
self.assertEqual(consul_request['data'], json.dumps({
'Name': 'my-app',
'ID': 'my-app_0-1396592784349',
'Address': 'slave-1234.acme.org',
Expand All @@ -236,8 +261,19 @@ def test_TASK_RUNNING(self):
'consular-app-id=/my-app',
],
}))
request['deferred'].callback(
consul_request['deferred'].callback(
FakeResponse(200, [], json.dumps({})))

# We try to get the app info for the event but the app is gone
marathon_app_request = yield self.requests.get()
self.assertEqual(marathon_app_request['method'], 'GET')
self.assertEqual(marathon_app_request['url'],
'http://localhost:8080/v2/apps/my-app')
marathon_app_request['deferred'].callback(
FakeResponse(404, [], json.dumps({'message': 'Not found'})))

# So we do nothing...

response = yield d
self.assertEqual((yield response.json()), {
'status': 'ok'
Expand Down Expand Up @@ -402,10 +438,29 @@ def test_register_with_marathon_unexpected_response(self):
'http://localhost:8080/v2/eventSubscriptions')

@inlineCallbacks
def test_sync_app_task(self):
app = {'id': '/my-app'}
task = {'id': 'my-task-id', 'host': '0.0.0.0', 'ports': [1234]}
d = self.consular.sync_app_task(app, task)
def test_sync_app_tasks(self):
"""
When syncing an app with a task, Consul is updated with a service entry
for the task.
"""
d = self.consular.sync_app_tasks({'id': '/my-app'})

# First Consular fetches the tasks for the app
marathon_request = yield self.requests.get()
self.assertEqual(marathon_request['method'], 'GET')
self.assertEqual(
marathon_request['url'],
'http://localhost:8080/v2/apps/my-app/tasks')

# Respond with one task
marathon_request['deferred'].callback(
FakeResponse(200, [], json.dumps({
'tasks': [
{'id': 'my-task-id', 'host': '0.0.0.0', 'ports': [1234]}
]}))
)

# Consular should register the task in Consul
consul_request = yield self.requests.get()
self.assertEqual(
consul_request['url'],
Expand All @@ -431,9 +486,24 @@ def test_sync_app_task_grouped(self):
When syncing an app in a group with a task, Consul is updated with a
service entry for the task.
"""
app = {'id': '/my-group/my-app'}
task = {'id': 'my-task-id', 'host': '0.0.0.0', 'ports': [1234]}
d = self.consular.sync_app_task(app, task)
d = self.consular.sync_app_tasks({'id': '/my-group/my-app'})

# First Consular fetches the tasks for the app
marathon_request = yield self.requests.get()
self.assertEqual(marathon_request['method'], 'GET')
self.assertEqual(
marathon_request['url'],
'http://localhost:8080/v2/apps/my-group/my-app/tasks')

# Respond with one task
marathon_request['deferred'].callback(
FakeResponse(200, [], json.dumps({
'tasks': [
{'id': 'my-task-id', 'host': '0.0.0.0', 'ports': [1234]}
]}))
)

# Consular should register the task in Consul
consul_request = yield self.requests.get()
self.assertEqual(
consul_request['url'],
Expand Down Expand Up @@ -1036,8 +1106,8 @@ def test_purge_dead_app_labels_forbidden(self):
@inlineCallbacks
def test_fallback_to_main_consul(self):
self.consular.consul_client.enable_fallback = True
self.consular.register_service(
'http://foo:8500', '/app_id', 'service_id', 'foo', 1234)
self.consular.register_task_service(
'/app_id', 'service_id', 'foo', 1234)
request = yield self.requests.get()
self.assertEqual(
request['url'],
Expand Down

0 comments on commit dabfdf7

Please sign in to comment.