-
Notifications
You must be signed in to change notification settings - Fork 348
/
range_manager.go
176 lines (148 loc) · 5.15 KB
/
range_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package sstable
import (
"bytes"
"context"
"crypto"
"errors"
"fmt"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/sstable"
"github.com/treeverse/lakefs/pkg/graveler"
"github.com/treeverse/lakefs/pkg/graveler/committed"
"github.com/treeverse/lakefs/pkg/logging"
"github.com/treeverse/lakefs/pkg/pyramid"
)
// NewSSTableReaderFn opens an sstable.Reader for the SSTable identified by id
// within namespace ns. RangeManager calls it once per lookup or iterator.
type NewSSTableReaderFn func(ctx context.Context, ns committed.Namespace, id committed.ID) (*sstable.Reader, error)
// Unrefer is implemented by anything holding a reference that can be dropped
// with Unref. RangeManager stores one and calls Unref on it from Close.
type Unrefer interface {
// Unref drops one reference.
Unref()
}
// RangeManager implements committed.RangeManager on top of SSTable files that
// are opened, checked and written through a pyramid.FS.
type RangeManager struct {
// newReader opens an SSTable reader for a namespace/ID pair.
newReader NewSSTableReaderFn
// fs serves existence checks and remote URIs, and is handed to new writers.
fs pyramid.FS
// hash creates the hasher passed to each new writer (see GetWriter).
hash crypto.Hash
// cache is the reference released by Close.
cache Unrefer
}
// NewPebbleSSTableRangeManager returns a RangeManager whose readers open
// SSTables from fs and are configured with the given pebble block cache.
//
// A non-nil cache gets one extra reference here; the same cache is passed on
// as the manager's Unrefer so that reference can be dropped later.
//
// NOTE(review): when cache is nil, the Unrefer passed on still carries a nil
// *pebble.Cache, i.e. it is an interface value that does not compare equal to
// nil.
func NewPebbleSSTableRangeManager(cache *pebble.Cache, fs pyramid.FS, hash crypto.Hash) *RangeManager {
	// nil cache allowed (size=0), see sstable.ReaderOptions
	if cache != nil {
		cache.Ref()
	}

	readerOpts := sstable.ReaderOptions{Cache: cache}

	// Bind fs and the reader options so callers only supply namespace and ID.
	open := func(ctx context.Context, ns committed.Namespace, id committed.ID) (*sstable.Reader, error) {
		return newReader(ctx, fs, ns, id, readerOpts)
	}

	return NewPebbleSSTableRangeManagerWithNewReader(open, readerOpts.Cache, fs, hash)
}
// newReader opens the file stored under (ns, id) on fs and wraps it in an
// sstable.Reader configured with opts. Failures from either step are returned
// wrapped with the namespace and ID.
func newReader(ctx context.Context, fs pyramid.FS, ns committed.Namespace, id committed.ID, opts sstable.ReaderOptions) (*sstable.Reader, error) {
	f, openErr := fs.Open(ctx, string(ns), string(id))
	if openErr != nil {
		return nil, fmt.Errorf("open sstable file %s %s: %w", ns, id, openErr)
	}

	reader, readerErr := sstable.NewReader(f, opts)
	if readerErr != nil {
		return nil, fmt.Errorf("open sstable reader %s %s: %w", ns, id, readerErr)
	}

	return reader, nil
}
// NewPebbleSSTableRangeManagerWithNewReader assembles a RangeManager from its
// parts: newReader opens SSTable readers, cache is what Close will Unref, fs
// backs file access, and hash creates hashers for new writers.
//
// No reference is taken on cache here; it is only stored.
func NewPebbleSSTableRangeManagerWithNewReader(newReader NewSSTableReaderFn, cache Unrefer, fs pyramid.FS, hash crypto.Hash) *RangeManager {
	m := new(RangeManager)
	m.newReader = newReader
	m.cache = cache
	m.fs = fs
	m.hash = hash
	return m
}
var (
	// ErrKeyNotFound is the error returned when a path is not found
	ErrKeyNotFound = errors.New("path not found")

	// Compile-time assertion that *RangeManager satisfies
	// committed.RangeManager.
	_ committed.RangeManager = (*RangeManager)(nil)
)
// Exists reports whether the range id is present in namespace ns, as answered
// by the underlying pyramid FS.
func (m *RangeManager) Exists(ctx context.Context, ns committed.Namespace, id committed.ID) (bool, error) {
	namespace, name := string(ns), string(id)
	return m.fs.Exists(ctx, namespace, name)
}
// GetValueGE returns the first record in the SSTable referenced by id whose
// key is greater than or equal to lookup.
//
// If the seek yields no key, (nil, ErrKeyNotFound) is returned, unless the
// iterator reports an error, in which case that error is returned wrapped.
// Errors opening the reader or creating the iterator are returned as well.
//
// The returned Record owns its Key and Value: both are copied out of the
// iterator before the iterator and reader are closed.
func (m *RangeManager) GetValueGE(ctx context.Context, ns committed.Namespace, id committed.ID, lookup committed.Key) (*committed.Record, error) {
	reader, err := m.newReader(ctx, ns, id)
	if err != nil {
		return nil, err
	}
	defer m.execAndLog(ctx, reader.Close, "close reader")

	// TODO(ariels): reader.NewIter(lookup, lookup)?
	it, err := reader.NewIter(nil, nil)
	if err != nil {
		return nil, fmt.Errorf("create iterator: %w", err)
	}
	// Deferred calls run last-in-first-out, so the iterator is closed before
	// the reader.
	defer m.execAndLog(ctx, it.Close, "close iterator")

	// Ranges are keyed by MaxKey, seek to the range that might contain key.
	key, value := it.SeekGE(lookup)
	if key == nil {
		if iterErr := it.Error(); iterErr != nil {
			return nil, fmt.Errorf("read metarange from sstable id %s: %w", id, iterErr)
		}
		return nil, ErrKeyNotFound
	}

	// key.UserKey and value are handed out by the iterator, and the deferred
	// Close calls above run before the caller sees the Record. Copy both so
	// the result does not alias iterator/reader-owned memory.
	// NOTE(review): presumably pebble only guarantees these slices until the
	// iterator is repositioned or closed - confirm against the pebble docs.
	// The x[:0:0] form keeps a nil slice nil.
	return &committed.Record{
		Key:   append(key.UserKey[:0:0], key.UserKey...),
		Value: append(value[:0:0], value...),
	}, nil
}
// GetValue returns the Record matching the key in the SSTable referenced by the id.
// If key is not found, (nil, ErrKeyNotFound) is returned.
//
// Errors opening the reader, creating the iterator, or reported by the
// iterator after the seek are returned wrapped.
//
// The returned Record owns its Key and Value: both are copied out of the
// iterator before the iterator and reader are closed.
func (m *RangeManager) GetValue(ctx context.Context, ns committed.Namespace, id committed.ID, lookup committed.Key) (*committed.Record, error) {
	reader, err := m.newReader(ctx, ns, id)
	if err != nil {
		return nil, err
	}
	defer m.execAndLog(ctx, reader.Close, "close reader")

	it, err := reader.NewIter(nil, nil)
	if err != nil {
		return nil, fmt.Errorf("create iterator: %w", err)
	}
	// Deferred calls run last-in-first-out, so the iterator is closed before
	// the reader.
	defer m.execAndLog(ctx, it.Close, "close iterator")

	// actual reading
	key, value := it.SeekGE(lookup)
	if key == nil {
		if iterErr := it.Error(); iterErr != nil {
			return nil, fmt.Errorf("read key from sstable id %s: %w", id, iterErr)
		}
		// lookup path is after the last path in the SSTable
		return nil, ErrKeyNotFound
	}
	if !bytes.Equal(lookup, key.UserKey) {
		// lookup path in range but key not found
		return nil, ErrKeyNotFound
	}

	// key.UserKey and value are handed out by the iterator, and the deferred
	// Close calls above run before the caller sees the Record. Copy both so
	// the result does not alias iterator/reader-owned memory.
	// NOTE(review): presumably pebble only guarantees these slices until the
	// iterator is repositioned or closed - confirm against the pebble docs.
	// The x[:0:0] form keeps a nil slice nil.
	return &committed.Record{
		Key:   append(key.UserKey[:0:0], key.UserKey...),
		Value: append(value[:0:0], value...),
	}, nil
}
// NewRangeIterator opens the SSTable tid in namespace ns and returns a
// committed.ValueIterator over it. The iterator is created without bounds and
// is given reader.Close so the reader can be released together with it.
//
// If the iterator cannot be created the reader is closed right away; a
// failure of that close is only logged, and the iterator error is returned.
func (m *RangeManager) NewRangeIterator(ctx context.Context, ns committed.Namespace, tid committed.ID) (committed.ValueIterator, error) {
	sst, err := m.newReader(ctx, ns, tid)
	if err != nil {
		return nil, err
	}

	it, err := sst.NewIter(nil, nil)
	if err != nil {
		// Nothing will own the reader on this path, so release it here.
		closeErr := sst.Close()
		if closeErr != nil {
			logging.FromContext(ctx).WithError(closeErr).Errorf("Failed de-referencing sstable %s", tid)
		}
		return nil, fmt.Errorf("creating sstable iterator: %w", err)
	}

	return NewIterator(it, sst.Close), nil
}
// GetWriter returns a new disk-backed SSTable writer for namespace ns,
// carrying the given metadata and a fresh hasher created from the manager's
// configured hash.
func (m *RangeManager) GetWriter(ctx context.Context, ns committed.Namespace, metadata graveler.Metadata) (committed.RangeWriter, error) {
	hasher := m.hash.New()
	return NewDiskWriter(ctx, m.fs, ns, hasher, metadata)
}
// GetURI returns the remote URI of the range id in namespace ns, as reported
// by the underlying pyramid FS.
func (m *RangeManager) GetURI(ctx context.Context, ns committed.Namespace, id committed.ID) (string, error) {
	namespace, name := string(ns), string(id)
	return m.fs.GetRemoteURI(ctx, namespace, name)
}
// execAndLog runs f and, if it fails, logs the error under msg using the
// logger taken from ctx. The error is not propagated, which makes this
// suitable for deferred cleanup calls.
func (m *RangeManager) execAndLog(ctx context.Context, f func() error, msg string) {
	err := f()
	if err == nil {
		return
	}
	logging.FromContext(ctx).WithError(err).Error(msg)
}
// Close drops the reference this RangeManager holds on its cache by calling
// Unref on it. It always returns nil.
//
// A manager built without a cache is handled as a no-op. Two shapes of "no
// cache" can reach this point:
//   - a nil Unrefer interface, and
//   - a nil *pebble.Cache wrapped in the Unrefer interface, which is what
//     NewPebbleSSTableRangeManager passes on when it is given a nil cache.
//     Such a value is not equal to nil and has to be detected by its type.
func (m *RangeManager) Close() error {
	if m.cache == nil {
		return nil
	}
	if c, ok := m.cache.(*pebble.Cache); ok && c == nil {
		// Typed nil: calling Unref would run on a nil receiver.
		return nil
	}
	m.cache.Unref()
	return nil
}