diff --git a/consular/main.py b/consular/main.py index 4b66254..824bfcc 100644 --- a/consular/main.py +++ b/consular/main.py @@ -532,8 +532,9 @@ def sync_app_tasks(self, app): return for task in tasks: - yield self.register_task_service( - app['id'], task['id'], task['host'], task['ports']) + if task['state'] == 'TASK_RUNNING': + yield self.register_task_service( + app['id'], task['id'], task['host'], task['ports']) @inlineCallbacks def purge_dead_app_labels(self, apps): @@ -638,6 +639,8 @@ def _filter_marathon_tasks(self, marathon_tasks, consul_service_ids): if not marathon_tasks: return consul_service_ids - task_id_set = set([task['id'] for task in marathon_tasks]) + task_id_set = set([task['id'] + for task in marathon_tasks + if task['state'] == 'TASK_RUNNING']) return [service_id for service_id in consul_service_ids if service_id not in task_id_set] diff --git a/consular/tests/test_main.py b/consular/tests/test_main.py index 64b9060..776069a 100644 --- a/consular/tests/test_main.py +++ b/consular/tests/test_main.py @@ -79,8 +79,7 @@ def request(self, method, path, data=None): return treq.request( method, 'http://localhost:%s%s' % ( self.listener_port, - path - ), + path), data=(json.dumps(data) if data is not None else None), pool=self.pool) @@ -576,9 +575,12 @@ def test_sync_app_tasks(self): # Respond with one task marathon_request['deferred'].callback( FakeResponse(200, [], json.dumps({ - 'tasks': [ - {'id': 'my-task-id', 'host': '0.0.0.0', 'ports': [1234]} - ]})) + 'tasks': [{ + 'id': 'my-task-id', + 'host': '0.0.0.0', + 'ports': [1234], + 'state': 'TASK_RUNNING', + }]})) ) # Consular should register the task in Consul @@ -620,7 +622,12 @@ def test_sync_app_tasks_grouped(self): marathon_request['deferred'].callback( FakeResponse(200, [], json.dumps({ 'tasks': [ - {'id': 'my-task-id', 'host': '0.0.0.0', 'ports': [1234]} + { + 'id': 'my-task-id', + 'host': '0.0.0.0', + 'ports': [1234], + 'state': 'TASK_RUNNING', + } ]})) ) @@ -644,6 +651,61 @@ def test_sync_app_tasks_grouped(self): FakeResponse(200, [], json.dumps({}))) yield d + @inlineCallbacks + def test_sync_app_tasks_task_lost(self): + """ + When syncing an app with a task that has the TASK_LOST state, + Consul should not be updated with a service entry + for the task. + """ + d = self.consular.sync_app_tasks({'id': '/my-app'}) + + # First Consular fetches the tasks for the app + marathon_request = yield self.requests.get() + self.assertEqual(marathon_request['method'], 'GET') + self.assertEqual( + marathon_request['url'], + 'http://localhost:8080/v2/apps/my-app/tasks') + + # Respond with one task + marathon_request['deferred'].callback( + FakeResponse(200, [], json.dumps({ + 'tasks': [ + { + 'id': 'my-task-1', + 'host': '0.0.0.0', + 'ports': [1234], + 'state': 'TASK_LOST', + }, + { + 'id': 'my-task-2', + 'host': '0.0.0.0', + 'ports': [5678], + 'state': 'TASK_RUNNING', + } + ]})) + ) + + # Consular should register the task in Consul + consul_request = yield self.requests.get() + self.assertEqual( + consul_request['url'], + 'http://0.0.0.0:8500/v1/agent/service/register') + self.assertEqual(json.loads(consul_request['data']), { + 'Name': 'my-app', + 'ID': 'my-task-2', + 'Address': '0.0.0.0', + 'Port': 5678, + 'Tags': [ + 'consular-reg-id=test', + 'consular-app-id=/my-app', + ], + }) + self.assertEqual(consul_request['method'], 'PUT') + consul_request['deferred'].callback( + FakeResponse(200, [], json.dumps({}))) + yield d + @inlineCallbacks def test_sync_app_tasks_no_ports(self): """ @@ -663,7 +725,12 @@ def test_sync_app_tasks_no_ports(self): marathon_request['deferred'].callback( FakeResponse(200, [], json.dumps({ 'tasks': [ - {'id': 'my-task-id', 'host': '0.0.0.0', 'ports': []} + { + 'id': 'my-task-id', + 'host': '0.0.0.0', + 'ports': [], + 'state': 'TASK_RUNNING', + } ]})) ) @@ -708,7 +775,8 @@ def test_sync_app_tasks_multiple_ports(self): { 'id': 'my-task-id', 'host': '0.0.0.0', - 'ports': [4567, 1234, 6789] + 'ports': [4567, 1234, 6789], + 'state': 'TASK_RUNNING', } ]})) ) @@ -1048,6 +1116,99 @@ def test_purge_dead_services(self): "id": "taskid2", "host": "machine-2", "ports": [8103], + "state": "TASK_RUNNING", + "startedAt": "2015-07-14T14:54:31.934Z", + "stagedAt": "2015-07-14T14:54:31.544Z", + "version": "2015-07-14T13:07:32.095Z" + }] + })) + ) + + # Expecting a service registering in Consul as a result for one + # of these services + deregister_request = yield self.requests.get() + self.assertEqual( + deregister_request['url'], + ('http://1.2.3.4:8500/v1/agent/service/deregister/' + 'testinggroup-someid1')) + self.assertEqual(deregister_request['method'], 'PUT') + deregister_request['deferred'].callback( + FakeResponse(200, [], json.dumps({}))) + yield d + + @inlineCallbacks + def test_purge_task_lost_services(self): + """ + When a task has anything but the TASK_RUNNING state it should + be deregistered from Consul + """ + d = self.consular.purge_dead_services() + consul_request = yield self.requests.get() + consul_request['deferred'].callback( + FakeResponse(200, [], json.dumps([{ + 'Node': 'consul-node', + 'Address': '1.2.3.4', + }])) + ) + + agent_request = yield self.requests.get() + # Expecting a request to list of all services in Consul, + # returning 2 + self.assertEqual( + agent_request['url'], + 'http://1.2.3.4:8500/v1/agent/services') + self.assertEqual(agent_request['method'], 'GET') + agent_request['deferred'].callback( + FakeResponse(200, [], json.dumps({ + "testinggroup-someid1": { + "ID": "taskid1", + "Service": "testingapp", + "Tags": None, + "Address": "machine-1", + "Port": 8102, + "Tags": [ + "consular-reg-id=test", + "consular-app-id=/testinggroup/someid1", + ], + }, + "testinggroup-someid1": { + "ID": "taskid2", + "Service": "testingapp", + "Tags": None, + "Address": "machine-2", + "Port": 8103, + "Tags": [ + "consular-reg-id=test", + "consular-app-id=/testinggroup/someid1", + ], + } + })) + ) + + # Expecting a request for the tasks for a given app, + # returning only 1 task with state `TASK_RUNNING` + testingapp_request = yield self.requests.get() + self.assertEqual(testingapp_request['url'], + 'http://localhost:8080/v2/apps/testinggroup/someid1/' + 'tasks') + self.assertEqual(testingapp_request['method'], 'GET') + testingapp_request['deferred'].callback( + FakeResponse(200, [], json.dumps({ + "tasks": [{ + "appId": "/testinggroup/someid1", + "id": "taskid1", + "host": "machine-1", + "ports": [8103], + "state": "TASK_RUNNING", + "startedAt": "2015-07-14T14:54:31.934Z", + "stagedAt": "2015-07-14T14:54:31.544Z", + "version": "2015-07-14T13:07:32.095Z" + }, { + "appId": "/testinggroup/someid1", + "id": "taskid2", + "host": "machine-2", + "ports": [8103], + "state": "TASK_LOST", "startedAt": "2015-07-14T14:54:31.934Z", "stagedAt": "2015-07-14T14:54:31.544Z", "version": "2015-07-14T13:07:32.095Z" diff --git a/requirements-dev.txt b/requirements-dev.txt index 8417d9e..78bdd69 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,4 +1,4 @@ -pytest +pytest>=3.0.0 pytest-coverage pytest-xdist flake8 diff --git a/setup.cfg b/setup.cfg index e18604d..28516f8 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,2 +1,2 @@ -[pytest] +[tool:pytest] addopts = --doctest-modules --verbose --ignore=ve/