pkg/receive: basic hashring support #1217

Merged: 2 commits, Jun 5, 2019
1 change: 1 addition & 0 deletions go.mod
@@ -5,6 +5,7 @@ require (
	github.com/Azure/azure-storage-blob-go v0.0.0-20181022225951-5152f14ace1c
	github.com/NYTimes/gziphandler v1.1.1
	github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da
	github.com/cespare/xxhash v1.1.0
Member

perfect choice 👍

let's document that we're using this hash in the design doc/proposal

Member

this is still to be addressed

Member Author

@squat, Jun 5, 2019

	github.com/fatih/structtag v1.0.0
	github.com/fortytw2/leaktest v1.3.0
	github.com/fsnotify/fsnotify v1.4.7
114 changes: 114 additions & 0 deletions pkg/receive/hashring.go
@@ -0,0 +1,114 @@
package receive

import (
	"errors"
	"sort"

	"github.com/improbable-eng/thanos/pkg/store/prompb"

	"github.com/cespare/xxhash"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/discovery/targetgroup"
)

const sep = '\xff'

// Hashring finds the correct host to handle a given time series
// for a specified tenant.
// It returns the hostname and any error encountered.
type Hashring interface {
	GetHost(tenant string, timeSeries *prompb.TimeSeries) (string, error)
}

// Matcher determines whether a tenant matches a hashring.
type Matcher interface {
	Match(tenant string, hashring string) bool
}

// MultiMatcher is a list of Matchers that implements the Matcher interface.
type MultiMatcher []Matcher

// Match implements the Matcher interface.
func (m MultiMatcher) Match(tenant, hashring string) bool {
	for i := range m {
		if !m[i].Match(tenant, hashring) {
			return false
		}
	}
	return true
}

// MatcherFunc is a shim to use a func as a Matcher.
type MatcherFunc func(string, string) bool

// Match implements the Matcher interface.
func (m MatcherFunc) Match(tenant, hashring string) bool {
	return m(tenant, hashring)
}

// ExactMatcher is a matcher that checks if the tenant exactly matches the hashring name.
var ExactMatcher = MatcherFunc(func(tenant, hashring string) bool { return tenant == hashring })

// hash returns a hash for the given tenant and time series.
func hash(tenant string, ts *prompb.TimeSeries) uint64 {
Member

nothing to do, just a side note for the future: at some point we probably want to cache this, as sorting + hashing seems rather expensive, and we can probably reuse the byte slices created for these hashes. let's leave these optimizations for later, until we have actually measured that they are necessary

Member Author

yes, we'll probably want to make a buffer pool to keep allocations low. caching may be expensive, as there can easily be tens of millions of time series + tenant combinations

Member

yeah, optimizing here now would be premature. once it becomes a problem we can try out the different strategies and prove with numbers what's best. nothing to do for now
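
For reference, a minimal sketch of the buffer-pool idea from this thread, in case profiling later shows that the per-call allocation matters. It is not what this PR implements. It assumes a `sync.Pool` of byte slices; `label`, `bufPool`, and `pooledHash` are hypothetical names, and FNV-1a from the standard library stands in for xxhash only so the snippet has no third-party imports.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
	"sync"
)

// label is a hypothetical stand-in for prompb.Label.
type label struct{ Name, Value string }

const sep = '\xff'

// bufPool hands out reusable byte slices so that each hash call does
// not have to allocate a fresh 1 KiB scratch buffer.
var bufPool = sync.Pool{
	New: func() interface{} {
		b := make([]byte, 0, 1024)
		return &b
	},
}

// pooledHash follows the shape of hash() in this PR but borrows its
// scratch buffer from bufPool. FNV-1a is an assumption made for this
// sketch; the PR itself uses xxhash.Sum64.
func pooledHash(tenant string, labels []label) uint64 {
	// Sort the labels by name so the result does not depend on input order.
	sort.Slice(labels, func(i, j int) bool { return labels[i].Name < labels[j].Name })

	bp := bufPool.Get().(*[]byte)
	b := (*bp)[:0]
	b = append(b, tenant...)
	b = append(b, sep)
	for _, l := range labels {
		b = append(b, l.Name...)
		b = append(b, sep)
		b = append(b, l.Value...)
		b = append(b, sep)
	}

	h := fnv.New64a()
	h.Write(b)
	sum := h.Sum64()

	// Hand the (possibly grown) buffer back to the pool for reuse.
	*bp = b
	bufPool.Put(bp)
	return sum
}

func main() {
	a := []label{{"foo", "bar"}, {"baz", "qux"}}
	b := []label{{"baz", "qux"}, {"foo", "bar"}}
	fmt.Println(pooledHash("tenant1", a) == pooledHash("tenant1", b))
}
```

Storing a pointer to the slice in the pool avoids an extra allocation on every `Put`. Whether this beats the plain `make` call would still need to be measured, as discussed above.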

	// Sort labelset to ensure a stable hash.
	sort.Slice(ts.Labels, func(i, j int) bool { return ts.Labels[i].Name < ts.Labels[j].Name })

	b := make([]byte, 0, 1024)
	b = append(b, []byte(tenant)...)
Member

this needs a separator at the end, otherwise collisions could be engineered with clever label names

Member Author

good catch here
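
To illustrate the collision concern, here is a small sketch. It assumes the fields are concatenated with no separators at all, which simplifies whatever the earlier revision did (that revision is not shown here). `key` and `collide` are hypothetical helpers. Byte-identical input yields an identical hash for any hash function, so the sketch compares the inputs directly.

```go
package main

import (
	"bytes"
	"fmt"
)

const sep = '\xff'

// key builds the hash input for a tenant and a single label. With
// withSep set to false the fields are concatenated directly.
func key(tenant, name, value string, withSep bool) []byte {
	var b []byte
	for _, field := range []string{tenant, name, value} {
		b = append(b, field...)
		if withSep {
			b = append(b, sep)
		}
	}
	return b
}

// collide reports whether tenant "a" with label bc=d and tenant "ab"
// with label c=d produce byte-identical hash input.
func collide(withSep bool) bool {
	return bytes.Equal(
		key("a", "bc", "d", withSep),
		key("ab", "c", "d", withSep),
	)
}

func main() {
	fmt.Println(collide(false)) // true: both inputs are "abcd"
	fmt.Println(collide(true))  // false: the separators keep field boundaries distinct
}
```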

	b = append(b, sep)
	for _, v := range ts.Labels {
		b = append(b, v.Name...)
		b = append(b, sep)
		b = append(b, v.Value...)
		b = append(b, sep)
	}
	return xxhash.Sum64(b)
}

// simpleHashring represents a group of hosts handling write requests.
type simpleHashring struct {
	targetgroup.Group
}

// GetHost returns a hostname to handle the given tenant and time series.
func (s *simpleHashring) GetHost(tenant string, ts *prompb.TimeSeries) (string, error) {
	// Always return nil here to implement the Hashring interface.
	return string(s.Targets[hash(tenant, ts)%uint64(len(s.Targets))][model.AddressLabel]), nil
}

// matchingHashring represents a set of hashrings.
// Which hashring to use is determined by the matcher.
type matchingHashring struct {
	cache     map[string]Hashring
	hashrings map[string]Hashring
	matcher   Matcher
}

// GetHost returns a hostname to handle the given tenant and time series.
func (m matchingHashring) GetHost(tenant string, ts *prompb.TimeSeries) (string, error) {
	if h, ok := m.cache[tenant]; ok {
		return h.GetHost(tenant, ts)
	}
	for name := range m.hashrings {
		if m.matcher.Match(tenant, name) {
			m.cache[tenant] = m.hashrings[name]
			return m.hashrings[name].GetHost(tenant, ts)
		}
	}
	return "", errors.New("no matching hosts to handle tenant")
}

// NewHashring creates a multi-tenant hashring for a given slice of
// groups. Which tenant's hashring to use is determined by the Matcher.
func NewHashring(matcher Matcher, groups []*targetgroup.Group) Hashring {
	m := matchingHashring{
		cache:     make(map[string]Hashring),
		hashrings: make(map[string]Hashring),
		matcher:   matcher,
	}
	for _, g := range groups {
		m.hashrings[g.Source] = &simpleHashring{*g}
	}
	return m
}
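
Two possible follow-ups on the code above, sketched under stated assumptions rather than proposed as the implementation. First, `simpleHashring.GetHost` takes the hash modulo `len(s.Targets)`, so a group with no targets would hit an integer divide-by-zero panic. Second, `matchingHashring.GetHost` writes to `cache` without synchronization, which would be a data race if it is called from multiple goroutines (the callers are not part of this PR, so that is an assumption). The types `ring` and `tenantRings` below are hypothetical, simplified stand-ins that take a precomputed hash and match tenants by exact name only.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ring is a hypothetical stand-in for simpleHashring: a list of host addresses.
type ring struct{ hosts []string }

// getHost picks a host by modulo and returns an error, instead of
// panicking, when the ring has no hosts.
func (r *ring) getHost(h uint64) (string, error) {
	if len(r.hosts) == 0 {
		return "", errors.New("hashring has no hosts")
	}
	return r.hosts[h%uint64(len(r.hosts))], nil
}

// tenantRings is a hypothetical stand-in for matchingHashring. The
// mutex guards cache; rings is read-only after construction.
type tenantRings struct {
	mu    sync.RWMutex
	cache map[string]*ring
	rings map[string]*ring
}

// getHost resolves the tenant's ring (by exact name in this sketch),
// caches the result, and delegates host selection to the ring.
func (t *tenantRings) getHost(tenant string, h uint64) (string, error) {
	t.mu.RLock()
	r, ok := t.cache[tenant]
	t.mu.RUnlock()
	if ok {
		return r.getHost(h)
	}
	r, ok = t.rings[tenant]
	if !ok {
		return "", errors.New("no matching hosts to handle tenant")
	}
	t.mu.Lock()
	t.cache[tenant] = r
	t.mu.Unlock()
	return r.getHost(h)
}

func main() {
	tr := &tenantRings{
		cache: map[string]*ring{},
		rings: map[string]*ring{
			"tenant1": {hosts: []string{"host1", "host2", "host3"}},
			"empty":   {},
		},
	}
	fmt.Println(tr.getHost("tenant1", 4)) // 4 % 3 == 1, so host2 with a nil error
	fmt.Println(tr.getHost("empty", 4))   // error: ring has no hosts
	fmt.Println(tr.getHost("tenant4", 4)) // error: no ring for this tenant
}
```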
249 changes: 249 additions & 0 deletions pkg/receive/hashring_test.go
@@ -0,0 +1,249 @@
package receive

import (
	"testing"

	"github.com/improbable-eng/thanos/pkg/store/prompb"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/discovery/targetgroup"
)

func TestHash(t *testing.T) {
	ts := &prompb.TimeSeries{
		Labels: []prompb.Label{
			{
				Name:  "foo",
				Value: "bar",
			},
			{
				Name:  "baz",
				Value: "qux",
			},
		},
	}

	ts2 := &prompb.TimeSeries{
		Labels: []prompb.Label{ts.Labels[1], ts.Labels[0]},
	}

	if hash("", ts) != hash("", ts2) {
		t.Errorf("expected hashes to be independent of label order")
	}
}

func TestGetHost(t *testing.T) {
	ts := &prompb.TimeSeries{
		Labels: []prompb.Label{
			{
				Name:  "foo",
				Value: "bar",
			},
			{
				Name:  "baz",
				Value: "qux",
			},
		},
	}

	for _, tc := range []struct {
		name   string
		cfg    []*targetgroup.Group
		hosts  map[string]struct{}
		tenant string
	}{
		{
			name:   "empty",
			cfg:    []*targetgroup.Group{},
			tenant: "tenant1",
		},
		{
			name: "simple",
			cfg: []*targetgroup.Group{
				{
					Targets: []model.LabelSet{
						model.LabelSet{
							model.AddressLabel: "host1",
						},
					},
				},
			},
			hosts: map[string]struct{}{"host1": struct{}{}},
		},
		{
			name: "specific",
			cfg: []*targetgroup.Group{
				{
					Targets: []model.LabelSet{
						model.LabelSet{
							model.AddressLabel: "host1",
						},
					},
					Source: "",
				},
				{
					Targets: []model.LabelSet{
						model.LabelSet{
							model.AddressLabel: "host2",
						},
					},
					Source: "tenant1",
				},
			},
			hosts:  map[string]struct{}{"host2": struct{}{}},
			tenant: "tenant1",
		},
		{
			name: "many tenants",
			cfg: []*targetgroup.Group{
				{
					Targets: []model.LabelSet{
						model.LabelSet{
							model.AddressLabel: "host1",
						},
					},
					Source: "tenant1",
				},
				{
					Targets: []model.LabelSet{
						model.LabelSet{
							model.AddressLabel: "host2",
						},
					},
					Source: "tenant2",
				},
				{
					Targets: []model.LabelSet{
						model.LabelSet{
							model.AddressLabel: "host3",
						},
					},
					Source: "tenant3",
				},
			},
			hosts:  map[string]struct{}{"host1": struct{}{}},
			tenant: "tenant1",
		},
		{
			name: "many tenants error",
			cfg: []*targetgroup.Group{
				{
					Targets: []model.LabelSet{
						model.LabelSet{
							model.AddressLabel: "host1",
						},
					},
					Source: "tenant1",
				},
				{
					Targets: []model.LabelSet{
						model.LabelSet{
							model.AddressLabel: "host2",
						},
					},
					Source: "tenant2",
				},
				{
					Targets: []model.LabelSet{
						model.LabelSet{
							model.AddressLabel: "host3",
						},
					},
					Source: "tenant3",
				},
			},
			tenant: "tenant4",
		},
		{
			name: "many hosts",
			cfg: []*targetgroup.Group{
				{
					Targets: []model.LabelSet{
						model.LabelSet{
							model.AddressLabel: "host1",
						},
						model.LabelSet{
							model.AddressLabel: "host2",
						},
						model.LabelSet{
							model.AddressLabel: "host3",
						},
					},
					Source: "tenant1",
				},
				{
					Targets: []model.LabelSet{
						model.LabelSet{
							model.AddressLabel: "host4",
						},
						model.LabelSet{
							model.AddressLabel: "host5",
						},
						model.LabelSet{
							model.AddressLabel: "host6",
						},
					},
					Source: "",
				},
			},
			hosts: map[string]struct{}{
				"host1": struct{}{},
				"host2": struct{}{},
				"host3": struct{}{},
			},
			tenant: "tenant1",
		},
		{
			name: "many hosts 2",
			cfg: []*targetgroup.Group{
				{
					Targets: []model.LabelSet{
						model.LabelSet{
							model.AddressLabel: "host1",
						},
						model.LabelSet{
							model.AddressLabel: "host2",
						},
						model.LabelSet{
							model.AddressLabel: "host3",
						},
					},
					Source: "tenant1",
				},
				{
					Targets: []model.LabelSet{
						model.LabelSet{
							model.AddressLabel: "host4",
						},
						model.LabelSet{
							model.AddressLabel: "host5",
						},
						model.LabelSet{
							model.AddressLabel: "host6",
						},
					},
				},
			},
			hosts: map[string]struct{}{
				"host4": struct{}{},
				"host5": struct{}{},
				"host6": struct{}{},
			},
		},
	} {
		hs := NewHashring(ExactMatcher, tc.cfg)
		h, err := hs.GetHost(tc.tenant, ts)
		if tc.hosts != nil {
			if err != nil {
				t.Errorf("case %q: got unexpected error: %v", tc.name, err)
				continue
			}
			if _, ok := tc.hosts[h]; !ok {
				t.Errorf("case %q: got unexpected host %q", tc.name, h)
			}
			continue
		}
		if err == nil {
			t.Errorf("case %q: expected error", tc.name)
		}
	}
}
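
The tests above only exercise `ExactMatcher`. As a rough illustration of how `MultiMatcher` composes matchers, the sketch below copies the matcher types from hashring.go and adds two hypothetical matchers, `prefixMatcher` and `nonEmpty`, which are not part of this PR. `MultiMatcher` matches only when every matcher in the list matches, and an empty list matches everything.

```go
package main

import (
	"fmt"
	"strings"
)

// Matcher, MatcherFunc, and MultiMatcher are copied from hashring.go
// so that this sketch is self-contained.
type Matcher interface {
	Match(tenant string, hashring string) bool
}

type MatcherFunc func(string, string) bool

func (m MatcherFunc) Match(tenant, hashring string) bool {
	return m(tenant, hashring)
}

type MultiMatcher []Matcher

func (m MultiMatcher) Match(tenant, hashring string) bool {
	for i := range m {
		if !m[i].Match(tenant, hashring) {
			return false
		}
	}
	return true
}

// prefixMatcher is hypothetical: it matches when the tenant name
// starts with the hashring name.
var prefixMatcher = MatcherFunc(func(tenant, hashring string) bool {
	return strings.HasPrefix(tenant, hashring)
})

// nonEmpty is hypothetical: it rejects hashrings with an empty name.
var nonEmpty = MatcherFunc(func(_, hashring string) bool {
	return hashring != ""
})

func main() {
	m := MultiMatcher{prefixMatcher, nonEmpty}
	fmt.Println(m.Match("tenant1-eu", "tenant1")) // true: both matchers accept
	fmt.Println(m.Match("tenant1-eu", ""))        // false: nonEmpty rejects
	fmt.Println(MultiMatcher{}.Match("a", "b"))   // true: an empty list matches everything
}
```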