Skip to content

Commit

Permalink
Add dashi frontend to process dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
oldpatricka committed Sep 12, 2012
1 parent 9c608cc commit 42f9820
Show file tree
Hide file tree
Showing 2 changed files with 217 additions and 15 deletions.
171 changes: 158 additions & 13 deletions ion/services/cei/process_dispatcher_service.py
Expand Up @@ -36,7 +36,7 @@
from interface.objects import ProcessStateEnum, Process
from interface.services.coi.iresource_registry_service import ResourceRegistryServiceClient
from interface.objects import ProcessStateEnum, ProcessDefinition, ProcessDefinitionType,\
ProcessQueueingMode, ProcessRestartMode
ProcessQueueingMode, ProcessRestartMode, ProcessTarget, ProcessSchedule


class ProcessDispatcherService(BaseProcessDispatcherService):
Expand Down Expand Up @@ -76,6 +76,21 @@ def on_init(self):
pd_bridge_conf = self.CFG.process_dispatcher_bridge
except AttributeError:
pd_bridge_conf = None

if pd_conf.get('dashi_messaging', False) == True:

dashi_name = get_pd_dashi_name()

# grab config parameters used to connect to dashi
try:
uri = pd_conf.dashi_uri
exchange = pd_conf.dashi_exchange
except AttributeError, e:
log.warn("Needed Process Dispatcher config not found: %s", e)
raise
self.dashi = get_dashi(dashi_name, uri, exchange)
else:
self.dashi = None

pd_backend_name = pd_conf.get('backend')

Expand All @@ -96,6 +111,9 @@ def on_init(self):
else:
raise Exception("Unknown Process Dispatcher backend: %s" % pd_backend_name)

if self.dashi is not None:
self.dashi_handler = PDDashiHandler(self.backend, self.dashi)

def on_start(self):
self.backend.initialize()

Expand Down Expand Up @@ -182,7 +200,6 @@ def schedule_process(self, process_definition_id='', schedule=None, configuratio
raise NotFound('No process definition was provided')
process_definition = self.backend.read_definition(process_definition_id)

# early validation before we pass definition through to backend
try:
module = process_definition.executable['module']
cls = process_definition.executable['class']
Expand Down Expand Up @@ -226,7 +243,6 @@ def read_process(self, process_id=''):
@retval process Process
@throws NotFound object with specified id does not exist
"""
pass
if not process_id:
raise NotFound('No process was provided')

Expand All @@ -239,6 +255,106 @@ def list_processes(self):
"""
return self.backend.list()

class PDDashiHandler(object):
    """Dashi messaging handlers for the Process Dispatcher.

    Registers one dashi RPC handler per public method so external CEI
    components can drive the PD backend over dashi instead of ION messaging.
    """

    def __init__(self, backend, dashi):
        self.backend = backend
        self.dashi = dashi

        # Each operation is exposed over dashi under its method name.
        self.dashi.handle(self.create_definition)
        self.dashi.handle(self.describe_definition)
        self.dashi.handle(self.update_definition)
        self.dashi.handle(self.remove_definition)
        self.dashi.handle(self.list_definitions)
        self.dashi.handle(self.schedule_process)
        self.dashi.handle(self.describe_process)
        self.dashi.handle(self.describe_processes)
        self.dashi.handle(self.restart_process)
        self.dashi.handle(self.terminate_process)

    def create_definition(self, definition_id, definition_type, executable,
                          name=None, description=None):
        """Create a process definition in the backend store.

        @param definition_id  id to register the definition under
        @param definition_type  ProcessDefinitionType value
        @param executable  dict describing the executable (module/class)
        @retval backend's definition id
        """
        definition = ProcessDefinition(name=name, description=description,
                definition_type=definition_type, executable=executable)
        return self.backend.create_definition(definition, definition_id)

    def describe_definition(self, definition_id):
        """Return the definition as a core (dict) process definition."""
        return _core_process_definition_from_ion(self.backend.read_definition(definition_id))

    def update_definition(self, definition_id, definition_type, executable,
                          name=None, description=None):
        # Not supported by this PD implementation.
        raise BadRequest("The Pyon PD does not support updating process definitions")

    def remove_definition(self, definition_id):
        """Delete a process definition from the backend store."""
        self.backend.delete_definition(definition_id)

    def list_definitions(self):
        # Not supported by this PD implementation.
        raise BadRequest("The Pyon PD does not support listing process definitions")

    def schedule_process(self, upid, definition_id, configuration=None,
                         subscribers=None, constraints=None,
                         queueing_mode=None, restart_mode=None,
                         execution_engine_id=None, node_exclusive=None):
        """Validate a schedule request and hand it to the backend.

        @param upid  unique process id to spawn under
        @param definition_id  id of a previously-created process definition
        @throws NotFound  if no definition id was given
        @throws BadRequest  if the definition, configuration, or modes are invalid
        @retval result of backend.spawn()
        """
        if not definition_id:
            raise NotFound('No process definition was provided')
        process_definition = self.backend.read_definition(definition_id)

        # early validation before we pass definition through to backend
        try:
            module = process_definition.executable['module']
            cls = process_definition.executable['class']
        except KeyError as e:
            # BUG FIX: the message was previously passed as two args
            # ("...%s", e) instead of being %-formatted into one string.
            raise BadRequest("Process definition incomplete. missing: %s" % (e,))

        if configuration is None:
            configuration = {}
        else:
            # push the config through a JSON serializer to ensure that the same
            # config would work with the bridge backend

            try:
                json.dumps(configuration)
            except TypeError as e:
                raise BadRequest("bad configuration: " + str(e))

        target = ProcessTarget()
        if execution_engine_id is not None:
            target.execution_engine_id = execution_engine_id
        if node_exclusive is not None:
            target.node_exclusive = node_exclusive
        if constraints is not None:
            target.constraints = constraints

        schedule = ProcessSchedule(target=target)
        if queueing_mode is not None:
            try:
                schedule.queueing_mode = ProcessQueueingMode._value_map[queueing_mode]
            except KeyError:
                raise BadRequest("%s is not a known ProcessQueueingMode" % (queueing_mode,))

        if restart_mode is not None:
            try:
                schedule.restart_mode = ProcessRestartMode._value_map[restart_mode]
            except KeyError:
                raise BadRequest("%s is not a known ProcessRestartMode" % (restart_mode,))

        return self.backend.spawn(upid, definition_id, schedule, configuration)

    def describe_process(self, upid):
        """Return one process as a core (dict) process record."""
        return _core_process_from_ion(self.backend.read_process(upid))

    def describe_processes(self):
        """Return all known processes as core (dict) process records."""
        return [_core_process_from_ion(proc) for proc in self.backend.list()]

    def restart_process(self, upid):
        # Not supported by this PD implementation.
        raise BadRequest("The Pyon PD does not support restarting processes")

    def terminate_process(self, upid):
        """Cancel a running process via the backend."""
        return self.backend.cancel(upid)

class PDLocalBackend(object):
"""Scheduling backend to PD that manages processes in the local container
Expand Down Expand Up @@ -342,6 +458,11 @@ def list(self):
"900-REJECTED": ProcessStateEnum.ERROR
}

# Maps ION ProcessStateEnum values back to the core PD string states used
# over the dashi interface (consumed by _core_process_from_ion via dict.get,
# so unmapped states translate to None).
# NOTE(review): appears to be a partial inverse of the core->ION state map
# defined above; only SPAWN/TERMINATE/ERROR are covered — confirm callers
# tolerate None for other states.
_PD_PYON_PROCESS_STATE_MAP = {
    ProcessStateEnum.SPAWN: "500-RUNNING",
    ProcessStateEnum.TERMINATE: "800-EXITED",
    ProcessStateEnum.ERROR: "850-FAILED"
}

class Notifier(object):
"""Sends Process state notifications via ION events
Expand Down Expand Up @@ -404,7 +525,7 @@ class AnyEEAgentClient(object):
def __init__(self, process):
self.process = process

def _get_client_for_eeagent(self, resource_id, attempts=10):
def _get_client_for_eeagent(self, resource_id, attempts=60):
exception = None
for i in range(0, attempts):
try:
Expand Down Expand Up @@ -460,17 +581,22 @@ def __init__(self, conf, service):
# but it still uses dashi to talk to the EPU Management Service, until
# it also is fronted with an ION interface.

dashi_name = get_pd_dashi_name()
if service.dashi is not None:
self.dashi = service.dashi
else:
dashi_name = get_pd_dashi_name()

# grab config parameters used to connect to dashi
try:
uri = conf.dashi_uri
exchange = conf.dashi_exchange
except AttributeError, e:
log.warn("Needed Process Dispatcher config not found: %s", e)
raise
# grab config parameters used to connect to dashi
try:
uri = conf.dashi_uri
exchange = conf.dashi_exchange
except AttributeError, e:
log.warn("Needed Process Dispatcher config not found: %s", e)
raise

self.dashi = get_dashi(dashi_name, uri, exchange)
self.dashi = get_dashi(dashi_name, uri, exchange)

dashi_name = self.dashi.name

# "static resources" mode is used in lightweight launch where the PD
# has a fixed set of Execution Engines and cannot ask for more.
Expand Down Expand Up @@ -804,12 +930,31 @@ def _ion_process_from_core(core_process):

return process

def _core_process_from_ion(ion_process):
    """Translate an ION Process object into a core PD process dict.

    Unknown process states map to None via dict.get.
    """
    core_state = _PD_PYON_PROCESS_STATE_MAP.get(ion_process.process_state)
    return {
        'state': core_state,
        'upid': ion_process.process_id,
        'configuration': ion_process.process_configuration,
    }

def _ion_process_definition_from_core(core_process_definition):
    """Build an ION ProcessDefinition object from a core PD definition dict.

    Missing keys become None via dict.get.
    """
    lookup = core_process_definition.get
    return ProcessDefinition(name=lookup('name'),
                             description=lookup('description'),
                             definition_type=lookup('definition_type'),
                             executable=lookup('executable'))

def _core_process_definition_from_ion(ion_process_definition):
definition = {
'name': ion_process_definition.name,
'description': ion_process_definition.description,
'definition_type': ion_process_definition.definition_type,
'executable': ion_process_definition.executable,
}
return definition



def get_dashi(*args, **kwargs):
try:
import dashi
Expand Down
61 changes: 59 additions & 2 deletions ion/services/cei/test/test_process_dispatcher.py
Expand Up @@ -20,11 +20,12 @@

from interface.services.cei.iprocess_dispatcher_service import ProcessDispatcherServiceClient
from interface.objects import ProcessDefinition, ProcessSchedule, ProcessTarget,\
ProcessStateEnum, ProcessQueueingMode, ProcessRestartMode
ProcessStateEnum, ProcessQueueingMode, ProcessRestartMode, ProcessDefinitionType
from interface.services.icontainer_agent import ContainerAgentClient

from ion.services.cei.process_dispatcher_service import ProcessDispatcherService,\
PDLocalBackend, PDNativeBackend, PDBridgeBackend, get_dashi, get_pd_dashi_name
PDLocalBackend, PDNativeBackend, PDBridgeBackend, get_dashi, get_pd_dashi_name,\
PDDashiHandler

try:
from epu.states import InstanceState
Expand Down Expand Up @@ -122,6 +123,62 @@ def test_local_cancel(self):
self.assertTrue(ok)
self.mock_cc_terminate.assert_called_once_with("process-id")

@attr('UNIT', group='cei')
class ProcessDispatcherServiceDashiHandlerTest(PyonTestCase):
    """Tests the dashi frontend of the PD
    """

    #TODO: add some more thorough tests

    def setUp(self):
        # Stub the backend and dashi connection with Mocks so the handler
        # can be exercised without real messaging infrastructure.
        self.mock_backend = DotDict()
        self.mock_backend['create_definition'] = Mock()
        self.mock_backend['read_definition'] = Mock()
        self.mock_backend['delete_definition'] = Mock()
        self.mock_backend['spawn'] = Mock()
        self.mock_backend['read_process'] = Mock()
        self.mock_backend['list'] = Mock()
        self.mock_backend['cancel'] = Mock()

        self.mock_dashi = DotDict()
        self.mock_dashi['handle'] = Mock()

        self.pd_dashi_handler = PDDashiHandler(self.mock_backend, self.mock_dashi)

    def test_process_definitions(self):
        """Exercise the definition CRUD operations of the dashi handler."""
        definition_id = "hello"
        definition_type = ProcessDefinitionType.PYON_STREAM
        executable = {'class': 'SomeThing', 'module': 'some.module'}
        name = "whataname"
        description = "describing stuff"

        self.pd_dashi_handler.create_definition(definition_id, definition_type,
                executable, name, description)
        self.assertEqual(self.mock_backend.create_definition.call_count, 1)

        self.pd_dashi_handler.describe_definition(definition_id)
        self.assertEqual(self.mock_backend.read_definition.call_count, 1)

        # update/list are unsupported and must raise BadRequest;
        # assertRaises replaces the manual raised-flag pattern.
        self.assertRaises(BadRequest, self.pd_dashi_handler.update_definition,
                definition_id, definition_type, executable, name, description)

        self.pd_dashi_handler.remove_definition(definition_id)
        self.assertEqual(self.mock_backend.delete_definition.call_count, 1)

        self.assertRaises(BadRequest, self.pd_dashi_handler.list_definitions)


@attr('UNIT', group='cei')
class ProcessDispatcherServiceNativeTest(PyonTestCase):
Expand Down

0 comments on commit 42f9820

Please sign in to comment.