Permalink
Browse files

newparallel tweaks, fixes

* warning on unregistered engine
* Python LRU scheduler is now default
* Client registration includes task scheme
* Fix typos associated with some renaming
* fix demo links
* warning typo
  • Loading branch information...
minrk committed Feb 16, 2011
1 parent afabc88 commit 86f3ca88acf68e1841c1891bc07adffc527b01c8
@@ -15,6 +15,7 @@
from getpass import getpass
from pprint import pprint
from datetime import datetime
+import warnings
import json
pjoin = os.path.join
@@ -249,6 +250,7 @@ class Client(object):
_notification_socket=None
_mux_socket=None
_task_socket=None
+ _task_scheme=None
block = False
outstanding=None
results = None
@@ -298,7 +300,6 @@ def __init__(self, url_or_file=None, profile='default', cluster_dir=None, ipytho
self._config = cfg
-
self._ssh = bool(sshserver or sshkey or password)
if self._ssh and sshserver is None:
# default to ssh via localhost
@@ -360,7 +361,7 @@ def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir):
@property
def ids(self):
- """Always up to date ids property."""
+ """Always up-to-date ids property."""
self._flush_notifications()
return self._ids
@@ -370,6 +371,23 @@ def _update_engines(self, engines):
eid = int(k)
self._engines[eid] = bytes(v) # force not unicode
self._ids.add(eid)
+ if sorted(self._engines.keys()) != range(len(self._engines)) and \
+ self._task_scheme == 'pure' and self._task_socket:
+ self._stop_scheduling_tasks()
+
+ def _stop_scheduling_tasks(self):
+ """Stop scheduling tasks because an engine has been unregistered
+ from a pure ZMQ scheduler.
+ """
+
+ self._task_socket.close()
+ self._task_socket = None
+ msg = "An engine has been unregistered, and we are using pure " +\
+ "ZMQ task scheduling. Task farming will be disabled."
+ if self.outstanding:
+ msg += " If you were running tasks when this happened, " +\
+ "some `outstanding` msg_ids may never resolve."
+ warnings.warn(msg, RuntimeWarning)
def _build_targets(self, targets):
"""Turn valid target IDs or 'all' into two lists:
@@ -389,6 +407,8 @@ def _build_targets(self, targets):
def _connect(self, sshserver, ssh_kwargs):
"""set up all our socket connections to the controller. This is called from
__init__."""
+
+ # Maybe allow reconnecting?
if self._connected:
return
self._connected=True
@@ -406,15 +426,17 @@ def connect_socket(s, url):
pprint(msg)
msg = ss.Message(msg)
content = msg.content
+ self._config['registration'] = dict(content)
if content.status == 'ok':
if content.mux:
self._mux_socket = self.context.socket(zmq.PAIR)
self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
connect_socket(self._mux_socket, content.mux)
if content.task:
+ self._task_scheme, task_addr = content.task
self._task_socket = self.context.socket(zmq.PAIR)
self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
- connect_socket(self._task_socket, content.task)
+ connect_socket(self._task_socket, task_addr)
if content.notification:
self._notification_socket = self.context.socket(zmq.SUB)
connect_socket(self._notification_socket, content.notification)
@@ -457,6 +479,8 @@ def _unregister_engine(self, msg):
if eid in self._ids:
self._ids.remove(eid)
self._engines.pop(eid)
+ if self._task_socket and self._task_scheme == 'pure':
+ self._stop_scheduling_tasks()
def _extract_metadata(self, header, parent, content):
md = {'msg_id' : parent['msg_id'],
@@ -937,8 +961,15 @@ def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
options = dict(bound=bound, block=block)
if targets is None:
- return self._apply_balanced(f, args, kwargs, timeout=timeout,
+ if self._task_socket:
+ return self._apply_balanced(f, args, kwargs, timeout=timeout,
after=after, follow=follow, **options)
+ else:
+ msg = "Task farming is disabled"
+ if self._task_scheme == 'pure':
+ msg += " because the pure ZMQ scheduler cannot handle"
+ msg += " disappearing engines."
+ raise RuntimeError(msg)
else:
return self._apply_direct(f, args, kwargs, targets=targets, **options)
@@ -1103,12 +1134,13 @@ def get_results(self, msg_ids, status_only=False):
completed = []
local_results = {}
- # temporarily disable local shortcut
- # for msg_id in list(theids):
- # if msg_id in self.results:
- # completed.append(msg_id)
- # local_results[msg_id] = self.results[msg_id]
- # theids.remove(msg_id)
+
+ # comment this block out to temporarily disable local shortcut:
+ for msg_id in list(theids):
+ if msg_id in self.results:
+ completed.append(msg_id)
+ local_results[msg_id] = self.results[msg_id]
+ theids.remove(msg_id)
if theids: # some not locally cached
content = dict(msg_ids=theids, status_only=status_only)
@@ -37,7 +37,6 @@
class ControllerFactory(HubFactory):
"""Configurable for setting up a Hub and Schedulers."""
- scheme = Str('pure', config=True)
usethreads = Bool(False, config=True)
# internal
@@ -65,34 +64,34 @@ def construct_schedulers(self):
# IOPub relay (in a Process)
q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
- q.bind_in(self.client_addrs['iopub'])
- q.bind_out(self.engine_addrs['iopub'])
+ q.bind_in(self.client_info['iopub'])
+ q.bind_out(self.engine_info['iopub'])
q.setsockopt_out(zmq.SUBSCRIBE, '')
q.connect_mon(self.monitor_url)
q.daemon=True
children.append(q)
# Multiplexer Queue (in a Process)
q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
- q.bind_in(self.client_addrs['mux'])
- q.bind_out(self.engine_addrs['mux'])
+ q.bind_in(self.client_info['mux'])
+ q.bind_out(self.engine_info['mux'])
q.connect_mon(self.monitor_url)
q.daemon=True
children.append(q)
# Control Queue (in a Process)
q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
- q.bind_in(self.client_addrs['control'])
- q.bind_out(self.engine_addrs['control'])
+ q.bind_in(self.client_info['control'])
+ q.bind_out(self.engine_info['control'])
q.connect_mon(self.monitor_url)
q.daemon=True
children.append(q)
# Task Queue (in a Process)
if self.scheme == 'pure':
self.log.warn("task::using pure XREQ Task scheduler")
q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
- q.bind_in(self.client_addrs['task'])
- q.bind_out(self.engine_addrs['task'])
+ q.bind_in(self.client_info['task'][1])
+ q.bind_out(self.engine_info['task'])
q.connect_mon(self.monitor_url)
q.daemon=True
children.append(q)
@@ -101,8 +100,9 @@ def construct_schedulers(self):
else:
self.log.info("task::using Python %s Task scheduler"%self.scheme)
- sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification'])
- q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level))
+ sargs = (self.client_info['task'], self.engine_info['task'], self.monitor_url, self.client_info['notification'])
+ kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level, config=self.config)
+ q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
q.daemon=True
children.append(q)
@@ -99,6 +99,9 @@ class EngineConnector(HasTraits):
class HubFactory(RegistrationFactory):
"""The Configurable for setting up a Hub."""
+ # name of a scheduler scheme
+ scheme = Str('lru', config=True)
+
# port-pairs for monitoredqueues:
hb = Instance(list, config=True)
def _hb_default(self):
@@ -238,7 +241,7 @@ def construct_hub(self):
time.sleep(.25)
# build connection dicts
- self.engine_addrs = {
+ self.engine_info = {
'control' : engine_iface%self.control[1],
'mux': engine_iface%self.mux[1],
'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
@@ -247,19 +250,19 @@ def construct_hub(self):
# 'monitor' : engine_iface%self.mon_port,
}
- self.client_addrs = {
+ self.client_info = {
'control' : client_iface%self.control[0],
'query': client_iface%self.query_port,
'mux': client_iface%self.mux[0],
- 'task' : client_iface%self.task[0],
+ 'task' : (self.scheme, client_iface%self.task[0]),
'iopub' : client_iface%self.iopub[0],
'notification': client_iface%self.notifier_port
}
- self.log.debug("hub::Hub engine addrs: %s"%self.engine_addrs)
- self.log.debug("hub::Hub client addrs: %s"%self.client_addrs)
+ self.log.debug("hub::Hub engine addrs: %s"%self.engine_info)
+ self.log.debug("hub::Hub client addrs: %s"%self.client_info)
self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
registrar=reg, clientele=c, notifier=n, db=self.db,
- engine_addrs=self.engine_addrs, client_addrs=self.client_addrs,
+ engine_info=self.engine_info, client_info=self.client_info,
logname=self.log.name)
@@ -279,9 +282,9 @@ class Hub(LoggingFactory):
notifier: ZMQStream for broadcasting engine registration changes (PUB)
db: connection to db for out of memory logging of commands
NotImplemented
- engine_addrs: dict of zmq connection information for engines to connect
+ engine_info: dict of zmq connection information for engines to connect
to the queues.
- client_addrs: dict of zmq connection information for engines to connect
+ client_info: dict of zmq connection information for clients to connect
to the queues.
"""
# internal data structures:
@@ -309,8 +312,8 @@ class Hub(LoggingFactory):
heartmonitor=Instance(HeartMonitor)
notifier=Instance(ZMQStream)
db=Instance(object)
- client_addrs=Dict()
- engine_addrs=Dict()
+ client_info=Dict()
+ engine_info=Dict()
def __init__(self, **kwargs):
@@ -326,16 +329,21 @@ def __init__(self, **kwargs):
clientele: ZMQStream for client connections
# extra:
db: ZMQStream for db connection (NotImplemented)
- engine_addrs: zmq address/protocol dict for engine connections
- client_addrs: zmq address/protocol dict for client connections
+ engine_info: zmq address/protocol dict for engine connections
+ client_info: zmq address/protocol dict for client connections
"""
super(Hub, self).__init__(**kwargs)
self.registration_timeout = max(5000, 2*self.heartmonitor.period)
# validate connection dicts:
- validate_url_container(self.client_addrs)
- validate_url_container(self.engine_addrs)
+ for k,v in self.client_info.iteritems():
+ if k == 'task':
+ validate_url_container(v[1])
+ else:
+ validate_url_container(v)
+ # validate_url_container(self.client_info)
+ validate_url_container(self.engine_info)
# register our callbacks
self.registrar.on_recv(self.dispatch_register_request)
@@ -764,7 +772,7 @@ def connection_request(self, client_id, msg):
"""Reply with connection addresses for clients."""
self.log.info("client::client %s connected"%client_id)
content = dict(status='ok')
- content.update(self.client_addrs)
+ content.update(self.client_info)
jsonable = {}
for k,v in self.keytable.iteritems():
jsonable[str(k)] = v
@@ -787,7 +795,7 @@ def register_engine(self, reg, msg):
self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
content = dict(id=eid,status='ok')
- content.update(self.engine_addrs)
+ content.update(self.engine_info)
# check if requesting available IDs:
if queue in self.by_ident:
try:
@@ -190,7 +190,7 @@ def _add_arguments(self):
'connections, respectively [default: random]',
metavar='Scheduler.iopub_ports')
paa('--scheme',
- type=str, dest='ControllerFactory.scheme',
+ type=str, dest='HubFactory.scheme',
choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
help='select the task scheduler scheme [default: Python LRU]',
metavar='Scheduler.scheme')
@@ -294,7 +294,7 @@ def dispatch_submission(self, raw_msg):
else:
self.save_unmet(msg_id, raw_msg, after, follow, timeout)
- @logged
+ # @logged
def audit_timeouts(self):
"""Audit all waiting tasks for expired timeouts."""
now = datetime.now()
@@ -506,13 +506,13 @@ def finish_job(self, idx):
-def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, logname='ZMQ', log_addr=None, loglevel=logging.DEBUG, scheme='weighted'):
+def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ', log_addr=None, loglevel=logging.DEBUG, scheme='weighted'):
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
ctx = zmq.Context()
loop = ioloop.IOLoop()
-
+ print (in_addr, out_addr, mon_addr, not_addr)
ins = ZMQStream(ctx.socket(zmq.XREP),loop)
ins.bind(in_addr)
outs = ZMQStream(ctx.socket(zmq.XREP),loop)
@@ -532,14 +532,11 @@ def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, logname='ZMQ', log_a
scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
mon_stream=mons, notifier_stream=nots,
- scheme=scheme, loop=loop, logname=logname)
+ scheme=scheme, loop=loop, logname=logname,
+ config=config)
scheduler.start()
try:
loop.start()
except KeyboardInterrupt:
print ("interrupted, exiting...", file=sys.__stderr__)
-
-if __name__ == '__main__':
- iface = 'tcp://127.0.0.1:%i'
- launch_scheduler(iface%12345,iface%1236,iface%12347,iface%12348)
@@ -97,8 +97,8 @@ def disambiguate_ip_address(ip, location=None):
external_ips = socket.gethostbyname_ex(socket.gethostname())[2]
if location is None or location in external_ips:
ip='127.0.0.1'
- elif external_ips:
- ip=external_ips[0]
+ elif location:
+ return location
return ip
def disambiguate_url(url, location=None):
@@ -342,7 +342,7 @@ class LoadBalancedView(View):
TODO: allow subset of engines across which to balance.
"""
def __repr__(self):
- return "<%s %s>"%(self.__class__.__name__, self.client._addr)
+ return "<%s %s>"%(self.__class__.__name__, self.client._config['url'])
@property
def targets(self):
Oops, something went wrong.

0 comments on commit 86f3ca8

Please sign in to comment.