/
cache.go
155 lines (132 loc) · 3.08 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package storage
import (
"context"
"encoding/binary"
"sync"
"time"
"github.com/VictoriaMetrics/fastcache"
"golang.org/x/sync/singleflight"
)
// A Cache will return cached data when available or call update when not.
type Cache interface {
GetOrUpdate(
ctx context.Context,
key []byte,
update func(ctx context.Context) ([]byte, error),
) ([]byte, error)
Invalidate(key []byte)
}
type localCache struct {
singleflight singleflight.Group
mu sync.RWMutex
m map[string][]byte
}
// NewLocalCache creates a new Cache backed by a map.
func NewLocalCache() Cache {
return &localCache{
m: make(map[string][]byte),
}
}
func (cache *localCache) GetOrUpdate(
ctx context.Context,
key []byte,
update func(ctx context.Context) ([]byte, error),
) ([]byte, error) {
strkey := string(key)
cache.mu.RLock()
cached, ok := cache.m[strkey]
cache.mu.RUnlock()
if ok {
return cached, nil
}
v, err, _ := cache.singleflight.Do(strkey, func() (interface{}, error) {
cache.mu.RLock()
cached, ok := cache.m[strkey]
cache.mu.RUnlock()
if ok {
return cached, nil
}
result, err := update(ctx)
if err != nil {
return nil, err
}
cache.mu.Lock()
cache.m[strkey] = result
cache.mu.Unlock()
return result, nil
})
if err != nil {
return nil, err
}
return v.([]byte), nil
}
func (cache *localCache) Invalidate(key []byte) {
cache.mu.Lock()
delete(cache.m, string(key))
cache.mu.Unlock()
}
type globalCache struct {
ttl time.Duration
singleflight singleflight.Group
mu sync.RWMutex
fastcache *fastcache.Cache
}
// NewGlobalCache creates a new Cache backed by fastcache and a TTL.
func NewGlobalCache(ttl time.Duration) Cache {
return &globalCache{
ttl: ttl,
fastcache: fastcache.New(256 * 1024 * 1024), // up to 256MB of RAM
}
}
func (cache *globalCache) GetOrUpdate(
ctx context.Context,
key []byte,
update func(ctx context.Context) ([]byte, error),
) ([]byte, error) {
now := time.Now()
data, expiry, ok := cache.get(key)
if ok && now.Before(expiry) {
return data, nil
}
v, err, _ := cache.singleflight.Do(string(key), func() (interface{}, error) {
data, expiry, ok := cache.get(key)
if ok && now.Before(expiry) {
return data, nil
}
value, err := update(ctx)
if err != nil {
return nil, err
}
cache.set(key, value)
return value, nil
})
if err != nil {
return nil, err
}
return v.([]byte), nil
}
func (cache *globalCache) Invalidate(key []byte) {
cache.mu.Lock()
cache.fastcache.Del(key)
cache.mu.Unlock()
}
func (cache *globalCache) get(k []byte) (data []byte, expiry time.Time, ok bool) {
cache.mu.RLock()
item := cache.fastcache.Get(nil, k)
cache.mu.RUnlock()
if len(item) < 8 {
return data, expiry, false
}
unix, data := binary.LittleEndian.Uint64(item), item[8:]
expiry = time.UnixMilli(int64(unix))
return data, expiry, true
}
func (cache *globalCache) set(k, v []byte) {
unix := time.Now().Add(cache.ttl).UnixMilli()
item := make([]byte, len(v)+8)
binary.LittleEndian.PutUint64(item, uint64(unix))
copy(item[8:], v)
cache.mu.Lock()
cache.fastcache.Set(k, item)
cache.mu.Unlock()
}