Implemented data split for Kafka messages
jeongkyun-oh committed Oct 14, 2020
1 parent 4b49582 commit 7672dfa
Showing 8 changed files with 162 additions and 7 deletions.
1 change: 1 addition & 0 deletions cmd/utils/flaggroup.go
@@ -143,6 +143,7 @@ var FlagGroups = []FlagGroup{
ChainDataFetcherKafkaReplicasFlag,
ChainDataFetcherKafkaPartitionsFlag,
ChainDataFetcherKafkaMaxMessageBytesFlag,
ChainDataFetcherKafkaSegmentSizeFlag,
ChainDataFetcherKafkaRequiredAcksFlag,
},
},
5 changes: 5 additions & 0 deletions cmd/utils/flags.go
@@ -786,6 +786,11 @@ var (
Usage: "The max size of a message produced by Kafka producer ",
Value: kafka.DefaultMaxMessageBytes,
}
ChainDataFetcherKafkaSegmentSizeFlag = cli.IntFlag{
Name: "chaindatafetcher.kafka.segment.size",
Usage: "The kafka data segment size",
Value: kafka.DefaultSegmentSize,
}
ChainDataFetcherKafkaRequiredAcksFlag = cli.IntFlag{
Name: "chaindatafetcher.kafka.required.acks",
Usage: "The level of acknowledgement reliability needed from Kafka broker (0: NoResponse, 1: WaitForLocal, -1: WaitForAll)",
1 change: 1 addition & 0 deletions cmd/utils/nodecmd/dumpconfigcmd.go
@@ -228,6 +228,7 @@ func makeKafkaConfig(ctx *cli.Context) *kafka.KafkaConfig {
kafkaConfig.Partitions = int32(ctx.GlobalInt64(utils.ChainDataFetcherKafkaPartitionsFlag.Name))
kafkaConfig.Replicas = int16(ctx.GlobalInt64(utils.ChainDataFetcherKafkaReplicasFlag.Name))
kafkaConfig.SaramaConfig.Producer.MaxMessageBytes = ctx.GlobalInt(utils.ChainDataFetcherKafkaMaxMessageBytesFlag.Name)
kafkaConfig.SegmentSize = ctx.GlobalInt(utils.ChainDataFetcherKafkaSegmentSizeFlag.Name)
requiredAcks := sarama.RequiredAcks(ctx.GlobalInt(utils.ChainDataFetcherKafkaRequiredAcksFlag.Name))
if requiredAcks != sarama.NoResponse && requiredAcks != sarama.WaitForLocal && requiredAcks != sarama.WaitForAll {
logger.Crit("not supported requiredAcks. it must be NoResponse(0), WaitForLocal(1), or WaitForAll(-1)", "given", requiredAcks)
1 change: 1 addition & 0 deletions cmd/utils/nodecmd/nodeflags.go
@@ -175,6 +175,7 @@ var KENFlags = []cli.Flag{
utils.ChainDataFetcherKafkaTopicResourceFlag,
utils.ChainDataFetcherKafkaTopicEnvironmentFlag,
utils.ChainDataFetcherKafkaMaxMessageBytesFlag,
utils.ChainDataFetcherKafkaSegmentSizeFlag,
utils.ChainDataFetcherKafkaRequiredAcksFlag,
// DBSyncer
utils.EnableDBSyncerFlag,
7 changes: 5 additions & 2 deletions datasync/chaindatafetcher/kafka/config.go
@@ -40,6 +40,7 @@ const (
DefaultTopicResourceName = "en-0"
DefaultMaxMessageBytes = 1000000
DefaultRequiredAcks = 1
DefaultSegmentSize = 1000000 // 1 MB
)

type KafkaConfig struct {
@@ -49,6 +50,7 @@ type KafkaConfig struct {
TopicResourceName string
Partitions int32 // Partitions is the number of partitions of a topic.
Replicas int16 // Replicas is a replication factor of kafka settings. This is the number of the replicated partitions in the kafka cluster.
SegmentSize int // SegmentSize is the maximum size of each Kafka message segment in bytes.
}

func GetDefaultKafkaConfig() *KafkaConfig {
@@ -66,6 +68,7 @@ func GetDefaultKafkaConfig() *KafkaConfig {
TopicResourceName: DefaultTopicResourceName,
Partitions: DefaultPartitions,
Replicas: DefaultReplicas,
SegmentSize: DefaultSegmentSize,
}
}

@@ -74,6 +77,6 @@ func (c *KafkaConfig) getTopicName(event string) string {
}

func (c *KafkaConfig) String() string {
return fmt.Sprintf("brokers: %v, topicEnvironment: %v, topicResourceName: %v, partitions: %v, replicas: %v, maxMessageBytes: %v, requiredAcks: %v",
c.Brokers, c.TopicEnvironmentName, c.TopicResourceName, c.Partitions, c.Replicas, c.SaramaConfig.Producer.MaxMessageBytes, c.SaramaConfig.Producer.RequiredAcks)
return fmt.Sprintf("brokers: %v, topicEnvironment: %v, topicResourceName: %v, partitions: %v, replicas: %v, maxMessageBytes: %v, requiredAcks: %v, segmentSize: %v",
c.Brokers, c.TopicEnvironmentName, c.TopicResourceName, c.Partitions, c.Replicas, c.SaramaConfig.Producer.MaxMessageBytes, c.SaramaConfig.Producer.RequiredAcks, c.SegmentSize)
}
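
For reference, a minimal sketch of setting the new field programmatically. GetDefaultKafkaConfig and the field name come from this file; the override value below is only an illustration, and the same field is populated from the --chaindatafetcher.kafka.segment.size flag via makeKafkaConfig.

config := kafka.GetDefaultKafkaConfig()
config.SegmentSize = 500000 // hypothetical override in bytes; defaults to DefaultSegmentSize (1 MB)
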
59 changes: 54 additions & 5 deletions datasync/chaindatafetcher/kafka/kafka.go
@@ -17,6 +17,7 @@
package kafka

import (
"crypto/md5"
"encoding/json"

"github.com/Shopify/sarama"
@@ -25,6 +26,18 @@ import (

var logger = log.NewModuleLogger(log.ChainDataFetcher)

const (
MsgIdxTotalSegments = iota
MsgIdxSegmentIdx
MsgIdxCheckSum
)

const (
KeyTotalSegments = "totalSegments"
KeySegmentIdx = "segmentIdx"
KeyCheckSum = "checksum"
)

// Kafka connects to the brokers in an existing kafka cluster.
type Kafka struct {
config *KafkaConfig
@@ -76,17 +89,53 @@ func (k *Kafka) ListTopics() (map[string]sarama.TopicDetail, error) {
return k.admin.ListTopics()
}

func (k *Kafka) split(data []byte) ([][]byte, int) {
size := k.config.SegmentSize
var segments [][]byte
for len(data) > size {
segments = append(segments, data[:size])
data = data[size:]
}
segments = append(segments, data)
return segments, len(segments)
}

func (k *Kafka) makeProducerMessage(topic string, segment []byte, segmentIdx, totalSegments uint64) *sarama.ProducerMessage {
checkSum := md5.Sum(segment)
return &sarama.ProducerMessage{
Topic: topic,
Headers: []sarama.RecordHeader{
{
Key: []byte(KeyTotalSegments),
Value: intToBytes(totalSegments),
},
{
Key: []byte(KeySegmentIdx),
Value: intToBytes(segmentIdx),
},
{
Key: []byte(KeyCheckSum),
Value: checkSum[:],
},
},
Value: sarama.ByteEncoder(segment),
}
}

func (k *Kafka) Publish(topic string, msg interface{}) error {
data, err := json.Marshal(msg)
if err != nil {
return err
}

item := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(data),
segments, totalSegments := k.split(data)
for idx, segment := range segments {
item := k.makeProducerMessage(topic, segment, uint64(idx), uint64(totalSegments))
_, _, err = k.producer.SendMessage(item)
if err != nil {
logger.Error("sending kafka message is failed", "err", err, "segmentIdx", idx, "segment", string(segment))
return err
}
}

_, _, err = k.producer.SendMessage(item)
return err
}
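
The consumer-side reassembly is not part of this commit. The sketch below is a hedged illustration, assumed to live in the same kafka package (so it can reuse bytesToInt and the MsgIdx* header constants) with bytes, crypto/md5, errors, fmt, sort, and sarama imported; it mirrors the ordering and checksum verification done in TestKafka_PubSubWithSegments further down.

// reassemble is an illustrative helper, not part of this commit: it orders the
// received segments by their segmentIdx header, checks the segment count and
// each md5 checksum, and concatenates the payloads back into the original data.
func reassemble(msgs []*sarama.ConsumerMessage) ([]byte, error) {
	if len(msgs) == 0 {
		return nil, errors.New("no segments received")
	}
	// order segments by the segmentIdx header
	sort.SliceStable(msgs, func(i, j int) bool {
		return bytesToInt(msgs[i].Headers[MsgIdxSegmentIdx].Value) < bytesToInt(msgs[j].Headers[MsgIdxSegmentIdx].Value)
	})
	total := bytesToInt(msgs[0].Headers[MsgIdxTotalSegments].Value)
	if uint64(len(msgs)) != total {
		return nil, fmt.Errorf("expected %v segments, got %v", total, len(msgs))
	}
	var data []byte
	for _, msg := range msgs {
		// verify each segment against its md5 checksum header
		checksum := md5.Sum(msg.Value)
		if !bytes.Equal(checksum[:], msg.Headers[MsgIdxCheckSum].Value) {
			return nil, errors.New("segment checksum mismatch")
		}
		data = append(data, msg.Value...)
	}
	return data, nil
}
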
84 changes: 84 additions & 0 deletions datasync/chaindatafetcher/kafka/kafka_test.go
@@ -18,8 +18,11 @@ package kafka

import (
"context"
"crypto/md5"
"encoding/json"
"math/rand"
"reflect"
"sort"
"strings"
"testing"
"time"
@@ -59,6 +62,43 @@ func (s *KafkaSuite) TearDownTest() {
s.kfk.Close()
}

func (s *KafkaSuite) TestKafka_split() {
segmentSize := 3
s.kfk.config.SegmentSize = segmentSize

bytes := common.MakeRandomBytes(segmentSize - 1)
parts, size := s.kfk.split(bytes)
s.Equal(bytes, parts[0])
s.Equal(1, size)

bytes = common.MakeRandomBytes(segmentSize)
parts, size = s.kfk.split(bytes)
s.Equal(bytes, parts[0])
s.Equal(1, size)

bytes = common.MakeRandomBytes(2*segmentSize + 2)
parts, size = s.kfk.split(bytes)
s.Equal(bytes[:segmentSize], parts[0])
s.Equal(bytes[segmentSize:2*segmentSize], parts[1])
s.Equal(bytes[2*segmentSize:], parts[2])
s.Equal(3, size)
}

func (s *KafkaSuite) TestKafka_makeProducerMessage() {
data := common.MakeRandomBytes(100)
checksum := md5.Sum(data)
rand.Seed(time.Now().UnixNano())
totalSegments := rand.Uint64()
idx := rand.Uint64() % totalSegments
msg := s.kfk.makeProducerMessage(s.topic, data, idx, totalSegments)

s.Equal(s.topic, msg.Topic)
s.Equal(sarama.ByteEncoder(data), msg.Value)
s.Equal(totalSegments, bytesToInt(msg.Headers[MsgIdxTotalSegments].Value))
s.Equal(idx, bytesToInt(msg.Headers[MsgIdxSegmentIdx].Value))
s.Equal(checksum[:], msg.Headers[MsgIdxCheckSum].Value)
}

func (s *KafkaSuite) TestKafka_CreateAndDeleteTopic() {
// no topic to be deleted
err := s.kfk.DeleteTopic(s.topic)
@@ -252,6 +292,50 @@ func (s *KafkaSuite) TestKafka_PubSubWith2DifferentGroups() {
s.Equal(expected, actual2)
}

func (s *KafkaSuite) TestKafka_PubSubWithSegments() {
numTests := 1
testBytesSize := 10
segmentSize := 3

// to calculate data length
data, _ := json.Marshal(&kafkaData{common.MakeRandomBytes(testBytesSize)})
totalSegments := len(data) / segmentSize

s.kfk.config.SegmentSize = segmentSize
topic := "test-message-segments"
s.kfk.CreateTopic(topic)

// publish random data
expected := s.publishRandomData(topic, numTests, testBytesSize)

var msgs []*sarama.ConsumerMessage
s.subscribeData(topic, "test-group-id", totalSegments, func(message *sarama.ConsumerMessage) error {
msgs = append(msgs, message)
return nil
})

s.Equal(totalSegments, len(msgs))
sort.SliceStable(msgs, func(i, j int) bool {
segment1Idx := bytesToInt(msgs[i].Headers[1].Value)
segment2Idx := bytesToInt(msgs[j].Headers[1].Value)
return segment1Idx < segment2Idx
})

var actual []byte
for idx, msg := range msgs {
actual = append(actual, msg.Value...)
s.Equal(uint64(totalSegments), bytesToInt(msg.Headers[MsgIdxTotalSegments].Value))
s.Equal(uint64(idx), bytesToInt(msg.Headers[MsgIdxSegmentIdx].Value))
checksum := md5.Sum(msg.Value)
s.Equal(checksum[:], msg.Headers[MsgIdxCheckSum].Value)
s.Equal(topic, msg.Topic)
}

var d *kafkaData
json.Unmarshal(actual, &d)
s.Equal(expected, []*kafkaData{d})
}

func (s *KafkaSuite) TestKafka_Consumer_AddTopicAndHandler() {
consumer, err := NewConsumer(s.kfk.config, "test-group-id")
s.NoError(err)
11 changes: 11 additions & 0 deletions datasync/chaindatafetcher/kafka/utils.go
@@ -17,6 +17,7 @@
package kafka

import (
"encoding/binary"
"strings"

klaytnApi "github.com/klaytn/klaytn/api"
@@ -96,3 +97,13 @@ func makeBlockGroupOutput(blockchain *blockchain.BlockChain, block *types.Block,
r["transactions"] = rpcTransactions
return r
}

func intToBytes(num uint64) []byte {
buffer := make([]byte, 8)
binary.BigEndian.PutUint64(buffer, num)
return buffer
}

func bytesToInt(bytes []byte) uint64 {
return binary.BigEndian.Uint64(bytes)
}
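
A quick illustration of the helpers above: the encoding is a fixed 8-byte big-endian representation, so header values have a constant width and round-trip losslessly (the value shown is only an example).

b := intToBytes(42) // [0 0 0 0 0 0 0 42]
n := bytesToInt(b)  // 42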
