[SPARK-7862] [SQL] Disable the error message redirect to stderr
This is a follow-up of apache#6404: ScriptTransformation prints its error messages directly to stderr, which could be a disaster for the application log.

Author: Cheng Hao <hao.cheng@intel.com>

Closes apache#6882 from chenghao-intel/verbose and squashes the following commits:

bfedd77 [Cheng Hao] revert the write
76ff46b [Cheng Hao] update the CircularBuffer
692b19e [Cheng Hao] check the process exitValue for ScriptTransform
47e0970 [Cheng Hao] Use the RedirectThread instead
1de771d [Cheng Hao] naming the threads in ScriptTransformation
8536e81 [Cheng Hao] disable the error message redirection for stderr
chenghao-intel authored and marmbrus committed Jun 29, 2015
1 parent 637b4ee commit c6ba2ea
Showing 5 changed files with 77 additions and 46 deletions.
33 changes: 33 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2333,3 +2333,36 @@ private[spark] class RedirectThread(
}
}
}

/**
* An [[OutputStream]] that will store the last 10 kilobytes (by default) written to it
* in a circular buffer. The current contents of the buffer can be accessed using
* the toString method.
*/
private[spark] class CircularBuffer(sizeInBytes: Int = 10240) extends java.io.OutputStream {
var pos: Int = 0
var buffer = new Array[Int](sizeInBytes)

def write(i: Int): Unit = {
buffer(pos) = i
pos = (pos + 1) % buffer.length
}

override def toString: String = {
val (end, start) = buffer.splitAt(pos)
val input = new java.io.InputStream {
val iterator = (start ++ end).iterator

def read(): Int = if (iterator.hasNext) iterator.next() else -1
}
val reader = new BufferedReader(new InputStreamReader(input))
val stringBuilder = new StringBuilder
var line = reader.readLine()
while (line != null) {
stringBuilder.append(line)
stringBuilder.append("\n")
line = reader.readLine()
}
stringBuilder.toString()
}
}
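For reference, here is a minimal stand-alone sketch of how the wrap-around behaves. This is hypothetical demo code, not part of the commit; it assumes the CircularBuffer above is on the classpath and a Unix "\n" line separator.

import java.io.PrintStream

import org.apache.spark.util.CircularBuffer

object CircularBufferDemo {
  def main(args: Array[String]): Unit = {
    // A 16-byte buffer: once more than 16 bytes have been written,
    // the oldest bytes are silently overwritten in place.
    val buffer = new CircularBuffer(16)
    val out = new PrintStream(buffer, true, "UTF-8")

    out.println("abcdefghij") // 11 bytes including the newline
    out.println("0123456789") // 11 more; the oldest 6 bytes are dropped

    // toString splits the array at the write position and stitches the two
    // halves back together, oldest bytes first: it yields "ghij\n0123456789\n".
    println(buffer.toString)
  }
}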
8 changes: 8 additions & 0 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -673,4 +673,12 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
assert(!Utils.isInDirectory(nullFile, parentDir))
assert(!Utils.isInDirectory(nullFile, childFile3))
}

test("circular buffer") {
val buffer = new CircularBuffer(25)
val stream = new java.io.PrintStream(buffer, true, "UTF-8")

stream.println("test circular test circular test circular test circular test circular")
assert(buffer.toString === "t circular test circular\n")
}
}
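As a sanity check on the expected value: the println above writes the 69-character string plus a trailing newline, 70 bytes in all, into a 25-byte buffer, so only the trailing 25 bytes survive, which is exactly "t circular test circular\n" (again assuming a Unix "\n" line separator; with "\r\n" the retained window would differ).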
@@ -22,6 +22,8 @@ import java.net.URI
import java.util.{ArrayList => JArrayList, Map => JMap, List => JList, Set => JSet}
import javax.annotation.concurrent.GuardedBy

import org.apache.spark.util.CircularBuffer

import scala.collection.JavaConversions._
import scala.language.reflectiveCalls

@@ -66,32 +68,7 @@ private[hive] class ClientWrapper(
with Logging {

// Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur.
private val outputBuffer = new java.io.OutputStream {
var pos: Int = 0
var buffer = new Array[Int](10240)
def write(i: Int): Unit = {
buffer(pos) = i
pos = (pos + 1) % buffer.size
}

override def toString: String = {
val (end, start) = buffer.splitAt(pos)
val input = new java.io.InputStream {
val iterator = (start ++ end).iterator

def read(): Int = if (iterator.hasNext) iterator.next() else -1
}
val reader = new BufferedReader(new InputStreamReader(input))
val stringBuilder = new StringBuilder
var line = reader.readLine()
while(line != null) {
stringBuilder.append(line)
stringBuilder.append("\n")
line = reader.readLine()
}
stringBuilder.toString()
}
}
private val outputBuffer = new CircularBuffer()

private val shim = version match {
case hive.v12 => new Shim_v0_12()
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive.HiveShim._
import org.apache.spark.sql.hive.{HiveContext, HiveInspectors}
import org.apache.spark.sql.types.DataType
import org.apache.spark.util.Utils
import org.apache.spark.util.{CircularBuffer, RedirectThread, Utils}

/**
* Transforms the input by forking and running the specified script.
@@ -59,15 +59,13 @@ case class ScriptTransformation(
child.execute().mapPartitions { iter =>
val cmd = List("/bin/bash", "-c", script)
val builder = new ProcessBuilder(cmd)
// redirectError(Redirect.INHERIT) would consume the error output from the buffer and
// then print it to stderr (inheriting the target from the current Scala process).
// Without this there would be 2 issues:
// We need to start threads connected to the process pipeline:
// 1) The error msg generated by the script process would be hidden.
// 2) If the error msg is big enough to choke up the buffer, the input logic would hang
builder.redirectError(Redirect.INHERIT)
val proc = builder.start()
val inputStream = proc.getInputStream
val outputStream = proc.getOutputStream
val errorStream = proc.getErrorStream
val reader = new BufferedReader(new InputStreamReader(inputStream))

val (outputSerde, outputSoi) = ioschema.initOutputSerDe(output)
@@ -152,29 +150,43 @@
val dataOutputStream = new DataOutputStream(outputStream)
val outputProjection = new InterpretedProjection(input, child.output)

// TODO make the 2048 configurable?
val stderrBuffer = new CircularBuffer(2048)
// Consume the error stream from the pipeline; otherwise the process will
// block once the pipe buffer is full.
new RedirectThread(errorStream, // input stream from the pipeline
stderrBuffer, // output to a circular buffer
"Thread-ScriptTransformation-STDERR-Consumer").start()

// Put the write (output to the pipeline) into a separate thread
// and keep the collector in the main thread;
// otherwise it will cause a deadlock if the data size is greater than
// the pipeline / buffer capacity.
new Thread(new Runnable() {
override def run(): Unit = {
iter
.map(outputProjection)
.foreach { row =>
if (inputSerde == null) {
val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")

outputStream.write(data)
} else {
val writable = inputSerde.serialize(
row.asInstanceOf[GenericInternalRow].values, inputSoi)
prepareWritable(writable).write(dataOutputStream)
Utils.tryWithSafeFinally {
iter
.map(outputProjection)
.foreach { row =>
if (inputSerde == null) {
val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")

outputStream.write(data)
} else {
val writable = inputSerde.serialize(
row.asInstanceOf[GenericInternalRow].values, inputSoi)
prepareWritable(writable).write(dataOutputStream)
}
}
outputStream.close()
} {
if (proc.waitFor() != 0) {
logError(stderrBuffer.toString) // log the stderr circular buffer
}
}
outputStream.close()
}
}).start()
}, "Thread-ScriptTransformation-Feed").start()

iterator
}
@@ -278,3 +290,4 @@ case class HiveScriptIOSchema (
}
}
}
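The threading layout above generalizes to any child-process pipeline. Below is a minimal, self-contained sketch of the same pattern under hypothetical names (it is not the Spark implementation itself): one thread drains stderr into a bounded sink so a chatty script can never block on a full pipe, a second thread feeds stdin, and the parent thread consumes stdout before checking the exit code.

import java.io.{ByteArrayOutputStream, InputStream, OutputStream}

object PipelineSketch {
  // Copy bytes from `in` to `out` until EOF; this is essentially what
  // org.apache.spark.util.RedirectThread does internally.
  private def pipe(in: InputStream, out: OutputStream): Unit = {
    val buf = new Array[Byte](1024)
    var n = in.read(buf)
    while (n != -1) {
      out.write(buf, 0, n)
      n = in.read(buf)
    }
  }

  def main(args: Array[String]): Unit = {
    val proc = new ProcessBuilder("/bin/cat").start()
    // Stands in for the CircularBuffer used in the commit.
    val stderrSink = new ByteArrayOutputStream()

    // 1) Drain stderr so the child can never block on a full stderr pipe.
    new Thread(new Runnable {
      override def run(): Unit = pipe(proc.getErrorStream, stderrSink)
    }, "stderr-drain").start()

    // 2) Feed stdin from its own thread; feeding it from the parent thread
    //    would deadlock once both the stdin and stdout pipes fill up.
    new Thread(new Runnable {
      override def run(): Unit = {
        val stdin = proc.getOutputStream
        stdin.write("hello pipeline\n".getBytes("UTF-8"))
        stdin.close() // EOF lets the child process exit
      }
    }, "stdin-feed").start()

    // 3) The parent thread is free to consume stdout.
    scala.io.Source.fromInputStream(proc.getInputStream).getLines().foreach(println)

    // Surface the captured stderr only if the child failed.
    if (proc.waitFor() != 0) {
      Console.err.println(stderrSink.toString)
    }
  }
}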

@@ -653,7 +653,7 @@ class SQLQuerySuite extends QueryTest {
.queryExecution.toRdd.count())
}

ignore("test script transform for stderr") {
test("test script transform for stderr") {
val data = (1 to 100000).map { i => (i, i, i) }
data.toDF("d1", "d2", "d3").registerTempTable("script_trans")
assert(0 ===
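A note on this last hunk: the regression test is switched from ignore back to test, presumably because the script's stderr is now drained into a bounded CircularBuffer and logged only on failure, so piping 100,000 rows through a script no longer floods the test output.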