Draft minimum free slots implementation

commit edaa7aa1924e9ca22bcf7538e7e0cdd0b93a80a1 (parent 8c6806b)
Authored by @oldpatricka
epu/processdispatcher/engines.py (23 lines changed)
@@ -1,5 +1,6 @@
DOMAIN_PREFIX = "pd_domain_"
+
def engine_id_from_domain(domain_id):
if not (domain_id and domain_id.startswith(DOMAIN_PREFIX)):
raise ValueError("domain_id %s doesn't have expected prefix %s" % (
@@ -9,6 +10,7 @@ def engine_id_from_domain(domain_id):
raise ValueError("domain_id has empty engine id")
return engine_id
+
def domain_id_from_engine(engine_id):
if not engine_id or not isinstance(engine_id, basestring):
raise ValueError("invalid engine_id %s" % engine_id)
@@ -33,8 +35,10 @@ def from_config(cls, config, default=None):
registry = cls(default=default)
for engine_id, engine_conf in config.iteritems():
spec = EngineSpec(engine_id, engine_conf['slots'],
- engine_conf.get('base_need', 0),
- engine_conf.get('config'), engine_conf.get('replicas', 1))
+ base_need=engine_conf.get('base_need', 0),
+ config=engine_conf.get('config'),
+ replicas=engine_conf.get('replicas', 1),
+ minimum_free_slots=engine_conf.get('minimum_free_slots', 0))
registry.add(spec)
return registry
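
A minimal usage sketch, not part of the commit: how a deployment config carrying the new minimum_free_slots key flows through from_config. The registry class is assumed to be named EngineRegistry (the class statement sits outside this hunk), and the import path mirrors the file path above.

    # Hypothetical usage sketch; EngineRegistry is an assumed class name.
    from epu.processdispatcher.engines import EngineRegistry

    config = {
        'engine1': {'slots': 4},  # defaults: base_need=0, replicas=1, minimum_free_slots=0
        'engine4': {'slots': 1, 'minimum_free_slots': 1},  # always keep one slot spare
    }

    registry = EngineRegistry.from_config(config, default='engine1')
    print(registry.get_engine_by_id('engine4').minimum_free_slots)  # -> 1
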
@@ -59,15 +63,24 @@ def get_engine_by_id(self, engine):
class EngineSpec(object):
- def __init__(self, engine_id, slots, base_need=0, config=None, replicas=1):
+
+ def __init__(self, engine_id, slots, base_need=0, config=None, replicas=1,
+ minimum_free_slots=0):
self.engine_id = engine_id
+ self.config = config
+ self.base_need = int(base_need)
+
slots = int(slots)
if slots < 1:
raise ValueError("slots must be a positive integer")
self.slots = slots
- self.config = config
- self.base_need = int(base_need)
+
replicas = int(replicas)
if replicas < 1:
raise ValueError("replicas must be a positive integer")
self.replicas = replicas
+
+ minimum_free_slots = int(minimum_free_slots)
+ if minimum_free_slots < 0:
+ raise ValueError("minimum must be at least 0")
+ self.minimum_free_slots = minimum_free_slots
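
The constructor now validates the new field the same way it validates slots and replicas: zero (the default) is allowed, negative values are rejected up front. A quick hypothetical sketch of that behavior, not part of the commit:

    # Hypothetical usage of the updated EngineSpec constructor.
    from epu.processdispatcher.engines import EngineSpec

    spec = EngineSpec("engine4", slots=1, minimum_free_slots=1)
    print("%s free slot(s) reserved" % spec.minimum_free_slots)  # -> 1

    try:
        EngineSpec("bad", slots=1, minimum_free_slots=-1)
    except ValueError as err:
        print(err)  # minimum_free_slots must be a non-negative integer
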
epu/processdispatcher/matchmaker.py (22 lines changed)
@@ -326,7 +326,8 @@ def matchmake(self):
matched_resource.node_id)
return
- log.debug("updating %s with node_exclusive %s for %s" % (matched_node.node_id, process.node_exclusive, process.upid))
+ log.debug("updating %s with node_exclusive %s for %s" % (
+ matched_node.node_id, process.node_exclusive, process.upid))
matched_node.node_exclusive.append(process.node_exclusive)
try:
@@ -487,13 +488,15 @@ def _mark_process_waiting(self, process):
process.state = ProcessState.WAITING
else:
process.state = ProcessState.REJECTED
- log.info("Process %s: no available slots. REJECTED due to START_ONLY queueing mode, and process has started before.",
- process.upid)
+ log.info("Process %s: no available slots. REJECTED due to "
+ "START_ONLY queueing mode, and process has "
+ "started before.", process.upid)
elif process.queueing_mode == QueueingMode.RESTART_ONLY:
if process.starts == 0:
process.state = ProcessState.REJECTED
- log.info("Process %s: no available slots. REJECTED due to RESTART_ONLY queueing mode, and process hasn't started before.",
- process.upid)
+ log.info("Process %s: no available slots. REJECTED due to "
+ "RESTART_ONLY queueing mode, and process hasn't "
+ "started before.", process.upid)
else:
log.info("Process %s: no available slots. WAITING in queue",
process.upid)
@@ -585,8 +588,9 @@ def calculate_need(self, engine_id):
# by the current process set
engine = self.engine(engine_id)
- # total number of unique runnable processes in the system
- process_count = len(process_set)
+ # total number of unique runnable processes in the system plus
+ # minimum free slots
+ process_count = len(process_set) + engine.minimum_free_slots
process_need = int(ceil(process_count / float(engine.slots * engine.replicas)))
need = max(engine.base_need, len(occupied_node_set), process_need)
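
To make the new term concrete, here is the same arithmetic as a standalone sketch; calculate_need below is an illustrative free function with hypothetical parameters, not the matchmaker method itself. Padding the process count by minimum_free_slots makes the engine request enough VMs to leave that many slots idle.

    # Standalone sketch of the updated need formula, not the matchmaker's API.
    from math import ceil

    def calculate_need(process_count, occupied_nodes, slots, replicas=1,
                       base_need=0, minimum_free_slots=0):
        # pad the runnable-process count with the reserved spare slots
        padded = process_count + minimum_free_slots
        process_need = int(ceil(padded / float(slots * replicas)))
        return max(base_need, occupied_nodes, process_need)

    # 10 runnable processes on a 1-slot, 1-replica engine with one spare
    # slot reserved: ceil((10 + 1) / 1) = 11 nodes
    print(calculate_need(10, 0, slots=1, minimum_free_slots=1))  # -> 11
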
@@ -643,7 +647,9 @@ def matchmake_process(self, process, node_containers):
log.warning("Can't find node %s?", node_id)
continue
if not node.node_exclusive_available(process.node_exclusive):
- log.debug("Process %s with node_exclusive %s is not being matched to %s, which has this attribute" % (process.upid, process.node_exclusive, node_id))
+ log.debug("Process %s with node_exclusive %s is not being "
+ "matched to %s, which has this attribute" % (
+ process.upid, process.node_exclusive, node_id))
continue
# now inspect each resource in the node looking for a match
epu/processdispatcher/test/test_matchmaker.py (36 lines changed)
@@ -36,6 +36,10 @@ class PDMatchmakerTests(unittest.TestCase, StoreTestMixin):
'engine3': {
'slots': 2,
'replicas': 2
+ },
+ 'engine4': {
+ 'slots': 1,
+ 'minimum_free_slots': 1
}
}
@@ -602,12 +606,15 @@ def test_needs(self):
self.mm.register_needs()
for engine_id in self.engine_conf:
domain_id = domain_id_from_engine(engine_id)
- self.assert_one_reconfigure(domain_id, 0, [])
+ engine = self.engine_conf[engine_id]
+ if engine.get('minimum_free_slots', 0) == 0:
+ self.assert_one_reconfigure(domain_id, 0, [])
self.epum_client.clear()
engine1_domain_id = domain_id_from_engine("engine1")
engine2_domain_id = domain_id_from_engine("engine2")
engine3_domain_id = domain_id_from_engine("engine3")
+ engine4_domain_id = domain_id_from_engine("engine4")
# engine1 has 1 slot and 1 replica per node, expect a VM per process
engine1_procs = self.enqueue_n_processes(10, "engine1")
@@ -618,10 +625,15 @@ def test_needs(self):
# engine3 has 2 slots and 2 replicas per node, expect a VM per 4 processes
engine3_procs = self.enqueue_n_processes(10, "engine3")
+ # engine4 has 1 slot and 1 replica per node, and a
+ # minimum of 1 free slot, expect a VM per process + 1
+ engine4_procs = self.enqueue_n_processes(10, "engine4")
+
self.mm.register_needs()
self.assert_one_reconfigure(engine1_domain_id, 10, [])
self.assert_one_reconfigure(engine2_domain_id, 5, [])
self.assert_one_reconfigure(engine3_domain_id, 3, [])
+ self.assert_one_reconfigure(engine4_domain_id, 11, [])
self.epum_client.clear()
print self.mm.queued_processes
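
The four expected values above follow directly from the need formula. A quick sanity-check sketch; engine2's config falls outside this diff, so slots=2, replicas=1 is inferred from its assertion:

    # Sanity check for the asserted needs above; engine2's config is assumed.
    from math import ceil

    def need(procs, slots, replicas=1, min_free=0):
        return int(ceil((procs + min_free) / float(slots * replicas)))

    assert need(10, slots=1) == 10              # engine1: a VM per process
    assert need(10, slots=2) == 5               # engine2 (inferred config)
    assert need(10, slots=2, replicas=2) == 3   # engine3: ceil(10 / 4)
    assert need(10, slots=1, min_free=1) == 11  # engine4: 10 procs + 1 spare
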
@@ -636,6 +648,9 @@ def test_needs(self):
engine3_resources = self.create_engine_resources("engine3",
node_count=3, assignments=engine3_procs)
self.assertEqual(len(engine3_resources), 6)
+ engine4_resources = self.create_engine_resources("engine4",
+ node_count=11, assignments=engine4_procs)
+ self.assertEqual(len(engine4_resources), 11)
self.mm.queued_processes = []
self.mm.register_needs()
@@ -660,6 +675,16 @@ def test_needs(self):
resource.assigned = []
engine3_retirees = set([engine3_resources[0].node_id])
+ # empty 2 resources from engine4. 2 nodes should be terminated
+ engine4_retirees = set()
+ for resource in engine4_resources:
+ if len(resource.assigned) > 0:
+ engine4_retirees.add(resource.node_id)
+ resource.assigned = []
+
+ if len(engine4_retirees) >= 2:
+ break
+
self.mm.register_needs()
self.assert_one_reconfigure(engine1_domain_id, 8,
engine1_retirees)
@@ -667,6 +692,9 @@ def test_needs(self):
engine2_retirees)
self.assert_one_reconfigure(engine3_domain_id, 2,
engine3_retirees)
+ # Note that we cannot check which nodes have retired, since the spare
+ # one may be terminated
+ self.assert_one_reconfigure(engine4_domain_id, 9)
self.epum_client.clear()
def test_needs_duplicate_process(self):
@@ -687,7 +715,9 @@ def test_needs_duplicate_process(self):
self.mm.register_needs()
for engine_id in self.engine_conf:
domain_id = domain_id_from_engine(engine_id)
- self.assert_one_reconfigure(domain_id, 0, [])
+ engine = self.engine_conf[engine_id]
+ if engine.get('minimum_free_slots', 0) == 0:
+ self.assert_one_reconfigure(domain_id, 0, [])
self.epum_client.clear()
engine1_domain_id = domain_id_from_engine("engine1")
@@ -891,7 +921,7 @@ def test_stale_optimization(self):
if optimized_time > 0:
ratio = unoptimized_time / optimized_time
print "Unoptimised Time: %s Optimised Time: %s ratio: %s" % (
- unoptimized_time, optimized_time, ratio)
+ unoptimized_time, optimized_time, ratio)
self.assertTrue(ratio >= 100,
"Our optimized matchmake didn't have a 100 fold improvement")
else: