[SPARK-6618][SPARK-6669][SQL] Lock Hive metastore client correctly.
Author: Yin Huai <yhuai@databricks.com>
Author: Michael Armbrust <michael@databricks.com>

Closes apache#5333 from yhuai/lookupRelationLock and squashes the following commits:

59c884f [Michael Armbrust] [SQL] Lock metastore client in analyzeTable
7667030 [Yin Huai] Merge pull request #2 from marmbrus/pr/5333
e4a9b0b [Michael Armbrust] Correctly lock on MetastoreCatalog
d6fc32f [Yin Huai] Missing `)`.
1e241af [Yin Huai] Protect InsertIntoHive.
fee7e9c [Yin Huai] A test?
5416b0f [Yin Huai] Just protect client.
yhuai authored and marmbrus committed Apr 2, 2015
1 parent d3944b6 commit 5db8912
Showing 4 changed files with 53 additions and 27 deletions.
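The change is the same in all four files: HiveContext, HiveMetastoreCatalog, and InsertIntoHiveTable share a single Hive metastore client, and that client is not thread-safe, so every call into it is now serialized by synchronizing on the shared catalog object. A minimal sketch of the pattern, with hypothetical names standing in for the catalog and client (assumes Scala 2.12 or earlier, where `.par` is built in):

// Minimal sketch (hypothetical names): calls into a non-thread-safe client
// are serialized on one shared lock object, mirroring `catalog.synchronized`.
object MetastoreLockSketch {
  final class UnsafeClient {
    @volatile private var busy = false
    def call(op: String): String = {
      require(!busy, s"concurrent access detected during $op") // the race the fix prevents
      busy = true
      try { Thread.sleep(1); s"done: $op" } finally { busy = false }
    }
  }

  private val lock = new Object          // plays the role of the shared catalog
  private val client = new UnsafeClient  // plays the role of catalog.client

  def withClientLock[A](body: UnsafeClient => A): A =
    lock.synchronized(body(client))

  def main(args: Array[String]): Unit = {
    // 100 concurrent callers, as in the regression test at the end of this
    // commit; the single monitor keeps the calls strictly serialized.
    (1 to 100).par.foreach(i => withClientLock(_.call(s"op-$i")))
    println("no concurrent access observed")
  }
}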
HiveContext.scala

@@ -181,7 +181,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
       val tableFullName =
         relation.hiveQlTable.getDbName + "." + relation.hiveQlTable.getTableName
 
-      catalog.client.alterTable(tableFullName, new Table(hiveTTable))
+      catalog.synchronized {
+        catalog.client.alterTable(tableFullName, new Table(hiveTTable))
+      }
     }
   case otherRelation =>
     throw new UnsupportedOperationException(
HiveMetastoreCatalog.scala

@@ -67,7 +67,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
       override def load(in: QualifiedTableName): LogicalPlan = {
         logDebug(s"Creating new cached data source for $in")
-        val table = synchronized {
+        val table = HiveMetastoreCatalog.this.synchronized {
           client.getTable(in.database, in.name)
         }
 
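This one-token change is the subtle one: inside the anonymous CacheLoader, a bare `synchronized` takes the monitor of the anonymous inner instance, not of the catalog, so it excludes nothing that locks the catalog itself. Qualifying it as `HiveMetastoreCatalog.this.synchronized` takes the enclosing catalog's monitor. A compilable sketch of the distinction, hypothetical names throughout:

// Hypothetical illustration: which monitor does `synchronized` take?
class Catalog {
  private var hits = 0
  val loader: Runnable = new Runnable {
    override def run(): Unit = {
      synchronized { hits += 1 }              // locks this Runnable instance
      Catalog.this.synchronized { hits += 1 } // locks the enclosing Catalog
    }
  }
}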
@@ -183,12 +183,16 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
   def lookupRelation(
       tableIdentifier: Seq[String],
-      alias: Option[String]): LogicalPlan = synchronized {
+      alias: Option[String]): LogicalPlan = {
     val tableIdent = processTableIdentifier(tableIdentifier)
     val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
       hive.sessionState.getCurrentDatabase)
     val tblName = tableIdent.last
-    val table = try client.getTable(databaseName, tblName) catch {
+    val table = try {
+      synchronized {
+        client.getTable(databaseName, tblName)
+      }
+    } catch {
       case te: org.apache.hadoop.hive.ql.metadata.InvalidTableException =>
         throw new NoSuchTableException
     }
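Note the scope change in lookupRelation: the old code held the catalog's monitor for the entire method, while the new code guards only the `client.getTable` call, so the lock is released before Spark builds the relation — work that can re-enter synchronized catalog code such as the cache loader above, plausibly the hazard behind the SPARK-6669 half of the title. A before/after sketch with hypothetical names:

// Hypothetical before/after: guard only the non-thread-safe call,
// not the work that follows it.
object LockScopeSketch {
  trait Client { def getTable(db: String, name: String): String }

  class Catalog(client: Client) {
    // Before: monitor held across everything the method does.
    def lookupCoarse(db: String, name: String): String = synchronized {
      val t = client.getTable(db, name)
      s"MetastoreRelation($t)" // plan construction, still under the lock
    }

    // After: monitor held only around the client call.
    def lookupNarrow(db: String, name: String): String = {
      val t = synchronized { client.getTable(db, name) }
      s"MetastoreRelation($t)" // plan construction runs unlocked
    }
  }
}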
@@ -210,7 +214,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     } else {
       val partitions: Seq[Partition] =
         if (table.isPartitioned) {
-          HiveShim.getAllPartitionsOf(client, table).toSeq
+          synchronized {
+            HiveShim.getAllPartitionsOf(client, table).toSeq
+          }
         } else {
           Nil
         }
InsertIntoHiveTable.scala

@@ -50,7 +50,7 @@ case class InsertIntoHiveTable(
   @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
   @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
   @transient private lazy val hiveContext = new Context(sc.hiveconf)
-  @transient private lazy val db = Hive.get(sc.hiveconf)
+  @transient private lazy val catalog = sc.catalog
 
   private def newSerializer(tableDesc: TableDesc): Serializer = {
     val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
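Replacing the private `db = Hive.get(sc.hiveconf)` handle with the shared `sc.catalog` is what makes the synchronization below coherent: the write path now locks, and calls through, the same catalog object that HiveMetastoreCatalog locks on the read path, rather than a handle of its own.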
@@ -199,38 +199,45 @@ case class InsertIntoHiveTable(
         orderedPartitionSpec.put(entry.getName,partitionSpec.get(entry.getName).getOrElse(""))
       }
       val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec)
-      db.validatePartitionNameCharacters(partVals)
+      catalog.synchronized {
+        catalog.client.validatePartitionNameCharacters(partVals)
+      }
       // inheritTableSpecs is set to true. It should be set to false for a IMPORT query
       // which is currently considered as a Hive native command.
       val inheritTableSpecs = true
       // TODO: Correctly set isSkewedStoreAsSubdir.
       val isSkewedStoreAsSubdir = false
       if (numDynamicPartitions > 0) {
-        db.loadDynamicPartitions(
-          outputPath,
-          qualifiedTableName,
-          orderedPartitionSpec,
-          overwrite,
-          numDynamicPartitions,
-          holdDDLTime,
-          isSkewedStoreAsSubdir
-        )
+        catalog.synchronized {
+          catalog.client.loadDynamicPartitions(
+            outputPath,
+            qualifiedTableName,
+            orderedPartitionSpec,
+            overwrite,
+            numDynamicPartitions,
+            holdDDLTime,
+            isSkewedStoreAsSubdir)
+        }
       } else {
-        db.loadPartition(
-          outputPath,
-          qualifiedTableName,
-          orderedPartitionSpec,
-          overwrite,
-          holdDDLTime,
-          inheritTableSpecs,
-          isSkewedStoreAsSubdir)
+        catalog.synchronized {
+          catalog.client.loadPartition(
+            outputPath,
+            qualifiedTableName,
+            orderedPartitionSpec,
+            overwrite,
+            holdDDLTime,
+            inheritTableSpecs,
+            isSkewedStoreAsSubdir)
+        }
       }
     } else {
-      db.loadTable(
-        outputPath,
-        qualifiedTableName,
-        overwrite,
-        holdDDLTime)
+      catalog.synchronized {
+        catalog.client.loadTable(
+          outputPath,
+          qualifiedTableName,
+          overwrite,
+          holdDDLTime)
+      }
     }
 
       // Invalidate the cache.
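Each call site above repeats the `catalog.synchronized { catalog.client.… }` idiom by hand. A hypothetical refactor, not part of this commit, would funnel every metastore call through one guarded helper so that no call site can forget the lock; HiveClientLike below is a stand-in interface, not the real Hive API:

// Hypothetical helper, illustration only.
object GuardedClientSketch {
  trait HiveClientLike {
    def loadTable(path: String, tableName: String,
                  overwrite: Boolean, holdDDLTime: Boolean): Unit
  }

  final class GuardedClient(lock: AnyRef, client: HiveClientLike) {
    // Every metastore call passes through here, under the shared monitor.
    def withClient[A](f: HiveClientLike => A): A = lock.synchronized(f(client))
  }
}

// Usage at a call site:
//   guarded.withClient(_.loadTable(outputPath, name, overwrite, holdDDLTime))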
SQLQuerySuite.scala

@@ -457,4 +457,15 @@ class SQLQuerySuite extends QueryTest {
     dropTempTable("data")
     setConf("spark.sql.hive.convertCTAS", originalConf)
   }
+
+  test("sanity test for SPARK-6618") {
+    (1 to 100).par.map { i =>
+      val tableName = s"SPARK_6618_table_$i"
+      sql(s"CREATE TABLE $tableName (col1 string)")
+      catalog.lookupRelation(Seq(tableName))
+      table(tableName)
+      tables()
+      sql(s"DROP TABLE $tableName")
+    }
+  }
 }
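The test relies on Scala parallel collections: `.par` fans the 100 create/lookup/list/drop cycles out across a thread pool, exercising the shared metastore client from many threads at once. It makes no explicit assertion; the expectation is that a run which trips the old race throws from inside the client and fails the suite.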
