Skip to content

Commit

Permalink
feat: separate retention policy for exemplars (#971)
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae committed Mar 26, 2022
1 parent 14ec17b commit 06d14cf
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 33 deletions.
5 changes: 3 additions & 2 deletions pkg/config/config.go
Expand Up @@ -123,8 +123,9 @@ type Server struct {
// currently only used in our demo app
HideApplications []string `def:"" desc:"please don't use, this will soon be deprecated" mapstructure:"hide-applications"`

Retention time.Duration `def:"" desc:"sets the maximum amount of time the profiling data is stored for. Data before this threshold is deleted. Disabled by default" mapstructure:"retention"`
RetentionLevels RetentionLevels `def:"" desc:"specifies how long the profiling data stored per aggregation level. Disabled by default" mapstructure:"retention-levels"`
Retention time.Duration `def:"" desc:"sets the maximum amount of time the profiling data is stored for. Data before this threshold is deleted. Disabled by default" mapstructure:"retention"`
ExemplarsRetention time.Duration `def:"" desc:"sets the maximum amount of time profile exemplars are stored for. Data before this threshold is deleted. Disabled by default" mapstructure:"exemplars-retention"`
RetentionLevels RetentionLevels `def:"" desc:"specifies how long the profiling data stored per aggregation level. Disabled by default" mapstructure:"retention-levels"`

// Deprecated fields. They can be set (for backwards compatibility) but have no effect
// TODO: we should print some warning messages when people try to use these
Expand Down
6 changes: 4 additions & 2 deletions pkg/storage/config.go
Expand Up @@ -14,10 +14,11 @@ type Config struct {
cacheEvictThreshold float64
cacheEvictVolume float64
maxNodesSerialization int
retention time.Duration
hideApplications []string
retentionLevels config.RetentionLevels
inMemory bool
retention time.Duration
retentionExemplars time.Duration
retentionLevels config.RetentionLevels
}

// NewConfig returns a new storage config from a server config
Expand All @@ -34,6 +35,7 @@ func NewConfig(server *config.Server) *Config {
cacheEvictVolume: server.CacheEvictVolume,
maxNodesSerialization: server.MaxNodesSerialization,
retention: server.Retention,
retentionExemplars: server.ExemplarsRetention,
retentionLevels: server.RetentionLevels,
hideApplications: server.HideApplications,
inMemory: false,
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/profile_test.go
Expand Up @@ -115,7 +115,7 @@ var _ = Describe("Profiles retention policy", func() {
Val: tree,
})).ToNot(HaveOccurred())

rp := &segment.RetentionPolicy{AbsoluteTime: t3}
rp := &segment.RetentionPolicy{ExemplarsRetentionTime: t3}
Expect(s.EnforceRetentionPolicy(rp)).ToNot(HaveOccurred())

o, err := s.MergeProfiles(context.Background(), MergeProfilesInput{
Expand Down
30 changes: 14 additions & 16 deletions pkg/storage/retention.go
Expand Up @@ -3,6 +3,7 @@ package storage
import (
"context"
"errors"
"fmt"
"time"

"github.com/dgraph-io/badger/v2"
Expand All @@ -12,27 +13,24 @@ import (
)

func (s *Storage) EnforceRetentionPolicy(rp *segment.RetentionPolicy) error {
if rp.LowerTimeBoundary().IsZero() {
return nil
}

// It may make sense running it concurrently with some throttling.
s.logger.Debug("enforcing retention policy")
if !rp.AbsoluteTime.IsZero() {
if err := s.profiles.truncateBefore(context.TODO(), rp.AbsoluteTime); err != nil {
s.logger.WithError(err).Error("failed to truncate profiles storage")
// It may make sense running it concurrently with some throttling.
err := s.iterateOverAllSegments(func(k *segment.Key) error {
return s.deleteSegmentData(k, rp)
})
if errors.Is(err, errClosed) {
s.logger.Info("enforcing canceled")
return nil
}
return err
}
err := s.iterateOverAllSegments(func(k *segment.Key) error {
return s.deleteSegmentData(k, rp)
})

if errors.Is(err, errClosed) {
s.logger.Info("enforcing canceled")
err = nil
if !rp.ExemplarsRetentionTime.IsZero() {
if err := s.profiles.truncateBefore(context.TODO(), rp.ExemplarsRetentionTime); err != nil {
return fmt.Errorf("failed to truncate profiles storage: %w", err)
}
}

return err
return nil
}

func (s *Storage) deleteSegmentData(k *segment.Key, rp *segment.RetentionPolicy) error {
Expand Down
19 changes: 19 additions & 0 deletions pkg/storage/segment/retention.go
Expand Up @@ -9,6 +9,8 @@ type RetentionPolicy struct {

AbsoluteTime time.Time
Levels map[int]time.Time

ExemplarsRetentionTime time.Time
}

func NewRetentionPolicy() *RetentionPolicy {
Expand All @@ -27,6 +29,11 @@ func (r *RetentionPolicy) SetAbsolutePeriod(period time.Duration) *RetentionPoli
return r
}

func (r *RetentionPolicy) SetExemplarsRetentionPeriod(period time.Duration) *RetentionPolicy {
r.ExemplarsRetentionTime = r.periodToTime(period)
return r
}

func (r *RetentionPolicy) SetLevelPeriod(level int, period time.Duration) *RetentionPolicy {
if r.Levels == nil {
r.Levels = make(map[int]time.Time)
Expand All @@ -35,6 +42,18 @@ func (r *RetentionPolicy) SetLevelPeriod(level int, period time.Duration) *Reten
return r
}

func (r *RetentionPolicy) SetLevels(levels ...time.Duration) *RetentionPolicy {
if r.Levels == nil {
r.Levels = make(map[int]time.Time)
}
for level, period := range levels {
if period != 0 {
r.Levels[level] = r.periodToTime(period)
}
}
return r
}

func (r RetentionPolicy) isToBeDeleted(sn *streeNode) bool {
return sn.isBefore(r.AbsoluteTime) || sn.isBefore(r.levelMaxTime(sn.depth))
}
Expand Down
19 changes: 7 additions & 12 deletions pkg/storage/storage.go
Expand Up @@ -293,18 +293,13 @@ func (s *Storage) retentionTask() {
}

func (s *Storage) retentionPolicy() *segment.RetentionPolicy {
rp := segment.NewRetentionPolicy().SetAbsolutePeriod(s.config.retention)
levels := []time.Duration{
s.config.retentionLevels.Zero,
s.config.retentionLevels.One,
s.config.retentionLevels.Two,
}
for i, p := range levels {
if p != 0 {
rp.SetLevelPeriod(i, p)
}
}
return rp
return segment.NewRetentionPolicy().
SetAbsolutePeriod(s.config.retention).
SetExemplarsRetentionPeriod(s.config.retentionExemplars).
SetLevels(
s.config.retentionLevels.Zero,
s.config.retentionLevels.One,
s.config.retentionLevels.Two)
}

func (s *Storage) databases() []*db {
Expand Down

0 comments on commit 06d14cf

Please sign in to comment.