Commit 81bb366

comments from vanzin

marmbrus committed May 7, 2015
1 parent 5f3945e

Showing 4 changed files with 37 additions and 44 deletions.
@@ -81,8 +81,9 @@ private[hive] object SparkSQLCLIDriver {
     }
     val cliConf = new HiveConf(classOf[SessionState])
     // Override the location of the metastore since this is only used for local execution.
-    cliConf.set(
-      "javax.jdo.option.ConnectionURL", s"jdbc:derby:;databaseName=$localMetastore;create=true")
+    HiveContext.newTemporaryConfiguation().foreach {
+      case (key, value) => cliConf.set(key, value)
+    }
     val sessionState = new CliSessionState(cliConf)
 
     sessionState.in = System.in
@@ -103,7 +104,7 @@ private[hive] object SparkSQLCLIDriver {
     sessionState.cmdProperties.entrySet().foreach { item =>
       val key = item.getKey.asInstanceOf[String]
       val value = item.getValue.asInstanceOf[String]
-      // We do not propogate metastore options to the execution copy of hive.
+      // We do not propagate metastore options to the execution copy of hive.
       if (key != "javax.jdo.option.ConnectionURL") {
         conf.set(key, value)
         sessionState.getOverriddenConfigurations.put(key, value)
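The two hunks above work together: the CLI driver now seeds its HiveConf from HiveContext.newTemporaryConfiguation() instead of hard-coding a Derby URL, and it deliberately withholds the metastore ConnectionURL when copying options into the execution copy of Hive. A minimal standalone sketch of that copy-with-filter pattern (the names here are illustrative, not Spark code):

import scala.collection.mutable

// Sketch: seed a mutable conf from a Map of defaults, skipping keys
// that must not leak into the execution copy of Hive.
object ConfCopySketch {
  def copyFiltered(
      source: Map[String, String],
      blocked: Set[String],
      set: (String, String) => Unit): Unit =
    source.foreach {
      case (key, value) if !blocked(key) => set(key, value)
      case _ => // drop blocked keys, e.g. javax.jdo.option.ConnectionURL
    }

  def main(args: Array[String]): Unit = {
    val conf = mutable.Map.empty[String, String]
    copyFiltered(
      Map(
        "javax.jdo.option.ConnectionURL" -> "jdbc:derby:;databaseName=/tmp/m;create=true",
        "hive.metastore.warehouse.dir" -> "/tmp/warehouse"),
      blocked = Set("javax.jdo.option.ConnectionURL"),
      set = conf.update)
    println(conf) // only the warehouse dir survives
  }
}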
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala (23 additions & 27 deletions)

@@ -17,7 +17,7 @@

 package org.apache.spark.sql.hive
 
-import java.io.{BufferedReader, InputStreamReader, PrintStream}
+import java.io.{BufferedReader, File, InputStreamReader, PrintStream}
 import java.sql.Timestamp
 import java.util.{ArrayList => JArrayList}
 
@@ -36,8 +36,9 @@ import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.annotation.Experimental
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateSubQueries, OverrideCatalog, OverrideFunctionRegistry}
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -105,12 +106,12 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    * Spark SQL for execution.
    */
   protected[hive] def hiveMetastoreVersion: String =
-    getConf(HIVE_METASTORE_VERSION, "0.13.1")
+    getConf(HIVE_METASTORE_VERSION, hiveExecutionVersion)
 
   /**
    * The location of the jars that should be used to instantiate the HiveMetastoreClient. This
    * property can be one of three options:
-   *  - a colon-separated list of jar files or directories for hive and hadoop.
+   *  - a classpath in the standard format for both hive and hadoop.
    *  - builtin - attempt to discover the jars that were used to load Spark SQL and use those. This
    *    option is only valid when using the execution version of Hive.
    *  - maven - download the correct version of hive on demand from maven.
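The reworded doc above matters in practice: spark.sql.hive.metastore.jars now takes a classpath in the platform's standard format rather than a colon-separated list. A usage sketch follows; the config keys are the constants added in this commit, while the version and jar locations are made-up values for illustration:

import org.apache.spark.SparkConf

object MetastoreConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      // Ask for a specific metastore client version (after this commit it
      // defaults to the execution version, 0.13.1).
      .set("spark.sql.hive.metastore.version", "0.13.1")
      // A standard classpath: entries split on File.pathSeparator, and a
      // trailing "*" pulls in every jar in that directory.
      .set("spark.sql.hive.metastore.jars", "/opt/hive/lib/*:/opt/hadoop/lib/*")
    println(conf.get("spark.sql.hive.metastore.jars"))
  }
}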
@@ -121,22 +122,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   @transient
   protected[sql] lazy val substitutor = new VariableSubstitution()
 
-
-  /** A local instance of hive that is only used for execution. */
-  protected[hive] lazy val localMetastore = {
-    val temp = Utils.createTempDir()
-    temp.delete()
-    temp
-  }
-
-  @transient
-  protected[hive] lazy val executionConf = new HiveConf()
-  executionConf.set(
-    "javax.jdo.option.ConnectionURL", s"jdbc:derby:;databaseName=$localMetastore;create=true")
-
-  /** The version of hive used internally by Spark SQL. */
-  lazy val hiveExecutionVersion: String = "0.13.1"
-
   /**
    * The copy of the hive client that is used for execution. Currently this must always be
    * Hive 13 as this is the version of Hive that is packaged with Spark SQL. This copy of the
@@ -149,9 +134,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     logInfo(s"Initializing execution hive, version $hiveExecutionVersion")
     new ClientWrapper(
       version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion),
-      config = Map(
-        "javax.jdo.option.ConnectionURL" ->
-          s"jdbc:derby:;databaseName=$localMetastore;create=true"))
+      config = newTemporaryConfiguation())
   }
   SessionState.setCurrentSessionState(executionHive.state)
 
@@ -203,11 +186,13 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
       // Convert to files and expand any directories.
       val jars =
         hiveMetastoreJars
-          .split(":")
-          .map(new java.io.File(_))
+          .split(File.pathSeparator)
           .flatMap {
-            case f if f.isDirectory => f.listFiles()
-            case f => f :: Nil
+            case path if path.endsWith("*") =>
+              val directory = new File(path.dropRight(1))
+              directory.listFiles.filter(_.getName.endsWith("jar"))
+            case path =>
+              new File(path) :: Nil
           }
           .map(_.toURI.toURL)
 
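The flatMap above is the mechanism behind the "standard classpath" wording: entries ending in "*" are expanded, Hadoop-style, into the jars their directory contains. A self-contained sketch of the same expansion, with one extra null guard for a missing directory that the committed code does not have:

import java.io.File

object ClasspathExpansionSketch {
  // Split a classpath on the platform separator and expand "dir/*"
  // entries into the jar files that directory contains.
  def expand(classpath: String): Seq[File] =
    classpath.split(File.pathSeparator).toSeq.flatMap {
      case path if path.endsWith("*") =>
        val directory = new File(path.dropRight(1))
        // listFiles returns null for a missing directory; guard it here.
        Option(directory.listFiles).toSeq.flatten
          .filter(_.getName.endsWith("jar"))
      case path =>
        Seq(new File(path))
    }

  def main(args: Array[String]): Unit =
    expand(s"/opt/hive/lib/*${File.pathSeparator}/opt/hadoop/common.jar")
      .foreach(jar => println(jar.toURI.toURL))
}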
@@ -471,9 +456,20 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
 
 
 private[hive] object HiveContext {
+  /** The version of hive used internally by Spark SQL. */
+  val hiveExecutionVersion: String = "0.13.1"
+
   val HIVE_METASTORE_VERSION: String = "spark.sql.hive.metastore.version"
   val HIVE_METASTORE_JARS: String = "spark.sql.hive.metastore.jars"
 
+  /** Constructs a configuration for hive, where the metastore is located in a temp directory. */
+  def newTemporaryConfiguation(): Map[String, String] = {
+    val tempDir = Utils.createTempDir()
+    val localMetastore = new File(tempDir, "metastore")
+    Map(
+      "javax.jdo.option.ConnectionURL" -> s"jdbc:derby:;databaseName=$localMetastore;create=true")
+  }
+
   protected val primitiveTypes =
     Seq(StringType, IntegerType, LongType, DoubleType, FloatType, BooleanType, ByteType,
       ShortType, DateType, TimestampType, BinaryType)
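The key design point in newTemporaryConfiguation() above is that the metastore path is a child of the temp directory that is never actually created: Derby's create=true builds the database at that path on first connection, which is presumably why the old code created a temp dir and immediately called temp.delete(). A standalone sketch of the helper, substituting java.nio for Spark's Utils.createTempDir:

import java.io.File
import java.nio.file.Files

object TemporaryMetastoreSketch {
  def newTemporaryConfiguration(): Map[String, String] = {
    val tempDir = Files.createTempDirectory("spark-hive").toFile
    // Deliberately not created on disk: Derby's create=true makes the
    // database directory itself when the first connection arrives.
    val localMetastore = new File(tempDir, "metastore")
    Map(
      "javax.jdo.option.ConnectionURL" ->
        s"jdbc:derby:;databaseName=$localMetastore;create=true")
  }

  def main(args: Array[String]): Unit =
    newTemporaryConfiguration().foreach { case (k, v) => println(s"$k=$v") }
}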
@@ -60,12 +60,12 @@ case class CreateTableAsSelect(
       schema =
         query.output.map(c =>
           HiveColumn(c.name, HiveMetastoreTypes.toMetastoreType(c.dataType), null)),
-        inputFormat =
-          tableDesc.inputFormat.orElse(Some(classOf[TextInputFormat].getName)),
-        outputFormat =
-          tableDesc.outputFormat
-            .orElse(Some(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]].getName)),
-        serde = tableDesc.serde.orElse(Some(classOf[LazySimpleSerDe].getName())))
+      inputFormat =
+        tableDesc.inputFormat.orElse(Some(classOf[TextInputFormat].getName)),
+      outputFormat =
+        tableDesc.outputFormat
+          .orElse(Some(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]].getName)),
+      serde = tableDesc.serde.orElse(Some(classOf[LazySimpleSerDe].getName())))
     hiveContext.catalog.client.createTable(withSchema)
 
     // Get the Metastore Relation
@@ -63,6 +63,8 @@ object TestHive
 class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
   self =>
 
+  import HiveContext._
+
   // By clearing the port we force Spark to pick a new one. This allows us to rerun tests
   // without restarting the JVM.
   System.clearProperty("spark.hostPort")
@@ -71,16 +73,10 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
   hiveconf.set("hive.plan.serialization.format", "javaXML")
 
   lazy val warehousePath = Utils.createTempDir()
-  lazy val metastorePath = {
-    val temp = Utils.createTempDir()
-    temp.delete()
-    temp
-  }
 
   /** Sets up the system initially or after a RESET command */
-  protected override def configure(): Map[String, String] = Map(
-    "javax.jdo.option.ConnectionURL" -> s"jdbc:derby:;databaseName=$metastorePath;create=true",
-    "hive.metastore.warehouse.dir" -> warehousePath.toString)
+  protected override def configure(): Map[String, String] =
+    newTemporaryConfiguation() ++ Map("hive.metastore.warehouse.dir" -> warehousePath.toString)
 
   val testTempDir = Utils.createTempDir()
 
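In the configure() override above, ++ merges the two maps with right-hand precedence, so the test-specific warehouse setting sits alongside the temporary metastore settings and would win on any overlapping key. A tiny sketch of that merge semantics, with stand-in values:

object ConfigureMergeSketch {
  def main(args: Array[String]): Unit = {
    // Stand-ins for newTemporaryConfiguation() and the warehouse setting.
    val metastore = Map(
      "javax.jdo.option.ConnectionURL" -> "jdbc:derby:;databaseName=/tmp/m;create=true")
    val merged = metastore ++ Map("hive.metastore.warehouse.dir" -> "/tmp/warehouse")
    merged.foreach { case (k, v) => println(s"$k=$v") } // both keys present
  }
}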
