/
options.go
294 lines (249 loc) · 7.91 KB
/
options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
package objectstorage
import (
"bufio"
"fmt"
"os"
"time"
"github.com/mr-tron/base58"
"github.com/iotaledger/hive.go/kvstore"
"github.com/iotaledger/hive.go/kvstore/debug"
)
type Options struct {
store kvstore.KVStore
batchedWriterInstance *kvstore.BatchedWriter
cacheTime time.Duration
keyPartitions []int
persistenceEnabled bool
keysOnly bool
storeOnCreation bool
releaseExecutorWorkerCount int
leakDetectionOptions *LeakDetectionOptions
leakDetectionWrapper func(cachedObject *CachedObjectImpl) LeakDetectionWrapper
onEvictionCallback func(cachedObject CachedObject)
}
func newOptions(store kvstore.KVStore, optionalOptions []Option) *Options {
result := &Options{
store: store,
cacheTime: 0,
persistenceEnabled: true,
releaseExecutorWorkerCount: 1,
}
for _, optionalOption := range optionalOptions {
optionalOption(result)
}
if result.leakDetectionOptions != nil && result.leakDetectionWrapper == nil {
result.leakDetectionWrapper = newLeakDetectionWrapperImpl
}
result.batchedWriterInstance = kvstore.NewBatchedWriter(result.store)
return result
}
type Option func(*Options)
func CacheTime(duration time.Duration) Option {
return func(args *Options) {
args.cacheTime = duration
}
}
// logChannelBufferSize defines the size of the buffer used for the log writer
const logChannelBufferSize = 10240
// logEntry is a container for the
type logEntry struct {
time time.Time
command debug.Command
parameters [][]byte
}
// String returns a string representation of the log entry
func (l *logEntry) String() string {
result := l.time.Format("15:04:05") + " " + debug.CommandNames[l.command]
for _, parameter := range l.parameters {
result += " " + base58.Encode(parameter)
}
return result
}
// LogAccess sets up a logger that logs all calls to the underlying store in the given file. It is possible to filter
// the logged commands by providing an optional filter flag.
func LogAccess(fileName string, commandsFilter ...debug.Command) Option {
return func(args *Options) {
// open log file
logFile, err := os.OpenFile(fileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
writer := bufio.NewWriter(logFile)
if err != nil {
panic(err)
}
// open logger channel
logChannel := make(chan logEntry, logChannelBufferSize)
// start background worker that writes to the log file
go func() {
for {
switch loggedCommand := <-logChannel; loggedCommand.command {
case debug.ShutdownCommand:
// write log entry
if _, err = writer.WriteString(loggedCommand.String() + "\n"); err != nil {
panic(err)
}
// close channel and log file
err = writer.Flush()
if err != nil {
fmt.Println(err)
}
close(logChannel)
err = logFile.Close()
if err != nil {
fmt.Println(err)
}
return
default:
// write log entry
if _, err := writer.WriteString(loggedCommand.String() + "\n"); err != nil {
panic(err)
}
}
}
}()
// Wrap the KVStore with a debug one and pass through calls to logger channel
args.store = debug.New(args.store, func(command debug.Command, parameters ...[]byte) {
logChannel <- logEntry{time.Now(), command, parameters}
}, commandsFilter...)
}
}
func PersistenceEnabled(persistenceEnabled bool) Option {
return func(args *Options) {
args.persistenceEnabled = persistenceEnabled
}
}
func KeysOnly(keysOnly bool) Option {
return func(args *Options) {
args.keysOnly = keysOnly
}
}
// StoreOnCreation writes an object directly to the persistence layer on creation.
func StoreOnCreation(store bool) Option {
return func(args *Options) {
args.storeOnCreation = store
}
}
// ReleaseExecutorWorkerCount sets the number of workers that execute the
// scheduled eviction of the objects in parallel (whenever they become due).
func ReleaseExecutorWorkerCount(releaseExecutorWorkerCount int) Option {
if releaseExecutorWorkerCount < 1 {
panic("releaseExecutorWorkerCount must be greater or equal 1")
}
return func(args *Options) {
args.releaseExecutorWorkerCount = releaseExecutorWorkerCount
}
}
func LeakDetectionEnabled(leakDetectionEnabled bool, options ...LeakDetectionOptions) Option {
return func(args *Options) {
if leakDetectionEnabled {
switch len(options) {
case 0:
args.leakDetectionOptions = &LeakDetectionOptions{
MaxConsumersPerObject: 20,
MaxConsumerHoldTime: 240 * time.Second,
}
case 1:
args.leakDetectionOptions = &options[0]
default:
panic("too many additional arguments in call to LeakDetectionEnabled (only 0 or 1 allowed")
}
}
}
}
func OverrideLeakDetectionWrapper(wrapperFunc func(cachedObject *CachedObjectImpl) LeakDetectionWrapper) Option {
return func(args *Options) {
args.leakDetectionWrapper = wrapperFunc
}
}
func PartitionKey(keyPartitions ...int) Option {
return func(args *Options) {
args.keyPartitions = keyPartitions
}
}
// OnEvictionCallback sets a function that is called on eviction of the object.
func OnEvictionCallback(cb func(cachedObject CachedObject)) Option {
return func(args *Options) {
args.onEvictionCallback = cb
}
}
// the default options used for object storage Contains calls.
var defaultReadOptions = []ReadOption{
WithReadSkipCache(false),
WithReadSkipStorage(false),
}
// ReadOption is a function setting a read option.
type ReadOption func(opts *ReadOptions)
// ReadOptions define options for Contains calls in the object storage.
type ReadOptions struct {
// whether to skip the elements in the cache.
skipCache bool
// whether to skip the elements in the storage.
skipStorage bool
}
// applies the given ReadOption.
func (o *ReadOptions) apply(opts ...ReadOption) {
for _, opt := range opts {
opt(o)
}
}
// WithReadSkipCache is used to skip the elements in the cache.
func WithReadSkipCache(skipCache bool) ReadOption {
return func(opts *ReadOptions) {
opts.skipCache = skipCache
}
}
// WithReadSkipStorage is used to skip the elements in the storage.
func WithReadSkipStorage(skipStorage bool) ReadOption {
return func(opts *ReadOptions) {
opts.skipStorage = skipStorage
}
}
// the default options used for object storage iteration.
var defaultIteratorOptions = []IteratorOption{
WithIteratorSkipCache(false),
WithIteratorSkipStorage(false),
WithIteratorPrefix(kvstore.EmptyPrefix),
WithIteratorMaxIterations(0),
}
// IteratorOption is a function setting an iterator option.
type IteratorOption func(opts *IteratorOptions)
// IteratorOptions define options for iterations in the object storage.
type IteratorOptions struct {
// whether to skip the elements in the cache.
skipCache bool
// whether to skip the elements in the storage.
skipStorage bool
// an optional prefix to iterate a subset of elements.
optionalPrefix []byte
// used to stop the iteration after a certain amount of iterations.
maxIterations int
}
// applies the given IteratorOption.
func (o *IteratorOptions) apply(opts ...IteratorOption) {
for _, opt := range opts {
opt(o)
}
}
// WithIteratorSkipCache is used to skip the elements in the cache.
func WithIteratorSkipCache(skipCache bool) IteratorOption {
return func(opts *IteratorOptions) {
opts.skipCache = skipCache
}
}
// WithIteratorSkipStorage is used to skip the elements in the storage.
func WithIteratorSkipStorage(skipStorage bool) IteratorOption {
return func(opts *IteratorOptions) {
opts.skipStorage = skipStorage
}
}
// WithIteratorPrefix is used to iterate a subset of elements with a defined prefix.
func WithIteratorPrefix(prefix []byte) IteratorOption {
return func(opts *IteratorOptions) {
opts.optionalPrefix = prefix
}
}
// WithIteratorMaxIterations is used to stop the iteration after a certain amount of iterations.
// 0 disables the limit.
func WithIteratorMaxIterations(maxIterations int) IteratorOption {
return func(opts *IteratorOptions) {
opts.maxIterations = maxIterations
}
}