/
groupcache.go
203 lines (178 loc) · 6.71 KB
/
groupcache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
package storage
import (
"context"
"encoding/binary"
"fmt"
"net/http"
"strings"
"github.com/golang/groupcache"
"github.com/janelia-flyem/dvid/dvid"
)
// GroupcacheStats holds the statistics of interest for this groupcache service.
// The scalar counters mirror the group-level counters kept by groupcache, and
// the two CacheStats values describe the individual caches.
type GroupcacheStats struct {
	Gets           int64 // every GET request, including those from peers
	CacheHits      int64 // requests answered from either cache
	PeerLoads      int64 // remote loads or remote cache hits (not errors)
	PeerErrors     int64 // failed peer loads
	Loads          int64 // gets - cacheHits
	LoadsDeduped   int64 // loads remaining after singleflight
	LocalLoads     int64 // total successful local loads
	LocalLoadErrs  int64 // total failed local loads
	ServerRequests int64 // gets that arrived over the network from peers

	// MainCache describes the cache of items owned by this host.
	MainCache groupcache.CacheStats

	// HotCache describes the cache of items replicated from a peer.
	HotCache groupcache.CacheStats
}
// GetGroupcacheStats returns a snapshot of the groupcache statistics.
//
// It returns an error if the storage manager has not been set up.  If the
// manager is set up but no groupcache group exists, zero-valued stats are
// returned with a nil error.
//
// Each counter is read through AtomicInt.Get on the live Stats struct rather
// than from a plain struct copy, because the counters can be updated by other
// goroutines while the snapshot is taken.
func GetGroupcacheStats() (stats GroupcacheStats, err error) {
	if !manager.setup {
		err = fmt.Errorf("Storage manager not initialized before requesting groupcache stats")
		return stats, err
	}
	cache := manager.gcache.cache
	if cache == nil {
		return stats, nil
	}

	// Take the address so the pointer-receiver Get (an atomic load) is used.
	s := &cache.Stats
	stats.Gets = s.Gets.Get()
	stats.CacheHits = s.CacheHits.Get()
	stats.PeerLoads = s.PeerLoads.Get()
	stats.PeerErrors = s.PeerErrors.Get()
	stats.Loads = s.Loads.Get()
	stats.LoadsDeduped = s.LoadsDeduped.Get()
	stats.LocalLoads = s.LocalLoads.Get()
	stats.LocalLoadErrs = s.LocalLoadErrs.Get()
	stats.ServerRequests = s.ServerRequests.Get()

	stats.MainCache = cache.CacheStats(groupcache.MainCache)
	stats.HotCache = cache.CacheStats(groupcache.HotCache)
	return stats, nil
}
// GroupcacheConfig holds the settings for the groupcache library.
type GroupcacheConfig struct {
	// GB is the cache size in gigabytes.
	GB int

	// Host is the http address of this DVID server's groupcache port.
	Host string

	// Peers are the http addresses of the peer groupcache group.
	Peers []string

	// Instances lists the data instances that use groupcache, each in the
	// form "<name>:<uuid>".
	Instances []string
}
// groupcacheT bundles the groupcache group with the set of data instances
// that are allowed to use it.
type groupcacheT struct {
	// cache is the groupcache group; nil when no group has been created.
	cache *groupcache.Group

	// supported has an entry for each data instance with groupcache support.
	supported map[dvid.DataSpecifier]struct{}
}
// ctxKey is the private key type for values stored in the context.Context
// that is handed to groupcache.
type ctxKey int

// Keys for the values passed to groupcache through a context.Context.
const (
	contextKey ctxKey = iota // 0
	kvdbKey                  // 1
)
// setupGroupcache initializes the package-level groupcache from config.
//
// A config.GB of zero disables groupcache: the function returns nil without
// changing any state.  Otherwise it creates the HTTP pool for config.Host,
// creates the "immutable" group sized to config.GB gigabytes, and records the
// data instances named in config.Instances as groupcache-enabled.  Malformed
// instance specifications are logged and skipped.
//
// If config.Peers is non-empty, the peers (plus config.Host) are registered
// with the pool and an HTTP server for the pool is started on config.Host.
// http.ListenAndServe blocks, so in that case this function does not return
// until the server stops, and it then returns the server's error.
func setupGroupcache(config GroupcacheConfig) error {
	if config.GB == 0 {
		return nil
	}

	// Number of leading key bytes that hold the instance ID prefix.
	const instancePrefixLen = 4

	cacheBytes := int64(config.GB) << 30
	pool := groupcache.NewHTTPPool(config.Host)
	if pool == nil {
		return nil
	}

	dvid.Infof("Initializing groupcache with %d GB at %s...\n", config.GB, config.Host)
	manager.gcache.cache = groupcache.NewGroup("immutable", cacheBytes, groupcache.GetterFunc(
		func(c context.Context, key string, dest groupcache.Sink) error {
			// Guard against a nil context so c.Value cannot panic.
			// NOTE(review): presumably this can happen for requests arriving
			// from peers via the HTTP pool -- confirm against the groupcache
			// version in use.
			if c == nil {
				return fmt.Errorf("bad groupcache context: nil context given for key %q", key)
			}
			ctx, ok := c.Value(contextKey).(Context)
			if !ok {
				return fmt.Errorf("bad groupcache context with no context: %v", c)
			}
			kvdb, ok := c.Value(kvdbKey).(KeyValueDB)
			if !ok {
				return fmt.Errorf("bad groupcache context with no kv db: %v", c)
			}

			// First four bytes of key is instance ID to isolate groupcache
			// collisions.  Reject shorter keys instead of panicking on the slice.
			if len(key) < instancePrefixLen {
				return fmt.Errorf("bad groupcache key %q: missing %d-byte instance ID prefix", key, instancePrefixLen)
			}
			tk := TKey(key[instancePrefixLen:])
			data, err := kvdb.Get(ctx, tk)
			if err != nil {
				return err
			}
			return dest.SetBytes(data)
		}))

	manager.gcache.supported = make(map[dvid.DataSpecifier]struct{})
	for _, dataspec := range config.Instances {
		name := strings.Trim(dataspec, "\"")
		parts := strings.Split(name, ":")
		if len(parts) != 2 {
			dvid.Errorf("bad data instance specification %q given for groupcache support in config file\n", dataspec)
			continue
		}
		dataid := dvid.GetDataSpecifier(dvid.InstanceName(parts[0]), dvid.UUID(parts[1]))
		manager.gcache.supported[dataid] = struct{}{}
	}

	// Without additional peers there is nothing to serve.
	if len(config.Peers) == 0 {
		return nil
	}

	// Add the peers and start a listener via the HTTP port.
	peers := make([]string, 0, len(config.Peers)+1)
	peers = append(peers, config.Host)
	peers = append(peers, config.Peers...)
	pool.Set(peers...)
	dvid.Infof("Groupcache configuration has %d peers in addition to local host.\n", len(config.Peers))
	dvid.Infof("Starting groupcache HTTP server on %s\n", config.Host)

	// ListenAndServe only returns when the server fails or stops; report that
	// instead of silently claiming success.
	if err := http.ListenAndServe(config.Host, http.HandlerFunc(pool.ServeHTTP)); err != nil {
		return fmt.Errorf("groupcache HTTP server on %s: %v", config.Host, err)
	}
	return nil
}
// wrapGroupcache returns a store that tries the given groupcache group before
// resorting to the passed store.
//
// A store implementing OrderedKeyValueDB is wrapped in groupcacheOrderedStore;
// otherwise a store implementing KeyValueDB is wrapped in groupcacheStore.  A
// store implementing neither is returned unwrapped together with an error.
func wrapGroupcache(store dvid.Store, cache *groupcache.Group) (dvid.Store, error) {
	switch db := store.(type) {
	case OrderedKeyValueDB:
		return groupcacheOrderedStore{OrderedKeyValueDB: db, cache: cache}, nil
	case KeyValueDB:
		return groupcacheStore{KeyValueDB: db, cache: cache}, nil
	default:
		return store, fmt.Errorf("can't wrap store %s in groupcache: doesn't implement KeyValueDB", store)
	}
}
// instanceProvider is implemented by contexts that can report the ID of the
// data instance they belong to.  The groupcache store wrappers use that ID to
// prefix their cache keys.
type instanceProvider interface {
	InstanceID() dvid.InstanceID
}
// groupcacheOrderedStore wraps an OrderedKeyValueDB and pairs it with a
// groupcache group.  All methods of the embedded store are promoted.
type groupcacheOrderedStore struct {
	OrderedKeyValueDB

	// cache is the groupcache group paired with the embedded store.
	cache *groupcache.Group
}
// groupcacheStore wraps a KeyValueDB and pairs it with a groupcache group.
// All methods of the embedded store are promoted.
type groupcacheStore struct {
	KeyValueDB

	// cache is the groupcache group paired with the embedded store.
	cache *groupcache.Group
}
// Get returns the value for key k, going through groupcache when ctx is a data
// context.
//
// Contexts that do not implement instanceProvider are logged as critical and
// served directly by the wrapped OrderedKeyValueDB.  Otherwise the groupcache
// key is the 4-byte little-endian instance ID followed by the bytes of k, and
// both ctx and the wrapped store are passed to groupcache inside a
// context.Context under contextKey and kvdbKey.
func (g groupcacheOrderedStore) Get(ctx Context, k TKey) ([]byte, error) {
	provider, isData := ctx.(instanceProvider)
	if !isData {
		dvid.Criticalf("groupcache Get passed a non-data context %v, falling back on normal kv store Get\n", ctx)
		return g.OrderedKeyValueDB.Get(ctx, k)
	}

	// Prefix the key with the instance ID so different instances cannot collide.
	var prefix [4]byte
	binary.LittleEndian.PutUint32(prefix[:], uint32(provider.InstanceID()))
	cacheKey := string(prefix[:]) + string(k)

	// Carry the storage context and the underlying store along with the request.
	lookupCtx := context.WithValue(
		context.WithValue(context.Background(), contextKey, ctx),
		kvdbKey, g.OrderedKeyValueDB,
	)

	var value []byte
	err := g.cache.Get(lookupCtx, cacheKey, groupcache.AllocatingByteSliceSink(&value))
	return value, err
}
// Get returns the value for key k, going through groupcache when ctx is a data
// context.  Get is the only method of the wrapped KeyValueDB that needs to be
// overridden.
//
// Contexts that do not implement instanceProvider are logged as critical and
// served directly by the wrapped KeyValueDB.  Otherwise the groupcache key is
// the 4-byte little-endian instance ID followed by the bytes of k, and both
// ctx and the wrapped store are passed to groupcache inside a context.Context
// under contextKey and kvdbKey.
func (g groupcacheStore) Get(ctx Context, k TKey) ([]byte, error) {
	provider, isData := ctx.(instanceProvider)
	if !isData {
		dvid.Criticalf("groupcache Get passed a non-data context %v, falling back on normal kv store Get\n", ctx)
		return g.KeyValueDB.Get(ctx, k)
	}

	// Prefix the key with the instance ID so different instances cannot collide.
	var prefix [4]byte
	binary.LittleEndian.PutUint32(prefix[:], uint32(provider.InstanceID()))
	cacheKey := string(prefix[:]) + string(k)

	// Carry the storage context and the underlying store along with the request.
	lookupCtx := context.WithValue(
		context.WithValue(context.Background(), contextKey, ctx),
		kvdbKey, g.KeyValueDB,
	)

	var value []byte
	err := g.cache.Get(lookupCtx, cacheKey, groupcache.AllocatingByteSliceSink(&value))
	return value, err
}