forked from hyperledger/fabric
-
Notifications
You must be signed in to change notification settings - Fork 0
/
deliveryclient.go
346 lines (309 loc) · 12.2 KB
/
deliveryclient.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package deliverclient
import (
"context"
"errors"
"fmt"
"math"
"sync"
"time"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/core/deliverservice/blocksprovider"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/util"
"github.com/hyperledger/fabric/protos/orderer"
"github.com/spf13/viper"
"google.golang.org/grpc"
)
// logger is the package-level logger, obtained from flogging under the
// name "deliveryClient".
var logger = flogging.MustGetLogger("deliveryClient")
// Fallback values handed to util.GetDurationOrDefault by the getters below
// for the corresponding peer.deliveryclient.* configuration keys.
const (
	// defaultReConnectTotalTimeThreshold is the fallback for
	// peer.deliveryclient.reconnectTotalTimeThreshold.
	defaultReConnectTotalTimeThreshold = 60 * time.Minute
	// defaultConnectionTimeout is the fallback for
	// peer.deliveryclient.connTimeout.
	defaultConnectionTimeout = 3 * time.Second
	// defaultReConnectBackoffThreshold is the fallback for
	// peer.deliveryclient.reConnectBackoffThreshold.
	defaultReConnectBackoffThreshold = 60 * time.Minute
)
// getReConnectTotalTimeThreshold looks up the
// peer.deliveryclient.reconnectTotalTimeThreshold setting through
// util.GetDurationOrDefault, passing defaultReConnectTotalTimeThreshold as
// the default.
// NOTE(review): presumably the default is returned when the key is unset —
// confirm against util.GetDurationOrDefault.
func getReConnectTotalTimeThreshold() time.Duration {
	const key = "peer.deliveryclient.reconnectTotalTimeThreshold"
	return util.GetDurationOrDefault(key, defaultReConnectTotalTimeThreshold)
}
// getConnectionTimeout looks up the peer.deliveryclient.connTimeout setting
// through util.GetDurationOrDefault, passing defaultConnectionTimeout as the
// default.
// NOTE(review): presumably the default is returned when the key is unset —
// confirm against util.GetDurationOrDefault.
func getConnectionTimeout() time.Duration {
	const key = "peer.deliveryclient.connTimeout"
	return util.GetDurationOrDefault(key, defaultConnectionTimeout)
}
// getReConnectBackoffThreshold looks up the
// peer.deliveryclient.reConnectBackoffThreshold setting through
// util.GetDurationOrDefault, passing defaultReConnectBackoffThreshold as the
// default.
// NOTE(review): presumably the default is returned when the key is unset —
// confirm against util.GetDurationOrDefault.
func getReConnectBackoffThreshold() time.Duration {
	const key = "peer.deliveryclient.reConnectBackoffThreshold"
	return util.GetDurationOrDefault(key, defaultReConnectBackoffThreshold)
}
// staticRootsEnabled reports the boolean value viper holds for the
// peer.deliveryclient.staticRootsEnabled key.
func staticRootsEnabled() bool {
	const key = "peer.deliveryclient.staticRootsEnabled"
	return viper.GetBool(key)
}
// DeliverService is used to communicate with orderers to obtain
// new blocks and send them to the committer service.
type DeliverService interface {
// StartDeliverForChannel dynamically starts delivery of new blocks from the
// ordering service to channel peers.
// When the delivery finishes, the finalizer func is called.
StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error
// StopDeliverForChannel dynamically stops delivery of new blocks from the
// ordering service to channel peers.
StopDeliverForChannel(chainID string) error
// UpdateEndpoints updates the ordering endpoints for the given chain.
UpdateEndpoints(chainID string, connCriteria ConnectionCriteria) error
// Stop terminates the delivery service and closes the connection.
Stop()
}
// deliverServiceImpl is the implementation of the delivery service.
// It keeps the connection criteria for the ordering service and a map of
// per-channel delivery clients (blocks provider plus broadcast client).
type deliverServiceImpl struct {
// connConfig holds the connection criteria given to NewDeliverService.
connConfig ConnectionCriteria
// conf holds the factories and services given to NewDeliverService.
conf *Config
// deliverClients maps a chain ID to the delivery client started for it.
deliverClients map[string]*deliverClient
// lock is taken by every method that reads or writes deliverClients or
// stopping.
lock sync.RWMutex
// stopping is set by Stop; while set, StartDeliverForChannel and
// StopDeliverForChannel return an error.
stopping bool
}
// deliverClient bundles what StartDeliverForChannel creates for one channel.
type deliverClient struct {
// bp is the blocks provider built by blocksprovider.NewBlocksProvider.
bp blocksprovider.BlocksProvider
// bclient is the broadcast client built by newClient; it is also the client
// handed to the blocks provider, and the target of UpdateEndpoints.
bclient *broadcastClient
}
// Config dictates the DeliveryService's properties,
// namely how it connects to an ordering service endpoint,
// how it verifies messages received from it,
// and how it disseminates the messages to other peers.
type Config struct {
// IsStaticLeader, when true, makes the reconnect backoff policy built in
// newClient keep retrying after reconnectTotalTimeThreshold has elapsed.
IsStaticLeader bool
// ConnFactory returns a function that creates a connection to an endpoint
ConnFactory func(channelID string, endpointOverrides map[string]*comm.OrdererEndpoint) func(endpointCriteria comm.EndpointCriteria) (*grpc.ClientConn, error)
// ABCFactory creates an AtomicBroadcastClient out of a connection
ABCFactory func(*grpc.ClientConn) orderer.AtomicBroadcastClient
// CryptoSvc performs cryptographic actions like message verification and signing
// and identity validation
CryptoSvc api.MessageCryptoService
// Gossip enables to enumerate peers in the channel, send a message to peers,
// and add a block to the gossip state transfer layer
Gossip blocksprovider.GossipServiceAdapter
}
// ConnectionCriteria defines how to connect to ordering service nodes.
type ConnectionCriteria struct {
// OrdererEndpoints specifies the endpoints of the ordering service.
// toEndpointCriteria uses them only when no per-organization endpoint
// was found via OrdererEndpointsByOrg.
OrdererEndpoints []string
// Organizations denotes a list of organizations
Organizations []string
// OrdererEndpointsByOrg specifies the endpoints of the ordering service grouped by orgs.
OrdererEndpointsByOrg map[string][]string
// OrdererEndpointOverrides specifies a map of endpoints to override. The map
// key is the configured endpoint address to match and the value is the
// endpoint to use instead.
OrdererEndpointOverrides map[string]*comm.OrdererEndpoint
}
// toEndpointCriteria flattens the connection criteria into a list of
// comm.EndpointCriteria, one entry per endpoint address.
//
// Per-organization endpoints take precedence: for every organization in
// cc.Organizations, each address found in cc.OrdererEndpointsByOrg for that
// organization yields an entry scoped to that single organization. Only when
// this produces no entry at all are the global cc.OrdererEndpoints used, each
// paired with the complete cc.Organizations list.
//
// In both cases, an address that is a key of cc.OrdererEndpointOverrides is
// replaced by the Address of the mapped override.
func (cc ConnectionCriteria) toEndpointCriteria() []comm.EndpointCriteria {
	// effectiveAddress applies the configured override, if there is one.
	effectiveAddress := func(configured string) string {
		if replacement, found := cc.OrdererEndpointOverrides[configured]; found {
			return replacement.Address
		}
		return configured
	}

	var criteria []comm.EndpointCriteria

	// First pass: endpoints grouped by organization. Organizations without
	// any endpoint simply contribute nothing.
	for _, orgName := range cc.Organizations {
		for _, addr := range cc.OrdererEndpointsByOrg[orgName] {
			criteria = append(criteria, comm.EndpointCriteria{
				Endpoint:      effectiveAddress(addr),
				Organizations: []string{orgName},
			})
		}
	}
	if len(criteria) != 0 {
		// At least one per-organization endpoint exists; the global list is ignored.
		return criteria
	}

	// Second pass: fall back to the global endpoint list.
	for _, addr := range cc.OrdererEndpoints {
		criteria = append(criteria, comm.EndpointCriteria{
			Endpoint:      effectiveAddress(addr),
			Organizations: cc.Organizations,
		})
	}
	return criteria
}
// NewDeliverService creates and initializes a delivery service instance
// from the given configuration and connection criteria.
//
// The constructor itself only validates its input (see
// validateConfiguration). When validation fails, a nil service is returned
// together with the validation error.
func NewDeliverService(conf *Config, connConfig ConnectionCriteria) (*deliverServiceImpl, error) {
	svc := new(deliverServiceImpl)
	svc.conf = conf
	svc.connConfig = connConfig
	svc.deliverClients = map[string]*deliverClient{}

	err := svc.validateConfiguration()
	if err != nil {
		return nil, err
	}
	return svc, nil
}
// UpdateEndpoints pushes a new set of ordering service endpoints to the
// broadcast client of the given channel.
//
// The OrdererEndpointOverrides of the supplied connCriteria are ignored;
// the overrides the service was constructed with are used instead.
// connCriteria is passed by value, so the caller's copy is not modified.
//
// It returns an error when no delivery client is registered for chainID.
func (d *deliverServiceImpl) UpdateEndpoints(chainID string, connCriteria ConnectionCriteria) error {
	d.lock.RLock()
	defer d.lock.RUnlock()

	dc, ok := d.deliverClients[chainID]
	if !ok {
		return fmt.Errorf("Channel with %s id was not found", chainID)
	}

	// Always apply the service-wide overrides before translating the criteria.
	connCriteria.OrdererEndpointOverrides = d.connConfig.OrdererEndpointOverrides
	dc.bclient.UpdateEndpoints(connCriteria.toEndpointCriteria())
	return nil
}
// validateConfiguration checks that the service has everything it needs to
// start delivering blocks and returns an error naming the first missing
// piece: the configuration itself, the gossip provider, the
// AtomicBroadcast factory, the connection factory, the crypto service, or
// the ordering endpoints (global and per-organization lists both empty).
func (d *deliverServiceImpl) validateConfiguration() error {
	switch {
	case d.conf == nil:
		// Guard first: every check below dereferences d.conf.
		return errors.New("no configuration specified")
	case d.conf.Gossip == nil:
		return errors.New("no gossip provider specified")
	case d.conf.ABCFactory == nil:
		return errors.New("no AtomicBroadcast factory specified")
	case d.conf.ConnFactory == nil:
		return errors.New("no connection factory specified")
	case d.conf.CryptoSvc == nil:
		return errors.New("no crypto service specified")
	case len(d.connConfig.OrdererEndpoints) == 0 && len(d.connConfig.OrdererEndpointsByOrg) == 0:
		return errors.New("no endpoints specified")
	}
	return nil
}
// StartDeliverForChannel starts blocks delivery for the given channel.
// It creates a broadcast client and a blocks provider for chainID,
// registers them, and launches the provider in its own goroutine; finalizer
// is invoked by that goroutine once block delivery returns.
//
// An error is returned (and logged) when the service is stopping or when a
// blocks provider is already registered for chainID.
func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error {
	d.lock.Lock()
	defer d.lock.Unlock()

	if d.stopping {
		errMsg := fmt.Sprintf("Delivery service is stopping cannot join a new channel %s", chainID)
		// The message embeds chainID, so it must not be used as a format string.
		logger.Errorf("%s", errMsg)
		return errors.New(errMsg)
	}
	if _, exist := d.deliverClients[chainID]; exist {
		errMsg := fmt.Sprintf("Delivery service - block provider already exists for %s found, can't start delivery", chainID)
		logger.Errorf("%s", errMsg)
		return errors.New(errMsg)
	}

	client := d.newClient(chainID, ledgerInfo)
	logger.Info("This peer will retrieve blocks from ordering service and disseminate to other peers in the organization for channel", chainID)
	d.deliverClients[chainID] = &deliverClient{
		bp:      blocksprovider.NewBlocksProvider(chainID, client, d.conf.Gossip, d.conf.CryptoSvc),
		bclient: client,
	}
	// launchBlockProvider takes the read lock, so it cannot look up the
	// client before this method has returned and released the write lock.
	go d.launchBlockProvider(chainID, finalizer)
	return nil
}
// launchBlockProvider runs block delivery for chainID and calls finalizer
// once DeliverBlocks returns. It is started as a goroutine by
// StartDeliverForChannel.
//
// If no delivery client is registered for chainID any more by the time the
// lookup happens, nothing is delivered and finalizer is not called.
func (d *deliverServiceImpl) launchBlockProvider(chainID string, finalizer func()) {
	var client *deliverClient
	func() {
		// Hold the read lock only for the map lookup, not for the delivery.
		d.lock.RLock()
		defer d.lock.RUnlock()
		client = d.deliverClients[chainID]
	}()

	if client != nil {
		client.bp.DeliverBlocks()
		finalizer()
		return
	}
	logger.Info("Block delivery for channel", chainID, "was stopped before block provider started")
}
// StopDeliverForChannel stops blocks delivery for the given channel by
// stopping the channel's blocks provider and removing its delivery client
// from the service.
//
// An error is returned (and logged) when the service is stopping or when no
// blocks provider is registered for chainID.
func (d *deliverServiceImpl) StopDeliverForChannel(chainID string) error {
	d.lock.Lock()
	defer d.lock.Unlock()

	if d.stopping {
		errMsg := fmt.Sprintf("Delivery service is stopping, cannot stop delivery for channel %s", chainID)
		// The message embeds chainID, so it must not be used as a format string.
		logger.Errorf("%s", errMsg)
		return errors.New(errMsg)
	}

	dc, exist := d.deliverClients[chainID]
	if !exist {
		errMsg := fmt.Sprintf("Delivery service - no block provider for %s found, can't stop delivery", chainID)
		logger.Errorf("%s", errMsg)
		return errors.New(errMsg)
	}

	dc.bp.Stop()
	delete(d.deliverClients, chainID)
	logger.Debug("This peer will stop pass blocks from orderer service to other peers")
	return nil
}
// Stop shuts the delivery service down: it flags the service as stopping
// and stops the blocks provider of every registered channel.
// The delivery clients stay registered in the map.
func (d *deliverServiceImpl) Stop() {
	d.lock.Lock()
	defer d.lock.Unlock()

	// From here on StartDeliverForChannel and StopDeliverForChannel fail.
	d.stopping = true

	for chainID := range d.deliverClients {
		d.deliverClients[chainID].bp.Stop()
	}
}
// newClient assembles the broadcast client used to pull blocks for chainID.
//
// The client is built from:
//   - a connection producer created from the configured connection factory
//     and the service's endpoint criteria,
//   - the configured AtomicBroadcast factory,
//   - a setup callback that asks the blocks requester to request blocks
//     using ledgerInfoProvider,
//   - a reconnect backoff policy (see the inline comments).
func (d *deliverServiceImpl) newClient(chainID string, ledgerInfoProvider blocksprovider.LedgerInfo) *broadcastClient {
	maxBackoff := getReConnectBackoffThreshold()
	maxTotalTime := getReConnectTotalTimeThreshold()

	requester := &blocksRequester{
		tls:     viper.GetBool("peer.tls.enabled"),
		chainID: chainID,
	}

	// The deliverer argument is not used; the requester talks through its
	// own client, which is assigned further down.
	setup := func(_ blocksprovider.BlocksDeliverer) error {
		return requester.RequestBlocks(ledgerInfoProvider)
	}

	// Retry policy: give up once the total elapsed time reaches the
	// threshold, unless this peer is a static leader, in which case it
	// keeps retrying. The delay is 500ms * 2^attemptNum, capped at the
	// backoff threshold.
	retryPolicy := func(attemptNum int, elapsedTime time.Duration) (time.Duration, bool) {
		timeBudgetSpent := elapsedTime >= maxTotalTime
		if timeBudgetSpent && !d.conf.IsStaticLeader {
			return 0, false
		}
		if timeBudgetSpent {
			logger.Warning("peer is a static leader, ignoring peer.deliveryclient.reconnectTotalTimeThreshold")
		}

		baseDelay := float64(500 * time.Millisecond)
		delay := math.Pow(2, float64(attemptNum)) * baseDelay
		ceiling := float64(maxBackoff)
		return time.Duration(math.Min(delay, ceiling)), true
	}

	connFactory := d.conf.ConnFactory(chainID, d.connConfig.OrdererEndpointOverrides)
	endpoints := d.connConfig.toEndpointCriteria()
	producer := comm.NewConnectionProducer(connFactory, endpoints)

	bClient := NewBroadcastClient(producer, d.conf.ABCFactory, setup, retryPolicy)
	requester.client = bClient
	return bClient
}
// DefaultConnectionFactory returns a function that dials the endpoint named
// in the given endpoint criteria and returns the resulting gRPC connection.
//
// The dial is blocking (grpc.WithBlock) and bounded by the timeout from
// getConnectionTimeout. Max send/receive message sizes come from the comm
// package. Keepalive options start from comm.DefaultKeepaliveOptions, with
// the client interval and timeout replaced by the
// peer.keepalive.deliveryClient.* settings when those are set. When
// peer.tls.enabled is true, transport credentials are obtained for
// channelID, the criteria's organizations and endpointOverrides; otherwise
// the connection is insecure.
func DefaultConnectionFactory(channelID string, endpointOverrides map[string]*comm.OrdererEndpoint) func(endpointCriteria comm.EndpointCriteria) (*grpc.ClientConn, error) {
	return func(target comm.EndpointCriteria) (*grpc.ClientConn, error) {
		dialOpts := []grpc.DialOption{
			grpc.WithBlock(),
			// set max send/recv msg sizes
			grpc.WithDefaultCallOptions(
				grpc.MaxCallRecvMsgSize(comm.MaxRecvMsgSize),
				grpc.MaxCallSendMsgSize(comm.MaxSendMsgSize),
			),
		}

		// set the keepalive options
		kaOpts := comm.DefaultKeepaliveOptions
		if viper.IsSet("peer.keepalive.deliveryClient.interval") {
			kaOpts.ClientInterval = viper.GetDuration("peer.keepalive.deliveryClient.interval")
		}
		if viper.IsSet("peer.keepalive.deliveryClient.timeout") {
			kaOpts.ClientTimeout = viper.GetDuration("peer.keepalive.deliveryClient.timeout")
		}
		dialOpts = append(dialOpts, comm.ClientKeepaliveOptions(kaOpts)...)

		if !viper.GetBool("peer.tls.enabled") {
			dialOpts = append(dialOpts, grpc.WithInsecure())
		} else {
			creds, err := comm.GetCredentialSupport().GetDeliverServiceCredentials(channelID, staticRootsEnabled(), target.Organizations, endpointOverrides)
			if err != nil {
				return nil, fmt.Errorf("failed obtaining credentials for channel %s: %v", channelID, err)
			}
			dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
		}

		// The dial blocks, so the timeout context can be released on return.
		ctx, cancel := context.WithTimeout(context.Background(), getConnectionTimeout())
		defer cancel()
		return grpc.DialContext(ctx, target.Endpoint, dialOpts...)
	}
}
// DefaultABCFactory creates an AtomicBroadcastClient on top of the given
// connection by delegating to orderer.NewAtomicBroadcastClient.
func DefaultABCFactory(conn *grpc.ClientConn) orderer.AtomicBroadcastClient {
return orderer.NewAtomicBroadcastClient(conn)
}