Skip to content

Commit

Permalink
Use MetadataResponse V5 in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
stanislavkozlovski committed Aug 27, 2020
1 parent 921cf9c commit 91e853d
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 59 deletions.
44 changes: 22 additions & 22 deletions async_producer_test.go
Expand Up @@ -102,7 +102,7 @@ func TestAsyncProducer(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse := newMetadataResponse()
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)
Expand Down Expand Up @@ -151,7 +151,7 @@ func TestAsyncProducerMultipleFlushes(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse := newMetadataResponse()
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)
Expand Down Expand Up @@ -187,7 +187,7 @@ func TestAsyncProducerMultipleBrokers(t *testing.T) {
leader0 := NewMockBroker(t, 2)
leader1 := NewMockBroker(t, 3)

metadataResponse := new(MetadataResponse)
metadataResponse := newMetadataResponse()
metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, nil, ErrNoError)
Expand Down Expand Up @@ -226,7 +226,7 @@ func TestAsyncProducerCustomPartitioner(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse := newMetadataResponse()
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)
Expand Down Expand Up @@ -269,7 +269,7 @@ func TestAsyncProducerFailureRetry(t *testing.T) {
leader1 := NewMockBroker(t, 2)
leader2 := NewMockBroker(t, 3)

metadataLeader1 := new(MetadataResponse)
metadataLeader1 := newMetadataResponse()
metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
seedBroker.Returns(metadataLeader1)
Expand All @@ -291,7 +291,7 @@ func TestAsyncProducerFailureRetry(t *testing.T) {
prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
leader1.Returns(prodNotLeader)

metadataLeader2 := new(MetadataResponse)
metadataLeader2 := newMetadataResponse()
metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
leader1.Returns(metadataLeader2)
Expand All @@ -318,7 +318,7 @@ func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
leader1 := NewMockBroker(t, 2)
leader2 := NewMockBroker(t, 3)

metadataLeader1 := new(MetadataResponse)
metadataLeader1 := newMetadataResponse()
metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
metadataLeader1.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, nil, ErrNoError)
Expand All @@ -344,7 +344,7 @@ func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
expectResults(t, producer, 0, 2)

producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
metadataLeader2 := new(MetadataResponse)
metadataLeader2 := newMetadataResponse()
metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
metadataLeader2.AddTopicPartition("my_topic", 1, leader2.BrokerID(), nil, nil, nil, ErrNoError)
Expand Down Expand Up @@ -377,7 +377,7 @@ func TestAsyncProducerEncoderFailures(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse := newMetadataResponse()
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)
Expand Down Expand Up @@ -417,7 +417,7 @@ func TestAsyncProducerBrokerBounce(t *testing.T) {
leader := NewMockBroker(t, 2)
leaderAddr := leader.Addr()

metadataResponse := new(MetadataResponse)
metadataResponse := newMetadataResponse()
metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)
Expand Down Expand Up @@ -457,7 +457,7 @@ func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
leader1 := NewMockBroker(t, 2)
leader2 := NewMockBroker(t, 3)

metadataLeader1 := new(MetadataResponse)
metadataLeader1 := newMetadataResponse()
metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
seedBroker.Returns(metadataLeader1)
Expand All @@ -480,7 +480,7 @@ func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down

// ok fine, tell it to go to leader2 finally
metadataLeader2 := new(MetadataResponse)
metadataLeader2 := newMetadataResponse()
metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
seedBroker.Returns(metadataLeader2)
Expand All @@ -500,7 +500,7 @@ func TestAsyncProducerMultipleRetries(t *testing.T) {
leader1 := NewMockBroker(t, 2)
leader2 := NewMockBroker(t, 3)

metadataLeader1 := new(MetadataResponse)
metadataLeader1 := newMetadataResponse()
metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
seedBroker.Returns(metadataLeader1)
Expand All @@ -522,7 +522,7 @@ func TestAsyncProducerMultipleRetries(t *testing.T) {
prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
leader1.Returns(prodNotLeader)

metadataLeader2 := new(MetadataResponse)
metadataLeader2 := newMetadataResponse()
metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)

Expand Down Expand Up @@ -556,7 +556,7 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
leader1 := NewMockBroker(t, 2)
leader2 := NewMockBroker(t, 3)

metadataLeader1 := new(MetadataResponse)
metadataLeader1 := newMetadataResponse()
metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
seedBroker.Returns(metadataLeader1)
Expand All @@ -583,7 +583,7 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)

metadataLeader2 := new(MetadataResponse)
metadataLeader2 := newMetadataResponse()
metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)

Expand Down Expand Up @@ -624,7 +624,7 @@ func TestAsyncProducerOutOfRetries(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse := newMetadataResponse()
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
seedBroker.Returns(metadataResponse)
Expand Down Expand Up @@ -680,7 +680,7 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
leader := NewMockBroker(t, 2)
leaderAddr := leader.Addr()

metadataResponse := new(MetadataResponse)
metadataResponse := newMetadataResponse()
metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, nil, ErrNoError)
Expand Down Expand Up @@ -736,7 +736,7 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse := newMetadataResponse()
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, nil, ErrNoError)
Expand Down Expand Up @@ -802,7 +802,7 @@ func TestAsyncProducerRetryShutdown(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

metadataLeader := new(MetadataResponse)
metadataLeader := newMetadataResponse()
metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
seedBroker.Returns(metadataLeader)
Expand Down Expand Up @@ -851,7 +851,7 @@ func TestAsyncProducerNoReturns(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)

metadataLeader := new(MetadataResponse)
metadataLeader := newMetadataResponse()
metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
seedBroker.Returns(metadataLeader)
Expand Down Expand Up @@ -1259,7 +1259,7 @@ func testProducerInterceptor(
) {
seedBroker := NewMockBroker(t, 1)
leader := NewMockBroker(t, 2)
metadataLeader := new(MetadataResponse)
metadataLeader := newMetadataResponse()
metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
seedBroker.Returns(metadataLeader)
Expand Down

0 comments on commit 91e853d

Please sign in to comment.