Skip to content

Commit

Permalink
[SPARK-7862] [SQL] Fix the deadlock in script transformation for stderr
Browse files Browse the repository at this point in the history
[Related PR SPARK-7044] (apache#5671)

Author: zhichao.li <zhichao.li@intel.com>

Closes apache#6404 from zhichao-li/transform and squashes the following commits:

8418c97 [zhichao.li] add comments and remove useless failAfter logic
d9677e1 [zhichao.li] redirect the error destination to be the same as the current process
  • Loading branch information
zhichao-li authored and nemccarthy committed Jun 19, 2015
1 parent 96792e8 commit 279d79d
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.hive.execution

import java.io.{BufferedReader, DataInputStream, DataOutputStream, EOFException, InputStreamReader}
import java.lang.ProcessBuilder.Redirect
import java.util.Properties

import scala.collection.JavaConversions._
Expand Down Expand Up @@ -58,6 +59,12 @@ case class ScriptTransformation(
child.execute().mapPartitions { iter =>
val cmd = List("/bin/bash", "-c", script)
val builder = new ProcessBuilder(cmd)
// redirectError(Redirect.INHERIT) consumes the error output from the buffer and
// prints it to stderr (inheriting the target from the current Scala process).
// Without this there would be 2 issues:
// 1) The error messages generated by the script process would be hidden.
// 2) If the error output is large enough to fill the buffer, the script process
//    would block on writing to stderr, hanging the input logic (deadlock).
builder.redirectError(Redirect.INHERIT)
val proc = builder.start()
val inputStream = proc.getInputStream
val outputStream = proc.getOutputStream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,12 +645,20 @@ class SQLQuerySuite extends QueryTest {
.queryExecution.analyzed
}

test("test script transform") {
// Sanity check for script transformation on stdout: piping every row through
// 'cat' must yield exactly one output row per input row.
test("test script transform for stdout") {
  val rows = (1 to 100000).map { n => (n, n, n) }
  rows.toDF("d1", "d2", "d3").registerTempTable("script_trans")
  val produced =
    sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat' AS (a,b,c) FROM script_trans")
      .queryExecution.toRdd.count()
  assert(produced === 100000)
}

// Regression test for SPARK-7862: a script that writes all of its input to
// stderr ('cat 1>&2') must not deadlock the transformation. With stderr
// inherited from the driver process, no rows reach stdout, so the query
// produces zero output rows.
test("test script transform for stderr") {
  val rows = (1 to 100000).map { n => (n, n, n) }
  rows.toDF("d1", "d2", "d3").registerTempTable("script_trans")
  val produced =
    sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat 1>&2' AS (a,b,c) FROM script_trans")
      .queryExecution.toRdd.count()
  assert(produced === 0)
}

test("window function: udaf with aggregate expressin") {
Expand Down

0 comments on commit 279d79d

Please sign in to comment.