Add node_exclusive feature to pd dispatch

commit 14158bc40c440e737356dcc263d44c6cbdf157df (1 parent: bb12bf2)
Authored by @oldpatricka
epu/dashiproc/processdispatcher.py (10 changes)
@@ -113,10 +113,10 @@ def remove_definition(self, definition_id):
def dispatch_process(self, upid, spec, subscribers, constraints,
immediate=False, queueing_mode=None, restart_mode=None,
- execution_engine_id=None):
+ execution_engine_id=None, node_exclusive=None):
result = self.core.dispatch_process(None, upid, spec, subscribers,
constraints, immediate=immediate, queueing_mode=queueing_mode,
- restart_mode=restart_mode,
+ restart_mode=restart_mode, node_exclusive=node_exclusive,
execution_engine_id=execution_engine_id)
return self._make_process_dict(result)
@@ -226,11 +226,13 @@ def remove_definition(self, definition_id):
def dispatch_process(self, upid, spec, subscribers, constraints=None,
immediate=False, queueing_mode=None,
- restart_mode=None, execution_engine_id=None):
+ restart_mode=None, execution_engine_id=None,
+ node_exclusive=None):
request = dict(upid=upid, spec=spec, immediate=immediate,
subscribers=subscribers, constraints=constraints,
queueing_mode=queueing_mode, restart_mode=restart_mode,
- execution_engine_id=execution_engine_id)
+ execution_engine_id=execution_engine_id,
+ node_exclusive=node_exclusive)
return self.dashi.call(self.topic, "dispatch_process", args=request)
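For reference, this is the shape of the request the client now sends over dashi. A minimal, self-contained sketch with the dashi call stubbed out; build_dispatch_request is an illustrative name, not part of the commit, and the spec dict mirrors the one used in the tests below:

def build_dispatch_request(upid, spec, subscribers, constraints=None,
                           immediate=False, queueing_mode=None,
                           restart_mode=None, execution_engine_id=None,
                           node_exclusive=None):
    # Mirror of the request dict assembled in dispatch_process above;
    # node_exclusive now rides along with the other optional fields.
    return dict(upid=upid, spec=spec, immediate=immediate,
                subscribers=subscribers, constraints=constraints,
                queueing_mode=queueing_mode, restart_mode=restart_mode,
                execution_engine_id=execution_engine_id,
                node_exclusive=node_exclusive)

request = build_dispatch_request(
    "proc1", {"run_type": "hats", "parameters": {}}, None,
    constraints={}, node_exclusive="port5000")
print(request)  # node_exclusive is delivered alongside the existing fields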
epu/processdispatcher/core.py (15 changes)
@@ -74,7 +74,7 @@ def remove_definition(self, definition_id):
def dispatch_process(self, owner, upid, spec, subscribers, constraints=None,
immediate=False, queueing_mode=None, restart_mode=None,
- execution_engine_id=None):
+ execution_engine_id=None, node_exclusive=None):
"""Dispatch a new process into the system
@param upid: unique process identifier
@@ -85,6 +85,7 @@ def dispatch_process(self, owner, upid, spec, subscribers, constraints=None,
@param queueing_mode: when a process can be queued
@param restart_mode: when and if failed/terminated procs should be restarted
@param execution_engine_id: dispatch a process to a specific eea
+ @param node_exclusive: property that will only be permitted once on a node
@rtype: ProcessRecord
@return: description of process launch status
@@ -104,7 +105,8 @@ def dispatch_process(self, owner, upid, spec, subscribers, constraints=None,
process = ProcessRecord.new(owner, upid, spec, ProcessState.REQUESTED,
constraints, subscribers, immediate=immediate,
- queueing_mode=queueing_mode, restart_mode=restart_mode)
+ queueing_mode=queueing_mode, restart_mode=restart_mode,
+ node_exclusive=node_exclusive)
existed = False
try:
@@ -487,10 +489,19 @@ def ee_heartbeart(self, sender, beat):
and process.state < ProcessState.TERMINATED):
new_assigned.append(key)
+ new_node_exclusive = []
+ for owner, upid, round in new_assigned:
+ process = self.store.get_process(owner, upid)
+ if process.node_exclusive is not None:
+ new_node_exclusive.append(process.node_exclusive)
+
if len(new_assigned) != len(resource.assigned):
log.debug("updating resource %s assignments. was %s, now %s",
resource.resource_id, resource.assigned, new_assigned)
resource.assigned = new_assigned
+ log.debug("updating resource %s node_exclusive. was %s, now %s",
+ resource.resource_id, resource.node_exclusive, new_node_exclusive)
+ resource.node_exclusive = new_node_exclusive
try:
self.store.update_resource(resource)
except (WriteConflictError, NotFoundError):
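The heartbeat handler above does double duty: when it prunes dead assignments it also rebuilds the resource's node_exclusive list from the processes that remain, so a terminated process releases its exclusive claim. A self-contained sketch of that reconciliation, using plain dicts as stand-ins for ProcessRecord and ResourceRecord; the state strings are invented but lexically ordered, which is what the `process.state < ProcessState.TERMINATED` comparison relies on:

# Invented, lexically ordered state values (stand-ins for ProcessState).
TERMINATED = "700-TERMINATED"

processes = {
    ("own1", "p1"): {"state": "500-RUNNING", "node_exclusive": "port5000"},
    ("own1", "p2"): {"state": "700-TERMINATED", "node_exclusive": "port5000"},
    ("own1", "p3"): {"state": "500-RUNNING", "node_exclusive": None},
}
resource = {"resource_id": "r1",
            "assigned": [("own1", "p1", 0), ("own1", "p2", 0),
                         ("own1", "p3", 0)],
            "node_exclusive": ["port5000", "port5000"]}

# Drop assignments whose process has reached a terminal state...
new_assigned = [key for key in resource["assigned"]
                if processes[key[:2]]["state"] < TERMINATED]

# ...then rebuild the exclusive-attribute list from what survived,
# exactly as the heartbeat handler does.
new_node_exclusive = [processes[key[:2]]["node_exclusive"]
                      for key in new_assigned
                      if processes[key[:2]]["node_exclusive"] is not None]

resource["assigned"] = new_assigned
resource["node_exclusive"] = new_node_exclusive
print(resource["node_exclusive"])  # ['port5000'] -- p2's claim was released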
epu/processdispatcher/matchmaker.py (5 changes)
@@ -293,6 +293,8 @@ def matchmake(self):
if not matched_resource.is_assigned(owner, upid, round):
matched_resource.assigned.append((owner, upid, round))
+ if process.node_exclusive:
+ matched_resource.node_exclusive.append(process.node_exclusive)
try:
self.store.update_resource(matched_resource)
except WriteConflictError:
@@ -537,6 +539,9 @@ def matchmake_process(process, resources):
matched = None
for resource in resources:
log.debug("checking %s", resource.resource_id)
+ if not resource.node_exclusive_available(process.node_exclusive):
+ continue
+
if match_constraints(process.constraints, resource.properties):
matched = resource
break
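The two added lines in matchmake_process act as a pre-filter: exclusivity is checked before constraints, so a resource that already holds the process's attribute never even reaches match_constraints. A self-contained sketch of the selection loop; the dict records and the match helper are illustrative, and constraint matching is elided:

def node_exclusive_available(resource, attr):
    # Mirrors ResourceRecord.node_exclusive_available in store.py below.
    return attr is None or attr not in resource["node_exclusive"]

def match(process, resources):
    # The new gate runs first: a resource that already holds the
    # process's exclusive attribute is skipped outright.
    for resource in resources:
        if not node_exclusive_available(resource, process["node_exclusive"]):
            continue
        # (constraint matching would go here; omitted in this sketch)
        return resource
    return None  # no match: the process queues or waits

r1 = {"resource_id": "r1", "node_exclusive": ["port5000"]}
r2 = {"resource_id": "r2", "node_exclusive": []}
proc = {"upid": "p1", "node_exclusive": "port5000"}

print(match(proc, [r1, r2])["resource_id"])  # r2: r1 already holds port5000
print(match(proc, [r1]))                     # None: the process must wait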
epu/processdispatcher/store.py (16 changes)
@@ -1111,7 +1111,8 @@ class ProcessRecord(Record):
@classmethod
def new(cls, owner, upid, spec, state, constraints=None,
subscribers=None, round=0, immediate=False, assigned=None,
- hostname=None, queueing_mode=None, restart_mode=None):
+ hostname=None, queueing_mode=None, restart_mode=None,
+ node_exclusive=None):
if constraints:
const = constraints.copy()
else:
@@ -1121,7 +1122,7 @@ def new(cls, owner, upid, spec, state, constraints=None,
state=state, round=int(round), immediate=bool(immediate),
constraints=const, assigned=assigned, hostname=hostname,
queueing_mode=queueing_mode, restart_mode=restart_mode,
- starts=starts)
+ starts=starts, node_exclusive=node_exclusive)
return cls(d)
def get_key(self):
@@ -1148,9 +1149,18 @@ def new(cls, resource_id, node_id, slot_count, properties=None,
props['resource_id'] = resource_id
d = dict(resource_id=resource_id, node_id=node_id, enabled=enabled,
- slot_count=int(slot_count), properties=props, assigned=[])
+ slot_count=int(slot_count), properties=props, assigned=[],
+ node_exclusive=[])
return cls(d)
+ def node_exclusive_available(self, attr):
+ if attr is None:
+ return True
+ elif attr not in self.node_exclusive:
+ return True
+ else:
+ return False
+
@property
def available_slots(self):
return max(0, self.slot_count - len(self.assigned))
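Taken together with available_slots just below it, the new method means a resource can have free slots yet still refuse a particular process. A minimal stand-in class showing both checks side by side; Resource here is illustrative, not the real ResourceRecord:

class Resource(object):
    # Minimal stand-in with the two fields the matchmaker consults.
    def __init__(self, slot_count):
        self.slot_count = slot_count
        self.assigned = []
        self.node_exclusive = []

    def node_exclusive_available(self, attr):
        # Same decision as the method above, collapsed to one expression.
        return attr is None or attr not in self.node_exclusive

    @property
    def available_slots(self):
        return max(0, self.slot_count - len(self.assigned))

r = Resource(slot_count=2)
r.assigned.append(("own1", "p1", 0))
r.node_exclusive.append("port5000")

print(r.available_slots)                       # 1 slot still free...
print(r.node_exclusive_available("port5000"))  # ...but not for port5000
print(r.node_exclusive_available("port8080"))  # a different attr is fine
print(r.node_exclusive_available(None))        # no attr requested: always fine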
epu/processdispatcher/test/test_matchmaker.py (36 changes)
@@ -161,33 +161,43 @@ def test_match1(self):
lambda p: p.assigned == r1.resource_id and
p.state == ProcessState.PENDING)
- def test_match_requested_resource(self):
- # TODO: What will happen if the wanted node is full?
- # will we start nodes unneccesarily?
+ def test_node_exclusive(self):
self._run_in_thread()
props = {"engine": "engine1"}
- r1 = ResourceRecord.new("r1", "n1", 1, properties=props)
+ r1 = ResourceRecord.new("r1", "n1", 2, properties=props)
self.store.add_resource(r1)
- resource_wanted = "r2"
- r2 = ResourceRecord.new(resource_wanted, "n1", 1, properties=props)
- self.store.add_resource(r2)
- constraints = {"resource_id": resource_wanted}
+ xattr = "port5000"
+ constraints = {}
p1 = ProcessRecord.new(None, "p1", get_process_spec(),
- ProcessState.REQUESTED, constraints=constraints)
+ ProcessState.REQUESTED, constraints=constraints,
+ node_exclusive=xattr)
p1key = p1.get_key()
self.store.add_process(p1)
-
self.store.enqueue_process(*p1key)
- self.wait_resource(resource_wanted, lambda r: list(p1key) in r.assigned)
+ # The first process should be assigned, since nothing else needs this
+ # attr
+ self.wait_resource(r1.resource_id, lambda r: list(p1key) in r.assigned)
gevent.sleep(0.05)
- self.resource_client.check_process_launched(p1, resource_wanted)
+ self.resource_client.check_process_launched(p1, r1.resource_id)
self.wait_process(p1.owner, p1.upid,
- lambda p: p.assigned == resource_wanted and
+ lambda p: p.assigned == r1.resource_id and
p.state == ProcessState.PENDING)
+ p2 = ProcessRecord.new(None, "p2", get_process_spec(),
+ ProcessState.REQUESTED, constraints=constraints,
+ node_exclusive=xattr)
+ p2key = p2.get_key()
+ self.store.add_process(p2)
+ self.store.enqueue_process(*p2key)
+
+ # The second process should wait, since first process wants this attr
+ # as well
+ self.wait_process(p2.owner, p2.upid,
+ lambda p: p.state == ProcessState.WAITING)
+
def test_match_copy_hostname(self):
self._run_in_thread()
epu/processdispatcher/test/test_processdispatcher_service.py (74 changes)
@@ -227,6 +227,80 @@ def test_requested_ee(self):
self._wait_assert_pd_dump(self._assert_process_states,
ProcessState.RUNNING, ["proc3"])
+ def test_node_exclusive(self):
+ node = "node1"
+ node_properties = dict(engine="fedora")
+ self.client.dt_state(node, "dt1", InstanceState.RUNNING,
+ node_properties)
+
+ eeagent = self._spawn_eeagent(node, 4)
+
+ spec = {"run_type": "hats", "parameters": {}}
+ exclusive_attr = "hamsandwich"
+ constraints = {}
+ queued = []
+ rejected = []
+
+ proc1_queueing_mode = QueueingMode.ALWAYS
+
+ # Process should be scheduled, since no other procs have its
+ # exclusive attribute
+ self.client.dispatch_process("proc1", spec, None, constraints,
+ queueing_mode=proc1_queueing_mode,
+ node_exclusive=exclusive_attr)
+
+ self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
+ self._wait_assert_pd_dump(self._assert_process_states,
+ ProcessState.RUNNING, ["proc1"])
+
+ # Process should be queued, because proc1 has the same attribute
+ self.client.dispatch_process("proc2", spec, None, constraints,
+ queueing_mode=proc1_queueing_mode,
+ node_exclusive=exclusive_attr)
+
+ queued.append("proc2")
+ self._wait_assert_pd_dump(self._assert_process_distribution,
+ queued=queued)
+
+ # Now kill the first process, and proc2 should run.
+ self.client.terminate_process("proc1")
+ queued.remove("proc2")
+ self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
+ self._wait_assert_pd_dump(self._assert_process_states,
+ ProcessState.RUNNING, ["proc2"])
+
+ # Process should be queued, because proc2 has the same attribute
+ self.client.dispatch_process("proc3", spec, None, constraints,
+ queueing_mode=proc1_queueing_mode,
+ node_exclusive=exclusive_attr)
+
+ queued.append("proc3")
+ self._wait_assert_pd_dump(self._assert_process_distribution,
+ queued=queued)
+
+ # Process should be scheduled, since no other procs have its
+ # exclusive attribute
+ other_exclusive_attr = "hummussandwich"
+ self.client.dispatch_process("proc4", spec, None, constraints,
+ queueing_mode=proc1_queueing_mode,
+ node_exclusive=other_exclusive_attr)
+
+ self.notifier.wait_for_state("proc4", ProcessState.RUNNING)
+ self._wait_assert_pd_dump(self._assert_process_states,
+ ProcessState.RUNNING, ["proc4"])
+
+ # Now that we've started another node, waiting node should start
+ node = "node2"
+ node_properties = dict(engine="fedora")
+ self.client.dt_state(node, "dt1", InstanceState.RUNNING,
+ node_properties)
+
+ eeagent = self._spawn_eeagent(node, 4)
+
+ self.notifier.wait_for_state("proc3", ProcessState.RUNNING)
+ self._wait_assert_pd_dump(self._assert_process_states,
+ ProcessState.RUNNING, ["proc3"])
+
def test_queueing(self):
#submit some processes before there are any resources available