diff --git a/config/default-http-post.tmpl b/config/default-http-post.tmpl
index 13ba2c97..d9512831 100644
--- a/config/default-http-post.tmpl
+++ b/config/default-http-post.tmpl
@@ -1 +1 @@
-{"api_key":"{{index .Extras "api_key"}}","app":"{{index .Extras "app"}}","block":false,"events":[{"id":"{{.Id}}","event":{"severity":"{{if eq .Result.Status 2}}WARN{{else}}ERR{{end}}","tier":"{{index .Extras "tier"}}","group":"{{.Result.Group}}","start":"{{.Start.Format "Jan 02, 2006 15:04:05 UTC"}}","complete":{{.Result.Complete}},"partitions":{{.Result.Partitions | jsonencoder}}}}]}
+{"api_key":"{{index .Extras "api_key"}}","app":"{{index .Extras "app"}}","block":false,"events":[{"id":"{{.ID}}","event":{"severity":"{{if eq .Result.Status 2}}WARN{{else}}ERR{{end}}","tier":"{{index .Extras "tier"}}","group":"{{.Result.Group}}","start":"{{.Start.Format "Jan 02, 2006 15:04:05 UTC"}}","complete":{{.Result.Complete}},"partitions":{{.Result.Partitions | jsonencoder}}}}]}
diff --git a/core/internal/cluster/kafka_cluster.go b/core/internal/cluster/kafka_cluster.go
index eb08d7bd..b43705a0 100644
--- a/core/internal/cluster/kafka_cluster.go
+++ b/core/internal/cluster/kafka_cluster.go
@@ -45,8 +45,8 @@ type KafkaCluster struct {
 	quitChannel chan struct{}
 	running     sync.WaitGroup
 
-	fetchMetadata bool
-	topicMap      map[string]int
+	fetchMetadata   bool
+	topicPartitions map[string][]int32
 }
 
 // Configure validates the configuration for the cluster. At minimum, there must be a list of servers provided for the
@@ -144,21 +144,34 @@ func (module *KafkaCluster) maybeUpdateMetadataAndDeleteTopics(client helpers.Sa
 			return
 		}
 
-		// We'll use the partition counts later
-		topicMap := make(map[string]int)
+		// We'll use topicPartitions later
+		topicPartitions := make(map[string][]int32)
 		for _, topic := range topicList {
 			partitions, err := client.Partitions(topic)
 			if err != nil {
 				module.Log.Error("failed to fetch partition list", zap.String("sarama_error", err.Error()))
 				return
 			}
-			topicMap[topic] = len(partitions)
+
+			topicPartitions[topic] = make([]int32, 0, len(partitions))
+			for _, partitionID := range partitions {
+				if _, err := client.Leader(topic, partitionID); err != nil {
+					module.Log.Warn("failed to fetch leader for partition",
+						zap.String("topic", topic),
+						zap.Int32("partition", partitionID),
+						zap.String("sarama_error", err.Error()))
+				} else { // partitionID has a leader
+					// NOTE: append only happens here
+					// so cap(topicPartitions[topic]) is the partition count
+					topicPartitions[topic] = append(topicPartitions[topic], partitionID)
+				}
+			}
 		}
 
 		// Check for deleted topics if we have a previous map to check against
-		if module.topicMap != nil {
-			for topic := range module.topicMap {
-				if _, ok := topicMap[topic]; !ok {
+		if module.topicPartitions != nil {
+			for topic := range module.topicPartitions {
+				if _, ok := topicPartitions[topic]; !ok {
 					// Topic no longer exists - tell storage to delete it
 					module.App.StorageChannel <- &protocol.StorageRequest{
 						RequestType: protocol.StorageSetDeleteTopic,
@@ -169,8 +182,8 @@ func (module *KafkaCluster) maybeUpdateMetadataAndDeleteTopics(client helpers.Sa
 			}
 		}
 
-		// Save the new topicMap for next time
-		module.topicMap = topicMap
+		// Save the new topicPartitions for next time
+		module.topicPartitions = topicPartitions
 	}
 }
 
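The heart of this change: instead of remembering only a per-topic partition count, the module now records the IDs of the partitions that currently have a leader. Because each slice is allocated once with `make([]int32, 0, len(partitions))` and appended to only inside that loop (as the NOTE comment says), `len()` gives the number of leadered partitions while `cap()` still recovers the total partition count. A minimal standalone sketch of the pattern, with illustrative names (`buildPartitionList`, `hasLeader`) that are not part of Burrow:

```go
package main

import "fmt"

// buildPartitionList mirrors the pattern above: reserve capacity for every
// partition, then append only the IDs that currently have a leader.
// len(result) = partitions with a leader; cap(result) = total partitions.
func buildPartitionList(partitions []int32, hasLeader func(int32) bool) []int32 {
	withLeader := make([]int32, 0, len(partitions))
	for _, p := range partitions {
		if hasLeader(p) {
			withLeader = append(withLeader, p)
		}
	}
	return withLeader
}

func main() {
	// Pretend partition 1 is leaderless (e.g. its broker is offline).
	list := buildPartitionList([]int32{0, 1, 2}, func(p int32) bool { return p != 1 })
	fmt.Println(len(list), cap(list)) // prints: 2 3
}
```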
@@ -179,31 +192,25 @@ func (module *KafkaCluster) generateOffsetRequests(client helpers.SaramaClient)
 	brokers := make(map[int32]helpers.SaramaBroker)
 
 	// Generate an OffsetRequest for each topic:partition and bucket it to the leader broker
-	errorTopics := make(map[string]bool)
-	for topic, partitions := range module.topicMap {
-		for i := 0; i < partitions; i++ {
-			broker, err := client.Leader(topic, int32(i))
+	for topic, partitions := range module.topicPartitions {
+		for _, partitionID := range partitions {
+			broker, err := client.Leader(topic, partitionID)
 			if err != nil {
 				module.Log.Warn("failed to fetch leader for partition",
 					zap.String("topic", topic),
-					zap.Int("partition", i),
+					zap.Int32("partition", partitionID),
 					zap.String("sarama_error", err.Error()))
-				errorTopics[topic] = true
+				module.fetchMetadata = true
 				continue
 			}
 			if _, ok := requests[broker.ID()]; !ok {
 				requests[broker.ID()] = &sarama.OffsetRequest{}
 			}
 			brokers[broker.ID()] = broker
-			requests[broker.ID()].AddBlock(topic, int32(i), sarama.OffsetNewest, 1)
+			requests[broker.ID()].AddBlock(topic, partitionID, sarama.OffsetNewest, 1)
 		}
 	}
 
-	// If there are any topics that had errors, force a metadata refresh on the next run
-	if len(errorTopics) > 0 {
-		module.fetchMetadata = true
-	}
-
 	return requests, brokers
 }
@@ -251,7 +258,7 @@ func (module *KafkaCluster) getOffsets(client helpers.SaramaClient) {
 				Partition:           partition,
 				Offset:              offsetResponse.Offsets[0],
 				Timestamp:           ts,
-				TopicPartitionCount: int32(module.topicMap[topic]),
+				TopicPartitionCount: int32(cap(module.topicPartitions[topic])),
 			}
 			helpers.TimeoutSendStorageRequest(module.App.StorageChannel, offset, 1)
 		}
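Two consequences fall out of the new bookkeeping. First, `generateOffsetRequests` no longer needs the `errorTopics` set: it only iterates partitions that had a leader at metadata time, and on a late leader failure it sets `module.fetchMetadata` directly, which is equivalent to the old "any errors, refresh next run" logic. Second, `getOffsets` recovers the total partition count from `cap()` rather than a stored int, which holds only while the slice never grows past its original capacity. A small sketch (not Burrow code) of why that invariant matters:

```go
package main

import "fmt"

func main() {
	// Reserve capacity for 3 partitions, append the 2 that have leaders.
	parts := make([]int32, 0, 3)
	parts = append(parts, 0, 2) // partition 1 skipped: no leader
	fmt.Println(len(parts), cap(parts)) // prints: 2 3

	// Appending past the reserved capacity reallocates the backing array,
	// and cap() would no longer equal the original partition count.
	parts = append(parts, 3, 4)
	fmt.Println(cap(parts) == 3) // prints: false
}
```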
diff --git a/core/internal/cluster/kafka_cluster_test.go b/core/internal/cluster/kafka_cluster_test.go
index 5310cd3f..4efba7cc 100644
--- a/core/internal/cluster/kafka_cluster_test.go
+++ b/core/internal/cluster/kafka_cluster_test.go
@@ -22,9 +22,10 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 
+	"sync"
+
 	"github.com/linkedin/Burrow/core/internal/helpers"
 	"github.com/linkedin/Burrow/core/protocol"
-	"sync"
 )
 
 func fixtureModule() *KafkaCluster {
@@ -79,16 +80,43 @@ func TestKafkaCluster_maybeUpdateMetadataAndDeleteTopics_NoDelete(t *testing.T)
 	client.On("RefreshMetadata").Return(nil)
 	client.On("Topics").Return([]string{"testtopic"}, nil)
 	client.On("Partitions", "testtopic").Return([]int32{0}, nil)
+	client.On("Leader", "testtopic", int32(0)).Return(&helpers.MockSaramaBroker{}, nil)
+
+	module.fetchMetadata = true
+	module.maybeUpdateMetadataAndDeleteTopics(client)
+
+	client.AssertExpectations(t)
+	assert.False(t, module.fetchMetadata, "Expected fetchMetadata to be reset to false")
+	assert.Lenf(t, module.topicPartitions, 1, "Expected 1 topic entry, not %v", len(module.topicPartitions))
+	topic, ok := module.topicPartitions["testtopic"]
+	assert.True(t, ok, "Expected to find testtopic in topicPartitions")
+	assert.Equalf(t, 1, len(topic), "Expected testtopic to be recorded with 1 partition, not %v", len(topic))
+}
+
+func TestKafkaCluster_maybeUpdateMetadataAndDeleteTopics_PartialUpdate(t *testing.T) {
+	module := fixtureModule()
+	module.Configure("test", "cluster.test")
+
+	// Set up the mock to return a test topic and partition
+	client := &helpers.MockSaramaClient{}
+	client.On("RefreshMetadata").Return(nil)
+	client.On("Topics").Return([]string{"testtopic"}, nil)
+	client.On("Partitions", "testtopic").Return([]int32{0, 1}, nil)
+
+	var nilBroker *helpers.BurrowSaramaBroker
+	client.On("Leader", "testtopic", int32(0)).Return(nilBroker, errors.New("no leader error"))
+	client.On("Leader", "testtopic", int32(1)).Return(&helpers.MockSaramaBroker{}, nil)
 
 	module.fetchMetadata = true
 	module.maybeUpdateMetadataAndDeleteTopics(client)
 
 	client.AssertExpectations(t)
 	assert.False(t, module.fetchMetadata, "Expected fetchMetadata to be reset to false")
-	assert.Lenf(t, module.topicMap, 1, "Expected 1 topic entry, not %v", len(module.topicMap))
-	topic, ok := module.topicMap["testtopic"]
-	assert.True(t, ok, "Expected to find testtopic in topicMap")
-	assert.Equalf(t, 1, topic, "Expected testtopic to be recorded with 1 partition, not %v", topic)
+	assert.Lenf(t, module.topicPartitions, 1, "Expected 1 topic entry, not %v", len(module.topicPartitions))
+	topic, ok := module.topicPartitions["testtopic"]
+	assert.True(t, ok, "Expected to find testtopic in topicPartitions")
+	assert.Equalf(t, 1, len(topic), "Expected testtopic's length to be 1, not %v", len(topic))
+	assert.Equalf(t, 2, cap(topic), "Expected testtopic's capacity to be 2, not %v", cap(topic))
 }
 
 func TestKafkaCluster_maybeUpdateMetadataAndDeleteTopics_Delete(t *testing.T) {
@@ -100,10 +128,14 @@ func TestKafkaCluster_maybeUpdateMetadataAndDeleteTopics_Delete(t *testing.T) {
 	client.On("RefreshMetadata").Return(nil)
 	client.On("Topics").Return([]string{"testtopic"}, nil)
 	client.On("Partitions", "testtopic").Return([]int32{0}, nil)
+	client.On("Leader", "testtopic", int32(0)).Return(&helpers.MockSaramaBroker{}, nil)
 
 	module.fetchMetadata = true
-	module.topicMap = make(map[string]int)
-	module.topicMap["topictodelete"] = 10
+	module.topicPartitions = make(map[string][]int32)
+	module.topicPartitions["topictodelete"] = make([]int32, 0, 10)
+	for i := 0; i < cap(module.topicPartitions["topictodelete"]); i++ {
+		module.topicPartitions["topictodelete"] = append(module.topicPartitions["topictodelete"], int32(i))
+	}
 
 	// Need to wait for this request to come in and finish, which happens when we call maybeUpdate...
 	wg := &sync.WaitGroup{}
@@ -120,17 +152,17 @@ func TestKafkaCluster_maybeUpdateMetadataAndDeleteTopics_Delete(t *testing.T) {
 
 	client.AssertExpectations(t)
 	assert.False(t, module.fetchMetadata, "Expected fetchMetadata to be reset to false")
-	assert.Lenf(t, module.topicMap, 1, "Expected 1 topic entry, not %v", len(module.topicMap))
-	topic, ok := module.topicMap["testtopic"]
-	assert.True(t, ok, "Expected to find testtopic in topicMap")
-	assert.Equalf(t, 1, topic, "Expected testtopic to be recorded with 1 partition, not %v", topic)
+	assert.Lenf(t, module.topicPartitions, 1, "Expected 1 topic entry, not %v", len(module.topicPartitions))
+	topic, ok := module.topicPartitions["testtopic"]
+	assert.True(t, ok, "Expected to find testtopic in topicPartitions")
+	assert.Equalf(t, 1, len(topic), "Expected testtopic to be recorded with 1 partition, not %v", len(topic))
 }
 
 func TestKafkaCluster_generateOffsetRequests(t *testing.T) {
 	module := fixtureModule()
 	module.Configure("test", "cluster.test")
-	module.topicMap = make(map[string]int)
-	module.topicMap["testtopic"] = 1
+	module.topicPartitions = make(map[string][]int32)
+	module.topicPartitions["testtopic"] = []int32{0}
 
 	// Set up a broker mock
 	broker := &helpers.MockSaramaBroker{}
@@ -154,8 +186,8 @@ func TestKafkaCluster_generateOffsetRequests(t *testing.T) {
 func TestKafkaCluster_generateOffsetRequests_NoLeader(t *testing.T) {
 	module := fixtureModule()
 	module.Configure("test", "cluster.test")
-	module.topicMap = make(map[string]int)
-	module.topicMap["testtopic"] = 2
+	module.topicPartitions = make(map[string][]int32)
+	module.topicPartitions["testtopic"] = []int32{0, 1}
 
 	// Set up a broker mock
 	broker := &helpers.MockSaramaBroker{}
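One subtlety in the PartialUpdate test above: the failing Leader expectation returns a typed nil (`var nilBroker *helpers.BurrowSaramaBroker`) rather than a bare nil. testify mocks hand return values back through a type assertion on `interface{}`, and asserting a pointer type on an untyped nil panics, while a typed nil passes through cleanly. A self-contained sketch of the pattern, using hypothetical `Broker` and `MetadataSource` types in place of Burrow's helpers:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/stretchr/testify/mock"
)

// Broker and MetadataSource are illustrative stand-ins, not Burrow types.
type Broker struct{}

type MetadataSource struct{ mock.Mock }

func (m *MetadataSource) Leader(topic string, partition int32) (*Broker, error) {
	args := m.Called(topic, partition)
	// The assertion below requires the stored value to be a *Broker;
	// an untyped nil passed to Return(...) would make it panic.
	return args.Get(0).(*Broker), args.Error(1)
}

func main() {
	m := &MetadataSource{}
	var nilBroker *Broker // typed nil satisfies the *Broker assertion
	m.On("Leader", "testtopic", int32(0)).Return(nilBroker, errors.New("no leader error"))

	broker, err := m.Leader("testtopic", int32(0))
	fmt.Println(broker == nil, err) // prints: true no leader error
}
```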
@@ -182,8 +214,8 @@ func TestKafkaCluster_getOffsets(t *testing.T) {
 	module := fixtureModule()
 	module.Configure("test", "cluster.test")
-	module.topicMap = make(map[string]int)
-	module.topicMap["testtopic"] = 2
+	module.topicPartitions = make(map[string][]int32)
+	module.topicPartitions["testtopic"] = []int32{0, 1}
 	module.fetchMetadata = false
 
 	// Set up an OffsetResponse
@@ -227,8 +259,8 @@ func TestKafkaCluster_getOffsets(t *testing.T) {
 func TestKafkaCluster_getOffsets_BrokerFailed(t *testing.T) {
 	module := fixtureModule()
 	module.Configure("test", "cluster.test")
-	module.topicMap = make(map[string]int)
-	module.topicMap["testtopic"] = 1
+	module.topicPartitions = make(map[string][]int32)
+	module.topicPartitions["testtopic"] = []int32{0}
 	module.fetchMetadata = false
 
 	// Set up a broker mock
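The getOffsets tests seed topicPartitions with plain literals such as `[]int32{0, 1}`, where len and cap coincide, so the cap()-derived TopicPartitionCount reports the full count. A test that wanted to model a topic with an offline partition would have to reserve the capacity explicitly, along these lines (`degradedtopic` and its counts are made up for illustration, not taken from the tests):

```go
package main

import "fmt"

func main() {
	topicPartitions := make(map[string][]int32)

	// Literal slice: len == cap == 2, every partition has a leader.
	topicPartitions["testtopic"] = []int32{0, 1}

	// Three partitions total, but partition 2 has no leader right now:
	// reserve capacity 3, append only the healthy IDs.
	degraded := make([]int32, 0, 3)
	degraded = append(degraded, 0, 1)
	topicPartitions["degradedtopic"] = degraded

	for topic, parts := range topicPartitions {
		fmt.Printf("%s: %d with leaders, %d total\n", topic, len(parts), cap(parts))
	}
}
```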