Permalink
Browse files

More WIP

  • Loading branch information...
1 parent 2b36764 commit ae49cd0bc51116f1508a7e597eb7bbdc0057d683 @labisso labisso committed Apr 1, 2013
Showing with 22 additions and 31 deletions.
  1. +22 −31 epu/processdispatcher/core.py
@@ -54,17 +54,9 @@ class ProcessDispatcherCore(object):
"""
- def __init__(self, store, ee_registry, eeagent_client, notifier):
- """
-
- @param store:
- @type store: ProcessDispatcherStore
- @param ee_registry:
- @param eeagent_client:
- @param notifier:
- @return:
- """
+ def __init__(self, store, sync, ee_registry, eeagent_client, notifier):
self.store = store
+ self.sync = sync
self.ee_registry = ee_registry
self.eeagent_client = eeagent_client
self.notifier = notifier
@@ -77,7 +69,7 @@ def set_system_boot(self, system_boot):
"""
if system_boot is not False and system_boot is not True:
raise BadRequestError("expected a boolean value for system boot")
- self.store.set_system_boot(system_boot)
+ self.sync.set_system_boot(system_boot)
def create_definition(self, definition_id, definition_type, executable,
name=None, description=None):
@@ -262,7 +254,7 @@ def schedule_process(self, owner, upid, definition_id=None,
# queue. this is harmless and provides an easy way to "nudge" matchmaking
# along in the face of bugs.
log.debug("Enqueing process %s", upid)
- self.store.enqueue_process(owner, upid, process.round)
+ self.sync.enqueue_process(owner, upid, process.round)
return process
@@ -377,7 +369,7 @@ def terminate_process(self, owner, upid):
# also try to remove process from queue
try:
- self.store.remove_queued_process(process.owner,
+ self.sync.remove_queued_process(process.owner,
process.upid, process.round)
except NotFoundError:
pass
@@ -456,21 +448,21 @@ def evacuate_node(self, node, is_system_restart=False,
for resource_id in node.resources:
resource = self.store.get_resource(resource_id)
+ shadow_resource = self.sync.get_shadow_resource(resource_id)
- if not resource:
+ if not (resource and shadow_resource):
log.warn("Node has unknown resource %s", node_id, resource_id)
continue
- else:
- # mark resource ineligible for scheduling
- self._disable_resource(resource)
+ # mark resource ineligible for scheduling
+ self._disable_resource(shadow_resource)
- # go through and reschedule processes as needed
- for owner, upid, round in resource.assigned:
- self._evacuate_process(owner, upid, resource,
- is_system_restart=is_system_restart,
- dead_process_state=dead_process_state,
- rescheduled_process_state=rescheduled_process_state)
+ # go through and reschedule processes as needed
+ for owner, upid, round in shadow_resource.assigned:
+ self._evacuate_process(owner, upid,
+ is_system_restart=is_system_restart,
+ dead_process_state=dead_process_state,
+ rescheduled_process_state=rescheduled_process_state)
try:
self.store.remove_resource(resource_id)
@@ -482,15 +474,15 @@ def evacuate_node(self, node, is_system_restart=False,
except NotFoundError:
pass
- def _disable_resource(self, resource):
- while resource.enabled:
- resource.enabled = False
+ def _disable_resource(self, shadow_resource):
+ while shadow_resource.enabled:
+ shadow_resource.enabled = False
try:
- self.store.update_resource(resource)
+ self.sync.update_shadow_resource(shadow_resource)
except WriteConflictError:
- resource = self.store.get_resource(resource.resource_id)
+ shadow_resource = self.sync.get_shadow_resource(shadow_resource.resource_id)
- def _evacuate_process(self, owner, upid, resource, is_system_restart=False,
+ def _evacuate_process(self, owner, upid, is_system_restart=False,
dead_process_state=None, rescheduled_process_state=None):
"""Deal with a process on a terminating/terminated node
"""
@@ -509,8 +501,7 @@ def _evacuate_process(self, owner, upid, resource, is_system_restart=False,
elif process.state < ProcessState.TERMINATING:
if self.process_should_restart(process, dead_process_state,
is_system_restart=is_system_restart):
- log.debug("Rescheduling process %s from dead node %s",
- upid, resource.node_id)
+ log.debug("Rescheduling process %s from evacuated node", upid)
if rescheduled_process_state:
self.process_next_round(process,
newstate=rescheduled_process_state)

0 comments on commit ae49cd0

Please sign in to comment.