Skip to content

Commit

Permalink
Merge branch 'release/0.1.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
smn committed Jul 13, 2015
2 parents d533690 + 3e75f0b commit 7d756aa
Show file tree
Hide file tree
Showing 17 changed files with 1,103 additions and 2 deletions.
9 changes: 9 additions & 0 deletions .deploy.yaml
@@ -0,0 +1,9 @@
name: consular
user: ubuntu
buildscript: sideloader/deploy.sh
postinstall: sideloader/postinstall.sh
virtualenv_prefix: consular
pip: []
dependencies:
- libssl-dev
- libffi-dev
2 changes: 2 additions & 0 deletions .gitignore
@@ -1,3 +1,5 @@
*.egg-info
*.pyc
.coverage
_trial_temp/
/docs/_build
3 changes: 3 additions & 0 deletions .travis.yml
@@ -1,7 +1,10 @@
language: python
python:
- "2.7"
- "pypy"
cache: pip
install:
- pip install coveralls
- pip install --upgrade pip
- pip install flake8
- pip install -r requirements-dev.txt
Expand Down
31 changes: 31 additions & 0 deletions README.rst
@@ -1,2 +1,33 @@
Consular
========

Receive events from Marathon_, update Consul_ with the relevant information
about services & tasks.

.. image:: https://travis-ci.org/universalcore/consular.svg?branch=develop
:target: https://travis-ci.org/universalcore/consular
:alt: Continuous Integration

.. image:: https://coveralls.io/repos/universalcore/consular/badge.png?branch=develop
:target: https://coveralls.io/r/universalcore/consular?branch=develop
:alt: Code Coverage

.. image:: https://readthedocs.org/projects/consular/badge/?version=latest
:target: https://consular.readthedocs.org
:alt: Consular Documentation

.. image:: https://badge.fury.io/py/consular.svg
:target: https://pypi.python.org/pypi/consular
:alt: Pypi Package

Usage
~~~~~

::

$ pip install consular
$ consular --help


.. _Marathon: http://mesosphere.github.io/marathon/
.. _Consul: http://consul.io/
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
0.0.4
0.1.0
16 changes: 16 additions & 0 deletions consular/cli.py
@@ -0,0 +1,16 @@
import click


@click.command()
@click.option('--host', default='localhost',
help='The host to bind to.')
@click.option('--port', default='7000', type=int,
help='The port to listen to.')
@click.option('--consul', default='http://localhost:8500',
help='The Consul HTTP API')
@click.option('--marathon', default='http://localhost:8080',
help='The Marathon HTTP API')
def main(host, port, consul, marathon):
from consular.main import Consular
consular = Consular(consul, marathon)
consular.app.run(host, port)
89 changes: 89 additions & 0 deletions consular/main.py
@@ -0,0 +1,89 @@
import json

from twisted.internet import reactor
from twisted.web.client import HTTPConnectionPool
from twisted.internet.defer import succeed

import treq
from klein import Klein


def get_appid(event):
return event['appId'].rsplit('/', 1)[1]


class Consular(object):

app = Klein()

def __init__(self, consul_endpoint, marathon_endpoint):
self.consul_endpoint = consul_endpoint
self.marathon_endpoint = marathon_endpoint
self.pool = HTTPConnectionPool(reactor, persistent=False)
self.event_dispatch = {
'status_update_event': self.handle_status_update_event,
}

def consul_request(self, method, path, data=None):
return treq.request(
method, '%s%s' % (self.consul_endpoint, path),
headers={
'Content-Type': 'application/json',
},
data=json.dumps(data),
pool=self.pool)

@app.route('/')
def index(self, request):
request.setHeader('Content-Type', 'application/json')
return json.dumps([])

@app.route('/events')
def events(self, request):
request.setHeader('Content-Type', 'application/json')
event = json.load(request.content)
handler = self.event_dispatch.get(
event.get('eventType'), self.handle_unknown_event)
return handler(request, event)

def handle_status_update_event(self, request, event):
dispatch = {
'TASK_STAGING': self.noop,
'TASK_STARTING': self.noop,
'TASK_RUNNING': self.update_task_running,
'TASK_FINISHED': self.update_task_killed,
'TASK_FAILED': self.update_task_killed,
'TASK_KILLED': self.update_task_killed,
'TASK_LOST': self.update_task_killed,
}
handler = dispatch.get(event['taskStatus'])
return handler(request, event)

def noop(self, request, event):
return succeed(json.dumps({
'status': 'ok'
}))

def update_task_running(self, request, event):
# NOTE: Marathon sends a list of ports, I don't know yet when & if
# there are multiple values in that list.
d = self.consul_request('PUT', '/v1/agent/service/register', {
"Name": get_appid(event),
"Address": event['host'],
"Port": event['ports'][0],
})
d.addCallback(lambda _: json.dumps({'status': 'ok'}))
return d

def update_task_killed(self, request, event):
d = self.consul_request('PUT', '/v1/agent/service/deregister/%s' % (
get_appid(event),))
d.addCallback(lambda _: json.dumps({'status': 'ok'}))
return d

def handle_unknown_event(self, request, event):
request.setHeader('Content-Type', 'application/json')
request.setResponseCode(400) # bad request
return json.dumps({
'error': 'Event type %s not supported.' % (event.get('eventType'),)
})
Empty file added consular/tests/__init__.py
Empty file.
149 changes: 149 additions & 0 deletions consular/tests/test_main.py
@@ -0,0 +1,149 @@
import json

from twisted.trial.unittest import TestCase
from twisted.web.server import Site
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, DeferredQueue, Deferred
from twisted.web.client import HTTPConnectionPool

from consular.main import Consular

import treq


class ConsularTest(TestCase):

timeout = 1

def setUp(self):
self.consular = Consular(
'http://localhost:8500',
'http://localhost:8080',
)

# spin up a site so we can test it, pretty sure Klein has better
# ways of doing this but they're not documented anywhere.
self.site = Site(self.consular.app.resource())
self.listener = reactor.listenTCP(0, self.site, interface='localhost')
self.listener_port = self.listener.getHost().port
self.addCleanup(self.listener.loseConnection)

# cleanup stuff for treq's global http request pool
self.pool = HTTPConnectionPool(reactor, persistent=False)
self.addCleanup(self.pool.closeCachedConnections)

# We use this to mock requests going to Consul
self.consul_requests = DeferredQueue()

def mock_consul_request(method, path, data=None):
d = Deferred()
self.consul_requests.put({
'method': method,
'path': path,
'data': data,
'deferred': d,
})
return d

self.patch(self.consular, 'consul_request', mock_consul_request)

def request(self, method, path, data=None):
return treq.request(
method, 'http://localhost:%s%s' % (
self.listener_port,
path
),
data=(json.dumps(data) if data is not None else None),
pool=self.pool)

def tearDown(self):
pass

@inlineCallbacks
def test_service(self):
response = yield self.request('GET', '/')
self.assertEqual(response.code, 200)
self.assertEqual((yield response.json()), [])

@inlineCallbacks
def test_handle_unknown_event(self):
response = yield self.request('POST', '/events', {'eventType': 'Foo'})
data = yield response.json()
self.assertEqual(data, {
'error': 'Event type Foo not supported.'
})

@inlineCallbacks
def test_handle_unspecified_event(self):
response = yield self.request('POST', '/events', {})
data = yield response.json()
self.assertEqual(data, {
'error': 'Event type None not supported.'
})

@inlineCallbacks
def test_TASK_STAGING(self):
response = yield self.request('POST', '/events', {
"eventType": "status_update_event",
"timestamp": "2014-03-01T23:29:30.158Z",
"slaveId": "20140909-054127-177048842-5050-1494-0",
"taskId": "my-app_0-1396592784349",
"taskStatus": "TASK_STAGING",
"appId": "/my-app",
"host": "slave-1234.acme.org",
"ports": [31372],
"version": "2014-04-04T06:26:23.051Z"
})
self.assertEqual((yield response.json()), {
'status': 'ok'
})

@inlineCallbacks
def test_TASK_RUNNING(self):
d = self.request('POST', '/events', {
"eventType": "status_update_event",
"timestamp": "2014-03-01T23:29:30.158Z",
"slaveId": "20140909-054127-177048842-5050-1494-0",
"taskId": "my-app_0-1396592784349",
"taskStatus": "TASK_RUNNING",
"appId": "/my-app",
"host": "slave-1234.acme.org",
"ports": [31372],
"version": "2014-04-04T06:26:23.051Z"
})
request = yield self.consul_requests.get()
self.assertEqual(request['method'], 'PUT')
self.assertEqual(request['path'], '/v1/agent/service/register')
self.assertEqual(request['data'], {
'Name': 'my-app',
'Address': 'slave-1234.acme.org',
'Port': 31372,
})
request['deferred'].callback('ok')
response = yield d
self.assertEqual((yield response.json()), {
'status': 'ok'
})

@inlineCallbacks
def test_TASK_KILLED(self):
d = self.request('POST', '/events', {
"eventType": "status_update_event",
"timestamp": "2014-03-01T23:29:30.158Z",
"slaveId": "20140909-054127-177048842-5050-1494-0",
"taskId": "my-app_0-1396592784349",
"taskStatus": "TASK_KILLED",
"appId": "/my-app",
"host": "slave-1234.acme.org",
"ports": [31372],
"version": "2014-04-04T06:26:23.051Z"
})
request = yield self.consul_requests.get()
self.assertEqual(request['method'], 'PUT')
self.assertEqual(
request['path'], '/v1/agent/service/deregister/my-app')
request['deferred'].callback('ok')
response = yield d
self.assertEqual((yield response.json()), {
'status': 'ok'
})

0 comments on commit 7d756aa

Please sign in to comment.