
Commit

Merge remote-tracking branch 'origin/master' into dictionary-filter
sadikovi committed Feb 3, 2017
2 parents 82180bb + 32f4a6c commit 33046d2
Showing 4 changed files with 43 additions and 16 deletions.
13 changes: 6 additions & 7 deletions README.md
@@ -12,6 +12,11 @@ does not change frequently, but is used for queries often, e.g. using Thrift JDB
indexed, schema and list of files (including partitioning) will be automatically resolved from index
metastore instead of inferring schema every time datasource is created.

> Project is **experimental and is in active development**. Any feedback, issues or PRs are welcome.
> Documentation reflects changes in the `master` branch; for documentation for a specific version,
> please select the corresponding version tag or branch.

### Metastore
Metastore keeps information about all indexed tables and can be created on a local file system or HDFS
(see available options below), with support for an in-memory cache of the index (after the first scan). Each
@@ -43,12 +48,6 @@ Currently only these types are supported for indexed columns:
- Append mode is not yet supported for Parquet tables when creating an index
- Certain Spark versions are supported (see table below)

> Project is **experimental and is in active development at the moment**. We are working to remove
> limitations and add support for different versions. Any feedback, issues or PRs are welcome.
> Documentation reflects changes in `master` branch, for specific version documentation, please
> select version tag or branch.
## Requirements
| Spark version | `parquet-index` latest version |
|---------------|--------------------------------|
@@ -77,7 +76,7 @@ other Spark configuration or add them to `spark-defaults.conf` file.
| Name | Description | Default |
|------|-------------|---------|
| `spark.sql.index.metastore` | Index metastore location, created if it does not exist (`file:/folder`, `hdfs://host:port/folder`) | `./index_metastore`
| `spark.sql.index.parquet.filter.enabled` | When set to `true`, write filter statistics for indexed columns when creating table index, otherwise only min/max statistics are used. Filter statistics are used during filtering stage, if can be applied and available (`true`, `false`) | `false`
| `spark.sql.index.parquet.filter.enabled` | When set to `true`, write filter statistics for indexed columns when creating a table index, otherwise only min/max statistics are used. Filter statistics are used during the filtering stage, if applicable (`true`, `false`) | `true`
| `spark.sql.index.parquet.filter.type` | When filter statistics are enabled, select the type of statistics to use when creating an index (`bloom`) | `bloom`
| `spark.sql.index.parquet.filter.eagerLoading` | When set to `true`, read and load all filter statistics into memory the first time the catalog is resolved, otherwise load them lazily as needed when evaluating a predicate (`true`, `false`) | `false`
| `spark.sql.index.createIfNotExists` | When set to `true`, create an index if one does not exist in the metastore for the table, using all available columns for indexing (`true`, `false`) | `false`
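For reference, the options in the table above are passed like any other Spark configuration, either in `spark-defaults.conf` or when building the session. A minimal sketch, where the metastore path and application name are placeholders rather than values from this commit:

```scala
// Sketch only: wiring the configuration options documented above into a session.
// The metastore path and app name are placeholders.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("parquet-index-example")
  .config("spark.sql.index.metastore", "file:/tmp/index_metastore")   // index metastore location
  .config("spark.sql.index.parquet.filter.enabled", "true")           // write filter statistics (new default in this commit)
  .config("spark.sql.index.parquet.filter.type", "bloom")             // bloom filter statistics
  .config("spark.sql.index.parquet.filter.eagerLoading", "false")     // load filter statistics lazily
  .config("spark.sql.index.createIfNotExists", "false")               // do not auto-create missing indexes
  .getOrCreate()
```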
4 changes: 2 additions & 2 deletions src/main/scala/org/apache/spark/sql/internal/IndexConf.scala
@@ -67,9 +67,9 @@ private[spark] object IndexConf {
IndexConfigBuilder("spark.sql.index.parquet.filter.enabled").
doc("When set to true, writes filter statistics for indexed columns when creating table " +
"index, otherwise only min/max statistics are used. Filter statistics are always used " +
"during filtering stage, if can be applied and available").
"during filtering stage, if applicable").
booleanConf.
createWithDefault(false)
createWithDefault(true)

val PARQUET_FILTER_STATISTICS_TYPE =
IndexConfigBuilder("spark.sql.index.parquet.filter.type").
@@ -150,7 +150,16 @@ object ColumnFilterStatistics {
*/
case class BloomFilterStatistics(numRows: Long = 1024L) extends ColumnFilterStatistics {
require(numRows > 0, s"Invalid expected number of records $numRows, should be > 0")
@transient private var bloomFilter: BloomFilter = BloomFilter.create(numRows, 0.03)
@transient private var bloomFilter: BloomFilter =
try {
// Create bloom filter with at most ~1048576 expected records and FPP of 3%
BloomFilter.create(Math.min(numRows, 1 << 20), 0.03)
} catch {
case oom: OutOfMemoryError =>
throw new OutOfMemoryError(s"Failed to create bloom filter for numRows=$numRows. " +
"Consider reducing partition size and/or number of filters/indexed columns").
initCause(oom)
}
@transient private var hasLoadedData: Boolean = false

override def update(value: Any): Unit = bloomFilter.put(value)
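The hunk above caps the expected record count passed to `BloomFilter.create` at roughly 2^20 and rethrows allocation failures with a more actionable message. A standalone sketch of the same pattern, assuming the filter in question is Spark's `org.apache.spark.util.sketch.BloomFilter`; `createCappedFilter` is a hypothetical helper used only for illustration:

```scala
import org.apache.spark.util.sketch.BloomFilter

// Build a bloom filter whose pre-allocated size stays bounded even for very large partitions.
def createCappedFilter(numRows: Long, fpp: Double = 0.03): BloomFilter = {
  require(numRows > 0, s"Invalid expected number of records $numRows, should be > 0")
  // Cap at ~1048576 expected records so a huge partition does not pre-allocate a huge bit array
  val expected = math.min(numRows, 1L << 20)
  try {
    BloomFilter.create(expected, fpp)
  } catch {
    case oom: OutOfMemoryError =>
      // Rethrow with context so the failing table or partition is easier to diagnose
      throw new OutOfMemoryError(s"Failed to create bloom filter for numRows=$numRows. " +
        "Consider reducing partition size and/or number of filters/indexed columns").initCause(oom)
  }
}

val filter = createCappedFilter(numRows = 5000000L)
filter.put("value")
assert(filter.mightContain("value"))  // may also return true for absent values (false positives)
```

Inserting more rows than the capped expectation does not break the filter; it only raises the observed false-positive rate above the requested 3%.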
31 changes: 25 additions & 6 deletions src/test/scala/org/apache/spark/sql/IndexSuite.scala
@@ -457,10 +457,10 @@ class IndexSuite extends UnitTestSuite with SparkLocal {
// scalastyle:off
val sqlContext = spark.sqlContext
import sqlContext.implicits._
Seq("a", "é").toDF("name").coalesce(1).write.parquet(dir.toString /"utf")
Seq("a", "é").toDF("col").coalesce(1).write.parquet(dir.toString /"utf")

spark.index.create.indexBy("name").parquet(dir.toString / "utf")
val df = spark.index.parquet(dir.toString / "utf").filter("name > 'a'")
spark.index.create.indexBy("col").parquet(dir.toString / "utf")
val df = spark.index.parquet(dir.toString / "utf").filter("col > 'a'")
df.collect should be (Array(Row("é")))
// scalastyle:on
}
@@ -473,16 +473,35 @@ class IndexSuite extends UnitTestSuite with SparkLocal {
// scalastyle:off
val sqlContext = spark.sqlContext
import sqlContext.implicits._
Seq("aa", "", "bb").toDF("name").coalesce(1).write.parquet(dir.toString / "utf")
Seq("aa", "", "bb").toDF("col").coalesce(1).write.parquet(dir.toString / "utf")

spark.index.create.indexBy("name").parquet(dir.toString / "utf")
val df = spark.index.parquet(dir.toString / "utf").filter("name > 'bb'")
spark.index.create.indexBy("col").parquet(dir.toString / "utf")
val df = spark.index.parquet(dir.toString / "utf").filter("col > 'bb'")
df.collect should be (Array(Row("")))
// scalastyle:on
}
}
}

test("#25 - create index for table with UTF-8 columns only") {
withTempDir { dir =>
withSQLConf(
METASTORE_LOCATION.key -> dir.toString / "metastore",
PARQUET_FILTER_STATISTICS_ENABLED.key -> "true") {
// scalastyle:off
val sqlContext = spark.sqlContext
import sqlContext.implicits._
Seq("ᚠᛇᚻ", "᛫ᛒᛦᚦ᛫ᚠᚱ", "ᚩᚠᚢᚱ᛫", "ᚠᛁᚱᚪ᛫ᚷ", "ᛖᚻᚹᛦ", "ᛚᚳᚢᛗ").toDF("col").
write.parquet(dir.toString / "utf")

spark.index.create.indexBy("col").parquet(dir.toString / "utf")
val df = spark.index.parquet(dir.toString / "utf").filter("col = 'ᛖᚻᚹᛦ'")
df.collect should be (Array(Row("ᛖᚻᚹᛦ")))
// scalastyle:on
}
}
}

test("#40 - query indexed table with empty partitions (files on disk)") {
withTempDir { dir =>
withSQLConf(METASTORE_LOCATION.key -> dir.toString / "metastore") {
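The tests above (including the new `#25` case) exercise the public API end to end. A rough usage sketch following the same pattern, assuming `spark` is a `SparkSession` with this library on the classpath and the implicits that provide `spark.index` in scope; the output path is a placeholder:

```scala
// Usage sketch mirroring IndexSuite: write a Parquet table, index a column, query through the index.
val sqlContext = spark.sqlContext
import sqlContext.implicits._

val path = "/tmp/parquet_index_example"
Seq("aa", "bb", "cc").toDF("col").coalesce(1).write.parquet(path)

spark.index.create.indexBy("col").parquet(path)           // build index for column "col"
val df = spark.index.parquet(path).filter("col = 'bb'")   // predicate can be pruned with bloom filter statistics
df.show()
```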
