diff --git a/itests/itest_utils.py b/itests/itest_utils.py index 66be2b9..ed54055 100644 --- a/itests/itest_utils.py +++ b/itests/itest_utils.py @@ -2,6 +2,8 @@ from functools import wraps import os import signal +import sys +import re import time import requests @@ -31,7 +33,7 @@ def wrapper(*args, **kwargs): return decorator -@timeout(10) +@timeout(30) def wait_for_marathon(): """Blocks until marathon is up""" marathon_service = get_marathon_connection_string() @@ -64,7 +66,14 @@ def get_marathon_connection_string(): return 'localhost:8080' else: service_port = get_service_internal_port('marathon') - return get_compose_service('marathon').get_container().get_local_port(service_port) + local_port = get_compose_service('marathon').get_container().get_local_port(service_port) + + # Check if we're at OSX. Use ip from DOCKER_HOST + if sys.platform == 'darwin': + m = re.match("(.*?)://(.*?):(\d+)", os.environ["DOCKER_HOST"]) + local_port = "{}:{}".format(m.group(2), local_port.split(":")[1]) + + return local_port def get_service_internal_port(service_name): diff --git a/itests/marathon_tasks.feature b/itests/marathon_tasks.feature index f9b68f4..c3e53fc 100644 --- a/itests/marathon_tasks.feature +++ b/itests/marathon_tasks.feature @@ -17,3 +17,12 @@ Feature: marathon-python can operate marathon app tasks When we create a trivial new app And we wait the trivial app deployment finish Then we should be able to kill the #0,1,2 tasks of the trivial app + + Scenario: Events can be listened in stream + Given a working marathon instance + When marathon version is greater than 0.11.0 + And we start listening for events + And we create a trivial new app + And we wait the trivial app deployment finish + Then we should be able to kill the tasks + And we should see list of events diff --git a/itests/steps/marathon_steps.py b/itests/steps/marathon_steps.py index ac329c6..a004f89 100644 --- a/itests/steps/marathon_steps.py +++ b/itests/steps/marathon_steps.py @@ -1,5 +1,7 @@ import sys import time +import multiprocessing +from distutils.version import StrictVersion import marathon from behave import given, when, then @@ -26,7 +28,7 @@ def get_marathon_info(context): @when(u'we create a trivial new app') def create_trivial_new_app(context): context.client.create_app('test-trivial-app', marathon.MarathonApp( - cmd='sleep 3600', mem=16, cpus=1, instances=5)) + cmd='sleep 3600', mem=16, cpus=0.1, instances=5)) @then(u'we should be able to kill the tasks') @@ -113,3 +115,43 @@ def list_tasks(context, which): app = context.client.get_app('test-%s-app' % which) tasks = context.client.list_tasks('test-%s-app' % which) assert len(tasks) == app.instances + + +def listen_for_events(client, events): + for msg in client.event_stream(): + events.append(msg) + +@when(u'marathon version is greater than {version}') +def marathon_version_chech(context, version): + info = context.client.get_info() + if StrictVersion(info.version) < StrictVersion(version): + context.scenario.skip(reason='Marathon version is too low for this scenario') + +@when(u'we start listening for events') +def start_listening_stream(context): + manager = multiprocessing.Manager() + mlist = manager.list() + context.manager = manager + context.events = mlist + p = multiprocessing.Process(target=listen_for_events, args=(context.client, mlist)) + p.start() + context.p = p + +@then(u'we should see list of events') +def stop_listening_stream(context): + time.sleep(10) + context.p.terminate() + + print(context.events) + + # event list should contain 5 status_update_event with taskStatus == TASK_RUNNING + filtered_events = [e for e in context.events if e.event_type == "status_update_event" and e.task_status == "TASK_RUNNING"] + assert len(filtered_events) == 5 + + # and 1 status_update_event with taskStatus == TASK_KILLED + filtered_events = [e for e in context.events if e.event_type == "status_update_event" and e.task_status == "TASK_KILLED"] + assert len(filtered_events) == 1 + + # and 2 deployment_step_success events + filtered_events = [e for e in context.events if e.event_type == "deployment_success"] + assert len(filtered_events) == 2 diff --git a/marathon/client.py b/marathon/client.py index 8e59f7d..dfd6734 100644 --- a/marathon/client.py +++ b/marathon/client.py @@ -12,6 +12,7 @@ import marathon from .models import MarathonApp, MarathonDeployment, MarathonGroup, MarathonInfo, MarathonTask, MarathonEndpoint, MarathonQueueItem from .exceptions import InternalServerError, NotFoundError, MarathonHttpError, MarathonError +from .models.events import EventFactory class MarathonClient(object): @@ -89,6 +90,26 @@ def _do_request(self, method, path, params=None, data=None): return response + def _do_sse_request(self, path, params=None, data=None): + from sseclient import SSEClient + + headers = {'Accept': 'text/event-stream'} + messages = None + servers = list(self.servers) + while servers and messages is None: + server = servers.pop(0) + url = ''.join([server.rstrip('/'), path]) + try: + messages = SSEClient(url, params=params, data=data, headers=headers, + auth=self.auth) + except Exception as e: + marathon.log.error('Error while calling %s: %s', url, e.message) + + if messages is None: + raise MarathonError('No remaining Marathon servers to try') + + return messages + def list_endpoints(self): """List the current endpoints for all applications @@ -631,3 +652,19 @@ def get_metrics(self): """ response = self._do_request('GET', '/metrics') return response.json() + + def event_stream(self): + """Polls event bus using /v2/events + + :returns: iterator with events + :rtype: iterator + """ + + messages = self._do_sse_request('/v2/events') + + ef = EventFactory() + for message in messages: + if not message.data: + continue + data = json.loads(message.data) + yield ef.process(data) diff --git a/marathon/models/events.py b/marathon/models/events.py index 6157deb..1c7a0f9 100644 --- a/marathon/models/events.py +++ b/marathon/models/events.py @@ -106,6 +106,11 @@ class MarathonDeploymentStepSuccess(MarathonEvent): class MarathonDeploymentStepFailure(MarathonEvent): KNOWN_ATTRIBUTES = ['plan'] +class MarathonEventStreamAttached(MarathonEvent): + KNOWN_ATTRIBUTES = ['remote_address'] + +class MarathonEventStreamDetached(MarathonEvent): + KNOWN_ATTRIBUTES = ['remote_address'] class EventFactory: @@ -133,7 +138,9 @@ def __init__(self): 'deployment_failed': MarathonDeploymentFailed, 'deployment_info': MarathonDeploymentInfo, 'deployment_step_success': MarathonDeploymentStepSuccess, - 'deployment_step_failure': MarathonDeploymentStepFailure + 'deployment_step_failure': MarathonDeploymentStepFailure, + 'event_stream_attached': MarathonEventStreamAttached, + 'event_stream_detached': MarathonEventStreamDetached, } def process(self, event): diff --git a/requirements.txt b/requirements.txt index 40e7bdf..cb1d49e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ requests-mock +sseclient diff --git a/setup.py b/setup.py index a41a6f8..2eb60ef 100755 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ long_description="""Python interface to the Mesos Marathon REST API.""", author='Mike Babineau', author_email='michael.babineau@gmail.com', - install_requires=['requests>=2.0.0'], + install_requires=['requests>=2.0.0', 'sseclient'], url='https://github.com/thefactory/marathon-python', packages=['marathon', 'marathon.models'], license='MIT', diff --git a/tox.ini b/tox.ini index 0e93077..005f07a 100644 --- a/tox.ini +++ b/tox.ini @@ -5,7 +5,7 @@ basepython = python2.7 envlist = py,pep8 [testenv:itests] -passenv = TRAVIS MARATHONVERSION +passenv = TRAVIS MARATHONVERSION DOCKER_HOST DOCKER_TLS_VERIFY DOCKER_CERT_PATH DOCKER_MACHINE_NAME basepython = python2.7 whitelist_externals=/bin/bash skipsdist=True