Skip to content

Commit

Permalink
Updated unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jeongkyun-oh committed Jun 27, 2021
1 parent b2015a6 commit c56ec05
Show file tree
Hide file tree
Showing 2 changed files with 320 additions and 47 deletions.
255 changes: 220 additions & 35 deletions datasync/chaindatafetcher/kafka/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/stretchr/testify/assert"
)

func Test_newSegment_Success(t *testing.T) {
func Test_newSegment_Success_LegacyMessage(t *testing.T) {
value := common.MakeRandomBytes(100)
rand.Seed(time.Now().UnixNano())
total := uint64(10)
Expand All @@ -57,49 +57,167 @@ func Test_newSegment_Success(t *testing.T) {
assert.Equal(t, key, segment.key)
}

func Test_newSegment_Fail(t *testing.T) {
// input message is nil
seg, err := newSegment(nil)
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), nilConsumerMessageErrorMsg))
assert.Nil(t, seg)

// the appropriate headers not given
seg, err = newSegment(&sarama.ConsumerMessage{})
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), wrongHeaderNumberErrorMsg))
assert.Nil(t, seg)
func Test_newSegment_Success_Version1Message(t *testing.T) {
value := common.MakeRandomBytes(100)
rand.Seed(time.Now().UnixNano())
total := uint64(10)
idx := rand.Uint64() % total
totalBytes := common.Int64ToByteBigEndian(total)
idxBytes := common.Int64ToByteBigEndian(idx)
key := "test-key"
version := MsgVersion1_0
versionBytes := []byte(version)
producerId := GetDefaultProducerId()
producerIdBytes := []byte(producerId)

// the first header key is wrong
seg, err = newSegment(&sarama.ConsumerMessage{
msg := &sarama.ConsumerMessage{
Headers: []*sarama.RecordHeader{
{Key: []byte("wrong-header-key")},
{Key: []byte(KeyTotalSegments)},
{Key: []byte(KeyTotalSegments), Value: totalBytes},
{Key: []byte(KeySegmentIdx), Value: idxBytes},
{Key: []byte(KeyVersion), Value: versionBytes},
{Key: []byte(KeyProducerId), Value: producerIdBytes},
},
})
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), wrongHeaderKeyErrorMsg))
assert.Nil(t, seg)
Key: []byte(key),
Value: value,
}

// the second header key is wrong
seg, err = newSegment(&sarama.ConsumerMessage{
Headers: []*sarama.RecordHeader{
{Key: []byte(KeySegmentIdx)},
{Key: []byte("wrong-header-key")},
segment, err := newSegment(msg)
assert.NoError(t, err)
assert.Equal(t, msg, segment.orig)
assert.Equal(t, value, segment.value)
assert.Equal(t, total, segment.total)
assert.Equal(t, idx, segment.index)
assert.Equal(t, key, segment.key)
assert.Equal(t, version, segment.version)
assert.Equal(t, producerId, segment.producerId)
}

func Test_newSegment_Fail(t *testing.T) {
type testcase struct {
name string
input *sarama.ConsumerMessage
errMsg string
}

testcases := []testcase{
{
"nil consumer message",
nil,
nilConsumerMessageErrorMsg,
},
})
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), wrongHeaderKeyErrorMsg))
assert.Nil(t, seg)
{
"empty consumer message",
&sarama.ConsumerMessage{},
wrongHeaderNumberErrorMsg,
},
{
"wrong first key with legacy header",
&sarama.ConsumerMessage{
Headers: []*sarama.RecordHeader{
{Key: []byte("wrong-header-key")},
{Key: []byte(KeyTotalSegments)},
},
},
wrongHeaderKeyErrorMsg,
},
{
"wrong second key with legacy header",
&sarama.ConsumerMessage{
Headers: []*sarama.RecordHeader{
{Key: []byte(KeySegmentIdx)},
{Key: []byte("wrong-header-key")},
},
},
wrongHeaderKeyErrorMsg,
},
{
"wrong first key with 1.0 header",
&sarama.ConsumerMessage{
Headers: []*sarama.RecordHeader{
{Key: []byte("wrong-header-key")},
{Key: []byte(KeyTotalSegments)},
{Key: []byte(KeyVersion), Value: []byte(MsgVersion1_0)},
{Key: []byte(KeyProducerId)},
},
},
wrongHeaderKeyErrorMsg,
},
{
"wrong second key with 1.0 header",
&sarama.ConsumerMessage{
Headers: []*sarama.RecordHeader{
{Key: []byte(KeySegmentIdx)},
{Key: []byte("wrong-header-key")},
{Key: []byte(KeyVersion), Value: []byte(MsgVersion1_0)},
{Key: []byte(KeyProducerId)},
},
},
wrongHeaderKeyErrorMsg,
},
{
"wrong third key with 1.0 header",
&sarama.ConsumerMessage{
Headers: []*sarama.RecordHeader{
{Key: []byte(KeySegmentIdx)},
{Key: []byte(KeyTotalSegments)},
{Key: []byte("wrong-header-key")},
{Key: []byte(KeyProducerId)},
},
},
wrongHeaderKeyErrorMsg,
},
{
"wrong fourth key with 1.0 header",
&sarama.ConsumerMessage{
Headers: []*sarama.RecordHeader{
{Key: []byte(KeySegmentIdx)},
{Key: []byte(KeyTotalSegments)},
{Key: []byte(KeyVersion), Value: []byte(MsgVersion1_0)},
{Key: []byte("wrong-header-key")},
},
},
wrongHeaderKeyErrorMsg,
},
{
"wrong version with 1.0 header",
&sarama.ConsumerMessage{
Headers: []*sarama.RecordHeader{
{Key: []byte(KeySegmentIdx)},
{Key: []byte(KeyTotalSegments)},
{Key: []byte(KeyVersion), Value: []byte("wrong-version")},
{Key: []byte(KeyProducerId)},
},
},
wrongMsgVersionErrorMsg,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
seg, err := newSegment(tc.input)
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), tc.errMsg))
assert.Nil(t, seg)
})
}
}

func makeTestV1Segment(orig *sarama.ConsumerMessage, key string, total, index uint64, version, producerId string) *Segment {
seg := makeTestSegment(orig, key, total, index)
seg.version = version
seg.producerId = producerId
return seg
}

func makeTestSegment(orig *sarama.ConsumerMessage, key string, total, index uint64) *Segment {
return &Segment{
orig: orig,
key: key,
total: total,
index: index,
value: common.MakeRandomBytes(5),
orig: orig,
key: key,
total: total,
index: index,
value: common.MakeRandomBytes(5),
version: "",
producerId: "",
}
}

Expand Down Expand Up @@ -371,3 +489,70 @@ func TestConsumer_updateOffset(t *testing.T) {
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), emptySegmentErrorMsg))
}

func getTestConsumer() (string, *Consumer) {
testTopic := "test-topic"
testConsumer := &Consumer{
handlers: make(map[string]TopicHandler),
}

testConsumer.handlers[testTopic] = func(message *sarama.ConsumerMessage) error {
return nil
}
return testTopic, testConsumer
}

func Test_V1Segment_MultiProducers(t *testing.T) {
{ // Legacy segments test
var buffer [][]*Segment
var err error

msgKey := "msg"
total := uint64(3)
indices := []uint64{0, 1, 0, 2, 1, 0, 2, 1, 1 /* duplicated */, 2}

topic, consumer := getTestConsumer()

var msgs []*Segment
for _, idx := range indices {
msgs = append(msgs, makeTestSegment(&sarama.ConsumerMessage{Topic: topic}, msgKey, total, idx))
}

for idx, msg := range msgs {
buffer, err = insertSegment(msg, buffer)
if idx < 6 {
assert.NoError(t, err)
} else if idx == 6 {
assert.Error(t, err)
assert.True(t, strings.Contains(err.Error(), missingSegmentErrorMsg))
break
}
buffer, err = consumer.handleBufferedMessages(buffer)
assert.NoError(t, err)
}
}

{ // V1 segments test
var buffer [][]*Segment
var err error

msgKey := "msg"
total := uint64(3)
producerId := []string{"1", "1", "2", "1", "2", "3", "2", "3", "3", "3"}
indices := []uint64{0, 1, 0, 2, 1, 0, 2, 1, 1 /* duplicated */, 2}

topic, consumer := getTestConsumer()

var msgs []*Segment
for pi, si := range indices {
msgs = append(msgs, makeTestV1Segment(&sarama.ConsumerMessage{Topic: topic}, msgKey, total, si, MsgVersion1_0, producerId[pi]))
}

for _, msg := range msgs {
buffer, err = insertSegment(msg, buffer)
assert.NoError(t, err)
buffer, err = consumer.handleBufferedMessages(buffer)
assert.NoError(t, err)
}
}
}

0 comments on commit c56ec05

Please sign in to comment.