Added single block.MetaFetcher logic for resilient sync of meta files.
This replaces 4 inconsistent meta.json sync places in other components (a usage sketch follows the change list below).

Fixes: #1335
Fixes: #1919
Fixes: #1300

* One place for sync logic for both compactor and store
* Corrupted disk cache for meta.json is handled gracefully.
* Blocks without meta.json are handled properly for all compactor phases.
* Fixed synchronization not taking into account block deletion (removal of meta.json).
* Prepare for future implementation of https://thanos.io/proposals/201901-read-write-operations-bucket.md/
* Better observability for the synchronization process.
* More logs for store startup process.
* Remove Compactor Syncer.
* Added metric for partialUploadAttempt deletions.
* More tests.

TODO in separate PR:
* More observability for index-cache loading / adding time.
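
A rough sketch of the intended usage (the constructor call and filter mirror the cmd/thanos/compact.go changes below; the package clause, the syncMetas helper name and the prometheus relabel import path are illustrative assumptions, not code from this commit):

package example

import (
	"context"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/pkg/relabel"

	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/extprom"
	"github.com/thanos-io/thanos/pkg/objstore"
)

// syncMetas (hypothetical helper) fetches all blocks with a readable meta.json plus the set of
// blocks that exist only partially (no meta.json yet, or an aborted/partial upload).
func syncMetas(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, relabelConfig []*relabel.Config) error {
	// 32 is the fetch concurrency the compactor uses in this commit; metrics are exposed
	// under the thanos_blocks_meta_* prefix via the wrapped registerer.
	fetcher := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg),
		// Optional filters drop metas the component should not act on, e.g. blocks excluded by relabelling.
		block.NewLabelShardedMetaFilter(relabelConfig).Filter,
	)

	metas, partial, err := fetcher.Fetch(ctx)
	if err != nil {
		return err
	}

	// metas: fully readable blocks keyed by ULID; partial: blocks without a (readable) meta.json,
	// i.e. candidates for cleanup (see bestEffortCleanAbortedPartialUploads in compact.go below).
	_ = metas
	_ = partial
	return nil
}

Both the compactor and the store gateway now go through this single Fetch path, so missing meta.json files, partial uploads and corrupted disk caches are handled in one place.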

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
bwplotka committed Jan 3, 2020
1 parent fbc6fe6 commit 349c91c
Showing 13 changed files with 568 additions and 373 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -10,14 +10,22 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan
We use *breaking* word for marking changes that are not backward compatible (relates only to v0.y.z releases.)

## Unreleased

### Fixed

- []() Compactor, Store: Significantly improved the synchronization process of meta files:
* Added metrics for the synchronization process for Store Gateway.
* Removed redundant `thanos_compact_sync_meta_total`, `thanos_compact_sync_meta_failures_total` and `thanos_compact_sync_meta_duration_seconds` metrics. These are now available under the `thanos_blocks_meta_*` metrics.
* Compactor: Mitigated potential data loss when a block is wrongly assumed to be an aborted partial upload, e.g. for very long uploads or uploads of old blocks.
* Store: Don't fail on malformed disk cache for meta.json files.
* Store: Added more debug logs for startup process.
- [#1856](https://github.com/thanos-io/thanos/pull/1856) Receive: close DBReadOnly after flushing to fix a memory leak.
- [#1882](https://github.com/thanos-io/thanos/pull/1882) Receive: upload to object storage as 'receive' rather than 'sidecar'.
- [#1907](https://github.com/thanos-io/thanos/pull/1907) Store: Fixed the duration unit for the metric `thanos_bucket_store_series_gate_duration_seconds`.
- [#1931](https://github.com/thanos-io/thanos/pull/1931) Compact: Fixed the compactor successfully exiting when actually an error occurred while compacting a block group.

### Added

- [#1852](https://github.com/thanos-io/thanos/pull/1852) Add support for `AWS_CONTAINER_CREDENTIALS_FULL_URI` by upgrading to minio-go v6.0.44
- [#1854](https://github.com/thanos-io/thanos/pull/1854) Update Rule UI to support alerts count displaying and filtering.
- [#1838](https://github.com/thanos-io/thanos/pull/1838) Ruler: Add TLS and authentication support for Alertmanager with the `--alertmanagers.config` and `--alertmanagers.config-file` CLI flags. See [documentation](docs/components/rule.md/#configuration) for further information.
2 changes: 1 addition & 1 deletion cmd/thanos/bucket.go
@@ -283,7 +283,7 @@ func registerBucketInspect(m map[string]setupFunc, root *kingpin.CmdClause, name
ctx, cancel := context.WithTimeout(context.Background(), *timeout)
defer cancel()

// Getting Metas.
// Getting MetasFetcher.
var blockMetas []*metadata.Meta
if err = bkt.Iter(ctx, "", func(name string) error {
id, ok := block.IsBlockDir(name)
105 changes: 85 additions & 20 deletions cmd/thanos/compact.go
@@ -13,6 +13,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@@ -23,6 +24,7 @@ import (
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
Expand All @@ -31,6 +33,15 @@ import (
"gopkg.in/alecthomas/kingpin.v2"
)

const (
metricIndexGenerateName = "thanos_compact_generated_index_total"
metricIndexGenerateHelp = "Total number of generated indexes."

// partialUploadThresholdAge is the age after which a partial block is assumed aborted and ready to be cleaned.
// Keep it long, as it is based on the block creation time, not the upload start time.
partialUploadThresholdAge = time.Duration(2 * 24 * time.Hour)
)

var (
compactions = compactionSet{
1 * time.Hour,
@@ -85,7 +96,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) {

objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

consistencyDelay := modelDuration(cmd.Flag("consistency-delay", fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %s will be removed.", compact.MinimumAgeForRemoval)).
consistencyDelay := modelDuration(cmd.Flag("consistency-delay", fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %v will be removed.", partialUploadThresholdAge)).
Default("30m"))

retentionRaw := modelDuration(cmd.Flag("retention.resolution-raw", "How long to retain raw samples in bucket. 0d - disables this retention").Default("0d"))
@@ -162,21 +173,28 @@ func runCompact(
) error {
halted := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_compactor_halted",
Help: "Set to 1 if the compactor halted due to an unexpected error",
Help: "Set to 1 if the compactor halted due to an unexpected error.",
})
halted.Set(0)
retried := prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_compactor_retries_total",
Help: "Total number of retries after retriable compactor error",
Help: "Total number of retries after retriable compactor error.",
})
iterations := prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_compactor_iterations_total",
Help: "Total number of iterations that were executed successfully",
Help: "Total number of iterations that were executed successfully.",
})
halted.Set(0)

reg.MustRegister(halted)
reg.MustRegister(retried)
reg.MustRegister(iterations)
consistencyDelayMetric := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "thanos_consistency_delay_seconds",
Help: "Configured consistency delay in seconds.",
}, func() float64 {
return consistencyDelay.Seconds()
})
partialUploadDeleteAttempts := prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_compactor_aborted_partial_uploads_deletion_attempts_total",
Help: "Total number of started deletions of blocks that are assumed aborted and only partially uploaded.",
})
reg.MustRegister(halted, retried, iterations, consistencyDelayMetric, partialUploadDeleteAttempts)

downsampleMetrics := newDownsampleMetrics(reg)

@@ -225,8 +243,12 @@ }
}
}()

sy, err := compact.NewSyncer(logger, reg, bkt, consistencyDelay,
blockSyncConcurrency, acceptMalformedIndex, false, relabelConfig)
metaFetcher := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg),
block.NewLabelShardedMetaFilter(relabelConfig).Filter,
(&consistencyDelayMetaFilter{logger: logger, consistencyDelay: consistencyDelay}).Filter,
)

sy, err := compact.NewSyncer(logger, reg, metaFetcher, bkt, blockSyncConcurrency, acceptMalformedIndex, false)
if err != nil {
return errors.Wrap(err, "create syncer")
}
@@ -276,13 +298,11 @@ func runCompact(
level.Info(logger).Log("msg", "retention policy of 1 hour aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel1h])
}

f := func() error {
compactMainFn := func() error {
if err := compactor.Compact(ctx); err != nil {
return errors.Wrap(err, "compaction failed")
}
level.Info(logger).Log("msg", "compaction iterations done")

// TODO(bplotka): Remove "disableDownsampling" once https://github.com/thanos-io/thanos/issues/297 is fixed.
if !disableDownsampling {
// After all compactions are done, work down the downsampling backlog.
// We run two passes of this to ensure that the 1h downsampling is generated
@@ -306,6 +326,8 @@ func runCompact(
if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, retentionByResolution); err != nil {
return errors.Wrap(err, fmt.Sprintf("retention failed"))
}

bestEffortCleanAbortedPartialUploads(ctx, logger, metaFetcher, bkt, partialUploadDeleteAttempts)
return nil
}

@@ -320,12 +342,12 @@ func runCompact(
}

if !wait {
return f()
return compactMainFn()
}

// --wait=true is specified.
return runutil.Repeat(5*time.Minute, ctx.Done(), func() error {
err := f()
err := compactMainFn()
if err == nil {
iterations.Inc()
return nil
@@ -363,10 +385,53 @@ func runCompact(
return nil
}

const (
metricIndexGenerateName = "thanos_compact_generated_index_total"
metricIndexGenerateHelp = "Total number of generated indexes."
)
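// consistencyDelayMetaFilter drops blocks younger than the configured consistency delay,
// unless they come from a compactor or bucket-repair source.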
type consistencyDelayMetaFilter struct {
logger log.Logger
consistencyDelay time.Duration
}

func (f *consistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, _ bool) {
for id, meta := range metas {
if ulid.Now()-id.Time() < uint64(f.consistencyDelay/time.Millisecond) &&
meta.Thanos.Source != metadata.BucketRepairSource &&
meta.Thanos.Source != metadata.CompactorSource &&
meta.Thanos.Source != metadata.CompactorRepairSource {

level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id)
synced.WithLabelValues(block.TooFreshMeta).Inc()
delete(metas, id)
}
}
}

func bestEffortCleanAbortedPartialUploads(ctx context.Context, logger log.Logger, fetcher block.MetadataFetcher, bkt objstore.Bucket, deleteAttempts prometheus.Counter) {
level.Info(logger).Log("msg", "started cleaning of aborted partial uploads")
_, partial, err := fetcher.Fetch(ctx)
if err != nil {
level.Warn(logger).Log("msg", "failed to fetch metadata for cleaning of aborted partial uploads; skipping", "err", err)
return
}

// Delete partial blocks that are older than partialUploadThresholdAge.
// TODO(bwplotka): This can cause data loss if a block is:
// * being uploaded for longer than partialUploadThresholdAge, or
// * uploaded long after its creation time, so the threshold is exceeded before the upload finishes.
// In both cases the upload would be wrongly assumed to be aborted. Keep partialUploadThresholdAge long for now.
// Mitigate this by adding ModifiedTime to bkt and checking that instead of the ULID (block creation time).
for id := range partial {
if ulid.Now()-id.Time() <= uint64(partialUploadThresholdAge/time.Millisecond) {
// Minimum delay has not expired, ignore for now.
continue
}

deleteAttempts.Inc()
if err := block.Delete(ctx, logger, bkt, id); err != nil {
level.Warn(logger).Log("msg", "failed to delete aborted partial upload; skipping", "block", id, "thresholdAge", partialUploadThresholdAge, "err", err)
return
}
level.Info(logger).Log("msg", "deleted aborted partial upload", "block", id, "thresholdAge", partialUploadThresholdAge)
}
level.Info(logger).Log("msg", "cleaning of aborted partial uploads done")
}

// genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage.
func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, dir string) error {
10 changes: 6 additions & 4 deletions cmd/thanos/store.go
@@ -26,6 +26,8 @@ import (
yaml "gopkg.in/yaml.v2"
)

const syncerConcurrency = 32

// registerStore registers a store command.
func registerStore(m map[string]setupFunc, app *kingpin.Application) {
cmd := app.Command(component.Store.String(), "store node giving access to blocks in a bucket provider. Now supported GCS, S3, Azure, Swift and Tencent COS.")
@@ -47,7 +49,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
Default("2GB").Bytes()

maxSampleCount := cmd.Flag("store.grpc.series-sample-limit",
"Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: for efficiency we take 120 as the number of samples in chunk (it cannot be bigger than that), so the actual number of samples might be lower, even though the maximum could be hit.").
"Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: For efficiency we take 120 as the number of samples in chunk (it cannot be bigger than that), so the actual number of samples might be lower, even though the maximum could be hit.").
Default("0").Uint()

maxConcurrent := cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls.").Default("20").Int()
@@ -57,7 +59,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
syncInterval := cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view.").
Default("3m").Duration()

blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing blocks from object storage.").
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage.").
Default("20").Int()

minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
@@ -128,7 +130,7 @@ func runStore(
indexCacheSizeBytes uint64,
chunkPoolSizeBytes uint64,
maxSampleCount uint64,
maxConcurrent int,
maxConcurrency int,
component component.Component,
verbose bool,
syncInterval time.Duration,
@@ -210,7 +212,7 @@ func runStore(
indexCache,
chunkPoolSizeBytes,
maxSampleCount,
maxConcurrent,
maxConcurrency,
verbose,
blockSyncConcurrency,
filterConf,
1 change: 0 additions & 1 deletion pkg/block/block.go
@@ -16,7 +16,6 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/runutil"
7 changes: 1 addition & 6 deletions pkg/block/block_test.go
@@ -20,8 +20,6 @@ import (
"github.com/oklog/ulid"
)

// NOTE(bplotka): For block packages we cannot use testutil, because they import block package. Consider moving simple
// testutil methods to separate package.
func TestIsBlockDir(t *testing.T) {
for _, tc := range []struct {
input string
@@ -58,10 +56,7 @@ func TestIsBlockDir(t *testing.T) {
} {
t.Run(tc.input, func(t *testing.T) {
id, ok := IsBlockDir(tc.input)
if ok != tc.bdir {
t.Errorf("expected block dir != %v", tc.bdir)
t.FailNow()
}
testutil.Equals(t, tc.bdir, ok)

if id.Compare(tc.id) != 0 {
t.Errorf("expected %s got %s", tc.id, id)
