
Deadlock detector hack for Kafka driver instability #1087

Merged: vprithvi merged 11 commits into master from the seppuku branch on Oct 9, 2018
3 participants
vprithvi (Member) commented Sep 26, 2018:

Signed-off-by: Prithvi Raj p.r@uber.com

Which problem is this PR solving?

  • Mitigate effects of #1052 as a temporary solution

Short description of the changes

  • Adds a per-partition deadlock watcher that monitors the message consumption rate once per minute. If no messages were consumed in the interval, it triggers a rebalance by signalling the consumer to close the partition. If the partition close cannot be signalled, the instance is killed. (A rough sketch of this flow follows below.)
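A rough sketch of what such a per-partition watcher can look like (type and field names here are hypothetical, not the exact code merged in this PR): the consumer increments a counter atomically on every message, and the watcher checks that counter once per interval, first trying to signal a partition close and panicking only if that signal cannot be delivered.

package consumer // hypothetical sketch of the watcher's core check

import (
	"sync/atomic"

	"go.uber.org/zap"
)

type partitionDeadlockDetector struct {
	msgConsumed    uint64        // incremented via atomic.AddUint64 for every processed message
	closePartition chan struct{} // read by the consumer loop to trigger a partition close / rebalance
	logger         *zap.Logger
}

// checkForDeadlock runs once per check interval (driven by a ticker, not shown here).
func (d *partitionDeadlockDetector) checkForDeadlock(partition int32) {
	if atomic.SwapUint64(&d.msgConsumed, 0) > 0 {
		return // messages were consumed during the interval; nothing to do
	}
	select {
	case d.closePartition <- struct{}{}:
		d.logger.Warn("Signalling partition close due to inactivity", zap.Int32("partition", partition))
	default:
		// closePartition is blocked: the consumer may have deadlocked, so kill the instance
		d.logger.Panic("No messages processed in the last check interval", zap.Int32("partition", partition))
	}
}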
Add seppuku factory
Signed-off-by: Prithvi Raj <p.r@uber.com>
codecov bot commented Sep 27, 2018:

Codecov Report

❗️ No coverage uploaded for pull request base (master@a429d78).
The diff coverage is 100%.

Impacted file tree graph

@@           Coverage Diff            @@
##             master   #1087   +/-   ##
========================================
  Coverage          ?    100%           
========================================
  Files             ?     141           
  Lines             ?    6723           
  Branches          ?       0           
========================================
  Hits              ?    6723           
  Misses            ?       0           
  Partials          ?       0
Impacted Files Coverage Δ
cmd/ingester/app/consumer/consumer.go 100% <100%> (ø)
cmd/ingester/app/consumer/deadlock_detector.go 100% <100%> (ø)
cmd/ingester/app/consumer/consumer_metrics.go 100% <100%> (ø)

Continue to review full report at Codecov.

Legend:
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update a429d78...4573d51.

Resolved review thread: cmd/ingester/app/consumer/consumer.go
buf := make([]byte, 1<<20)
logger.Panic("No messages processed in the last check interval",
zap.Int32("partition", partition),
zap.String("stack", string(buf[:runtime.Stack(buf, true)])))

yurishkuro (Member) commented Oct 1, 2018:
In HotROD, the zap logger automatically prints the stack. What's different here?

vprithvi (Author, Member) commented Oct 1, 2018:
zap only prints the stack of the current goroutine; this prints the stacks of all goroutines.
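A self-contained sketch of the difference (the helper name is hypothetical): runtime.Stack with all=true captures the stacks of every goroutine, while zap's automatic stacktrace field only covers the goroutine that emits the log entry.

package main

import (
	"runtime"

	"go.uber.org/zap"
)

// dumpAllGoroutines illustrates the technique used in the snippet above.
func dumpAllGoroutines(logger *zap.Logger, partition int32) {
	buf := make([]byte, 1<<20)    // 1 MiB buffer for the full dump
	n := runtime.Stack(buf, true) // true => stacks of all goroutines, not just the caller
	logger.Panic("No messages processed in the last check interval",
		zap.Int32("partition", partition),
		zap.String("stack", string(buf[:n])))
}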

func (s *seppukuFactory) startMonitoringForPartition(partition int32) *seppukuWorker {
var msgConsumed uint64
w := &seppukuWorker{
msgConsumed: &msgConsumed,

yurishkuro (Member) commented Oct 1, 2018:
No reason to allocate a pointer; you can still use the address of a struct field with atomic.

vprithvi (Author, Member) commented Oct 1, 2018:
True, I had assumed that the pointer allocation had a similar overhead to a field allocation. I didn't want to use & on every access.
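The field-based alternative being suggested, sketched with hypothetical names: the struct keeps a plain uint64 and the address is taken only inside the accessors, so call sites never write & and no separate allocation is needed.

package consumer // hypothetical sketch

import "sync/atomic"

type deadlockDetector struct {
	// Kept as the first field: 64-bit atomics require 8-byte alignment on 32-bit
	// platforms, and the first word of a struct is guaranteed to be aligned.
	msgConsumed uint64
}

func (d *deadlockDetector) incrementMsgCount() {
	atomic.AddUint64(&d.msgConsumed, 1) // address of the field is taken here, not at every call site
}

// swapAndReset reports whether anything was consumed since the previous check
// and resets the counter for the next interval.
func (d *deadlockDetector) swapAndReset() bool {
	return atomic.SwapUint64(&d.msgConsumed, 0) > 0
}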

case w.closePartition <- struct{}{}:
s.logger.Warn("Signalling partition close due to inactivity", zap.Int32("partition", partition))
default:
// If closePartition is blocked, attempt seppuku

yurishkuro (Member) commented Oct 1, 2018:
// If closePartition is blocked, the consumer may have deadlocked -> kill the process

}

func (w *seppukuWorker) close() {
w.ticker.Stop()

yurishkuro (Member) commented Oct 1, 2018:
This could inadvertently kill the process after a rebalance, but I'm not sure how to avoid it.

vprithvi (Author, Member) commented Oct 1, 2018:
Could you elaborate how?

yurishkuro (Member) commented Oct 2, 2018:
Actually, because Ticker.Stop() does not close the channel, the race condition I was thinking about won't happen on rebalance, but you will leak the goroutine.

vprithvi (Author, Member) commented Oct 4, 2018:
This is true; I can use a separate channel to close this if you feel strongly about it.

yurishkuro (Member) commented Oct 4, 2018:
Yes, let's fix this; a goroutine leak is not good.
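One way to plug the leak, sketched under the assumption that the detector carries its own done channel (names illustrative): the ticker is created inside the goroutine with a deferred Stop, and close() closes done, which is what actually unblocks the loop, because Ticker.Stop() never closes ticker.C.

package consumer // hypothetical sketch of the goroutine-leak fix

import "time"

type deadlockDetector struct {
	interval time.Duration
	done     chan struct{} // closed by close() to stop the watcher goroutine
}

func (d *deadlockDetector) start(check func()) {
	go func() {
		ticker := time.NewTicker(d.interval) // local to the goroutine, stopped when it exits
		defer ticker.Stop()
		for {
			select {
			case <-d.done:
				return // exits cleanly on rebalance instead of leaking
			case <-ticker.C:
				check()
			}
		}
	}()
}

func (d *deadlockDetector) close() {
	// Stopping the ticker alone would leave the goroutine blocked on ticker.C forever;
	// closing done is what lets it return.
	close(d.done)
}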

vprithvi added some commits Oct 4, 2018

Rename
Signed-off-by: Prithvi Raj <p.r@uber.com>
Address feedback
Signed-off-by: Prithvi Raj <p.r@uber.com>
@@ -52,12 +56,15 @@ type consumerState struct {

// New is a constructor for a Consumer
func New(params Params) (*Consumer, error) {
deadlockDetectorFactory := newDeadlockDetectorFactory(params.Factory, params.Logger, time.Minute)

yurishkuro (Member) commented Oct 5, 2018:
separate issue: s/params.Factory/params.MetricsFactory/

vprithvi (Author, Member) commented Oct 5, 2018:
I'll address this separately - it'll only add noise to this PR


msgProcessor.Process(&saramaMessageWrapper{msg})
case <-deadlockDetector.getClosePartition():

yurishkuro (Member) commented Oct 5, 2018:
s/getClosePartition/closePartitionChannel/

}

func (c *Consumer) closePartition(partitionConsumer sc.PartitionConsumer) {
c.logger.Info("Closing partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
partitionConsumer.Close() // blocks until messages channel is drained
c.newPartitionMetrics(partitionConsumer.Partition()).closeCounter.Inc(1)

yurishkuro (Member) commented Oct 5, 2018:
Why do we call new here? If it internally caches the metrics, the "new" prefix in newPartitionMetrics is misleading.

closeCounter metrics.Counter
}

func (c *Consumer) getNamespace(partition int32) metrics.Factory {

yurishkuro (Member) commented Oct 5, 2018:
s/getNamespace/metricsFactoryForPartition/

yurishkuro (Member) commented Oct 5, 2018:
NB: we don't use "get" in Go.

msgMetrics.offsetGauge.Update(msg.Offset)
msgMetrics.lagGauge.Update(pc.HighWaterMarkOffset() - msg.Offset - 1)
deadlockDetector.incrementMsgCount()
c.allPartitionDeadlockDetector.incrementMsgCount()

yurishkuro (Member) commented Oct 5, 2018:
I don't follow what the purpose of this allPartitionDeadlockDetector is. It looks like it's only used to increment this counter, so why do we even need it? We can always sum the time series to get a total count.

var msgConsumed uint64
w := &deadlockDetector{
msgConsumed: &msgConsumed,
ticker: time.NewTicker(s.interval),

yurishkuro (Member) commented Oct 5, 2018:
Nit: you do not need to leak the ticker; create it inside the goroutine as a local var with a deferred Stop.

vprithvi (Author, Member) commented Oct 5, 2018:
👍

Resolved review thread: cmd/ingester/app/consumer/deadlock_detector.go
@@ -42,6 +43,9 @@ type Consumer struct {
internalConsumer consumer.Consumer
processorFactory ProcessorFactory

deadlockDetectorFactory deadlockDetectorFactory
allPartitionDeadlockDetector *deadlockDetector

yurishkuro (Member) commented Oct 5, 2018:
nit: allPartitions...

vprithvi (Author, Member) commented Oct 5, 2018:
👍

@@ -42,6 +43,9 @@ type Consumer struct {
internalConsumer consumer.Consumer
processorFactory ProcessorFactory

deadlockDetectorFactory deadlockDetectorFactory

yurishkuro (Member) commented Oct 5, 2018:
I think it would be cleaner and easier to understand if you had a top-level deadlockDetector, which can create a partitionDeadlockDetector as needed (implementation-wise, the former may contain the latter for pId=-1). That way the factory is only used once, to create the top-level detector, and it does not need to be stored in the Consumer.

vprithvi (Author, Member) commented Oct 5, 2018:
Doesn't this mean that the top-level deadlockDetector also takes on the responsibilities of the factory? (That being said, I think it might be a cleaner design.)

yurishkuro (Member) commented Oct 5, 2018:
It does, in a way, but there's nothing wrong with that, especially considering that creating partition detectors happens at runtime and many times, whereas the top-level factory would be used only once at startup and not needed afterwards.

Separating the top-level detector from the individual detectors will also allow clean separation of some implementation details, e.g. where certain features are not used.

And best of all, you'll be able to move a lot of methods into the detectors, away from Consumer.
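A sketch of the shape being proposed, with hypothetical names and signatures (the merged code may differ): the Consumer owns a single top-level deadlockDetector and asks it for per-partition watchers, so the factory type never has to live on the Consumer.

package consumer // hypothetical sketch of the proposed structure

import (
	"time"

	"github.com/uber/jaeger-lib/metrics"
	"go.uber.org/zap"
)

// deadlockDetector is the single top-level object owned by the Consumer.
type deadlockDetector struct {
	metricsFactory metrics.Factory
	logger         *zap.Logger
	interval       time.Duration
}

// partitionDeadlockDetector watches a single partition; conceptually the
// top-level detector could itself hold one of these with partition = -1.
type partitionDeadlockDetector struct {
	msgConsumed    uint64
	closePartition chan struct{}
	done           chan struct{}
}

// startMonitoringForPartition spawns a watcher for one partition; the consumer
// loop then selects on its closePartition channel.
func (d *deadlockDetector) startMonitoringForPartition(partition int32) *partitionDeadlockDetector {
	w := &partitionDeadlockDetector{
		closePartition: make(chan struct{}, 1),
		done:           make(chan struct{}),
	}
	// ...start the per-partition ticker goroutine here (see the earlier sketches)...
	return w
}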

vprithvi (Author, Member) commented Oct 5, 2018:
👍

msgConsumed *uint64
ticker *time.Ticker
logger *zap.Logger
closePartition chan struct{}

yurishkuro (Member) commented Oct 5, 2018:
I assume this does not apply to all-partitions detector?

vprithvi (Author, Member) commented Oct 5, 2018:
It does not

Address feedback
Signed-off-by: Prithvi Raj <p.r@uber.com>
f := c.namespace(partition)
return partitionMetrics{
closeCounter: f.Counter("partition-close", nil),
startCounter: f.Counter("partition-start", nil)}

yurishkuro (Member) commented Oct 6, 2018:
This still bothers me. Some metrics factories are not happy if you try to create a metric with the same name twice, so if we re-acquire the same partition this could cause a panic, e.g. if someone is using expvar-based metrics (unless we implement protection in the factory, which I had to do for Prometheus).

vprithvi (Author, Member) commented Oct 8, 2018:
I added a test to jaeger-lib/metrics which shows that calling Counter multiple times with the same tags does not panic for expvar, Prometheus, and Tally.
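Roughly what such a reentrancy check could look like if written against the in-memory factory already used in this PR's tests (the actual test lives in jaeger-lib/metrics and also exercises the expvar, Prometheus, and Tally backends):

package consumer

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/uber/jaeger-lib/metrics"
)

// Hypothetical sketch; not the test that was added to jaeger-lib.
func TestCounterReacquisitionDoesNotPanic(t *testing.T) {
	f := metrics.NewLocalFactory(0)
	tags := map[string]string{"partition": "1"}
	assert.NotPanics(t, func() {
		// Acquire a counter with an identical name and tags twice, as would
		// happen when the same partition is re-assigned after a rebalance.
		f.Counter("partition-close", tags).Inc(1)
		f.Counter("partition-close", tags).Inc(1)
	})
}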

@@ -210,3 +216,29 @@ func TestSaramaConsumerWrapper_start_Errors(t *testing.T) {

t.Fail()
}

func TestHandleClosePartition(t *testing.T) {
localFactory := metrics.NewLocalFactory(0)

yurishkuro (Member) commented Oct 6, 2018:
s/localFactory/metricsFactory/

done chan struct{}
}

func newDeadlockDetector(factory metrics.Factory, logger *zap.Logger, interval time.Duration) deadlockDetector {

yurishkuro (Member) commented Oct 6, 2018:
Is there some pattern you're following from elsewhere in the code? Referring to a metrics factory as merely "factory" is very poor naming.

vprithvi (Author, Member) commented Oct 8, 2018:
Ah, this is a result of some laziness and accepting the IDE-generated name; I'll update these.

Resolved review thread (outdated): cmd/ingester/app/consumer/deadlock_detector.go
Resolved review thread (outdated): cmd/ingester/app/consumer/deadlock_detector.go

func newDeadlockDetector(factory metrics.Factory, logger *zap.Logger, interval time.Duration) deadlockDetector {
panicFunc := func(partition int32) {
factory.Counter("deadlockdetector.panic-issued", map[string]string{"partition": strconv.Itoa(int(partition))}).Inc(1)

yurishkuro (Member) commented Oct 6, 2018:
This assumes the factory is reentrant for the same metric name, which is not guaranteed.

Resolved review thread: cmd/ingester/app/consumer/consumer.go

yurishkuro changed the title from "Suicide hack for Kafka driver instability" to "Deadlock detector hack for Kafka driver instability" on Oct 6, 2018

Address feedback
Signed-off-by: Prithvi Raj <p.r@uber.com>
@@ -30,8 +30,17 @@ type errMetrics struct {
errCounter metrics.Counter
}

type partitionMetrics struct {

yurishkuro (Member) commented Oct 8, 2018:
Nit: we should clean up these metrics. All three structs are per-partition; could we not combine them into one? Then we won't have all these small functions on the Consumer.

vprithvi (Author, Member) commented Oct 8, 2018:
I agree; I'll do it as a separate commit.
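The consolidation being discussed, sketched with hypothetical field and metric names (only partition-close and partition-start appear in this PR): one per-partition struct built from a single partition-scoped metrics.Factory instead of three small structs and their helper functions on Consumer.

package consumer // hypothetical sketch of the combined struct

import "github.com/uber/jaeger-lib/metrics"

type partitionMetrics struct {
	startCounter metrics.Counter
	closeCounter metrics.Counter
	errCounter   metrics.Counter
	offsetGauge  metrics.Gauge
	lagGauge     metrics.Gauge
}

func newPartitionMetrics(f metrics.Factory) partitionMetrics {
	return partitionMetrics{
		startCounter: f.Counter("partition-start", nil),
		closeCounter: f.Counter("partition-close", nil),
		errCounter:   f.Counter("errors", nil),       // illustrative metric name
		offsetGauge:  f.Gauge("current-offset", nil), // illustrative metric name
		lagGauge:     f.Gauge("offset-lag", nil),     // illustrative metric name
	}
}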

Resolved review thread (outdated): cmd/ingester/app/consumer/deadlock_detector.go

vprithvi added some commits Oct 8, 2018

Address feedback
Signed-off-by: Prithvi Raj <p.r@uber.com>
Merge branch 'master' into seppuku
Signed-off-by: Prithvi Raj <p.r@uber.com>
Make test not flakey
Signed-off-by: Prithvi Raj <p.r@uber.com>
Refactor panic tests
Signed-off-by: Prithvi Raj <p.r@uber.com>

vprithvi merged commit 7105fa9 into master on Oct 9, 2018

5 of 6 checks passed:
  • License Compliance: 31 issues found
  • WIP: ready for review
  • codecov/patch: No report found to compare against
  • codecov/project: 100% (target 100%)
  • continuous-integration/travis-ci/pr: The Travis CI build passed
  • continuous-integration/travis-ci/push: The Travis CI build passed

wafflebot bot removed the review label on Oct 9, 2018

vprithvi deleted the seppuku branch on Oct 9, 2018
