
Commit

spark 2.2 support
sadikovi committed Jan 2, 2018
1 parent 6a55637 commit 07da780
Showing 10 changed files with 27 additions and 19 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
@@ -9,10 +9,10 @@ matrix:
include:
- jdk: oraclejdk8
scala: 2.11.7
-env: TEST_SPARK_VERSION="2.1.0" TEST_SPARK_RELEASE="spark-2.1.0-bin-hadoop2.7"
+env: TEST_SPARK_VERSION="2.2.0" TEST_SPARK_RELEASE="spark-2.2.0-bin-hadoop2.7"
- jdk: oraclejdk8
scala: 2.11.7
-env: TEST_SPARK_VERSION="2.1.1" TEST_SPARK_RELEASE="spark-2.1.1-bin-hadoop2.7"
+env: TEST_SPARK_VERSION="2.2.1" TEST_SPARK_RELEASE="spark-2.2.1-bin-hadoop2.7"
script:
- sbt ++$TRAVIS_SCALA_VERSION scalastyle
- sbt ++$TRAVIS_SCALA_VERSION "test:scalastyle"
2 changes: 1 addition & 1 deletion build.sbt
@@ -6,7 +6,7 @@ scalaVersion := "2.11.7"

spName := "lightcopy/parquet-index"

-val defaultSparkVersion = "2.1.0"
+val defaultSparkVersion = "2.2.0"

sparkVersion := sys.props.getOrElse("spark.testVersion", defaultSparkVersion)

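Note: build.sbt resolves sparkVersion from the spark.testVersion system property before falling back to defaultSparkVersion, so the CI matrix above can presumably be reproduced locally with an invocation along the lines of sbt -Dspark.testVersion=2.2.1 test (the exact command is an assumption based on the sys.props lookup, not something stated in this commit).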
@@ -95,15 +95,13 @@ object IndexSourceStrategy extends Strategy with Logging {

val outputAttributes = readDataColumns ++ partitionColumns

-val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)

val scan =
FileSourceScanExec(
fsRelation,
outputAttributes,
outputSchema,
partitionKeyFilters.toSeq,
-pushedDownFilters,
+dataFilters,
table.map(_.identifier))

val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
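The change tracks Spark 2.2's FileSourceScanExec, which now receives the raw catalyst data filters and derives the pushed-down source filters itself, so the strategy no longer pre-translates them. A paraphrased sketch of what the 2.2 operator is assumed to do internally (not code from this commit):

// inside Spark 2.2's FileSourceScanExec (assumption): translation moved into the operator
private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)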
@@ -60,7 +60,7 @@ case class IndexedDataSource(
}

def resolveRelation(): BaseRelation = {
-val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+val caseInsensitiveOptions = CaseInsensitiveMap(options)
providingClass.newInstance() match {
case support: MetastoreSupport =>
// if index does not exist in metastore and option is selected, we will create it before
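In Spark 2.2 the CaseInsensitiveMap constructor is private and instances are built through the companion apply, hence the change above. A minimal usage sketch under that assumption (the option key and value are illustrative only):

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

// lookups are case-insensitive regardless of how the key was written
val opts = CaseInsensitiveMap(Map("Path" -> "/tmp/table"))
assert(opts("path") == "/tmp/table")
assert(opts("PATH") == "/tmp/table")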
@@ -102,7 +102,7 @@ case class IndexedDataSource(
val catalog = new InMemoryFileIndex(metastore.session, paths, options, partitionSchema)
val partitionSpec = catalog.partitionSpec
// ignore filtering expression for partitions, fetch all available files
-val allFiles = catalog.listFiles(Nil)
+val allFiles = catalog.listFiles(Nil, Nil)
val spec = locationSpec(s.identifier, tablePath.getPath, catalogTable)
metastore.create(spec, mode) { (status, isAppend) =>
s.createIndex(metastore, status, tablePath, isAppend, partitionSpec, allFiles, columns)
@@ -77,7 +77,9 @@ abstract class MetastoreIndex extends FileIndex with Logging {
* @param partitionFilters filters used to prune which partitions are returned.
* @param dataFilters filters that can be applied on non-partitioned columns.
*/
-final def listFiles(partitionFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+final def listFiles(
+    partitionFilters: Seq[Expression],
+    dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
listFilesWithIndexSupport(partitionFilters, Nil, indexFilters)
}
}
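The extra parameter follows Spark 2.2's FileIndex contract, where listFiles also receives the non-partition data filters. A sketch of the assumed trait method, for context only:

// org.apache.spark.sql.execution.datasources.FileIndex, Spark 2.2 (assumed shape)
def listFiles(
    partitionFilters: Seq[Expression],
    dataFilters: Seq[Expression]): Seq[PartitionDirectory]

Here the new argument is only accepted to satisfy the interface; index-based pruning still goes through listFilesWithIndexSupport.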
@@ -128,7 +128,7 @@ class ParquetIndex(
})

val selected = partitions.filter {
-case PartitionPath(values, _) => boundPredicate(values)
+case PartitionPath(values, _) => boundPredicate.eval(values)
}
logInfo {
val total = partitions.length
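In Spark 2.1 the bound predicate behaved as a plain InternalRow => Boolean function; in 2.2 the interpreted predicate is assumed to expose only an eval method, so the call is made explicit. A minimal, self-contained sketch of the pattern (hypothetical predicate and row, not taken from this commit):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.IntegerType

// bind a predicate such as `id > 0` against a single-column partition schema
val id = AttributeReference("id", IntegerType)()
val boundPredicate = InterpretedPredicate.create(GreaterThan(id, Literal(0)), Seq(id))
boundPredicate.eval(InternalRow(5))  // true; invoked via eval rather than apply in Spark 2.2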
14 changes: 7 additions & 7 deletions src/main/scala/org/apache/spark/sql/internal/IndexConf.scala
@@ -20,42 +20,42 @@ import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.sql.SparkSession

object IndexConf {
-import SQLConf.SQLConfigBuilder
+import SQLConf.buildConf

-val METASTORE_LOCATION = SQLConfigBuilder("spark.sql.index.metastore").
+val METASTORE_LOCATION = buildConf("spark.sql.index.metastore").
doc("Metastore location or root directory to store index information, will be created " +
"if path does not exist").
stringConf.
createWithDefault("")

-val CREATE_IF_NOT_EXISTS = SQLConfigBuilder("spark.sql.index.createIfNotExists").
+val CREATE_IF_NOT_EXISTS = buildConf("spark.sql.index.createIfNotExists").
doc("When set to true, creates index if one does not exist in metastore for the table").
booleanConf.
createWithDefault(false)

-val NUM_PARTITIONS = SQLConfigBuilder("spark.sql.index.partitions").
+val NUM_PARTITIONS = buildConf("spark.sql.index.partitions").
doc("When creating index uses this number of partitions. If value is non-positive or not " +
"provided then uses `sc.defaultParallelism * 3` or `spark.sql.shuffle.partitions` " +
"configuration value, whichever is smaller").
intConf.
createWithDefault(0)

val PARQUET_FILTER_STATISTICS_ENABLED =
-SQLConfigBuilder("spark.sql.index.parquet.filter.enabled").
+buildConf("spark.sql.index.parquet.filter.enabled").
doc("When set to true, writes filter statistics for indexed columns when creating table " +
"index, otherwise only min/max statistics are used. Filter statistics are always used " +
"during filtering stage, if applicable").
booleanConf.
createWithDefault(true)

-val PARQUET_FILTER_STATISTICS_TYPE = SQLConfigBuilder("spark.sql.index.parquet.filter.type").
+val PARQUET_FILTER_STATISTICS_TYPE = buildConf("spark.sql.index.parquet.filter.type").
doc("When filter statistics enabled, selects type of statistics to use when creating index. " +
"Available options are `bloom`, `dict`").
stringConf.
createWithDefault("bloom")

val PARQUET_FILTER_STATISTICS_EAGER_LOADING =
-SQLConfigBuilder("spark.sql.index.parquet.filter.eagerLoading").
+buildConf("spark.sql.index.parquet.filter.eagerLoading").
doc("When set to true, read and load all filter statistics in memory the first time catalog " +
"is resolved, otherwise load them lazily as needed when evaluating predicate. " +
"Eager loading removes IO of reading filter data from disk, but requires extra memory").
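Spark 2.2 replaces SQLConf.SQLConfigBuilder with SQLConf.buildConf; the builder chain (doc, typed conf, default) is unchanged. A minimal sketch of defining an entry with the new helper, assuming the enclosing object remains in the org.apache.spark.sql.internal package so the builder is accessible (the key and doc below are illustrative, not a real option):

import org.apache.spark.sql.internal.SQLConf.buildConf

val EXAMPLE_ENABLED = buildConf("spark.sql.index.example.enabled").
  doc("Illustrative flag used only in this sketch").
  booleanConf.
  createWithDefault(false)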
@@ -67,7 +67,7 @@ class MetastoreIndexSuite extends UnitTestSuite {
}
}

-catalog.listFiles(Seq.empty)
+catalog.listFiles(Seq.empty, Seq.empty)
indexSeq should be (Nil)
filterSeq should be (Nil)
}
@@ -128,6 +128,14 @@ class ParquetSchemaUtilsSuite extends UnitTestSuite {
("id", 0) :: ("str", 1) :: ("b", 2) :: ("c", 3) :: Nil)
}

+  test("topLevelUniqueColumns - empty schema") {
+    val schema = MessageTypeParser.parseMessageType(
+      """
+        | message spark_schema { }
+      """.stripMargin)
+    ParquetSchemaUtils.topLevelUniqueColumns(schema) should be (Nil)
+  }

test("topLevelUniqueColumns - duplicate column names") {
val schema = MessageTypeParser.parseMessageType(
"""
2 changes: 1 addition & 1 deletion version.sbt
@@ -1 +1 @@
-version in ThisBuild := "0.3.1-SNAPSHOT"
+version in ThisBuild := "0.4.0-SNAPSHOT"
