Skip to content

Commit

Permalink
impove test case
Browse files Browse the repository at this point in the history
Signed-off-by: axfor <aixiaoxiang2009@hotmail.com>
  • Loading branch information
axfor committed Apr 15, 2023
1 parent 975ef3e commit 532f13a
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 31 deletions.
84 changes: 54 additions & 30 deletions cmd/ingester/app/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,48 +46,74 @@ type Consumer struct {
internalConsumer consumer.Consumer
processorFactory ProcessorFactory

deadlockDetector deadlockDetector
deadlockNotify sync.WaitGroup
closeDeadlockDetectorOnce sync.Once
deadlockDetector *globalDeadlockDetector

partitionsHeld atomic.Int64
partitionsHeldGauge metrics.Gauge

doneWg sync.WaitGroup
consumerReady *consumerReady

consumerReady *ConsumerReady
doneWg sync.WaitGroup

topic string
cancel context.CancelFunc
}

type ConsumerReady struct {
type globalDeadlockDetector struct {
*deadlockDetector
closeDetectorOnce sync.Once
deadlockNotifyCh chan struct{}
closeNotifyOnce sync.Once
}

func (ad *globalDeadlockDetector) close() {
ad.closeDetectorOnce.Do(func() {
ad.deadlockDetector.close()
})
}

func (ad *globalDeadlockDetector) deadlockNotify() <-chan struct{} {
return ad.deadlockNotifyCh
}

func (c *globalDeadlockDetector) markDeadlock() {
c.closeNotifyOnce.Do(func() {
close(c.deadlockNotifyCh)
})
}

type consumerReady struct {
readyCh chan struct{}
closeOnce sync.Once
}

func (c *ConsumerReady) WaitReady() {
func (c *consumerReady) waitReady() {
<-c.readyCh
}

func (c *ConsumerReady) MarkReady() {
func (c *consumerReady) markReady() {
c.closeOnce.Do(func() {
close(c.readyCh)
})
}

// New is a constructor for a Consumer
func New(params Params) (*Consumer, error) {
// global deadlock detector
deadlockDetector := newDeadlockDetector(params.MetricsFactory, params.Logger, params.DeadlockCheckInterval)
// new Consumer
return &Consumer{
metricsFactory: params.MetricsFactory,
logger: params.Logger,
internalConsumer: params.InternalConsumer,
processorFactory: params.ProcessorFactory,
deadlockDetector: deadlockDetector,
metricsFactory: params.MetricsFactory,
logger: params.Logger,
internalConsumer: params.InternalConsumer,
processorFactory: params.ProcessorFactory,
deadlockDetector: &globalDeadlockDetector{
deadlockDetector: &deadlockDetector,
deadlockNotifyCh: make(chan struct{}, 1),
},
partitionsHeldGauge: partitionsHeldGauge(params.MetricsFactory),
consumerReady: &ConsumerReady{
readyCh: make(chan struct{}),
consumerReady: &consumerReady{
readyCh: make(chan struct{}, 1),
},
topic: params.ProcessorFactory.topic,
}, nil
Expand Down Expand Up @@ -121,8 +147,9 @@ func (c *Consumer) startConsume(ctx context.Context) {
if err := c.internalConsumer.Consume(ctx, topic, c); err != nil {
c.logger.Error("Error from consumer", zap.Error(err))
}
c.consumerReady = &ConsumerReady{
readyCh: make(chan struct{}),

c.consumerReady = &consumerReady{
readyCh: make(chan struct{}, 1),
}

// check if context was cancelled, signaling that the consumer should stop
Expand Down Expand Up @@ -152,32 +179,30 @@ func (c *Consumer) Close() error {
return err
}

// CloseDeadlockDetector close global deadlock detector and retain partition deadlock detector //for tests
func (c *Consumer) CloseDeadlockDetector() {
c.closeDeadlockDetectorOnce.Do(func() {
c.deadlockDetector.close()
})
}

// Ready wait consumer running
func (c *Consumer) WaitReady() {
c.consumerReady.WaitReady()
c.consumerReady.waitReady()
}

// Ready wait consumer done
func (c *Consumer) WaitDone() {
c.doneWg.Wait()
}

// CloseDeadlockDetector close global deadlock detector and retain partition deadlock detector //for tests
func (c *Consumer) CloseDeadlockDetector() {
c.deadlockDetector.close()
}

// DeadlockNotify notify when there is a deadlock
func (c *Consumer) WaitDeadlock() {
c.deadlockNotify.Wait()
func (c *Consumer) DeadlockNotify() {
<-c.deadlockDetector.deadlockNotify()
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
c.consumerReady.MarkReady()
c.consumerReady.markReady()
return nil
}

Expand Down Expand Up @@ -205,14 +230,12 @@ func (c *Consumer) handleMessages(session sarama.ConsumerGroupSession, claim sar
c.partitionsHeldGauge.Update(c.partitionsHeld.Load())

partitionDeadlockDetector := c.deadlockDetector.startMonitoringForPartition(claim.Partition())
c.deadlockNotify.Add(1)

defer func() {
partitionDeadlockDetector.close()
c.closePartition(claim)
c.partitionsHeld.Add(-1)
c.partitionsHeldGauge.Update(c.partitionsHeld.Load())
c.deadlockNotify.Done()
}()

msgMetrics := c.newMsgMetrics(claim.Partition())
Expand Down Expand Up @@ -244,6 +267,7 @@ func (c *Consumer) handleMessages(session sarama.ConsumerGroupSession, claim sar
return session.Context().Err()
case <-partitionDeadlockDetector.closePartitionChannel():
c.logger.Info("Closing partition due to inactivity", zap.Int32("partition", claim.Partition()))
c.deadlockDetector.markDeadlock()
return fmt.Errorf("closing partition[%d] due to inactivity", claim.Partition())
}
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/ingester/app/consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func TestGroupConsumerWithDeadlockDetector(t *testing.T) {
consumer.WaitReady()
t.Log("Consumer is ready and wait message")

consumer.WaitDeadlock()
consumer.DeadlockNotify()
consumer.Close()
t.Log("Consumer is done")

Expand Down

0 comments on commit 532f13a

Please sign in to comment.