Skip to content

Commit

Permalink
Include last heartbeat time in resource record
Browse files Browse the repository at this point in the history
Embed rfc3339 module from
https://github.com/tonyg/python-rfc3339/blob/master/rfc3339.py
to assist in datetime parsing.
  • Loading branch information
labisso committed Apr 8, 2013
1 parent e64c6a4 commit 1181eec
Show file tree
Hide file tree
Showing 7 changed files with 479 additions and 13 deletions.
20 changes: 18 additions & 2 deletions epu/processdispatcher/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
from epu.states import InstanceState, ProcessState
from epu.exceptions import NotFoundError, WriteConflictError, BadRequestError
from epu.processdispatcher.engines import engine_id_from_domain
from epu.util import is_valid_identifier

from epu.util import is_valid_identifier, parse_datetime, ceiling_datetime
from epu.processdispatcher.store import ProcessRecord, NodeRecord, \
ResourceRecord, ProcessDefinitionRecord
from epu.processdispatcher.modes import RestartMode
Expand Down Expand Up @@ -549,6 +548,16 @@ def ee_heartbeat(self, sender, beat):
self._first_heartbeat(sender, beat)
return # *** EARLY RETURN **

resource_updated = False

timestamp_str = beat['timestamp']
timestamp = ceiling_datetime(parse_datetime(timestamp_str))

resource_timestamp = resource.last_heartbeat_datetime
if resource_timestamp is None or timestamp > resource_timestamp:
resource.new_last_heartbeat_datetime(timestamp)
resource_updated = True

assigned_procs = set()
processes = beat['processes']
node_exclusives_to_remove = []
Expand Down Expand Up @@ -671,6 +680,9 @@ def ee_heartbeat(self, sender, beat):
resource.resource_id, difference_message)

resource.assigned = new_assigned
resource_updated = True

if resource_updated:
try:
self.store.update_resource(resource)
except (WriteConflictError, NotFoundError):
Expand Down Expand Up @@ -748,7 +760,11 @@ def _first_heartbeat(self, sender, beat):
"node_id=%s sender=%s.", node_id, sender)
return

timestamp_str = beat['timestamp']
timestamp = ceiling_datetime(parse_datetime(timestamp_str))

resource = ResourceRecord.new(sender, node_id, slots, properties)
resource.new_last_heartbeat_datetime(timestamp)
try:
self.store.add_resource(resource)
except WriteConflictError:
Expand Down
16 changes: 14 additions & 2 deletions epu/processdispatcher/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from epu.exceptions import NotFoundError, WriteConflictError
from epu import zkutil
from epu.states import ProcessDispatcherState
from epu.util import parse_datetime

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -1471,7 +1472,7 @@ def __hash__(self):
class ResourceRecord(Record):
@classmethod
def new(cls, resource_id, node_id, slot_count, properties=None,
enabled=True):
enabled=True, last_heartbeat=None):
if properties:
props = properties.copy()
else:
Expand All @@ -1481,9 +1482,20 @@ def new(cls, resource_id, node_id, slot_count, properties=None,
props['resource_id'] = resource_id

d = dict(resource_id=resource_id, node_id=node_id, enabled=enabled,
slot_count=int(slot_count), properties=props, assigned=[])
slot_count=int(slot_count), properties=props, assigned=[],
last_heartbeat=last_heartbeat)
return cls(d)

@property
def last_heartbeat_datetime(self):
if self.last_heartbeat is None:
return None
return parse_datetime(self.last_heartbeat)

def new_last_heartbeat_datetime(self, d):
log.debug("setting last heartbeat to %s", d.isoformat())
self.last_heartbeat = d.isoformat()

@property
def available_slots(self):
return max(0, self.slot_count - len(self.assigned))
Expand Down
8 changes: 7 additions & 1 deletion epu/processdispatcher/test/mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from epu.epumanagement.conf import * # noqa
from epu.exceptions import NotFoundError
from epu.states import ProcessState
from epu.util import now_datetime

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -210,7 +211,7 @@ def cleanup(self, u_pid, round):
del self.history[u_pid]

def make_heartbeat(self, timestamp=None):
now = time.time() if timestamp is None else timestamp
now = now_datetime().isoformat() if timestamp is None else timestamp

processes = []
for process in chain(self.processes.itervalues(), self.history):
Expand Down Expand Up @@ -257,3 +258,8 @@ def nosystemrestart_process_config():

def minimum_time_between_starts_config(minimum_time=2):
return {'process': {'minimum_time_between_starts': minimum_time}}


def make_beat(node_id, processes=None, timestamp=None):
return {"node_id": node_id, "processes": processes or [],
"timestamp": timestamp or now_datetime().isoformat()}
29 changes: 26 additions & 3 deletions epu/processdispatcher/test/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
from epu.processdispatcher.core import ProcessDispatcherCore
from epu.processdispatcher.store import ProcessDispatcherStore, ProcessRecord
from epu.processdispatcher.engines import EngineRegistry, domain_id_from_engine
from epu.processdispatcher.test.mocks import MockNotifier, nosystemrestart_process_config
from epu.processdispatcher.test.mocks import nosystemrestart_process_config, \
MockNotifier, make_beat
from epu.processdispatcher.modes import RestartMode, QueueingMode
from epu.exceptions import NotFoundError, BadRequestError
from epu.util import parse_datetime


class ProcessDispatcherCoreTests(unittest.TestCase):
Expand Down Expand Up @@ -426,6 +428,27 @@ def patched_update_node(node):
self.core.ee_heartbeat("eeagent1", beat)
self.assertEqual(self.store.get_resource("eeagent1"), None)

def test_heartbeat_timestamps(self):

def make_beat(node_id, processes=None):
return {"node_id": node_id, "processes": processes or []}
# test processing a heartbeat where node is removed partway through
node_id = uuid.uuid4().hex
self.core.node_state(node_id, domain_id_from_engine("engine1"),
InstanceState.RUNNING)

d1 = parse_datetime("2013-04-02T19:37:57.617734+00:00")
d2 = parse_datetime("2013-04-02T19:38:57.617734+00:00")
d3 = parse_datetime("2013-04-02T19:39:57.617734+00:00")

self.core.ee_heartbeat("eeagent1", make_beat(node_id, timestamp=d1.isoformat()))

resource = self.store.get_resource("eeagent1")
self.assertEqual(resource.last_heartbeat_datetime, d1)

self.core.ee_heartbeat("eeagent1", make_beat(node_id, timestamp=d3.isoformat()))
resource = self.store.get_resource("eeagent1")
self.assertEqual(resource.last_heartbeat_datetime, d3)

# out of order hbeat. time shouln't be updated
self.core.ee_heartbeat("eeagent1", make_beat(node_id, timestamp=d2.isoformat()))
resource = self.store.get_resource("eeagent1")
self.assertEqual(resource.last_heartbeat_datetime, d3)
6 changes: 1 addition & 5 deletions epu/processdispatcher/test/test_doctor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from epu.processdispatcher.engines import EngineRegistry, domain_id_from_engine
from epu.states import ProcessState, InstanceState, ProcessDispatcherState
from epu.processdispatcher.test.test_store import StoreTestMixin
from epu.processdispatcher.test.mocks import nosystemrestart_process_config
from epu.processdispatcher.test.mocks import nosystemrestart_process_config, make_beat
from epu.test import ZooKeeperTestMixin
from epu.test.util import wait

Expand Down Expand Up @@ -256,7 +256,3 @@ def teardown_store(self):
self.store.shutdown()

self.teardown_zookeeper()


def make_beat(node_id, processes=None):
return {"node_id": node_id, "processes": processes or []}
Loading

0 comments on commit 1181eec

Please sign in to comment.