[SPARK-3809][SQL] Fixes test suites in hive-thriftserver
As scwf pointed out, `HiveThriftServer2Suite` isn't effective anymore now that the Thrift server is started as a daemon. On top of that, these test suites were known to be flaky; PR apache#2214 tried to fix them, but failed because of an unknown Jenkins build error. This PR fixes both sets of issues.

In this PR, instead of watching the output of `start-thriftserver.sh`, the test code starts a `tail` process to watch the server log file. A `Thread.sleep` has to be introduced because the `kill` command used in `stop-thriftserver.sh` is not synchronous.
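
To illustrate the approach, here is a minimal, self-contained sketch of tailing a log file until a marker line appears; the helper name `waitForLogLine` and its parameters are hypothetical, not the suite's actual API:

```scala
import java.io.File

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.sys.process.{Process, ProcessLogger}
import scala.util.Try

object LogWatchSketch {
  // Tails the given log file and returns once a line containing `marker`
  // shows up, or throws a TimeoutException after `timeout`.
  def waitForLogLine(logFilePath: String, marker: String, timeout: FiniteDuration): Unit = {
    val found = Promise[Unit]()
    // Create the file up front so `tail -f` doesn't fail if the server
    // hasn't written anything yet.
    Try(new File(logFilePath).createNewFile())
    val tail = Process(s"/usr/bin/env tail -f $logFilePath").run(
      ProcessLogger(line => if (line.contains(marker)) found.trySuccess(()), _ => ()))
    try Await.result(found.future, timeout)
    finally tail.destroy()
  }
}
```

The suite pairs this with a `Thread.sleep` after running `stop-thriftserver.sh`, since the underlying `kill` returns before the daemon has actually exited.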

As for the root cause of the mysterious Jenkins build failure, please refer to [this comment](apache#2675 (comment)) for details.

----

(Copied from PR description of apache#2214)

This PR fixes two issues in `HiveThriftServer2Suite` and brings one enhancement:

1. Although the metastore and warehouse directories and the listening port are randomly chosen, all test cases share the same configuration. Due to parallel test execution, one of the two test cases is doomed to fail
2. We caught exceptions thrown from a test case and printed diagnostic information, but forgot to re-throw the exception...
3. When the forked server process ends prematurely (e.g., fails to start), the `serverRunning` promise is completed with a failure, which keeps the test code from waiting until the timeout (see the sketch after this list)
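
As a rough sketch of items 2 and 3 combined (the helper `runAndWait` is hypothetical, and a plain `RuntimeException` stands in for the exception type actually used):

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}
import scala.sys.process.Process

// Illustrative: `serverRunning` is completed successfully elsewhere
// (e.g. by a log watcher) once the server reports that it is ready.
def runAndWait(command: Seq[String], serverRunning: Promise[Unit]): Unit = {
  val process = Process(command).run()

  Future {
    // exitValue() blocks until the forked process dies; if that happens
    // before the server is ready, fail the promise so the waiting test
    // aborts immediately instead of hanging until the timeout (item 3).
    val exitValue = process.exitValue()
    serverRunning.tryFailure(
      new RuntimeException(s"Server process exited prematurely ($exitValue)"))
  }

  try {
    Await.result(serverRunning.future, 1.minute)
  } catch {
    case cause: Throwable =>
      // Print diagnostic information here, then re-throw so the failure
      // actually surfaces in the test report (item 2).
      throw cause
  }
}
```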

So, embarrassingly, this test suite had been failing continuously for several days without anyone noticing... Fortunately, no production code bugs were hiding behind the broken tests.

Author: Cheng Lian <lian.cs.zju@gmail.com>
Author: wangfei <wangfei1@huawei.com>

Closes apache#2675 from liancheng/fix-thriftserver-tests and squashes the following commits:

1c384b7 [Cheng Lian] Minor code cleanup, restore the logging level hack in TestHive.scala
7805c33 [wangfei]  reset SPARK_TESTING to avoid loading Log4J configurations in testing class paths
af2b5a9 [Cheng Lian] Removes log level hacks from TestHiveContext
d116405 [wangfei] make sure that log4j level is INFO
ee92a82 [Cheng Lian] Relaxes timeout
7fd6757 [Cheng Lian] Fixes test suites in hive-thriftserver
liancheng authored and marmbrus committed Oct 13, 2014
1 parent 9d9ca91 commit 9eb49d4
Showing 2 changed files with 60 additions and 39 deletions.
sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala

```diff
@@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkException, Logging}
 import org.apache.spark.sql.catalyst.util.getTempFilePath
 
 class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
@@ -62,8 +62,11 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
 
     def captureOutput(source: String)(line: String) {
       buffer += s"$source> $line"
+      // If we haven't found all expected answers...
       if (next.get() < expectedAnswers.size) {
+        // If another expected answer is found...
         if (line.startsWith(expectedAnswers(next.get()))) {
+          // If all expected answers have been found...
           if (next.incrementAndGet() == expectedAnswers.size) {
             foundAllExpectedAnswers.trySuccess(())
           }
@@ -77,7 +80,8 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
 
     Future {
       val exitValue = process.exitValue()
-      logInfo(s"Spark SQL CLI process exit value: $exitValue")
+      foundAllExpectedAnswers.tryFailure(
+        new SparkException(s"Spark SQL CLI process exit value: $exitValue"))
     }
 
     try {
@@ -98,6 +102,7 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
           |End CliSuite failure output
           |===========================
         """.stripMargin, cause)
+      throw cause
     } finally {
       warehousePath.delete()
       metastorePath.delete()
@@ -109,7 +114,7 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
     val dataFilePath =
       Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
 
-    runCliWithin(1.minute)(
+    runCliWithin(3.minute)(
      "CREATE TABLE hive_test(key INT, val STRING);"
        -> "OK",
      "SHOW TABLES;"
@@ -120,7 +125,7 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
        -> "Time taken: ",
      "SELECT COUNT(*) FROM hive_test;"
        -> "5",
-     "DROP TABLE hive_test"
+     "DROP TABLE hive_test;"
        -> "Time taken: "
     )
   }
```
sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala

```diff
@@ -17,17 +17,17 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration._
-import scala.concurrent.{Await, Future, Promise}
-import scala.sys.process.{Process, ProcessLogger}
-
 import java.io.File
 import java.net.ServerSocket
 import java.sql.{DriverManager, Statement}
 import java.util.concurrent.TimeoutException
 
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Promise}
+import scala.sys.process.{Process, ProcessLogger}
+import scala.util.Try
+
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hive.jdbc.HiveDriver
 import org.scalatest.FunSuite
@@ -41,25 +41,25 @@ import org.apache.spark.sql.catalyst.util.getTempFilePath
 class HiveThriftServer2Suite extends FunSuite with Logging {
   Class.forName(classOf[HiveDriver].getCanonicalName)
 
-  private val listeningHost = "localhost"
-  private val listeningPort = {
-    // Let the system to choose a random available port to avoid collision with other parallel
-    // builds.
-    val socket = new ServerSocket(0)
-    val port = socket.getLocalPort
-    socket.close()
-    port
-  }
-
-  private val warehousePath = getTempFilePath("warehouse")
-  private val metastorePath = getTempFilePath("metastore")
-  private val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true"
-
-  def startThriftServerWithin(timeout: FiniteDuration = 30.seconds)(f: Statement => Unit) {
-    val serverScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
+  def startThriftServerWithin(timeout: FiniteDuration = 1.minute)(f: Statement => Unit) {
+    val startScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
+    val stopScript = "../../sbin/stop-thriftserver.sh".split("/").mkString(File.separator)
+
+    val warehousePath = getTempFilePath("warehouse")
+    val metastorePath = getTempFilePath("metastore")
+    val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true"
+    val listeningHost = "localhost"
+    val listeningPort = {
+      // Let the system to choose a random available port to avoid collision with other parallel
+      // builds.
+      val socket = new ServerSocket(0)
+      val port = socket.getLocalPort
+      socket.close()
+      port
+    }
 
     val command =
-      s"""$serverScript
+      s"""$startScript
         | --master local
         | --hiveconf hive.root.logger=INFO,console
         | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri
@@ -68,29 +68,40 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
         | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$listeningPort
       """.stripMargin.split("\\s+").toSeq
 
-    val serverStarted = Promise[Unit]()
+    val serverRunning = Promise[Unit]()
     val buffer = new ArrayBuffer[String]()
+    val LOGGING_MARK =
+      s"starting ${HiveThriftServer2.getClass.getCanonicalName.stripSuffix("$")}, logging to "
+    var logTailingProcess: Process = null
+    var logFilePath: String = null
 
-    def captureOutput(source: String)(line: String) {
-      buffer += s"$source> $line"
+    def captureLogOutput(line: String): Unit = {
+      buffer += line
       if (line.contains("ThriftBinaryCLIService listening on")) {
-        serverStarted.success(())
+        serverRunning.success(())
       }
     }
 
-    val process = Process(command).run(
-      ProcessLogger(captureOutput("stdout"), captureOutput("stderr")))
-
-    Future {
-      val exitValue = process.exitValue()
-      logInfo(s"Spark SQL Thrift server process exit value: $exitValue")
+    def captureThriftServerOutput(source: String)(line: String): Unit = {
+      if (line.startsWith(LOGGING_MARK)) {
+        logFilePath = line.drop(LOGGING_MARK.length).trim
+        // Ensure that the log file is created so that the `tail' command won't fail
+        Try(new File(logFilePath).createNewFile())
+        logTailingProcess = Process(s"/usr/bin/env tail -f $logFilePath")
+          .run(ProcessLogger(captureLogOutput, _ => ()))
+      }
     }
 
+    // Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths
+    Process(command, None, "SPARK_TESTING" -> "0").run(ProcessLogger(
+      captureThriftServerOutput("stdout"),
+      captureThriftServerOutput("stderr")))
+
     val jdbcUri = s"jdbc:hive2://$listeningHost:$listeningPort/"
     val user = System.getProperty("user.name")
 
     try {
-      Await.result(serverStarted.future, timeout)
+      Await.result(serverRunning.future, timeout)
 
       val connection = DriverManager.getConnection(jdbcUri, user, "")
       val statement = connection.createStatement()
@@ -122,10 +133,15 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
           |End HiveThriftServer2Suite failure output
           |=========================================
         """.stripMargin, cause)
+      throw cause
     } finally {
      warehousePath.delete()
      metastorePath.delete()
-     process.destroy()
+     Process(stopScript).run().exitValue()
+     // The `spark-daemon.sh' script uses kill, which is not synchronous, have to wait for a while.
+     Thread.sleep(3.seconds.toMillis)
+     Option(logTailingProcess).map(_.destroy())
+     Option(logFilePath).map(new File(_).delete())
     }
   }
```
