Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix NamedObjectsJobSepc under docker #671

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -6,6 +6,7 @@ import spark.jobserver.CommonMessages.JobResult
import spark.jobserver.io.JobDAOActor

class NamedObjectsJobSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
import scala.concurrent.duration._

override def beforeAll() {
dao = new InMemoryDAO
Expand All @@ -16,12 +17,12 @@ class NamedObjectsJobSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
supervisor = TestProbe().ref

manager ! JobManagerActor.Initialize(None)
expectMsgClass(classOf[JobManagerActor.Initialized])

expectMsgClass(10.seconds, classOf[JobManagerActor.Initialized])

uploadTestJar()
}

val jobName = "spark.jobserver.NamedObjectsTestJob"

private def getCreateConfig(createDF: Boolean, createRDD: Boolean, createBroadcast: Boolean = false) : Config = {
Expand All @@ -30,12 +31,12 @@ class NamedObjectsJobSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
NamedObjectsTestJobConfig.CREATE_RDD + " = " + createRDD + ", " +
NamedObjectsTestJobConfig.CREATE_BROADCAST + " = " + createBroadcast)
}

private def getDeleteConfig(names: List[String]) : Config = {
ConfigFactory.parseString("spark.jobserver.named-object-creation-timeout = 60 s, " +
ConfigFactory.parseString("spark.jobserver.named-object-creation-timeout = 60 s, " +
NamedObjectsTestJobConfig.DELETE+" = [" + names.mkString(", ") + "]")
}

describe("NamedObjects (RDD)") {
it("should survive from one job to another one") {
manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(false, true), errorEvents ++ syncEvents)
Expand All @@ -60,15 +61,16 @@ class NamedObjectsJobSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
describe("NamedObjects (DataFrame)") {
it("should survive from one job to another one") {
manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(true, false), errorEvents ++ syncEvents)
val JobResult(_, names: Array[String]) = expectMsgClass(classOf[JobResult])
// for some reason, this just needs some more time to finish occasinally
val JobResult(_, names: Array[String]) = expectMsgClass(10.seconds, classOf[JobResult])

names should contain("df1")

manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(false, false), errorEvents ++ syncEvents)
val JobResult(_, names2: Array[String]) = expectMsgClass(classOf[JobResult])

names2 should equal(names)

//clean-up
manager ! JobManagerActor.StartJob("demo", jobName, getDeleteConfig(List("df1")), errorEvents ++ syncEvents)
expectMsgClass(classOf[JobResult])
Expand All @@ -79,15 +81,15 @@ class NamedObjectsJobSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
it("should survive from one job to another one") {
manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(true, true), errorEvents ++ syncEvents)
val JobResult(_, names: Array[String]) = expectMsgClass(classOf[JobResult])

names should contain("rdd1")
names should contain("df1")

manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(false, false), errorEvents ++ syncEvents)
val JobResult(_, names2: Array[String]) = expectMsgClass(classOf[JobResult])

names2 should equal(names)

//clean-up
manager ! JobManagerActor.StartJob("demo", jobName, getDeleteConfig(List("rdd1", "df1")), errorEvents ++ syncEvents)
expectMsgClass(classOf[JobResult])
Expand All @@ -99,7 +101,7 @@ class NamedObjectsJobSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {

manager ! JobManagerActor.StartJob("demo", jobName, getCreateConfig(true, true, true), errorEvents ++ syncEvents)
val JobResult(_, names: Array[String]) = expectMsgClass(classOf[JobResult])

names should contain("rdd1")
names should contain("df1")
names should contain("broadcast1")
Expand All @@ -108,7 +110,7 @@ class NamedObjectsJobSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
val JobResult(_, names2: Array[String]) = expectMsgClass(classOf[JobResult])

names2 should equal(names)

//clean-up
manager ! JobManagerActor.StartJob("demo", jobName, getDeleteConfig(List("rdd1", "df1", "broadcast1"))
, errorEvents ++ syncEvents)
Expand Down
Expand Up @@ -69,4 +69,4 @@ trait TestJarFinder {
assert(allJars.size == 1, allJars.toList.toString)
allJars.head
}
}
}