Upgrade Jobserver to 2.4.4 Spark #1283

Merged: 8 commits, Mar 18, 2020
Changes from all commits
2 changes: 1 addition & 1 deletion Dockerfile.test
@@ -23,7 +23,7 @@ RUN sbt update
# add the rest of the code
COPY . .

ENV SPARK_HOME /tmp/spark-2.3.2-bin-hadoop2.7
ENV SPARK_HOME /tmp/spark-2.4.4-bin-hadoop2.7
ENV JAVA_OPTIONS "-Xmx1500m -XX:MaxPermSize=512m -Dakka.test.timefactor=3"

CMD ["/usr/src/app/run_tests.sh"]
2 changes: 1 addition & 1 deletion README.md
@@ -130,7 +130,7 @@ Spark Job Server is included in Datastax Enterprise!
| 0.6.2 | 1.6.1 |
| 0.7.0 | 1.6.2 |
| 0.8.0 | 2.2.0 |
| 0.9.0-SNAPSHOT | 2.3.2 |
| 0.9.0-SNAPSHOT | 2.4.4 |

For release notes, look in the `notes/` directory.

2 changes: 1 addition & 1 deletion ci/install-spark.sh
@@ -1,4 +1,4 @@
#!/usr/bin/env bash
set -e
curl -L -o /tmp/spark.tgz http://archive.apache.org/dist/spark/spark-2.3.2/spark-2.3.2-bin-hadoop2.7.tgz
curl -L -o /tmp/spark.tgz http://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
tar -xvzf /tmp/spark.tgz -C /tmp
@@ -0,0 +1,34 @@
package spark.jobserver.util

import org.apache.commons.lang.RandomStringUtils
import py4j.GatewayServer
import py4j.GatewayServer.GatewayServerBuilder

class JobserverPy4jGateway() {
private var server: GatewayServer = _
private val py4jToken = RandomStringUtils.randomAlphanumeric(50)

def getGatewayPort(endpoint: Any): String = {
server = new GatewayServerBuilder()
.entryPoint(endpoint)
.javaPort(0)
.authToken(py4jToken)
.build()

//Server runs asynchronously on a dedicated thread. See Py4J source for more detail
server.start()
server.getListeningPort.toString
}

def getToken(): String = {
this.py4jToken
}

def getGateway(): GatewayServer = {
this.server
}

def stop(): Unit = {
server.shutdown()
}
}
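
For orientation, here is a minimal standalone sketch (not part of the PR) of how this new wrapper can be exercised on its own. The `GatewayUsageSketch` object and the `EchoEndpoint` entry point are placeholders; any JVM object exposed to Py4J would do.

import spark.jobserver.util.JobserverPy4jGateway

object GatewayUsageSketch extends App {
  // Any JVM object can act as the Py4J entry point; this trivial one is a stand-in.
  class EchoEndpoint {
    def echo(s: String): String = s
  }

  val gateway = new JobserverPy4jGateway()
  // Starts the gateway on a random free port (javaPort(0)) and returns the port as a string.
  val port = gateway.getGatewayPort(new EchoEndpoint())
  // The random alphanumeric token must be handed to the Python side so it can authenticate.
  val token = gateway.getToken()
  println(s"Py4J gateway listening on port $port, token length ${token.length}")
  gateway.stop()
}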
@@ -10,21 +10,21 @@

public class JSessionTestLoaderJob extends JSessionJob<Long> {

private final String tableCreate = "CREATE TABLE `default`.`test_addresses`";
private final String tableCreate = "CREATE TABLE `test_addresses`";
private final String tableArgs = "(`firstName` String, `lastName` String, `address` String, `city` String)";
private final String tableRowFormat = "ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'";
private final String tableColFormat = "COLLECTION ITEMS TERMINATED BY '\002'";
private final String tableMapFormat = "MAP KEYS TERMINATED BY '\003' STORED";
private final String tableAs = "AS TextFile";
private final String tableAs = "AS TextFile LOCATION 'tmp/jobserver-java-hive-test'";
private final String loadPath = "'src/main/resources/hive_test_job_addresses.txt'";

@Override
public Long run(SparkSession spark, JobEnvironment runtime, Config data) {
spark.sql("DROP TABLE if exists `default`.`test_addresses`");
spark.sql("DROP TABLE if exists `test_addresses`");
spark.sql(String.format("%s %s %s %s %s %s", tableCreate, tableArgs, tableRowFormat, tableColFormat, tableMapFormat, tableAs));
spark.sql(String.format("LOAD DATA LOCAL INPATH %s OVERWRITE INTO TABLE `default`.`test_addresses`", loadPath));
spark.sql(String.format("LOAD DATA LOCAL INPATH %s OVERWRITE INTO TABLE `test_addresses`", loadPath));

final Dataset<Row> addrRdd = spark.sql("SELECT * FROM `default`.`test_addresses`");
final Dataset<Row> addrRdd = spark.sql("SELECT * FROM `test_addresses`");
return addrRdd.count();
}

@@ -21,24 +21,24 @@ object SessionLoaderTestJob extends SparkSessionJob {
type JobData = Config
type JobOutput = Long

val tableCreate = "CREATE TABLE `default`.`test_addresses`"
val tableCreate = "CREATE TABLE `test_addresses`"
val tableArgs = "(`firstName` String, `lastName` String, `address` String, `city` String)"
val tableRowFormat = "ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'"
val tableColFormat = "COLLECTION ITEMS TERMINATED BY '\u0002'"
val tableMapFormat = "MAP KEYS TERMINATED BY '\u0003' STORED"
val tableAs = "AS TextFile"
val tableAs = "AS TextFile LOCATION 'tmp/jobserver-scala-hive-test'"

val loadPath = s"'src/main/resources/hive_test_job_addresses.txt'"

def validate(spark: SparkSession, runtime: JobEnvironment, config: Config):
JobData Or Every[ValidationProblem] = Good(config)

def runJob(spark: SparkSession, runtime: JobEnvironment, config: JobData): JobOutput = {
spark.sql("DROP TABLE if exists `default`.`test_addresses`")
spark.sql("DROP TABLE if exists `test_addresses`")
spark.sql(s"$tableCreate $tableArgs $tableRowFormat $tableColFormat $tableMapFormat $tableAs")

spark.sql(s"LOAD DATA LOCAL INPATH $loadPath OVERWRITE INTO TABLE `default`.`test_addresses`")
val addrRdd: DataFrame = spark.sql("SELECT * FROM `default`.`test_addresses`")
spark.sql(s"LOAD DATA LOCAL INPATH $loadPath OVERWRITE INTO TABLE `test_addresses`")
val addrRdd: DataFrame = spark.sql("SELECT * FROM `test_addresses`")
addrRdd.count()
}
}
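
To see the combined effect of the two table changes above (unqualified table name plus an explicit LOCATION so unit-test data stays under a throwaway tmp/ directory), here is a minimal consolidated sketch. It is not part of the PR and assumes a local SparkSession with Hive support available on the classpath and the same test resource paths.

import org.apache.spark.sql.SparkSession

object HiveTestTableSketch extends App {
  val spark = SparkSession.builder()
    .master("local[1]")
    .appName("hive-test-table-sketch")
    .enableHiveSupport()
    .getOrCreate()

  // Same statements the test job issues, with the string constants concatenated.
  spark.sql("DROP TABLE if exists `test_addresses`")
  spark.sql(
    "CREATE TABLE `test_addresses` " +
      "(`firstName` String, `lastName` String, `address` String, `city` String) " +
      "ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' " +
      "COLLECTION ITEMS TERMINATED BY '\u0002' " +
      "MAP KEYS TERMINATED BY '\u0003' STORED " +
      "AS TextFile LOCATION 'tmp/jobserver-scala-hive-test'")
  spark.sql("LOAD DATA LOCAL INPATH 'src/main/resources/hive_test_job_addresses.txt' " +
    "OVERWRITE INTO TABLE `test_addresses`")
  println(spark.sql("SELECT * FROM `test_addresses`").count())
  spark.stop()
}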
@@ -6,7 +6,7 @@ import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.SparkConf
import spark.jobserver.ContextLike
import spark.jobserver.japi.{BaseJavaJob, JSessionJob, JStreamingJob}
import spark.jobserver.util.SparkJobUtils
import spark.jobserver.util.{JobserverConfig, SparkJobUtils}

class JavaSessionContextFactory extends JavaContextFactory {
type C = SparkSessionContextLikeWrapper
@@ -16,15 +16,21 @@ class JavaSessionContextFactory extends JavaContextFactory {
def makeContext(sparkConf: SparkConf, config: Config, contextName: String): C = {
val builder = SparkSession.builder()
builder.config(sparkConf).appName(contextName)
try {
builder.enableHiveSupport()
} catch {
case e: IllegalArgumentException => logger.warn(s"Hive support not enabled - ${e.getMessage()}")
}
setupHiveSupport(config, builder)
val spark = builder.getOrCreate()
for ((k, v) <- SparkJobUtils.getHadoopConfig(config)) spark.sparkContext.hadoopConfiguration.set(k, v)
SparkSessionContextLikeWrapper(spark)
}

protected def setupHiveSupport(config: Config, builder: SparkSession.Builder) = {
if (config.getBoolean(JobserverConfig.IS_SPARK_SESSION_HIVE_ENABLED)) {
try {
builder.enableHiveSupport()
} catch {
case e: IllegalArgumentException => logger.warn(s"Hive support not enabled - ${e.getMessage()}")
}
}
}
}

class JavaStreamingContextFactory extends JavaContextFactory {
@@ -5,7 +5,7 @@ import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import spark.jobserver.{ContextLike, SparkSessionJob}
import spark.jobserver.api.SparkJobBase
import spark.jobserver.util.SparkJobUtils
import spark.jobserver.util.{JobserverConfig, SparkJobUtils}
import org.slf4j.LoggerFactory

case class SparkSessionContextLikeWrapper(spark: SparkSession) extends ContextLike {
@@ -28,13 +28,19 @@ class SessionContextFactory extends ScalaContextFactory {
def makeContext(sparkConf: SparkConf, config: Config, contextName: String): C = {
val builder = SparkSession.builder()
builder.config(sparkConf).appName(contextName)
try {
builder.enableHiveSupport()
} catch {
case e: IllegalArgumentException => logger.warn(s"Hive support not enabled - ${e.getMessage()}")
}
setupHiveSupport(config, builder)
val spark = builder.getOrCreate()
for ((k, v) <- SparkJobUtils.getHadoopConfig(config)) spark.sparkContext.hadoopConfiguration.set(k, v)
SparkSessionContextLikeWrapper(spark)
}

protected def setupHiveSupport(config: Config, builder: SparkSession.Builder) = {
if (config.getBoolean(JobserverConfig.IS_SPARK_SESSION_HIVE_ENABLED)) {
try {
builder.enableHiveSupport()
} catch {
case e: IllegalArgumentException => logger.warn(s"Hive support not enabled - ${e.getMessage()}")
}
}
}
}
@@ -12,7 +12,7 @@ import org.scalactic.{Bad, Good, Or}
import org.slf4j.LoggerFactory
import spark.jobserver._
import spark.jobserver.context.{JobLoadError, LoadingError, SparkContextFactory}
import spark.jobserver.util.SparkJobUtils
import spark.jobserver.util.{JobserverConfig, SparkJobUtils}

import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
@@ -179,15 +179,21 @@ class PythonSessionContextFactory extends PythonContextFactory {
contextName: String): C = {
val builder = SparkSession.builder().config(sparkConf.set("spark.yarn.isPython", "true"))
builder.appName(contextName)
try {
builder.enableHiveSupport()
} catch {
case e: IllegalArgumentException => println(s"Hive support not enabled - ${e.getMessage()}")
}
setupHiveSupport(contextConfig, builder)
val spark = builder.getOrCreate()
for ((k, v) <- SparkJobUtils.getHadoopConfig(contextConfig))
spark.sparkContext.hadoopConfiguration.set(k, v)
context = PythonSessionContextLikeWrapper(spark, contextConfig)
context
}

protected def setupHiveSupport(config: Config, builder: SparkSession.Builder) = {
if (config.getBoolean(JobserverConfig.IS_SPARK_SESSION_HIVE_ENABLED)) {
try {
builder.enableHiveSupport()
} catch {
case e: IllegalArgumentException => println(s"Hive support not enabled - ${e.getMessage()}")
}
}
}
}
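
All three session context factories (Java, Scala, and Python) now share the same pattern: Hive support is only attempted when the new JobserverConfig.IS_SPARK_SESSION_HIVE_ENABLED flag is set in the context config. Below is a minimal sketch (not part of the PR) of opting out; the exact key string behind the constant is not shown in this diff, so it is referenced only through the constant.

import com.typesafe.config.ConfigFactory
import spark.jobserver.util.JobserverConfig

object HiveFlagSketch extends App {
  // With this in the context config, setupHiveSupport() becomes a no-op and the
  // SparkSession is built without builder.enableHiveSupport().
  val contextConfig = ConfigFactory.parseString(
    s"${JobserverConfig.IS_SPARK_SESSION_HIVE_ENABLED} = false")

  println(contextConfig.getBoolean(JobserverConfig.IS_SPARK_SESSION_HIVE_ENABLED)) // false
}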
@@ -1,12 +1,12 @@
package spark.jobserver.python

import com.typesafe.config.Config
import org.scalactic.{Good, Every, Or}
import org.scalactic.{Every, Good, Or}
import org.slf4j.LoggerFactory
import py4j.GatewayServer
import spark.jobserver.api.{SparkJobBase, ValidationProblem, JobEnvironment}
import spark.jobserver.api.{JobEnvironment, SparkJobBase, ValidationProblem}
import spark.jobserver.util.JobserverPy4jGateway

import scala.sys.process.{ProcessLogger, Process}
import scala.sys.process.{Process, ProcessLogger}
import scala.util.{Failure, Success, Try}

case class PythonJob[X <: PythonContextLike](eggPath: String,
@@ -53,16 +53,18 @@ case class PythonJob[X <: PythonContextLike](eggPath: String,
override def runJob(sc: X, runtime: JobEnvironment, data: Config): Any = {
logger.info(s"Running $modulePath from $eggPath")
val ep = endpoint(sc, runtime.contextConfig, runtime.jobId, data)
val server = new GatewayServer(ep, 0)
val pythonPathDelimiter : String = if (System.getProperty("os.name").indexOf("Win") >= 0) ";" else ":"
val pythonPath = (eggPath +: sc.pythonPath).mkString(pythonPathDelimiter)
logger.info(s"Using Python path of ${pythonPath}")

val jobserverPy4jGateway = new JobserverPy4jGateway()

val subProcessOutcome = Try {
//Server runs asynchronously on a dedicated thread. See Py4J source for more detail
server.start()
val gatewayPort = jobserverPy4jGateway.getGatewayPort(ep)
val process =
Process(
Seq(sc.pythonExecutable, "-m", "sparkjobserver.subprocess", server.getListeningPort.toString),
Process(Seq(sc.pythonExecutable, "-m", "sparkjobserver.subprocess",
gatewayPort, jobserverPy4jGateway.getToken()),
None,
"EGGPATH" -> eggPath,
"PYTHONPATH" -> pythonPath,
@@ -87,7 +89,7 @@ case class PythonJob[X <: PythonContextLike](eggPath: String,
throw new Exception(s"Python job failed with error code $errorCode and standard err [$err]")
}
}
server.shutdown()
jobserverPy4jGateway.stop()
subProcessOutcome match {
case Success(res) => res
case Failure(ex) => throw ex
5 changes: 5 additions & 0 deletions job-server-extras/src/test/resources/hive-site.xml
@@ -5,6 +5,11 @@
<value>jdbc:derby:;databaseName=/tmp/metastore_db;create=true</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/tmp/hive_metastore_db</value>
<description>Unit test data goes in here on your local filesystem.</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>org.apache.derby.jdbc.EmbeddedDriver</value>
@@ -3,31 +3,24 @@ package spark.jobserver
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import spark.jobserver.context.{JavaContextFactory, SparkSessionContextLikeWrapper}
import spark.jobserver.context.{JavaContextFactory, JavaSessionContextFactory, SparkSessionContextLikeWrapper}
import spark.jobserver.japi.{BaseJavaJob, JSessionJob}
import spark.jobserver.util.SparkJobUtils
import spark.jobserver.util.{JobserverConfig, SparkJobUtils}
import org.apache.spark.sql.Row
import spark.jobserver.CommonMessages.JobResult
import spark.jobserver.io.JobDAOActor
import spark.jobserver.common.akka.AkkaTestUtils

import scala.concurrent.duration._


class JavaTestSessionContextFactory extends JavaContextFactory {
type C = SparkSessionContextLikeWrapper

def isValidJob(job: BaseJavaJob[_, _]): Boolean = job.isInstanceOf[JSessionJob[_]]

def makeContext(sparkConf: SparkConf, config: Config, contextName: String): C = {
class JavaTestSessionContextFactory extends JavaSessionContextFactory {
override def makeContext(sparkConf: SparkConf, config: Config, contextName: String): C = {
val builder = SparkSession.builder()
builder.config(sparkConf).appName(contextName).master("local")
builder.config("javax.jdo.option.ConnectionURL", "jdbc:derby:memory:myDB;create=true")
builder.config("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver")
try {
builder.enableHiveSupport()
} catch {
case e: IllegalArgumentException => logger.warn(s"Hive support not enabled - ${e.getMessage()}")
}
super.setupHiveSupport(config, builder)
val spark = builder.getOrCreate()
for ((k, v) <- SparkJobUtils.getHadoopConfig(config)) spark.sparkContext.hadoopConfiguration.set(k, v)
SparkSessionContextLikeWrapper(spark)
@@ -50,7 +43,7 @@ class JavaSessionSpec extends ExtrasJobSpecBase(JavaSessionSpec.getNewSystem) {
""".stripMargin)
val queryConfig = ConfigFactory.parseString(
"""
|sql = "SELECT firstName, lastName FROM `default`.`test_addresses` WHERE city = 'San Jose'"
|sql = "SELECT firstName, lastName FROM `test_addresses` WHERE city = 'San Jose'"
|cp = ["demo"]
|""".stripMargin
)
@@ -59,7 +52,13 @@
before {
dao = new InMemoryDAO
daoActor = system.actorOf(JobDAOActor.props(dao))
manager = system.actorOf(JobManagerActor.props(daoActor))
// JobManagerTestActor is used to take advantage of CleanlyStoppingSparkContextJobManagerActor class
manager = system.actorOf(JobManagerTestActor.props(daoActor))
}

after {
JobManagerTestActor.stopSparkContextIfAlive(system, manager) should be(true)
AkkaTestUtils.shutdownAndWait(manager)
}

describe("Java Session Jobs") {
@@ -83,5 +82,33 @@
}
expectNoMsg()
}

/**
* The nature of the exception can vary. E.g. if you are trying to use Hive
* with Spark 2.4.4 and Hadoop 3.2.0, then you will get
* "Unrecognized Hadoop major version number: 3.2.0" since Hive is not fully
* compatible, but if you are using a Spark/Hadoop combination which is
* compatible with Hive, then you will get
* "org.apache.spark.sql.AnalysisException: Hive support is required to CREATE Hive TABLE (AS SELECT);;"
*/
it("should throw exception if hive is disabled") {
val configWithHiveDisabled = ConfigFactory.parseString(
s"${JobserverConfig.IS_SPARK_SESSION_HIVE_ENABLED}=false").withFallback(cfg)

val exception = intercept[java.lang.AssertionError] {
manager ! JobManagerActor.Initialize(configWithHiveDisabled, None, emptyActor)
expectMsgClass(30 seconds, classOf[JobManagerActor.Initialized])

val testBinInfo = uploadTestJar()
manager ! JobManagerActor.StartJob(
hiveLoaderClass, Seq(testBinInfo), emptyConfig, syncEvents ++ errorEvents)
expectMsgPF(120 seconds, "Did not get JobResult") {
case JobResult(_, result: Long) => result should equal (3L)
}
}

exception.getMessage.contains(
"org.apache.spark.sql.AnalysisException: Hive support is required") should be(true)
}
}
}