From dd4a546abc858a3c9b707b3b2b848beb676a8831 Mon Sep 17 00:00:00 2001
From: sadikovi
Date: Mon, 9 Jan 2017 22:13:58 +1300
Subject: [PATCH 1/3] add unicode test

---
 .../org/apache/spark/sql/IndexSuite.scala | 31 +++++++++++++++----
 1 file changed, 25 insertions(+), 6 deletions(-)

diff --git a/src/test/scala/org/apache/spark/sql/IndexSuite.scala b/src/test/scala/org/apache/spark/sql/IndexSuite.scala
index 037c84a..6bb60f4 100644
--- a/src/test/scala/org/apache/spark/sql/IndexSuite.scala
+++ b/src/test/scala/org/apache/spark/sql/IndexSuite.scala
@@ -457,10 +457,10 @@ class IndexSuite extends UnitTestSuite with SparkLocal {
       // scalastyle:off
       val sqlContext = spark.sqlContext
       import sqlContext.implicits._
-      Seq("a", "é").toDF("name").coalesce(1).write.parquet(dir.toString /"utf")
+      Seq("a", "é").toDF("col").coalesce(1).write.parquet(dir.toString / "utf")
 
-      spark.index.create.indexBy("name").parquet(dir.toString / "utf")
-      val df = spark.index.parquet(dir.toString / "utf").filter("name > 'a'")
+      spark.index.create.indexBy("col").parquet(dir.toString / "utf")
+      val df = spark.index.parquet(dir.toString / "utf").filter("col > 'a'")
       df.collect should be (Array(Row("é")))
       // scalastyle:on
     }
@@ -473,16 +473,35 @@ class IndexSuite extends UnitTestSuite with SparkLocal {
       // scalastyle:off
       val sqlContext = spark.sqlContext
       import sqlContext.implicits._
-      Seq("aa", "bé", "bb").toDF("name").coalesce(1).write.parquet(dir.toString / "utf")
+      Seq("aa", "bé", "bb").toDF("col").coalesce(1).write.parquet(dir.toString / "utf")
 
-      spark.index.create.indexBy("name").parquet(dir.toString / "utf")
-      val df = spark.index.parquet(dir.toString / "utf").filter("name > 'bb'")
+      spark.index.create.indexBy("col").parquet(dir.toString / "utf")
+      val df = spark.index.parquet(dir.toString / "utf").filter("col > 'bb'")
       df.collect should be (Array(Row("bé")))
       // scalastyle:on
     }
   }
 }
 
+  test("#25 - create index for table with UTF-8 columns only") {
+    withTempDir { dir =>
+      withSQLConf(
+        METASTORE_LOCATION.key -> dir.toString / "metastore",
+        PARQUET_FILTER_STATISTICS_ENABLED.key -> "true") {
+        // scalastyle:off
+        val sqlContext = spark.sqlContext
+        import sqlContext.implicits._
+        Seq("ᚠᛇᚻ", "᛫ᛒᛦᚦ᛫ᚠᚱ", "ᚩᚠᚢᚱ᛫", "ᚠᛁᚱᚪ᛫ᚷ", "ᛖᚻᚹᛦ", "ᛚᚳᚢᛗ").toDF("col").
+          write.parquet(dir.toString / "utf")
+
+        spark.index.create.indexBy("col").parquet(dir.toString / "utf")
+        val df = spark.index.parquet(dir.toString / "utf").filter("col = 'ᛖᚻᚹᛦ'")
+        df.collect should be (Array(Row("ᛖᚻᚹᛦ")))
+        // scalastyle:on
+      }
+    }
+  }
+
   test("#40 - query indexed table with empty partitions (files on disk)") {
     withTempDir { dir =>
       withSQLConf(METASTORE_LOCATION.key -> dir.toString / "metastore") {
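The tests above pin down UTF-8 handling in column statistics. For trying the same behavior outside the suite, a minimal sketch follows; the `com.github.lightcopy.implicits._` import that exposes `spark.index` is an assumption about the project layout, and the paths are placeholders:

```scala
import org.apache.spark.sql.SparkSession
import com.github.lightcopy.implicits._ // assumed project import that adds spark.index

object Utf8IndexExample extends App {
  val spark = SparkSession.builder()
    .master("local[2]")
    .appName("utf8-index-example")
    .getOrCreate()
  import spark.implicits._

  // Non-ASCII values: correct byte-wise UTF-8 ordering in min/max statistics
  // is exactly what the tests exercise ("bé" sorts after "bb" in byte order).
  Seq("aa", "bé", "bb").toDF("col").coalesce(1).write.parquet("/tmp/utf")

  spark.index.create.indexBy("col").parquet("/tmp/utf")
  spark.index.parquet("/tmp/utf").filter("col > 'bb'").show() // expect one row: bé

  spark.stop()
}
```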
From c0560a0b52b4fea006313c6a192664e9bba4db0a Mon Sep 17 00:00:00 2001
From: Ivan
Date: Mon, 30 Jan 2017 08:07:45 +1300
Subject: [PATCH 2/3] Enable filter statistics by default (#44)

* enable filter statistics by default

* update readme
---
 README.md                                          | 13 ++++++-------
 .../org/apache/spark/sql/internal/IndexConf.scala  |  4 ++--
 2 files changed, 8 insertions(+), 9 deletions(-)

diff --git a/README.md b/README.md
index c29b9a5..235d651 100644
--- a/README.md
+++ b/README.md
@@ -12,6 +12,11 @@ does not change frequently, but is used for queries often, e.g. using Thrift JDB
 indexed, schema and list of files (including partitioning) will be automatically resolved from
 index metastore instead of inferring schema every time datasource is created.
 
+> Project is **experimental and in active development**. Any feedback, issues or PRs are welcome.
+
+> Documentation reflects changes in `master` branch; for documentation for a specific version,
+> please select a version tag or branch.
+
 ### Metastore
 
 Metastore keeps information about all indexed tables and can be created on local file system or
 HDFS (see available options below) with support for in-memory cache of index (after first scan). Each
@@ -43,12 +48,6 @@ Currently only these types are supported for indexed columns:
 - Append mode is not yet supported for Parquet table when creating index
 - Certain Spark versions are supported (see table below)
 
-> Project is **experimental and is in active development at the moment**. We are working to remove
-> limitations and add support for different versions. Any feedback, issues or PRs are welcome.
-
-> Documentation reflects changes in `master` branch, for specific version documentation, please
-> select version tag or branch.
-
 ## Requirements
 | Spark version | `parquet-index` latest version |
 |---------------|--------------------------------|
@@ -77,7 +76,7 @@ other Spark configuration or add them to `spark-defaults.conf` file.
 | Name | Description | Default |
 |------|-------------|---------|
 | `spark.sql.index.metastore` | Index metastore location, created if does not exist (`file:/folder`, `hdfs://host:port/folder`) | `./index_metastore`
-| `spark.sql.index.parquet.filter.enabled` | When set to `true`, write filter statistics for indexed columns when creating table index, otherwise only min/max statistics are used. Filter statistics are used during filtering stage, if can be applied and available (`true`, `false`) | `false`
+| `spark.sql.index.parquet.filter.enabled` | When set to `true`, write filter statistics for indexed columns when creating table index, otherwise only min/max statistics are used. Filter statistics are used during filtering stage, if applicable (`true`, `false`) | `true`
 | `spark.sql.index.parquet.filter.type` | When filter statistics enabled, select type of statistics to use when creating index (`bloom`) | `bloom`
 | `spark.sql.index.parquet.filter.eagerLoading` | When set to `true`, read and load all filter statistics in memory the first time catalog is resolved, otherwise load them lazily as needed when evaluating predicate (`true`, `false`) | `false`
 | `spark.sql.index.createIfNotExists` | When set to true, create index if one does not exist in metastore for the table, and will use all available columns for indexing (`true`, `false`) | `false`
diff --git a/src/main/scala/org/apache/spark/sql/internal/IndexConf.scala b/src/main/scala/org/apache/spark/sql/internal/IndexConf.scala
index 3ac4fc1..1f2e6b7 100644
--- a/src/main/scala/org/apache/spark/sql/internal/IndexConf.scala
+++ b/src/main/scala/org/apache/spark/sql/internal/IndexConf.scala
@@ -67,9 +67,9 @@ private[spark] object IndexConf {
     IndexConfigBuilder("spark.sql.index.parquet.filter.enabled").
       doc("When set to true, writes filter statistics for indexed columns when creating table " +
         "index, otherwise only min/max statistics are used. Filter statistics are always used " +
-        "during filtering stage, if can be applied and available").
+        "during filtering stage, if applicable").
       booleanConf.
-      createWithDefault(false)
+      createWithDefault(true)
 
   val PARQUET_FILTER_STATISTICS_TYPE =
     IndexConfigBuilder("spark.sql.index.parquet.filter.type").
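The patch above flips `spark.sql.index.parquet.filter.enabled` to `true`, so users who relied on the old default must now opt out explicitly. A sketch of setting the affected options on a session; the option names come from the README table in the patch, while the app name and metastore path are placeholders:

```scala
import org.apache.spark.sql.SparkSession

object FilterStatisticsConfig extends App {
  // After this patch, filter statistics are written by default when an index
  // is created; set the flag to "false" to fall back to min/max statistics only.
  val spark = SparkSession.builder()
    .master("local[2]")
    .appName("filter-statistics-config")
    .config("spark.sql.index.metastore", "file:/tmp/index_metastore")
    .config("spark.sql.index.parquet.filter.enabled", "false") // opt out of the new default
    .config("spark.sql.index.parquet.filter.type", "bloom")    // only consulted when enabled
    .getOrCreate()

  spark.stop()
}
```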
From 32f4a6cc27b4954317e5f430e7cd910d2c59caa3 Mon Sep 17 00:00:00 2001
From: Ivan
Date: Sat, 4 Feb 2017 11:34:48 +1300
Subject: [PATCH 3/3] add report for bloom filter oom (#46)

---
 .../spark/sql/sources/ColumnFilterStatistics.scala | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/src/main/scala/org/apache/spark/sql/sources/ColumnFilterStatistics.scala b/src/main/scala/org/apache/spark/sql/sources/ColumnFilterStatistics.scala
index df46d74..77a94e4 100644
--- a/src/main/scala/org/apache/spark/sql/sources/ColumnFilterStatistics.scala
+++ b/src/main/scala/org/apache/spark/sql/sources/ColumnFilterStatistics.scala
@@ -150,7 +150,16 @@ object ColumnFilterStatistics {
  */
 case class BloomFilterStatistics(numRows: Long = 1024L) extends ColumnFilterStatistics {
   require(numRows > 0, s"Invalid expected number of records $numRows, should be > 0")
-  @transient private var bloomFilter: BloomFilter = BloomFilter.create(numRows, 0.03)
+  @transient private var bloomFilter: BloomFilter =
+    try {
+      // Create bloom filter with at most 1048576 (1 << 20) expected records and FPP of 3%
+      BloomFilter.create(Math.min(numRows, 1 << 20), 0.03)
+    } catch {
+      case oom: OutOfMemoryError =>
+        throw new OutOfMemoryError(s"Failed to create bloom filter for numRows=$numRows. " +
+          "Consider reducing partition size and/or number of filters/indexed columns").
+          initCause(oom)
+    }
   @transient private var hasLoadedData: Boolean = false
 
   override def update(value: Any): Unit = bloomFilter.put(value)
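The cap above bounds bloom filter allocation. As a standalone sanity check of that sizing behavior, a sketch against Spark's `org.apache.spark.util.sketch.BloomFilter`, the same class the statistics use; the numbers are illustrative only:

```scala
import org.apache.spark.util.sketch.BloomFilter

object BloomFilterCapCheck extends App {
  // For a fixed FPP of 3% the bit array grows with expected items, so capping
  // expected rows at 1 << 20 keeps the allocation constant for any larger input.
  val atCap = BloomFilter.create(1L << 20, 0.03)
  val capped = BloomFilter.create(Math.min(5000000000L, 1L << 20), 0.03)
  assert(atCap.bitSize() == capped.bitSize()) // the cap bounds the bit-array size

  // Bloom filters never produce false negatives, so inserted values are found.
  capped.put("ᛖᚻᚹᛦ")
  assert(capped.mightContain("ᛖᚻᚹᛦ"))
  println(s"bits allocated at cap: ${capped.bitSize()}")
}
```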