Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix crash on long lines #1187

Merged
merged 2 commits into from Jul 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -2,9 +2,8 @@ package polynote.kernel.interpreter

import java.io.{OutputStream, PrintStream}
import java.lang.reflect.InvocationHandler

import polynote.kernel.environment.{CurrentRuntime, CurrentTask, PublishResult, PublishStatus}
import polynote.kernel.util.ResultOutputStream
import polynote.kernel.util.{ResultOutputStream, ResultPrintStream}
import polynote.kernel.{BaseEnv, ExecutionStatus, InterpreterEnv, Output, Result, ScalaCompiler, withContextClassLoader}
import polynote.messages.CellID
import polynote.runtime.KernelRuntime
Expand All @@ -26,7 +25,7 @@ class CellExecutor(publishSync: Result => Unit, classLoader: ClassLoader, blocki
blockingExecutor.submit {
new Runnable {
def run(): Unit = {
val console = new PrintStream(new ResultOutputStream(publishSync), true)
val console = new ResultPrintStream(publishSync)()
withContextClassLoader(classLoader) {
try {
Console.withOut(console) {
Expand Down
@@ -1,13 +1,12 @@
package polynote.kernel.util

import java.io.OutputStream
import java.io.{OutputStream, PrintStream}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.concurrent.atomic.AtomicBoolean

import polynote.kernel.{Output, Result}

class ResultOutputStream(publishSync: Result => Unit, bufSize: Int = 1024) extends OutputStream {
class ResultOutputStream(publishSync: Result => Unit, bufSize: Int = 65536) extends OutputStream {
private val buf: ByteBuffer = ByteBuffer.allocate(bufSize)
private val closed = new AtomicBoolean(false)

Expand All @@ -21,15 +20,17 @@ class ResultOutputStream(publishSync: Result => Unit, bufSize: Int = 1024) exten

override def flush(): Unit = {
super.flush()
buf.synchronized {
val len = buf.position()
if (len > 0) {
val b = ByteBuffer.allocate(buf.position())
val arr = new Array[Byte](buf.position())
buf.rewind()
buf.get(arr)
publishSync(Output("text/plain; rel=stdout", new String(arr, StandardCharsets.UTF_8)))
buf.rewind()
if (buf.hasRemaining) {
buf.synchronized {
val len = buf.position()
if (len > 0) {
val b = ByteBuffer.allocate(buf.position())
val arr = new Array[Byte](buf.position())
buf.rewind()
buf.get(arr)
publishSync(Output("text/plain; rel=stdout", new String(arr, StandardCharsets.UTF_8)))
buf.rewind()
}
}
}
}
Expand All @@ -42,3 +43,17 @@ class ResultOutputStream(publishSync: Result => Unit, bufSize: Int = 1024) exten
}

}

class ResultPrintStream(publishSync: Result => Unit, bufSize: Int = 65536)(private val outputStream: OutputStream = new ResultOutputStream(publishSync, bufSize)) extends PrintStream(outputStream, true, "UTF-8") {
override def println(value: String): Unit = {
outputStream.flush()
publishSync(Output("text/plain; rel=stdout", value + "\n"))
}

override def print(s: String): Unit = {
outputStream.flush()
publishSync(Output("text/plain; rel=stdout", s))
}

override def println(x: AnyRef): Unit = println(String.valueOf(x))
}