diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index c87d5b2443c4..a7b3994357d2 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -63,6 +63,7 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists; @@ -152,6 +153,12 @@ public class StreamWriteOperatorCoordinator */ private CkpMetadata ckpMetadata; + /** + * Counter for the failed tasks, a number within the range (0, task_num) means + * a partial failover. + */ + private transient AtomicInteger failedCnt; + /** * Constructs a StreamingSinkOperatorCoordinator. * @@ -294,6 +301,17 @@ public void subtaskFailed(int i, @Nullable Throwable throwable) { // reset the event this.eventBuffer[i] = null; LOG.warn("Reset the event for task [" + i + "]", throwable); + + // based on the fact: the #subtaskFailed is invoked before all the failed tasks scheduling, + // when a sub-task event is received, we can decide whether it recovers from a partial or complete failover, + // then to reuse the current instant(PARTIAL) or start a new one(COMPLETE).
+ + // reset the ckp metadata for either partial or complete failover + if (this.failedCnt.get() == 0) { + this.ckpMetadata.reset(); + } + // inc the failed tasks counter + this.failedCnt.incrementAndGet(); } @Override @@ -347,6 +365,14 @@ private static CkpMetadata initCkpMetadata(HoodieTableMetaClient metaClient) thr private void reset() { this.eventBuffer = new WriteMetadataEvent[this.parallelism]; + this.failedCnt = new AtomicInteger(0); + } + + /** + * Checks whether it is a PARTIAL failover. + */ + private boolean isPartialFailover() { + return this.failedCnt.get() > 0 && this.failedCnt.get() < this.parallelism; } /** @@ -410,6 +436,16 @@ private void handleBootstrapEvent(WriteMetadataEvent event) { if (Arrays.stream(eventBuffer).allMatch(evt -> evt != null && evt.isBootstrap())) { // start to initialize the instant. initInstant(event.getInstantTime()); + } else if (isPartialFailover()) { + // if the bootstrap event comes from a partial failover, + // decrement the failed tasks by one. + + // if all the failed task bootstrap events are received, send a start instant + // to the ckp metadata and unblock the data flushing. 
+ if (this.failedCnt.decrementAndGet() <= 0) { + this.ckpMetadata.startInstant(this.instant); + this.failedCnt.set(0); + } } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java index 6895b2a0c63d..d0f26740d6e2 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java @@ -37,6 +37,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; @@ -61,7 +62,7 @@ public class CkpMetadata implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(CkpMetadata.class); - protected static final int MAX_RETAIN_CKP_NUM = 3; + private static final int MAX_RETAIN_CKP_NUM = 3; // the ckp metadata directory private static final String CKP_META = "ckp_meta"; @@ -99,6 +100,19 @@ public void bootstrap() throws IOException { fs.mkdirs(path); } + /** + * Resets the message bus, would clean all the messages. + * + *

This expects to be called by the driver. + */ + public void reset() { + Iterator itr = this.instantCache.iterator(); + while (itr.hasNext()) { + cleanInstant(itr.next(), true); + itr.remove(); + } + } + public void startInstant(String instant) { Path path = fullPath(CkpMessage.getFileName(instant, CkpMessage.State.INFLIGHT)); try { @@ -106,32 +120,47 @@ public void startInstant(String instant) { } catch (IOException e) { throw new HoodieException("Exception while adding checkpoint start metadata for instant: " + instant, e); } + // cache the instant + cache(instant); // cleaning - clean(instant); + clean(); } - private void clean(String newInstant) { + private void cache(String newInstant) { if (this.instantCache == null) { this.instantCache = new ArrayList<>(); } this.instantCache.add(newInstant); + } + + private void clean() { if (instantCache.size() > MAX_RETAIN_CKP_NUM) { - final String instant = instantCache.get(0); - boolean[] error = new boolean[1]; - CkpMessage.getAllFileNames(instant).stream().map(this::fullPath).forEach(path -> { - try { - fs.delete(path, false); - } catch (IOException e) { - error[0] = true; - LOG.warn("Exception while cleaning the checkpoint meta file: " + path); - } - }); - if (!error[0]) { + boolean success = cleanInstant(instantCache.get(0), false); + if (success) { instantCache.remove(0); } } } + private boolean cleanInstant(String instant, boolean throwsT) { + boolean success = true; + for (String fileName : CkpMessage.getAllFileNames(instant)) { + Path path = fullPath(fileName); + try { + fs.delete(path, false); + } catch (IOException ex) { + success = false; + final String errMsg = "Exception while cleaning the checkpoint meta file: " + path; + if (throwsT) { + throw new HoodieException(errMsg, ex); + } else { + LOG.warn(errMsg, ex); + } + } + } + return success; + } + /** * Add a checkpoint commit message. 
* diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java index d5d35f7494f4..64bf8e278865 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java @@ -29,6 +29,7 @@ import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.sink.event.WriteMetadataEvent; +import org.apache.hudi.sink.meta.CkpMetadata; import org.apache.hudi.sink.utils.MockCoordinatorExecutor; import org.apache.hudi.sink.utils.NonThrownExecutor; import org.apache.hudi.util.StreamerUtil; @@ -164,6 +165,14 @@ public void testCheckpointCompleteWithPartialEvents() { assertThat("Commits the instant with partial events anyway", lastCompleted, is(instant)); } + @Test + public void testSubTaskFailed() { + coordinator.subtaskFailed(0, null); + assertNull(coordinator.getEventBuffer()[0], "The write meta event should be cleaned"); + CkpMetadata ckpMetadata = CkpMetadata.getInstance(TestConfigurations.getDefaultConf(tempFile.getAbsolutePath())); + assertNull(ckpMetadata.lastPendingInstant(), "The pending instant should be cleaned"); + } + @Test public void testHiveSyncInvoked() throws Exception { // reset