From 8418c97397f21528000ccf61170a47048ad864a1 Mon Sep 17 00:00:00 2001 From: "zhichao.li" Date: Fri, 12 Jun 2015 10:32:45 +0800 Subject: [PATCH] add comments and remove useless failAfter logic --- .../hive/execution/ScriptTransformation.scala | 5 +++++ .../sql/hive/execution/SQLQuerySuite.scala | 20 ++++++------------- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index 898395c2296ed..8f3e2ff7470fc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -59,6 +59,11 @@ case class ScriptTransformation( child.execute().mapPartitions { iter => val cmd = List("/bin/bash", "-c", script) val builder = new ProcessBuilder(cmd) + // redirectError(Redirect.INHERIT) would consume the error output from buffer and + // then print it to stderr (inherit the target from the current Scala process). + // If without this there would be 2 issues: + // 1) The error msg generated by the script process would be hidden. + // 2) If the error msg is too big to chock up the buffer, the input logic would be hung builder.redirectError(Redirect.INHERIT) val proc = builder.start() val inputStream = proc.getInputStream diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index ee3117ab96090..37dfd8b0f52c3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -17,10 +17,6 @@ package org.apache.spark.sql.hive.execution -import org.scalatest.concurrent.Timeouts._ -import org.scalatest.time.Span -import org.scalatest.time.Millis - import org.apache.spark.sql.catalyst.DefaultParserDialect import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries import org.apache.spark.sql.catalyst.errors.DialectException @@ -636,21 +632,17 @@ class SQLQuerySuite extends QueryTest { test("test script transform for stdout") { val data = (1 to 100000).map { i => (i, i, i) } data.toDF("d1", "d2", "d3").registerTempTable("script_trans") - failAfter(Span(20000, Millis)) { - assert(100000 === - sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat' AS (a,b,c) FROM script_trans") - .queryExecution.toRdd.count()) - } + assert(100000 === + sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat' AS (a,b,c) FROM script_trans") + .queryExecution.toRdd.count()) } test("test script transform for stderr") { val data = (1 to 100000).map { i => (i, i, i) } data.toDF("d1", "d2", "d3").registerTempTable("script_trans") - failAfter(Span(20000, Millis)) { - assert(0 === - sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat 1>&2' AS (a,b,c) FROM script_trans") - .queryExecution.toRdd.count()) - } + assert(0 === + sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat 1>&2' AS (a,b,c) FROM script_trans") + .queryExecution.toRdd.count()) } test("window function: udaf with aggregate expressin") {