diff --git a/Gopkg.toml b/Gopkg.toml
index 7f53b7c2096..09aa89639d3 100644
--- a/Gopkg.toml
+++ b/Gopkg.toml
@@ -162,7 +162,7 @@ required = [
 
 [[constraint]]
   name = "github.com/Shopify/sarama"
-  revision = "cd910a683f9faa57222e5120d17b60d2e65f7fa9"
+  version = "1.24.1"
 
 [[constraint]]
   name = "github.com/grpc-ecosystem/go-grpc-middleware"
diff --git a/cmd/ingester/app/builder/builder.go b/cmd/ingester/app/builder/builder.go
index 4d2e86d100c..c8a386ecc6d 100644
--- a/cmd/ingester/app/builder/builder.go
+++ b/cmd/ingester/app/builder/builder.go
@@ -66,7 +66,6 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit
 	factoryParams := consumer.ProcessorFactoryParams{
 		Topic:         options.Topic,
 		Parallelism:   options.Parallelism,
-		SaramaConsumer: saramaConsumer,
 		BaseProcessor: spanProcessor,
 		Logger:        logger,
 		Factory:       metricsFactory,
@@ -81,7 +80,6 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit
 		ProcessorFactory: *processorFactory,
 		MetricsFactory:   metricsFactory,
 		Logger:           logger,
-		DeadlockCheckInterval: options.DeadlockInterval,
 	}
 	return consumer.New(consumerParams)
 }
diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go
index 595620361af..b915cad19cf 100644
--- a/cmd/ingester/app/consumer/consumer.go
+++ b/cmd/ingester/app/consumer/consumer.go
@@ -16,10 +16,9 @@ package consumer
 
 import (
+	"context"
 	"sync"
-	"time"
 
 	"github.com/Shopify/sarama"
-	sc "github.com/bsm/sarama-cluster"
 	"github.com/uber/jaeger-lib/metrics"
 	"go.uber.org/zap"
 
@@ -29,153 +28,131 @@ import (
 
 // Params are the parameters of a Consumer
 type Params struct {
-	ProcessorFactory      ProcessorFactory
-	MetricsFactory        metrics.Factory
-	Logger                *zap.Logger
-	InternalConsumer      consumer.Consumer
-	DeadlockCheckInterval time.Duration
+	ProcessorFactory ProcessorFactory
+	MetricsFactory   metrics.Factory
+	Logger           *zap.Logger
+	InternalConsumer consumer.Consumer
 }
 
 // Consumer uses sarama to consume and handle messages from kafka
 type Consumer struct {
-	metricsFactory metrics.Factory
-	logger         *zap.Logger
+	metricsFactory      metrics.Factory
+	logger              *zap.Logger
+	internalConsumer    consumer.Consumer
+	processorFactory    ProcessorFactory
+	partitionsHeld      int64
+	partitionsHeldGauge metrics.Gauge
+}
 
-	internalConsumer consumer.Consumer
-	processorFactory ProcessorFactory
+// consumerGroupHandler implements sarama.ConsumerGroupHandler.
+type consumerGroupHandler struct {
+	processorFactory         ProcessorFactory
+	partitionToProcessor     map[int32]processor.SpanProcessor
+	logger                   *zap.Logger
+	consumer                 *Consumer
+	partitionToProcessorLock sync.RWMutex
+}
 
-	deadlockDetector deadlockDetector
+func (h *consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
+	h.partitionToProcessor = map[int32]processor.SpanProcessor{}
+	return nil
+}
 
-	partitionIDToState  map[int32]*consumerState
-	partitionMapLock    sync.Mutex
-	partitionsHeld      int64
-	partitionsHeldGauge metrics.Gauge
+// getProcessor returns the processor for a partition, creating it on first use.
+func (h *consumerGroupHandler) getProcessor(session sarama.ConsumerGroupSession, partition int32, offset int64) processor.SpanProcessor {
+	h.partitionToProcessorLock.RLock()
+	msgProcessor := h.partitionToProcessor[partition]
+	h.partitionToProcessorLock.RUnlock()
+	if msgProcessor == nil {
+		msgProcessor = h.processorFactory.new(session, partition, offset-1)
+		h.partitionToProcessorLock.Lock()
+		h.partitionToProcessor[partition] = msgProcessor
+		h.partitionToProcessorLock.Unlock()
+	}
+	return msgProcessor
 }
+
+// Cleanup closes the per-partition processors once all ConsumeClaim goroutines
+// have exited, so that their offset managers commit final offsets.
+func (h *consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
+	h.partitionToProcessorLock.Lock()
+	defer h.partitionToProcessorLock.Unlock()
+	for _, msgProcessor := range h.partitionToProcessor {
+		if err := msgProcessor.Close(); err != nil {
+			h.logger.Error("Error closing processor", zap.Error(err))
+		}
+	}
+	return nil
+}
+
+func (h *consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+	msgMetrics := h.consumer.newMsgMetrics(claim.Partition())
 
-type consumerState struct {
-	wg                sync.WaitGroup
-	partitionConsumer sc.PartitionConsumer
+	for {
+		select {
+		case msg, ok := <-claim.Messages():
+			if !ok {
+				h.logger.Info("Message channel closed", zap.Int32("partition", claim.Partition()))
+				return nil
+			}
+			h.logger.Debug("Got msg", zap.Any("msg", msg))
+			msgMetrics.counter.Inc(1)
+			msgMetrics.offsetGauge.Update(msg.Offset)
+			msgMetrics.lagGauge.Update(claim.HighWaterMarkOffset() - msg.Offset - 1)
+			msgProcessor := h.getProcessor(sess, claim.Partition(), msg.Offset)
+			msgProcessor.Process(&saramaMessageWrapper{msg})
+		}
+	}
 }
 
 // New is a constructor for a Consumer
 func New(params Params) (*Consumer, error) {
-	deadlockDetector := newDeadlockDetector(params.MetricsFactory, params.Logger, params.DeadlockCheckInterval)
 	return &Consumer{
 		metricsFactory:      params.MetricsFactory,
 		logger:              params.Logger,
 		internalConsumer:    params.InternalConsumer,
 		processorFactory:    params.ProcessorFactory,
-		deadlockDetector:    deadlockDetector,
-		partitionIDToState:  make(map[int32]*consumerState),
 		partitionsHeldGauge: partitionsHeldGauge(params.MetricsFactory),
 	}, nil
 }
 
 // Start begins consuming messages in a go routine
 func (c *Consumer) Start() {
-	c.deadlockDetector.start()
 	go func() {
 		c.logger.Info("Starting main loop")
-		for pc := range c.internalConsumer.Partitions() {
-			c.partitionMapLock.Lock()
-			if p, ok := c.partitionIDToState[pc.Partition()]; ok {
-				// This is a guard against simultaneously draining messages
-				// from the last time the partition was assigned and
-				// processing new messages for the same partition, which may lead
-				// to the cleanup process not completing
-				p.wg.Wait()
+		ctx := context.Background()
+		handler := consumerGroupHandler{
+			processorFactory: c.processorFactory,
+			logger:           c.logger,
+			consumer:         c,
+		}
+		defer func() { _ = c.internalConsumer.Close() }()
+
+		go func() {
+			c.logger.Info("Starting error handler")
+			for err := range c.internalConsumer.Errors() {
+				if consumerError, ok := err.(*sarama.ConsumerError); ok {
+					errMetrics := c.newErrMetrics(consumerError.Partition)
+					errMetrics.errCounter.Inc(1)
+				}
+				c.logger.Error("Error consuming from Kafka", zap.Error(err))
 			}
-			c.partitionIDToState[pc.Partition()] = &consumerState{partitionConsumer: pc}
-			c.partitionIDToState[pc.Partition()].wg.Add(2)
-			c.partitionMapLock.Unlock()
-			c.partitionMetrics(pc.Partition()).startCounter.Inc(1)
-			go c.handleMessages(pc)
-			go c.handleErrors(pc.Partition(), pc.Errors())
+			c.logger.Info("Finished handling errors")
+		}()
+
+		for {
+			// Consume blocks for the length of a session; it returns nil on a
+			// rebalance and an error once the group is closed, so exit then.
+			if err := c.internalConsumer.Consume(ctx, &handler); err != nil {
+				c.logger.Error("Error from consumer", zap.Error(err))
+				return
+			}
 		}
 	}()
 }
 
 // Close closes the Consumer and underlying sarama consumer
 func (c *Consumer) Close() error {
-	c.partitionMapLock.Lock()
-	for _, p := range c.partitionIDToState {
-		c.closePartition(p.partitionConsumer)
-		p.wg.Wait()
-	}
-	c.partitionMapLock.Unlock()
-	c.deadlockDetector.close()
 	c.logger.Info("Closing parent consumer")
 	return c.internalConsumer.Close()
 }
-
-func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
-	c.logger.Info("Starting message handler", zap.Int32("partition", pc.Partition()))
-	c.partitionMapLock.Lock()
-	c.partitionsHeld++
-	c.partitionsHeldGauge.Update(c.partitionsHeld)
-	wg := &c.partitionIDToState[pc.Partition()].wg
-	c.partitionMapLock.Unlock()
-	defer func() {
-		c.closePartition(pc)
-		wg.Done()
-		c.partitionMapLock.Lock()
-		c.partitionsHeld--
-		c.partitionsHeldGauge.Update(c.partitionsHeld)
-		c.partitionMapLock.Unlock()
-	}()
-
-	msgMetrics := c.newMsgMetrics(pc.Partition())
-
-	var msgProcessor processor.SpanProcessor
-
-	deadlockDetector := c.deadlockDetector.startMonitoringForPartition(pc.Partition())
-	defer deadlockDetector.close()
-
-	for {
-		select {
-		case msg, ok := <-pc.Messages():
-			if !ok {
-				c.logger.Info("Message channel closed. ", zap.Int32("partition", pc.Partition()))
-				return
-			}
-			c.logger.Debug("Got msg", zap.Any("msg", msg))
-			msgMetrics.counter.Inc(1)
-			msgMetrics.offsetGauge.Update(msg.Offset)
-			msgMetrics.lagGauge.Update(pc.HighWaterMarkOffset() - msg.Offset - 1)
-			deadlockDetector.incrementMsgCount()
-
-			if msgProcessor == nil {
-				msgProcessor = c.processorFactory.new(pc.Partition(), msg.Offset-1)
-				defer msgProcessor.Close()
-			}
-
-			msgProcessor.Process(saramaMessageWrapper{msg})
-
-		case <-deadlockDetector.closePartitionChannel():
-			c.logger.Info("Closing partition due to inactivity", zap.Int32("partition", pc.Partition()))
-			return
-		}
-	}
-}
-
-func (c *Consumer) closePartition(partitionConsumer sc.PartitionConsumer) {
-	c.logger.Info("Closing partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
-	partitionConsumer.Close() // blocks until messages channel is drained
-	c.partitionMetrics(partitionConsumer.Partition()).closeCounter.Inc(1)
-	c.logger.Info("Closed partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
-}
-
-func (c *Consumer) handleErrors(partition int32, errChan <-chan *sarama.ConsumerError) {
-	c.logger.Info("Starting error handler", zap.Int32("partition", partition))
-	c.partitionMapLock.Lock()
-	wg := &c.partitionIDToState[partition].wg
-	c.partitionMapLock.Unlock()
-	defer wg.Done()
-
-	errMetrics := c.newErrMetrics(partition)
-	for err := range errChan {
-		errMetrics.errCounter.Inc(1)
-		c.logger.Error("Error consuming from Kafka", zap.Error(err))
-	}
-	c.logger.Info("Finished handling errors", zap.Int32("partition", partition))
-}
diff --git a/cmd/ingester/app/consumer/processor_factory.go b/cmd/ingester/app/consumer/processor_factory.go
index abec3daa800..ffba19a688e 100644
--- a/cmd/ingester/app/consumer/processor_factory.go
+++ b/cmd/ingester/app/consumer/processor_factory.go
@@ -15,6 +15,7 @@ package consumer
 import (
 	"io"
 
+	"github.com/Shopify/sarama"
 	"github.com/uber/jaeger-lib/metrics"
@@ -23,7 +24,6 @@ import (
 	"github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/offset"
 	"github.com/jaegertracing/jaeger/cmd/ingester/app/processor"
 	"github.com/jaegertracing/jaeger/cmd/ingester/app/processor/decorator"
-	"github.com/jaegertracing/jaeger/pkg/kafka/consumer"
 )
 
 // ProcessorFactoryParams are the parameters of a ProcessorFactory
@@ -31,7 +31,6 @@ type ProcessorFactoryParams struct {
 	Parallelism   int
 	Topic         string
 	BaseProcessor processor.SpanProcessor
-	SaramaConsumer consumer.Consumer
 	Factory       metrics.Factory
 	Logger        *zap.Logger
 	RetryOptions  []decorator.RetryOption
@@ -40,7 +39,6 @@
 // ProcessorFactory is a factory for creating startedProcessors
 type ProcessorFactory struct {
 	topic          string
-	consumer       consumer.Consumer
 	metricsFactory metrics.Factory
 	logger         *zap.Logger
 	baseProcessor  processor.SpanProcessor
@@ -52,7 +50,6 @@ func NewProcessorFactory(params ProcessorFactoryParams) (*ProcessorFactory, error) {
 	return &ProcessorFactory{
 		topic:          params.Topic,
-		consumer:       params.SaramaConsumer,
 		metricsFactory: params.Factory,
 		logger:         params.Logger,
 		baseProcessor:  params.BaseProcessor,
@@ -61,11 +58,11 @@ func NewProcessorFactory(params ProcessorFactoryParams) (*ProcessorFactory, erro
 	}, nil
 }
 
-func (c *ProcessorFactory) new(partition int32, minOffset int64) processor.SpanProcessor {
+func (c *ProcessorFactory) new(session sarama.ConsumerGroupSession, partition int32, minOffset int64) processor.SpanProcessor {
 	c.logger.Info("Creating new processors", zap.Int32("partition", partition))
 
 	markOffset := func(offset int64) {
-		c.consumer.MarkPartitionOffset(c.topic, partition, offset, "")
+		session.MarkOffset(c.topic, partition, offset, "")
 	}
 
 	om := offset.NewManager(minOffset, markOffset, partition, c.metricsFactory)
diff --git a/pkg/kafka/consumer/config.go b/pkg/kafka/consumer/config.go
index eba0e847858..96607928126 100644
--- a/pkg/kafka/consumer/config.go
+++ b/pkg/kafka/consumer/config.go
@@ -15,25 +15,48 @@ package consumer
 
 import (
-	"io"
+	"context"
 
 	"github.com/Shopify/sarama"
-	"github.com/bsm/sarama-cluster"
 
 	"github.com/jaegertracing/jaeger/pkg/kafka/auth"
 )
 
-// Consumer is an interface to features of Sarama that are necessary for the consumer
-type Consumer interface {
-	Partitions() <-chan cluster.PartitionConsumer
-	MarkPartitionOffset(topic string, partition int32, offset int64, metadata string)
-	io.Closer
-}
-
 // Builder builds a new kafka consumer
 type Builder interface {
 	NewConsumer() (Consumer, error)
 }
 
+// Consumer is an interface to the features of sarama.ConsumerGroup that are
+// necessary for the consumer
+type Consumer interface {
+	Consume(ctx context.Context, handler sarama.ConsumerGroupHandler) error
+	Errors() <-chan error
+	Close() error
+}
+
+// InternalConsumer is a wrapper around sarama.ConsumerGroup that consumes a
+// single topic
+type InternalConsumer struct {
+	Topic    string
+	consumer sarama.ConsumerGroup
+}
+
+// Consume joins the consumer group and dispatches claims for the configured
+// topic to the handler
+func (c *InternalConsumer) Consume(ctx context.Context, handler sarama.ConsumerGroupHandler) error {
+	return c.consumer.Consume(ctx, []string{c.Topic}, handler)
+}
+
+// Errors returns the channel of errors reported by the consumer group
+func (c *InternalConsumer) Errors() <-chan error {
+	return c.consumer.Errors()
+}
+
+// Close shuts down the underlying consumer group
+func (c *InternalConsumer) Close() error {
+	return c.consumer.Close()
+}
+
 // Configuration describes the configuration properties needed to create a Kafka consumer
 type Configuration struct {
 	auth.AuthenticationConfig
@@ -49,18 +72,26 @@ type Configuration struct {
 
 // NewConsumer creates a new kafka consumer
 func (c *Configuration) NewConsumer() (Consumer, error) {
-	saramaConfig := cluster.NewConfig()
-	saramaConfig.Group.Mode = cluster.ConsumerModePartitions
+	saramaConfig := sarama.NewConfig()
+	saramaConfig.Consumer.Return.Errors = true // otherwise Errors() never delivers
 	saramaConfig.ClientID = c.ClientID
+	saramaConfig.Version = sarama.V0_10_2_0
 	if len(c.ProtocolVersion) > 0 {
 		ver, err := sarama.ParseKafkaVersion(c.ProtocolVersion)
 		if err != nil {
 			return nil, err
 		}
-		saramaConfig.Config.Version = ver
+		saramaConfig.Version = ver
 	}
-	if err := c.AuthenticationConfig.SetConfiguration(&saramaConfig.Config); err != nil {
+	if err := c.AuthenticationConfig.SetConfiguration(saramaConfig); err != nil {
 		return nil, err
 	}
-	return cluster.NewConsumer(c.Brokers, c.GroupID, []string{c.Topic}, saramaConfig)
+	consumerGroup, err := sarama.NewConsumerGroup(c.Brokers, c.GroupID, saramaConfig)
+	if err != nil {
+		return nil, err
+	}
+	return &InternalConsumer{
+		Topic:    c.Topic,
+		consumer: consumerGroup,
+	}, nil
 }
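
For context when reviewing: below is a minimal, illustrative sketch of how the reworked pkg/kafka/consumer API is exercised end to end. It is not part of the change itself; the broker address, topic, and group ID are made-up values, loggingHandler is a trivial stand-in for the ingester's consumerGroupHandler, and depending on the auth package's validation the embedded AuthenticationConfig may need to be set explicitly.

// sketch.go — illustrative only.
package main

import (
	"context"
	"log"

	"github.com/Shopify/sarama"

	"github.com/jaegertracing/jaeger/pkg/kafka/consumer"
)

// loggingHandler implements sarama.ConsumerGroupHandler; it only logs each
// message and marks it consumed via the session, the same mechanism that
// ProcessorFactory.new now uses through session.MarkOffset.
type loggingHandler struct{}

func (loggingHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (loggingHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (loggingHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		log.Printf("partition=%d offset=%d", msg.Partition, msg.Offset)
		sess.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	cfg := &consumer.Configuration{
		Brokers: []string{"localhost:9092"}, // made-up broker address
		Topic:   "jaeger-spans",             // made-up topic
		GroupID: "jaeger-ingester",          // made-up group ID
	}
	// NewConsumer now returns an *InternalConsumer wrapping sarama.ConsumerGroup
	// instead of a bsm/sarama-cluster consumer.
	c, err := cfg.NewConsumer()
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()
	// Consume blocks for the length of one session; real callers loop so a new
	// session starts after each rebalance, as Consumer.Start does above.
	if err := c.Consume(context.Background(), loggingHandler{}); err != nil {
		log.Fatal(err)
	}
}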