-
Notifications
You must be signed in to change notification settings - Fork 555
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ExecutePreprocessor using jupyter_kernel_mgmt APIs #809
base: main
Are you sure you want to change the base?
Changes from all commits
e517770
c6f7b98
0c0eccd
0409efb
66eb16a
852b7c6
771dd2c
547d926
93e8d64
05bd50e
5009a7e
e1afd39
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ __pycache__ | |
*.bak | ||
.ipynb_checkpoints | ||
.tox | ||
.pytest_cache | ||
.DS_Store | ||
\#*# | ||
.#* | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,8 +9,6 @@ python: | |
- "nightly" | ||
- 3.6 | ||
- 3.5 | ||
- 3.4 | ||
- 2.7 | ||
env: | ||
global: | ||
- PATH=$TRAVIS_BUILD_DIR/bin:$PATH | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -158,16 +158,6 @@ class ExecutePreprocessor(Preprocessor): | |
) | ||
).tag(config=True) | ||
|
||
@default('kernel_name') | ||
def _kernel_name_default(self): | ||
try: | ||
return self.nb.metadata.get('kernelspec', {}).get('name', 'python') | ||
except AttributeError: | ||
raise AttributeError('You did not specify a kernel_name for ' | ||
'the ExecutePreprocessor and you have not set ' | ||
'self.nb to be able to use that to infer the ' | ||
'kernel_name.') | ||
|
||
raise_on_iopub_timeout = Bool(False, | ||
help=dedent( | ||
""" | ||
|
@@ -205,19 +195,6 @@ def _kernel_name_default(self): | |
) | ||
).tag(config=True) | ||
|
||
kernel_manager_class = Type( | ||
config=True, | ||
help='The kernel manager class to use.' | ||
) | ||
@default('kernel_manager_class') | ||
def _kernel_manager_class_default(self): | ||
"""Use a dynamic default to avoid importing jupyter_client at startup""" | ||
try: | ||
from jupyter_client import KernelManager | ||
except ImportError: | ||
raise ImportError("`nbconvert --execute` requires the jupyter_client package: `pip install jupyter_client`") | ||
return KernelManager | ||
|
||
_display_id_map = Dict( | ||
help=dedent( | ||
""" | ||
|
@@ -231,38 +208,6 @@ def _kernel_manager_class_default(self): | |
} | ||
""")) | ||
|
||
def start_new_kernel(self, **kwargs): | ||
"""Creates a new kernel manager and kernel client. | ||
|
||
Parameters | ||
---------- | ||
kwargs : | ||
Any options for `self.kernel_manager_class.start_kernel()`. Because | ||
that defaults to KernelManager, this will likely include options | ||
accepted by `KernelManager.start_kernel()``, which includes `cwd`. | ||
|
||
Returns | ||
------- | ||
km : KernelManager | ||
A kernel manager as created by self.kernel_manager_class. | ||
kc : KernelClient | ||
Kernel client as created by the kernel manager `km`. | ||
""" | ||
km = self.kernel_manager_class(kernel_name=self.kernel_name, | ||
config=self.config) | ||
km.start_kernel(extra_arguments=self.extra_arguments, **kwargs) | ||
|
||
kc = km.client() | ||
kc.start_channels() | ||
try: | ||
kc.wait_for_ready(timeout=self.startup_timeout) | ||
except RuntimeError: | ||
kc.stop_channels() | ||
km.shutdown_kernel() | ||
raise | ||
kc.allow_stdin = False | ||
return km, kc | ||
|
||
@contextmanager | ||
def setup_preprocessor(self, nb, resources, km=None): | ||
""" | ||
|
@@ -295,40 +240,44 @@ def setup_preprocessor(self, nb, resources, km=None): | |
resources : dictionary | ||
Additional resources used in the conversion process. | ||
""" | ||
# Nbconvert can be installed without these, so do the imports only | ||
# when the preprocessor is being used. | ||
from jupyter_kernel_mgmt.discovery import KernelFinder | ||
from jupyter_kernel_mgmt.client import BlockingKernelClient | ||
|
||
path = resources.get('metadata', {}).get('path', '') or None | ||
self.nb = nb | ||
# clear display_id map | ||
self._display_id_map = {} | ||
|
||
if km is None: | ||
self.km, self.kc = self.start_new_kernel(cwd=path) | ||
try: | ||
# Yielding unbound args for easier understanding and downstream consumption | ||
yield nb, self.km, self.kc | ||
finally: | ||
self.kc.stop_channels() | ||
self.km.shutdown_kernel(now=self.shutdown_kernel == 'immediate') | ||
|
||
for attr in ['nb', 'km', 'kc']: | ||
delattr(self, attr) | ||
kf = KernelFinder.from_entrypoints() | ||
if self.kernel_name: | ||
kernel_name = self.kernel_name | ||
if '/' not in kernel_name: | ||
kernel_name = 'spec/' + kernel_name | ||
else: | ||
self.km = km | ||
if not km.has_kernel: | ||
km.start_kernel(extra_arguments=self.extra_arguments, **kwargs) | ||
self.kc = km.client() | ||
|
||
self.kc.start_channels() | ||
try: | ||
self.kc.wait_for_ready(timeout=self.startup_timeout) | ||
except RuntimeError: | ||
self.kc.stop_channels() | ||
raise | ||
self.kc.allow_stdin = False | ||
try: | ||
yield nb, self.km, self.kc | ||
finally: | ||
for attr in ['nb', 'km', 'kc']: | ||
delattr(self, attr) | ||
kernel_name = 'spec/' + nb.metadata.get('kernelspec', {})[ | ||
'name'] | ||
except KeyError: | ||
kernel_name = 'pyimport/kernel' | ||
|
||
self.log.info("Launching kernel %s to execute notebook" % kernel_name) | ||
conn_info, self.km = kf.launch(kernel_name, cwd=path) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. To address the async jupyter_kernel_mgmt changes, the only change required to get things working is replacing this line with the following (along with accompanying conn_info, self.km = asyncio.get_event_loop().run_until_complete(kf.launch(kernel_name, cwd=path)) However, what is strange (and independent of this change) is that when I run nbconvert via a debugger (pycharm), I get the following exception:
Since this occurs in either case, I figured it's a side-effect of having a These changes also ignore the case where the kernel manager instance could be passed as an argument. We should remove that parameter if that's no longer applicable. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Well, obviously a I've gone ahead and created a PR against your branch in case you want to take these changes. If not, just close the PR. Thanks |
||
self.kc = BlockingKernelClient(conn_info, manager=self.km) | ||
self.kc.wait_for_ready() | ||
|
||
self.kc.allow_stdin = False | ||
self.nb = nb | ||
|
||
try: | ||
yield nb, self.km, self.kc | ||
finally: | ||
if self.shutdown_kernel == 'immediate': | ||
self.km.kill() | ||
else: | ||
self.kc.shutdown_or_terminate() | ||
self.kc.close() | ||
|
||
def preprocess(self, nb, resources, km=None): | ||
""" | ||
|
@@ -357,10 +306,9 @@ def preprocess(self, nb, resources, km=None): | |
""" | ||
|
||
with self.setup_preprocessor(nb, resources, km=km): | ||
self.log.info("Executing notebook with kernel: %s" % self.kernel_name) | ||
nb, resources = super(ExecutePreprocessor, self).preprocess(nb, resources) | ||
info_msg = self._wait_for_reply(self.kc.kernel_info()) | ||
nb.metadata['language_info'] = info_msg['content']['language_info'] | ||
info_dict = self.kc.loop_client.kernel_info_dict | ||
nb.metadata['language_info'] = info_dict['language_info'] | ||
|
||
return nb, resources | ||
|
||
|
@@ -383,8 +331,8 @@ def preprocess_cell(self, cell, resources, cell_index): | |
for out in outputs: | ||
if out.output_type == 'error': | ||
raise CellExecutionError.from_cell_and_msg(cell, out) | ||
if (reply is not None) and reply['content']['status'] == 'error': | ||
raise CellExecutionError.from_cell_and_msg(cell, reply['content']) | ||
if (reply is not None) and reply.content['status'] == 'error': | ||
raise CellExecutionError.from_cell_and_msg(cell, reply.content) | ||
return cell, resources | ||
|
||
def _update_display_id(self, display_id, msg): | ||
|
@@ -393,11 +341,11 @@ def _update_display_id(self, display_id, msg): | |
self.log.debug("display id %r not in %s", display_id, self._display_id_map) | ||
return | ||
|
||
if msg['header']['msg_type'] == 'update_display_data': | ||
msg['header']['msg_type'] = 'display_data' | ||
if msg.header['msg_type'] == 'update_display_data': | ||
msg.header['msg_type'] = 'display_data' | ||
|
||
try: | ||
out = output_from_msg(msg) | ||
out = output_from_msg(msg.make_dict()) | ||
except ValueError: | ||
self.log.error("unhandled iopub msg: " + msg['msg_type']) | ||
return | ||
|
@@ -409,76 +357,53 @@ def _update_display_id(self, display_id, msg): | |
outputs[output_idx]['data'] = out['data'] | ||
outputs[output_idx]['metadata'] = out['metadata'] | ||
|
||
def _wait_for_reply(self, msg_id, cell=None): | ||
# wait for finish, with timeout | ||
while True: | ||
try: | ||
if self.timeout_func is not None and cell is not None: | ||
timeout = self.timeout_func(cell) | ||
else: | ||
timeout = self.timeout | ||
|
||
if not timeout or timeout < 0: | ||
timeout = None | ||
msg = self.kc.shell_channel.get_msg(timeout=timeout) | ||
except Empty: | ||
self.log.error( | ||
"Timeout waiting for execute reply (%is)." % self.timeout) | ||
if self.interrupt_on_timeout: | ||
self.log.error("Interrupting kernel") | ||
self.km.interrupt_kernel() | ||
break | ||
else: | ||
try: | ||
exception = TimeoutError | ||
except NameError: | ||
exception = RuntimeError | ||
raise exception("Cell execution timed out") | ||
|
||
if msg['parent_header'].get('msg_id') == msg_id: | ||
return msg | ||
else: | ||
# not our reply | ||
continue | ||
def _get_timeout(self, cell): | ||
if self.timeout_func is not None: | ||
timeout = self.timeout_func(cell) | ||
else: | ||
timeout = self.timeout | ||
|
||
if not timeout or timeout < 0: | ||
timeout = None | ||
return timeout | ||
|
||
def run_cell(self, cell, cell_index=0): | ||
msg_id = self.kc.execute(cell.source) | ||
from tornado import ioloop | ||
|
||
timeout = self._get_timeout(cell) | ||
interrupt_timeout = None | ||
if self.interrupt_on_timeout: | ||
interrupt_timeout = timeout | ||
timeout += 5 | ||
|
||
out_msgs = [] | ||
|
||
self.log.debug("Executing cell:\n%s", cell.source) | ||
exec_reply = self._wait_for_reply(msg_id, cell) | ||
try: | ||
reply = self.kc.execute_interactive( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Great to see execute_interactive working out here |
||
cell.source, | ||
output_hook=out_msgs.append, | ||
timeout=timeout, | ||
interrupt_timeout=interrupt_timeout, | ||
idle_timeout=self.iopub_timeout, | ||
raise_on_no_idle=self.raise_on_iopub_timeout, | ||
) | ||
except ioloop.TimeoutError: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not a problem for now, but could this ioloop.TimeoutError -> TimeoutError happen in jupyter_protocol? Does that seem appropriate to you? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yup, I think it would be appropriate for the BlockingKernelClient to do that translation. |
||
raise TimeoutError("Cell execution timed out") | ||
|
||
outs = cell.outputs = [] | ||
|
||
while True: | ||
try: | ||
# We've already waited for execute_reply, so all output | ||
# should already be waiting. However, on slow networks, like | ||
# in certain CI systems, waiting < 1 second might miss messages. | ||
# So long as the kernel sends a status:idle message when it | ||
# finishes, we won't actually have to wait this long, anyway. | ||
msg = self.kc.iopub_channel.get_msg(timeout=self.iopub_timeout) | ||
except Empty: | ||
self.log.warning("Timeout waiting for IOPub output") | ||
if self.raise_on_iopub_timeout: | ||
raise RuntimeError("Timeout waiting for IOPub output") | ||
else: | ||
break | ||
if msg['parent_header'].get('msg_id') != msg_id: | ||
# not an output from our execution | ||
continue | ||
|
||
msg_type = msg['msg_type'] | ||
for msg in out_msgs: | ||
msg_type = msg.header['msg_type'] | ||
self.log.debug("output: %s", msg_type) | ||
content = msg['content'] | ||
content = msg.content | ||
|
||
# set the prompt number for the input and the output | ||
if 'execution_count' in content: | ||
cell['execution_count'] = content['execution_count'] | ||
|
||
if msg_type == 'status': | ||
if content['execution_state'] == 'idle': | ||
break | ||
else: | ||
continue | ||
continue | ||
elif msg_type == 'execute_input': | ||
continue | ||
elif msg_type == 'clear_output': | ||
|
@@ -493,15 +418,15 @@ def run_cell(self, cell, cell_index=0): | |
|
||
display_id = None | ||
if msg_type in {'execute_result', 'display_data', 'update_display_data'}: | ||
display_id = msg['content'].get('transient', {}).get('display_id', None) | ||
display_id = msg.content.get('transient', {}).get('display_id', None) | ||
if display_id: | ||
self._update_display_id(display_id, msg) | ||
if msg_type == 'update_display_data': | ||
# update_display_data doesn't get recorded | ||
continue | ||
|
||
try: | ||
out = output_from_msg(msg) | ||
out = output_from_msg(msg.make_dict()) | ||
except ValueError: | ||
self.log.error("unhandled iopub msg: " + msg_type) | ||
continue | ||
|
@@ -514,7 +439,7 @@ def run_cell(self, cell, cell_index=0): | |
|
||
outs.append(out) | ||
|
||
return exec_reply, outs | ||
return reply, outs | ||
|
||
|
||
def executenb(nb, cwd=None, km=None, **kwargs): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will produce a duplicated
'spec/'
prefix when the existing notebook file has been persisted using the new kernel providers. I modified this to the following when investigating how async support in jupyter_kernel_mgmt affects existing clients (per this comment).