-
Notifications
You must be signed in to change notification settings - Fork 2k
/
flushable.go
83 lines (69 loc) · 2.14 KB
/
flushable.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
package store
import (
"github.com/prometheus/prometheus/model/labels"
"golang.org/x/exp/slices"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
)
// sortingStrategy selects which flushableServer implementation
// newFlushableServer returns.
type sortingStrategy uint64
const (
// sortingStrategyStore selects the resortingServer, which buffers series and
// sends them sorted by labels on Flush. Values start at 1 (iota + 1), so the
// zero value of sortingStrategy matches neither strategy.
sortingStrategyStore sortingStrategy = iota + 1
// sortingStrategyNone selects the passthroughServer, which forwards
// responses to the upstream server unchanged.
sortingStrategyNone
)
// flushableServer is an extension of storepb.Store_SeriesServer with a Flush method.
// An implementation may hold back data passed to Send (resortingServer does),
// in which case that data only reaches the upstream server when Flush is called.
type flushableServer interface {
storepb.Store_SeriesServer
// Flush sends any data the implementation is still holding to the upstream
// server. It returns a non-nil error if sending fails.
Flush() error
}
// newFlushableServer wraps upstream in the flushableServer implementation
// that corresponds to the requested sorting strategy:
//
//   - sortingStrategyStore yields a resortingServer.
//   - sortingStrategyNone yields a passthroughServer.
//
// Any other strategy value is treated as a programming error and panics.
func newFlushableServer(
	upstream storepb.Store_SeriesServer,
	sortingsortingStrategy sortingStrategy,
) flushableServer {
	switch sortingsortingStrategy {
	case sortingStrategyNone:
		return &passthroughServer{Store_SeriesServer: upstream}
	case sortingStrategyStore:
		return &resortingServer{Store_SeriesServer: upstream}
	}

	// Reaching this point means the caller passed a value outside the
	// declared strategies, which should not happen.
	panic("unexpected sorting strategy")
}
// passthroughServer is a flushableServer that forwards all data to
// an upstream server without additional processing.
// It declares only Flush in this file; Send and the remaining stream methods
// are promoted from the embedded upstream server.
type passthroughServer struct {
// Store_SeriesServer is the upstream server whose methods are promoted as-is.
storepb.Store_SeriesServer
}
// Flush is a no-op and always returns nil: passthroughServer holds no
// buffered state of its own, so there is nothing left to send.
func (*passthroughServer) Flush() error {
	return nil
}
// resortingServer is a flushableServer that resorts all series by their labels.
// This is required if replica labels are stored internally in a TSDB.
// Data is resorted and sent to an upstream server upon calling Flush.
type resortingServer struct {
// Store_SeriesServer is the upstream server. It receives non-series
// responses immediately from Send and the sorted series from Flush.
storepb.Store_SeriesServer
// series accumulates every series passed to Send until Flush sends them.
series []*storepb.Series
}
// Send intercepts responses on their way to the upstream server.
//
// A response that carries no series (GetSeries returns nil) is forwarded
// upstream right away and the upstream error, if any, is returned. A series
// response is appended to the in-memory buffer and nil is returned; it reaches
// the upstream server only when Flush is called.
func (r *resortingServer) Send(response *storepb.SeriesResponse) error {
	buffered := response.GetSeries()
	if buffered == nil {
		return r.Store_SeriesServer.Send(response)
	}

	// NOTE(review): presumably re-allocates the label strings so the buffered
	// series does not keep referencing the incoming message's memory — confirm
	// against labelpb.ReAllocZLabelsStrings.
	labelpb.ReAllocZLabelsStrings(&buffered.Labels, false)
	r.series = append(r.series, buffered)
	return nil
}
// Flush sorts the buffered series in place by their label sets and then sends
// them, one response per series, to the upstream server in sorted order.
//
// It stops at and returns the first error reported by the upstream Send;
// nil is returned once every buffered series has been sent. The buffer is not
// cleared afterwards.
func (r *resortingServer) Flush() error {
	// Comparator ordering two series by labels.Compare on their label sets.
	byLabels := func(lhs, rhs *storepb.Series) int {
		return labels.Compare(
			labelpb.ZLabelsToPromLabels(lhs.Labels),
			labelpb.ZLabelsToPromLabels(rhs.Labels),
		)
	}
	slices.SortFunc(r.series, byLabels)

	for i := range r.series {
		err := r.Store_SeriesServer.Send(storepb.NewSeriesResponse(r.series[i]))
		if err != nil {
			return err
		}
	}
	return nil
}