Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Cleanup, and add system_name config option for eeagent

  • Loading branch information...
commit d53cdf99046806808b23f39f587a902a12823129 1 parent fbc6cec
@oldpatricka oldpatricka authored
Showing with 31 additions and 27 deletions.
  1. +31 −27 epuharness/harness.py
View
58 epuharness/harness.py
@@ -22,6 +22,7 @@
log = logging.getLogger(__name__)
ADVERTISE_RETRIES = 10
+
class EPUHarness(object):
"""EPUHarness. Sets up Process Dispatchers and EEAgents for testing.
"""
@@ -74,7 +75,6 @@ def status(self, exit=True):
else:
return status
-
def stop(self, services=None, force=False):
"""Stop services that were previously started by epuharness
@@ -150,13 +150,11 @@ def start(self, deployment_file=None, deployment_str=None):
for epum_name, epum in self.epums.iteritems():
self._start_epum(epum_name, epum.get('config', {}))
-
# Start Process Dispatchers
self.process_dispatchers = deployment.get('process-dispatchers', {})
for pd_name, pd in self.process_dispatchers.iteritems():
self._start_process_dispatcher(pd_name, pd.get('config', {}))
-
# Start Nodes and EEAgents
nodes = deployment.get('nodes', {})
for node_name, node in nodes.iteritems():
@@ -177,6 +175,7 @@ def start(self, deployment_file=None, deployment_str=None):
pyon_directory=eeagent.get('pyon_directory'),
logfile=eeagent.get('logfile'),
slots=eeagent.get('slots'),
+ system_name=eeagent.get('system_name'),
supd_directory=os.path.join(self.pidantic_dir, eeagent_name))
# Start Phantom
@@ -221,8 +220,6 @@ def _build_phantom_authz_file(self, users):
pw_f.write(pw_file_contents)
return pw_filename
-
-
def _build_phantom_config(self, name, exchange, config, authz_file, logfile=None):
@@ -230,8 +227,8 @@ def _build_phantom_config(self, name, exchange, config, authz_file, logfile=None
logfile = os.path.join(self.logdir, "%s.log" % name)
default = {
- 'phantom':{
- 'system':{
+ 'phantom': {
+ 'system': {
'type': 'epu',
'broker': 'localhost',
'broker_port': 5672,
@@ -240,7 +237,7 @@ def _build_phantom_config(self, name, exchange, config, authz_file, logfile=None
'rabbit_pw': 'guest',
'rabbit_exchange': exchange
},
- 'authz':{
+ 'authz': {
'type': 'simple_file',
'filename': authz_file
}
@@ -289,12 +286,12 @@ def _build_epum_config(self, name, exchange, config, logfile=None, instance=None
logfile = os.path.join(self.logdir, "%s%s.log" % (name, instance_tag))
default = {
- 'server':{
- 'amqp':{
+ 'server': {
+ 'amqp': {
'exchange': exchange
}
},
- 'epumanagement':{
+ 'epumanagement': {
'service_name': name,
},
'logging': {
@@ -357,12 +354,12 @@ def _build_provisioner_config(self, name, exchange, config, logfile=None, instan
logfile = os.path.join(self.logdir, "%s%s.log" % (name, instance_tag))
default = {
- 'server':{
- 'amqp':{
+ 'server': {
+ 'amqp': {
'exchange': exchange
}
},
- 'provisioner':{
+ 'provisioner': {
'service_name': name,
},
'logging': {
@@ -428,12 +425,12 @@ def _build_dtrs_config(self, name, exchange, config, logfile=None, instance=None
logfile = os.path.join(self.logdir, "%s%s.log" % (name, instance_tag))
default = {
- 'server':{
- 'amqp':{
+ 'server': {
+ 'amqp': {
'exchange': exchange
}
},
- 'dtrs':{
+ 'dtrs': {
'service_name': name
},
'logging': {
@@ -488,7 +485,6 @@ def _start_process_dispatcher(self, name, config, logfile=None,
directory=self.pidantic_dir)
pid.start()
-
def _build_process_dispatcher_config(self, exchange, name, config,
logfile=None, static_resources=True, instance=None):
"""Builds a yaml config file to feed to the process dispatcher
@@ -546,7 +542,7 @@ def _build_process_dispatcher_config(self, exchange, name, config,
def _start_eeagent(self, name, process_dispatcher, launch_type,
pyon_directory=None, logfile=None, exe_name="eeagent", slots=None,
- supd_directory=None):
+ system_name=None, supd_directory=None):
"""Starts an eeagent with SupervisorD
@param name: Name of process dispatcher to start
@@ -557,18 +553,22 @@ def _start_eeagent(self, name, process_dispatcher, launch_type,
@param logfile: the log file for the eeagent
@param exe_name: the name of the eeagent executable
@param slots: the number of slots available for processes
+ @param system_name: pyon system name
"""
log.info("Starting EEAgent '%s'" % name)
config_file = self._build_eeagent_config(self.exchange, name,
process_dispatcher, launch_type, pyon_directory,
- logfile=logfile, slots=slots, supd_directory=supd_directory)
+ logfile=logfile, slots=slots, supd_directory=supd_directory,
+ system_name=system_name)
cmd = "%s %s" % (exe_name, config_file)
pid = self.factory.get_pidantic(command=cmd, process_name=name,
directory=self.pidantic_dir, autorestart=True)
pid.start()
- def _build_eeagent_config(self, exchange, name, process_dispatcher, launch_type, pyon_directory=None, logfile=None, supd_directory=None, slots=None):
+ def _build_eeagent_config(self, exchange, name, process_dispatcher,
+ launch_type, pyon_directory=None, logfile=None, supd_directory=None,
+ slots=None, system_name=None):
"""Builds a yaml config file to feed to the eeagent
@param exchange: the AMQP exchange the service should be on
@@ -579,15 +579,18 @@ def _build_eeagent_config(self, exchange, name, process_dispatcher, launch_type,
@param launch_type: launch_type of eeagent (fork, supd, or pyon_single)
@param logfile: the log file for the eeagent
@param slots: the number of slots available for processes
+ @param system_name: pyon system name
"""
if not logfile:
- logfile="/dev/null"
+ logfile = "/dev/null"
if not supd_directory:
supd_directory = '/tmp/SupD'
if launch_type.startswith('pyon'):
container_args = '--noshell'
else:
container_args = ''
+ if system_name:
+ container_args = "%s -s %s" % (container_args, system_name)
if launch_type.startswith('pyon') and not pyon_directory:
msg = "EEagents with a pyon launch_type must supply a pyon directory"
raise DeploymentDescriptionError(msg)
@@ -643,7 +646,6 @@ def _build_eeagent_config(self, exchange, name, process_dispatcher, launch_type,
return config_filename
-
def announce_node(self, node_name, deployable_type, process_dispatcher,
state=None):
"""Announce a node to each process dispatcher.
@@ -654,7 +656,7 @@ def announce_node(self, node_name, deployable_type, process_dispatcher,
@param state: the state to advertise to the pd
"""
if not state:
- state = InstanceState.RUNNING
+ state = InstanceState.RUNNING
pd_client = ProcessDispatcherClient(self.dashi, process_dispatcher)
log.info("Announcing %s of type %s is '%s' to %s" % (node_name,
@@ -665,15 +667,17 @@ def announce_node(self, node_name, deployable_type, process_dispatcher,
pd_client.dt_state(node_name, deployable_type, state)
break
except timeout:
- wait_time = i*i # Exponentially increasing wait
+ wait_time = i * i # Exponentially increasing wait
log.warning("PD '%s' not available yet. Waiting %ss" % (process_dispatcher, wait_time))
- time.sleep(2**i)
+ time.sleep(2 ** i)
# dict_merge from: http://appdelegateinc.com/blog/2011/01/12/merge-deeply-nested-dicts-in-python/
+
def quacks_like_dict(object):
"""Check if object is dict-like"""
return isinstance(object, collections.Mapping)
+
def dict_merge(a, b):
"""Merge two deep dicts non-destructively
@@ -695,7 +699,7 @@ def dict_merge(a, b):
if key not in current_dst:
current_dst[key] = current_src[key]
else:
- if quacks_like_dict(current_src[key]) and quacks_like_dict(current_dst[key]) :
+ if quacks_like_dict(current_src[key]) and quacks_like_dict(current_dst[key]):
stack.append((current_dst[key], current_src[key]))
else:
current_dst[key] = current_src[key]
Please sign in to comment.
Something went wrong with that request. Please try again.