diff --git a/pkg/receive/hashring.go b/pkg/receive/hashring.go new file mode 100644 index 00000000000..c5b0263f736 --- /dev/null +++ b/pkg/receive/hashring.go @@ -0,0 +1,72 @@ +package receive + +import ( + "errors" + "sort" + + "github.com/improbable-eng/thanos/pkg/store/prompb" + + "github.com/cespare/xxhash" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/discovery/targetgroup" +) + +const sep = '\xff' + +// Hashring finds the correct host to handle a given time series +// for a specified tenant. +// It returns the hostname and any error encountered. +type Hashring interface { + GetHost(string, *prompb.TimeSeries) (string, error) +} + +// hash returns a hash for the given tenant and time series. +func hash(tenant string, ts *prompb.TimeSeries) uint64 { + // Sort labelset to ensure a stable hash. + sort.Slice(ts.Labels, func(i, j int) bool { return ts.Labels[i].Name < ts.Labels[j].Name }) + + b := make([]byte, 0, 1024) + b = append(b, []byte(tenant)...) + for _, v := range ts.Labels { + b = append(b, v.Name...) + b = append(b, sep) + b = append(b, v.Value...) + b = append(b, sep) + } + return xxhash.Sum64(b) +} + +// simpleHashring represents a group of hosts handling write requests. +type simpleHashring struct { + targetgroup.Group +} + +// GetHost returns a hostname to handle the given tenant and time series. +func (s *simpleHashring) GetHost(tenant string, ts *prompb.TimeSeries) (string, error) { + // Always return nil here to implement the Hashring interface. + return string(s.Targets[hash(tenant, ts)%uint64(len(s.Targets))][model.AddressLabel]), nil +} + +// multiTenantHashring represents a set of Hashrings for different tenants. +type multiTenantHashring map[string]Hashring + +// GetHost returns a hostname to handle the given tenant and time series. +func (m multiTenantHashring) GetHost(tenant string, ts *prompb.TimeSeries) (string, error) { + if h, ok := m[tenant]; ok { + return h.GetHost(tenant, ts) + } + if h, ok := m[""]; ok { + return h.GetHost(tenant, ts) + } + return "", errors.New("no matching hosts to handle tenant") +} + +// NewHashring creates a multi-tenant hashring for a given +// configuration that maps tenants to groups of hosts. +func NewHashring(groups []*targetgroup.Group) Hashring { + m := make(multiTenantHashring) + for _, g := range groups { + m[g.Source] = &simpleHashring{*g} + } + return m +} diff --git a/pkg/receive/hashring_test.go b/pkg/receive/hashring_test.go new file mode 100644 index 00000000000..4eaa0f04990 --- /dev/null +++ b/pkg/receive/hashring_test.go @@ -0,0 +1,253 @@ +package receive + +import ( + "testing" + + "github.com/improbable-eng/thanos/pkg/store/prompb" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/discovery/targetgroup" +) + +func TestHash(t *testing.T) { + ts := &prompb.TimeSeries{ + Labels: []prompb.Label{ + { + Name: "foo", + Value: "bar", + }, + { + Name: "baz", + Value: "qux", + }, + }, + } + + ts2 := &prompb.TimeSeries{ + Labels: []prompb.Label{ts.Labels[1], ts.Labels[0]}, + } + + if hash("", ts) != hash("", ts2) { + t.Errorf("expected hashes to be independent of label order") + } +} + +func TestGetHost(t *testing.T) { + ts := &prompb.TimeSeries{ + Labels: []prompb.Label{ + { + Name: "foo", + Value: "bar", + }, + { + Name: "baz", + Value: "qux", + }, + }, + } + + for _, tc := range []struct { + name string + cfg []*targetgroup.Group + hosts map[string]struct{} + tenant string + }{ + { + name: "empty", + cfg: []*targetgroup.Group{}, + tenant: "tenant1", + }, + { + name: "simple", + cfg: []*targetgroup.Group{ + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "host1", + }, + }, + Source: "", + }, + }, + hosts: map[string]struct{}{"host1": struct{}{}}, + tenant: "tenant1", + }, + { + name: "specific", + cfg: []*targetgroup.Group{ + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "host1", + }, + }, + Source: "", + }, + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "host2", + }, + }, + Source: "tenant1", + }, + }, + hosts: map[string]struct{}{"host2": struct{}{}}, + tenant: "tenant1", + }, + { + name: "many tenants", + cfg: []*targetgroup.Group{ + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "host1", + }, + }, + Source: "tenant1", + }, + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "host2", + }, + }, + Source: "tenant2", + }, + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "host3", + }, + }, + Source: "tenant3", + }, + }, + hosts: map[string]struct{}{"host1": struct{}{}}, + tenant: "tenant1", + }, + { + name: "many tenants error", + cfg: []*targetgroup.Group{ + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "host1", + }, + }, + Source: "tenant1", + }, + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "host2", + }, + }, + Source: "tenant2", + }, + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "host3", + }, + }, + Source: "tenant3", + }, + }, + tenant: "tenant4", + }, + { + name: "many hosts", + cfg: []*targetgroup.Group{ + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "host1", + }, + model.LabelSet{ + model.AddressLabel: "host2", + }, + model.LabelSet{ + model.AddressLabel: "host3", + }, + }, + Source: "tenant1", + }, + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "host4", + }, + model.LabelSet{ + model.AddressLabel: "host5", + }, + model.LabelSet{ + model.AddressLabel: "host6", + }, + }, + Source: "", + }, + }, + hosts: map[string]struct{}{ + "host1": struct{}{}, + "host2": struct{}{}, + "host3": struct{}{}, + }, + tenant: "tenant1", + }, + { + name: "many hosts 2", + cfg: []*targetgroup.Group{ + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "host1", + }, + model.LabelSet{ + model.AddressLabel: "host2", + }, + model.LabelSet{ + model.AddressLabel: "host3", + }, + }, + Source: "tenant1", + }, + { + Targets: []model.LabelSet{ + model.LabelSet{ + model.AddressLabel: "host4", + }, + model.LabelSet{ + model.AddressLabel: "host5", + }, + model.LabelSet{ + model.AddressLabel: "host6", + }, + }, + Source: "", + }, + }, + hosts: map[string]struct{}{ + "host4": struct{}{}, + "host5": struct{}{}, + "host6": struct{}{}, + }, + tenant: "tenant2", + }, + } { + hs := NewHashring(tc.cfg) + h, err := hs.GetHost(tc.tenant, ts) + if tc.hosts != nil { + if err != nil { + t.Errorf("case %q: got unexpected error: %v", tc.name, err) + continue + } + if _, ok := tc.hosts[h]; !ok { + t.Errorf("case %q: got unexpected host %q", tc.name, h) + } + continue + } + if err == nil { + t.Errorf("case %q: expected error", tc.name) + } + } +}