forked from hyperledger/fabric-sdk-go
/
dynamicselection.go
200 lines (166 loc) · 6.23 KB
/
dynamicselection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package dynamicselection
import (
"fmt"
"time"
"github.com/hyperledger/fabric-sdk-go/pkg/util/concurrent/lazycache"
"github.com/hyperledger/fabric-sdk-go/pkg/util/concurrent/lazyref"
copts "github.com/hyperledger/fabric-sdk-go/pkg/common/options"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
"github.com/pkg/errors"
"github.com/hyperledger/fabric-sdk-go/pkg/client/common/selection/dynamicselection/pgresolver"
"github.com/hyperledger/fabric-sdk-go/pkg/client/common/selection/options"
)
// defaultCacheTimeout is the cache timeout a new selection service starts
// with before any options are applied.
const defaultCacheTimeout = 30 * time.Minute
// Opt is a functional option that configures a SelectionService. Options are
// applied to the service while it is being constructed.
type Opt func(*SelectionService)
// WithLoadBalancePolicy returns an option that installs lbp as the
// load-balance policy of the selection service it is applied to.
func WithLoadBalancePolicy(lbp pgresolver.LoadBalancePolicy) Opt {
	var apply Opt = func(svc *SelectionService) {
		svc.pgLBP = lbp
	}
	return apply
}
// WithCacheTimeout returns an option that sets the expiration timeout of the
// service's cache. Passing zero makes the service fall back to the
// SelectionServiceRefresh timeout from the endpoint config.
func WithCacheTimeout(timeout time.Duration) Opt {
	var apply Opt = func(svc *SelectionService) {
		svc.cacheTimeout = timeout
	}
	return apply
}
// SelectionService chooses endorsing peers for a given set of chaincodes using their chaincode policy
type SelectionService struct {
// channelID is the channel the service was created for; it is part of the
// key under which peer group resolvers are cached.
channelID string
// pgResolvers lazily creates and caches one peer group resolver per
// (channel, chaincode IDs) key; entries expire after cacheTimeout.
pgResolvers *lazycache.Cache
// pgLBP is the load-balance policy handed to each new peer group resolver.
pgLBP pgresolver.LoadBalancePolicy
// ccPolicyProvider supplies the chaincode policy for a chaincode ID.
ccPolicyProvider CCPolicyProvider
// discoveryService supplies the candidate peers for each selection.
discoveryService fab.DiscoveryService
// cacheTimeout is the absolute expiration applied to cached resolvers.
cacheTimeout time.Duration
}
// policyProviderFactory creates the CCPolicyProvider used by a selection
// service. newService calls it once and fails if it returns an error.
type policyProviderFactory func() (CCPolicyProvider, error)
// NewService creates a new dynamic selection service for the given channel.
// The chaincode policy provider is built from the supplied client context,
// discovery service and channel ID; opts customise the resulting service.
func NewService(context context.Client, channelID string, discovery fab.DiscoveryService, opts ...Opt) (*SelectionService, error) {
	var factory policyProviderFactory = func() (CCPolicyProvider, error) {
		return newCCPolicyProvider(context, discovery, channelID)
	}
	return newService(context, channelID, discovery, factory, opts...)
}
// newService builds a SelectionService using the given policy provider
// factory. It returns an error if the factory fails.
//
// Defaults (defaultCacheTimeout and a random load-balance policy) are set
// first and the options are applied on top. If an option leaves the cache
// timeout at zero, the SelectionServiceRefresh timeout from the endpoint
// config is used; if an option leaves the load-balance policy nil, a random
// policy is used.
func newService(context context.Client, channelID string, discovery fab.DiscoveryService, factory policyProviderFactory, opts ...Opt) (*SelectionService, error) {
	provider, err := factory()
	if err != nil {
		return nil, errors.WithMessage(err, "Failed to create cc policy provider")
	}

	svc := new(SelectionService)
	svc.channelID = channelID
	svc.discoveryService = discovery
	svc.ccPolicyProvider = provider
	svc.cacheTimeout = defaultCacheTimeout
	svc.pgLBP = pgresolver.NewRandomLBP()

	for i := range opts {
		opts[i](svc)
	}

	// Options may have reset either setting to its zero value.
	if svc.cacheTimeout == 0 {
		svc.cacheTimeout = context.EndpointConfig().Timeout(fab.SelectionServiceRefresh)
	}
	if svc.pgLBP == nil {
		svc.pgLBP = pgresolver.NewRandomLBP()
	}

	// Resolvers are created on first use of a key and expire after the cache timeout.
	loadResolver := func(key lazycache.Key) (interface{}, error) {
		return svc.createPGResolver(key.(*resolverKey))
	}
	expiry := lazyref.WithAbsoluteExpiration(svc.cacheTimeout)
	svc.pgResolvers = lazycache.New("PG_Resolver_Cache", loadResolver, expiry)

	return svc, nil
}
// GetEndorsersForChaincode returns the endorsing peers for the given chaincodes.
//
// The peers reported by the discovery service are optionally narrowed by the
// PeerFilter option and ordered by the PeerSorter option, and are then
// resolved into a peer group by the resolver for the given chaincodes.
//
// An error is returned if chaincodes is empty or contains a nil entry, if the
// peer group resolver cannot be obtained, if the discovery service fails, or
// if the resolver cannot resolve a peer group.
func (s *SelectionService) GetEndorsersForChaincode(chaincodes []*fab.ChaincodeCall, opts ...copts.Opt) ([]fab.Peer, error) {
	if len(chaincodes) == 0 {
		return nil, errors.New("no chaincode IDs provided")
	}

	params := options.NewParams(opts)

	chaincodeIDs := make([]string, 0, len(chaincodes))
	for _, cc := range chaincodes {
		// Reject nil entries instead of panicking on cc.ID below.
		if cc == nil {
			return nil, errors.New("nil chaincode call provided")
		}
		chaincodeIDs = append(chaincodeIDs, cc.ID)
	}

	resolver, err := s.getPeerGroupResolver(chaincodeIDs)
	if err != nil {
		return nil, errors.WithMessage(err, fmt.Sprintf("Error getting peer group resolver for chaincodes [%v] on channel [%s]", chaincodeIDs, s.channelID))
	}

	peers, err := s.discoveryService.GetPeers()
	if err != nil {
		return nil, err
	}

	if params.PeerFilter != nil {
		var filteredPeers []fab.Peer
		for _, peer := range peers {
			if params.PeerFilter(peer) {
				filteredPeers = append(filteredPeers, peer)
			} else {
				logger.Debugf("Peer [%s] is not accepted by the filter and therefore peer group will be excluded.", peer.URL())
			}
		}
		peers = filteredPeers
	}

	if params.PeerSorter != nil {
		// The sorter is handed a copy, so the slice obtained above is left untouched.
		sortedPeers := make([]fab.Peer, len(peers))
		copy(sortedPeers, peers)
		peers = params.PeerSorter(sortedPeers)
	}

	peerGroup, err := resolver.Resolve(peers)
	if err != nil {
		return nil, err
	}
	return peerGroup.Peers(), nil
}
// Close releases the resources held by the service by closing its peer group
// resolver cache.
func (s *SelectionService) Close() {
	cache := s.pgResolvers
	cache.Close()
}
// getPeerGroupResolver fetches the peer group resolver for the given set of
// chaincode IDs on this service's channel from the resolver cache. Any error
// reported by the cache is returned unchanged.
func (s *SelectionService) getPeerGroupResolver(chaincodeIDs []string) (pgresolver.PeerGroupResolver, error) {
	key := newResolverKey(s.channelID, chaincodeIDs...)

	cached, err := s.pgResolvers.Get(key)
	if err != nil {
		return nil, err
	}
	return cached.(pgresolver.PeerGroupResolver), nil
}
// createPGResolver builds a peer group resolver for the channel and chaincode
// IDs in key. It compiles one group retriever per chaincode from that
// chaincode's signature policy and combines them so that every chaincode's
// policy must be satisfied. The service's load-balance policy is passed to
// the new resolver.
func (s *SelectionService) createPGResolver(key *resolverKey) (pgresolver.PeerGroupResolver, error) {
	// Collect the group retriever for each chaincode's signature policy.
	var retrievers []pgresolver.GroupRetriever
	for i := range key.chaincodeIDs {
		ccID := key.chaincodeIDs[i]
		retriever, err := s.getPolicyGroupForCC(key.channelID, ccID)
		if err != nil {
			msg := fmt.Sprintf("error retrieving signature policy for chaincode [%s] on channel [%s]", ccID, key.channelID)
			return nil, errors.WithMessage(err, msg)
		}
		retrievers = append(retrievers, retriever)
	}

	// 'And' the per-chaincode groups together: all N of the N groups are required.
	andAll := func(peerRetriever pgresolver.MSPPeerRetriever) (pgresolver.GroupOfGroups, error) {
		var members []pgresolver.Group
		for _, retrieve := range retrievers {
			group, err := retrieve(peerRetriever)
			if err != nil {
				return nil, err
			}
			members = append(members, group)
		}
		required := int32(len(retrievers))
		return pgresolver.NewGroupOfGroups(members).Nof(required)
	}

	resolver, err := pgresolver.NewPeerGroupResolver(andAll, s.pgLBP)
	if err != nil {
		msg := fmt.Sprintf("error creating peer group resolver for chaincodes [%v] on channel [%s]", key.chaincodeIDs, key.channelID)
		return nil, errors.WithMessage(err, msg)
	}
	return resolver, nil
}
// getPolicyGroupForCC fetches the chaincode policy for ccID from the policy
// provider and compiles it into a group retriever. channelID is used only to
// annotate the error returned when the policy lookup fails.
func (s *SelectionService) getPolicyGroupForCC(channelID, ccID string) (pgresolver.GroupRetriever, error) {
	policy, err := s.ccPolicyProvider.GetChaincodePolicy(ccID)
	if err != nil {
		msg := fmt.Sprintf("error querying chaincode [%s] on channel [%s]", ccID, channelID)
		return nil, errors.WithMessage(err, msg)
	}
	return pgresolver.CompileSignaturePolicy(policy)
}