[SPARK-6304] [STREAMING] Fix checkpointing doesn't retain driver port issue

Author: jerryshao <saisai.shao@intel.com>
Author: Saisai Shao <saisai.shao@intel.com>

Closes apache#5060 from jerryshao/SPARK-6304 and squashes the following commits:

89b01f5 [jerryshao] Update the unit test to add more cases
275d252 [jerryshao] Address the comments
7cc146d [jerryshao] Address the comments
2624723 [jerryshao] Fix rebase conflict
45befaa [Saisai Shao] Update the unit test
bbc1c9c [Saisai Shao] Fix checkpointing doesn't retain driver port issue
jerryshao authored and tdas committed Jul 16, 2015
1 parent fec10f0 commit 031d7d4
Showing 2 changed files with 46 additions and 1 deletion.
@@ -48,6 +48,8 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
    // Reload properties for the checkpoint application since user wants to set a reload property
    // or spark had changed its value and user wants to set it back.
    val propertiesToReload = List(
      "spark.driver.host",
      "spark.driver.port",
      "spark.master",
      "spark.yarn.keytab",
      "spark.yarn.principal")
@@ -191,8 +191,51 @@ class CheckpointSuite extends TestSuiteBase {
    }
  }

  // This tests that "spark.driver.host" and "spark.driver.port", if set by the user, can be
  // recovered with the correct values.
  test("get correct spark.driver.[host|port] from checkpoint") {
    val conf = Map("spark.driver.host" -> "localhost", "spark.driver.port" -> "9999")
    conf.foreach(kv => System.setProperty(kv._1, kv._2))
    ssc = new StreamingContext(master, framework, batchDuration)
    val originalConf = ssc.conf
    assert(originalConf.get("spark.driver.host") === "localhost")
    assert(originalConf.get("spark.driver.port") === "9999")

    val cp = new Checkpoint(ssc, Time(1000))
    ssc.stop()

    // Serialize/deserialize to simulate write to storage and reading it back
    val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))

    val newCpConf = newCp.createSparkConf()
    assert(newCpConf.contains("spark.driver.host"))
    assert(newCpConf.contains("spark.driver.port"))
    assert(newCpConf.get("spark.driver.host") === "localhost")
    assert(newCpConf.get("spark.driver.port") === "9999")

    // Check if all the parameters have been restored
    ssc = new StreamingContext(null, newCp, null)
    val restoredConf = ssc.conf
    assert(restoredConf.get("spark.driver.host") === "localhost")
    assert(restoredConf.get("spark.driver.port") === "9999")
    ssc.stop()

    // If spark.driver.host and spark.driver.port are not set in the system properties, these two
    // parameters should not be present in the newly recovered conf.
    conf.foreach(kv => System.clearProperty(kv._1))
    val newCpConf1 = newCp.createSparkConf()
    assert(!newCpConf1.contains("spark.driver.host"))
    assert(!newCpConf1.contains("spark.driver.port"))

    // Spark itself will assign a random, unused port for spark.driver.port if it is not set
    // explicitly.
    ssc = new StreamingContext(null, newCp, null)
    val restoredConf1 = ssc.conf
    assert(restoredConf1.get("spark.driver.host") === "localhost")
    assert(restoredConf1.get("spark.driver.port") !== "9999")
  }

  // This tests whether the systm can recover from a master failure with simple
  // This tests whether the system can recover from a master failure with simple
  // non-stateful operations. This assumes as reliable, replayable input
  // source - TestInputDStream.
  test("recovery with map and reduceByKey operations") {
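As a usage note (not part of this commit), the scenario the new test exercises corresponds to the standard checkpoint-recovery pattern sketched below; the object name, checkpoint directory, host, and port are illustrative assumptions. With this fix, spark.driver.host and spark.driver.port set via system properties are picked up again on recovery, and if they are not set, Spark chooses fresh values instead of reusing the checkpointed ones.

// Editorial usage sketch; names, paths, and ports are illustrative, not from the commit.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointRecoveryExample {
  def main(args: Array[String]): Unit = {
    val checkpointDir = "/tmp/streaming-checkpoint" // hypothetical directory

    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("CheckpointRecoveryExample")
      val ssc = new StreamingContext(conf, Seconds(1))
      ssc.checkpoint(checkpointDir)
      val lines = ssc.socketTextStream("localhost", 9998) // hypothetical input source
      lines.count().print()
      ssc
    }

    // On restart, getOrCreate rebuilds the context from the checkpoint; with this fix,
    // spark.driver.host and spark.driver.port come from the restarted driver's environment
    // rather than from the serialized checkpoint.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}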
