Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
325 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
package receive | ||
|
||
import ( | ||
"errors" | ||
"sort" | ||
|
||
"github.com/improbable-eng/thanos/pkg/store/prompb" | ||
|
||
"github.com/cespare/xxhash" | ||
"github.com/prometheus/common/model" | ||
"github.com/prometheus/prometheus/discovery/targetgroup" | ||
) | ||
|
||
const sep = '\xff' | ||
|
||
// Hashring finds the correct host to handle a given time series | ||
// for a specified tenant. | ||
// It returns the hostname and any error encountered. | ||
type Hashring interface { | ||
GetHost(string, *prompb.TimeSeries) (string, error) | ||
} | ||
|
||
// hash returns a hash for the given tenant and time series. | ||
func hash(tenant string, ts *prompb.TimeSeries) uint64 { | ||
// Sort labelset to ensure a stable hash. | ||
sort.Slice(ts.Labels, func(i, j int) bool { return ts.Labels[i].Name < ts.Labels[j].Name }) | ||
|
||
b := make([]byte, 0, 1024) | ||
b = append(b, []byte(tenant)...) | ||
for _, v := range ts.Labels { | ||
b = append(b, v.Name...) | ||
b = append(b, sep) | ||
b = append(b, v.Value...) | ||
b = append(b, sep) | ||
} | ||
return xxhash.Sum64(b) | ||
} | ||
|
||
// simpleHashring represents a group of hosts handling write requests. | ||
type simpleHashring struct { | ||
targetgroup.Group | ||
} | ||
|
||
// GetHost returns a hostname to handle the given tenant and time series. | ||
func (s *simpleHashring) GetHost(tenant string, ts *prompb.TimeSeries) (string, error) { | ||
// Always return nil here to implement the Hashring interface. | ||
return string(s.Targets[hash(tenant, ts)%uint64(len(s.Targets))][model.AddressLabel]), nil | ||
} | ||
|
||
// multiTenantHashring represents a set of Hashrings for different tenants. | ||
type multiTenantHashring map[string]Hashring | ||
|
||
// GetHost returns a hostname to handle the given tenant and time series. | ||
func (m multiTenantHashring) GetHost(tenant string, ts *prompb.TimeSeries) (string, error) { | ||
if h, ok := m[tenant]; ok { | ||
return h.GetHost(tenant, ts) | ||
} | ||
if h, ok := m[""]; ok { | ||
return h.GetHost(tenant, ts) | ||
} | ||
return "", errors.New("no matching hosts to handle tenant") | ||
} | ||
|
||
// NewHashring creates a multi-tenant hashring for a given | ||
// configuration that maps tenants to groups of hosts. | ||
func NewHashring(groups []*targetgroup.Group) Hashring { | ||
m := make(multiTenantHashring) | ||
for _, g := range groups { | ||
m[g.Source] = &simpleHashring{*g} | ||
} | ||
return m | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,253 @@ | ||
package receive | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/improbable-eng/thanos/pkg/store/prompb" | ||
"github.com/prometheus/common/model" | ||
"github.com/prometheus/prometheus/discovery/targetgroup" | ||
) | ||
|
||
func TestHash(t *testing.T) { | ||
ts := &prompb.TimeSeries{ | ||
Labels: []prompb.Label{ | ||
{ | ||
Name: "foo", | ||
Value: "bar", | ||
}, | ||
{ | ||
Name: "baz", | ||
Value: "qux", | ||
}, | ||
}, | ||
} | ||
|
||
ts2 := &prompb.TimeSeries{ | ||
Labels: []prompb.Label{ts.Labels[1], ts.Labels[0]}, | ||
} | ||
|
||
if hash("", ts) != hash("", ts2) { | ||
t.Errorf("expected hashes to be independent of label order") | ||
} | ||
} | ||
|
||
func TestGetHost(t *testing.T) { | ||
ts := &prompb.TimeSeries{ | ||
Labels: []prompb.Label{ | ||
{ | ||
Name: "foo", | ||
Value: "bar", | ||
}, | ||
{ | ||
Name: "baz", | ||
Value: "qux", | ||
}, | ||
}, | ||
} | ||
|
||
for _, tc := range []struct { | ||
name string | ||
cfg []*targetgroup.Group | ||
hosts map[string]struct{} | ||
tenant string | ||
}{ | ||
{ | ||
name: "empty", | ||
cfg: []*targetgroup.Group{}, | ||
tenant: "tenant1", | ||
}, | ||
{ | ||
name: "simple", | ||
cfg: []*targetgroup.Group{ | ||
{ | ||
Targets: []model.LabelSet{ | ||
model.LabelSet{ | ||
model.AddressLabel: "host1", | ||
}, | ||
}, | ||
Source: "", | ||
}, | ||
}, | ||
hosts: map[string]struct{}{"host1": struct{}{}}, | ||
tenant: "tenant1", | ||
}, | ||
{ | ||
name: "specific", | ||
cfg: []*targetgroup.Group{ | ||
{ | ||
Targets: []model.LabelSet{ | ||
model.LabelSet{ | ||
model.AddressLabel: "host1", | ||
}, | ||
}, | ||
Source: "", | ||
}, | ||
{ | ||
Targets: []model.LabelSet{ | ||
model.LabelSet{ | ||
model.AddressLabel: "host2", | ||
}, | ||
}, | ||
Source: "tenant1", | ||
}, | ||
}, | ||
hosts: map[string]struct{}{"host2": struct{}{}}, | ||
tenant: "tenant1", | ||
}, | ||
{ | ||
name: "many tenants", | ||
cfg: []*targetgroup.Group{ | ||
{ | ||
Targets: []model.LabelSet{ | ||
model.LabelSet{ | ||
model.AddressLabel: "host1", | ||
}, | ||
}, | ||
Source: "tenant1", | ||
}, | ||
{ | ||
Targets: []model.LabelSet{ | ||
model.LabelSet{ | ||
model.AddressLabel: "host2", | ||
}, | ||
}, | ||
Source: "tenant2", | ||
}, | ||
{ | ||
Targets: []model.LabelSet{ | ||
model.LabelSet{ | ||
model.AddressLabel: "host3", | ||
}, | ||
}, | ||
Source: "tenant3", | ||
}, | ||
}, | ||
hosts: map[string]struct{}{"host1": struct{}{}}, | ||
tenant: "tenant1", | ||
}, | ||
{ | ||
name: "many tenants error", | ||
cfg: []*targetgroup.Group{ | ||
{ | ||
Targets: []model.LabelSet{ | ||
model.LabelSet{ | ||
model.AddressLabel: "host1", | ||
}, | ||
}, | ||
Source: "tenant1", | ||
}, | ||
{ | ||
Targets: []model.LabelSet{ | ||
model.LabelSet{ | ||
model.AddressLabel: "host2", | ||
}, | ||
}, | ||
Source: "tenant2", | ||
}, | ||
{ | ||
Targets: []model.LabelSet{ | ||
model.LabelSet{ | ||
model.AddressLabel: "host3", | ||
}, | ||
}, | ||
Source: "tenant3", | ||
}, | ||
}, | ||
tenant: "tenant4", | ||
}, | ||
{ | ||
name: "many hosts", | ||
cfg: []*targetgroup.Group{ | ||
{ | ||
Targets: []model.LabelSet{ | ||
model.LabelSet{ | ||
model.AddressLabel: "host1", | ||
}, | ||
model.LabelSet{ | ||
model.AddressLabel: "host2", | ||
}, | ||
model.LabelSet{ | ||
model.AddressLabel: "host3", | ||
}, | ||
}, | ||
Source: "tenant1", | ||
}, | ||
{ | ||
Targets: []model.LabelSet{ | ||
model.LabelSet{ | ||
model.AddressLabel: "host4", | ||
}, | ||
model.LabelSet{ | ||
model.AddressLabel: "host5", | ||
}, | ||
model.LabelSet{ | ||
model.AddressLabel: "host6", | ||
}, | ||
}, | ||
Source: "", | ||
}, | ||
}, | ||
hosts: map[string]struct{}{ | ||
"host1": struct{}{}, | ||
"host2": struct{}{}, | ||
"host3": struct{}{}, | ||
}, | ||
tenant: "tenant1", | ||
}, | ||
{ | ||
name: "many hosts 2", | ||
cfg: []*targetgroup.Group{ | ||
{ | ||
Targets: []model.LabelSet{ | ||
model.LabelSet{ | ||
model.AddressLabel: "host1", | ||
}, | ||
model.LabelSet{ | ||
model.AddressLabel: "host2", | ||
}, | ||
model.LabelSet{ | ||
model.AddressLabel: "host3", | ||
}, | ||
}, | ||
Source: "tenant1", | ||
}, | ||
{ | ||
Targets: []model.LabelSet{ | ||
model.LabelSet{ | ||
model.AddressLabel: "host4", | ||
}, | ||
model.LabelSet{ | ||
model.AddressLabel: "host5", | ||
}, | ||
model.LabelSet{ | ||
model.AddressLabel: "host6", | ||
}, | ||
}, | ||
Source: "", | ||
}, | ||
}, | ||
hosts: map[string]struct{}{ | ||
"host4": struct{}{}, | ||
"host5": struct{}{}, | ||
"host6": struct{}{}, | ||
}, | ||
tenant: "tenant2", | ||
}, | ||
} { | ||
hs := NewHashring(tc.cfg) | ||
h, err := hs.GetHost(tc.tenant, ts) | ||
if tc.hosts != nil { | ||
if err != nil { | ||
t.Errorf("case %q: got unexpected error: %v", tc.name, err) | ||
continue | ||
} | ||
if _, ok := tc.hosts[h]; !ok { | ||
t.Errorf("case %q: got unexpected host %q", tc.name, h) | ||
} | ||
continue | ||
} | ||
if err == nil { | ||
t.Errorf("case %q: expected error", tc.name) | ||
} | ||
} | ||
} |