
Commit

Merge pull request #11 from marmbrus/goldenGen
A bunch of work, mostly to ensure catalyst does not return wrong answers.
marmbrus committed Jan 16, 2014
2 parents 8a8b521 + ffa9f25 commit b21f803
Showing 28 changed files with 500 additions and 187 deletions.
77 changes: 19 additions & 58 deletions Makefile
@@ -1,60 +1,21 @@
all: a b c d e f g h i j k l m n o p q r s t u v w x y z
all: s1 s2 s3 s4 s5 s6 s7 s8

buildWhiteList:
sbt -Dshark.hive.alltests "test-only catalyst.execution.HiveCompatibility"
compile:
sbt test:compile

findBroken:
sbt -Dshark.hive.alltests -Dshark.hive.failFast "test-only catalyst.execution.HiveCompatibility"

a:
sbt -Dshark.hive.whitelist=a.* "test-only catalyst.execution.HiveCompatibility"
b:
sbt -Dshark.hive.whitelist=b.* "test-only catalyst.execution.HiveCompatibility"
c:
sbt -Dshark.hive.whitelist=c.* "test-only catalyst.execution.HiveCompatibility"
d:
sbt -Dshark.hive.whitelist=d.* "test-only catalyst.execution.HiveCompatibility"
e:
sbt -Dshark.hive.whitelist=e.* "test-only catalyst.execution.HiveCompatibility"
f:
sbt -Dshark.hive.whitelist=f.* "test-only catalyst.execution.HiveCompatibility"
g:
sbt -Dshark.hive.whitelist=g.* "test-only catalyst.execution.HiveCompatibility"
h:
sbt -Dshark.hive.whitelist=h.* "test-only catalyst.execution.HiveCompatibility"
i:
sbt -Dshark.hive.whitelist=i.* "test-only catalyst.execution.HiveCompatibility"
j:
sbt -Dshark.hive.whitelist=j.* "test-only catalyst.execution.HiveCompatibility"
k:
sbt -Dshark.hive.whitelist=k.* "test-only catalyst.execution.HiveCompatibility"
l:
sbt -Dshark.hive.whitelist=l.* "test-only catalyst.execution.HiveCompatibility"
m:
sbt -Dshark.hive.whitelist=m.* "test-only catalyst.execution.HiveCompatibility"
n:
sbt -Dshark.hive.whitelist=n.* "test-only catalyst.execution.HiveCompatibility"
o:
sbt -Dshark.hive.whitelist=o.* "test-only catalyst.execution.HiveCompatibility"
p:
sbt -Dshark.hive.whitelist=p.* "test-only catalyst.execution.HiveCompatibility"
q:
sbt -Dshark.hive.whitelist=q.* "test-only catalyst.execution.HiveCompatibility"
r:
sbt -Dshark.hive.whitelist=r.* "test-only catalyst.execution.HiveCompatibility"
s:
sbt -Dshark.hive.whitelist=s.* "test-only catalyst.execution.HiveCompatibility"
t:
sbt -Dshark.hive.whitelist=t.* "test-only catalyst.execution.HiveCompatibility"
u:
sbt -Dshark.hive.whitelist=u.* "test-only catalyst.execution.HiveCompatibility"
v:
sbt -Dshark.hive.whitelist=v.* "test-only catalyst.execution.HiveCompatibility"
w:
sbt -Dshark.hive.whitelist=w.* "test-only catalyst.execution.HiveCompatibility"
x:
sbt -Dshark.hive.whitelist=x.* "test-only catalyst.execution.HiveCompatibility"
y:
sbt -Dshark.hive.whitelist=y.* "test-only catalyst.execution.HiveCompatibility"
z:
sbt -Dshark.hive.whitelist=z.* "test-only catalyst.execution.HiveCompatibility"
s1: compile
sbt ${ARGS} -Dshark.hive.shard=0:8 "test-only catalyst.execution.HiveCompatibility"
s2: compile
sbt ${ARGS} -Dshark.hive.shard=1:8 "test-only catalyst.execution.HiveCompatibility"
s3: compile
sbt ${ARGS} -Dshark.hive.shard=2:8 "test-only catalyst.execution.HiveCompatibility"
s4: compile
sbt ${ARGS} -Dshark.hive.shard=3:8 "test-only catalyst.execution.HiveCompatibility"
s5: compile
sbt ${ARGS} -Dshark.hive.shard=4:8 "test-only catalyst.execution.HiveCompatibility"
s6: compile
sbt ${ARGS} -Dshark.hive.shard=5:8 "test-only catalyst.execution.HiveCompatibility"
s7: compile
sbt ${ARGS} -Dshark.hive.shard=6:8 "test-only catalyst.execution.HiveCompatibility"
s8: compile
sbt ${ARGS} -Dshark.hive.shard=7:8 "test-only catalyst.execution.HiveCompatibility"
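The new targets shard the compatibility suite eight ways instead of splitting it by test-name prefix, so the pieces can run in parallel with roughly even sizes. A minimal sketch of how an `i:n` spec such as `shark.hive.shard=0:8` might be interpreted (the `ShardFilter` object and its methods are illustrative assumptions, not the suite's actual API):

// Hypothetical sketch: deterministically assign tests to shard `id` of `total`.
object ShardFilter {
  /** Parses a spec like "0:8" into (shardId, totalShards). */
  def parse(spec: String): (Int, Int) = spec.split(":") match {
    case Array(id, total) => (id.toInt, total.toInt)
    case _ => sys.error(s"Bad shard spec: $spec")
  }

  /** Keeps every test whose index falls into this shard. */
  def select[T](tests: Seq[T], spec: String): Seq[T] = {
    val (id, total) = parse(spec)
    tests.zipWithIndex.collect { case (t, i) if i % total == id => t }
  }
}

object ShardFilterDemo extends App {
  val tests = (1 to 20).map(i => f"test$i%02d")
  println(ShardFilter.select(tests, "0:8")) // Vector(test01, test09, test17)
}

Under that reading, running `make s1` through `make s8` in separate sbt processes covers every test exactly once.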
2 changes: 1 addition & 1 deletion build.sbt
@@ -14,7 +14,7 @@ resolvers += "Local Maven Repository" at "file://"+Path.userHome.absolutePath+"/

libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.0-incubating-SNAPSHOT"

libraryDependencies += "catalyst" % "hive-golden" % "3" % "test" from "http://repository-databricks.forge.cloudbees.com/snapshot/catalystGolden3.jar"
libraryDependencies += "catalyst" % "hive-golden" % "4" % "test" from "http://repository-databricks.forge.cloudbees.com/snapshot/catalystGolden4.jar"

// Hive 0.10.0 relies on a weird version of jdo that is not published anywhere... Remove when we upgrade to 0.11.0
libraryDependencies += "javax.jdo" % "jdo2-api" % "2.3-ec" from "http://www.datanucleus.org/downloads/maven2/javax/jdo/jdo2-api/2.3-ec/jdo2-api-2.3-ec.jar"
6 changes: 4 additions & 2 deletions src/main/scala/catalyst/analysis/Analyzer.scala
@@ -39,7 +39,9 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
PromoteNumericTypes,
PromoteStrings,
ConvertNaNs,
BooleanComparisons)
BooleanComparisons,
FunctionArgumentConversion,
PropagateTypes)
)

/**
@@ -91,7 +93,7 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case q: LogicalPlan =>
q transformExpressions {
case UnresolvedFunction(name, children) if children.map(_.resolved).reduceLeft(_&&_) =>
case u @ UnresolvedFunction(name, children) if u.childrenResolved =>
registry.lookupFunction(name, children)
}
}
51 changes: 48 additions & 3 deletions src/main/scala/catalyst/analysis/typeCoercion.scala
@@ -5,6 +5,33 @@ import expressions._
import plans.logical._
import rules._
import types._
import catalyst.execution.{HiveUdf, HiveGenericUdf}

/**
* Applies any changes to [[catalyst.expressions.AttributeReference AttributeReference]] dataTypes
* that are made by other rules to instances higher in the query tree.
*/
object PropagateTypes extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// No propagation required for leaf nodes.
case q: LogicalPlan if q.children.isEmpty => q

case q: LogicalPlan => q transformExpressions {
case a: AttributeReference =>
q.inputSet.find(_.exprId == a.exprId) match {
// This can happen when an Attribute reference is born in a non-leaf node, for example
// due to a call to an external script like in the Transform operator.
// TODO: Perhaps those should actually be aliases?
case None => a
// Leave the attribute unchanged if the dataTypes match.
case Some(newType) if a.dataType == newType.dataType => a
case Some(newType) =>
logger.debug(s"Promoting $a to $newType in ${q.simpleString}")
newType
}
}
}
}
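To see why PropagateTypes is needed: once another rule widens an attribute's type inside a child operator, references to that attribute higher in the plan still carry the stale dataType until something rewrites them. A self-contained toy version of the idea, using hypothetical simplified Attr/Node classes rather than catalyst's real ones:

// Toy model: attributes match by exprId; a parent adopts the dataType its
// children actually produce. `Attr` and `Node` are hypothetical stand-ins.
object PropagateTypesDemo extends App {
  sealed trait DType
  case object IntT  extends DType
  case object LongT extends DType

  case class Attr(name: String, exprId: Int, dataType: DType)
  case class Node(output: Seq[Attr], children: Seq[Node]) {
    def propagateTypes: Node = {
      val fixedChildren = children.map(_.propagateTypes)
      val inputs = fixedChildren.flatMap(_.output)
      val fixedOutput = output.map { a =>
        inputs.find(_.exprId == a.exprId) match {
          case Some(in) if in.dataType != a.dataType => a.copy(dataType = in.dataType)
          case _ => a // attribute born here, or types already agree
        }
      }
      Node(fixedOutput, fixedChildren)
    }
  }

  val child  = Node(Seq(Attr("a", 1, LongT)), Nil)        // already widened below
  val parent = Node(Seq(Attr("a", 1, IntT)), Seq(child))  // stale reference
  println(parent.propagateTypes.output.head.dataType)     // LongT
}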

/**
* Converts string "NaN"s that are in binary operators with NaN-able types (Float / Double) to the
@@ -52,9 +79,9 @@ object ConvertNaNs extends Rule[LogicalPlan] {
* String conversions are handled by the PromoteStrings rule.
*/
object PromoteNumericTypes extends Rule[LogicalPlan] {
val integralPrecedence = Seq(ByteType, ShortType, IntegerType, LongType)
val toDouble = integralPrecedence ++ Seq(FloatType, DoubleType)
val toFloat = Seq(ByteType, ShortType, IntegerType) :+ FloatType
val integralPrecedence = Seq(NullType, ByteType, ShortType, IntegerType, LongType)
val toDouble = integralPrecedence ++ Seq(NullType, FloatType, DoubleType)
val toFloat = Seq(NullType, ByteType, ShortType, IntegerType) :+ FloatType
val allPromotions = integralPrecedence :: toDouble :: toFloat :: Nil

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
@@ -123,3 +150,21 @@ object BooleanComparisons extends Rule[LogicalPlan] {
p.makeCopy(Array(Cast(p.left, ByteType), Cast(p.right, ByteType)))
}
}

/**
* This ensures that the types of various functions are as expected. Most of these rules are
* actually Hive specific.
* TODO: Move this to the hive specific package once we make that separation.
*/
object FunctionArgumentConversion extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
// Skip nodes whose children have not been resolved yet.
case e if !e.childrenResolved => e

// Promote SUM to largest types to prevent overflows.
// TODO: This is enough to make most of the tests pass, but we really need a full set of our
// own tests to ensure compatibility.
case Sum(e) if e.dataType == IntegerType => Sum(Cast(e, LongType))

}
}
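The Sum promotion above matters because a 32-bit accumulator wraps silently. A quick standalone demonstration of the overflow the added Cast avoids:

// Standalone demonstration: summing Ints can wrap around silently.
object SumOverflowDemo extends App {
  val xs = Seq.fill(3)(1000000000)  // true total: 3,000,000,000 > Int.MaxValue
  println(xs.sum)                   // wraps to -1294967296
  println(xs.map(_.toLong).sum)     // widened first: 3000000000
}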
4 changes: 3 additions & 1 deletion src/main/scala/catalyst/execution/FunctionRegistry.scala
@@ -40,6 +40,7 @@ object HiveFunctionRegistry extends analysis.FunctionRegistry {
def javaClassToDataType(clz: Class[_]): DataType = clz match {
case c: Class[_] if c == classOf[DoubleWritable] => DoubleType
case c: Class[_] if c == classOf[org.apache.hadoop.hive.serde2.io.DoubleWritable] => DoubleType
case c: Class[_] if c == classOf[org.apache.hadoop.hive.serde2.io.HiveDecimalWritable] => DecimalType
case c: Class[_] if c == classOf[org.apache.hadoop.hive.serde2.io.ByteWritable] => ByteType
case c: Class[_] if c == classOf[org.apache.hadoop.hive.serde2.io.ShortWritable] => ShortType
case c: Class[_] if c == classOf[Text] => StringType
@@ -162,6 +163,7 @@ case class HiveGenericUdf(
case LongType => PrimitiveObjectInspectorFactory.javaLongObjectInspector
case ShortType => PrimitiveObjectInspectorFactory.javaShortObjectInspector
case ByteType => PrimitiveObjectInspectorFactory.javaByteObjectInspector
case NullType => PrimitiveObjectInspectorFactory.javaVoidObjectInspector
}

lazy val instance = {
@@ -178,7 +180,7 @@
case l: Short => l: java.lang.Short
case l: Byte => l: java.lang.Byte
case s: Seq[_] => seqAsJavaList(s.map(wrap))
case null => null // NullWritable.get()
case null => null
}

def evaluate(evaluatedChildren: Seq[Any]): Any = {
9 changes: 8 additions & 1 deletion src/main/scala/catalyst/execution/MetastoreCatalog.scala
@@ -79,6 +79,9 @@ class HiveMetastoreCatalog(hiveConf: HiveConf) extends Catalog {

object HiveMetatoreTypes {
val VARCHAR = "(?i)VARCHAR\\((\\d+)\\)".r
// TODO: this will not work for nested arrays or maps.
val ARRAY = "(?i)array<([^>]+)>".r
val MAP = "(?i)map<([^,]+),([^>]*)>".r
def toDataType(metastoreType: String): DataType =
metastoreType match {
case "string" => StringType
@@ -87,7 +90,11 @@ object HiveMetatoreTypes {
case "double" => DoubleType
case "bigint" => LongType
case "binary" => BinaryType
case "boolean" => BooleanType
case VARCHAR(_) => StringType
case ARRAY(elemType) => ArrayType(toDataType(elemType))
case MAP(keyType, valueType) => MapType(toDataType(keyType), toDataType(valueType))
case _ => sys.error(s"Unsupported dataType: $metastoreType")
}
}
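The two patterns handle flat collection types only; as the TODO notes, a nested element type stops the capture at the first `>` or `,`. A small standalone check using the same regexes:

// Standalone check of the patterns above.
object MetastoreTypeRegexDemo extends App {
  val ARRAY = "(?i)array<([^>]+)>".r
  val MAP = "(?i)map<([^,]+),([^>]*)>".r

  "array<int>" match { case ARRAY(elem) => println(elem) }             // int
  "map<string,bigint>" match { case MAP(k, v) => println(s"$k, $v") }  // string, bigint

  // Nested types fail: [^>]+ stops at the inner '>', leaving the outer
  // '>' unconsumed, so the whole-string match does not succeed.
  println(ARRAY.unapplySeq("array<map<string,int>>").isDefined)        // false
}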

@@ -120,5 +127,5 @@ case class MetastoreRelation(databaseName: String, tableName: String, alias: Opt
val partitionKeys = hiveQlTable.getPartitionKeys.map(_.toAttribute)

// Must be a stable value since new attributes are born here.
val output = partitionKeys ++ table.getSd.getCols.map(_.toAttribute)
val output = table.getSd.getCols.map(_.toAttribute) ++ partitionKeys
}
3 changes: 2 additions & 1 deletion src/main/scala/catalyst/execution/TestShark.scala
@@ -144,7 +144,7 @@ object TestShark extends SharkInstance {
* A list of test tables and the DDL required to initialize them. A test table is loaded on
* demand when a query is run against it.
*/
val testTables = new mutable.HashMap[String, TestTable]()
lazy val testTables = new mutable.HashMap[String, TestTable]()
def registerTestTable(testTable: TestTable) = testTables += (testTable.name -> testTable)

// The test tables that are defined in the Hive QTestUtil.
@@ -269,6 +269,7 @@ object TestShark extends SharkInstance {
// Analyzer and thus the test table auto-loading mechanism.
// Remove after we handle more DDL operations natively.
loadTestTable("src")
loadTestTable("srcpart")
} catch {
case e: Exception =>
logger.error(s"FATAL ERROR: Failed to reset TestDB state. $e")
7 changes: 7 additions & 0 deletions src/main/scala/catalyst/execution/basicOperators.scala
@@ -81,6 +81,12 @@ case class Sort(sortExprs: Seq[SortOrder], child: SharkPlan) extends UnaryNode {
} else {
right.asInstanceOf[Double] compare left.asInstanceOf[Double]
}
} else if (curDataType == LongType) {
if (curDirection == Ascending) {
left.asInstanceOf[Long] compare right.asInstanceOf[Long]
} else {
right.asInstanceOf[Long] compare left.asInstanceOf[Long]
}
} else if (curDataType == StringType) {
if (curDirection == Ascending) {
left.asInstanceOf[String] compare right.asInstanceOf[String]
@@ -111,6 +117,7 @@ case class Sort(sortExprs: Seq[SortOrder], child: SharkPlan) extends UnaryNode {
def output = child.output
}
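The comparator grows a dedicated Long branch alongside the existing Double and String ones. For comparison, a generic sketch of the same direction-aware logic written once against Ordering (illustrative only, not the operator's actual code):

// Illustrative only: one direction-aware comparison for any Ordering.
object SortDirectionDemo extends App {
  def cmp[T](left: T, right: T, ascending: Boolean)(implicit ord: Ordering[T]): Int =
    if (ascending) ord.compare(left, right) else ord.compare(right, left)

  println(cmp(1L, 2L, ascending = true))     // negative: 1L sorts first
  println(cmp("a", "b", ascending = false))  // positive: "b" sorts first
}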

// TODO: Rename: SchemaRDD
case class LocalRelation(output: Seq[Attribute], data: Seq[IndexedSeq[Any]])
(@transient sc: SharkContext) extends LeafNode {
def execute() = sc.makeRDD(data.map(buildRow), 1)
5 changes: 3 additions & 2 deletions src/main/scala/catalyst/execution/hiveOperators.scala
@@ -78,7 +78,7 @@ case class HiveTableScan(attributes: Seq[Attribute], relation: MetastoreRelation
}

case class InsertIntoHiveTable(
table: MetastoreRelation, partition: Map[String, String], child: SharkPlan)
table: MetastoreRelation, partition: Map[String, Option[String]], child: SharkPlan)
(@transient sc: SharkContext)
extends UnaryNode {

@@ -110,6 +110,7 @@ case class InsertIntoHiveTable(
def output = child.output

def execute() = {
require(partition.isEmpty, "Inserting into partitioned table not supported.")
val childRdd = child.execute()
assert(childRdd != null)

@@ -122,7 +123,7 @@

val partitionSpec =
if (partition.nonEmpty) {
s"PARTITION (${partition.map { case (k,v) => s"$k=$v" }.mkString(",")})"
s"PARTITION (${partition.map { case (k,v) => s"$k=${v.get}" }.mkString(",")})"
} else {
""
}
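The spec rendering turns the now Option-valued partition map into Hive DDL syntax; `v.get` assumes each partition column carries a static value (a dynamic-partition column would presumably carry None). A standalone illustration with hypothetical column names and values:

// Hypothetical column names/values; mirrors the spec-building line above.
object PartitionSpecDemo extends App {
  val partition = Map("ds" -> Some("2014-01-16"), "hr" -> Some("11"))
  val spec =
    s"PARTITION (${partition.map { case (k, v) => s"$k=${v.get}" }.mkString(",")})"
  println(spec)  // PARTITION (ds=2014-01-16,hr=11)
}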
2 changes: 2 additions & 0 deletions src/main/scala/catalyst/execution/package.scala
@@ -27,6 +27,8 @@ package object execution {
case LongType => longTypeInfo
case ShortType => shortTypeInfo
case StringType => stringTypeInfo
case DecimalType => decimalTypeInfo
case NullType => voidTypeInfo
}
}
}
3 changes: 3 additions & 0 deletions src/main/scala/catalyst/execution/planningStrategies.scala
@@ -136,6 +136,9 @@ trait PlanningStrategies {
def apply(plan: LogicalPlan): Seq[SharkPlan] = plan match {
case logical.Sort(sortExprs, child) =>
execution.Sort(sortExprs, planLater(child)) :: Nil
// TODO: It is correct, but overkill to do a global sort here.
case logical.SortPartitions(sortExprs, child) =>
execution.Sort(sortExprs, planLater(child)) :: Nil
case logical.Project(projectList, child) =>
execution.Project(projectList, planLater(child)) :: Nil
case logical.Filter(condition, child) =>
5 changes: 5 additions & 0 deletions src/main/scala/catalyst/expressions/Evaluate.scala
@@ -158,6 +158,10 @@ object Evaluate extends Logging {
else
left == right

case In(value, list) =>
val evaluatedValue = eval(value)
list.exists(e => eval(e) == evaluatedValue)
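The new In case evaluates the tested value once and then scans the candidate list; semantically it behaves like this standalone sketch:

// Standalone sketch of the same semantics.
object InDemo extends App {
  def in[T](value: T, candidates: Seq[T]): Boolean =
    candidates.exists(_ == value)
  println(in(3, Seq(1, 2, 3)))  // true
  println(in(5, Seq(1, 2, 3)))  // false
}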

// Strings
case GreaterThan(l, r) if l.dataType == StringType && r.dataType == StringType =>
eval(l).asInstanceOf[String] > eval(r).asInstanceOf[String]
@@ -218,6 +222,7 @@ object Evaluate extends Logging {
case Cast(e, LongType) => n1(e, _.toLong(_))
case Cast(e, ShortType) => n1(e, _.toInt(_).toShort)
case Cast(e, ByteType) => n1(e, _.toInt(_).toByte)
case Cast(e, DecimalType) => n1(e, (n,v) => BigDecimal(n.toDouble(v)))

/* Boolean Logic */
case Not(c) =>
35 changes: 35 additions & 0 deletions src/main/scala/catalyst/expressions/complexTypes.scala
@@ -0,0 +1,35 @@
package catalyst
package expressions

import types._

/**
* Returns the item at `ordinal` in the Array `child`.
*/
case class GetItem(child: Expression, ordinal: Expression) extends Expression {
val children = child :: ordinal :: Nil
/** `Null` is returned for invalid ordinals. */
override def nullable = true
override def references = children.flatMap(_.references).toSet
def dataType = child.dataType match {
case ArrayType(dt) => dt
}
override lazy val resolved = childrenResolved && child.dataType.isInstanceOf[ArrayType]
override def toString = s"$child[$ordinal]"
}

/**
* Returns the value of the field `fieldName` in the Struct `child`.
*/
case class GetField(child: Expression, fieldName: String) extends UnaryExpression {
def dataType = field.dataType
def nullable = field.nullable
lazy val field = child.dataType match {
case s: StructType =>
s.fields
.find(_.name == fieldName)
.getOrElse(sys.error(s"No such field $fieldName in ${child.dataType}"))
}
override lazy val resolved = childrenResolved && child.dataType.isInstanceOf[StructType]
override def toString = s"$child.$fieldName"
}
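As a quick illustration of the GetField lookup, a standalone toy with simplified, hypothetical StructField/StructType classes mirroring (but not importing) catalyst's:

// Hypothetical simplified types; mirrors the field lookup in GetField.
object GetFieldLookupDemo extends App {
  case class StructField(name: String, dataType: String, nullable: Boolean)
  case class StructType(fields: Seq[StructField])

  def fieldOf(s: StructType, fieldName: String): StructField =
    s.fields.find(_.name == fieldName)
      .getOrElse(sys.error(s"No such field $fieldName in $s"))

  val person = StructType(Seq(
    StructField("name", "string", nullable = true),
    StructField("age", "int", nullable = false)))

  println(fieldOf(person, "age").dataType)  // int
}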
2 changes: 1 addition & 1 deletion src/main/scala/catalyst/expressions/literals.scala
@@ -29,7 +29,7 @@ object IntegerLiteral {

case class Literal(value: Any, dataType: DataType) extends LeafExpression {
override def foldable = true
def nullable = false
def nullable = value == null
def references = Set.empty

override def toString = if (value != null) value.toString else "null"