# encoding: utf-8
"""
The Base Application class for IPython.parallel apps

Authors:

* Brian Granger
* Min RK
"""
# Copyright (C) 2008-2011 The IPython Development Team
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
# Imports
from __future__ import with_statement

import logging
import os
import re
import sys
from subprocess import Popen, PIPE

from IPython.core import release
from IPython.core.application import (
    BaseIPythonApplication,
    base_aliases as base_ip_aliases,
    base_flags as base_ip_flags,
)
from IPython.core.crashhandler import CrashHandler
from IPython.utils.path import expand_path
from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
# Module errors
class PIDFileError(Exception):
    """Raised when a pid file is missing, unreadable, or already present."""
# Crash handler for this application
class ParallelCrashHandler(CrashHandler):
    """sys.excepthook for IPython itself, leaves a detailed report on disk."""

    def __init__(self, app):
        """Set up the crash handler for *app* with this project's contact info.

        The contact name and email are read from the 'Min' entry of
        ``release.authors``; the bug tracker URL is left empty.
        """
        contact_name = release.authors['Min'][0]
        contact_email = release.authors['Min'][1]
        bug_tracker = ''
        # NOTE(review): the call wrapping this argument list was missing from
        # the file (only the bare argument tuple remained); restored as a call
        # to the base-class initializer -- confirm against CrashHandler.__init__.
        super(ParallelCrashHandler, self).__init__(
            app, contact_name, contact_email, bug_tracker
        )
# Main application
# Command-line aliases for the parallel apps: the generic IPython aliases
# first, then the parallel-specific ones layered on top so they win on a
# key clash.
# NOTE(review): the statements merging ``base_ip_aliases`` were missing from
# the file; the merge order here is reconstructed -- confirm.
base_aliases = {}
base_aliases.update(base_ip_aliases)
base_aliases.update({
    'profile-dir' : 'ProfileDir.location',
    'work-dir' : 'BaseParallelApplication.work_dir',
    'log-to-file' : 'BaseParallelApplication.log_to_file',
    'clean-logs' : 'BaseParallelApplication.clean_logs',
    'log-url' : 'BaseParallelApplication.log_url',
    'cluster-id' : 'BaseParallelApplication.cluster_id',
})
# Command-line flags for the parallel apps. Each entry maps a flag name to a
# (config-dict, help-text) pair.
base_flags = {
    'log-to-file' : (
        {'BaseParallelApplication' : {'log_to_file' : True}},
        "send log output to a file"
    )
}
# NOTE(review): the statement merging ``base_ip_flags`` was missing from the
# file; restored so the generic IPython flags are available too -- confirm
# the intended precedence on a key clash.
base_flags.update(base_ip_flags)
class BaseParallelApplication(BaseIPythonApplication):
    """The base Application for IPython.parallel apps

    Principal extensions to BaseIPythonApplication:

    * work_dir
    * remote logging via pyzmq
    * IOLoop instance
    """

    crash_handler_class = ParallelCrashHandler

    def _log_level_default(self):
        # temporarily override default_log_level to INFO
        return logging.INFO

    work_dir = Unicode(os.getcwdu(), config=True,
        help='Set the working dir for the process.'
    )

    def _work_dir_changed(self, name, old, new):
        """Normalize a newly assigned work_dir through ``expand_path``."""
        self.work_dir = unicode(expand_path(new))

    log_to_file = Bool(config=True,
        help="whether to log to a file")

    clean_logs = Bool(False, config=True,
        help="whether to cleanup old logfiles before starting")

    log_url = Unicode('', config=True,
        help="The ZMQ URL of the iplogger to aggregate logging.")

    cluster_id = Unicode('', config=True,
        help="""String id to add to runtime files, to prevent name collisions when
        using multiple clusters with a single profile simultaneously.

        When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json'

        Since this is text inserted into filenames, typical recommendations apply:
        Simple character strings are ideal, and spaces are not recommended (but should
        generally work).
        """
    )

    def _cluster_id_changed(self, name, old, new):
        """Append the cluster id to the application name used for runtime files."""
        # NOTE(review): the assignment target and value were missing from the
        # file; restored as "reset to the class-level name, then suffix with
        # the cluster id" -- confirm.
        self.name = self.__class__.name
        if new:
            self.name += '-%s'%new

    def _config_files_default(self):
        # NOTE(review): the three entries are empty strings in the file as
        # received; confirm the intended config file names.
        return ['', '', '']

    loop = Instance('zmq.eventloop.ioloop.IOLoop')

    def _loop_default(self):
        # Imported lazily so zmq is only needed once a loop is requested.
        from zmq.eventloop.ioloop import IOLoop
        return IOLoop.instance()

    aliases = Dict(base_aliases)
    flags = Dict(base_flags)

    def initialize(self, argv=None):
        """initialize the app"""
        # NOTE(review): only the base-class call is visible in the file;
        # nothing in this class calls to_work_dir() or reinit_logging() --
        # confirm whether they should be invoked here.
        super(BaseParallelApplication, self).initialize(argv)

    def to_work_dir(self):
        """Change into ``work_dir`` if needed and put it first on ``sys.path``."""
        wd = self.work_dir
        if unicode(wd) != os.getcwdu():
            os.chdir(wd)
            # NOTE(review): the logging call was truncated in the file; the
            # 'info' level is assumed.
            self.log.info("Changing to working dir: %s" % wd)
        # This is the working dir by now.
        sys.path.insert(0, '')

    def reinit_logging(self):
        """Optionally prune old log files and redirect logging to a new file.

        When ``clean_logs`` is set, files in the profile's log dir named
        ``<name>-<digits>.(log|err|out)`` are removed.  When ``log_to_file``
        is set, a ``<name>-<pid>.log`` file is opened there and attached to
        ``self.log`` as a stream handler.  Propagation to the root logger is
        always turned off.
        """
        log_dir = self.profile_dir.log_dir
        if self.clean_logs:
            # Remove old log files.  The name is escaped because it may
            # contain a user-supplied cluster id.
            stale_log = re.compile(
                r'%s-\d+\.(log|err|out)' % re.escape(self.name)
            )
            for f in os.listdir(log_dir):
                if stale_log.match(f):
                    os.remove(os.path.join(log_dir, f))
        if self.log_to_file:
            # Start logging to the new log file
            log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
            logfile = os.path.join(log_dir, log_filename)
            open_log_file = open(logfile, 'w')
        else:
            open_log_file = None
        if open_log_file is not None:
            # Detach the handler previously tracked under this attribute (if
            # any) so it is not left attached when the attribute is replaced.
            previous = getattr(self, '_log_handler', None)
            if previous is not None:
                self.log.removeHandler(previous)
            self._log_handler = logging.StreamHandler(open_log_file)
            self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
            # The handler and formatter only take effect once attached.
            self._log_handler.setFormatter(self._log_formatter)
            self.log.addHandler(self._log_handler)
        # do not propagate log messages to root logger
        # ipcluster app will sometimes print duplicate messages during shutdown
        # if this is 1 (default):
        self.log.propagate = False

    def write_pid_file(self, overwrite=False):
        """Create a .pid file in the profile's pid_dir with my pid.

        Parameters
        ----------
        overwrite : bool
            If False (default) and the pid file already exists,
            :exc:`PIDFileError` is raised.  If True, an existing file is
            replaced without being read.
        """
        pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
        if os.path.isfile(pid_file) and not overwrite:
            # Only read the old file when refusing to overwrite, so that a
            # corrupt pid file cannot block overwrite=True.
            pid = self.get_pid_from_file()
            raise PIDFileError(
                'The pid file [%s] already exists. \nThis could mean that this '
                'server is already running with [pid=%s].' % (pid_file, pid)
            )
        with open(pid_file, 'w') as f:
            self.log.info("Creating pid file: %s" % pid_file)
            f.write(repr(os.getpid()) + '\n')

    def remove_pid_file(self):
        """Remove the pid file, if it exists.

        Intended to be called at shutdown.  A failure to delete the file is
        logged as a warning rather than raised.  Always returns ``None``.
        """
        pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
        if os.path.isfile(pid_file):
            try:
                self.log.info("Removing pid file: %s" % pid_file)
                os.remove(pid_file)
            except OSError:
                self.log.warn("Error removing the pid file: %s" % pid_file)

    def get_pid_from_file(self):
        """Get the pid from the pid file.

        If the pid file doesn't exist, or its contents are not an integer,
        a :exc:`PIDFileError` is raised.
        """
        pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
        if not os.path.isfile(pid_file):
            raise PIDFileError('pid file not found: %s' % pid_file)
        with open(pid_file, 'r') as f:
            s = f.read().strip()
        try:
            pid = int(s)
        except ValueError:
            raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
        return pid

    def check_pid(self, pid):
        """Return True if a process with *pid* appears to be running.

        On Windows this asks ``OpenProcess``; elsewhere it scans the output
        of ``ps x``.  If the check itself cannot be carried out, a warning is
        logged and True is returned.
        """
        if os.name == 'nt':
            try:
                import ctypes
                kernel32 = ctypes.windll.kernel32
                # returns 0 if no such process (of ours) exists
                # positive int otherwise
                handle = kernel32.OpenProcess(1, 0, pid)
                if handle:
                    # Only existence matters; release the handle right away.
                    kernel32.CloseHandle(handle)
            except Exception:
                self.log.warn(
                    "Could not determine whether pid %i is running via `OpenProcess`. "
                    " Making the likely assumption that it is."%pid
                )
                return True
            return bool(handle)
        else:
            try:
                p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
                output,_ = p.communicate()
            except OSError:
                self.log.warn(
                    "Could not determine whether pid %i is running via `ps x`. "
                    " Making the likely assumption that it is."%pid
                )
                return True
            # The first numeric field on each line of `ps x` is taken as a pid.
            pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
            return pid in pids
