Skip to content
This repository has been archived by the owner on Sep 23, 2020. It is now read-only.

Commit

Permalink
BaseProcess.reply_ok and reply_err added and service operations chang…
Browse files Browse the repository at this point in the history
…ed; CC Agent improvement; timeouts

- Some but not all reply calls replaced.
- Added timeout to BaseProcess.rpc_send. Use with kwarg timeout=<secs>
- CC-Agent detects missing known containers and removes them from the list
- Enhanced CC-Agent operations and CC shell helpers
- Added sequence numbers for messages
- Added glue functions BaseProcess.reply_ok and reply_err and changes some
  RPC style service operations.
- Changed main bootstrap supervisor process start to similar to others
  • Loading branch information
Michael Meisinger committed May 22, 2010
1 parent 91c26cb commit 241d1d0
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 50 deletions.
7 changes: 7 additions & 0 deletions README.txt
Expand Up @@ -120,6 +120,13 @@ again (see above). Please review the branch logs for any hints.
Change log:
===========

2010-05-22:
- Added timeout to BaseProcess.rpc_send. Use with kwarg timeout=<secs>
- CC-Agent detects missing known containers and removes them from the list
- Enhanced CC-Agent operations and CC shell helpers
- Added sequence numbers for messages
- Added glue functions BaseProcess.reply_ok and reply_err and changes some
RPC style service operations.
2010-05-20:
- The system now looks for a local config file ionlocal.config and if exists,
overrides entries in ion.config.
Expand Down
67 changes: 54 additions & 13 deletions ion/core/base_process.py
Expand Up @@ -59,6 +59,7 @@ def __init__(self, receiver=None, spawnArgs=None):
self.procState = "UNINITIALIZED"
spawnArgs = spawnArgs.copy() if spawnArgs else {}
self.spawnArgs = spawnArgs
self.init_time = pu.currenttime_ms()

# Name (human readable label) of this process.
self.procName = self.spawnArgs.get('proc-name', __name__)
Expand Down Expand Up @@ -119,7 +120,7 @@ def op_init(self, content, headers, msg):
yield defer.maybeDeferred(self.plc_init)
logging.info('----- Process %s INITIALIZED -----' % (self.procName))

yield self.reply(msg, 'inform_init', {'status':'OK'}, {})
yield self.reply_ok(msg)
self.procState = "INITIALIZED"

def plc_init(self):
Expand Down Expand Up @@ -160,32 +161,38 @@ def op_none(self, content, headers, msg):
"""
logging.info('Catch message')

def rpc_send(self, recv, operation, content, headers=None):
def rpc_send(self, recv, operation, content, headers=None, **kwargs):
"""
Sends a message RPC style and waits for conversation message reply.
@retval a deferred with the message value
@brief Sends a message RPC style and waits for conversation message reply.
@retval a Deferred with the message value on receipt
"""
msgheaders = self._prepare_message(headers)
convid = msgheaders['conv-id']
# Create a new deferred that the caller can yield on to wait for RPC
rpc_deferred = defer.Deferred()
# Timeout handling
timeout = float(kwargs.get('timeout',0))
def _timeoutf(d, convid, *args, **kwargs):
logging.info("RPC on conversation %s timed out! "%(convid))
# Remove RPC. Delayed result will go to catch operation
d = self.rpc_conv.pop(convid)
d.errback(defer.TimeoutError())
if timeout:
rpc_deferred.setTimeout(timeout, _timeoutf, convid)
self.rpc_conv[convid] = rpc_deferred
d = self.send(recv, operation, content, msgheaders)
# Continue with deferred d. The caller can yield for the new deferred.
return rpc_deferred

@defer.inlineCallbacks
def send(self, recv, operation, content, headers=None):
"""
Send a message via the process receiver to destination.
@brief Send a message via the process receiver to destination.
Starts a new conversation.
@retval Deferred for send of message
"""
send = self.receiver.spawned.id.full
msgheaders = self._prepare_message(headers)
convid = msgheaders['conv-id']

yield pu.send(self.receiver, send, recv, operation,
content, msgheaders)
return pu.send(self.receiver, send, recv, operation, content, msgheaders)

def _prepare_message(self, headers):
msgheaders = {}
Expand All @@ -194,6 +201,7 @@ def _prepare_message(self, headers):
if not 'conv-id' in msgheaders:
convid = self._create_convid()
msgheaders['conv-id'] = convid
msgheaders['conv-seq'] = 1
self.conversations[convid] = Conversation()
return msgheaders

Expand All @@ -207,7 +215,8 @@ def _create_convid(self):

def reply(self, msg, operation, content, headers=None):
"""
Replies to a given message, continuing the ongoing conversation
@brief Replies to a given message, continuing the ongoing conversation
@retval Deferred or None
"""
ionMsg = msg.payload
recv = ionMsg.get('reply-to', None)
Expand All @@ -217,7 +226,34 @@ def reply(self, msg, operation, content, headers=None):
logging.error('No reply-to given for message '+str(msg))
else:
headers['conv-id'] = ionMsg.get('conv-id','')
self.send(pu.get_process_id(recv), operation, content, headers)
headers['conv-seq'] = int(ionMsg.get('conv-seq',0)) + 1
return self.send(pu.get_process_id(recv), operation, content, headers)

def reply_ok(self, msg, content=None, headers=None):
"""
Glue method that replies to a given message with a success message and
a given result value
@retval Deferred for send of reply
"""
rescont = {'status':'OK'}
if type(content) is dict:
rescont.update(content)
else:
rescont['value'] = content
return self.reply(msg, 'result', rescont, headers)

def reply_err(self, msg, content=None, headers=None):
"""
Glue method that replies to a given message with an error message and
an indication of the error.
@retval Deferred for send of reply
"""
rescont = {'status':'ERROR'}
if type(content) is dict:
rescont.update(content)
else:
rescont['value'] = content
return self.reply(msg, 'result', rescont, headers)

def get_conversation(self, headers):
convid = headers.get('conv-id', None)
Expand Down Expand Up @@ -257,6 +293,7 @@ def spawn_child(self, childproc, init=True):
assert not childproc in self.child_procs
self.child_procs.append(childproc)
child_id = yield childproc.spawn(self)
yield procRegistry.put(str(childproc.procName), str(child_id))
if init:
yield childproc.init()
defer.returnValue(child_id)
Expand Down Expand Up @@ -339,7 +376,8 @@ def spawn(self, supProc=None):

@defer.inlineCallbacks
def init(self):
(content, headers, msg) = yield self.supProcess.rpc_send(self.procId, 'init', {}, {'quiet':True})
(content, headers, msg) = yield self.supProcess.rpc_send(self.procId,
'init', {}, {'quiet':True})
if content.get('status','ERROR') == 'OK':
self.procState = 'INIT_OK'
else:
Expand Down Expand Up @@ -386,6 +424,9 @@ def build(self, spawnArgs=None):
receivers.append(receiver)
return receiver

# Spawn of the process using the module name
factory = ProtocolFactory(BaseProcess)

class BaseProcessClient(object):
"""
This is the base class for a process client. A process client is code that
Expand Down
5 changes: 2 additions & 3 deletions ion/core/bootstrap.py
Expand Up @@ -148,8 +148,8 @@ def spawn_processes(procs, sup=None):
supname = "bootstrap"
else:
supname = "supervisor."+str(sup_seq)
sup = BaseProcess()
sup.receiver.label = supname
suprec = base_process.factory.build({'proc-name':supname})
sup = suprec.procinst
sup.receiver.group = supname
supId = yield sup.spawn()
yield base_process.procRegistry.put(supname, str(supId))
Expand All @@ -158,7 +158,6 @@ def spawn_processes(procs, sup=None):
logging.info("Spawning child processes")
for child in children:
child_id = yield sup.spawn_child(child)
yield base_process.procRegistry.put(str(child.procName), str(child_id))

logging.debug("process_ids: "+ str(base_process.procRegistry.kvs))

Expand Down
107 changes: 88 additions & 19 deletions ion/core/cc/cc_agent.py
Expand Up @@ -3,20 +3,22 @@
"""
@file ion/core/cc/cc_agent.py
@author Michael Meisinger
@brief capability container control process
@brief capability container control process (agent)
"""

import logging
import os

from twisted.internet import defer
from twisted.internet import defer, reactor
import magnet
from magnet.container import Container
from magnet.spawnable import Receiver, spawn

from ion.agents.resource_agent import ResourceAgent
from ion.core import ionconst
from ion.core.base_process import BaseProcess, ProtocolFactory, ProcessDesc
from ion.core.base_process import procRegistry, processes, receivers
from ion.core.ioninit import ion_config
from ion.core.supervisor import Supervisor
import ion.util.procutils as pu

Expand All @@ -32,6 +34,8 @@ def plc_init(self):
self.ann_name = self.get_scoped_name('system', annName)
self.start_time = pu.currenttime_ms()
self.containers = {}
self.contalive = {}
self.last_identify = 0

# Declare CC announcement name
messaging = {'name_type':'fanout', 'args':{'scope':'system'}}
Expand Down Expand Up @@ -60,6 +64,7 @@ def _send_announcement(self, event):
"""
cdesc = {'node':str(os.uname()[1]),
'container-id':str(Container.id),
'agent':str(self.receiver.spawned.id.full),
'version':ionconst.VERSION,
'magnet':magnet.__version__,
'start-time':self.start_time,
Expand All @@ -76,19 +81,39 @@ def op_announce(self, content, headers, msg):
event = content['event']
if event == 'started' or event == 'identify':
self.containers[contid] = content
self.contalive[contid] = int(pu.currenttime_ms())
elif event == 'terminate':
del self.containers[contid]
logging.info("op_announce(): Know about %s containers!" % (len(self.containers)))
del self.contalive[contid]

logging.info("op_announce(): Know about %s containers!" % (len(self.containers)))

@defer.inlineCallbacks
def op_identify(self, content, headers, msg):
"""
Service operation: ask for identification; respond with announcement
"""
logging.info("op_identify(). Send announcement")
logging.info("op_identify(). Sending announcement")
self._check_alive()

# Set the new reference. All alive containers will respond afterwards
self.last_identify = int(pu.currenttime_ms())

reactor.callLater(3, self._check_alive)
yield self._send_announcement('identify')

def _check_alive(self):
"""
Check through all containers if we have a potential down one.
A container is deemed down if it has not responded since the preceding
identify message.
"""
for cid,cal in self.contalive.copy().iteritems():
if cal<self.last_identify:
logging.info("Container %s missing. Deemed down, remove." % (cid))
del self.containers[cid ]
del self.contalive[cid ]

@defer.inlineCallbacks
def op_spawn(self, content, headers, msg):
"""
Expand All @@ -97,20 +122,41 @@ def op_spawn(self, content, headers, msg):
procMod = str(content['module'])
child = ProcessDesc(name=procMod.rpartition('.')[2], module=procMod)
pid = yield self.spawn_child(child)
yield self.reply(msg, 'result', {'status':'OK', 'process-id':str(pid)})
yield self.reply_ok(msg, {'process-id':str(pid)})

def op_start_node(self, content, headers, msg):
pass

def op_terminate_node(self, content, headers, msg):
pass

def op_get_node_id(self, content, headers, msg):
pass

@defer.inlineCallbacks
def op_ping(self, content, headers, msg):
"""
Service operation: ping reply
"""
yield self.reply_ok(msg, None, {'quiet':True})

def op_get_config(self, content, headers, msg):
pass
@defer.inlineCallbacks
def op_get_info(self, content, headers, msg):
"""
Service operation: replies with all kinds of local information
"""
procsnew = processes.copy()
for pn,p in procsnew.iteritems():
cls = p.pop('class')
p['classname'] = cls.__name__
p['module'] = cls.__module__
res = {'services':procsnew}
procs = {}
for rec in receivers:
recinfo = {}
recinfo['classname'] = rec.procinst.__class__.__name__
recinfo['module'] = rec.procinst.__class__.__module__
recinfo['label'] = rec.label
procs[rec.spawned.id.full] = recinfo
res['processes'] = procs
yield self.reply_ok(msg, res)


def _augment_shell(self):
Expand All @@ -123,30 +169,42 @@ def _augment_shell(self):
return
logging.info("Augmenting Container Shell...")
control.cc.agent = self
from ion.core.ioninit import ion_config
control.cc.config = ion_config
from ion.core.base_process import procRegistry, processes, receivers
control.cc.pids = procRegistry.kvs
control.cc.svcs = processes
control.cc.procs = receivers
def send(recv, op, content=None, headers=None):
def send(recv, op, content=None, headers=None, **kwargs):
if content == None: content = {}
if recv in control.cc.pids: recv = control.cc.pids[recv]
d = self.send(recv, op, content, headers)
d = self.send(recv, op, content, headers, **kwargs)
control.cc.send = send
def rpc_send(recv, op, content=None, headers=None):
def rpc_send(recv, op, content=None, headers=None, **kwargs):
if content == None: content = {}
if recv in control.cc.pids: recv = control.cc.pids[recv]
d = self.rpc_send(recv, op, content, headers)
d = self.rpc_send(recv, op, content, headers, **kwargs)
control.cc.rpc_send = rpc_send
def spawn(name):
def _get_target(name):
mod = name
for p in control.cc.svcs.keys():
if p.startswith(name):
mod = control.cc.svcs[p]['class'].__module__
name = p
break
d = self.spawn_child(ProcessDesc(name=name, module=mod))
return (mod, name)
def _get_node(node=None):
if type(node) is int:
for cid in self.containers.keys():
if cid.find(str(node)) >= 0:
node = str(self.containers[cid]['agent'])
break
return node
def spawn(name, node=None, args=None):
(mod,name) = _get_target(name)
if node != None:
node = _get_node(node)
self.send(node,'spawn',{'module':mod})
else:
d = self.spawn_child(ProcessDesc(name=name, module=mod))
control.cc.spawn = spawn
def svc():
for pk,p in control.cc.svcs.iteritems():
Expand All @@ -157,7 +215,18 @@ def ps():
print r.label, r.name
setattr(control.cc, r.label, r.procinst)
control.cc.ps = ps

def nodes():
nodes = {}
for c in self.containers.values():
nodes[str(c['node'])] = 1
return nodes.keys()
control.cc.nodes = nodes
control.cc.cont = lambda: [str(k) for k in self.containers.keys()]
control.cc.info = lambda: self.containers[str(Container.id)]
control.cc.identify = lambda: self.send(self.ann_name, 'identify', '', {'quiet':True})
control.cc.getinfo = lambda n: self.send(_get_node(n), 'get_info', '')
control.cc.ping = lambda n: self.send(_get_node(n), 'ping', '', {'quiet':True})
control.cc.help = "CC Helpers. ATTRS: agent, config, pids, svcs, procs, help FUNC: send, rpc_send, spawn, svc, ps, nodes, cont, info, identify, ping"

# Spawn of the process using the module name
factory = ProtocolFactory(CCAgent)
Expand Down
2 changes: 1 addition & 1 deletion ion/play/hello_service.py
Expand Up @@ -33,7 +33,7 @@ def op_hello(self, content, headers, msg):
logging.info('op_hello: '+str(content))

# The following line shows how to reply to a message
yield self.reply(msg, 'reply', {'value':'Hello there, '+str(content)}, {})
yield self.reply_ok(msg, {'value':'Hello there, '+str(content)}, {})


class HelloServiceClient(BaseServiceClient):
Expand Down

0 comments on commit 241d1d0

Please sign in to comment.