[SPARK-10176] [SQL] Show partially analyzed plans when checkAnswer fails to analyze

This PR takes over apache/spark#8389.

This PR improves `checkAnswer` to print the partially analyzed plan in addition to the user-friendly error message, to make failing tests easier to debug.
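
A minimal sketch of the idea (the helper name mirrors `checkAnswer`, but the body below is illustrative rather than the exact `QueryTest` change): force analysis up front and, when it fails, report the partially analyzed plan alongside the error instead of a bare exception.

import org.apache.spark.sql.{AnalysisException, DataFrame, Row}

def checkAnswer(df: DataFrame, expectedAnswer: Seq[Row]): Unit = {
  try {
    // Force analysis; `analyzed` throws AnalysisException if the plan
    // cannot be fully resolved.
    df.queryExecution.analyzed
  } catch {
    case ae: AnalysisException =>
      // Prefer the plan attached to the exception; fall back to the raw
      // logical plan otherwise.
      val shownPlan = ae.plan.getOrElse(df.queryExecution.logical)
      throw new AssertionError(
        s"""Failed to analyze query:
           |$ae
           |
           |$shownPlan
           |""".stripMargin)
  }
  // ...the usual row-by-row comparison against expectedAnswer would follow here.
}

With something like this in place, a test whose query references a missing column fails with the unresolved plan printed, rather than only the exception message.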

In doing so, I ran into a conflict with the various ways that we bring a SQLContext into the tests. Depending on the trait, we refer to the current context as `sqlContext`, `_sqlContext`, `ctx`, or `hiveContext`, with access modifiers `public`, `protected`, and `private`, depending on the defining class.

I propose we refactor as follows:

1. All tests should refer only to a `protected sqlContext` when testing general features, and to a `protected hiveContext` when exercising functionality that exists only on `HiveContext`.
2. All tests should import only `testImplicits._` (i.e., never `TestHive.implicits._`); a sketch of what such a shared trait could look like follows this list.
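
A rough sketch of the shape such a shared trait could take (the trait name and the context construction below are illustrative, not the actual SQLTestUtils/SharedSQLContext implementation):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SQLImplicits}

// Illustrative only: one trait owns the context, and every suite refers to it
// through the same `protected` members.
trait SharedSQLContextSketch { self =>

  // Single place that constructs the context; suites never build their own.
  private lazy val instantiatedContext: SQLContext = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("test")
    new SQLContext(new SparkContext(conf))
  }

  // The one sanctioned name: tests say `sqlContext`, never `ctx` or `_sqlContext`.
  protected def sqlContext: SQLContext = instantiatedContext
  protected def sparkContext: SparkContext = sqlContext.sparkContext

  // Suites pull in implicit conversions with `import testImplicits._`.
  protected object testImplicits extends SQLImplicits {
    protected override def _sqlContext: SQLContext = self.sqlContext
  }
}

A `HiveContext` counterpart would expose a `protected hiveContext` the same way, so the diff below reduces to renaming `ctx` to `sqlContext` (or to the inherited `sparkContext`) throughout the suites.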

Author: Wenchen Fan <cloud0fan@outlook.com>

Closes #8584 from cloud-fan/cleanupTests.
cloud-fan authored and kiszk committed Dec 26, 2015
1 parent 35ec210 commit d9c42fb
Showing 90 changed files with 908 additions and 999 deletions.
@@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.util._
* Provides helper methods for comparing plans.
*/
class PlanTest extends SparkFunSuite {

/**
* Since attribute references are given globally unique ids during analysis,
* we must normalize them to check if two different queries are identical.
156 changes: 79 additions & 77 deletions sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -34,7 +34,7 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
import testImplicits._

def rddIdOf(tableName: String): Int = {
val executedPlan = ctx.table(tableName).queryExecution.executedPlan
val executedPlan = sqlContext.table(tableName).queryExecution.executedPlan
executedPlan.collect {
case InMemoryColumnarTableScan(_, _, relation) =>
relation.cachedColumnBuffers.id
@@ -44,7 +44,7 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
}

def isMaterialized(rddId: Int): Boolean = {
ctx.sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty
sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty
}

test("withColumn doesn't invalidate cached dataframe") {
@@ -69,153 +69,153 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
test("cache temp table") {
testData.select('key).registerTempTable("tempTable")
assertCached(sql("SELECT COUNT(*) FROM tempTable"), 0)
ctx.cacheTable("tempTable")
sqlContext.cacheTable("tempTable")
assertCached(sql("SELECT COUNT(*) FROM tempTable"))
ctx.uncacheTable("tempTable")
sqlContext.uncacheTable("tempTable")
}

test("unpersist an uncached table will not raise exception") {
assert(None == ctx.cacheManager.lookupCachedData(testData))
assert(None == sqlContext.cacheManager.lookupCachedData(testData))
testData.unpersist(blocking = true)
assert(None == ctx.cacheManager.lookupCachedData(testData))
assert(None == sqlContext.cacheManager.lookupCachedData(testData))
testData.unpersist(blocking = false)
assert(None == ctx.cacheManager.lookupCachedData(testData))
assert(None == sqlContext.cacheManager.lookupCachedData(testData))
testData.persist()
assert(None != ctx.cacheManager.lookupCachedData(testData))
assert(None != sqlContext.cacheManager.lookupCachedData(testData))
testData.unpersist(blocking = true)
assert(None == ctx.cacheManager.lookupCachedData(testData))
assert(None == sqlContext.cacheManager.lookupCachedData(testData))
testData.unpersist(blocking = false)
assert(None == ctx.cacheManager.lookupCachedData(testData))
assert(None == sqlContext.cacheManager.lookupCachedData(testData))
}

test("cache table as select") {
sql("CACHE TABLE tempTable AS SELECT key FROM testData")
assertCached(sql("SELECT COUNT(*) FROM tempTable"))
ctx.uncacheTable("tempTable")
sqlContext.uncacheTable("tempTable")
}

test("uncaching temp table") {
testData.select('key).registerTempTable("tempTable1")
testData.select('key).registerTempTable("tempTable2")
ctx.cacheTable("tempTable1")
sqlContext.cacheTable("tempTable1")

assertCached(sql("SELECT COUNT(*) FROM tempTable1"))
assertCached(sql("SELECT COUNT(*) FROM tempTable2"))

// Is this valid?
ctx.uncacheTable("tempTable2")
sqlContext.uncacheTable("tempTable2")

// Should this be cached?
assertCached(sql("SELECT COUNT(*) FROM tempTable1"), 0)
}

test("too big for memory") {
val data = "*" * 1000
ctx.sparkContext.parallelize(1 to 200000, 1).map(_ => BigData(data)).toDF()
sparkContext.parallelize(1 to 200000, 1).map(_ => BigData(data)).toDF()
.registerTempTable("bigData")
ctx.table("bigData").persist(StorageLevel.MEMORY_AND_DISK)
assert(ctx.table("bigData").count() === 200000L)
ctx.table("bigData").unpersist(blocking = true)
sqlContext.table("bigData").persist(StorageLevel.MEMORY_AND_DISK)
assert(sqlContext.table("bigData").count() === 200000L)
sqlContext.table("bigData").unpersist(blocking = true)
}

test("calling .cache() should use in-memory columnar caching") {
ctx.table("testData").cache()
assertCached(ctx.table("testData"))
ctx.table("testData").unpersist(blocking = true)
sqlContext.table("testData").cache()
assertCached(sqlContext.table("testData"))
sqlContext.table("testData").unpersist(blocking = true)
}

test("calling .unpersist() should drop in-memory columnar cache") {
ctx.table("testData").cache()
ctx.table("testData").count()
ctx.table("testData").unpersist(blocking = true)
assertCached(ctx.table("testData"), 0)
sqlContext.table("testData").cache()
sqlContext.table("testData").count()
sqlContext.table("testData").unpersist(blocking = true)
assertCached(sqlContext.table("testData"), 0)
}

test("isCached") {
ctx.cacheTable("testData")
sqlContext.cacheTable("testData")

assertCached(ctx.table("testData"))
assert(ctx.table("testData").queryExecution.withCachedData match {
assertCached(sqlContext.table("testData"))
assert(sqlContext.table("testData").queryExecution.withCachedData match {
case _: InMemoryRelation => true
case _ => false
})

ctx.uncacheTable("testData")
assert(!ctx.isCached("testData"))
assert(ctx.table("testData").queryExecution.withCachedData match {
sqlContext.uncacheTable("testData")
assert(!sqlContext.isCached("testData"))
assert(sqlContext.table("testData").queryExecution.withCachedData match {
case _: InMemoryRelation => false
case _ => true
})
}

test("SPARK-1669: cacheTable should be idempotent") {
assume(!ctx.table("testData").logicalPlan.isInstanceOf[InMemoryRelation])
assume(!sqlContext.table("testData").logicalPlan.isInstanceOf[InMemoryRelation])

ctx.cacheTable("testData")
assertCached(ctx.table("testData"))
sqlContext.cacheTable("testData")
assertCached(sqlContext.table("testData"))

assertResult(1, "InMemoryRelation not found, testData should have been cached") {
ctx.table("testData").queryExecution.withCachedData.collect {
sqlContext.table("testData").queryExecution.withCachedData.collect {
case r: InMemoryRelation => r
}.size
}

ctx.cacheTable("testData")
sqlContext.cacheTable("testData")
assertResult(0, "Double InMemoryRelations found, cacheTable() is not idempotent") {
ctx.table("testData").queryExecution.withCachedData.collect {
sqlContext.table("testData").queryExecution.withCachedData.collect {
case r @ InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan, _) => r
}.size
}

ctx.uncacheTable("testData")
sqlContext.uncacheTable("testData")
}

test("read from cached table and uncache") {
ctx.cacheTable("testData")
checkAnswer(ctx.table("testData"), testData.collect().toSeq)
assertCached(ctx.table("testData"))
sqlContext.cacheTable("testData")
checkAnswer(sqlContext.table("testData"), testData.collect().toSeq)
assertCached(sqlContext.table("testData"))

ctx.uncacheTable("testData")
checkAnswer(ctx.table("testData"), testData.collect().toSeq)
assertCached(ctx.table("testData"), 0)
sqlContext.uncacheTable("testData")
checkAnswer(sqlContext.table("testData"), testData.collect().toSeq)
assertCached(sqlContext.table("testData"), 0)
}

test("correct error on uncache of non-cached table") {
intercept[IllegalArgumentException] {
ctx.uncacheTable("testData")
sqlContext.uncacheTable("testData")
}
}

test("SELECT star from cached table") {
sql("SELECT * FROM testData").registerTempTable("selectStar")
ctx.cacheTable("selectStar")
sqlContext.cacheTable("selectStar")
checkAnswer(
sql("SELECT * FROM selectStar WHERE key = 1"),
Seq(Row(1, "1")))
ctx.uncacheTable("selectStar")
sqlContext.uncacheTable("selectStar")
}

test("Self-join cached") {
val unCachedAnswer =
sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key").collect()
ctx.cacheTable("testData")
sqlContext.cacheTable("testData")
checkAnswer(
sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key"),
unCachedAnswer.toSeq)
ctx.uncacheTable("testData")
sqlContext.uncacheTable("testData")
}

test("'CACHE TABLE' and 'UNCACHE TABLE' SQL statement") {
sql("CACHE TABLE testData")
assertCached(ctx.table("testData"))
assertCached(sqlContext.table("testData"))

val rddId = rddIdOf("testData")
assert(
isMaterialized(rddId),
"Eagerly cached in-memory table should have already been materialized")

sql("UNCACHE TABLE testData")
assert(!ctx.isCached("testData"), "Table 'testData' should not be cached")
assert(!sqlContext.isCached("testData"), "Table 'testData' should not be cached")

eventually(timeout(10 seconds)) {
assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
@@ -224,37 +224,37 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {

test("CACHE TABLE tableName AS SELECT * FROM anotherTable") {
sql("CACHE TABLE testCacheTable AS SELECT * FROM testData")
assertCached(ctx.table("testCacheTable"))
assertCached(sqlContext.table("testCacheTable"))

val rddId = rddIdOf("testCacheTable")
assert(
isMaterialized(rddId),
"Eagerly cached in-memory table should have already been materialized")

ctx.uncacheTable("testCacheTable")
sqlContext.uncacheTable("testCacheTable")
eventually(timeout(10 seconds)) {
assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
}
}

test("CACHE TABLE tableName AS SELECT ...") {
sql("CACHE TABLE testCacheTable AS SELECT key FROM testData LIMIT 10")
assertCached(ctx.table("testCacheTable"))
assertCached(sqlContext.table("testCacheTable"))

val rddId = rddIdOf("testCacheTable")
assert(
isMaterialized(rddId),
"Eagerly cached in-memory table should have already been materialized")

ctx.uncacheTable("testCacheTable")
sqlContext.uncacheTable("testCacheTable")
eventually(timeout(10 seconds)) {
assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
}
}

test("CACHE LAZY TABLE tableName") {
sql("CACHE LAZY TABLE testData")
assertCached(ctx.table("testData"))
assertCached(sqlContext.table("testData"))

val rddId = rddIdOf("testData")
assert(
@@ -266,15 +266,15 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
isMaterialized(rddId),
"Lazily cached in-memory table should have been materialized")

ctx.uncacheTable("testData")
sqlContext.uncacheTable("testData")
eventually(timeout(10 seconds)) {
assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
}
}

test("InMemoryRelation statistics") {
sql("CACHE TABLE testData")
ctx.table("testData").queryExecution.withCachedData.collect {
sqlContext.table("testData").queryExecution.withCachedData.collect {
case cached: InMemoryRelation =>
val actualSizeInBytes = (1 to 100).map(i => INT.defaultSize + i.toString.length + 4).sum
assert(cached.statistics.sizeInBytes === actualSizeInBytes)
@@ -283,46 +283,48 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {

test("Drops temporary table") {
testData.select('key).registerTempTable("t1")
ctx.table("t1")
ctx.dropTempTable("t1")
assert(intercept[RuntimeException](ctx.table("t1")).getMessage.startsWith("Table Not Found"))
sqlContext.table("t1")
sqlContext.dropTempTable("t1")
assert(
intercept[RuntimeException](sqlContext.table("t1")).getMessage.startsWith("Table Not Found"))
}

test("Drops cached temporary table") {
testData.select('key).registerTempTable("t1")
testData.select('key).registerTempTable("t2")
ctx.cacheTable("t1")
sqlContext.cacheTable("t1")

assert(ctx.isCached("t1"))
assert(ctx.isCached("t2"))
assert(sqlContext.isCached("t1"))
assert(sqlContext.isCached("t2"))

ctx.dropTempTable("t1")
assert(intercept[RuntimeException](ctx.table("t1")).getMessage.startsWith("Table Not Found"))
assert(!ctx.isCached("t2"))
sqlContext.dropTempTable("t1")
assert(
intercept[RuntimeException](sqlContext.table("t1")).getMessage.startsWith("Table Not Found"))
assert(!sqlContext.isCached("t2"))
}

test("Clear all cache") {
sql("SELECT key FROM testData LIMIT 10").registerTempTable("t1")
sql("SELECT key FROM testData LIMIT 5").registerTempTable("t2")
ctx.cacheTable("t1")
ctx.cacheTable("t2")
ctx.clearCache()
assert(ctx.cacheManager.isEmpty)
sqlContext.cacheTable("t1")
sqlContext.cacheTable("t2")
sqlContext.clearCache()
assert(sqlContext.cacheManager.isEmpty)

sql("SELECT key FROM testData LIMIT 10").registerTempTable("t1")
sql("SELECT key FROM testData LIMIT 5").registerTempTable("t2")
ctx.cacheTable("t1")
ctx.cacheTable("t2")
sqlContext.cacheTable("t1")
sqlContext.cacheTable("t2")
sql("Clear CACHE")
assert(ctx.cacheManager.isEmpty)
assert(sqlContext.cacheManager.isEmpty)
}

test("Clear accumulators when uncacheTable to prevent memory leaking") {
sql("SELECT key FROM testData LIMIT 10").registerTempTable("t1")
sql("SELECT key FROM testData LIMIT 5").registerTempTable("t2")

ctx.cacheTable("t1")
ctx.cacheTable("t2")
sqlContext.cacheTable("t1")
sqlContext.cacheTable("t2")

sql("SELECT * FROM t1").count()
sql("SELECT * FROM t2").count()
@@ -331,8 +333,8 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {

Accumulators.synchronized {
val accsSize = Accumulators.originals.size
ctx.uncacheTable("t1")
ctx.uncacheTable("t2")
sqlContext.uncacheTable("t1")
sqlContext.uncacheTable("t2")
assert((accsSize - 2) == Accumulators.originals.size)
}
}
