Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor compactor constants, fix bucket column #1561

Merged
merged 3 commits into from Sep 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 7 additions & 9 deletions cmd/thanos/bucket.go
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/client"
Expand Down Expand Up @@ -50,7 +51,7 @@ var (
sort.Strings(s)
return s
}
inspectColumns = []string{"ULID", "FROM", "UNTIL", "RANGE", "UNTIL-COMP", "#SERIES", "#SAMPLES", "#CHUNKS", "COMP-LEVEL", "COMP-FAILED", "LABELS", "RESOLUTION", "SOURCE"}
inspectColumns = []string{"ULID", "FROM", "UNTIL", "RANGE", "UNTIL-DOWN", "#SERIES", "#SAMPLES", "#CHUNKS", "COMP-LEVEL", "COMP-FAILED", "LABELS", "RESOLUTION", "SOURCE"}
)

func registerBucket(m map[string]setupFunc, app *kingpin.Application, name string) {
Expand Down Expand Up @@ -420,13 +421,10 @@ func printTable(blockMetas []*metadata.Meta, selectorLabels labels.Labels, sortB
}

timeRange := time.Duration((blockMeta.MaxTime - blockMeta.MinTime) * int64(time.Millisecond))
// Calculate how long it takes until the next compaction.
untilComp := "-"
if blockMeta.Thanos.Downsample.Resolution == 0 { // data currently raw, downsample if range >= 40 hours
untilComp = (time.Duration(40*60*60*1000*time.Millisecond) - timeRange).String()
}
if blockMeta.Thanos.Downsample.Resolution == 5*60*1000 { // data currently 5m resolution, downsample if range >= 10 days
untilComp = (time.Duration(10*24*60*60*1000*time.Millisecond) - timeRange).String()

untilDown := "-"
if until, err := compact.UntilNextDownsampling(blockMeta); err == nil {
untilDown = until.String()
}
var labels []string
for _, key := range getKeysAlphabetically(blockMeta.Thanos.Labels) {
Expand All @@ -438,7 +436,7 @@ func printTable(blockMetas []*metadata.Meta, selectorLabels labels.Labels, sortB
line = append(line, time.Unix(blockMeta.MinTime/1000, 0).Format("02-01-2006 15:04:05"))
line = append(line, time.Unix(blockMeta.MaxTime/1000, 0).Format("02-01-2006 15:04:05"))
line = append(line, timeRange.String())
line = append(line, untilComp)
line = append(line, untilDown)
line = append(line, p.Sprintf("%d", blockMeta.Stats.NumSeries))
line = append(line, p.Sprintf("%d", blockMeta.Stats.NumSamples))
line = append(line, p.Sprintf("%d", blockMeta.Stats.NumChunks))
Expand Down
18 changes: 9 additions & 9 deletions cmd/thanos/downsample.go
Expand Up @@ -170,13 +170,13 @@ func downsampleBucket(

for _, m := range metas {
switch m.Thanos.Downsample.Resolution {
case 0:
case downsample.ResLevel0:
continue
case 5 * 60 * 1000:
case downsample.ResLevel1:
for _, id := range m.Compaction.Sources {
sources5m[id] = struct{}{}
}
case 60 * 60 * 1000:
case downsample.ResLevel2:
for _, id := range m.Compaction.Sources {
sources1h[id] = struct{}{}
}
Expand All @@ -187,7 +187,7 @@ func downsampleBucket(

for _, m := range metas {
switch m.Thanos.Downsample.Resolution {
case 0:
case downsample.ResLevel0:
missing := false
for _, id := range m.Compaction.Sources {
if _, ok := sources5m[id]; !ok {
Expand All @@ -201,16 +201,16 @@ func downsampleBucket(
// Only downsample blocks once we are sure to get roughly 2 chunks out of it.
// NOTE(fabxc): this must match with at which block size the compactor creates downsampled
// blocks. Otherwise we may never downsample some data.
if m.MaxTime-m.MinTime < 40*60*60*1000 {
if m.MaxTime-m.MinTime < downsample.DownsampleRange0 {
continue
}
if err := processDownsampling(ctx, logger, bkt, m, dir, 5*60*1000); err != nil {
if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel1); err != nil {
metrics.downsampleFailures.WithLabelValues(compact.GroupKey(*m)).Inc()
return errors.Wrap(err, "downsampling to 5 min")
}
metrics.downsamples.WithLabelValues(compact.GroupKey(*m)).Inc()

case 5 * 60 * 1000:
case downsample.ResLevel1:
missing := false
for _, id := range m.Compaction.Sources {
if _, ok := sources1h[id]; !ok {
Expand All @@ -224,10 +224,10 @@ func downsampleBucket(
// Only downsample blocks once we are sure to get roughly 2 chunks out of it.
// NOTE(fabxc): this must match with at which block size the compactor creates downsampled
// blocks. Otherwise we may never downsample some data.
if m.MaxTime-m.MinTime < 10*24*60*60*1000 {
if m.MaxTime-m.MinTime < downsample.DownsampleRange1 {
continue
}
if err := processDownsampling(ctx, logger, bkt, m, dir, 60*60*1000); err != nil {
if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel2); err != nil {
metrics.downsampleFailures.WithLabelValues(compact.GroupKey(*m))
return errors.Wrap(err, "downsampling to 60 min")
}
Expand Down
17 changes: 16 additions & 1 deletion pkg/compact/compact.go
Expand Up @@ -162,10 +162,25 @@ func (c *Syncer) SyncMetas(ctx context.Context) error {
}
c.metrics.syncMetas.Inc()
c.metrics.syncMetaDuration.Observe(time.Since(begin).Seconds())

return err
}

// UntilNextDownsampling calculates how long it will take until the next downsampling operation.
// Returns an error if there will be no downsampling.
func UntilNextDownsampling(m *metadata.Meta) (time.Duration, error) {
timeRange := time.Duration((m.MaxTime - m.MinTime) * int64(time.Millisecond))
switch m.Thanos.Downsample.Resolution {
case downsample.ResLevel2:
return time.Duration(0), errors.New("no downsampling")
case downsample.ResLevel1:
return time.Duration(downsample.DownsampleRange1*time.Millisecond) - timeRange, nil
case downsample.ResLevel0:
return time.Duration(downsample.DownsampleRange0*time.Millisecond) - timeRange, nil
default:
panic(fmt.Errorf("invalid resolution %v", m.Thanos.Downsample.Resolution))
}
}

func (c *Syncer) syncMetas(ctx context.Context) error {
var wg sync.WaitGroup
defer wg.Wait()
Expand Down
6 changes: 6 additions & 0 deletions pkg/compact/downsample/downsample.go
Expand Up @@ -28,6 +28,12 @@ const (
ResLevel2 = int64(60 * 60 * 1000) // 1 hour in milliseconds
)

// Downsampling ranges i.e. after what time range we start to downsample blocks (in milliseconds).
// These must stay consistent with the ResLevel resolution constants above, which are also
// expressed in milliseconds.
const (
	DownsampleRange0 = 40 * 60 * 60 * 1000 // 40 hours
	DownsampleRange1 = 10 * 24 * 60 * 60 * 1000 // 10 days
)

// Downsample downsamples the given block. It writes a new block into dir and returns its ID.
func Downsample(
logger log.Logger,
Expand Down