Implemented data split for Kafka message #708

Merged
merged 5 commits into from
Oct 16, 2020
Changes from 3 commits
1 change: 1 addition & 0 deletions cmd/utils/flaggroup.go
@@ -143,6 +143,7 @@ var FlagGroups = []FlagGroup{
ChainDataFetcherKafkaReplicasFlag,
ChainDataFetcherKafkaPartitionsFlag,
ChainDataFetcherKafkaMaxMessageBytesFlag,
ChainDataFetcherKafkaSegmentSizeFlag,
ChainDataFetcherKafkaRequiredAcksFlag,
},
},
5 changes: 5 additions & 0 deletions cmd/utils/flags.go
@@ -786,6 +786,11 @@ var (
Usage: "The max size of a message produced by Kafka producer ",
Value: kafka.DefaultMaxMessageBytes,
}
ChainDataFetcherKafkaSegmentSizeFlag = cli.IntFlag{
Name: "chaindatafetcher.kafka.segment.size",
Usage: "The kafka data segment size",
Value: kafka.DefaultSegmentSize,
}
ChainDataFetcherKafkaRequiredAcksFlag = cli.IntFlag{
Name: "chaindatafetcher.kafka.required.acks",
Usage: "The level of acknowledgement reliability needed from Kafka broker (0: NoResponse, 1: WaitForLocal, -1: WaitForAll)",
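For reference, the new flag follows the same pattern as the other chaindatafetcher Kafka flags and can be set on the command line. A hypothetical invocation (the ken binary and any surrounding setup are assumptions; only the flag name and default value come from this PR):

    ken ... --chaindatafetcher.kafka.segment.size 1000000

This would cap each produced Kafka message segment at 1,000,000 bytes, matching DefaultSegmentSize.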
1 change: 1 addition & 0 deletions cmd/utils/nodecmd/dumpconfigcmd.go
@@ -228,6 +228,7 @@ func makeKafkaConfig(ctx *cli.Context) *kafka.KafkaConfig {
kafkaConfig.Partitions = int32(ctx.GlobalInt64(utils.ChainDataFetcherKafkaPartitionsFlag.Name))
kafkaConfig.Replicas = int16(ctx.GlobalInt64(utils.ChainDataFetcherKafkaReplicasFlag.Name))
kafkaConfig.SaramaConfig.Producer.MaxMessageBytes = ctx.GlobalInt(utils.ChainDataFetcherKafkaMaxMessageBytesFlag.Name)
kafkaConfig.SegmentSize = ctx.GlobalInt(utils.ChainDataFetcherKafkaSegmentSizeFlag.Name)
requiredAcks := sarama.RequiredAcks(ctx.GlobalInt(utils.ChainDataFetcherKafkaRequiredAcksFlag.Name))
if requiredAcks != sarama.NoResponse && requiredAcks != sarama.WaitForLocal && requiredAcks != sarama.WaitForAll {
logger.Crit("not supported requiredAcks. it must be NoResponse(0), WaitForLocal(1), or WaitForAll(-1)", "given", requiredAcks)
1 change: 1 addition & 0 deletions cmd/utils/nodecmd/nodeflags.go
@@ -175,6 +175,7 @@ var KENFlags = []cli.Flag{
utils.ChainDataFetcherKafkaTopicResourceFlag,
utils.ChainDataFetcherKafkaTopicEnvironmentFlag,
utils.ChainDataFetcherKafkaMaxMessageBytesFlag,
utils.ChainDataFetcherKafkaSegmentSizeFlag,
utils.ChainDataFetcherKafkaRequiredAcksFlag,
// DBSyncer
utils.EnableDBSyncerFlag,
7 changes: 5 additions & 2 deletions datasync/chaindatafetcher/kafka/config.go
@@ -40,6 +40,7 @@ const (
DefaultTopicResourceName = "en-0"
DefaultMaxMessageBytes = 1000000
DefaultRequiredAcks = 1
DefaultSegmentSize = 1000000 // 1 MB
)

type KafkaConfig struct {
@@ -49,6 +50,7 @@ type KafkaConfig struct {
TopicResourceName string
Partitions int32 // Partitions is the number of partitions of a topic.
Replicas int16 // Replicas is a replication factor of kafka settings. This is the number of the replicated partitions in the kafka cluster.
SegmentSize int // SegmentSize is the size of kafka message segment
}

func GetDefaultKafkaConfig() *KafkaConfig {
@@ -66,6 +68,7 @@ func GetDefaultKafkaConfig() *KafkaConfig {
TopicResourceName: DefaultTopicResourceName,
Partitions: DefaultPartitions,
Replicas: DefaultReplicas,
SegmentSize: DefaultSegmentSize,
}
}

@@ -74,6 +77,6 @@ func (c *KafkaConfig) getTopicName(event string) string {
}

func (c *KafkaConfig) String() string {
return fmt.Sprintf("brokers: %v, topicEnvironment: %v, topicResourceName: %v, partitions: %v, replicas: %v, maxMessageBytes: %v, requiredAcks: %v",
c.Brokers, c.TopicEnvironmentName, c.TopicResourceName, c.Partitions, c.Replicas, c.SaramaConfig.Producer.MaxMessageBytes, c.SaramaConfig.Producer.RequiredAcks)
return fmt.Sprintf("brokers: %v, topicEnvironment: %v, topicResourceName: %v, partitions: %v, replicas: %v, maxMessageBytes: %v, requiredAcks: %v, segmentSize: %v",
c.Brokers, c.TopicEnvironmentName, c.TopicResourceName, c.Partitions, c.Replicas, c.SaramaConfig.Producer.MaxMessageBytes, c.SaramaConfig.Producer.RequiredAcks, c.SegmentSize)
}
60 changes: 55 additions & 5 deletions datasync/chaindatafetcher/kafka/kafka.go
@@ -17,14 +17,28 @@
package kafka

import (
"crypto/md5"
"encoding/json"

"github.com/Shopify/sarama"
"github.com/klaytn/klaytn/common"
"github.com/klaytn/klaytn/log"
)

var logger = log.NewModuleLogger(log.ChainDataFetcher)

const (
MsgIdxTotalSegments = iota
MsgIdxSegmentIdx
MsgIdxCheckSum
)

const (
KeyTotalSegments = "totalSegments"
KeySegmentIdx = "segmentIdx"
KeyCheckSum = "checksum"
)

// Kafka connects to the brokers in an existing kafka cluster.
type Kafka struct {
config *KafkaConfig
@@ -76,17 +90,53 @@ func (k *Kafka) ListTopics() (map[string]sarama.TopicDetail, error) {
return k.admin.ListTopics()
}

func (k *Kafka) split(data []byte) ([][]byte, int) {
size := k.config.SegmentSize
var segments [][]byte
for len(data) > size {
segments = append(segments, data[:size])
data = data[size:]
}
segments = append(segments, data)
return segments, len(segments)
}

func (k *Kafka) makeProducerMessage(topic string, segment []byte, segmentIdx, totalSegments uint64) *sarama.ProducerMessage {
checkSum := md5.Sum(segment)
return &sarama.ProducerMessage{
Topic: topic,
Headers: []sarama.RecordHeader{
{
Key: []byte(KeyTotalSegments),
Value: common.Int64ToByteBigEndian(totalSegments),
},
{
Key: []byte(KeySegmentIdx),
Value: common.Int64ToByteBigEndian(segmentIdx),
},
{
Key: []byte(KeyCheckSum),
Value: checkSum[:],
},
},
Value: sarama.ByteEncoder(segment),
}
}

func (k *Kafka) Publish(topic string, msg interface{}) error {
data, err := json.Marshal(msg)
if err != nil {
return err
}

item := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(data),
segments, totalSegments := k.split(data)
for idx, segment := range segments {
item := k.makeProducerMessage(topic, segment, uint64(idx), uint64(totalSegments))
_, _, err = k.producer.SendMessage(item)
if err != nil {
logger.Error("sending kafka message is failed", "err", err, "segmentIdx", idx, "segment", string(segment))
return err
}
}

_, _, err = k.producer.SendMessage(item)
return err
}
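The producer side above splits the serialized payload into segments and tags each one with totalSegments, segmentIdx, and an md5 checksum header. For context, here is a minimal sketch of how a consumer could reassemble and verify the original payload from those headers; it mirrors what the test below checks, but the reassemble helper itself is illustrative and not part of this PR.

package kafka

import (
	"bytes"
	"crypto/md5"
	"encoding/binary"
	"fmt"

	"github.com/Shopify/sarama"
)

// reassemble is an illustrative helper (not part of this PR): it concatenates
// consumed segments in index order and verifies each one against the headers
// written by makeProducerMessage.
func reassemble(msgs []*sarama.ConsumerMessage) ([]byte, error) {
	var payload []byte
	for idx, msg := range msgs {
		total := binary.BigEndian.Uint64(msg.Headers[MsgIdxTotalSegments].Value)
		segmentIdx := binary.BigEndian.Uint64(msg.Headers[MsgIdxSegmentIdx].Value)
		if total != uint64(len(msgs)) || segmentIdx != uint64(idx) {
			return nil, fmt.Errorf("unexpected segment: total=%v, segmentIdx=%v, idx=%v", total, segmentIdx, idx)
		}
		// verify the segment against the checksum header set by the producer
		checksum := md5.Sum(msg.Value)
		if !bytes.Equal(checksum[:], msg.Headers[MsgIdxCheckSum].Value) {
			return nil, fmt.Errorf("checksum mismatch at segment %v", segmentIdx)
		}
		payload = append(payload, msg.Value...)
	}
	return payload, nil
}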
88 changes: 88 additions & 0 deletions datasync/chaindatafetcher/kafka/kafka_test.go
@@ -18,7 +18,10 @@ package kafka

import (
"context"
"crypto/md5"
"encoding/binary"
"encoding/json"
"math/rand"
"reflect"
"strings"
"testing"
Expand Down Expand Up @@ -59,6 +62,50 @@ func (s *KafkaSuite) TearDownTest() {
s.kfk.Close()
}

func (s *KafkaSuite) TestKafka_split() {
segmentSize := 3
s.kfk.config.SegmentSize = segmentSize

// test with the size less than the segment size
bytes := common.MakeRandomBytes(segmentSize - 1)
parts, size := s.kfk.split(bytes)
s.Equal(bytes, parts[0])
s.Equal(1, size)

// test with the given segment size
bytes = common.MakeRandomBytes(segmentSize)
parts, size = s.kfk.split(bytes)
s.Equal(bytes, parts[0])
s.Equal(1, size)

// test with the size greater than the segment size
bytes = common.MakeRandomBytes(2*segmentSize + 2)
parts, size = s.kfk.split(bytes)
s.Equal(bytes[:segmentSize], parts[0])
s.Equal(bytes[segmentSize:2*segmentSize], parts[1])
s.Equal(bytes[2*segmentSize:], parts[2])
s.Equal(3, size)
}

func (s *KafkaSuite) TestKafka_makeProducerMessage() {
// make test data
data := common.MakeRandomBytes(100)
checksum := md5.Sum(data)
rand.Seed(time.Now().UnixNano())
totalSegments := rand.Uint64()
idx := rand.Uint64() % totalSegments

// make a producer message with the random input
msg := s.kfk.makeProducerMessage(s.topic, data, idx, totalSegments)

// compare the data is correctly inserted
s.Equal(s.topic, msg.Topic)
s.Equal(sarama.ByteEncoder(data), msg.Value)
s.Equal(totalSegments, binary.BigEndian.Uint64(msg.Headers[MsgIdxTotalSegments].Value))
s.Equal(idx, binary.BigEndian.Uint64(msg.Headers[MsgIdxSegmentIdx].Value))
s.Equal(checksum[:], msg.Headers[MsgIdxCheckSum].Value)
}

func (s *KafkaSuite) TestKafka_CreateAndDeleteTopic() {
// no topic to be deleted
err := s.kfk.DeleteTopic(s.topic)
@@ -252,6 +299,47 @@ func (s *KafkaSuite) TestKafka_PubSubWith2DifferentGroups() {
s.Equal(expected, actual2)
}

func (s *KafkaSuite) TestKafka_PubSubWithSegments() {
numTests := 1
testBytesSize := 10
segmentSize := 3

// to calculate data length
data, _ := json.Marshal(&kafkaData{common.MakeRandomBytes(testBytesSize)})
totalSegments := len(data) / segmentSize

s.kfk.config.SegmentSize = segmentSize
topic := "test-message-segments"
s.kfk.CreateTopic(topic)

// publish random data
expected := s.publishRandomData(topic, numTests, testBytesSize)

// gather the published data segments
var msgs []*sarama.ConsumerMessage
s.subscribeData(topic, "test-group-id", totalSegments, func(message *sarama.ConsumerMessage) error {
msgs = append(msgs, message)
return nil
})

// check the data segments are correctly inserted with the order
s.Equal(totalSegments, len(msgs))
var actual []byte
for idx, msg := range msgs {
actual = append(actual, msg.Value...)
s.Equal(uint64(totalSegments), binary.BigEndian.Uint64(msg.Headers[MsgIdxTotalSegments].Value))
s.Equal(uint64(idx), binary.BigEndian.Uint64(msg.Headers[MsgIdxSegmentIdx].Value))
checksum := md5.Sum(msg.Value)
s.Equal(checksum[:], msg.Headers[MsgIdxCheckSum].Value)
s.Equal(topic, msg.Topic)
}

// check the result after resembling the segments
var d *kafkaData
json.Unmarshal(actual, &d)
s.Equal(expected, []*kafkaData{d})
}

func (s *KafkaSuite) TestKafka_Consumer_AddTopicAndHandler() {
consumer, err := NewConsumer(s.kfk.config, "test-group-id")
s.NoError(err)