Skip to content

Commit

Permalink
[SPARK-17088][HIVE] Fix 'sharesHadoopClasses' option when creating client.
Browse files Browse the repository at this point in the history

Because the call to the constructor of HiveClientImpl crosses class loader
boundaries, different versions of the same class (Configuration in this
case) were loaded, and that caused a runtime error when instantiating the
client. By using a safer type in the signature of the constructor, it's
possible to avoid the problem.

I considered removing 'sharesHadoopClasses', but it may still be desired
(even though it currently has zero users, since it was not working). When Spark
starts to support Hadoop 3, it may be necessary to use that option to
load clients for older Hive metastore versions that are not aware of
Hadoop 3.

Tested with a newly added unit test.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes apache#20169 from vanzin/SPARK-17088.
  • Loading branch information
Marcelo Vanzin authored and tkakantousis committed Mar 11, 2018
1 parent c387f35 commit ff4a79d
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
package org.apache.spark.sql.hive.client

import java.io.{File, PrintStream}
import java.util.Locale
import java.lang.{Iterable => JIterable}
import java.util.{Locale, Map => JMap}

import scala.collection.JavaConverters._
import scala.collection.mutable
Expand Down Expand Up @@ -81,8 +82,9 @@ import org.apache.spark.util.{CircularBuffer, Utils}
*/
private[hive] class HiveClientImpl(
override val version: HiveVersion,
warehouseDir: Option[String],
sparkConf: SparkConf,
hadoopConf: Configuration,
hadoopConf: JIterable[JMap.Entry[String, String]],
extraConfig: Map[String, String],
initClassLoader: ClassLoader,
val clientLoader: IsolatedClientLoader)
Expand Down Expand Up @@ -131,7 +133,7 @@ private[hive] class HiveClientImpl(
if (ret != null) {
// hive.metastore.warehouse.dir is determined in SharedState after the CliSessionState
// instance constructed, we need to follow that change here.
Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname)).foreach { dir =>
warehouseDir.foreach { dir =>
ret.getConf.setVar(ConfVars.METASTOREWAREHOUSE, dir)
}
ret
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import scala.util.Try

import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf.ConfVars

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmitUtils
Expand All @@ -48,11 +49,12 @@ private[hive] object IsolatedClientLoader extends Logging {
config: Map[String, String] = Map.empty,
ivyPath: Option[String] = None,
sharedPrefixes: Seq[String] = Seq.empty,
barrierPrefixes: Seq[String] = Seq.empty): IsolatedClientLoader = synchronized {
barrierPrefixes: Seq[String] = Seq.empty,
sharesHadoopClasses: Boolean = true): IsolatedClientLoader = synchronized {
val resolvedVersion = hiveVersion(hiveMetastoreVersion)
// We will first try to share Hadoop classes. If we cannot resolve the Hadoop artifact
// with the given version, we will use Hadoop 2.6 and then will not share Hadoop classes.
var sharesHadoopClasses = true
var _sharesHadoopClasses = sharesHadoopClasses
val files = if (resolvedVersions.contains((resolvedVersion, hadoopVersion))) {
resolvedVersions((resolvedVersion, hadoopVersion))
} else {
Expand All @@ -68,7 +70,7 @@ private[hive] object IsolatedClientLoader extends Logging {
"Hadoop classes will not be shared between Spark and Hive metastore client. " +
"It is recommended to set jars used by Hive metastore client through " +
"spark.sql.hive.metastore.jars in the production environment.")
sharesHadoopClasses = false
_sharesHadoopClasses = false
(downloadVersion(resolvedVersion, "2.6.5", ivyPath), "2.6.5")
}
resolvedVersions.put((resolvedVersion, actualHadoopVersion), downloadedFiles)
Expand All @@ -81,7 +83,7 @@ private[hive] object IsolatedClientLoader extends Logging {
execJars = files,
hadoopConf = hadoopConf,
config = config,
sharesHadoopClasses = sharesHadoopClasses,
sharesHadoopClasses = _sharesHadoopClasses,
sharedPrefixes = sharedPrefixes,
barrierPrefixes = barrierPrefixes)
}
Expand Down Expand Up @@ -251,8 +253,10 @@ private[hive] class IsolatedClientLoader(

/** The isolated client interface to Hive. */
private[hive] def createClient(): HiveClient = synchronized {
val warehouseDir = Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname))
if (!isolationOn) {
return new HiveClientImpl(version, sparkConf, hadoopConf, config, baseClassLoader, this)
return new HiveClientImpl(version, warehouseDir, sparkConf, hadoopConf, config,
baseClassLoader, this)
}
// Pre-reflective instantiation setup.
logDebug("Initializing the logger to avoid disaster...")
Expand All @@ -263,7 +267,7 @@ private[hive] class IsolatedClientLoader(
classLoader
.loadClass(classOf[HiveClientImpl].getName)
.getConstructors.head
.newInstance(version, sparkConf, hadoopConf, config, classLoader, this)
.newInstance(version, warehouseDir, sparkConf, hadoopConf, config, classLoader, this)
.asInstanceOf[HiveClient]
} catch {
case e: InvocationTargetException =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ private[client] object HiveClientBuilder {
def buildClient(
version: String,
hadoopConf: Configuration,
extraConf: Map[String, String] = Map.empty): HiveClient = {
extraConf: Map[String, String] = Map.empty,
sharesHadoopClasses: Boolean = true): HiveClient = {
IsolatedClientLoader.forVersion(
hiveMetastoreVersion = version,
hadoopVersion = VersionInfo.getVersion,
sparkConf = new SparkConf(),
hadoopConf = hadoopConf,
config = buildConf(extraConf),
ivyPath = ivyPath).createClient()
ivyPath = ivyPath,
sharesHadoopClasses = sharesHadoopClasses).createClient()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ class HiveClientSuite(version: String)
day1 :: day2 :: Nil)
}

// Regression test for SPARK-17088: creating a client with
// sharesHadoopClasses = false must not fail at construction time.
// Previously, passing a Configuration across the isolated class loader
// boundary loaded two incompatible versions of the class and caused a
// runtime error when instantiating HiveClientImpl (per the commit message).
// NOTE(review): the test asserts nothing explicitly — successful client
// construction without an exception is the pass condition.
test("create client with sharesHadoopClasses = false") {
buildClient(new Configuration(), sharesHadoopClasses = false)
}

private def testMetastorePartitionFiltering(
filterString: String,
expectedDs: Seq[Int],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,21 @@ import org.apache.spark.sql.hive.HiveUtils
private[client] abstract class HiveVersionSuite(version: String) extends SparkFunSuite {
protected var client: HiveClient = null

protected def buildClient(hadoopConf: Configuration): HiveClient = {
protected def buildClient(
hadoopConf: Configuration,
sharesHadoopClasses: Boolean = true): HiveClient = {
// Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
// hive.metastore.schema.verification from false to true since 2.0
// For details, see the JIRA HIVE-6113 and HIVE-12463
if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3") {
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
hadoopConf.set("hive.metastore.schema.verification", "false")
}
HiveClientBuilder
.buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf))
HiveClientBuilder.buildClient(
version,
hadoopConf,
HiveUtils.formatTimeVarsForHiveClient(hadoopConf),
sharesHadoopClasses = sharesHadoopClasses)
}

override def suiteName: String = s"${super.suiteName}($version)"
Expand Down

0 comments on commit ff4a79d

Please sign in to comment.