diff --git a/consular/main.py b/consular/main.py index da90892..d5c5ca1 100644 --- a/consular/main.py +++ b/consular/main.py @@ -222,19 +222,30 @@ def noop(self, request, event): 'status': 'ok' })) + @inlineCallbacks def update_task_running(self, request, event): - # NOTE: Marathon sends a list of ports, I don't know yet when & if - # there are multiple values in that list. - d = self.marathon_client.get_app(event['appId']) - d.addCallback(lambda app: self.sync_app(app)) - d.addCallback(lambda _: json.dumps({'status': 'ok'})) - return d + """ Use a running event to register a new Consul service. """ + # Register the task as a service + yield self.register_task_service( + event['appId'], event['taskId'], event['host'], event['ports'][0]) + + # Sync the app labels in case they've changed or aren't stored yet + app = yield handle_not_found_error( + self.marathon_client.get_app, event['appId']) + + # The app could have disappeared in this time if it was destroyed. If + # it has been destroyed, do nothing and wait for the TASK_KILLED event + # to clear it. + if app is not None: + yield self.sync_app_labels(app) + else: + log.msg('Warning. App with ID "%s" could not be found for new ' + 'task with ID "%s"' % (event['appId'], event['taskId'],)) + + returnValue(json.dumps({'status': 'ok'})) def update_task_killed(self, request, event): - d = self.deregister_service( - get_agent_endpoint(event['host']), - get_app_name(event['appId']), - event['taskId']) + d = self.deregister_task_service(event['taskId'], event['host']) d.addCallback(lambda _: json.dumps({'status': 'ok'})) return d @@ -297,45 +308,51 @@ def _create_service_registration(self, app_id, service_id, address, port): } return registration - def register_service(self, agent_endpoint, - app_id, service_id, address, port): + def register_task_service(self, app_id, task_id, host, port): """ - Register a task in Marathon as a service in Consul + Register a Marathon task as a service in Consul. - :param str agent_endpoint: - The HTTP endpoint of where Consul on the Mesos worker machine - can be accessed. :param str app_id: - Marathon's App-id for the task. - :param str service_id: - The service-id to register it as in Consul. - :param str address: + The ID of the Marathon app that the task belongs to. + :param str task_id: + The ID of the task, this will be used as the Consul service ID. + :param str host: The host address of the machine the task is running on. :param int port: The port number the task can be accessed on on the host machine. """ + agent_endpoint = get_agent_endpoint(host) log.msg('Registering %s at %s with %s at %s:%s.' % ( - app_id, agent_endpoint, service_id, address, port)) - registration = self._create_service_registration(app_id, service_id, - address, port) + app_id, agent_endpoint, task_id, host, port)) + registration = self._create_service_registration(app_id, task_id, + host, port) return self.consul_client.register_agent_service( agent_endpoint, registration) - def deregister_service(self, agent_endpoint, app_id, service_id): + def deregister_task_service(self, task_id, host): """ - Deregister a service from Consul + Deregister a Marathon task's service from Consul. + + :param str task_id: + The ID of the task, this will be used as the Consul service ID. + :param str host: + The host address of the machine the task is running on. + """ + return self.deregister_consul_service( + get_agent_endpoint(host), task_id) + + def deregister_consul_service(self, agent_endpoint, service_id): + """ + Deregister a service from a Consul agent. :param str agent_endpoint: - The HTTP endpoint of where Consul on the Mesos worker machine - can be accessed. - :param str app_id: - Marathon's App-id for the task. + The HTTP endpoint of the Consul agent. :param str service_id: - The service-id to register it as in Consul. + The ID of the Consul service to be deregistered. """ - log.msg('Deregistering %s at %s with %s' % ( - app_id, agent_endpoint, service_id,)) + log.msg('Deregistering service with ID "%s" at Consul endpoint %s ' % ( + service_id, agent_endpoint,)) return self.consul_client.deregister_agent_service( agent_endpoint, service_id) @@ -501,12 +518,8 @@ def _consul_key_to_marathon_label_key(self, consul_key): def sync_app_tasks(self, app): tasks = yield self.marathon_client.get_app_tasks(app['id']) for task in tasks: - yield self.sync_app_task(app, task) - - def sync_app_task(self, app, task): - return self.register_service( - get_agent_endpoint(task['host']), app['id'], task['id'], - task['host'], task['ports'][0]) + yield self.register_task_service( + app['id'], task['id'], task['host'], task['ports'][0]) @inlineCallbacks def purge_dead_app_labels(self, apps): @@ -606,7 +619,7 @@ def purge_service_if_dead(self, agent_endpoint, app_id, consul_task_ids): # Deregister the remaining old services for service_id in service_ids: - yield self.deregister_service(agent_endpoint, app_id, service_id) + yield self.deregister_consul_service(agent_endpoint, service_id) def _filter_marathon_tasks(self, marathon_tasks, consul_service_ids): if not marathon_tasks: diff --git a/consular/tests/test_main.py b/consular/tests/test_main.py index f04f001..afb3830 100644 --- a/consular/tests/test_main.py +++ b/consular/tests/test_main.py @@ -187,6 +187,25 @@ def test_TASK_RUNNING(self): "version": "2014-04-04T06:26:23.051Z" }) + # Store the task as a service in Consul + consul_request = yield self.requests.get() + self.assertEqual(consul_request['method'], 'PUT') + self.assertEqual( + consul_request['url'], + 'http://slave-1234.acme.org:8500/v1/agent/service/register') + self.assertEqual(consul_request['data'], json.dumps({ + 'Name': 'my-app', + 'ID': 'my-app_0-1396592784349', + 'Address': 'slave-1234.acme.org', + 'Port': 31372, + 'Tags': [ + 'consular-reg-id=test', + 'consular-app-id=/my-app', + ], + })) + consul_request['deferred'].callback( + FakeResponse(200, [], json.dumps({}))) + # We should get the app info for the event marathon_app_request = yield self.requests.get() self.assertEqual(marathon_app_request['method'], 'GET') @@ -207,26 +226,32 @@ def test_TASK_RUNNING(self): consul_kv_request['deferred'].callback( FakeResponse(200, [], json.dumps([]))) - # Then we collect the tasks for the app - marathon_tasks_request = yield self.requests.get() - self.assertEqual(marathon_tasks_request['method'], 'GET') - self.assertEqual(marathon_tasks_request['url'], - 'http://localhost:8080/v2/apps/my-app/tasks') - marathon_tasks_request['deferred'].callback( - FakeResponse(200, [], json.dumps({ - 'tasks': [{ - 'id': 'my-app_0-1396592784349', - 'host': 'slave-1234.acme.org', - 'ports': [31372], - }] - }))) + response = yield d + self.assertEqual((yield response.json()), { + 'status': 'ok' + }) - request = yield self.requests.get() - self.assertEqual(request['method'], 'PUT') + @inlineCallbacks + def test_TASK_RUNNING_app_not_found(self): + d = self.request('POST', '/events', { + "eventType": "status_update_event", + "timestamp": "2014-03-01T23:29:30.158Z", + "slaveId": "20140909-054127-177048842-5050-1494-0", + "taskId": "my-app_0-1396592784349", + "taskStatus": "TASK_RUNNING", + "appId": "/my-app", + "host": "slave-1234.acme.org", + "ports": [31372], + "version": "2014-04-04T06:26:23.051Z" + }) + + # Store the task as a service in Consul + consul_request = yield self.requests.get() + self.assertEqual(consul_request['method'], 'PUT') self.assertEqual( - request['url'], + consul_request['url'], 'http://slave-1234.acme.org:8500/v1/agent/service/register') - self.assertEqual(request['data'], json.dumps({ + self.assertEqual(consul_request['data'], json.dumps({ 'Name': 'my-app', 'ID': 'my-app_0-1396592784349', 'Address': 'slave-1234.acme.org', @@ -236,8 +261,19 @@ def test_TASK_RUNNING(self): 'consular-app-id=/my-app', ], })) - request['deferred'].callback( + consul_request['deferred'].callback( FakeResponse(200, [], json.dumps({}))) + + # We try to get the app info for the event but the app is gone + marathon_app_request = yield self.requests.get() + self.assertEqual(marathon_app_request['method'], 'GET') + self.assertEqual(marathon_app_request['url'], + 'http://localhost:8080/v2/apps/my-app') + marathon_app_request['deferred'].callback( + FakeResponse(404, [], json.dumps({'message': 'Not found'}))) + + # So we do nothing... + response = yield d self.assertEqual((yield response.json()), { 'status': 'ok' @@ -402,10 +438,29 @@ def test_register_with_marathon_unexpected_response(self): 'http://localhost:8080/v2/eventSubscriptions') @inlineCallbacks - def test_sync_app_task(self): - app = {'id': '/my-app'} - task = {'id': 'my-task-id', 'host': '0.0.0.0', 'ports': [1234]} - d = self.consular.sync_app_task(app, task) + def test_sync_app_tasks(self): + """ + When syncing an app with a task, Consul is updated with a service entry + for the task. + """ + d = self.consular.sync_app_tasks({'id': '/my-app'}) + + # First Consular fetches the tasks for the app + marathon_request = yield self.requests.get() + self.assertEqual(marathon_request['method'], 'GET') + self.assertEqual( + marathon_request['url'], + 'http://localhost:8080/v2/apps/my-app/tasks') + + # Respond with one task + marathon_request['deferred'].callback( + FakeResponse(200, [], json.dumps({ + 'tasks': [ + {'id': 'my-task-id', 'host': '0.0.0.0', 'ports': [1234]} + ]})) + ) + + # Consular should register the task in Consul consul_request = yield self.requests.get() self.assertEqual( consul_request['url'], @@ -431,9 +486,24 @@ def test_sync_app_task_grouped(self): When syncing an app in a group with a task, Consul is updated with a service entry for the task. """ - app = {'id': '/my-group/my-app'} - task = {'id': 'my-task-id', 'host': '0.0.0.0', 'ports': [1234]} - d = self.consular.sync_app_task(app, task) + d = self.consular.sync_app_tasks({'id': '/my-group/my-app'}) + + # First Consular fetches the tasks for the app + marathon_request = yield self.requests.get() + self.assertEqual(marathon_request['method'], 'GET') + self.assertEqual( + marathon_request['url'], + 'http://localhost:8080/v2/apps/my-group/my-app/tasks') + + # Respond with one task + marathon_request['deferred'].callback( + FakeResponse(200, [], json.dumps({ + 'tasks': [ + {'id': 'my-task-id', 'host': '0.0.0.0', 'ports': [1234]} + ]})) + ) + + # Consular should register the task in Consul consul_request = yield self.requests.get() self.assertEqual( consul_request['url'], @@ -1036,8 +1106,8 @@ def test_purge_dead_app_labels_forbidden(self): @inlineCallbacks def test_fallback_to_main_consul(self): self.consular.consul_client.enable_fallback = True - self.consular.register_service( - 'http://foo:8500', '/app_id', 'service_id', 'foo', 1234) + self.consular.register_task_service( + '/app_id', 'service_id', 'foo', 1234) request = yield self.requests.get() self.assertEqual( request['url'],