/
config.go
87 lines (77 loc) · 3.43 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// Copyright 2020 The klaytn Authors
// This file is part of the klaytn library.
//
// The klaytn library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The klaytn library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the klaytn library. If not, see <http://www.gnu.org/licenses/>.
package kafka
import (
"fmt"
"github.com/Shopify/sarama"
)
// Event group identifiers.
// NOTE(review): presumably these are the values passed as the event argument
// of KafkaConfig.GetTopicName — confirm against callers.
const (
	// EventBlockGroup identifies the block event group.
	EventBlockGroup = "blockgroup"

	// EventTraceGroup identifies the trace event group.
	EventTraceGroup = "tracegroup"
)
// Fixed components of every topic name produced by KafkaConfig.GetTopicName.
const (
	// topicProjectName is the project component of a topic name.
	topicProjectName = "klaytn"

	// topicServiceName is the service component of a topic name.
	topicServiceName = "chaindatafetcher"

	// topicVersion is the trailing version component of a topic name.
	topicVersion = "v1"
)
// Default values applied by GetDefaultKafkaConfig.
const (
	// DefaultReplicas is the default replication factor.
	DefaultReplicas = 1

	// DefaultPartitions is the default number of partitions of a topic.
	DefaultPartitions = 1

	// DefaultTopicEnvironmentName is the default environment component of a topic name.
	DefaultTopicEnvironmentName = "local"

	// DefaultTopicResourceName is the default resource component of a topic name.
	DefaultTopicResourceName = "en-0"

	// DefaultMaxMessageBytes is the default producer limit on the size of a message.
	DefaultMaxMessageBytes = 1000 * 1000

	// DefaultRequiredAcks is the default producer acknowledgement level; it is
	// converted to sarama.RequiredAcks when the default config is built.
	DefaultRequiredAcks = 1

	// DefaultSegmentSizeBytes is the default size of a message segment (1 MB).
	DefaultSegmentSizeBytes = 1000 * 1000

	// DefaultMaxMessageNumber is the default max number of messages in buffer.
	DefaultMaxMessageNumber = 100
)
// KafkaConfig bundles the kafka client configuration with the topic-related
// settings used by this package.
type KafkaConfig struct {
	// SaramaConfig is the kafka client configuration.
	SaramaConfig *sarama.Config

	// Brokers is a list of broker URLs.
	Brokers []string

	// TopicEnvironmentName is the leading (environment) component of the
	// topic names built by GetTopicName.
	TopicEnvironmentName string

	// TopicResourceName is the resource component of the topic names built
	// by GetTopicName.
	TopicResourceName string

	// Partitions is the number of partitions of a topic.
	Partitions int32

	// Replicas is a replication factor of kafka settings. This is the number
	// of the replicated partitions in the kafka cluster.
	Replicas int16

	// SegmentSizeBytes is the size of kafka message segment.
	SegmentSizeBytes int

	// MaxMessageNumber is the maximum number of consumer messages.
	// (number of partitions) * (average size of segments) * buffer size
	// should not be greater than memory size. The default max number of
	// messages is 100.
	MaxMessageNumber int
}
// GetDefaultKafkaConfig returns a KafkaConfig whose sarama client settings and
// topic parameters are initialized from the package defaults. Brokers is left
// unset and must be filled in by the caller.
func GetDefaultKafkaConfig() *KafkaConfig {
	// TODO-ChainDataFetcher add more configuration if necessary
	saramaCfg := sarama.NewConfig()
	saramaCfg.Version = sarama.MaxVersion

	producer := &saramaCfg.Producer
	// The following configurations should be true
	producer.Return.Errors = true
	producer.Return.Successes = true
	producer.MaxMessageBytes = DefaultMaxMessageBytes
	producer.RequiredAcks = sarama.RequiredAcks(DefaultRequiredAcks)

	defaults := &KafkaConfig{
		SaramaConfig:         saramaCfg,
		TopicEnvironmentName: DefaultTopicEnvironmentName,
		TopicResourceName:    DefaultTopicResourceName,
		Partitions:           DefaultPartitions,
		Replicas:             DefaultReplicas,
		SegmentSizeBytes:     DefaultSegmentSizeBytes,
		MaxMessageNumber:     DefaultMaxMessageNumber,
	}
	return defaults
}
// GetTopicName builds the dot-separated topic name for the given event:
// <environment>.<project>.<service>.<resource>.<event>.<version>.
func (c *KafkaConfig) GetTopicName(event string) string {
	const sep = "."
	return c.TopicEnvironmentName + sep +
		topicProjectName + sep +
		topicServiceName + sep +
		c.TopicResourceName + sep +
		event + sep +
		topicVersion
}
// String returns a human-readable, single-line summary of the configuration.
//
// It is safe to call on a nil receiver (returns "<nil>") and on a KafkaConfig
// whose SaramaConfig has not been set; in the latter case the producer-derived
// fields (maxMessageBytes, requiredAcks) are rendered as "<nil>" instead of
// causing a nil pointer dereference.
func (c *KafkaConfig) String() string {
	if c == nil {
		return "<nil>"
	}

	// Left as nil interfaces when SaramaConfig is absent; %v prints "<nil>".
	var maxMessageBytes, requiredAcks interface{}
	if c.SaramaConfig != nil {
		maxMessageBytes = c.SaramaConfig.Producer.MaxMessageBytes
		requiredAcks = c.SaramaConfig.Producer.RequiredAcks
	}

	return fmt.Sprintf("brokers: %v, topicEnvironment: %v, topicResourceName: %v, partitions: %v, replicas: %v, maxMessageBytes: %v, requiredAcks: %v, segmentSize: %v",
		c.Brokers, c.TopicEnvironmentName, c.TopicResourceName, c.Partitions, c.Replicas, maxMessageBytes, requiredAcks, c.SegmentSizeBytes)
}