Update matchmaker pending process test to be more thorough

commit 0c73433febb4a60a750274938b147b88f0099195 (1 parent: 50b7346)
authored by @oldpatricka
epu/processdispatcher/matchmaker.py (9 lines changed)
@@ -198,7 +198,12 @@ def _get_pending_processes(self):
if self._get_pd_state() == ProcessDispatcherState.SYSTEM_BOOTING:
- self.unscheduled_pending_processes = []
+ if self.unscheduled_pending_processes != []:
+ # This list shouldn't change while the system is booting
+ # so if it is set, we can safely skip querying the store
+ # for processes
+ return
+
process_ids = self.store.get_process_ids()
for process_id in process_ids:
process = self.store.get_process(process_id[0], process_id[1])
@@ -633,6 +638,8 @@ def calculate_need(self, engine_id):
def register_needs(self):
+ self._get_pending_processes()
+
for engine in list(self.ee_registry):
engine_id = engine.engine_id
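
Taken together, these two hunks mean register_needs() now refreshes the pending-process cache before walking the engine registry, and _get_pending_processes() skips re-querying the store during system boot once that cache is populated. A rough sketch of the resulting flow, assuming the loop shown in the hunk only caches UNSCHEDULED_PENDING records (that filter is not visible in the diff):

# sketch only; imports are those already present in matchmaker.py
# (ProcessDispatcherState, ProcessState)
def _get_pending_processes(self):
    if self._get_pd_state() == ProcessDispatcherState.SYSTEM_BOOTING:
        if self.unscheduled_pending_processes != []:
            # the list shouldn't change while the system is booting,
            # so skip querying the store again once it is populated
            return

        process_ids = self.store.get_process_ids()
        for process_id in process_ids:
            process = self.store.get_process(process_id[0], process_id[1])
            # assumption: only UNSCHEDULED_PENDING records are cached here
            if process and process.state == ProcessState.UNSCHEDULED_PENDING:
                self.unscheduled_pending_processes.append(process)

def register_needs(self):
    # refresh the pending-process cache before computing per-engine need
    self._get_pending_processes()

    for engine in list(self.ee_registry):
        engine_id = engine.engine_id
        ...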
epu/processdispatcher/test/test_matchmaker.py (71 lines changed)
@@ -573,6 +573,8 @@ def create_n_pending_processes(self, n_processes, engine_id):
p = ProcessRecord.new(None, upid, get_process_definition(),
ProcessState.UNSCHEDULED_PENDING, constraints={'engine': engine_id})
self.store.add_process(p)
+ pkey = p.get_key()
+ pkeys.append(pkey)
return pkeys
def enqueue_pending_processes(self):
@@ -580,11 +582,18 @@ def enqueue_pending_processes(self):
Normally, this would be done by the doctor in a full system
"""
+ pkeys = []
for pid in self.store.get_process_ids():
owner, upid = pid
- proc = self.store.get_process(owner, upid)
- if proc.state == ProcessState.UNSCHEDULED_PENDING:
- self.store.enqueue_process(owner, upid, proc.round)
+ process = self.store.get_process(owner, upid)
+ if process.state == ProcessState.UNSCHEDULED_PENDING:
+ pkey = process.get_key()
+ process.state = ProcessState.REQUESTED
+ self.store.update_process(process)
+ self.store.enqueue_process(*pkey)
+ self.mm.queued_processes.append(pkey)
+ pkeys.append(pkey)
+ return pkeys
def create_engine_resources(self, engine_id, node_count=1, assignments=None):
engine_spec = self.registry.get_engine_by_id(engine_id)
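
For orientation, a condensed example of how these two helpers are meant to be combined inside a test method, assuming the fixture provides self.store and self.mm as elsewhere in this file and that no other UNSCHEDULED_PENDING records exist in the store:

# create UNSCHEDULED_PENDING records, then promote them the way the
# doctor would in a full system; the returned keys should line up
pending_keys = self.create_n_pending_processes(10, "engine1")
queued_keys = self.enqueue_pending_processes()
self.assertEqual(set(queued_keys), set(pending_keys))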
@@ -655,7 +664,6 @@ def test_needs(self):
self.assert_one_reconfigure(engine3_domain_id, 3, [])
self.assert_one_reconfigure(engine4_domain_id, 11, [])
self.epum_client.clear()
- print self.mm.queued_processes
# now add some resources with assigned processes
# and removed queued processes. need shouldn't change.
@@ -720,10 +728,10 @@ def test_needs(self):
def test_needs_unscheduled_pending(self):
# engine1 has 1 slot, expect a VM per process
- self.create_n_pending_processes(10, "engine1")
+ engine1_pending_procs = self.create_n_pending_processes(10, "engine1")
# engine2 has 2 slots, expect a VM per 2 processes
- self.create_n_pending_processes(10, "engine2")
+ engine2_pending_procs = self.create_n_pending_processes(10, "engine2")
# Normally this is done by the doctor, but we do it manually here,
# since there is no doctor in this test env
@@ -752,10 +760,10 @@ def test_needs_unscheduled_pending(self):
self.epum_client.clear()
# engine1 has 1 slot, expect a VM per process
- self.enqueue_n_processes(10, "engine1")
+ engine1_queued_procs = self.enqueue_n_processes(10, "engine1")
# engine2 has 2 slots, expect a VM per 2 processes
- self.enqueue_n_processes(10, "engine2")
+ engine2_queued_procs = self.enqueue_n_processes(10, "engine2")
self.mm.register_needs()
@@ -766,12 +774,59 @@ def test_needs_unscheduled_pending(self):
# When we enqueue the procs and mark the PD OK
self.enqueue_pending_processes()
+ self.mm.register_needs()
+
+ # The matchmaker won't have checked for the updated pending procs
+ self.assertEqual(len(self.mm.unscheduled_pending_processes), 20)
+
+ # But there should be no change to requested VMs, since we should
+ # deduplicate processes
+ self.assertFalse(self.epum_client.reconfigures)
+
self.store.set_pd_state(ProcessDispatcherState.OK)
self.mm.register_needs()
+ # The matchmaker should have no pending processes
+ self.assertEqual(len(self.mm.unscheduled_pending_processes), 0)
+
# There should be no change to requested VMs
self.assertFalse(self.epum_client.reconfigures)
+
+ self.epum_client.clear()
+
+ # now add some resources with assigned processes
+ # and removed queued processes. need shouldn't change.
+ engine1_procs = engine1_queued_procs + engine1_pending_procs
+ engine2_procs = engine2_queued_procs + engine2_pending_procs
+ engine1_resources = self.create_engine_resources("engine1",
+ node_count=20, assignments=engine1_procs)
+ self.assertEqual(len(engine1_resources), 20)
+ engine2_resources = self.create_engine_resources("engine2",
+ node_count=10, assignments=engine2_procs)
+ self.assertEqual(len(engine2_resources), 10)
+ self.mm.queued_processes = []
+
+ self.mm.register_needs()
+ self.assertFalse(self.epum_client.reconfigures)
+
+ # empty resources from engine1. all nodes should be terminated.
+ engine1_retirees = set()
+ for resource in engine1_resources:
+ engine1_retirees.add(resource.node_id)
+ resource.assigned = []
+
+ # empty resources from engine2. all nodes should be terminated
+ engine2_retirees = set()
+ for resource in engine2_resources:
+ engine2_retirees.add(resource.node_id)
+ resource.assigned = []
+
+ self.mm.register_needs()
+
+ # we should see reconfigures retiring the nodes that held the queued and pending procs
+ self.assert_one_reconfigure(engine1_domain_id, 0, engine1_retirees)
+ self.assert_one_reconfigure(engine2_domain_id, 0, engine2_retirees)
self.epum_client.clear()
def test_needs_duplicate_process(self):
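
The new assertions rely on the matchmaker counting a process only once even when it appears both in queued_processes and in the unscheduled-pending cache, so VMs are not requested for it twice. A minimal sketch of that dedup idea; _unique_process_keys is a hypothetical helper, not something added by this commit:

def _unique_process_keys(self):
    # hypothetical: union the queued keys with the keys of the cached
    # pending records so a process present in both is counted once
    keys = set(self.queued_processes)
    for process in self.unscheduled_pending_processes:
        keys.add(process.get_key())
    return keys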
epu/processdispatcher/test/test_processdispatcher_service.py (5 lines changed)
@@ -1354,11 +1354,6 @@ def test_restart_system_boot(self):
self.notifier.wait_for_state('p5', ProcessState.UNSCHEDULED_PENDING)
self.notifier.wait_for_state('p6', ProcessState.TERMINATED)
- # check that the matchmaker still needs an engine 1 for the
- # UNSCHEDULED_PENDING processes
- self.assertEqual(self.pd.matchmaker.registered_needs,
- {'engine1': 1, 'engine2': 0})
-
# add resources back
self.client.node_state("node1", domain_id_from_engine("engine1"),
InstanceState.RUNNING)