Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
6df9214
Add support for /v2/events stream
ala-allunite Apr 14, 2016
7d337cd
Add sseclient dependency to setup.py
ala-allunite Apr 14, 2016
b91915d
Make it possible to run itests at osx with docker-machine
ala-allunite Apr 15, 2016
48229c8
Make it possible to run itests at osx with docker-machine
ala-allunite Apr 15, 2016
23d5925
Merge branch 'master' of github.com:nuclon/marathon-python
ala-allunite Apr 15, 2016
4806a5a
Increase timeout to allow Marathon to start, update CPU resource for …
ala-allunite Apr 15, 2016
a4424fd
Add tests for event stream
ala-allunite Apr 16, 2016
e8ba5ca
Add checking marathon version
ala-allunite Apr 16, 2016
6fc5896
Reorganize test slightly
ala-allunite Apr 16, 2016
e0940a0
Add support for /v2/events stream
ala-allunite Apr 14, 2016
aff5bb4
Add sseclient dependency to setup.py
ala-allunite Apr 14, 2016
34f8244
Make it possible to run itests at osx with docker-machine
ala-allunite Apr 15, 2016
6c44763
Make it possible to run itests at osx with docker-machine
ala-allunite Apr 15, 2016
9c2536c
Increase timeout to allow Marathon to start, update CPU resource for …
ala-allunite Apr 15, 2016
c1625a4
Add tests for event stream
ala-allunite Apr 16, 2016
8119e9d
Add checking marathon version
ala-allunite Apr 16, 2016
39c219c
Reorganize test slightly
ala-allunite Apr 16, 2016
d8a4a9d
Fix for pep8
ala-allunite Apr 16, 2016
4ddd588
Add debug
ala-allunite Apr 17, 2016
ae5dea9
Make event test simplier
ala-allunite Apr 17, 2016
efc8802
Fix test
ala-allunite Apr 17, 2016
8a46ea7
It looks like the events are correctly captured starting from 0.11
ala-allunite Apr 17, 2016
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions itests/itest_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from functools import wraps
import os
import signal
import sys
import re
import time

import requests
Expand Down Expand Up @@ -31,7 +33,7 @@ def wrapper(*args, **kwargs):
return decorator


@timeout(10)
@timeout(30)
def wait_for_marathon():
"""Blocks until marathon is up"""
marathon_service = get_marathon_connection_string()
Expand Down Expand Up @@ -64,7 +66,14 @@ def get_marathon_connection_string():
return 'localhost:8080'
else:
service_port = get_service_internal_port('marathon')
return get_compose_service('marathon').get_container().get_local_port(service_port)
local_port = get_compose_service('marathon').get_container().get_local_port(service_port)

# Check if we're at OSX. Use ip from DOCKER_HOST
if sys.platform == 'darwin':
m = re.match("(.*?)://(.*?):(\d+)", os.environ["DOCKER_HOST"])
local_port = "{}:{}".format(m.group(2), local_port.split(":")[1])

return local_port


def get_service_internal_port(service_name):
Expand Down
9 changes: 9 additions & 0 deletions itests/marathon_tasks.feature
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,12 @@ Feature: marathon-python can operate marathon app tasks
When we create a trivial new app
And we wait the trivial app deployment finish
Then we should be able to kill the #0,1,2 tasks of the trivial app

Scenario: Events can be listened in stream
Given a working marathon instance
When marathon version is greater than 0.11.0
And we start listening for events
And we create a trivial new app
And we wait the trivial app deployment finish
Then we should be able to kill the tasks
And we should see list of events
44 changes: 43 additions & 1 deletion itests/steps/marathon_steps.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import sys
import time
import multiprocessing
from distutils.version import StrictVersion

import marathon
from behave import given, when, then
Expand All @@ -26,7 +28,7 @@ def get_marathon_info(context):
@when(u'we create a trivial new app')
def create_trivial_new_app(context):
context.client.create_app('test-trivial-app', marathon.MarathonApp(
cmd='sleep 3600', mem=16, cpus=1, instances=5))
cmd='sleep 3600', mem=16, cpus=0.1, instances=5))


@then(u'we should be able to kill the tasks')
Expand Down Expand Up @@ -113,3 +115,43 @@ def list_tasks(context, which):
app = context.client.get_app('test-%s-app' % which)
tasks = context.client.list_tasks('test-%s-app' % which)
assert len(tasks) == app.instances


def listen_for_events(client, events):
for msg in client.event_stream():
events.append(msg)

@when(u'marathon version is greater than {version}')
def marathon_version_chech(context, version):
info = context.client.get_info()
if StrictVersion(info.version) < StrictVersion(version):
context.scenario.skip(reason='Marathon version is too low for this scenario')

@when(u'we start listening for events')
def start_listening_stream(context):
manager = multiprocessing.Manager()
mlist = manager.list()
context.manager = manager
context.events = mlist
p = multiprocessing.Process(target=listen_for_events, args=(context.client, mlist))
p.start()
context.p = p

@then(u'we should see list of events')
def stop_listening_stream(context):
time.sleep(10)
context.p.terminate()

print(context.events)

# event list should contain 5 status_update_event with taskStatus == TASK_RUNNING
filtered_events = [e for e in context.events if e.event_type == "status_update_event" and e.task_status == "TASK_RUNNING"]
assert len(filtered_events) == 5

# and 1 status_update_event with taskStatus == TASK_KILLED
filtered_events = [e for e in context.events if e.event_type == "status_update_event" and e.task_status == "TASK_KILLED"]
assert len(filtered_events) == 1

# and 2 deployment_step_success events
filtered_events = [e for e in context.events if e.event_type == "deployment_success"]
assert len(filtered_events) == 2
37 changes: 37 additions & 0 deletions marathon/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import marathon
from .models import MarathonApp, MarathonDeployment, MarathonGroup, MarathonInfo, MarathonTask, MarathonEndpoint, MarathonQueueItem
from .exceptions import InternalServerError, NotFoundError, MarathonHttpError, MarathonError
from .models.events import EventFactory


class MarathonClient(object):
Expand Down Expand Up @@ -89,6 +90,26 @@ def _do_request(self, method, path, params=None, data=None):

return response

def _do_sse_request(self, path, params=None, data=None):
from sseclient import SSEClient

headers = {'Accept': 'text/event-stream'}
messages = None
servers = list(self.servers)
while servers and messages is None:
server = servers.pop(0)
url = ''.join([server.rstrip('/'), path])
try:
messages = SSEClient(url, params=params, data=data, headers=headers,
auth=self.auth)
except Exception as e:
marathon.log.error('Error while calling %s: %s', url, e.message)

if messages is None:
raise MarathonError('No remaining Marathon servers to try')

return messages

def list_endpoints(self):
"""List the current endpoints for all applications

Expand Down Expand Up @@ -631,3 +652,19 @@ def get_metrics(self):
"""
response = self._do_request('GET', '/metrics')
return response.json()

def event_stream(self):
"""Polls event bus using /v2/events

:returns: iterator with events
:rtype: iterator
"""

messages = self._do_sse_request('/v2/events')

ef = EventFactory()
for message in messages:
if not message.data:
continue
data = json.loads(message.data)
yield ef.process(data)
9 changes: 8 additions & 1 deletion marathon/models/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ class MarathonDeploymentStepSuccess(MarathonEvent):
class MarathonDeploymentStepFailure(MarathonEvent):
KNOWN_ATTRIBUTES = ['plan']

class MarathonEventStreamAttached(MarathonEvent):
KNOWN_ATTRIBUTES = ['remote_address']

class MarathonEventStreamDetached(MarathonEvent):
KNOWN_ATTRIBUTES = ['remote_address']

class EventFactory:

Expand Down Expand Up @@ -133,7 +138,9 @@ def __init__(self):
'deployment_failed': MarathonDeploymentFailed,
'deployment_info': MarathonDeploymentInfo,
'deployment_step_success': MarathonDeploymentStepSuccess,
'deployment_step_failure': MarathonDeploymentStepFailure
'deployment_step_failure': MarathonDeploymentStepFailure,
'event_stream_attached': MarathonEventStreamAttached,
'event_stream_detached': MarathonEventStreamDetached,
}

def process(self, event):
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
requests-mock
sseclient
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
long_description="""Python interface to the Mesos Marathon REST API.""",
author='Mike Babineau',
author_email='michael.babineau@gmail.com',
install_requires=['requests>=2.0.0'],
install_requires=['requests>=2.0.0', 'sseclient'],
url='https://github.com/thefactory/marathon-python',
packages=['marathon', 'marathon.models'],
license='MIT',
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ basepython = python2.7
envlist = py,pep8

[testenv:itests]
passenv = TRAVIS MARATHONVERSION
passenv = TRAVIS MARATHONVERSION DOCKER_HOST DOCKER_TLS_VERIFY DOCKER_CERT_PATH DOCKER_MACHINE_NAME
basepython = python2.7
whitelist_externals=/bin/bash
skipsdist=True
Expand Down