# nbjob.ipyparallel_display - better visualizations for ipyparallel

ipyparallel defines an `AsyncResult` datatype, which is the type returned by non-blocking remote calls. This notebook adds a display hook to the Jupyter notebook that shows the stdout/stderr of the remote execution, as well as the result of the task.

The option to interrupt a remote task is unfortunately broken in the current implementation: the engine host and PID tables it relies on are currently never populated.

In [1]:
import ipyparallel as ipp

In [2]:
from IPython.display import display

In [3]:
import ipywidgets

In [4]:
import html

In [5]:
import socket, signal, os

In [6]:
import time

In [7]:
# Lookup tables read by interrupt_engine(), keyed by engine id:
#   pid_map[eid]  -> process id that receives SIGINT
#   host_map[eid] -> hostname the engine runs on (compared against
#                    socket.gethostname() to choose a local kill vs. ssh)
# TODO: these used to actually hold pids and hosts, but in the latest refactoring
# nothing sets them. The nbjob framework should update these. Until it does,
# every lookup in interrupt_engine() fails with KeyError.
pid_map = {}
host_map = {}

In [8]:
def interrupt_engine(eid):
    """Send SIGINT to engine ``eid`` if it currently has work queued.

    The engine's host and process id are looked up in the module-level
    ``host_map`` and ``pid_map`` tables. If the engine runs on this machine
    the signal is delivered with ``os.kill``; otherwise ``kill -INT`` is run
    on the remote host through ``ssh``.

    Args:
        eid: Engine id, used both as the key into ``host_map``/``pid_map``
            and as the index into the ipyparallel client.

    Raises:
        KeyError: If ``eid`` has no entry in ``host_map`` or ``pid_map``.
    """
    # Imported locally so the block does not depend on a new file-level import.
    import subprocess

    host = host_map[eid]
    pid = pid_map[eid]

    # Reuse a single client/view for both requests instead of opening a new
    # connection to the controller for each one.
    view = ipp.Client()[eid]

    if view.queue_status()['queue'] == 0:
        return  # Only interrupt if there are running jobs

    # HACK: Guard against timing issues
    # (We always want to interrupt the engine if there is a job running,
    #  so the job dies and not the engine)
    view.execute("while True: pass", block=False)
    time.sleep(0.1)

    if host == socket.gethostname():
        # local
        os.kill(pid, signal.SIGINT)
    else:
        # Argument list instead of an IPython `!` shell escape: valid plain
        # Python, and host/pid are never interpolated into a shell string.
        # stdin is detached so ssh cannot block waiting on the kernel's stdin.
        subprocess.call(
            ["ssh", str(host), "kill", "-INT", str(pid)],
            stdin=subprocess.DEVNULL,
        )

In [9]:
def format_output(output, stderr=False):
    """Wrap captured stream text in notebook output-area HTML.

    Args:
        output: Text of the stream; it is HTML-escaped before insertion.
        stderr: When true the block gets the ``output_stderr`` CSS class,
            otherwise ``output_stdout``.

    Returns:
        An HTML fragment with the escaped text inside a ``<pre>`` element.
    """
    if stderr:
        stream_kind = "stderr"
    else:
        stream_kind = "stdout"

    template = """
        <div class="output_area">
            <div class="output_subarea output_text output_stream output_{kind}">
                <pre>{text}</pre>
            </div>
        </div>
        """
    escaped_text = html.escape(output)
    return template.format(text=escaped_text, kind=stream_kind)

In [10]:
def async_result_formatter(ar):
    """Display an ``AsyncResult`` as a widget with live stdout/stderr.

    Builds a widget with "Refresh" and "Interrupt" buttons, one HTML pane per
    entry of ``ar`` (``len(ar)`` panes) holding that entry's stderr/stdout,
    and a status pane holding ``repr(ar)`` while the result is not ready.
    Once ``ar.ready()`` is true, the buttons and status pane are hidden and
    the result itself is passed to ``display``.

    The widget is shown as a side effect via ``display``; the function itself
    returns an empty string so the formatter contributes no extra HTML.

    Args:
        ar: The asynchronous result to show. It must provide ``stdout``,
            ``stderr``, ``ready()``, ``result``, ``_targets`` and ``len()``.

    Returns:
        The empty string.
    """
    refresh = ipywidgets.Button(description='Refresh')
    interrupt = ipywidgets.Button(description='Interrupt')
    buttons = ipywidgets.VBox()
    buttons.children = [refresh, interrupt]

    contents = [ipywidgets.HTML() for _ in range(len(ar))]
    result = ipywidgets.HTML()

    def stream_text(streams, index):
        # A stream attribute is either one string (single result) or a
        # sequence with one string per entry. Selecting by type rather than
        # by len(ar) keeps a one-element sequence from being handed to
        # format_output() as a list.
        if not streams:
            return ""
        if isinstance(streams, str):
            return streams
        return streams[index]

    def do_refresh(_ignored=None):
        for i, c in enumerate(contents):
            output = ""
            # Test the text of *this* pane: a per-entry list is truthy even
            # when every entry is empty, which used to render empty boxes.
            err_text = stream_text(ar.stderr, i)
            if err_text:
                output += format_output(err_text, stderr=True)
            out_text = stream_text(ar.stdout, i)
            if out_text:
                output += format_output(out_text)
            c.value = """<div style="overflow:scroll; max-height:400px">""" + output + "</div>"

        if ar.ready():
            # NOTE(review): `.visible` (and `border_width`/`width`/`margin`
            # below) look like the attribute API of an old ipywidgets release
            # - confirm against the installed version.
            result.visible = False
            result.value = ""
            # Nothing left to refresh once the result is in.
            refresh.on_click(do_refresh, remove=True)
            buttons.visible = False
            # Read the result once. This can throw exception!
            value = ar.result
            # Show everything except a non-list object whose `data`
            # attribute is None.
            if (isinstance(value, list)
                    or not hasattr(value, 'data')
                    or value.data is not None):
                display(value)
        else:
            result.value = "<pre>{}</pre>".format(html.escape(repr(ar)))

    refresh.on_click(do_refresh)
    do_refresh()

    def do_interrupt(_ignored=None):
        targets = ar._targets
        for target in targets:
            interrupt_engine(target)
        interrupt.description = 'Re-interrupt'

    interrupt.on_click(do_interrupt)
    ar_widget = ipywidgets.VBox()
    contents_widget = ipywidgets.HBox()
    for w in contents:
        # Share the row width evenly between the panes.
        w.border_width = "1px"
        w.width = "calc(100% / {})".format(len(contents))
        w.margin = "1px"
    contents_widget.children = contents
    ar_widget.children = [buttons, contents_widget, result]
    display(ar_widget)
    return ""

In [11]:
# Register async_result_formatter as the text/html formatter for AsyncResult
# in the running IPython shell. Requires an active IPython session, since
# get_ipython() is only available there.
get_ipython().display_formatter.formatters['text/html'].for_type(
        ipp.client.asyncresult.AsyncResult,
        async_result_formatter)