Skip to content
Browse files

add default ip<x>z_config files

  • Loading branch information...
1 parent 9a84a98 commit d34f19346d534aa7544ac0db6cd3cfa2b428ab9a @minrk minrk committed Feb 22, 2011
View
6 IPython/config/default/ipclusterz_config.py
@@ -12,7 +12,7 @@
# - Start using mpiexec.
# - Start using the Windows HPC Server 2008 scheduler
# - Start using PBS
-# - Start using SSH (currently broken)
+# - Start using SSH
# The selected launchers can be configured below.
@@ -22,14 +22,14 @@
# - MPIExecControllerLauncher
# - PBSControllerLauncher
# - WindowsHPCControllerLauncher
-# c.Global.controller_launcher = 'IPython.kernel.launcher.LocalControllerLauncher'
+# c.Global.controller_launcher = 'IPython.zmq.parallel.launcher.LocalControllerLauncher'
# Options are:
# - LocalEngineSetLauncher
# - MPIExecEngineSetLauncher
# - PBSEngineSetLauncher
# - WindowsHPCEngineSetLauncher
-# c.Global.engine_launcher = 'IPython.kernel.launcher.LocalEngineSetLauncher'
+# c.Global.engine_launcher = 'IPython.zmq.parallel.launcher.LocalEngineSetLauncher'
#-----------------------------------------------------------------------------
# Global configuration
View
160 IPython/config/default/ipcontrollerz_config.py
@@ -25,112 +25,112 @@
# be imported in the controller for pickling to work.
# c.Global.import_statements = ['import math']
-# Reuse the controller's FURL files. If False, FURL files are regenerated
+# Reuse the controller's JSON files. If False, JSON files are regenerated
# each time the controller is run. If True, they will be reused, *but*, you
# also must set the network ports by hand. If set, this will override the
# values set for the client and engine connections below.
-# c.Global.reuse_furls = True
+# c.Global.reuse_files = True
-# Enable SSL encryption on all connections to the controller. If set, this
-# will override the values set for the client and engine connections below.
+# Enable exec_key authentication on all messages. Default is True
# c.Global.secure = True
# The working directory for the process. The application will use os.chdir
# to change to this directory before starting.
# c.Global.work_dir = os.getcwd()
-#-----------------------------------------------------------------------------
-# Configure the client services
-#-----------------------------------------------------------------------------
+# The log url for logging to an `iploggerz` application. This will override
+# log-to-file.
+# c.Global.log_url = 'tcp://127.0.0.1:20202'
-# Basic client service config attributes
+# The specific external IP that is used to disambiguate multi-interface URLs.
+# The default behavior is to guess from external IPs gleaned from `socket`.
+# c.Global.location = '192.168.1.123'
-# The network interface the controller will listen on for client connections.
-# This should be an IP address or hostname of the controller's host. The empty
-# string means listen on all interfaces.
-# c.FCClientServiceFactory.ip = ''
+# The ssh server remote clients should use to connect to this controller.
+# It must be a machine that can see the interface specified in client_ip.
+# The default for client_ip is localhost, in which case the sshserver must
+# be an external IP of the controller machine.
+# c.Global.sshserver = 'controller.example.com'
+
+# the url to use for registration. If set, this overrides engine-ip,
+# engine-transport client-ip,client-transport, and regport.
+# c.RegistrationFactory.url = 'tcp://*:12345'
-# The TCP/IP port the controller will listen on for client connections. If 0
-# a random port will be used. If the controller's host has a firewall running
-# it must allow incoming traffic on this port.
-# c.FCClientServiceFactory.port = 0
+# the port to use for registration. Clients and Engines both use this
+# port for registration.
+# c.RegistrationFactory.regport = 10101
-# The client learns how to connect to the controller by looking at the
-# location field embedded in the FURL. If this field is empty, all network
-# interfaces that the controller is listening on will be listed. To have the
-# client connect on a particular interface, list it here.
-# c.FCClientServiceFactory.location = ''
+#-----------------------------------------------------------------------------
+# Configure the Task Scheduler
+#-----------------------------------------------------------------------------
-# Use SSL encryption for the client connection.
-# c.FCClientServiceFactory.secure = True
+# The routing scheme. 'pure' will use the pure-ZMQ scheduler. Any other
+# value will use a Python scheduler with various routing schemes.
+# python schemes are: lru, weighted, random, twobin. Default is 'weighted'.
+# Note that the pure ZMQ scheduler does not support many features, such as
+# dying engines, dependencies, or engine-subset load-balancing.
+# c.ControllerFactory.scheme = 'pure'
-# Reuse the client FURL each time the controller is started. If set, you must
-# also pick a specific network port above (FCClientServiceFactory.port).
-# c.FCClientServiceFactory.reuse_furls = False
+# The pure ZMQ scheduler can limit the number of outstanding tasks per engine
+# by using the ZMQ HWM option. This allows engines with long-running tasks
+# to not steal too many tasks from other engines. The default is 0, which
+# means agressively distribute messages, never waiting for them to finish.
+# c.ControllerFactory.hwm = 1
+
+# Whether to use Threads or Processes to start the Schedulers. Threads will
+# use less resources, but potentially reduce throughput. Default is to
+# use processes. Note that the a Python scheduler will always be in a Process.
+# c.ControllerFactory.usethreads
#-----------------------------------------------------------------------------
-# Configure the engine services
+# Configure the Hub
#-----------------------------------------------------------------------------
-# Basic config attributes for the engine services.
+# Which class to use for the db backend. Currently supported are DictDB (the
+# default), and MongoDB. Uncomment this line to enable MongoDB, which will
+# slow-down the Hub's responsiveness, but also reduce its memory footprint.
+# c.HubFactory.db_class = 'IPython.zmq.parallel.mongodb.MongoDB'
-# The network interface the controller will listen on for engine connections.
-# This should be an IP address or hostname of the controller's host. The empty
-# string means listen on all interfaces.
-# c.FCEngineServiceFactory.ip = ''
+# The heartbeat ping frequency. This is the frequency (in ms) at which the
+# Hub pings engines for heartbeats. This determines how quickly the Hub
+# will react to engines coming and going. A lower number means faster response
+# time, but more network activity. The default is 100ms
+# c.HubFactory.ping = 100
-# The TCP/IP port the controller will listen on for engine connections. If 0
-# a random port will be used. If the controller's host has a firewall running
-# it must allow incoming traffic on this port.
-# c.FCEngineServiceFactory.port = 0
+# HubFactory queue port pairs, to set by name: mux, iopub, control, task. Set
+# each as a tuple of length 2 of ints. The default is to find random
+# available ports
+# c.HubFactory.mux = (10102,10112)
-# The engine learns how to connect to the controller by looking at the
-# location field embedded in the FURL. If this field is empty, all network
-# interfaces that the controller is listening on will be listed. To have the
-# client connect on a particular interface, list it here.
-# c.FCEngineServiceFactory.location = ''
+#-----------------------------------------------------------------------------
+# Configure the client connections
+#-----------------------------------------------------------------------------
-# Use SSL encryption for the engine connection.
-# c.FCEngineServiceFactory.secure = True
+# Basic client connection config attributes
+
+# The network interface the controller will listen on for client connections.
+# This should be an IP address or interface on the controller. An asterisk
+# means listen on all interfaces. The transport can be any transport
+# supported by zeromq (tcp,epgm,pgm,ib,ipc):
+# c.HubFactory.client_ip = '*'
+# c.HubFactory.client_transport = 'tcp'
-# Reuse the client FURL each time the controller is started. If set, you must
-# also pick a specific network port above (FCClientServiceFactory.port).
-# c.FCEngineServiceFactory.reuse_furls = False
+# individual client ports to configure by name: query_port, notifier_port
+# c.HubFactory.query_port = 12345
#-----------------------------------------------------------------------------
-# Developer level configuration attributes
+# Configure the engine connections
#-----------------------------------------------------------------------------
-# You shouldn't have to modify anything in this section. These attributes
-# are more for developers who want to change the behavior of the controller
-# at a fundamental level.
-
-# c.FCClientServiceFactory.cert_file = u'ipcontroller-client.pem'
-
-# default_client_interfaces = Config()
-# default_client_interfaces.Task.interface_chain = [
-# 'IPython.kernel.task.ITaskController',
-# 'IPython.kernel.taskfc.IFCTaskController'
-# ]
-#
-# default_client_interfaces.Task.furl_file = u'ipcontroller-tc.furl'
-#
-# default_client_interfaces.MultiEngine.interface_chain = [
-# 'IPython.kernel.multiengine.IMultiEngine',
-# 'IPython.kernel.multienginefc.IFCSynchronousMultiEngine'
-# ]
-#
-# default_client_interfaces.MultiEngine.furl_file = u'ipcontroller-mec.furl'
-#
-# c.FCEngineServiceFactory.interfaces = default_client_interfaces
-
-# c.FCEngineServiceFactory.cert_file = u'ipcontroller-engine.pem'
-
-# default_engine_interfaces = Config()
-# default_engine_interfaces.Default.interface_chain = [
-# 'IPython.kernel.enginefc.IFCControllerBase'
-# ]
-#
-# default_engine_interfaces.Default.furl_file = u'ipcontroller-engine.furl'
-#
-# c.FCEngineServiceFactory.interfaces = default_engine_interfaces
+# Basic config attributes for the engine connections.
+
+# The network interface the controller will listen on for engine connections.
+# This should be an IP address or interface on the controller. An asterisk
+# means listen on all interfaces. The transport can be any transport
+# supported by zeromq (tcp,epgm,pgm,ib,ipc):
+# c.HubFactory.engine_ip = '*'
+# c.HubFactory.engine_transport = 'tcp'
+
+# set the engine heartbeat ports to use:
+# c.HubFactory.hb = (10303,10313)
+
View
15 IPython/config/default/ipenginez_config.py
@@ -29,10 +29,10 @@
# c.Global.connect_delay = 0.1
# c.Global.connect_max_tries = 15
-# By default, the engine will look for the controller's FURL file in its own
-# cluster directory. Sometimes, the FURL file will be elsewhere and this
-# attribute can be set to the full path of the FURL file.
-# c.Global.furl_file = u''
+# By default, the engine will look for the controller's JSON file in its own
+# cluster directory. Sometimes, the JSON file will be elsewhere and this
+# attribute can be set to the full path of the JSON file.
+# c.Global.url_file = u'/path/to/my/ipcontroller-engine.json'
# The working directory for the process. The application will use os.chdir
# to change to this directory before starting.
@@ -78,12 +78,7 @@
# You should not have to change these attributes.
-# c.Global.shell_class = 'IPython.kernel.core.interpreter.Interpreter'
-
-# c.Global.furl_file_name = u'ipcontroller-engine.furl'
-
-
-
+# c.Global.url_file_name = u'ipcontroller-engine.furl'
View
23 IPython/zmq/parallel/controller.py
@@ -18,7 +18,7 @@
from multiprocessing import Process
import zmq
-
+from zmq.devices import ProcessMonitoredQueue
# internal:
from IPython.utils.importstring import import_item
from IPython.utils.traitlets import Int, Str, Instance, List, Bool
@@ -38,6 +38,8 @@ class ControllerFactory(HubFactory):
"""Configurable for setting up a Hub and Schedulers."""
usethreads = Bool(False, config=True)
+ # pure-zmq downstream HWM
+ hwm = Int(0, config=True)
# internal
children = List()
@@ -52,47 +54,54 @@ def __init__(self, **kwargs):
def start(self):
super(ControllerFactory, self).start()
+ child_procs = []
for child in self.children:
child.start()
- if not self.usethreads:
- signal_children([ getattr(c, 'launcher', c) for c in self.children ])
+ if isinstance(child, ProcessMonitoredQueue):
+ child_procs.append(child.launcher)
+ elif isinstance(child, Process):
+ child_procs.append(child)
+ if child_procs:
+ signal_children(child_procs)
def construct_schedulers(self):
children = self.children
mq = import_item(self.mq_class)
+ maybe_inproc = 'inproc://monitor' if self.usethreads else self.monitor_url
# IOPub relay (in a Process)
q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
q.bind_in(self.client_info['iopub'])
q.bind_out(self.engine_info['iopub'])
q.setsockopt_out(zmq.SUBSCRIBE, '')
- q.connect_mon(self.monitor_url)
+ q.connect_mon(maybe_inproc)
q.daemon=True
children.append(q)
# Multiplexer Queue (in a Process)
q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
q.bind_in(self.client_info['mux'])
q.bind_out(self.engine_info['mux'])
- q.connect_mon(self.monitor_url)
+ q.connect_mon(maybe_inproc)
q.daemon=True
children.append(q)
# Control Queue (in a Process)
q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
q.bind_in(self.client_info['control'])
q.bind_out(self.engine_info['control'])
- q.connect_mon(self.monitor_url)
+ q.connect_mon(maybe_inproc)
q.daemon=True
children.append(q)
# Task Queue (in a Process)
if self.scheme == 'pure':
self.log.warn("task::using pure XREQ Task scheduler")
q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
+ q.setsockopt_out(zmq.HWM, self.hwm)
q.bind_in(self.client_info['task'][1])
q.bind_out(self.engine_info['task'])
- q.connect_mon(self.monitor_url)
+ q.connect_mon(maybe_inproc)
q.daemon=True
children.append(q)
elif self.scheme == 'none':
View
19 IPython/zmq/parallel/hub.py
@@ -26,7 +26,7 @@
# internal:
from IPython.config.configurable import Configurable
-from IPython.utils.traitlets import HasTraits, Instance, Int, Str, Dict, Set, List, Bool
+from IPython.utils.traitlets import HasTraits, Instance, Int, CStr, Str, Dict, Set, List, Bool
from IPython.utils.importstring import import_item
from entry_point import select_random_ports
@@ -138,18 +138,18 @@ def _notifier_port_default(self):
ping = Int(1000, config=True) # ping frequency
- engine_ip = Str('127.0.0.1', config=True)
- engine_transport = Str('tcp', config=True)
+ engine_ip = CStr('127.0.0.1', config=True)
+ engine_transport = CStr('tcp', config=True)
- client_ip = Str('127.0.0.1', config=True)
- client_transport = Str('tcp', config=True)
+ client_ip = CStr('127.0.0.1', config=True)
+ client_transport = CStr('tcp', config=True)
- monitor_ip = Str('127.0.0.1', config=True)
- monitor_transport = Str('tcp', config=True)
+ monitor_ip = CStr('127.0.0.1', config=True)
+ monitor_transport = CStr('tcp', config=True)
- monitor_url = Str('')
+ monitor_url = CStr('')
- db_class = Str('IPython.zmq.parallel.dictdb.DictDB', config=True)
+ db_class = CStr('IPython.zmq.parallel.dictdb.DictDB', config=True)
# not configurable
db = Instance('IPython.zmq.parallel.dictdb.BaseDB')
@@ -234,6 +234,7 @@ def construct_hub(self):
sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE, "")
sub.bind(self.monitor_url)
+ sub.bind('inproc://monitor')
sub = ZMQStream(sub, loop)
# connect the db
View
2 IPython/zmq/parallel/ipclusterapp.py
@@ -38,7 +38,7 @@
#-----------------------------------------------------------------------------
-default_config_file_name = u'ipcluster_config.py'
+default_config_file_name = u'ipclusterz_config.py'
_description = """\
View
90 IPython/zmq/parallel/ipcontrollerapp.py
@@ -49,7 +49,7 @@
#: The default config file name for this application
-default_config_file_name = u'ipcontroller_config.py'
+default_config_file_name = u'ipcontrollerz_config.py'
_description = """Start the IPython controller for parallel computing.
@@ -58,7 +58,7 @@
clients. The controller needs to be started before the engines and can be
configured using command line options or using a cluster directory. Cluster
directories contain config, log and security files and are usually located in
-your .ipython directory and named as "cluster_<profile>". See the --profile
+your ipython directory and named as "cluster_<profile>". See the --profile
and --cluster-dir options for details.
"""
@@ -189,6 +189,7 @@ def _add_arguments(self):
help='The (2) ports the IOPub scheduler will listen on for client,engine '
'connections, respectively [default: random]',
metavar='Scheduler.iopub_ports')
+
paa('--scheme',
type=str, dest='HubFactory.scheme',
choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
@@ -198,6 +199,12 @@ def _add_arguments(self):
dest='ControllerFactory.usethreads', action="store_true",
help='Use threads instead of processes for the schedulers',
)
+ paa('--hwm',
+ dest='ControllerFactory.hwm', type=int,
+ help='specify the High Water Mark (HWM) for the downstream '
+ 'socket in the pure ZMQ scheduler. This is the maximum number '
+ 'of allowed outstanding tasks on each engine.',
+ )
## Global config
paa('--log-to-file',
@@ -206,9 +213,9 @@ def _add_arguments(self):
paa('--log-url',
type=str, dest='Global.log_url',
help='Broadcast logs to an iploggerz process [default: disabled]')
- paa('-r','--reuse-key',
- action='store_true', dest='Global.reuse_key',
- help='Try to reuse existing execution keys.')
+ paa('-r','--reuse-files',
+ action='store_true', dest='Global.reuse_files',
+ help='Try to reuse existing json connection files.')
paa('--no-secure',
action='store_false', dest='Global.secure',
help='Turn off execution keys (default).')
@@ -255,7 +262,7 @@ def create_default_config(self):
self.default_config.Global.import_statements = []
self.default_config.Global.clean_logs = True
self.default_config.Global.secure = True
- self.default_config.Global.reuse_key = False
+ self.default_config.Global.reuse_files = False
self.default_config.Global.exec_key = "exec_key.key"
self.default_config.Global.sshserver = None
self.default_config.Global.location = None
@@ -293,24 +300,53 @@ def save_connection_dict(self, fname, cdict):
with open(fname, 'w') as f:
f.write(json.dumps(cdict, indent=2))
os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
+
+ def load_config_from_json(self):
+ """load config from existing json connector files."""
+ c = self.master_config
+ # load from engine config
+ with open(os.path.join(c.Global.security_dir, 'ipcontroller-engine.json')) as f:
+ cfg = json.loads(f.read())
+ key = c.SessionFactory.exec_key = cfg['exec_key']
+ xport,addr = cfg['url'].split('://')
+ c.HubFactory.engine_transport = xport
+ ip,ports = addr.split(':')
+ c.HubFactory.engine_ip = ip
+ c.HubFactory.regport = int(ports)
+ c.Global.location = cfg['location']
+ # load client config
+ with open(os.path.join(c.Global.security_dir, 'ipcontroller-client.json')) as f:
+ cfg = json.loads(f.read())
+ assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
+ xport,addr = cfg['url'].split('://')
+ c.HubFactory.client_transport = xport
+ ip,ports = addr.split(':')
+ c.HubFactory.client_ip = ip
+ c.Global.sshserver = cfg['ssh']
+ assert int(ports) == c.HubFactory.regport, "regport mismatch"
+
def construct(self):
# This is the working dir by now.
sys.path.insert(0, '')
c = self.master_config
self.import_statements()
-
- if c.Global.secure:
+ reusing = c.Global.reuse_files
+ if reusing:
+ try:
+ self.load_config_from_json()
+ except (AssertionError,IOError):
+ reusing=False
+ # check again, because reusing may have failed:
+ if reusing:
+ pass
+ elif c.Global.secure:
keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
- if not c.Global.reuse_key or not os.path.exists(keyfile):
- key = str(uuid.uuid4())
- with open(keyfile, 'w') as f:
- f.write(key)
- os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
- else:
- with open(keyfile) as f:
- key = f.read().strip()
+ key = str(uuid.uuid4())
+ with open(keyfile, 'w') as f:
+ f.write(key)
+ os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
c.SessionFactory.exec_key = key
else:
c.SessionFactory.exec_key = ''
@@ -324,16 +360,18 @@ def construct(self):
self.log.error("Couldn't construct the Controller", exc_info=True)
self.exit(1)
- f = self.factory
- cdict = {'exec_key' : key,
- 'ssh' : c.Global.sshserver,
- 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
- 'location' : c.Global.location
- }
- self.save_connection_dict('ipcontroller-client.json', cdict)
- edict = cdict
- edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
- self.save_connection_dict('ipcontroller-engine.json', edict)
+ if not reusing:
+ # save to new json config files
+ f = self.factory
+ cdict = {'exec_key' : key,
+ 'ssh' : c.Global.sshserver,
+ 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
+ 'location' : c.Global.location
+ }
+ self.save_connection_dict('ipcontroller-client.json', cdict)
+ edict = cdict
+ edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
+ self.save_connection_dict('ipcontroller-engine.json', edict)
def save_urls(self):
View
6 IPython/zmq/parallel/ipengineapp.py
@@ -40,7 +40,7 @@
#-----------------------------------------------------------------------------
#: The default config file name for this application
-default_config_file_name = u'ipengine_config.py'
+default_config_file_name = u'ipenginez_config.py'
mpi4py_init = """from mpi4py import MPI as mpi
@@ -64,7 +64,7 @@ class SimpleStruct:
and controller. A controller needs to be started before the engines. The
engine can be configured using command line options or using a cluster
directory. Cluster directories contain config, log and security files and are
-usually located in your .ipython directory and named as "cluster_<profile>".
+usually located in your ipython directory and named as "cluster_<profile>".
See the --profile and --cluster-dir options for details.
"""
@@ -79,7 +79,7 @@ def _add_arguments(self):
super(IPEngineAppConfigLoader, self)._add_arguments()
paa = self.parser.add_argument
# Controller config
- paa('--file',
+ paa('--file', '-f',
type=unicode, dest='Global.url_file',
help='The full location of the file containing the connection information fo '
'controller. If this is not given, the file must be in the '
View
2 IPython/zmq/parallel/iploggerapp.py
@@ -39,7 +39,7 @@
by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
logger can be configured using command line options or using a cluster
directory. Cluster directories contain config, log and security files and are
-usually located in your .ipython directory and named as "cluster_<profile>".
+usually located in your ipython directory and named as "cluster_<profile>".
See the --profile and --cluster-dir options for details.
"""

0 comments on commit d34f193

Please sign in to comment.
Something went wrong with that request. Please try again.