Skip to content

Commit

Permalink
Backport apache#8803
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Vary committed Nov 23, 2023
1 parent 67d5cdc commit 0f365cb
Show file tree
Hide file tree
Showing 6 changed files with 304 additions and 24 deletions.
Expand Up @@ -165,6 +165,10 @@ private static <T extends Scan<T, FileScanTask, CombinedScanTask>> T refineScanW
refinedScan = refinedScan.includeColumnStats();
}

if (context.includeStatsForColumns() != null) {
refinedScan = refinedScan.includeColumnStats(context.includeStatsForColumns());
}

refinedScan = refinedScan.option(TableProperties.SPLIT_SIZE, context.splitSize().toString());

refinedScan =
Expand Down
Expand Up @@ -20,6 +20,7 @@

import java.io.Serializable;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
Expand Down Expand Up @@ -62,6 +63,7 @@ public class ScanContext implements Serializable {
private final List<Expression> filters;
private final long limit;
private final boolean includeColumnStats;
private final Collection<String> includeStatsForColumns;
private final Integer planParallelism;
private final int maxPlanningSnapshotCount;
private final int maxAllowedPlanningFailures;
Expand All @@ -84,6 +86,7 @@ private ScanContext(
List<Expression> filters,
long limit,
boolean includeColumnStats,
Collection<String> includeStatsForColumns,
boolean exposeLocality,
Integer planParallelism,
int maxPlanningSnapshotCount,
Expand Down Expand Up @@ -114,6 +117,7 @@ private ScanContext(
this.filters = filters;
this.limit = limit;
this.includeColumnStats = includeColumnStats;
this.includeStatsForColumns = includeStatsForColumns;
this.exposeLocality = exposeLocality;
this.planParallelism = planParallelism;
this.maxPlanningSnapshotCount = maxPlanningSnapshotCount;
Expand Down Expand Up @@ -248,6 +252,10 @@ public boolean includeColumnStats() {
return includeColumnStats;
}

/**
 * Returns the names of the columns for which column-level statistics should be included when
 * planning scans, or {@code null} when no per-column selection was configured (in which case the
 * boolean {@code includeColumnStats} flag alone controls stats loading).
 */
public Collection<String> includeStatsForColumns() {
return includeStatsForColumns;
}

/** Returns whether split host/locality information should be exposed to the Flink scheduler. */
public boolean exposeLocality() {
return exposeLocality;
}
Expand Down Expand Up @@ -285,6 +293,7 @@ public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSn
.filters(filters)
.limit(limit)
.includeColumnStats(includeColumnStats)
.includeColumnStats(includeStatsForColumns)
.exposeLocality(exposeLocality)
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
Expand Down Expand Up @@ -313,6 +322,7 @@ public ScanContext copyWithSnapshotId(long newSnapshotId) {
.filters(filters)
.limit(limit)
.includeColumnStats(includeColumnStats)
.includeColumnStats(includeStatsForColumns)
.exposeLocality(exposeLocality)
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
Expand Down Expand Up @@ -349,6 +359,7 @@ public static class Builder {
private long limit = FlinkReadOptions.LIMIT_OPTION.defaultValue();
private boolean includeColumnStats =
FlinkReadOptions.INCLUDE_COLUMN_STATS_OPTION.defaultValue();
private Collection<String> includeStatsForColumns = null;
private boolean exposeLocality;
private Integer planParallelism =
FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue();
Expand Down Expand Up @@ -464,6 +475,11 @@ public Builder includeColumnStats(boolean newIncludeColumnStats) {
return this;
}

/**
 * Selects the specific columns for which column statistics are kept on planned files.
 *
 * <p>This overloads {@link #includeColumnStats(boolean)}: the boolean form toggles stats for all
 * columns, while this form restricts stats to the given column names. A {@code null} value
 * (the default) means no per-column selection is applied.
 *
 * @param newIncludeStatsForColumns column names whose stats should be retained; may be null
 * @return this builder for chaining
 */
public Builder includeColumnStats(Collection<String> newIncludeStatsForColumns) {
this.includeStatsForColumns = newIncludeStatsForColumns;
return this;
}

public Builder exposeLocality(boolean newExposeLocality) {
this.exposeLocality = newExposeLocality;
return this;
Expand Down Expand Up @@ -531,6 +547,7 @@ public ScanContext build() {
filters,
limit,
includeColumnStats,
includeStatsForColumns,
exposeLocality,
planParallelism,
maxPlanningSnapshotCount,
Expand Down
Expand Up @@ -84,7 +84,7 @@ private void appendTwoSnapshots() throws IOException {
}

/** @return the last enumerated snapshot id */
private IcebergEnumeratorPosition verifyOneCycle(
private CycleResult verifyOneCycle(
ContinuousSplitPlannerImpl splitPlanner, IcebergEnumeratorPosition lastPosition)
throws Exception {
List<Record> batch =
Expand All @@ -106,7 +106,7 @@ private IcebergEnumeratorPosition verifyOneCycle(
Assert.assertEquals(
dataFile.path().toString(),
Iterables.getOnlyElement(split.task().files()).file().path().toString());
return result.toPosition();
return new CycleResult(result.toPosition(), split);
}

@Test
Expand Down Expand Up @@ -135,7 +135,7 @@ public void testTableScanThenIncrementalWithEmptyTable() throws Exception {
// next 3 snapshots
IcebergEnumeratorPosition lastPosition = emptyTableSecondDiscoveryResult.toPosition();
for (int i = 0; i < 3; ++i) {
lastPosition = verifyOneCycle(splitPlanner, lastPosition);
lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition;
}
}

Expand Down Expand Up @@ -169,7 +169,7 @@ public void testTableScanThenIncrementalWithNonEmptyTable() throws Exception {

IcebergEnumeratorPosition lastPosition = initialResult.toPosition();
for (int i = 0; i < 3; ++i) {
lastPosition = verifyOneCycle(splitPlanner, lastPosition);
lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition;
}
}

Expand Down Expand Up @@ -206,7 +206,7 @@ public void testIncrementalFromLatestSnapshotWithEmptyTable() throws Exception {
// next 3 snapshots
IcebergEnumeratorPosition lastPosition = afterTwoSnapshotsAppended.toPosition();
for (int i = 0; i < 3; ++i) {
lastPosition = verifyOneCycle(splitPlanner, lastPosition);
lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition;
}
}

Expand Down Expand Up @@ -251,7 +251,7 @@ public void testIncrementalFromLatestSnapshotWithNonEmptyTable() throws Exceptio

IcebergEnumeratorPosition lastPosition = secondResult.toPosition();
for (int i = 0; i < 3; ++i) {
lastPosition = verifyOneCycle(splitPlanner, lastPosition);
lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition;
}
}

Expand Down Expand Up @@ -281,7 +281,7 @@ public void testIncrementalFromEarliestSnapshotWithEmptyTable() throws Exception
// next 3 snapshots
IcebergEnumeratorPosition lastPosition = emptyTableSecondDiscoveryResult.toPosition();
for (int i = 0; i < 3; ++i) {
lastPosition = verifyOneCycle(splitPlanner, lastPosition);
lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition;
}
}

Expand Down Expand Up @@ -323,12 +323,12 @@ public void testIncrementalFromEarliestSnapshotWithNonEmptyTable() throws Except

IcebergEnumeratorPosition lastPosition = secondResult.toPosition();
for (int i = 0; i < 3; ++i) {
lastPosition = verifyOneCycle(splitPlanner, lastPosition);
lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition;
}
}

@Test
public void testIncrementalFromSnapshotIdWithEmptyTable() throws Exception {
public void testIncrementalFromSnapshotIdWithEmptyTable() {
ScanContext scanContextWithInvalidSnapshotId =
ScanContext.builder()
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
Expand Down Expand Up @@ -409,12 +409,12 @@ public void testIncrementalFromSnapshotId() throws Exception {

IcebergEnumeratorPosition lastPosition = secondResult.toPosition();
for (int i = 0; i < 3; ++i) {
lastPosition = verifyOneCycle(splitPlanner, lastPosition);
lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition;
}
}

@Test
public void testIncrementalFromSnapshotTimestampWithEmptyTable() throws Exception {
public void testIncrementalFromSnapshotTimestampWithEmptyTable() {
ScanContext scanContextWithInvalidSnapshotId =
ScanContext.builder()
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP)
Expand Down Expand Up @@ -489,7 +489,7 @@ public void testIncrementalFromSnapshotTimestamp() throws Exception {

IcebergEnumeratorPosition lastPosition = secondResult.toPosition();
for (int i = 0; i < 3; ++i) {
lastPosition = verifyOneCycle(splitPlanner, lastPosition);
lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition;
}
}

Expand Down Expand Up @@ -529,6 +529,115 @@ public void testMaxPlanningSnapshotCount() throws Exception {
thirdResult, snapshot1, snapshot2, ImmutableSet.of(dataFile2.path().toString()));
}

@Test
public void testTableScanNoStats() throws Exception {
  appendTwoSnapshots();

  // Explicitly disable column stats; planned files should carry no stats at all.
  ScanContext noStatsContext =
      ScanContext.builder()
          .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
          .includeColumnStats(false)
          .build();
  ContinuousSplitPlannerImpl planner =
      new ContinuousSplitPlannerImpl(tableResource.tableLoader().clone(), noStatsContext, null);

  // Initial discovery covers both snapshots in a single split.
  ContinuousEnumerationResult firstPlan = planner.planSplits(null);
  Assert.assertEquals(1, firstPlan.splits().size());
  IcebergSourceSplit onlySplit = Iterables.getOnlyElement(firstPlan.splits());
  Assert.assertEquals(2, onlySplit.task().files().size());
  verifyStatCount(onlySplit, 0);

  // Each subsequent incremental cycle must also produce stat-free splits.
  IcebergEnumeratorPosition position = firstPlan.toPosition();
  for (int cycle = 1; cycle <= 3; cycle++) {
    CycleResult cycleResult = verifyOneCycle(planner, position);
    verifyStatCount(cycleResult.split, 0);
    position = cycleResult.lastPosition;
  }
}

@Test
public void testTableScanAllStats() throws Exception {
  appendTwoSnapshots();

  // Enable stats for every column; each planned file should expose all 3 column stats.
  ScanContext allStatsContext =
      ScanContext.builder()
          .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
          .includeColumnStats(true)
          .build();
  ContinuousSplitPlannerImpl planner =
      new ContinuousSplitPlannerImpl(tableResource.tableLoader().clone(), allStatsContext, null);

  // Initial discovery covers both snapshots in a single split.
  ContinuousEnumerationResult firstPlan = planner.planSplits(null);
  Assert.assertEquals(1, firstPlan.splits().size());
  IcebergSourceSplit onlySplit = Iterables.getOnlyElement(firstPlan.splits());
  Assert.assertEquals(2, onlySplit.task().files().size());
  verifyStatCount(onlySplit, 3);

  // Each subsequent incremental cycle must also carry full column stats.
  IcebergEnumeratorPosition position = firstPlan.toPosition();
  for (int cycle = 1; cycle <= 3; cycle++) {
    CycleResult cycleResult = verifyOneCycle(planner, position);
    verifyStatCount(cycleResult.split, 3);
    position = cycleResult.lastPosition;
  }
}

@Test
public void testTableScanSingleStat() throws Exception {
  appendTwoSnapshots();

  // Restrict stats to a single column; planned files should expose exactly one stat entry.
  ScanContext singleStatContext =
      ScanContext.builder()
          .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
          .includeColumnStats(ImmutableSet.of("data"))
          .build();
  ContinuousSplitPlannerImpl planner =
      new ContinuousSplitPlannerImpl(tableResource.tableLoader().clone(), singleStatContext, null);

  // Initial discovery covers both snapshots in a single split.
  ContinuousEnumerationResult firstPlan = planner.planSplits(null);
  Assert.assertEquals(1, firstPlan.splits().size());
  IcebergSourceSplit onlySplit = Iterables.getOnlyElement(firstPlan.splits());
  Assert.assertEquals(2, onlySplit.task().files().size());
  verifyStatCount(onlySplit, 1);

  // Each subsequent incremental cycle must also carry the single column stat.
  IcebergEnumeratorPosition position = firstPlan.toPosition();
  for (int cycle = 1; cycle <= 3; cycle++) {
    CycleResult cycleResult = verifyOneCycle(planner, position);
    verifyStatCount(cycleResult.split, 1);
    position = cycleResult.lastPosition;
  }
}

/**
 * Asserts that every file in the split carries the expected number of column-stat entries.
 *
 * <p>With {@code expected == 0} the stat maps must be absent entirely ({@code null}); otherwise
 * each stat map must have exactly {@code expected} entries.
 */
private void verifyStatCount(IcebergSourceSplit split, int expected) {
  split
      .task()
      .files()
      .forEach(
          fileScanTask -> {
            if (expected == 0) {
              Assert.assertNull(fileScanTask.file().valueCounts());
              Assert.assertNull(fileScanTask.file().columnSizes());
              Assert.assertNull(fileScanTask.file().lowerBounds());
              Assert.assertNull(fileScanTask.file().upperBounds());
              Assert.assertNull(fileScanTask.file().nanValueCounts());
              Assert.assertNull(fileScanTask.file().nullValueCounts());
            } else {
              Assert.assertEquals(expected, fileScanTask.file().valueCounts().size());
              Assert.assertEquals(expected, fileScanTask.file().columnSizes().size());
              Assert.assertEquals(expected, fileScanTask.file().lowerBounds().size());
              Assert.assertEquals(expected, fileScanTask.file().upperBounds().size());
              Assert.assertEquals(expected, fileScanTask.file().nullValueCounts().size());
              // The nanValue is not counted for long and string fields
              Assert.assertEquals(0, fileScanTask.file().nanValueCounts().size());
            }
          });
}

private void verifyMaxPlanningSnapshotCountResult(
ContinuousEnumerationResult result,
Snapshot fromSnapshotExclusive,
Expand Down Expand Up @@ -566,4 +675,14 @@ private Snapshot appendSnapshot(long seed, int numRecords) throws Exception {
dataAppender.appendToTable(dataFile);
return tableResource.table().currentSnapshot();
}

/**
 * Value holder for the outcome of one enumeration cycle: the enumerator position after the cycle
 * and the single split discovered during it. Fields are final since a result is never mutated
 * after construction.
 */
private static class CycleResult {
  // Enumerator position after the cycle completes; fed into the next cycle.
  final IcebergEnumeratorPosition lastPosition;
  // The single split discovered during this cycle.
  final IcebergSourceSplit split;

  CycleResult(IcebergEnumeratorPosition lastPosition, IcebergSourceSplit split) {
    this.lastPosition = lastPosition;
    this.split = split;
  }
}
}
Expand Up @@ -165,6 +165,10 @@ private static <T extends Scan<T, FileScanTask, CombinedScanTask>> T refineScanW
refinedScan = refinedScan.includeColumnStats();
}

if (context.includeStatsForColumns() != null) {
refinedScan = refinedScan.includeColumnStats(context.includeStatsForColumns());
}

refinedScan = refinedScan.option(TableProperties.SPLIT_SIZE, context.splitSize().toString());

refinedScan =
Expand Down

0 comments on commit 0f365cb

Please sign in to comment.