Add heartbeat monitor to PD doctor role

commit 3493c65c72eb91a12de457ccc3ee848a5ea5b4f1 (parent db78ad5)
David LaBissoniere authored April 05, 2013
epu/processdispatcher/core.py (51 changed lines)
@@ -464,12 +464,11 @@ def evacuate_node(self, node, is_system_restart=False,
                 # mark resource ineligible for scheduling
                 self._disable_resource(resource)
 
-                # go through and reschedule processes as needed
-                for owner, upid, round in resource.assigned:
-                    self._evacuate_process(owner, upid, resource,
-                        is_system_restart=is_system_restart,
-                        dead_process_state=dead_process_state,
-                        rescheduled_process_state=rescheduled_process_state)
+                self.evacuate_resource(
+                    resource,
+                    is_system_restart=is_system_restart,
+                    dead_process_state=dead_process_state,
+                    rescheduled_process_state=rescheduled_process_state)
 
             try:
                 self.store.remove_resource(resource_id)
@@ -489,6 +488,15 @@ def _disable_resource(self, resource):
             except WriteConflictError:
                 resource = self.store.get_resource(resource.resource_id)
 
+    def evacuate_resource(self, resource, is_system_restart=False,
+                      dead_process_state=None, rescheduled_process_state=None):
+        # go through and reschedule processes as needed
+        for owner, upid, round in resource.assigned:
+            self._evacuate_process(owner, upid, resource,
+                is_system_restart=is_system_restart,
+                dead_process_state=dead_process_state,
+                rescheduled_process_state=rescheduled_process_state)
+
     def _evacuate_process(self, owner, upid, resource, is_system_restart=False,
             dead_process_state=None, rescheduled_process_state=None):
         """Deal with a process on a terminating/terminated node
@@ -710,6 +718,22 @@ def process_should_restart(self, process, exit_state, is_system_restart=False):
 
         return should_restart
 
+    def resource_change_state(self, resource, newstate):
+        updated = False
+        while resource and resource.state not in (ExecutionResourceState.DISABLED, newstate):
+            resource.state = newstate
+            try:
+                self.store.update_resource(resource)
+                updated = True
+            except NotFoundError:
+                resource = None
+            except WriteConflictError:
+                try:
+                    resource = self.store.get_resource(resource.resource_id)
+                except NotFoundError:
+                    resource = None
+        return resource, updated
+
     def _first_heartbeat(self, sender, beat):
 
         node_id = beat.get('node_id')
@@ -746,7 +770,7 @@ def _first_heartbeat(self, sender, beat):
             log.exception("Node for EEagent %s has invalid domain_id!", sender)
             return
 
-        engine_spec = self.ee_registry.get_engine_by_id(engine_id)
+        engine_spec = self.get_engine(engine_id)
         slots = engine_spec.slots
 
         # just making engine type a generic property/constraint for now,
@@ -771,6 +795,19 @@ def _first_heartbeat(self, sender, beat):
             # no problem if this resource was just created by another worker
             log.info("Conflict writing new resource record %s. Ignoring.", sender)
 
+    def get_resource_engine(self, resource):
+        """Return an execution engine spec for a resource
+        """
+        engine_id = resource.properties.get('engine')
+        if not engine_id:
+            raise ValueError("Resource has no engine property")
+        return self.get_engine(engine_id)
+
+    def get_engine(self, engine_id):
+        """Return an execution engine spec object
+        """
+        return self.ee_registry.get_engine_by_id(engine_id)
+
     def node_add_resource(self, node, resource_id):
         """Tentatively adds a resource to a node, retrying if conflict
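
A minimal sketch of the intended call pattern for the new helpers (an
illustration, not part of the commit; `core` is assumed to be a live
ProcessDispatcherCore and `resource` a record from its store):

    from epu.states import ExecutionResourceState

    # resource_change_state returns the (possibly refreshed) resource record
    # and a flag indicating whether *we* made the change; a False flag means
    # another worker got there first, or the resource is DISABLED or gone.
    resource, updated = core.resource_change_state(
        resource, ExecutionResourceState.MISSING)
    if updated:
        # we won the write, so we are responsible for rescheduling
        core.evacuate_resource(resource)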
 
epu/processdispatcher/doctor.py (387 changed lines)
@@ -1,7 +1,12 @@
 import logging
 import threading
+import heapq
+from collections import namedtuple
+from datetime import timedelta
 
-from epu.states import ProcessState, ProcessDispatcherState
+from epu.util import now_datetime, ensure_timedelta
+from epu.states import ProcessState, ProcessDispatcherState, ExecutionResourceState
+from epu import tevent
 
 log = logging.getLogger(__name__)
 
@@ -16,21 +21,25 @@ class PDDoctor(object):
       doctor establishes the current state of the system before allowing any
       requests to be processed.
     - Tracks heartbeats from Execution Engine Agents (EEAgents). When an agent
-      hasn't been heard from in too long, the doctor first attempts to provoke
-      a response from the agent. If that fails, the agent is marked as disabled
-      and ultimately its VM is killed. (TODO)
+      hasn't been heard from in too long, the agent is marked as disabled.
     """
 
     _PROCESS_STATES_TO_REQUEUE = (ProcessState.REQUESTED,
         ProcessState.DIED_REQUESTED, ProcessState.WAITING)
 
-    def __init__(self, core, store):
+    CONFIG_MONITOR_HEARTBEATS = "monitor_resource_heartbeats"
+
+    def __init__(self, core, store, config=None):
         """
         @type core: ProcessDispatcherCore
         @type store: ProcessDispatcherStore
         """
         self.core = core
         self.store = store
+        self.config = config or {}
+
+        self.monitor = None
+        self.monitor_thread = None
 
         self.condition = threading.Condition()
 
@@ -62,15 +71,25 @@ def inaugurate(self):
             self.watching_system_boot = True
             self._watch_system_boot()
 
-        # TODO this is where we can set up the heartbeat monitor
+        with self.condition:
+            if self.is_leader and self.config.get(self.CONFIG_MONITOR_HEARTBEATS, True):
+                self.monitor = ExecutionResourceMonitor(self.core, self.store)
+                self.monitor_thread = tevent.spawn(self.monitor.monitor)
 
         while self.is_leader:
             with self.condition:
                 self.condition.wait()
 
+        if self.monitor_thread is not None:
+            log.debug("Waiting on monitor thread to exit")
+            self.monitor_thread.join()
+        self.monitor = None
+        self.monitor_thread = None
+
     def cancel(self):
         with self.condition:
             self.is_leader = False
+            if self.monitor:
+                self.monitor.cancel()
             self.condition.notify_all()
 
     def _watch_system_boot(self, *args):
@@ -153,3 +172,359 @@ def schedule_pending_processes(self):
             if updated:
                 self.store.enqueue_process(process.owner, process.upid,
                     process.round)
+
+
+class ExecutionResourceMonitor(object):
+    """Implements missing heartbeat detection for execution resources
+
+    Execution resources emit heartbeats to the Process Dispatcher (PD).
+    These heartbeats serve two purposes:
+        1. To notify the PD of state changes of managed processes
+        2. To allow the PD to detect when resources have stalled or otherwise
+           disappeared.
+
+    Heartbeats are sent on a regular interval, and are also sent immediately
+    on a process state change. If the PD is aware of this interval, it can
+    detect when something is wrong by comparing the last heartbeat time to
+    the current time. However there may be clock sync or message queue backup
+    delays that interfere with this detection. So we exercise restraint in
+    declaring resources dead. The following algorithm is used:
+
+    - Each resource has a timestamp indicating the time the last heartbeat
+      was *sent*. Each resource also has warning_time and missing_time
+      thresholds.
+
+    - When a resource's last heartbeat time is older than the warning_time
+      threshold, the resource is declared to be in the WARNING state. This is
+      based purely on the time delta and does not take into account potential
+      clock differences or messaging delays. The result of this state change
+      is some noisy logging but no countermeasures are taken.
+
+    - If the resource stays in the WARNING state for missing_time-warning_time
+      *without any heartbeats received* as observed by the Doctor, the
+      resource is declared MISSING. This will not happen simply because the
+      last heartbeat time is older than missing_time. If a heartbeat is
+      received from a resource in the WARNING state, this event "resets" the
+      doctor's clock, regardless of the timestamp on the heartbeat. This means
+      that resources may stay in the WARNING state indefinitely if their clocks
+      are significantly off from the Doctor's clock.
+
+    - If at any point in this process the resource heartbeats resume or
+      "catch up" with warning_time, the resource is moved back to the OK
+      state. Likewise, if a resource is DISABLED it is ignored by the doctor.
+    """
+
+    _MONITOR_ERROR_DELAY_SECONDS = 60
+
+    _now_func = staticmethod(now_datetime)
+
+    def __init__(self, core, store):
+        self.core = core
+        self.store = store
+
+        self.condition = threading.Condition()
+        self.cancelled = False
+
+        self.resources = {}
+        self.resource_set_changed = True
+        self.changed_resources = set()
+
+        self.resource_checks = _ResourceChecks()
+
+        # local times at which resources were moved to the WARNING state.
+        self.resource_warnings = {}
+
+    def cancel(self):
+        with self.condition:
+            self.cancelled = True
+            self.condition.notify_all()
+
+    def monitor(self):
+        """Monitor execution resources until cancelled
+        """
+
+        self.resource_set_changed = True
+
+        while not self.cancelled:
+            try:
+                delay = self.monitor_cycle()
+
+            except Exception:
+                log.exception("Unexpected error monitoring heartbeats")
+                delay = self._MONITOR_ERROR_DELAY_SECONDS
+
+            # wait until the next check time, unless a resource update or cancel request
+            # happens first
+            if delay is None or delay > 0:
+                with self.condition:
+                    if not any((self.cancelled, self.changed_resources,
+                               self.resource_set_changed)):
+                        self.condition.wait(delay)
+
+    def monitor_cycle(self):
+        """Run a single cycle of the monitor
+
+        returns the number of seconds until the next needed check, or None if no checks are needed
+        """
+        changed = self._update()
+        now = self._now_func()
+
+        for resource_id in changed:
+            resource = self.resources.get(resource_id)
+
+            # skip resource and remove any checks if it is removed or disabled
+            if resource is None or resource.state == ExecutionResourceState.DISABLED:
+                self.resource_checks.discard_resource_check(resource_id)
+                try:
+                    del self.resource_warnings[resource_id]
+                except KeyError:
+                    pass
+                continue
+
+            # otherwise, queue up the resource to be looked at in this cycle
+            self.resource_checks.set_resource_check(resource_id, now)
+
+        # walk the resource checks, up to the current time. This data structure
+        # ensures that we only check resources that have either been updated, or
+        # are due to timeout
+        for resource_id in self.resource_checks.walk_through_time(now):
+
+            if self.cancelled:  # back out early if cancelled
+                return
+
+            try:
+                resource = self.resources[resource_id]
+                self._check_one_resource(resource, now)
+            except Exception:
+                log.exception("Problem checking execution resource %s. Will retry.",
+                    resource_id)
+                if resource_id not in self.resource_checks:
+                    next_check_time = now + timedelta(seconds=self._MONITOR_ERROR_DELAY_SECONDS)
+                    self.resource_checks.set_resource_check(resource_id, next_check_time)
+
+        # return the number of seconds until the next expected check, or None
+        next_check_time = self.resource_checks.next_check_time
+        if next_check_time is not None:
+            return max((next_check_time - self._now_func()).total_seconds(), 0)
+        return None
+
+    def _check_one_resource(self, resource, now):
+
+        last_heartbeat = resource.last_heartbeat_datetime
+        warning_threshold, missing_threshold = self._get_resource_thresholds(resource)
+        if warning_threshold is None or missing_threshold is None:
+            return
+
+        log.debug("Examining heartbeat for resource %s (in state %s)",
+            resource.resource_id, resource.state)
+
+        resource_id = resource.resource_id
+        next_check_time = None
+
+        # this could end up negative in case of unsynced or shifting clocks
+        heartbeat_age = now - last_heartbeat
+
+        if resource.state == ExecutionResourceState.OK:
+
+            # go to WARNING state if heartbeat is older than threshold
+            if heartbeat_age >= warning_threshold:
+                self._mark_resource_warning(resource, now, last_heartbeat)
+                next_check_time = now + (missing_threshold - warning_threshold)
+            else:
+                next_check_time = last_heartbeat + warning_threshold
+
+        elif resource.state == ExecutionResourceState.WARNING:
+
+            # our real threshold is the gap between missing and warning
+            threshold = missing_threshold - warning_threshold
+
+            warning = self.resource_warnings.get(resource_id)
+
+            if heartbeat_age < warning_threshold:
+                self._mark_resource_ok(resource, now, last_heartbeat)
+
+            elif not warning or warning.last_heartbeat != last_heartbeat:
+                self.resource_warnings[resource_id] = _ResourceWarning(resource_id, last_heartbeat, now)
+                next_check_time = now + threshold
+
+            else:
+                delta = now - warning.warning_time
+                if delta >= threshold:
+                    self._mark_resource_missing(resource, now, last_heartbeat)
+                else:
+                    next_check_time = warning.warning_time + threshold
+
+        elif resource.state == ExecutionResourceState.MISSING:
+            # go OK if a heartbeat has been received in warning_time threshold
+            # go WARNING if a heartbeat has been received in missing_time threshold
+            # set check time
+            if heartbeat_age < warning_threshold:
+                self._mark_resource_ok(resource, now, last_heartbeat)
+
+            elif heartbeat_age < missing_threshold:
+                self._mark_resource_warning(resource, now, last_heartbeat)
+
+            elif resource.assigned:
+                # if resource has assignments, evacuate them.
+                # likely we would only get in this situation if we previously
+                # marked the resource MISSING but failed before we evacuated
+                # everything
+                self.core.evacuate_resource(resource)
+
+        if next_check_time is not None:
+            self.resource_checks.set_resource_check(resource_id, next_check_time)
+
+    def _mark_resource_warning(self, resource, now, last_heartbeat):
+        resource, updated = self.core.resource_change_state(resource,
+            ExecutionResourceState.WARNING)
+        if updated:
+            resource_id = resource.resource_id
+            heartbeat_age = (now - last_heartbeat).total_seconds()
+            self.resource_warnings[resource_id] = _ResourceWarning(
+                resource_id, last_heartbeat, now)
+            log.warn("Execution resource %s is in WARNING state: Last known "
+                "heartbeat was sent %s seconds ago. This may be a legitimately "
+                "missing resource, OR it may be an issue of clock sync or "
+                "messaging delays.", resource.resource_id, heartbeat_age)
+
+    def _mark_resource_missing(self, resource, now, last_heartbeat):
+        resource, updated = self.core.resource_change_state(resource,
+            ExecutionResourceState.MISSING)
+        if updated:
+            resource_id = resource.resource_id
+
+            try:
+                del self.resource_warnings[resource_id]
+            except KeyError:
+                pass
+
+            heartbeat_age = (now - last_heartbeat).total_seconds()
+            log.warn("Execution resource %s is MISSING: Last known "
+                "heartbeat was sent %s seconds ago.", resource.resource_id, heartbeat_age)
+
+            self.core.evacuate_resource(resource)
+
+    def _mark_resource_ok(self, resource, now, last_heartbeat):
+        resource, updated = self.core.resource_change_state(resource,
+            ExecutionResourceState.OK)
+        if updated:
+            heartbeat_age = (now - last_heartbeat).total_seconds()
+            log.info("Execution resource %s is OK again: Last known "
+                "heartbeat was sent %s seconds ago.", resource.resource_id, heartbeat_age)
+
+    def _get_resource_thresholds(self, resource):
+        engine = self.core.get_resource_engine(resource)
+        warning = engine.heartbeat_warning
+        if warning is not None:
+            warning = ensure_timedelta(warning)
+
+        missing = engine.heartbeat_missing
+        if missing is not None:
+            missing = ensure_timedelta(missing)
+
+        return warning, missing
+
+    def _get_resource_missing_threshold(self, resource):
+        engine = self.core.get_resource_engine(resource)
+        return ensure_timedelta(engine.heartbeat_missing)
+
+    def _notify_resource_set_changed(self, *args):
+        with self.condition:
+            self.resource_set_changed = True
+            self.condition.notify_all()
+
+    def _notify_resource_changed(self, resource_id, *args):
+        with self.condition:
+            self.changed_resources.add(resource_id)
+            self.condition.notify_all()
+
+    def _update(self):
+        changed = set()
+        if self.resource_set_changed:
+            changed.update(self._update_resource_set())
+        if self.changed_resources:
+            changed.update(self._update_resources())
+        return changed
+
+    def _update_resource_set(self):
+        self.resource_set_changed = False
+        resource_ids = set(self.store.get_resource_ids(
+            watcher=self._notify_resource_set_changed))
+
+        previous = set(self.resources.keys())
+
+        added = resource_ids - previous
+        removed = previous - resource_ids
+
+        for resource_id in removed:
+            del self.resources[resource_id]
+
+        for resource_id in added:
+            self.resources[resource_id] = self.store.get_resource(
+                resource_id, watcher=self._notify_resource_changed)
+
+        return added | removed
+
+    def _update_resources(self):
+        with self.condition:
+            changed = self.changed_resources.copy()
+            self.changed_resources.clear()
+
+        for resource_id in changed:
+            resource = self.store.get_resource(resource_id,
+                                               watcher=self._notify_resource_changed)
+            if resource:
+                self.resources[resource_id] = resource
+
+        return changed
+
+
+_ResourceWarning = namedtuple("_ResourceWarning",
+    ["resource_id", "last_heartbeat", "warning_time"])
+
+
+class _ResourceChecks(object):
+
+    def __init__(self):
+        self.checks = {}
+        self.check_heap = []
+
+    def __contains__(self, item):
+        return item in self.checks
+
+    def set_resource_check(self, resource_id, check_time):
+
+        # do nothing if check already exists and is correct
+        if self.checks.get(resource_id) == check_time:
+            return
+        self.checks[resource_id] = check_time
+        heapq.heappush(self.check_heap, (check_time, resource_id))
+
+    def discard_resource_check(self, resource_id):
+        try:
+            del self.checks[resource_id]
+        except KeyError:
+            pass
+
+    def walk_through_time(self, now):
+        while self.check_heap:
+
+            # heap guarantees smallest element is at index 0, so we can peek before we pop
+            check_time, resource_id = self.check_heap[0]
+
+            # skip and drop entries that are no longer represented in our check set
+            if check_time != self.checks.get(resource_id):
+                heapq.heappop(self.check_heap)
+
+            elif check_time <= now:
+                heapq.heappop(self.check_heap)
+                del self.checks[resource_id]
+                yield resource_id
+
+            else:
+                break
+
+    @property
+    def next_check_time(self):
+        if self.check_heap:
+            return self.check_heap[0][0]
+        return None
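
The _ResourceChecks structure pairs a dict of current check times with a heap
of (check_time, resource_id) entries; rescheduling a resource just pushes a
new heap entry, and walk_through_time lazily drops entries that no longer
match the dict. A small illustration of that behavior (a sketch with
arbitrary times, assuming the _ResourceChecks class above is importable; not
part of the commit):

    from datetime import datetime, timedelta

    checks = _ResourceChecks()
    t0 = datetime(2013, 4, 5, 12, 0, 0)
    checks.set_resource_check("eeagent_1", t0)
    checks.set_resource_check("eeagent_2", t0 + timedelta(seconds=30))

    # rescheduling eeagent_1 leaves a stale (t0, "eeagent_1") heap entry...
    checks.set_resource_check("eeagent_1", t0 + timedelta(seconds=10))

    # ...which walk_through_time discards before yielding the live check
    due = list(checks.walk_through_time(t0 + timedelta(seconds=15)))
    assert due == ["eeagent_1"]
    assert checks.next_check_time == t0 + timedelta(seconds=30)
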
epu/processdispatcher/engines.py (38 changed lines)
@@ -1,3 +1,5 @@
+from epu.util import ensure_timedelta
+
 DOMAIN_PREFIX = "pd_domain_"
 
 
@@ -40,7 +42,10 @@ def from_config(cls, config, default=None):
                 replicas=engine_conf.get('replicas', 1),
                 spare_slots=engine_conf.get('spare_slots', 0),
                 iaas_allocation=engine_conf.get('iaas_allocation', None),
-                maximum_vms=engine_conf.get('maximum_vms', None))
+                maximum_vms=engine_conf.get('maximum_vms', None),
+                heartbeat_period=engine_conf.get('heartbeat_period', 30),
+                heartbeat_warning=engine_conf.get('heartbeat_warning'),
+                heartbeat_missing=engine_conf.get('heartbeat_missing'))
             registry.add(spec)
         return registry
 
@@ -64,10 +69,13 @@ def get_engine_by_id(self, engine):
         return self.by_engine[engine]
 
 
-class EngineSpec(object):
+_DEFAULT_HEARTBEAT_PERIOD = 30
+
 
+class EngineSpec(object):
     def __init__(self, engine_id, slots, base_need=0, config=None, replicas=1,
-                 spare_slots=0, iaas_allocation=None, maximum_vms=None):
+                 spare_slots=0, iaas_allocation=None, maximum_vms=None,
+                 heartbeat_period=30, heartbeat_warning=45, heartbeat_missing=60):
         self.engine_id = engine_id
         self.config = config
         self.base_need = int(base_need)
@@ -94,3 +102,27 @@ def __init__(self, engine_id, slots, base_need=0, config=None, replicas=1,
             if maximum_vms < 0:
                 raise ValueError("maximum vms must be at least 0")
             self.maximum_vms = maximum_vms
+
+        self.heartbeat_period = heartbeat_period
+        self.heartbeat_warning = heartbeat_warning
+        self.heartbeat_missing = heartbeat_missing
+
+        if (heartbeat_missing is None or heartbeat_warning is None) and not (
+                heartbeat_missing is None and heartbeat_warning is None):
+            raise ValueError("All heartbeat parameters must be specified, or none")
+
+        if self.heartbeat_period is None:
+            self.heartbeat_period = ensure_timedelta(_DEFAULT_HEARTBEAT_PERIOD)
+        else:
+            self.heartbeat_period = ensure_timedelta(self.heartbeat_period)
+
+        if self.heartbeat_missing is not None:
+            self.heartbeat_warning = ensure_timedelta(self.heartbeat_warning)
+            self.heartbeat_missing = ensure_timedelta(self.heartbeat_missing)
+
+            if self.heartbeat_period <= ensure_timedelta(0):
+                raise ValueError("heartbeat_period must be a positive value")
+            if self.heartbeat_warning <= self.heartbeat_period:
+                raise ValueError("heartbeat_warning must be greater than heartbeat_period")
+            if self.heartbeat_missing <= self.heartbeat_warning:
+                raise ValueError("heartbeat_missing must be greater than heartbeat_warning")
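
With these fields, heartbeat monitoring is configured per engine. A
hypothetical engine block (an illustration, not part of the commit; the key
names are the ones from_config reads above, and the dict layout follows the
test configuration in test_doctor.py):

    engine_conf = {
        'engine1': {
            'slots': 4,
            'heartbeat_period': 30,   # interval at which EEAgents send beats
            'heartbeat_warning': 45,  # must be greater than heartbeat_period
            'heartbeat_missing': 60,  # must be greater than heartbeat_warning
        }
    }

Omitting both heartbeat_warning and heartbeat_missing leaves them None and
effectively disables the doctor's checks for that engine, while supplying
only one of the two raises ValueError.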
epu/processdispatcher/matchmaker.py (7 changed lines)
@@ -174,6 +174,8 @@ def _get_domain_config(self, engine, initial_n=0):
         if engine_conf['provisioner_vars'].get('replicas') is None:
             engine_conf['provisioner_vars']['replicas'] = engine.replicas
 
+        engine_conf['provisioner_vars']['heartbeat'] = engine.heartbeat_period.total_seconds()
+
         engine_conf['preserve_n'] = initial_n
         return config
 
@@ -632,6 +634,11 @@ def get_available_resources(self):
         # first break down available resources by node
         available_by_node = defaultdict(list)
         for resource in self.resources.itervalues():
+
+            # only consider OK resources. We definitely don't want to consider
+            # MISSING or DISABLED resources. We could arguably include WARNING
+            # resources, but perhaps at a lower priority than OK.
+
             if resource.state == ExecutionResourceState.OK and resource.available_slots:
                 node_id = resource.node_id
                 available_by_node[node_id].append(resource)
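
Since EngineSpec.heartbeat_period is now a timedelta, the matchmaker hands the
provisioner a plain number of seconds. A sketch of the resulting value (only
the 'heartbeat' key set above is shown; the rest of provisioner_vars is
elided):

    from datetime import timedelta

    heartbeat_period = timedelta(seconds=30)  # EngineSpec.heartbeat_period
    provisioner_vars = {}
    provisioner_vars['heartbeat'] = heartbeat_period.total_seconds()
    assert provisioner_vars['heartbeat'] == 30.0  # seconds, as a float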
epu/processdispatcher/test/mocks.py (3 changed lines)
@@ -4,6 +4,7 @@
 import logging
 import threading
 from collections import defaultdict
+from datetime import datetime
 
 from epu.epumanagement.conf import *  # noqa
 from epu.exceptions import NotFoundError
@@ -261,5 +262,7 @@ def minimum_time_between_starts_config(minimum_time=2):
 
 
 def make_beat(node_id, processes=None, timestamp=None):
+    if timestamp and isinstance(timestamp, datetime):
+        timestamp = timestamp.isoformat()
     return {"node_id": node_id, "processes": processes or [],
         "timestamp": timestamp or now_datetime().isoformat()}
epu/processdispatcher/test/test_doctor.py (141 changed lines)
@@ -2,28 +2,33 @@
 import logging
 import time
 import uuid
+from datetime import timedelta, datetime
 
-import epu.tevent as tevent
+from mock import Mock
 
-from epu.processdispatcher.doctor import PDDoctor
+import epu.tevent as tevent
+from epu.processdispatcher.doctor import PDDoctor, ExecutionResourceMonitor
 from epu.processdispatcher.core import ProcessDispatcherCore
 from epu.processdispatcher.modes import RestartMode
 from epu.processdispatcher.store import ProcessDispatcherStore, ProcessDispatcherZooKeeperStore
 from epu.processdispatcher.test.mocks import MockResourceClient, MockNotifier
 from epu.processdispatcher.store import ProcessRecord
 from epu.processdispatcher.engines import EngineRegistry, domain_id_from_engine
-from epu.states import ProcessState, InstanceState, ProcessDispatcherState
+from epu.states import ProcessState, InstanceState, ProcessDispatcherState, ExecutionResourceState
 from epu.processdispatcher.test.test_store import StoreTestMixin
 from epu.processdispatcher.test.mocks import nosystemrestart_process_config, make_beat
 from epu.test import ZooKeeperTestMixin
 from epu.test.util import wait
+from epu.util import UTC
+
 
 log = logging.getLogger(__name__)
 
 
 class PDDoctorTests(unittest.TestCase, StoreTestMixin):
 
-    engine_conf = {'engine1': {'slots': 4}}
+    engine_conf = {'engine1': {'slots': 4, 'heartbeat_period': 5,
+                   'heartbeat_warning': 10, 'heartbeat_missing': 20}}
 
     def setUp(self):
         self.store = self.setup_store()
@@ -36,11 +41,14 @@ def setUp(self):
 
         self.docthread = None
 
+        self.monitor = None
+
     def tearDown(self):
         if self.docthread:
             self.doctor.cancel()
             self.docthread.join()
             self.docthread = None
+
         self.teardown_store()
 
     def setup_store(self):
@@ -241,6 +249,125 @@ def test_initialized_not_system_boot(self):
         # we have nothing really to check here, yet. but at least we can make sure
         # the process is cancellable.
 
+    def test_monitor_thread(self):
+        self._run_in_thread()
+
+        assert self.store.wait_initialized(timeout=10)
+        self.assertEqual(self.store.get_pd_state(),
+                         ProcessDispatcherState.OK)
+
+        self.assertIsNotNone(self.doctor.monitor)
+        monitor_thread = self.doctor.monitor_thread
+        self.assertIsNotNone(monitor_thread)
+        self.assertTrue(monitor_thread.is_alive())
+
+        # now cancel doctor. monitor should stop too
+        self.doctor.cancel()
+        wait(lambda: not monitor_thread.is_alive())
+
+    def _setup_resource_monitor(self):
+        self.monitor = ExecutionResourceMonitor(self.core, self.store)
+        return self.monitor
+
+    def _send_heartbeat(self, resource_id, node_id, timestamp):
+        self.core.ee_heartbeat(resource_id, make_beat(node_id, timestamp=timestamp))
+
+    def assert_monitor_cycle(self, expected_delay, resource_states=None):
+        self.assertEqual(expected_delay, self.monitor.monitor_cycle())
+
+        if resource_states:
+            for resource_id, expected_state in resource_states.iteritems():
+                found_state = self.store.get_resource(resource_id).state
+                if found_state != expected_state:
+                    self.fail("Resource %s state = %s. Expected %s" % (resource_id,
+                        found_state, expected_state))
+
+    def test_resource_monitor(self):
+        t0 = datetime(2012, 3, 13, 9, 30, 0, tzinfo=UTC)
+        mock_now = Mock()
+        mock_now.return_value = t0
+
+        def increment_now(seconds):
+            t = mock_now.return_value + timedelta(seconds=seconds)
+            mock_now.return_value = t
+            log.debug("THE TIME IS NOW: %s", t)
+            return t
+
+        monitor = self._setup_resource_monitor()
+        monitor._now_func = mock_now
+
+        # before there are any resources, monitor should work but return a None delay
+        self.assertIsNone(monitor.monitor_cycle())
+
+        self.core.node_state("node1", domain_id_from_engine("engine1"),
+            InstanceState.RUNNING)
+
+        # 3 resources. all report in at t0
+        r1, r2, r3 = "eeagent_1", "eeagent_2", "eeagent_3"
+        self._send_heartbeat(r1, "node1", t0)
+        self._send_heartbeat(r2, "node1", t0)
+        self._send_heartbeat(r3, "node1", t0)
+
+        states = {r1: ExecutionResourceState.OK, r2: ExecutionResourceState.OK,
+                  r3: ExecutionResourceState.OK}
+
+        self.assert_monitor_cycle(10, states)
+
+        t1 = increment_now(5)  # :05
+        # heartbeat comes in for r1 5 seconds later
+        self._send_heartbeat(r1, "node1", t1)
+
+        self.assert_monitor_cycle(5, states)
+
+        increment_now(5)  # :10
+
+        # no heartbeats for r2 and r3. they should be marked WARNING
+        states[r2] = ExecutionResourceState.WARNING
+        states[r3] = ExecutionResourceState.WARNING
+        self.assert_monitor_cycle(5, states)
+
+        increment_now(4)  # :14
+
+        # r2 gets a heartbeat through, but its timestamp puts it still in the warning threshold
+        self._send_heartbeat(r2, "node1", t0 + timedelta(seconds=1))
+
+        self.assert_monitor_cycle(1, states)
+
+        increment_now(6)  # :20
+
+        # r1 should go warning, r3 should go missing
+        states[r1] = ExecutionResourceState.WARNING
+        states[r3] = ExecutionResourceState.MISSING
+        self.assert_monitor_cycle(4, states)
+
+        t2 = increment_now(3)  # :23
+        self._send_heartbeat(r1, "node1", t2)
+        states[r1] = ExecutionResourceState.OK
+        self.assert_monitor_cycle(1, states)
+
+        t3 = increment_now(2)  # :25
+        self._send_heartbeat(r3, "node1", t3)
+        states[r2] = ExecutionResourceState.MISSING
+        states[r3] = ExecutionResourceState.OK
+        self.assert_monitor_cycle(8, states)
+
+        increment_now(5)  # :30
+        # heartbeat r2 enough to go back to WARNING, but still late
+        self._send_heartbeat(r2, "node1", t0 + timedelta(seconds=15))
+        states[r2] = ExecutionResourceState.WARNING
+        self.assert_monitor_cycle(3, states)
+
+        t4 = increment_now(5)  # :35
+        # disable r2 and heartbeat r1 and r3 (heartbeats arrive late, but that's ok)
+        self._send_heartbeat(r1, "node1", t4)
+        self._send_heartbeat(r3, "node1", t4)
+        self.core.resource_change_state(self.store.get_resource(r2),
+            ExecutionResourceState.DISABLED)
+
+        states[r2] = ExecutionResourceState.DISABLED
+        self.assert_monitor_cycle(10, states)
+
 
 class PDDoctorZooKeeperTests(PDDoctorTests, ZooKeeperTestMixin):
     def setup_store(self):
@@ -256,3 +383,9 @@ def teardown_store(self):
             self.store.shutdown()
 
         self.teardown_zookeeper()
+
+    def assert_monitor_cycle(self, *args, **kwargs):
+        # occasionally ZK doesn't fire the watches quickly enough,
+        # so we add a little sleep before each cycle
+        time.sleep(0.05)
+        super(PDDoctorZooKeeperTests, self).assert_monitor_cycle(*args, **kwargs)
epu/states.py (23 changed lines)
@@ -256,5 +256,28 @@ class ProcessDispatcherState(object):
 class ExecutionResourceState(object):
 
     OK = "OK"
+    """Resource is active and healthy
+    """
+
+    WARNING = "WARNING"
+    """The resource is under suspicion due to missing or late heartbeats
+
+    Running processes are not rescheduled yet, but the resource is not
+    assigned any new processes while in this state. Note: This could later
+    be refined to allow processes, but only if there are no compatible slots
+    available on healthy resources.
+    """
+
+    MISSING = "MISSING"
+    """The resource has been declared dead by the PD Doctor due to a prolonged
+    lack of heartbeats.
+
+    Running processes on the resource have been rescheduled (if applicable)
+    and the resource is ineligible for running new processes. If the resource
+    resumes sending heartbeats, it will be returned to the OK state and made
+    available for processes.
+    """
 
     DISABLED = "DISABLED"
+    """The resource has been disabled, likely in advance of being terminated
+    """
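
Taken together, these docstrings describe a small state machine driven by the
doctor. An illustrative summary (an editor's sketch for readability only;
epu.states defines no such table, and the transitions are implemented in
ExecutionResourceMonitor._check_one_resource):

    DOCTOR_TRANSITIONS = {
        "OK": ("WARNING",),            # last heartbeat older than warning threshold
        "WARNING": ("OK", "MISSING"),  # beats catch up, or the silence persists
        "MISSING": ("OK", "WARNING"),  # a late resource can recover
        "DISABLED": (),                # ignored by the doctor entirely
    }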
epu/util.py (1 changed line)
@@ -126,4 +126,3 @@ def ensure_timedelta(t):
         return timedelta(seconds=t)
 
     raise TypeError("cannot convert %s to timedelta" % (t,))
-
