Skip to content
Browse files

ipcluster implemented with new subcommands

  • Loading branch information...
1 parent 1940df6 commit ad7f39d289f66b24ab80f37e9718e69782d9a4db @minrk committed May 21, 2011
View
115 IPython/parallel/apps/clusterdir.py
@@ -82,6 +82,11 @@ class ClusterDir(Configurable):
'default'. The cluster directory is resolve this way if the
`cluster_dir` option is not used.""", config=True
)
+ auto_create = Bool(False,
+ help="""Whether to automatically create the ClusterDirectory if it does
+ not exist""")
+ overwrite = Bool(False,
+ help="""Whether to overwrite existing config files""")
_location_isset = Bool(False) # flag for detecting multiply set location
_new_dir = Bool(False) # flag for whether a new dir was created
@@ -96,10 +101,14 @@ def _location_changed(self, name, old, new):
raise RuntimeError("Cannot set ClusterDir more than once.")
self._location_isset = True
if not os.path.isdir(new):
- os.makedirs(new)
- self._new_dir = True
+ if self.auto_create:
+ os.makedirs(new)
+ self._new_dir = True
+ else:
+ raise ClusterDirError('Directory not found: %s' % new)
+
# ensure config files exist:
- self.copy_all_config_files(overwrite=False)
+ self.copy_all_config_files(overwrite=self.overwrite)
self.security_dir = os.path.join(new, self.security_dir_name)
self.log_dir = os.path.join(new, self.log_dir_name)
self.pid_dir = os.path.join(new, self.pid_dir_name)
@@ -289,22 +298,23 @@ def __init__(self, app):
base_aliases = {
'profile' : "ClusterDir.profile",
'cluster_dir' : 'ClusterDir.location',
- 'log_level' : 'Application.log_level',
- 'work_dir' : 'ClusterDirApplicaiton.work_dir',
- 'log_to_file' : 'ClusterDirApplicaiton.log_to_file',
- 'clean_logs' : 'ClusterDirApplicaiton.clean_logs',
- 'log_url' : 'ClusterDirApplicaiton.log_url',
+ 'auto_create' : 'ClusterDirApplication.auto_create',
+ 'log_level' : 'ClusterApplication.log_level',
+ 'work_dir' : 'ClusterApplication.work_dir',
+ 'log_to_file' : 'ClusterApplication.log_to_file',
+ 'clean_logs' : 'ClusterApplication.clean_logs',
+ 'log_url' : 'ClusterApplication.log_url',
}
base_flags = {
- 'debug' : ( {"Application" : {"log_level" : logging.DEBUG}}, "set loglevel to DEBUG"),
- 'clean-logs' : ( {"ClusterDirApplication" : {"clean_logs" : True}}, "cleanup old logfiles"),
- 'log-to-file' : ( {"ClusterDirApplication" : {"log_to_file" : True}}, "log to a file")
+ 'debug' : ( {"ClusterApplication" : {"log_level" : logging.DEBUG}}, "set loglevel to DEBUG"),
+ 'quiet' : ( {"ClusterApplication" : {"log_level" : logging.CRITICAL}}, "set loglevel to CRITICAL (minimal output)"),
+ 'log-to-file' : ( {"ClusterApplication" : {"log_to_file" : True}}, "redirect log output to a file"),
}
for k,v in base_flags.iteritems():
base_flags[k] = (Config(v[0]),v[1])
-class ClusterDirApplication(BaseIPythonApplication):
+class ClusterApplication(BaseIPythonApplication):
"""An application that puts everything into a cluster directory.
Instead of looking for things in the ipython_dir, this type of application
@@ -326,9 +336,12 @@ class ClusterDirApplication(BaseIPythonApplication):
crash_handler_class = ClusterDirCrashHandler
auto_create_cluster_dir = Bool(True, config=True,
help="whether to create the cluster_dir if it doesn't exist")
- # temporarily override default_log_level to INFO
- default_log_level = logging.INFO
cluster_dir = Instance(ClusterDir)
+ classes = [ClusterDir]
+
+ def _log_level_default(self):
+ # temporarily override default_log_level to INFO
+ return logging.INFO
work_dir = Unicode(os.getcwdu(), config=True,
help='Set the working dir for the process.'
@@ -339,7 +352,7 @@ def _work_dir_changed(self, name, old, new):
log_to_file = Bool(config=True,
help="whether to log to a file")
- clean_logs = Bool(True, shortname='--clean-logs', config=True,
+ clean_logs = Bool(False, shortname='--clean-logs', config=True,
help="whether to cleanup old logfiles before starting")
log_url = CStr('', shortname='--log-url', config=True,
@@ -349,6 +362,11 @@ def _work_dir_changed(self, name, old, new):
help="""Path to ipcontroller configuration file. The default is to use
<appname>_config.py, as found by cluster-dir."""
)
+
+ loop = Instance('zmq.eventloop.ioloop.IOLoop')
+ def _loop_default(self):
+ from zmq.eventloop.ioloop import IOLoop
+ return IOLoop.instance()
aliases = Dict(base_aliases)
flags = Dict(base_flags)
@@ -370,14 +388,30 @@ def init_clusterdir(self):
``True``, then create the new cluster dir in the IPython directory.
4. If all fails, then raise :class:`ClusterDirError`.
"""
- self.cluster_dir = ClusterDir(config=self.config)
+ self.cluster_dir = ClusterDir(config=self.config, auto_create=self.auto_create_cluster_dir)
if self.cluster_dir._new_dir:
self.log.info('Creating new cluster dir: %s' % \
self.cluster_dir.location)
else:
self.log.info('Using existing cluster dir: %s' % \
self.cluster_dir.location)
-
+
+ def initialize(self, argv=None):
+ """initialize the app"""
+ self.parse_command_line(argv)
+ cl_config = self.config
+ self.init_clusterdir()
+ if self.config_file:
+ self.load_config_file(self.config_file)
+ else:
+ self.load_config_file(self.default_config_file_name, path=self.cluster_dir.location)
+ # command-line should *override* config file, but command-line is necessary
+ # to determine clusterdir, etc.
+ self.update_config(cl_config)
+ self.reinit_logging()
+
+ self.to_work_dir()
+
def to_work_dir(self):
wd = self.work_dir
if unicode(wd) != os.getcwdu():
@@ -386,37 +420,36 @@ def to_work_dir(self):
def load_config_file(self, filename, path=None):
"""Load a .py based config file by filename and path."""
+ # use config.application.Application.load_config
+ # instead of inflexible
+ # core.newapplication.BaseIPythonApplication.load_config
return Application.load_config_file(self, filename, path=path)
#
# def load_default_config_file(self):
# """Load a .py based config file by filename and path."""
# return BaseIPythonApplication.load_config_file(self)
# disable URL-logging
- # def init_logging(self):
- # # Remove old log files
- # if self.master_config.Global.clean_logs:
- # log_dir = self.master_config.Global.log_dir
- # for f in os.listdir(log_dir):
- # if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
- # # if f.startswith(self.name + u'-') and f.endswith('.log'):
- # os.remove(os.path.join(log_dir, f))
- # # Start logging to the new log file
- # if self.master_config.Global.log_to_file:
- # log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
- # logfile = os.path.join(self.log_dir, log_filename)
- # open_log_file = open(logfile, 'w')
- # elif self.master_config.Global.log_url:
- # open_log_file = None
- # else:
- # open_log_file = sys.stdout
- # if open_log_file is not None:
- # self.log.removeHandler(self._log_handler)
- # self._log_handler = logging.StreamHandler(open_log_file)
- # self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
- # self._log_handler.setFormatter(self._log_formatter)
- # self.log.addHandler(self._log_handler)
- # # log.startLogging(open_log_file)
+ def reinit_logging(self):
+ # Remove old log files
+ log_dir = self.cluster_dir.log_dir
+ if self.clean_logs:
+ for f in os.listdir(log_dir):
+ if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
+ os.remove(os.path.join(log_dir, f))
+ if self.log_to_file:
+ # Start logging to the new log file
+ log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
+ logfile = os.path.join(log_dir, log_filename)
+ open_log_file = open(logfile, 'w')
+ else:
+ open_log_file = None
+ if open_log_file is not None:
+ self.log.removeHandler(self._log_handler)
+ self._log_handler = logging.StreamHandler(open_log_file)
+ self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
+ self._log_handler.setFormatter(self._log_formatter)
+ self.log.addHandler(self._log_handler)
def write_pid_file(self, overwrite=False):
"""Create a .pid file in the pid_dir with my pid.
View
706 IPython/parallel/apps/ipclusterapp.py
@@ -25,14 +25,16 @@
import zmq
from zmq.eventloop import ioloop
+from IPython.config.application import Application, boolean_flag
from IPython.config.loader import Config
+from IPython.core.newapplication import BaseIPythonApplication
from IPython.utils.importstring import import_item
from IPython.utils.traitlets import Int, CStr, CUnicode, Str, Bool, CFloat, Dict, List
from IPython.parallel.apps.clusterdir import (
- ClusterDirApplication, ClusterDirError,
+ ClusterApplication, ClusterDirError, ClusterDir,
PIDFileError,
- base_flags,
+ base_flags, base_aliases
)
@@ -52,8 +54,8 @@
range of startup methods (SSH, local processes, PBS, mpiexec,
Windows HPC Server 2008). To start a cluster with 4 engines on your
local host simply do 'ipcluster start n=4'. For more complex usage
-you will typically do 'ipcluster --create profile=mycluster', then edit
-configuration files, followed by 'ipcluster --start -p mycluster -n 4'.
+you will typically do 'ipcluster create profile=mycluster', then edit
+configuration files, followed by 'ipcluster start profile=mycluster n=4'.
"""
@@ -76,171 +78,76 @@
#-----------------------------------------------------------------------------
# Main application
#-----------------------------------------------------------------------------
-start_help = """Start an ipython cluster by its profile name or cluster
- directory. Cluster directories contain configuration, log and
- security related files and are named using the convention
- 'cluster_<profile>' and should be creating using the 'start'
- subcommand of 'ipcluster'. If your cluster directory is in
- the cwd or the ipython directory, you can simply refer to it
- using its profile name, 'ipcluster start -n 4 -p <profile>`,
- otherwise use the '--cluster-dir' option.
- """
-stop_help = """Stop a running ipython cluster by its profile name or cluster
- directory. Cluster directories are named using the convention
- 'cluster_<profile>'. If your cluster directory is in
- the cwd or the ipython directory, you can simply refer to it
- using its profile name, 'ipcluster stop -p <profile>`, otherwise
- use the '--cluster-dir' option.
- """
-engines_help = """Start one or more engines to connect to an existing Cluster
- by profile name or cluster directory.
- Cluster directories contain configuration, log and
- security related files and are named using the convention
- 'cluster_<profile>' and should be creating using the 'start'
- subcommand of 'ipcluster'. If your cluster directory is in
- the cwd or the ipython directory, you can simply refer to it
- using its profile name, 'ipcluster --engines -n 4 -p <profile>`,
- otherwise use the 'cluster_dir' option.
- """
-create_help = """Create an ipython cluster directory by its profile name or
- cluster directory path. Cluster directories contain
- configuration, log and security related files and are named
- using the convention 'cluster_<profile>'. By default they are
- located in your ipython directory. Once created, you will
- probably need to edit the configuration files in the cluster
- directory to configure your cluster. Most users will create a
- cluster directory by profile name,
- 'ipcluster create -p mycluster', which will put the directory
- in '<ipython_dir>/cluster_mycluster'.
- """
+start_help = """
+Start an ipython cluster by its profile name or cluster
+directory. Cluster directories contain configuration, log and
+security related files and are named using the convention
+'cluster_<profile>' and should be creating using the 'start'
+subcommand of 'ipcluster'. If your cluster directory is in
+the cwd or the ipython directory, you can simply refer to it
+using its profile name, 'ipcluster start n=4 profile=<profile>`,
+otherwise use the 'cluster_dir' option.
+"""
+stop_help = """
+Stop a running ipython cluster by its profile name or cluster
+directory. Cluster directories are named using the convention
+'cluster_<profile>'. If your cluster directory is in
+the cwd or the ipython directory, you can simply refer to it
+using its profile name, 'ipcluster stop profile=<profile>`, otherwise
+use the 'cluster_dir' option.
+"""
+engines_help = """
+Start one or more engines to connect to an existing Cluster
+by profile name or cluster directory.
+Cluster directories contain configuration, log and
+security related files and are named using the convention
+'cluster_<profile>' and should be creating using the 'start'
+subcommand of 'ipcluster'. If your cluster directory is in
+the cwd or the ipython directory, you can simply refer to it
+using its profile name, 'ipcluster engines n=4 profile=<profile>`,
+otherwise use the 'cluster_dir' option.
+"""
+create_help = """
+Create an ipython cluster directory by its profile name or
+cluster directory path. Cluster directories contain
+configuration, log and security related files and are named
+using the convention 'cluster_<profile>'. By default they are
+located in your ipython directory. Once created, you will
+probably need to edit the configuration files in the cluster
+directory to configure your cluster. Most users will create a
+cluster directory by profile name,
+`ipcluster create profile=mycluster`, which will put the directory
+in `<ipython_dir>/cluster_mycluster`.
+"""
list_help = """List all available clusters, by cluster directory, that can
- be found in the current working directly or in the ipython
- directory. Cluster directories are named using the convention
- 'cluster_<profile>'."""
-
-
-flags = {}
-flags.update(base_flags)
-flags.update({
- 'start' : ({ 'IPClusterApp': Config({'subcommand' : 'start'})} , start_help),
- 'stop' : ({ 'IPClusterApp': Config({'subcommand' : 'stop'})} , stop_help),
- 'create' : ({ 'IPClusterApp': Config({'subcommand' : 'create'})} , create_help),
- 'engines' : ({ 'IPClusterApp': Config({'subcommand' : 'engines'})} , engines_help),
- 'list' : ({ 'IPClusterApp': Config({'subcommand' : 'list'})} , list_help),
-
-})
-
-class IPClusterApp(ClusterDirApplication):
-
- name = u'ipcluster'
- description = _description
- usage = None
- default_config_file_name = default_config_file_name
- default_log_level = logging.INFO
- auto_create_cluster_dir = False
- classes = List()
- def _classes_default(self,):
- from IPython.parallel.apps import launcher
- return launcher.all_launchers
-
- n = Int(0, config=True,
- help="The number of engines to start.")
- signal = Int(signal.SIGINT, config=True,
- help="signal to use for stopping. [default: SIGINT]")
- delay = CFloat(1., config=True,
- help="delay (in s) between starting the controller and the engines")
-
- subcommand = Str('', config=True,
- help="""ipcluster has a variety of subcommands. The general way of
- running ipcluster is 'ipcluster --<cmd> [options]'."""
- )
-
- controller_launcher_class = Str('IPython.parallel.apps.launcher.LocalControllerLauncher',
- config=True,
- help="The class for launching a Controller."
- )
- engine_launcher_class = Str('IPython.parallel.apps.launcher.LocalEngineSetLauncher',
- config=True,
- help="The class for launching Engines."
- )
- reset = Bool(False, config=True,
- help="Whether to reset config files as part of '--create'."
- )
- daemonize = Bool(False, config=True,
- help='Daemonize the ipcluster program. This implies --log-to-file')
-
- def _daemonize_changed(self, name, old, new):
- if new:
- self.log_to_file = True
-
- def _n_changed(self, name, old, new):
- # propagate n all over the place...
- # TODO make this clean
- # ensure all classes are covered.
- self.config.LocalEngineSetLauncher.n=new
- self.config.MPIExecEngineSetLauncher.n=new
- self.config.SSHEngineSetLauncher.n=new
- self.config.PBSEngineSetLauncher.n=new
- self.config.SGEEngineSetLauncher.n=new
- self.config.WinHPEngineSetLauncher.n=new
-
- aliases = Dict(dict(
- n='IPClusterApp.n',
- signal = 'IPClusterApp.signal',
- delay = 'IPClusterApp.delay',
- clauncher = 'IPClusterApp.controller_launcher_class',
- elauncher = 'IPClusterApp.engine_launcher_class',
- ))
- flags = Dict(flags)
+be found in the current working directly or in the ipython
+directory. Cluster directories are named using the convention
+'cluster_<profile>'.
+"""
- def init_clusterdir(self):
- subcommand = self.subcommand
- if subcommand=='list':
- self.list_cluster_dirs()
- self.exit(0)
- if subcommand=='create':
- reset = self.reset_config
- self.auto_create_cluster_dir = True
- super(IPClusterApp, self).init_clusterdir()
- self.log.info('Copying default config files to cluster directory '
- '[overwrite=%r]' % (reset,))
- self.cluster_dir.copy_all_config_files(overwrite=reset)
- elif subcommand=='start' or subcommand=='stop':
- self.auto_create_cluster_dir = True
- try:
- super(IPClusterApp, self).init_clusterdir()
- except ClusterDirError:
- raise ClusterDirError(
- "Could not find a cluster directory. A cluster dir must "
- "be created before running 'ipcluster start'. Do "
- "'ipcluster create -h' or 'ipcluster list -h' for more "
- "information about creating and listing cluster dirs."
- )
- elif subcommand=='engines':
- self.auto_create_cluster_dir = False
- try:
- super(IPClusterApp, self).init_clusterdir()
- except ClusterDirError:
- raise ClusterDirError(
- "Could not find a cluster directory. A cluster dir must "
- "be created before running 'ipcluster start'. Do "
- "'ipcluster create -h' or 'ipcluster list -h' for more "
- "information about creating and listing cluster dirs."
- )
+class IPClusterList(BaseIPythonApplication):
+ name = u'ipcluster-list'
+ description = list_help
+
+ # empty aliases
+ aliases=Dict()
+ flags = Dict(base_flags)
+
+ def _log_level_default(self):
+ return 20
+
def list_cluster_dirs(self):
# Find the search paths
cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
if cluster_dir_paths:
cluster_dir_paths = cluster_dir_paths.split(':')
else:
cluster_dir_paths = []
- try:
- ipython_dir = self.ipython_dir
- except AttributeError:
- ipython_dir = self.ipython_dir
- paths = [os.getcwd(), ipython_dir] + \
- cluster_dir_paths
+
+ ipython_dir = self.ipython_dir
+
+ paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
paths = list(set(paths))
self.log.info('Searching for cluster dirs in paths: %r' % paths)
@@ -250,135 +157,195 @@ def list_cluster_dirs(self):
full_path = os.path.join(path, f)
if os.path.isdir(full_path) and f.startswith('cluster_'):
profile = full_path.split('_')[-1]
- start_cmd = 'ipcluster --start profile=%s n=4' % profile
+ start_cmd = 'ipcluster start profile=%s n=4' % profile
print start_cmd + " ==> " + full_path
+
+ def start(self):
+ self.list_cluster_dirs()
- def init_launchers(self):
- config = self.config
- subcmd = self.subcommand
- if subcmd =='start':
- self.start_logging()
- self.loop = ioloop.IOLoop.instance()
- # reactor.callWhenRunning(self.start_launchers)
- dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop)
- dc.start()
- if subcmd == 'engines':
- self.start_logging()
- self.loop = ioloop.IOLoop.instance()
- # reactor.callWhenRunning(self.start_launchers)
- engine_only = lambda : self.start_launchers(controller=False)
- dc = ioloop.DelayedCallback(engine_only, 0, self.loop)
- dc.start()
+create_flags = {}
+create_flags.update(base_flags)
+create_flags.update(boolean_flag('reset', 'IPClusterCreate.reset',
+ "reset config files to defaults", "leave existing config files"))
+
+class IPClusterCreate(ClusterApplication):
+ name = u'ipcluster'
+ description = create_help
+ auto_create_cluster_dir = Bool(True,
+ help="whether to create the cluster_dir if it doesn't exist")
+ default_config_file_name = default_config_file_name
+
+ reset = Bool(False, config=True,
+ help="Whether to reset config files as part of 'create'."
+ )
+
+ flags = Dict(create_flags)
+
+ aliases = Dict(dict(profile='ClusterDir.profile'))
+
+ classes = [ClusterDir]
+
+ def init_clusterdir(self):
+ super(IPClusterCreate, self).init_clusterdir()
+ self.log.info('Copying default config files to cluster directory '
+ '[overwrite=%r]' % (self.reset,))
+ self.cluster_dir.copy_all_config_files(overwrite=self.reset)
+
+ def initialize(self, argv=None):
+ self.parse_command_line(argv)
+ self.init_clusterdir()
+
+stop_aliases = dict(
+ signal='IPClusterStop.signal',
+ profile='ClusterDir.profile',
+ cluster_dir='ClusterDir.location',
+)
+
+class IPClusterStop(ClusterApplication):
+ name = u'ipcluster'
+ description = stop_help
+ auto_create_cluster_dir = Bool(False,
+ help="whether to create the cluster_dir if it doesn't exist")
+ default_config_file_name = default_config_file_name
+
+ signal = Int(signal.SIGINT, config=True,
+ help="signal to use for stopping processes.")
+
+ aliases = Dict(stop_aliases)
- def start_launchers(self, controller=True):
- config = self.config
-
- # Create the launchers. In both bases, we set the work_dir of
- # the launcher to the cluster_dir. This is where the launcher's
- # subprocesses will be launched. It is not where the controller
- # and engine will be launched.
- if controller:
- clsname = self.controller_launcher_class
- if '.' not in clsname:
- clsname = 'IPython.parallel.apps.launcher.'+clsname
- cl_class = import_item(clsname)
- self.controller_launcher = cl_class(
- work_dir=self.cluster_dir.location, config=config,
- logname=self.log.name
+ def start(self):
+ """Start the app for the stop subcommand."""
+ try:
+ pid = self.get_pid_from_file()
+ except PIDFileError:
+ self.log.critical(
+ 'Could not read pid file, cluster is probably not running.'
)
- # Setup the observing of stopping. If the controller dies, shut
- # everything down as that will be completely fatal for the engines.
- self.controller_launcher.on_stop(self.stop_launchers)
- # But, we don't monitor the stopping of engines. An engine dying
- # is just fine and in principle a user could start a new engine.
- # Also, if we did monitor engine stopping, it is difficult to
- # know what to do when only some engines die. Currently, the
- # observing of engine stopping is inconsistent. Some launchers
- # might trigger on a single engine stopping, other wait until
- # all stop. TODO: think more about how to handle this.
- else:
- self.controller_launcher = None
+ # Here I exit with an unusual exit status that other processes
+ # can watch for to learn how I exited.
+ self.remove_pid_file()
+ self.exit(ALREADY_STOPPED)
- clsname = self.engine_launcher_class
- if '.' not in clsname:
- # not a module, presume it's the raw name in apps.launcher
- clsname = 'IPython.parallel.apps.launcher.'+clsname
- print repr(clsname)
- el_class = import_item(clsname)
+ if not self.check_pid(pid):
+ self.log.critical(
+ 'Cluster [pid=%r] is not running.' % pid
+ )
+ self.remove_pid_file()
+ # Here I exit with an unusual exit status that other processes
+ # can watch for to learn how I exited.
+ self.exit(ALREADY_STOPPED)
+
+ elif os.name=='posix':
+ sig = self.signal
+ self.log.info(
+ "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
+ )
+ try:
+ os.kill(pid, sig)
+ except OSError:
+ self.log.error("Stopping cluster failed, assuming already dead.",
+ exc_info=True)
+ self.remove_pid_file()
+ elif os.name=='nt':
+ try:
+ # kill the whole tree
+ p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
+ except (CalledProcessError, OSError):
+ self.log.error("Stopping cluster failed, assuming already dead.",
+ exc_info=True)
+ self.remove_pid_file()
+
+engine_aliases = {}
+engine_aliases.update(base_aliases)
+engine_aliases.update(dict(
+ n='IPClusterEngines.n',
+ elauncher = 'IPClusterEngines.engine_launcher_class',
+))
+class IPClusterEngines(ClusterApplication):
+
+ name = u'ipcluster'
+ description = engines_help
+ usage = None
+ default_config_file_name = default_config_file_name
+ default_log_level = logging.INFO
+ auto_create_cluster_dir = Bool(False)
+ classes = List()
+ def _classes_default(self):
+ from IPython.parallel.apps import launcher
+ launchers = launcher.all_launchers
+ eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
+ return [ClusterDir]+eslaunchers
+
+ n = Int(2, config=True,
+ help="The number of engines to start.")
- self.engine_launcher = el_class(
- work_dir=self.cluster_dir.location, config=config, logname=self.log.name
+ engine_launcher_class = Str('LocalEngineSetLauncher',
+ config=True,
+ help="The class for launching a set of Engines."
)
+ daemonize = Bool(False, config=True,
+ help='Daemonize the ipcluster program. This implies --log-to-file')
+ def _daemonize_changed(self, name, old, new):
+ if new:
+ self.log_to_file = True
+
+ aliases = Dict(engine_aliases)
+ # flags = Dict(flags)
+ _stopping = False
+
+ def initialize(self, argv=None):
+ super(IPClusterEngines, self).initialize(argv)
+ self.init_signal()
+ self.init_launchers()
+
+ def init_launchers(self):
+ self.engine_launcher = self.build_launcher(self.engine_launcher_class)
+ self.engine_launcher.on_stop(lambda r: self.loop.stop())
+
+ def init_signal(self):
# Setup signals
signal.signal(signal.SIGINT, self.sigint_handler)
+
+ def build_launcher(self, clsname):
+ """import and instantiate a Launcher based on importstring"""
+ if '.' not in clsname:
+ # not a module, presume it's the raw name in apps.launcher
+ clsname = 'IPython.parallel.apps.launcher.'+clsname
+ # print repr(clsname)
+ klass = import_item(clsname)
- # Start the controller and engines
- self._stopping = False # Make sure stop_launchers is not called 2x.
- if controller:
- self.start_controller()
- dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay*controller, self.loop)
- dc.start()
- self.startup_message()
-
- def startup_message(self, r=None):
- self.log.info("IPython cluster: started")
- return r
-
- def start_controller(self, r=None):
- # self.log.info("In start_controller")
- config = self.config
- d = self.controller_launcher.start(
- cluster_dir=self.cluster_dir.location
+ launcher = klass(
+ work_dir=self.cluster_dir.location, config=self.config, logname=self.log.name
)
- return d
-
- def start_engines(self, r=None):
- # self.log.info("In start_engines")
- config = self.config
-
- d = self.engine_launcher.start(
+ return launcher
+
+ def start_engines(self):
+ self.log.info("Starting %i engines"%self.n)
+ self.engine_launcher.start(
self.n,
cluster_dir=self.cluster_dir.location
)
- return d
- def stop_controller(self, r=None):
- # self.log.info("In stop_controller")
- if self.controller_launcher and self.controller_launcher.running:
- return self.controller_launcher.stop()
-
- def stop_engines(self, r=None):
- # self.log.info("In stop_engines")
+ def stop_engines(self):
+ self.log.info("Stopping Engines...")
if self.engine_launcher.running:
d = self.engine_launcher.stop()
- # d.addErrback(self.log_err)
return d
else:
return None
- def log_err(self, f):
- self.log.error(f.getTraceback())
- return None
-
def stop_launchers(self, r=None):
if not self._stopping:
self._stopping = True
- # if isinstance(r, failure.Failure):
- # self.log.error('Unexpected error in ipcluster:')
- # self.log.info(r.getTraceback())
self.log.error("IPython cluster: stopping")
- # These return deferreds. We are not doing anything with them
- # but we are holding refs to them as a reminder that they
- # do return deferreds.
- d1 = self.stop_engines()
- d2 = self.stop_controller()
+ self.stop_engines()
# Wait a few seconds to let things shut down.
dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
dc.start()
- # reactor.callLater(4.0, reactor.stop)
def sigint_handler(self, signum, frame):
+ self.log.debug("SIGINT received, stopping launchers...")
self.stop_launchers()
def start_logging(self):
@@ -392,25 +359,105 @@ def start_logging(self):
# super(IPClusterApp, self).start_logging()
def start(self):
- """Start the application, depending on what subcommand is used."""
- subcmd = self.subcommand
- if subcmd=='create':
- # init_clusterdir step completed create action
- return
- elif subcmd=='start':
- self.start_app_start()
- elif subcmd=='stop':
- self.start_app_stop()
- elif subcmd=='engines':
- self.start_app_engines()
- else:
- self.log.fatal("one command of '--start', '--stop', '--list', '--create', '--engines'"
- " must be specified")
- self.exit(-1)
+ """Start the app for the engines subcommand."""
+ self.log.info("IPython cluster: started")
+ # First see if the cluster is already running
+
+ # Now log and daemonize
+ self.log.info(
+ 'Starting engines with [daemon=%r]' % self.daemonize
+ )
+ # TODO: Get daemonize working on Windows or as a Windows Server.
+ if self.daemonize:
+ if os.name=='posix':
+ from twisted.scripts._twistd_unix import daemonize
+ daemonize()
+
+ dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
+ dc.start()
+ # Now write the new pid file AFTER our new forked pid is active.
+ # self.write_pid_file()
+ try:
+ self.loop.start()
+ except KeyboardInterrupt:
+ pass
+ except zmq.ZMQError as e:
+ if e.errno == errno.EINTR:
+ pass
+ else:
+ raise
+
+start_aliases = {}
+start_aliases.update(engine_aliases)
+start_aliases.update(dict(
+ delay='IPClusterStart.delay',
+ clean_logs='IPClusterStart.clean_logs',
+))
+
+class IPClusterStart(IPClusterEngines):
+
+ name = u'ipcluster'
+ description = start_help
+ usage = None
+ default_config_file_name = default_config_file_name
+ default_log_level = logging.INFO
+ auto_create_cluster_dir = Bool(True, config=True,
+ help="whether to create the cluster_dir if it doesn't exist")
+ classes = List()
+ def _classes_default(self,):
+ from IPython.parallel.apps import launcher
+ return [ClusterDir]+launcher.all_launchers
+
+ clean_logs = Bool(True, config=True,
+ help="whether to cleanup old logs before starting")
+
+ delay = CFloat(1., config=True,
+ help="delay (in s) between starting the controller and the engines")
+
+ controller_launcher_class = Str('LocalControllerLauncher',
+ config=True,
+ help="The class for launching a Controller."
+ )
+ reset = Bool(False, config=True,
+ help="Whether to reset config files as part of '--create'."
+ )
+
+ # flags = Dict(flags)
+ aliases = Dict(start_aliases)
+
+ def init_clusterdir(self):
+ try:
+ super(IPClusterStart, self).init_clusterdir()
+ except ClusterDirError:
+ raise ClusterDirError(
+ "Could not find a cluster directory. A cluster dir must "
+ "be created before running 'ipcluster start'. Do "
+ "'ipcluster create -h' or 'ipcluster list -h' for more "
+ "information about creating and listing cluster dirs."
+ )
+
+ def init_launchers(self):
+ self.controller_launcher = self.build_launcher(self.controller_launcher_class)
+ self.engine_launcher = self.build_launcher(self.engine_launcher_class)
+ self.controller_launcher.on_stop(self.stop_launchers)
+
+ def start_controller(self):
+ self.controller_launcher.start(
+ cluster_dir=self.cluster_dir.location
+ )
+
+ def stop_controller(self):
+ # self.log.info("In stop_controller")
+ if self.controller_launcher and self.controller_launcher.running:
+ return self.controller_launcher.stop()
+
+ def stop_launchers(self, r=None):
+ if not self._stopping:
+ self.stop_controller()
+ super(IPClusterStart, self).stop_launchers()
- def start_app_start(self):
+ def start(self):
"""Start the app for the start subcommand."""
- config = self.config
# First see if the cluster is already running
try:
pid = self.get_pid_from_file()
@@ -439,6 +486,10 @@ def start_app_start(self):
from twisted.scripts._twistd_unix import daemonize
daemonize()
+ dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
+ dc.start()
+ dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
+ dc.start()
# Now write the new pid file AFTER our new forked pid is active.
self.write_pid_file()
try:
@@ -453,95 +504,36 @@ def start_app_start(self):
finally:
self.remove_pid_file()
- def start_app_engines(self):
- """Start the app for the start subcommand."""
- config = self.config
- # First see if the cluster is already running
-
- # Now log and daemonize
- self.log.info(
- 'Starting engines with [daemon=%r]' % self.daemonize
- )
- # TODO: Get daemonize working on Windows or as a Windows Server.
- if self.daemonize:
- if os.name=='posix':
- from twisted.scripts._twistd_unix import daemonize
- daemonize()
+base='IPython.parallel.apps.ipclusterapp.IPCluster'
- # Now write the new pid file AFTER our new forked pid is active.
- # self.write_pid_file()
- try:
- self.loop.start()
- except KeyboardInterrupt:
- pass
- except zmq.ZMQError as e:
- if e.errno == errno.EINTR:
- pass
- else:
- raise
- # self.remove_pid_file()
+class IPClusterApp(Application):
+ name = u'ipcluster'
+ description = _description
- def start_app_stop(self):
- """Start the app for the stop subcommand."""
- config = self.config
- try:
- pid = self.get_pid_from_file()
- except PIDFileError:
- self.log.critical(
- 'Could not read pid file, cluster is probably not running.'
- )
- # Here I exit with an unusual exit status that other processes
- # can watch for to learn how I exited.
- self.remove_pid_file()
- self.exit(ALREADY_STOPPED)
-
- if not self.check_pid(pid):
- self.log.critical(
- 'Cluster [pid=%r] is not running.' % pid
- )
- self.remove_pid_file()
- # Here I exit with an unusual exit status that other processes
- # can watch for to learn how I exited.
- self.exit(ALREADY_STOPPED)
-
- elif os.name=='posix':
- sig = self.signal
- self.log.info(
- "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
- )
- try:
- os.kill(pid, sig)
- except OSError:
- self.log.error("Stopping cluster failed, assuming already dead.",
- exc_info=True)
- self.remove_pid_file()
- elif os.name=='nt':
- try:
- # kill the whole tree
- p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
- except (CalledProcessError, OSError):
- self.log.error("Stopping cluster failed, assuming already dead.",
- exc_info=True)
- self.remove_pid_file()
-
+ subcommands = {'create' : (base+'Create', create_help),
+ 'list' : (base+'List', list_help),
+ 'start' : (base+'Start', start_help),
+ 'stop' : (base+'Stop', stop_help),
+ 'engines' : (base+'Engines', engines_help),
+ }
+
+ # no aliases or flags for parent App
+ aliases = Dict()
+ flags = Dict()
+
+ def start(self):
+ if self.subapp is None:
+ print "No subcommand specified! Must specify one of: %s"%(self.subcommands.keys())
+ print
+ self.print_subcommands()
+ self.exit(1)
+ else:
+ return self.subapp.start()
def launch_new_instance():
"""Create and run the IPython cluster."""
app = IPClusterApp()
- app.parse_command_line()
- cl_config = app.config
- app.init_clusterdir()
- if app.config_file:
- app.load_config_file(app.config_file)
- else:
- app.load_config_file(app.default_config_file_name, path=app.cluster_dir.location)
- # command-line should *override* config file, but command-line is necessary
- # to determine clusterdir, etc.
- app.update_config(cl_config)
-
- app.to_work_dir()
- app.init_launchers()
-
+ app.initialize()
app.start()
View
28 IPython/parallel/apps/ipcontrollerapp.py
@@ -38,7 +38,7 @@
from IPython.parallel.apps.clusterdir import (
ClusterDir,
- ClusterDirApplication,
+ ClusterApplication,
base_flags
# ClusterDirConfigLoader
)
@@ -104,7 +104,7 @@
flags.update()
-class IPControllerApp(ClusterDirApplication):
+class IPControllerApp(ClusterApplication):
name = u'ipcontroller'
description = _description
@@ -361,6 +361,12 @@ def do_import_statements(self):
# handler.setLevel(self.log_level)
# self.log.addHandler(handler)
# #
+
+ def initialize(self, argv=None):
+ super(IPControllerApp, self).initialize(argv)
+ self.init_hub()
+ self.init_schedulers()
+
def start(self):
# Start the subprocesses:
self.factory.start()
@@ -380,27 +386,13 @@ def start(self):
self.factory.loop.start()
except KeyboardInterrupt:
self.log.critical("Interrupted, Exiting...\n")
+
def launch_new_instance():
"""Create and run the IPython controller"""
app = IPControllerApp()
- app.parse_command_line()
- cl_config = app.config
- # app.load_config_file()
- app.init_clusterdir()
- if app.config_file:
- app.load_config_file(app.config_file)
- else:
- app.load_config_file(app.default_config_file_name, path=app.cluster_dir.location)
- # command-line should *override* config file, but command-line is necessary
- # to determine clusterdir, etc.
- app.update_config(cl_config)
-
- app.to_work_dir()
- app.init_hub()
- app.init_schedulers()
-
+ app.initialize()
app.start()
View
33 IPython/parallel/apps/ipengineapp.py
@@ -23,7 +23,7 @@
from zmq.eventloop import ioloop
from IPython.parallel.apps.clusterdir import (
- ClusterDirApplication,
+ ClusterApplication,
ClusterDir,
base_aliases,
# ClusterDirConfigLoader
@@ -99,13 +99,16 @@ def _on_use_changed(self, old, new):
#-----------------------------------------------------------------------------
-class IPEngineApp(ClusterDirApplication):
+class IPEngineApp(ClusterApplication):
app_name = Unicode(u'ipengine')
description = Unicode(_description)
default_config_file_name = default_config_file_name
classes = List([ClusterDir, StreamSession, EngineFactory, Kernel, MPI])
+ auto_create_cluster_dir = Bool(False, config=True,
+ help="whether to create the cluster_dir if it doesn't exist")
+
startup_script = Unicode(u'', config=True,
help='specify a script to be run at startup')
startup_command = Str('', config=True,
@@ -262,7 +265,11 @@ def init_mpi(self):
else:
mpi = None
-
+ def initialize(self, argv=None):
+ super(IPEngineApp, self).initialize(argv)
+ self.init_mpi()
+ self.init_engine()
+
def start(self):
self.engine.start()
try:
@@ -274,25 +281,7 @@ def start(self):
def launch_new_instance():
"""Create and run the IPython engine"""
app = IPEngineApp()
- app.parse_command_line()
- cl_config = app.config
- app.init_clusterdir()
- # app.load_config_file()
- # print app.config
- if app.config_file:
- app.load_config_file(app.config_file)
- else:
- app.load_config_file(app.default_config_file_name, path=app.cluster_dir.location)
-
- # command-line should *override* config file, but command-line is necessary
- # to determine clusterdir, etc.
- app.update_config(cl_config)
-
- # print app.config
- app.to_work_dir()
- app.init_mpi()
- app.init_engine()
- print app.config
+ app.initialize()
app.start()
View
4 IPython/parallel/apps/iploggerapp.py
@@ -21,7 +21,7 @@
import zmq
from IPython.parallel.apps.clusterdir import (
- ClusterDirApplication,
+ ClusterApplication,
ClusterDirConfigLoader
)
from IPython.parallel.apps.logwatcher import LogWatcher
@@ -74,7 +74,7 @@ def _add_arguments(self):
#-----------------------------------------------------------------------------
-class IPLoggerApp(ClusterDirApplication):
+class IPLoggerApp(ClusterApplication):
name = u'iploggerz'
description = _description

0 comments on commit ad7f39d

Please sign in to comment.
Something went wrong with that request. Please try again.