diff --git a/build.sbt b/build.sbt
index 1e9272aeb..7689ee947 100644
--- a/build.sbt
+++ b/build.sbt
@@ -86,6 +86,10 @@ lazy val jobServerPython = Project(id = "job-server-python", base = file("job-se
   .dependsOn(jobServerApi, akkaApp % "test")
   .disablePlugins(SbtScalariform)
 
+lazy val jobserverIntegrationTests = Project(id = "job-server-integration-tests", base = file("job-server-integration-tests"))
+  .settings(commonSettings)
+  .settings(jobserverIntegrationTestsSettings)
+
 lazy val root = Project(id = "root", base = file("."))
   .settings(commonSettings)
   .settings(Release.settings)
@@ -124,6 +128,11 @@ lazy val jobServerPythonSettings = revolverSettings ++ Assembly.settings ++ publ
   assembly := assembly.dependsOn(buildPython).value
 )
 
+lazy val jobserverIntegrationTestsSettings = Seq(
+  libraryDependencies ++= integrationTestDeps,
+  mainClass in Compile := Some("spark.jobserver.integrationtests.IntegrationTests"),
+)
+
 lazy val jobServerTestJarSettings = Seq(
   libraryDependencies ++= sparkDeps ++ apiDeps,
   description := "Test jar for Spark Job Server",
diff --git a/job-server-integration-tests/README.md b/job-server-integration-tests/README.md
new file mode 100644
index 000000000..df262c660
--- /dev/null
+++ b/job-server-integration-tests/README.md
@@ -0,0 +1,38 @@
+# Jobserver Integration Tests
+This project contains a set of integration tests that can be run against a running jobserver deployment.
+This verifies that the whole stack (including configuration, Spark, DAO and network) works as intended.
+
+## Usage
+Integration tests can be run locally with a regular test runner or by invoking the provided main class `IntegrationTests`.
+To run the tests against an arbitrary jobserver from a remote environment:
+```shell
+# Assemble a fat jar (from the root dir)
+sbt job-server-integration-tests/assembly
+
+# Move the fat jar to a remote location
+scp target/scala-*/job-server-integration-tests-assembly-*.jar <remote-host>:/integration-tests.jar
+
+# Invoke the tests at the remote location
+java -jar integration-tests.jar                         # displays usage
+java -jar integration-tests.jar <path-to-config-file>   # executes tests on a specific deployment
+
+```
+
+The integration tests are configured by supplying a config file as a parameter.
+Within the config file you can specify:
+* Address(es) of the jobserver deployment(s) to be tested
+* Names of the tests to be run
+* Which deployment controller to use for HA tests (i.e. how to stop and restart jobservers during testing)
+* Any additional fields required by specific tests or the deployment controller
+
+Here is an example config file:
+```javascript
+{
+  // Addresses of the jobservers to be tested, in this format
+  jobserverAddresses: ["localhost:8090", "localhost:8091"]
+  // Set to true if the jobserver communicates via HTTPS
+  useSSL: true
+  // Which tests to run (list of concrete test class names)
+  runTests: ["BasicApiTests", "CornerCasesTests", "TwoJobserverTests"]
+}
+```
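+
+For HA tests such as `HAFailoverTest`, at least two jobserver addresses and a deployment controller are required.
+A minimal sketch of such a config (the controller class name below is only a placeholder for whatever `DeploymentController` implementation your environment provides):
+```javascript
+{
+  jobserverAddresses: ["localhost:8090", "localhost:8091"]
+  // Class name of a DeploymentController implementation (resolved in spark.jobserver.integrationtests.util),
+  // used to stop and restart jobservers during the tests; the name below is a placeholder
+  deploymentController: "MyDeploymentController"
+  runTests: ["HAFailoverTest"]
+}
+```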
diff --git a/job-server-integration-tests/src/main/resources/extras.jar b/job-server-integration-tests/src/main/resources/extras.jar
new file mode 100644
index 000000000..f7b20e515
Binary files /dev/null and b/job-server-integration-tests/src/main/resources/extras.jar differ
diff --git a/job-server-integration-tests/src/main/resources/tests.jar b/job-server-integration-tests/src/main/resources/tests.jar
new file mode 100644
index 000000000..b86ab14a8
Binary files /dev/null and b/job-server-integration-tests/src/main/resources/tests.jar differ
diff --git a/job-server-integration-tests/src/main/scala/spark/jobserver/integrationtests/IntegrationTests.scala b/job-server-integration-tests/src/main/scala/spark/jobserver/integrationtests/IntegrationTests.scala
new file mode 100644
index 000000000..a74dc2360
--- /dev/null
+++ b/job-server-integration-tests/src/main/scala/spark/jobserver/integrationtests/IntegrationTests.scala
@@ -0,0 +1,74 @@
+package spark.jobserver.integrationtests
+
+import java.io.File
+
+import org.scalatest.ConfigMap
+
+import com.typesafe.config.ConfigException
+import com.typesafe.config.ConfigFactory
+import com.typesafe.config.ConfigRenderOptions
+
+import spark.jobserver.integrationtests.util.TestHelper
+
+object IntegrationTests extends App {
+
+  // Parse config
+  if (args.length != 1) {
+    printUsage()
+    sys.exit(-1)
+  }
+  val file = new File(args(0))
+  if (!file.exists()) {
+    println(s"Could not find a config file for path ${file.getAbsolutePath}")
+    sys.exit(-1)
+  }
+  val config = try {
+    ConfigFactory.parseFile(file)
+  } catch {
+    case t: Throwable =>
+      println("Could not parse config file: ")
+      t.printStackTrace()
+      sys.exit(-1)
+  }
+
+  // Validate config
+  try {
+    val addresses = config.getStringList("jobserverAddresses")
+    if (addresses.isEmpty()) {
+      println("The list of jobserverAddresses is empty. Not running any tests.")
+      sys.exit(-1)
+    }
+    val testsToRun = config.getStringList("runTests")
+    if (testsToRun.isEmpty()) {
+      println("The list of tests to run is empty. 
Not running any tests.") + sys.exit(-1) + } + } catch { + case e: ConfigException => + println("Invalid configuration file: " + e.getMessage) + sys.exit(-1) + } + + // In case HTTPS is used, just disable verification + if (config.hasPath("useSSL") && config.getBoolean("useSSL")) { + TestHelper.disableSSLVerification() + } + + // Run selected integration tests + println("Running integration tests with the following configuration:") + println(config.root().render(ConfigRenderOptions.concise().setFormatted(true).setJson(true))) + val testsToRun = config.getStringList("runTests").toArray() + testsToRun.foreach { t => + val testName = s"spark.jobserver.integrationtests.tests.$t" + val clazz = Class.forName(testName) + val test = clazz.getDeclaredConstructor() + .newInstance().asInstanceOf[org.scalatest.Suite] + test.execute(configMap = ConfigMap(("config", config))) + } + + // Usage + def printUsage() { + println("Usage: IntegrationTests ") + } + +} diff --git a/job-server-integration-tests/src/main/scala/spark/jobserver/integrationtests/tests/BasicApiTests.scala b/job-server-integration-tests/src/main/scala/spark/jobserver/integrationtests/tests/BasicApiTests.scala new file mode 100644 index 000000000..352cad9f5 --- /dev/null +++ b/job-server-integration-tests/src/main/scala/spark/jobserver/integrationtests/tests/BasicApiTests.scala @@ -0,0 +1,634 @@ +package spark.jobserver.integrationtests.tests + +import org.joda.time.DateTime +import org.scalatest.BeforeAndAfterAllConfigMap +import org.scalatest.ConfigMap +import org.scalatest.FreeSpec +import org.scalatest.Matchers + +import com.softwaremill.sttp._ + +import play.api.libs.json.JsObject +import play.api.libs.json.Json +import spark.jobserver.integrationtests.util.TestHelper +import com.typesafe.config.Config + +class BasicApiTests extends FreeSpec with Matchers with BeforeAndAfterAllConfigMap { + + // Configuration + var SJS = "" + implicit val backend = HttpURLConnectionBackend() + + override def beforeAll(configMap: ConfigMap): Unit = { + val config = configMap.getRequired[Config]("config") + val jobservers = config.getStringList("jobserverAddresses") + SJS = jobservers.get(0) + } + + // Test environment + val bin = "tests.jar" + val streamingbin = "extras.jar" + val appName = "IntegrationTestApp" + val contextName = "IntegrationTestContext" + val app = "IntegrationTestTestsJar" + val streamingApp = "IntegrationTestStreamingApp" + val batchContextName = "IntegrationTestBatchContext" + val streamingContextName = "IntegrationTestStreamingContext" + val streamingMain = "spark.jobserver.StreamingTestJob" + + "/binaries" - { + var binaryUploadDate: DateTime = null + + "POST /binaries/ should upload a binary" in { + val byteArray = TestHelper.fileToByteArray(bin) + + val request = sttp.post(uri"$SJS/binaries/$appName") + .body(byteArray) + .contentType("application/java-archive") + val response = request.send() + response.code should equal(201) + } + + "GET /binaries should list all available binaries" in { + val request = sttp.get(uri"$SJS/binaries") + val response = request.send() + response.code should equal(200) + val json = Json.parse(response.body.merge) + val testbin = (json \ appName) + testbin.isDefined should equal(true) + binaryUploadDate = new DateTime((testbin \ "upload-time").as[String]) + } + + "GET /binaries/ should retrieve a specific binary" in { + val request = sttp.get(uri"$SJS/binaries/$appName") + val response = request.send() + response.code should equal(200) + val json = Json.parse(response.body.merge) + (json \ 
"app-name").as[String] should equal(appName) + (json \ "binary-type").as[String] should equal("Jar") + (json \ "upload-time").as[String] should equal(binaryUploadDate.toString) + } + + "POST /binaries/ should overwrite a binary with a new version" in { + val byteArray = TestHelper.fileToByteArray(streamingbin) + val request = sttp.post(uri"$SJS/binaries/$appName") + .body(byteArray) + .contentType("application/java-archive") + val response = request.send() + response.code should equal(201) + // See if date has been updated + val getrequest = sttp.get(uri"$SJS/binaries") + val getresponse = getrequest.send() + getresponse.code should equal(200) + val json = Json.parse(getresponse.body.merge) + val testbin = (json \ appName) + testbin.isDefined should equal(true) + val newUploadDate = new DateTime((testbin \ "upload-time").as[String]) + newUploadDate.isAfter(binaryUploadDate) should equal(true) + } + + "DELETE /binaries/ should delete all binary versions under this name" in { + val request = sttp.delete(uri"$SJS/binaries/$appName") + val response = request.send() + response.code should equal(200) + // See if not listed anymore + val getrequest = sttp.get(uri"$SJS/binaries") + val getresponse = getrequest.send() + getresponse.code should equal(200) + val json = Json.parse(getresponse.body.merge) + val testbin = (json \ appName) + testbin.isDefined should equal(false) + } + + "Error scenarios" - { + "GET /binaries/ should fail if the binary does not exist" in { + val request = sttp.get(uri"$SJS/binaries/$appName") + val response = request.send() + response.code should equal(404) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("ERROR") + (json \ "result").as[String] should include("Can't find binary with name") + } + + "DELETE /binaries should fail if the binary does not exist" in { + val request = sttp.delete(uri"$SJS/binaries/$appName") + val response = request.send() + response.code should equal(404) + } + } + + } + + "/contexts" - { + + "POST /contexts/ should create a new context" in { + val request = sttp.post(uri"$SJS/contexts/$contextName") + val response = request.send() + response.code should equal(200) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("SUCCESS") + } + + "GET /contexts should list all contexts" in { + val request = sttp.get(uri"$SJS/contexts") + val response = request.send() + response.code should equal(200) + val json = Json.parse(response.body.merge) + val allContexts = json.as[List[String]] + allContexts.contains(contextName) should equal(true) + } + + "GET /contexts/ should retrieve infos about a context" in { + val request = sttp.get(uri"$SJS/contexts/$contextName") + val response = request.send() + response.code should equal(200) + val json = Json.parse(response.body.merge) + (json \ "name").as[String] should equal(contextName) + (json \ "state").as[String] should equal("RUNNING") + } + + "DELETE /contexts/ should delete a context" in { + val request = sttp.delete(uri"$SJS/contexts/$contextName") + val response = request.send() + response.code should equal(200) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("SUCCESS") + // state finished? 
+ TestHelper.waitForContextTermination(SJS, contextName) + val request2 = sttp.get(uri"$SJS/contexts/$contextName") + val response2 = request2.send() + response2.code should equal(200) + val json2 = Json.parse(response2.body.merge) + (json2 \ "state").as[String] should equal("FINISHED") + } + + "GET /contexts should not list deleted contexts" in { + val request = sttp.get(uri"$SJS/contexts") + val response = request.send() + response.code should equal(200) + val json = Json.parse(response.body.merge) + val allContexts = json.as[List[String]] + allContexts.contains(contextName) should equal(false) + } + + "Error scenarios" - { + "POST /contexts/ should fail if the context name already exists" in { + // Initial POST + val request = sttp.post(uri"$SJS/contexts/$contextName") + val response = request.send() + response.code should equal(200) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("SUCCESS") + // Second POST + val response2 = request.send() + response2.code should equal(400) + val json2 = Json.parse(response2.body.merge) + (json2 \ "status").as[String] should equal("ERROR") + // Clean up again + sttp.delete(uri"$SJS/contexts/$contextName").send() + } + + "DELETE /contexts/ should fail if there is no such context" in { + val request = sttp.delete(uri"$SJS/contexts/$contextName") + val response = request.send() + response.code should equal(404) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("ERROR") + } + } + + } + + "/jobs" - { + + var adHocJobId: String = "" + var batchJobId: String = "" + var streamingJobId: String = "" + var jobContext: String = "" + + "(preparation) uploading job binaries for job testing should not fail" in { + val byteArray1 = TestHelper.fileToByteArray(bin) + val byteArray2 = TestHelper.fileToByteArray(streamingbin) + sttp.post(uri"$SJS/binaries/$app") + .body(byteArray1) + .contentType("application/java-archive") + .send() + .code should equal(201) + sttp.post(uri"$SJS/binaries/$streamingApp") + .body(byteArray2) + .contentType("application/java-archive") + .send(). + code should equal(201) + val response = sttp.get(uri"$SJS/binaries").send() + response.code should equal(200) + val binaries = Json.parse(response.body.merge) + (binaries \ app).isDefined should equal(true) + (binaries \ streamingApp).isDefined should equal(true) + } + + "adHoc jobs" - { + "POST /jobs?cp=..&mainClass=.. should start a job" in { + val request = sttp.post(uri"$SJS/jobs?cp=$app&mainClass=spark.jobserver.WordCountExample") + .body("input.string = a b c a b see") + val response = request.send() + response.code should equal(202) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("STARTED") + jobContext = (json \ "context").as[String] + jobContext should include("WordCountExample") + adHocJobId = (json \ "jobId").as[String] + } + + "the termination of the job should also terminate the adHoc context" in { + // Context finished? + TestHelper.waitForContextTermination(SJS, jobContext) + val request = sttp.get(uri"$SJS/contexts/$jobContext") + val response = request.send() + response.code should equal(200) + val json = Json.parse(response.body.merge) + (json \ "state").as[String] should equal("FINISHED") + // Job finished? 
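+      // with the ad hoc context gone, GET /jobs/<jobId> should report the job itself as FINISHED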
+ val jobRequest = sttp.get(uri"$SJS/jobs/$adHocJobId") + val jobResponse = jobRequest.send() + jobResponse.code should equal(200) + val jobJson = Json.parse(jobResponse.body.merge) + (jobJson \ "status").as[String] should equal("FINISHED") + } + } + + "batch jobs" - { + "POST /jobs?context=&cp=..&mainClass=., should start a job in an " + + "existing (batch) context" in { + // Start context + jobContext = batchContextName + val contextRequest = sttp.post(uri"$SJS/contexts/$jobContext") + val contextResponse = contextRequest.send() + contextResponse.code should equal(200) + val contextJson = Json.parse(contextResponse.body.merge) + (contextJson \ "status").as[String] should equal("SUCCESS") + // Start job + val request = sttp.post( + uri"$SJS/jobs?cp=$app&mainClass=spark.jobserver.WordCountExample&context=$jobContext") + .body("input.string = a b c a b see") + val response = request.send() + response.code should equal(202) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("STARTED") + (json \ "context").as[String] should equal(jobContext) + batchJobId = (json \ "jobId").as[String] + } + + "the termination of the job should not terminate the context" in { + TestHelper.waitForJobTermination(SJS, batchJobId) + val contextRequest = sttp.get(uri"$SJS/contexts/$jobContext") + val contextResponse = contextRequest.send() + contextResponse.code should equal(200) + val contextJson = Json.parse(contextResponse.body.merge) + (contextJson \ "state").as[String] should equal("RUNNING") + // Cleanup + val deleteResponse = sttp.delete(uri"$SJS/contexts/$jobContext").send() + deleteResponse.code should equal(200) + } + } + + "streaming jobs" - { + "POST /jobs?context= should start a job in an existing (streaming) context" in { + // Start context + jobContext = batchContextName + val contextRequest = sttp.post( + uri"$SJS/contexts/$jobContext?context-factory=spark.jobserver.context.StreamingContextFactory") + val contextResponse = contextRequest.send() + contextResponse.code should equal(200) + val contextJson = Json.parse(contextResponse.body.merge) + (contextJson \ "status").as[String] should equal("SUCCESS") + // Start job + val request = sttp.post(uri"$SJS/jobs?cp=$streamingApp&mainClass=$streamingMain&context=$jobContext") + val response = request.send() + response.code should equal(202) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("STARTED") + (json \ "context").as[String] should equal(jobContext) + streamingJobId = (json \ "jobId").as[String] + } + + "DELETE /jobs/ should stop a streaming job" in { + val request = sttp.delete(uri"$SJS/jobs/$streamingJobId") + val response = request.send() + response.code should equal(200) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("KILLED") + } + + "the termination of the streaming job should not terminate the streaming context" in { + Thread.sleep(10000) + // Job in state killed? + val requestJob = sttp.get(uri"$SJS/jobs/$streamingJobId") + var response = requestJob.send() + response.code should equal(200) + var json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("KILLED") + // Context running? 
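+      // unlike an ad hoc context, the explicitly created streaming context must stay in state RUNNING after the job is killed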
+ val contextRequest = sttp.get(uri"$SJS/contexts/$jobContext") + response = contextRequest.send() + response.code should equal(200) + json = Json.parse(response.body.merge) + (json \ "state").as[String] should equal("RUNNING") + // Cleanup + response = sttp.delete(uri"$SJS/contexts/$jobContext").send() + response.code should equal(200) + } + } + + "POST /jobs?cp=..&mainClass=.. should start a job from 2 binaries" in { + val request = sttp.post( + uri"$SJS/jobs?cp=$app,$streamingApp&mainClass=spark.jobserver.WordCountExample") + .body("input.string = a b c a b see") + val response = request.send() + response.code should equal(202) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("STARTED") + jobContext = (json \ "context").as[String] + jobContext should include("WordCountExample") + adHocJobId = (json \ "jobId").as[String] + } + + "POST /jobs should start a job by taking cp and mainClass values from config" in { + val request = sttp.post( + uri"$SJS/jobs") + .body( + s""" + |input.string = a b c a b see + |cp = $app + |mainClass=spark.jobserver.WordCountExample + |""".stripMargin) + val response = request.send() + response.code should equal(202) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("STARTED") + jobContext = (json \ "context").as[String] + jobContext should include("WordCountExample") + adHocJobId = (json \ "jobId").as[String] + } + + "GET /jobs should list all jobs" in { + val request = sttp.get(uri"$SJS/jobs") + val response = request.send() + response.code should equal(200) + val allJobs = Json.parse(response.body.merge).as[List[JsObject]] + val jobCount = allJobs.length + jobCount should be >= 3 + allJobs.exists(o => (o \ "jobId").as[String] == adHocJobId) should equal(true) + allJobs.exists(o => (o \ "jobId").as[String] == batchJobId) should equal(true) + allJobs.exists(o => (o \ "jobId").as[String] == streamingJobId) should equal(true) + } + + "GET /jobs/ should show job information" in { + val request = sttp.get(uri"$SJS/jobs/$streamingJobId") + val response = request.send() + response.code should equal(200) + val json = Json.parse(response.body.merge) + (json \ "jobId").as[String] should equal(streamingJobId) + } + + "GET /jobs//config should return the job config" in { + val request = sttp.get(uri"$SJS/jobs/$batchJobId/config") + val response = request.send() + response.code should equal(200) + val json = Json.parse(response.body.merge) + ((json \ "input") \ "string").as[String] should equal("a b c a b see") + } + + "legacy API (appName + classPath)" - { + "adHoc jobs" - { + "POST /jobs?appName=..&classPath=.. should start a job in adHoc context" in { + val request = sttp.post(uri"$SJS/jobs?appName=$app&classPath=spark.jobserver.WordCountExample") + .body("input.string = a b c a b see") + val response = request.send() + response.code should equal(202) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("STARTED") + jobContext = (json \ "context").as[String] + jobContext should include("WordCountExample") + adHocJobId = (json \ "jobId").as[String] + } + + "the termination of the job should also terminate the adHoc context" in { + // Context finished? + TestHelper.waitForContextTermination(SJS, jobContext) + val request = sttp.get(uri"$SJS/contexts/$jobContext") + val response = request.send() + response.code should equal(200) + val json = Json.parse(response.body.merge) + (json \ "state").as[String] should equal("FINISHED") + // Job finished? 
+ val jobRequest = sttp.get(uri"$SJS/jobs/$adHocJobId") + val jobResponse = jobRequest.send() + jobResponse.code should equal(200) + val jobJson = Json.parse(jobResponse.body.merge) + (jobJson \ "status").as[String] should equal("FINISHED") + } + } + + "batch jobs" - { + "POST /jobs?context= should start a job in an existing (batch) context" in { + // Start context + jobContext = batchContextName + val contextRequest = sttp.post(uri"$SJS/contexts/$jobContext") + val contextResponse = contextRequest.send() + contextResponse.code should equal(200) + val contextJson = Json.parse(contextResponse.body.merge) + (contextJson \ "status").as[String] should equal("SUCCESS") + // Start job + val request = sttp.post( + uri"$SJS/jobs?appName=$app&classPath=spark.jobserver.WordCountExample&context=$jobContext") + .body("input.string = a b c a b see") + val response = request.send() + response.code should equal(202) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("STARTED") + (json \ "context").as[String] should equal(jobContext) + batchJobId = (json \ "jobId").as[String] + } + + "the termination of the job should not terminate the context" in { + // Job finished? + TestHelper.waitForJobTermination(SJS, batchJobId) + // Context running? + val contextRequest = sttp.get(uri"$SJS/contexts/$jobContext") + val contextResponse = contextRequest.send() + contextResponse.code should equal(200) + val contextJson = Json.parse(contextResponse.body.merge) + (contextJson \ "state").as[String] should equal("RUNNING") + // Cleanup + val deleteResponse = sttp.delete(uri"$SJS/contexts/$jobContext").send() + deleteResponse.code should equal(200) + } + } + + "streaming jobs" - { + "POST /jobs?context= should start a job in an existing (streaming) context" in { + // Start context + jobContext = batchContextName + val contextRequest = sttp.post( + uri"$SJS/contexts/$jobContext?context-factory=spark.jobserver.context.StreamingContextFactory") + val contextResponse = contextRequest.send() + contextResponse.code should equal(200) + val contextJson = Json.parse(contextResponse.body.merge) + (contextJson \ "status").as[String] should equal("SUCCESS") + // Start job + val request = sttp.post( + uri"$SJS/jobs?appName=$streamingApp&classPath=$streamingMain&context=$jobContext") + val response = request.send() + response.code should equal(202) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("STARTED") + (json \ "context").as[String] should equal(jobContext) + streamingJobId = (json \ "jobId").as[String] + } + + "DELETE /jobs/ should stop a streaming job" in { + val request = sttp.delete(uri"$SJS/jobs/$streamingJobId") + val response = request.send() + response.code should equal(200) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("KILLED") + } + + "the termination of the streaming job should not terminate the streaming context" in { + Thread.sleep(10000) + // Job in state killed? + val requestJob = sttp.get(uri"$SJS/jobs/$streamingJobId") + var response = requestJob.send() + response.code should equal(200) + var json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("KILLED") + // Context running? 
+ val contextRequest = sttp.get(uri"$SJS/contexts/$jobContext") + response = contextRequest.send() + response.code should equal(200) + json = Json.parse(response.body.merge) + (json \ "state").as[String] should equal("RUNNING") + // Cleanup + response = sttp.delete(uri"$SJS/contexts/$jobContext").send() + response.code should equal(200) + } + } + } + + "Error scenarios" - { + + "POST /jobs should fail if there is no such app" in { + val request = sttp.post( + uri"$SJS/jobs?appName=NonExistingAppName&classPath=spark.jobserver.WordCountExample") + val response = request.send() + response.code should equal(404) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("ERROR") + (json \ "result").as[String] should include("appName") + (json \ "result").as[String] should include("not found") + } + + "POST /jobs should fail if there is no such context" in { + val request = sttp.post( + uri"$SJS/jobs?appName=$streamingApp&classPath=$streamingMain&context=NonExistingContext") + val response = request.send() + response.code should equal(404) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("ERROR") + (json \ "result").as[String] should include("context") + (json \ "result").as[String] should include("not found") + } + + "GET /jobs/ should return an error if there is no such id" in { + val request = sttp.get(uri"$SJS/jobs/NonExistingId") + val response = request.send() + response.code should equal(404) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("ERROR") + (json \ "result").as[String] should include("No such job ID") + } + + "GET /jobs//config should return an error if there is no such id" in { + val request = sttp.get(uri"$SJS/jobs/NonExistingId/config") + val response = request.send() + response.code should equal(404) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("ERROR") + (json \ "result").as[String] should include("No such job ID") + } + + "DELETE /jobs/ should return an error if there is no such id" in { + val request = sttp.delete(uri"$SJS/jobs/NonExistingId") + val response = request.send() + response.code should equal(404) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("ERROR") + (json \ "result").as[String] should include("No such job ID") + } + + "POST /jobs should fail if configured improperly (cp is missing)" in { + val request = sttp.post(uri"$SJS/jobs?mainClass=spark.jobserver.WordCountExample") + val response = request.send() + response.code should equal(400) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("ERROR") + (json \ "result").as[String] should include("appName or cp parameters should be configured") + } + + "POST /jobs should fail if configured improperly (mainClass is missing)" in { + val request = sttp.post(uri"$SJS/jobs?cp=NonExistingAppName") + val response = request.send() + response.code should equal(400) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("ERROR") + (json \ "result").as[String] should include("mainClass parameter is missing") + } + + "POST /jobs should fail if configured improperly (cp and classPath)" in { + val request = sttp.post( + uri"$SJS/jobs?cp=NonExistingAppName&classPath=spark.jobserver.WordCountExample") + val response = request.send() + response.code should equal(400) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("ERROR") + 
(json \ "result").as[String] should include("mainClass parameter is missing") + } + + "POST /jobs should fail if configured improperly (appName and mainClass)" in { + val request = sttp.post( + uri"$SJS/jobs?appName=NonExistingAppName&mainClass=spark.jobserver.WordCountExample") + val response = request.send() + response.code should equal(400) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("ERROR") + (json \ "result").as[String] should include("classPath parameter is missing") + } + + "POST /jobs should return BadRequest if URI has invalid protocol" in { + val request = sttp.post(uri"$SJS/jobs?mainClass=spark.jobserver.WordCountExample") + .body( + s""" + |input.string = a b c a b see + |cp = "hdfs:///test/does/not/exist" + |mainClass=spark.jobserver.WordCountExample + |""".stripMargin) + val response = request.send() + response.code should equal(400) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("JOB LOADING FAILED: Malformed URL") + } + } + } + + override def afterAll(configMap: ConfigMap): Unit = { + // Clean up test entities in general + sttp.delete(uri"$SJS/binaries/$app") + sttp.delete(uri"$SJS/binaries/$streamingApp") + // Clean up test entities just in case something went wrong + sttp.delete(uri"$SJS/binaries/$appName") + sttp.delete(uri"$SJS/contexts/$contextName") + sttp.delete(uri"$SJS/contexts/$batchContextName") + sttp.delete(uri"$SJS/contexts/$streamingContextName") + } + +} \ No newline at end of file diff --git a/job-server-integration-tests/src/main/scala/spark/jobserver/integrationtests/tests/CornerCasesTests.scala b/job-server-integration-tests/src/main/scala/spark/jobserver/integrationtests/tests/CornerCasesTests.scala new file mode 100644 index 000000000..34193e9f6 --- /dev/null +++ b/job-server-integration-tests/src/main/scala/spark/jobserver/integrationtests/tests/CornerCasesTests.scala @@ -0,0 +1,77 @@ +package spark.jobserver.integrationtests.tests + +import org.scalatest.BeforeAndAfterAllConfigMap +import org.scalatest.ConfigMap +import org.scalatest.FreeSpec +import org.scalatest.Matchers + +import com.softwaremill.sttp._ + +import play.api.libs.json.Json +import spark.jobserver.integrationtests.util.TestHelper +import com.typesafe.config.Config + +class CornerCasesTests extends FreeSpec with Matchers with BeforeAndAfterAllConfigMap { + + // Configuration + var SJS = "" + implicit val backend = HttpURLConnectionBackend() + + override def beforeAll(configMap: ConfigMap): Unit = { + val config = configMap.getRequired[Config]("config") + val jobservers = config.getStringList("jobserverAddresses") + SJS = jobservers.get(0) + } + + // Test environment + val bin = "tests.jar" + val deletionTestApp = "IntegrationTestDeletionTest" + + "DELETE /binaries should not delete binaries with running jobs" in { + var jobId: String = "" + + // upload binary + val byteArray = TestHelper.fileToByteArray(bin) + val response1 = sttp.post(uri"$SJS/binaries/$deletionTestApp") + .body(byteArray) + .contentType("application/java-archive") + .send() + response1.code should equal(201) + + // submit long running job + val response2 = sttp.post(uri"$SJS/jobs?appName=$deletionTestApp&classPath=spark.jobserver.LongPiJob") + .body("stress.test.longpijob.duration = 10") + .send() + response2.code should equal(202) + val json2 = Json.parse(response2.body.merge) + (json2 \ "status").as[String] should equal("STARTED") + jobId = (json2 \ "jobId").as[String] + + // try to delete binary + val response3 = 
sttp.delete(uri"$SJS/binaries/$deletionTestApp") + .send() + response3.code should equal(403) + val json3 = Json.parse(response3.body.merge) + val message = (json3 \ "result").as[String] + message should include("is in use") + message should include(jobId) + + // wait for job termination + TestHelper.waitForJobTermination(SJS, jobId, 15) + val request = sttp.get(uri"$SJS/jobs/$jobId") + val response = request.send() + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("FINISHED") + + // deletion should succeed finally + val response4 = sttp.delete(uri"$SJS/binaries/$deletionTestApp") + .send() + response4.code should equal(200) + } + + override def afterAll(configMap: ConfigMap): Unit = { + // Clean up test entities just in case something went wrong + sttp.delete(uri"$SJS/binaries/$deletionTestApp") + } + +} diff --git a/job-server-integration-tests/src/main/scala/spark/jobserver/integrationtests/tests/HAFailoverTest.scala b/job-server-integration-tests/src/main/scala/spark/jobserver/integrationtests/tests/HAFailoverTest.scala new file mode 100644 index 000000000..4381682a3 --- /dev/null +++ b/job-server-integration-tests/src/main/scala/spark/jobserver/integrationtests/tests/HAFailoverTest.scala @@ -0,0 +1,146 @@ +package spark.jobserver.integrationtests.tests + +import org.scalatest.BeforeAndAfterAllConfigMap +import org.scalatest.ConfigMap +import org.scalatest.FreeSpec +import org.scalatest.Matchers + +import com.softwaremill.sttp._ +import com.typesafe.config.Config + +import play.api.libs.json.Json +import spark.jobserver.integrationtests.util.DeploymentController +import spark.jobserver.integrationtests.util.TestHelper + +class HAFailoverTest extends FreeSpec with Matchers with BeforeAndAfterAllConfigMap { + + // Configuration + var SJS1 = "" + var SJS2 = "" + implicit val backend = HttpURLConnectionBackend() + var controller: DeploymentController = _ + + override def beforeAll(configMap: ConfigMap): Unit = { + val config = configMap.getRequired[Config]("config") + val jobservers = config.getStringList("jobserverAddresses") + if (jobservers.size() < 2) { + println("You need to specify two jobserver addresses in the config to run HA tests.") + sys.exit(-1) + } + SJS1 = jobservers.get(0) + SJS2 = jobservers.get(1) + controller = DeploymentController.fromConfig(config) + // Restart second jobserver to make sure the first one is the singleton + controller.isJobserverUp(SJS2) should equal(true) + controller.stopJobserver(SJS2) + controller.isJobserverUp(SJS2) should equal(false) + controller.startJobserver(SJS2) + controller.isJobserverUp(SJS2) should equal(true) + } + + // Test artifacts + val contextName = "HAFailoverIntegrationTestContext" + + "Sustain one jobserver failure" - { + + "Stopping Jobserver 1 should succeed" in { + controller.isJobserverUp(SJS1) should equal(true) + controller.stopJobserver(SJS1) + controller.isJobserverUp(SJS1) should equal(false) + } + + // sample test representing every API call that is not routed through ClusterSupervisor + "GET /binaries should still work on Jobserver 2" in { + val request = sttp.get(uri"$SJS2/binaries") + val response = request.send() + response.code should equal(200) + } + + "waiting for 30s until the cluster has fully recovered..." 
in { + Thread.sleep(30000) + } + + "POST /contexts should still work on Jobserver 2" in { + val request = sttp.post(uri"$SJS2/contexts/$contextName") + val response = request.send() + response.code should equal(200) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("SUCCESS") + } + + "GET /contexts should still work on Jobserver 2" in { + val request = sttp.get(uri"$SJS2/contexts") + val response = request.send() + response.code should equal(200) + val json = Json.parse(response.body.merge) + val allContexts = json.as[List[String]] + allContexts.contains(contextName) should equal(true) + } + + "DELETE /contexts should still work on Jobserver 2" in { + val request = sttp.delete(uri"$SJS2/contexts/$contextName") + val response = request.send() + response.code should equal(200) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("SUCCESS") + // status finished? + TestHelper.waitForContextTermination(SJS2, contextName) + val request2 = sttp.get(uri"$SJS2/contexts/$contextName") + val response2 = request2.send() + response2.code should equal(200) + val json2 = Json.parse(response2.body.merge) + (json2 \ "state").as[String] should equal("FINISHED") + } + + "Restart of Jobserver 1 should succeed" in { + controller.isJobserverUp(SJS1) should equal(false) + controller.startJobserver(SJS1) + controller.isJobserverUp(SJS1) should equal(true) + } + + "GET /binaries should work again on Jobserver 1" in { + val request = sttp.get(uri"$SJS1/binaries") + val response = request.send() + response.code should equal(200) + } + + "POST /contexts should work again on Jobserver 1" in { + val request = sttp.post(uri"$SJS1/contexts/$contextName") + val response = request.send() + response.code should equal(200) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("SUCCESS") + } + + "GET /contexts should work again on Jobserver 1" in { + val request = sttp.get(uri"$SJS1/contexts") + val response = request.send() + response.code should equal(200) + val json = Json.parse(response.body.merge) + val allContexts = json.as[List[String]] + allContexts.contains(contextName) should equal(true) + } + + "DELETE /contexts should work again on Jobserver 1" in { + val request = sttp.delete(uri"$SJS1/contexts/$contextName") + val response = request.send() + response.code should equal(200) + val json = Json.parse(response.body.merge) + (json \ "status").as[String] should equal("SUCCESS") + // status finished? 
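+      // TestHelper.waitForContextTermination polls GET /contexts/<name> about once per second (10 retries by default) until the state is FINISHED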
+ TestHelper.waitForContextTermination(SJS1, contextName) + val request2 = sttp.get(uri"$SJS1/contexts/$contextName") + val response2 = request2.send() + response2.code should equal(200) + val json2 = Json.parse(response2.body.merge) + (json2 \ "state").as[String] should equal("FINISHED") + } + + } + + override def afterAll(configMap: ConfigMap): Unit = { + // Clean up used contexts + sttp.delete(uri"$SJS1/binaries/$contextName") + } + +} diff --git a/job-server-integration-tests/src/main/scala/spark/jobserver/integrationtests/tests/TwoJobserversTests.scala b/job-server-integration-tests/src/main/scala/spark/jobserver/integrationtests/tests/TwoJobserversTests.scala new file mode 100644 index 000000000..52ebc3b11 --- /dev/null +++ b/job-server-integration-tests/src/main/scala/spark/jobserver/integrationtests/tests/TwoJobserversTests.scala @@ -0,0 +1,103 @@ +package spark.jobserver.integrationtests.tests + +import org.scalatest.BeforeAndAfterAllConfigMap +import org.scalatest.ConfigMap +import org.scalatest.FreeSpec +import org.scalatest.Matchers + +import com.softwaremill.sttp._ +import com.typesafe.config.Config + +import play.api.libs.json.Json +import spark.jobserver.integrationtests.util.TestHelper + +class TwoJobserverTests extends FreeSpec with Matchers with BeforeAndAfterAllConfigMap { + + // Configuration + var SJS1 = "" + var SJS2 = "" + implicit val backend = HttpURLConnectionBackend() + + override def beforeAll(configMap: ConfigMap): Unit = { + val config = configMap.getRequired[Config]("config") + val jobservers = config.getStringList("jobserverAddresses") + if (jobservers.size() < 2) { + println("You need to specify two jobserver addresses in the config to run HA tests.") + sys.exit(-1) + } + SJS1 = jobservers.get(0) + SJS2 = jobservers.get(1) + } + + // Test artifacts + val context1 = "HAIntegrationTestContext1" + val context2 = "HAIntegrationTestContext2" + + "Synchronize contexts across jobservers" - { + + "POST /context on sjs1 should be visible on sjs2" in { + // Create on SJS1 + val response1 = sttp.post(uri"$SJS1/contexts/$context1").send() + response1.code should equal(200) + val json1 = Json.parse(response1.body.merge) + (json1 \ "status").as[String] should equal("SUCCESS") + // Read from SJS2 + val response2 = sttp.get(uri"$SJS2/contexts/$context1").send() + response2.code should equal(200) + val json2 = Json.parse(response2.body.merge) + (json2 \ "name").as[String] should equal(context1) + (json2 \ "state").as[String] should equal("RUNNING") + } + + "POST /context on sjs2 should be visible on sjs1" in { + // Create on SJS2 + val response1 = sttp.post(uri"$SJS2/contexts/$context2").send() + response1.code should equal(200) + val json1 = Json.parse(response1.body.merge) + (json1 \ "status").as[String] should equal("SUCCESS") + // Read from SJS1 + val response2 = sttp.get(uri"$SJS1/contexts/$context2").send() + response2.code should equal(200) + val json2 = Json.parse(response2.body.merge) + (json2 \ "name").as[String] should equal(context2) + (json2 \ "state").as[String] should equal("RUNNING") + } + + "DELETE /context on sjs1 should be visible on sjs2" in { + // DELETE on SJS1 + val response1 = sttp.delete(uri"$SJS1/contexts/$context2").send() + response1.code should equal(200) + val json1 = Json.parse(response1.body.merge) + (json1 \ "status").as[String] should equal("SUCCESS") + // Read from SJS2 + TestHelper.waitForContextTermination(SJS2, context2) + val response2 = sttp.get(uri"$SJS2/contexts/$context2").send() + response2.code should equal(200) + val json2 = 
Json.parse(response2.body.merge) + (json2 \ "name").as[String] should equal(context2) + (json2 \ "state").as[String] should equal("FINISHED") + } + + "DELETE /context on sjs2 should be visible on sjs1" in { + // DELETE on SJS2 + val response1 = sttp.delete(uri"$SJS2/contexts/$context1").send() + response1.code should equal(200) + val json1 = Json.parse(response1.body.merge) + (json1 \ "status").as[String] should equal("SUCCESS") + // Read from SJS1 + TestHelper.waitForContextTermination(SJS1, context1) + val response2 = sttp.get(uri"$SJS1/contexts/$context1").send() + response2.code should equal(200) + val json2 = Json.parse(response2.body.merge) + (json2 \ "name").as[String] should equal(context1) + (json2 \ "state").as[String] should equal("FINISHED") + } + + } + + override def afterAll(configMap: ConfigMap): Unit = { + sttp.delete(uri"$SJS1/contexts/$context1") + sttp.delete(uri"$SJS1/contexts/$context2") + } + +} \ No newline at end of file diff --git a/job-server-integration-tests/src/main/scala/spark/jobserver/integrationtests/util/DeploymentController.scala b/job-server-integration-tests/src/main/scala/spark/jobserver/integrationtests/util/DeploymentController.scala new file mode 100644 index 000000000..dcbe22013 --- /dev/null +++ b/job-server-integration-tests/src/main/scala/spark/jobserver/integrationtests/util/DeploymentController.scala @@ -0,0 +1,40 @@ +package spark.jobserver.integrationtests.util + +import com.typesafe.config.Config + +/** + * Interface to abstract the handling (shutdown & start) of a jobserver deployment. + * + * The id passed in the methods can be any kind of identification (e.g. IP, VM Id, ...) + * + */ +abstract class DeploymentController(config: Config) { + + def stopJobserver(id: String): Boolean + + def startJobserver(id: String): Boolean + + def isJobserverUp(id: String): Boolean + +} + +object DeploymentController { + /** + * Load a concrete implementation of a DeploymentController which is specified in the config file. 
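+   *
+   * The class name is read from the `deploymentController` config entry and resolved within this
+   * package, e.g. deploymentController: "MyDeploymentController" (a hypothetical implementation name).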
+ */ + def fromConfig(config: Config): DeploymentController = { + val packageName = this.getClass.getPackage.getName + try { + val className = config.getString("deploymentController") + val clazz = Class.forName(s"$packageName.$className") + clazz.getDeclaredConstructor(Class.forName("com.typesafe.config.Config")) + .newInstance(config).asInstanceOf[DeploymentController] + } catch { + case t: Throwable => + println(s"DeploymentController specified in config is invalid:") + t.printStackTrace() + println("Aborting tests.") + sys.exit(-1) + } + } +} \ No newline at end of file diff --git a/job-server-integration-tests/src/main/scala/spark/jobserver/integrationtests/util/TestHelper.scala b/job-server-integration-tests/src/main/scala/spark/jobserver/integrationtests/util/TestHelper.scala new file mode 100644 index 000000000..c7e175aa9 --- /dev/null +++ b/job-server-integration-tests/src/main/scala/spark/jobserver/integrationtests/util/TestHelper.scala @@ -0,0 +1,90 @@ +package spark.jobserver.integrationtests.util + +import java.security.KeyManagementException +import java.security.NoSuchAlgorithmException +import java.security.SecureRandom +import java.security.cert.X509Certificate + +import com.softwaremill.sttp._ + +import javax.net.ssl.HostnameVerifier +import javax.net.ssl.HttpsURLConnection +import javax.net.ssl.KeyManager +import javax.net.ssl.SSLContext +import javax.net.ssl.SSLSession +import javax.net.ssl.TrustManager +import javax.net.ssl.X509TrustManager +import play.api.libs.json.Json + +object TestHelper { + + def fileToByteArray(fileName: String): Array[Byte] = { + try { + val stream = getClass().getResourceAsStream(s"/$fileName") + Iterator continually stream.read takeWhile (-1 !=) map (_.toByte) toArray + } catch { + case e: Exception => + println(s"Could not open $fileName.") + e.printStackTrace() + sys.exit(-1) + } + } + + def waitForContextTermination(sjs: String, contextName: String, retries: Int = 10) { + implicit val backend = HttpURLConnectionBackend() + val SLEEP_BETWEEN_RETRIES = 1000; + val request = sttp.get(uri"$sjs/contexts/$contextName") + var response = request.send() + var json = Json.parse(response.body.merge) + var state = (json \ "state").as[String] + var retriesLeft = retries + while (state != "FINISHED" && retriesLeft > 0) { + Thread.sleep(SLEEP_BETWEEN_RETRIES) + response = request.send() + json = Json.parse(response.body.merge) + state = (json \ "state").as[String] + retriesLeft -= 1 + } + } + + def waitForJobTermination(sjs: String, jobId: String, retries: Int = 10) { + implicit val backend = HttpURLConnectionBackend() + val request = sttp.get(uri"$sjs/jobs/$jobId") + var response = request.send() + var json = Json.parse(response.body.merge) + var state = (json \ "status").as[String] + var retriesLeft = retries + while (state != "FINISHED" && retriesLeft > 0) { + Thread.sleep(1000) + response = request.send() + json = Json.parse(response.body.merge) + state = (json \ "status").as[String] + retriesLeft -= 1 + } + } + + def disableSSLVerification(): Unit = { + val trustAllCerts = Array[TrustManager](new X509TrustManager() { + def getAcceptedIssuers: Array[X509Certificate] = Array[X509Certificate]() + override def checkServerTrusted(x509Certificates: Array[X509Certificate], s: String): Unit = () + override def checkClientTrusted(x509Certificates: Array[X509Certificate], s: String): Unit = () + }) + + var sc: SSLContext = null + try { + sc = SSLContext.getInstance("TLS") + sc.init(Array[KeyManager](), trustAllCerts, new SecureRandom()) + } catch { + case e: 
KeyManagementException => + e.printStackTrace() + case e: NoSuchAlgorithmException => + e.printStackTrace() + } + HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory) + val allHostsValid = new HostnameVerifier { + override def verify(s: String, sslSession: SSLSession): Boolean = true + } + HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid) + } + +} \ No newline at end of file diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 0746362a2..57d2ad357 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -87,6 +87,13 @@ object Dependencies { "org.apache.curator" % "curator-test" % curatorTest % Test excludeAll(excludeGuava) ) + lazy val integrationTestDeps = Seq( + "com.typesafe" % "config" % typeSafeConfig, + "org.scalatest" %% "scalatest" % scalaTest, + "com.softwaremill.sttp" %% "core" % "1.6.3", + "com.typesafe.play" %% "play-json" % "2.7.4" + ) + lazy val securityDeps = Seq( "org.apache.shiro" % "shiro-core" % shiro )