-
Notifications
You must be signed in to change notification settings - Fork 0
/
authcache.go
196 lines (169 loc) · 5.18 KB
/
authcache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package discovery
import (
"encoding/asn1"
"encoding/hex"
"sync"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/protos/common"
"github.com/pkg/errors"
)
const (
	// defaultMaxCacheSize is the maximum number of cached results
	// held per channel before a purge takes place.
	defaultMaxCacheSize = 1000
	// defaultRetentionRatio is the fraction of entries that survive a purge.
	defaultRetentionRatio = 0.75
)
var (
	// asBytes is the function that is used to marshal common.SignedData to bytes
	asBytes = asn1.Marshal
)
// acSupport is the access-control backend the cache memoizes:
// it performs the (potentially expensive) eligibility computation and
// exposes the channel's configuration sequence used to invalidate results.
type acSupport interface {
	// Eligible returns whether the given peer is eligible for receiving
	// service from the discovery service for a given channel
	EligibleForService(channel string, data common.SignedData) error
	// ConfigSequence returns the configuration sequence of the given channel
	ConfigSequence(channel string) uint64
}
// authCacheConfig holds the tuning knobs of the authentication cache.
type authCacheConfig struct {
	// enabled toggles memoization; when false every request is
	// delegated straight to the underlying acSupport.
	enabled bool
	// maxCacheSize is the maximum size of the cache, after which
	// a purge takes place
	maxCacheSize int
	// purgeRetentionRatio is the % of entries that remain in the cache
	// after the cache is purged due to overpopulation
	purgeRetentionRatio float64
}
// authCache authenticates a request in a channel context,
// and does memoization of invocations, keeping one accessCache per channel.
type authCache struct {
	// credentialCache maps a channel name to its per-channel result cache.
	credentialCache map[string]*accessCache
	// acSupport is the embedded backend that performs the real checks.
	acSupport
	// RWMutex guards credentialCache.
	sync.RWMutex
	conf authCacheConfig
}
// newAuthCache constructs an authCache that delegates eligibility checks
// to the given acSupport and memoizes them according to conf.
func newAuthCache(s acSupport, conf authCacheConfig) *authCache {
	cache := &authCache{
		acSupport:       s,
		conf:            conf,
		credentialCache: map[string]*accessCache{},
	}
	return cache
}
// EligibleForService returns whether the given peer is eligible for receiving
// service from the discovery service for a given channel.
// When caching is disabled it delegates directly to the underlying acSupport;
// otherwise it routes the request through the per-channel accessCache.
func (ac *authCache) EligibleForService(channel string, data common.SignedData) error {
	if !ac.conf.enabled {
		return ac.acSupport.EligibleForService(channel, data)
	}
	// Check whether we already have a cache for this channel
	ac.RLock()
	cache := ac.credentialCache[channel]
	ac.RUnlock()
	if cache == nil {
		ac.Lock()
		// Re-check under the write lock: between RUnlock and Lock another
		// goroutine may have created (and possibly populated) a cache for
		// this channel, and unconditionally storing a fresh one would
		// discard its memoized results.
		cache = ac.credentialCache[channel]
		if cache == nil {
			cache = ac.newAccessCache(channel)
			ac.credentialCache[channel] = cache
		}
		ac.Unlock()
	}
	return cache.EligibleForService(data)
}
// accessCache memoizes eligibility results for a single channel,
// keyed by a digest of the request's signed data. Results are
// invalidated whenever the channel's config sequence changes.
type accessCache struct {
	// RWMutex guards lastSequence and entries.
	sync.RWMutex
	channel string
	// ac is the parent cache, providing configuration and the acSupport backend.
	ac *authCache
	// lastSequence is the config sequence under which entries were computed.
	lastSequence uint64
	// entries maps a request key to the memoized eligibility result
	// (nil means eligible).
	entries map[string]error
}
// newAccessCache builds an empty per-channel result cache bound to
// this authCache.
func (ac *authCache) newAccessCache(channel string) *accessCache {
	cache := &accessCache{
		ac:      ac,
		channel: channel,
		entries: map[string]error{},
	}
	return cache
}
// EligibleForService returns whether the client that produced the given
// signed data is eligible for service on this cache's channel, serving
// memoized results when the channel's config sequence hasn't changed.
// Returns nil when eligible, or the (possibly cached) eligibility error.
func (cache *accessCache) EligibleForService(data common.SignedData) error {
	key, err := signedDataToKey(data)
	if err != nil {
		// Fixed format verb: was "+%v", which printed a literal '+'.
		logger.Warningf("Failed computing key of signed data: %+v", err)
		return errors.Wrap(err, "failed computing key of signed data")
	}
	currSeq := cache.ac.acSupport.ConfigSequence(cache.channel)
	if cache.isValid(currSeq) {
		foundInCache, isEligibleErr := cache.lookup(key)
		if foundInCache {
			return isEligibleErr
		}
	} else {
		// Config changed since the cached results were computed; reset.
		cache.configChange(currSeq)
	}
	// Make sure the cache doesn't overpopulate.
	// It might happen that it overgrows the maximum size due to concurrent
	// goroutines waiting on the lock above, but that's acceptable.
	cache.purgeEntriesIfNeeded()
	// Compute the eligibility of the client for the service
	err = cache.ac.acSupport.EligibleForService(cache.channel, data)
	cache.Lock()
	defer cache.Unlock()
	// Check if the sequence hasn't changed since last time
	if currSeq != cache.ac.acSupport.ConfigSequence(cache.channel) {
		// The sequence at which we computed the eligibility might have changed,
		// so we can't put it into the cache because a more fresh computation result
		// might already be present in the cache by now, and we don't want to override it
		// with a stale computation result, so just return the result.
		return err
	}
	// Else, the eligibility of the client has been computed under the latest sequence,
	// so store the computation result in the cache
	cache.entries[key] = err
	return err
}
// isPurgeNeeded reports whether inserting one more entry would push the
// cache past its configured maximum size.
func (cache *accessCache) isPurgeNeeded() bool {
	cache.RLock()
	defer cache.RUnlock()
	limit := cache.ac.conf.maxCacheSize
	return len(cache.entries) >= limit
}
// purgeEntriesIfNeeded evicts arbitrary entries (map iteration order)
// until only purgeRetentionRatio of maxCacheSize remain, but only when
// the cache has reached its size limit.
func (cache *accessCache) purgeEntriesIfNeeded() {
	if !cache.isPurgeNeeded() {
		return
	}
	cache.Lock()
	defer cache.Unlock()
	limit := cache.ac.conf.maxCacheSize
	retained := int(cache.ac.conf.purgeRetentionRatio * float64(limit))
	toEvict := limit - retained
	for key := range cache.entries {
		if toEvict == 0 {
			break
		}
		delete(cache.entries, key)
		toEvict--
	}
}
// isValid reports whether the cached results were computed under the
// given config sequence and are therefore still usable.
func (cache *accessCache) isValid(currSeq uint64) bool {
	cache.RLock()
	valid := cache.lastSequence == currSeq
	cache.RUnlock()
	return valid
}
// configChange records the new config sequence and drops every memoized
// result, since they were computed under the previous configuration.
func (cache *accessCache) configChange(currSeq uint64) {
	cache.Lock()
	defer cache.Unlock()
	cache.lastSequence = currSeq
	// Invalidate entries
	cache.entries = map[string]error{}
}
// lookup returns whether key has a memoized result, and if so, that
// result (nil meaning the client was found eligible).
func (cache *accessCache) lookup(key string) (bool, error) {
	cache.RLock()
	defer cache.RUnlock()
	result, hit := cache.entries[key]
	return hit, result
}
// signedDataToKey derives a cache key for the given signed data by
// marshaling it (via asBytes) and hex-encoding the SHA-256 digest.
func signedDataToKey(data common.SignedData) (string, error) {
	raw, err := asBytes(data)
	if err != nil {
		return "", errors.Wrap(err, "failed marshaling signed data")
	}
	digest := util.ComputeSHA256(raw)
	return hex.EncodeToString(digest), nil
}