-
Notifications
You must be signed in to change notification settings - Fork 348
/
meta_range_writer.go
187 lines (169 loc) · 5.18 KB
/
meta_range_writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package committed
import (
"bytes"
"context"
"errors"
"fmt"
"sort"
"github.com/treeverse/lakefs/pkg/graveler"
"github.com/treeverse/lakefs/pkg/logging"
)
// GeneralMetaRangeWriter builds a metarange from a stream of records and/or
// already-written ranges. Records (WriteRecord) are accumulated into range
// writers that are closed asynchronously; pre-built ranges (WriteRange) are
// collected as-is. Close gathers all ranges, sorts them by MaxKey and writes
// them as the records of a new metarange.
type GeneralMetaRangeWriter struct {
	// ctx is the context captured at construction; it is passed when creating
	// range writers in WriteRecord.
	ctx context.Context
	// metadata is passed to the range/metarange managers' GetWriter and copied
	// onto the metarange writer.
	metadata graveler.Metadata
	params   *Params // for breaking ranges
	// namespace is passed to GetWriter for both range and metarange writers.
	namespace Namespace
	// metaRangeManager creates the writer for the final metarange.
	metaRangeManager RangeManager
	// rangeManager creates writers for the individual ranges.
	rangeManager RangeManager
	rangeWriter  RangeWriter // writer for the current range
	// lastKey is a copy of the last key written (record key, or MaxKey of a
	// range added via WriteRange); nil before the first write. Used to enforce
	// strictly ascending order.
	lastKey Key
	// batchWriteCloser closes finished range writers asynchronously; its Wait
	// result provides the ranges produced from WriteRecord calls.
	batchWriteCloser BatchWriterCloser
	// ranges holds ranges added via WriteRange; in Close it is replaced by the
	// full MaxKey-sorted list of all ranges.
	ranges []Range
}
// Metadata key and values used to tag the files produced by the writer.
const (
	// MetadataTypeKey is the metadata key under which the file type is recorded.
	MetadataTypeKey = "type"

	// MetadataRangesType marks a file as a range (set on every range writer).
	MetadataRangesType = "ranges"
	// MetadataMetarangesType marks a file as a metarange (set on the metarange writer).
	MetadataMetarangesType = "metaranges"
)
// Errors returned by GeneralMetaRangeWriter when input violates its contract.
var (
	// ErrNilValue is returned by WriteRecord for a record whose Value is nil.
	ErrNilValue = errors.New("record value should not be nil")
	// ErrUnsortedKeys is returned when a record key or range MinKey is not
	// strictly greater than the last key written.
	ErrUnsortedKeys = errors.New("keys should be written in ascending order")
)
// NewGeneralMetaRangeWriter returns a writer that creates ranges through
// rangeManager and the final metarange through metaRangeManager, both in
// namespace and tagged with md. params controls range breaking and, via
// MaxUploaders, the size of the asynchronous range-closing batch.
// params must be non-nil (its MaxUploaders field is read here).
func NewGeneralMetaRangeWriter(ctx context.Context, rangeManager, metaRangeManager RangeManager, params *Params, namespace Namespace, md graveler.Metadata) *GeneralMetaRangeWriter {
	closer := NewBatchCloser(params.MaxUploaders)
	return &GeneralMetaRangeWriter{
		ctx:              ctx,
		namespace:        namespace,
		metadata:         md,
		params:           params,
		rangeManager:     rangeManager,
		metaRangeManager: metaRangeManager,
		batchWriteCloser: closer,
	}
}
// WriteRecord appends record to the range currently being built, opening a
// new range writer (tagged MetadataTypeKey=MetadataRangesType) when none is
// open. Keys must be strictly ascending (ErrUnsortedKeys otherwise) and the
// value must be non-nil (ErrNilValue). After a successful write, if the range
// writer reports the key as a break point, the range is handed to the batch
// closer and the next record starts a new range.
func (w *GeneralMetaRangeWriter) WriteRecord(record graveler.ValueRecord) error {
	switch {
	case w.lastKey != nil && bytes.Compare(record.Key, w.lastKey) <= 0:
		return ErrUnsortedKeys
	case record.Value == nil:
		return ErrNilValue
	}

	if w.rangeWriter == nil {
		// Assign before checking the error, matching the original assignment semantics.
		rw, getErr := w.rangeManager.GetWriter(w.ctx, w.namespace, w.metadata)
		w.rangeWriter = rw
		if getErr != nil {
			return fmt.Errorf("get range writer: %w", getErr)
		}
		w.rangeWriter.SetMetadata(MetadataTypeKey, MetadataRangesType)
	}

	encoded, err := MarshalValue(record.Value)
	if err != nil {
		return err
	}
	rec := Record{Key: Key(record.Key), Value: encoded}
	if writeErr := w.rangeWriter.WriteRecord(rec); writeErr != nil {
		return fmt.Errorf("write record to range: %w", writeErr)
	}

	// Keep a private copy: the caller may reuse the key's backing array.
	w.lastKey = Key(record.Key.Copy())

	if !w.shouldBreakAtKey(record.Key) {
		return nil
	}
	return w.closeCurrentRange()
}
// closeCurrentRange hands the open range writer, if any, to the batch closer
// for asynchronous closing and clears it so the next record opens a new range.
// On failure the writer is left in place and the error is wrapped.
func (w *GeneralMetaRangeWriter) closeCurrentRange() error {
	current := w.rangeWriter
	if current == nil {
		return nil
	}
	err := w.batchWriteCloser.CloseWriterAsync(current)
	if err != nil {
		return fmt.Errorf("write range: %w", err)
	}
	w.rangeWriter = nil
	return nil
}
// getBatchedRanges waits for all asynchronously closed range writers and
// converts their results into Range descriptors, preserving result order.
func (w *GeneralMetaRangeWriter) getBatchedRanges() ([]Range, error) {
	results, err := w.batchWriteCloser.Wait()
	if err != nil {
		return nil, fmt.Errorf("batch write closer wait: %w", err)
	}
	out := make([]Range, 0, len(results))
	for _, res := range results {
		out = append(out, Range{
			ID:            res.RangeID,
			MinKey:        res.First,
			MaxKey:        res.Last,
			EstimatedSize: res.EstimatedRangeSizeBytes,
			Count:         int64(res.Count),
		})
	}
	return out, nil
}
// WriteRange adds an already-written range to the metarange. rng.MinKey must
// be strictly greater than the last key written (ErrUnsortedKeys otherwise).
// Any range currently being built from records is closed first, so records
// and whole ranges never share a range file.
func (w *GeneralMetaRangeWriter) WriteRange(rng Range) error {
	outOfOrder := w.lastKey != nil && bytes.Compare(rng.MinKey, w.lastKey) <= 0
	if outOfOrder {
		return ErrUnsortedKeys
	}
	if err := w.closeCurrentRange(); err != nil {
		return err
	}
	// Copy into a non-nil slice (even when MaxKey is empty) so ordering checks stay active.
	maxKey := make(Key, len(rng.MaxKey))
	copy(maxKey, rng.MaxKey)
	w.lastKey = maxKey
	w.ranges = append(w.ranges, rng)
	return nil
}
// Close finishes the current range, waits for all asynchronously closed
// ranges, merges them with ranges added through WriteRange, sorts the result
// by MaxKey and writes it as a new metarange, returning its ID.
func (w *GeneralMetaRangeWriter) Close(ctx context.Context) (*graveler.MetaRangeID, error) {
	if err := w.closeCurrentRange(); err != nil {
		return nil, err
	}
	batched, err := w.getBatchedRanges()
	if err != nil {
		return nil, err
	}
	all := append(batched, w.ranges...)
	byMaxKey := func(a, b int) bool {
		return bytes.Compare(all[a].MaxKey, all[b].MaxKey) < 0
	}
	sort.Slice(all, byMaxKey)
	w.ranges = all
	return w.writeRangesToMetaRange(ctx)
}
// shouldBreakAtKey reports whether the current range should end after key,
// delegating the decision (with the writer's params) to the open range writer.
// It must only be called while a range writer is open.
func (w *GeneralMetaRangeWriter) shouldBreakAtKey(key graveler.Key) bool {
	current := w.rangeWriter
	return current.ShouldBreakAtKey(key, w.params)
}
// writeRangesToMetaRange writes w.ranges, in slice order, as the records of a
// new metarange and returns its MetaRangeID. Each range is stored under its
// MaxKey with the value produced by rangeToValue.
//
// The metarange writer gets the writer's user metadata plus
// MetadataTypeKey=MetadataMetarangesType. Abort is deferred unconditionally so
// the writer is released on every early return; an Abort failure is logged,
// not returned.
// NOTE(review): this relies on Abort after a successful Close being harmless;
// confirm against the RangeWriter implementation.
func (w *GeneralMetaRangeWriter) writeRangesToMetaRange(ctx context.Context) (*graveler.MetaRangeID, error) {
	// Use the context of this call (not the one captured at construction) so
	// the caller's cancellation and deadline govern metarange creation.
	metaRangeWriter, err := w.metaRangeManager.GetWriter(ctx, w.namespace, w.metadata)
	if err != nil {
		return nil, fmt.Errorf("failed creating metarange writer: %w", err)
	}
	defer func() {
		if abortErr := metaRangeWriter.Abort(); abortErr != nil {
			// Log the Abort error itself; the outer err is unrelated (usually nil).
			logging.FromContext(ctx).WithError(abortErr).WithField("namespace", w.namespace).Error("failed aborting metarange writer")
		}
	}()

	// write user provided metadata, if any
	for k, v := range w.metadata {
		metaRangeWriter.SetMetadata(k, v)
	}
	// Set the type after the user metadata; presumably SetMetadata overwrites,
	// so the type key always ends up as "metaranges".
	metaRangeWriter.SetMetadata(MetadataTypeKey, MetadataMetarangesType)

	for _, p := range w.ranges {
		rangeValue, err := rangeToValue(p)
		if err != nil {
			return nil, err
		}
		if err := metaRangeWriter.WriteRecord(Record{Key: p.MaxKey, Value: rangeValue}); err != nil {
			return nil, fmt.Errorf("failed writing range to metarange writer: %w", err)
		}
	}
	wr, err := metaRangeWriter.Close()
	if err != nil {
		return nil, fmt.Errorf("failed closing metarange writer: %w", err)
	}
	metaRangeID := graveler.MetaRangeID(wr.RangeID)
	return &metaRangeID, nil
}
// Abort aborts the range writer that is currently open, if any, and returns
// its error. It does not touch ranges already handed to the batch closer.
func (w *GeneralMetaRangeWriter) Abort() error {
	if current := w.rangeWriter; current != nil {
		return current.Abort()
	}
	return nil
}