/
consenter.go
104 lines (86 loc) · 3.07 KB
/
consenter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
//<developer>
// <name>linapex 曹一峰</name>
// <email>linapex@163.com</email>
// <wx>superexc</wx>
// <qqgroup>128148617</qqgroup>
// <url>https://jsq.ink</url>
// <role>pku engineer</role>
// <date>2019-03-16 19:40:30</date>
//</624456100380413952>
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/
package kafka
import (
"github.com/hyperledger/fabric/common/metrics"
localconfig "github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/orderer/consensus"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/Shopify/sarama"
logging "github.com/op/go-logging"
)
// New builds a Kafka-based consenter from the given local Kafka configuration
// and returns it together with the Metrics created from metricsProvider.
//
// When config.Verbose is set, the "orderer.consensus.kafka.sarama" logger is
// switched to DEBUG before the broker configuration is built.
func New(config localconfig.Kafka, metricsProvider metrics.Provider) (consensus.Consenter, *Metrics) {
	if config.Verbose {
		logging.SetLevel(logging.DEBUG, "orderer.consensus.kafka.sarama")
	}

	saramaCfg := newBrokerConfig(config.TLS, config.SASLPlain, config.Retry, config.Version, defaultPartition)

	// The partition count is fixed at one; only the replication factor is
	// taken from the configuration.
	topic := &sarama.TopicDetail{
		NumPartitions:     1,
		ReplicationFactor: config.Topic.ReplicationFactor,
	}

	impl := &consenterImpl{
		brokerConfigVal: saramaCfg,
		tlsConfigVal:    config.TLS,
		retryOptionsVal: config.Retry,
		kafkaVersionVal: config.Version,
		topicDetailVal:  topic,
	}

	// The metrics share the registry carried by the broker configuration.
	return impl, NewMetrics(metricsProvider, saramaCfg.MetricRegistry)
}
// consenterImpl holds the settings a Kafka consenter is built from. In this
// file it provides HandleChain as well as the three accessors listed in the
// commonConsenter interface.
type consenterImpl struct {
	brokerConfigVal *sarama.Config      // returned by brokerConfig()
	tlsConfigVal    localconfig.TLS     // set from config.TLS in New
	retryOptionsVal localconfig.Retry   // returned by retryOptions()
	kafkaVersionVal sarama.KafkaVersion // set from config.Version in New
	topicDetailVal  *sarama.TopicDetail // returned by topicDetail()
	metricsProvider metrics.Provider    // NOTE(review): not assigned anywhere in this file - confirm it is still needed
}
// HandleChain creates and returns the consensus.Chain for the given set of
// support resources.
//
// The offsets needed to build the chain are decoded by getOffsets from the
// metadata value and the chain ID reported by support. A nil metadata pointer
// is tolerated and handled exactly like metadata whose Value is nil, rather
// than panicking on the field access.
//
// The returned chain and error are whatever newChain produces.
func (c *consenterImpl) HandleChain(support consensus.ConsenterSupport, metadata *cb.Metadata) (consensus.Chain, error) {
	// Guard the pointer: callers are expected to pass non-nil metadata, but
	// dereferencing a nil one here would bring the whole process down.
	var metadataValue []byte
	if metadata != nil {
		metadataValue = metadata.Value
	}

	lastOffsetPersisted, lastOriginalOffsetProcessed, lastResubmittedConfigOffset := getOffsets(metadataValue, support.ChainID())
	return newChain(c, support, lastOffsetPersisted, lastOriginalOffsetProcessed, lastResubmittedConfigOffset)
}
// commonConsenter exposes the configuration options held by the consenter
// object. The same values are shared by every chain object created by that
// consenter, and they originate from the local configuration settings.
// consenterImpl satisfies this interface.
type commonConsenter interface {
	// brokerConfig returns the sarama configuration.
	brokerConfig() *sarama.Config
	// retryOptions returns the local retry settings.
	retryOptions() localconfig.Retry
	// topicDetail returns the sarama topic detail.
	topicDetail() *sarama.TopicDetail
}
// brokerConfig returns the stored sarama configuration. The pointer itself is
// returned, not a copy of the configuration.
func (c *consenterImpl) brokerConfig() *sarama.Config {
	return c.brokerConfigVal
}
// retryOptions returns the stored retry settings by value.
func (c *consenterImpl) retryOptions() localconfig.Retry {
	return c.retryOptionsVal
}
// topicDetail returns the stored sarama topic detail. The pointer itself is
// returned, not a copy of the detail.
func (c *consenterImpl) topicDetail() *sarama.TopicDetail {
	return c.topicDetailVal
}
// closeable is implemented by resources that can be shut down by their caller.
type closeable interface {
	// close shuts the resource down and reports any error from doing so.
	close() error
}