Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not fail on write error #1547

Merged

Conversation

pdambrauskas
Copy link
Contributor

Fixes #1505

This issue occurred to us too. Found existing issue about it and made this PR.

  • Moved offset parsing and message parsing to single try/catch block
  • Reused fail backoff logic which was implemented in parse block
  • Made fail threshold configurable
  • Split consumeNextMessage to multiple smaller methods to make it more readable.

Copy link
Contributor

@HenryCaiHaiying HenryCaiHaiying left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The refactoring makes the code diff difficult to read, I wasn't sure the purpose of making those new methods. Looks like the main purpose of the PR is to add mBasMessages, you can just insert this logic in place. Also what kind of errors do you see during messageWriter.write and you want to skip those errors?

@@ -223,37 +224,12 @@ protected boolean consumeNextMessage() {
}

parsedMessage = mMessageParser.parse(transformedMessage);
final double DECAY = 0.999;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we lose the reset of DECAY after refactoring?


private void writeMessage(Message rawMessage, ParsedMessage parsedMessage) throws Exception {
mMessageWriter.write(parsedMessage);
mBadMessages *= DECAY;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DECAY is final and used to decrease mBadMessages as before. It does not make sense to declare new DECAY variable on each writeMessage call, so I moved it to static final constant.

@pdambrauskas
Copy link
Contributor Author

The refactoring makes the code diff difficult to read, I wasn't sure the purpose of making those new methods. Looks like the main purpose of the PR is to add mBasMessages, you can just insert this logic in place. Also what kind of errors do you see during messageWriter.write and you want to skip those errors?

Hey Henry, thanks for taking time to review my Pull Requests :). I do understand that it is hard to review this code, since the method was not that small.

mBadMessages was introduced to make it possible to ignore all failing messages, but it is not the main purpose of this PR.

To simplify the idea, before code looked like this:

//...
try {
  // parse partition
} catch (Exception e) {
  // increment consumer.message_errors.count metric
  // handle fail, re-throw if there were > 1000 errors 
}

try {
  // write message
} catch (Exception e) {
  // just re-throw
}

After this change there is only one try block:

try {
  // parse partition
  // write message
} catch (Exception e) {
  // increment consumer.message_errors.count metric
  // handle fail, re-throw if there were > 1000 errors 
}

This allows us to configure secor to not fail on writes and also onsumer.message_errors.count is more correct.

Also what kind of errors do you see during messageWriter.write and you want to skip those errors?

Our error is very similar to one, described in #1505. We use custom ReaderWriterFactory implementation with custom Writer to write json messages to parquet and validate schema, so exception is a bit different though, I thought it would make sense to catch exception in our Writer, but then I saw that others have the same problem on #1505. And consumer.message_errors.count metric is increased on Consumer class, so i decided to keep on place responsible for handling this kind of errors.

@HenryCaiHaiying
Copy link
Contributor

Your explanation makes sense, I will merge in your PR, thanks.

@HenryCaiHaiying HenryCaiHaiying merged commit e33d9c9 into pinterest:master Aug 29, 2020
@pdambrauskas pdambrauskas deleted the do_not_fail_on_write_error branch August 29, 2020 06:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

UnsupportedOperationException fatal error when schema does not match
2 participants