Skip to content
This repository has been archived by the owner on Sep 23, 2020. It is now read-only.

Commit

Permalink
CC agent with announcement on identification request; can spawn a pro…
Browse files Browse the repository at this point in the history
…cess
  • Loading branch information
Michael Meisinger committed May 17, 2010
1 parent ea95772 commit e710e44
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Expand Up @@ -4,6 +4,6 @@
*.*.swp
.*.*.swo
*.log
twistd.pid
*.pid
_trial*
docs
24 changes: 23 additions & 1 deletion README.txt
Expand Up @@ -22,6 +22,8 @@ Start CC ("Magnet" Python Capability Container) shell with:
::
twistd -n magnet -h amoeba.ucsd.edu

(to end a magnet container shell, press Ctrl-D Ctrl-C)

Start system by executing within the CC shell:
><>
from ion.core import bootstrap
Expand All @@ -30,12 +32,13 @@ Start system by executing within the CC shell:
Alternatively from shell executing a script:
::
twistd -n magnet -h amoeba.ucsd.edu res/scripts/bootstrap.py
twistd -n magnet -h amoeba.ucsd.edu res/scripts/newcc.py

Run trial test cases (recursively)
::
trial ion
trial ion.core
trial ion.services.coi.test.test_resource_registry
trial ion


Install the dependencies: Magnet (see Magnet's Readme)
Expand Down Expand Up @@ -82,6 +85,25 @@ again (see above). Please review the branch logs for any cues.
Change log:
===========

2010-05-16:
- Removed support for BaseProcess.send_message and reply_message. Always use
send, reply and rpc_send now.
- Any BaseProcess instance can now spawn_child() other processes.
- Removed RpcClient class, because every process can do rpc_send()
- Service processes now also listen to their service name's queue. The service
name is determined from the service declaration. Two processes will listen
to the same queue and take messages round robin from the queue.
- Startup arguments evaluated, for instance to start with system name set:
twistd -n magnet -a sysname=mysys
twistd -n magnet -a "{'sysname':'mysys'}"
- Added capability container agent process. Start with:
twistd -n magnet res/scripts/newcc.py
Agents announce themselves to others in the same system and can spawn procs.
- Name scope 'local' for messaging names means now really local to one container.
Use scope 'system' for names unique for each bootstrapped system. Do not use
global names, because they will clash.
- Less verbose trace output for process init messages and changes to other
trace output as well.
2010-05-10:
- Based on entries in config files, service process modules are sought and
loaded in order to collect the service process declarations. This enables
Expand Down
7 changes: 7 additions & 0 deletions ion/core/bootstrap.py
Expand Up @@ -91,6 +91,13 @@ def _set_container_args(contargs=None):
ioninit.cont_args.update(evargs)
except Exception, e:
logging.error('Invalid argument format: ', e)
elif contargs.find('=') > 0:
# Key=value arguments separated by comma
print "Parsing KV"
args = contargs.split(',')
for a in args:
k,s,v = a.partition('=')
ioninit.cont_args[k.strip()] = v.strip()
else:
ioninit.cont_args['args'] = contargs

Expand Down
90 changes: 80 additions & 10 deletions ion/core/cc/cc_agent.py
Expand Up @@ -7,40 +7,110 @@
"""

import logging
import os

from magnet.spawnable import Receiver
from twisted.internet import defer
import magnet
from magnet.container import Container
from magnet.spawnable import Receiver, spawn

from ion.core.supervisor import Supervisor, ProcessDesc
from ion.core.base_process import BaseProcess, ProtocolFactory
from ion.agents.resource_agent import ResourceAgent
from ion.core import ionconst
from ion.core.base_process import BaseProcess, ProtocolFactory, ProcessDesc
from ion.core.supervisor import Supervisor
import ion.util.procutils as pu


class CCAgent(ResourceAgent):
"""Capability Container agent process interface
"""

Capability Container agent process interface
"""
@defer.inlineCallbacks
def plc_init(self):
self.supervisor = Supervisor(self.receiver)
# Init self and container
annName = 'cc_announce'
self.ann_name = self.get_scoped_name('system', annName)
self.start_time = pu.currenttime_ms()
self.containers = {}

# Declare CC announcement name
messaging = {'name_type':'fanout', 'args':{'scope':'system'}}
yield Container.configure_messaging(self.ann_name, messaging)
logging.info("Declared CC anounce name: "+str(self.ann_name))

# Attach to CC announcement name
annReceiver = Receiver(annName+'.'+self.receiver.label, self.ann_name)
annReceiver.group = self.receiver.group
self.ann_receiver = annReceiver
self.ann_receiver.handle(self.receive)
self.add_receiver(self.ann_receiver)
annid = yield spawn(self.ann_receiver)
logging.info("Listening to CC anouncements: "+str(annid))

# Start with an identify request. Will lead to an announce by myself
yield self.send(self.ann_name, 'identify', 'started', {'quiet':True})

@defer.inlineCallbacks
def _send_announcement(self, event):
"""
Send announce message to CC broadcast name
"""
cdesc = {'node':str(os.uname()[1]),
'container-id':str(Container.id),
'version':ionconst.VERSION,
'magnet':magnet.__version__,
'start-time':self.start_time,
'current-time':pu.currenttime_ms(),
'event':event}
yield self.send(self.ann_name, 'announce', cdesc)

def op_announce(self, content, headers, msg):
"""
Service operation: announce a capability container
"""
logging.info("op_announce(): Received CC announcement: " + repr(content))
contid = content['container-id']
event = content['event']
if event == 'started' or event == 'identify':
self.containers[contid] = content
elif event == 'terminate':
del self.containers[contid]
logging.info("op_announce(): Know about %s containers!" % (len(self.containers)))


@defer.inlineCallbacks
def op_identify(self, content, headers, msg):
"""
Service operation: ask for identification; respond with announcement
"""
logging.info("op_identify(). Send announcement")
yield self._send_announcement('identify')

def op_start_node(self, content, headers, msg):
pass

def op_terminate_node(self, content, headers, msg):
pass

@defer.inlineCallbacks
def op_spawn(self, content, headers, msg):
procMod = content['module']
procMod = str(content['module'])
child = ProcessDesc(procMod, procMod)
pass
yield self.spawn_child(child)

def op_get_node_id(self, content, headers, msg):
pass

def op_advertise(self, content, headers, msg):
pass

def op_get_config(self, content, headers, msg):
pass

# Spawn of the process using the module name
factory = ProtocolFactory(CCAgent)

"""
twistd -n --pidfile t1.pid magnet -h amoeba.ucsd.edu -a sysname=mm res/scripts/newcc.py
twistd -n --pidfile t2.pid magnet -h amoeba.ucsd.edu -a sysname=mm res/scripts/newcc.py
send (2, {'op':'identify','content':''})
"""
11 changes: 9 additions & 2 deletions ion/util/procutils.py
Expand Up @@ -46,7 +46,9 @@ def log_message(msg):
lstr = ""
procname = str(body.get('receiver',None))
lstr += "===Message=== receiver=%s op=%s" % (procname, body.get('op', None))
if not body.get('quiet', False):
if body.get('quiet', False):
lstr += " (Q)"
else:
amqpm = str(msg._amqp_message)
# Cut out the redundant or encrypted AMQP body to make log shorter
amqpm = re.sub("body='(\\\\'|[^'])*'","*BODY*", amqpm)
Expand All @@ -65,7 +67,12 @@ def log_message(msg):
value = mbody.get(attr)
lstr += "%s=%r, " % (attr, value)
lstr += "\n---CONTENT---\n"
lstr += repr(content)
if type(content) is dict:
for attr in sorted(content.keys()):
value = content.get(attr)
lstr += "%s=%r, " % (attr, value)
else:
lstr += repr(content)
lstr += "\n============="
logging.info(lstr)

Expand Down

0 comments on commit e710e44

Please sign in to comment.