Fix output() of Set physical. Add SQLConf param accessor method.
concretevitamin committed Jun 6, 2014
1 parent e9856c4 commit 22d9ed7
Showing 6 changed files with 26 additions and 18 deletions.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.types.StructType
+import org.apache.spark.sql.catalyst.types.{StringType, StructType}
 import org.apache.spark.sql.catalyst.trees
 
 abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
@@ -101,7 +101,7 @@ abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
  */
 abstract class Command extends LeafNode {
   self: Product =>
-  def output = Seq.empty
+  def output: Seq[Attribute] = Seq.empty // TODO: Is Seq.empty the right thing to do?
 }
 
 /**
@@ -113,7 +113,12 @@ case class NativeCommand(cmd: String) extends Command
 /**
  * Commands of the form "SET (key) (= value)".
  */
-case class SetCommand(key: Option[String], value: Option[String]) extends Command
+case class SetCommand(key: Option[String], value: Option[String]) extends Command {
+  override def output = Seq(
+    AttributeReference("key", StringType, nullable = false)(),
+    AttributeReference("value", StringType, nullable = false)()
+  )
+}
 
 /**
  * Returned by a parser when the users only wants to see what query plan would be executed, without
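
The change above is the heart of the logical-plan fix: SetCommand previously fell back to Command's empty output, so the rows a SET query returns had no resolvable schema. A minimal, self-contained sketch of the pattern (Attr, Cmd, and SetCmd are simplified stand-ins, not the real Catalyst classes):

  // Simplified stand-in for Catalyst's AttributeReference, which also
  // carries a real data type, qualifiers, and an expression ID.
  case class Attr(name: String, dataType: String, nullable: Boolean)

  // Commands default to producing no columns...
  abstract class Cmd {
    def output: Seq[Attr] = Seq.empty
  }

  // ...but SET answers with (key, value) string pairs, so it overrides the
  // default and declares that schema explicitly.
  case class SetCmd(key: Option[String], value: Option[String]) extends Cmd {
    override def output: Seq[Attr] = Seq(
      Attr("key", "string", nullable = false),
      Attr("value", "string", nullable = false))
  }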

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala (9 additions, 5 deletions)

@@ -25,17 +25,17 @@ import scala.collection.JavaConverters._
  * SQLConf holds mutable config parameters and hints. These can be set and
  * queried either by passing SET commands into Spark SQL's DSL
  * functions (sql(), hql(), etc.), or by programmatically using setters and
- * getters of this class.
+ * getters of this class. This class is thread-safe.
  */
 class SQLConf {
 
+  /** Number of partitions to use for shuffle operators. */
+  def numShufflePartitions(default: Int): Int = getOption("spark.sql.shuffle.partitions")
+    .map(_.toInt).getOrElse(default)
+
   private val settings = java.util.Collections.synchronizedMap(
     new java.util.HashMap[String, String]())
 
-  private[spark] def clear() {
-    settings.clear()
-  }
-
   def this(props: Properties) = {
     this()
     props.asScala.foreach { case (k, v) => this.settings.put(k, v) }
@@ -73,4 +73,8 @@ class SQLConf {
     }
   }
 
+  private[spark] def clear() {
+    settings.clear()
+  }
+
 }
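
The accessor added above gives the planner one typed entry point for this setting instead of ad-hoc string lookups at each call site. A stripped-down sketch of the pattern (MiniConf and its set method are illustrative; getOption and numShufflePartitions mirror the shapes in the diff):

  // A synchronized string map plus a typed accessor with a caller-supplied
  // default: the same shape as SQLConf above.
  class MiniConf {
    private val settings = java.util.Collections.synchronizedMap(
      new java.util.HashMap[String, String]())

    def set(key: String, value: String): Unit = settings.put(key, value)
    def getOption(key: String): Option[String] = Option(settings.get(key))

    def numShufflePartitions(default: Int): Int =
      getOption("spark.sql.shuffle.partitions").map(_.toInt).getOrElse(default)
  }

  object MiniConfDemo extends App {
    val conf = new MiniConf
    println(conf.numShufflePartitions(150)) // 150: nothing set, default wins
    conf.set("spark.sql.shuffle.partitions", "10")
    println(conf.numShufflePartitions(150)) // 10: the stored value wins
  }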

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

@@ -261,7 +261,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   @transient
   protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
     val batches =
-      Batch("Add exchange", Once, AddExchange) ::
+      Batch("Add exchange", Once, AddExchange(self)) ::
       Batch("Prepare Expressions", Once, new BindReferences[SparkPlan]) :: Nil
   }
 

sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf}
 import org.apache.spark.rdd.ShuffledRDD
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{SQLConf, SQLContext, Row}
 import org.apache.spark.sql.catalyst.errors.attachTree
 import org.apache.spark.sql.catalyst.expressions.{MutableProjection, RowOrdering}
 import org.apache.spark.sql.catalyst.plans.physical._
@@ -86,9 +86,9 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode {
  * [[catalyst.plans.physical.Distribution Distribution]] requirements for each operator by inserting
  * [[Exchange]] Operators where required.
  */
-private[sql] object AddExchange extends Rule[SparkPlan] {
+private[sql] case class AddExchange(sqlContext: SQLContext) extends Rule[SparkPlan] {
   // TODO: Determine the number of partitions.
-  val numPartitions = 150
+  def numPartitions = sqlContext.sqlConf.numShufflePartitions(150)
 
   def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
     case operator: SparkPlan =>
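
Together with the one-line SQLContext change above (AddExchange(self)), this turns AddExchange from a singleton with a frozen partition count into a rule that consults the live conf: numPartitions is now a def reading through sqlContext.sqlConf, so a later SET takes effect on the next planning pass. A small sketch of why the val-versus-def distinction matters here (Conf and AddExchangeLike are illustrative stand-ins):

  // Mutable conf standing in for SQLConf.
  class Conf { @volatile var partitions: Int = 150 }

  case class AddExchangeLike(conf: Conf) {
    val frozen: Int = conf.partitions   // captured once, at construction
    def current: Int = conf.partitions  // re-read on every call
  }

  object ExchangeDemo extends App {
    val conf = new Conf
    val rule = AddExchangeLike(conf)
    conf.partitions = 10   // e.g. the effect of "SET spark.sql.shuffle.partitions=10"
    println(rule.frozen)   // 150: stale, like the old hard-coded val
    println(rule.current)  // 10: picks up the change, like the new def
  }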

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.{SQLContext, execution}
+import org.apache.spark.sql.{SQLConf, SQLContext, execution}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
@@ -177,7 +177,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
   // Can we automate these 'pass through' operations?
   object BasicOperators extends Strategy {
-    def numPartitions = sqlConf.get("spark.sql.shufflePartitions", "200").toInt
+    def numPartitions = sqlConf.numShufflePartitions(200)
+
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.Distinct(child) =>
         execution.Aggregate(
@@ -220,7 +221,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   case class SetCommandStrategy(context: SQLContext) extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.SetCommand(key, value) =>
-        Seq(execution.SetCommandPhysical(key, value)(context))
+        Seq(execution.SetCommandPhysical(key, value, plan.output)(context))
       case _ => Nil
     }
   }
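
Two call sites change here. BasicOperators now routes through the typed accessor; notably, the old inline lookup read the key "spark.sql.shufflePartitions" while the accessor reads "spark.sql.shuffle.partitions", exactly the kind of drift a single accessor prevents. SetCommandStrategy, meanwhile, forwards plan.output into the physical node so the logical and physical schemas cannot diverge. A sketch of that forwarding, with simplified stand-in types (not the real Catalyst/execution classes):

  // The logical node declares the schema once; the physical node receives it
  // rather than (re)declaring its own, previously empty, one.
  case class LogicalSetLike(key: Option[String], value: Option[String]) {
    def output: Seq[String] = Seq("key", "value")
  }
  case class PhysicalSetLike(key: Option[String], value: Option[String], output: Seq[String])

  object SetCommandStrategyLike {
    def planSet(plan: LogicalSetLike): Seq[PhysicalSetLike] =
      Seq(PhysicalSetLike(plan.key, plan.value, plan.output)) // schema forwarded
  }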

sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{GenericRow, Attribute}
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class SetCommandPhysical(key: Option[String], value: Option[String])
+case class SetCommandPhysical(key: Option[String], value: Option[String], output: Seq[Attribute])
     (@transient context: SQLContext) extends LeafNode {
   def execute(): RDD[Row] = (key, value) match {
     // Set value for key k; the action itself would
@@ -50,6 +50,4 @@ case class SetCommandPhysical(key: Option[String], value: Option[String])
     // The only other case is invalid semantics and is impossible.
     case _ => context.emptyResult
   }
-
-  def output: Seq[Attribute] = Seq.empty // TODO: right thing?
 }
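
This completes the fix named in the commit title: the physical SET operator used to pin its output to Seq.empty (the TODO deleted above), so the rows it produced claimed a zero-column schema; the schema now arrives from the logical plan through the new constructor parameter. A conceptual sketch of the row shapes this makes describable (setRows and its lookup parameter are hypothetical, not the real execute()):

  object SetRowsSketch {
    // Rows a SET can produce, each matching the declared (key, value) schema.
    def setRows(key: Option[String], value: Option[String],
                lookup: String => Option[String]): Seq[(String, String)] =
      (key, value) match {
        case (Some(k), Some(v)) => Seq((k, v))                      // SET k=v echoes the pair
        case (Some(k), None)    => lookup(k).map(v => (k, v)).toSeq // SET k queries one key
        case _                  => Seq.empty                        // bare SET / invalid: elided here
      }
  }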
