Skip to content

Commit

Permalink
Move handling changes from startTSDBAndUpload to runReceive + add som…
Browse files Browse the repository at this point in the history
…e logs to see the flow + hashring config example

Signed-off-by: haanhvu <haanh6594@gmail.com>
  • Loading branch information
haanhvu committed Nov 18, 2022
1 parent 8df908f commit d66c778
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 9 deletions.
59 changes: 50 additions & 9 deletions cmd/thanos/receive.go
Expand Up @@ -234,6 +234,12 @@ func runReceive(
hashFunc,
cf,
)

for _, hc := range dbs.GetHashringConfig() {
for _, el := range hc.ExternalLabels {
level.Info(logger).Log("msg", "1. External label initialized: "+el)
}
}
} else {
if len(conf.hashringsFileContent) > 0 {
var err error
Expand Down Expand Up @@ -332,15 +338,15 @@ func runReceive(

level.Debug(logger).Log("msg", "setting up TSDB")
{
if err := startTSDBAndUpload(g, logger, reg, dbs, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt, receive.HashringAlgorithm(conf.hashringsAlgorithm), cw); err != nil {
if err := startTSDBAndUpload(g, logger, reg, dbs, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt, receive.HashringAlgorithm(conf.hashringsAlgorithm)); err != nil {
return err
}
}
}

level.Debug(logger).Log("msg", "setting up hashring")
{
if err := setupHashring(g, logger, reg, conf, hashringChangedChan, webHandler, statusProber, enableIngestion, updates, cw, cf); err != nil {
if err := setupHashring(g, logger, conf, hashringChangedChan, webHandler, statusProber, enableIngestion, updates, cw, cf, dbs); err != nil {
return err
}
}
Expand Down Expand Up @@ -496,7 +502,6 @@ func runReceive(
// If no hashring is provided, we setup a single node hashring with local endpoint.
func setupHashring(g *run.Group,
logger log.Logger,
reg *prometheus.Registry,
conf *receiveConfig,
hashringChangedChan chan struct{},
webHandler *receive.Handler,
Expand All @@ -505,6 +510,7 @@ func setupHashring(g *run.Group,
updates chan receive.Hashring,
cw *receive.ConfigWatcher,
cf []receive.HashringConfig,
dbs *receive.MultiTSDB,
) error {
algorithm := receive.HashringAlgorithm(conf.hashringsAlgorithm)

Expand Down Expand Up @@ -558,7 +564,28 @@ func setupHashring(g *run.Group,
if !ok {
return nil
}

for _, hc := range dbs.GetHashringConfig() {
for _, el := range hc.ExternalLabels {
level.Info(logger).Log("msg", "2. External label before reloading config: "+el)
}
}

webHandler.Hashring(h)
cf, err := cw.ValidateConfig()
if err != nil {
cw.Stop()
close(updates)
return errors.Wrap(err, "failed to validate hashring configuration file")
}
dbs.SetHashringConfig(cf)

for _, hc := range dbs.GetHashringConfig() {
for _, el := range hc.ExternalLabels {
level.Info(logger).Log("msg", "3. External label after reloading config: "+el)
}
}

// If ingestion is enabled, send a signal to TSDB to flush.
if enableIngestion {
hashringChangedChan <- struct{}{}
Expand Down Expand Up @@ -590,7 +617,6 @@ func startTSDBAndUpload(g *run.Group,
statusProber prober.Probe,
bkt objstore.Bucket,
hashringAlgorithm receive.HashringAlgorithm,
cw *receive.ConfigWatcher,
) error {

log.With(logger, "component", "storage")
Expand Down Expand Up @@ -653,18 +679,33 @@ func startTSDBAndUpload(g *run.Group,

level.Info(logger).Log("msg", "updating storage")
dbUpdatesStarted.Inc()

for _, hc := range dbs.GetHashringConfig() {
for _, el := range hc.ExternalLabels {
level.Info(logger).Log("msg", "4. External label before flush: "+el)
}
}

if err := dbs.Flush(); err != nil {
return errors.Wrap(err, "flushing storage")
}
cf, err := cw.ValidateConfig()
if err != nil {
cw.Stop()
return errors.Wrap(err, "failed to validate hashring configuration file")

for _, hc := range dbs.GetHashringConfig() {
for _, el := range hc.ExternalLabels {
level.Info(logger).Log("msg", "5. External label after flush and before open: "+el)
}
}
dbs.SetHashringConfig(cf)

if err := dbs.Open(); err != nil {
return errors.Wrap(err, "opening storage")
}

for _, hc := range dbs.GetHashringConfig() {
for _, el := range hc.ExternalLabels {
level.Info(logger).Log("msg", "6. External label after open: "+el)
}
}

if upload {
uploadC <- struct{}{}
<-uploadDone
Expand Down
8 changes: 8 additions & 0 deletions hashring.json
@@ -0,0 +1,8 @@
[
{
"hashring": "tenant-a",
"endpoints": ["127.0.0.1:10901"],
"tenants": ["tenant-a"],
"external_labels": ["key1=value1", "key2=value2"]
}
]
4 changes: 4 additions & 0 deletions pkg/receive/multitsdb.go
Expand Up @@ -608,6 +608,10 @@ func (t *MultiTSDB) SetHashringConfig(cfg []HashringConfig) {
t.hashringConfigs = cfg
}

func (t *MultiTSDB) GetHashringConfig() []HashringConfig {
return t.hashringConfigs
}

// ErrNotReady is returned if the underlying storage is not ready yet.
var ErrNotReady = errors.New("TSDB not ready")

Expand Down

0 comments on commit d66c778

Please sign in to comment.