diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 1ba9a70202c69..00650f7aa667e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -102,8 +102,19 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { * this does not necessarily need to be the same version of Hive that is used internally by * Spark SQL for execution. */ - protected[hive] def hiveVersion: String = - getConf("spark.sql.hive.version", "0.13.1") + protected[hive] def hiveMetastoreVersion: String = + getConf("spark.sql.hive.metastore.version", "0.13.1") + + /** + * The location of the jars that should be used to instantiate the HiveMetastoreClient. This + * property can be one of three option: + * - a comma-separated list of jar files that could be passed to a URLClassLoader + * - builtin - attempt to discover the jars that were used to load Spark SQL and use those. This + * option is only valid when using the execution version of Hive. + * - maven - download the correct version of hive on demand from maven. + */ + protected[hive] def hiveMetastoreJars: String = + getConf("spark.sql.hive.metastore.jars", "builtin") @transient protected[sql] lazy val substitutor = new VariableSubstitution() @@ -121,6 +132,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { executionConf.set( "javax.jdo.option.ConnectionURL", s"jdbc:derby:;databaseName=$localMetastore;create=true") + /** The version of hive used internally by Spark SQL. */ + lazy val hiveExecutionVersion: String = "0.13.1" + /** * The copy of the hive client that is used for execution. Currently this must always be * Hive 13 as this is the version of Hive that is packaged with Spark SQL. This copy of the @@ -129,31 +143,71 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { * for storing peristent metadata, and only point to a dummy metastore in a temporary directory. */ @transient - protected[hive] lazy val executionHive: ClientWrapper = - new IsolatedClientLoader( - version = IsolatedClientLoader.hiveVersion("13"), - isolationOn = false, + protected[hive] lazy val executionHive: ClientWrapper = { + logInfo(s"Initilizing execution hive, version $hiveExecutionVersion") + new ClientWrapper( + version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion), config = Map( "javax.jdo.option.ConnectionURL" -> - s"jdbc:derby:;databaseName=$localMetastore;create=true"), - rootClassLoader = Utils.getContextOrSparkClassLoader).client.asInstanceOf[ClientWrapper] + s"jdbc:derby:;databaseName=$localMetastore;create=true")) + } SessionState.setCurrentSessionState(executionHive.state) /** - * The copy of the Hive client that is used to retrieve metadata from the Hive MetaStore. This + * The copy of the Hive client that is used to retrieve metadata from the Hive MetaStore. * The version of the Hive client that is used here must match the metastore that is configured * in the hive-site.xml file. */ @transient protected[hive] lazy val metadataHive: ClientInterface = { + val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion) + // We instantiate a HiveConf here to read in the hive-site.xml file and then pass the options // into the isolated client loader val metadataConf = new HiveConf() - val allConfig = metadataConf.iterator.map(e => e.getKey -> e.getValue).toMap + // `configure` goes second to override other settings. + val allConfig = metadataConf.iterator.map(e => e.getKey -> e.getValue).toMap ++ configure + + val isolatedLoader = if (hiveMetastoreJars == "builtin") { + if (hiveExecutionVersion != hiveMetastoreVersion) { + throw new IllegalArgumentException( + "Builtin jars can only be used when hive execution version == hive metastore version. " + + s"Execution: ${hiveExecutionVersion} != Metastore: ${hiveMetastoreVersion}. " + + "Specify a vaild path to the correct hive jars using spark.sql.hive.metastore.jars " + + s"or change spark.sql.hive.metastore.version to ${hiveExecutionVersion}.") + } + val jars = getClass.getClassLoader match { + case urlClassLoader: java.net.URLClassLoader => urlClassLoader.getURLs + case other => + throw new IllegalArgumentException( + "Unable to locate hive jars to connect to metastore " + + s"using classloader ${other.getClass.getName}. " + + "Please set spark.sql.hive.metastore.jars") + } - // Config goes second to override other settings. - // TODO: Support for loading the jars from an already downloaded location. - IsolatedClientLoader.forVersion(hiveVersion, allConfig ++ configure).client + logInfo( + s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.") + new IsolatedClientLoader( + version = metaVersion, + execJars = jars.toSeq, + config = allConfig, + isolationOn = true) + } else if (hiveMetastoreJars == "maven") { + // TODO: Support for loading the jars from an already downloaded location. + logInfo( + s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.") + IsolatedClientLoader.forVersion(hiveMetastoreVersion, allConfig ) + } else { + val jars = hiveMetastoreJars.split(",").map(new java.net.URL(_)) + logInfo( + s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using $jars") + new IsolatedClientLoader( + version = metaVersion, + execJars = jars.toSeq, + config = allConfig, + isolationOn = true) + } + isolatedLoader.client } protected[sql] override def parseSql(sql: String): LogicalPlan = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala index a82ba5ee3dccd..42c74aae0f1f5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala @@ -307,11 +307,12 @@ class ClientWrapper( try { val cmd_trimmed: String = cmd.trim() val tokens: Array[String] = cmd_trimmed.split("\\s+") + // The remainder of the command. val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim() val proc: CommandProcessor = version match { case hive.v12 => classOf[CommandProcessorFactory] - .callStatic[String, HiveConf, CommandProcessor]("get", cmd_1, conf) + .callStatic[String, HiveConf, CommandProcessor]("get", tokens(0), conf) case hive.v13 => classOf[CommandProcessorFactory] .callStatic[Array[String], HiveConf, CommandProcessor]("get", Array(tokens(0)), conf) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index 710dbca6e3c66..13986822e895d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hive.client import java.io.File -import java.net.URLClassLoader +import java.net.{URL, URLClassLoader} import java.util import scala.language.reflectiveCalls @@ -49,7 +49,7 @@ object IsolatedClientLoader { case "13" | "0.13" | "0.13.0" | "0.13.1" => hive.v13 } - private def downloadVersion(version: HiveVersion): Seq[File] = { + private def downloadVersion(version: HiveVersion): Seq[URL] = { val hiveArtifacts = (Seq("hive-metastore", "hive-exec", "hive-common", "hive-serde") ++ (if (version.hasBuiltinsJar) "hive-builtins" :: Nil else Nil)) @@ -72,10 +72,10 @@ object IsolatedClientLoader { tempDir.mkdir() allFiles.foreach(f => FileUtils.copyFileToDirectory(f, tempDir)) - tempDir.listFiles() + tempDir.listFiles().map(_.toURL) } - private def resolvedVersions = new scala.collection.mutable.HashMap[HiveVersion, Seq[File]] + private def resolvedVersions = new scala.collection.mutable.HashMap[HiveVersion, Seq[URL]] } /** @@ -101,7 +101,7 @@ object IsolatedClientLoader { */ class IsolatedClientLoader( val version: HiveVersion, - val execJars: Seq[File] = Seq.empty, + val execJars: Seq[URL] = Seq.empty, val config: Map[String, String] = Map.empty, val isolationOn: Boolean = true, val rootClassLoader: ClassLoader = ClassLoader.getSystemClassLoader.getParent.getParent, @@ -112,7 +112,7 @@ class IsolatedClientLoader( assert(Try(baseClassLoader.loadClass("org.apache.hive.HiveConf")).isFailure) /** All jars used by the hive specific classloader. */ - protected def allJars = execJars.map(_.toURI.toURL).toArray + protected def allJars = execJars.toArray protected def isSharedClass(name: String): Boolean = name.contains("slf4j") || diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 81e77ba257bf1..e6ac91db9378a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -101,5 +101,9 @@ class VersionsSuite extends FunSuite with Logging { test(s"$version: getTable") { client.getTable("default", "src") } + + test(s"$version: set command") { + client.runSqlHive("SET spark.sql.test.key=1") + } } }