Skip to content

Commit

Permalink
Extract external labels from hashring configs in receive cmd and put …
Browse files Browse the repository at this point in the history
…it in multiTSDB

Signed-off-by: haanhvu <haanh6594@gmail.com>
  • Loading branch information
haanhvu committed Nov 1, 2022
1 parent 20022b9 commit 9fa5237
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 26 deletions.
38 changes: 22 additions & 16 deletions cmd/thanos/receive.go
Expand Up @@ -203,6 +203,7 @@ func runReceive(
bkt,
conf.allowOutOfOrderUpload,
hashFunc,
nil,
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs)

Expand Down Expand Up @@ -256,6 +257,15 @@ func runReceive(
// hashringChangedChan signals when TSDB needs to be flushed and updated due to hashring config change.
hashringChangedChan := make(chan struct{}, 1)

level.Debug(logger).Log("msg", "setting up hashring")
{
if hashringConfigs, err := setupHashring(g, logger, reg, conf, hashringChangedChan, webHandler, statusProber, enableIngestion); err != nil {
return err
}

dbs.hashringConfigs = hashringConfigs
}

if enableIngestion {
// uploadC signals when new blocks should be uploaded.
uploadC := make(chan struct{}, 1)
Expand All @@ -270,13 +280,6 @@ func runReceive(
}
}

level.Debug(logger).Log("msg", "setting up hashring")
{
if err := setupHashring(g, logger, reg, conf, hashringChangedChan, webHandler, statusProber, enableIngestion); err != nil {
return err
}
}

level.Debug(logger).Log("msg", "setting up HTTP server")
{
srv := httpserver.New(logger, reg, comp, httpProbe,
Expand Down Expand Up @@ -434,23 +437,24 @@ func setupHashring(g *run.Group,
webHandler *receive.Handler,
statusProber prober.Probe,
enableIngestion bool,
) error {
) []receive.HashringConfig, error {
// Note: the hashring configuration watcher
// is the sender and thus closes the chan.
// In the single-node case, which has no configuration
// watcher, we close the chan ourselves.
updates := make(chan receive.Hashring, 1)
algorithm := receive.HashringAlgorithm(conf.hashringsAlgorithm)
var hashringConfigs []receive.HashringConfig

// The Hashrings config file path is given initializing config watcher.
if conf.hashringsFilePath != "" {
cw, err := receive.NewConfigWatcher(log.With(logger, "component", "config-watcher"), reg, conf.hashringsFilePath, *conf.refreshInterval)
if err != nil {
return errors.Wrap(err, "failed to initialize config watcher")
return nil, errors.Wrap(err, "failed to initialize config watcher")
}

// Check the hashring configuration on before running the watcher.
if err := cw.ValidateConfig(); err != nil {
if hcs, err := cw.ValidateConfig(); err != nil {
cw.Stop()
close(updates)
return errors.Wrap(err, "failed to validate hashring configuration file")
Expand All @@ -459,10 +463,12 @@ func setupHashring(g *run.Group,
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
level.Info(logger).Log("msg", "the hashring initialized with config watcher.")
return receive.HashringFromConfigWatcher(ctx, algorithm, conf.replicationFactor, updates, cw)
return nil, receive.HashringFromConfigWatcher(ctx, algorithm, conf.replicationFactor, updates, cw)
}, func(error) {
cancel()
})

hashringConfigs = hcs
} else {
var (
ring receive.Hashring
Expand All @@ -473,7 +479,7 @@ func setupHashring(g *run.Group,
ring, err = receive.HashringFromConfig(algorithm, conf.replicationFactor, conf.hashringsFileContent)
if err != nil {
close(updates)
return errors.Wrap(err, "failed to validate hashring configuration file")
return nil, errors.Wrap(err, "failed to validate hashring configuration file")
}
level.Info(logger).Log("msg", "the hashring initialized directly with the given content through the flag.")
} else {
Expand All @@ -486,7 +492,7 @@ func setupHashring(g *run.Group,
defer close(updates)
updates <- ring
<-cancel
return nil
return hashringConfigs, nil
}, func(error) {
close(cancel)
})
Expand All @@ -503,7 +509,7 @@ func setupHashring(g *run.Group,
select {
case h, ok := <-updates:
if !ok {
return nil
return hashringConfigs, nil
}
webHandler.Hashring(h)
// If ingestion is enabled, send a signal to TSDB to flush.
Expand All @@ -514,14 +520,14 @@ func setupHashring(g *run.Group,
statusProber.Ready()
}
case <-cancel:
return nil
return hashringConfigs, nil
}
}
}, func(err error) {
close(cancel)
},
)
return nil
return hashringConfigs, nil
}

// startTSDBAndUpload starts the multi-TSDB and sets up the rungroup to flush the TSDB and reload on hashring change.
Expand Down
6 changes: 3 additions & 3 deletions pkg/receive/config.go
Expand Up @@ -185,9 +185,9 @@ func (cw *ConfigWatcher) C() <-chan []HashringConfig {
}

// ValidateConfig returns an error if the configuration that's being watched is not valid.
func (cw *ConfigWatcher) ValidateConfig() error {
_, _, err := loadConfig(cw.logger, cw.path)
return err
func (cw *ConfigWatcher) ValidateConfig() ([]HashringConfig, error) {
config, _, err := loadConfig(cw.logger, cw.path)
return config, err
}

// Stop shuts down the config watcher.
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/config_test.go
Expand Up @@ -65,7 +65,7 @@ func TestValidateConfig(t *testing.T) {
testutil.Ok(t, err)
defer cw.Stop()

if err := cw.ValidateConfig(); err != nil && !errors.Is(err, tc.err) {
if _, err := cw.ValidateConfig(); err != nil && !errors.Is(err, tc.err) {
t.Errorf("case %q: got unexpected error: %v", tc.name, err)
}
})
Expand Down
34 changes: 28 additions & 6 deletions pkg/receive/multitsdb.go
Expand Up @@ -10,13 +10,15 @@ import (
"path"
"path/filepath"
"sort"
"strings"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
Expand Down Expand Up @@ -57,6 +59,7 @@ type MultiTSDB struct {
tenants map[string]*tenant
allowOutOfOrderUpload bool
hashFunc metadata.HashFunc
hashringConfigs []HashringConfig
}

// NewMultiTSDB creates new MultiTSDB.
Expand All @@ -71,6 +74,7 @@ func NewMultiTSDB(
bucket objstore.Bucket,
allowOutOfOrderUpload bool,
hashFunc metadata.HashFunc,
hashringConfigs []HashringConfig
) *MultiTSDB {
if l == nil {
l = log.NewNopLogger()
Expand All @@ -88,6 +92,7 @@ func NewMultiTSDB(
bucket: bucket,
allowOutOfOrderUpload: allowOutOfOrderUpload,
hashFunc: hashFunc,
hashringConfigs: hashringConfigs,
}
}

Expand Down Expand Up @@ -139,7 +144,6 @@ type tenant struct {
storeTSDB *store.TSDBStore
exemplarsTSDB *exemplars.TSDB
ship *shipper.Shipper
externalLabels HashringConfig.ExternalLabels

mtx *sync.RWMutex
}
Expand Down Expand Up @@ -182,13 +186,12 @@ func (t *tenant) shipper() *shipper.Shipper {
return t.ship
}

func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB, externalLabels HashringConfig.ExternalLabels) {
func (t *tenant) set(storeTSDB *store.TSDBStore, tenantTSDB *tsdb.DB, ship *shipper.Shipper, exemplarsTSDB *exemplars.TSDB) {
t.readyS.Set(tenantTSDB)
t.mtx.Lock()
t.storeTSDB = storeTSDB
t.ship = ship
t.exemplarsTSDB = exemplarsTSDB
t.externalLabels = HashringConfig.ExternalLabels
t.mtx.Unlock()
}

Expand Down Expand Up @@ -496,10 +499,29 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
reg = NewUnRegisterer(reg)

lset := labelpb.ExtendSortedLabels(t.labels, labels.FromStrings(t.tenantLabelName, tenantID))
tenantExternalLabels := thanos.parseFlagLabels(tenant.externalLabels)
for _, tenantInstance := range t.tenants {
lset = labelpb.ExtendSortedLabels(t.labels, tenantExternalLabels)

hcLoop:
for hc := range t.hashringConfigs {
for t := range hc.Tenants {
if t == tenantID {
var elset labels.Labels
for el := range hc.ExternalLabels {
parts := strings.SplitN(l, "=", 2)
if len(parts) != 2 {
return nil, errors.Errorf("unrecognized label %q", el)
}
if !model.LabelName.IsValid(model.LabelName(parts[0])) {
return nil, errors.Errorf("unsupported format for label %s", el)
}
elset = append(elset, labels.Label{Name: parts[0], Value: parts[1]})
}
sort.Sort(elset)
lset = labelpb.ExtendSortedLabels(lset, elset)
break hcLoop
}
}
}

dataDir := t.defaultTenantDataDir(tenantID)

level.Info(logger).Log("msg", "opening TSDB")
Expand Down

0 comments on commit 9fa5237

Please sign in to comment.