Permalink
Browse files

Change HA service to use the new PD API

  • Loading branch information...
1 parent 525cc43 commit 7273ad5e334937b5a47889f9dbcb0e51cf6b06d0 @oldpatricka oldpatricka committed Sep 5, 2012
View
@@ -14,6 +14,10 @@ class UserNotPermittedError(Exception):
by this user
"""
+class ProgrammingError(Exception):
+ """Something that you wouldn't expect to be able to happen happened.
+ Must be the programmer's fault
+ """
class GeneralIaaSException(Exception):
"""
@@ -24,11 +24,15 @@ def __init__(self, CFG, pd_client_kls, process_dispatchers, process_spec, Policy
self.provisioner_client_kls = pd_client_kls
self.process_dispatchers = process_dispatchers
self.process_spec = process_spec
+ self.process_definition_id = "ha_process_def_%s" % uuid.uuid1()
self.policy_params = None
+
+ self._create_process_def(self.process_definition_id, self.process_spec)
+
self.policy = Policy(parameters=self.policy_params,
- dispatch_process_callback=self._dispatch_pd_spec,
+ schedule_process_callback=self._schedule,
terminate_process_callback=self._terminate_upid,
- process_spec=self.process_spec)
+ process_definition_id=self.process_definition_id)
self.managed_upids = []
def apply_policy(self):
@@ -40,6 +44,20 @@ def apply_policy(self):
all_procs = self._query_process_dispatchers()
self.managed_upids = list(self.policy.apply_policy(all_procs, self.managed_upids))
+ def _create_process_def(self, definition_id, spec):
+ """Creates a process definition in all process dispatchers
+ """
+ definition_type = spec.get('definition_type')
+ executable = spec.get('executable')
+ name = spec.get('name')
+ description = spec.get('description')
+ for pd in self.process_dispatchers:
+ pd_client = self._get_pd_client(pd)
+ pd_client.create_definition(definition_id, definition_type,
+ executable, name, description)
+
+
+
def _query_process_dispatchers(self):
"""Get list of processes from each pd, and return a dictionary
indexed by the pd name
@@ -65,14 +83,14 @@ def _get_pd_client(self, name):
"""
return self.provisioner_client_kls(name)
- def _dispatch_pd_spec(self, pd_name, spec):
+ def _schedule(self, pd_name, pd_id):
"""Dispatches a process to the provided pd, and returns the upid used
to do so
"""
pd_client = self._get_pd_client(pd_name)
- upid = uuid.uuid4().hex
- proc = pd_client.dispatch_process(upid, spec, None, None)
+ upid = uuid.uuid4().hex
+ proc = pd_client.schedule_process(upid, pd_id, None, None)
try:
upid = proc['upid']
except TypeError:
@@ -83,12 +101,6 @@ def _dispatch_pd_spec(self, pd_name, spec):
return upid
- def _disptach_pd_spec_pyon(self, pd_name, spec):
- """Dispatches a process to the provided pd, and returns the upid used
- to do so
- """
- pd_client = self._get_pd_client(pd_name)
-
def _terminate_upid(self, upid):
"""Finds a upid among available PDs, and terminates it
"""
@@ -1,11 +1,12 @@
-
import logging
+
+
from epu.states import ProcessState, HAState
log = logging.getLogger(__name__)
-def dummy_dispatch_process_callback(*args, **kwargs):
- log.debug("dummy_dispatch_process_callback(%s, %s) called" % args, kwargs)
+def dummy_schedule_process_callback(*args, **kwargs):
+ log.debug("dummy_schedule_process_callback(%s, %s) called" % args, kwargs)
def dummy_terminate_process_callback(*args, **kwargs):
@@ -16,8 +17,8 @@ class IPolicy(object):
"""Interface for HA Policies
"""
- def __init__(self, parameters=None, process_spec=None,
- dispatch_process_callback=None, terminate_process_callback=None):
+ def __init__(self, parameters=None, process_definition_id=None,
+ schedule_process_callback=None, terminate_process_callback=None):
raise NotImplementedError("'__init__' is not implemented")
def apply_policy(self, all_procs, managed_upids):
@@ -34,34 +35,34 @@ class NPreservingPolicy(IPolicy):
(see __init__) are called to terminate or start VMs.
"""
- def __init__(self, parameters=None, process_spec=None,
- dispatch_process_callback=None, terminate_process_callback=None):
+ def __init__(self, parameters=None, process_definition_id=None,
+ schedule_process_callback=None, terminate_process_callback=None):
"""Set up the Policy
@param parameters: The parameters used by this policy to determine the
distribution and number of VMs. This policy expects a dictionary with
one key/val, like: {'preserve_n': n}
- @param process_spec: The process specification to send to the PD on
+ @param process_definition_id: The process definition id to send to the PD on
launch
- @param dispatch_process_callback: A callback to dispatch a process to a
- PD. Must have signature: dispatch(pd_name, process_spec), and return a
+ @param schedule_process_callback: A callback to schedule a process to a
+ PD. Must have signature: schedule(pd_name, process_def_id), and return a
upid as a string
@param terminate_process_callback: A callback to terminate a process on
a PD. Must have signature: terminate(upid)
"""
- self.dispatch_process = dispatch_process_callback or dummy_dispatch_process_callback
+ self.schedule_process = schedule_process_callback or dummy_schedule_process_callback
self.terminate_process = terminate_process_callback or dummy_terminate_process_callback
if parameters:
self.parameters = parameters
else:
self._parameters = None
- self.process_spec = process_spec
+ self.process_id = process_definition_id
self.previous_all_procs = {}
self._status = HAState.PENDING
@@ -141,7 +142,7 @@ def apply_policy(self, all_procs, managed_upids):
elif to_rebalance > 0:
for to_rebalance in range(0, to_rebalance):
pd_name = self._get_least_used_pd(all_procs)
- new_upid = self.dispatch_process(pd_name, self.process_spec)
+ new_upid = self.schedule_process(pd_name, self.process_id)
self._set_status(to_rebalance, managed_upids)
@@ -183,21 +184,21 @@ def _extract_upids_from_all_procs(self, all_procs):
class HSflowPolicy(IPolicy):
- def __init__(self, parameters=None, process_spec=None,
- dispatch_process_callback=None, terminate_process_callback=None,
+ def __init__(self, parameters=None, process_definition_id=None,
+ schedule_process_callback=None, terminate_process_callback=None,
ganglia_hostname=None, ganglia_port=None):
"""Set up the Policy
@param parameters: The parameters used by this policy to determine the
distribution and number of VMs. This policy expects a dictionary with
TODO
- @param process_spec: The process specification to send to the PD on
- launch
+ @param process_definition_id: The process definition id to send to the
+ PD on launch
- @param dispatch_process_callback: A callback to dispatch a process to a
- PD. Must have signature: dispatch(pd_name, process_spec), and return a
- upid as a string
+ @param schedule_process_callback: A callback to schedule a process to a
+ PD. Must have signature: schedule(pd_name, process_definition_id), and
+ return a upid as a string
@param terminate_process_callback: A callback to terminate a process on
a PD. Must have signature: terminate(upid)
@@ -207,15 +208,14 @@ def __init__(self, parameters=None, process_spec=None,
@param ganglia_port: port of Ganglia server to connect to
"""
- self.dispatch_process = dispatch_process_callback or dummy_dispatch_process_callback
+ self.schedule_process = schedule_process_callback or dummy_schedule_process_callback
self.terminate_process = terminate_process_callback or dummy_terminate_process_callback
if parameters:
self.parameters = parameters
else:
self._parameters = None
- self.process_spec = process_spec
self.previous_all_procs = {}
self._status = HAState.PENDING
@@ -94,15 +94,11 @@ def setUp(self):
policy_params = {'preserve_n': 0}
self.process_spec = {
'run_type': 'supd',
- 'parameters': {
+ 'executable': {
'exec': 'sleep',
'argv': ['1000']
}
}
- self.haservice = HighAvailabilityService(policy_parameters=policy_params,
- process_dispatchers=self.pd_names, exchange=self.exchange,
- process_spec=self.process_spec)
- self.haservice_greenlet = gevent.spawn(self.haservice.start)
self.epuharness = EPUHarness(exchange=self.exchange)
self.dashi = self.epuharness.dashi
@@ -114,6 +110,11 @@ def setUp(self):
pd_client = ProcessDispatcherClient(self.dashi, pd)
pd_client.dump()
+ self.haservice = HighAvailabilityService(policy_parameters=policy_params,
+ process_dispatchers=self.pd_names, exchange=self.exchange,
+ process_spec=self.process_spec)
+ self.haservice_greenlet = gevent.spawn(self.haservice.start)
+
self.dashi = self.haservice.dashi
self.haservice_client = HighAvailabilityServiceClient(self.dashi, topic=self.haservice.topic)
@@ -113,6 +113,8 @@ def schedule_process(self, owner, upid, definition_id, configuration=None,
# if not a real def, a NotFoundError will bubble up to caller
definition = self.store.get_definition(definition_id)
+ if definition is None:
+ raise NotFoundError("Couldn't find process definition %s in store" % definition_id)
process = ProcessRecord.new(owner, upid, definition,
ProcessState.REQUESTED, configuration, constraints, subscribers,
@@ -4,7 +4,7 @@
from itertools import islice
from copy import deepcopy
-from epu.exceptions import WriteConflictError, NotFoundError
+from epu.exceptions import WriteConflictError, NotFoundError, ProgrammingError
from epu.states import ProcessState
from epu.processdispatcher.modes import QueueingMode, RestartMode
@@ -359,10 +359,16 @@ def _dispatch_process(self, process, resource):
definition = process.definition
executable = definition['executable']
# build up the spec form EE Agent expects
- parameters = dict(name=definition['name'],
- module=executable['module'], cls=executable['class'])
- if process.configuration:
- parameters['config'] = process.configuration
+ if self.run_type in ('pyon', 'pyon_single'):
+ parameters = dict(name=definition['name'],
+ module=executable['module'], cls=executable['class'])
+ if process.configuration:
+ parameters['config'] = process.configuration
+ elif self.run_type == 'supd':
+ parameters = executable
+ else:
+ msg = "Don't know how to format params for '%s' run type" % self.run_type
+ raise ProgrammingError(msg)
self.resource_client.launch_process(
resource.resource_id, process.upid, process.round,
@@ -434,7 +434,7 @@ def wait_for_terminated_processes(self, count, timeout=60):
def test_dispatch_run_process(self):
procs = []
- spec = {"run_type":"supd", "parameters": {"exec": "sleep", "argv": ["1"]}}
+ spec = {"run_type":"supd", "executable": {"exec": "sleep", "argv": ["1"]}}
self.assertEqual(self.pd_client.describe_processes(), [])

0 comments on commit 7273ad5

Please sign in to comment.