Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 30 additions & 11 deletions marathon/models/events.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
"""
This module is used to translate Events from Marathon's EventBus system.
See: https://mesosphere.github.io/marathon/docs/event-bus.html
See:
* https://mesosphere.github.io/marathon/docs/event-bus.html
* https://github.com/mesosphere/marathon/blob/master/src/main/scala/mesosphere/marathon/core/event/Events.scala
"""

from marathon.models.base import MarathonObject
from marathon.models.app import MarathonHealthCheck
from marathon.models.task import MarathonIpAddress
from marathon.models.deployment import MarathonDeploymentPlan
from marathon.exceptions import MarathonError

Expand All @@ -19,7 +22,11 @@ class MarathonEvent(MarathonObject):
KNOWN_ATTRIBUTES = []
attribute_name_to_marathon_object = { # Allows embedding of MarathonObjects inside events.
'health_check': MarathonHealthCheck,
'plan': MarathonDeploymentPlan
'plan': MarathonDeploymentPlan,
'ip_address': MarathonIpAddress,
}
seq_name_to_singular = {
'ip_addresses': 'ip_address',
}

def __init__(self, event_type, timestamp, **kwargs):
Expand All @@ -28,13 +35,25 @@ def __init__(self, event_type, timestamp, **kwargs):
for attribute in self.KNOWN_ATTRIBUTES:
self._set(attribute, kwargs.get(attribute))

def __to_marathon_object(self, attribute_name, attribute):
if attribute_name in self.attribute_name_to_marathon_object:
clazz = self.attribute_name_to_marathon_object[attribute_name]
# If this attribute already has a Marathon object instantiate it.
attribute = clazz.from_json(attribute)
return attribute

def _set(self, attribute_name, attribute):
if not attribute:
return
if attribute_name in self.attribute_name_to_marathon_object:
clazz = self.attribute_name_to_marathon_object[attribute_name]
attribute = clazz.from_json(
attribute) # If this attribute already has a Marathon object instantiate it.
# Special handling for lists...
if isinstance(attribute, list):
name = self.seq_name_to_singular.get(attribute_name)
attribute = [
self.__to_marathon_object(name, v)
for v in attribute
]
else:
attribute = self.__to_marathon_object(attribute_name, attribute)
setattr(self, attribute_name, attribute)


Expand All @@ -44,7 +63,7 @@ class MarathonApiPostEvent(MarathonEvent):

class MarathonStatusUpdateEvent(MarathonEvent):
KNOWN_ATTRIBUTES = [
'slave_id', 'task_id', 'task_status', 'app_id', 'host', 'ports', 'version', 'message']
'slave_id', 'task_id', 'task_status', 'app_id', 'host', 'ports', 'version', 'message', 'ip_addresses']


class MarathonFrameworkMessageEvent(MarathonEvent):
Expand All @@ -68,11 +87,11 @@ class MarathonRemoveHealthCheckEvent(MarathonEvent):


class MarathonFailedHealthCheckEvent(MarathonEvent):
KNOWN_ATTRIBUTES = ['app_id', 'health_check', 'task_id']
KNOWN_ATTRIBUTES = ['app_id', 'health_check', 'task_id', 'instance_id']


class MarathonHealthStatusChangedEvent(MarathonEvent):
KNOWN_ATTRIBUTES = ['app_id', 'health_check', 'task_id', 'alive']
KNOWN_ATTRIBUTES = ['app_id', 'health_check', 'task_id', 'instance_id', 'alive']


class MarathonGroupChangeSuccess(MarathonEvent):
Expand All @@ -92,7 +111,7 @@ class MarathonDeploymentFailed(MarathonEvent):


class MarathonDeploymentInfo(MarathonEvent):
KNOWN_ATTRIBUTES = ['plan']
KNOWN_ATTRIBUTES = ['plan', 'current_step']


class MarathonDeploymentStepSuccess(MarathonEvent):
Expand All @@ -112,7 +131,7 @@ class MarathonEventStreamDetached(MarathonEvent):


class MarathonUnhealthyTaskKillEvent(MarathonEvent):
KNOWN_ATTRIBUTES = ['app_id', 'task_id', 'version', 'reason']
KNOWN_ATTRIBUTES = ['app_id', 'task_id', 'instance_id', 'version', 'reason']


class MarathonAppTerminatedEvent(MarathonEvent):
Expand Down
48 changes: 46 additions & 2 deletions tests/test_model_event.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,54 @@
# encoding: utf-8

from marathon.models.events import EventFactory
from marathon.models.events import EventFactory, MarathonStatusUpdateEvent
from marathon.models.task import MarathonIpAddress
import unittest


class MarathonEventTest(unittest.TestCase):

def test_event_factory(self):
self.assertEqual(set(EventFactory.event_to_class.keys()), set(EventFactory.class_to_event.values()))
self.assertEqual(
set(EventFactory.event_to_class.keys()),
set(EventFactory.class_to_event.values()),
)

def test_marathon_event(self):
"""Test that we can process at least one kind of event."""
payload = {
"eventType": "status_update_event",
"slaveId": "slave-01",
"taskId": "task-01",
"taskStatus": "TASK_RUNNING",
"message": "Some message",
"appId": "/foo/bar",
"host": "host-01",
"ipAddresses": [
{"ip_address": "127.0.0.1", "protocol": "tcp"},
{"ip_address": "127.0.0.1", "protocol": "udp"},
],
"ports": [0, 1],
"version": "1234",
"timestamp": 12345,
}
factory = EventFactory()
event = factory.process(payload)

expected_event = MarathonStatusUpdateEvent(
event_type="status_update_event",
timestamp=12345,
slave_id="slave-01",
task_id="task-01",
task_status="TASK_RUNNING",
message="Some message",
app_id="/foo/bar",
host="host-01",
ports=[0, 1],
version="1234",
)
expected_event.ip_addresses = [
MarathonIpAddress(ip_address="127.0.0.1", protocol="tcp"),
MarathonIpAddress(ip_address="127.0.0.1", protocol="udp"),
]

self.assertEqual(event.to_json(), expected_event.to_json())