Skip to content

Commit

Permalink
use catalog instead of agent
Browse files Browse the repository at this point in the history
  • Loading branch information
smn committed Jul 17, 2015
1 parent 7943414 commit 58bd83d
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 59 deletions.
61 changes: 35 additions & 26 deletions consular/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def update_task_running(self, request, event):
return d

def update_task_killed(self, request, event):
d = self.deregister_service(event['taskId'])
d = self.deregister_service(event['host'], event['taskId'])
d.addCallback(lambda _: json.dumps({'status': 'ok'}))
return d

Expand All @@ -135,19 +135,26 @@ def handle_unknown_event(self, request, event):
'error': 'Event type %s not supported.' % (event_type,)
})

def register_service(self, name, id, address, port):
def register_service(self, node_id, name, id, address, port):
log.msg('Registering %s.' % (name,))
return self.consul_request('PUT', '/v1/agent/service/register', {
'Name': name,
'ID': id,
'Address': address,
'Port': port,
})

def deregister_service(self, service_id):
return self.consul_request(
'PUT', '/v1/catalog/service/register', {
'Node': node_id,
'Service': {
'Service': name,
'ID': id,
'Address': address,
'Port': port,
}
})

def deregister_service(self, node_id, service_id):
log.msg('Deregistering %s.' % (service_id,))
return self.consul_request('PUT', '/v1/agent/service/deregister/%s' % (
service_id,))
return self.consul_request(
'PUT', '/v1/catalog/deregister', {
'Node': node_id,
'ServiceID': service_id,
})

def sync_apps(self, purge=False):
d = self.marathon_request('GET', '/v2/apps')
Expand Down Expand Up @@ -189,32 +196,34 @@ def sync_app_tasks(self, app):

def sync_app_task(self, app, task):
return self.register_service(
task['host'],
get_appid(app['id']), task['id'], task['host'], task['ports'][0])

@inlineCallbacks
def purge_dead_services(self):
response = yield self.consul_request('GET', '/v1/agent/services')
data = yield response.json()
service_names_response = yield self.consul_request(
'GET', '/v1/catalog/services')
service_names_data = yield service_names_response.json()

# collect the task ids for the service name
services = {}
for service_id, service in data.items():
services.setdefault(service['Service'], set([])).add(service_id)

for app_id, task_ids in services.items():
yield self.purge_service_if_dead(app_id, task_ids)
for service_name in service_names_data.keys():
services_response = yield self.consul_request(
'GET', '/v1/catalog/service/%s' % (service_name,))
services = yield services_response.json()
yield self.purge_service_if_dead(service_name, services)

@inlineCallbacks
def purge_service_if_dead(self, app_id, consul_task_ids):
def purge_service_if_dead(self, service_name, services):
response = yield self.marathon_request(
'GET', '/v2/apps/%s/tasks' % (app_id,))
'GET', '/v2/apps/%s/tasks' % (service_name,))
data = yield response.json()
if 'tasks' not in data:
log.msg(('App %s does not look like a Marathon application, '
'skipping') % (str(app_id),))
'skipping') % (str(service_name),))
return

marathon_task_ids = set([task['id'] for task in data['tasks']])
tasks_to_be_purged = consul_task_ids - marathon_task_ids
for task_id in tasks_to_be_purged:
yield self.deregister_service(task_id)
for service in services:
if service['ServiceID'] not in marathon_task_ids:
yield self.deregister_service(
service['Node'], service['ServiceID'])
95 changes: 62 additions & 33 deletions consular/tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,15 @@ def test_TASK_RUNNING(self):

request = yield self.consul_requests.get()
self.assertEqual(request['method'], 'PUT')
self.assertEqual(request['path'], '/v1/agent/service/register')
self.assertEqual(request['path'], '/v1/catalog/service/register')
self.assertEqual(request['data'], {
'Name': 'my-app',
'ID': 'my-app_0-1396592784349',
'Address': 'slave-1234.acme.org',
'Port': 31372,
'Node': 'slave-1234.acme.org',
'Service': {
'Service': 'my-app',
'ID': 'my-app_0-1396592784349',
'Address': 'slave-1234.acme.org',
'Port': 31372,
}
})
request['deferred'].callback('ok')
response = yield d
Expand All @@ -193,9 +196,11 @@ def test_TASK_KILLED(self):
})
request = yield self.consul_requests.get()
self.assertEqual(request['method'], 'PUT')
self.assertEqual(
request['path'],
'/v1/agent/service/deregister/my-app_0-1396592784349')
self.assertEqual(request['path'], '/v1/catalog/deregister')
self.assertEqual(request['data'], {
'Node': 'slave-1234.acme.org',
'ServiceID': 'my-app_0-1396592784349',
})
request['deferred'].callback('ok')
response = yield d
self.assertEqual((yield response.json()), {
Expand Down Expand Up @@ -245,12 +250,15 @@ def test_sync_app_task(self):
task = {'id': 'my-task-id', 'host': '0.0.0.0', 'ports': [1234]}
d = self.consular.sync_app_task(app, task)
consul_request = yield self.consul_requests.get()
self.assertEqual(consul_request['path'], '/v1/agent/service/register')
self.assertEqual(consul_request['path'], '/v1/catalog/service/register')
self.assertEqual(consul_request['data'], {
'Name': 'my-app',
'ID': 'my-task-id',
'Address': '0.0.0.0',
'Port': 1234,
'Node': '0.0.0.0',
'Service': {
'Service': 'my-app',
'ID': 'my-task-id',
'Address': '0.0.0.0',
'Port': 1234,
}
})
self.assertEqual(consul_request['method'], 'PUT')
consul_request['deferred'].callback('')
Expand Down Expand Up @@ -297,29 +305,47 @@ def test_sync_apps(self):
@inlineCallbacks
def test_purge_dead_services(self):
d = self.consular.purge_dead_services()
consul_request = yield self.consul_requests.get()
consul_services_request = yield self.consul_requests.get()

# Expecting a request to list of all services in Consul,
# returning 2
self.assertEqual(consul_request['path'], '/v1/agent/services')
self.assertEqual(consul_request['method'], 'GET')
consul_request['deferred'].callback(
# returning 1, testingapp
self.assertEqual(consul_services_request['method'], 'GET')
self.assertEqual(consul_services_request['path'],
'/v1/catalog/services')
consul_services_request['deferred'].callback(
FakeResponse(200, [], json.dumps({
"testingapp.someid1": {
"ID": "testingapp.someid1",
"Service": "testingapp",
"Tags": None,
"Address": "machine-1",
"Port": 8102
'testingapp': [],
}))
)

# Expecting a request to list all service ids for the given Service
# returning 2
testingapp_services_request = yield self.consul_requests.get()
self.assertEqual(testingapp_services_request['method'], 'GET')
self.assertEqual(testingapp_services_request['path'],
'/v1/catalog/service/testingapp')

testingapp_services_request['deferred'].callback(
FakeResponse(200, [], json.dumps([
{
"Node": "consul-node1",
"Address": "consul-address",
"ServiceID": "testingapp.someid1",
"ServiceName": "testingapp",
"ServiceTags": None,
"ServiceAddress": "machine-1",
"ServicePort": 8102
},
"testingapp.someid2": {
"ID": "testingapp.someid2",
"Service": "testingapp",
"Tags": None,
"Address": "machine-2",
"Port": 8103
{
"Node": "consul-node2",
"Address": "consul-address",
"ServiceID": "testingapp.someid2",
"ServiceName": "testingapp",
"ServiceTags": None,
"ServiceAddress": "machine-2",
"ServicePort": 8103
}
}))
]))
)

# Expecting a request for the tasks for a given app, returning
Expand All @@ -345,9 +371,12 @@ def test_purge_dead_services(self):
# Expecting a service registering in Consul as a result for one
# of these services
deregister_request = yield self.consul_requests.get()
self.assertEqual(deregister_request['path'],
'/v1/agent/service/deregister/testingapp.someid1')
self.assertEqual(deregister_request['path'], '/v1/catalog/deregister')
self.assertEqual(deregister_request['method'], 'PUT')
self.assertEqual(deregister_request['data'], {
'Node': 'consul-node1',
'ServiceID': 'testingapp.someid1'
})
deregister_request['deferred'].callback(
FakeResponse(200, [], json.dumps({})))
yield d

0 comments on commit 58bd83d

Please sign in to comment.