Skip to content

Commit

Permalink
Replace topicMap with topicPartitions in cluster module (#439)
Browse files Browse the repository at this point in the history
* consistent with notifier executeTemplate

* replace topicMap with topicPartitions

* add partial partitions update test
  • Loading branch information
BewareMyPower authored and toddpalino committed Mar 2, 2019
1 parent 7f3d76c commit c6a993e
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 43 deletions.
2 changes: 1 addition & 1 deletion config/default-http-post.tmpl
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"api_key":"{{index .Extras "api_key"}}","app":"{{index .Extras "app"}}","block":false,"events":[{"id":"{{.Id}}","event":{"severity":"{{if eq .Result.Status 2}}WARN{{else}}ERR{{end}}","tier":"{{index .Extras "tier"}}","group":"{{.Result.Group}}","start":"{{.Start.Format "Jan 02, 2006 15:04:05 UTC"}}","complete":{{.Result.Complete}},"partitions":{{.Result.Partitions | jsonencoder}}}}]}
{"api_key":"{{index .Extras "api_key"}}","app":"{{index .Extras "app"}}","block":false,"events":[{"id":"{{.ID}}","event":{"severity":"{{if eq .Result.Status 2}}WARN{{else}}ERR{{end}}","tier":"{{index .Extras "tier"}}","group":"{{.Result.Group}}","start":"{{.Start.Format "Jan 02, 2006 15:04:05 UTC"}}","complete":{{.Result.Complete}},"partitions":{{.Result.Partitions | jsonencoder}}}}]}
53 changes: 30 additions & 23 deletions core/internal/cluster/kafka_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ type KafkaCluster struct {
quitChannel chan struct{}
running sync.WaitGroup

fetchMetadata bool
topicMap map[string]int
fetchMetadata bool
topicPartitions map[string][]int32
}

// Configure validates the configuration for the cluster. At minimum, there must be a list of servers provided for the
Expand Down Expand Up @@ -144,21 +144,34 @@ func (module *KafkaCluster) maybeUpdateMetadataAndDeleteTopics(client helpers.Sa
return
}

// We'll use the partition counts later
topicMap := make(map[string]int)
// We'll use topicPartitions later
topicPartitions := make(map[string][]int32)
for _, topic := range topicList {
partitions, err := client.Partitions(topic)
if err != nil {
module.Log.Error("failed to fetch partition list", zap.String("sarama_error", err.Error()))
return
}
topicMap[topic] = len(partitions)

topicPartitions[topic] = make([]int32, 0, len(partitions))
for _, partitionID := range partitions {
if _, err := client.Leader(topic, partitionID); err != nil {
module.Log.Warn("failed to fetch leader for partition",
zap.String("topic", topic),
zap.Int32("partition", partitionID),
zap.String("sarama_error", err.Error()))
} else { // partitionID has a leader
// NOTE: append only happens here
// so cap(topicPartitions[topic]) is the partition count
topicPartitions[topic] = append(topicPartitions[topic], partitionID)
}
}
}

// Check for deleted topics if we have a previous map to check against
if module.topicMap != nil {
for topic := range module.topicMap {
if _, ok := topicMap[topic]; !ok {
if module.topicPartitions != nil {
for topic := range module.topicPartitions {
if _, ok := topicPartitions[topic]; !ok {
// Topic no longer exists - tell storage to delete it
module.App.StorageChannel <- &protocol.StorageRequest{
RequestType: protocol.StorageSetDeleteTopic,
Expand All @@ -169,8 +182,8 @@ func (module *KafkaCluster) maybeUpdateMetadataAndDeleteTopics(client helpers.Sa
}
}

// Save the new topicMap for next time
module.topicMap = topicMap
// Save the new topicPartitions for next time
module.topicPartitions = topicPartitions
}
}

Expand All @@ -179,31 +192,25 @@ func (module *KafkaCluster) generateOffsetRequests(client helpers.SaramaClient)
brokers := make(map[int32]helpers.SaramaBroker)

// Generate an OffsetRequest for each topic:partition and bucket it to the leader broker
errorTopics := make(map[string]bool)
for topic, partitions := range module.topicMap {
for i := 0; i < partitions; i++ {
broker, err := client.Leader(topic, int32(i))
for topic, partitions := range module.topicPartitions {
for _, partitionID := range partitions {
broker, err := client.Leader(topic, partitionID)
if err != nil {
module.Log.Warn("failed to fetch leader for partition",
zap.String("topic", topic),
zap.Int("partition", i),
zap.Int32("partition", partitionID),
zap.String("sarama_error", err.Error()))
errorTopics[topic] = true
module.fetchMetadata = true
continue
}
if _, ok := requests[broker.ID()]; !ok {
requests[broker.ID()] = &sarama.OffsetRequest{}
}
brokers[broker.ID()] = broker
requests[broker.ID()].AddBlock(topic, int32(i), sarama.OffsetNewest, 1)
requests[broker.ID()].AddBlock(topic, partitionID, sarama.OffsetNewest, 1)
}
}

// If there are any topics that had errors, force a metadata refresh on the next run
if len(errorTopics) > 0 {
module.fetchMetadata = true
}

return requests, brokers
}

Expand Down Expand Up @@ -251,7 +258,7 @@ func (module *KafkaCluster) getOffsets(client helpers.SaramaClient) {
Partition: partition,
Offset: offsetResponse.Offsets[0],
Timestamp: ts,
TopicPartitionCount: int32(module.topicMap[topic]),
TopicPartitionCount: int32(cap(module.topicPartitions[topic])),
}
helpers.TimeoutSendStorageRequest(module.App.StorageChannel, offset, 1)
}
Expand Down
70 changes: 51 additions & 19 deletions core/internal/cluster/kafka_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"sync"

"github.com/linkedin/Burrow/core/internal/helpers"
"github.com/linkedin/Burrow/core/protocol"
"sync"
)

func fixtureModule() *KafkaCluster {
Expand Down Expand Up @@ -79,16 +80,43 @@ func TestKafkaCluster_maybeUpdateMetadataAndDeleteTopics_NoDelete(t *testing.T)
client.On("RefreshMetadata").Return(nil)
client.On("Topics").Return([]string{"testtopic"}, nil)
client.On("Partitions", "testtopic").Return([]int32{0}, nil)
client.On("Leader", "testtopic", int32(0)).Return(&helpers.MockSaramaBroker{}, nil)

module.fetchMetadata = true
module.maybeUpdateMetadataAndDeleteTopics(client)

client.AssertExpectations(t)
assert.False(t, module.fetchMetadata, "Expected fetchMetadata to be reset to false")
assert.Lenf(t, module.topicPartitions, 1, "Expected 1 topic entry, not %v", len(module.topicPartitions))
topic, ok := module.topicPartitions["testtopic"]
assert.True(t, ok, "Expected to find testtopic in topicPartitions")
assert.Equalf(t, 1, len(topic), "Expected testtopic to be recorded with 1 partition, not %v", len(topic))
}

func TestKafkaCluster_maybeUpdateMetadataAndDeleteTopics_PartialUpdate(t *testing.T) {
module := fixtureModule()
module.Configure("test", "cluster.test")

// Set up the mock to return a test topic and partition
client := &helpers.MockSaramaClient{}
client.On("RefreshMetadata").Return(nil)
client.On("Topics").Return([]string{"testtopic"}, nil)
client.On("Partitions", "testtopic").Return([]int32{0, 1}, nil)

var nilBroker *helpers.BurrowSaramaBroker
client.On("Leader", "testtopic", int32(0)).Return(nilBroker, errors.New("no leader error"))
client.On("Leader", "testtopic", int32(1)).Return(&helpers.MockSaramaBroker{}, nil)

module.fetchMetadata = true
module.maybeUpdateMetadataAndDeleteTopics(client)

client.AssertExpectations(t)
assert.False(t, module.fetchMetadata, "Expected fetchMetadata to be reset to false")
assert.Lenf(t, module.topicMap, 1, "Expected 1 topic entry, not %v", len(module.topicMap))
topic, ok := module.topicMap["testtopic"]
assert.True(t, ok, "Expected to find testtopic in topicMap")
assert.Equalf(t, 1, topic, "Expected testtopic to be recorded with 1 partition, not %v", topic)
assert.Lenf(t, module.topicPartitions, 1, "Expected 1 topic entry, not %v", len(module.topicPartitions))
topic, ok := module.topicPartitions["testtopic"]
assert.True(t, ok, "Expected to find testtopic in topicPartitions")
assert.Equalf(t, len(topic), 1, "Expected testtopic's length to be 1, not %v", len(topic))
assert.Equalf(t, cap(topic), 2, "Expected testtopic's capacity to be 2, not %v", cap(topic))
}

func TestKafkaCluster_maybeUpdateMetadataAndDeleteTopics_Delete(t *testing.T) {
Expand All @@ -100,10 +128,14 @@ func TestKafkaCluster_maybeUpdateMetadataAndDeleteTopics_Delete(t *testing.T) {
client.On("RefreshMetadata").Return(nil)
client.On("Topics").Return([]string{"testtopic"}, nil)
client.On("Partitions", "testtopic").Return([]int32{0}, nil)
client.On("Leader", "testtopic", int32(0)).Return(&helpers.MockSaramaBroker{}, nil)

module.fetchMetadata = true
module.topicMap = make(map[string]int)
module.topicMap["topictodelete"] = 10
module.topicPartitions = make(map[string][]int32)
module.topicPartitions["topictodelete"] = make([]int32, 0, 10)
for i := 0; i < cap(module.topicPartitions["topictodelete"]); i++ {
module.topicPartitions["topictodelete"] = append(module.topicPartitions["topictodelete"], int32(i))
}

// Need to wait for this request to come in and finish, which happens when we call maybeUpdate...
wg := &sync.WaitGroup{}
Expand All @@ -120,17 +152,17 @@ func TestKafkaCluster_maybeUpdateMetadataAndDeleteTopics_Delete(t *testing.T) {

client.AssertExpectations(t)
assert.False(t, module.fetchMetadata, "Expected fetchMetadata to be reset to false")
assert.Lenf(t, module.topicMap, 1, "Expected 1 topic entry, not %v", len(module.topicMap))
topic, ok := module.topicMap["testtopic"]
assert.True(t, ok, "Expected to find testtopic in topicMap")
assert.Equalf(t, 1, topic, "Expected testtopic to be recorded with 1 partition, not %v", topic)
assert.Lenf(t, module.topicPartitions, 1, "Expected 1 topic entry, not %v", len(module.topicPartitions))
topic, ok := module.topicPartitions["testtopic"]
assert.True(t, ok, "Expected to find testtopic in topicPartitions")
assert.Equalf(t, 1, len(topic), "Expected testtopic to be recorded with 1 partition, not %v", len(topic))
}

func TestKafkaCluster_generateOffsetRequests(t *testing.T) {
module := fixtureModule()
module.Configure("test", "cluster.test")
module.topicMap = make(map[string]int)
module.topicMap["testtopic"] = 1
module.topicPartitions = make(map[string][]int32)
module.topicPartitions["testtopic"] = []int32{0}

// Set up a broker mock
broker := &helpers.MockSaramaBroker{}
Expand All @@ -154,8 +186,8 @@ func TestKafkaCluster_generateOffsetRequests(t *testing.T) {
func TestKafkaCluster_generateOffsetRequests_NoLeader(t *testing.T) {
module := fixtureModule()
module.Configure("test", "cluster.test")
module.topicMap = make(map[string]int)
module.topicMap["testtopic"] = 2
module.topicPartitions = make(map[string][]int32)
module.topicPartitions["testtopic"] = []int32{0, 1}

// Set up a broker mock
broker := &helpers.MockSaramaBroker{}
Expand All @@ -182,8 +214,8 @@ func TestKafkaCluster_generateOffsetRequests_NoLeader(t *testing.T) {
func TestKafkaCluster_getOffsets(t *testing.T) {
module := fixtureModule()
module.Configure("test", "cluster.test")
module.topicMap = make(map[string]int)
module.topicMap["testtopic"] = 2
module.topicPartitions = make(map[string][]int32)
module.topicPartitions["testtopic"] = []int32{0, 1}
module.fetchMetadata = false

// Set up an OffsetResponse
Expand Down Expand Up @@ -227,8 +259,8 @@ func TestKafkaCluster_getOffsets(t *testing.T) {
func TestKafkaCluster_getOffsets_BrokerFailed(t *testing.T) {
module := fixtureModule()
module.Configure("test", "cluster.test")
module.topicMap = make(map[string]int)
module.topicMap["testtopic"] = 1
module.topicPartitions = make(map[string][]int32)
module.topicPartitions["testtopic"] = []int32{0}
module.fetchMetadata = false

// Set up a broker mock
Expand Down

0 comments on commit c6a993e

Please sign in to comment.