Permalink
Browse files

update parallel apps to use ProfileDir

  • Loading branch information...
1 parent 6fa4c3f commit 5f11451451ba54aea00e53c555b4a001cfd20ffb @minrk committed May 28, 2011

Large diffs are not rendered by default.

Oops, something went wrong.
@@ -27,12 +27,12 @@
from IPython.config.application import Application, boolean_flag
from IPython.config.loader import Config
-from IPython.core.newapplication import BaseIPythonApplication
+from IPython.core.newapplication import BaseIPythonApplication, ProfileDir
from IPython.utils.importstring import import_item
from IPython.utils.traitlets import Int, Unicode, Bool, CFloat, Dict, List
from IPython.parallel.apps.clusterdir import (
- ClusterApplication, ClusterDirError, ClusterDir,
+ BaseParallelApplication,
PIDFileError,
base_flags, base_aliases
)
@@ -86,7 +86,7 @@
subcommand of 'ipcluster'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster start n=4 profile=<profile>`,
-otherwise use the 'cluster_dir' option.
+otherwise use the 'profile_dir' option.
"""
stop_help = """Stop a running IPython cluster
@@ -95,7 +95,7 @@
'cluster_<profile>'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster stop profile=<profile>`, otherwise
-use the 'cluster_dir' option.
+use the 'profile_dir' option.
"""
engines_help = """Start engines connected to an existing IPython cluster
@@ -107,7 +107,7 @@
subcommand of 'ipcluster'. If your cluster directory is in
the cwd or the ipython directory, you can simply refer to it
using its profile name, 'ipcluster engines n=4 profile=<profile>`,
-otherwise use the 'cluster_dir' option.
+otherwise use the 'profile_dir' option.
"""
create_help = """Create an ipcluster profile by name
@@ -142,88 +142,70 @@ class IPClusterList(BaseIPythonApplication):
def _log_level_default(self):
return 20
- def list_cluster_dirs(self):
+ def list_profile_dirs(self):
# Find the search paths
- cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
- if cluster_dir_paths:
- cluster_dir_paths = cluster_dir_paths.split(':')
+ profile_dir_paths = os.environ.get('IPYTHON_PROFILE_PATH','')
+ if profile_dir_paths:
+ profile_dir_paths = profile_dir_paths.split(':')
else:
- cluster_dir_paths = []
+ profile_dir_paths = []
ipython_dir = self.ipython_dir
- paths = [os.getcwd(), ipython_dir] + cluster_dir_paths
+ paths = [os.getcwd(), ipython_dir] + profile_dir_paths
paths = list(set(paths))
- self.log.info('Searching for cluster dirs in paths: %r' % paths)
+ self.log.info('Searching for cluster profiles in paths: %r' % paths)
for path in paths:
files = os.listdir(path)
for f in files:
full_path = os.path.join(path, f)
- if os.path.isdir(full_path) and f.startswith('cluster_'):
- profile = full_path.split('_')[-1]
+ if os.path.isdir(full_path) and f.startswith('profile_') and \
+ os.path.isfile(os.path.join(full_path, 'ipcontroller_config.py')):
+ profile = f.split('_')[-1]
start_cmd = 'ipcluster start profile=%s n=4' % profile
print start_cmd + " ==> " + full_path
def start(self):
- self.list_cluster_dirs()
+ self.list_profile_dirs()
+
+
+# `ipcluster create` will be deprecated when `ipython profile create` or equivalent exists
create_flags = {}
create_flags.update(base_flags)
-create_flags.update(boolean_flag('reset', 'IPClusterCreate.reset',
+create_flags.update(boolean_flag('reset', 'IPClusterCreate.overwrite',
"reset config files to defaults", "leave existing config files"))
-class IPClusterCreate(ClusterApplication):
- name = u'ipcluster'
+class IPClusterCreate(BaseParallelApplication):
+ name = u'ipcluster-create'
description = create_help
- auto_create_cluster_dir = Bool(True,
- help="whether to create the cluster_dir if it doesn't exist")
+ auto_create = Bool(True)
config_file_name = Unicode(default_config_file_name)
- reset = Bool(False, config=True,
- help="Whether to reset config files as part of 'create'."
- )
-
flags = Dict(create_flags)
- aliases = Dict(dict(profile='ClusterDir.profile'))
+ aliases = Dict(dict(profile='BaseIPythonApplication.profile'))
- classes = [ClusterDir]
+ classes = [ProfileDir]
- def init_clusterdir(self):
- super(IPClusterCreate, self).init_clusterdir()
- self.log.info('Copying default config files to cluster directory '
- '[overwrite=%r]' % (self.reset,))
- self.cluster_dir.copy_all_config_files(overwrite=self.reset)
-
- def initialize(self, argv=None):
- self.parse_command_line(argv)
- self.init_clusterdir()
stop_aliases = dict(
signal='IPClusterStop.signal',
- profile='ClusterDir.profile',
- cluster_dir='ClusterDir.location',
+ profile='BaseIPythonApplication.profile',
+ profile_dir='ProfileDir.location',
)
-class IPClusterStop(ClusterApplication):
+class IPClusterStop(BaseParallelApplication):
name = u'ipcluster'
description = stop_help
- auto_create_cluster_dir = Bool(False)
config_file_name = Unicode(default_config_file_name)
signal = Int(signal.SIGINT, config=True,
help="signal to use for stopping processes.")
aliases = Dict(stop_aliases)
- def init_clusterdir(self):
- try:
- super(IPClusterStop, self).init_clusterdir()
- except ClusterDirError as e:
- self.log.fatal("Failed ClusterDir init: %s"%e)
- self.exit(1)
-
def start(self):
"""Start the app for the stop subcommand."""
try:
@@ -272,20 +254,19 @@ def start(self):
n='IPClusterEngines.n',
elauncher = 'IPClusterEngines.engine_launcher_class',
))
-class IPClusterEngines(ClusterApplication):
+class IPClusterEngines(BaseParallelApplication):
name = u'ipcluster'
description = engines_help
usage = None
config_file_name = Unicode(default_config_file_name)
default_log_level = logging.INFO
- auto_create_cluster_dir = Bool(False)
classes = List()
def _classes_default(self):
from IPython.parallel.apps import launcher
launchers = launcher.all_launchers
eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
- return [ClusterDir]+eslaunchers
+ return [ProfileDir]+eslaunchers
n = Int(2, config=True,
help="The number of engines to start.")
@@ -327,15 +308,15 @@ def build_launcher(self, clsname):
klass = import_item(clsname)
launcher = klass(
- work_dir=self.cluster_dir.location, config=self.config, logname=self.log.name
+ work_dir=self.profile_dir.location, config=self.config, logname=self.log.name
)
return launcher
def start_engines(self):
self.log.info("Starting %i engines"%self.n)
self.engine_launcher.start(
self.n,
- cluster_dir=self.cluster_dir.location
+ profile_dir=self.profile_dir.location
)
def stop_engines(self):
@@ -362,12 +343,12 @@ def sigint_handler(self, signum, frame):
def start_logging(self):
# Remove old log files of the controller and engine
if self.clean_logs:
- log_dir = self.cluster_dir.log_dir
+ log_dir = self.profile_dir.log_dir
for f in os.listdir(log_dir):
if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
os.remove(os.path.join(log_dir, f))
# This will remove old log files for ipcluster itself
- # super(IPClusterApp, self).start_logging()
+ # super(IPBaseParallelApplication, self).start_logging()
def start(self):
"""Start the app for the engines subcommand."""
@@ -410,12 +391,12 @@ class IPClusterStart(IPClusterEngines):
name = u'ipcluster'
description = start_help
default_log_level = logging.INFO
- auto_create_cluster_dir = Bool(True, config=True,
- help="whether to create the cluster_dir if it doesn't exist")
+ auto_create = Bool(True, config=True,
+ help="whether to create the profile_dir if it doesn't exist")
classes = List()
def _classes_default(self,):
from IPython.parallel.apps import launcher
- return [ClusterDir]+launcher.all_launchers
+ return [ProfileDir]+launcher.all_launchers
clean_logs = Bool(True, config=True,
help="whether to cleanup old logs before starting")
@@ -441,7 +422,7 @@ def init_launchers(self):
def start_controller(self):
self.controller_launcher.start(
- cluster_dir=self.cluster_dir.location
+ profile_dir=self.profile_dir.location
)
def stop_controller(self):
@@ -504,7 +485,7 @@ def start(self):
base='IPython.parallel.apps.ipclusterapp.IPCluster'
-class IPClusterApp(Application):
+class IPBaseParallelApplication(Application):
name = u'ipcluster'
description = _description
@@ -530,7 +511,7 @@ def start(self):
def launch_new_instance():
"""Create and run the IPython cluster."""
- app = IPClusterApp()
+ app = IPBaseParallelApplication()
app.initialize()
app.start()
@@ -17,9 +17,7 @@
from __future__ import with_statement
-import copy
import os
-import logging
import socket
import stat
import sys
@@ -33,26 +31,23 @@
from zmq.utils import jsonapi as json
from IPython.config.loader import Config
-
-from IPython.parallel import factory
+from IPython.core.newapplication import ProfileDir
from IPython.parallel.apps.clusterdir import (
- ClusterDir,
- ClusterApplication,
+ BaseParallelApplication,
base_flags
- # ClusterDirConfigLoader
)
from IPython.utils.importstring import import_item
from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
# from IPython.parallel.controller.controller import ControllerFactory
from IPython.parallel.streamsession import StreamSession
from IPython.parallel.controller.heartmonitor import HeartMonitor
-from IPython.parallel.controller.hub import Hub, HubFactory
+from IPython.parallel.controller.hub import HubFactory
from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
from IPython.parallel.controller.sqlitedb import SQLiteDB
-from IPython.parallel.util import signal_children,disambiguate_ip_address, split_url
+from IPython.parallel.util import signal_children, split_url
# conditional import of MongoDB backend class
@@ -80,7 +75,7 @@
configured using command line options or using a cluster directory. Cluster
directories contain config, log and security files and are usually located in
your ipython directory and named as "cluster_<profile>". See the `profile`
-and `cluster_dir` options for details.
+and `profile_dir` options for details.
"""
@@ -106,15 +101,17 @@
flags.update()
-class IPControllerApp(ClusterApplication):
+class IPControllerApp(BaseParallelApplication):
name = u'ipcontroller'
description = _description
config_file_name = Unicode(default_config_file_name)
- classes = [ClusterDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
+ classes = [ProfileDir, StreamSession, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
+
+ # change default to True
+ auto_create = Bool(True, config=True,
+ help="""Whether to create profile dir if it doesn't exist""")
- auto_create_cluster_dir = Bool(True, config=True,
- help="Whether to create cluster_dir if it exists.")
reuse_files = Bool(False, config=True,
help='Whether to reuse existing json connection files [default: False]'
)
@@ -146,8 +143,6 @@ def _use_threads_changed(self, name, old, new):
self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
aliases = Dict(dict(
- config = 'IPControllerApp.config_file',
- # file = 'IPControllerApp.url_file',
log_level = 'IPControllerApp.log_level',
log_url = 'IPControllerApp.log_url',
reuse_files = 'IPControllerApp.reuse_files',
@@ -172,8 +167,8 @@ def _use_threads_changed(self, name, old, new):
hwm = 'TaskScheduler.hwm',
- profile = "ClusterDir.profile",
- cluster_dir = 'ClusterDir.location',
+ profile = "BaseIPythonApplication.profile",
+ profile_dir = 'ProfileDir.location',
))
flags = Dict(flags)
@@ -192,7 +187,7 @@ def save_connection_dict(self, fname, cdict):
else:
location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
cdict['location'] = location
- fname = os.path.join(self.cluster_dir.security_dir, fname)
+ fname = os.path.join(self.profile_dir.security_dir, fname)
with open(fname, 'w') as f:
f.write(json.dumps(cdict, indent=2))
os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
@@ -201,7 +196,7 @@ def load_config_from_json(self):
"""load config from existing json connector files."""
c = self.config
# load from engine config
- with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-engine.json')) as f:
+ with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
cfg = json.loads(f.read())
key = c.StreamSession.key = cfg['exec_key']
xport,addr = cfg['url'].split('://')
@@ -212,7 +207,7 @@ def load_config_from_json(self):
self.location = cfg['location']
# load client config
- with open(os.path.join(self.cluster_dir.security_dir, 'ipcontroller-client.json')) as f:
+ with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
cfg = json.loads(f.read())
assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
xport,addr = cfg['url'].split('://')
@@ -237,7 +232,7 @@ def init_hub(self):
pass
elif self.secure:
key = str(uuid.uuid4())
- # keyfile = os.path.join(self.cluster_dir.security_dir, self.exec_key)
+ # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key)
# with open(keyfile, 'w') as f:
# f.write(key)
# os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
@@ -332,7 +327,7 @@ def save_urls(self):
"""save the registration urls to files."""
c = self.config
- sec_dir = self.cluster_dir.security_dir
+ sec_dir = self.profile_dir.security_dir
cf = self.factory
with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
Oops, something went wrong.

0 comments on commit 5f11451

Please sign in to comment.