Skip to content

Commit

Permalink
Merge pull request #1187 from polynote/fix-long-lines-crash
Browse files Browse the repository at this point in the history
Fix crash on long lines
  • Loading branch information
jeremyrsmith committed Jul 16, 2021
2 parents a990cfd + 0e03252 commit 1a815b9
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 15 deletions.
Expand Up @@ -2,9 +2,8 @@ package polynote.kernel.interpreter

import java.io.{OutputStream, PrintStream}
import java.lang.reflect.InvocationHandler

import polynote.kernel.environment.{CurrentRuntime, CurrentTask, PublishResult, PublishStatus}
import polynote.kernel.util.ResultOutputStream
import polynote.kernel.util.{ResultOutputStream, ResultPrintStream}
import polynote.kernel.{BaseEnv, ExecutionStatus, InterpreterEnv, Output, Result, ScalaCompiler, withContextClassLoader}
import polynote.messages.CellID
import polynote.runtime.KernelRuntime
Expand All @@ -26,7 +25,7 @@ class CellExecutor(publishSync: Result => Unit, classLoader: ClassLoader, blocki
blockingExecutor.submit {
new Runnable {
def run(): Unit = {
val console = new PrintStream(new ResultOutputStream(publishSync), true)
val console = new ResultPrintStream(publishSync)()
withContextClassLoader(classLoader) {
try {
Console.withOut(console) {
Expand Down
@@ -1,13 +1,12 @@
package polynote.kernel.util

import java.io.OutputStream
import java.io.{OutputStream, PrintStream}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.concurrent.atomic.AtomicBoolean

import polynote.kernel.{Output, Result}

class ResultOutputStream(publishSync: Result => Unit, bufSize: Int = 1024) extends OutputStream {
class ResultOutputStream(publishSync: Result => Unit, bufSize: Int = 65536) extends OutputStream {
private val buf: ByteBuffer = ByteBuffer.allocate(bufSize)
private val closed = new AtomicBoolean(false)

Expand All @@ -21,15 +20,17 @@ class ResultOutputStream(publishSync: Result => Unit, bufSize: Int = 1024) exten

override def flush(): Unit = {
super.flush()
buf.synchronized {
val len = buf.position()
if (len > 0) {
val b = ByteBuffer.allocate(buf.position())
val arr = new Array[Byte](buf.position())
buf.rewind()
buf.get(arr)
publishSync(Output("text/plain; rel=stdout", new String(arr, StandardCharsets.UTF_8)))
buf.rewind()
if (buf.hasRemaining) {
buf.synchronized {
val len = buf.position()
if (len > 0) {
val b = ByteBuffer.allocate(buf.position())
val arr = new Array[Byte](buf.position())
buf.rewind()
buf.get(arr)
publishSync(Output("text/plain; rel=stdout", new String(arr, StandardCharsets.UTF_8)))
buf.rewind()
}
}
}
}
Expand All @@ -42,3 +43,17 @@ class ResultOutputStream(publishSync: Result => Unit, bufSize: Int = 1024) exten
}

}

class ResultPrintStream(publishSync: Result => Unit, bufSize: Int = 65536)(private val outputStream: OutputStream = new ResultOutputStream(publishSync, bufSize)) extends PrintStream(outputStream, true, "UTF-8") {
override def println(value: String): Unit = {
outputStream.flush()
publishSync(Output("text/plain; rel=stdout", value + "\n"))
}

override def print(s: String): Unit = {
outputStream.flush()
publishSync(Output("text/plain; rel=stdout", s))
}

override def println(x: AnyRef): Unit = println(String.valueOf(x))
}

0 comments on commit 1a815b9

Please sign in to comment.