Use bounded list for offset manager #1567
Conversation
Codecov Report
```
@@ Coverage Diff @@
## master #1567 +/- ##
==========================================
+ Coverage 98.21% 98.22% +<.01%
==========================================
  Files      195    195
  Lines     9602   9632    +30
==========================================
+ Hits      9431   9461    +30
  Misses     134    134
  Partials    37     37
```
Continue to review full report at Codecov.
cmd/ingester/app/flags.go
```go
@@ -63,14 +65,17 @@ const (
	DefaultEncoding = kafka.EncodingProto
	// DefaultDeadlockInterval is the default deadlock interval
	DefaultDeadlockInterval = 1 * time.Minute
	// DefaultMaxOutOfOrderOffsets is the default max out of order offsets to maintain before restarting the processor
	DefaultMaxOutOfOrderOffsets = 500000
```
How do we determine this default value?
I used an average message rate of 10k qps, so that this list would be exhausted in 50s.
These numbers are somewhat arbitrary: a value that is too small would cause churn by closing and recreating partition consumers unnecessarily.
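To make the sizing reasoning above concrete, here is a minimal sketch of the arithmetic (the rate and headroom figures come from the comment; the constant names other than `DefaultMaxOutOfOrderOffsets` are illustrative):

```go
package main

import "fmt"

// Sizing sketch for the bounded offset list: capacity is chosen so that, at an
// assumed steady message rate, the list takes a known time to fill up.
const (
	assumedRateQPS  = 10000 // assumed average message rate (messages/sec)
	headroomSeconds = 50    // time until the list is exhausted at that rate

	// 10000 msgs/sec * 50 s = 500000, matching DefaultMaxOutOfOrderOffsets.
	DefaultMaxOutOfOrderOffsets = assumedRateQPS * headroomSeconds
)

func main() {
	fmt.Println(DefaultMaxOutOfOrderOffsets) // 500000
}
```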
Having some explanation of the default value in the comment would be good.
Perhaps it's something we can tweak at runtime?
resolves jaegertracing#1109 Signed-off-by: Prithvi Raj <p.r@uber.com>
Signed-off-by: Prithvi Raj <p.r@uber.com>
I'm not clear on the design for this change, and on some implementation details.
concurrent_list stores non-consecutive offsets, right? So 'capacity' is kind of misleading as a control knob, since it doesn't correlate with the number of messages outstanding.
Let's discuss offline.
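To illustrate the semantics under discussion, here is a hedged sketch (not the PR's actual `concurrent_list` implementation; all names are illustrative) of a capacity-bounded list of marked offsets. Because offsets arrive out of order, the list holds non-consecutive values, so "capacity" bounds how many marked-but-uncommittable offsets may be outstanding, not the number of in-flight messages:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// boundedOffsetList holds offsets that have been marked as processed but not
// yet committed. Once it is full, inserts are rejected, which the caller can
// use as a signal to restart the partition consumer.
type boundedOffsetList struct {
	mu       sync.Mutex
	offsets  []int64
	capacity int
}

// insert records a processed offset; it returns false when the list is full.
func (l *boundedOffsetList) insert(offset int64) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if len(l.offsets) >= l.capacity {
		return false
	}
	l.offsets = append(l.offsets, offset)
	return true
}

// highestContiguous returns the largest offset such that every offset from
// base+1 up to it has been marked; that is the offset safe to commit.
func (l *boundedOffsetList) highestContiguous(base int64) int64 {
	l.mu.Lock()
	defer l.mu.Unlock()
	sorted := append([]int64(nil), l.offsets...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
	high := base
	for _, o := range sorted {
		if o == high+1 {
			high = o
		} else if o > high+1 {
			break // gap: a lower offset is still unprocessed
		}
	}
	return high
}

func main() {
	l := &boundedOffsetList{capacity: 4}
	for _, o := range []int64{1, 2, 4} { // offset 3 still in flight
		l.insert(o)
	}
	fmt.Println(l.highestContiguous(0)) // 2
}
```

The gap at offset 3 is why the committable offset lags behind the highest marked one, and why the list can grow without bound unless capped.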
```diff
@@ -43,7 +43,7 @@ func (d *comittingProcessor) Process(message processor.Message) error {
 	if msg, ok := message.(Message); ok {
 		err := d.processor.Process(message)
 		if err == nil {
-			d.marker.MarkOffset(msg.Offset())
+			return d.marker.MarkOffset(msg.Offset())
```
if an error is returned here, will the message be re-processed?
No, it should not be. If an error is returned here, the number of retries has already been exhausted.
```go
@@ -43,7 +43,7 @@ func (d *comittingProcessor) Process(message processor.Message) error {
	if msg, ok := message.(Message); ok {
		err := d.processor.Process(message)
		if err == nil {
```
nit: switch to the more idiomatic early return:

```go
if err != nil {
	return err
}
return d.marker.MarkOffset(msg.Offset())
```
```diff
@@ -149,11 +150,17 @@ func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
 		defer msgProcessor.Close()
 	}

-	msgProcessor.Process(&saramaMessageWrapper{msg})
+	msgProcessor.Process(&saramaMessageWrapper{msg}, func(message processor.Message, e error) {
+		msgErrors <- e
```
is the callback func() only called on error? If it can be called on success, then we should check for err != nil.
nit: s/e/err/
The callback is only called on error.
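For clarity, here is a hedged sketch of the error-only callback contract just described (the type and field names are illustrative, not the PR's exact API): the callback fires only when processing fails, so the error argument is always non-nil inside it.

```go
package main

import (
	"errors"
	"fmt"
)

// Message stands in for processor.Message in this sketch.
type Message interface{}

var errProcessing = errors.New("span processing failed")

// errorReportingProcessor wraps a processing step and reports failures
// through a callback instead of a return value.
type errorReportingProcessor struct {
	process func(Message) error // the wrapped processing step
}

// Process runs the wrapped step and invokes onError only when it fails;
// on success the callback is deliberately never called.
func (p *errorReportingProcessor) Process(msg Message, onError func(Message, error)) {
	if err := p.process(msg); err != nil {
		onError(msg, err)
	}
}

func main() {
	failing := &errorReportingProcessor{process: func(Message) error { return errProcessing }}
	failing.Process("span", func(_ Message, err error) {
		fmt.Println("callback fired:", err)
	})

	ok := &errorReportingProcessor{process: func(Message) error { return nil }}
	ok.Process("span", func(_ Message, _ error) {
		fmt.Println("never reached")
	})
}
```

Under this contract the `err != nil` check the reviewer asked about is unnecessary inside the callback, though renaming `e` to `err` remains a fair nit.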
We're considering a different solution.
Which problem is this PR solving?
Resolves #1109
Short description of the changes
- Adds a `ParallelSpanProcessor` that can signal, via a channel, when there are errors while processing spans
- Updates the `Consumer` to handle errors from `ParallelSpanProcessor` by closing the partition. This should trigger a rebalance, causing reprocessing from the last committed offset.

Signed-off-by: Prithvi Raj p.r@uber.com