Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing #659 (output cutoff and iopub timeout) #994

Merged
merged 16 commits into from May 17, 2019
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
120 changes: 88 additions & 32 deletions nbconvert/preprocessors/execute.py
Expand Up @@ -6,6 +6,10 @@
import base64
from textwrap import dedent
from contextlib import contextmanager
try:
from time import monotonic # Py 3
except ImportError:
from time import time as monotonic # Py 2

try:
from queue import Empty # Py 3
Expand Down Expand Up @@ -449,6 +453,36 @@ def _update_display_id(self, display_id, msg):
outputs[output_idx]['data'] = out['data']
outputs[output_idx]['metadata'] = out['metadata']

def _poll_for_reply(self, msg_id, cell=None, timeout=None):
try:
# check with timeout if kernel is still alive
msg = self.kc.shell_channel.get_msg(timeout=timeout)
if msg['parent_header'].get('msg_id') == msg_id:
return msg
except Empty:
# received no message, check if kernel is still alive
self._check_alive()
# kernel still alive, wait for a message

def _get_timeout(self, cell):
if self.timeout_func is not None and cell is not None:
timeout = self.timeout_func(cell)
else:
timeout = self.timeout

if not timeout or timeout < 0:
timeout = None

return timeout

def _handle_timeout(self):
djherbis marked this conversation as resolved.
Show resolved Hide resolved
self.log.error(
"Timeout waiting for execute reply (%is)." % self.timeout)
if self.interrupt_on_timeout:
self.log.error("Interrupting kernel")
self.km.interrupt_kernel()
else:
raise TimeoutError("Cell execution timed out")

def _check_alive(self):
if not self.kc.is_alive():
Expand All @@ -458,13 +492,7 @@ def _check_alive(self):

def _wait_for_reply(self, msg_id, cell=None):
# wait for finish, with timeout
if self.timeout_func is not None and cell is not None:
timeout = self.timeout_func(cell)
else:
timeout = self.timeout

if not timeout or timeout < 0:
timeout = None
timeout = self._get_timeout(cell)
cummulative_time = 0
timeout_interval = 5
while True:
Expand All @@ -474,49 +502,77 @@ def _wait_for_reply(self, msg_id, cell=None):
self._check_alive()
cummulative_time += timeout_interval
if timeout and cummulative_time > timeout:
self.log.error(
"Timeout waiting for execute reply (%is)." % self.timeout)
if self.interrupt_on_timeout:
self.log.error("Interrupting kernel")
self.km.interrupt_kernel()
break
else:
raise TimeoutError("Cell execution timed out")
self._handle_timeout()
break
else:
if msg['parent_header'].get('msg_id') == msg_id:
return msg

def _timeout_with_deadline(self, timeout, deadline):
if deadline is not None and deadline - monotonic() < timeout:
timeout = deadline - monotonic()

if timeout < 0:
timeout = 0

return timeout

def _passed_deadline(self, deadline):
if deadline is not None and deadline - monotonic() <= 0:
self._handle_timeout()
return True
return False

def run_cell(self, cell, cell_index=0):
parent_msg_id = self.kc.execute(cell.source)
self.log.debug("Executing cell:\n%s", cell.source)
exec_reply = self._wait_for_reply(parent_msg_id, cell)
exec_timeout = self._get_timeout(cell)
deadline = None
if exec_timeout is not None:
deadline = monotonic() + exec_timeout

cell.outputs = []
self.clear_before_next_output = False

while True:
try:
# We've already waited for execute_reply, so all output
# should already be waiting. However, on slow networks, like
# in certain CI systems, waiting < 1 second might miss messages.
# So long as the kernel sends a status:idle message when it
# finishes, we won't actually have to wait this long, anyway.
msg = self.kc.iopub_channel.get_msg(timeout=self.iopub_timeout)
except Empty:
self.log.warning("Timeout waiting for IOPub output")
if self.raise_on_iopub_timeout:
raise RuntimeError("Timeout waiting for IOPub output")
else:
break
more_output = True
polling_exec_reply = True

while more_output or polling_exec_reply:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more_output being used as a signaling mechanism feels like a pretty significant piece of internal logic. Could you add a comment explaining why we're taking this approach?

I would think a description about the intention of the separate roles of more_output and polling_exec_reply would be enough to address the curious individual.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great point, I added some detailed comments around this section to explain why it's written this way. Let me know if it's still not clear for future readers.

if polling_exec_reply:
if self._passed_deadline(deadline):
polling_exec_reply = False
continue

timeout = self._timeout_with_deadline(1, deadline)
exec_reply = self._poll_for_reply(parent_msg_id, cell, timeout)
if exec_reply is not None:
polling_exec_reply = False

if more_output:
try:
timeout = self.iopub_timeout
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So now with all the functionality in place, we likely need to address the fact that iopub_timeout no longer acts as it used to. In particular, since we aren't waiting for exec reply to fully complete we'll expect to timeout here often. Which means that raise_on_iopub_timeout will fire when users wouldn't expect.

@mpacer I think we should perhaps remove raise_on_iopub_timeout and iopub_timeout with this change and use timeout = self._timeout_with_deadline(1, deadline) for both conditional branches. Those other configurations just don't make any sense any more and are problematic if defined as they were in earlier versions. What's your thoughts? Even if we do a less severe change we'd need to address the change in timeout behavior somehow with this PR merging.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I tried to preserve that behavior, it won't hit the raise_on_iopub_timeout unless polling_exec_reply is false, and it's only false if we've passed the deadline or if exec_reply is not None (completed).

If we've passed the deadline we hit handle_timeout, which will either interrupt or throw. If it throws it won't reach this section, if it interrupts I suspect we would get an idle message?

Let me know if I got anything wrong here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh yes you're correct, my end-of-conference brain power failed to read properly. Then maybe we should deprecate those fields in the next release (5.5.1) so they can be removed in 6.0? That would help cleanup this code and make it simpler to control in the future imo.

We should add a test for this case if it's not covered yet to ensure we won't timeout unexpectantly. That might be tricky to write with Mocks, so I can probably make that test if it's not clear how to set that up. After that I think I'm good to merge once @mpacer approves / has any further concerns resolved.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey sorry, I was also gone for a week long work even this week.
Looks like I've got more merge conflicts to fix, I'll take a look Monday.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry there's been so many merge conflicts. I've been more careful to review / get more opinions on this PR before merging. Shouldn't be too many more conflicts. I asked @mpacer in person to take a look again when she's available so we don't keep this PR open for much longer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not quite following the reasoning behind why the fields need to be deprecated. Could you lay out the situation where these fields cease to make sense?

Copy link
Contributor Author

@djherbis djherbis May 17, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MSeal probably has a better answer than me, my interpretation was that iopub_timeout exists to give a small window after receiving exec_reply (so we know all output should be produced by then) for the output to reach the iopub_channel (in case of network latency etc.). I think @MSeal might feel that since we are polling for this output more regularly (and not just after exec_reply) that the purpose of this flag has changed somewhat. Perhaps I shouldn't be using iopub_timeout at all unless exec_reply has finished, and use a constant timeout / deadline similar to _poll_for_reply. I could be misinterpreting @MSeal though so please correct me!

Even with all of that, I still think iopub_timeout is relevant post exec_reply being received, since you may still need to wait for output produced near the end of execution to be received.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's pretty much on point. It's odd the to have a post exec_reply response specific timeout in addition to an overall timeout when the two channels are being consumed in parallel.

I'll start a separate PR / Issue on the field independent of this PR to avoid leaving this open longer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MSeal Thanks, what do you think about using a constant (like 1s) insead of the iopub_timeout in this PR for the poll to iopub_channel while polling_exec_reply=True, and then switching to iopub_timeout once exec_reply is received?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly that would be a way to approach it. In general I'm saying I think making it simpler and having 1 timeout would reduce confusion and be less nuanced.

if polling_exec_reply:
timeout = self._timeout_with_deadline(timeout, deadline)
msg = self.kc.iopub_channel.get_msg(timeout=timeout)
except Empty:
if polling_exec_reply:
continue

if self.raise_on_iopub_timeout:
raise TimeoutError("Timeout waiting for IOPub output")
else:
self.log.warning("Timeout waiting for IOPub output")
more_output = False
continue
if msg['parent_header'].get('msg_id') != parent_msg_id:
# not an output from our execution
continue

# Will raise CellExecutionComplete when completed
try:
# Will raise CellExecutionComplete when completed
self.process_message(msg, cell, cell_index)
except CellExecutionComplete:
break
more_output = False

# Return cell.outputs still for backwards compatability
return exec_reply, cell.outputs
Expand Down
64 changes: 64 additions & 0 deletions nbconvert/preprocessors/tests/test_execute.py
Expand Up @@ -32,6 +32,10 @@
from testpath import modified_env
from ipython_genutils.py3compat import string_types

try:
from queue import Empty # Py 3
except ImportError:
from Queue import Empty # Py 2
try:
TimeoutError # Py 3
except NameError:
Expand Down Expand Up @@ -167,6 +171,7 @@ def test_mock_wrapper(self):
shell_channel=MagicMock(get_msg=shell_channel_message_mock()),
execute=MagicMock(return_value=parent_id)
)
preprocessor.parent_id = parent_id
return func(self, preprocessor, cell_mock, message_mock)
return test_mock_wrapper
return prepared_wrapper
Expand Down Expand Up @@ -510,6 +515,65 @@ def test_busy_message(self, preprocessor, cell_mock, message_mock):
# Ensure no outputs were generated
assert cell_mock.outputs == []

@prepare_cell_mocks({
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These all look great!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the tests are great work by @MSeal (thank you!) it's my first contribution so I was unfamiliar with how to use these test frameworks.

'msg_type': 'stream',
'header': {'msg_type': 'stream'},
'content': {'name': 'stdout', 'text': 'foo'},
}, {
'msg_type': 'stream',
'header': {'msg_type': 'stream'},
'content': {'name': 'stderr', 'text': 'bar'}
})
def test_deadline_exec_reply(self, preprocessor, cell_mock, message_mock):
# exec_reply is never received, so we expect to hit the timeout.
preprocessor.kc.shell_channel.get_msg = MagicMock(side_effect=Empty())
preprocessor.timeout = 1

with pytest.raises(TimeoutError):
preprocessor.run_cell(cell_mock)

assert message_mock.call_count == 3
# Ensure the output was captured
self.assertListEqual(cell_mock.outputs, [
{'output_type': 'stream', 'name': 'stdout', 'text': 'foo'},
{'output_type': 'stream', 'name': 'stderr', 'text': 'bar'}
])

@prepare_cell_mocks()
def test_deadline_iopub(self, preprocessor, cell_mock, message_mock):
# The shell_channel will complete, so we expect only to hit the iopub timeout.
message_mock.side_effect = Empty()
preprocessor.raise_on_iopub_timeout = True

with pytest.raises(TimeoutError):
preprocessor.run_cell(cell_mock)

@prepare_cell_mocks({
'msg_type': 'stream',
'header': {'msg_type': 'stream'},
'content': {'name': 'stdout', 'text': 'foo'},
}, {
'msg_type': 'stream',
'header': {'msg_type': 'stream'},
'content': {'name': 'stderr', 'text': 'bar'}
})
def test_eventual_deadline_iopub(self, preprocessor, cell_mock, message_mock):
# Process a few messages before raising a timeout from iopub
message_mock.side_effect = list(message_mock.side_effect)[:-1] + [Empty()]
preprocessor.kc.shell_channel.get_msg = MagicMock(
return_value={'parent_header': {'msg_id': preprocessor.parent_id}})
preprocessor.raise_on_iopub_timeout = True

with pytest.raises(TimeoutError):
preprocessor.run_cell(cell_mock)

assert message_mock.call_count == 3
# Ensure the output was captured
self.assertListEqual(cell_mock.outputs, [
{'output_type': 'stream', 'name': 'stdout', 'text': 'foo'},
{'output_type': 'stream', 'name': 'stderr', 'text': 'bar'}
])

@prepare_cell_mocks({
'msg_type': 'execute_input',
'header': {'msg_type': 'execute_input'},
Expand Down