Skip to content

Commit

Permalink
store: add consistency delay to fetch blocks
Browse files Browse the repository at this point in the history
Signed-off-by: khyatisoneji <khyatisoneji5@gmail.com>
  • Loading branch information
khyatisoneji committed Jan 17, 2020
1 parent 46a97fd commit 9e9e53f
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 85 deletions.
26 changes: 4 additions & 22 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -23,6 +22,7 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/compact/garbagecollector"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extprom"
Expand Down Expand Up @@ -242,13 +242,14 @@ func runCompact(

metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg),
block.NewLabelShardedMetaFilter(relabelConfig).Filter,
(&consistencyDelayMetaFilter{logger: logger, consistencyDelay: consistencyDelay}).Filter,
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay).Filter,
)
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}

sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, blockSyncConcurrency, acceptMalformedIndex, false)
garbageBlocksFinder := garbagecollector.NewGarbageBlocksFinder()
sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, garbageBlocksFinder, blockSyncConcurrency, acceptMalformedIndex, false)
if err != nil {
return errors.Wrap(err, "create syncer")
}
Expand Down Expand Up @@ -385,25 +386,6 @@ func runCompact(
return nil
}

type consistencyDelayMetaFilter struct {
logger log.Logger
consistencyDelay time.Duration
}

func (f *consistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced block.GaugeLabeled, _ bool) {
for id, meta := range metas {
if ulid.Now()-id.Time() < uint64(f.consistencyDelay/time.Millisecond) &&
meta.Thanos.Source != metadata.BucketRepairSource &&
meta.Thanos.Source != metadata.CompactorSource &&
meta.Thanos.Source != metadata.CompactorRepairSource {

level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id)
synced.WithLabelValues(block.TooFreshMeta).Inc()
delete(metas, id)
}
}
}

// genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage.
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, fetcher block.MetadataFetcher, dir string) error {
genIndex := prometheus.NewCounter(prometheus.CounterOpts{
Expand Down
19 changes: 19 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"fmt"
"time"

"github.com/go-kit/kit/log"
Expand All @@ -12,6 +13,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/garbagecollector"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extprom"
Expand Down Expand Up @@ -75,6 +78,9 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {

selectorRelabelConf := regSelectorRelabelFlags(cmd)

consistencyDelay := modelDuration(cmd.Flag("consistency-delay", fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %v will be removed.", compact.PartialUploadThresholdAge)).
Default("30m"))

m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'",
Expand Down Expand Up @@ -109,6 +115,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
},
selectorRelabelConf,
*advertiseCompatibilityLabel,
time.Duration(*consistencyDelay),
)
}
}
Expand Down Expand Up @@ -140,7 +147,16 @@ func runStore(
filterConf *store.FilterConfig,
selectorRelabelConf *extflag.PathOrContent,
advertiseCompatibilityLabel bool,
consistencyDelay time.Duration,
) error {
consistencyDelayMetric := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "thanos_consistency_delay_seconds",
Help: "Configured consistency delay in seconds.",
}, func() float64 {
return consistencyDelay.Seconds()
})
reg.MustRegister(consistencyDelayMetric)

// Initiate HTTP listener providing metrics endpoint and readiness/liveness probes.
statusProber := prober.New(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg))
srv := httpserver.New(logger, reg, component, statusProber,
Expand Down Expand Up @@ -206,9 +222,12 @@ func runStore(
return errors.Wrap(err, "create index cache")
}

garbageBlocksFinder := garbagecollector.NewGarbageBlocksFinder()
metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime).Filter,
block.NewLabelShardedMetaFilter(relabelConfig).Filter,
block.NewConsistencyDelayMetaFilter(logger, consistencyDelay).Filter,
garbagecollector.NewGarbageBlocksFilter(garbageBlocksFinder).Filter,
)
if err != nil {
return errors.Wrap(err, "meta fetcher")
Expand Down
4 changes: 4 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ Flags:
Prometheus relabel-config syntax. See format
details:
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
--consistency-delay=30m Minimum age of fresh (non-compacted) blocks
before they are being processed. Malformed
blocks older than the maximum of
consistency-delay and 48h0m0s will be removed.
```

Expand Down
29 changes: 29 additions & 0 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,3 +407,32 @@ func (f *LabelShardedMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, sync
delete(metas, id)
}
}

// ConsistencyDelayMetaFilter is a MetaFetcher filter that filters out blocks that are created before a specified consistency delay.
type ConsistencyDelayMetaFilter struct {
logger log.Logger
consistencyDelay time.Duration
}

// NewConsistencyDelayMetaFilter creates ConsistencyDelayMetaFilter.
func NewConsistencyDelayMetaFilter(logger log.Logger, consistencyDelay time.Duration) *ConsistencyDelayMetaFilter {
return &ConsistencyDelayMetaFilter{
logger: logger,
consistencyDelay: consistencyDelay,
}
}

// Filter filters out blocks that filters blocks that have are created before a specified consistency delay.
func (f *ConsistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) {
for id, meta := range metas {
if ulid.Now()-id.Time() < uint64(f.consistencyDelay/time.Millisecond) &&
meta.Thanos.Source != metadata.BucketRepairSource &&
meta.Thanos.Source != metadata.CompactorSource &&
meta.Thanos.Source != metadata.CompactorRepairSource {

level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id)
synced.WithLabelValues(TooFreshMeta).Inc()
delete(metas, id)
}
}
}
66 changes: 5 additions & 61 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/thanos-io/thanos/pkg/block/indexheader"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/compact/garbagecollector"
"github.com/thanos-io/thanos/pkg/objstore"
)

Expand All @@ -46,6 +47,7 @@ type Syncer struct {
metrics *syncerMetrics
acceptMalformedIndex bool
enableVerticalCompaction bool
garbageBlocksFinder *garbagecollector.GarbageBlocksFinder
}

type syncerMetrics struct {
Expand Down Expand Up @@ -120,7 +122,7 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics {

// NewMetaSyncer returns a new Syncer for the given Bucket and directory.
// Blocks must be at least as old as the sync delay for being considered.
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, blockSyncConcurrency int, acceptMalformedIndex bool, enableVerticalCompaction bool) (*Syncer, error) {
func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, garbageBlocksFinder *garbagecollector.GarbageBlocksFinder, blockSyncConcurrency int, acceptMalformedIndex bool, enableVerticalCompaction bool) (*Syncer, error) {
if logger == nil {
logger = log.NewNopLogger()
}
Expand All @@ -131,6 +133,7 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket
fetcher: fetcher,
blocks: map[ulid.ULID]*metadata.Meta{},
metrics: newSyncerMetrics(reg),
garbageBlocksFinder: garbageBlocksFinder,
blockSyncConcurrency: blockSyncConcurrency,
acceptMalformedIndex: acceptMalformedIndex,
// The syncer offers an option to enable vertical compaction, even if it's
Expand Down Expand Up @@ -245,67 +248,8 @@ func (s *Syncer) GarbageCollect(ctx context.Context) error {
return nil
}

func (s *Syncer) GarbageBlocks(resolution int64) (ids []ulid.ULID, err error) {
// Map each block to its highest priority parent. Initial blocks have themselves
// in their source section, i.e. are their own parent.
parents := map[ulid.ULID]ulid.ULID{}

for id, meta := range s.blocks {
// Skip any block that has a different resolution.
if meta.Thanos.Downsample.Resolution != resolution {
continue
}

// For each source block we contain, check whether we are the highest priority parent block.
for _, sid := range meta.Compaction.Sources {
pid, ok := parents[sid]
// No parents for the source block so far.
if !ok {
parents[sid] = id
continue
}
pmeta, ok := s.blocks[pid]
if !ok {
return nil, errors.Errorf("previous parent block %s not found", pid)
}
// The current block is the higher priority parent for the source if its
// compaction level is higher than that of the previously set parent.
// If compaction levels are equal, the more recent ULID wins.
//
// The ULID recency alone is not sufficient since races, e.g. induced
// by downtime of garbage collection, may re-compact blocks that are
// were already compacted into higher-level blocks multiple times.
level, plevel := meta.Compaction.Level, pmeta.Compaction.Level

if level > plevel || (level == plevel && id.Compare(pid) > 0) {
parents[sid] = id
}
}
}

// A block can safely be deleted if they are not the highest priority parent for
// any source block.
topParents := map[ulid.ULID]struct{}{}
for _, pid := range parents {
topParents[pid] = struct{}{}
}

for id, meta := range s.blocks {
// Skip any block that has a different resolution.
if meta.Thanos.Downsample.Resolution != resolution {
continue
}
if _, ok := topParents[id]; ok {
continue
}

ids = append(ids, id)
}
return ids, nil
}

func (s *Syncer) garbageCollect(ctx context.Context, resolution int64) error {
garbageIds, err := s.GarbageBlocks(resolution)
garbageIds, err := s.garbageBlocksFinder.GarbageBlocks(resolution, s.blocks)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/compact/compact_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil)
testutil.Ok(t, err)

sy, err := NewSyncer(nil, nil, bkt, metaFetcher, 1, false, false)
sy, err := NewSyncer(nil, nil, bkt, metaFetcher, nil, 1, false, false)
testutil.Ok(t, err)

// Do one initial synchronization with the bucket.
Expand Down Expand Up @@ -163,7 +163,7 @@ func TestGroup_Compact_e2e(t *testing.T) {
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil)
testutil.Ok(t, err)

sy, err := NewSyncer(nil, nil, bkt, metaFetcher, 5, false, false)
sy, err := NewSyncer(nil, nil, bkt, metaFetcher, nil, 5, false, false)
testutil.Ok(t, err)

comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil)
Expand Down
95 changes: 95 additions & 0 deletions pkg/compact/garbagecollector/garbage_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package garbagecollector

import (
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact/downsample"
)

type GarbageBlocksFinder struct{}

func NewGarbageBlocksFinder() *GarbageBlocksFinder {
return &GarbageBlocksFinder{}
}

func (s *GarbageBlocksFinder) GarbageBlocks(resolution int64, blocks map[ulid.ULID]*metadata.Meta) (ids []ulid.ULID, err error) {
// Map each block to its highest priority parent. Initial blocks have themselves
// in their source section, i.e. are their own parent.
parents := map[ulid.ULID]ulid.ULID{}

for id, meta := range blocks {
// Skip any block that has a different resolution.
if meta.Thanos.Downsample.Resolution != resolution {
continue
}

// For each source block we contain, check whether we are the highest priority parent block.
for _, sid := range meta.Compaction.Sources {
pid, ok := parents[sid]
// No parents for the source block so far.
if !ok {
parents[sid] = id
continue
}
pmeta, ok := blocks[pid]
if !ok {
return nil, errors.Errorf("previous parent block %s not found", pid)
}
// The current block is the higher priority parent for the source if its
// compaction level is higher than that of the previously set parent.
// If compaction levels are equal, the more recent ULID wins.
//
// The ULID recency alone is not sufficient since races, e.g. induced
// by downtime of garbage collection, may re-compact blocks that are
// were already compacted into higher-level blocks multiple times.
level, plevel := meta.Compaction.Level, pmeta.Compaction.Level

if level > plevel || (level == plevel && id.Compare(pid) > 0) {
parents[sid] = id
}
}
}

// A block can safely be deleted if they are not the highest priority parent for
// any source block.
topParents := map[ulid.ULID]struct{}{}
for _, pid := range parents {
topParents[pid] = struct{}{}
}

for id, meta := range blocks {
// Skip any block that has a different resolution.
if meta.Thanos.Downsample.Resolution != resolution {
continue
}
if _, ok := topParents[id]; ok {
continue
}

ids = append(ids, id)
}
return ids, nil
}

type GarbageBlocksFilter struct {
garbageBlocksFinder *GarbageBlocksFinder
}

func NewGarbageBlocksFilter(garbageBlocksFinder *GarbageBlocksFinder) *GarbageBlocksFilter {
return &GarbageBlocksFilter{
garbageBlocksFinder: garbageBlocksFinder,
}
}

func (f *GarbageBlocksFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced block.GaugeLabeled, _ bool) {
for _, res := range []int64{
downsample.ResLevel0, downsample.ResLevel1, downsample.ResLevel2,
} {
garbageIds, _ := f.garbageBlocksFinder.GarbageBlocks(res, metas)
for _, id := range garbageIds {
delete(metas, id)
}
}
}

0 comments on commit 9e9e53f

Please sign in to comment.