diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index 0d155982a8c54..4a31fb5034666 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -17,11 +17,14 @@ package org.apache.spark.ui -import javax.servlet.http.HttpServletRequest +import java.net.URL +import javax.servlet.http.{HttpServletResponse, HttpServletRequest} import scala.collection.JavaConversions._ import scala.xml.Node +import org.json4s._ +import org.json4s.jackson.JsonMethods import org.openqa.selenium.htmlunit.HtmlUnitDriver import org.openqa.selenium.{By, WebDriver} import org.scalatest._ @@ -29,10 +32,15 @@ import org.scalatest.concurrent.Eventually._ import org.scalatest.selenium.WebBrowser import org.scalatest.time.SpanSugar._ + import org.apache.spark.LocalSparkContext._ import org.apache.spark._ import org.apache.spark.api.java.StorageLevels +import org.apache.spark.deploy.history.HistoryServerSuite import org.apache.spark.shuffle.FetchFailedException +import org.apache.spark.status.api.StageStatus +import org.apache.spark.status.api.v1.CustomObjectMapper +import org.apache.sparktest.TestTags.ActiveTag /** @@ -41,6 +49,8 @@ import org.apache.spark.shuffle.FetchFailedException class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with BeforeAndAfterAll { implicit var webDriver: WebDriver = _ + implicit val formats = DefaultFormats + override def beforeAll(): Unit = { webDriver = new HtmlUnitDriver @@ -74,28 +84,40 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before val rdd = sc.parallelize(Seq(1, 2, 3)) rdd.persist(StorageLevels.DISK_ONLY).count() eventually(timeout(5 seconds), interval(50 milliseconds)) { - go to (ui.appUIAddress.stripSuffix("/") + "/storage") + goToUi(ui, "/storage") val tableRowText = findAll(cssSelector("#storage-by-rdd-table td")).map(_.text).toSeq tableRowText should contain (StorageLevels.DISK_ONLY.description) } eventually(timeout(5 seconds), interval(50 milliseconds)) { - go to (ui.appUIAddress.stripSuffix("/") + "/storage/rdd/?id=0") + goToUi(ui, "/storage/rdd/?id=0") val tableRowText = findAll(cssSelector("#rdd-storage-by-block-table td")).map(_.text).toSeq tableRowText should contain (StorageLevels.DISK_ONLY.description) } + val storageJson = getJson(ui, "storage/rdd") + storageJson.children.length should be (1) + (storageJson \ "storageLevel").extract[String] should be (StorageLevels.DISK_ONLY.description) + val rddJson = getJson(ui, "storage/rdd/0") + (rddJson \ "storageLevel").extract[String] should be (StorageLevels.DISK_ONLY.description) + rdd.unpersist() rdd.persist(StorageLevels.MEMORY_ONLY).count() eventually(timeout(5 seconds), interval(50 milliseconds)) { - go to (ui.appUIAddress.stripSuffix("/") + "/storage") + goToUi(ui, "/storage") val tableRowText = findAll(cssSelector("#storage-by-rdd-table td")).map(_.text).toSeq tableRowText should contain (StorageLevels.MEMORY_ONLY.description) } eventually(timeout(5 seconds), interval(50 milliseconds)) { - go to (ui.appUIAddress.stripSuffix("/") + "/storage/rdd/?id=0") + goToUi(ui, "/storage/rdd/?id=0") val tableRowText = findAll(cssSelector("#rdd-storage-by-block-table td")).map(_.text).toSeq tableRowText should contain (StorageLevels.MEMORY_ONLY.description) } + + val updatedStorageJson = getJson(ui, "storage/rdd") + updatedStorageJson.children.length should be (1) + (updatedStorageJson \ "storageLevel").extract[String] should be (StorageLevels.MEMORY_ONLY.description) + val updatedRddJson = getJson(ui, "storage/rdd/0") + (updatedRddJson \ "storageLevel").extract[String] should be (StorageLevels.MEMORY_ONLY.description) } } @@ -106,10 +128,13 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before sc.parallelize(1 to 10).map { x => throw new Exception()}.collect() } eventually(timeout(5 seconds), interval(50 milliseconds)) { - go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/stages") + goToUi(sc, "/stages") find(id("active")) should be(None) // Since we hide empty tables find(id("failed")).get.text should be("Failed Stages (1)") } + val stageJson = getJson(sc.ui.get, "stages") + stageJson.children.length should be (1) + (stageJson \ "status").extract[String] should be (StageStatus.Failed.name()) // Regression test for SPARK-2105 class NotSerializable @@ -118,12 +143,15 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before sc.parallelize(1 to 10).map { x => unserializableObject}.collect() } eventually(timeout(5 seconds), interval(50 milliseconds)) { - go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/stages") + goToUi(sc, "/stages") find(id("active")) should be(None) // Since we hide empty tables // The failure occurs before the stage becomes active, hence we should still show only one // failed stage, not two: find(id("failed")).get.text should be("Failed Stages (1)") } + + val updatedStageJson = getJson(sc.ui.get, "stages") + updatedStageJson should be (stageJson) } } @@ -145,7 +173,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before withSpark(getSparkContext(killEnabled = true)) { sc => runSlowJob(sc) eventually(timeout(5 seconds), interval(50 milliseconds)) { - go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/stages") + goToUi(sc, "/stages") assert(hasKillLink) } } @@ -153,7 +181,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before withSpark(getSparkContext(killEnabled = false)) { sc => runSlowJob(sc) eventually(timeout(5 seconds), interval(50 milliseconds)) { - go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/stages") + goToUi(sc, "/stages") assert(!hasKillLink) } } @@ -164,7 +192,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before // If no job has been run in a job group, then "(Job Group)" should not appear in the header sc.parallelize(Seq(1, 2, 3)).count() eventually(timeout(5 seconds), interval(50 milliseconds)) { - go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs") + goToUi(sc, "/jobs") val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq tableHeaders should not contain "Job Id (Job Group)" } @@ -172,10 +200,22 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before sc.setJobGroup("my-job-group", "my-job-group-description") sc.parallelize(Seq(1, 2, 3)).count() eventually(timeout(5 seconds), interval(50 milliseconds)) { - go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs") + goToUi(sc, "/jobs") val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq tableHeaders should contain ("Job Id (Job Group)") } + + val jobJson = getJson(sc.ui.get, "jobs") + for { + job @ JObject(_) <- jobJson + JInt(jobId) <- job \ "jobId" + jobGroup = job \ "jobGroup" + } { + jobId.toInt match { + case 0 => jobGroup should be (JNothing) + case 1 => jobGroup should be (JString("my-job-group")) + } + } } } @@ -202,7 +242,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before } mappedData.count() eventually(timeout(5 seconds), interval(50 milliseconds)) { - go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs") + goToUi(sc, "/jobs") find(cssSelector(".stage-progress-cell")).get.text should be ("2/2 (1 failed)") // Ideally, the following test would pass, but currently we overcount completed tasks // if task recomputations occur: @@ -211,6 +251,32 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before // of completed tasks may be higher: find(cssSelector(".progress-cell .progress")).get.text should be ("3/2 (1 failed)") } + val jobJson = getJson(sc.ui.get, "jobs") + (jobJson \ "numTasks").extract[Int]should be (2) + (jobJson \ "numCompletedTasks").extract[Int] should be (3) + (jobJson \ "numFailedTasks").extract[Int] should be (1) + (jobJson \ "numCompletedStages").extract[Int] should be (2) + (jobJson \ "numFailedStages").extract[Int] should be (1) + val stageJson = getJson(sc.ui.get, "stages") + + for { + stage @ JObject(_) <- stageJson + JString(status) <- stage \ "status" + JInt(stageId) <- stage \ "stageId" + JInt(attemptId) <- stage \ "attemptId" + } { + val exp = if (attemptId == 0 && stageId == 1) StageStatus.Failed else StageStatus.Complete + status should be (exp.name()) + } + + for { + stageId <- 0 to 1 + attemptId <- 0 to 1 + } { + val exp = if (attemptId == 0 && stageId == 1) StageStatus.Failed else StageStatus.Complete + val stageJson = getJson(sc.ui.get, s"stages/$stageId/$attemptId") + (stageJson \ "status").extract[String] should be (exp.name()) + } } } @@ -225,7 +291,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before // Start the job: rdd.countAsync() eventually(timeout(10 seconds), interval(50 milliseconds)) { - go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs/job/?id=0") + goToUi(sc, "/jobs/job/?id=0") find(id("active")).get.text should be ("Active Stages (1)") find(id("pending")).get.text should be ("Pending Stages (2)") // Essentially, we want to check that none of the stage rows show @@ -251,7 +317,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before rdd.count() rdd.count() eventually(timeout(10 seconds), interval(50 milliseconds)) { - go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs") + goToUi(sc, "/jobs") // The completed jobs table should have two rows. The first row will be the most recent job: val firstRow = find(cssSelector("tbody tr")).get.underlying val firstRowColumns = firstRow.findElements(By.tagName("td")) @@ -278,7 +344,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before rdd.count() rdd.count() eventually(timeout(10 seconds), interval(50 milliseconds)) { - go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs/job/?id=1") + goToUi(sc, "/jobs/job/?id=1") find(id("pending")) should be (None) find(id("active")) should be (None) find(id("failed")) should be (None) @@ -306,7 +372,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before rdd.count() rdd.count() eventually(timeout(10 seconds), interval(50 milliseconds)) { - go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs") + goToUi(sc, "/jobs") findAll(cssSelector("tbody tr a")).foreach { link => link.text.toLowerCase should include ("count") link.text.toLowerCase should not include "unknown" @@ -328,7 +394,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before } sparkUI.attachTab(newTab) eventually(timeout(10 seconds), interval(50 milliseconds)) { - go to (sc.ui.get.appUIAddress.stripSuffix("/")) + goToUi(sc, "") find(cssSelector("""ul li a[href*="jobs"]""")) should not be(None) find(cssSelector("""ul li a[href*="stages"]""")) should not be(None) find(cssSelector("""ul li a[href*="storage"]""")) should not be(None) @@ -337,12 +403,12 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before } eventually(timeout(10 seconds), interval(50 milliseconds)) { // check whether new page exists - go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/foo") + goToUi(sc, "/foo") find(cssSelector("b")).get.text should include ("html magic") } sparkUI.detachTab(newTab) eventually(timeout(10 seconds), interval(50 milliseconds)) { - go to (sc.ui.get.appUIAddress.stripSuffix("/")) + goToUi(sc, "") find(cssSelector("""ul li a[href*="jobs"]""")) should not be(None) find(cssSelector("""ul li a[href*="stages"]""")) should not be(None) find(cssSelector("""ul li a[href*="storage"]""")) should not be(None) @@ -351,9 +417,160 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before } eventually(timeout(10 seconds), interval(50 milliseconds)) { // check new page not exist - go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/foo") + goToUi(sc, "/foo") find(cssSelector("b")) should be(None) } } } + + test("stage & job retention", ActiveTag) { + val conf = new SparkConf() + .setMaster("local") + .setAppName("test") + .set("spark.ui.enabled", "true") + .set("spark.ui.port", "0") + .set("spark.ui.retainedStages", "3") + .set("spark.ui.retainedJobs", "2") + val sc = new SparkContext(conf) + assert(sc.ui.isDefined) + + withSpark(sc) { sc => + //run a few jobs & stages ... + (0 until 5).foreach { idx => + // NOTE: if we reverse the order, things don't really behave nicely + // we lose the stage for a job we keep, and then the job doesn't know + // about its last stage + sc.parallelize(idx to (idx + 3)).map(identity).groupBy(identity).map(identity) + .groupBy(identity).count() + sc.parallelize(idx to (idx + 3)).collect() + } + + + val expJobInfo = Seq( + ("9", "collect"), + ("8", "count") + ) + + eventually(timeout(1 second), interval(50 milliseconds)) { + goToUi(sc, "/jobs") + // The completed jobs table should have two rows. The first row will be the most recent job: + find("completed").get.text should be ("Completed Jobs (2)") + val rows = findAll(cssSelector("tbody tr")).toIndexedSeq.map{_.underlying} + rows.size should be (expJobInfo.size) + for { + (row, idx) <- rows.zipWithIndex + columns = row.findElements(By.tagName("td")) + id = columns(0).getText() + description = columns(1).getText() + } { + id should be (expJobInfo(idx)._1) + description should include (expJobInfo(idx)._2) + } + } + + val jobsJson = getJson(sc.ui.get, "jobs") + jobsJson.children.size should be (expJobInfo.size) + for { + (job @ JObject(_),idx) <- jobsJson.children.zipWithIndex + id = (job \ "jobId").extract[String] + name = (job \ "name").extract[String] + } { + withClue(s"idx = $idx; id = $id; name = ${name.substring(0,20)}") { + id should be (expJobInfo(idx)._1) + name should include (expJobInfo(idx)._2) + } + } + + // what about when we query for a job that did exist, but has been cleared? + goToUi(sc, "/jobs/job/?id=7") + find("no-info").get.text should be ("No information to display for job 7") + + //TODO json for one job + + val expStageInfo = Seq( + ("19", "collect"), + ("18", "count"), + ("17", "groupBy") + ) + + eventually(timeout(1 second), interval(50 milliseconds)) { + goToUi(sc, "/stages") + find("completed").get.text should be ("Completed Stages (20)") + val rows = findAll(cssSelector("tbody tr")).toIndexedSeq.map{_.underlying} + rows.size should be (3) + for { + (row, idx) <- rows.zipWithIndex + columns = row.findElements(By.tagName("td")) + id = columns(0).getText() + description = columns(1).getText() + } { + id should be (expStageInfo(idx)._1) + description should include (expStageInfo(idx)._2) + } + } + + val stagesJson = getJson(sc.ui.get, "stages") + stagesJson.children.size should be (3) + for { + (stage @ JObject(_), idx) <- stagesJson.children.zipWithIndex + id = (stage \ "stageId").extract[String] + name = (stage \ "name").extract[String] + } { + id should be (expStageInfo(idx)._1) + name should include (expStageInfo(idx)._2) + } + + // nonexistent stage + + goToUi(sc, "/stages/stage/?id=12&attempt=0") + find("no-info").get.text should be ("No information to display for Stage 12 (Attempt 0)") + val badStage = HistoryServerSuite.getContentAndCode( + new URL(sc.ui.get.appUIAddress + "/json/v1/applications/test/stages/12/0")) + badStage._1 should be (HttpServletResponse.SC_NOT_FOUND) + badStage._2 should be (None) + badStage._3 should be (Some("unknown stage: 12")) + + val badAttempt = HistoryServerSuite.getContentAndCode( + new URL(sc.ui.get.appUIAddress + "/json/v1/applications/test/stages/19/15")) + badAttempt._1 should be (HttpServletResponse.SC_NOT_FOUND) + badAttempt._2 should be (None) + badAttempt._3 should be (Some("unknown attempt for stage 19. Found attempts: [0]")) + + val badStageAttemptList = HistoryServerSuite.getContentAndCode( + new URL(sc.ui.get.appUIAddress + "/json/v1/applications/test/stages/12")) + badStageAttemptList._1 should be (HttpServletResponse.SC_NOT_FOUND) + badStageAttemptList._2 should be (None) + badStageAttemptList._3 should be (Some("unknown stage: 12")) + } + } + + test("live UI json application list") { + withSpark(newSparkContext()) { sc => + val appListRawJson = HistoryServerSuite.getUrl(new URL(sc.ui.get.appUIAddress + "/json/v1/applications")) + val appListJsonAst = JsonMethods.parse(appListRawJson) + appListJsonAst.children.length should be (1) + (appListJsonAst \ "completed").extract[Boolean] should be (false) + parseDate(appListJsonAst \ "startTime") should be (sc.startTime) + parseDate(appListJsonAst \ "endTime") should be (-1) + val oneAppJsonAst = getJson(sc.ui.get, "") + oneAppJsonAst should be (appListJsonAst.children(0)) + } + } + + def goToUi(sc: SparkContext, path: String): Unit = { + goToUi(sc.ui.get, path) + } + + def goToUi(ui: SparkUI, path: String): Unit = { + go to (ui.appUIAddress.stripSuffix("/") + path) + } + + def parseDate(json: JValue): Long = { + CustomObjectMapper.makeISODateFormat.parse(json.extract[String]).getTime + } + + def getJson(ui: SparkUI, path: String): JValue = { + JsonMethods.parse(HistoryServerSuite.getUrl( + new URL(ui.appUIAddress + "/json/v1/applications/test/" + path))) + } }