Commit

PLATQ-3011 NOOP on Tombstone filtering if there are no values to filter (apache#63)

* PLATQ-3011 Do not add filter if there are no tombstone literals
* Bump version to 1.0-adobe-17.16
prodeezy authored and GitHub Enterprise committed Feb 13, 2020
1 parent ce4f672 commit a196581
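The gist of the change: the helper that builds the tombstone NOT-IN expression now returns an Option, and an empty tombstone-value list yields None, so no post-scan filter is added at all. Below is a minimal sketch of that pattern, using simplified stand-in types rather than the actual Spark Catalyst expressions and SupportsTombstoneFilters reader; all names in the sketch are illustrative only.

object TombstoneFilterSketch {
  // Stand-in for the Catalyst Not(In(field, values)) expression the real code builds; not a Spark type.
  final case class NotInFilter(field: String, values: Seq[String])

  // Mirrors the fix: an empty tombstone-value list yields None, so nothing is appended.
  def tombstoneFilter(field: String, values: Seq[String]): Option[NotInFilter] =
    if (values.isEmpty) None // NOOP case this commit introduces
    else Some(NotInFilter(field, values))

  def main(args: Array[String]): Unit = {
    val postScanFilters = Seq.empty[NotInFilter]

    // Tombstone values present: a NOT-IN filter is appended to the post-scan filters.
    println(postScanFilters ++ tombstoneFilter("level_one.level_two.level_three", Seq("X", "Y")).toSeq)

    // No tombstone values: the Option is empty and the filter list stays unchanged.
    println(postScanFilters ++ tombstoneFilter("level_one.level_two.level_three", Seq.empty).toSeq)
  }
}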
Showing 3 changed files with 75 additions and 9 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -48,7 +48,7 @@ allprojects {
group = "org.apache.iceberg"
apply plugin: 'com.palantir.baseline-idea'
/* TODO - this assumes that the upstream apache version is 1.0 so we need to be consistent w/ upstream changes */
- version = "1.0-adobe-17.15"
+ version = "1.0-adobe-17.16"
repositories {
maven { url "http://palantir.bintray.com/releases" }
mavenCentral()
@@ -192,12 +192,12 @@ object DataSourceV2StrategyWithAdobeFilteringAndPruning extends Strategy {
}

private def pushPostScanTombstoneFilter(reader: DataSourceReader,
filters: Seq[Expression], relation: DataSourceV2Relation): Option[Expression] = {
reader match {
case r: SupportsTombstoneFilters =>
val tombstoneExpression = tombstoneSourceBatchIdExpression(relation, r)
logInfo(s"TombstoneFilters: ${tombstoneExpression}")
- Some(tombstoneExpression)
+ tombstoneExpression
case _ => None
}
}
@@ -211,11 +211,14 @@ object DataSourceV2StrategyWithAdobeFilteringAndPruning extends Strategy {
* @return expression for tombstone field NOT-IN filtered on tombstone values
*/
private def tombstoneSourceBatchIdExpression(relation: DataSourceV2Relation,
- tombstoneReader: SupportsTombstoneFilters): Expression = {
+ tombstoneReader: SupportsTombstoneFilters): Option[Expression] = {
// TODO: This is a case-insensitive check so works for acp_system_metadata case but should be checked properly
val tombstoneFieldName = tombstoneReader.tombstoneField()
val literals = tombstoneReader.tombstoneValues().toSeq

+ if (literals.isEmpty) {
+   return None
+ }
if (!"".equals(tombstoneFieldName) && tombstoneFieldName.indexOf(".") > -1) {
// nested struct field
val rootParentFieldName = tombstoneFieldName.substring(0, tombstoneFieldName.indexOf("."))
@@ -227,15 +230,15 @@ object DataSourceV2StrategyWithAdobeFilteringAndPruning extends Strategy {
schema.fields(schema.fieldIndex(rootParentFieldName)) != null,
s"No such field ${rootParentFieldName} in schema : ${schema}")

- Not(In(buildGetStructField(rootParentFieldAtt, schema.fields(schema.fieldIndex(rootParentFieldName)),
-     subFieldName), literals.map(Literal(_))))
+ Some(Not(In(buildGetStructField(rootParentFieldAtt, schema.fields(schema.fieldIndex(rootParentFieldName)),
+     subFieldName), literals.map(Literal(_)))))

} else {
// flat field
val tombstoneAttMaybe = relation.output.find(a => a.name.equalsIgnoreCase(tombstoneFieldName))
ValidationException.check(!tombstoneAttMaybe.isEmpty, s"Failed to find the tombstone field " +
s"${tombstoneFieldName} in schema : ${relation.schema}")
- Not(In(tombstoneAttMaybe.get, literals.map(Literal(_))))
+ Some(Not(In(tombstoneAttMaybe.get, literals.map(Literal(_)))))
}
}

@@ -335,8 +338,9 @@ object DataSourceV2StrategyWithAdobeFilteringAndPruning extends Strategy {
relation, project, filters)
val reader = relation.newReader()
// `pushedFilters` will be pushed down and evaluated in the underlying data sources.
- // `postScanFilters` need to be evaluated after the scan.
- // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter.
+ // `postScanFilters` need to be evaluated after the scan. Includes tombstone filtering
+ // or any other post-scan row group filtering.
+ // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter
val (pushedFilters, postScanFilters) = pushFilters(reader, normalizedFilters, relation)
val output = pruneColumns(reader, relation, normalizedProjects ++ postScanFilters)
logInfo(
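The comment reworked in the hunk above distinguishes pushedFilters, which the data source evaluates during the scan, from postScanFilters, which Spark re-evaluates on the rows the scan returns, and notes that the two sets may overlap. Below is a small self-contained illustration of that split, using plain Scala predicates as stand-ins for Catalyst expressions; the types and data are invented for illustration.

object PushedVsPostScanSketch {
  type Row = Map[String, Any]
  type Filter = Row => Boolean

  // The "source" applies only the filters that were pushed down to it.
  def scan(rows: Seq[Row], pushed: Seq[Filter]): Seq[Row] =
    rows.filter(r => pushed.forall(_(r)))

  def main(args: Array[String]): Unit = {
    val rows: Seq[Row] = Seq(
      Map("id" -> 1, "batch" -> "X"),
      Map("id" -> 2, "batch" -> "Y"),
      Map("id" -> 3, "batch" -> "Z"))

    val pushedFilters: Seq[Filter] = Seq(r => r("batch") != "X")
    val postScanFilters: Seq[Filter] = Seq(
      r => r("batch") != "X",             // overlaps with the pushed filter, which is allowed
      r => r("id").asInstanceOf[Int] > 1) // a predicate that only runs after the scan

    // Pushed filters run inside the scan; post-scan filters run on the rows it returns.
    val result = scan(rows, pushedFilters).filter(r => postScanFilters.forall(_(r)))
    println(result) // keeps only the rows with id 2 and 3
  }
}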
@@ -203,4 +203,66 @@ public void testWriterAppendFilesAndAppendTombstonesOnTwoLevelStructField() {

Assert.assertEquals("Result rows should only match z", 3, load.count());
}


@Test
public void testWriterNOOPTombstonesAndAppendTombstonesOnTwoLevelStructField() {
Timestamp ts = Timestamp.valueOf("2019-10-10 10:10:10.10");

List<Row> rows = Lists.newArrayList(
RowFactory.create(101, ts, "A", RowFactory.create("X", Collections.emptyMap()),
RowFactory.create(RowFactory.create("x"))),
RowFactory.create(102, ts, "A", RowFactory.create("X", Collections.emptyMap()),
RowFactory.create(RowFactory.create("x"))),
RowFactory.create(103, ts, "A", RowFactory.create("X", Collections.emptyMap()),
RowFactory.create(RowFactory.create("x"))),
RowFactory.create(104, ts, "A", RowFactory.create("X", Collections.emptyMap()),
RowFactory.create(RowFactory.create("x"))),
RowFactory.create(105, ts, "A", RowFactory.create("X", Collections.emptyMap()),
RowFactory.create(RowFactory.create("x"))),
RowFactory.create(201, ts, "A", RowFactory.create("Y", Collections.emptyMap()),
RowFactory.create(RowFactory.create("y"))),
RowFactory.create(202, ts, "A", RowFactory.create("Y", Collections.emptyMap()),
RowFactory.create(RowFactory.create("y"))),
RowFactory.create(203, ts, "A", RowFactory.create("Y", Collections.emptyMap()),
RowFactory.create(RowFactory.create("y"))),
RowFactory.create(204, ts, "A", RowFactory.create("Y", Collections.emptyMap()),
RowFactory.create(RowFactory.create("y"))),
RowFactory.create(205, ts, "A", RowFactory.create("Y", Collections.emptyMap()),
RowFactory.create(RowFactory.create("y"))),
RowFactory.create(301, ts, "A", RowFactory.create("Z", Collections.emptyMap()),
RowFactory.create(RowFactory.create("z")))
);

spark.createDataFrame(rows, SCHEMA)
.select("*")
.write()
.format("iceberg.adobe")
.mode("append")
.save(getTableLocation());

List<Row> thirdBatchRows = Lists.newArrayList(
RowFactory.create(301, ts, "B", RowFactory.create("Z", Collections.emptyMap()),
RowFactory.create(RowFactory.create("z"))),
RowFactory.create(302, ts, "B", RowFactory.create("Z", Collections.emptyMap()),
RowFactory.create(RowFactory.create("z")))
);

// Write the second batch with the tombstone column set but no tombstone values, so tombstone filtering is a NOOP
spark.createDataFrame(thirdBatchRows, SCHEMA)
.select("*")
.write()
.format("iceberg.adobe")
.option(TombstoneExtension.TOMBSTONE_COLUMN, "level_one.level_two.level_three")
.mode(SaveMode.Append)
.save(getTableLocation());

Dataset<Row> load = spark.read()
.format("iceberg.adobe")
.option("iceberg.extension.tombstone.col", "level_one.level_two.level_three")
.load(getTableLocation());
load.show(false);

Assert.assertEquals("Result rows should match all", 13, load.count());
}
}
