Handle empty topics for the ProgressMonitor #356

Merged
HenryCaiHaiying merged 4 commits into pinterest:master from strava:fix_missing_topic
Jul 28, 2017
Fluxx commented Jul 28, 2017

If a TopicPartition has had all of its messages on the broker compacted away, calling KafkaClient.getCommittedMessage with that TopicPartition raises a RuntimeException. This happens because getCommittedMessage tries to return a Message, but the broker has no Message to return.

This is not, I would argue, an exceptional situation. If the topic has low volume or an aggressive compaction configuration, the log on the broker will often be empty. This generally does not cause problems, except in the case of #189, where the ProgressMonitor crashes on the error.

This is not ideal, since the consumer is in fact fully caught up. This PR addresses the situation with two changes:

  1. Change KafkaClient.getCommittedMessage to return null if no message is found.
  2. Update ProgressMonitor.getStats to record a lag value of 0 if no committed message was found.
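
The two changes above can be sketched roughly as follows. This is a minimal, self-contained illustration and not Secor's actual code: `Message`, `fetchCommittedMessage`, and `computeOffsetLag` are hypothetical stand-ins, and the broker is modeled as a plain map.

```java
import java.util.HashMap;
import java.util.Map;

class EmptyTopicLagSketch {
    // Hypothetical stand-in for a Kafka message; only the offset matters here.
    static final class Message {
        final long offset;

        Message(long offset) {
            this.offset = offset;
        }
    }

    // Models change 1: return null instead of throwing when the partition's log is empty.
    static Message fetchCommittedMessage(Map<String, Message> brokerLog, String topicPartition) {
        return brokerLog.get(topicPartition); // null when nothing is left on the broker
    }

    // Models change 2: record a lag of 0 when there is no committed message.
    static long computeOffsetLag(Message committed, long lastOffset) {
        if (committed == null) {
            return 0L;
        }
        return lastOffset - committed.offset;
    }

    public static void main(String[] args) {
        Map<String, Message> brokerLog = new HashMap<>();
        brokerLog.put("events-0", new Message(40L));

        // A partition with a committed message reports its real lag.
        System.out.println(computeOffsetLag(fetchCommittedMessage(brokerLog, "events-0"), 42L));
        // A fully compacted partition reports a lag of 0 instead of crashing.
        System.out.println(computeOffsetLag(fetchCommittedMessage(brokerLog, "compacted-0"), 42L));
    }
}
```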

I've left some self-review notes below; please take a look.

I'll also add that I didn't see any existing tests for the ProgressMonitor or KafkaClient, so I did not add any. Let me know if you think it would be worth adding some, however.

response.errorCode(topicPartition.getTopic(), topicPartition.getPartition()));
int errorCode = response.errorCode(topicPartition.getTopic(), topicPartition.getPartition());

if (errorCode == 1) {

Contributor

Instead of hard-coding 1, can we use the ErrorCodes (OutOfRange) from the Kafka side?

Contributor Author

Ah yeah that's a good idea. Will do.
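
One way to act on this suggestion is sketched below without a Kafka dependency, so that it runs on its own. The local constant `OFFSET_OUT_OF_RANGE_CODE` and the helper `isOffsetOutOfRange` are assumptions made for illustration; in real code the value would come from Kafka's own error definitions (for example `kafka.common.ErrorMapping.OffsetOutOfRangeCode()` in the old Scala client) rather than being redeclared.

```java
class ErrorCodeSketch {
    // Assumption: mirrors Kafka's "offset out of range" error code, which is 1.
    // In production code, reference the constant shipped with the Kafka client instead.
    static final short OFFSET_OUT_OF_RANGE_CODE = 1;

    // Hypothetical helper that replaces the bare `errorCode == 1` comparison.
    static boolean isOffsetOutOfRange(int errorCode) {
        return errorCode == OFFSET_OUT_OF_RANGE_CODE;
    }

    public static void main(String[] args) {
        System.out.println(isOffsetOutOfRange(1));
        System.out.println(isOffsetOutOfRange(0));
    }
}
```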

// That is not an exceptional situation - in fact it can be normal if
// the topic being consumed by Secor has a low volume. So in that
// case, simply return null
return null;

Contributor Author

I also think a suitable alternative to catching this error here is to just let it bubble up, and catch it in the ProgressMonitor instead. Thoughts?

Contributor

I am fine either way. The caller needs to either handle that special exception or check for a null return value.
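
The two options differ mainly in where the empty-log case is handled. As a rough sketch of the bubble-up alternative (the PR as described returns null instead), the following uses a hypothetical `NoCommittedMessageException` and helper names invented purely for illustration:

```java
class BubbleUpSketch {
    // Hypothetical exception type signalling "no message left on the broker".
    static final class NoCommittedMessageException extends RuntimeException {
        NoCommittedMessageException(String topicPartition) {
            super("No committed message for " + topicPartition);
        }
    }

    // Hypothetical client call: throws when the log is empty (modeled as a null offset).
    static long committedOffsetOrThrow(String topicPartition, Long offsetOnBroker) {
        if (offsetOnBroker == null) {
            throw new NoCommittedMessageException(topicPartition);
        }
        return offsetOnBroker;
    }

    // The monitor catches the specific exception and treats the partition as caught up.
    static long offsetLag(String topicPartition, Long offsetOnBroker, long lastOffset) {
        try {
            return lastOffset - committedOffsetOrThrow(topicPartition, offsetOnBroker);
        } catch (NoCommittedMessageException e) {
            return 0L;
        }
    }

    public static void main(String[] args) {
        System.out.println(offsetLag("events-0", 40L, 42L));
        System.out.println(offsetLag("compacted-0", null, 42L));
    }
}
```

Either way the caller carries one extra branch: a catch block here, or a null check in the null-return variant.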

//
// That is not an exceptional situation - in fact it can be normal if
// the topic being consumed by Secor has a low volume. So in that
// case, simply return null

Contributor

Needs to have Log.WARN in this case.

Contributor Author

Sure thing, will add.
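
A minimal sketch of what logging a warning before returning null might look like. It uses `java.util.logging` only to stay dependency-free; the logging framework used in the real code may differ, and `committedOffsetOrNull` is a hypothetical helper rather than an existing method.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

class WarnOnEmptyLogSketch {
    private static final Logger LOG = Logger.getLogger(WarnOnEmptyLogSketch.class.getName());

    // Hypothetical helper: a null offsetOnBroker models a partition whose log is empty.
    static Long committedOffsetOrNull(String topicPartition, Long offsetOnBroker) {
        if (offsetOnBroker == null) {
            // Not an error, but worth surfacing so operators can spot empty partitions.
            LOG.log(Level.WARNING, "No committed message found for {0}; returning null", topicPartition);
            return null;
        }
        return offsetOnBroker;
    }

    public static void main(String[] args) {
        System.out.println(committedOffsetOrNull("events-0", 40L));
        System.out.println(committedOffsetOrNull("compacted-0", null));
    }
}
```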

long offsetLag = lastOffset - committedOffset;
long timestampMillisLag = lastTimestampMillis - committedTimestampMillis;
long offsetLag = 0L;

Contributor

Should the code just return around line 195?

If the committed message is null, what will the value of lastMessage be? Will that also be null? Otherwise Secor would have another upload after the committed message.

Contributor Author

Hmm... perhaps not return, but it could use continue to skip this TopicPartition?
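
The difference matters because a return would abandon every remaining partition, while continue drops only the empty one. A rough sketch under assumed names (`offsetLags` and the two maps are hypothetical, with a missing entry standing in for a null committed message):

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SkipEmptyPartitionSketch {
    // committedOffsets has no entry for a partition whose committed message is missing.
    static Map<String, Long> offsetLags(Map<String, Long> committedOffsets,
                                        Map<String, Long> lastOffsets) {
        Map<String, Long> lags = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : lastOffsets.entrySet()) {
            Long committed = committedOffsets.get(entry.getKey());
            if (committed == null) {
                // Skip only this partition; later partitions are still processed.
                continue;
            }
            lags.put(entry.getKey(), entry.getValue() - committed);
        }
        return lags;
    }

    public static void main(String[] args) {
        Map<String, Long> committed = new LinkedHashMap<>();
        committed.put("events-1", 10L);

        Map<String, Long> last = new LinkedHashMap<>();
        last.put("events-0", 5L);  // no committed message, so it is skipped
        last.put("events-1", 12L); // still reported after the skipped partition

        System.out.println(offsetLags(committed, last));
    }
}
```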

Fluxx commented Jul 28, 2017

Alright, changes have been made, including the continue one I hinted at. Please take another look.

@HenryCaiHaiying HenryCaiHaiying merged commit 4975c8e into pinterest:master Jul 28, 2017
@HenryCaiHaiying
Contributor

Thanks for the contribution
