/
broadcast.go
229 lines (191 loc) · 8.47 KB
/
broadcast.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package broadcast
import (
"io"
"time"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/pkg/errors"
)
// logger is the package-level logger, obtained from flogging under the name
// "orderer.common.broadcast".
var logger = flogging.MustGetLogger("orderer.common.broadcast")
//go:generate counterfeiter -o mock/channel_support_registrar.go --fake-name ChannelSupportRegistrar . ChannelSupportRegistrar
// ChannelSupportRegistrar provides a way for the Handler to look up the Support for a channel.
type ChannelSupportRegistrar interface {
// BroadcastChannelSupport returns the message channel header, whether the message is a config update
// and the channel resources for a message or an error if the message is not a message which can
// be processed directly (like CONFIG and ORDERER_TRANSACTION messages).
//
// The results are, in order: the channel header, the config-update flag, the
// channel support and the error. Handler.ProcessMessage checks the header for
// nil before using it for metric labels, so a nil header alongside an error
// is tolerated by that caller.
BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader, bool, ChannelSupport, error)
}
//go:generate counterfeiter -o mock/channel_support.go --fake-name ChannelSupport . ChannelSupport
// ChannelSupport provides the backing resources needed to support broadcast on a channel.
// It is the union of two embedded interfaces: msgprocessor.Processor (which
// supplies the ProcessNormalMsg and ProcessConfigUpdateMsg calls made by
// Handler.ProcessMessage) and Consenter (which supplies WaitReady, Order and
// Configure).
type ChannelSupport interface {
msgprocessor.Processor
Consenter
}
// Consenter provides methods to send messages through consensus.
type Consenter interface {
// Order accepts a message or returns an error indicating the cause of failure.
// It ultimately passes through to the consensus.Chain interface.
// Handler.ProcessMessage passes as configSeq the value returned by
// ProcessNormalMsg for the same envelope.
Order(env *cb.Envelope, configSeq uint64) error
// Configure accepts a reconfiguration or returns an error indicating the cause of failure.
// It ultimately passes through to the consensus.Chain interface.
// Handler.ProcessMessage passes the config envelope and configSeq returned by
// ProcessConfigUpdateMsg.
Configure(config *cb.Envelope, configSeq uint64) error
// WaitReady blocks waiting for the consenter to be ready to accept new messages.
// This is useful when the consenter needs to temporarily block ingress messages so
// that in-flight messages can be consumed. It may return an error if the consenter
// is in an erroneous state. If this blocking behavior is not desired, the consenter
// can simply return nil.
WaitReady() error
}
// Handler is designed to handle connections from the Broadcast AB gRPC service.
type Handler struct {
// SupportRegistrar is queried once per incoming envelope to obtain the
// channel header, the config-update flag and the channel support.
SupportRegistrar ChannelSupportRegistrar
// Metrics is handed to the MetricsTracker created for every processed message.
Metrics *Metrics
}
// Handle reads requests from a Broadcast stream, processes them, and returns the responses to the stream.
//
// The loop runs until one of the following happens:
//   - the client closes the stream (io.EOF on receive): nil is returned;
//   - receiving fails for any other reason: that error is logged and returned;
//   - a response whose status is not SUCCESS has been sent: the result of the
//     send (possibly nil) is returned without further logging;
//   - sending a SUCCESS response fails: that error is logged and returned.
func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
	// Remote address of the client, used only for log messages.
	addr := util.ExtractRemoteAddress(srv.Context())
	logger.Debugf("Starting new broadcast loop for %s", addr)

	for {
		msg, recvErr := srv.Recv()
		switch {
		case recvErr == io.EOF:
			logger.Debugf("Received EOF from %s, hangup", addr)
			return nil
		case recvErr != nil:
			logger.Warningf("Error reading from %s: %s", addr, recvErr)
			return recvErr
		}

		// Validate and enqueue the envelope, then send the outcome back to the client.
		resp := bh.ProcessMessage(msg, addr)
		switch sendErr := srv.Send(resp); {
		case resp.Status != cb.Status_SUCCESS:
			// Any non-SUCCESS response terminates the stream.
			return sendErr
		case sendErr != nil:
			logger.Warningf("Error sending to %s: %s", addr, sendErr)
			return sendErr
		}
	}
}
// MetricsTracker collects the labels and timings for a single broadcast
// message and reports them through Record.
type MetricsTracker struct {
// ValidateStartTime is set by BeginValidate.
ValidateStartTime time.Time
// EnqueueStartTime is set by BeginEnqueue; Record only reports an enqueue
// duration when this is not the zero time.
EnqueueStartTime time.Time
// ValidateDuration is set by EndValidate; Record calls EndValidate itself
// when this is still zero.
ValidateDuration time.Duration
// ChannelID is used as the value of the "channel" metric label.
ChannelID string
// TxType is used as the value of the "type" metric label.
TxType string
// Metrics holds the instruments that Record reports to.
Metrics *Metrics
}
// Record reports the metrics gathered for one message. Every observation is
// labelled with the response status, the channel ID and the transaction type.
//
// It always observes the validation duration and increments the processed
// counter. The enqueue duration is observed only when BeginEnqueue was called.
// resp must be non-nil.
func (mt *MetricsTracker) Record(resp *ab.BroadcastResponse) {
	labels := []string{
		"status", resp.Status.String(),
		"channel", mt.ChannelID,
		"type", mt.TxType,
	}

	// EndValidate has not run (for example the message was rejected during
	// validation), so close the measurement now.
	if mt.ValidateDuration == 0 {
		mt.EndValidate()
	}
	mt.Metrics.ValidateDuration.With(labels...).Observe(mt.ValidateDuration.Seconds())

	// Use IsZero rather than comparing time.Time structs with !=, which also
	// compares the location pointer and monotonic reading.
	if !mt.EnqueueStartTime.IsZero() {
		enqueueDuration := time.Since(mt.EnqueueStartTime)
		mt.Metrics.EnqueueDuration.With(labels...).Observe(enqueueDuration.Seconds())
	}

	mt.Metrics.ProcessedCount.With(labels...).Add(1)
}
// BeginValidate stamps the current time as the start of validation.
func (mt *MetricsTracker) BeginValidate() {
	startedAt := time.Now()
	mt.ValidateStartTime = startedAt
}
// EndValidate stores the time elapsed since ValidateStartTime as the
// validation duration.
func (mt *MetricsTracker) EndValidate() {
	elapsed := time.Since(mt.ValidateStartTime)
	mt.ValidateDuration = elapsed
}
// BeginEnqueue stamps the current time as the start of the enqueue phase.
func (mt *MetricsTracker) BeginEnqueue() {
	startedAt := time.Now()
	mt.EnqueueStartTime = startedAt
}
// ProcessMessage validates and enqueues a single message.
//
// The returned response carries:
//   - BAD_REQUEST when the channel support for the message cannot be resolved;
//   - the status chosen by ClassifyError when message processing rejects it;
//   - SERVICE_UNAVAILABLE when WaitReady, Order or Configure returns an error;
//   - SUCCESS once the message has been handed to the consenter.
//
// For every non-SUCCESS status the Info field holds the error text. Metrics
// for the message are recorded on return, whatever the outcome.
func (bh *Handler) ProcessMessage(msg *cb.Envelope, addr string) (resp *ab.BroadcastResponse) {
	// The tracker gathers labels and timings for this one message; the labels
	// stay "unknown" unless a channel header is available.
	tracker := &MetricsTracker{
		Metrics:   bh.Metrics,
		ChannelID: "unknown",
		TxType:    "unknown",
	}
	// Record is wrapped in a closure on purpose: deferring it directly would
	// evaluate resp immediately (still nil) instead of reading the value that
	// is eventually returned.
	defer func() {
		tracker.Record(resp)
	}()
	tracker.BeginValidate()

	// Resolve the channel header, the config-update flag and the channel support.
	chdr, isConfig, support, lookupErr := bh.SupportRegistrar.BroadcastChannelSupport(msg)
	if chdr != nil {
		tracker.ChannelID = chdr.ChannelId
		tracker.TxType = cb.HeaderType(chdr.Type).String()
	}
	if lookupErr != nil {
		logger.Warningf("[channel: %s] Could not get message processor for serving %s: %s", tracker.ChannelID, addr, lookupErr)
		return &ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST, Info: lookupErr.Error()}
	}

	// Common shape of every rejection coming from the consenter side.
	unavailable := func(cause error) *ab.BroadcastResponse {
		return &ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: cause.Error()}
	}

	if isConfig {
		// Config update path (for example a channel creation request).
		logger.Debugf("[channel: %s] Broadcast is processing config update message from %s", chdr.ChannelId, addr)

		config, seq, procErr := support.ProcessConfigUpdateMsg(msg)
		if procErr != nil {
			logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s because of error: %s", chdr.ChannelId, addr, procErr)
			return &ab.BroadcastResponse{Status: ClassifyError(procErr), Info: procErr.Error()}
		}

		// Validation is over; everything from here on counts as enqueue time.
		tracker.EndValidate()
		tracker.BeginEnqueue()

		// Block until the consenter is ready to accept new messages.
		if readyErr := support.WaitReady(); readyErr != nil {
			logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, readyErr)
			return unavailable(readyErr)
		}

		// Hand the resulting config envelope to the consenter.
		if cfgErr := support.Configure(config, seq); cfgErr != nil {
			logger.Warningf("[channel: %s] Rejecting broadcast of config message from %s with SERVICE_UNAVAILABLE: rejected by Configure: %s", chdr.ChannelId, addr, cfgErr)
			return unavailable(cfgErr)
		}
	} else {
		// Normal (non-config) message path.
		logger.Debugf("[channel: %s] Broadcast is processing normal message from %s with txid '%s' of type %s", chdr.ChannelId, addr, chdr.TxId, cb.HeaderType_name[chdr.Type])

		seq, procErr := support.ProcessNormalMsg(msg)
		if procErr != nil {
			logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s because of error: %s", chdr.ChannelId, addr, procErr)
			return &ab.BroadcastResponse{Status: ClassifyError(procErr), Info: procErr.Error()}
		}

		// Validation is over; everything from here on counts as enqueue time.
		tracker.EndValidate()
		tracker.BeginEnqueue()

		// Block until the consenter is ready to accept new messages.
		if readyErr := support.WaitReady(); readyErr != nil {
			logger.Warningf("[channel: %s] Rejecting broadcast of message from %s with SERVICE_UNAVAILABLE: rejected by Consenter: %s", chdr.ChannelId, addr, readyErr)
			return unavailable(readyErr)
		}

		// Hand the original envelope to the consenter for ordering.
		if orderErr := support.Order(msg, seq); orderErr != nil {
			logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s with SERVICE_UNAVAILABLE: rejected by Order: %s", chdr.ChannelId, addr, orderErr)
			return unavailable(orderErr)
		}
	}

	logger.Debugf("[channel: %s] Broadcast has successfully enqueued message of type %s from %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type], addr)

	return &ab.BroadcastResponse{Status: cb.Status_SUCCESS}
}
// ClassifyError converts an error into a status code.
//
// The root cause of err (as reported by errors.Cause) is compared against the
// msgprocessor sentinel errors:
//   - ErrChannelDoesNotExist -> NOT_FOUND
//   - ErrPermissionDenied    -> FORBIDDEN
//   - ErrMaintenanceMode     -> SERVICE_UNAVAILABLE
//
// Any other error maps to BAD_REQUEST.
func ClassifyError(err error) cb.Status {
	cause := errors.Cause(err)

	switch {
	case cause == msgprocessor.ErrChannelDoesNotExist:
		return cb.Status_NOT_FOUND
	case cause == msgprocessor.ErrPermissionDenied:
		return cb.Status_FORBIDDEN
	case cause == msgprocessor.ErrMaintenanceMode:
		return cb.Status_SERVICE_UNAVAILABLE
	}

	return cb.Status_BAD_REQUEST
}