Skip to content

Commit

Permalink
[SPARK-7792] [SQL] HiveContext registerTempTable not thread safe
Browse files Browse the repository at this point in the history
Just replaced mutable.HashMap to ConcurrentHashMap

Author: navis.ryu <navis@apache.org>

Closes apache#6699 from navis/SPARK-7792 and squashes the following commits:

f03654a [navis.ryu] [SPARK-7792] [SQL] HiveContext registerTempTable not thread safe
  • Loading branch information
navis authored and rxin committed Jun 10, 2015
1 parent 6e4fb0c commit 778f3ca
Showing 1 changed file with 17 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

package org.apache.spark.sql.catalyst.analysis

import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.EmptyConf
Expand Down Expand Up @@ -81,18 +85,18 @@ trait Catalog {
}

class SimpleCatalog(val conf: CatalystConf) extends Catalog {
val tables = new mutable.HashMap[String, LogicalPlan]()
val tables = new ConcurrentHashMap[String, LogicalPlan]

override def registerTable(
tableIdentifier: Seq[String],
plan: LogicalPlan): Unit = {
val tableIdent = processTableIdentifier(tableIdentifier)
tables += ((getDbTableName(tableIdent), plan))
tables.put(getDbTableName(tableIdent), plan)
}

override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
val tableIdent = processTableIdentifier(tableIdentifier)
tables -= getDbTableName(tableIdent)
tables.remove(getDbTableName(tableIdent))
}

override def unregisterAllTables(): Unit = {
Expand All @@ -101,18 +105,18 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {

override def tableExists(tableIdentifier: Seq[String]): Boolean = {
val tableIdent = processTableIdentifier(tableIdentifier)
tables.get(getDbTableName(tableIdent)) match {
case Some(_) => true
case None => false
}
tables.containsKey(getDbTableName(tableIdent))
}

override def lookupRelation(
tableIdentifier: Seq[String],
alias: Option[String] = None): LogicalPlan = {
val tableIdent = processTableIdentifier(tableIdentifier)
val tableFullName = getDbTableName(tableIdent)
val table = tables.getOrElse(tableFullName, sys.error(s"Table Not Found: $tableFullName"))
val table = tables.get(tableFullName)
if (table == null) {
sys.error(s"Table Not Found: $tableFullName")
}
val tableWithQualifiers = Subquery(tableIdent.last, table)

// If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
Expand All @@ -121,9 +125,11 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
}

override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
tables.map {
case (name, _) => (name, true)
}.toSeq
val result = ArrayBuffer.empty[(String, Boolean)]
for (name <- tables.keySet()) {
result += ((name, true))
}
result
}

override def refreshTable(databaseName: String, tableName: String): Unit = {
Expand Down

0 comments on commit 778f3ca

Please sign in to comment.