-
Notifications
You must be signed in to change notification settings - Fork 0
/
backend.go
343 lines (286 loc) · 8.77 KB
/
backend.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
/*
Copyright 2015-2019 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package backend provides storage backend abstraction layer
package backend
import (
"bytes"
"context"
"fmt"
"sort"
"strings"
"time"
"github.com/jonboulle/clockwork"
)
// Forever means that object TTL will not expire unless deleted
const (
Forever time.Duration = 0
)
// Backend implements abstraction over local or remote storage backend
type Backend interface {
// Create creates item if it does not exist
Create(ctx context.Context, i Item) (*Lease, error)
// Put puts value into backend (creates if it does not
// exists, updates it otherwise)
Put(ctx context.Context, i Item) (*Lease, error)
// CompareAndSwap compares item with existing item
// and replaces is with replaceWith item
CompareAndSwap(ctx context.Context, expected Item, replaceWith Item) (*Lease, error)
// Update updates value in the backend
Update(ctx context.Context, i Item) (*Lease, error)
// Get returns a single item or not found error
Get(ctx context.Context, key []byte) (*Item, error)
// GetRange returns query range
GetRange(ctx context.Context, startKey []byte, endKey []byte, limit int) (*GetResult, error)
// Delete deletes item by key, returns NotFound error
// if item does not exist
Delete(ctx context.Context, key []byte) error
// DeleteRange deletes range of items with keys between startKey and endKey
DeleteRange(ctx context.Context, startKey, endKey []byte) error
// KeepAlive keeps object from expiring, updates lease on the existing object,
// expires contains the new expiry to set on the lease,
// some backends may ignore expires based on the implementation
// in case if the lease managed server side
KeepAlive(ctx context.Context, lease Lease, expires time.Time) error
// NewWatcher returns a new event watcher
NewWatcher(ctx context.Context, watch Watch) (Watcher, error)
// Close closes backend and all associated resources
Close() error
// Clock returns clock used by this backend
Clock() clockwork.Clock
// CloseWatchers closes all the watchers
// without closing the backend
CloseWatchers()
// Migrate performs any data migration necessary between Teleport versions.
// Migrate must be called BEFORE using any other methods of the Backend.
Migrate(context.Context) error
}
// Batch implements some batch methods
// that are not mandatory for all interfaces,
// only the ones used in bulk operations.
type Batch interface {
// PutRange puts range of items in one transaction
PutRange(ctx context.Context, items []Item) error
}
// Lease represents a lease on the item that can be used
// to extend item's TTL without updating its contents.
//
// Here is an example of renewing object TTL:
//
// lease, err := backend.Create()
// lease.Expires = time.Now().Add(time.Second)
// // Item TTL is extended
// err = backend.KeepAlive(lease)
//
type Lease struct {
// Key is an object representing lease
Key []byte
// ID is a lease ID, could be empty
ID int64
}
// IsEmpty returns true if the lease is empty value
func (l *Lease) IsEmpty() bool {
return l.ID == 0 && len(l.Key) == 0
}
// Watch specifies watcher parameters
type Watch struct {
// Name is a watch name set for debugging
// purposes
Name string
// Prefixes specifies prefixes to watch,
// passed to the backend implementation
Prefixes [][]byte
// QueueSize is an optional queue size
QueueSize int
// MetricComponent if set will start reporting
// with a given component metric
MetricComponent string
}
// String returns a user-friendly description
// of the watcher
func (w *Watch) String() string {
return fmt.Sprintf("Watcher(name=%v, prefixes=%v)", w.Name, string(bytes.Join(w.Prefixes, []byte(", "))))
}
// Watcher returns watcher
type Watcher interface {
// Events returns channel with events
Events() <-chan Event
// Done returns the channel signalling the closure
Done() <-chan struct{}
// Close closes the watcher and releases
// all associated resources
Close() error
}
// GetResult provides the result of GetRange request
type GetResult struct {
// Items returns a list of items
Items []Item
}
// OpType specifies operation type
type OpType int
const (
// OpInit is returned by the system whenever the system
// is initialized, init operation is always sent
// as a first event over the channel, so the client
// can verify that watch has been established.
OpInit OpType = iota
// OpPut is returned for Put events
OpPut OpType = iota
// OpDelete is returned for Delete events
OpDelete OpType = iota
// OpGet is used for tracking, not present in the event stream
OpGet OpType = iota
)
// String returns user-friendly description of the operation
func (o OpType) String() string {
switch o {
case OpInit:
return "Init"
case OpPut:
return "Put"
case OpDelete:
return "Delete"
case OpGet:
return "Get"
default:
return "unknown"
}
}
// Event is a event containing operation with item
type Event struct {
// Type is operation type
Type OpType
// Item is event Item
Item Item
}
// Item is a key value item
type Item struct {
// Key is a key of the key value item
Key []byte
// Value is a value of the key value item
Value []byte
// Expires is an optional record expiry time
Expires time.Time
// ID is a record ID, newer records have newer ids
ID int64
// LeaseID is a lease ID, could be set on objects
// with TTL
LeaseID int64
}
// Config is used for 'storage' config section. It's a combination of
// values for various backends: 'boltdb', 'etcd', 'filesystem' and 'dynamodb'
type Config struct {
// Type can be "bolt" or "etcd" or "dynamodb"
Type string `yaml:"type,omitempty"`
// Params is a generic key/value property bag which allows arbitrary
// falues to be passed to backend
Params Params `yaml:",inline"`
}
// Params type defines a flexible unified back-end configuration API.
// It is just a map of key/value pairs which gets populated by `storage` section
// in Teleport YAML config.
type Params map[string]interface{}
// GetString returns a string value stored in Params map, or an empty string
// if nothing is found
func (p Params) GetString(key string) string {
v, ok := p[key]
if !ok {
return ""
}
s, _ := v.(string)
return s
}
// NoLimit specifies no limits
const NoLimit = 0
// RangeEnd returns end of the range for given key
func RangeEnd(key []byte) []byte {
end := make([]byte, len(key))
copy(end, key)
for i := len(end) - 1; i >= 0; i-- {
if end[i] < 0xff {
end[i] = end[i] + 1
end = end[:i+1]
return end
}
}
// next key does not exist (e.g., 0xffff);
return noEnd
}
var (
noEnd = []byte{0}
)
// Items is a sortable list of backend items
type Items []Item
// Len is part of sort.Interface.
func (it Items) Len() int {
return len(it)
}
// Swap is part of sort.Interface.
func (it Items) Swap(i, j int) {
it[i], it[j] = it[j], it[i]
}
// Less is part of sort.Interface.
func (it Items) Less(i, j int) bool {
return bytes.Compare(it[i].Key, it[j].Key) < 0
}
// TTL returns TTL in duration units, rounds up to one second
func TTL(clock clockwork.Clock, expires time.Time) time.Duration {
ttl := expires.Sub(clock.Now())
if ttl < time.Second {
return time.Second
}
return ttl
}
// EarliestExpiry returns first of the
// otherwise returns empty
func EarliestExpiry(times ...time.Time) time.Time {
if len(times) == 0 {
return time.Time{}
}
sort.Sort(earliest(times))
return times[0]
}
// Expiry converts ttl to expiry time, if ttl is 0
// returns empty time
func Expiry(clock clockwork.Clock, ttl time.Duration) time.Time {
if ttl == 0 {
return time.Time{}
}
return clock.Now().UTC().Add(ttl)
}
type earliest []time.Time
func (p earliest) Len() int {
return len(p)
}
func (p earliest) Less(i, j int) bool {
if p[i].IsZero() {
return false
}
if p[j].IsZero() {
return true
}
return p[i].Before(p[j])
}
func (p earliest) Swap(i, j int) {
p[i], p[j] = p[j], p[i]
}
// Separator is used as a separator between key parts
const Separator = '/'
// Key joins parts into path separated by Separator,
// makes sure path always starts with Separator ("/")
func Key(parts ...string) []byte {
return []byte(strings.Join(append([]string{""}, parts...), string(Separator)))
}
// NoMigrations implements a nop Migrate method of Backend.
// Backend implementations should embed this when no migrations are necessary.
type NoMigrations struct{}
func (NoMigrations) Migrate(context.Context) error { return nil }