feat(python): Disable hive support from Python Spark Session
Change-Id: I692a8ff1387aee99ea4db7863d4676f6dd8fa5c9
bsikander committed Mar 17, 2020
1 parent 875407a commit 79e992c
Showing 5 changed files with 50 additions and 29 deletions.
@@ -12,7 +12,7 @@ import org.scalactic.{Bad, Good, Or}
import org.slf4j.LoggerFactory
import spark.jobserver._
import spark.jobserver.context.{JobLoadError, LoadingError, SparkContextFactory}
import spark.jobserver.util.SparkJobUtils
import spark.jobserver.util.{JobserverConfig, SparkJobUtils}

import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
@@ -179,15 +179,21 @@ class PythonSessionContextFactory extends PythonContextFactory {
contextName: String): C = {
val builder = SparkSession.builder().config(sparkConf.set("spark.yarn.isPython", "true"))
builder.appName(contextName)
try {
builder.enableHiveSupport()
} catch {
case e: IllegalArgumentException => println(s"Hive support not enabled - ${e.getMessage()}")
}
setupHiveSupport(contextConfig, builder)
val spark = builder.getOrCreate()
for ((k, v) <- SparkJobUtils.getHadoopConfig(contextConfig))
spark.sparkContext.hadoopConfiguration.set(k, v)
context = PythonSessionContextLikeWrapper(spark, contextConfig)
context
}

  protected def setupHiveSupport(config: Config, builder: SparkSession.Builder) = {
    if (config.getBoolean(JobserverConfig.IS_SPARK_SESSION_HIVE_ENABLED)) {
      try {
        builder.enableHiveSupport()
      } catch {
        case e: IllegalArgumentException => println(s"Hive support not enabled - ${e.getMessage()}")
      }
    }
  }
}
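
For reference, a minimal sketch (not part of the diff above) of how a caller could opt out of Hive support through the context config. The concrete key string behind IS_SPARK_SESSION_HIVE_ENABLED is whatever JobserverConfig defines, so it is interpolated rather than spelled out, and the value name noHiveConfig is purely illustrative:

    import com.typesafe.config.ConfigFactory
    import spark.jobserver.util.JobserverConfig

    // Context config that opts out of Hive support for the Python session factory.
    val noHiveConfig = ConfigFactory.parseString(
      s"${JobserverConfig.IS_SPARK_SESSION_HIVE_ENABLED} = false")

    // Passed as contextConfig to makeContext(), this makes setupHiveSupport() skip
    // builder.enableHiveSupport(), so the resulting SparkSession has no Hive catalog.
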
@@ -9,12 +9,11 @@ import java.nio.file.Paths
import akka.testkit.{TestKit, TestProbe}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import spark.jobserver.{InMemoryDAO, JavaStreamingSpec, JobsLoader, WindowsIgnore}
import spark.jobserver.util.SparkJobUtils
import spark.jobserver.{JavaStreamingSpec, JobsLoader, WindowsIgnore}
import spark.jobserver.util.{JobserverConfig, SparkJobUtils}

import scala.collection.JavaConverters._
import org.scalatest._
import spark.jobserver.common.akka.AkkaTestUtils

object PythonSessionContextFactorySpec {

@@ -34,19 +34,7 @@ object PythonSessionContextFactorySpec {
}
}

class TestPythonSessionContextFactory extends PythonContextFactory {

override type C = PythonSessionContextLikeWrapper
var context : PythonSessionContextLikeWrapper = _

override def py4JImports: Seq[String] =
PythonContextFactory.hiveContextImports

override def doMakeContext(sc: SparkContext,
contextConfig: Config,
contextName: String): C = {
context
}
class TestPythonSessionContextFactory extends PythonSessionContextFactory {

override def makeContext(sparkConf: SparkConf,
contextConfig: Config,
Expand All @@ -62,11 +49,7 @@ class TestPythonSessionContextFactory extends PythonContextFactory {
builder.appName(contextName).master("local")
builder.config("javax.jdo.option.ConnectionURL", "jdbc:derby:memory:myDB;create=true")
builder.config("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver")
try {
builder.enableHiveSupport()
} catch {
case e: IllegalArgumentException => println(s"Hive support not enabled - ${e.getMessage()}")
}
super.setupHiveSupport(contextConfig, builder)
val spark = builder.getOrCreate()
for ((k, v) <- SparkJobUtils.getHadoopConfig(contextConfig))
spark.sparkContext.hadoopConfiguration.set(k, v)
@@ -139,10 +122,11 @@ with BeforeAndAfterAll {

def runSessionTest(factory: TestPythonSessionContextFactory,
context: PythonSessionContextLikeWrapper,
c: Config): Unit = {
c: Config,
pythonJobName: String = "example_jobs.session_window.SessionWindowJob"): Unit = {
val loadResult = factory.loadAndValidateJob(
Seq("sql-average"),
"example_jobs.session_window.SessionWindowJob",
pythonJobName,
DummyJobCache)
loadResult.isGood should be (true)
val jobContainer = loadResult.get
@@ -187,5 +171,21 @@ with BeforeAndAfterAll {
context = factory.makeContext(sparkConf, p3Config, "test-create")
runSessionTest(factory, context, p3Config)
}

it("should throw exception if hive is disabled and hive job is executed") {
val configWithHiveDisabled = ConfigFactory.parseString(
s"${JobserverConfig.IS_SPARK_SESSION_HIVE_ENABLED}=false").withFallback(config)
val factory = new TestPythonSessionContextFactory()
context = factory.makeContext(sparkConf, configWithHiveDisabled, "test-create")

val exception = intercept[java.lang.Exception] {
runSessionTest(factory, context, configWithHiveDisabled,
"example_jobs.hive_support_job.HiveSupportJob")
}

exception.getMessage().contains(
"Hive support is required to CREATE Hive TABLE (AS SELECT)") should be(true)
exception.getMessage().contains("check_support") should be(true)
}
}
}
@@ -7,6 +7,8 @@ import org.apache.spark.SparkConf
import org.joda.time.DateTime
import spark.jobserver._
import spark.jobserver.api.JobEnvironment
import spark.jobserver.util.JobserverConfig

import scala.concurrent.duration.FiniteDuration

case class DummyJobEnvironment(jobId: String, contextConfig: Config) extends JobEnvironment {
@@ -103,6 +105,7 @@ object PythonSparkContextFactorySpec {
|]
|
|python.executable = "python"
|${JobserverConfig.IS_SPARK_SESSION_HIVE_ENABLED} = true
""".replace("\\", "\\\\") // Windows-compatibility
.stripMargin)

@@ -0,0 +1,9 @@
from sparkjobserver.api import SparkJob, build_problems


class HiveSupportJob(SparkJob):
    def validate(self, context, runtime, config):
        return None

    def run_job(self, context, runtime, data):
        context.sql('CREATE TABLE IF NOT EXISTS check_support (key INT, value STRING) USING hive')
3 changes: 3 additions & 0 deletions job-server-python/src/python/test/apitests.py
@@ -128,6 +128,9 @@ def test_run_sql_job(self):
        result = job.run_job(sqlContext, None, jobData)
        self.assertEqual([(20, 1250), (21, 1500)], result)

    # Note: The following test case will fail if the Hive/Hadoop versions
    # are not compatible, e.g. Hadoop 3.2.0 is not compatible with the
    # Hive version bundled with Spark 2.4.4.
    def test_run_hive_job(self):
        job = SQLJob()
        sqlContext = HiveContext(self.sc)
