Skip to content

Commit

Permalink
cmd/thanos/compact: add bucket UI
Browse files Browse the repository at this point in the history
This commit enhances the compact component so that it runs the bucket UI
whenever the --wait flag is also passed. In order to reduce the overhead
of running the UI in addition to the compactor, this commit also
introduces an abstraction around downloading block meta files allowing
the metadata to be downloaded once and cached. This ensures that the
compactor does not unnecessarily download every metadata file twice.

Signed-off-by: Lucas Servén Marín <lserven@gmail.com>
  • Loading branch information
squat committed Nov 4, 2019
1 parent 3bd1a65 commit 94881dd
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#1573](https://github.com/thanos-io/thanos/pull/1573) `AliYun OSS` object storage, see [documents](docs/storage.md#aliyun-oss) for further information.
- [#1680](https://github.com/thanos-io/thanos/pull/1680) Add a new `--http-grace-period` CLI option to components which serve HTTP to set how long to wait until HTTP Server shuts down.
- [#1712](https://github.com/thanos-io/thanos/pull/1712) Rename flag on bucket web component from `--listen` to `--http-address` to match other components.
- [#1714](https://github.com/thanos-io/thanos/pull/1714) Run the bucket web UI in the compact component when it is run as a long-lived process.

### Fixed

Expand Down
37 changes: 18 additions & 19 deletions cmd/thanos/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,6 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
label := cmd.Flag("label", "Prometheus label to use as timeline title").String()

m[name+" web"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error {
ctx, cancel := context.WithCancel(context.Background())

statusProber := prober.NewProber(component.Bucket, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
srv := httpserver.New(logger, reg, component.Bucket, statusProber,
Expand All @@ -342,8 +340,20 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
level.Warn(logger).Log("msg", "Refresh interval should be at least 2 times the timeout")
}

confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Bucket.String())
if err != nil {
return errors.Wrap(err, "bucket client")
}

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return refresh(ctx, logger, bucketUI, *interval, *timeout, name, reg, objStoreConfig)
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
return refresh(ctx, logger, bucketUI, *interval, *timeout, bkt, block.MetaDownloaderFn(block.DownloadMeta))
}, func(error) {
cancel()
})
Expand All @@ -354,25 +364,14 @@ func registerBucketWeb(m map[string]setupFunc, root *kingpin.CmdClause, name str
}
}

// refresh metadata from remote storage periodically and update UI.
func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, duration time.Duration, timeout time.Duration, name string, reg *prometheus.Registry, objStoreConfig *extflag.PathOrContent) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, name)
if err != nil {
return errors.Wrap(err, "bucket client")
}

defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
// refresh metadata from remote storage periodically and update the UI.
func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, duration time.Duration, timeout time.Duration, bkt objstore.Bucket, metaDownloader block.MetaDownloader) error {
return runutil.Repeat(duration, ctx.Done(), func() error {
return runutil.RetryWithLog(logger, time.Minute, ctx.Done(), func() error {
iterCtx, iterCancel := context.WithTimeout(ctx, timeout)
defer iterCancel()

blocks, err := download(iterCtx, logger, bkt)
blocks, err := download(iterCtx, logger, bkt, metaDownloader)
if err != nil {
bucketUI.Set("[]", err)
return err
Expand All @@ -389,7 +388,7 @@ func refresh(ctx context.Context, logger log.Logger, bucketUI *ui.Bucket, durati
})
}

func download(ctx context.Context, logger log.Logger, bkt objstore.Bucket) (blocks []metadata.Meta, err error) {
func download(ctx context.Context, logger log.Logger, bkt objstore.Bucket, metaDownloader block.MetaDownloader) (blocks []metadata.Meta, err error) {
level.Info(logger).Log("msg", "synchronizing block metadata")

if err = bkt.Iter(ctx, "", func(name string) error {
Expand All @@ -398,7 +397,7 @@ func download(ctx context.Context, logger log.Logger, bkt objstore.Bucket) (bloc
return nil
}

meta, err := block.DownloadMeta(ctx, logger, bkt, id)
meta, err := metaDownloader.DownloadMeta(ctx, logger, bkt, id)
if err != nil {
return err
}
Expand Down
28 changes: 26 additions & 2 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,21 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/ui"
"gopkg.in/alecthomas/kingpin.v2"
)

Expand Down Expand Up @@ -113,6 +116,8 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {

selectorRelabelConf := regSelectorRelabelFlags(cmd)

label := cmd.Flag("label", "Prometheus label to use as timeline title").String()

m[component.Compact.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
return runCompact(g, logger, reg,
*httpAddr,
Expand All @@ -135,6 +140,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {
*blockSyncConcurrency,
*compactionConcurrency,
selectorRelabelConf,
*label,
)
}
}
Expand All @@ -159,6 +165,7 @@ func runCompact(
blockSyncConcurrency int,
concurrency int,
selectorRelabelConf *extflag.PathOrContent,
label string,
) error {
halted := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_compactor_halted",
Expand Down Expand Up @@ -211,8 +218,9 @@ func runCompact(
}
}()

metaDownloader := block.NewCachingMetaDownloader(block.MetaDownloaderFn(block.DownloadMeta))
sy, err := compact.NewSyncer(logger, reg, bkt, consistencyDelay,
blockSyncConcurrency, acceptMalformedIndex, relabelConfig)
blockSyncConcurrency, acceptMalformedIndex, relabelConfig, metaDownloader)
if err != nil {
return errors.Wrap(err, "create syncer")
}
Expand Down Expand Up @@ -295,6 +303,9 @@ func runCompact(
return nil
}

// Compaction and bucket refresh interval.
interval := 5 * time.Minute

g.Add(func() error {
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")

Expand All @@ -310,7 +321,7 @@ func runCompact(
}

// --wait=true is specified.
return runutil.Repeat(5*time.Minute, ctx.Done(), func() error {
return runutil.Repeat(interval, ctx.Done(), func() error {
err := f()
if err == nil {
return nil
Expand Down Expand Up @@ -343,6 +354,19 @@ func runCompact(
cancel()
})

if wait {
router := route.New()
bucketUI := ui.NewBucketUI(logger, label)
bucketUI.Register(router, extpromhttp.NewInstrumentationMiddleware(reg))
srv.Handle("/", router)

g.Add(func() error {
return refresh(ctx, logger, bucketUI, interval, time.Minute, bkt, metaDownloader)
}, func(error) {
cancel()
})
}

level.Info(logger).Log("msg", "starting compact node")
statusProber.SetReady()
return nil
Expand Down
1 change: 1 addition & 0 deletions docs/components/compact.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,5 +135,6 @@ Flags:
selecting blocks. It follows native Prometheus
relabel-config syntax. See format details:
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
--label=LABEL Prometheus label to use as timeline title
```
47 changes: 47 additions & 0 deletions pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"path"
"path/filepath"
"strings"
"sync"

"github.com/go-kit/kit/log/level"

Expand Down Expand Up @@ -188,3 +189,49 @@ func IsBlockDir(path string) (id ulid.ULID, ok bool) {
id, err := ulid.Parse(filepath.Base(path))
return id, err == nil
}

// MetaDownloader abstracts anything that can download block metas.
//
// It lets callers swap the plain download function for a wrapper (for
// example a caching one) without changing the call site.
type MetaDownloader interface {
	// DownloadMeta returns the metadata.Meta for the block identified by id
	// in bkt, or an error if it could not be obtained.
	DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (metadata.Meta, error)
}

// MetaDownloaderFn is an adapter that lets an ordinary function with the
// matching signature be used as a MetaDownloader.
type MetaDownloaderFn func(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (metadata.Meta, error)

// DownloadMeta implements the MetaDownloader interface by invoking the
// underlying function with the arguments passed through unchanged.
func (fn MetaDownloaderFn) DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (metadata.Meta, error) {
	meta, err := fn(ctx, logger, bkt, id)
	return meta, err
}

// CachingMetaDownloader is a MetaDownloader that can cache metas.
//
// It wraps another MetaDownloader and keeps the metas obtained from it in an
// in-memory map keyed by block ID.
//
// NOTE(review): the RWMutex is embedded, so Lock/Unlock/RLock/RUnlock are
// promoted onto this exported type — confirm that exposure is intended.
type CachingMetaDownloader struct {
// Embedded lock guarding access to metas.
sync.RWMutex
// next is the downloader consulted when an ID is not present in metas.
next MetaDownloader
// metas holds the metas downloaded so far, keyed by block ID.
metas map[ulid.ULID]metadata.Meta
}

// NewCachingMetaDownloader wraps next in a MetaDownloader that remembers
// every meta it has successfully downloaded. Lookups that miss the cache are
// delegated to next. The cache starts out empty.
func NewCachingMetaDownloader(next MetaDownloader) MetaDownloader {
	cache := make(map[ulid.ULID]metadata.Meta)
	return &CachingMetaDownloader{next: next, metas: cache}
}

// DownloadMeta implements the MetaDownloader interface.
//
// A meta already present in the cache is returned without contacting the
// wrapped downloader. Otherwise the request is delegated to the wrapped
// downloader and, only when that succeeds, the result is stored under id.
// Failed downloads are not cached, and their result and error are returned
// as received.
//
// The lock is not held while the wrapped downloader runs, so concurrent
// misses for the same id may each trigger a download; the last one to finish
// overwrites the entry. This method never removes entries from the cache.
func (c *CachingMetaDownloader) DownloadMeta(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID) (metadata.Meta, error) {
	// Fast path: serve from the cache under the read lock.
	c.RLock()
	cached, hit := c.metas[id]
	c.RUnlock()
	if hit {
		return cached, nil
	}

	fetched, err := c.next.DownloadMeta(ctx, logger, bkt, id)
	if err != nil {
		return fetched, err
	}

	// Remember the successful download for subsequent calls.
	c.Lock()
	c.metas[id] = fetched
	c.Unlock()
	return fetched, nil
}
8 changes: 5 additions & 3 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type Syncer struct {
metrics *syncerMetrics
acceptMalformedIndex bool
relabelConfig []*relabel.Config
metaDownloader block.MetaDownloader
}

type syncerMetrics struct {
Expand Down Expand Up @@ -145,7 +146,7 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {

// NewSyncer returns a new Syncer for the given Bucket and directory.
// Blocks must be at least as old as the consistency delay to be considered.
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool, relabelConfig []*relabel.Config) (*Syncer, error) {
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool, relabelConfig []*relabel.Config, md block.MetaDownloader) (*Syncer, error) {
if logger == nil {
logger = log.NewNopLogger()
}
Expand All @@ -155,6 +156,7 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket
consistencyDelay: consistencyDelay,
blocks: map[ulid.ULID]*metadata.Meta{},
bkt: bkt,
metaDownloader: md,
metrics: newSyncerMetrics(reg),
blockSyncConcurrency: blockSyncConcurrency,
acceptMalformedIndex: acceptMalformedIndex,
Expand Down Expand Up @@ -291,7 +293,7 @@ func (c *Syncer) syncMetas(ctx context.Context) error {
func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta, error) {
level.Debug(c.logger).Log("msg", "download meta", "block", id)

meta, err := block.DownloadMeta(ctx, c.logger, c.bkt, id)
meta, err := c.metaDownloader.DownloadMeta(ctx, c.logger, c.bkt, id)
if err != nil {
if ulid.Now()-id.Time() < uint64(c.consistencyDelay/time.Millisecond) {
level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id)
Expand Down Expand Up @@ -356,7 +358,7 @@ func groupKey(res int64, lbls labels.Labels) string {
}

// Groups returns the compaction groups for all blocks currently known to the syncer.
// It creates all groups from the scratch on every call.
// It creates all groups from scratch on every call.
func (c *Syncer) Groups() (res []*Group, err error) {
c.mtx.Lock()
defer c.mtx.Unlock()
Expand Down
8 changes: 4 additions & 4 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestSyncer_SyncMetas_e2e(t *testing.T) {
defer cancel()

relabelConfig := make([]*relabel.Config, 0)
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig)
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig, block.MetaDownloaderFn(block.DownloadMeta))
testutil.Ok(t, err)

// Generate 15 blocks. Initially the first 10 are synced into memory and only the last
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
}

// Do one initial synchronization with the bucket.
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig)
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig, block.MetaDownloaderFn(block.DownloadMeta))
testutil.Ok(t, err)
testutil.Ok(t, sy.SyncMetas(ctx))

Expand Down Expand Up @@ -209,7 +209,7 @@ func TestGroup_Compact_e2e(t *testing.T) {

reg := prometheus.NewRegistry()

sy, err := NewSyncer(logger, reg, bkt, 0*time.Second, 5, false, nil)
sy, err := NewSyncer(logger, reg, bkt, 0*time.Second, 5, false, nil, block.MetaDownloaderFn(block.DownloadMeta))
testutil.Ok(t, err)

comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil)
Expand Down Expand Up @@ -515,7 +515,7 @@ func TestSyncer_SyncMetasFilter_e2e(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()

sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig)
sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, relabelConfig, block.MetaDownloaderFn(block.DownloadMeta))
testutil.Ok(t, err)

var ids []ulid.ULID
Expand Down
3 changes: 2 additions & 1 deletion pkg/compact/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"

"github.com/oklog/ulid"
Expand Down Expand Up @@ -79,7 +80,7 @@ func TestSyncer_SyncMetas_HandlesMalformedBlocks(t *testing.T) {

bkt := inmem.NewBucket()
relabelConfig := make([]*relabel.Config, 0)
sy, err := NewSyncer(nil, nil, bkt, 10*time.Second, 1, false, relabelConfig)
sy, err := NewSyncer(nil, nil, bkt, 10*time.Second, 1, false, relabelConfig, block.MetaDownloaderFn(block.DownloadMeta))
testutil.Ok(t, err)

// Generate 1 block which is older than MinimumAgeForRemoval which has chunk data but no meta. Compactor should delete it.
Expand Down

0 comments on commit 94881dd

Please sign in to comment.