Replace topicMap with topicPartitions in cluster module #439

Merged · 3 commits · Mar 2, 2019
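The core of the change: instead of tracking only a partition count per topic (topicMap map[string]int), the cluster module now tracks the IDs of the partitions that currently have a leader (topicPartitions map[string][]int32). Each slice is allocated with capacity equal to the total partition count, so len() gives the number of leadered partitions while cap() still yields the total. A minimal standalone sketch of that bookkeeping (illustrative only, not code from this PR):

package main

import "fmt"

func main() {
	// Before: topicMap map[string]int            (topic -> partition count)
	// After:  topicPartitions map[string][]int32 (topic -> IDs of partitions with a leader)
	topicPartitions := make(map[string][]int32)

	allPartitions := []int32{0, 1, 2}             // hypothetical topic with 3 partitions
	hasLeader := map[int32]bool{0: true, 2: true} // assume partition 1 currently has no leader

	// Allocate with capacity = total partition count and append only the
	// partitions that have a leader, mirroring the module's approach.
	topicPartitions["testtopic"] = make([]int32, 0, len(allPartitions))
	for _, id := range allPartitions {
		if hasLeader[id] {
			topicPartitions["testtopic"] = append(topicPartitions["testtopic"], id)
		}
	}

	fmt.Println(len(topicPartitions["testtopic"])) // 2: partitions that have a leader
	fmt.Println(cap(topicPartitions["testtopic"])) // 3: total partitions
}

This is why getOffsets below reports TopicPartitionCount as int32(cap(module.topicPartitions[topic])) rather than a stored count.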
config/default-http-post.tmpl (2 changes: 1 addition & 1 deletion)
@@ -1 +1 @@
{"api_key":"{{index .Extras "api_key"}}","app":"{{index .Extras "app"}}","block":false,"events":[{"id":"{{.Id}}","event":{"severity":"{{if eq .Result.Status 2}}WARN{{else}}ERR{{end}}","tier":"{{index .Extras "tier"}}","group":"{{.Result.Group}}","start":"{{.Start.Format "Jan 02, 2006 15:04:05 UTC"}}","complete":{{.Result.Complete}},"partitions":{{.Result.Partitions | jsonencoder}}}}]}
{"api_key":"{{index .Extras "api_key"}}","app":"{{index .Extras "app"}}","block":false,"events":[{"id":"{{.ID}}","event":{"severity":"{{if eq .Result.Status 2}}WARN{{else}}ERR{{end}}","tier":"{{index .Extras "tier"}}","group":"{{.Result.Group}}","start":"{{.Start.Format "Jan 02, 2006 15:04:05 UTC"}}","complete":{{.Result.Complete}},"partitions":{{.Result.Partitions | jsonencoder}}}}]}
core/internal/cluster/kafka_cluster.go (53 changes: 30 additions & 23 deletions)
@@ -45,8 +45,8 @@ type KafkaCluster struct {
quitChannel chan struct{}
running sync.WaitGroup

fetchMetadata bool
topicMap map[string]int
fetchMetadata bool
topicPartitions map[string][]int32
}

// Configure validates the configuration for the cluster. At minimum, there must be a list of servers provided for the
@@ -144,21 +144,34 @@ func (module *KafkaCluster) maybeUpdateMetadataAndDeleteTopics(client helpers.Sa
return
}

// We'll use the partition counts later
topicMap := make(map[string]int)
// We'll use topicPartitions later
topicPartitions := make(map[string][]int32)
for _, topic := range topicList {
partitions, err := client.Partitions(topic)
if err != nil {
module.Log.Error("failed to fetch partition list", zap.String("sarama_error", err.Error()))
return
}
topicMap[topic] = len(partitions)

topicPartitions[topic] = make([]int32, 0, len(partitions))
for _, partitionID := range partitions {
if _, err := client.Leader(topic, partitionID); err != nil {
module.Log.Warn("failed to fetch leader for partition",
zap.String("topic", topic),
zap.Int32("partition", partitionID),
zap.String("sarama_error", err.Error()))
} else { // partitionID has a leader
// NOTE: append only happens here
// so cap(topicPartitions[topic]) is the partition count
topicPartitions[topic] = append(topicPartitions[topic], partitionID)
}
}
}

// Check for deleted topics if we have a previous map to check against
if module.topicMap != nil {
for topic := range module.topicMap {
if _, ok := topicMap[topic]; !ok {
if module.topicPartitions != nil {
for topic := range module.topicPartitions {
if _, ok := topicPartitions[topic]; !ok {
// Topic no longer exists - tell storage to delete it
module.App.StorageChannel <- &protocol.StorageRequest{
RequestType: protocol.StorageSetDeleteTopic,
@@ -169,8 +182,8 @@ }
}
}

// Save the new topicMap for next time
module.topicMap = topicMap
// Save the new topicPartitions for next time
module.topicPartitions = topicPartitions
}
}

@@ -179,31 +192,25 @@ func (module *KafkaCluster) generateOffsetRequests(client helpers.SaramaClient)
brokers := make(map[int32]helpers.SaramaBroker)

// Generate an OffsetRequest for each topic:partition and bucket it to the leader broker
errorTopics := make(map[string]bool)
for topic, partitions := range module.topicMap {
for i := 0; i < partitions; i++ {
broker, err := client.Leader(topic, int32(i))
for topic, partitions := range module.topicPartitions {
for _, partitionID := range partitions {
broker, err := client.Leader(topic, partitionID)
if err != nil {
module.Log.Warn("failed to fetch leader for partition",
zap.String("topic", topic),
zap.Int("partition", i),
zap.Int32("partition", partitionID),
zap.String("sarama_error", err.Error()))
errorTopics[topic] = true
module.fetchMetadata = true
continue
}
if _, ok := requests[broker.ID()]; !ok {
requests[broker.ID()] = &sarama.OffsetRequest{}
}
brokers[broker.ID()] = broker
requests[broker.ID()].AddBlock(topic, int32(i), sarama.OffsetNewest, 1)
requests[broker.ID()].AddBlock(topic, partitionID, sarama.OffsetNewest, 1)
}
}

// If there are any topics that had errors, force a metadata refresh on the next run
if len(errorTopics) > 0 {
module.fetchMetadata = true
}

return requests, brokers
}

@@ -251,7 +258,7 @@ func (module *KafkaCluster) getOffsets(client helpers.SaramaClient) {
Partition: partition,
Offset: offsetResponse.Offsets[0],
Timestamp: ts,
TopicPartitionCount: int32(module.topicMap[topic]),
TopicPartitionCount: int32(cap(module.topicPartitions[topic])),
}
helpers.TimeoutSendStorageRequest(module.App.StorageChannel, offset, 1)
}
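With the partition IDs stored explicitly, generateOffsetRequests above no longer assumes a contiguous 0..count-1 partition range, and a failed leader lookup now sets fetchMetadata directly instead of accumulating an errorTopics set. A simplified, self-contained sketch of that loop shape (leaderLookup below is a stand-in for the module's helpers.SaramaClient, not Burrow code):

package main

import (
	"errors"
	"fmt"
)

// leaderLookup is a stand-in for the Sarama client's Leader call.
type leaderLookup func(topic string, partition int32) (brokerID int32, err error)

// bucketByLeader mirrors the shape of generateOffsetRequests: it walks the
// stored partition IDs (which need not be contiguous) and buckets each
// topic:partition under the broker that currently leads it. Any leader error
// schedules a metadata refresh for the next run instead of aborting.
func bucketByLeader(topicPartitions map[string][]int32, leader leaderLookup) (map[int32][]string, bool) {
	requests := make(map[int32][]string)
	fetchMetadata := false
	for topic, partitions := range topicPartitions {
		for _, partitionID := range partitions {
			brokerID, err := leader(topic, partitionID)
			if err != nil {
				fetchMetadata = true // force a metadata refresh next time
				continue
			}
			requests[brokerID] = append(requests[brokerID], fmt.Sprintf("%s:%d", topic, partitionID))
		}
	}
	return requests, fetchMetadata
}

func main() {
	// Partition 1 was dropped during the metadata pass, so only 0 and 2 remain.
	topicPartitions := map[string][]int32{"testtopic": {0, 2}}
	leader := func(topic string, partition int32) (int32, error) {
		if partition == 2 {
			return 0, errors.New("leader not available") // simulate a lost leader
		}
		return 100, nil
	}
	requests, refresh := bucketByLeader(topicPartitions, leader)
	fmt.Println(requests) // map[100:[testtopic:0]]
	fmt.Println(refresh)  // true
}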
core/internal/cluster/kafka_cluster_test.go (70 changes: 51 additions & 19 deletions)
@@ -22,9 +22,10 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"sync"

"github.com/linkedin/Burrow/core/internal/helpers"
"github.com/linkedin/Burrow/core/protocol"
"sync"
)

func fixtureModule() *KafkaCluster {
@@ -79,16 +80,43 @@ func TestKafkaCluster_maybeUpdateMetadataAndDeleteTopics_NoDelete(t *testing.T)
client.On("RefreshMetadata").Return(nil)
client.On("Topics").Return([]string{"testtopic"}, nil)
client.On("Partitions", "testtopic").Return([]int32{0}, nil)
client.On("Leader", "testtopic", int32(0)).Return(&helpers.MockSaramaBroker{}, nil)

module.fetchMetadata = true
module.maybeUpdateMetadataAndDeleteTopics(client)

client.AssertExpectations(t)
assert.False(t, module.fetchMetadata, "Expected fetchMetadata to be reset to false")
assert.Lenf(t, module.topicPartitions, 1, "Expected 1 topic entry, not %v", len(module.topicPartitions))
topic, ok := module.topicPartitions["testtopic"]
assert.True(t, ok, "Expected to find testtopic in topicPartitions")
assert.Equalf(t, 1, len(topic), "Expected testtopic to be recorded with 1 partition, not %v", len(topic))
}

func TestKafkaCluster_maybeUpdateMetadataAndDeleteTopics_PartialUpdate(t *testing.T) {
module := fixtureModule()
module.Configure("test", "cluster.test")

// Set up the mock to return a test topic and partition
client := &helpers.MockSaramaClient{}
client.On("RefreshMetadata").Return(nil)
client.On("Topics").Return([]string{"testtopic"}, nil)
client.On("Partitions", "testtopic").Return([]int32{0, 1}, nil)

var nilBroker *helpers.BurrowSaramaBroker
client.On("Leader", "testtopic", int32(0)).Return(nilBroker, errors.New("no leader error"))
client.On("Leader", "testtopic", int32(1)).Return(&helpers.MockSaramaBroker{}, nil)

module.fetchMetadata = true
module.maybeUpdateMetadataAndDeleteTopics(client)

client.AssertExpectations(t)
assert.False(t, module.fetchMetadata, "Expected fetchMetadata to be reset to false")
assert.Lenf(t, module.topicMap, 1, "Expected 1 topic entry, not %v", len(module.topicMap))
topic, ok := module.topicMap["testtopic"]
assert.True(t, ok, "Expected to find testtopic in topicMap")
assert.Equalf(t, 1, topic, "Expected testtopic to be recorded with 1 partition, not %v", topic)
assert.Lenf(t, module.topicPartitions, 1, "Expected 1 topic entry, not %v", len(module.topicPartitions))
topic, ok := module.topicPartitions["testtopic"]
assert.True(t, ok, "Expected to find testtopic in topicPartitions")
assert.Equalf(t, len(topic), 1, "Expected testtopic's length to be 1, not %v", len(topic))
assert.Equalf(t, cap(topic), 2, "Expected testtopic's capacity to be 2, not %v", cap(topic))
}
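A note on the typed nil in the new PartialUpdate test: var nilBroker *helpers.BurrowSaramaBroker is used rather than a bare nil, presumably because testify mocks store return values as interface{} and type-assert them on the way out; an untyped nil would make that assertion blow up, while a typed nil pointer keeps the concrete type. A generic sketch of that Go behavior (the broker type and fromMock helper below are hypothetical, not Burrow's):

package main

import (
	"errors"
	"fmt"
)

type broker struct{ id int32 }

// fromMock mimics what a testify-style mock does with a value registered via
// Return(...): the value is stored as interface{} and type-asserted on the way
// out. An unchecked assertion panics if the stored value is an untyped nil.
func fromMock(stored interface{}, storedErr error) (*broker, error) {
	return stored.(*broker), storedErr
}

func main() {
	// Typed nil: the assertion succeeds and yields a nil *broker.
	var nilBroker *broker
	b, err := fromMock(nilBroker, errors.New("no leader"))
	fmt.Println(b == nil, err) // true no leader

	// fromMock(nil, errors.New("no leader")) would panic instead:
	// "interface conversion: interface {} is nil, not *main.broker".
}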

func TestKafkaCluster_maybeUpdateMetadataAndDeleteTopics_Delete(t *testing.T) {
@@ -100,10 +128,14 @@ func TestKafkaCluster_maybeUpdateMetadataAndDeleteTopics_Delete(t *testing.T) {
client.On("RefreshMetadata").Return(nil)
client.On("Topics").Return([]string{"testtopic"}, nil)
client.On("Partitions", "testtopic").Return([]int32{0}, nil)
client.On("Leader", "testtopic", int32(0)).Return(&helpers.MockSaramaBroker{}, nil)

module.fetchMetadata = true
module.topicMap = make(map[string]int)
module.topicMap["topictodelete"] = 10
module.topicPartitions = make(map[string][]int32)
module.topicPartitions["topictodelete"] = make([]int32, 0, 10)
for i := 0; i < cap(module.topicPartitions["topictodelete"]); i++ {
module.topicPartitions["topictodelete"] = append(module.topicPartitions["topictodelete"], int32(i))
}

// Need to wait for this request to come in and finish, which happens when we call maybeUpdate...
wg := &sync.WaitGroup{}
@@ -120,17 +152,17 @@ func TestKafkaCluster_maybeUpdateMetadataAndDeleteTopics_Delete(t *testing.T) {

client.AssertExpectations(t)
assert.False(t, module.fetchMetadata, "Expected fetchMetadata to be reset to false")
assert.Lenf(t, module.topicMap, 1, "Expected 1 topic entry, not %v", len(module.topicMap))
topic, ok := module.topicMap["testtopic"]
assert.True(t, ok, "Expected to find testtopic in topicMap")
assert.Equalf(t, 1, topic, "Expected testtopic to be recorded with 1 partition, not %v", topic)
assert.Lenf(t, module.topicPartitions, 1, "Expected 1 topic entry, not %v", len(module.topicPartitions))
topic, ok := module.topicPartitions["testtopic"]
assert.True(t, ok, "Expected to find testtopic in topicPartitions")
assert.Equalf(t, 1, len(topic), "Expected testtopic to be recorded with 1 partition, not %v", len(topic))
}

func TestKafkaCluster_generateOffsetRequests(t *testing.T) {
module := fixtureModule()
module.Configure("test", "cluster.test")
module.topicMap = make(map[string]int)
module.topicMap["testtopic"] = 1
module.topicPartitions = make(map[string][]int32)
module.topicPartitions["testtopic"] = []int32{0}

// Set up a broker mock
broker := &helpers.MockSaramaBroker{}
@@ -154,8 +186,8 @@ func TestKafkaCluster_generateOffsetRequests(t *testing.T) {
func TestKafkaCluster_generateOffsetRequests_NoLeader(t *testing.T) {
module := fixtureModule()
module.Configure("test", "cluster.test")
module.topicMap = make(map[string]int)
module.topicMap["testtopic"] = 2
module.topicPartitions = make(map[string][]int32)
module.topicPartitions["testtopic"] = []int32{0, 1}

// Set up a broker mock
broker := &helpers.MockSaramaBroker{}
@@ -182,8 +214,8 @@ func TestKafkaCluster_generateOffsetRequests_NoLeader(t *testing.T) {
func TestKafkaCluster_getOffsets(t *testing.T) {
module := fixtureModule()
module.Configure("test", "cluster.test")
module.topicMap = make(map[string]int)
module.topicMap["testtopic"] = 2
module.topicPartitions = make(map[string][]int32)
module.topicPartitions["testtopic"] = []int32{0, 1}
module.fetchMetadata = false

// Set up an OffsetResponse
@@ -227,8 +259,8 @@ func TestKafkaCluster_getOffsets(t *testing.T) {
func TestKafkaCluster_getOffsets_BrokerFailed(t *testing.T) {
module := fixtureModule()
module.Configure("test", "cluster.test")
module.topicMap = make(map[string]int)
module.topicMap["testtopic"] = 1
module.topicPartitions = make(map[string][]int32)
module.topicPartitions["testtopic"] = []int32{0}
module.fetchMetadata = false

// Set up a broker mock