Skip to content

Commit

Permalink
[Spark-1461] Deferred Expression Evaluation (short-circuit evaluation)
Browse files Browse the repository at this point in the history
This patch unifies the foldable & nullable interfaces for Expression.
1) Non-deterministic UDFs (like Rand()) cannot be folded.
2) Short-circuit evaluation will significantly improve the performance of expression evaluation; however, a stateful UDF must not be skipped by short-circuit evaluation (e.g. in the expression: col1 > 0 and row_sequence() < 1000, row_sequence() cannot be skipped even if col1 > 0 is false).

I borrowed the concept of DeferredObject from Hive, which has 2 kinds of child classes (EagerResult / DeferredResult): the former requires the evaluation to be triggered before it is created, while the latter triggers the evaluation when its get() method is first called.

Author: Cheng Hao <hao.cheng@intel.com>

Closes apache#446 from chenghao-intel/expression_deferred_evaluation and squashes the following commits:

d2729de [Cheng Hao] Fix the codestyle issues
a08f09c [Cheng Hao] fix bug in or/and short-circuit evaluation
af2236b [Cheng Hao] revert the short-circuit expression evaluation for IF
b7861d2 [Cheng Hao] Add Support for Deferred Expression Evaluation
  • Loading branch information
chenghao-intel authored and rxin committed May 16, 2014
1 parent bb98eca commit a20fea9
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,19 @@ case class And(left: Expression, right: Expression) extends BinaryPredicate {

override def eval(input: Row): Any = {
val l = left.eval(input)
val r = right.eval(input)
if (l == false || r == false) {
false
} else if (l == null || r == null ) {
null
if (l == false) {
false
} else {
true
val r = right.eval(input)
if (r == false) {
false
} else {
if (l != null && r != null) {
true
} else {
null
}
}
}
}
}
Expand All @@ -114,13 +120,19 @@ case class Or(left: Expression, right: Expression) extends BinaryPredicate {

override def eval(input: Row): Any = {
val l = left.eval(input)
val r = right.eval(input)
if (l == true || r == true) {
if (l == true) {
true
} else if (l == null || r == null) {
null
} else {
false
val r = right.eval(input)
if (r == true) {
true
} else {
if (l != null && r != null) {
false
} else {
null
}
}
}
}
}
Expand All @@ -133,8 +145,12 @@ case class Equals(left: Expression, right: Expression) extends BinaryComparison
def symbol = "="
override def eval(input: Row): Any = {
val l = left.eval(input)
val r = right.eval(input)
if (l == null || r == null) null else l == r
if (l == null) {
null
} else {
val r = right.eval(input)
if (r == null) null else l == r
}
}
}

Expand Down Expand Up @@ -162,7 +178,7 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi
extends Expression {

def children = predicate :: trueValue :: falseValue :: Nil
def nullable = trueValue.nullable || falseValue.nullable
override def nullable = trueValue.nullable || falseValue.nullable
def references = children.flatMap(_.references).toSet
override lazy val resolved = childrenResolved && trueValue.dataType == falseValue.dataType
def dataType = {
Expand All @@ -175,8 +191,9 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi
}

type EvaluatedType = Any

override def eval(input: Row): Any = {
if (predicate.eval(input).asInstanceOf[Boolean]) {
if (true == predicate.eval(input)) {
trueValue.eval(input)
} else {
falseValue.eval(input)
Expand Down
28 changes: 21 additions & 7 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -248,17 +248,31 @@ private[hive] case class HiveGenericUdf(name: String, children: Seq[Expression])
isUDFDeterministic && children.foldLeft(true)((prev, n) => prev && n.foldable)
}

// One DeferredObjectAdapter slot per child expression, allocated on first access.
// Array.fill takes its element by name, so every slot gets its own adapter instance.
protected lazy val deferedObjects =
  Array.fill[DeferredObject](children.length)(new DeferredObjectAdapter)

// Adapter from Catalyst ExpressionResult to Hive DeferredObject.
//
// The adapter stores a zero-argument function installed through set(); the function
// is not invoked until get() is called. Every get() call invokes the function again
// (the result is not cached) and passes its result through wrap().
class DeferredObjectAdapter extends DeferredObject {
  // Function producing the value; remains null until set() has been called.
  private var thunk: () => Any = _

  // Installs the function that get() will invoke.
  def set(func: () => Any): Unit = {
    thunk = func
  }

  // Nothing to prepare: all work happens in get().
  override def prepare(i: Int): Unit = ()

  // Invokes the installed function and returns its result after wrap().
  override def get(): AnyRef = wrap(thunk())
}

val dataType: DataType = inspectorToDataType(returnInspector)

override def eval(input: Row): Any = {
returnInspector // Make sure initialized.
val args = children.map { v =>
new DeferredObject {
override def prepare(i: Int) = {}
override def get(): AnyRef = wrap(v.eval(input))
}
}.toArray
unwrap(function.evaluate(args))
var i = 0
while (i < children.length) {
val idx = i
deferedObjects(i).asInstanceOf[DeferredObjectAdapter].set(() => {children(idx).eval(input)})
i += 1
}
unwrap(function.evaluate(deferedObjects))
}
}

Expand Down

0 comments on commit a20fea9

Please sign in to comment.