Skip to content

Commit

Permalink
Add a user identifiable name to process schedule operation in PD
Browse files Browse the repository at this point in the history
  • Loading branch information
oldpatricka committed Oct 23, 2012
1 parent c02ab37 commit 2bc1f9c
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 20 deletions.
2 changes: 1 addition & 1 deletion extern/ion-definitions
84 changes: 67 additions & 17 deletions ion/services/cei/process_dispatcher_service.py
Expand Up @@ -171,7 +171,7 @@ def on_init(self):
pd_bridge_conf = self.CFG.process_dispatcher_bridge
except AttributeError:
pd_bridge_conf = None

if pd_conf.get('dashi_messaging', False) == True:

dashi_name = get_pd_dashi_name()
Expand Down Expand Up @@ -280,7 +280,7 @@ def create_process(self, process_definition_id=''):

return process_id

def schedule_process(self, process_definition_id='', schedule=None, configuration=None, process_id=''):
def schedule_process(self, process_definition_id='', schedule=None, configuration=None, process_id='', name=''):
"""Schedule a process definition for execution on an Execution Engine. If no process id is given,
a new unique ID is generated.
Expand Down Expand Up @@ -313,18 +313,22 @@ def schedule_process(self, process_definition_id='', schedule=None, configuratio
except TypeError, e:
raise BadRequest("bad configuration: " + str(e))

# If not provided, create a unique but still descriptive (valid) name
# If not provided, create a unique but still descriptive (valid) id
if not process_id:
process_id = str(process_definition.name or "process") + uuid.uuid4().hex
process_id = create_valid_identifier(process_id, ws_sub='_')

# If not provided, create a unique but still descriptive (valid) name
if not name:
name = self._get_process_name(process_definition, configuration)

try:
process = Process(process_id=process_id)
process = Process(process_id=process_id, name=name)
self.container.resource_registry.create(process, object_id=process_id)
except BadRequest:
log.debug("Tried to create Process %s, but already exists. This is normally ok." % process_id)
log.debug("Tried to create Process %s, but already exists. This is normally ok.")

return self.backend.spawn(process_id, process_definition_id, schedule, configuration)
return self.backend.spawn(process_id, process_definition_id, schedule, configuration, name)

def cancel_process(self, process_id=''):
"""Cancels the execution of the given process id.
Expand Down Expand Up @@ -359,6 +363,21 @@ def list_processes(self):
"""
return self.backend.list()

def _get_process_name(self, process_definition, configuration):

ha_pd_id = configuration.get('highavailability', {}).get('process_definition_id')
name_suffix = ""
if ha_pd_id is not None:
process_definition = self.backend.read_definition(ha_pd_id)
name_suffix = "ha"

name_parts = [str(process_definition.name or "process")]
if name_suffix:
name_parts.append(name_suffix)
name_parts.append(uuid.uuid4().hex)
name = '-'.join(name_parts)

return name

class PDDashiHandler(object):
"""Dashi messaging handlers for the Process Dispatcher"""
Expand All @@ -380,7 +399,7 @@ def __init__(self, backend, dashi):

def create_definition(self, definition_id, definition_type, executable,
name=None, description=None):
definition = ProcessDefinition(name=name, description=description,
definition = ProcessDefinition(name=name, description=description,
definition_type=definition_type, executable=executable)
return self.backend.create_definition(definition, definition_id)

Expand All @@ -400,7 +419,7 @@ def list_definitions(self):
def schedule_process(self, upid, definition_id=None, definition_name=None,
configuration=None, subscribers=None, constraints=None,
queueing_mode=None, restart_mode=None,
execution_engine_id=None, node_exclusive=None):
execution_engine_id=None, node_exclusive=None, name=None):

if definition_id:
process_definition = self.backend.read_definition(definition_id)
Expand Down Expand Up @@ -455,20 +474,46 @@ def schedule_process(self, upid, definition_id=None, definition_name=None,
msg = "%s is not a known ProcessRestartMode" % (restart_mode)
raise BadRequest(msg)

return self.backend.spawn(upid, definition_id, schedule, configuration)
# If not provided, create a unique but still descriptive (valid) name
if not name:
name = self._get_process_name(process_definition, configuration)

return self.backend.spawn(upid, definition_id, schedule, configuration, name)

def describe_process(self, upid):
return _core_process_from_ion(self.backend.read_process(upid))
if hasattr(self.backend, 'read_core_process'):
return self.backend.read_core_process(upid)
else:
return _core_process_from_ion(self.backend.read_process(upid))

def describe_processes(self):
return [_core_process_from_ion(proc) for proc in self.backend.list()]
if hasattr(self.backend, 'read_core_process'):
return [self.backend.read_core_process(proc.process_id) for proc in self.backend.list()]
else:
return [_core_process_from_ion(proc) for proc in self.backend.list()]

def restart_process(self, upid):
raise BadRequest("The Pyon PD does not support restarting processes")

def terminate_process(self, upid):
return self.backend.cancel(upid)

def _get_process_name(self, process_definition, configuration):

ha_pd_id = configuration.get('highavailability', {}).get('process_definition_id')
name_suffix = ""
if ha_pd_id is not None:
process_definition = self.backend.read_definition(ha_pd_id)
name_suffix = "ha"

name_parts = [str(process_definition.name or "process")]
if name_suffix:
name_parts.append(name_suffix)
name_parts.append(uuid.uuid4().hex)
name = '-'.join(name_parts)

return name


class PDLocalBackend(object):
"""Scheduling backend to PD that manages processes in the local container
Expand Down Expand Up @@ -519,7 +564,7 @@ def read_definition_by_name(self, definition_name):
def delete_definition(self, definition_id):
return self.rr.delete(definition_id)

def spawn(self, process_id, definition_id, schedule, configuration):
def spawn(self, process_id, definition_id, schedule, configuration, name):

definition = self.read_definition(definition_id)

Expand Down Expand Up @@ -903,7 +948,7 @@ def delete_definition(self, definition_id):
# also delete in RR
self.rr.delete(definition_id)

def spawn(self, process_id, definition_id, schedule, configuration):
def spawn(self, process_id, definition_id, schedule, configuration, name):

# note: not doing anything with schedule mode yet: the backend PD
# service doesn't fully support it.
Expand Down Expand Up @@ -931,7 +976,7 @@ def spawn(self, process_id, definition_id, schedule, configuration):
subscribers=None, constraints=constraints,
node_exclusive=node_exclusive, queueing_mode=queueing_mode,
execution_engine_id=execution_engine_id,
restart_mode=restart_mode, configuration=configuration)
restart_mode=restart_mode, configuration=configuration, name=name)

return process_id

Expand All @@ -951,6 +996,9 @@ def read_process(self, process_id):

return process

def read_core_process(self, process_id):
return self.core.describe_process(None, process_id)


class PDBridgeBackend(object):
"""Scheduling backend to PD that bridges to external CEI Process Dispatcher
Expand Down Expand Up @@ -1058,7 +1106,7 @@ def delete_definition(self, definition_id):

self.rr.delete(definition_id)

def spawn(self, process_id, definition_id, schedule, configuration):
def spawn(self, process_id, definition_id, schedule, configuration, name):

# note: not doing anything with schedule mode yet: the backend PD
# service doesn't fully support it.
Expand All @@ -1075,7 +1123,7 @@ def spawn(self, process_id, definition_id, schedule, configuration):

proc = self.dashi.call(self.topic, "schedule_process",
upid=process_id, definition_id=definition_id, subscribers=self.pd_process_subscribers,
constraints=constraints, configuration=config)
constraints=constraints, configuration=config, name=name)

log.debug("Dashi Process Dispatcher returned process: %s", proc)

Expand Down Expand Up @@ -1121,14 +1169,16 @@ def _ion_process_from_core(core_process):

process = Process(process_id=process_id,
process_state=ion_process_state,
process_configuration=config)
process_configuration=config,
name=core_process.get('name'))

return process

def _core_process_from_ion(ion_process):
process = {
'state': _PD_PYON_PROCESS_STATE_MAP.get(ion_process.process_state),
'upid': ion_process.process_id,
'name': ion_process.name,
'configuration': ion_process.process_configuration,
}
return process
Expand Down
5 changes: 3 additions & 2 deletions ion/services/cei/test/test_process_dispatcher.py
Expand Up @@ -516,17 +516,18 @@ def test_create_schedule(self):
proc_schedule.target['execution_engine_id'] = None

configuration = {"some": "value"}
name = 'allthehats'

pid2 = self.pd_service.schedule_process("fake-process-def-id",
proc_schedule, configuration, pid)
proc_schedule, configuration, pid, name=name)

self.assertTrue(pid.startswith(proc_def.name) and pid != proc_def.name)
self.assertEqual(pid, pid2)
self.assertTrue(pid.startswith(proc_def.name) and pid != proc_def.name)
self.assertEqual(self.mock_dashi.call.call_count, 1)
call_args, call_kwargs = self.mock_dashi.call.call_args
self.assertEqual(set(call_kwargs),
set(['upid', 'definition_id', 'configuration', 'subscribers', 'constraints']))
set(['upid', 'definition_id', 'configuration', 'subscribers', 'constraints', 'name']))
self.assertEqual(call_kwargs['constraints'],
proc_schedule.target['constraints'])
self.assertEqual(call_kwargs['subscribers'],
Expand Down

0 comments on commit 2bc1f9c

Please sign in to comment.