Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a metric for number of partitions held #1154

Merged
merged 2 commits into from
Oct 31, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/ingester/app/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Consumer struct {
deadlockDetector deadlockDetector

partitionIDToState map[int32]*consumerState
partitionsHeld metrics.Counter
}

type consumerState struct {
Expand All @@ -63,6 +64,7 @@ func New(params Params) (*Consumer, error) {
processorFactory: params.ProcessorFactory,
deadlockDetector: deadlockDetector,
partitionIDToState: make(map[int32]*consumerState),
partitionsHeld: partitionsHeld(params.MetricsFactory),
}, nil
}

Expand Down Expand Up @@ -100,6 +102,8 @@ func (c *Consumer) Close() error {

func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
c.logger.Info("Starting message handler", zap.Int32("partition", pc.Partition()))
c.partitionsHeld.Inc(1)
defer c.partitionsHeld.Inc(-1)
c.partitionIDToState[pc.Partition()].wg.Add(1)
defer c.partitionIDToState[pc.Partition()].wg.Done()
defer c.closePartition(pc)
Expand Down
8 changes: 7 additions & 1 deletion cmd/ingester/app/consumer/consumer_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/uber/jaeger-lib/metrics"
)

const consumerNamespace = "sarama-consumer"

type msgMetrics struct {
counter metrics.Counter
offsetGauge metrics.Gauge
Expand All @@ -36,7 +38,7 @@ type partitionMetrics struct {
}

func (c *Consumer) namespace(partition int32) metrics.Factory {
return c.metricsFactory.Namespace("sarama-consumer", map[string]string{"partition": strconv.Itoa(int(partition))})
return c.metricsFactory.Namespace(consumerNamespace, map[string]string{"partition": strconv.Itoa(int(partition))})
}

func (c *Consumer) newMsgMetrics(partition int32) msgMetrics {
Expand All @@ -58,3 +60,7 @@ func (c *Consumer) partitionMetrics(partition int32) partitionMetrics {
closeCounter: f.Counter("partition-close", nil),
startCounter: f.Counter("partition-start", nil)}
}

func partitionsHeld(metricsFactory metrics.Factory) metrics.Counter {
return metricsFactory.Namespace(consumerNamespace, nil).Counter("partitions-held", nil)
}
21 changes: 16 additions & 5 deletions cmd/ingester/app/consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const (
)

func TestConstructor(t *testing.T) {
newConsumer, err := New(Params{})
newConsumer, err := New(Params{MetricsFactory: metrics.NullFactory})
assert.NoError(t, err)
assert.NotNil(t, newConsumer)
}
Expand Down Expand Up @@ -83,23 +83,24 @@ func newSaramaClusterConsumer(saramaPartitionConsumer sarama.PartitionConsumer)
}

func newConsumer(
factory metrics.Factory,
metricsFactory metrics.Factory,
topic string,
processor processor.SpanProcessor,
consumer consumer.Consumer) *Consumer {

logger, _ := zap.NewDevelopment()
return &Consumer{
metricsFactory: factory,
metricsFactory: metricsFactory,
logger: logger,
internalConsumer: consumer,
partitionIDToState: make(map[int32]*consumerState),
deadlockDetector: newDeadlockDetector(factory, logger, time.Second),
partitionsHeld: partitionsHeld(metricsFactory),
deadlockDetector: newDeadlockDetector(metricsFactory, logger, time.Second),

processorFactory: ProcessorFactory{
topic: topic,
consumer: consumer,
metricsFactory: factory,
metricsFactory: metricsFactory,
logger: logger,
baseProcessor: processor,
parallelism: 1,
Expand Down Expand Up @@ -152,12 +153,22 @@ func TestSaramaConsumerWrapper_start_Messages(t *testing.T) {
mc.YieldMessage(msg)
isProcessed.Wait()

testutils.AssertCounterMetrics(t, localFactory, testutils.ExpectedMetric{
Name: "sarama-consumer.partitions-held",
Value: 1,
})

mp.AssertExpectations(t)
// Ensure that the partition consumer was updated in the map
assert.Equal(t, saramaPartitionConsumer.HighWaterMarkOffset(),
undertest.partitionIDToState[partition].partitionConsumer.HighWaterMarkOffset())
undertest.Close()

testutils.AssertCounterMetrics(t, localFactory, testutils.ExpectedMetric{
Name: "sarama-consumer.partitions-held",
Value: 0,
})

partitionTag := map[string]string{"partition": fmt.Sprint(partition)}
testutils.AssertCounterMetrics(t, localFactory, testutils.ExpectedMetric{
Name: "sarama-consumer.messages",
Expand Down