Hive module.
rxin committed Apr 8, 2015
1 parent 7e0db5e commit ef4ec48
Showing 15 changed files with 139 additions and 68 deletions.
@@ -136,7 +136,7 @@ class ErrorPositionSuite extends QueryTest with BeforeAndAfter {
    * @param query the query to analyze
    * @param token a unique token in the string that should be indicated by the exception
    */
-  def positionTest(name: String, query: String, token: String) = {
+  def positionTest(name: String, query: String, token: String): Unit = {
     def parseTree =
       Try(quietly(HiveQl.dumpTree(HiveQl.getAst(query)))).getOrElse("<failed to parse>")
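The `: Unit` annotations added throughout this commit follow the Scala style convention that side-effecting methods declare their result type explicitly; without the annotation, the method's type is inferred from its last expression. A minimal sketch (hypothetical method names) of what the annotation prevents:

  def logSize(xs: Seq[Int]) = { println(xs.size); xs.size }       // inferred result: Int
  def logSizeExplicit(xs: Seq[Int]): Unit = { println(xs.size) }  // intent: side effect only

With the inferred version, callers can start depending on the accidental Int result, which then breaks when the body changes.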
@@ -116,21 +116,20 @@ class HiveInspectorSuite extends FunSuite with HiveInspectors {
   }
 
   def checkDataType(dt1: Seq[DataType], dt2: Seq[DataType]): Unit = {
-    dt1.zip(dt2).map {
-      case (dd1, dd2) =>
-        assert(dd1.getClass === dd2.getClass) // DecimalType doesn't has the default precision info
+    dt1.zip(dt2).foreach { case (dd1, dd2) =>
+      assert(dd1.getClass === dd2.getClass) // DecimalType doesn't has the default precision info
     }
   }
 
   def checkValues(row1: Seq[Any], row2: Seq[Any]): Unit = {
-    row1.zip(row2).map {
-      case (r1, r2) => checkValue(r1, r2)
+    row1.zip(row2).foreach { case (r1, r2) =>
+      checkValue(r1, r2)
     }
   }
 
   def checkValues(row1: Seq[Any], row2: Row): Unit = {
-    row1.zip(row2.toSeq).map {
-      case (r1, r2) => checkValue(r1, r2)
+    row1.zip(row2.toSeq).foreach { case (r1, r2) =>
+      checkValue(r1, r2)
     }
   }
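The `map` → `foreach` rewrites above are side-effect hygiene rather than behavior changes: both traverse the zipped pairs and run the assertions, but `map` builds a result collection that is immediately discarded. A small sketch of the distinction, using nothing beyond the standard library:

  val pairs = Seq(1, 2, 3).zip(Seq(1, 2, 3))

  // map allocates a Seq[Unit] that nobody reads
  val wasted: Seq[Unit] = pairs.map { case (a, b) => assert(a == b) }

  // foreach signals iteration purely for side effects
  pairs.foreach { case (a, b) => assert(a == b) }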

@@ -141,7 +140,7 @@ class HiveInspectorSuite extends FunSuite with HiveInspectors {
         assert(r1.compare(r2) === 0)
       case (r1: Array[Byte], r2: Array[Byte])
         if r1 != null && r2 != null && r1.length == r2.length =>
-        r1.zip(r2).map { case (b1, b2) => assert(b1 === b2) }
+        r1.zip(r2).foreach { case (b1, b2) => assert(b1 === b2) }
       case (r1, r2) => assert(r1 === r2)
     }
   }
@@ -166,7 +165,8 @@ class HiveInspectorSuite extends FunSuite with HiveInspectors {
     val constantData = constantExprs.map(_.eval())
     val constantNullData = constantData.map(_ => null)
     val constantWritableOIs = constantExprs.map(e => toWritableInspector(e.dataType))
-    val constantNullWritableOIs = constantExprs.map(e => toInspector(Literal.create(null, e.dataType)))
+    val constantNullWritableOIs =
+      constantExprs.map(e => toInspector(Literal.create(null, e.dataType)))
 
     checkValues(constantData, constantData.zip(constantWritableOIs).map {
       case (d, oi) => unwrap(wrap(d, oi), oi)
@@ -202,7 +202,8 @@ class HiveInspectorSuite extends FunSuite with HiveInspectors {
       case (t, idx) => StructField(s"c_$idx", t)
     })
 
-    checkValues(row, unwrap(wrap(Row.fromSeq(row), toInspector(dt)), toInspector(dt)).asInstanceOf[Row])
+    checkValues(row,
+      unwrap(wrap(Row.fromSeq(row), toInspector(dt)), toInspector(dt)).asInstanceOf[Row])
     checkValue(null, unwrap(wrap(null, toInspector(dt)), toInspector(dt)))
   }

@@ -212,8 +213,10 @@ class HiveInspectorSuite extends FunSuite with HiveInspectors {
     val d = row(0) :: row(0) :: Nil
     checkValue(d, unwrap(wrap(d, toInspector(dt)), toInspector(dt)))
     checkValue(null, unwrap(wrap(null, toInspector(dt)), toInspector(dt)))
-    checkValue(d, unwrap(wrap(d, toInspector(Literal.create(d, dt))), toInspector(Literal.create(d, dt))))
-    checkValue(d, unwrap(wrap(null, toInspector(Literal.create(d, dt))), toInspector(Literal.create(d, dt))))
+    checkValue(d,
+      unwrap(wrap(d, toInspector(Literal.create(d, dt))), toInspector(Literal.create(d, dt))))
+    checkValue(d,
+      unwrap(wrap(null, toInspector(Literal.create(d, dt))), toInspector(Literal.create(d, dt))))
   }
 
   test("wrap / unwrap Map Type") {
@@ -222,7 +225,9 @@ class HiveInspectorSuite extends FunSuite with HiveInspectors {
     val d = Map(row(0) -> row(1))
     checkValue(d, unwrap(wrap(d, toInspector(dt)), toInspector(dt)))
     checkValue(null, unwrap(wrap(null, toInspector(dt)), toInspector(dt)))
-    checkValue(d, unwrap(wrap(d, toInspector(Literal.create(d, dt))), toInspector(Literal.create(d, dt))))
-    checkValue(d, unwrap(wrap(null, toInspector(Literal.create(d, dt))), toInspector(Literal.create(d, dt))))
+    checkValue(d,
+      unwrap(wrap(d, toInspector(Literal.create(d, dt))), toInspector(Literal.create(d, dt))))
+    checkValue(d,
+      unwrap(wrap(null, toInspector(Literal.create(d, dt))), toInspector(Literal.create(d, dt))))
   }
 }
@@ -115,11 +115,36 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
   test("SPARK-4203:random partition directory order") {
     sql("CREATE TABLE tmp_table (key int, value string)")
     val tmpDir = Utils.createTempDir()
-    sql(s"CREATE TABLE table_with_partition(c1 string) PARTITIONED by (p1 string,p2 string,p3 string,p4 string,p5 string) location '${tmpDir.toURI.toString}' ")
-    sql("INSERT OVERWRITE TABLE table_with_partition partition (p1='a',p2='b',p3='c',p4='c',p5='1') SELECT 'blarr' FROM tmp_table")
-    sql("INSERT OVERWRITE TABLE table_with_partition partition (p1='a',p2='b',p3='c',p4='c',p5='2') SELECT 'blarr' FROM tmp_table")
-    sql("INSERT OVERWRITE TABLE table_with_partition partition (p1='a',p2='b',p3='c',p4='c',p5='3') SELECT 'blarr' FROM tmp_table")
-    sql("INSERT OVERWRITE TABLE table_with_partition partition (p1='a',p2='b',p3='c',p4='c',p5='4') SELECT 'blarr' FROM tmp_table")
+    sql(
+      s"""
+        |CREATE TABLE table_with_partition(c1 string)
+        |PARTITIONED by (p1 string,p2 string,p3 string,p4 string,p5 string)
+        |location '${tmpDir.toURI.toString}'
+      """.stripMargin)
+    sql(
+      """
+        |INSERT OVERWRITE TABLE table_with_partition
+        |partition (p1='a',p2='b',p3='c',p4='c',p5='1')
+        |SELECT 'blarr' FROM tmp_table
+      """.stripMargin)
+    sql(
+      """
+        |INSERT OVERWRITE TABLE table_with_partition
+        |partition (p1='a',p2='b',p3='c',p4='c',p5='2')
+        |SELECT 'blarr' FROM tmp_table
+      """.stripMargin)
+    sql(
+      """
+        |INSERT OVERWRITE TABLE table_with_partition
+        |partition (p1='a',p2='b',p3='c',p4='c',p5='3')
+        |SELECT 'blarr' FROM tmp_table
+      """.stripMargin)
+    sql(
+      """
+        |INSERT OVERWRITE TABLE table_with_partition
+        |partition (p1='a',p2='b',p3='c',p4='c',p5='4')
+        |SELECT 'blarr' FROM tmp_table
+      """.stripMargin)
     def listFolders(path: File, acc: List[String]): List[List[String]] = {
       val dir = path.listFiles()
       val folders = dir.filter(_.isDirectory).toList
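The long single-line SQL statements above are reflowed into triple-quoted strings with `stripMargin`, the pattern this commit uses to stay within the line-length limit. `stripMargin` removes everything up to and including the leading `|` on each line, and triple quotes also avoid escaping embedded double quotes. A self-contained sketch, with a hypothetical table name:

  val table = "example_table"
  val query =
    s"""
      |SELECT key, value
      |FROM $table
      |WHERE key > 10
    """.stripMargin

  println(query)  // each line keeps only the text after its '|'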
@@ -196,34 +221,42 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
     testData.registerTempTable("testData")
 
     val testDatawithNull = TestHive.sparkContext.parallelize(
-      (1 to 10).map(i => ThreeCloumntable(i, i.toString,null))).toDF()
+      (1 to 10).map(i => ThreeCloumntable(i, i.toString, null))).toDF()
 
     val tmpDir = Utils.createTempDir()
-    sql(s"CREATE TABLE table_with_partition(key int,value string) PARTITIONED by (ds string) location '${tmpDir.toURI.toString}' ")
-    sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') SELECT key,value FROM testData")
+    sql(
+      s"""
+        |CREATE TABLE table_with_partition(key int,value string)
+        |PARTITIONED by (ds string) location '${tmpDir.toURI.toString}'
+      """.stripMargin)
+    sql(
+      """
+        |INSERT OVERWRITE TABLE table_with_partition
+        |partition (ds='1') SELECT key,value FROM testData
+      """.stripMargin)
 
     // test schema the same between partition and table
     sql("ALTER TABLE table_with_partition CHANGE COLUMN key key BIGINT")
     checkAnswer(sql("select key,value from table_with_partition where ds='1' "),
-      testData.collect.toSeq
+      testData.collect().toSeq
     )
 
     // test difference type of field
     sql("ALTER TABLE table_with_partition CHANGE COLUMN key key BIGINT")
     checkAnswer(sql("select key,value from table_with_partition where ds='1' "),
-      testData.collect.toSeq
+      testData.collect().toSeq
     )
 
     // add column to table
     sql("ALTER TABLE table_with_partition ADD COLUMNS(key1 string)")
     checkAnswer(sql("select key,value,key1 from table_with_partition where ds='1' "),
-      testDatawithNull.collect.toSeq
+      testDatawithNull.collect().toSeq
     )
 
     // change column name to table
     sql("ALTER TABLE table_with_partition CHANGE COLUMN key keynew BIGINT")
     checkAnswer(sql("select keynew,value from table_with_partition where ds='1' "),
-      testData.collect.toSeq
+      testData.collect().toSeq
     )
 
     sql("DROP TABLE table_with_partition")
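The `collect` → `collect()` edits above follow the Scala convention of keeping empty parentheses on methods that perform work (here, a Spark job), while parameterless pure accessors drop them. A sketch of the convention with a hypothetical class, independent of Spark:

  class Buffer {
    private var items = List.empty[Int]
    def size: Int = items.length  // pure accessor: no parens
    def clear(): Unit = {         // performs work: declared and called with ()
      items = Nil
    }
  }

  val b = new Buffer
  b.clear()
  val n = b.size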
@@ -142,7 +142,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
       after: () => Unit,
       query: String,
       expectedAnswer: Seq[Row],
-      ct: ClassTag[_]) = {
+      ct: ClassTag[_]): Unit = {
     before()
 
     var df = sql(query)
@@ -28,6 +28,7 @@ import org.apache.spark.sql.hive.test.TestHive._
 class BigDataBenchmarkSuite extends HiveComparisonTest {
   val testDataDirectory = new File("target" + File.separator + "big-data-benchmark-testdata")
 
+  val userVisitPath = new File(testDataDirectory, "uservisits").getCanonicalPath
   val testTables = Seq(
     TestTable(
       "rankings",
@@ -63,7 +64,7 @@ class BigDataBenchmarkSuite extends HiveComparisonTest {
        | searchWord STRING,
        | duration INT)
        | ROW FORMAT DELIMITED FIELDS TERMINATED BY ","
-       | STORED AS TEXTFILE LOCATION "${new File(testDataDirectory, "uservisits").getCanonicalPath}"
+       | STORED AS TEXTFILE LOCATION "$userVisitPath"
      """.stripMargin.cmd),
     TestTable(
       "documents",
@@ -83,7 +84,10 @@ class BigDataBenchmarkSuite extends HiveComparisonTest {
     "SELECT pageURL, pageRank FROM rankings WHERE pageRank > 1")
 
   createQueryTest("query2",
-    "SELECT SUBSTR(sourceIP, 1, 10), SUM(adRevenue) FROM uservisits GROUP BY SUBSTR(sourceIP, 1, 10)")
+    """
+      |SELECT SUBSTR(sourceIP, 1, 10), SUM(adRevenue) FROM uservisits
+      |GROUP BY SUBSTR(sourceIP, 1, 10)
+    """.stripMargin)
 
   createQueryTest("query3",
     """
@@ -113,8 +117,8 @@ class BigDataBenchmarkSuite extends HiveComparisonTest {
       |CREATE TABLE url_counts_total AS
       | SELECT SUM(count) AS totalCount, destpage
       | FROM url_counts_partial GROUP BY destpage
-      |-- The following queries run, but generate different results in HIVE likely because the UDF is not deterministic
-      |-- given different input splits.
+      |-- The following queries run, but generate different results in HIVE
+      |-- likely because the UDF is not deterministic given different input splits.
       |-- SELECT CAST(SUM(count) AS INT) FROM url_counts_partial
       |-- SELECT COUNT(*) FROM url_counts_partial
       |-- SELECT * FROM url_counts_partial
@@ -255,8 +255,9 @@ abstract class HiveComparisonTest
       .filterNot(_ contains "hive.outerjoin.supports.filters")
       .filterNot(_ contains "hive.exec.post.hooks")
 
-    if (allQueries != queryList)
+    if (allQueries != queryList) {
       logWarning(s"Simplifications made on unsupported operations for test $testCaseName")
+    }
 
     lazy val consoleTestCase = {
       val quotes = "\"\"\""
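The braces added around one-line `if` bodies here (and in several hunks below) match the common Scala style guidance used in Spark that multi-line conditionals always take braces. Beyond consistency, braces guard against a classic editing mistake, sketched below:

  var warnings = 0
  val noisy = true

  if (noisy)
    println("warning")
    warnings += 1      // indented like the body, but runs unconditionally

  if (noisy) {
    println("warning")
    warnings += 1      // actually guarded
  }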
@@ -305,13 +306,16 @@ abstract class HiveComparisonTest
             try {
               // Hooks often break the harness and don't really affect our test anyway, don't
               // even try running them.
-              if (installHooksCommand.findAllMatchIn(queryString).nonEmpty)
+              if (installHooksCommand.findAllMatchIn(queryString).nonEmpty) {
                 sys.error("hive exec hooks not supported for tests.")
+              }
 
-              logWarning(s"Running query ${i+1}/${queryList.size} with hive.")
+              logWarning(s"Running query ${i + 1}/${queryList.size} with hive.")
               // Analyze the query with catalyst to ensure test tables are loaded.
               val answer = hiveQuery.analyzed match {
-                case _: ExplainCommand => Nil // No need to execute EXPLAIN queries as we don't check the output.
+                case _: ExplainCommand =>
+                  // No need to execute EXPLAIN queries as we don't check the output.
+                  Nil
                 case _ => TestHive.runSqlHive(queryString)
               }
@@ -394,21 +398,24 @@ abstract class HiveComparisonTest
       case tf: org.scalatest.exceptions.TestFailedException => throw tf
       case originalException: Exception =>
         if (System.getProperty("spark.hive.canarytest") != null) {
-          // When we encounter an error we check to see if the environment is still okay by running a simple query.
-          // If this fails then we halt testing since something must have gone seriously wrong.
+          // When we encounter an error we check to see if the environment is still
+          // okay by running a simple query. If this fails then we halt testing since
+          // something must have gone seriously wrong.
           try {
-            new TestHive.HiveQLQueryExecution("SELECT key FROM src").stringResult()
+            TestHive.runSqlHive("SELECT key FROM src")
           } catch {
             case e: Exception =>
-              logError(s"FATAL ERROR: Canary query threw $e This implies that the testing environment has likely been corrupted.")
-              // The testing setup traps exits so wait here for a long time so the developer can see when things started
-              // to go wrong.
+              logError(s"FATAL ERROR: Canary query threw $e This implies that the " +
+                "testing environment has likely been corrupted.")
+              // The testing setup traps exits so wait here for a long time so the developer
+              // can see when things started to go wrong.
               Thread.sleep(1000000)
           }
         }
 
-        // If the canary query didn't fail then the environment is still okay, so just throw the original exception.
+        // If the canary query didn't fail then the environment is still okay,
+        // so just throw the original exception.
         throw originalException
     }
   }
@@ -24,8 +24,9 @@ import org.apache.spark.sql.catalyst.util._
 /**
  * A framework for running the query tests that are listed as a set of text files.
  *
- * TestSuites that derive from this class must provide a map of testCaseName -> testCaseFiles that should be included.
- * Additionally, there is support for whitelisting and blacklisting tests as development progresses.
+ * TestSuites that derive from this class must provide a map of testCaseName -> testCaseFiles
+ * that should be included. Additionally, there is support for whitelisting and blacklisting
+ * tests as development progresses.
  */
 abstract class HiveQueryFileTest extends HiveComparisonTest {
   /** A list of tests deemed out of scope and thus completely disregarded */
@@ -54,15 +55,17 @@ abstract class HiveQueryFileTest extends HiveComparisonTest {
     case (testCaseName, testCaseFile) =>
       if (blackList.map(_.r.pattern.matcher(testCaseName).matches()).reduceLeft(_||_)) {
         logDebug(s"Blacklisted test skipped $testCaseName")
-      } else if (realWhiteList.map(_.r.pattern.matcher(testCaseName).matches()).reduceLeft(_||_) || runAll) {
+      } else if (realWhiteList.map(_.r.pattern.matcher(testCaseName).matches()).reduceLeft(_||_) ||
+        runAll) {
         // Build a test case and submit it to scala test framework...
         val queriesString = fileToString(testCaseFile)
         createQueryTest(testCaseName, queriesString)
       } else {
         // Only output warnings for the built in whitelist as this clutters the output when the user
         // trying to execute a single test from the commandline.
-        if(System.getProperty(whiteListProperty) == null && !runAll)
+        if (System.getProperty(whiteListProperty) == null && !runAll) {
           ignore(testCaseName) {}
+        }
       }
   }
 }
@@ -37,7 +37,8 @@ import org.apache.spark.sql.hive.test.TestHive._
 case class TestData(a: Int, b: String)
 
 /**
- * A set of test cases expressed in Hive QL that are not covered by the tests included in the hive distribution.
+ * A set of test cases expressed in Hive QL that are not covered by the tests
+ * included in the hive distribution.
  */
 class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
   private val originalTimeZone = TimeZone.getDefault
@@ -237,7 +238,11 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
   }
 
   createQueryTest("modulus",
-    "SELECT 11 % 10, IF((101.1 % 100.0) BETWEEN 1.01 AND 1.11, \"true\", \"false\"), (101 / 2) % 10 FROM src LIMIT 1")
+    """
+      |SELECT 11 % 10,
+      |  IF((101.1 % 100.0) BETWEEN 1.01 AND 1.11, "true", "false"), (101 / 2) % 10
+      |FROM src LIMIT 1
+    """.stripMargin)
 
   test("Query expressed in SQL") {
     setConf("spark.sql.dialect", "sql")
@@ -309,7 +314,10 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     "SELECT * FROM src a JOIN src b ON a.key = b.key")
 
   createQueryTest("small.cartesian",
-    "SELECT a.key, b.key FROM (SELECT key FROM src WHERE key < 1) a JOIN (SELECT key FROM src WHERE key = 2) b")
+    """
+      |SELECT a.key, b.key FROM (SELECT key FROM src WHERE key < 1) a
+      |JOIN (SELECT key FROM src WHERE key = 2) b
+    """.stripMargin)
 
   createQueryTest("length.udf",
     "SELECT length(\"test\") FROM src LIMIT 1")
@@ -463,8 +471,10 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
       |create table src_lv2 (key string, value string);
       |
       |FROM src
-      |insert overwrite table src_lv1 SELECT key, D.* lateral view explode(array(key+3, key+4)) D as CX
-      |insert overwrite table src_lv2 SELECT key, D.* lateral view explode(array(key+3, key+4)) D as CX
+      |insert overwrite table src_lv1
+      |  SELECT key, D.* lateral view explode(array(key+3, key+4)) D as CX
+      |insert overwrite table src_lv2
+      |  SELECT key, D.* lateral view explode(array(key+3, key+4)) D as CX
     """.stripMargin)
 
   createQueryTest("lateral view5",
@@ -584,7 +594,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     }
   }
 
-  def isExplanation(result: DataFrame) = {
+  def isExplanation(result: DataFrame): Boolean = {
     val explanation = result.select('plan).collect().map { case Row(plan: String) => plan }
     explanation.contains("== Physical Plan ==")
   }
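`isExplanation` gets an explicit `: Boolean` for the same reason `positionTest` gained `: Unit`: on public members, an inferred result type silently tracks the implementation. A two-line sketch:

  def threshold = 10            // inferred Int; a refactor to 10.0 silently changes the API
  def thresholdFixed: Int = 10  // the contract is pinned; an incompatible body fails to compile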
@@ -25,7 +25,8 @@ case class Nested(a: Int, B: Int)
 case class Data(a: Int, B: Int, n: Nested, nestedArray: Seq[Nested])
 
 /**
- * A set of test cases expressed in Hive QL that are not covered by the tests included in the hive distribution.
+ * A set of test cases expressed in Hive QL that are not covered by the tests
+ * included in the hive distribution.
  */
 class HiveResolutionSuite extends HiveComparisonTest {