Skip to content

Commit

Permalink
added unit test for SPARK-4514
Browse files Browse the repository at this point in the history
  • Loading branch information
reggert committed Nov 22, 2015
1 parent a8ba899 commit 38b1442
Showing 1 changed file with 16 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ import scala.concurrent._
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.Timeouts
import org.scalatest.{Matchers, BeforeAndAfterAll}
import org.scalatest.concurrent.{Eventually, Timeouts}
import org.scalatest.time.SpanSugar._

import org.apache.spark._

class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Timeouts {
class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Timeouts
with Eventually with Matchers {

@transient private var sc: SparkContext = _

Expand Down Expand Up @@ -227,4 +228,16 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
test("ComplexFutureAction callback must not consume a thread while waiting") {
testAsyncAction((_.takeAsync(100)))
}

test("getJobIdsForGroup() with takeAsync()") {
sc.setJobGroup("my-job-group2", "description")
sc.statusTracker.getJobIdsForGroup("my-job-group2") should be (Seq.empty)
val firstJobFuture = sc.parallelize(1 to 1000, 1).takeAsync(1)
val firstJobId = eventually(timeout(10 seconds)) {
firstJobFuture.jobIds.head
}
eventually(timeout(10 seconds)) {
sc.statusTracker.getJobIdsForGroup("my-job-group2") should be (Seq(firstJobId))
}
}
}

0 comments on commit 38b1442

Please sign in to comment.