From 37f02978e8e247b213865be74c7afcd1d46dd4ed Mon Sep 17 00:00:00 2001
From: Evan Chan
Date: Sun, 8 Dec 2013 10:14:22 -0800
Subject: [PATCH] Add WordCountExample and a walk-through to the README

---
 jobserver/README.md                                | 81 ++++++++++++++++---
 .../spark.jobserver/WordCountExample.scala         | 34 ++++++++
 2 files changed, 106 insertions(+), 9 deletions(-)
 create mode 100644 jobserver/src/test/scala/spark.jobserver/WordCountExample.scala

diff --git a/jobserver/README.md b/jobserver/README.md
index 54b89bd3bd1..917cc253400 100644
--- a/jobserver/README.md
+++ b/jobserver/README.md
@@ -1,5 +1,16 @@
 spark-job-server provides a RESTful interface for submitting and managing Spark jobs, jars, and job contexts.
 
+## Features
+
+- *"Spark as a Service"*: Simple REST interface for all aspects of job, context management
+- Supports sub-second low-latency jobs via long-running job contexts
+- Start and stop job contexts for RDD sharing and low-latency jobs; change resources on restart
+- Kill running jobs via stop context
+- Separate jar uploading step for faster job startup
+- Asynchronous and synchronous job API. Synchronous API is great for low latency jobs!
+- Works with Standalone Spark as well as Mesos
+- Job and jar info is persisted via a pluggable DAO interface
+
 ## Quick start / development mode
 
 From SBT shell, simply type "re-start". This uses a default configuration file. An optional argument is a
@@ -12,16 +23,68 @@ Note that re-start (SBT Revolver) forks the job server in a separate process. I
 type re-start again at the SBT shell prompt, it will compile your changes and restart the jobserver. It enables
 very fast turnaround cycles.
 
-## Features
+### WordCountExample walk-through
 
-- *"Spark as a Service"*: Simple REST interface for all aspects of job, context management
-- Supports sub-second low-latency jobs via long-running job contexts
-- Start and stop job contexts for RDD sharing and low-latency jobs; change resources on restart
-- Kill running jobs via stop context
-- Separate jar uploading step for faster job startup
-- Asynchronous and synchronous job API. Synchronous API is great for low latency jobs!
-- Works with Standalone Spark as well as Mesos
-- Job and jar info is persisted via a pluggable DAO interface
+First, to package the test jar containing the WordCountExample: `sbt/sbt jobserver/test:package`.
+Then go ahead and start the job server using the instructions above.
+
+Let's upload the jar:
+
+    curl --data-binary @jobserver/target/scala-2.9.3/spark-job-server_2.9.3-0.9.0-incubating-SNAPSHOT-test.jar localhost:8090/jars/test
+    OK⏎
+
+The above jar is uploaded as app `test`. Next, let's start an ad-hoc word count job, meaning that the job
+server will create its own SparkContext, and return a job ID for subsequent querying:
+
+    curl -d "input.string = a b c a b see" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample'
+    {
+      "status": "STARTED",
+      "result": {
+        "jobId": "5453779a-f004-45fc-a11d-a39dae0f9bf4",
+        "context": "b7ea0eb5-spark.jobserver.WordCountExample"
+      }
+    }⏎
+
+From this point, you could asynchronously query the status and results:
+
+    curl localhost:8090/jobs/5453779a-f004-45fc-a11d-a39dae0f9bf4
+    {
+      "status": "OK",
+      "result": {
+        "a": 2,
+        "b": 2,
+        "c": 1,
+        "see": 1
+      }
+    }⏎
+
+Note that you could append `&sync=true` when you POST to /jobs to get the results back in one request, but for
+real clusters and most jobs this may be too slow.
+
+Another way of running this job is in a pre-created context.
+Start a new context:
+
+    curl -d "" 'localhost:8090/contexts/test-context?numCores=2&memPerNode=512m'
+    OK⏎
+
+You can verify that the context has been created:
+
+    curl localhost:8090/contexts
+    ["test-context"]⏎
+
+Now let's run the job in the context and get the results back right away:
+
+    curl -d "input.string = a b c a b see" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample&context=test-context&sync=true'
+    {
+      "status": "OK",
+      "result": {
+        "a": 2,
+        "b": 2,
+        "c": 1,
+        "see": 1
+      }
+    }⏎
+
+Note the addition of `context=` and `sync=true`.
 
 ## Architecture
diff --git a/jobserver/src/test/scala/spark.jobserver/WordCountExample.scala b/jobserver/src/test/scala/spark.jobserver/WordCountExample.scala
new file mode 100644
index 00000000000..f76100635ee
--- /dev/null
+++ b/jobserver/src/test/scala/spark.jobserver/WordCountExample.scala
@@ -0,0 +1,34 @@
+package spark.jobserver
+
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+import scala.util.Try
+
+/**
+ * A super-simple Spark job example that implements the SparkJob trait and can be submitted to the job server.
+ *
+ * Set the config with the sentence to split or count:
+ *     input.string = "adsfasdf asdkf safksf a sdfa"
+ *
+ * validate() returns SparkJobInvalid if there is no input.string
+ */
+object WordCountExample extends SparkJob {
+  def main(args: Array[String]) {
+    val sc = new SparkContext("local[4]", "WordCountExample")
+    val config = ConfigFactory.parseString("")
+    val results = runJob(sc, config)
+    println("Result is " + results)
+  }
+
+  override def validate(sc: SparkContext, config: Config): SparkJobValidation = {
+    Try(config.getString("input.string"))
+      .map(x => SparkJobValid)
+      .getOrElse(SparkJobInvalid("No input.string config param"))
+  }
+
+  override def runJob(sc: SparkContext, config: Config): Any = {
+    val dd = sc.parallelize(config.getString("input.string").split(" ").toSeq)
+    dd.map((_, 1)).reduceByKey(_ + _).collect().toMap
+  }
+}
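
To make the `SparkJob` contract that WordCountExample demonstrates concrete, here is a minimal sketch of a second job written against the same trait. It assumes the same `spark.jobserver` package and the `SparkJob`, `SparkJobValidation`, `SparkJobValid`, and `SparkJobInvalid` types used in the patch; the object name `CharCountExample` and its character-counting logic are purely illustrative and are not part of this change.

    package spark.jobserver

    import com.typesafe.config.Config
    import org.apache.spark._
    import org.apache.spark.SparkContext._
    import scala.util.Try

    // Illustrative sketch only (not in this patch): counts characters per word
    // instead of word occurrences, but follows the same contract as
    // WordCountExample: validate() checks the job config up front, and runJob()
    // does the Spark work and returns a result for the job server to serialize.
    object CharCountExample extends SparkJob {
      override def validate(sc: SparkContext, config: Config): SparkJobValidation = {
        // Reuse the same input.string parameter that WordCountExample expects
        Try(config.getString("input.string"))
          .map(_ => SparkJobValid)
          .getOrElse(SparkJobInvalid("No input.string config param"))
      }

      override def runJob(sc: SparkContext, config: Config): Any = {
        val words = sc.parallelize(config.getString("input.string").split(" ").toSeq)
        // Sum the character counts contributed by each distinct word
        words.map(w => (w, w.length)).reduceByKey(_ + _).collect().toMap
      }
    }

If such a job were packaged into the test jar, it would be submitted exactly like the walkthrough above, only with `classPath=spark.jobserver.CharCountExample`.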
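
The walkthrough drives the REST API with curl, but any HTTP client works. Below is a hedged sketch of submitting the same synchronous job from Scala using only `java.net` from the JDK; the object name `SubmitWordCount` is hypothetical, and the URL and parameters simply mirror the curl example above.

    import java.net.{HttpURLConnection, URL}
    import scala.io.Source

    // Hypothetical client sketch: POST the same synchronous word count job that
    // the curl walkthrough submits, then print the job server's JSON response.
    object SubmitWordCount {
      def main(args: Array[String]) {
        val url = new URL("http://localhost:8090/jobs?appName=test" +
          "&classPath=spark.jobserver.WordCountExample&context=test-context&sync=true")
        val conn = url.openConnection().asInstanceOf[HttpURLConnection]
        conn.setRequestMethod("POST")
        conn.setDoOutput(true)
        // The POST body is the job's input config, same as the -d argument to curl
        conn.getOutputStream.write("input.string = a b c a b see".getBytes("UTF-8"))
        conn.getOutputStream.close()
        println(Source.fromInputStream(conn.getInputStream).mkString)
      }
    }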