Skip to content

Commit

Permalink
Store: Add Time & duration based partitioning
Browse files Browse the repository at this point in the history
Signed-off-by: Povilas Versockas <p.versockas@gmail.com>
  • Loading branch information
povilasv committed Aug 13, 2019
1 parent e1823fe commit 01389e8
Show file tree
Hide file tree
Showing 7 changed files with 318 additions and 4 deletions.
19 changes: 19 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store"
Expand Down Expand Up @@ -49,7 +50,19 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing blocks from object storage.").
Default("20").Int()

minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store serves only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z"))

maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Thanos Store serves only blocks, which happened eariler than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
// Sanity check Time filters
if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'",
minTime, maxTime)
}

return runStore(g,
logger,
reg,
Expand All @@ -69,6 +82,10 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
debugLogging,
*syncInterval,
*blockSyncConcurrency,
&store.FilterConfig{
MinTime: *minTime,
MaxTime: *maxTime,
},
)
}
}
Expand All @@ -94,6 +111,7 @@ func runStore(
verbose bool,
syncInterval time.Duration,
blockSyncConcurrency int,
filterConf *store.FilterConfig,
) error {
{
confContentYaml, err := objStoreConfig.Content()
Expand Down Expand Up @@ -135,6 +153,7 @@ func runStore(
maxConcurrent,
verbose,
blockSyncConcurrency,
filterConf,
)
if err != nil {
return errors.Wrap(err, "create object storage store")
Expand Down
30 changes: 30 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,35 @@ Flags:
--block-sync-concurrency=20
Number of goroutines to use when syncing blocks
from object storage.
--min-time=0000-01-01T00:00:00Z
Start of time range limit to serve. Thanos
Store serves only metrics, which happened later
than this value. Option can be a constant time
in RFC3339 format or time duration relative to
current time, such as -1d or 2h45m. Valid
duration units are ms, s, m, h, d, w, y.
--max-time=9999-12-31T23:59:59Z
End of time range limit to serve. Thanos Store
serves only blocks, which happened eariler than
this value. Option can be a constant time in
RFC3339 format or time duration relative to
current time, such as -1d or 2h45m. Valid
duration units are ms, s, m, h, d, w, y.
```

## Time based partitioning

By default Thanos Store Gateway looks at all the data in Object Store and returns it based on query's time range.

Thanos Store `--min-time` and `--max-time` flags allow you to shard Thanos Store based on a constant time or a time duration relative to the current time.

For example, setting `--min-time=-6w` & `--max-time=-2w` will make Thanos Store Gateway return metrics that fall within the `now - 6 weeks` up to `now - 2 weeks` time range.

Constant time needs to be set in RFC3339 format. For example `--min-time=2018-01-01T00:00:00Z`, `--max-time=2019-01-01T23:59:59Z`.

Thanos Store Gateway might not see new blocks immediately, as Time partitioning is partly done in asynchronous block sync job, which is refreshed every 3 minutes by default. Additionally some of the Object Store implementations provide eventual read-after-write consistency, which means that Thanos Store might not immediately see newly created & uploaded blocks anyway.

So we recommend having overlapping time ranges with Thanos Sidecar and other Thanos Store gateways as this will improve your resiliency to failures.

Thanos Querier deals with overlapping time series by merging them together.
75 changes: 75 additions & 0 deletions pkg/model/timeduration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package model

import (
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/timestamp"
"gopkg.in/alecthomas/kingpin.v2"
)

// TimeOrDurationValue is a custom kingpin flag value holding either an
// absolute time given in RFC3339 format or a duration relative to the current
// time. Durations are parsed with model.ParseDuration and may be prefixed with
// '-' to point into the past, such as "-1d" or "10s".
// Only one will be set.
type TimeOrDurationValue struct {
	// Time is the absolute time; nil when a duration was given.
	Time *time.Time
	// Dur is the offset relative to now; nil when an absolute time was given.
	Dur *model.Duration
}

// Set parses s into tdv. It is the hook kingpin invokes with the raw flag
// value once tdv has been registered through TimeOrDuration.
//
// s is first tried as an RFC3339 timestamp. If that fails it is treated as a
// duration understood by model.ParseDuration, optionally prefixed with '-'
// for a negative offset. On success exactly one of Time or Dur is populated
// and the other is reset to nil, so repeated calls never leave both set.
//
// An error is returned when s is neither a valid time nor a valid duration;
// this includes the empty string. The returned error is the one reported by
// the duration parser.
func (tdv *TimeOrDurationValue) Set(s string) error {
	if t, err := time.Parse(time.RFC3339, s); err == nil {
		tdv.Time, tdv.Dur = &t, nil
		return nil
	}

	// Not a timestamp, so fall back to a duration. The sign is stripped before
	// parsing and re-applied afterwards. The length check keeps an empty flag
	// value from panicking on s[0]; it is rejected by the parser instead.
	negative := len(s) > 0 && s[0] == '-'
	if negative {
		s = s[1:]
	}

	dur, err := model.ParseDuration(s)
	if err != nil {
		return err
	}
	if negative {
		dur = -dur
	}

	tdv.Time, tdv.Dur = nil, &dur
	return nil
}

// String renders whichever value is set: the time when present, otherwise the
// duration. It returns "nil" when neither has been set.
func (tdv *TimeOrDurationValue) String() string {
	if tdv.Time != nil {
		return tdv.Time.String()
	}
	if tdv.Dur != nil {
		return tdv.Dur.String()
	}
	return "nil"
}

// PrometheusTimestamp converts the value to a Prometheus timestamp using
// timestamp.FromTime. An absolute time is converted as is; a duration is
// resolved against the current time (now + duration) on every call. When
// neither is set, 0 is returned.
func (tdv *TimeOrDurationValue) PrometheusTimestamp() int64 {
	if tdv.Time != nil {
		return timestamp.FromTime(*tdv.Time)
	}
	if tdv.Dur == nil {
		return 0
	}

	offset := time.Duration(*tdv.Dur)
	return timestamp.FromTime(time.Now().Add(offset))
}

// TimeOrDuration registers a fresh TimeOrDurationValue as the value of the
// given kingpin flag and returns a pointer to it, so the caller can read the
// parsed result after the command line has been processed.
func TimeOrDuration(flags *kingpin.FlagClause) *TimeOrDurationValue {
	parsed := &TimeOrDurationValue{}
	flags.SetValue(parsed)
	return parsed
}
36 changes: 36 additions & 0 deletions pkg/model/timeduration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package model_test

import (
"testing"
"time"

"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/testutil"
"gopkg.in/alecthomas/kingpin.v2"
)

// TestTimeOrDurationValue checks that a duration passed on the command line
// and an RFC3339 default are both parsed, rendered, and converted to
// Prometheus timestamps as expected.
func TestTimeOrDurationValue(t *testing.T) {
	app := kingpin.New("test", "test")

	minFlag := app.Flag("min-time", "Start of time range limit to serve")
	minTime := model.TimeOrDuration(minFlag)

	maxFlag := app.Flag("max-time", "End of time range limit to serve").Default("9999-12-31T23:59:59Z")
	maxTime := model.TimeOrDuration(maxFlag)

	// Only --min-time is given explicitly; --max-time falls back to its default.
	if _, err := app.Parse([]string{"--min-time", "10s"}); err != nil {
		t.Fatal(err)
	}

	testutil.Equals(t, "10s", minTime.String())
	testutil.Equals(t, "9999-12-31 23:59:59 +0000 UTC", maxTime.String())

	// A 10s duration must resolve to a point strictly between now and now+15s.
	lowerBound := timestamp.FromTime(time.Now())
	upperBound := timestamp.FromTime(time.Now().Add(15 * time.Second))

	testutil.Assert(t, minTime.PrometheusTimestamp() > lowerBound, "minTime prometheus timestamp is less than time now.")
	testutil.Assert(t, minTime.PrometheusTimestamp() < upperBound, "minTime prometheus timestamp is more than time now + 15s")

	testutil.Assert(t, 253402300799000 == maxTime.PrometheusTimestamp(), "maxTime is not equal to 253402300799000")
}
74 changes: 74 additions & 0 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/pool"
"github.com/thanos-io/thanos/pkg/runutil"
Expand Down Expand Up @@ -182,6 +183,11 @@ type indexCache interface {
Series(b ulid.ULID, id uint64) ([]byte, bool)
}

// FilterConfig holds the time bounds the Store uses for filtering metrics.
type FilterConfig struct {
	// MinTime is the lower time bound, given as an absolute time or as a
	// duration relative to now.
	MinTime model.TimeOrDurationValue
	// MaxTime is the upper time bound, given as an absolute time or as a
	// duration relative to now.
	MaxTime model.TimeOrDurationValue
}

// BucketStore implements the store API backed by a bucket. It loads all index
// files to local disk.
type BucketStore struct {
Expand All @@ -208,6 +214,8 @@ type BucketStore struct {
// samplesLimiter limits the number of samples per each Series() call.
samplesLimiter *Limiter
partitioner partitioner

filterConfig *FilterConfig
}

// NewBucketStore creates a new bucket backed store that implements the store API against
Expand All @@ -223,6 +231,7 @@ func NewBucketStore(
maxConcurrent int,
debugLogging bool,
blockSyncConcurrency int,
filterConf *FilterConfig,
) (*BucketStore, error) {
if logger == nil {
logger = log.NewNopLogger()
Expand Down Expand Up @@ -256,6 +265,7 @@ func NewBucketStore(
),
samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
partitioner: gapBasedPartitioner{maxGapSize: maxGapSize},
filterConfig: filterConf,
}
s.metrics = metrics

Expand Down Expand Up @@ -309,6 +319,17 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error {
if err != nil {
return nil
}

inRange, err := s.isBlockInMinMaxRange(ctx, id)
if err != nil {
level.Warn(s.logger).Log("msg", "error parsing block range", "block", id, "err", err)
return nil
}

if !inRange {
return nil
}

allIDs[id] = struct{}{}

if b := s.getBlock(id); b != nil {
Expand Down Expand Up @@ -377,6 +398,31 @@ func (s *BucketStore) numBlocks() int {
return len(s.blocks)
}

// isBlockInMinMaxRange reports whether the block with the given id overlaps
// the time range configured in s.filterConfig.
//
// The block's meta is loaded through bucketBlock.loadMeta; any error from that
// call is returned together with false. A block is considered out of range
// when its MaxTime is at or before the configured minimum, or its MinTime is
// at or after the configured maximum.
//
// When no filter is configured (s.filterConfig is nil) every block is in
// range and the meta is not loaded at all.
func (s *BucketStore) isBlockInMinMaxRange(ctx context.Context, id ulid.ULID) (bool, error) {
	// Without a filter there is nothing to compare against; this also avoids
	// dereferencing a nil config.
	if s.filterConfig == nil {
		return true, nil
	}

	b := &bucketBlock{
		logger: s.logger,
		bucket: s.bucket,
		id:     id,
		dir:    filepath.Join(s.dir, id.String()),
	}
	if err := b.loadMeta(ctx, id); err != nil {
		return false, err
	}

	// Relative (duration based) bounds are resolved against the current time
	// on each call.
	filterMinTime := s.filterConfig.MinTime.PrometheusTimestamp()
	filterMaxTime := s.filterConfig.MaxTime.PrometheusTimestamp()

	if b.meta.MaxTime <= filterMinTime || b.meta.MinTime >= filterMaxTime {
		return false, nil
	}
	return true, nil
}

func (s *BucketStore) getBlock(id ulid.ULID) *bucketBlock {
s.mtx.RLock()
defer s.mtx.RUnlock()
Expand Down Expand Up @@ -468,6 +514,10 @@ func (s *BucketStore) TimeRange() (mint, maxt int64) {
maxt = b.meta.MaxTime
}
}

mint = s.normalizeMinTime(mint)
maxt = s.normalizeMaxTime(maxt)

return mint, maxt
}

Expand All @@ -482,6 +532,26 @@ func (s *BucketStore) Info(context.Context, *storepb.InfoRequest) (*storepb.Info
}, nil
}

// normalizeMinTime clamps mint to the configured lower time bound: if mint is
// earlier than the filter's MinTime, the filter's MinTime is returned,
// otherwise mint is returned unchanged. When no filter is configured
// (s.filterConfig is nil) mint is returned as is instead of panicking.
func (s *BucketStore) normalizeMinTime(mint int64) int64 {
	if s.filterConfig == nil {
		return mint
	}

	if filterMinTime := s.filterConfig.MinTime.PrometheusTimestamp(); mint < filterMinTime {
		return filterMinTime
	}
	return mint
}

// normalizeMaxTime clamps maxt to the configured upper time bound: if maxt is
// later than the filter's MaxTime, the filter's MaxTime is returned, otherwise
// maxt is returned unchanged. When no filter is configured (s.filterConfig is
// nil) maxt is returned as is instead of panicking.
func (s *BucketStore) normalizeMaxTime(maxt int64) int64 {
	if s.filterConfig == nil {
		return maxt
	}

	if filterMaxTime := s.filterConfig.MaxTime.PrometheusTimestamp(); maxt > filterMaxTime {
		return filterMaxTime
	}
	return maxt
}

type seriesEntry struct {
lset []storepb.Label
refs []uint64
Expand Down Expand Up @@ -722,6 +792,10 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
// Adjust Request MinTime based on filters.
req.MinTime = s.normalizeMinTime(req.MinTime)
req.MaxTime = s.normalizeMaxTime(req.MaxTime)

var (
stats = &queryStats{}
g run.Group
Expand Down
17 changes: 14 additions & 3 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import (
"testing"
"time"

"github.com/oklog/ulid"

"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/tsdb/labels"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/objtesting"
"github.com/thanos-io/thanos/pkg/runutil"
Expand All @@ -25,6 +25,17 @@ import (
"github.com/thanos-io/thanos/pkg/testutil"
)

// Shared time-filter fixtures for the bucket store tests. The configured
// range runs from the Unix epoch to the last second of the year 9999.
var (
	minTime = time.Unix(0, 0)
	// Same instant as parsing "9999-12-31T23:59:59Z" as RFC3339, built
	// directly so there is no parse error to discard.
	maxTime = time.Date(9999, time.December, 31, 23, 59, 59, 0, time.UTC)

	minTimeDuration = model.TimeOrDurationValue{Time: &minTime}
	maxTimeDuration = model.TimeOrDurationValue{Time: &maxTime}

	filterConf = &FilterConfig{
		MinTime: minTimeDuration,
		MaxTime: maxTimeDuration,
	}
)

type noopCache struct{}

func (noopCache) SetPostings(b ulid.ULID, l labels.Label, v []byte) {}
Expand Down Expand Up @@ -128,7 +139,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
testutil.Ok(t, os.RemoveAll(dir2))
}

store, err := NewBucketStore(s.logger, nil, bkt, dir, s.cache, 0, maxSampleCount, 20, false, 20)
store, err := NewBucketStore(s.logger, nil, bkt, dir, s.cache, 0, maxSampleCount, 20, false, 20, filterConf)
testutil.Ok(t, err)

s.store = store
Expand Down

0 comments on commit 01389e8

Please sign in to comment.