-
Notifications
You must be signed in to change notification settings - Fork 346
/
store.go
228 lines (193 loc) · 7.32 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
package kv
//go:generate go run github.com/golang/mock/mockgen@v1.6.0 -source=store.go -destination=mock/store.go -package=mock
import (
	"context"
	"errors"
	"fmt"
	"sort"
	"strconv"
	"strings"
	"sync"

	"github.com/treeverse/lakefs/pkg/kv/kvparams"
)
// KV schema versions.
//
// Versions are numbered consecutively starting at 1 (iota + 1).
// NextSchemaVersion is always one past the most recent version listed here, which is why
// IsLatestSchemaVersion compares against NextSchemaVersion-1.
// NOTE(review): new versions are presumably meant to be inserted just above NextSchemaVersion - confirm.
const (
InitialMigrateVersion = iota + 1
ACLMigrateVersion
ACLNoReposMigrateVersion
ACLImportMigrateVersion
NextSchemaVersion
)
const (
// PathDelimiter is the separator FormatPath places between path components.
PathDelimiter = "/"
// MetadataPartitionKey is the partition key under which the DB schema version is stored
// (see GetDBSchemaVersion and SetDBSchemaVersion).
MetadataPartitionKey = "kv-internal-metadata"
)
// Sentinel errors of the kv package. Compare against them with errors.Is, since they may be wrapped
// (Open, for example, wraps ErrUnknownDriver with the requested driver type).
//
// Of these, the Store interface documents ErrNotFound (Get on a missing key) and
// ErrPredicateFailed (SetIf whose predicate does not match); the remaining errors are
// not used in this file.
var (
ErrClosedEntries = errors.New("closed entries")
ErrConnectFailed = errors.New("connect failed")
ErrDriverConfiguration = errors.New("driver configuration")
ErrMissingPartitionKey = errors.New("missing partition key")
ErrMissingKey = errors.New("missing key")
ErrMissingValue = errors.New("missing value")
ErrNotFound = errors.New("not found")
ErrPredicateFailed = errors.New("predicate failed")
ErrSetupFailed = errors.New("setup failed")
ErrUnknownDriver = errors.New("unknown driver")
ErrTableNotActive = errors.New("table not active")
ErrSlowDown = errors.New("slow down")
)
// Precond is the type of the special conditionals that can be passed as the predicate to the SetIf method.
type Precond string
// PrecondConditionalExists is a conditional for SetIf that performs the Set only if the key already exists in the store.
var PrecondConditionalExists = Precond("ConditionalExists")
// FormatPath joins the given components into a single path string, placing
// PathDelimiter between consecutive components. Components are used as-is.
func FormatPath(p ...string) string {
	joined := strings.Join(p, PathDelimiter)
	return joined
}
// Driver is the interface used to access a kv database as a Store. Each kv provider implements a Driver
// and makes it available through Register.
type Driver interface {
// Open opens access to the database store described by params.
// An implementation may return the same Store instance for the same configuration, or a new one,
// as long as every returned Store provides access to the same underlying storage.
Open(ctx context.Context, params kvparams.Config) (Store, error)
}
// Predicate is an opaque value used to update a key based on a previously fetched value.
//
// Store's Get returns the key's value together with its associated predicate.
// Store's SetIf sets the key's value only if that predicate still matches.
type Predicate interface{}
// ValueWithPredicate pairs a value with its predicate: Value holds the data and Predicate is the
// value to use for a later conditional set.
//
// The Get operation returns this struct, holding the key's information.
// The SetIf operation uses the Predicate for its conditional set.
type ValueWithPredicate struct {
Value []byte
Predicate Predicate
}
// ScanOptions holds the optional parameters accepted by Store's Scan.
type ScanOptions struct {
// KeyStart is the key at which the scan starts.
KeyStart []byte
// BatchSize is used by each implementation to limit the query or the amount of data fetched at a time while scanning.
// The 0 value means: use the implementation's default.
BatchSize int
}
// Store is the key-value store abstraction returned by a Driver. Entries are addressed by a
// partition key together with a key inside that partition.
type Store interface {
// Get returns a result containing the Value and Predicate for the given key, or ErrNotFound if the key doesn't exist.
// The returned Predicate can be passed to a later SetIf operation.
Get(ctx context.Context, partitionKey, key []byte) (*ValueWithPredicate, error)
// Set stores the given value, overwriting an existing value if one exists.
Set(ctx context.Context, partitionKey, key, value []byte) error
// SetIf stores the given value only if valuePredicate matches the currently stored value, and returns
// ErrPredicateFailed otherwise. SetIf is a simple compare-and-swap operator:
// valuePredicate is either the predicate of the existing value, or nil when no previous key is expected to exist.
// This is intentionally simplistic: a better abstraction can be modeled on top, keeping this interface simple for implementors.
SetIf(ctx context.Context, partitionKey, key, value []byte, valuePredicate Predicate) error
// Delete deletes the key; it is not an error if the key doesn't exist.
Delete(ctx context.Context, partitionKey, key []byte) error
// Scan returns the entries of partitionKey, in key order.
// 'options' holds optional parameters that control the batch size and the key to start the scan with.
Scan(ctx context.Context, partitionKey []byte, options ScanOptions) (EntriesIterator, error)
// Close closes access to the database store. After calling Close the instance is unusable.
Close()
}
// EntriesIterator is used to enumerate Scan results.
type EntriesIterator interface {
// Next must be called before the first access to Entry.
// It processes the next entry and returns true if there is one.
Next() bool
// SeekGE moves the iterator to the first Entry with a key equal to or greater than the given key.
// A call to Next is required after calling this method.
SeekGE(key []byte)
// Entry returns the current entry, as read by the last call to Next. It is nil in case of an error or when there are no more entries.
Entry() *Entry
// Err returns the last error encountered while reading or parsing the next entry.
Err() error
// Close should be called once processing of the entries is done; it is required in order to release the resources used by the scan.
// After calling Close the instance should not be used, as its behaviour is undefined.
Close()
}
// Entry is a single key/value record, together with the partition key it belongs to.
// It is the element type exposed by EntriesIterator.
type Entry struct {
	PartitionKey []byte
	Key          []byte
	Value        []byte
}

// String renders the entry's Key and Value with the %v verb (so byte slices appear as
// lists of decimal byte values). PartitionKey is not part of the output.
// A nil receiver is rendered as "Entry{nil}".
func (e *Entry) String() string {
	if e != nil {
		return fmt.Sprintf("Entry{%v, %v}", e.Key, e.Value)
	}
	return "Entry{nil}"
}
// Registry of driver implementations.
var (
// drivers maps a registered name to its Driver implementation.
drivers = make(map[string]Driver)
// driversMu guards every access to drivers.
driversMu sync.RWMutex
)
// Register makes 'driver' available under 'name' so that Open can later find it.
// It panics if the name is empty, if the driver is nil, or if a driver was already
// registered under the same name.
func Register(name string, driver Driver) {
	// Validate the arguments before taking the registry lock.
	switch {
	case name == "":
		panic("kv store register name is missing")
	case driver == nil:
		panic("kv store Register driver is nil")
	}

	driversMu.Lock()
	defer driversMu.Unlock()

	if _, exists := drivers[name]; exists {
		panic("kv store Register driver already registered " + name)
	}
	drivers[name] = driver
}
// UnregisterAllDrivers empties the driver registry by deleting every registered entry
// while holding the registry's write lock. It is intended for use by test code.
func UnregisterAllDrivers() {
	driversMu.Lock()
	defer driversMu.Unlock()

	for registered := range drivers {
		delete(drivers, registered)
	}
}
// Open looks up the driver registered under params.Type and uses it to open a store
// with the given configuration.
// It fails with an error wrapping ErrUnknownDriver when no driver is registered under
// that type; an error returned by the driver's own Open is returned unchanged.
// On success, the store is passed through storeMetrics together with the driver type
// before being returned.
func Open(ctx context.Context, params kvparams.Config) (Store, error) {
	driverType := params.Type

	// Hold the read lock only for the lookup, not while the driver opens the store.
	driversMu.RLock()
	driver, registered := drivers[driverType]
	driversMu.RUnlock()
	if !registered {
		return nil, fmt.Errorf("%w: %s", ErrUnknownDriver, driverType)
	}

	opened, err := driver.Open(ctx, params)
	if err != nil {
		return nil, err
	}
	return storeMetrics(opened, driverType), nil
}
// Drivers returns the names of all registered drivers, sorted in increasing order.
//
// The registry is a map, whose iteration order is randomized. Sorting the names makes
// the result deterministic, so it can be used in logs, error messages and tests
// (database/sql.Drivers makes the same guarantee).
// The returned slice is a fresh copy that the caller may modify.
func Drivers() []string {
	driversMu.RLock()
	defer driversMu.RUnlock()

	names := make([]string, 0, len(drivers))
	for name := range drivers {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}
// GetDBSchemaVersion reads the KV DB schema version stored under MetadataPartitionKey
// at the schema version path and parses it as a decimal integer.
// It returns -1 together with the error when the store lookup fails (the error from
// store.Get is returned as-is) or when the stored value is not a valid integer.
func GetDBSchemaVersion(ctx context.Context, store Store) (int, error) {
	const invalidVersion = -1

	partition := []byte(MetadataPartitionKey)
	stored, err := store.Get(ctx, partition, dbSchemaPath())
	if err != nil {
		return invalidVersion, err
	}

	parsed, err := strconv.Atoi(string(stored.Value))
	if err != nil {
		return invalidVersion, err
	}
	return parsed, nil
}
// SetDBSchemaVersion stores 'version', formatted as a decimal string, under
// MetadataPartitionKey at the schema version path. Any existing value is overwritten
// (it uses the store's Set). The error from store.Set is returned as-is.
func SetDBSchemaVersion(ctx context.Context, store Store, version uint) error {
	partition := []byte(MetadataPartitionKey)
	encoded := []byte(fmt.Sprintf("%d", version))
	return store.Set(ctx, partition, dbSchemaPath(), encoded)
}
// dbSchemaPath returns the key under which the DB schema version is kept:
// the components "kv", "schema" and "version" joined by FormatPath.
func dbSchemaPath() []byte {
	path := FormatPath("kv", "schema", "version")
	return []byte(path)
}
// IsLatestSchemaVersion reports whether 'version' is the most recent schema version,
// i.e. the one immediately preceding NextSchemaVersion.
func IsLatestSchemaVersion(version int) bool {
	const latest = NextSchemaVersion - 1
	return version == latest
}