Permalink
Browse files

Add Pyon HTTP gateway client

  • Loading branch information...
oldpatricka committed Sep 18, 2013
1 parent 7ec358d commit 6b60db93a4cb52fb69054f40bfc16ed62302546d
Showing with 609 additions and 12 deletions.
  1. +17 −2 ceiclient/cli.py
  2. +156 −2 ceiclient/client.py
  3. +354 −5 ceiclient/commands.py
  4. +82 −3 ceiclient/connection.py
View
@@ -9,8 +9,8 @@
import ceiclient
from ceiclient.exception import CeiClientError
-from ceiclient.commands import DASHI_SERVICES, PYON_SERVICES
-from ceiclient.connection import DashiCeiConnection, PyonCeiConnection
+from ceiclient.commands import DASHI_SERVICES, PYON_SERVICES, PYON_GATEWAY_SERVICES
+from ceiclient.connection import DashiCeiConnection, PyonCeiConnection, PyonHTTPGateWayCeiConnection
from ceiclient.common import safe_print
DEFAULT_RABBITMQ_USERNAME = 'guest'
@@ -25,6 +25,12 @@ def using_pyon():
"""
return '-P' in sys.argv or '--pyon' in sys.argv
+
+def using_pyon_gateway():
+ """Peek into argv to see if user wants to use pyon gateway or not
+ """
+ return '-G' in sys.argv or '--pyon-gateway' in sys.argv
+
parser = argparse.ArgumentParser(description='Client to control CEI services')
parser.add_argument('--broker', '-b', action='store', dest='broker')
@@ -40,9 +46,12 @@ def using_pyon():
parser.add_argument('--sysname', '-s', action='store', default=None)
parser.add_argument('--caller', '-c', action='store', dest='caller', default=None)
parser.add_argument('--pyon', '-P', action='store_const', const=True)
+parser.add_argument('--pyon-gateway', '-G', action='store_const', const=True)
if using_pyon():
SERVICES = PYON_SERVICES
+elif using_pyon_gateway():
+ SERVICES = PYON_GATEWAY_SERVICES
else:
SERVICES = DASHI_SERVICES
@@ -56,6 +65,7 @@ def using_pyon():
opts = parser.parse_args()
+
def main():
if opts.service not in SERVICES:
raise ValueError('Service %s is not supported' % opts.service)
@@ -97,6 +107,11 @@ def main():
sysname=amqp_settings.get('coi_services_system_name'),
timeout=opts.timeout)
client = service.client(conn, service_name=opts.service_name)
+ elif opts.pyon_gateway:
+ conn = PyonHTTPGateWayCeiConnection(amqp_settings['rabbitmq_host'],
+ port=5000,
+ timeout=opts.timeout)
+ client = service.client(conn, dashi_name=opts.service_name)
else:
conn = DashiCeiConnection(amqp_settings['rabbitmq_host'],
amqp_settings['rabbitmq_username'],
View
@@ -87,6 +87,12 @@ def del_if_present(_dict, key):
return changed
+class PyonHTTPGatewayCeiClient(object):
+
+ def __init__(self, connection, **kwargs):
+ self.connection = connection
+
+
class DTRSClient(DashiCeiClient):
dashi_name = 'dtrs'
@@ -109,7 +115,8 @@ def update_dt(self, caller, dt_name, dt_definition):
dt_name=dt_name, dt_definition=dt_definition)
def add_site(self, caller, site_name, site_definition):
- return self.connection.call(self.dashi_name, 'add_site', caller=caller, site_name=site_name, site_definition=site_definition)
+ return self.connection.call(
+ self.dashi_name, 'add_site', caller=caller, site_name=site_name, site_definition=site_definition)
def describe_site(self, caller, site_name):
return self.connection.call(self.dashi_name, 'describe_site', caller=caller, site_name=site_name)
@@ -188,6 +195,136 @@ def remove_domain_definition(self, name):
return self.connection.call(self.dashi_name, 'remove_domain_definition', definition_id=name)
+class PyonHTTPPDClient(PyonHTTPGatewayCeiClient):
+
+ dashi_name = 'process_dispatcher'
+
+ pyon_process_queue_map = {
+ 1: 'NEVER', 2: 'ALWAYS', 3: 'START_ONLY', 4: 'RESTART_ONLY',
+ }
+
+ pyon_process_restart_map = {
+ 1: 'NEVER', 2: 'ALWAYS', 3: 'ABNORMAL',
+ }
+
+ pyon_process_queue_reverse_map = {
+ 'NEVER': 1, 'ALWAYS': 2, 'START_ONLY': 3, 'RESTART_ONLY': 4,
+ }
+
+ pyon_process_restart_reverse_map = {
+ 'NEVER': 1, 'ALWAYS': 2, 'ABNORMAL': 3,
+ }
+
+ def get_ha_process_id(self, process_name):
+ processes = self.describe_processes()
+ found_id = None
+ for proc in processes:
+ try:
+ def_name = proc['process_configuration']['highavailability']['process_definition_name']
+ if def_name == process_name:
+ found_id = proc['process_configuration']['agent']['resource_id']
+ break
+
+ except KeyError:
+ continue
+ return found_id
+
+ def set_system_boot(self, system_boot):
+ self.connection.call(self.dashi_name, 'set_system_boot', system_boot=system_boot)
+
+ def create_process_definition(self, process_definition=None, process_definition_id=None):
+ if process_definition is None:
+ raise CeiClientError("You must provide a process defintion")
+
+ if process_definition_id is None:
+ raise CeiClientError("You must provide a process defintion id")
+
+ process_definition['type_'] = 'ProcessDefinition'
+ kwargs = dict(process_definition_id=process_definition_id, process_definition=process_definition)
+ # TODO: what is definition_type?
+ return self.connection.call(self.dashi_name, 'create_process_definition', **kwargs)
+
+ def update_process_definition(self, process_definition=None, process_definition_id=None):
+ if process_definition is None:
+ raise CeiClientError("You must provide a process defintion")
+
+ if process_definition_id is None:
+ raise CeiClientError("You must provide a process defintion id")
+
+ process_definition['type_'] = 'ProcessDefinition'
+ kwargs = dict(process_definition_id=process_definition_id, process_definition=process_definition)
+ # TODO: what is definition_type?
+ return self.connection.call(self.dashi_name, 'update_process_definition', **kwargs)
+
+ def describe_process_definition(self, process_definition_id='', process_definition_name=''):
+ kwargs = {}
+
+ if process_definition_id:
+ kwargs['process_definition_id'] = process_definition_id
+ if process_definition_name:
+ kwargs['process_definition_name'] = process_definition_name
+ return self.connection.call(self.dashi_name, 'read_process_definition', **kwargs)
+
+ def remove_process_definition(self, process_definition_id=''):
+ return self.connection.call(self.dashi_name,
+ 'delete_process_definition', process_definition_id=process_definition_id)
+
+ def list_process_definitions(self):
+ raise CeiClientError("The Pyon PD does not support listing process definitions")
+
+ def schedule_process(self, upid, process_definition_id=None,
+ process_definition_name=None, configuration=None,
+ subscribers=None, constraints=None, queueing_mode=None,
+ restart_mode=None, execution_engine_id=None, node_exclusive=None):
+
+ target = {}
+ if execution_engine_id is not None:
+ target['execution_engine_id'] = execution_engine_id
+ if node_exclusive is not None:
+ target['node_exclusive'] = node_exclusive
+ if constraints is not None:
+ target['constraints'] = constraints
+
+ schedule = {}
+ if target:
+ target['type_'] = 'ProcessTarget'
+ schedule['target'] = target
+ if restart_mode:
+ schedule['restart_mode'] = self.pyon_process_restart_reverse_map.get(restart_mode)
+ if queueing_mode:
+ schedule['queueing_mode'] = self.pyon_process_restart_reverse_map.get(queueing_mode)
+
+ if schedule:
+ schedule['type_'] = 'ProcessSchedule'
+
+ kwargs = dict(process_id=upid, process_definition_id=process_definition_id,
+ process_definition_name=process_definition_name,
+ configuration=configuration, schedule=schedule)
+
+ # only include this arg if it is provided
+ if process_definition_name is not None:
+ kwargs['process_definition_name'] = process_definition_name
+
+ return self.connection.call(self.dashi_name, 'schedule_process',
+ **kwargs)
+
+ def describe_process(self, upid):
+ return self.connection.call(self.dashi_name, 'read_process', process_id=upid)
+
+ def describe_processes(self):
+ return self.connection.call(self.dashi_name, 'list_processes')
+
+ def terminate_process(self, upid):
+ return self.connection.call(self.dashi_name, 'cancel_process', process_id=upid)
+
+ def restart_process(self, upid):
+ #return self.connection.call(self.dashi_name, 'restart_process', upid=upid)
+ raise CeiClientError("Pyon PD does not support restart")
+
+ def dump(self):
+ return self.connection.call(self.dashi_name, 'dump')
+
+
class PDClient(DashiCeiClient):
dashi_name = 'process_dispatcher'
@@ -229,11 +366,11 @@ def update_process_definition(self, process_definition=None, process_definition_
def describe_process_definition(self, process_definition_id='', process_definition_name=''):
kwargs = {}
+
if process_definition_id:
kwargs['definition_id'] = process_definition_id
if process_definition_name:
kwargs['definition_name'] = process_definition_name
-
return self.connection.call(self.dashi_name, 'describe_definition', **kwargs)
def remove_process_definition(self, process_definition_id=''):
@@ -292,6 +429,23 @@ def reconfigure_policy(self, new_policy_params, new_policy=None):
return self.connection.call(self.dashi_name, 'reconfigure_policy', **message)
+class PyonHTTPHAAgentClient(DashiCeiClient):
+
+ dashi_name = 'ha_agent' # this will almost always be overridden
+
+ def status(self):
+ return self.connection.call(self.dashi_name, 'status', call_type='agent')
+
+ def dump(self):
+ return self.connection.call(self.dashi_name, 'dump', call_type='agent')
+
+ def reconfigure_policy(self, new_policy_params, new_policy=None):
+ message = {'new_policy_params': new_policy_params}
+ if new_policy is not None:
+ message['new_policy_name'] = new_policy
+ return self.connection.call(self.dashi_name, 'reconfigure_policy', call_type='agent', **message)
+
+
class PyonPDClient(PyonCeiClient):
service_name = 'process_dispatcher'
Oops, something went wrong.

0 comments on commit 6b60db9

Please sign in to comment.