Skip to content

Commit

Permalink
[HUDI-4741] hotfix to avoid partial failover cause restored subtask t…
Browse files Browse the repository at this point in the history
…imeout (apache#6796)

Co-authored-by: jian.feng <jian.feng@shopee.com>
  • Loading branch information
fengjian428 and jian.feng committed Oct 30, 2022
1 parent a7c5697 commit e222693
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 14 deletions.
Expand Up @@ -63,6 +63,7 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists;
Expand Down Expand Up @@ -152,6 +153,12 @@ public class StreamWriteOperatorCoordinator
*/
private CkpMetadata ckpMetadata;

/**
* Counter for the failed tasks, a number within the range (0, task_num) means
* a partial failover.
*/
private transient AtomicInteger failedCnt;

/**
* Constructs a StreamingSinkOperatorCoordinator.
*
Expand Down Expand Up @@ -294,6 +301,17 @@ public void subtaskFailed(int i, @Nullable Throwable throwable) {
// reset the event
this.eventBuffer[i] = null;
LOG.warn("Reset the event for task [" + i + "]", throwable);

// based on the fact: the #subtaskFailed in invoked before all the failed tasks scheduling,
// when a sub-task event is received, we can decide whether it recovers from a partial or complete failover,
// then to reuse the current instant(PARTIAL) or start a new one(COMPLETE).

// reset the ckp metadata for either partial or complete failover
if (this.failedCnt.get() == 0) {
this.ckpMetadata.reset();
}
// inc the failed tasks counter
this.failedCnt.incrementAndGet();
}

@Override
Expand Down Expand Up @@ -347,6 +365,14 @@ private static CkpMetadata initCkpMetadata(HoodieTableMetaClient metaClient) thr

private void reset() {
this.eventBuffer = new WriteMetadataEvent[this.parallelism];
this.failedCnt = new AtomicInteger(0);
}

/**
* Checks whether it is a PARTIAL failover.
*/
private boolean isPartialFailover() {
return this.failedCnt.get() > 0 && this.failedCnt.get() < this.parallelism;
}

/**
Expand Down Expand Up @@ -410,6 +436,16 @@ private void handleBootstrapEvent(WriteMetadataEvent event) {
if (Arrays.stream(eventBuffer).allMatch(evt -> evt != null && evt.isBootstrap())) {
// start to initialize the instant.
initInstant(event.getInstantTime());
} else if (isPartialFailover()) {
// if the bootstrap event comes from a partial failover,
// decrement the failed tasks by one.

// if all the failed task bootstrap events are received, send a start instant
// to the ckp metadata and unblock the data flushing.
if (this.failedCnt.decrementAndGet() <= 0) {
this.ckpMetadata.startInstant(this.instant);
this.failedCnt.set(0);
}
}
}

Expand Down
Expand Up @@ -37,6 +37,7 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;

Expand All @@ -61,7 +62,7 @@ public class CkpMetadata implements Serializable {

private static final Logger LOG = LoggerFactory.getLogger(CkpMetadata.class);

protected static final int MAX_RETAIN_CKP_NUM = 3;
private static final int MAX_RETAIN_CKP_NUM = 3;

// the ckp metadata directory
private static final String CKP_META = "ckp_meta";
Expand Down Expand Up @@ -99,39 +100,67 @@ public void bootstrap() throws IOException {
fs.mkdirs(path);
}

/**
* Resets the message bus, would clean all the messages.
*
* <p>This expects to be called by the driver.
*/
public void reset() {
Iterator<String> itr = this.instantCache.iterator();
while (itr.hasNext()) {
cleanInstant(itr.next(), true);
itr.remove();
}
}

public void startInstant(String instant) {
Path path = fullPath(CkpMessage.getFileName(instant, CkpMessage.State.INFLIGHT));
try {
fs.createNewFile(path);
} catch (IOException e) {
throw new HoodieException("Exception while adding checkpoint start metadata for instant: " + instant, e);
}
// cache the instant
cache(instant);
// cleaning
clean(instant);
clean();
}

private void clean(String newInstant) {
private void cache(String newInstant) {
if (this.instantCache == null) {
this.instantCache = new ArrayList<>();
}
this.instantCache.add(newInstant);
}

private void clean() {
if (instantCache.size() > MAX_RETAIN_CKP_NUM) {
final String instant = instantCache.get(0);
boolean[] error = new boolean[1];
CkpMessage.getAllFileNames(instant).stream().map(this::fullPath).forEach(path -> {
try {
fs.delete(path, false);
} catch (IOException e) {
error[0] = true;
LOG.warn("Exception while cleaning the checkpoint meta file: " + path);
}
});
if (!error[0]) {
boolean success = cleanInstant(instantCache.get(0), false);
if (success) {
instantCache.remove(0);
}
}
}

private boolean cleanInstant(String instant, boolean throwsT) {
boolean success = true;
for (String fileName : CkpMessage.getAllFileNames(instant)) {
Path path = fullPath(fileName);
try {
fs.delete(path, false);
} catch (IOException ex) {
success = false;
final String errMsg = "Exception while cleaning the checkpoint meta file: " + path;
if (throwsT) {
throw new HoodieException(errMsg, ex);
} else {
LOG.warn(errMsg, ex);
}
}
}
return success;
}

/**
* Add a checkpoint commit message.
*
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.meta.CkpMetadata;
import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.util.StreamerUtil;
Expand Down Expand Up @@ -164,6 +165,14 @@ public void testCheckpointCompleteWithPartialEvents() {
assertThat("Commits the instant with partial events anyway", lastCompleted, is(instant));
}

@Test
public void testSubTaskFailed() {
coordinator.subtaskFailed(0, null);
assertNull(coordinator.getEventBuffer()[0], "The write meta event should be cleaned");
CkpMetadata ckpMetadata = CkpMetadata.getInstance(TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()));
assertNull(ckpMetadata.lastPendingInstant(), "The pending instant should be cleaned");
}

@Test
public void testHiveSyncInvoked() throws Exception {
// reset
Expand Down

0 comments on commit e222693

Please sign in to comment.