Skip to content

Commit

Permalink
Removed version/producerId from params
Browse files Browse the repository at this point in the history
  • Loading branch information
jeongkyun-oh committed Jun 28, 2021
1 parent faa347f commit 120f65d
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
10 changes: 5 additions & 5 deletions datasync/chaindatafetcher/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (k *Kafka) split(data []byte) ([][]byte, int) {
return segments, len(segments)
}

func (k *Kafka) makeProducerMessage(topic, key, version, producerId string, segment []byte, segmentIdx, totalSegments uint64) *sarama.ProducerMessage {
func (k *Kafka) makeProducerMessage(topic, key string, segment []byte, segmentIdx, totalSegments uint64) *sarama.ProducerMessage {
msg := &sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder(key),
Expand All @@ -162,15 +162,15 @@ func (k *Kafka) makeProducerMessage(topic, key, version, producerId string, segm
Value: sarama.ByteEncoder(segment),
}

if version == MsgVersion1_0 {
if k.config.MsgVersion == MsgVersion1_0 {
extraHeaders := []sarama.RecordHeader{
{
Key: []byte(KeyVersion),
Value: []byte(version),
Value: []byte(k.config.MsgVersion),
},
{
Key: []byte(KeyProducerId),
Value: []byte(producerId),
Value: []byte(k.config.ProducerId),
},
}
msg.Headers = append(msg.Headers, extraHeaders...)
Expand All @@ -189,7 +189,7 @@ func (k *Kafka) Publish(topic string, data interface{}) error {
}
segments, totalSegments := k.split(dataBytes)
for idx, segment := range segments {
msg := k.makeProducerMessage(topic, key, k.config.MsgVersion, k.config.ProducerId, segment, uint64(idx), uint64(totalSegments))
msg := k.makeProducerMessage(topic, key, segment, uint64(idx), uint64(totalSegments))
_, _, err = k.producer.SendMessage(msg)
if err != nil {
logger.Error("sending kafka message is failed", "err", err, "segmentIdx", idx, "key", key)
Expand Down
6 changes: 3 additions & 3 deletions datasync/chaindatafetcher/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (s *KafkaSuite) TestKafka_makeProducerV1Message() {
producerId := "test-producer-id"

// make a producer message with the random input
msg := s.kfk.makeProducerMessage(s.topic, "", version, producerId, data, idx, totalSegments)
msg := s.kfk.makeProducerMessage(s.topic, "", data, idx, totalSegments)

// compare the data is correctly inserted
s.Equal(s.topic, msg.Topic)
Expand All @@ -120,7 +120,7 @@ func (s *KafkaSuite) TestKafka_makeProducerMessage() {
idx := rand.Uint64() % totalSegments

// make a producer message with the random input
msg := s.kfk.makeProducerMessage(s.topic, "", "", "", data, idx, totalSegments)
msg := s.kfk.makeProducerMessage(s.topic, "", data, idx, totalSegments)

// compare the data is correctly inserted
s.Equal(s.topic, msg.Topic)
Expand Down Expand Up @@ -428,7 +428,7 @@ func (s *KafkaSuite) TestKafka_PubSubWithSegements_BufferOverflow() {

// insert incomplete message segments
for i := 0; i < 3; i++ {
msg := s.kfk.makeProducerMessage(topic, "test-key-"+strconv.Itoa(i), MsgVersion1_0, "producer-0", common.MakeRandomBytes(10), 0, 2)
msg := s.kfk.makeProducerMessage(topic, "test-key-"+strconv.Itoa(i), common.MakeRandomBytes(10), 0, 2)
_, _, err = s.kfk.producer.SendMessage(msg)
s.NoError(err)
}
Expand Down

0 comments on commit 120f65d

Please sign in to comment.