Skip to content

Commit

Permalink
[Bugfix][Zeta] Fix the checkpoint timeout (apache#5722)
Browse files Browse the repository at this point in the history
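When reading large files or running scroll queries in Elasticsearch,
the pollNext method holds the checkpointLock for the entire read,
which blocks checkpoints and leads to checkpoint timeouts.
The Elasticsearch, file and JDBC readers no longer synchronize on the
checkpoint lock in pollNext; the lock is instead taken only around each
record emitted by the collector.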
  • Loading branch information
xuqi1633 committed Nov 22, 2023
1 parent 29bc32b commit b88c41f
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,31 +75,29 @@ public void close() throws IOException {

@Override
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
synchronized (output.getCheckpointLock()) {
ElasticsearchSourceSplit split = splits.poll();
if (split != null) {
SourceIndexInfo sourceIndexInfo = split.getSourceIndexInfo();
ScrollResult scrollResult =
esRestClient.searchByScroll(
sourceIndexInfo.getIndex(),
sourceIndexInfo.getSource(),
sourceIndexInfo.getQuery(),
sourceIndexInfo.getScrollTime(),
sourceIndexInfo.getScrollSize());
ElasticsearchSourceSplit split = splits.poll();
if (split != null) {
SourceIndexInfo sourceIndexInfo = split.getSourceIndexInfo();
ScrollResult scrollResult =
esRestClient.searchByScroll(
sourceIndexInfo.getIndex(),
sourceIndexInfo.getSource(),
sourceIndexInfo.getQuery(),
sourceIndexInfo.getScrollTime(),
sourceIndexInfo.getScrollSize());
outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output);
while (scrollResult.getDocs() != null && scrollResult.getDocs().size() > 0) {
scrollResult =
esRestClient.searchWithScrollId(
scrollResult.getScrollId(), sourceIndexInfo.getScrollTime());
outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output);
while (scrollResult.getDocs() != null && scrollResult.getDocs().size() > 0) {
scrollResult =
esRestClient.searchWithScrollId(
scrollResult.getScrollId(), sourceIndexInfo.getScrollTime());
outputFromScrollResult(scrollResult, sourceIndexInfo.getSource(), output);
}
} else if (noMoreSplit) {
// signal to the source that we have reached the end of the data.
log.info("Closed the bounded ELasticsearch source");
context.signalNoMoreElement();
} else {
Thread.sleep(pollNextWaitTime);
}
} else if (noMoreSplit) {
// signal to the source that we have reached the end of the data.
log.info("Closed the bounded ELasticsearch source");
context.signalNoMoreElement();
} else {
Thread.sleep(pollNextWaitTime);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public void finishAndCloseFile() {
}
needMoveFiles.put(k, getTargetLocation(k));
});
beingWrittenWriter.clear();
}

private ExcelGenerator getOrCreateExcelGenerator(@NonNull String filePath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ public void finishAndCloseFile() {
}
needMoveFiles.put(key, getTargetLocation(key));
});
beingWrittenOutputStream.clear();
isFirstWrite.clear();
}

private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,26 +59,24 @@ public void close() throws IOException {}

@Override
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
synchronized (output.getCheckpointLock()) {
FileSourceSplit split = sourceSplits.poll();
if (null != split) {
try {
// todo: If there is only one table , the tableId is not needed, but it's better
// to set this
readStrategy.read(split.splitId(), "", output);
} catch (Exception e) {
String errorMsg =
String.format("Read data from this file [%s] failed", split.splitId());
throw new FileConnectorException(
CommonErrorCodeDeprecated.FILE_OPERATION_FAILED, errorMsg, e);
}
} else if (noMoreSplit && sourceSplits.isEmpty()) {
// signal to the source that we have reached the end of the data.
log.info("Closed the bounded File source");
context.signalNoMoreElement();
} else {
Thread.sleep(1000L);
FileSourceSplit split = sourceSplits.poll();
if (null != split) {
try {
// todo: If there is only one table , the tableId is not needed, but it's better
// to set this
readStrategy.read(split.splitId(), "", output);
} catch (Exception e) {
String errorMsg =
String.format("Read data from this file [%s] failed", split.splitId());
throw new FileConnectorException(
CommonErrorCodeDeprecated.FILE_OPERATION_FAILED, errorMsg, e);
}
} else if (noMoreSplit && sourceSplits.isEmpty()) {
// signal to the source that we have reached the end of the data.
log.info("Closed the bounded File source");
context.signalNoMoreElement();
} else {
Thread.sleep(1000L);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,25 +60,23 @@ public void close() throws IOException {
@Override
@SuppressWarnings("magicnumber")
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
synchronized (output.getCheckpointLock()) {
JdbcSourceSplit split = splits.poll();
if (null != split) {
try {
inputFormat.open(split);
while (!inputFormat.reachedEnd()) {
SeaTunnelRow seaTunnelRow = inputFormat.nextRecord();
output.collect(seaTunnelRow);
}
} finally {
inputFormat.close();
JdbcSourceSplit split = splits.poll();
if (null != split) {
try {
inputFormat.open(split);
while (!inputFormat.reachedEnd()) {
SeaTunnelRow seaTunnelRow = inputFormat.nextRecord();
output.collect(seaTunnelRow);
}
} else if (noMoreSplit && splits.isEmpty()) {
// signal to the source that we have reached the end of the data.
log.info("Closed the bounded jdbc source");
context.signalNoMoreElement();
} else {
Thread.sleep(1000L);
} finally {
inputFormat.close();
}
} else if (noMoreSplit && splits.isEmpty()) {
// signal to the source that we have reached the end of the data.
log.info("Closed the bounded jdbc source");
context.signalNoMoreElement();
} else {
Thread.sleep(1000L);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ public void collect(SeaTunnelRow sourceRecord) {
flowControlGate.audit(sourceRecord);
}
}
internalCollector.collect(rowSerialization.convert(sourceRecord));
synchronized (checkpointLock) {
internalCollector.collect(rowSerialization.convert(sourceRecord));
}
emptyThisPollNext = false;
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down

0 comments on commit b88c41f

Please sign in to comment.