Retry bulk chunk requests to Elasticsearch 2, 5, and 6 #1626
Conversation
Implements retries when saving bulk documents to Elasticsearch. This is useful for transient connection problems and other failures that are temporary in nature. With the current behavior, a single failed chunk causes the entire "Save to Elasticsearch" step to fail and retry, which creates redundant saves to the cluster; there is no existing way to retry just the failed chunk. The retry behavior is implemented with two new options, `withMaxRetries` and `withRetryPause`, which can be passed in the configuration as "maxRetries" and "retryPause" respectively.

For completeness, this also patches a typo in the name of the "Write to Elasticesarch" transform ;)

TODO: FluentBackoff. See comments.

Here we see the logs for the re-attempts in action:
Codecov Report

```
@@            Coverage Diff            @@
##           master    #1626    +/-   ##
=========================================
- Coverage    72.7%    68.9%    -3.81%
=========================================
  Files         186      174       -12
  Lines        5764     5268      -496
  Branches      329      321        -8
=========================================
- Hits         4191     3630      -561
- Misses       1573     1638       +65
```

Continue to review the full report at Codecov.
Hi @soundofjw, thanks for your PR! 😄 When I added the ES6 module this was one of the things on my todo list, thank you! At first look this seems OK; however, I think I would like to see this using the FluentBackoff utility.
Some of the exceptions thrown are not caught within the Client bulk() method.
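For context, a minimal illustration of the two failure paths when calling `bulk()` on the transport client; this snippet and its names are mine, not the PR's:

```java
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;

final class BulkFailurePaths {
  // Per-document failures come back inside the BulkResponse (hasFailures()),
  // while transport-level problems (e.g. NoNodeAvailableException when the
  // cluster is unreachable) are thrown from the call itself - so retry
  // handling has to cover both paths.
  static void checkedBulk(Client client, BulkRequest request) {
    final BulkResponse response = client.bulk(request).actionGet(); // may throw
    if (response.hasFailures()) {
      throw new RuntimeException(response.buildFailureMessage());
    }
  }
}
```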
@regadas I have no prior knowledge of this - I think it's a great idea.
…rch method" This exceeds the maximum number of params allowed by Scala style, so rely on it coming as a cmdLine arg. This reverts commit 00c299f.
@soundofjw cool! 👍 let's use FluentBackoff
@regadas FluentBackoff changes are in - I did not go through the trouble of adding every FluentBackoff argument. Let me know if you think that is desired; my experience suggests the defaults may be OK.
```java
this.maxBulkRequestSize = maxBulkRequestSize;
this.clientSupplier = new ClientSupplier(clusterName, servers);
this.toActionRequests = toActionRequests;
this.error = error;

this.backoffConfig = FluentBackoff.DEFAULT
    .withMaxRetries(maxRetries)
```
Suggested change:

```diff
-    .withMaxRetries(maxRetries)
+    .withMaxRetries(maxRetries - 1)
```
If the param were `maxAttempts`, this would make sense - but when we say "3 retries" we are effectively saying 4 attempts. I'd resolve to keep this as is.
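To make the semantics concrete, a small standalone check against Beam's `FluentBackoff` (my illustration, not code from the PR):

```java
import java.io.IOException;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.FluentBackoff;

public class RetrySemantics {
  public static void main(String[] args) throws IOException {
    // withMaxRetries(3) permits 3 retries *after* the initial attempt,
    // i.e. 4 attempts in total before nextBackOffMillis() returns STOP.
    BackOff backoff = FluentBackoff.DEFAULT.withMaxRetries(3).backoff();
    int attempts = 1; // the initial attempt
    while (backoff.nextBackOffMillis() != BackOff.STOP) {
      attempts++; // each non-STOP value corresponds to one retry
    }
    System.out.println(attempts); // prints 4
  }
}
```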
```java
do {
  try {
    final BulkRequest bulkRequest = new BulkRequest().add(chunk).refresh(false);
```
@soundofjw no need for `.refresh(false)`; it's already the default.
Yes, though it should be noted that when we are indexing with a refresh interval of `-1`, we're getting refreshes anyway (after a certain point, probably on hitting the maximum number of docs that can hang in the bulk queue) - it's really hard to know why this is happening. But to your point, changing this had no effect.
Yeah, refresh at indexing time can happen due to several things ... document sizes, number of documents, queues... I guess at some point we can offer support, but let's not do it in the same PR - next one! 😄
```java
final InetSocketAddress[] servers,
final long numOfShard,
final int maxBulkRequestSize,
final ThrowingConsumer<BulkExecutionException> error) {
```
@soundofjw can you undo the unrelated changes and keep the original formatting?
```java
this.maxBulkRequestSize = maxBulkRequestSize;
this.clientSupplier = new ClientSupplier(clusterName, servers);
this.toActionRequests = toActionRequests;
this.error = error;

this.backoffConfig = FluentBackoff.DEFAULT
```
@soundofjw this actually needs to be inside a

```java
@Setup
public void setup() throws Exception {
  ...
}
```
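A minimal sketch of what that could look like, assuming a Beam `DoFn`; the class shape and the mapping of `retryPause` to the initial backoff are my assumptions, not the PR's code:

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.FluentBackoff;
import org.joda.time.Duration;

// Hypothetical sketch: build the FluentBackoff in @Setup so it is
// created on the worker after the DoFn has been deserialized,
// instead of in the constructor.
class BulkWriterSketch extends DoFn<Object, Void> {
  private final int maxRetries;        // from the "maxRetries" option
  private final long retryPauseMillis; // from the "retryPause" option
  private transient FluentBackoff backoffConfig;

  BulkWriterSketch(int maxRetries, long retryPauseMillis) {
    this.maxRetries = maxRetries;
    this.retryPauseMillis = retryPauseMillis;
  }

  @Setup
  public void setup() throws Exception {
    backoffConfig = FluentBackoff.DEFAULT
        .withMaxRetries(maxRetries)
        .withInitialBackoff(Duration.millis(retryPauseMillis));
  }
}
```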
```java
final BackOff backoff = backoffConfig.backoff();

do {
```
@soundofjw wdyt of removing this `do { ... } while (true)` in favor of a

```java
while (BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
  ...
}
```

It removes the need for all of this:

```java
try {
  final long sleepTime = backoff.nextBackOffMillis();
  if (sleepTime == BackOff.STOP) {
    break;
  }
  Thread.sleep(sleepTime);
} catch (InterruptedException | IOException e1) {
  LOG.error("Interrupt during backoff", e1);
  break;
}
```
Your proposal will introduce a sleep at the beginning of every attempt:

```java
public static boolean next(Sleeper sleeper, BackOff backOff)
    throws InterruptedException, IOException {
  long backOffTime = backOff.nextBackOffMillis();
  if (backOffTime == BackOff.STOP) {
    return false;
  }
  sleeper.sleep(backOffTime);
  return true;
}
```
Yeah, sorry for the poor explanation - yes, correct... What I was trying to say is that I think we can refactor this block by decoupling the logic a little bit, and at the same time leverage that util for the retry logic... I think we can get better semantics and readability.
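A rough sketch of how that refactor could look, attempting first and backing off only after a failure; the wrapper class, `backoffConfig` parameter, and error handling are assumptions for illustration, not the merged code:

```java
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;

final class RetryLoopSketch {
  // Attempt first, back off only after a failure, and rethrow once the
  // backoff policy is exhausted.
  static void bulkWithRetry(Client client, BulkRequest request, FluentBackoff backoffConfig)
      throws Exception {
    final BackOff backoff = backoffConfig.backoff();
    while (true) {
      try {
        final BulkResponse response = client.bulk(request).actionGet();
        if (response.hasFailures()) {
          throw new RuntimeException(response.buildFailureMessage());
        }
        return; // success
      } catch (Exception e) {
        // BackOffUtils.next sleeps for the computed interval and returns
        // false once BackOff.STOP is reached.
        if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
          throw e; // retries exhausted
        }
      }
    }
  }
}
```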
```java
do {
  try {
    final BulkRequest bulkRequest = new BulkRequest().add(chunk)
        .setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
```
Same here ... this policy is the default one.
Force-pushed from 9bc5e21 to b0d8103.
@regadas what's the status of this?
Closed via #1826