Skip to content

Commit

Permalink
Merge 18148c9 into 8127615
Browse files Browse the repository at this point in the history
  • Loading branch information
smn committed Aug 23, 2016
2 parents 8127615 + 18148c9 commit 7d87d41
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 13 deletions.
9 changes: 6 additions & 3 deletions consular/main.py
Expand Up @@ -524,8 +524,9 @@ def _consul_key_to_marathon_label_key(self, consul_key):
def sync_app_tasks(self, app):
tasks = yield self.marathon_client.get_app_tasks(app['id'])
for task in tasks:
yield self.register_task_service(
app['id'], task['id'], task['host'], task['ports'])
if task['state'] == 'TASK_RUNNING':
yield self.register_task_service(
app['id'], task['id'], task['host'], task['ports'])

@inlineCallbacks
def purge_dead_app_labels(self, apps):
Expand Down Expand Up @@ -630,6 +631,8 @@ def _filter_marathon_tasks(self, marathon_tasks, consul_service_ids):
if not marathon_tasks:
return consul_service_ids

task_id_set = set([task['id'] for task in marathon_tasks])
task_id_set = set([task['id']
for task in marathon_tasks
if task['state'] == 'TASK_RUNNING'])
return [service_id for service_id in consul_service_ids
if service_id not in task_id_set]
177 changes: 169 additions & 8 deletions consular/tests/test_main.py
Expand Up @@ -79,8 +79,7 @@ def request(self, method, path, data=None):
return treq.request(
method, 'http://localhost:%s%s' % (
self.listener_port,
path
),
path),
data=(json.dumps(data) if data is not None else None),
pool=self.pool)

Expand Down Expand Up @@ -576,9 +575,12 @@ def test_sync_app_tasks(self):
# Respond with one task
marathon_request['deferred'].callback(
FakeResponse(200, [], json.dumps({
'tasks': [
{'id': 'my-task-id', 'host': '0.0.0.0', 'ports': [1234]}
]}))
'tasks': [{
'id': 'my-task-id',
'host': '0.0.0.0',
'ports': [1234],
'state': 'TASK_RUNNING',
}]}))
)

# Consular should register the task in Consul
Expand Down Expand Up @@ -620,7 +622,12 @@ def test_sync_app_tasks_grouped(self):
marathon_request['deferred'].callback(
FakeResponse(200, [], json.dumps({
'tasks': [
{'id': 'my-task-id', 'host': '0.0.0.0', 'ports': [1234]}
{
'id': 'my-task-id',
'host': '0.0.0.0',
'ports': [1234],
'state': 'TASK_RUNNING',
}
]}))
)

Expand All @@ -644,6 +651,61 @@ def test_sync_app_tasks_grouped(self):
FakeResponse(200, [], json.dumps({})))
yield d

@inlineCallbacks
def test_sync_app_tasks_task_lost(self):
"""
When syncing an app with a task that has the TASK_LOST state,
Consul should not be updated with a service entry
for the task.
"""
d = self.consular.sync_app_tasks({'id': '/my-app'})

# First Consular fetches the tasks for the app
marathon_request = yield self.requests.get()
self.assertEqual(marathon_request['method'], 'GET')
self.assertEqual(
marathon_request['url'],
'http://localhost:8080/v2/apps/my-app/tasks')

# Respond with one task
marathon_request['deferred'].callback(
FakeResponse(200, [], json.dumps({
'tasks': [
{
'id': 'my-task-1',
'host': '0.0.0.0',
'ports': [1234],
'state': 'TASK_LOST',
},
{
'id': 'my-task-2',
'host': '0.0.0.0',
'ports': [5678],
'state': 'TASK_RUNNING',
}
]}))
)

# Consular should register the task in Consul
consul_request = yield self.requests.get()
self.assertEqual(
consul_request['url'],
'http://0.0.0.0:8500/v1/agent/service/register')
self.assertEqual(json.loads(consul_request['data']), {
'Name': 'my-app',
'ID': 'my-task-2',
'Address': '0.0.0.0',
'Port': 5678,
'Tags': [
'consular-reg-id=test',
'consular-app-id=/my-app',
],
})
self.assertEqual(consul_request['method'], 'PUT')
consul_request['deferred'].callback(
FakeResponse(200, [], json.dumps({})))
yield d

@inlineCallbacks
def test_sync_app_tasks_no_ports(self):
"""
Expand All @@ -663,7 +725,12 @@ def test_sync_app_tasks_no_ports(self):
marathon_request['deferred'].callback(
FakeResponse(200, [], json.dumps({
'tasks': [
{'id': 'my-task-id', 'host': '0.0.0.0', 'ports': []}
{
'id': 'my-task-id',
'host': '0.0.0.0',
'ports': [],
'state': 'TASK_RUNNING',
}
]}))
)

Expand Down Expand Up @@ -708,7 +775,8 @@ def test_sync_app_tasks_multiple_ports(self):
{
'id': 'my-task-id',
'host': '0.0.0.0',
'ports': [4567, 1234, 6789]
'ports': [4567, 1234, 6789],
'state': 'TASK_RUNNING',
}
]}))
)
Expand Down Expand Up @@ -1023,6 +1091,99 @@ def test_purge_dead_services(self):
"id": "taskid2",
"host": "machine-2",
"ports": [8103],
"state": "TASK_RUNNING",
"startedAt": "2015-07-14T14:54:31.934Z",
"stagedAt": "2015-07-14T14:54:31.544Z",
"version": "2015-07-14T13:07:32.095Z"
}]
}))
)

# Expecting a service registering in Consul as a result for one
# of these services
deregister_request = yield self.requests.get()
self.assertEqual(
deregister_request['url'],
('http://1.2.3.4:8500/v1/agent/service/deregister/'
'testinggroup-someid1'))
self.assertEqual(deregister_request['method'], 'PUT')
deregister_request['deferred'].callback(
FakeResponse(200, [], json.dumps({})))
yield d

@inlineCallbacks
def test_purge_task_lost_services(self):
"""
When a task has anything but the TASK_RUNNING state it should
be deregistered from Consul
"""
d = self.consular.purge_dead_services()
consul_request = yield self.requests.get()
consul_request['deferred'].callback(
FakeResponse(200, [], json.dumps([{
'Node': 'consul-node',
'Address': '1.2.3.4',
}]))
)

agent_request = yield self.requests.get()
# Expecting a request to list of all services in Consul,
# returning 2
self.assertEqual(
agent_request['url'],
'http://1.2.3.4:8500/v1/agent/services')
self.assertEqual(agent_request['method'], 'GET')
agent_request['deferred'].callback(
FakeResponse(200, [], json.dumps({
"testinggroup-someid1": {
"ID": "taskid1",
"Service": "testingapp",
"Tags": None,
"Address": "machine-1",
"Port": 8102,
"Tags": [
"consular-reg-id=test",
"consular-app-id=/testinggroup/someid1",
],
},
"testinggroup-someid1": {
"ID": "taskid2",
"Service": "testingapp",
"Tags": None,
"Address": "machine-2",
"Port": 8103,
"Tags": [
"consular-reg-id=test",
"consular-app-id=/testinggroup/someid1",
],
}
}))
)

# Expecting a request for the tasks for a given app,
# returning only 1 task with state `TASK_RUNNING`
testingapp_request = yield self.requests.get()
self.assertEqual(testingapp_request['url'],
'http://localhost:8080/v2/apps/testinggroup/someid1/'
'tasks')
self.assertEqual(testingapp_request['method'], 'GET')
testingapp_request['deferred'].callback(
FakeResponse(200, [], json.dumps({
"tasks": [{
"appId": "/testinggroup/someid1",
"id": "taskid1",
"host": "machine-1",
"ports": [8103],
"state": "TASK_RUNNING",
"startedAt": "2015-07-14T14:54:31.934Z",
"stagedAt": "2015-07-14T14:54:31.544Z",
"version": "2015-07-14T13:07:32.095Z"
}, {
"appId": "/testinggroup/someid1",
"id": "taskid2",
"host": "machine-2",
"ports": [8103],
"state": "TASK_LOST",
"startedAt": "2015-07-14T14:54:31.934Z",
"stagedAt": "2015-07-14T14:54:31.544Z",
"version": "2015-07-14T13:07:32.095Z"
Expand Down
2 changes: 1 addition & 1 deletion requirements-dev.txt
@@ -1,4 +1,4 @@
pytest
pytest>=3.0.0
pytest-coverage
pytest-xdist
flake8
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
@@ -1,2 +1,2 @@
[pytest]
[tool:pytest]
addopts = --doctest-modules --verbose --ignore=ve/

0 comments on commit 7d87d41

Please sign in to comment.