-
Notifications
You must be signed in to change notification settings - Fork 2k
/
scheme.go
315 lines (257 loc) · 9.51 KB
/
scheme.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
package replicate
import (
"bytes"
"context"
"fmt"
"io"
"path"
"sort"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
"github.com/thanos-io/objstore"
thanosblock "github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/storepb"
)
// BlockFilter is block filter that filters out compacted and unselected blocks.
type BlockFilter struct {
	logger        log.Logger
	labelSelector labels.Selector
	// labelSelectorStr is labelSelector pre-rendered via
	// storepb.PromMatchersToString, used only for log messages.
	labelSelectorStr string
	// resolutionLevels is the set of downsampling resolutions a block may
	// have to pass the filter.
	resolutionLevels map[compact.ResolutionLevel]struct{}
	// compactionLevels is the set of compaction levels a block may have to
	// pass the filter.
	compactionLevels map[int]struct{}
	// blockIDs, when non-empty, is an explicit allow-list of block ULIDs
	// that takes precedence over all other criteria (see Filter).
	blockIDs []ulid.ULID
}
// NewBlockFilter returns a BlockFilter that admits blocks matching the given
// label selector, downsampling resolutions and compaction levels, or — when
// blockIDs is non-empty — only the explicitly listed blocks.
func NewBlockFilter(
	logger log.Logger,
	labelSelector labels.Selector,
	resolutionLevels []compact.ResolutionLevel,
	compactionLevels []int,
	blockIDs []ulid.ULID,
) *BlockFilter {
	// Index the allowed levels as sets for O(1) membership checks in Filter.
	resolutionSet := make(map[compact.ResolutionLevel]struct{}, len(resolutionLevels))
	for _, r := range resolutionLevels {
		resolutionSet[r] = struct{}{}
	}
	compactionSet := make(map[int]struct{}, len(compactionLevels))
	for _, c := range compactionLevels {
		compactionSet[c] = struct{}{}
	}
	return &BlockFilter{
		logger:           logger,
		labelSelector:    labelSelector,
		labelSelectorStr: storepb.PromMatchersToString(labelSelector...),
		resolutionLevels: resolutionSet,
		compactionLevels: compactionSet,
		blockIDs:         blockIDs,
	}
}
// Filter return true if block is non-compacted and matches selector.
// Blocks without Thanos labels are always rejected; when an explicit block ID
// list was configured, only membership in that list is consulted.
func (bf *BlockFilter) Filter(b *metadata.Meta) bool {
	// A block with no Thanos labels cannot be matched against the selector.
	if len(b.Thanos.Labels) == 0 {
		level.Error(bf.logger).Log("msg", "filtering block", "reason", "labels should not be empty")
		return false
	}
	// If required block IDs are set, we only match required blocks and ignore others.
	if len(bf.blockIDs) > 0 {
		for _, want := range bf.blockIDs {
			if want == b.ULID {
				return true
			}
		}
		return false
	}
	blockLabels := labels.FromMap(b.Thanos.Labels)
	if !bf.labelSelector.Matches(blockLabels) {
		level.Debug(bf.logger).Log("msg", "filtering block", "reason", "labels don't match", "block_labels", blockLabels.String(), "selector", bf.labelSelectorStr)
		return false
	}
	resolution := compact.ResolutionLevel(b.Thanos.Downsample.Resolution)
	if _, allowed := bf.resolutionLevels[resolution]; !allowed {
		level.Info(bf.logger).Log("msg", "filtering block", "reason", "resolution doesn't match allowed resolutions", "got_resolution", resolution, "allowed_resolutions", fmt.Sprintf("%v", bf.resolutionLevels))
		return false
	}
	compactionLevel := b.BlockMeta.Compaction.Level
	if _, allowed := bf.compactionLevels[compactionLevel]; !allowed {
		level.Info(bf.logger).Log("msg", "filtering block", "reason", "compaction level doesn't match allowed levels", "got_compaction_level", compactionLevel, "allowed_compaction_levels", fmt.Sprintf("%v", bf.compactionLevels))
		return false
	}
	return true
}
// blockFilterFunc decides whether the block described by b should be
// considered for replication.
type blockFilterFunc func(b *metadata.Meta) bool

// TODO: Add filters field.
// replicationScheme copies blocks accepted by blockFilter from fromBkt to
// toBkt (see execute).
type replicationScheme struct {
	fromBkt objstore.InstrumentedBucketReader // origin bucket (read side)
	toBkt   objstore.Bucket                   // target bucket (write side)
	// blockFilter selects which discovered blocks are replicated.
	blockFilter blockFilterFunc
	// fetcher lists block metas (complete and partial) in the origin bucket.
	fetcher thanosblock.MetadataFetcher
	logger  log.Logger
	metrics *replicationMetrics
	reg     prometheus.Registerer
}
// replicationMetrics holds the Prometheus counters updated while replicating.
type replicationMetrics struct {
	blocksAlreadyReplicated prometheus.Counter // blocks skipped because the target already holds an identical meta file
	blocksReplicated        prometheus.Counter // blocks whose replication completed (meta file uploaded)
	objectsReplicated       prometheus.Counter // individual objects (chunks, index files) copied to the target
}
// newReplicationMetrics creates the replication counters and registers them
// with reg.
func newReplicationMetrics(reg prometheus.Registerer) *replicationMetrics {
	factory := promauto.With(reg)
	return &replicationMetrics{
		blocksAlreadyReplicated: factory.NewCounter(prometheus.CounterOpts{
			Name: "thanos_replicate_blocks_already_replicated_total",
			Help: "Total number of blocks skipped due to already being replicated.",
		}),
		blocksReplicated: factory.NewCounter(prometheus.CounterOpts{
			Name: "thanos_replicate_blocks_replicated_total",
			Help: "Total number of blocks replicated.",
		}),
		objectsReplicated: factory.NewCounter(prometheus.CounterOpts{
			Name: "thanos_replicate_objects_replicated_total",
			Help: "Total number of objects replicated.",
		}),
	}
}
// newReplicationScheme wires a replicationScheme together from its parts.
// A nil logger is replaced with a no-op logger.
func newReplicationScheme(
	logger log.Logger,
	metrics *replicationMetrics,
	blockFilter blockFilterFunc,
	fetcher thanosblock.MetadataFetcher,
	from objstore.InstrumentedBucketReader,
	to objstore.Bucket,
	reg prometheus.Registerer,
) *replicationScheme {
	l := logger
	if l == nil {
		l = log.NewNopLogger()
	}
	return &replicationScheme{
		fromBkt:     from,
		toBkt:       to,
		blockFilter: blockFilter,
		fetcher:     fetcher,
		logger:      l,
		metrics:     metrics,
		reg:         reg,
	}
}
// execute replicates every block that passes the configured filter from the
// origin bucket to the target bucket, oldest start timestamp first.
func (rs *replicationScheme) execute(ctx context.Context) error {
	metas, partials, err := rs.fetcher.Fetch(ctx)
	if err != nil {
		return err
	}
	// Partial blocks have no complete meta yet; they are only logged.
	for id := range partials {
		level.Info(rs.logger).Log("msg", "block meta not uploaded yet. Skipping.", "block_uuid", id.String())
	}
	candidates := []*metadata.Meta{}
	for id, meta := range metas {
		if !rs.blockFilter(meta) {
			continue
		}
		level.Info(rs.logger).Log("msg", "adding block to be replicated", "block_uuid", id.String())
		candidates = append(candidates, meta)
	}
	// In order to prevent races in compactions by the target environment, we
	// need to replicate oldest start timestamp first.
	sort.Slice(candidates, func(i, j int) bool {
		return candidates[i].BlockMeta.MinTime < candidates[j].BlockMeta.MinTime
	})
	for _, meta := range candidates {
		if err := rs.ensureBlockIsReplicated(ctx, meta.BlockMeta.ULID); err != nil {
			return errors.Wrapf(err, "ensure block %v is replicated", meta.BlockMeta.ULID.String())
		}
	}
	return nil
}
// ensureBlockIsReplicated ensures that a block present in the origin bucket is
// present in the target bucket. A block whose meta file in the target is
// byte-identical to the origin's is considered fully replicated and skipped.
// Otherwise chunks and index are copied first and the meta file is uploaded
// last, so a present meta file marks a completely replicated block.
func (rs *replicationScheme) ensureBlockIsReplicated(ctx context.Context, id ulid.ULID) error {
	blockID := id.String()
	chunksDir := path.Join(blockID, thanosblock.ChunksDirname)
	indexFile := path.Join(blockID, thanosblock.IndexFilename)
	metaFile := path.Join(blockID, thanosblock.MetaFilename)
	level.Debug(rs.logger).Log("msg", "ensuring block is replicated", "block_uuid", blockID)
	originMetaFile, err := rs.fromBkt.ReaderWithExpectedErrs(rs.fromBkt.IsObjNotFoundErr).Get(ctx, metaFile)
	if err != nil {
		return errors.Wrap(err, "get meta file from origin bucket")
	}
	defer runutil.CloseWithLogOnErr(rs.logger, originMetaFile, "close original meta file")
	// Keep the target Get error in its own variable: the "already replicated"
	// check below must inspect this error. Reusing err would let the
	// io.ReadAll call in between clobber it — the previous code passed the
	// (always nil at that point) ReadAll error to IsObjNotFoundErr, making
	// that check meaningless.
	targetMetaFile, targetErr := rs.toBkt.Get(ctx, metaFile)
	if targetMetaFile != nil {
		defer runutil.CloseWithLogOnErr(rs.logger, targetMetaFile, "close target meta file")
	}
	if targetErr != nil && !rs.toBkt.IsObjNotFoundErr(targetErr) && targetErr != io.EOF {
		return errors.Wrap(targetErr, "get meta file from target bucket")
	}
	// TODO(bwplotka): Allow injecting custom labels as shipper does.
	originMetaFileContent, err := io.ReadAll(originMetaFile)
	if err != nil {
		return errors.Wrap(err, "read origin meta file")
	}
	if targetMetaFile != nil && !rs.toBkt.IsObjNotFoundErr(targetErr) {
		targetMetaFileContent, err := io.ReadAll(targetMetaFile)
		if err != nil {
			return errors.Wrap(err, "read target meta file")
		}
		if bytes.Equal(originMetaFileContent, targetMetaFileContent) {
			// If the origin meta file content and target meta file content is
			// equal, we know we have already successfully replicated
			// previously.
			level.Debug(rs.logger).Log("msg", "skipping block as already replicated", "block_uuid", blockID)
			rs.metrics.blocksAlreadyReplicated.Inc()
			return nil
		}
	}
	// Copy every object under the block's chunks directory.
	if err := rs.fromBkt.Iter(ctx, chunksDir, func(objectName string) error {
		if err := rs.ensureObjectReplicated(ctx, objectName); err != nil {
			return errors.Wrapf(err, "replicate object %v", objectName)
		}
		return nil
	}); err != nil {
		return err
	}
	if err := rs.ensureObjectReplicated(ctx, indexFile); err != nil {
		return errors.Wrap(err, "replicate index file")
	}
	// The meta file is uploaded last, after chunks and index, so its presence
	// in the target signals a complete block.
	level.Debug(rs.logger).Log("msg", "replicating meta file", "object", metaFile)
	if err := rs.toBkt.Upload(ctx, metaFile, bytes.NewBuffer(originMetaFileContent)); err != nil {
		return errors.Wrap(err, "upload meta file")
	}
	rs.metrics.blocksReplicated.Inc()
	return nil
}
// ensureObjectReplicated ensures that an object present in the origin bucket
// is present in the target bucket. Objects that already exist in the target
// are skipped; existing content is never overwritten by this function.
func (rs *replicationScheme) ensureObjectReplicated(ctx context.Context, objectName string) error {
	level.Debug(rs.logger).Log("msg", "ensuring object is replicated", "object", objectName)
	exists, err := rs.toBkt.Exists(ctx, objectName)
	if err != nil {
		return errors.Wrapf(err, "check if %v exists in target bucket", objectName)
	}
	// Skip if already exists.
	if exists {
		level.Debug(rs.logger).Log("msg", "skipping object as already replicated", "object", objectName)
		return nil
	}
	level.Debug(rs.logger).Log("msg", "object not present in target bucket, replicating", "object", objectName)
	r, err := rs.fromBkt.Get(ctx, objectName)
	if err != nil {
		return errors.Wrapf(err, "get %v from origin bucket", objectName)
	}
	// Log close failures instead of silently discarding them, consistent with
	// the other readers in this file.
	defer runutil.CloseWithLogOnErr(rs.logger, r, "close source object reader")
	if err = rs.toBkt.Upload(ctx, objectName, r); err != nil {
		return errors.Wrapf(err, "upload %v to target bucket", objectName)
	}
	level.Info(rs.logger).Log("msg", "object replicated", "object", objectName)
	rs.metrics.objectsReplicated.Inc()
	return nil
}