From 07da780d668a5b9577ad6f0a762be436fa27f9a0 Mon Sep 17 00:00:00 2001
From: sadikovi
Date: Tue, 2 Jan 2018 21:07:40 +1300
Subject: [PATCH] spark 2.2 support

---
 .travis.yml                                        |  4 ++--
 build.sbt                                          |  2 +-
 .../datasources/IndexSourceStrategy.scala          |  4 +---
 .../execution/datasources/IndexedDataSource.scala  |  4 ++--
 .../sql/execution/datasources/MetastoreIndex.scala |  4 +++-
 .../datasources/parquet/ParquetIndex.scala         |  2 +-
 .../org/apache/spark/sql/internal/IndexConf.scala  | 14 +++++++-------
 .../datasources/MetastoreIndexSuite.scala          |  2 +-
 .../parquet/ParquetSchemaUtilsSuite.scala          |  8 ++++++++
 version.sbt                                        |  2 +-
 10 files changed, 27 insertions(+), 19 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 0bedb19..0f54536 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -9,10 +9,10 @@ matrix:
   include:
     - jdk: oraclejdk8
       scala: 2.11.7
-      env: TEST_SPARK_VERSION="2.1.0" TEST_SPARK_RELEASE="spark-2.1.0-bin-hadoop2.7"
+      env: TEST_SPARK_VERSION="2.2.0" TEST_SPARK_RELEASE="spark-2.2.0-bin-hadoop2.7"
     - jdk: oraclejdk8
       scala: 2.11.7
-      env: TEST_SPARK_VERSION="2.1.1" TEST_SPARK_RELEASE="spark-2.1.1-bin-hadoop2.7"
+      env: TEST_SPARK_VERSION="2.2.1" TEST_SPARK_RELEASE="spark-2.2.1-bin-hadoop2.7"
 script:
   - sbt ++$TRAVIS_SCALA_VERSION scalastyle
   - sbt ++$TRAVIS_SCALA_VERSION "test:scalastyle"
diff --git a/build.sbt b/build.sbt
index 6aee2a7..388efc1 100644
--- a/build.sbt
+++ b/build.sbt
@@ -6,7 +6,7 @@ scalaVersion := "2.11.7"
 
 spName := "lightcopy/parquet-index"
 
-val defaultSparkVersion = "2.1.0"
+val defaultSparkVersion = "2.2.0"
 
 sparkVersion := sys.props.getOrElse("spark.testVersion", defaultSparkVersion)
 
diff --git a/src/main/scala/org/apache/spark/sql/execution/datasources/IndexSourceStrategy.scala b/src/main/scala/org/apache/spark/sql/execution/datasources/IndexSourceStrategy.scala
index eeda141..d156812 100644
--- a/src/main/scala/org/apache/spark/sql/execution/datasources/IndexSourceStrategy.scala
+++ b/src/main/scala/org/apache/spark/sql/execution/datasources/IndexSourceStrategy.scala
@@ -95,15 +95,13 @@ object IndexSourceStrategy extends Strategy with Logging {
 
       val outputAttributes = readDataColumns ++ partitionColumns
 
-      val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
-
       val scan =
         FileSourceScanExec(
           fsRelation,
           outputAttributes,
           outputSchema,
           partitionKeyFilters.toSeq,
-          pushedDownFilters,
+          dataFilters,
           table.map(_.identifier))
 
       val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
diff --git a/src/main/scala/org/apache/spark/sql/execution/datasources/IndexedDataSource.scala b/src/main/scala/org/apache/spark/sql/execution/datasources/IndexedDataSource.scala
index 16e7e13..720b7f5 100644
--- a/src/main/scala/org/apache/spark/sql/execution/datasources/IndexedDataSource.scala
+++ b/src/main/scala/org/apache/spark/sql/execution/datasources/IndexedDataSource.scala
@@ -60,7 +60,7 @@ case class IndexedDataSource(
   }
 
   def resolveRelation(): BaseRelation = {
-    val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+    val caseInsensitiveOptions = CaseInsensitiveMap(options)
     providingClass.newInstance() match {
       case support: MetastoreSupport =>
         // if index does not exist in metastore and option is selected, we will create it before
@@ -102,7 +102,7 @@ case class IndexedDataSource(
     val catalog = new InMemoryFileIndex(metastore.session, paths, options, partitionSchema)
     val partitionSpec = catalog.partitionSpec
     // ignore filtering expression for partitions, fetch all available files
-    val allFiles = catalog.listFiles(Nil)
+    val allFiles = catalog.listFiles(Nil, Nil)
     val spec = locationSpec(s.identifier, tablePath.getPath, catalogTable)
     metastore.create(spec, mode) { (status, isAppend) =>
       s.createIndex(metastore, status, tablePath, isAppend, partitionSpec, allFiles, columns)
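
Note: two Spark 2.2 API changes drive the hunks above. First, FileSourceScanExec now takes the
catalyst dataFilters expressions directly and translates them into data source filters internally,
which is why the explicit DataSourceStrategy.translateFilter step is dropped from
IndexSourceStrategy. Second, CaseInsensitiveMap became a case class with a private constructor,
so it is built through the companion apply instead of `new`. A minimal sketch of the new
construction, assuming Spark 2.2.x on the classpath (the option key and value are illustrative
only):

    import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

    // Spark 2.2: constructed via the companion apply; lookups ignore key case.
    val opts = CaseInsensitiveMap(Map("Path" -> "/tmp/data"))
    assert(opts("path") == "/tmp/data")
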
diff --git a/src/main/scala/org/apache/spark/sql/execution/datasources/MetastoreIndex.scala b/src/main/scala/org/apache/spark/sql/execution/datasources/MetastoreIndex.scala
index 301cc8b..e560f0e 100644
--- a/src/main/scala/org/apache/spark/sql/execution/datasources/MetastoreIndex.scala
+++ b/src/main/scala/org/apache/spark/sql/execution/datasources/MetastoreIndex.scala
@@ -77,7 +77,9 @@ abstract class MetastoreIndex extends FileIndex with Logging {
    * @param partitionFilters filters used to prune which partitions are returned.
    * @param dataFilters filters that can be applied on non-partitioned columns.
    */
-  final def listFiles(partitionFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+  final def listFiles(
+      partitionFilters: Seq[Expression],
+      dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
     listFilesWithIndexSupport(partitionFilters, Nil, indexFilters)
   }
 }
diff --git a/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIndex.scala b/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIndex.scala
index 6bf8e42..5e1239b 100644
--- a/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIndex.scala
+++ b/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIndex.scala
@@ -128,7 +128,7 @@ class ParquetIndex(
     })
 
     val selected = partitions.filter {
-      case PartitionPath(values, _) => boundPredicate(values)
+      case PartitionPath(values, _) => boundPredicate.eval(values)
     }
     logInfo {
       val total = partitions.length
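
Note: in Spark 2.2 the FileIndex.listFiles contract gained a second parameter for data-column
filters, which is why MetastoreIndex.listFiles now accepts dataFilters (and callers such as
IndexedDataSource and MetastoreIndexSuite pass two sequences); the implementation above still
forwards only partition and index filters to listFilesWithIndexSupport. Separately, catalyst
bound predicates now extend BasePredicate and are invoked via eval(row) rather than applied as
InternalRow => Boolean functions, hence boundPredicate.eval(values) in ParquetIndex. A minimal,
self-contained sketch of the 2.2 predicate API, with an illustrative expression:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{BoundReference, GreaterThan, InterpretedPredicate, Literal}
    import org.apache.spark.sql.types.IntegerType

    // Bind ordinal 0 of the incoming row and test `col > 5`; in Spark 2.2 the
    // result is a BasePredicate, so it is evaluated with eval(row).
    val predicate = InterpretedPredicate.create(
      GreaterThan(BoundReference(0, IntegerType, nullable = false), Literal(5)))
    assert(predicate.eval(InternalRow(10)))   // 10 > 5
    assert(!predicate.eval(InternalRow(1)))   // 1 > 5 is false
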
diff --git a/src/main/scala/org/apache/spark/sql/internal/IndexConf.scala b/src/main/scala/org/apache/spark/sql/internal/IndexConf.scala
index 16c1fe9..b12050a 100644
--- a/src/main/scala/org/apache/spark/sql/internal/IndexConf.scala
+++ b/src/main/scala/org/apache/spark/sql/internal/IndexConf.scala
@@ -20,20 +20,20 @@ import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.sql.SparkSession
 
 object IndexConf {
-  import SQLConf.SQLConfigBuilder
+  import SQLConf.buildConf
 
-  val METASTORE_LOCATION = SQLConfigBuilder("spark.sql.index.metastore").
+  val METASTORE_LOCATION = buildConf("spark.sql.index.metastore").
     doc("Metastore location or root directory to store index information, will be created " +
       "if path does not exist").
     stringConf.
     createWithDefault("")
 
-  val CREATE_IF_NOT_EXISTS = SQLConfigBuilder("spark.sql.index.createIfNotExists").
+  val CREATE_IF_NOT_EXISTS = buildConf("spark.sql.index.createIfNotExists").
     doc("When set to true, creates index if one does not exist in metastore for the table").
     booleanConf.
     createWithDefault(false)
 
-  val NUM_PARTITIONS = SQLConfigBuilder("spark.sql.index.partitions").
+  val NUM_PARTITIONS = buildConf("spark.sql.index.partitions").
     doc("When creating index uses this number of partitions. If value is non-positive or not " +
       "provided then uses `sc.defaultParallelism * 3` or `spark.sql.shuffle.partitions` " +
       "configuration value, whichever is smaller").
@@ -41,21 +41,21 @@ object IndexConf {
     createWithDefault(0)
 
   val PARQUET_FILTER_STATISTICS_ENABLED =
-    SQLConfigBuilder("spark.sql.index.parquet.filter.enabled").
+    buildConf("spark.sql.index.parquet.filter.enabled").
     doc("When set to true, writes filter statistics for indexed columns when creating table " +
       "index, otherwise only min/max statistics are used. Filter statistics are always used " +
       "during filtering stage, if applicable").
     booleanConf.
     createWithDefault(true)
 
-  val PARQUET_FILTER_STATISTICS_TYPE = SQLConfigBuilder("spark.sql.index.parquet.filter.type").
+  val PARQUET_FILTER_STATISTICS_TYPE = buildConf("spark.sql.index.parquet.filter.type").
     doc("When filter statistics enabled, selects type of statistics to use when creating index. " +
       "Available options are `bloom`, `dict`").
     stringConf.
     createWithDefault("bloom")
 
   val PARQUET_FILTER_STATISTICS_EAGER_LOADING =
-    SQLConfigBuilder("spark.sql.index.parquet.filter.eagerLoading").
+    buildConf("spark.sql.index.parquet.filter.eagerLoading").
     doc("When set to true, read and load all filter statistics in memory the first time catalog " +
       "is resolved, otherwise load them lazily as needed when evaluating predicate. " +
       "Eager loading removes IO of reading filter data from disk, but requires extra memory").
diff --git a/src/test/scala/org/apache/spark/sql/execution/datasources/MetastoreIndexSuite.scala b/src/test/scala/org/apache/spark/sql/execution/datasources/MetastoreIndexSuite.scala
index d244a7a..70468b9 100644
--- a/src/test/scala/org/apache/spark/sql/execution/datasources/MetastoreIndexSuite.scala
+++ b/src/test/scala/org/apache/spark/sql/execution/datasources/MetastoreIndexSuite.scala
@@ -67,7 +67,7 @@ class MetastoreIndexSuite extends UnitTestSuite {
       }
     }
 
-    catalog.listFiles(Seq.empty)
+    catalog.listFiles(Seq.empty, Seq.empty)
     indexSeq should be (Nil)
     filterSeq should be (Nil)
   }
diff --git a/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaUtilsSuite.scala b/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaUtilsSuite.scala
index 7bd2060..f144013 100644
--- a/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaUtilsSuite.scala
+++ b/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaUtilsSuite.scala
@@ -128,6 +128,14 @@ class ParquetSchemaUtilsSuite extends UnitTestSuite {
       ("id", 0) :: ("str", 1) :: ("b", 2) :: ("c", 3) :: Nil)
   }
 
+  test("topLevelUniqueColumns - empty schema") {
+    val schema = MessageTypeParser.parseMessageType(
+      """
+        | message spark_schema { }
+      """.stripMargin)
+    ParquetSchemaUtils.topLevelUniqueColumns(schema) should be (Nil)
+  }
+
   test("topLevelUniqueColumns - duplicate column names") {
     val schema = MessageTypeParser.parseMessageType(
       """
diff --git a/version.sbt b/version.sbt
index ed63e13..acfbdb6 100644
--- a/version.sbt
+++ b/version.sbt
@@ -1 +1 @@
-version in ThisBuild := "0.3.1-SNAPSHOT"
+version in ThisBuild := "0.4.0-SNAPSHOT"
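
Note: Spark 2.2 renamed the configuration builder entry point from SQLConf.SQLConfigBuilder to
SQLConf.buildConf; the fluent chain (doc, stringConf/booleanConf, createWithDefault) is
unchanged, so only the import and the call sites in IndexConf differ. The builder types are
private to Spark's packages, so such definitions must live under org.apache.spark.sql, as
IndexConf does. A minimal sketch of a 2.2-style entry mirroring the pattern above (the key and
doc text are hypothetical, for illustration only):

    package org.apache.spark.sql.internal

    object SampleConf {
      import SQLConf.buildConf

      // Hypothetical entry showing the Spark 2.2 buildConf entry point.
      val SAMPLE_FLAG = buildConf("spark.sql.index.sampleFlag").
        doc("Hypothetical flag illustrating the 2.2-style config builder").
        booleanConf.
        createWithDefault(false)
    }
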