Merge pull request #488 from timbertson/backfill
start-latest + backfill-earliest mode
bai authored Aug 14, 2019
2 parents e685792 + 29e7ccc commit 02f109d
Showing 9 changed files with 702 additions and 134 deletions.
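The new mode combines two per-consumer settings: start-latest makes the offsets consumer begin at the newest offsets of the offsets topic, and backfill-earliest (which, per the Configure change below, only takes effect when start-latest is also set) additionally starts a one-shot backfill consumer per partition that reads from the oldest retained offset up to the offset that was newest at startup. A minimal sketch of enabling the mode through the same viper keys the code reads; the module name "mycluster" and the broker address are hypothetical:

package main

import "github.com/spf13/viper"

// Illustrative only: these keys mirror the ones read in Configure and set in
// the test fixture further down; "mycluster" and the broker address are made up.
func main() {
	viper.Set("consumer.mycluster.class-name", "kafka")
	viper.Set("consumer.mycluster.cluster", "mycluster")
	viper.Set("consumer.mycluster.servers", []string{"broker1.example.com:9092"})
	viper.Set("consumer.mycluster.start-latest", true)      // start live consumption at the newest offsets
	viper.Set("consumer.mycluster.backfill-earliest", true) // also backfill once from the oldest offsets
}

In a running Burrow these keys would normally come from its configuration file; viper.Set is used here only to keep the sketch self-contained, mirroring the test fixture below.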
121 changes: 112 additions & 9 deletions core/internal/consumer/kafka_client.go
@@ -43,6 +43,7 @@ type KafkaClient struct {
servers []string
offsetsTopic string
startLatest bool
backfillEarliest bool
reportedConsumerGroup string
saramaConfig *sarama.Config
groupWhitelist *regexp.Regexp
@@ -78,6 +79,9 @@ type metadataMember struct {
SessionTimeout int32
Assignment map[string][]int32
}
type backfillEndOffset struct {
Value int64
}

// Configure validates the configuration for the consumer. At minimum, there must be a cluster name to which these
// consumers belong, as well as a list of servers provided for the Kafka cluster, of the form host:port. If not
@@ -109,6 +113,7 @@ func (module *KafkaClient) Configure(name string, configRoot string) {
viper.SetDefault(configRoot+".offsets-topic", "__consumer_offsets")
module.offsetsTopic = viper.GetString(configRoot + ".offsets-topic")
module.startLatest = viper.GetBool(configRoot + ".start-latest")
module.backfillEarliest = module.startLatest && viper.GetBool(configRoot+".backfill-earliest")
module.reportedConsumerGroup = "burrow-" + module.name

whitelist := viper.GetString(configRoot + ".group-whitelist")
@@ -165,13 +170,79 @@ func (module *KafkaClient) Stop() error {
return nil
}

func (module *KafkaClient) partitionConsumer(consumer sarama.PartitionConsumer) {
func (module *KafkaClient) startBackfillPartitionConsumer(partition int32, client helpers.SaramaClient, consumer sarama.Consumer) error {
pconsumer, err := consumer.ConsumePartition(module.offsetsTopic, partition, sarama.OffsetOldest)
if err != nil {
module.Log.Error("failed to consume partition",
zap.String("topic", module.offsetsTopic),
zap.Int32("partition", partition),
zap.String("error", err.Error()),
)
return err
}

// We check for an empty partition after building the consumer, otherwise we
// could be unlucky enough to observe a nonempty partition
// whose only segment expires right after we check.
oldestOffset, err := client.GetOffset(module.offsetsTopic, partition, sarama.OffsetOldest)
if err != nil {
module.Log.Error("failed to get oldest offset",
zap.String("topic", module.offsetsTopic),
zap.Int32("partition", partition),
zap.String("error", err.Error()),
)
return err
}

newestOffset, err := client.GetOffset(module.offsetsTopic, partition, sarama.OffsetNewest)
if err != nil {
module.Log.Error("failed to get newest offset",
zap.String("topic", module.offsetsTopic),
zap.Int32("partition", partition),
zap.String("error", err.Error()),
)
return err
}
if newestOffset > 0 {
// GetOffset returns the next (not yet published) offset, but we want the latest published offset.
newestOffset--
}

if oldestOffset >= newestOffset {
module.Log.Info("not backfilling empty partition",
zap.String("topic", module.offsetsTopic),
zap.Int32("partition", partition),
zap.Int64("oldestOffset", oldestOffset),
zap.Int64("newestOffset", newestOffset),
)
pconsumer.AsyncClose()
} else {
module.running.Add(1)
endWaterMark := &backfillEndOffset{newestOffset}
module.Log.Debug("consuming backfill",
zap.Int32("partition", partition),
zap.Int64("oldestOffset", oldestOffset),
zap.Int64("newestOffset", newestOffset),
)
go module.partitionConsumer(pconsumer, endWaterMark)
}
return nil
}

func (module *KafkaClient) partitionConsumer(consumer sarama.PartitionConsumer, stopAtOffset *backfillEndOffset) {
defer module.running.Done()
defer consumer.AsyncClose()

for {
select {
case msg := <-consumer.Messages():
if stopAtOffset != nil && msg.Offset >= stopAtOffset.Value {
module.Log.Debug("backfill consumer reached target offset, terminating",
zap.Int32("partition", msg.Partition),
zap.Int64("offset", stopAtOffset.Value),
)
return
}
if module.reportedConsumerGroup != "" {
burrowOffset := &protocol.StorageRequest{
RequestType: protocol.StorageSetConsumerOffset,
@@ -228,18 +299,49 @@ func (module *KafkaClient) startKafkaConsumer(client helpers.SaramaClient) error
zap.String("topic", module.offsetsTopic),
zap.Int("count", len(partitions)),
)
for i, partition := range partitions {
for _, partition := range partitions {
pconsumer, err := consumer.ConsumePartition(module.offsetsTopic, partition, startFrom)
if err != nil {
module.Log.Error("failed to consume partition",
zap.String("topic", module.offsetsTopic),
zap.Int("partition", i),
zap.Int32("partition", partition),
zap.String("error", err.Error()),
)
return err
}
module.running.Add(1)
go module.partitionConsumer(pconsumer)
go module.partitionConsumer(pconsumer, nil)
}

if module.backfillEarliest {
module.Log.Debug("backfilling consumer offsets")
// Note: since we are consuming each partition twice,
// we need a second consumer instance
consumer, err := client.NewConsumerFromClient()
if err != nil {
module.Log.Error("failed to get new consumer", zap.Error(err))
client.Close()
return err
}

waiting := len(partitions)
backfillStartedChan := make(chan error)
for _, partition := range partitions {
go func(partition int32) {
backfillStartedChan <- module.startBackfillPartitionConsumer(partition, client, consumer)
}(partition)
}
for waiting > 0 {
select {
case err := <-backfillStartedChan:
waiting--
if err != nil {
return err
}
case <-module.quitChannel:
return nil
}
}
}

return nil
@@ -270,7 +372,7 @@ func (module *KafkaClient) processConsumerOffsetsMessage(msg *sarama.ConsumerMes

switch keyver {
case 0, 1:
module.decodeKeyAndOffset(keyBuffer, msg.Value, logger)
module.decodeKeyAndOffset(msg.Offset, keyBuffer, msg.Value, logger)
case 2:
module.decodeGroupMetadata(keyBuffer, msg.Value, logger)
default:
@@ -309,7 +411,7 @@ func (module *KafkaClient) acceptConsumerGroup(group string) bool {
return true
}

func (module *KafkaClient) decodeKeyAndOffset(keyBuffer *bytes.Buffer, value []byte, logger *zap.Logger) {
func (module *KafkaClient) decodeKeyAndOffset(offsetOrder int64, keyBuffer *bytes.Buffer, value []byte, logger *zap.Logger) {
// Version 0 and 1 keys are decoded the same way
offsetKey, errorAt := decodeOffsetKeyV0(keyBuffer)
if errorAt != "" {
@@ -347,9 +449,9 @@ func (module *KafkaClient) decodeKeyAndOffset(keyBuffer *bytes.Buffer, value []b

switch valueVersion {
case 0, 1:
module.decodeAndSendOffset(offsetKey, valueBuffer, offsetLogger, decodeOffsetValueV0)
module.decodeAndSendOffset(offsetOrder, offsetKey, valueBuffer, offsetLogger, decodeOffsetValueV0)
case 3:
module.decodeAndSendOffset(offsetKey, valueBuffer, offsetLogger, decodeOffsetValueV3)
module.decodeAndSendOffset(offsetOrder, offsetKey, valueBuffer, offsetLogger, decodeOffsetValueV3)
default:
offsetLogger.Warn("failed to decode",
zap.String("reason", "value version"),
@@ -358,7 +460,7 @@ func (module *KafkaClient) decodeKeyAndOffset(keyBuffer *bytes.Buffer, value []b
}
}

func (module *KafkaClient) decodeAndSendOffset(offsetKey offsetKey, valueBuffer *bytes.Buffer, logger *zap.Logger, decoder func(*bytes.Buffer) (offsetValue, string)) {
func (module *KafkaClient) decodeAndSendOffset(offsetOrder int64, offsetKey offsetKey, valueBuffer *bytes.Buffer, logger *zap.Logger, decoder func(*bytes.Buffer) (offsetValue, string)) {
offsetValue, errorAt := decoder(valueBuffer)
if errorAt != "" {
logger.Warn("failed to decode",
Expand All @@ -377,6 +479,7 @@ func (module *KafkaClient) decodeAndSendOffset(offsetKey offsetKey, valueBuffer
Group: offsetKey.Group,
Timestamp: int64(offsetValue.Timestamp),
Offset: int64(offsetValue.Offset),
Order: offsetOrder,
}
logger.Debug("consumer offset",
zap.Int64("offset", offsetValue.Offset),
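The subtlest piece of the backfill path above is the end-offset arithmetic: GetOffset with sarama.OffsetNewest reports the offset that the next produced message will receive, not the last published offset, so the target is decremented once and partitions with nothing to read are skipped. A standalone sketch of just that calculation; the helper name endWatermark is mine, not part of the change:

package main

import "fmt"

// endWatermark mirrors the calculation in startBackfillPartitionConsumer:
// "newest" is what GetOffset(..., sarama.OffsetNewest) reports, i.e. the offset
// the next produced message will get, so the last published message sits one
// before it. Backfilling is only worthwhile when the oldest retained offset is
// strictly below that target.
func endWatermark(oldest, newest int64) (target int64, backfill bool) {
	if newest > 0 {
		newest-- // convert "next offset" into "last published offset"
	}
	return newest, oldest < newest
}

func main() {
	fmt.Println(endWatermark(123, 456)) // 455 true: backfill offsets 123..454, terminate on reaching 455
	fmt.Println(endWatermark(10, 10))   // 9 false: empty (or fully expired) partition, skip
}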
52 changes: 46 additions & 6 deletions core/internal/consumer/kafka_client_test.go
@@ -43,6 +43,11 @@ func fixtureModule() *KafkaClient {
viper.Set("consumer.test.class-name", "kafka")
viper.Set("consumer.test.servers", []string{"broker1.example.com:1234"})
viper.Set("consumer.test.cluster", "test")
viper.Set("consumer.test-backfill.class-name", "kafka")
viper.Set("consumer.test-backfill.servers", []string{"broker1.example.com:1234"})
viper.Set("consumer.test-backfill.cluster", "test")
viper.Set("consumer.test-backfill.start-latest", true)
viper.Set("consumer.test-backfill.backfill-earliest", true)

return &module
}
@@ -94,7 +99,7 @@ func TestKafkaClient_partitionConsumer(t *testing.T) {
consumer.On("Errors").Return(func() <-chan *sarama.ConsumerError { return errorChan }())

module.running.Add(1)
go module.partitionConsumer(consumer)
go module.partitionConsumer(consumer, nil)

// Send a message over the error channel to make sure it doesn't block
testError := &sarama.ConsumerError{
@@ -125,7 +130,7 @@ func TestKafkaClient_partitionConsumer_reports_own_progress(t *testing.T) {
consumer.On("Errors").Return(func() <-chan *sarama.ConsumerError { return errorChan }())

module.running.Add(1)
go module.partitionConsumer(consumer)
go module.partitionConsumer(consumer, nil)

// Send a message over the Messages channel and ensure progress gets reported
message := &sarama.ConsumerMessage{
@@ -179,6 +184,40 @@ func TestKafkaClient_startKafkaConsumer(t *testing.T) {
client.AssertExpectations(t)
}

func TestKafkaClient_startKafkaConsumerWithBackfill(t *testing.T) {
module := fixtureModule()
module.Configure("test", "consumer.test-backfill")

// Channels for testing
messageChan := make(chan *sarama.ConsumerMessage)
errorChan := make(chan *sarama.ConsumerError)

// Don't assert expectations on this - the way it goes down, they're called but don't show up
mockPartitionConsumer := &helpers.MockSaramaPartitionConsumer{}
mockPartitionConsumer.On("AsyncClose").Return()
mockPartitionConsumer.On("Messages").Return(func() <-chan *sarama.ConsumerMessage { return messageChan }())
mockPartitionConsumer.On("Errors").Return(func() <-chan *sarama.ConsumerError { return errorChan }())

consumer := &helpers.MockSaramaConsumer{}
consumer.On("ConsumePartition", "__consumer_offsets", int32(0), sarama.OffsetOldest).Return(mockPartitionConsumer, nil)
consumer.On("ConsumePartition", "__consumer_offsets", int32(0), sarama.OffsetNewest).Return(mockPartitionConsumer, nil)

client := &helpers.MockSaramaClient{}
client.On("GetOffset", "__consumer_offsets", int32(0), sarama.OffsetOldest).Return(int64(123), nil)
client.On("GetOffset", "__consumer_offsets", int32(0), sarama.OffsetNewest).Return(int64(456), nil)
client.On("NewConsumerFromClient").Return(consumer, nil)
client.On("Partitions", "__consumer_offsets").Return([]int32{0}, nil)

err := module.startKafkaConsumer(client)
assert.Nil(t, err, "Expected startKafkaConsumer to return no error")

close(module.quitChannel)
module.running.Wait()

consumer.AssertExpectations(t)
client.AssertExpectations(t)
}

func TestKafkaClient_startKafkaConsumer_FailCreateConsumer(t *testing.T) {
module := fixtureModule()
module.Configure("test", "consumer.test")
@@ -430,7 +469,7 @@ func TestKafkaClient_decodeKeyAndOffset(t *testing.T) {
keyBuf := bytes.NewBuffer([]byte("\x00\x09testgroup\x00\x09testtopic\x00\x00\x00\x0b"))
valueBytes := []byte("\x00\x00\x00\x00\x00\x00\x00\x00\x20\xb4\x00\x08testdata\x00\x00\x00\x00\x00\x00\x06\x65")

go module.decodeKeyAndOffset(keyBuf, valueBytes, zap.NewNop())
go module.decodeKeyAndOffset(543, keyBuf, valueBytes, zap.NewNop())
request := <-module.App.StorageChannel

assert.Equalf(t, protocol.StorageSetConsumerOffset, request.RequestType, "Expected request sent with type StorageSetConsumerOffset, not %v", request.RequestType)
@@ -439,6 +478,7 @@ func TestKafkaClient_decodeKeyAndOffset(t *testing.T) {
assert.Equalf(t, int32(11), request.Partition, "Expected request sent with partition 0, not %v", request.Partition)
assert.Equalf(t, "testgroup", request.Group, "Expected request sent with Group testgroup, not %v", request.Group)
assert.Equalf(t, int64(8372), request.Offset, "Expected Offset to be 8372, not %v", request.Offset)
assert.Equalf(t, int64(543), request.Order, "Expected Order to be 543, not %v", request.Order)
assert.Equalf(t, int64(1637), request.Timestamp, "Expected Timestamp to be 1637, not %v", request.Timestamp)
}

@@ -454,7 +494,7 @@ func TestKafkaClient_decodeKeyAndOffset_BadValueVersion(t *testing.T) {

for _, values := range decodeKeyAndOffsetErrors {
// Should not timeout
module.decodeKeyAndOffset(bytes.NewBuffer(values.KeyBytes), values.ValueBytes, zap.NewNop())
module.decodeKeyAndOffset(0, bytes.NewBuffer(values.KeyBytes), values.ValueBytes, zap.NewNop())
}
}

@@ -467,7 +507,7 @@ func TestKafkaClient_decodeKeyAndOffset_Whitelist(t *testing.T) {
valueBytes := []byte("\x00\x00\x00\x00\x00\x00\x00\x00\x20\xb4\x00\x08testdata\x00\x00\x00\x00\x00\x00\x06\x65")

// Should not timeout as the group should be dropped by the whitelist
module.decodeKeyAndOffset(keyBuf, valueBytes, zap.NewNop())
module.decodeKeyAndOffset(0, keyBuf, valueBytes, zap.NewNop())
}

func TestKafkaClient_decodeAndSendOffset_ErrorValue(t *testing.T) {
@@ -481,7 +521,7 @@ func TestKafkaClient_decodeAndSendOffset_ErrorValue(t *testing.T) {
}
valueBuf := bytes.NewBuffer([]byte("\x00\x00\x00\x00\x00\x00\x00\x00\x20\xb4\x00\x08testd"))

module.decodeAndSendOffset(offsetKey, valueBuf, zap.NewNop(), decodeOffsetValueV0)
module.decodeAndSendOffset(0, offsetKey, valueBuf, zap.NewNop(), decodeOffsetValueV0)
// Should not timeout
}

13 changes: 9 additions & 4 deletions core/internal/evaluator/caching.go
@@ -341,7 +341,7 @@ func calculatePartitionStatus(offsets []*protocol.ConsumerOffset, brokerOffsets
// Rule 1 - If over the stored period, the lag is ever zero for the partition, the period is OK
func isLagAlwaysNotZero(offsets []*protocol.ConsumerOffset) bool {
for _, offset := range offsets {
if offset.Lag == 0 {
if offset.Lag != nil && offset.Lag.Value == 0 {
return false
}
}
@@ -379,9 +379,14 @@ func checkIfOffsetsStalled(offsets []*protocol.ConsumerOffset) bool {

// Rule 5 - If the consumer offsets are advancing, but the lag is not decreasing somewhere, it's a warning (consumer is slow)
func checkIfLagNotDecreasing(offsets []*protocol.ConsumerOffset) bool {
for i := 1; i < len(offsets); i++ {
if offsets[i].Lag < offsets[i-1].Lag {
return false
var lastLag *protocol.Lag
for i := 0; i < len(offsets); i++ {
lag := offsets[i].Lag
if lag != nil {
if lastLag != nil && lag.Value < lastLag.Value {
return false
}
lastLag = lag
}
}
return true
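The evaluator change above makes per-sample lag optional: samples whose lag could not be computed are skipped, and each known lag is compared against the previous known one rather than the immediately preceding array element. A self-contained illustration; lag and consumerOffset are simplified local stand-ins for protocol.Lag and protocol.ConsumerOffset:

package main

import "fmt"

// Simplified stand-ins, just to show the nil-aware comparison from caching.go.
type lag struct{ Value uint64 }
type consumerOffset struct{ Lag *lag }

// checkIfLagNotDecreasing mirrors the new logic: unknown lag samples are
// skipped, and each known lag is compared against the previous known lag.
func checkIfLagNotDecreasing(offsets []consumerOffset) bool {
	var last *lag
	for _, o := range offsets {
		if o.Lag == nil {
			continue
		}
		if last != nil && o.Lag.Value < last.Value {
			return false
		}
		last = o.Lag
	}
	return true
}

func main() {
	fmt.Println(checkIfLagNotDecreasing([]consumerOffset{{&lag{5}}, {nil}, {&lag{3}}})) // false: lag fell from 5 to 3 across the unknown sample
	fmt.Println(checkIfLagNotDecreasing([]consumerOffset{{&lag{5}}, {nil}, {&lag{7}}})) // true: lag never decreased
}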
(6 more changed files not shown)