diff --git a/pathomx/Pathomx.py b/pathomx/Pathomx.py index 608ec58..4a34487 100755 --- a/pathomx/Pathomx.py +++ b/pathomx/Pathomx.py @@ -428,8 +428,6 @@ def do_open_demo(f): # IPython Widget for internal (user) console self.console = RichIPythonWidget() self.console._call_tip = lambda: None - self.console.kernel_manager = notebook_queue.in_process_runner.kernel_manager - self.console.kernel_client = notebook_queue.in_process_runner.kernel_client self.central = QTabWidget() self.central.setDocumentMode(True) @@ -1191,7 +1189,6 @@ def main(): app.installTranslator(translator_mp) # We've got a qApp instance going, set up timers - notebook_queue.create_user_kernel() notebook_queue.create_runners() notebook_queue.start_timers() ''' diff --git a/pathomx/globals.py b/pathomx/globals.py index 4e239d5..90a453f 100644 --- a/pathomx/globals.py +++ b/pathomx/globals.py @@ -7,7 +7,7 @@ from collections import defaultdict from .qt import * -from .runqueue import RunManager +from .runqueue import Queue from pyqtconfig import QSettingsManager from yapsy.PluginManager import PluginManagerSingleton @@ -45,7 +45,7 @@ # Manager objects logging.debug('Setting up managers...') styles = StylesManager() - notebook_queue = RunManager() + notebook_queue = Queue() settings = QSettingsManager() settings.set_defaults({ diff --git a/pathomx/kernel_helpers.py b/pathomx/kernel_helpers.py index 85e617e..d37218a 100644 --- a/pathomx/kernel_helpers.py +++ b/pathomx/kernel_helpers.py @@ -44,10 +44,10 @@ def __init__(self, name, *args, **kwargs): self._name = name -def pathomx_notebook_start(varsi, vars): +def pathomx_notebook_start(vars): - for k, v in varsi.items(): - vars[k] = v + #for k, v in varsi.items(): + # vars[k] = v # _keep_input_vars = ['styles'] # vars['_pathomx_exclude_input_vars'] = [x for x in varsi.keys() if x not in _keep_input_vars] @@ -60,16 +60,20 @@ def pathomx_notebook_start(varsi, vars): else: vars[k] = None - if 'rcParams' in vars: + if '_rcParams' in vars: global rcParams from matplotlib import rcParams # Block warnings from deprecated rcParams here with warnings.catch_warnings(): warnings.simplefilter("ignore") - for k, v in vars['rcParams'].items(): + for k, v in vars['_rcParams'].items(): rcParams[k] = v + # Legacy shim + if '_styles' in vars: + vars['styles'] = vars['_styles'] + def pathomx_notebook_stop(vars): varso = {} diff --git a/pathomx/runqueue.py b/pathomx/runqueue.py index 217fe30..dfab08c 100644 --- a/pathomx/runqueue.py +++ b/pathomx/runqueue.py @@ -1,6 +1,6 @@ import logging -from collections import namedtuple +from collections import namedtuple, defaultdict from .qt import * @@ -9,6 +9,8 @@ from IPython.qt.console.ansi_code_processor import QtAnsiCodeProcessor from IPython.parallel import Client, TimeoutError, RemoteError +from IPython.utils.pickleutil import use_dill +use_dill() from datetime import datetime import re @@ -17,6 +19,8 @@ from subprocess import Popen from IPython.parallel.apps import ipclusterapp +from matplotlib import rcParams + # Kernel is busy but not because of us STATUS_BLOCKED = -1 @@ -24,489 +28,705 @@ STATUS_READY = 0 STATUS_RUNNING = 1 STATUS_COMPLETE = 2 + +# Error status STATUS_ERROR = 3 +AR_PUSH = 1 +AR_EXECUTE = 2 +AR_PULL = 3 + +# Job submitted as initiating tool +# Pre-calculate the execution order and flow (save the round-trip execution) stopping at paused tools +# - the set of this available on job object for mapping +# Compare to existing jobs in the queue; if any tool's lists are a subset of this tools list kill that job +# - is this interactive 
mode only? decided later; may require an override
+#
+# Initiate job
+# - check run flag (still to run)
+# Set all included tools as status-clear
+# Run first task, with callback trigger to next step (+callback to tool; multiples? how to handle)
+# - Each run-task should check the run-flag before continuing
+#
+# Job complete; delete job from queue
+
+
 # from pkg_resources import load_entry_point
 # load_entry_point('ipython==3.0.0-dev', 'console_scripts', 'ipcluster')()
 # IPython.parallel.apps.ipclusterapp:launch_new_instance'
 
 
-# FIXME; we need to base-class the runner code
-def setup_languages(execute, language):
-    if language == 'r':
-        # Init R library loader (will take time first time; but instant thereafter)
-        execute(r'''%load_ext rpy2.ipython''')
 
-    elif language == 'matlab':
-        # Init MATLAB
-        execute(r'''%load_ext pymatbridge''')
 
-class ClusterRunner(QObject):
+class Runner(QObject):
     """
-    A runner object that handles running IPython code on an IPython cluster for
-    parallel processing without blocking the UI.
+    A runner object that handles running a Task object on an IPython cluster kernel.
+
+    Each Task object consists of multiple Exec objects which can be executed immediately in turn
+    (scheduling is already handled by the Job object that provides the Task).
+
+    Each Exec can include varsi (vars in), code, varso (vars out) and pre-execution non-Python setup.
+    The code is passed as a list of multiple objects, which must be executed then watched for output
+    via the AsyncResult object.
+
+    Callbacks are fired on the originating Exec object for progress, success or failure of any step.
     """
-    pass
 
-    def __init__(self, e, *args, **kwargs):
-        super(ClusterRunner, self).__init__(*args, **kwargs)
+    def __init__(self, k, *args, **kwargs):
+        super(Runner, self).__init__(*args, **kwargs)
+
+        self.k = k
 
-        self.e = e
-        self.ar = None
-        self.aro = None
-        self._is_active = False
+        # Reset settings
+        self.reset()
 
         self._status = STATUS_READY
-        self.stdout = ""
-
-        '''
-        Runner metadata;
-
-        tool-metadata (?):
-        - last-run kernel [check for lookup/push requirement before starting]
-
-
-        '''
 
+        # Check over AsyncResult objects for updated status
         self.status_timer = QTimer()
         self.status_timer.timeout.connect(self.check_status)
         self.status_timer.start(100)  # 0.1 sec
 
+        # Trigger to update the progress of running Exec objects (scan stdout of ARs for progress)
        self.progress_timer = QTimer()
         self.progress_timer.timeout.connect(self.check_progress)
-        self.progress_timer.start(1000)  # 1 sec
+        self.progress_timer.start(500)  # 0.5 sec
 
     @property
     def is_active(self):
-        return self._is_active or self.e.queue_status()['queue'] > 0
+        return self._is_active or self.k.queue_status()['queue'] > 0
 
     @property
     def status(self):
-        if self._status == STATUS_READY and self.e.queue_status()['queue'] > 0:
+        if self._status == STATUS_READY and self.k.queue_status()['queue'] > 0:
             return STATUS_BLOCKED
         else:
             return self._status
 
-    def run(self, tool, varsi, progress_callback=None, result_callback=None):
-        code = tool.code
+    def setup_language(self, language):
+        if language == 'r':
+            # Init R library loader (will take time first time; but instant thereafter)
+            self.k.execute(r'''%load_ext rpy2.ipython''')
+
+        elif language == 'matlab':
+            # Init MATLAB
+            self.k.execute(r'''%load_ext pymatbridge''')
+
+    def run(self, task):
+        """
+        Take a single Task object and execute all of its Exec objects immediately on the available kernel.
+        """
+
+        if self.is_active:
+            logging.error("Runner kernel not ready, but it got a job. Job ignored.")
+            return False
+
+        # Dicts for accessing the running Exec from the AR and vice versa
+        self.reset()
 
         self._is_active = True
         self._status = STATUS_RUNNING
-        self.stdout = ""
 
-        self._progress_callback = progress_callback
-        self._result_callback = result_callback
+        self.task = task
+
+        for e in task.execute:
 
-        # Check metadata to see if this kernel has the outputs for the previous tool
-        self.e.execute('%reset_selective -f [^_]')
-        print(varsi)
-        self.e.push({'varsi': varsi})
-        self.e.execute(r'''from pathomx.kernel_helpers import pathomx_notebook_start, pathomx_notebook_stop, progress, open_with_progress
-pathomx_notebook_start(varsi, vars());''')
+            # Clear the workspace (non-underscore vars) before each Exec runs
+            self.k.execute('%reset_selective -f [^_]')
 
-        setup_languages(self.e.execute, tool.language)
+            # Execute language-specific setup if required; note: fails silently!
+            self.setup_language(e.language)
 
+            ars = []
+
+            if e.varsi:
+                ar = self.k.push(e.varsi, block=False)
+                ars.append(ar)
+                self.ar_types[ar] = AR_PUSH
+
+            if e.code:
+                for c in e.code:
+                    ar = self.k.execute(c, block=False)
+                    ars.append(ar)
+                    self.ar_types[ar] = AR_EXECUTE
+
+            if e.varso:
+                for v in e.varso:
+                    ar = self.k.pull(v, block=False)
+                    ars.append(ar)
+                    self.ar_types[ar] = AR_PULL
+
+            self.ar_by_exec[e] = ars
 
-        self.ar = self.e.execute(code)
-        self.e.execute(r'''pathomx_notebook_stop(vars());''')  # This will queue directly after the main code block
 
     def check_status(self):
-        result = {}
-
-        if self.ar:
-
-            self.stdout = self.ar.stdout
-            try:
-                r = self.ar.get(0)
-            except TimeoutError:
-                pass
-            except RemoteError as e:
-                # Handle all code exceptions and pass back the exception
-                result['status'] = -1
-                result['traceback'] = '\n'.join(e.render_traceback())
-                result['stdout'] = self.stdout
-                if self._result_callback:
-                    self._result_callback(result)
-
-                self.ar = None
-                self._is_active = False  # Release this kernel
-                self._status = STATUS_ERROR
-
-            else:
-                self.ar = None
-                self.aro = self.e.pull('varso', block=False)
-                self._status = STATUS_COMPLETE
-
-        elif self.aro:
-            try:
-                varso = self.aro.get(0)
-            except TimeoutError:
-                pass
-            except RemoteError as e:
-                result['status'] = -1
-                result['traceback'] = '\n'.join(e.render_traceback())
-                result['stdout'] = self.stdout
-                result['varso'] = []
-                if self._result_callback:
-                    self._result_callback(result)
-
-                self.aro = None
-                self._is_active = False  # Release this kernel
-                self._status = STATUS_ERROR
-
-            else:
-                self.aro = None
-                result['status'] = 0
-                result['varso'] = varso
-                result['stdout'] = self.stdout
-                if self._result_callback:
-                    self._result_callback(result)
-
-                self._is_active = False  # Release this kernel
-                self._status = STATUS_READY
+        """
+        Automatically check the status of the executing Execute objects.
+
+        We can iterate the dict of AsyncResult objects by the originating Execute. Using the
+        type of each AsyncResult we can then perform the appropriate action on the data,
+        using the Execute-linked callbacks to pass the data on. This makes handling
+        different Execute types relatively seamless.
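+
+        For illustration (shapes taken from the handlers below): a successful pull
+        emits a result dict of the form {'status': 0, 'varso': ..., 'stdout': ...}
+        via the Execute's result signal, while a RemoteError emits
+        {'status': -1, 'traceback': ..., 'stdout': ...} and resets the kernel.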
+ + """ + if not self.is_active: + return False + + ars_waiting = False + + for ex, ars in self.ar_by_exec.items(): + # e is the original Exec object (for callbacks) + # ars is a list of ASyncResult objects originating from it + + # Finally iterate each object + for ar in ars: + ar_type = self.ar_types[ar] + try: + ar_result = ar.get(0) + + except TimeoutError: + ars_waiting = True + + except RemoteError as e: + # Handle all code exceptions and pass back the exception + result = { + 'status': -1, + 'traceback': '\n'.join(e.render_traceback()), + 'stdout': self.stdout + ar.stdout, + } + + # Emit the task-error signal (cancel dependencies) + self.task.error.emit() + + # Emit the error via the Execute object (to the tool) + ex.result.emit(result) + self._status = STATUS_ERROR + # Interrupt and stop here + self.reset() + + else: + # Success on retrieve + if ar_type == AR_PUSH: + pass + + if ar_type == AR_EXECUTE: + # We should emit the complete notification here; but we have no way of + # knowing whether the AR relates the first, or 50th exec that was triggered? + self.stdout += ar.stdout + + if ar_type == AR_PULL: + result = { + 'status': 0, + 'varso': ar_result, + 'stdout': self.stdout + ar.stdout, + } + ex.result.emit(result) + + # Check if all jobs have completed; then shutdown and exit + if ars_waiting == False: + if self.task: + # Emit the task-complete signal (allow dependencies to run) + self.task.complete.emit() + + # Reset this kernel, ready to go + self._status = STATUS_READY + self.reset() + + def reset(self): + self._is_active = False # Release this kernel + self.ar_by_exec = {} + self.ar_types = {} + self.stdout = "" + self.task = None def check_progress(self): - if self.ar and self._progress_callback: - lines = self.ar.stdout.split('\n') - cre = re.compile("____pathomx_execute_progress_(.*)____") - for l in lines: - m = cre.match(l) - if m: - self._progress_callback(float(m.group(1))) - else: - return None - - -class InProcessRunner(BaseFrontendMixin, QObject): - ''' - A runner object that handles running the running tool code via an in-process IPython - kernel. Base off initial runipy code amended to handle in-process running and the IPython FrontendWidget. - ''' - - # Emitted when a user visible 'execute_request' has been submitted to the - # kernel from the FrontendWidget. Contains the code to be executed. - executing = pyqtSignal(object) - - # Emitted when a user-visible 'execute_reply' has been received from the - # kernel and processed by the FrontendWidget. Contains the response message. - executed = pyqtSignal(object) - - # Emitted when an exit request has been received from the kernel. 
- exit_requested = pyqtSignal(object) - - # Execute next cell - execute_next = pyqtSignal() - - # Emit current cell number - progress = pyqtSignal(object) - - _CallTipRequest = namedtuple('_CallTipRequest', ['id', 'pos']) - _CompletionRequest = namedtuple('_CompletionRequest', ['id', 'pos']) - _ExecutionRequest = namedtuple('_ExecutionRequest', ['id', 'kind']) - _local_kernel = False - _hidden = False - - MIME_MAP = { - 'image/jpeg': 'jpeg', - 'image/png': 'png', - 'text/plain': 'text', - 'text/html': 'html', - 'text/latex': 'latex', - 'application/javascript': 'html', - 'image/svg+xml': 'svg', - } - #--------------------------------------------------------------------------- - # 'object' interface - #--------------------------------------------------------------------------- - - def __init__(self, *args, **kwargs): - super(InProcessRunner, self).__init__(*args, **kwargs) - # FrontendWidget protected variables. - self._kernel_manager = None - self._kernel_client = None - self._request_info = { - 'execute': {} - } - - self._callback_dict = {} - - self._result_queue = [] - self._final_msg_id = None - self._cell_execute_ids = {} + if self.status != STATUS_RUNNING: + return False + + cre = re.compile("____pathomx_execute_progress_(.*)____") + + for ex, ars in self.ar_by_exec.items(): + for ar in ars: + lines = ar.stdout.split('\n') + for l in lines: + m = cre.match(l) + if m: + ex.progress.emit(float(m.group(1))) - self.is_active = False - self.status = STATUS_READY - self._executing = False - # Set flag for whether we are connected via localhost. - self._local_kernel = kwargs.get('local_kernel', - InProcessRunner._local_kernel) +class Execute(QObject): + """ + Execute is a set of execution step to run on the kernel - self.kernel_manager = KernelManager() - self.kernel_manager.start_kernel() + An exec consists of the code to run (a list) and supports callbacks + for all the possible results of that execution: success, failure, result, etc. + """ - self.kernel_client = self.kernel_manager.client() - self.kernel_client.start_channels() + result = pyqtSignal(dict) + progress = pyqtSignal(float) + complete = pyqtSignal() - def __del__(self): - if self.kernel_client: - self.kernel_client.stop_channels() - if self.kernel_manager: - self.kernel_manager.shutdown_kernel() + def __init__(self, varsi=None, code=None, varso=None, language="python", metadata=None): + super(Execute, self).__init__() + + self.language = language - def run(self, tool, varsi, progress_callback=None, result_callback=None): - ''' - Run all the cells of a notebook in order and update - the outputs in-place. - ''' - self.is_active = True self.varsi = varsi + self.code = code + self.varso = varso - self._progress_callback = progress_callback - self._result_callback = result_callback + self.metadata = metadata - self._result_queue = [] # Cache for unhandled messages - self._cell_execute_ids = {} - self._execute_start = datetime.now() - self._execute('%reset_selective -f [^_]') - self.kernel_manager.kernel.shell.push({'varsi': varsi}) - self._execute(r'''from pathomx.kernel_helpers import pathomx_notebook_start, pathomx_notebook_stop, progress, open_with_progress -pathomx_notebook_start(varsi, vars());''') +class Task(QObject): + """ + Task is a set of Exec objects to run on the kernel - setup_languages(self._execute, tool.language) + An task consists of the Code objects to run (a list). 
- msg_id = self._execute(tool.code) - self._cell_execute_ids[msg_id] = (tool.code, 1, 100) # Store cell and progress - self._final_msg_id = self._execute(r'''pathomx_notebook_stop(vars());''') + The Runner takes the Task and executes all Exec objects *immediately* on the kernel, + lined up in order. Execution then continues until completion, or error. In + the event of error the kernel is interrupted and execution will stop. - def run_completed(self, error=False, traceback=None): - logging.info("Notebook run took %s" % (datetime.now() - self._execute_start)) - result = {} - if error: - result['status'] = -1 - result['traceback'] = traceback - else: - # Apply unhandled results - for msg in self._result_queue: - self._handle_execute_result(msg) + When the 'complete' function is called on this object, it is disposed + of by the parent job. - result['status'] = 0 - # Return input; temp - result['varso'] = self.kernel_manager.kernel.shell.user_ns['varso'] + The parent job can connect to these events and pass the even onto the relevant handler. + """ - self.is_active = False - if self._result_callback: - self._result_callback(result) + status = pyqtSignal() + error = pyqtSignal() - def _execute(self, source): - """ Execute 'source'. If 'hidden', do not show any output. + traceback = pyqtSignal(dict) - See parent class :meth:`execute` docstring for full details. - """ + complete = pyqtSignal() - msg_id = self.kernel_client.execute(source, True) - self._request_info['execute'][msg_id] = self._ExecutionRequest(msg_id, 'user') - return msg_id - - #--------------------------------------------------------------------------- - # 'BaseFrontendMixin' abstract interface - #--------------------------------------------------------------------------- - def _handle_clear_output(self, msg): - """Handle clear output messages.""" - if not self._hidden: # and self._is_from_this_session(msg): - wait = msg['content'].get('wait', True) - if wait: - self._pending_clearoutput = True - else: - self.clear_output() - - def _handle_execute_reply(self, msg): - """ Handles replies for code execution. - """ - logging.debug("execute: %s", msg.get('content', '')) - msg_id = msg['parent_header']['msg_id'] + def __repr__(self): + s = "Task %s: " % id(self) + if self.execute: + for e in self.execute: + s += "%s\n" % e.metadata + return s - if msg_id == self._final_msg_id: - return self.run_completed() - if msg_id not in self._cell_execute_ids: - return + def __init__(self, job, execute=None, dependencies=None): + super(Task, self).__init__() - (self._current_cell, n, pc) = self._cell_execute_ids[msg_id] - - logging.info("Execute cell %d complete in %s" % (n, datetime.now() - self._execute_start)) - - #self.progress.emit( pc ) - if self._progress_callback: - self._progress_callback(pc) - - info = self._request_info['execute'].get(msg_id) - # unset reading flag, because if execute finished, raw_input can't - # still be pending. - self._reading = False - - if info and info.kind == 'user' and not self._hidden: - # Make sure that all output from the SUB channel has been processed - # before writing a new prompt. 
- self.kernel_client.iopub_channel.flush() - - content = msg['content'] - status = content['status'] - if status == 'ok': - self._process_execute_ok(msg) - self.execute_next.emit() - elif status == 'error': - self._process_execute_error(msg) - elif status == 'aborted': - self._process_execute_abort(msg) - - self.executed.emit(msg) - self._request_info['execute'].pop(msg_id) - elif info and info.kind == 'silent_exec_callback' and not self._hidden: - self._handle_exec_callback(msg) - self._request_info['execute'].pop(msg_id) - else: - super(FrontendWidget, self)._handle_execute_reply(msg) + self.job = job - def _process_execute_abort(self, msg): - """ Process a reply for an aborted execution request. - """ - logging.error("ERROR: execution aborted\n") + if dependencies is None: + dependencies = [] - def _process_execute_error(self, msg): - """ Process a reply for an execution request that resulted in an error. - """ - content = msg['content'] - # If a SystemExit is passed along, this means exit() was called - also - # all the ipython %exit magic syntax of '-k' to be used to keep - # the kernel running - if content['ename'] == 'SystemExit': - keepkernel = content['evalue'] == '-k' or content['evalue'] == 'True' - self._keep_kernel_on_exit = keepkernel - self.exit_requested.emit(self) - else: - traceback = '\n'.join(content['traceback']) - self.run_completed(error=True, traceback=traceback) + # Execs that must run before this task + self.dependencies = dependencies - def _process_execute_ok(self, msg): - """ Process a reply for a successful execution request. - """ - pass + # Execution + self.execute = execute - def _handle_kernel_died(self, since_last_heartbeat): - """Handle the kernel's death (if we do not own the kernel). - """ - logging.warn("kernel died") - self.reset() + # Kernel that this Exec is/was running on + self.kernel = None + + # Complete callback (default) others can be added + self.complete.connect(self.completed) + self.error.connect(self.errored) - def _handle_kernel_info_reply(self, *args, **kwargs): + def ready(self): pass - def _handle_kernel_restarted(self, died=True): - """Notice that the autorestarter restarted the kernel. + def completed(self): + # Remove this exec object from the running list + self.job.tasks_running.remove(self) + # Add ourselves to the complete list + self.job.tasks_complete.append(self) + + def errored(self): + self.job.tasks_running.remove(self) + self.job.tasks_errored.append(self) + + + +class Job(QObject): + """ + A Job to be executed on the available kernels. + + Each Job can be as simple or as complex as needed, but presents a consistent API to the Queue system + and kernel. This base Job class handles the simplest case of executing bare script on the remote server. + For a more complex implementation see ToolJob, which builds a series of Exec objects from a given tool. + + The Job.next() callback returns a Task consisting of [list] of Exec objects to pass onto the kernel Runner. + Each Exec object must handle it's own callbacks, result handling etc. transparently. + + All objects passed in single [Exec] must be capable of running simultaneously on the target + kernel, passed Exec objects will be executed in turn, with no dependency checking: i.e. + the job object itself must handle the appropriate passing/preparation for dependent variables + and pass these through. 
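+
+    A minimal usage sketch (assuming the module-level Queue instance created in
+    pathomx.globals; the code string is illustrative only)::
+
+        from pathomx.globals import notebook_queue
+
+        job = CodeJob("result = 1 + 1")  # wraps the code in a single Task/Exec
+        notebook_queue.add(job)          # Queue.run() dispatches it to a free Runner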
+
+    """
+
+    def __init__(self):
+        super(Job, self).__init__()
+
+        self.status = STATUS_READY
+        self.tasks_queued = []
+        self.tasks_running = []
+        self.tasks_complete = []
+        self.tasks_errored = []
 
-        There's nothing to do but show a message.
+        self.is_active = False
+
+    def start(self):
         """
-        logging.warn("kernel restarted")
-        self.reset()
+        Pre-run initialisation
+        """
+        self.is_active = True
+        self.status = STATUS_RUNNING
 
-    def _handle_execute_result(self, msg):
-        """ Handle display hook output.
+    def stop(self):
+        """
+        Post-run cleanup
         """
+        self.is_active = False
+        self.status = STATUS_COMPLETE
 
-        logging.debug("execute_result: %s", msg.get('content', ''))
-        if not self._hidden:  # and self._is_from_this_session(msg):
-            msg_id = msg['parent_header']['msg_id']
+    def next(self, kernel=None):
+        """
+        Return the next available Task object (kernel is ignored)
+        """
+        try:
+            task = self.tasks_queued.pop()
+
+        except IndexError:  # Empty task list
+            self.is_active = False
+            return None
+
+        else:
+            self.tasks_running.append(task)
+            return task
 
-            if msg_id not in self._cell_execute_ids:  # Only on the in-process kernel can this happen
-                self._result_queue.append(msg)
-                return
 
-            (cell, n, pc) = self._cell_execute_ids[msg_id]
+class CodeJob(Job):
 
-            #out = NotebookNode(output_type='display_data')
+    def __init__(self, code, language='python'):
+        super(CodeJob, self).__init__()
 
-            for mime, data in msg['content']['data'].items():
-                try:
-                    attr = self.MIME_MAP[mime]
-                except KeyError:
-                    raise NotImplementedError('unhandled mime type: %s' % mime)
-                #setattr(out, attr, data)
+        # Create a single code-object job and push it to the queue
+        # (wrap the code string in a list; Exec code is always a list of blocks)
+        self.tasks_queued.append(
+            Task(self, execute=[Execute(code=[code], language=language)])
+        )
+
+
+class ToolJob(Job):
+    """
+    Job object to run a given tool, and all subsequent tools in the workflow
+
+    Each job encapsulates the full set of tool runs that must occur to complete the execution.
+    This involves traversing from the initiating tool through the tree of
+    watchers, until hitting a dead end or a paused tool.
+
+    The result is a list of tools that are to be executed in turn. Metadata (config, code)
+    from the tools is locked, and a set of generic metadata (rcParams, styles) is locked
+    for the entire job.
+
+    Initiation of execution starts at T0.
+
+    All tools are set to status 'unknown' on the initiation of execution.
+
+    Callbacks are registered for various steps; the job object acts as the intermediary
+    passing the execution onto the relevant tool.
+
+    Next execution is triggered on .next(), which will attempt to find a tool whose parents
+    have all executed. The optional kernel parameter will attempt to execute on the
+    best kernel, avoiding redundant variable passing. If unable, a set of varsi will be passed.
+
+    On error, any children of the failed tool (and their children) are removed; other execution
+    may continue as expected.
+
+    A set of tool ids is available for checking vs. other jobs. If a newer job is a superset
+    of an existing job, the existing job is deleted.
+    """
 
-    def _handle_stream(self, msg):
-        """ Handle stdout, stderr, and stdin.
+    def __init__(self, tool, varsi, *args, **kwargs):
+        """
-        logging.debug("stream: %s", msg.get('content', ''))
-        if not self._hidden:  # and self._is_from_this_session(msg):
-            logging.info(msg['content']['text'])
+        Generate an execution queue from the supplied singular tool;
+        the current tool is in the head position, execution will start from there.
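+
+        For example (hypothetical workflow): a branching workflow A -> B -> (C, D)
+        is decomposed by the logic below into a Task [A, B], followed by single-Exec
+        Tasks [C] and [D] which each depend on the [A, B] Task.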
- def _handle_shutdown_reply(self, msg): - """ Handle shutdown signal, only if from other console. + As passing each tool, lock the code and config into the tool ensure static + for the whole of execution, ignore future changes. + + A set of tool objects will uniquely describe this job, + and allow superset or == jobs to delete this job. """ - logging.info("shutdown: %s", msg.get('content', '')) - restart = msg.get('content', {}).get('restart', False) - if not self._hidden: # and not self._is_from_this_session(msg): - # got shutdown reply, request came from session other than ours - if restart: - # someone restarted the kernel, handle it - self._handle_kernel_restarted(died=False) - else: - # kernel was shutdown permanently - self.exit_requested.emit(self) - - def _handle_status(self, msg): - """Handle status message""" - # This is where a busy/idle indicator would be triggered, - # when we make one. - state = msg['content'].get('execution_state', '') - if state == 'starting': - # kernel started while we were running - if self._executing: - self._handle_kernel_restarted(died=True) - elif state == 'idle': - pass - elif state == 'busy': - pass - #--------------------------------------------------------------------------- - # 'FrontendWidget' public interface - #--------------------------------------------------------------------------- + super(ToolJob, self).__init__(*args, **kwargs) - def interrupt_kernel(self): - """ Attempts to interrupt the running kernel. - - Also unsets _reading flag, to avoid runtime errors - if raw_input is called again. + # Global varsi must be passed at the start of each Exec job as may not be present otherwise + # otherwise. Stored here for re-use. n.b. within a Job these should not change (persistance) + # may need to account for this by taking a copy of styles here? + # self.global_varsi = { + # 'rcParams': {k: v for k, v in rcParams.items() if k not in strip_rcParams}, + # '_pathomx_database_path': os.path.join(utils.scriptdir, 'database'), + # 'styles': styles, + # } + + global_varsi = varsi.copy() + + # Build the queue of Exec objects; + self.execs_queue = [] + self.exec_tool_lookup = {} + tool_task_lookup = {} + + process_queue = [tool] + process_done = [] + + tool_list = [] + exec_list = [] + + previous_tool = None + + while len(process_queue) > 0: + + # Remove duplicates + process_queue = list(set(process_queue)) + t = process_queue.pop(0) # From front + + # Check for what that this tool depends on + parents = t.get_parents() + + if len(parents) > 0 \ + and len( set(parents) & set(process_queue) ) > 0: + # waiting on something here, push to the back of the list for later + process_queue.append(t) + continue + + # Build an exec object for the given tool: note that at this point we cannot determine whether the + # vars are on the correct kernel. We can't seed at this point either, as the result of subsequent + # calculations will not be reflected. The solution is to populate Exec.varsi{} at runtime dispatch. + # In order to make the neccessary data available at that time, we here store it via lookup. 
+            varsi = {
+                'config': t.config.as_dict(),
+                '_pathomx_tool_path': t.plugin.path,
+                '_pathomx_expected_output_vars': list(t.data.o.keys()),
+            }
+
+            # Build the IO magic
+            # - if the source did not run on the current runner we'll need to push the data over (later)
+            io = {'input': {}, 'output': {}, }
+            for i, sm in t.data.i.items():
+                if sm:
+                    mo, mi = sm
+                    io['input'][i] = "_%s_%s" % (mi, id(mo.v))
+                else:
+                    io['input'][i] = None
+
+            for o in t.data.o.keys():
+                io['output'][o] = "_%s_%s" % (o, id(t))
+
+            varsi['_io'] = io
+
+            e = Execute(
+                varsi=varsi,
+                code=[
+                    "from pathomx.kernel_helpers import pathomx_notebook_start, pathomx_notebook_stop, progress, open_with_progress; pathomx_notebook_start(vars());",
+                    t.code,
+                    "pathomx_notebook_stop(vars());",
+                ],
+                varso=['varso'],
+                language=t.language,
+                metadata={'name': t.name, 'tool': t},
+            )
+
+            e.progress.connect(t.progress.emit)
+            e.result.connect(t._worker_result_callback)
+
+            # Store the tool for this Exec object; for dispatch calculation
+            self.exec_tool_lookup[e] = t
+
+            watchers = [w.v for k, v in t.data.watchers.items() for w in v]
+            for w in watchers:
+                if w not in process_done and w not in process_queue:
+                    # Put all watchers at the front of the list; this will iteratively ensure we can straight-line
+                    # down at least one path once started
+                    process_queue.insert(0, w)
+
+            # Determine the position of the object
+            # Is before a fork: there is more than 1 tool watching this one
+            is_before_fork = len(watchers) > 1
+            # Is the end of a fork: watchers have > 1 parent
+            is_end_of_fork = len([p for w in watchers for p in w.get_parents()]) > 1
+
+            if previous_tool is not None and previous_tool not in parents:
+                # We are not a direct descendant, so we're going to have to start a new Task.
+                # There should be more effort to mitigate this in the ordering of the task-queue
+                task = Task(self, execute=exec_list)
+                self.tasks_queued.append(task)
+
+                for pt in tool_list:
+                    tool_task_lookup[pt] = task
+
+                exec_list = []
+                tool_list = []
+
+            # If this is the first execute object in the queue (list is empty), update it with the global vars for the run
+            # and store the head-of-branch tool for later dependencies
+            if not exec_list:
+                e.varsi.update(global_varsi)
+
+            tool_list.append(t)
+            exec_list.append(e)
+
+            if is_before_fork or is_end_of_fork:
+                # We've got >1 children, we need to create a split task again
+                task = Task(self, execute=exec_list)
+                self.tasks_queued.append(task)
+
+                for pt in tool_list:
+                    tool_task_lookup[pt] = task
+
+                exec_list = []
+                tool_list = []
+
+            process_done.append(t)
+            previous_tool = t
+
+        if exec_list:  # Remainders
+            task = Task(self, execute=exec_list)
+            self.tasks_queued.append(task)
+
+        logging.debug("task_queue: %s" % self.tasks_queued)
+
+        # Apply the dependencies to each task: we need to do this at the end to avoid missing any due to order
+        for t in self.tasks_queued:
+            if t.execute:
+                e0 = t.execute[0]
+                dependencies = []
+                for ti in e0.metadata['tool'].get_parents():
+                    if ti in tool_task_lookup:
+                        dependencies.append(tool_task_lookup[ti])
+                t.dependencies = dependencies
+
+        self.tool_list = process_done
+
+    def start(self):
+        if not self.is_active:
+            # Reset all tools in this Job to clear-status (not ready)
+            for t in self.tool_list:
+                t.status.emit('ready')
+
+        super(ToolJob, self).start()
+
+    def next(self, kernel=None):
+        """
+        Request the next Task object to run on the specified kernel.
+
+        The provided kernel identifier is used to determine whether the
+        parent tools' data must be sent through before execution. For
+        linear execution the data passing can be handled completely
+        kernel-side by variable copying; however, on branched execution a
+        tool may execute on a kernel where the parent was not yet, or not
+        last, run. In this case the data must be passed over beforehand
+        as input variables.
+
+        The passing should be logged, and (post-send?) the tool updated to
+        reflect that its data is now *also* on the other kernel.
+
+        ? callback on the vars push ?
+        """
+        if not self.tasks_queued:
+            return None
+
+        for t in self.tasks_queued:
+            if not t.dependencies or len(set(t.dependencies) - set(self.tasks_complete)) == 0:
+                # We have a Task not waiting on dependencies
+                break
+        else:
+            return False  # Waiting
+
+        # Remove this task from the queue
+        self.tasks_queued.remove(t)
+
+        if set(t.dependencies) & set(self.tasks_errored):
+            # A dependency has errored, so we can never run this task; add it to the error list and skip it
+            self.tasks_errored.append(t)
+            return False
+
+        # Handle the exec here
+        # We receive the kernel identifier from the Queue, so here we can determine whether the parent
+        # tool(s) were run on the same kernel. There are two scenarios here:
+        # - 1. Initiating a Job, single/multiple parent tool, needs feeding in to start up
+        # - 2. Fork parallel job, needs feeding in to continue
+        #
+        # Note, we need to ensure we are sending up-to-date data (i.e. the previous Exec has finished, and results
+        # have been exported before we start on the next). This locking can be achieved using the Exec dependencies.
+
+        # Check whether the parents of the head-of-queue were run on this kernel
+        # Get the original tool; build a list of all parents + their respective kernels
+
+        # Iterate each, find if its current data is on *this* kernel; if so carry on,
+        # if not, we'll need to pass it in (can stuff it into the first Exec, or add a new one?)
+        varsi = {}
+
+        if t.execute:
+            # We only need to get dependencies for the head Execution, as the branching logic means that
+            # >1 parent anywhere == a new Task
+            e0 = t.execute[0]
+            tool = e0.metadata['tool']
+            if kernel not in tool.current_data_on_kernels:
+                # Build the dict to send;
+                # will also want to build some kind of callback to track this
+                for i, sm in tool.data.i.items():
+                    if sm:
+                        mo, mi = sm
+                        # We need to push the actual data; this should do it?
+                        varsi['_%s_%s' % (mi, id(mo.v))] = tool.data.get(i)
+
+                        # We've got something to send!
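+                # For illustration (name and id hypothetical): the parent's output is
+                # pushed under its kernel-side name, e.g.
+                #   varsi = {'_output_data_4521...': <parent output data>}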
+                if varsi:
+                    print("!!!WE'RE SENDING DATA!!!", varsi)
+                    e = Execute(varsi=varsi)
+                    # FIXME: This will need a callback-wrapped function to pass the extra data without some nasty hacks
+                    e.complete.connect(self.complete_move_data_to_kernel)
+                    # Put this Execute instruction at the head of the list
+                    t.execute.insert(0, e)
+
+        self.tasks_running.append(t)
+        return t
+
+
+    def complete_move_data_to_kernel(self, tool_ids_kernel_ids):
+        pass
+
+
+    def complete(self):
+        """
+        Job complete
+        """
+
+
+
+class Queue(QObject):
+    """
+    Queue manages jobs in an internal queue, automating running,
+    handling and cleanup. Scheduling of the current job to specific
+    cluster kernels is also handled, depending on where the previous
+    job ran.
+    """
 
-class RunManager(QObject):
-    '''
-    Auto-creating and managing distribution of notebook runners for notebooks.
-    Re-population handled on timer. Keeping a maximum N available at all times.
-    '''
-
-    start = pyqtSignal()
-    is_parallel = False
+    start = pyqtSignal()
 
-    # Store metadata about tools' last run for variable passing etc.
-    run_metadata = {}
 
     def __init__(self):
-        super(RunManager, self).__init__()
+        super(Queue, self).__init__()
 
         self.runners = []
-        self.jobs = []  # Job queue a tuple of (notebook, success_callback, error_callback)
+        self.jobs = []  # Job queue; a list of Job objects
@@ -516,23 +736,20 @@ def __init__(self):
         self.p = None
         self.client = None
 
-    def __del__(self):
-        self.terminate_cluster()
-
     def start_timers(self):
         self._run_timer = QTimer()
         self._run_timer.timeout.connect(self.run)
-        self._run_timer.start(1000)  # Auto-check for pending jobs every 1 second; this shouldn't be needed but some jobs get stuck(?)
+        self._run_timer.start(100)  # Auto-check for pending jobs every 0.1 second; this shouldn't be needed but some jobs get stuck(?)
 
         self._cluster_timer = QTimer()
         self._cluster_timer.timeout.connect(self.create_runners)
         self._cluster_timer.start(5000)  # Re-check runners every 5 seconds
 
-    def add_job(self, tool, varsi, progress_callback=None, result_callback=None):
-        # We take a copy of the notebook, so changes aren't applied back to the source
-        # ensuring each run starts with blank slate
-        self.jobs.append((tool, varsi, progress_callback, result_callback))
-        self.start.emit()  # Auto-start on every add job
+    def add(self, job):
+        self.jobs.append(job)
+
+        # We fire the start signal on every add, so queued jobs are picked up immediately
+        self.start.emit()
 
     @property
     def no_of_kernels(self):
@@ -543,14 +760,15 @@ def no_of_active_kernels(self):
         return sum([1 if k.is_active else 0 for k in self.runners])
 
     def run(self):
+        # Check for jobs
         if not self.jobs:
             return False
 
         logging.info('Currently %d jobs remaining' % len(self.jobs))
 
-        # We have job, get it
-        tool, varsi, progress_callback, result_callback = self.jobs.pop(0)  # Remove from the beginning
+        job = self.jobs.pop(0)  # Get the job at the front of the queue
 
+        # (? can we be more intelligent about this if there are a lot of runners available)
         # Identify the best runner for the job
         # - which runners are available
@@ -561,43 +779,31 @@
                 # That'll do for now
                 break
         else:
-            self.jobs.insert(0, (tool, varsi, progress_callback, result_callback))
+            # Can't run now, we'll have to wait
+            self.jobs.insert(0, job)
             return False
 
-        if hasattr(tool, 'data'):
-            # We can run code without an associated tool (e.g. 
for central-setup) - varsi['_pathomx_expected_output_vars'] = list( tool.data.o.keys() ) + # Initialise the job (this is a no-op if already running) + job.start() - # Build the IO magic - # - if the source did not run on the current runner we'll need to push the data over - io = {'input': {}, 'output': {}, } - for i, sm in tool.data.i.items(): - if sm: - mo, mi = sm - io['input'][i] = "_%s_%s" % (mi, id(mo.v)) - - # Check if the last run of this occurred on the selected runner - if id(mo.v) in self.run_metadata and \ - self.run_metadata[id(mo.v)]['last_runner'] != id(runner): + # Get the details for the next execution step + e = job.next() - # We need to push the actual data; this should do it? - varsi['_%s_%s' % (mi, id(mo.v))] = tool.data.get(i) - else: - io['input'][i] = None - - for o in tool.data.o.keys(): - io['output'][o] = "_%s_%s" % (o, id(tool)) - - varsi['_io'] = io + if e is False: + # Job is waiting on something to complete; wait and trigger a post-poned fire of self.run() + self.jobs.insert(0, job) + return False - self.run_metadata[id(tool)] = { - 'last_runner': id(runner) - } + elif e is None: + # Job has completed; let it go + return - tool.logger.info("Starting job....") + else: + # If we're here, we've got an Exec object in e, that is good to go on the current runner + runner.run(e) - # Result callback gets the varso dict - runner.run(tool, varsi, progress_callback=progress_callback, result_callback=result_callback) + # Might not be finished yet + self.jobs.insert(0, job) def restart(self): self.stop_cluster() @@ -616,9 +822,9 @@ def stop_cluster(self): pass self.p = None - self.client.shutdown() - self.client = None - self.runners = [self.in_process_runner] + if self.client: + self.client.shutdown() + self.client = None def terminate_cluster(self): if self.p: @@ -643,32 +849,25 @@ def create_runners(self): if self.client is None: self.client = Client(timeout=5) - # FIXME: Inline plots are fine as long as we don't do it on the cluster+the interactive kernel; this results - # in an image cache being generated that breaks the pickle - - for e in self.client: + for k in self.client: found = False for r in self.runners: - if e.targets == r.e.targets: + if k.targets == r.k.targets: found = True if not found: - runner = ClusterRunner(e) - runner.e.execute('%reset -f') - runner.e.execute('%matplotlib inline') + runner = Runner(k) + runner.k.execute('%reset -f') + runner.k.execute('%matplotlib inline') + runner.k.apply(use_dill) self.runners.append(runner) + + else: # We've got a -value for poll; it's terminated this will trigger restart on next poll self.stop_cluster() - def create_user_kernel(self): - # Create an in-process user kernel to provide dynamic access to variables - # Start an in-process runner for the time being - self.in_process_runner = InProcessRunner() - self.in_process_runner.kernel_client.execute('%reset -f') - #self.in_process_runner.kernel_client.execute('%matplotlib inline') - class ExecuteOnly(object): language = 'python' diff --git a/pathomx/ui.py b/pathomx/ui.py index c55301d..fd131af 100755 --- a/pathomx/ui.py +++ b/pathomx/ui.py @@ -27,6 +27,7 @@ from .views import HTMLView, StaticHTMLView, ViewManager, NotebookView, IPyMplView, DataFrameWidget, SVGView, ImageView # Translation (@default context) from .translate import tr +from .runqueue import ToolJob import requests @@ -1340,6 +1341,8 @@ def __init__(self, parent, name=None, code="", position=None, auto_focus=True, a self._latest_generator_result = None self._auto_consume_data = auto_consume_data + 
self.current_data_on_kernels = [] + # Set this to true to auto-start a new calculation after current (block multi-runs) self._is_job_active = False self._queued_start = False @@ -1430,7 +1433,7 @@ def init_auto_consume_data(self): if self._auto_consume_data: self._is_autoconsume_success = self.data.consume_any_app(current_tools[::-1]) # Try consume from any app; work backwards - self.data.source_updated.connect(self.autogenerate) # Auto-regenerate if the source data is modified + # self.data.source_updated.connect(self.autogenerate) # Auto-regenerate if the source data is modified self.config.updated.connect(self.autoconfig) # Auto-regenerate if the configuration changes if self.autoconfig_name: @@ -1509,6 +1512,9 @@ def get_icon(self): icon_path = os.path.join(self.plugin.path, 'icon.png') return QIcon(icon_path) + def get_parents(self): + return [s[0].v for i, s in self.data.i.items() if s is not None] + def autogenerate(self, *args, **kwargs): self.logger.debug("autogenerate %s" % self.name) if self._pause_analysis_flag: @@ -1520,27 +1526,26 @@ def generate(self): self.logger.info("Running tool %s" % self.name) strip_rcParams = ['tk.pythoninspect', 'savefig.extension'] - varsi = { - 'config': self.config.as_dict(), - 'rcParams': {k: v for k, v in rcParams.items() if k not in strip_rcParams}, - 'styles': styles, - '_pathomx_tool_path': self.plugin.path, + global_varsi = { + '_rcParams': {k: v for k, v in rcParams.items() if k not in strip_rcParams}, + '_styles': styles, '_pathomx_database_path': os.path.join(utils.scriptdir, 'database'), } self.status.emit('active') self.progress.emit(0.) - notebook_queue.add_job(self, varsi, progress_callback=self.progress.emit, result_callback=self._worker_result_callback) # , error_callback=self._worker_error_callback) + notebook_queue.add( ToolJob(self, global_varsi) ) + def _worker_result_callback(self, result): self.progress.emit(1.) if 'stdout' in result: - self.logger.error(result['stdout']) + self.logger.info(result['stdout']) if result['status'] == 0: - self.logger.debug("Notebook complete %s" % self.name) + self.logger.debug("Execute complete: %s" % self.name) self.status.emit('done') varso = result['varso'] @@ -1549,12 +1554,10 @@ def _worker_result_callback(self, result): styles = varso['styles'] elif result['status'] == -1: - self.logger.debug("Notebook error %s" % self.name) + self.logger.debug("Execute error: %s" % self.name) self.status.emit('error') self.logger.error(result['traceback']) varso = {} - #varso['_pathomx_result_notebook'] = result['notebook'] - #self.nb = result['notebook'] self.worker_cleanup(varso) @@ -1579,7 +1582,7 @@ def generated(self, **kwargs): # will be fixed by setting status through downstream network once proper queue in effect # Set into the workspace of user kernel - notebook_queue.in_process_runner.kernel_manager.kernel.shell.push({'t%s' % self.id: PathomxTool(self.name, **kwargs)}) + # notebook_queue.in_process_runner.kernel_manager.kernel.shell.push({'t%s' % self.id: PathomxTool(self.name, **kwargs)}) def autoprerender(self, kwargs_dict): self.logger.debug("autoprerender %s" % self.name)