Merge branch 'newapp-parallel' into newapp

closes #486
2 parents f243974 + e08aa54 · commit c5d26d9fb986831d1eb2b7e31331f5a0296ac13f · @minrk committed Jun 10, 2011
Showing with 1,807 additions and 2,271 deletions.
  1. +22 −21 IPython/config/profile/default/ipcluster_config.py
  2. +13 −12 IPython/config/profile/default/ipcontroller_config.py
  3. +11 −10 IPython/config/profile/default/ipengine_config.py
  4. +15 −9 IPython/extensions/parallelmagic.py
  5. +257 −0 IPython/parallel/apps/baseapp.py
  6. +0 −566 IPython/parallel/apps/clusterdir.py
  7. +355 −451 IPython/parallel/apps/ipclusterapp.py
  8. +235 −266 IPython/parallel/apps/ipcontrollerapp.py
  9. +130 −163 IPython/parallel/apps/ipengineapp.py
  10. +28 −64 IPython/parallel/apps/iploggerapp.py
  11. +242 −171 IPython/parallel/apps/launcher.py
  12. +19 −9 IPython/parallel/apps/logwatcher.py
  13. +37 −37 IPython/parallel/apps/winhpcjob.py
  14. +14 −14 IPython/parallel/client/client.py
  15. +0 −116 IPython/parallel/controller/controller.py
  16. +2 −2 IPython/parallel/controller/dictdb.py
  17. +6 −4 IPython/parallel/controller/heartmonitor.py
  18. +94 −99 IPython/parallel/controller/hub.py
  19. +15 −4 IPython/parallel/controller/mongodb.py
  20. +42 −20 IPython/parallel/controller/scheduler.py
  21. +20 −7 IPython/parallel/controller/sqlitedb.py
  22. +18 −9 IPython/parallel/engine/engine.py
  23. +6 −4 IPython/parallel/engine/streamkernel.py
  24. +21 −74 IPython/parallel/factory.py
  25. +139 −75 IPython/parallel/streamsession.py
  26. +2 −2 IPython/parallel/tests/__init__.py
  27. +3 −3 IPython/parallel/tests/test_streamsession.py
  28. +1 −0 IPython/parallel/tests/test_view.py
  29. +1 −1 docs/source/parallel/parallel_intro.txt
  30. +5 −5 docs/source/parallel/parallel_mpi.txt
  31. +1 −1 docs/source/parallel/parallel_multiengine.txt
  32. +20 −19 docs/source/parallel/parallel_process.txt
  33. +23 −23 docs/source/parallel/parallel_security.txt
  34. +5 −5 docs/source/parallel/parallel_task.txt
  35. +5 −5 docs/source/parallel/parallel_winhpc.txt
43 IPython/config/profile/default/ipcluster_config.py
@@ -23,34 +23,34 @@
# - PBSControllerLauncher
# - SGEControllerLauncher
# - WindowsHPCControllerLauncher
-# c.Global.controller_launcher = 'IPython.parallel.apps.launcher.LocalControllerLauncher'
-# c.Global.controller_launcher = 'IPython.parallel.apps.launcher.PBSControllerLauncher'
+# c.IPClusterStartApp.controller_launcher = 'IPython.parallel.apps.launcher.LocalControllerLauncher'
+# c.IPClusterStartApp.controller_launcher = 'IPython.parallel.apps.launcher.PBSControllerLauncher'
# Options are:
# - LocalEngineSetLauncher
# - MPIExecEngineSetLauncher
# - PBSEngineSetLauncher
# - SGEEngineSetLauncher
# - WindowsHPCEngineSetLauncher
-# c.Global.engine_launcher = 'IPython.parallel.apps.launcher.LocalEngineSetLauncher'
+# c.IPClusterEnginesApp.engine_launcher = 'IPython.parallel.apps.launcher.LocalEngineSetLauncher'
#-----------------------------------------------------------------------------
-# Global configuration
+# Application configuration
#-----------------------------------------------------------------------------
# The default number of engines that will be started. This is overridden by
# the -n command line option: "ipcluster start -n 4"
-# c.Global.n = 2
+# c.IPClusterEnginesApp.n = 2
# Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
-# c.Global.log_to_file = False
+# c.BaseParallelApp.log_to_file = False
# Remove old logs from cluster_dir/log before starting.
-# c.Global.clean_logs = True
+# c.BaseParallelApp.clean_logs = True
# The working directory for the process. The application will use os.chdir
# to change to this directory before starting.
-# c.Global.work_dir = os.getcwd()
+# c.BaseParallelApp.work_dir = os.getcwd()
#-----------------------------------------------------------------------------
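For reviewers skimming the rename: settings move from the catch-all c.Global section to per-class sections matching the new application classes. A minimal sketch of a post-merge ipcluster_config.py, assuming the trait names shown in this hunk:

c = get_config()

# engine count now lives on the engines app rather than c.Global
c.IPClusterEnginesApp.n = 4
# shared logging options live on the common base app
c.BaseParallelApp.log_to_file = True
c.BaseParallelApp.clean_logs = True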
@@ -121,12 +121,12 @@
# in the location specified by the --cluster_dir argument.
# c.SSHControllerLauncher.program_args = ['-r', '-ip', '0.0.0.0', '--cluster_dir', '/path/to/cd']
-# Set the default args passed to ipenginez for SSH launched engines
+# Set the default args passed to ipengine for SSH launched engines
# c.SSHEngineSetLauncher.engine_args = ['--mpi', 'mpi4py']
# SSH engines are launched as a dict of locations/n-engines.
# if a value is a tuple instead of an int, it is assumed to be of the form
-# (n, [args]), setting the arguments to passed to ipenginez on `host`.
+# (n, [args]), setting the arguments passed to ipengine on `host`.
# otherwise, c.SSHEngineSetLauncher.engine_args will be used as the default.
# In this case, there will be 3 engines at my.example.com, and
@@ -162,13 +162,13 @@
# The batch submission script used to start the controller. This is where
# environment variables would be setup, etc. This string is interpreted using
-# the Itpl module in IPython.external. Basically, you can use ${n} for the
-# number of engine and ${cluster_dir} for the cluster_dir.
+# Python's string formatting. Basically, you can use {queue} for the name
+# of the PBS queue, and {profile_dir} for the profile_dir.
# c.PBSControllerLauncher.batch_template = """
# #PBS -N ipcontroller
-# #PBS -q $queue
+# #PBS -q {queue}
#
-# ipcontrollerz --cluster-dir $cluster_dir
+# ipcontroller profile_dir={profile_dir}
# """
# You can also load this template from a file
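Because the templates are now filled with Python's str.format rather than Itpl's $-substitution, a template can be sanity-checked outside the launcher. A minimal sketch, assuming the {queue} and {profile_dir} fields shown above and a made-up queue name:

template = """\
#PBS -N ipcontroller
#PBS -q {queue}

ipcontroller profile_dir={profile_dir}
"""
# str.format substitutes the named fields the launcher would supply
print template.format(queue='batch',
                      profile_dir='/home/me/.ipython/profile_default')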
@@ -180,13 +180,14 @@
# The batch submission script used to start the engines. This is where
# environment variables would be setup, etc. This string is interpreted using
-# the Itpl module in IPython.external. Basically, you can use ${n} for the
-# number of engine and ${cluster_dir} for the cluster_dir.
+# Python's string formatting. Basically, you can use {queue} for the name
+# of the PBS queue, and {profile_dir} for the profile_dir, and {n}
+# for the number of engines.
# c.PBSEngineSetLauncher.batch_template = """
-# #PBS -N ipcontroller
-# #PBS -l nprocs=$n
+# #PBS -N ipengine
+# #PBS -l nprocs={n}
#
-# ipenginez --cluster-dir $cluster_dir$s
+# ipengine profile_dir={profile_dir}
# """
# You can also load this template from a file
@@ -211,7 +212,7 @@
# c.IPControllerTask.task_name = 'IPController'
# c.IPControllerTask.controller_cmd = [u'ipcontroller.exe']
-# c.IPControllerTask.controller_args = ['--log-to-file', '--log-level', '40']
+# c.IPControllerTask.controller_args = ['--log-to-file', 'log_level=40']
# c.IPControllerTask.environment_variables = {}
# c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE'
@@ -227,7 +228,7 @@
# c.IPEngineTask.task_name = 'IPEngine'
# c.IPEngineTask.engine_cmd = [u'ipengine.exe']
-# c.IPEngineTask.engine_args = ['--log-to-file', '--log-level', '40']
+# c.IPEngineTask.engine_args = ['--log-to-file', 'log_level=40']
# c.IPEngineTask.environment_variables = {}
# c.WindowsHPCEngineSetLauncher.scheduler = 'HEADNODE'
25 IPython/config/profile/default/ipcontroller_config.py
@@ -3,54 +3,55 @@
c = get_config()
#-----------------------------------------------------------------------------
-# Global configuration
+# Application configuration
#-----------------------------------------------------------------------------
+app = c.IPControllerApp
-# Basic Global config attributes
+# Basic Application config attributes
# Start up messages are logged to stdout using the logging module.
# These all happen before the twisted reactor is started and are
# useful for debugging purposes. Can be (10=DEBUG,20=INFO,30=WARN,40=ERROR)
# and smaller is more verbose.
-# c.Global.log_level = 20
+# app.log_level = 20
# Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
-# c.Global.log_to_file = False
+# app.log_to_file = False
# Remove old logs from cluster_dir/log before starting.
-# c.Global.clean_logs = True
+# app.clean_logs = True
# A list of Python statements that will be run before starting the
# controller. This is provided because occasionally certain things need to
# be imported in the controller for pickling to work.
-# c.Global.import_statements = ['import math']
+# app.import_statements = ['import math']
# Reuse the controller's JSON files. If False, JSON files are regenerated
# each time the controller is run. If True, they will be reused, *but*, you
# also must set the network ports by hand. If set, this will override the
# values set for the client and engine connections below.
-# c.Global.reuse_files = True
+# app.reuse_files = True
# Enable exec_key authentication on all messages. Default is True
-# c.Global.secure = True
+# app.secure = True
# The working directory for the process. The application will use os.chdir
# to change to this directory before starting.
-# c.Global.work_dir = os.getcwd()
+# app.work_dir = os.getcwd()
# The log url for logging to an `iplogger` application. This will override
# log-to-file.
-# c.Global.log_url = 'tcp://127.0.0.1:20202'
+# app.log_url = 'tcp://127.0.0.1:20202'
# The specific external IP that is used to disambiguate multi-interface URLs.
# The default behavior is to guess from external IPs gleaned from `socket`.
-# c.Global.location = '192.168.1.123'
+# app.location = '192.168.1.123'
# The ssh server remote clients should use to connect to this controller.
# It must be a machine that can see the interface specified in client_ip.
# The default for client_ip is localhost, in which case the sshserver must
# be an external IP of the controller machine.
-# c.Global.sshserver = 'controller.example.com'
+# app.sshserver = 'controller.example.com'
# the url to use for registration. If set, this overrides engine-ip,
# engine-transport client-ip,client-transport, and regport.
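The new `app = c.IPControllerApp` line works because profile config files are executed as plain Python and config sections are live objects: binding a section to a short local name is just an alias, so assignments through `app` land on `c.IPControllerApp`. A minimal sketch of the idiom:

c = get_config()
app = c.IPControllerApp   # alias: the same underlying config section
app.log_level = 10        # equivalent to c.IPControllerApp.log_level = 10
assert c.IPControllerApp.log_level == 10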
21 IPython/config/profile/default/ipengine_config.py
@@ -1,42 +1,43 @@
c = get_config()
#-----------------------------------------------------------------------------
-# Global configuration
+# Application configuration
#-----------------------------------------------------------------------------
+app = c.IPEngineApp
# Start up messages are logged to stdout using the logging module.
# These all happen before the twisted reactor is started and are
# useful for debugging purposes. Can be (10=DEBUG,20=INFO,30=WARN,40=ERROR)
# and smaller is more verbose.
-# c.Global.log_level = 20
+# app.log_level = 20
# Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
-# c.Global.log_to_file = False
+# app.log_to_file = False
# Remove old logs from cluster_dir/log before starting.
-# c.Global.clean_logs = True
+# app.clean_logs = True
# A list of strings that will be executed in the user's namespace on the engine
# before it connects to the controller.
-# c.Global.exec_lines = ['import numpy']
+# app.exec_lines = ['import numpy']
# The engine will try to connect to the controller multiple times, to allow
# the controller time to startup and write its FURL file. These parameters
# control the number of retries (connect_max_tries) and the initial delay
# (connect_delay) between attempts. The actual delay between attempts gets
# longer each time by a factor of 1.5 (delay[i] = 1.5*delay[i-1]).
-# c.Global.connect_delay = 0.1
-# c.Global.connect_max_tries = 15
+# app.connect_delay = 0.1
+# app.connect_max_tries = 15
# By default, the engine will look for the controller's JSON file in its own
# cluster directory. Sometimes, the JSON file will be elsewhere and this
# attribute can be set to the full path of the JSON file.
-# c.Global.url_file = u'/path/to/my/ipcontroller-engine.json'
+# app.url_file = u'/path/to/my/ipcontroller-engine.json'
# The working directory for the process. The application will use os.chdir
# to change to this directory before starting.
-# c.Global.work_dir = os.getcwd()
+# app.work_dir = os.getcwd()
#-----------------------------------------------------------------------------
# MPI configuration
@@ -78,7 +79,7 @@
# You should not have to change these attributes.
-# c.Global.url_file_name = u'ipcontroller-engine.furl'
+# app.url_file_name = u'ipcontroller-engine.json'
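The reconnect schedule described in the comments above is geometric: each delay is 1.5 times the previous one. A back-of-the-envelope sketch with the default connect_delay=0.1 and connect_max_tries=15:

delay, total = 0.1, 0.0
for attempt in range(15):
    total += delay    # wait, then try to connect again
    delay *= 1.5
print "engine gives up after roughly %.0f seconds" % total   # ~87s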
24 IPython/extensions/parallelmagic.py
@@ -92,7 +92,7 @@ def magic_px(self, ipself, parameter_s=''):
Then you can do the following::
In [24]: %px a = 5
- Parallel execution on engines: all
+ Parallel execution on engine(s): all
Out[24]:
<Results List>
[0] In [7]: a = 5
@@ -102,7 +102,7 @@ def magic_px(self, ipself, parameter_s=''):
if self.active_view is None:
print NO_ACTIVE_VIEW
return
- print "Parallel execution on engines: %s" % self.active_view.targets
+ print "Parallel execution on engine(s): %s" % self.active_view.targets
result = self.active_view.execute(parameter_s, block=False)
if self.active_view.block:
result.get()
@@ -125,9 +125,9 @@ def magic_autopx(self, ipself, parameter_s=''):
%autopx to enabled
In [26]: a = 10
- Parallel execution on engines: [0,1,2,3]
+ Parallel execution on engine(s): [0,1,2,3]
In [27]: print a
- Parallel execution on engines: [0,1,2,3]
+ Parallel execution on engine(s): [0,1,2,3]
[stdout:0] 10
[stdout:1] 10
[stdout:2] 10
@@ -174,15 +174,21 @@ def _maybe_display_output(self, result):
If self.active_view.block is True, wait for the result
and display the result. Otherwise, this is a noop.
"""
+ if isinstance(result.stdout, basestring):
+ # single result
+ stdouts = [result.stdout.rstrip()]
+ else:
+ stdouts = [s.rstrip() for s in result.stdout]
+
targets = self.active_view.targets
if isinstance(targets, int):
targets = [targets]
- if targets == 'all':
+ elif targets == 'all':
targets = self.active_view.client.ids
- stdout = [s.rstrip() for s in result.stdout]
- if any(stdout):
- for i,eid in enumerate(targets):
- print '[stdout:%i]'%eid, stdout[i]
+
+ if any(stdouts):
+ for eid,stdout in zip(targets, stdouts):
+ print '[stdout:%i]'%eid, stdout
def pxrun_cell(self, raw_cell, store_history=True):
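The rewritten _maybe_display_output above guards against a subtle type difference: result.stdout is a bare string when the view has a single target, so iterating it directly (as the old list comprehension did) would split it into characters. A standalone sketch of the normalization, with a dummy value standing in for a real AsyncResult:

stdout = '10\n'                     # single-engine case: a bare string
if isinstance(stdout, basestring):
    stdouts = [stdout.rstrip()]     # wrap it: one entry per engine
else:
    stdouts = [s.rstrip() for s in stdout]
for eid, out in zip([0], stdouts):  # pair engine ids with their output
    print '[stdout:%i]' % eid, out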
257 IPython/parallel/apps/baseapp.py
@@ -0,0 +1,257 @@
+#!/usr/bin/env python
+# encoding: utf-8
+"""
+The base Application class for IPython.parallel apps
+"""
+
+#-----------------------------------------------------------------------------
+# Copyright (C) 2008-2009 The IPython Development Team
+#
+# Distributed under the terms of the BSD License. The full license is in
+# the file COPYING, distributed as part of this software.
+#-----------------------------------------------------------------------------
+
+#-----------------------------------------------------------------------------
+# Imports
+#-----------------------------------------------------------------------------
+
+from __future__ import with_statement
+
+import os
+import logging
+import re
+import sys
+
+from subprocess import Popen, PIPE
+
+from IPython.core import release
+from IPython.core.crashhandler import CrashHandler
+from IPython.core.newapplication import (
+ BaseIPythonApplication,
+ base_aliases as base_ip_aliases,
+ base_flags as base_ip_flags
+)
+from IPython.utils.path import expand_path
+
+from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
+
+#-----------------------------------------------------------------------------
+# Module errors
+#-----------------------------------------------------------------------------
+
+class PIDFileError(Exception):
+ pass
+
+
+#-----------------------------------------------------------------------------
+# Crash handler for this application
+#-----------------------------------------------------------------------------
+
+
+_message_template = """\
+Oops, $self.app_name crashed. We do our best to make it stable, but...
+
+A crash report was automatically generated with the following information:
+ - A verbatim copy of the crash traceback.
+ - Data on your current $self.app_name configuration.
+
+It was left in the file named:
+\t'$self.crash_report_fname'
+If you can email this file to the developers, the information in it will help
+them in understanding and correcting the problem.
+
+You can mail it to: $self.contact_name at $self.contact_email
+with the subject '$self.app_name Crash Report'.
+
+If you want to do it now, the following command will work (under Unix):
+mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
+
+To ensure accurate tracking of this issue, please file a report about it at:
+$self.bug_tracker
+"""
+
+class ParallelCrashHandler(CrashHandler):
+ """sys.excepthook for IPython itself, leaves a detailed report on disk."""
+
+ message_template = _message_template
+
+ def __init__(self, app):
+ contact_name = release.authors['Min'][0]
+ contact_email = release.authors['Min'][1]
+ bug_tracker = 'http://github.com/ipython/ipython/issues'
+ super(ParallelCrashHandler,self).__init__(
+ app, contact_name, contact_email, bug_tracker
+ )
+
+
+#-----------------------------------------------------------------------------
+# Main application
+#-----------------------------------------------------------------------------
+base_aliases = {}
+base_aliases.update(base_ip_aliases)
+base_aliases.update({
+ 'profile_dir' : 'ProfileDir.location',
+ 'log_level' : 'BaseParallelApplication.log_level',
+ 'work_dir' : 'BaseParallelApplication.work_dir',
+ 'log_to_file' : 'BaseParallelApplication.log_to_file',
+ 'clean_logs' : 'BaseParallelApplication.clean_logs',
+ 'log_url' : 'BaseParallelApplication.log_url',
+})
+
+base_flags = {
+ 'log-to-file' : (
+ {'BaseParallelApplication' : {'log_to_file' : True}},
+ "send log output to a file"
+ )
+}
+base_flags.update(base_ip_flags)
+
+class BaseParallelApplication(BaseIPythonApplication):
+ """The base Application for IPython.parallel apps
+
+ Principal extensions to BaseIPythonApplication:
+
+ * work_dir
+ * remote logging via pyzmq
+ * IOLoop instance
+ """
+
+ crash_handler_class = ParallelCrashHandler
+
+ def _log_level_default(self):
+ # temporarily override default_log_level to INFO
+ return logging.INFO
+
+ work_dir = Unicode(os.getcwdu(), config=True,
+ help='Set the working dir for the process.'
+ )
+ def _work_dir_changed(self, name, old, new):
+ self.work_dir = unicode(expand_path(new))
+
+ log_to_file = Bool(config=True,
+ help="whether to log to a file")
+
+ clean_logs = Bool(False, config=True,
+ help="whether to cleanup old logfiles before starting")
+
+ log_url = Unicode('', config=True,
+ help="The ZMQ URL of the iplogger to aggregate logging.")
+
+ def _config_files_default(self):
+ return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
+
+ loop = Instance('zmq.eventloop.ioloop.IOLoop')
+ def _loop_default(self):
+ from zmq.eventloop.ioloop import IOLoop
+ return IOLoop.instance()
+
+ aliases = Dict(base_aliases)
+ flags = Dict(base_flags)
+
+ def initialize(self, argv=None):
+ """initialize the app"""
+ super(BaseParallelApplication, self).initialize(argv)
+ self.to_work_dir()
+ self.reinit_logging()
+
+ def to_work_dir(self):
+ wd = self.work_dir
+ if unicode(wd) != os.getcwdu():
+ os.chdir(wd)
+ self.log.info("Changing to working dir: %s" % wd)
+ # This is the working dir by now.
+ sys.path.insert(0, '')
+
+ def reinit_logging(self):
+ # Remove old log files
+ log_dir = self.profile_dir.log_dir
+ if self.clean_logs:
+ for f in os.listdir(log_dir):
+ if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
+ os.remove(os.path.join(log_dir, f))
+ if self.log_to_file:
+ # Start logging to the new log file
+ log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
+ logfile = os.path.join(log_dir, log_filename)
+ open_log_file = open(logfile, 'w')
+ else:
+ open_log_file = None
+ if open_log_file is not None:
+ self.log.removeHandler(self._log_handler)
+ self._log_handler = logging.StreamHandler(open_log_file)
+ self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
+ self._log_handler.setFormatter(self._log_formatter)
+ self.log.addHandler(self._log_handler)
+
+ def write_pid_file(self, overwrite=False):
+ """Create a .pid file in the pid_dir with my pid.
+
+ The pid file is written to `self.profile_dir.pid_dir`, so the profile
+ dir must be set up first.
+ This raises :exc:`PIDFileError` if the pid file exists already.
+ """
+ pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
+ if os.path.isfile(pid_file):
+ pid = self.get_pid_from_file()
+ if not overwrite:
+ raise PIDFileError(
+ 'The pid file [%s] already exists. \nThis could mean that this '
+ 'server is already running with [pid=%s].' % (pid_file, pid)
+ )
+ with open(pid_file, 'w') as f:
+ self.log.info("Creating pid file: %s" % pid_file)
+ f.write(repr(os.getpid())+'\n')
+
+ def remove_pid_file(self):
+ """Remove the pid file.
+
+ This should be called at shutdown, e.g. from a registered exit
+ callback or an atexit handler. This needs to return
+ ``None``.
+ """
+ pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
+ if os.path.isfile(pid_file):
+ try:
+ self.log.info("Removing pid file: %s" % pid_file)
+ os.remove(pid_file)
+ except:
+ self.log.warn("Error removing the pid file: %s" % pid_file)
+
+ def get_pid_from_file(self):
+ """Get the pid from the pid file.
+
+ If the pid file doesn't exist a :exc:`PIDFileError` is raised.
+ """
+ pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
+ if os.path.isfile(pid_file):
+ with open(pid_file, 'r') as f:
+ pid = int(f.read().strip())
+ return pid
+ else:
+ raise PIDFileError('pid file not found: %s' % pid_file)
+
+ def check_pid(self, pid):
+ if os.name == 'nt':
+ try:
+ import ctypes
+ # returns 0 if no such process (of ours) exists
+ # positive int otherwise
+ p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
+ except Exception:
+ self.log.warn(
+ "Could not determine whether pid %i is running via `OpenProcess`. "
+ " Making the likely assumption that it is."%pid
+ )
+ return True
+ return bool(p)
+ else:
+ try:
+ p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
+ output,_ = p.communicate()
+ except OSError:
+ self.log.warn(
+ "Could not determine whether pid %i is running via `ps x`. "
+ " Making the likely assumption that it is."%pid
+ )
+ return True
+ pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
+ return pid in pids
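Taken together, the pid-file helpers above give each parallel app a single-instance guard. A hedged sketch of the intended lifecycle, where `app` stands for an already-initialized BaseParallelApplication subclass (hypothetical usage, not code from this commit):

try:
    pid = app.get_pid_from_file()       # raises PIDFileError if no file
except PIDFileError:
    app.write_pid_file()                # first instance: claim the pid file
else:
    if app.check_pid(pid):
        raise SystemExit("already running [pid=%i]" % pid)
    app.write_pid_file(overwrite=True)  # stale file from a crashed run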
566 IPython/parallel/apps/clusterdir.py
@@ -1,566 +0,0 @@
-#!/usr/bin/env python
-# encoding: utf-8
-"""
-The IPython cluster directory
-"""
-
-#-----------------------------------------------------------------------------
-# Copyright (C) 2008-2009 The IPython Development Team
-#
-# Distributed under the terms of the BSD License. The full license is in
-# the file COPYING, distributed as part of this software.
-#-----------------------------------------------------------------------------
-
-#-----------------------------------------------------------------------------
-# Imports
-#-----------------------------------------------------------------------------
-
-from __future__ import with_statement
-
-import os
-import logging
-import re
-import shutil
-import sys
-
-from subprocess import Popen, PIPE
-
-from IPython.config.loader import PyFileConfigLoader
-from IPython.config.configurable import Configurable
-from IPython.core.application import Application, BaseAppConfigLoader
-from IPython.core.crashhandler import CrashHandler
-from IPython.core import release
-from IPython.utils.path import (
- get_ipython_package_dir,
- expand_path
-)
-from IPython.utils.traitlets import Unicode
-
-#-----------------------------------------------------------------------------
-# Module errors
-#-----------------------------------------------------------------------------
-
-class ClusterDirError(Exception):
- pass
-
-
-class PIDFileError(Exception):
- pass
-
-
-#-----------------------------------------------------------------------------
-# Class for managing cluster directories
-#-----------------------------------------------------------------------------
-
-class ClusterDir(Configurable):
- """An object to manage the cluster directory and its resources.
-
- The cluster directory is used by :command:`ipengine`,
- :command:`ipcontroller` and :command:`ipclsuter` to manage the
- configuration, logging and security of these applications.
-
- This object knows how to find, create and manage these directories. This
- should be used by any code that want's to handle cluster directories.
- """
-
- security_dir_name = Unicode('security')
- log_dir_name = Unicode('log')
- pid_dir_name = Unicode('pid')
- security_dir = Unicode(u'')
- log_dir = Unicode(u'')
- pid_dir = Unicode(u'')
- location = Unicode(u'')
-
- def __init__(self, location=u''):
- super(ClusterDir, self).__init__(location=location)
-
- def _location_changed(self, name, old, new):
- if not os.path.isdir(new):
- os.makedirs(new)
- self.security_dir = os.path.join(new, self.security_dir_name)
- self.log_dir = os.path.join(new, self.log_dir_name)
- self.pid_dir = os.path.join(new, self.pid_dir_name)
- self.check_dirs()
-
- def _log_dir_changed(self, name, old, new):
- self.check_log_dir()
-
- def check_log_dir(self):
- if not os.path.isdir(self.log_dir):
- os.mkdir(self.log_dir)
-
- def _security_dir_changed(self, name, old, new):
- self.check_security_dir()
-
- def check_security_dir(self):
- if not os.path.isdir(self.security_dir):
- os.mkdir(self.security_dir, 0700)
- os.chmod(self.security_dir, 0700)
-
- def _pid_dir_changed(self, name, old, new):
- self.check_pid_dir()
-
- def check_pid_dir(self):
- if not os.path.isdir(self.pid_dir):
- os.mkdir(self.pid_dir, 0700)
- os.chmod(self.pid_dir, 0700)
-
- def check_dirs(self):
- self.check_security_dir()
- self.check_log_dir()
- self.check_pid_dir()
-
- def load_config_file(self, filename):
- """Load a config file from the top level of the cluster dir.
-
- Parameters
- ----------
- filename : unicode or str
- The filename only of the config file that must be located in
- the top-level of the cluster directory.
- """
- loader = PyFileConfigLoader(filename, self.location)
- return loader.load_config()
-
- def copy_config_file(self, config_file, path=None, overwrite=False):
- """Copy a default config file into the active cluster directory.
-
- Default configuration files are kept in :mod:`IPython.config.default`.
- This function moves these from that location to the working cluster
- directory.
- """
- if path is None:
- import IPython.config.default
- path = IPython.config.default.__file__.split(os.path.sep)[:-1]
- path = os.path.sep.join(path)
- src = os.path.join(path, config_file)
- dst = os.path.join(self.location, config_file)
- if not os.path.isfile(dst) or overwrite:
- shutil.copy(src, dst)
-
- def copy_all_config_files(self, path=None, overwrite=False):
- """Copy all config files into the active cluster directory."""
- for f in [u'ipcontroller_config.py', u'ipengine_config.py',
- u'ipcluster_config.py']:
- self.copy_config_file(f, path=path, overwrite=overwrite)
-
- @classmethod
- def create_cluster_dir(csl, cluster_dir):
- """Create a new cluster directory given a full path.
-
- Parameters
- ----------
- cluster_dir : str
- The full path to the cluster directory. If it does exist, it will
- be used. If not, it will be created.
- """
- return ClusterDir(location=cluster_dir)
-
- @classmethod
- def create_cluster_dir_by_profile(cls, path, profile=u'default'):
- """Create a cluster dir by profile name and path.
-
- Parameters
- ----------
- path : str
- The path (directory) to put the cluster directory in.
- profile : str
- The name of the profile. The name of the cluster directory will
- be "cluster_<profile>".
- """
- if not os.path.isdir(path):
- raise ClusterDirError('Directory not found: %s' % path)
- cluster_dir = os.path.join(path, u'cluster_' + profile)
- return ClusterDir(location=cluster_dir)
-
- @classmethod
- def find_cluster_dir_by_profile(cls, ipython_dir, profile=u'default'):
- """Find an existing cluster dir by profile name, return its ClusterDir.
-
- This searches through a sequence of paths for a cluster dir. If it
- is not found, a :class:`ClusterDirError` exception will be raised.
-
- The search path algorithm is:
- 1. ``os.getcwd()``
- 2. ``ipython_dir``
- 3. The directories found in the ":" separated
- :env:`IPCLUSTER_DIR_PATH` environment variable.
-
- Parameters
- ----------
- ipython_dir : unicode or str
- The IPython directory to use.
- profile : unicode or str
- The name of the profile. The name of the cluster directory
- will be "cluster_<profile>".
- """
- dirname = u'cluster_' + profile
- cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
- if cluster_dir_paths:
- cluster_dir_paths = cluster_dir_paths.split(':')
- else:
- cluster_dir_paths = []
- paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
- for p in paths:
- cluster_dir = os.path.join(p, dirname)
- if os.path.isdir(cluster_dir):
- return ClusterDir(location=cluster_dir)
- else:
- raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
-
- @classmethod
- def find_cluster_dir(cls, cluster_dir):
- """Find/create a cluster dir and return its ClusterDir.
-
- This will create the cluster directory if it doesn't exist.
-
- Parameters
- ----------
- cluster_dir : unicode or str
- The path of the cluster directory. This is expanded using
- :func:`IPython.utils.genutils.expand_path`.
- """
- cluster_dir = expand_path(cluster_dir)
- if not os.path.isdir(cluster_dir):
- raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
- return ClusterDir(location=cluster_dir)
-
-
-#-----------------------------------------------------------------------------
-# Command line options
-#-----------------------------------------------------------------------------
-
-class ClusterDirConfigLoader(BaseAppConfigLoader):
-
- def _add_cluster_profile(self, parser):
- paa = parser.add_argument
- paa('-p', '--profile',
- dest='Global.profile',type=unicode,
- help=
- """The string name of the profile to be used. This determines the name
- of the cluster dir as: cluster_<profile>. The default profile is named
- 'default'. The cluster directory is resolve this way if the
- --cluster-dir option is not used.""",
- metavar='Global.profile')
-
- def _add_cluster_dir(self, parser):
- paa = parser.add_argument
- paa('--cluster-dir',
- dest='Global.cluster_dir',type=unicode,
- help="""Set the cluster dir. This overrides the logic used by the
- --profile option.""",
- metavar='Global.cluster_dir')
-
- def _add_work_dir(self, parser):
- paa = parser.add_argument
- paa('--work-dir',
- dest='Global.work_dir',type=unicode,
- help='Set the working dir for the process.',
- metavar='Global.work_dir')
-
- def _add_clean_logs(self, parser):
- paa = parser.add_argument
- paa('--clean-logs',
- dest='Global.clean_logs', action='store_true',
- help='Delete old log flies before starting.')
-
- def _add_no_clean_logs(self, parser):
- paa = parser.add_argument
- paa('--no-clean-logs',
- dest='Global.clean_logs', action='store_false',
- help="Don't Delete old log flies before starting.")
-
- def _add_arguments(self):
- super(ClusterDirConfigLoader, self)._add_arguments()
- self._add_cluster_profile(self.parser)
- self._add_cluster_dir(self.parser)
- self._add_work_dir(self.parser)
- self._add_clean_logs(self.parser)
- self._add_no_clean_logs(self.parser)
-
-
-#-----------------------------------------------------------------------------
-# Crash handler for this application
-#-----------------------------------------------------------------------------
-
-
-_message_template = """\
-Oops, $self.app_name crashed. We do our best to make it stable, but...
-
-A crash report was automatically generated with the following information:
- - A verbatim copy of the crash traceback.
- - Data on your current $self.app_name configuration.
-
-It was left in the file named:
-\t'$self.crash_report_fname'
-If you can email this file to the developers, the information in it will help
-them in understanding and correcting the problem.
-
-You can mail it to: $self.contact_name at $self.contact_email
-with the subject '$self.app_name Crash Report'.
-
-If you want to do it now, the following command will work (under Unix):
-mail -s '$self.app_name Crash Report' $self.contact_email < $self.crash_report_fname
-
-To ensure accurate tracking of this issue, please file a report about it at:
-$self.bug_tracker
-"""
-
-class ClusterDirCrashHandler(CrashHandler):
- """sys.excepthook for IPython itself, leaves a detailed report on disk."""
-
- message_template = _message_template
-
- def __init__(self, app):
- contact_name = release.authors['Brian'][0]
- contact_email = release.authors['Brian'][1]
- bug_tracker = 'http://github.com/ipython/ipython/issues'
- super(ClusterDirCrashHandler,self).__init__(
- app, contact_name, contact_email, bug_tracker
- )
-
-
-#-----------------------------------------------------------------------------
-# Main application
-#-----------------------------------------------------------------------------
-
-class ApplicationWithClusterDir(Application):
- """An application that puts everything into a cluster directory.
-
- Instead of looking for things in the ipython_dir, this type of application
- will use its own private directory called the "cluster directory"
- for things like config files, log files, etc.
-
- The cluster directory is resolved as follows:
-
- * If the ``--cluster-dir`` option is given, it is used.
- * If ``--cluster-dir`` is not given, the application directory is
- resolve using the profile name as ``cluster_<profile>``. The search
- path for this directory is then i) cwd if it is found there
- and ii) in ipython_dir otherwise.
-
- The config file for the application is to be put in the cluster
- dir and named the value of the ``config_file_name`` class attribute.
- """
-
- command_line_loader = ClusterDirConfigLoader
- crash_handler_class = ClusterDirCrashHandler
- auto_create_cluster_dir = True
- # temporarily override default_log_level to INFO
- default_log_level = logging.INFO
-
- def create_default_config(self):
- super(ApplicationWithClusterDir, self).create_default_config()
- self.default_config.Global.profile = u'default'
- self.default_config.Global.cluster_dir = u''
- self.default_config.Global.work_dir = os.getcwd()
- self.default_config.Global.log_to_file = False
- self.default_config.Global.log_url = None
- self.default_config.Global.clean_logs = False
-
- def find_resources(self):
- """This resolves the cluster directory.
-
- This tries to find the cluster directory and if successful, it will
- have done:
- * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
- the application.
- * Sets ``self.cluster_dir`` attribute of the application and config
- objects.
-
- The algorithm used for this is as follows:
- 1. Try ``Global.cluster_dir``.
- 2. Try using ``Global.profile``.
- 3. If both of these fail and ``self.auto_create_cluster_dir`` is
- ``True``, then create the new cluster dir in the IPython directory.
- 4. If all fails, then raise :class:`ClusterDirError`.
- """
-
- try:
- cluster_dir = self.command_line_config.Global.cluster_dir
- except AttributeError:
- cluster_dir = self.default_config.Global.cluster_dir
- cluster_dir = expand_path(cluster_dir)
- try:
- self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
- except ClusterDirError:
- pass
- else:
- self.log.info('Using existing cluster dir: %s' % \
- self.cluster_dir_obj.location
- )
- self.finish_cluster_dir()
- return
-
- try:
- self.profile = self.command_line_config.Global.profile
- except AttributeError:
- self.profile = self.default_config.Global.profile
- try:
- self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
- self.ipython_dir, self.profile)
- except ClusterDirError:
- pass
- else:
- self.log.info('Using existing cluster dir: %s' % \
- self.cluster_dir_obj.location
- )
- self.finish_cluster_dir()
- return
-
- if self.auto_create_cluster_dir:
- self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
- self.ipython_dir, self.profile
- )
- self.log.info('Creating new cluster dir: %s' % \
- self.cluster_dir_obj.location
- )
- self.finish_cluster_dir()
- else:
- raise ClusterDirError('Could not find a valid cluster directory.')
-
- def finish_cluster_dir(self):
- # Set the cluster directory
- self.cluster_dir = self.cluster_dir_obj.location
-
- # These have to be set because they could be different from the one
- # that we just computed. Because command line has the highest
- # priority, this will always end up in the master_config.
- self.default_config.Global.cluster_dir = self.cluster_dir
- self.command_line_config.Global.cluster_dir = self.cluster_dir
-
- def find_config_file_name(self):
- """Find the config file name for this application."""
- # For this type of Application it should be set as a class attribute.
- if not hasattr(self, 'default_config_file_name'):
- self.log.critical("No config filename found")
- else:
- self.config_file_name = self.default_config_file_name
-
- def find_config_file_paths(self):
- # Set the search path to to the cluster directory. We should NOT
- # include IPython.config.default here as the default config files
- # are ALWAYS automatically moved to the cluster directory.
- conf_dir = os.path.join(get_ipython_package_dir(), 'config', 'default')
- self.config_file_paths = (self.cluster_dir,)
-
- def pre_construct(self):
- # The log and security dirs were set earlier, but here we put them
- # into the config and log them.
- config = self.master_config
- sdir = self.cluster_dir_obj.security_dir
- self.security_dir = config.Global.security_dir = sdir
- ldir = self.cluster_dir_obj.log_dir
- self.log_dir = config.Global.log_dir = ldir
- pdir = self.cluster_dir_obj.pid_dir
- self.pid_dir = config.Global.pid_dir = pdir
- self.log.info("Cluster directory set to: %s" % self.cluster_dir)
- config.Global.work_dir = unicode(expand_path(config.Global.work_dir))
- # Change to the working directory. We do this just before construct
- # is called so all the components there have the right working dir.
- self.to_work_dir()
-
- def to_work_dir(self):
- wd = self.master_config.Global.work_dir
- if unicode(wd) != unicode(os.getcwd()):
- os.chdir(wd)
- self.log.info("Changing to working dir: %s" % wd)
-
- def start_logging(self):
- # Remove old log files
- if self.master_config.Global.clean_logs:
- log_dir = self.master_config.Global.log_dir
- for f in os.listdir(log_dir):
- if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
- # if f.startswith(self.name + u'-') and f.endswith('.log'):
- os.remove(os.path.join(log_dir, f))
- # Start logging to the new log file
- if self.master_config.Global.log_to_file:
- log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
- logfile = os.path.join(self.log_dir, log_filename)
- open_log_file = open(logfile, 'w')
- elif self.master_config.Global.log_url:
- open_log_file = None
- else:
- open_log_file = sys.stdout
- if open_log_file is not None:
- self.log.removeHandler(self._log_handler)
- self._log_handler = logging.StreamHandler(open_log_file)
- self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
- self._log_handler.setFormatter(self._log_formatter)
- self.log.addHandler(self._log_handler)
- # log.startLogging(open_log_file)
-
- def write_pid_file(self, overwrite=False):
- """Create a .pid file in the pid_dir with my pid.
-
- This must be called after pre_construct, which sets `self.pid_dir`.
- This raises :exc:`PIDFileError` if the pid file exists already.
- """
- pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
- if os.path.isfile(pid_file):
- pid = self.get_pid_from_file()
- if not overwrite:
- raise PIDFileError(
- 'The pid file [%s] already exists. \nThis could mean that this '
- 'server is already running with [pid=%s].' % (pid_file, pid)
- )
- with open(pid_file, 'w') as f:
- self.log.info("Creating pid file: %s" % pid_file)
- f.write(repr(os.getpid())+'\n')
-
- def remove_pid_file(self):
- """Remove the pid file.
-
- This should be called at shutdown by registering a callback with
- :func:`reactor.addSystemEventTrigger`. This needs to return
- ``None``.
- """
- pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
- if os.path.isfile(pid_file):
- try:
- self.log.info("Removing pid file: %s" % pid_file)
- os.remove(pid_file)
- except:
- self.log.warn("Error removing the pid file: %s" % pid_file)
-
- def get_pid_from_file(self):
- """Get the pid from the pid file.
-
- If the pid file doesn't exist a :exc:`PIDFileError` is raised.
- """
- pid_file = os.path.join(self.pid_dir, self.name + u'.pid')
- if os.path.isfile(pid_file):
- with open(pid_file, 'r') as f:
- pid = int(f.read().strip())
- return pid
- else:
- raise PIDFileError('pid file not found: %s' % pid_file)
-
- def check_pid(self, pid):
- if os.name == 'nt':
- try:
- import ctypes
- # returns 0 if no such process (of ours) exists
- # positive int otherwise
- p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
- except Exception:
- self.log.warn(
- "Could not determine whether pid %i is running via `OpenProcess`. "
- " Making the likely assumption that it is."%pid
- )
- return True
- return bool(p)
- else:
- try:
- p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
- output,_ = p.communicate()
- except OSError:
- self.log.warn(
- "Could not determine whether pid %i is running via `ps x`. "
- " Making the likely assumption that it is."%pid
- )
- return True
- pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
- return pid in pids
-
806 IPython/parallel/apps/ipclusterapp.py
@@ -25,12 +25,16 @@
import zmq
from zmq.eventloop import ioloop
-from IPython.external.argparse import ArgumentParser, SUPPRESS
+from IPython.config.application import Application, boolean_flag
+from IPython.config.loader import Config
+from IPython.core.newapplication import BaseIPythonApplication, ProfileDir
from IPython.utils.importstring import import_item
+from IPython.utils.traitlets import Int, Unicode, Bool, CFloat, Dict, List
-from IPython.parallel.apps.clusterdir import (
- ApplicationWithClusterDir, ClusterDirConfigLoader,
- ClusterDirError, PIDFileError
+from IPython.parallel.apps.baseapp import (
+ BaseParallelApplication,
+ PIDFileError,
+ base_flags, base_aliases
)
@@ -42,16 +46,15 @@
default_config_file_name = u'ipcluster_config.py'
-_description = """\
-Start an IPython cluster for parallel computing.\n\n
+_description = """Start an IPython cluster for parallel computing.
An IPython cluster consists of 1 controller and 1 or more engines.
This command automates the startup of these processes using a wide
range of startup methods (SSH, local processes, PBS, mpiexec,
Windows HPC Server 2008). To start a cluster with 4 engines on your
-local host simply do 'ipcluster start -n 4'. For more complex usage
-you will typically do 'ipcluster create -p mycluster', then edit
-configuration files, followed by 'ipcluster start -p mycluster -n 4'.
+local host simply do 'ipcluster start n=4'. For more complex usage
+you will typically do 'ipcluster create profile=mycluster', then edit
+configuration files, followed by 'ipcluster start profile=mycluster n=4'.
"""
@@ -72,426 +75,368 @@
#-----------------------------------------------------------------------------
-# Command line options
-#-----------------------------------------------------------------------------
-
-
-class IPClusterAppConfigLoader(ClusterDirConfigLoader):
-
- def _add_arguments(self):
- # Don't call ClusterDirConfigLoader._add_arguments as we don't want
- # its defaults on self.parser. Instead, we will put those on
- # default options on our subparsers.
-
- # This has all the common options that all subcommands use
- parent_parser1 = ArgumentParser(
- add_help=False,
- argument_default=SUPPRESS
- )
- self._add_ipython_dir(parent_parser1)
- self._add_log_level(parent_parser1)
-
- # This has all the common options that other subcommands use
- parent_parser2 = ArgumentParser(
- add_help=False,
- argument_default=SUPPRESS
- )
- self._add_cluster_profile(parent_parser2)
- self._add_cluster_dir(parent_parser2)
- self._add_work_dir(parent_parser2)
- paa = parent_parser2.add_argument
- paa('--log-to-file',
- action='store_true', dest='Global.log_to_file',
- help='Log to a file in the log directory (default is stdout)')
-
- # Create the object used to create the subparsers.
- subparsers = self.parser.add_subparsers(
- dest='Global.subcommand',
- title='ipcluster subcommands',
- description=
- """ipcluster has a variety of subcommands. The general way of
- running ipcluster is 'ipcluster <cmd> [options]'. To get help
- on a particular subcommand do 'ipcluster <cmd> -h'."""
- # help="For more help, type 'ipcluster <cmd> -h'",
- )
-
- # The "list" subcommand parser
- parser_list = subparsers.add_parser(
- 'list',
- parents=[parent_parser1],
- argument_default=SUPPRESS,
- help="List all clusters in cwd and ipython_dir.",
- description=
- """List all available clusters, by cluster directory, that can
- be found in the current working directly or in the ipython
- directory. Cluster directories are named using the convention
- 'cluster_<profile>'."""
- )
-
- # The "create" subcommand parser
- parser_create = subparsers.add_parser(
- 'create',
- parents=[parent_parser1, parent_parser2],
- argument_default=SUPPRESS,
- help="Create a new cluster directory.",
- description=
- """Create an ipython cluster directory by its profile name or
- cluster directory path. Cluster directories contain
- configuration, log and security related files and are named
- using the convention 'cluster_<profile>'. By default they are
- located in your ipython directory. Once created, you will
- probably need to edit the configuration files in the cluster
- directory to configure your cluster. Most users will create a
- cluster directory by profile name,
- 'ipcluster create -p mycluster', which will put the directory
- in '<ipython_dir>/cluster_mycluster'.
- """
- )
- paa = parser_create.add_argument
- paa('--reset-config',
- dest='Global.reset_config', action='store_true',
- help=
- """Recopy the default config files to the cluster directory.
- You will loose any modifications you have made to these files.""")
-
- # The "start" subcommand parser
- parser_start = subparsers.add_parser(
- 'start',
- parents=[parent_parser1, parent_parser2],
- argument_default=SUPPRESS,
- help="Start a cluster.",
- description=
- """Start an ipython cluster by its profile name or cluster
- directory. Cluster directories contain configuration, log and
- security related files and are named using the convention
- 'cluster_<profile>' and should be creating using the 'start'
- subcommand of 'ipcluster'. If your cluster directory is in
- the cwd or the ipython directory, you can simply refer to it
- using its profile name, 'ipcluster start -n 4 -p <profile>`,
- otherwise use the '--cluster-dir' option.
- """
- )
-
- paa = parser_start.add_argument
- paa('-n', '--number',
- type=int, dest='Global.n',
- help='The number of engines to start.',
- metavar='Global.n')
- paa('--clean-logs',
- dest='Global.clean_logs', action='store_true',
- help='Delete old log flies before starting.')
- paa('--no-clean-logs',
- dest='Global.clean_logs', action='store_false',
- help="Don't delete old log flies before starting.")
- paa('--daemon',
- dest='Global.daemonize', action='store_true',
- help='Daemonize the ipcluster program. This implies --log-to-file')
- paa('--no-daemon',
- dest='Global.daemonize', action='store_false',
- help="Dont't daemonize the ipcluster program.")
- paa('--delay',
- type=float, dest='Global.delay',
- help="Specify the delay (in seconds) between starting the controller and starting the engine(s).")
-
- # The "stop" subcommand parser
- parser_stop = subparsers.add_parser(
- 'stop',
- parents=[parent_parser1, parent_parser2],
- argument_default=SUPPRESS,
- help="Stop a running cluster.",
- description=
- """Stop a running ipython cluster by its profile name or cluster
- directory. Cluster directories are named using the convention
- 'cluster_<profile>'. If your cluster directory is in
- the cwd or the ipython directory, you can simply refer to it
- using its profile name, 'ipcluster stop -p <profile>`, otherwise
- use the '--cluster-dir' option.
- """
- )
- paa = parser_stop.add_argument
- paa('--signal',
- dest='Global.signal', type=int,
- help="The signal number to use in stopping the cluster (default=2).",
- metavar="Global.signal")
-
- # the "engines" subcommand parser
- parser_engines = subparsers.add_parser(
- 'engines',
- parents=[parent_parser1, parent_parser2],
- argument_default=SUPPRESS,
- help="Attach some engines to an existing controller or cluster.",
- description=
- """Start one or more engines to connect to an existing Cluster
- by profile name or cluster directory.
- Cluster directories contain configuration, log and
- security related files and are named using the convention
- 'cluster_<profile>' and should be creating using the 'start'
- subcommand of 'ipcluster'. If your cluster directory is in
- the cwd or the ipython directory, you can simply refer to it
- using its profile name, 'ipcluster engines -n 4 -p <profile>`,
- otherwise use the '--cluster-dir' option.
- """
- )
- paa = parser_engines.add_argument
- paa('-n', '--number',
- type=int, dest='Global.n',
- help='The number of engines to start.',
- metavar='Global.n')
- paa('--daemon',
- dest='Global.daemonize', action='store_true',
- help='Daemonize the ipcluster program. This implies --log-to-file')
- paa('--no-daemon',
- dest='Global.daemonize', action='store_false',
- help="Dont't daemonize the ipcluster program.")
-
-#-----------------------------------------------------------------------------
# Main application
#-----------------------------------------------------------------------------
+start_help = """Start an IPython cluster for parallel computing
+
+Start an ipython cluster by its profile name or cluster
+directory. Cluster directories contain configuration, log and
+security related files and are named using the convention
+'cluster_<profile>' and should be created using the 'start'
+subcommand of 'ipcluster'. If your cluster directory is in
+the cwd or the ipython directory, you can simply refer to it
+using its profile name, 'ipcluster start n=4 profile=<profile>',
+otherwise use the 'profile_dir' option.
+"""
+stop_help = """Stop a running IPython cluster
+
+Stop a running ipython cluster by its profile name or cluster
+directory. Cluster directories are named using the convention
+'cluster_<profile>'. If your cluster directory is in
+the cwd or the ipython directory, you can simply refer to it
+using its profile name, 'ipcluster stop profile=<profile>', otherwise
+use the 'profile_dir' option.
+"""
+engines_help = """Start engines connected to an existing IPython cluster
+
+Start one or more engines to connect to an existing Cluster
+by profile name or cluster directory.
+Cluster directories contain configuration, log and
+security related files and are named using the convention
+'cluster_<profile>' and should be created using the 'start'
+subcommand of 'ipcluster'. If your cluster directory is in
+the cwd or the ipython directory, you can simply refer to it
+using its profile name, 'ipcluster engines n=4 profile=<profile>',
+otherwise use the 'profile_dir' option.
+"""
+create_help = """Create an ipcluster profile by name
+
+Create an ipython cluster directory by its profile name or
+cluster directory path. Cluster directories contain
+configuration, log and security related files and are named
+using the convention 'cluster_<profile>'. By default they are
+located in your ipython directory. Once created, you will
+probably need to edit the configuration files in the cluster
+directory to configure your cluster. Most users will create a
+cluster directory by profile name,
+`ipcluster create profile=mycluster`, which will put the directory
+in `<ipython_dir>/cluster_mycluster`.
+"""
list_help = """List available cluster profiles
+
+List all available clusters, by cluster directory, that can
+be found in the current working directory or in the ipython
+directory. Cluster directories are named using the convention
+'cluster_<profile>'.
+"""
-class IPClusterApp(ApplicationWithClusterDir):
- name = u'ipcluster'
- description = _description
- usage = None
- command_line_loader = IPClusterAppConfigLoader
- default_config_file_name = default_config_file_name
- default_log_level = logging.INFO
- auto_create_cluster_dir = False
-
- def create_default_config(self):
- super(IPClusterApp, self).create_default_config()
- self.default_config.Global.controller_launcher = \
- 'IPython.parallel.apps.launcher.LocalControllerLauncher'
- self.default_config.Global.engine_launcher = \
- 'IPython.parallel.apps.launcher.LocalEngineSetLauncher'
- self.default_config.Global.n = 2
- self.default_config.Global.delay = 2
- self.default_config.Global.reset_config = False
- self.default_config.Global.clean_logs = True
- self.default_config.Global.signal = signal.SIGINT
- self.default_config.Global.daemonize = False
-
- def find_resources(self):
- subcommand = self.command_line_config.Global.subcommand
- if subcommand=='list':
- self.list_cluster_dirs()
- # Exit immediately because there is nothing left to do.
- self.exit()
- elif subcommand=='create':
- self.auto_create_cluster_dir = True
- super(IPClusterApp, self).find_resources()
- elif subcommand=='start' or subcommand=='stop':
- self.auto_create_cluster_dir = True
- try:
- super(IPClusterApp, self).find_resources()
- except ClusterDirError:
- raise ClusterDirError(
- "Could not find a cluster directory. A cluster dir must "
- "be created before running 'ipcluster start'. Do "
- "'ipcluster create -h' or 'ipcluster list -h' for more "
- "information about creating and listing cluster dirs."
- )
- elif subcommand=='engines':
- self.auto_create_cluster_dir = False
- try:
- super(IPClusterApp, self).find_resources()
- except ClusterDirError:
- raise ClusterDirError(
- "Could not find a cluster directory. A cluster dir must "
- "be created before running 'ipcluster start'. Do "
- "'ipcluster create -h' or 'ipcluster list -h' for more "
- "information about creating and listing cluster dirs."
- )
-
- def list_cluster_dirs(self):
+class IPClusterList(BaseIPythonApplication):
+ name = u'ipcluster-list'
+ description = list_help
+
+ # empty aliases
+ aliases=Dict()
+ flags = Dict(base_flags)
+
+ def _log_level_default(self):
+ return 20
+
+ def list_profile_dirs(self):
# Find the search paths
- cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
- if cluster_dir_paths:
- cluster_dir_paths = cluster_dir_paths.split(':')
+ profile_dir_paths = os.environ.get('IPYTHON_PROFILE_PATH','')
+ if profile_dir_paths:
+ profile_dir_paths = profile_dir_paths.split(':')
else:
- cluster_dir_paths = []
- try:
- ipython_dir = self.command_line_config.Global.ipython_dir
- except AttributeError:
- ipython_dir = self.default_config.Global.ipython_dir
- paths = [os.getcwd(), ipython_dir] + \
- cluster_dir_paths
+ profile_dir_paths = []
+
+ ipython_dir = self.ipython_dir
+
+ paths = [os.getcwd(), ipython_dir] + profile_dir_paths
paths = list(set(paths))
- self.log.info('Searching for cluster dirs in paths: %r' % paths)
+ self.log.info('Searching for cluster profiles in paths: %r' % paths)
for path in paths:
files = os.listdir(path)
for f in files:
full_path = os.path.join(path, f)
- if os.path.isdir(full_path) and f.startswith('cluster_'):
- profile = full_path.split('_')[-1]
- start_cmd = 'ipcluster start -p %s -n 4' % profile
+ if os.path.isdir(full_path) and f.startswith('profile_') and \
+ os.path.isfile(os.path.join(full_path, 'ipcontroller_config.py')):
+ profile = f.split('_')[-1]
+ start_cmd = 'ipcluster start profile=%s n=4' % profile
print start_cmd + " ==> " + full_path
+
+ def start(self):
+ self.list_profile_dirs()
- def pre_construct(self):
- # IPClusterApp.pre_construct() is where we cd to the working directory.
- super(IPClusterApp, self).pre_construct()
- config = self.master_config
- try:
- daemon = config.Global.daemonize
- if daemon:
- config.Global.log_to_file = True
- except AttributeError:
- pass
- def construct(self):
- config = self.master_config
- subcmd = config.Global.subcommand
- reset = config.Global.reset_config
- if subcmd == 'list':
- return
- if subcmd == 'create':
- self.log.info('Copying default config files to cluster directory '
- '[overwrite=%r]' % (reset,))
- self.cluster_dir_obj.copy_all_config_files(overwrite=reset)
- if subcmd =='start':
- self.cluster_dir_obj.copy_all_config_files(overwrite=False)
- self.start_logging()
- self.loop = ioloop.IOLoop.instance()
- # reactor.callWhenRunning(self.start_launchers)
- dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop)
- dc.start()
- if subcmd == 'engines':
- self.start_logging()
- self.loop = ioloop.IOLoop.instance()
- # reactor.callWhenRunning(self.start_launchers)
- engine_only = lambda : self.start_launchers(controller=False)
- dc = ioloop.DelayedCallback(engine_only, 0, self.loop)
- dc.start()
+# `ipcluster create` will be deprecated when `ipython profile create` or equivalent exists
- def start_launchers(self, controller=True):
- config = self.master_config
-
- # Create the launchers. In both bases, we set the work_dir of
- # the launcher to the cluster_dir. This is where the launcher's
- # subprocesses will be launched. It is not where the controller
- # and engine will be launched.
- if controller:
- cl_class = import_item(config.Global.controller_launcher)
- self.controller_launcher = cl_class(
- work_dir=self.cluster_dir, config=config,
- logname=self.log.name
+create_flags = {}
+create_flags.update(base_flags)
+create_flags.update(boolean_flag('reset', 'IPClusterCreate.overwrite',
+ "reset config files to defaults", "leave existing config files"))
+
+class IPClusterCreate(BaseParallelApplication):
+ name = u'ipcluster-create'
+ description = create_help
+ auto_create = Bool(True)
+ config_file_name = Unicode(default_config_file_name)
+
+ flags = Dict(create_flags)
+
+ aliases = Dict(dict(profile='BaseIPythonApplication.profile'))
+
+ classes = [ProfileDir]
+
+
+stop_aliases = dict(
+ signal='IPClusterStop.signal',
+ profile='BaseIPythonApplication.profile',
+ profile_dir='ProfileDir.location',
+)
+
+class IPClusterStop(BaseParallelApplication):
+ name = u'ipcluster'
+ description = stop_help
+ config_file_name = Unicode(default_config_file_name)
+
+ signal = Int(signal.SIGINT, config=True,
+ help="signal to use for stopping processes.")
+
+ aliases = Dict(stop_aliases)
+
+ def start(self):
+ """Start the app for the stop subcommand."""
+ try:
+ pid = self.get_pid_from_file()
+ except PIDFileError:
+ self.log.critical(
+ 'Could not read pid file, cluster is probably not running.'
)
- # Setup the observing of stopping. If the controller dies, shut
- # everything down as that will be completely fatal for the engines.
- self.controller_launcher.on_stop(self.stop_launchers)
- # But, we don't monitor the stopping of engines. An engine dying
- # is just fine and in principle a user could start a new engine.
- # Also, if we did monitor engine stopping, it is difficult to
- # know what to do when only some engines die. Currently, the
- # observing of engine stopping is inconsistent. Some launchers
- # might trigger on a single engine stopping, other wait until
- # all stop. TODO: think more about how to handle this.
- else:
- self.controller_launcher = None
+            # Here I exit with an unusual exit status that other processes
+            # can watch for to learn how I exited.
+ self.remove_pid_file()
+ self.exit(ALREADY_STOPPED)
- el_class = import_item(config.Global.engine_launcher)
- self.engine_launcher = el_class(
- work_dir=self.cluster_dir, config=config, logname=self.log.name
+ if not self.check_pid(pid):
+ self.log.critical(
+ 'Cluster [pid=%r] is not running.' % pid
+ )
+ self.remove_pid_file()
+            # Here I exit with an unusual exit status that other processes
+            # can watch for to learn how I exited.
+ self.exit(ALREADY_STOPPED)
+
+ elif os.name=='posix':
+ sig = self.signal
+ self.log.info(
+ "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
+ )
+ try:
+ os.kill(pid, sig)
+ except OSError:
+ self.log.error("Stopping cluster failed, assuming already dead.",
+ exc_info=True)
+ self.remove_pid_file()
+ elif os.name=='nt':
+ try:
+ # kill the whole tree
+                check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE, stderr=PIPE)
+ except (CalledProcessError, OSError):
+ self.log.error("Stopping cluster failed, assuming already dead.",
+ exc_info=True)
+ self.remove_pid_file()
+
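For reference, the stop logic above is roughly what one would do by hand at a shell (pid hypothetical; the signal defaults to SIGINT and is configurable via IPClusterStop.signal):

    # POSIX: deliver the configured signal to the pid read from the pid file
    kill -INT 12345
    # Windows: kill the whole process tree, as the code does via taskkill
    taskkill -pid 12345 -t -f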
+engine_aliases = {}
+engine_aliases.update(base_aliases)
+engine_aliases.update(dict(
+ n='IPClusterEngines.n',
+    elauncher='IPClusterEngines.engine_launcher_class',
+))
+class IPClusterEngines(BaseParallelApplication):
+
+ name = u'ipcluster'
+ description = engines_help
+ usage = None
+ config_file_name = Unicode(default_config_file_name)
+ default_log_level = logging.INFO
+ classes = List()
+ def _classes_default(self):
+ from IPython.parallel.apps import launcher
+ launchers = launcher.all_launchers
+ eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
+ return [ProfileDir]+eslaunchers
+
+ n = Int(2, config=True,
+ help="The number of engines to start.")
+
+ engine_launcher_class = Unicode('LocalEngineSetLauncher',
+ config=True,
+ help="The class for launching a set of Engines."
)
+ daemonize = Bool(False, config=True,
+ help='Daemonize the ipcluster program. This implies --log-to-file')
- # Setup signals
- signal.signal(signal.SIGINT, self.sigint_handler)
+ def _daemonize_changed(self, name, old, new):
+ if new:
+ self.log_to_file = True
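The `_daemonize_changed` hook relies on the traitlets naming convention: a method named `_<trait>_changed(self, name, old, new)` on a HasTraits subclass is auto-registered as a change handler. A toy sketch (class invented for illustration):

    from IPython.utils.traitlets import Bool, HasTraits

    class Example(HasTraits):
        daemonize = Bool(False)
        log_to_file = Bool(False)

        def _daemonize_changed(self, name, old, new):
            if new:  # turning daemonize on forces logging to file
                self.log_to_file = True

    e = Example()
    e.daemonize = True
    assert e.log_to_file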
- # Start the controller and engines
- self._stopping = False # Make sure stop_launchers is not called 2x.
- if controller:
- self.start_controller()
- dc = ioloop.DelayedCallback(self.start_engines, 1000*config.Global.delay*controller, self.loop)
- dc.start()
- self.startup_message()
+ aliases = Dict(engine_aliases)
+ # flags = Dict(flags)
+ _stopping = False
- def startup_message(self, r=None):
- self.log.info("IPython cluster: started")
- return r
-
- def start_controller(self, r=None):
- # self.log.info("In start_controller")
- config = self.master_config
- d = self.controller_launcher.start(
- cluster_dir=config.Global.cluster_dir
+ def initialize(self, argv=None):
+ super(IPClusterEngines, self).initialize(argv)
+ self.init_signal()
+ self.init_launchers()
+
+ def init_launchers(self):
+ self.engine_launcher = self.build_launcher(self.engine_launcher_class)
+ self.engine_launcher.on_stop(lambda r: self.loop.stop())
+
+ def init_signal(self):
+ # Setup signals
+ signal.signal(signal.SIGINT, self.sigint_handler)
+
+ def build_launcher(self, clsname):
+ """import and instantiate a Launcher based on importstring"""
+ if '.' not in clsname:
+ # not a module, presume it's the raw name in apps.launcher
+ clsname = 'IPython.parallel.apps.launcher.'+clsname
+ # print repr(clsname)
+ klass = import_item(clsname)
+
+ launcher = klass(
+ work_dir=self.profile_dir.location, config=self.config, logname=self.log.name
)
- return d
-
- def start_engines(self, r=None):
- # self.log.info("In start_engines")
- config = self.master_config
-
- d = self.engine_launcher.start(
- config.Global.n,
- cluster_dir=config.Global.cluster_dir
+ return launcher
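A minimal sketch of the short-name resolution `build_launcher` performs, using the same `import_item` helper the apps already import (the class name below is just an example):

    from IPython.utils.importstring import import_item

    clsname = 'MPIExecEngineSetLauncher'  # bare name, no dot
    if '.' not in clsname:
        # presume it lives in the launcher module
        clsname = 'IPython.parallel.apps.launcher.' + clsname
    klass = import_item(clsname)  # the launcher class itself, ready to instantiate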
+
+ def start_engines(self):
+        self.log.info("Starting %i engines" % self.n)
+ self.engine_launcher.start(
+ self.n,
+ self.profile_dir.location
)
- return d
-
- def stop_controller(self, r=None):
- # self.log.info("In stop_controller")
- if self.controller_launcher and self.controller_launcher.running:
- return self.controller_launcher.stop()
- def stop_engines(self, r=None):
- # self.log.info("In stop_engines")
+ def stop_engines(self):
+ self.log.info("Stopping Engines...")
if self.engine_launcher.running:
d = self.engine_launcher.stop()
- # d.addErrback(self.log_err)
return d
else:
return None
- def log_err(self, f):
- self.log.error(f.getTraceback())
- return None
-
def stop_launchers(self, r=None):
if not self._stopping:
self._stopping = True
- # if isinstance(r, failure.Failure):
- # self.log.error('Unexpected error in ipcluster:')
- # self.log.info(r.getTraceback())
self.log.error("IPython cluster: stopping")
- # These return deferreds. We are not doing anything with them
- # but we are holding refs to them as a reminder that they
- # do return deferreds.
- d1 = self.stop_engines()
- d2 = self.stop_controller()
+ self.stop_engines()
# Wait a few seconds to let things shut down.
dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
dc.start()
- # reactor.callLater(4.0, reactor.stop)
def sigint_handler(self, signum, frame):
+ self.log.debug("SIGINT received, stopping launchers...")
self.stop_launchers()
def start_logging(self):
# Remove old log files of the controller and engine
- if self.master_config.Global.clean_logs:
- log_dir = self.master_config.Global.log_dir
+ if self.clean_logs:
+ log_dir = self.profile_dir.log_dir
for f in os.listdir(log_dir):
if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
os.remove(os.path.join(log_dir, f))
# This will remove old log files for ipcluster itself
- super(IPClusterApp, self).start_logging()
-
- def start_app(self):
- """Start the application, depending on what subcommand is used."""
- subcmd = self.master_config.Global.subcommand
- if subcmd=='create' or subcmd=='list':
- return
- elif subcmd=='start':
- self.start_app_start()
- elif subcmd=='stop':
- self.start_app_stop()
- elif subcmd=='engines':
- self.start_app_engines()
-
- def start_app_start(self):
+        # super(IPClusterEngines, self).start_logging()
+
+ def start(self):
+ """Start the app for the engines subcommand."""
+ self.log.info("IPython cluster: started")
+
+ # Now log and daemonize
+ self.log.info(
+ 'Starting engines with [daemon=%r]' % self.daemonize
+ )
+ # TODO: Get daemonize working on Windows or as a Windows Server.
+ if self.daemonize:
+ if os.name=='posix':
+ from twisted.scripts._twistd_unix import daemonize
+ daemonize()
+
+ dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
+ dc.start()
+ # Now write the new pid file AFTER our new forked pid is active.
+ # self.write_pid_file()
+ try:
+ self.loop.start()
+ except KeyboardInterrupt:
+ pass
+ except zmq.ZMQError as e:
+ if e.errno == errno.EINTR:
+ pass
+ else:
+ raise
+
+start_aliases = {}
+start_aliases.update(engine_aliases)
+start_aliases.update(dict(
+ delay='IPClusterStart.delay',
+ clean_logs='IPClusterStart.clean_logs',
+))
+
+class IPClusterStart(IPClusterEngines):
+
+ name = u'ipcluster'
+ description = start_help
+ default_log_level = logging.INFO
+ auto_create = Bool(True, config=True,
+ help="whether to create the profile_dir if it doesn't exist")
+ classes = List()
+    def _classes_default(self):
+ from IPython.parallel.apps import launcher
+ return [ProfileDir]+launcher.all_launchers
+
+ clean_logs = Bool(True, config=True,
+ help="whether to cleanup old logs before starting")
+
+ delay = CFloat(1., config=True,
+ help="delay (in s) between starting the controller and the engines")
+
+ controller_launcher_class = Unicode('LocalControllerLauncher',
+ config=True,
+ help="The class for launching a Controller."
+ )
+ reset = Bool(False, config=True,
+ help="Whether to reset config files as part of '--create'."
+ )
+
+ # flags = Dict(flags)
+ aliases = Dict(start_aliases)
+
+ def init_launchers(self):
+ self.controller_launcher = self.build_launcher(self.controller_launcher_class)
+ self.engine_launcher = self.build_launcher(self.engine_launcher_class)
+ self.controller_launcher.on_stop(self.stop_launchers)
+
+ def start_controller(self):
+ self.controller_launcher.start(
+ self.profile_dir.location
+ )
+
+ def stop_controller(self):
+ # self.log.info("In stop_controller")
+ if self.controller_launcher and self.controller_launcher.running:
+ return self.controller_launcher.stop()
+
+ def stop_launchers(self, r=None):
+ if not self._stopping:
+ self.stop_controller()
+ super(IPClusterStart, self).stop_launchers()
+
+ def start(self):
"""Start the app for the start subcommand."""
- config = self.master_config
# First see if the cluster is already running
try:
pid = self.get_pid_from_file()
@@ -512,14 +457,18 @@ def start_app_start(self):
# Now log and daemonize
self.log.info(
- 'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
+ 'Starting ipcluster with [daemon=%r]' % self.daemonize
)
# TODO: Get daemonize working on Windows or as a Windows Server.
- if config.Global.daemonize:
+ if self.daemonize:
if os.name=='posix':
from twisted.scripts._twistd_unix import daemonize
daemonize()
+ dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
+ dc.start()
+ dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
+ dc.start()
# Now write the new pid file AFTER our new forked pid is active.
self.write_pid_file()
try:
@@ -534,81 +483,36 @@ def start_app_start(self):
finally:
self.remove_pid_file()
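The staggered startup above reduces to two DelayedCallbacks on one loop; a standalone sketch of the pattern (the two callables and the 1-second delay are placeholders; `delay` is in seconds, DelayedCallback takes milliseconds):

    from zmq.eventloop import ioloop

    loop = ioloop.IOLoop.instance()
    # controller fires immediately; engines follow `delay` seconds later
    ioloop.DelayedCallback(start_controller, 0, loop).start()
    ioloop.DelayedCallback(start_engines, 1000 * 1.0, loop).start()
    loop.start()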
- def start_app_engines(self):
- """Start the app for the start subcommand."""
- config = self.master_config
- # First see if the cluster is already running
-
- # Now log and daemonize
- self.log.info(
- 'Starting engines with [daemon=%r]' % config.Global.daemonize
- )
- # TODO: Get daemonize working on Windows or as a Windows Server.
- if config.Global.daemonize:
- if os.name=='posix':
- from twisted.scripts._twistd_unix import daemonize
- daemonize()
+base = 'IPython.parallel.apps.ipclusterapp.IPCluster'
- # Now write the new pid file AFTER our new forked pid is active.
- # self.write_pid_file()
- try:
- self.loop.start()
- except KeyboardInterrupt:
- pass
- except zmq.ZMQError as e:
- if e.errno == errno.EINTR:
- pass
- else:
- raise
- # self.remove_pid_file()
+class IPBaseParallelApplication(Application):
+ name = u'ipcluster'
+ description = _description
- def start_app_stop(self):
- """Start the app for the stop subcommand."""
- config = self.master_config
- try:
- pid = self.get_pid_from_file()
- except PIDFileError:
- self.log.critical(
- 'Could not read pid file, cluster is probably not running.'
- )
- # Here I exit with a unusual exit status that other processes
- # can watch for to learn how I existed.
- self.remove_pid_file()
- self.exit(ALREADY_STOPPED)
-
- if not self.check_pid(pid):
- self.log.critical(
- 'Cluster [pid=%r] is not running.' % pid
- )
- self.remove_pid_file()
- # Here I exit with a unusual exit status that other processes
- # can watch for to learn how I existed.
- self.exit(ALREADY_STOPPED)
-
- elif os.name=='posix':
- sig = config.Global.signal
- self.log.info(
- "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
- )
- try:
- os.kill(pid, sig)
- except OSError:
- self.log.error("Stopping cluster failed, assuming already dead.",
- exc_info=True)
- self.remove_pid_file()
- elif os.name=='nt':
- try:
- # kill the whole tree
- p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
- except (CalledProcessError, OSError):
- self.log.error("Stopping cluster failed, assuming already dead.",
- exc_info=True)
- self.remove_pid_file()
-
+ subcommands = {'create' : (base+'Create', create_help),
+ 'list' : (base+'List', list_help),
+ 'start' : (base+'Start', start_help),
+ 'stop' : (base+'Stop', stop_help),
+ 'engines' : (base+'Engines', engines_help),
+ }
+
+ # no aliases or flags for parent App
+ aliases = Dict()
+ flags = Dict()
+
+ def start(self):
+ if self.subapp is None:
+            print "No subcommand specified! Must specify one of: %s" % (self.subcommands.keys())
+ print
+ self.print_subcommands()
+ self.exit(1)
+ else:
+ return self.subapp.start()
def launch_new_instance():
"""Create and run the IPython cluster."""
- app = IPClusterApp()
+ app = IPBaseParallelApplication.instance()
+ app.initialize()
app.start()
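Taken together, the subcommand table gives the new-style invocation; a few examples using only the aliases and flags defined in this diff (profile name hypothetical):

    $ ipcluster create profile=mpi --reset     # write default config files
    $ ipcluster start profile=mpi n=8 delay=2  # controller plus 8 engines
    $ ipcluster engines profile=mpi n=4        # add engines to a running cluster
    $ ipcluster stop profile=mpi signal=2      # signal the cluster via its pid file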
View
501 IPython/parallel/apps/ipcontrollerapp.py
@@ -17,31 +17,46 @@
from __future__ import with_statement
-import copy
import os
-import logging
import socket
import stat
import sys
import uuid
+from multiprocessing import Process
+
import zmq
+from zmq.devices import ProcessMonitoredQueue
from zmq.log.handlers import PUBHandler
from zmq.utils import jsonapi as json
-from IPython.config.loader import Config
-
-from IPython.parallel import factory
+from IPython.config.application import boolean_flag
+from IPython.core.newapplication import ProfileDir
-from IPython.parallel.apps.clusterdir import (
- ApplicationWithClusterDir,
- ClusterDirConfigLoader
+from IPython.parallel.apps.baseapp import (
+ BaseParallelApplication,
+ base_flags
)
-from IPython.parallel.util import disambiguate_ip_address, split_url
-# from IPython.kernel.fcutil import FCServiceFactory, FURLError
-from IPython.utils.traitlets import Instance, Unicode
+from IPython.utils.importstring import import_item
+from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
+
+# from IPython.parallel.controller.controller import ControllerFactory
+from IPython.parallel.streamsession import StreamSession
+from IPython.parallel.controller.heartmonitor import HeartMonitor
+from IPython.parallel.controller.hub import HubFactory
+from IPython.parallel.controller.scheduler import TaskScheduler, launch_scheduler
+from IPython.parallel.controller.sqlitedb import SQLiteDB
-from IPython.parallel.controller.controller import ControllerFactory
+from IPython.parallel.util import signal_children, split_url
+
+# conditional import of MongoDB backend class
+
+try:
+ from IPython.parallel.controller.mongodb import MongoDB
+except ImportError:
+ maybe_mongo = []
+else:
+ maybe_mongo = [MongoDB]
#-----------------------------------------------------------------------------
@@ -59,238 +74,112 @@
clients. The controller needs to be started before the engines and can be
configured using command line options or using a cluster directory. Cluster
directories contain config, log and security files and are usually located in
-your ipython directory and named as "cluster_<profile>". See the --profile
-and --cluster-dir options for details.
+your ipython directory and named "profile_<profile>". See the `profile`
+and `profile_dir` options for details.
"""
-#-----------------------------------------------------------------------------
-# Default interfaces
-#-----------------------------------------------------------------------------
-
-# The default client interfaces for FCClientServiceFactory.interfaces
-default_client_interfaces = Config()
-default_client_interfaces.Default.url_file = 'ipcontroller-client.url'
-
-# Make this a dict we can pass to Config.__init__ for the default
-default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
-
-
-
-# The default engine interfaces for FCEngineServiceFactory.interfaces
-default_engine_interfaces = Config()
-default_engine_interfaces.Default.url_file = u'ipcontroller-engine.url'
-
-# Make this a dict we can pass to Config.__init__ for the default
-default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
-
-
-#-----------------------------------------------------------------------------
-# Service factories
-#-----------------------------------------------------------------------------
-
-#
-# class FCClientServiceFactory(FCServiceFactory):
-# """A Foolscap implementation of the client services."""
-#
-# cert_file = Unicode(u'ipcontroller-client.pem', config=True)
-# interfaces = Instance(klass=Config, kw=default_client_interfaces,
-# allow_none=False, config=True)
-#
-#
-# class FCEngineServiceFactory(FCServiceFactory):
-# """A Foolscap implementation of the engine services."""
-#
-# cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
-# interfaces = Instance(klass=dict, kw=default_engine_interfaces,
-# allow_none=False, config=True)
-#
-
-#-----------------------------------------------------------------------------
-# Command line options
-#-----------------------------------------------------------------------------
-
-class IPControllerAppConfigLoader(ClusterDirConfigLoader):
-
- def _add_arguments(self):
- super(IPControllerAppConfigLoader, self)._add_arguments()
- paa = self.parser.add_argument
-
- ## Hub Config:
- paa('--mongodb',
- dest='HubFactory.db_class', action='store_const',
- const='IPython.parallel.controller.mongodb.MongoDB',
- help='Use MongoDB for task storage [default: in-memory]')
- paa('--sqlite',
- dest='HubFactory.db_class', action='store_const',
- const='IPython.parallel.controller.sqlitedb.SQLiteDB',
- help='Use SQLite3 for DB task storage [default: in-memory]')
- paa('--hb',
- type=int, dest='HubFactory.hb', nargs=2,
- help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
- 'connections [default: random]',
- metavar='Hub.hb_ports')
- paa('--ping',
- type=int, dest='HubFactory.ping',
- help='The frequency at which the Hub pings the engines for heartbeats '
- ' (in ms) [default: 100]',
- metavar='Hub.ping')
-
- # Client config
- paa('--client-ip',
- type=str, dest='HubFactory.client_ip',
- help='The IP address or hostname the Hub will listen on for '
- 'client connections. Both engine-ip and client-ip can be set simultaneously '
- 'via --ip [default: loopback]',
- metavar='Hub.client_ip')
- paa('--client-transport',
- type=str, dest='HubFactory.client_transport',
- help='The ZeroMQ transport the Hub will use for '
- 'client connections. Both engine-transport and client-transport can be set simultaneously '
- 'via --transport [default: tcp]',
- metavar='Hub.client_transport')
- paa('--query',
- type=int, dest='HubFactory.query_port',
- help='The port on which the Hub XREP socket will listen for result queries from clients [default: random]',
- metavar='Hub.query_port')
- paa('--notifier',
- type=int, dest='HubFactory.notifier_port',
- help='The port on which the Hub PUB socket will listen for notification connections [default: random]',
- metavar='Hub.notifier_port')
-
- # Engine config
- paa('--engine-ip',
- type=str, dest='HubFactory.engine_ip',
- help='The IP address or hostname the Hub will listen on for '
- 'engine connections. This applies to the Hub and its schedulers'
- 'engine-ip and client-ip can be set simultaneously '
- 'via --ip [default: loopback]',
- metavar='Hub.engine_ip')