[SPARK-22160][SQL] Make sample points per partition (in range partitioner) configurable and bump the default value up to 100

Spark's RangePartitioner hardcodes the number of sampling points per partition at 20, which is sometimes too low. This ticket makes it configurable via spark.sql.execution.rangeExchange.sampleSizePerPartition and raises the Spark SQL default to 100.
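
A rough sketch of how the new knob can be exercised once this change is in; the SparkSession name `spark` and the numbers are assumed for illustration:

```scala
// Sketch only: assumes an existing SparkSession named `spark`.
// Raise the per-partition sample count before an operation that
// range-partitions its input, such as a global sort.
spark.conf.set("spark.sql.execution.rangeExchange.sampleSizePerPartition", "500")
val sorted = spark.range(0L, 1000000L, 1L, 10).sort("id")
sorted.count() // the sort triggers a range exchange using the new sample size
```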

Added a pretty sophisticated test based on the chi-square test ...

Author: Reynold Xin <rxin@databricks.com>

Closes apache#19387 from rxin/SPARK-22160.

This commit contains the following squashed commits:
938326b NETFLIX-BUILD: Fixup backport of SPARK-22160.
rxin authored and rdblue committed Mar 22, 2018
1 parent 71cfc55 commit 0f7895a
Showing 4 changed files with 96 additions and 3 deletions.
15 changes: 13 additions & 2 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -108,11 +108,21 @@ class HashPartitioner(partitions: Int) extends Partitioner {
 class RangePartitioner[K : Ordering : ClassTag, V](
     partitions: Int,
     rdd: RDD[_ <: Product2[K, V]],
-    private var ascending: Boolean = true)
+    private var ascending: Boolean = true,
+    val samplePointsPerPartitionHint: Int = 20)
   extends Partitioner {
 
+  // A constructor declared in order to maintain backward compatibility for Java, when we add the
+  // 4th constructor parameter samplePointsPerPartitionHint. See SPARK-22160.
+  // This is added to make sure from a bytecode point of view, there is still a 3-arg ctor.
+  def this(partitions: Int, rdd: RDD[_ <: Product2[K, V]], ascending: Boolean) = {
+    this(partitions, rdd, ascending, samplePointsPerPartitionHint = 20)
+  }
+
   // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
   require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")
+  require(samplePointsPerPartitionHint > 0,
+    s"Sample points per partition must be greater than 0 but found $samplePointsPerPartitionHint")
 
   private var ordering = implicitly[Ordering[K]]
 
@@ -122,7 +132,8 @@ class RangePartitioner[K : Ordering : ClassTag, V](
       Array.empty
     } else {
       // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
-      val sampleSize = math.min(20.0 * partitions, 1e6)
+      // Cast to double to avoid overflowing ints or longs
+      val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
       // Assume the input partitions are roughly balanced and over-sample a little bit.
       val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
       val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
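
For intuition about the sampling arithmetic in the hunk above, a self-contained sketch; the partition counts below are made up for illustration:

```scala
// Made-up example: 1000 output partitions over an input RDD with 200
// partitions, using the new default hint of 100 sample points per partition.
val samplePointsPerPartitionHint = 100
val partitions = 1000
val inputPartitions = 200

// Total sample size, capped at 1M, as in the patched code above.
val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6) // 100000.0

// Over-sample 3x per input partition, assuming roughly balanced input.
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / inputPartitions).toInt // 1500
```

Under the old hardcoded value of 20, the same setup would draw only 20,000 sample points in total, which is the shortfall SPARK-22160 addresses.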
11 changes: 11 additions & 0 deletions .../org/apache/spark/sql/internal/SQLConf.scala
@@ -750,6 +750,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION =
+    SQLConfigBuilder("spark.sql.execution.rangeExchange.sampleSizePerPartition")
+      .internal()
+      .doc("Number of points to sample per partition in order to determine the range boundaries" +
+        " for range partitioning, typically used in global sorting (without limit).")
+      .intConf
+      .createWithDefault(100)
+
   val IGNORE_BATCH_ID_FOLDERS = SQLConfigBuilder("spark.sql.ignore-batchid-folders")
     .doc("Ignore folders named batchid=<num> when listing files in a partition directory.")
     .booleanConf
@@ -995,6 +1003,9 @@ class SQLConf extends Serializable with Logging {
   def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)
 
   def ndvMaxError: Double = getConf(NDV_MAX_ERROR)
+
+  def rangeExchangeSampleSizePerPartition: Int = getConf(RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
7 changes: 6 additions & 1 deletion .../org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.MutablePair
 
 /**
@@ -215,7 +216,11 @@ object ShuffleExchange {
           iter.map(row => mutablePair.update(row.copy(), null))
         }
         implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes)
-        new RangePartitioner(numPartitions, rddForSampling, ascending = true)
+        new RangePartitioner(
+          numPartitions,
+          rddForSampling,
+          ascending = true,
+          samplePointsPerPartitionHint = SQLConf.get.rangeExchangeSampleSizePerPartition)
       case SinglePartition =>
         new Partitioner {
           override def numPartitions: Int = 1
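
The same hint is also usable outside Spark SQL when constructing a RangePartitioner over a plain pair RDD. A minimal sketch, assuming a live SparkContext named `sc`:

```scala
import org.apache.spark.RangePartitioner

// Sketch only: assumes a live SparkContext named `sc`.
// Range-partition a pair RDD with a custom sampling hint, mirroring what
// the SQL range exchange above now does internally.
val pairs = sc.parallelize(0 until 1000000, numSlices = 10).map(i => (i, i))
val partitioner = new RangePartitioner(
  partitions = 8,
  rdd = pairs,
  ascending = true,
  samplePointsPerPartitionHint = 100)
val ranged = pairs.partitionBy(partitioner)
```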
66 changes: 66 additions & 0 deletions .../org/apache/spark/sql/ConfigBehaviorSuite.scala
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import org.apache.commons.math3.stat.inference.ChiSquareTest

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext


class ConfigBehaviorSuite extends QueryTest with SharedSQLContext {

  import testImplicits._

  test("SPARK-22160 spark.sql.execution.rangeExchange.sampleSizePerPartition") {
    // In this test, we run a sort and compute the histogram for partition size post shuffle.
    // With a high sample count, the partition size should be more evenly distributed, and have a
    // low chi-sq test value.
    // Also the whole code path for range partitioning as implemented should be deterministic
    // (it uses the partition id as the seed), so this test shouldn't be flaky.

    val numPartitions = 4

    def computeChiSquareTest(): Double = {
      val n = 10000
      // Trigger a sort
      val data = spark.range(0, n, 1, 1).sort('id)
        .selectExpr("SPARK_PARTITION_ID() pid", "id").as[(Int, Long)].collect()

      // Compute histogram for the number of records per partition post sort
      val dist = data.groupBy(_._1).map(_._2.length.toLong).toArray
      assert(dist.length == 4)

      new ChiSquareTest().chiSquare(
        Array.fill(numPartitions) { n.toDouble / numPartitions },
        dist)
    }

    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> numPartitions.toString) {
      // The default chi-sq value should be low
      assert(computeChiSquareTest() < 100)

      withSQLConf(SQLConf.RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION.key -> "1") {
        // If we only sample one point, the range boundaries will be pretty bad and the
        // chi-sq value would be very high.
        assert(computeChiSquareTest() > 1000)
      }
    }
  }

}
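
To see what the suite's chi-square bounds mean concretely, here is a self-contained sketch of the same statistic computed by hand; the observed counts are invented for illustration:

```scala
// Invented post-shuffle partition sizes for n = 10000 rows over 4 partitions.
// chiSq = sum((observed - expected)^2 / expected), which is also what
// commons-math3's ChiSquareTest.chiSquare returns when the observed and
// expected totals match, as they do here.
val n = 10000
val numPartitions = 4
val observed = Array(2400L, 2600L, 2550L, 2450L)
val expected = Array.fill(numPartitions)(n.toDouble / numPartitions) // 2500.0 each
val chiSq = observed.zip(expected).map { case (o, e) =>
  val d = o - e
  d * d / e
}.sum
println(chiSq) // 10.0 for these counts: well under the suite's < 100 bound
```

A perfectly even split would score 0, while sampling a single point per partition skews the boundaries enough to push the statistic past the suite's 1000 threshold.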
