diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go
index feda7a606c5..c939eac60c4 100644
--- a/cmd/ingester/app/consumer/consumer.go
+++ b/cmd/ingester/app/consumer/consumer.go
@@ -16,6 +16,7 @@ package consumer
 
 import (
 	"sync"
+	"time"
 
 	"github.com/Shopify/sarama"
 	sc "github.com/bsm/sarama-cluster"
@@ -42,6 +43,8 @@ type Consumer struct {
 	internalConsumer consumer.Consumer
 	processorFactory ProcessorFactory
 
+	deadlockDetector deadlockDetector
+
 	partitionIDToState map[int32]*consumerState
 }
 
@@ -52,17 +55,20 @@ type consumerState struct {
 
 // New is a constructor for a Consumer
 func New(params Params) (*Consumer, error) {
+	deadlockDetector := newDeadlockDetector(params.Factory, params.Logger, time.Minute)
 	return &Consumer{
 		metricsFactory:     params.Factory,
 		logger:             params.Logger,
 		internalConsumer:   params.InternalConsumer,
 		processorFactory:   params.ProcessorFactory,
+		deadlockDetector:   deadlockDetector,
 		partitionIDToState: make(map[int32]*consumerState),
 	}, nil
 }
 
 // Start begins consuming messages in a go routine
 func (c *Consumer) Start() {
+	c.deadlockDetector.start()
 	go func() {
 		c.logger.Info("Starting main loop")
 		for pc := range c.internalConsumer.Partitions() {
@@ -73,6 +79,7 @@ func (c *Consumer) Start() {
 				// to the cleanup process not completing
 				p.wg.Wait()
 			}
+			c.partitionMetrics(pc.Partition()).startCounter.Inc(1)
 			c.partitionIDToState[pc.Partition()] = &consumerState{partitionConsumer: pc}
 			go c.handleMessages(pc)
 			go c.handleErrors(pc.Partition(), pc.Errors())
@@ -86,6 +93,7 @@ func (c *Consumer) Close() error {
 		c.closePartition(p.partitionConsumer)
 		p.wg.Wait()
 	}
+	c.deadlockDetector.close()
 	c.logger.Info("Closing parent consumer")
 	return c.internalConsumer.Close()
 }
@@ -97,27 +105,43 @@ func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
 	defer c.closePartition(pc)
 
 	msgMetrics := c.newMsgMetrics(pc.Partition())
+
 	var msgProcessor processor.SpanProcessor
 
-	for msg := range pc.Messages() {
-		c.logger.Debug("Got msg", zap.Any("msg", msg))
-		msgMetrics.counter.Inc(1)
-		msgMetrics.offsetGauge.Update(msg.Offset)
-		msgMetrics.lagGauge.Update(pc.HighWaterMarkOffset() - msg.Offset - 1)
+	deadlockDetector := c.deadlockDetector.startMonitoringForPartition(pc.Partition())
+	defer deadlockDetector.close()
 
-		if msgProcessor == nil {
-			msgProcessor = c.processorFactory.new(pc.Partition(), msg.Offset-1)
-			defer msgProcessor.Close()
-		}
+	for {
+		select {
+		case msg, ok := <-pc.Messages():
+			if !ok {
+				c.logger.Info("Message channel closed. ", zap.Int32("partition", pc.Partition()))
+				return
+			}
+			c.logger.Debug("Got msg", zap.Any("msg", msg))
+			msgMetrics.counter.Inc(1)
+			msgMetrics.offsetGauge.Update(msg.Offset)
+			msgMetrics.lagGauge.Update(pc.HighWaterMarkOffset() - msg.Offset - 1)
+			deadlockDetector.incrementMsgCount()
+
+			if msgProcessor == nil {
+				msgProcessor = c.processorFactory.new(pc.Partition(), msg.Offset-1)
+				defer msgProcessor.Close()
+			}
+
+			msgProcessor.Process(&saramaMessageWrapper{msg})
 
-		msgProcessor.Process(&saramaMessageWrapper{msg})
+		case <-deadlockDetector.closePartitionChannel():
+			c.logger.Info("Closing partition due to inactivity", zap.Int32("partition", pc.Partition()))
+			return
+		}
 	}
-	c.logger.Info("Finished handling messages", zap.Int32("partition", pc.Partition()))
 }
 
 func (c *Consumer) closePartition(partitionConsumer sc.PartitionConsumer) {
 	c.logger.Info("Closing partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
 	partitionConsumer.Close() // blocks until messages channel is drained
+	c.partitionMetrics(partitionConsumer.Partition()).closeCounter.Inc(1)
 	c.logger.Info("Closed partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
 }
diff --git a/cmd/ingester/app/consumer/consumer_metrics.go b/cmd/ingester/app/consumer/consumer_metrics.go
index 4d760978e11..279a3f95911 100644
--- a/cmd/ingester/app/consumer/consumer_metrics.go
+++ b/cmd/ingester/app/consumer/consumer_metrics.go
@@ -30,8 +30,17 @@ type errMetrics struct {
 	errCounter metrics.Counter
 }
 
+type partitionMetrics struct {
+	startCounter metrics.Counter
+	closeCounter metrics.Counter
+}
+
+func (c *Consumer) namespace(partition int32) metrics.Factory {
+	return c.metricsFactory.Namespace("sarama-consumer", map[string]string{"partition": strconv.Itoa(int(partition))})
+}
+
 func (c *Consumer) newMsgMetrics(partition int32) msgMetrics {
-	f := c.metricsFactory.Namespace("sarama-consumer", map[string]string{"partition": strconv.Itoa(int(partition))})
+	f := c.namespace(partition)
 	return msgMetrics{
 		counter:     f.Counter("messages", nil),
 		offsetGauge: f.Gauge("current-offset", nil),
@@ -40,7 +49,12 @@ func (c *Consumer) newMsgMetrics(partition int32) msgMetrics {
 }
 
 func (c *Consumer) newErrMetrics(partition int32) errMetrics {
-	f := c.metricsFactory.Namespace("sarama-consumer", map[string]string{"partition": strconv.Itoa(int(partition))})
-	return errMetrics{errCounter: f.Counter("errors", nil)}
+	return errMetrics{errCounter: c.namespace(partition).Counter("errors", nil)}
+}
+
+func (c *Consumer) partitionMetrics(partition int32) partitionMetrics {
+	f := c.namespace(partition)
+	return partitionMetrics{
+		closeCounter: f.Counter("partition-close", nil),
+		startCounter: f.Counter("partition-start", nil)}
 }
diff --git a/cmd/ingester/app/consumer/consumer_test.go b/cmd/ingester/app/consumer/consumer_test.go
index fad87ad0436..8db11895d32 100644
--- a/cmd/ingester/app/consumer/consumer_test.go
+++ b/cmd/ingester/app/consumer/consumer_test.go
@@ -94,6 +94,7 @@ func newConsumer(
 		logger:             logger,
 		internalConsumer:   consumer,
 		partitionIDToState: make(map[int32]*consumerState),
+		deadlockDetector:   newDeadlockDetector(factory, logger, time.Second),
 		processorFactory: ProcessorFactory{
 			topic:          topic,
@@ -173,6 +174,11 @@ func TestSaramaConsumerWrapper_start_Messages(t *testing.T) {
 		Tags:  partitionTag,
 		Value: 0,
 	})
+	testutils.AssertCounterMetrics(t, localFactory, testutils.ExpectedMetric{
+		Name:  "sarama-consumer.partition-start",
+		Tags:  partitionTag,
+		Value: 1,
+	})
 }
 
 func TestSaramaConsumerWrapper_start_Errors(t *testing.T) {
@@ -210,3 +216,29 @@ func TestSaramaConsumerWrapper_start_Errors(t *testing.T) {
 		t.Fail()
 	}
 }
+
+func TestHandleClosePartition(t *testing.T) {
+	metricsFactory := metrics.NewLocalFactory(0)
+
+	mp := &pmocks.SpanProcessor{}
+	saramaConsumer := smocks.NewConsumer(t, &sarama.Config{})
+	mc := saramaConsumer.ExpectConsumePartition(topic, partition, msgOffset)
+	mc.ExpectErrorsDrainedOnClose()
+	saramaPartitionConsumer, e := saramaConsumer.ConsumePartition(topic, partition, msgOffset)
+	require.NoError(t, e)
+
+	undertest := newConsumer(metricsFactory, topic, mp, newSaramaClusterConsumer(saramaPartitionConsumer))
+	undertest.deadlockDetector = newDeadlockDetector(metricsFactory, undertest.logger, 200*time.Millisecond)
+	undertest.Start()
+	defer undertest.Close()
+
+	for i := 0; i < 10; i++ {
+		undertest.deadlockDetector.allPartitionsDeadlockDetector.incrementMsgCount() // Don't trigger panic on all partitions detector
+		time.Sleep(100 * time.Millisecond)
+		c, _ := metricsFactory.Snapshot()
+		if c["sarama-consumer.partition-close|partition=316"] == 1 {
+			return
+		}
+	}
+	assert.Fail(t, "Did not close partition")
+}
diff --git a/cmd/ingester/app/consumer/deadlock_detector.go b/cmd/ingester/app/consumer/deadlock_detector.go
new file mode 100644
index 00000000000..4b5b2696742
--- /dev/null
+++ b/cmd/ingester/app/consumer/deadlock_detector.go
@@ -0,0 +1,184 @@
+// Copyright (c) 2018 The Jaeger Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumer
+
+import (
+	"runtime"
+	"strconv"
+	"sync/atomic"
+	"time"
+
+	"github.com/uber/jaeger-lib/metrics"
+	"go.uber.org/zap"
+)
+
+// deadlockDetector monitors the messages consumed and either signals for the partition to be closed by sending a
+// message on closePartition, or triggers a panic if the close fails. It triggers a panic if there are no messages
+// consumed across all partitions.
+//
+// Closing the partition should result in a rebalance, which alleviates the condition. This means that rebalances can
+// happen frequently if there is no traffic on the Kafka topic. This shouldn't affect normal operations.
+//
+// If the message send isn't processed within the next check interval, a panic is issued. This hack relies on a
+// container management system (k8s, aurora, marathon, etc) to reschedule
+// the dead instance.
+//
+// This hack protects jaeger-ingester from issues described in https://github.com/jaegertracing/jaeger/issues/1052
+//
+type deadlockDetector struct {
+	metricsFactory                metrics.Factory
+	logger                        *zap.Logger
+	interval                      time.Duration
+	allPartitionsDeadlockDetector *allPartitionsDeadlockDetector
+	panicFunc                     func(int32)
+}
+
+type partitionDeadlockDetector struct {
+	msgConsumed                   *uint64
+	logger                        *zap.Logger
+	partition                     int32
+	closePartition                chan struct{}
+	done                          chan struct{}
+	incrementAllPartitionMsgCount func()
+}
+
+type allPartitionsDeadlockDetector struct {
+	msgConsumed *uint64
+	logger      *zap.Logger
+	done        chan struct{}
+}
+
+func newDeadlockDetector(metricsFactory metrics.Factory, logger *zap.Logger, interval time.Duration) deadlockDetector {
+	panicFunc := func(partition int32) {
+		metricsFactory.Counter("deadlockdetector.panic-issued", map[string]string{"partition": strconv.Itoa(int(partition))}).Inc(1)
+		time.Sleep(time.Second) // Allow time to flush metric
+
+		buf := make([]byte, 1<<20)
+		logger.Panic("No messages processed in the last check interval",
+			zap.Int32("partition", partition),
+			zap.String("stack", string(buf[:runtime.Stack(buf, true)])))
+	}
+
+	return deadlockDetector{
+		metricsFactory: metricsFactory,
+		logger:         logger,
+		interval:       interval,
+		panicFunc:      panicFunc,
+	}
+}
+
+func (s *deadlockDetector) startMonitoringForPartition(partition int32) *partitionDeadlockDetector {
+	var msgConsumed uint64
+	w := &partitionDeadlockDetector{
+		msgConsumed:    &msgConsumed,
+		partition:      partition,
+		closePartition: make(chan struct{}, 1),
+		done:           make(chan struct{}),
+		logger:         s.logger,
+
+		incrementAllPartitionMsgCount: func() {
+			s.allPartitionsDeadlockDetector.incrementMsgCount()
+		},
+	}
+
+	go s.monitorForPartition(w, partition)
+
+	return w
+}
+
+func (s *deadlockDetector) monitorForPartition(w *partitionDeadlockDetector, partition int32) {
+	ticker := time.NewTicker(s.interval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-w.done:
+			s.logger.Info("Closing ticker routine", zap.Int32("partition", partition))
+			return
+		case <-ticker.C:
+			if atomic.LoadUint64(w.msgConsumed) == 0 {
+				select {
+				case w.closePartition <- struct{}{}:
+					s.metricsFactory.Counter("deadlockdetector.close-signalled", map[string]string{"partition": strconv.Itoa(int(partition))}).Inc(1)
+					s.logger.Warn("Signalling partition close due to inactivity", zap.Int32("partition", partition))
+				default:
+					// If closePartition is blocked, the consumer might have deadlocked - kill the process
+					s.panicFunc(partition)
+					return // For tests
+				}
+			} else {
+				atomic.StoreUint64(w.msgConsumed, 0)
+			}
+		}
+	}
+}
+
+// start monitors that the sum of messages consumed across all partitions is non-zero for the given interval.
+// If it is zero when there are producers producing messages on the topic, it means that sarama-cluster hasn't
+// retrieved partition assignments. (This case will not be caught by startMonitoringForPartition because no partitions
+// were retrieved).
+func (s *deadlockDetector) start() {
+	var msgConsumed uint64
+	detector := &allPartitionsDeadlockDetector{
+		msgConsumed: &msgConsumed,
+		done:        make(chan struct{}),
+		logger:      s.logger,
+	}
+
+	go func() {
+		s.logger.Debug("Starting global deadlock detector")
+		ticker := time.NewTicker(s.interval)
+		defer ticker.Stop()
+
+		for {
+			select {
+			case <-detector.done:
+				s.logger.Debug("Closing global ticker routine")
+				return
+			case <-ticker.C:
+				if atomic.LoadUint64(detector.msgConsumed) == 0 {
+					s.panicFunc(-1)
+					return // For tests
+				}
+				atomic.StoreUint64(detector.msgConsumed, 0)
+			}
+		}
+	}()
+
+	s.allPartitionsDeadlockDetector = detector
+}
+
+func (s *deadlockDetector) close() {
+	s.logger.Debug("Closing all partitions deadlock detector")
+	s.allPartitionsDeadlockDetector.done <- struct{}{}
+}
+
+func (s *allPartitionsDeadlockDetector) incrementMsgCount() {
+	atomic.AddUint64(s.msgConsumed, 1)
+}
+
+func (w *partitionDeadlockDetector) closePartitionChannel() chan struct{} {
+	return w.closePartition
+}
+
+func (w *partitionDeadlockDetector) close() {
+	w.logger.Debug("Closing deadlock detector", zap.Int32("partition", w.partition))
+	w.done <- struct{}{}
+}
+
+func (w *partitionDeadlockDetector) incrementMsgCount() {
+	w.incrementAllPartitionMsgCount()
+	atomic.AddUint64(w.msgConsumed, 1)
+}
diff --git a/cmd/ingester/app/consumer/deadlock_detector_test.go b/cmd/ingester/app/consumer/deadlock_detector_test.go
new file mode 100644
index 00000000000..0cb31fbd1bf
--- /dev/null
+++ b/cmd/ingester/app/consumer/deadlock_detector_test.go
@@ -0,0 +1,114 @@
+// Copyright (c) 2018 The Jaeger Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumer
+
+import (
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/uber/jaeger-lib/metrics"
+	"github.com/uber/jaeger-lib/metrics/testutils"
+	"go.uber.org/zap"
+)
+
+func TestClosingSignalEmitted(t *testing.T) {
+	mf := metrics.NewLocalFactory(0)
+	l, _ := zap.NewDevelopment()
+	f := newDeadlockDetector(mf, l, time.Millisecond)
+	w := f.startMonitoringForPartition(1)
+	assert.NotNil(t, <-w.closePartitionChannel())
+	w.close()
+}
+
+func TestNoClosingSignalIfMessagesProcessedInInterval(t *testing.T) {
+	mf := metrics.NewLocalFactory(0)
+	l, _ := zap.NewDevelopment()
+	f := newDeadlockDetector(mf, l, time.Second)
+	f.start()
+	defer f.close()
+
+	w := f.startMonitoringForPartition(1)
+
+	w.incrementMsgCount()
+	assert.Zero(t, len(w.closePartitionChannel()))
+	w.close()
+}
+
+func TestResetMsgCount(t *testing.T) {
+	mf := metrics.NewLocalFactory(0)
+	l, _ := zap.NewDevelopment()
+	f := newDeadlockDetector(mf, l, 50*time.Millisecond)
+	f.start()
+	defer f.close()
+	w := f.startMonitoringForPartition(1)
+	w.incrementMsgCount()
+	time.Sleep(75 * time.Millisecond)
+	// Resets happen after every ticker interval
+	w.close()
+	assert.Zero(t, atomic.LoadUint64(w.msgConsumed))
+}
+
+func TestPanicFunc(t *testing.T) {
+	mf := metrics.NewLocalFactory(0)
+	l, _ := zap.NewDevelopment()
+	f := newDeadlockDetector(mf, l, time.Minute)
+
+	assert.Panics(t, func() {
+		f.panicFunc(1)
+	})
+
+	testutils.AssertCounterMetrics(t, mf, testutils.ExpectedMetric{
+		Name:  "deadlockdetector.panic-issued",
+		Tags:  map[string]string{"partition": "1"},
+		Value: 1,
+	})
+}
+
+func TestPanicForPartition(t *testing.T) {
+	l, _ := zap.NewDevelopment()
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	d := deadlockDetector{
+		metricsFactory: metrics.NewLocalFactory(0),
+		logger:         l,
+		interval:       1,
+		panicFunc: func(partition int32) {
+			wg.Done()
+		},
+	}
+
+	d.startMonitoringForPartition(1)
+	wg.Wait()
+}
+
+func TestGlobalPanic(t *testing.T) {
+	l, _ := zap.NewDevelopment()
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	d := deadlockDetector{
+		metricsFactory: metrics.NewLocalFactory(0),
+		logger:         l,
+		interval:       1,
+		panicFunc: func(partition int32) {
+			wg.Done()
+		},
+	}
+
+	d.start()
+	wg.Wait()
+}
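
For context, the mechanism that deadlock_detector.go builds on reduces to a small watchdog pattern: an atomic counter that the consumer loop bumps on every message, plus a ticker goroutine that, whenever the counter stayed at zero for a full interval, either sends a close signal on a buffered channel or, if the previous signal was never drained, escalates to a panic. The sketch below is a minimal, self-contained illustration of that pattern only; the watchdog, newWatchdog, observe, and idle names are illustrative and are not part of this change.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// watchdog is a stripped-down stand-in for partitionDeadlockDetector (illustrative only).
type watchdog struct {
	msgCount uint64        // bumped by the consumer loop on every message
	signal   chan struct{} // buffered(1); one token per idle interval, like closePartition
	done     chan struct{}
}

func newWatchdog(interval time.Duration) *watchdog {
	w := &watchdog{signal: make(chan struct{}, 1), done: make(chan struct{})}
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-w.done:
				return
			case <-ticker.C:
				if atomic.LoadUint64(&w.msgCount) == 0 {
					select {
					case w.signal <- struct{}{}: // ask the owner to close the partition
					default:
						// The previous signal was never drained: the owner is presumed
						// stuck, so kill the process and let the orchestrator restart it.
						panic("consumer appears deadlocked")
					}
				} else {
					atomic.StoreUint64(&w.msgCount, 0) // start a fresh interval
				}
			}
		}
	}()
	return w
}

func (w *watchdog) observe() { atomic.AddUint64(&w.msgCount, 1) }

func (w *watchdog) idle() <-chan struct{} { return w.signal }

func (w *watchdog) close() { close(w.done) }

func main() {
	w := newWatchdog(100 * time.Millisecond)
	defer w.close()

	// No observe() calls are made, so the watchdog reports inactivity.
	select {
	case <-w.idle():
		fmt.Println("no messages for a full interval; close the partition to force a rebalance")
	case <-time.After(time.Second):
		fmt.Println("unexpected: watchdog never fired")
	}
}

The size-1 buffer mirrors closePartition in the diff: a pending, undrained signal at the next idle tick is treated as evidence that the consumer goroutine itself is wedged, which is the case the panic path is meant to catch.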