Fix #26 - Rework stdout/stderr handling #38
Conversation
Force pyspark.java_gateway.launch_gateway to pipe stdout/stderr to the kernel process so that they can be read in a thread, instead of mucking with Scala Console streams. Opened https://issues.apache.org/jira/browse/SPARK-21094 about making the piping capability a part of the API.
The stdout/stderr tests were passing by timing luck before. Need to resolve adtech-labs#21 to re-enable them. (Surprisingly, output in the notebook now appears in the correct cell even though the tests indicate it arrives after the idle message instead of before.)
Codecov Report

```diff
@@            Coverage Diff             @@
##           master      #38      +/-   ##
==========================================
- Coverage   85.21%   85.18%   -0.03%
==========================================
  Files           6        6
  Lines         399      378      -21
==========================================
- Hits          340      322      -18
+ Misses         59       56       -3
```
Codecov Report

```diff
@@            Coverage Diff             @@
##           master      #38      +/-   ##
==========================================
- Coverage   85.21%   82.68%   -2.53%
==========================================
  Files           6        6
  Lines         399      387      -12
==========================================
- Hits          340      320      -20
- Misses         59       67       +8
```
```python
kwargs['stderr'] = subprocess.PIPE
spark_jvm_proc = subprocess.Popen(*args, **kwargs)
return spark_jvm_proc
pyspark.java_gateway.Popen = Popen
```
```python
    await asyncio.sleep(0, loop=self.loop)
else:
    await asyncio.sleep(0.01, loop=self.loop)
buff = fd.read(8192)
```
what's the rationale for the chunk size here?
Greater-than-zero to specify that we don't want to wait until the pipe closes. Less than 65k which is (probably) the maximum pipe size these days (https://unix.stackexchange.com/questions/11946/how-big-is-the-pipe-buffer).
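The short-read behavior described above can be seen in a standalone toy demonstration (this is not the kernel's code, just an illustration of `os.read` semantics on a pipe):

```python
import os

# os.read() with a positive chunk size returns whatever bytes are
# already in the pipe, up to the limit, without waiting for the
# writer to close its end.
r, w = os.pipe()
os.write(w, b'x' * 100)   # writer sends only 100 bytes and keeps the pipe open
chunk = os.read(r, 8192)  # returns immediately with the 100 available bytes
print(len(chunk))
os.close(r)
os.close(w)
```

The 8192 value just needs to be positive (to allow short reads) and comfortably below the OS pipe buffer capacity.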
spylon_kernel/scala_interpreter.py
Outdated
```diff
- If you want to get the result as a Python object, follow this with a
- call to `last_result`.
+ Follow this with a call to `last_result` to retrieve the result as a
  ScalaException
      When there is a problem interpreting the code
  """
  # Ensure the cell is not incomplete. Same approach taken by Apache Zeppelin.
```
| # Ensure the cell is not incomplete. Same approach taken by Apache Zeppelin. |
is there an obvious place in the zeppelin code where you could link this comment?
```diff
  try:
-     res = self.jimain.interpret(code, synthetic)
+     res = self.jimain.interpret(code, False)
```
Can you include the rationale for hard coding the synthetic class option to False?
Nothing in the codebase here ever set it to True that I could find, so I preferred to remove it as YAGNI. As for what the parameter means, I haven't found anything in the API doc (http://www.scala-lang.org/api/2.12.1/scala-compiler/scala/tools/nsc/interpreter/IMain.html) but http://docs.scala-lang.org/glossary/#synthetic-class might apply and explain why False is the correct value. That's a guess, at best.
test_spylon_kernel_jkt.py
Outdated
```python
code_generate_error = "4 / 0"

def test_execute_stderr(self):
    raise SkipTest("needs execute result, stream output synchronization")
```
does pytest interpret unittest.SkipTest correctly?
Yes.
```
test_spylon_kernel_jkt.py::SpylonKernelTests::test_execute_stderr SKIPPED
test_spylon_kernel_jkt.py::SpylonKernelTests::test_execute_stdout SKIPPED
```
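A minimal self-contained check of that behavior (illustrative only, not the PR's test code): raising `unittest.SkipTest` inside a test body records a skip rather than a failure, and pytest honors the same exception when running unittest-style cases.

```python
import unittest

class Demo(unittest.TestCase):
    def test_execute_stderr(self):
        # Raising SkipTest marks the test as skipped under both runners.
        raise unittest.SkipTest('needs execute result, stream output synchronization')

result = unittest.TestResult()
unittest.defaultTestLoader.loadTestsFromTestCase(Demo).run(result)
print(len(result.skipped), len(result.failures), len(result.errors))
```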
test/test_scala_kernel.py
Outdated
```diff
  code = dedent("""\
      %%init_spark
-     application_name = 'Dave'
+     launcher.conf.spark.app.name = 'Dave'
```
nit: use something ungendered. strawman (yes I realize the irony of me using that term): launcher.conf.spark.app.name = whatzit
I've been seeking an ungendered version of strawman. Draft? Suggestion? Starting point? None are as good.
Spylon-kernel-task
I mean ungendered version of "strawman". I'll certainly replace "Dave".
```python
"""
nonlocal spark_jvm_proc
# Override these in kwargs to avoid duplicate value errors
kwargs['bufsize'] = 0
```
should this match the size you're reading below (8k)?
No. Setting it to zero makes the streams unbuffered so reads don't block and short reads are allowed. Adding a comment with a link to the Python doc about it.
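The whole approach can be sketched in a standalone way (assumed names like `drain` are illustrative, not the kernel's actual helpers): spawn a child process with unbuffered pipes and drain each stream on a background thread so the main loop never blocks on child output.

```python
import subprocess
import sys
import threading

def drain(stream, sink):
    # read(8192) allows short reads; b'' signals EOF when the pipe closes.
    for chunk in iter(lambda: stream.read(8192), b''):
        sink.append(chunk)

# bufsize=0 makes the pipe streams unbuffered so bytes are available
# as soon as the child writes them.
proc = subprocess.Popen(
    [sys.executable, '-c', 'print("hello from the child")'],
    stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0)

out, err = [], []
t_out = threading.Thread(target=drain, args=(proc.stdout, out))
t_err = threading.Thread(target=drain, args=(proc.stderr, err))
t_out.start()
t_err.start()
proc.wait()
t_out.join()
t_err.join()
print(b''.join(out).decode().strip())
```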
Ensures immediate output of JVM stdout/stderr stream bytes instead of buffering until cell execution completes because the main tornado ioloop is blocked. Fixes most disabled tests.
last three commits LGTM

Yeah looks good. Pretty scary but good :)
* stdout and stderr tests in test_spylon_kernel_jkt.py are duplicates of those in the base class
* Various references to application_name remain
* Send stderr to the default JVM log4j location by default
* Allow user to %%init_spark --stderr to capture it in the notebook
* Keep working tests in order
run_tests.py handles it
```python
using Python code.
# Use argparse to parse the whitespace delimited cell magic options
# just as we would parse a command line.
@option(
```
optparse apparently. Will adjust the comment.
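For reference, the option parsing the corrected comment refers to can be sketched with the standard library's optparse directly (a standalone illustration, not metakernel's actual `@option` machinery):

```python
from optparse import OptionParser

# Parse whitespace-delimited cell magic options just as we would
# parse a command line.
parser = OptionParser()
parser.add_option('--stderr', action='store_true', default=False,
                  help='Capture stderr in the notebook instead of in the kernel log')
opts, args = parser.parse_args('--stderr'.split())
print(opts.stderr)
```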
spylon_kernel/init_spark_magic.py
Outdated
```python
help="Capture stderr in the notebook instead of in the kernel log"
)
def cell_init_spark(self, stderr=False):
    """%%init_spark --stderr CODE - starts a SparkContext with a custom
```
I don't understand what CODE is supposed to be here. It seems like it is just supposed to be a boolean?
CODE is the body of the cell containing the Python code that initializes the Spark context. I'll try to make that clearer in the description.
```python
try:
    handler(chunk)
except Exception as ex:
    self.log.exception('Exception handling stdout')
```
log.exception() automagically bundles the traceback right?
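Yes, it does. A quick answer-by-example (standalone sketch, not the kernel's logging setup): `Logger.exception()` logs at ERROR level and automatically appends the active exception's traceback to the record.

```python
import io
import logging

# Capture log output in a string buffer so we can inspect it.
stream = io.StringIO()
log = logging.getLogger('demo')
log.addHandler(logging.StreamHandler(stream))
try:
    1 / 0
except Exception:
    # Must be called from an exception handler; the traceback is
    # bundled into the log record automatically.
    log.exception('Exception handling stdout')
print('Traceback' in stream.getvalue(), 'ZeroDivisionError' in stream.getvalue())
```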
```diff
- @pytest.mark.skip("fails randomly, maybe because interpreter is reused")
+ @pytest.mark.skip('fails randomly, possibly because of mock reuse across tests')
  def test_stderr(spylon_kernel):
```
can you walk me through your thoughts on mock reuse in person today?
[edit] not sure why I specified "in person" 🤷♂️
In-person is a good idea. Let's do that tomorrow.
Summary: I spent a good couple of hours trying to track down why this test is flaky and looking for a better way to write it. I wound up back where I started, with this test disabled. Which isn't to say that stdout/stderr are untested: they are covered by the jupyter_kernel_test suite, which spawns a kernel and puts it through its paces before any of the spylon-specific tests run.
All of this LGTM. I'm 👍 on getting this in and continuing to iterate in subsequent PRs. I'm starting to just go commit-by-commit which is going to get tricky to keep track of

Thanks for all the feedback @ericdill . I agree with merging real-soon-now and continuing in other PRs.
Totally new tack: force pyspark to pipe py4j JVM output back to the parent process and read the streams with a small monkey patch. Opened https://issues.apache.org/jira/browse/SPARK-21094 about making this part of pyspark proper.
This change should completely fix all lost output from Scala and Spark by piping the entire JVM process to the kernel process. There's nowhere else for it to go now.
Surprisingly, output in the notebook now shows up under the proper cell, even though the kernel unit tests for stdout/stderr now fail. The issue with the tests is that the main ioloop thread needs to yield so the child-process stream consumer threads can make all waiting data available before the main thread returns the kernel execution result. Issue #21 remains open to address the problem. As it stands, the user experience is better at the expense of some protocol tests that previously passed without reflecting the broken output behavior.
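The yielding problem can be reduced to a toy asyncio example (illustrative only; names like `consumer` are assumptions, not the kernel's code): the main coroutine must yield control once so a pending consumer task can flush its output before the result is returned.

```python
import asyncio

order = []

async def consumer():
    # Stands in for a stream-consumer flushing buffered child output.
    order.append('consumer flushed output')

async def main():
    asyncio.ensure_future(consumer())
    # Without this yield, main would append its result first and the
    # consumer's output would land after the "execution result".
    await asyncio.sleep(0)
    order.append('main returned result')

asyncio.run(main())
print(order)
```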
I'm opening this PR to make the changes visible for discussion and QA.