Merge pull request #164 from spark-jobserver/velvia/use-spark-submit
Use spark-submit instead of compute-classpath.sh
velvia committed Jun 15, 2015
2 parents 39e4b6e + 0067f56 commit dca77e4
Showing 5 changed files with 33 additions and 15 deletions.
4 changes: 4 additions & 0 deletions README.md
@@ -46,6 +46,8 @@ For release notes, look in the `notes/` directory. They should also be up on [l

## Quick start / development mode

NOTE: This quick start guide uses SBT to run the job server and the included test jar, but the normal development process is to create a separate project for Job Server jobs and to deploy the job server to a Spark cluster.

You need to have [SBT](http://www.scala-sbt.org/release/docs/Getting-Started/Setup.html) installed.
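
A minimal sketch of what a dev-mode session looks like — the `job-server/reStart` task name is an assumption (sbt-revolver style) and may differ in your checkout:

```bash
# Start an sbt shell from the project root; the quick start drives everything from here
sbt
# Then, inside the sbt shell, something like (task name assumed):
#   job-server/reStart
```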

To set the current version, do something like this:
@@ -231,6 +233,8 @@ def validate(sc: SparkContext, config: Config): SparkJobValidation = {
it to the remotes you have configured in `<environment>.sh`
3. On the remote server, start it in the deployed directory with `server_start.sh` and stop it with `server_stop.sh`

The `server_start.sh` script uses `spark-submit` under the hood, so any of the standard `spark-submit` arguments can be passed straight through to it, as in the sketch below.
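
For illustration only, a hypothetical invocation forwarding a few common `spark-submit` options (the flag values are placeholders, not taken from this commit):

```bash
# Arguments are forwarded verbatim to spark-submit via "$@" in server_start.sh
./server_start.sh --master spark://master-host:7077 \
                  --executor-memory 4G \
                  --jars /path/to/extra-dependency.jar
```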

NOTE: by default the assembly jar from `job-server-extras`, which includes support for SQLContext and HiveContext, is used. If you face issues with all the extra dependencies, consider modifying the install scripts to invoke `sbt job-server/assembly` instead, which doesn't include the extra dependencies.
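
A sketch of the two build options mentioned above, assuming the sbt project ids match the module directory names:

```bash
sbt job-server-extras/assembly   # default: includes SQLContext and HiveContext support
sbt job-server/assembly          # lighter assembly without the extra dependencies
```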

Note: to test out the deploy to a local staging dir, or package the job server for Mesos,
22 changes: 8 additions & 14 deletions bin/server_start.sh
@@ -1,5 +1,7 @@
#!/bin/bash
# Script to start the job server
# Extra arguments will be spark-submit options, for example
# ./server_start.sh --jars cassandra-spark-connector.jar
set -e

get_abs_script_path() {
@@ -16,7 +18,7 @@ GC_OPTS="-XX:+UseConcMarkSweepGC
-XX:MaxPermSize=512m
-XX:+CMSClassUnloadingEnabled "

JAVA_OPTS="-Xmx5g -XX:MaxDirectMemorySize=512M
JAVA_OPTS="-XX:MaxDirectMemorySize=512M
-XX:+HeapDumpOnOutOfMemoryError -Djava.net.preferIPv4Stack=true
-Dcom.sun.management.jmxremote.port=9999
-Dcom.sun.management.jmxremote.authenticate=false
@@ -42,13 +44,6 @@ if [ -z "$SPARK_HOME" ]; then
exit 1
fi

if [ -z "$SPARK_CONF_DIR" ]; then
SPARK_CONF_DIR=$SPARK_HOME/conf
fi

# Pull in other env vars in spark config, such as MESOS_NATIVE_LIBRARY
. $SPARK_CONF_DIR/spark-env.sh

pidFilePath=$appdir/$PIDFILE

if [ -f "$pidFilePath" ] && kill -0 $(cat "$pidFilePath"); then
@@ -62,7 +57,7 @@ if [ -z "$LOG_DIR" ]; then
fi
mkdir -p $LOG_DIR

LOGGING_OPTS="-Dlog4j.configuration=log4j-server.properties
LOGGING_OPTS="-Dlog4j.configuration=file:$appdir/log4j-server.properties
-DLOG_DIR=$LOG_DIR"

# For Mesos
@@ -78,9 +73,8 @@ fi
# This needs to be exported for standalone mode so drivers can connect to the Spark cluster
export SPARK_HOME

# job server jar needs to appear first so its deps take higher priority
# need to explicitly include app dir in classpath so logging configs can be found
CLASSPATH="$appdir:$appdir/spark-job-server.jar:$($SPARK_HOME/bin/compute-classpath.sh)"

exec java -cp $CLASSPATH $GC_OPTS $JAVA_OPTS $LOGGING_OPTS $CONFIG_OVERRIDES $MAIN $conffile 2>&1 &
$SPARK_HOME/bin/spark-submit --class $MAIN --driver-memory 5G \
--conf "spark.executor.extraJavaOptions=$LOGGING_OPTS" \
--driver-java-options "$GC_OPTS $JAVA_OPTS $LOGGING_OPTS $CONFIG_OVERRIDES" \
$@ $appdir/spark-job-server.jar $conffile 2>&1 &
echo $! > $pidFilePath
17 changes: 17 additions & 0 deletions doc/troubleshooting.md
@@ -16,6 +16,19 @@ send the timeout param along with your request (in secs), e.g. below.
http://devsparkcluster.cloudapp.net/jobs?appName=job-server-tests&classPath=spark.jobserver.WordCountExample&sync=true&timeout=20
```

You may need to adjust Spray's default request timeout and idle timeout, which default to 40 and 60 seconds respectively. To do this, modify the configuration file in your deployed job server, adding a section like the following:

```
spray.can.server {
idle-timeout = 210 s
request-timeout = 200 s
}
```

Then simply restart the job server.

Note that the idle-timeout must be higher than request-timeout, or Spray and the job server won't start.
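
For example, on a deployed server the restart can be as simple as the following, run from the deployed job server directory (paths depend on where you deployed):

```bash
# After editing the deployed .conf file, bounce the job server
./server_stop.sh
./server_start.sh
```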

## Job server won't start / cannot bind to 0.0.0.0:8090

Check that another process isn't already using that port. If it is, you may want to start it on another port:
@@ -33,6 +46,10 @@ after this is fixed, I can run jobs submitted from a remote job server successfully

(Thanks to @pcliu)

## Exception in thread "main" java.lang.NoSuchMethodError: akka.actor.ActorRefFactory.dispatcher()Lscala/concurrent/ExecutionContextExecutor;

If you are running CDH 5.3 or older, you may have an incompatible version of Akka bundled with Spark. :( Try modifying the version of Akka included with spark-jobserver to match the one in CDH (2.2.4, I think), or upgrade to CDH 5.4. If you are on CDH 5.4, check that `sparkVersion` in `Dependencies.scala` matches CDH's Spark version. Or see [issue #154](https://github.com/spark-jobserver/spark-jobserver/issues/154).
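
A quick, hypothetical way to check what the build is pinned to (the `project/Dependencies.scala` path assumes the usual sbt layout):

```bash
# Inspect the Akka and Spark versions the job server build compiles against
grep -n "akka" project/Dependencies.scala
grep -n "sparkVersion" project/Dependencies.scala
```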

## I want to run job-server on Windows

1. Create directory `C:\Hadoop\bin`
1 change: 1 addition & 0 deletions job-server/src/spark.jobserver/JobServer.scala
@@ -41,6 +41,7 @@ object JobServer {
defaultConfig
}
logger.info("Starting JobServer with config {}", config.getConfig("spark").root.render())
logger.info("Spray config: {}", config.getConfig("spray.can.server").root.render())
val port = config.getInt("spark.jobserver.port")

// TODO: Hardcode for now to get going. Make it configurable later.
4 changes: 3 additions & 1 deletion job-server/src/spark.jobserver/WebApi.scala
@@ -42,7 +42,6 @@ class WebApi(system: ActorSystem,
val ResultKey = "result"

val contextTimeout = SparkJobUtils.getContextTimeout(config)
val sparkAliveWorkerThreshold = Try(config.getInt("spark.jobserver.sparkAliveWorkerThreshold")).getOrElse(1)
val bindAddress = config.getString("spark.jobserver.bind-address")

val logger = LoggerFactory.getLogger(getClass)
@@ -338,6 +337,9 @@ class WebApi(system: ActorSystem,
}
}

override def timeoutRoute: Route =
complete(500, errMap("Request timed out. Try using the /jobs/<jobID>, /jobs APIs to get status/results"))

private def badRequest(ctx: RequestContext, msg: String) =
ctx.complete(StatusCodes.BadRequest, errMap(msg))

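
To see the new `timeoutRoute` behavior from a client's point of view, a hedged example — host, port, and query parameters follow the docs above, and the exact response body may differ:

```bash
# A sync request whose result isn't ready before the server-side request timeout
# now gets an HTTP 500 whose body points at the async /jobs/<jobID> and /jobs APIs
curl -d "" "localhost:8090/jobs?appName=job-server-tests&classPath=spark.jobserver.WordCountExample&sync=true&timeout=5"
```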
