Permalink
Browse files

WIP pyon gateway

  • Loading branch information...
1 parent 494dcb3 commit 83aeb348bb919c327fad5afc8d7fa18de2fdcc9a @oldpatricka oldpatricka committed Nov 13, 2013
Showing with 72 additions and 13 deletions.
  1. +12 −2 ceiclient/cli.py
  2. +5 −4 ceiclient/client.py
  3. +20 −4 ceiclient/commands.py
  4. +14 −0 ceiclient/common.py
  5. +21 −3 ceiclient/connection.py
View
@@ -18,6 +18,8 @@
DEFAULT_RABBITMQ_HOSTNAME = 'localhost'
DEFAULT_RABBITMQ_EXCHANGE = None
DEFAULT_TIMEOUT = 5
+DEFAULT_GATEWAY_HOSTNAME = 'localhost'
+DEFAULT_GATEWAY_PORT = 5001
def using_pyon():
@@ -47,6 +49,8 @@ def using_pyon_gateway():
parser.add_argument('--caller', '-c', action='store', dest='caller', default=None)
parser.add_argument('--pyon', '-P', action='store_const', const=True)
parser.add_argument('--pyon-gateway', '-G', action='store_const', const=True)
+parser.add_argument('--gateway-port', '-R', action='store', default=None)
+parser.add_argument('--gateway-host', '-H', action='store', default=None)
if using_pyon():
SERVICES = PYON_SERVICES
@@ -82,6 +86,8 @@ def main():
amqp_settings['rabbitmq_username'] = DEFAULT_RABBITMQ_USERNAME
amqp_settings['rabbitmq_password'] = DEFAULT_RABBITMQ_PASSWORD
amqp_settings['rabbitmq_exchange'] = DEFAULT_RABBITMQ_EXCHANGE
+ amqp_settings['gateway_port'] = DEFAULT_GATEWAY_PORT
+ amqp_settings['gateway_host'] = DEFAULT_GATEWAY_HOSTNAME
# Read AMQP settings and credentials from the cloudinitd DB if possible
if opts.run_name:
@@ -96,6 +102,10 @@ def main():
amqp_settings['rabbitmq_password'] = opts.password
if opts.exchange:
amqp_settings['rabbitmq_exchange'] = opts.exchange
+ if opts.gateway_port:
+ amqp_settings['gateway_port'] = opts.gateway_port
+ if opts.gateway_host:
+ amqp_settings['gateway_host'] = opts.gateway_host
if opts.sysname:
amqp_settings['coi_services_system_name'] = opts.sysname
amqp_settings['dashi_sysname'] = opts.sysname
@@ -108,8 +118,8 @@ def main():
timeout=opts.timeout)
client = service.client(conn, service_name=opts.service_name)
elif opts.pyon_gateway:
- conn = PyonHTTPGateWayCeiConnection(amqp_settings['rabbitmq_host'],
- port=5000,
+ conn = PyonHTTPGateWayCeiConnection(amqp_settings['gateway_host'],
+ port=amqp_settings.get('gateway_port', DEFAULT_GATEWAY_PORT),
timeout=opts.timeout)
client = service.client(conn, dashi_name=opts.service_name)
else:
View
@@ -377,6 +377,7 @@ def remove_process_definition(self, process_definition_id=''):
return self.connection.call(self.dashi_name, 'remove_definition', definition_id=process_definition_id)
def list_process_definitions(self):
+ print self.connection
return self.connection.call(self.dashi_name, 'list_definitions')
def schedule_process(self, upid, process_definition_id=None,
@@ -440,16 +441,16 @@ class PyonHTTPHAAgentClient(DashiCeiClient):
dashi_name = 'ha_agent' # this will almost always be overridden
def status(self):
- return self.connection.call(self.dashi_name, 'status', call_type='agent')
+ return self.connection.call(self.dashi_name, 'status', call_type='agent_resource')
def dump(self):
- return self.connection.call(self.dashi_name, 'dump', call_type='agent')
+ return self.connection.call(self.dashi_name, 'dump', call_type='agent_resource')
def reconfigure_policy(self, new_policy_params, new_policy=None):
message = {'new_policy_params': new_policy_params}
if new_policy is not None:
message['new_policy_name'] = new_policy
- return self.connection.call(self.dashi_name, 'reconfigure_policy', call_type='agent', **message)
+ return self.connection.call(self.dashi_name, 'reconfigure_policy', call_type='agent_resource', **message)
class PyonPDClient(PyonCeiClient):
@@ -582,7 +583,7 @@ def provision(self, deployable_type, site, allocation, vars, caller=None):
return self.connection.call(self.dashi_name, 'provision',
launch_id=launch_id, deployable_type=deployable_type,
instance_ids=instance_ids,
- subscribers=[], site=site,
+ site=site,
allocation=allocation, vars=vars, caller=caller)
def describe_nodes(self, nodes=None, caller=None):
View
@@ -1635,7 +1635,9 @@ def __init__(self, subparsers):
@staticmethod
def execute(client, opts):
+ print "HELLO"
process_id = client.get_ha_process_id(opts.process)
+ print process_id
if not process_id:
raise CeiClientError("Couldn't find agent for process %s" % opts.process)
ha_client = PyonHTTPHAAgent.ha_client(client.connection, dashi_name=process_id)
@@ -1731,13 +1733,14 @@ def __init__(self, subparsers):
@staticmethod
def execute(client, opts):
- ha_dashi_name = "ha_%s" % opts.process
- ha_client = HAAgent.ha_client(client.connection, dashi_name=ha_dashi_name)
+ process_id = client.get_ha_process_id(opts.process)
+ ha_client = PyonHTTPHAAgent.ha_client(client.connection, dashi_name=process_id)
deadline = time.time() + opts.max
while 1:
status = ha_client.status()
+ status = status['result']
if status:
if status in ("READY", "STEADY"):
return status
@@ -1956,6 +1959,20 @@ def client(connection, dashi_name=None):
return PDClient(connection, dashi_name=dashi_name)
+class PyonHTTPPDSystemBoot(CeiService):
+
+ name = 'system-boot'
+ help = 'Control the Process Dispatcher system boot state'
+
+ commands = {}
+ for command in [PDSystemBootOn, PDSystemBootOff]:
+ commands[command.name] = command
+
+ @staticmethod
+ def client(connection, dashi_name=None):
+ return PyonHTTPPDClient(connection, dashi_name=dashi_name)
+
+
class PyonHTTPProcess(CeiService):
name = 'process'
@@ -2066,7 +2083,6 @@ def client(connection, dashi_name=None):
@staticmethod
def ha_client(connection, dashi_name=None):
-
return PyonHTTPHAAgentClient(connection, dashi_name=dashi_name)
@@ -2139,5 +2155,5 @@ def client(connection, service_name=None):
PYON_SERVICES[service.name] = service
PYON_GATEWAY_SERVICES = {}
-for service in [PyonHTTPProcess, PyonHTTPProcessDefinition, PyonHTTPHAAgent]:
+for service in [PyonHTTPProcess, PyonHTTPProcessDefinition, PyonHTTPHAAgent, PyonHTTPPDSystemBoot]:
PYON_GATEWAY_SERVICES[service.name] = service
View
@@ -3,6 +3,7 @@
import errno
import pprint
+
def safe_print(p_str):
try:
print(p_str)
@@ -12,6 +13,7 @@ def safe_print(p_str):
else:
raise
+
def safe_pprint(p_str):
try:
pprint.pprint(p_str)
@@ -21,6 +23,7 @@ def safe_pprint(p_str):
else:
raise
+
def load_cloudinitd_db(run_name):
# doing imports within function because they are not needed elsewhere
@@ -69,6 +72,17 @@ def load_cloudinitd_db(run_name):
vars['coi_services_system_name'] = basenode.get_attr_from_bag("coi_services_system_name")
except ConfigException:
vars['coi_services_system_name'] = None
+
+ try:
+ vars['gateway_port'] = basenode.get_attr_from_bag("gateway_port")
+ except ConfigException:
+ vars['gateway_port'] = None
+
+ try:
+ vars['gateway_host'] = basenode.get_attr_from_bag("hostname")
+ except ConfigException:
+ vars['gateway_host'] = None
+
try:
vars['dashi_sysname'] = basenode.get_attr_from_bag("dashi_sysname")
except ConfigException:
@@ -6,7 +6,7 @@
from dashi import DashiConnection
from dashi.bootstrap import DEFAULT_EXCHANGE
-from dashi.exceptions import NotFoundError, WriteConflictError, BadRequestError
+from dashi.exceptions import NotFoundError
from ceiclient.exception import CeiClientError
@@ -143,7 +143,7 @@ def disconnect(self):
class PyonHTTPGateWayCeiConnection(CeiConnection):
- def __init__(self, hostname, timeout=None, port=5000, ssl=False):
+ def __init__(self, hostname, timeout=None, port=5001, ssl=False):
self.hostname = hostname
self.timeout = timeout
@@ -161,7 +161,11 @@ def _make_url(self, service, operation, call_type=None):
if call_type is None:
call_type = 'service'
elif call_type == 'agent':
+ call_type = 'agent'
operation = 'execute_agent'
+ elif call_type == 'agent_resource':
+ call_type = 'agent'
+ operation = 'execute_resource'
return "%s/ion-%s/%s/%s" % (self.url, call_type, service, operation)
def _make_parameters(self, service, operation, params, call_type=None):
@@ -176,7 +180,7 @@ def _make_parameters(self, service, operation, params, call_type=None):
'params': params
}
}
- else:
+ elif call_type == 'agent':
params['command'] = {
'type_': 'AgentCommand',
'command': operation
@@ -189,6 +193,19 @@ def _make_parameters(self, service, operation, params, call_type=None):
'params': params
}
}
+ elif call_type == 'agent_resource':
+ params['command'] = {
+ 'type_': 'AgentCommand',
+ 'command': operation
+ }
+
+ payload = {
+ 'agentRequest': {
+ 'agentId': service,
+ 'agentOp': 'execute_resource',
+ 'params': params
+ }
+ }
params = {'payload': json.dumps(payload)}
return params
@@ -202,6 +219,7 @@ def call(self, service, operation, retry=PYON_RETRIES, call_type=None, **kwargs)
try:
return result_json['data']['GatewayResponse']
except KeyError:
+ print result_json
error = result_json['data']['GatewayError']
if error.get('Exception') == 'NotFound':
raise NotFoundError("%s: %s" % (error['Exception'], error['Message']))

0 comments on commit 83aeb34

Please sign in to comment.