Permalink
Browse files

Revise parameter validation for ha service

  • Loading branch information...
1 parent c0ba858 commit 9830ae6704a38e404bb9ca47d765d305c03b768d @oldpatricka oldpatricka committed Mar 29, 2013
@@ -40,10 +40,9 @@ def __init__(self, CFG, control, process_dispatchers, policy,
self.CFG = CFG
self.control = control
- self.policy_type = policy
+ self.policy_type = None
self.process_dispatchers = process_dispatchers
self.process_configuration = process_configuration
- self.policy_params = parameters
self.aggregator_config = aggregator_config
self.name = name
if self.name:
@@ -55,7 +54,7 @@ def __init__(self, CFG, control, process_dispatchers, policy,
raise ProgrammingError("You must have a process_definition_id")
self.process_definition_id = process_definition_id
- self.reconfigure_policy(self.policy_params, self.policy_type)
+ self.reconfigure_policy(parameters, policy)
self.managed_upids = []
def apply_policy(self):
@@ -126,9 +125,9 @@ def status(self):
return self.policy.status()
def reconfigure_policy(self, new_policy_params, new_policy=None):
- """Change the number of needed instances of service
+ """Reconfigure the policy of this ha service
"""
- if new_policy is not None:
+ if new_policy is not None and new_policy != self.policy_type:
Policy = policy_map.get(new_policy)
if Policy is None:
raise PolicyError("HA doesn't know how to use %s policy" % new_policy)
@@ -139,16 +138,15 @@ def reconfigure_policy(self, new_policy_params, new_policy=None):
process_definition_id=self.process_definition_id,
process_configuration=self.process_configuration,
aggregator_config=self.aggregator_config, name=self.name)
- self.policy_params = new_policy_params
+ self.policy_type = new_policy
elif new_policy_params is not None:
- self.policy_params = new_policy_params
self.policy.parameters = new_policy_params
def dump(self):
state = {}
- state['policy'] = self.policy_params
- state['policy_params'] = self.policy_params
+ state['policy'] = self.policy_type
+ state['policy_params'] = self.policy.parameters
state['managed_upids'] = self.managed_upids
return state
@@ -103,6 +103,8 @@ class NPreservingPolicy(IPolicy):
(see __init__) are called to terminate or start VMs.
"""
+ _NPRESERVING_PARAMS = ('preserve_n', )
+
def __init__(self, parameters=None, process_definition_id=None,
process_configuration=None, schedule_process_callback=None,
terminate_process_callback=None, process_state_callback=None, **kwargs):
@@ -170,17 +172,32 @@ def parameters(self):
@parameters.setter
def parameters(self, new_parameters):
+
+ for key in new_parameters.keys():
+ if key not in _SCHEDULE_PROCESS_KWARGS + self._NPRESERVING_PARAMS:
+ raise PolicyError("%s not a valid parameter for npreserving" % key)
+
try:
- new_parameters['preserve_n']
+ preserve_n = int(new_parameters['preserve_n'])
+ if preserve_n < 0:
+ raise PolicyError("preserve_n must be greater than 0, you have %s" % new_parameters['preserve_n'])
+ new_parameters['preserve_n'] = preserve_n
+ except ValueError:
+ raise PolicyError("preserve_n must be an integer")
except TypeError:
- raise HAPolicyException('parameters must be a dictionary')
+ raise PolicyError("parameters must be a dictionary")
except KeyError:
- raise HAPolicyException('parameters must have a preserve_n value')
+ if self._parameters.get('preserve_n') is None and new_parameters.get('preserve_n') is None:
+ raise PolicyError("parameters must have a preserve_n value %s" % new_parameters)
if self._status in (HAState.READY, HAState.STEADY):
self._status = HAState.READY
- self._parameters = new_parameters
+ if self._parameters is None:
+ self._parameters = {}
+
+ for key, val in new_parameters.iteritems():
+ self._parameters[key] = val
self._schedule_kwargs = get_schedule_process_kwargs(new_parameters)
def apply_policy(self, all_procs, managed_upids):
@@ -247,6 +264,10 @@ def status(self):
class SensorPolicy(IPolicy):
+ _SENSOR_PARAMS = ('metric', 'minimum_processes', 'maximum_processes',
+ 'sample_period', 'sample_function', 'cooldown_period', 'scale_up_threshold',
+ 'scale_up_n_processes', 'scale_down_threshold', 'scale_down_n_processes')
+
def __init__(self, parameters=None, process_definition_id=None,
schedule_process_callback=None, terminate_process_callback=None,
process_state_callback=None,
@@ -345,80 +366,78 @@ def parameters(self):
@parameters.setter
def parameters(self, new_parameters):
+ for key in new_parameters.keys():
+ if key not in _SCHEDULE_PROCESS_KWARGS + self._SENSOR_PARAMS:
+ raise PolicyError("%s not a valid parameter for sensor" % key)
+
if new_parameters.get('metric') is None:
- log.error("metric_name cannot be None")
- return
+ msg = "a metric_name must be provided"
+ raise PolicyError(msg)
- try:
- sample = int(new_parameters.get('sample_period'))
- if sample < 0:
- raise ValueError()
- except ValueError:
- log.error("sample_period '%s' is not a positive integer" % (
- new_parameters.get('sample_period')))
+ sample = int(new_parameters.get('sample_period'))
+ if sample < 0:
+ msg = "sample_period '%s' is not a positive integer" % (
+ new_parameters.get('sample_period'))
+ raise PolicyError(msg)
if new_parameters.get('sample_function') not in Statistics.ALL:
- log.error("'%s' is not a known sample_function. Choose from %s" % (
- new_parameters.get('sample_function'), Statistics.ALL))
- return
+ msg = "'%s' is not a known sample_function. Choose from %s" % (
+ new_parameters.get('sample_function'), Statistics.ALL)
+ raise PolicyError(msg)
- try:
- cool = int(new_parameters.get('cooldown_period'))
- if cool < 0:
- raise ValueError()
- except ValueError:
- log.error("cooldown_period '%s' is not a positive integer" % (
- new_parameters.get('cooldown_period')))
- return
+ cool = int(new_parameters.get('cooldown_period'))
+ if cool < 0:
+ msg = "cooldown_period '%s' is not a positive integer" % (
+ new_parameters.get('cooldown_period'))
+ raise PolicyError(msg)
try:
float(new_parameters.get('scale_up_threshold'))
except ValueError:
- log.error("scale_up_threshold '%s' is not a floating point number" % (
- new_parameters.get('scale_up_threshold')))
- return
+ msg = "scale_up_threshold '%s' is not a floating point number" % (
+ new_parameters.get('scale_up_threshold'))
+ raise PolicyError(msg)
try:
int(new_parameters.get('scale_up_n_processes'))
except ValueError:
- log.error("scale_up_n_processes '%s' is not an integer" % (
- new_parameters.get('scale_up_n_processes')))
- return
+ msg = "scale_up_n_processes '%s' is not an integer" % (
+ new_parameters.get('scale_up_n_processes'))
+ raise PolicyError(msg)
try:
float(new_parameters.get('scale_down_threshold'))
except ValueError:
- log.error("scale_down_threshold '%s' is not a floating point number" % (
- new_parameters.get('scale_down_threshold')))
- return
+ msg = "scale_down_threshold '%s' is not a floating point number" % (
+ new_parameters.get('scale_down_threshold'))
+ raise PolicyError(msg)
try:
int(new_parameters.get('scale_down_n_processes'))
except ValueError:
- log.error("scale_down_n_processes '%s' is not an integer" % (
- new_parameters.get('scale_up_n_processes')))
- return
+ msg = "scale_down_n_processes '%s' is not an integer" % (
+ new_parameters.get('scale_up_n_processes'))
+ raise PolicyError(msg)
- try:
- minimum_processes = int(new_parameters.get('minimum_processes'))
- if minimum_processes < 0:
- raise ValueError()
- except ValueError:
- log.error("minimum_processes '%s' is not a positive integer" % (
- new_parameters.get('minimum_processes')))
- return
+ minimum_processes = int(new_parameters.get('minimum_processes'))
+ if minimum_processes < 0:
+ msg = "minimum_processes '%s' is not a positive integer" % (
+ new_parameters.get('minimum_processes'))
+ raise PolicyError(msg)
- try:
- maximum_processes = int(new_parameters.get('maximum_processes'))
- if maximum_processes < 0:
- raise ValueError()
- except ValueError:
- log.error("maximum_processes '%s' is not a positive integer" % (
- new_parameters.get('maximum_processes')))
- return
+ maximum_processes = int(new_parameters.get('maximum_processes'))
+ if maximum_processes < 0:
+ msg = "maximum_processes '%s' is not a positive integer" % (
+ new_parameters.get('maximum_processes'))
+ raise PolicyError(msg)
# phew!
- self._parameters = new_parameters
+
+ if self._parameters is None:
+ self._parameters = {}
+
+ for key, val in new_parameters.iteritems():
+ self._parameters[key] = val
self._schedule_kwargs = get_schedule_process_kwargs(new_parameters)
@@ -583,7 +602,3 @@ def get_schedule_process_kwargs(parameters):
if k in parameters:
kwargs[k] = parameters[k]
return kwargs
-
-
-class HAPolicyException(BaseException):
- pass
@@ -102,8 +102,8 @@ def _update_policy_params_and_assert(self, new_params, maxattempts=None):
except:
break
- assert self.haservice.core.policy_params == new_params, "%s != %s" % (
- self.haservice.core.policy_params, new_params)
+ assert self.haservice.core.policy.parameters == new_params, "%s != %s" % (
+ self.haservice.core.policy.parameters, new_params)
def _find_procs_pd(self, upid):
all_procs = self._get_all_procs()
@@ -56,6 +56,7 @@ def test_apply_policy(self):
}
parameters['preserve_n'] = 1
+ self.policy.parameters = parameters
self.policy.apply_policy(all_procs, upids)
self.assertEqual(self.mock_terminate.call_count, 1)
@@ -119,6 +120,7 @@ def test_rejected_procs(self):
self.mock_terminate.reset_mock()
parameters['preserve_n'] = 1
+ self.policy.parameters = parameters
self.policy.apply_policy(all_procs, upids)
self.assertEqual(self.mock_terminate.call_count, 1)

0 comments on commit 9830ae6

Please sign in to comment.