/
chservice.go
139 lines (117 loc) · 4.33 KB
/
chservice.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package dynamicdiscovery
import (
discclient "github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/discovery/client"
"github.com/hyperledger/fabric-sdk-go/pkg/client/common/random"
coptions "github.com/hyperledger/fabric-sdk-go/pkg/common/options"
contextAPI "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
reqContext "github.com/hyperledger/fabric-sdk-go/pkg/context"
fabdiscovery "github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery"
"github.com/pkg/errors"
)
// ChannelService implements a dynamic Discovery Service that queries
// Fabric's Discovery service for information about the peers that
// are currently joined to the given channel.
type ChannelService struct {
// Embedded base service; the methods of ChannelService use its context,
// response timeout, discovery client, initialize and Close members.
*service
// channelID is the channel named in discovery requests and log messages.
channelID string
// membership is consulted so that only peers whose MSP it contains are returned.
membership fab.ChannelMembership
}
// NewChannelService builds a ChannelService that looks up the member peers
// of channelID.
//
// The embedded service is created from the endpoint configuration of ctx,
// with the new instance's queryPeers method and opts passed to newService,
// and is then initialized with ctx. If initialization fails, the error is
// returned unchanged and no service is returned.
func NewChannelService(ctx contextAPI.Client, membership fab.ChannelMembership, channelID string, opts ...coptions.Opt) (*ChannelService, error) {
	logger.Debug("Creating new dynamic discovery service")

	svc := &ChannelService{
		channelID:  channelID,
		membership: membership,
	}

	// queryPeers is a method value bound to svc, so svc has to exist before
	// the embedded service that receives it can be constructed.
	svc.service = newService(ctx.EndpointConfig(), svc.queryPeers, opts...)

	if err := svc.service.initialize(ctx); err != nil {
		return nil, err
	}

	return svc, nil
}
// Close logs the shutdown of this channel's discovery service and closes
// the embedded service.
func (s *ChannelService) Close() {
	logger.Debugf("Closing discovery service for channel [%s]", s.channelID)

	// Go through the embedded field explicitly: a bare s.Close() would
	// resolve to this method and recurse.
	s.service.Close()
}
// queryPeers sends a peers query for the service's channel to the selected
// target peers and turns the responses into fab.Peer values.
//
// An error is returned when target selection fails or yields no targets,
// and when the send reports an error without producing any response. If the
// send reports an error but some responses were received, the error is only
// logged as a warning and the responses are still evaluated.
func (s *ChannelService) queryPeers() ([]fab.Peer, error) {
	logger.Debugf("Refreshing peers of channel [%s] from discovery service...", s.channelID)

	clientCtx := s.context()

	targets, err := s.getTargets(clientCtx)
	switch {
	case err != nil:
		return nil, err
	case len(targets) == 0:
		return nil, errors.Errorf("no peers configured for channel [%s]", s.channelID)
	}

	// Bound the discovery call by the service's response timeout.
	reqCtx, cancel := reqContext.NewRequest(clientCtx, reqContext.WithTimeout(s.responseTimeout))
	defer cancel()

	request := discclient.NewRequest().OfChannel(s.channelID).AddPeersQuery()

	responses, sendErr := s.discoveryClient().Send(reqCtx, request, targets...)
	if sendErr != nil && len(responses) == 0 {
		return nil, errors.Wrapf(sendErr, "error calling discover service send")
	}
	if sendErr != nil {
		// Partial success: keep going with whatever responses arrived.
		logger.Warnf("Received %d response(s) and one or more errors from discovery client: %s", len(responses), sendErr)
	}

	return s.evaluate(clientCtx, responses)
}
// getTargets returns the peer configurations to which a discovery request
// for this channel is sent.
//
// It fails if the endpoint configuration lists no peers for the channel.
// Otherwise it returns a random pick from those peers, with the channel's
// discovery policy MaxTargets passed as the count.
func (s *ChannelService) getTargets(ctx contextAPI.Client) ([]fab.PeerConfig, error) {
	candidates := ctx.EndpointConfig().ChannelPeers(s.channelID)
	if len(candidates) == 0 {
		return nil, errors.Errorf("no channel peers configured for channel [%s]", s.channelID)
	}

	// The number of peers to pick comes from the channel's discovery policy.
	maxTargets := ctx.EndpointConfig().ChannelConfig(s.channelID).Policies.Discovery.MaxTargets

	return random.PickRandomNPeerConfigs(candidates, maxTargets), nil
}
// evaluate extracts the channel's peers from the discovery responses.
//
// The responses are tried in order and the first one that yields a peer list
// without error is used. Responses that fail are logged as warnings; if all
// of them fail, the wrapped error of the last one is returned. An empty
// responses slice is an error.
//
// TODO: In a future patch:
// - validate the signatures in the responses
func (s *ChannelService) evaluate(ctx contextAPI.Client, responses []fabdiscovery.Response) ([]fab.Peer, error) {
	if len(responses) == 0 {
		return nil, errors.New("no successful response received from any peer")
	}

	var lastErr error
	for _, resp := range responses {
		endpoints, err := resp.ForChannel(s.channelID).Peers()
		if err == nil {
			// For now the first successful response wins.
			return s.asPeers(ctx, endpoints), nil
		}

		lastErr = errors.Wrap(err, "error getting peers from discovery response")
		logger.Warn(lastErr.Error())
	}

	return nil, lastErr
}
// asPeers converts discovery endpoints into fab.Peer values carrying the
// ledger height reported for each peer.
//
// Endpoints that asPeer cannot convert are skipped, as are peers whose MSP
// is not contained in the channel membership. The result is nil when no
// endpoint qualifies.
func (s *ChannelService) asPeers(ctx contextAPI.Client, endpoints []*discclient.Peer) []fab.Peer {
	var peers []fab.Peer
	for _, endpoint := range endpoints {
		peer, ok := asPeer(ctx, endpoint)
		if !ok {
			continue
		}

		// Skip peers whose MSP the membership does not know yet, e.g. a newly
		// joined org whose TLS certs have not reached the membership cache.
		if !s.membership.ContainsMSP(peer.MSPID()) {
			continue
		}

		// The state-info message or its properties may be absent. Use the
		// nil-safe generated getters instead of reading the LedgerHeight
		// field directly, and fall back to a height of 0 rather than panic.
		var height uint64
		if stateInfo := endpoint.StateInfoMessage; stateInfo != nil {
			height = stateInfo.GetStateInfo().GetProperties().GetLedgerHeight()
		}

		peers = append(peers, &peerEndpoint{
			Peer:        peer,
			blockHeight: height,
		})
	}
	return peers
}
// peerEndpoint wraps a fab.Peer together with the ledger height taken from
// the peer's state info in the discovery response.
type peerEndpoint struct {
fab.Peer
// blockHeight is the value returned by BlockHeight.
blockHeight uint64
}
// BlockHeight returns the block height stored for this peer endpoint.
func (p *peerEndpoint) BlockHeight() uint64 {
	return p.blockHeight
}