Skip to content
This repository has been archived by the owner on Sep 23, 2020. It is now read-only.

Commit

Permalink
Switch to new PD api
Browse files Browse the repository at this point in the history
  • Loading branch information
oldpatricka committed Sep 10, 2012
1 parent 2b300b8 commit f456863
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 15 deletions.
87 changes: 76 additions & 11 deletions ceiclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from commands import DTRSAddCredentials, DTRSDescribeCredentials, DTRSListCredentials, DTRSRemoveCredentials, DTRSUpdateCredentials
from commands import EPUMAdd, EPUMDescribe, EPUMList, EPUMReconfigure, EPUMRemove
from commands import EPUMAddDefinition, EPUMDescribeDefinition, EPUMListDefinitions, EPUMRemoveDefinition, EPUMUpdateDefinition
from commands import PDDispatch, PDDescribeProcess, PDDescribeProcesses, PDTerminateProcess, PDDump, PDRestartProcess, PDWaitProcess
from commands import PDSchedule, PDDescribeProcess, PDDescribeProcesses, PDTerminateProcess, PDDump, PDRestartProcess, PDWaitProcess
from commands import PDCreateProcessDefinition, PDDescribeProcessDefinition, PDUpdateProcessDefinition, PDRemoveProcessDefinition, PDListProcessDefinitions
from commands import PyonPDCreateProcessDefinition, PyonPDUpdateProcessDefinition, PyonPDReadProcessDefinition, PyonPDDeleteProcessDefinition, PyonPDListProcessDefinitions
from commands import PyonPDAssociateExecutionEngine, PyonPDDissociateExecutionEngine
from commands import PyonPDCreateProcess, PyonPDScheduleProcess, PyonPDCancelProcess, PyonPDReadProcess, PyonPDListProcesses, PyonPDWaitProcess
Expand Down Expand Up @@ -250,6 +251,65 @@ def remove_domain_definition(self, name):
for command in [EPUMDescribeDefinition, EPUMListDefinitions, EPUMUpdateDefinition, EPUMAddDefinition, EPUMRemoveDefinition]:
commands[command.name] = command

class PDProcessDefinitionClient(CeiClient):

dashi_name = 'processdispatcher'
name = 'process-definition'
help = 'Control the Process Dispatcher Service'

def __init__(self, connection, dashi_name=None):
if dashi_name:
self.dashi_name = dashi_name
self._connection = connection

def create_process_definition(self, process_definition=None, process_definition_id=None):
if process_definition is None:
msg = "You must provide a process defintion"
sys.exit(msg)

if process_definition_id is None:
msg = "You must provide a process defintion id"
sys.exit(msg)

executable = process_definition.get('executable')
definition_type = process_definition.get('definition_type')
name = process_definition.get('name')
description = process_definition.get('description')
args = dict(definition_id=process_definition_id, definition_type=definition_type,
executable=executable, name=name, description=description)
# TODO: what is definition_type?
return self._connection.call(self.dashi_name, 'create_definition', args=args)

def update_process_definition(self, process_definition=None, process_definition_id=None):
if process_definition is None:
msg = "You must provide a process defintion"
sys.exit(msg)

if process_definition_id is None:
msg = "You must provide a process defintion id"
sys.exit(msg)

executable = process_definition.get('executable')
definition_type = process_definition.get('type')
name = process_definition.get('name')
description = process_definition.get('description')
# TODO: what is definition_type?
return self._connection.call(self.dashi_name, 'update_definition',
definition_id=process_definition_id, executable=executable,
definition_type=definition_type, name=name, description=description)

def describe_process_definition(self, process_definition_id=''):
return self._connection.call(self.dashi_name, 'describe_definition', definition_id=process_definition_id)

def remove_process_definition(self, process_definition_id=''):
return self._connection.call(self.dashi_name, 'remove_definition', definition_id=process_definition_id)

def list_process_definitions(self):
return self._connection.call(self.dashi_name, 'list_definitions')

commands = {}
for command in [PDCreateProcessDefinition, PDUpdateProcessDefinition, PDDescribeProcessDefinition, PDRemoveProcessDefinition, PDListProcessDefinitions]:
commands[command.name] = command

class PDClient(CeiClient):

Expand All @@ -262,11 +322,11 @@ def __init__(self, connection, dashi_name=None):
self.dashi_name = dashi_name
self._connection = connection

def dispatch_process(self, upid, spec, subscribers, constraints):
return self._connection.call(self.dashi_name, 'dispatch_process',
upid=upid, spec=spec,
subscribers=subscribers,
constraints=constraints)
def schedule_process(self, upid, process_definition_id, subscribers, constraints):
args = dict(upid=upid, definition_id=process_definition_id,
subscribers=subscribers, constraints=constraints)
return self._connection.call(self.dashi_name, 'schedule_process',
args=args)

def describe_process(self, upid):
return self._connection.call(self.dashi_name, 'describe_process', upid=upid)
Expand All @@ -284,7 +344,7 @@ def dump(self):
return self._connection.call(self.dashi_name, 'dump')

commands = {}
for command in [PDDispatch, PDDescribeProcess, PDDescribeProcesses, PDTerminateProcess, PDDump, PDRestartProcess, PDWaitProcess]:
for command in [PDSchedule, PDDescribeProcess, PDDescribeProcesses, PDTerminateProcess, PDDump, PDRestartProcess, PDWaitProcess]:
commands[command.name] = command


Expand All @@ -309,11 +369,16 @@ def create_process_definition(self, process_definition=None, process_definition_
message['process_definition_id'] = process_definition_id
return self._connection.call(self.service_name, 'create_process_definition', **message)

def update_process_definition(self, process_definition=None):
def update_process_definition(self, process_definition=None, process_definition_id=None):
if process_definition is None:
process_definition = {}

message = self._format_pyon_dict(process_definition, key='process_definition', type_='ProcessDefinition')
message = {}
if process_definition_id is not None:
message['process_definition_id'] = process_definition_id
message['process_definition'] = self._format_pyon_dict(process_definition, type_='ProcessDefinition')
if process_definition_id is not None:
message['process_definition_id'] = process_definition_id
return self._connection.call(self.service_name, 'update_process_definition', **message)

def read_process_definition(self, process_definition_id=''):
Expand Down Expand Up @@ -477,8 +542,8 @@ def terminate_all(self):

DASHI_SERVICES = {}
for service in [DTRSDTClient, DTRSSiteClient, DTRSCredentialsClient,
EPUMClient, EPUMDefinitionClient, PDClient, ProvisionerClient,
PyonPDProcessDefinitionClient]:
EPUMClient, EPUMDefinitionClient, PDClient, PDProcessDefinitionClient,
ProvisionerClient, ]:
DASHI_SERVICES[service.name] = service

PYON_SERVICES = {}
Expand Down
112 changes: 108 additions & 4 deletions ceiclient/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,14 +444,49 @@ def execute(client, opts):
return client.update_domain_definition(opts.definition_id, definition)


class PDDispatch(CeiCommand):
class PDCreateProcessDefinition(CeiCommand):

name = 'dispatch'
name = 'create'

def __init__(self, subparsers):
parser = subparsers.add_parser(self.name)
parser.add_argument('process_spec', metavar='process_spec.yml')
parser.add_argument('-i', '--definition-id', dest="definition_id", metavar='ID')

@staticmethod
def execute(client, opts):
try:
with open(opts.process_spec) as f:
process_spec = yaml.load(f)
except Exception, e:
print "Problem reading process specification file %s: %s" % (opts.process_spec, e)
sys.exit(1)

return client.create_process_definition(process_definition=process_spec,
process_definition_id=opts.definition_id)


class PDDescribeProcessDefinition(CeiCommand):

name = 'describe'

def __init__(self, subparsers):
parser = subparsers.add_parser(self.name)
parser.add_argument('process_definition_id', metavar='pd_id')

@staticmethod
def execute(client, opts):
return client.describe_process_definition(opts.process_definition_id)


class PDUpdateProcessDefinition(CeiCommand):

name = 'update'

def __init__(self, subparsers):
parser = subparsers.add_parser(self.name)
parser.add_argument('process_spec', metavar='process_spec.yml')
parser.add_argument('-i', '--definition-id', dest="definition_id", metavar='ID')

@staticmethod
def execute(client, opts):
Expand All @@ -462,7 +497,75 @@ def execute(client, opts):
print "Problem reading process specification file %s: %s" % (opts.process_spec, e)
sys.exit(1)

return client.dispatch_process(str(uuid.uuid4().hex), process_spec, None, None)
return client.update_process_definition(process_spec, opts.definition_id)


class PDRemoveProcessDefinition(CeiCommand):

name = 'remove'

def __init__(self, subparsers):
parser = subparsers.add_parser(self.name)
parser.add_argument('process_definition_id', metavar='pd_id')

@staticmethod
def execute(client, opts):
return client.remove_process_definition(opts.process_definition_id)


class PDListProcessDefinitions(CeiCommand):

name = 'list'

def __init__(self, subparsers):
parser = subparsers.add_parser(self.name)

@staticmethod
def execute(client, opts):
return client.list_process_definitions()


class PDScheduleProcess(CeiCommand):

name = 'schedule'

def __init__(self, subparsers):

parser = subparsers.add_parser(self.name)
parser.add_argument('process_definition_id', metavar='pd_id')
parser.add_argument('configuration', metavar='process_configuration.yml')
parser.add_argument('process_id', metavar='proc_id')

@staticmethod
def execute(client, opts):
try:
with open(opts.schedule) as f:
schedule = yaml.load(f)
except Exception, e:
print "Problem reading process schedule file %s: %s" % (opts.schedule, e)
sys.exit(1)

try:
with open(opts.configuration) as f:
configuration = yaml.load(f)
except exception, e:
print "problem reading process configuration file %s: %s" % (opts.configuration, e)
sys.exit(1)
return client.schedule_process(opts.process_definition_id, configuration, opts.process_id)


class PDSchedule(CeiCommand):

name = 'schedule'

def __init__(self, subparsers):

parser = subparsers.add_parser(self.name)
parser.add_argument('process_definition_id', metavar='pd_id')

@staticmethod
def execute(client, opts):
return client.schedule_process(str(uuid.uuid4().hex), opts.process_definition_id, None, None)


class PDDescribeProcesses(CeiCommand):
Expand Down Expand Up @@ -598,6 +701,7 @@ def __init__(self, subparsers):

parser = subparsers.add_parser(self.name)
parser.add_argument('process_spec', metavar='process_spec.yml')
parser.add_argument('-i', '--definition-id', dest="definition_id", metavar='ID')

@staticmethod
def execute(client, opts):
Expand All @@ -608,7 +712,7 @@ def execute(client, opts):
print "Problem reading process specification file %s: %s" % (opts.process_spec, e)
sys.exit(1)

return client.update_process_definition(process_spec)
return client.update_process_definition(process_spec, opts.definition_id)


class PyonPDReadProcessDefinition(CeiCommand):
Expand Down

0 comments on commit f456863

Please sign in to comment.