Permalink
Browse files

Fix PD node retire behavior

  • Loading branch information...
1 parent 01109f3 commit 2ceaf9afba61500b95fd624b6a5ea6e924dbc414 @labisso labisso committed with oldpatricka Mar 12, 2012
Showing with 40 additions and 38 deletions.
  1. +20 −4 epu/processdispatcher/matchmaker.py
  2. +20 −34 epu/processdispatcher/test/test_processdispatcher_service.py
@@ -285,6 +285,20 @@ def _backout_resource_assignment(self, resource, process):
return resource, removed
+ def _set_resource_enabled_state(self, resource, enabled):
+ updated = False
+ while resource and resource.enabled != enabled:
+ resource.enabled = enabled
+ try:
+ self.store.update_resource(resource)
+ updated = True
+ except WriteConflictError:
+ resource = self.store.get_resource(resource.resource_id)
+ except NotFoundError:
+ resource = None
+
+ return resource, updated
+
def _mark_process_waiting(self, process):
# update process record to indicate queuing state. if writes conflict
@@ -351,12 +365,14 @@ def register_needs(self):
# on scale down, request for specific nodes to be terminated
if need < self.registered_need:
- retirables = (r.node_id for r in self.resources.itervalues()\
- if not r.assigned)
+ retirables = (r for r in self.resources.itervalues()
+ if r.enabled and not r.assigned)
retirees = list(islice(retirables, self.registered_need - need))
for retiree in retirees:
- log.debug("Retiring empty node %s", retiree)
- self.epum_client.retire_node(retiree)
+ log.debug("Retiring empty node %s", retiree.node_id)
+ self.epum_client.retire_node(retiree.node_id)
+
+ self._set_resource_enabled_state(retiree, False)
log.debug("Registering need for %d instances of DT %s (was %s)", need,
self.engine.deployable_type, self.registered_need)
@@ -1,6 +1,7 @@
import logging
import unittest
from collections import defaultdict
+import random
import gevent
from dashi import bootstrap, DashiConnection
@@ -362,63 +363,48 @@ def test_process_exited(self):
self._wait_assert_pd_dump(self._assert_process_states,
ProcessState.EXITED, [proc])
- def test_neediness(self):
+ def test_neediness(self, process_count=20, node_count=5):
- # submit 8 processes. 2 needs should be registered
spec = {"run_type":"hats", "parameters": {}}
- procs = ["proc" + str(i) for i in range(8)]
+ procs = ["proc" + str(i) for i in range(process_count)]
for proc in procs:
procstate = self.client.dispatch_process(proc, spec, None)
self.assertEqual(procstate['upid'], proc)
self._wait_assert_pd_dump(self._assert_process_states,
ProcessState.WAITING, procs)
- self.epum_client.assert_needs("dt1", [0,1,2])
+ self.epum_client.assert_needs("dt1", range(node_count+1))
self.epum_client.clear()
# now provide nodes and resources, processes should start
- node1 = "node1"
- self.client.dt_state(node1, "dt1", InstanceState.RUNNING)
- self._spawn_eeagent(node1, 4)
-
- node1_procs = procs[:4]
- self._wait_assert_pd_dump(self._assert_process_states,
- ProcessState.RUNNING, node1_procs)
+ nodes = ["node" + str(i) for i in range(node_count)]
+ for node in nodes:
+ self.client.dt_state(node, "dt1", InstanceState.RUNNING)
- node2 = "node2"
- self.client.dt_state(node2, "dt1", InstanceState.RUNNING)
- self._spawn_eeagent(node2, 4)
+ for node in nodes:
+ self._spawn_eeagent(node, 4)
self._wait_assert_pd_dump(self._assert_process_states,
ProcessState.RUNNING, procs)
- # now kill all processes on node1
- for proc in node1_procs:
+ # now kill all processes in a random order
+ killlist = list(procs)
+ random.shuffle(killlist)
+ for proc in killlist:
self.client.terminate_process(proc)
- self._wait_assert_pd_dump(self._assert_process_states,
- ProcessState.TERMINATED, node1_procs)
- # we should get a retire node request and new need
- with self.epum_client.condition:
- if not self.epum_client.retires:
- self.epum_client.condition.wait(5)
- self.assertEqual(self.epum_client.retires, [node1])
- self.epum_client.assert_needs("dt1", [1])
- self.epum_client.clear()
-
- # terminate that node
- self.client.dt_state(node1, "dt1", InstanceState.TERMINATED)
-
- # kill the rest of the procs
- for proc in procs[4:]:
- self.client.terminate_process(proc)
self._wait_assert_pd_dump(self._assert_process_states,
ProcessState.TERMINATED, procs)
- self.assertEqual(self.epum_client.retires, [node2])
- self.epum_client.assert_needs("dt1", [0])
+ # all nodes should be retired
+ with self.epum_client.condition:
+ for _ in range(len(nodes)):
+ if len(self.epum_client.retires) != len(nodes):
+ self.epum_client.condition.wait(0.1)
+ self.assertEqual(set(self.epum_client.retires), set(nodes))
+
class RabbitProcessDispatcherServiceTests(ProcessDispatcherServiceTests):

0 comments on commit 2ceaf9a

Please sign in to comment.