Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added hashmod support for store sharding. #2172

Merged
merged 1 commit into from Feb 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 15 additions & 6 deletions pkg/block/fetcher.go
Expand Up @@ -391,7 +391,7 @@ func (f *TimePartitionMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, syn

var _ MetaFetcherFilter = (&LabelShardedMetaFilter{}).Filter

// LabelShardedMetaFilter is a MetaFetcher filter that filters out blocks that have no labels after relabelling.
// LabelShardedMetaFilter represents a struct that allows sharding.
type LabelShardedMetaFilter struct {
// relabelConfig is the list of relabel rules that Filter passes to relabel.Process.
relabelConfig []*relabel.Config
}
Expand All @@ -401,14 +401,23 @@ func NewLabelShardedMetaFilter(relabelConfig []*relabel.Config) *LabelShardedMet
return &LabelShardedMetaFilter{relabelConfig: relabelConfig}
}

// Filter filters out blocks that filters blocks that have no labels after relabelling.
// blockIDLabel is a special label that carries the ULID of the block whose meta.json is being processed.
const blockIDLabel = "__block_id"
bwplotka marked this conversation as resolved.
Show resolved Hide resolved

// Filter filters out blocks that have no labels left after relabelling each block's external (Thanos) labels.
func (f *LabelShardedMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) {
var lbls labels.Labels
for id, m := range metas {
if processedLabels := relabel.Process(labels.FromMap(m.Thanos.Labels), f.relabelConfig...); processedLabels != nil {
continue
lbls = lbls[:0]
lbls = append(lbls, labels.Label{Name: blockIDLabel, Value: id.String()})
for k, v := range m.Thanos.Labels {
lbls = append(lbls, labels.Label{Name: k, Value: v})
}

if processedLabels := relabel.Process(lbls, f.relabelConfig...); len(processedLabels) == 0 {
synced.WithLabelValues(labelExcludedMeta).Inc()
delete(metas, id)
}
synced.WithLabelValues(labelExcludedMeta).Inc()
delete(metas, id)
}
}

Expand Down
100 changes: 99 additions & 1 deletion pkg/block/fetcher_test.go
Expand Up @@ -278,7 +278,7 @@ func TestMetaFetcher_Fetch(t *testing.T) {
})
}

func TestLabelShardedMetaFilter_Filter(t *testing.T) {
func TestLabelShardedMetaFilter_Filter_Basic(t *testing.T) {
relabelContentYaml := `
- action: drop
regex: "A"
Expand Down Expand Up @@ -340,6 +340,104 @@ func TestLabelShardedMetaFilter_Filter(t *testing.T) {

}

// TestLabelShardedMetaFilter_Filter_Hashmod exercises LabelShardedMetaFilter with
// a hashmod relabel rule over blockIDLabel (modulus 3) followed by a keep rule on
// the resulting "shard" label. For each shard value 0, 1 and 2 it runs Filter on
// the same set of 15 metas and asserts which block IDs remain in the map and that
// the labelExcludedMeta gauge equals the number of removed entries.
func TestLabelShardedMetaFilter_Filter_Hashmod(t *testing.T) {
// Relabel config template: %s is filled with the hashmod source label name and
// %d with the shard value that the keep rule should match.
relabelContentYamlFmt := `
- action: hashmod
source_labels: ["%s"]
target_label: shard
modulus: 3
- action: keep
source_labels: ["shard"]
regex: %d
`
// One subtest per shard value; the subtest name is the shard number.
for i := 0; i < 3; i++ {
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
var relabelConfig []*relabel.Config
testutil.Ok(t, yaml.Unmarshal([]byte(fmt.Sprintf(relabelContentYamlFmt, blockIDLabel, i)), &relabelConfig))

f := NewLabelShardedMetaFilter(relabelConfig)

// Blocks 1-6 carry assorted external labels, blocks 7-15 are empty metas.
// The hashmod rule above reads only blockIDLabel as its source label.
input := map[ulid.ULID]*metadata.Meta{
ULID(1): {
Thanos: metadata.Thanos{
Labels: map[string]string{"cluster": "B", "message": "keepme"},
},
},
ULID(2): {
Thanos: metadata.Thanos{
Labels: map[string]string{"something": "A", "message": "keepme"},
},
},
ULID(3): {
Thanos: metadata.Thanos{
Labels: map[string]string{"cluster": "A", "message": "keepme"},
},
},
ULID(4): {
Thanos: metadata.Thanos{
Labels: map[string]string{"cluster": "A", "something": "B", "message": "keepme"},
},
},
ULID(5): {
Thanos: metadata.Thanos{
Labels: map[string]string{"cluster": "B"},
},
},
ULID(6): {
Thanos: metadata.Thanos{
Labels: map[string]string{"cluster": "B", "message": "keepme"},
},
},
ULID(7): {},
ULID(8): {},
ULID(9): {},
ULID(10): {},
ULID(11): {},
ULID(12): {},
ULID(13): {},
ULID(14): {},
ULID(15): {},
}
// Expected survivors for each shard value. Taken together the three cases
// list every one of the 15 input IDs exactly once.
expected := map[ulid.ULID]*metadata.Meta{}
switch i {
case 0:
expected = map[ulid.ULID]*metadata.Meta{
ULID(2): input[ULID(2)],
ULID(6): input[ULID(6)],
ULID(11): input[ULID(11)],
ULID(13): input[ULID(13)],
}
case 1:
expected = map[ulid.ULID]*metadata.Meta{
ULID(5): input[ULID(5)],
ULID(7): input[ULID(7)],
ULID(10): input[ULID(10)],
ULID(12): input[ULID(12)],
ULID(14): input[ULID(14)],
ULID(15): input[ULID(15)],
}
case 2:
expected = map[ulid.ULID]*metadata.Meta{
ULID(1): input[ULID(1)],
ULID(3): input[ULID(3)],
ULID(4): input[ULID(4)],
ULID(8): input[ULID(8)],
ULID(9): input[ULID(9)],
}
}
// Computed before Filter runs, while input still holds all entries.
deleted := len(input) - len(expected)

synced := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"state"})
f.Filter(input, synced, false)

// input is compared directly against expected after the call, and the
// excluded-meta gauge must match the number of entries that were removed.
testutil.Equals(t, expected, input)
testutil.Equals(t, float64(deleted), promtest.ToFloat64(synced.WithLabelValues(labelExcludedMeta)))

})

}
}

func TestTimePartitionMetaFilter_Filter(t *testing.T) {
mint := time.Unix(0, 1*time.Millisecond.Nanoseconds())
maxt := time.Unix(0, 10*time.Millisecond.Nanoseconds())
Expand Down