
Commit 5c049b5
Merge pull request apache#5 from maropu/pr29085
Fix
AngersZhuuuu committed Jul 18, 2020
2 parents d37ef86 + f3e05c6 commit 5c049b5
Showing 8 changed files with 161 additions and 155 deletions.
BaseScriptTransformationExec.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution
 
-import java.io.{BufferedReader, InputStream, OutputStream}
+import java.io.{BufferedReader, InputStream, InputStreamReader, OutputStream}
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.TimeUnit
 
@@ -98,6 +98,46 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
       .map { case (data, writer) => writer(data) })
   }
 
+  protected def createOutputIteratorWithoutSerde(
+      writerThread: BaseScriptTransformationWriterThread,
+      inputStream: InputStream,
+      proc: Process,
+      stderrBuffer: CircularBuffer): Iterator[InternalRow] = {
+    new Iterator[InternalRow] {
+      var curLine: String = null
+      val reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
+
+      override def hasNext: Boolean = {
+        try {
+          if (curLine == null) {
+            curLine = reader.readLine()
+            if (curLine == null) {
+              checkFailureAndPropagate(writerThread, null, proc, stderrBuffer)
+              return false
+            }
+          }
+          true
+        } catch {
+          case NonFatal(e) =>
+            // If this exception is due to abrupt / unclean termination of `proc`,
+            // then detect it and propagate a better exception message for end users
+            checkFailureAndPropagate(writerThread, e, proc, stderrBuffer)
+
+            throw e
+        }
+      }
+
+      override def next(): InternalRow = {
+        if (!hasNext) {
+          throw new NoSuchElementException
+        }
+        val prevLine = curLine
+        curLine = reader.readLine()
+        processOutputWithoutSerde(prevLine, reader)
+      }
+    }
+  }
+
   protected def checkFailureAndPropagate(
       writerThread: BaseScriptTransformationWriterThread,
       cause: Throwable = null,
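Aside: the iterator added above is the classic read-ahead pattern — `hasNext` is responsible for fetching and caching a line in `curLine`, so it is safe to call repeatedly, while `next()` hands back the cached line and eagerly reads the one after it. A minimal, self-contained sketch of the same pattern with the process-failure handling stripped out (the names here are illustrative, not Spark API):

import java.io.{BufferedReader, StringReader}

class LineIterator(reader: BufferedReader) extends Iterator[String] {
  private var curLine: String = _

  override def hasNext: Boolean = {
    if (curLine == null) {
      curLine = reader.readLine() // read ahead; null signals end of stream
    }
    curLine != null
  }

  override def next(): String = {
    if (!hasNext) throw new NoSuchElementException
    val prevLine = curLine      // hand out the cached line
    curLine = reader.readLine() // eagerly fetch the next one, as the exec node does
    prevLine
  }
}

object LineIteratorDemo extends App {
  new LineIterator(new BufferedReader(new StringReader("a\nb\nc"))).foreach(println)
}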
SparkScriptTransformationExec.scala
@@ -18,9 +18,6 @@
 package org.apache.spark.sql.execution
 
 import java.io._
-import java.nio.charset.StandardCharsets
-
-import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 
@@ -68,39 +65,8 @@ case class SparkScriptTransformationExec(
       hadoopConf
     )
 
-    val reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
-    val outputIterator: Iterator[InternalRow] = new Iterator[InternalRow] {
-      var curLine: String = null
-
-      override def hasNext: Boolean = {
-        try {
-          if (curLine == null) {
-            curLine = reader.readLine()
-            if (curLine == null) {
-              checkFailureAndPropagate(writerThread, null, proc, stderrBuffer)
-              return false
-            }
-          }
-          true
-        } catch {
-          case NonFatal(e) =>
-            // If this exception is due to abrupt / unclean termination of `proc`,
-            // then detect it and propagate a better exception message for end users
-            checkFailureAndPropagate(writerThread, e, proc, stderrBuffer)
-
-            throw e
-        }
-      }
-
-      override def next(): InternalRow = {
-        if (!hasNext) {
-          throw new NoSuchElementException
-        }
-        val prevLine = curLine
-        curLine = reader.readLine()
-        processOutputWithoutSerde(prevLine, reader)
-      }
-    }
+    val outputIterator = createOutputIteratorWithoutSerde(
+      writerThread, inputStream, proc, stderrBuffer)
 
     writerThread.start()
 
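For orientation: the exec node starts `writerThread` to pump serialized input rows into the child process's stdin while the output iterator lazily consumes the process's stdout. A rough, self-contained sketch of that producer/consumer wiring (a simplified stand-in, not the Spark implementation — the real code must also capture stderr and check the exit status, which is what `stderrBuffer` and `checkFailureAndPropagate` handle):

import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter, PrintWriter}
import java.nio.charset.StandardCharsets

object PipeThroughProcess {
  // Feed lines to a subprocess on a writer thread; consume its stdout lazily.
  def pipe(input: Seq[String], command: String*): Iterator[String] = {
    val proc = new ProcessBuilder(command: _*).start()
    val writerThread = new Thread(() => {
      val out = new PrintWriter(
        new OutputStreamWriter(proc.getOutputStream, StandardCharsets.UTF_8), true)
      try input.foreach(out.println)
      finally out.close() // close stdin so the child process can terminate
    }, "process-input-writer")
    writerThread.start()
    val reader = new BufferedReader(
      new InputStreamReader(proc.getInputStream, StandardCharsets.UTF_8))
    Iterator.continually(reader.readLine()).takeWhile(_ != null)
  }

  def main(args: Array[String]): Unit = {
    pipe(Seq("1\tspark", "2\thive"), "cat").foreach(println)
  }
}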
BaseScriptTransformationSuite.scala
@@ -15,10 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.hive.execution
+package org.apache.spark.sql.execution
 
 import java.sql.Timestamp
-import java.util.Locale
 
 import org.scalatest.Assertions._
 import org.scalatest.BeforeAndAfterEach
@@ -30,26 +29,18 @@ import org.apache.spark.sql.Column
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution._
-import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
 
 abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils
-  with TestHiveSingleton with BeforeAndAfterEach {
+  with BeforeAndAfterEach {
+  import testImplicits._
+  import ScriptTransformationIOSchema._
 
-  def scriptType: String
-
-  def isHive23OrSpark: Boolean = true
-
-  import spark.implicits._
-
-  var noSerdeIOSchema: ScriptTransformationIOSchema = ScriptTransformationIOSchema.defaultIOSchema
+  protected val uncaughtExceptionHandler = new TestUncaughtExceptionHandler
 
   private var defaultUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = _
 
-  protected val uncaughtExceptionHandler = new TestUncaughtExceptionHandler
-
   protected override def beforeAll(): Unit = {
     super.beforeAll()
     defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler
@@ -66,32 +57,14 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils
     uncaughtExceptionHandler.cleanStatus()
   }
 
+  def isHive23OrSpark: Boolean
+
   def createScriptTransformationExec(
       input: Seq[Expression],
       script: String,
       output: Seq[Attribute],
       child: SparkPlan,
-      ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec = {
-    scriptType.toUpperCase(Locale.ROOT) match {
-      case "SPARK" => new SparkScriptTransformationExec(
-        input = input,
-        script = script,
-        output = output,
-        child = child,
-        ioschema = ioschema
-      )
-      case "HIVE" => new HiveScriptTransformationExec(
-        input = input,
-        script = script,
-        output = output,
-        child = child,
-        ioschema = ioschema
-      )
-      case _ => throw new TestFailedException(
-        "Test class implement from BaseScriptTransformationSuite" +
-          " should override method `scriptType` to Spark or Hive", 0)
-    }
-  }
+      ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec
 
   test("cat without SerDe") {
     assume(TestUtils.testCommandAvailable("/bin/bash"))
@@ -104,7 +77,7 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils
         script = "cat",
         output = Seq(AttributeReference("a", StringType)()),
         child = child,
-        ioschema = noSerdeIOSchema
+        ioschema = defaultIOSchema
       ),
       rowsDf.collect())
     assert(uncaughtExceptionHandler.exception.isEmpty)
@@ -122,7 +95,7 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils
           script = "cat",
           output = Seq(AttributeReference("a", StringType)()),
           child = ExceptionInjectingOperator(child),
-          ioschema = noSerdeIOSchema
+          ioschema = defaultIOSchema
         ),
         rowsDf.collect())
     }
@@ -178,8 +151,8 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils
           script = "some_non_existent_command",
           output = Seq(AttributeReference("a", StringType)()),
           child = rowsDf.queryExecution.sparkPlan,
-          ioschema = noSerdeIOSchema)
-      SparkPlanTest.executePlan(plan, hiveContext)
+          ioschema = defaultIOSchema)
+      SparkPlanTest.executePlan(plan, spark.sqlContext)
     }
     assert(e.getMessage.contains("Subprocess exited with status"))
     assert(uncaughtExceptionHandler.exception.isEmpty)
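With the `scriptType` match gone, each concrete suite now supplies the factory itself; the Spark-side override appears in the next file. A Hive-side counterpart, reconstructed from the removed `case "HIVE"` arm, would plausibly look like the sketch below (it only compiles inside the Spark source tree, the actual suite in the PR may differ, and the `HiveUtils.isHive23` gate is an assumption):

package org.apache.spark.sql.hive.execution

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.execution.{BaseScriptTransformationExec, BaseScriptTransformationSuite, ScriptTransformationIOSchema, SparkPlan}
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.test.TestHiveSingleton

class HiveScriptTransformationSuite extends BaseScriptTransformationSuite with TestHiveSingleton {
  // Assumption: the Hive 2.3 check; the real suite may compute this differently.
  override def isHive23OrSpark: Boolean = HiveUtils.isHive23

  override def createScriptTransformationExec(
      input: Seq[Expression],
      script: String,
      output: Seq[Attribute],
      child: SparkPlan,
      ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec = {
    // Mirrors the removed `case "HIVE"` arm above.
    new HiveScriptTransformationExec(
      input = input,
      script = script,
      output = output,
      child = child,
      ioschema = ioschema
    )
  }
}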
SparkScriptTransformationSuite.scala
@@ -15,22 +15,37 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.hive.execution
+package org.apache.spark.sql.execution
 
 import java.sql.{Date, Timestamp}
 
 import org.apache.spark.TestUtils
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
 import org.apache.spark.sql.functions.struct
+import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
 
-class SparkScriptTransformationSuite extends BaseScriptTransformationSuite {
+class SparkScriptTransformationSuite extends BaseScriptTransformationSuite with SharedSparkSession {
+  import testImplicits._
+  import ScriptTransformationIOSchema._
 
-  import spark.implicits._
+  override def isHive23OrSpark: Boolean = true
 
-  override def scriptType: String = "SPARK"
+  override def createScriptTransformationExec(
+      input: Seq[Expression],
+      script: String,
+      output: Seq[Attribute],
+      child: SparkPlan,
+      ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec = {
+    SparkScriptTransformationExec(
+      input = input,
+      script = script,
+      output = output,
+      child = child,
+      ioschema = ioschema
+    )
+  }
 
   test("SPARK-32106: SparkScriptTransformExec should handle different data types correctly") {
     assume(TestUtils.testCommandAvailable("python"))
@@ -97,7 +112,7 @@ class SparkScriptTransformationSuite extends BaseScriptTransformationSuite {
           AttributeReference("j", StringType)(),
           AttributeReference("k", StringType)()),
         child = child,
-        ioschema = noSerdeIOSchema
+        ioschema = defaultIOSchema
       ),
       df.select(
         'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h,
TestUncaughtExceptionHandler.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.hive.execution
+package org.apache.spark.sql.execution
 
 class TestUncaughtExceptionHandler extends Thread.UncaughtExceptionHandler {
 
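The body of `TestUncaughtExceptionHandler` is collapsed in this view. Judging from how the suite above uses it — installed via `Thread.setDefaultUncaughtExceptionHandler` in `beforeAll`, inspected with `uncaughtExceptionHandler.exception.isEmpty`, and reset with `cleanStatus()` — a minimal handler consistent with that usage is sketched below (an assumption, not the verbatim file):

class TestUncaughtExceptionHandler extends Thread.UncaughtExceptionHandler {

  @volatile private var _exception: Option[Throwable] = None

  // The most recent exception that escaped a thread, if any.
  def exception: Option[Throwable] = _exception

  // Reset between tests so one test's failure does not leak into the next.
  def cleanStatus(): Unit = _exception = None

  override def uncaughtException(t: Thread, e: Throwable): Unit = {
    _exception = Some(e)
  }
}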
[Diffs for the remaining three of the eight changed files did not load on this page.]
