Skip to content

Commit

Permalink
[HUDI-2996] Flink streaming reader 'skip_compaction' option does not …
Browse files Browse the repository at this point in the history
…work (apache#4304)

close apache#4304
  • Loading branch information
Fugle666 authored and nsivabalan committed Dec 29, 2021
1 parent abfa64d commit 2b26051
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ private List<HoodieInstant> filterInstantsWithRange(
HoodieTimeline completedTimeline = commitTimeline.filterCompletedInstants();
if (issuedInstant != null) {
// returns early for streaming mode
return completedTimeline.getInstants()
return maySkipCompaction(completedTimeline.getInstants())
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant))
.collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ void testStreamWriteBatchReadOptimized() {
@Test
void testStreamWriteReadSkippingCompaction() throws Exception {
// create filesystem table named source
String createSource = TestConfigurations.getFileSourceDDL("source");
String createSource = TestConfigurations.getFileSourceDDL("source", 4);
streamTableEnv.executeSql(createSource);

String hoodieTableDDL = sql("t1")
Expand All @@ -260,7 +260,12 @@ void testStreamWriteReadSkippingCompaction() throws Exception {
String insertInto = "insert into t1 select * from source";
execInsertSql(streamTableEnv, insertInto);

List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
String instant = TestUtils.getNthCompleteInstant(tempFile.getAbsolutePath(), 2, true);

streamTableEnv.getConfig().getConfiguration()
.setBoolean("table.dynamic-table-options.enabled", true);
final String query = String.format("select * from t1/*+ options('read.start-commit'='%s')*/", instant);
List<Row> rows = execSelectSql(streamTableEnv, query, 10);
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT_LATEST_COMMIT);
}

Expand Down
13 changes: 13 additions & 0 deletions hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;

import javax.annotation.Nullable;

import static org.junit.jupiter.api.Assertions.assertTrue;

/**
Expand Down Expand Up @@ -64,6 +66,17 @@ public static String getFirstCompleteInstant(String basePath) {
.map(HoodieInstant::getTimestamp).orElse(null);
}

/**
 * Returns the timestamp of the n-th (0-based) completed instant of the given action
 * type on the table's active timeline, or {@code null} if fewer instants exist.
 *
 * @param basePath base path of the Hoodie table
 * @param n        0-based position within the filtered completed instants
 * @param isDelta  true to match delta commits, false to match commits
 */
@Nullable
public static String getNthCompleteInstant(String basePath, int n, boolean isDelta) {
  // Resolve the action filter once instead of branching inside the lambda.
  final String action = isDelta ? HoodieTimeline.DELTA_COMMIT_ACTION : HoodieTimeline.COMMIT_ACTION;
  final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
      .setConf(StreamerUtil.getHadoopConf())
      .setBasePath(basePath)
      .build();
  return metaClient.getActiveTimeline()
      .filterCompletedInstants()
      .filter(instant -> action.equals(instant.getAction()))
      .nthInstant(n)
      .map(HoodieInstant::getTimestamp)
      .orElse(null);
}

public static String getSplitPartitionPath(MergeOnReadInputSplit split) {
assertTrue(split.getLogPaths().isPresent());
final String logPath = split.getLogPaths().get().get(0);
Expand Down

0 comments on commit 2b26051

Please sign in to comment.