merge master
witgo committed May 14, 2014
2 parents 689495d + b22952f commit f76679b
Showing 5 changed files with 22 additions and 103 deletions.
core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
@@ -17,11 +17,15 @@

package org.apache.spark

import org.apache.spark.annotation.DeveloperApi

/**
* :: DeveloperApi ::
* An iterator that wraps around an existing iterator to provide task killing functionality.
* It works by checking the interrupted flag in [[TaskContext]].
*/
private[spark] class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
@DeveloperApi
class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
extends Iterator[T] {

def hasNext: Boolean = {
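The doc comment above describes the wrapper pattern: every call is delegated to the underlying iterator, but an interruption flag is consulted first so a killed task stops promptly. A minimal, Spark-free sketch of the same idea follows; CancellableIterator, WorkCancelledException and the AtomicBoolean flag are illustrative stand-ins, not Spark's API.

import java.util.concurrent.atomic.AtomicBoolean

// Illustrative stand-in for TaskKilledException; not Spark's class.
class WorkCancelledException extends RuntimeException

// Wraps an existing iterator and fails fast once `killed` is set, mirroring how
// InterruptibleIterator consults the interrupted flag carried by the task context.
class CancellableIterator[T](killed: AtomicBoolean, delegate: Iterator[T]) extends Iterator[T] {
  def hasNext: Boolean = {
    if (killed.get()) throw new WorkCancelledException
    delegate.hasNext
  }
  def next(): T = {
    if (killed.get()) throw new WorkCancelledException
    delegate.next()
  }
}

object CancellableIteratorDemo {
  def main(args: Array[String]): Unit = {
    val killed = new AtomicBoolean(false)
    val numbers = new CancellableIterator(killed, Iterator.from(1))
    println(numbers.take(3).toList) // List(1, 2, 3)
    killed.set(true)
    // The next call now fails fast instead of continuing to iterate.
    try numbers.next()
    catch { case _: WorkCancelledException => println("iteration cancelled") }
  }
}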
61 changes: 0 additions & 61 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -157,64 +157,3 @@ class RangePartitioner[K : Ordering : ClassTag, V](
}
}

/**
* A [[org.apache.spark.Partitioner]] that assigns records to partitions up to a specified
* boundary of keys per partition (default 1000). Once every partition holds `boundary`
* elements, the partitioner hands out one element per partition in round-robin order, so
* the smaller partitions end up at most one key behind the larger ones.
*/
class BoundaryPartitioner[K : Ordering : ClassTag, V](
partitions: Int,
@transient rdd: RDD[_ <: Product2[K,V]],
private val boundary: Int = 1000)
extends Partitioner {

// this array keeps track of keys assigned to a partition
// counts[0] refers to # of keys in partition 0 and so on
private val counts: Array[Int] = {
new Array[Int](numPartitions)
}

def numPartitions = math.abs(partitions)

/*
* Ideally, this would be calculated from the number of partitions and the total key count,
* but we do not call count on the RDD here to avoid triggering an action.
* The user is free to call count themselves and pass in an appropriate boundary.
*/
def keysPerPartition = boundary

var currPartition = 0

/*
* Pick the current partition for each key until we hit the per-partition bound,
* then start allocating to the next partition.
*
* NOTE: Say there are 2000 keys, the user asks for 3 partitions, and 500 is passed in as
* the boundary. The first 500 keys go to P1, 501-1000 go to P2, and 1001-1500 go to P3;
* after that, keys are handed out one partition at a time, so 1501 goes to P1, 1502 to P2,
* 1503 to P3, and so on.
*/
def getPartition(key: Any): Int = {
val partition = currPartition
counts(partition) = counts(partition) + 1
/*
* Since we fill up a partition before moving to the next one (which helps preserve key
* order), it is possible in some cases to end up with empty partitions: with 3 partitions
* and 500 keys per partition, an RDD of 700 keys leaves one partition entirely empty.
*/
if(counts(currPartition) >= keysPerPartition) {
currPartition = (currPartition + 1) % numPartitions
}
partition
}

override def equals(other: Any): Boolean = other match {
case r: BoundaryPartitioner[_,_] =>
(r.counts.sameElements(counts) && r.boundary == boundary
&& r.currPartition == currPartition)
case _ =>
false
}
}
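For context, here is a sketch of how this (now removed) partitioner would have been driven from user code. The SparkContext setup is illustrative only, and because getPartition mutates counts and currPartition, the observed partition sizes depend on how many copies of the partitioner end up doing the counting.

import org.apache.spark.{BoundaryPartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits (Spark 1.x style)

object BoundaryPartitionerSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("boundary-sketch").setMaster("local"))
    val pairs = sc.parallelize(1 to 2000).map(x => (x, x))

    // 4 partitions, at most 500 keys per partition before the round-robin spillover starts.
    val partitioned = pairs.partitionBy(new BoundaryPartitioner(4, pairs, 500))

    // Inspect how many records landed in each partition.
    println(partitioned.glom().map(_.length).collect().mkString(", "))
    sc.stop()
  }
}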
core/src/main/scala/org/apache/spark/TaskKilledException.scala
@@ -17,7 +17,11 @@

package org.apache.spark

import org.apache.spark.annotation.DeveloperApi

/**
* Exception for a task getting killed.
* :: DeveloperApi ::
* Exception thrown when a task is explicitly killed (i.e., task failure is expected).
*/
private[spark] class TaskKilledException extends RuntimeException
@DeveloperApi
class TaskKilledException extends RuntimeException
34 changes: 0 additions & 34 deletions core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -66,40 +66,6 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
assert(descendingP4 != p4)
}

test("BoundaryPartitioner equality") {
// Make a simple pair RDD; BoundaryPartitioner equality depends only on the partitioner's
// own state (counts, boundary, currPartition), not on the data.
val rdd = sc.parallelize(1.to(4000)).map(x => (x, x))

val p2 = new BoundaryPartitioner(2, rdd, 1000)
val p4 = new BoundaryPartitioner(4, rdd, 1000)
val anotherP4 = new BoundaryPartitioner(4, rdd)

assert(p2 === p2)
assert(p4 === p4)
assert(p2 != p4)
assert(p4 != p2)
assert(p4 === anotherP4)
assert(anotherP4 === p4)
}

test("BoundaryPartitioner getPartition") {
val rdd = sc.parallelize(1.to(2000)).map(x => (x, x))
val partitioner = new BoundaryPartitioner(4, rdd, 500)
// Keys are fed to the partitioner in order, so the first 500 land in partition 0,
// the next 500 in partition 1, and so on.
1.to(2000).foreach { element =>
val partition = partitioner.getPartition(element)
if (element <= 500) {
assert(partition === 0)
} else if (element <= 1000) {
assert(partition === 1)
} else if (element <= 1500) {
assert(partition === 2)
} else {
assert(partition === 3)
}
}
}

test("RangePartitioner getPartition") {
val rdd = sc.parallelize(1.to(2000)).map(x => (x, x))
// We have different behaviour of getPartition for partitions with less than 1000 and more than
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -25,13 +25,13 @@ import org.apache.spark.sql.catalyst.types._

object Optimizer extends RuleExecutor[LogicalPlan] {
val batches =
Batch("ConstantFolding", Once,
Batch("ConstantFolding", FixedPoint(100),
NullPropagation,
ConstantFolding,
BooleanSimplification,
SimplifyFilters,
SimplifyCasts) ::
Batch("Filter Pushdown", Once,
Batch("Filter Pushdown", FixedPoint(100),
CombineFilters,
PushPredicateThroughProject,
PushPredicateThroughInnerJoin,
@@ -49,24 +49,27 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
*/
object ColumnPruning extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// Eliminate attributes that are not needed to calculate the specified aggregates.
case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
// Project away references that are not needed to calculate the required aggregates.
a.copy(child = Project(a.references.toSeq, child))

// Eliminate unneeded attributes from either side of a Join.
case Project(projectList, Join(left, right, joinType, condition)) =>
// Collect the list of all references required either above or to evaluate the condition.
val allReferences: Set[Attribute] =
projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty)
/** Applies a projection when the child is producing unnecessary attributes */

/** Applies a projection only when the child is producing unnecessary attributes */
def prunedChild(c: LogicalPlan) =
if ((allReferences.filter(c.outputSet.contains) -- c.outputSet).nonEmpty) {
if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
Project(allReferences.filter(c.outputSet.contains).toSeq, c)
} else {
c
}

Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition))

// Combine adjacent Projects.
case Project(projectList1, Project(projectList2, child)) =>
// Create a map of Aliases to their values from the child projection.
// e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)).
@@ -83,6 +86,9 @@ object ColumnPruning extends Rule[LogicalPlan] {
}).asInstanceOf[Seq[NamedExpression]]

Project(substitutedProjection, child)

// Eliminate no-op Projects
case Project(projectList, child) if(child.output == projectList) => child
}
}
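The prunedChild change above swaps the operands of the set difference: the old guard filtered allReferences down to the child's output and then subtracted that same output, which is empty by construction, so pruning never fired; the new guard asks whether the child emits attributes that nothing references. A quick sanity check of the two expressions, using plain Sets of strings in place of attribute sets (the values are made up for illustration):

object PrunedChildConditionCheck {
  def main(args: Array[String]): Unit = {
    val childOutput   = Set("a", "b", "c") // attributes the child produces
    val allReferences = Set("a", "b", "x") // attributes needed above or in the join condition

    // Old guard: filtering to the child's output and then subtracting that output is always empty.
    val oldGuard = (allReferences.filter(childOutput.contains) -- childOutput).nonEmpty
    // New guard: true exactly when the child produces something nobody references ("c" here).
    val newGuard = (childOutput -- allReferences.filter(childOutput.contains)).nonEmpty

    println(s"old = $oldGuard, new = $newGuard") // old = false, new = true
  }
}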

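Switching the optimizer batches from Once to FixedPoint(100) means each batch keeps re-applying its rules until the plan stops changing or 100 iterations have run, instead of making a single pass. Below is a standalone sketch of that execution strategy with illustrative names; it is not Catalyst's RuleExecutor, just the same loop in miniature.

object FixedPointSketch {
  type Rule[T] = T => T

  // Re-run the batch of rules until the value stops changing or maxIterations is reached,
  // which is what FixedPoint(100) requests, versus Once's single pass.
  def runToFixedPoint[T](rules: Seq[Rule[T]], maxIterations: Int)(input: T): T = {
    var current = input
    var changed = true
    var iteration = 0
    while (changed && iteration < maxIterations) {
      val next = rules.foldLeft(current)((value, rule) => rule(value))
      changed = next != current
      current = next
      iteration += 1
    }
    current
  }

  def main(args: Array[String]): Unit = {
    // Toy "simplification" rule: halve even numbers. A single pass over 40 gives 20;
    // running to a fixed point gives 5.
    val halveEvens: Rule[Int] = n => if (n % 2 == 0) n / 2 else n
    println(runToFixedPoint(Seq(halveEvens), maxIterations = 100)(40)) // 5
  }
}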
