Skip to content

Commit

Permalink
feat: concurrent storage put (#1304)
Browse files Browse the repository at this point in the history
* feat: concurrent storage put

* add a build tag for stress testing

* enable storage put stress test

* write labels in a single transaction

* make cache.GetOrCreate atomic

* adjust test params

* fix cache GetOrSet persistence flag

* add storage queue params

* set default workers to 1

Co-authored-by: Dmitry Filimonov <dmitry@pyroscope.io>
  • Loading branch information
kolesnikovae and petethepig committed Aug 30, 2022
1 parent 7ba1b8e commit ec5f8b6
Show file tree
Hide file tree
Showing 19 changed files with 265 additions and 102 deletions.
13 changes: 4 additions & 9 deletions pkg/cli/server.go
Expand Up @@ -66,11 +66,6 @@ type serverService struct {
group *errgroup.Group
}

const (
storageQueueWorkers = 1
storageQueueSize = 100
)

func newServerService(c *config.Server) (*serverService, error) {
logLevel, err := logrus.ParseLevel(c.LogLevel)
if err != nil {
Expand All @@ -97,7 +92,9 @@ func newServerService(c *config.Server) (*serverService, error) {
}

svc.healthController = health.NewController(svc.logger, time.Minute, diskPressure)
svc.storage, err = storage.New(storage.NewConfig(svc.config), svc.logger, prometheus.DefaultRegisterer, svc.healthController)

storageConfig := storage.NewConfig(svc.config)
svc.storage, err = storage.New(storageConfig, svc.logger, prometheus.DefaultRegisterer, svc.healthController)
if err != nil {
return nil, fmt.Errorf("new storage: %w", err)
}
Expand Down Expand Up @@ -146,9 +143,7 @@ func newServerService(c *config.Server) (*serverService, error) {
return nil, fmt.Errorf("new metric exporter: %w", err)
}

svc.storageQueue = storage.NewIngestionQueue(svc.logger, svc.storage, prometheus.DefaultRegisterer,
storageQueueWorkers,
storageQueueSize)
svc.storageQueue = storage.NewIngestionQueue(svc.logger, svc.storage, prometheus.DefaultRegisterer, storageConfig)

defaultMetricsRegistry := prometheus.DefaultRegisterer

Expand Down
2 changes: 2 additions & 0 deletions pkg/config/config.go
Expand Up @@ -106,6 +106,8 @@ type Server struct {
BadgerLogLevel string `def:"error" desc:"log level: debug|info|warn|error" mapstructure:"badger-log-level"`

StoragePath string `def:"<installPrefix>/var/lib/pyroscope" desc:"directory where pyroscope stores profiling data" mapstructure:"storage-path"`
StorageQueueSize int `desc:"storage queue size" mapstructure:"storage-queue-size"`
StorageQueueWorkers int `desc:"number of workers handling internal storage queue" mapstructure:"storage-queue-workers"`
MinFreeSpacePercentage float64 `def:"5" desc:"percentage of available disk space at which ingestion requests are discarded. Defaults to 5% but not less than 1GB. Set 0 to disable" mapstructure:"min-free-space-percentage"`

APIBindAddr string `def:":4040" desc:"port for the HTTP(S) server used for data ingestion and web UI" mapstructure:"api-bind-addr"`
Expand Down
27 changes: 9 additions & 18 deletions pkg/storage/cache/cache.go
Expand Up @@ -214,27 +214,15 @@ func dropPrefixBatch(db *badger.DB, p []byte, n int) (bool, error) {
}

func (cache *Cache) GetOrCreate(key string) (interface{}, error) {
v, err := cache.get(key) // find the key from cache first
if err != nil {
return nil, err
}
if v != nil {
return v, nil
}
v = cache.codec.New(key)
cache.lfu.Set(key, v)
return v, nil
return cache.get(key, true)
}

func (cache *Cache) Lookup(key string) (interface{}, bool) {
v, err := cache.get(key)
if v == nil || err != nil {
return nil, false
}
return v, true
v, err := cache.get(key, false)
return v, v != nil && err == nil
}

func (cache *Cache) get(key string) (interface{}, error) {
func (cache *Cache) get(key string, createNotFound bool) (interface{}, error) {
cache.metrics.ReadsCounter.Inc()
return cache.lfu.GetOrSet(key, func() (interface{}, error) {
cache.metrics.MissesCounter.Inc()
Expand All @@ -249,11 +237,14 @@ func (cache *Cache) get(key string) (interface{}, error) {
})

switch {
default:
return nil, err
case err == nil:
case errors.Is(err, badger.ErrKeyNotFound):
if createNotFound {
return cache.codec.New(key), nil
}
return nil, nil
default:
return nil, err
}

cache.metrics.DBReads.Observe(float64(len(buf)))
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/cache/lfu/lfu.go
Expand Up @@ -69,6 +69,11 @@ func (c *Cache) GetOrSet(key string, value func() (interface{}, error)) (interfa
e = new(cacheEntry)
e.key = key
e.value = v
// The item returned by value() is either newly allocated or was just
// read from the DB, therefore we mark it as persisted to avoid redundant
// writes or writing empty object. Once the item is invalidated, caller
// has to explicitly set it with Set call.
e.persisted = true
c.values[key] = e
c.increment(e)
c.len++
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/config.go
Expand Up @@ -20,6 +20,8 @@ type Config struct {
retention time.Duration
retentionExemplars time.Duration
retentionLevels config.RetentionLevels
queueWorkers int
queueSize int

NewBadger func(name string, p Prefix, codec cache.Codec) (BadgerDBWithCache, error)
}
Expand All @@ -41,6 +43,8 @@ func NewConfig(server *config.Server) *Config {
retentionExemplars: server.ExemplarsRetention,
retentionLevels: server.RetentionLevels,
hideApplications: server.HideApplications,
queueSize: server.StorageQueueSize,
queueWorkers: server.StorageQueueWorkers,
inMemory: false,
}
}
Expand Down
19 changes: 8 additions & 11 deletions pkg/storage/exemplars.go
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"github.com/pyroscope-io/pyroscope/pkg/storage/metadata"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -375,20 +376,16 @@ func (s *Storage) ensureAppSegmentExists(in *PutInput) error {
return fmt.Errorf("segments cache for %v: %w", k, err)
}
st := r.(*segment.Segment)
if !isMetadataEqual(st, in) {
st.SetMetadata(in.SpyName, in.SampleRate, in.Units, in.AggregationType)
s.segments.Put(k, st)
}
st.SetMetadata(metadata.Metadata{
SpyName: in.SpyName,
SampleRate: in.SampleRate,
Units: in.Units,
AggregationType: in.AggregationType,
})
s.segments.Put(k, st)
return err
}

func isMetadataEqual(s *segment.Segment, in *PutInput) bool {
return in.SpyName == s.SpyName() &&
in.AggregationType == s.AggregationType() &&
in.SampleRate == s.SampleRate() &&
in.Units == s.Units()
}

func (b *exemplarsBatch) insert(_ context.Context, input *PutInput) error {
if len(b.entries) == exemplarsPerBatch {
return errBatchIsFull
Expand Down
14 changes: 14 additions & 0 deletions pkg/storage/labels/labels.go
Expand Up @@ -17,6 +17,20 @@ func New(db *badger.DB) *Labels {
return ll
}

// PutLabels records every key/value pair of labels inside a single badger
// update transaction. For each pair two entries with nil values are written:
// "l:<key>" for the label name and "v:<key>:<value>" for the label value.
// The first error returned by the transaction is propagated to the caller.
func (ll *Labels) PutLabels(labels map[string]string) error {
	writeAll := func(txn *badger.Txn) error {
		for name, value := range labels {
			// Name entry first, then the value entry, for each label.
			dbKeys := [...]string{
				"l:" + name,
				"v:" + name + ":" + value,
			}
			for _, dbKey := range dbKeys {
				if err := txn.SetEntry(badger.NewEntry([]byte(dbKey), nil)); err != nil {
					return err
				}
			}
		}
		return nil
	}
	return ll.db.Update(writeAll)
}

func (ll *Labels) Put(key, val string) {
kk := "l:" + key
kv := "v:" + key + ":" + val
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/metadata/metadata.go
Expand Up @@ -25,3 +25,10 @@ const (
// String returns the aggregation type converted to a plain string.
func (a AggregationType) String() string {
return string(a)
}

// Metadata bundles the descriptive attributes of a profile so that they can
// be passed around and returned as a single value.
type Metadata struct {
// SpyName is presumably the name of the profiler (spy) that produced the
// data — confirm against callers.
SpyName string
// SampleRate is the sample rate reported with the data.
// NOTE(review): unit is not visible here (looks like Hz) — confirm.
SampleRate uint32
// Units describes the units of the profile values.
Units Units
// AggregationType tells how values of the profile are aggregated.
AggregationType AggregationType
}
16 changes: 15 additions & 1 deletion pkg/storage/queue.go
Expand Up @@ -22,7 +22,21 @@ type IngestionQueue struct {
discardedTotal prometheus.Counter
}

func NewIngestionQueue(logger logrus.FieldLogger, putter Putter, r prometheus.Registerer, queueWorkers, queueSize int) *IngestionQueue {
// Fallback ingestion queue parameters. NewIngestionQueue applies them when
// the corresponding Config value is zero.
const (
	// defaultWorkers is the number of queue workers used when none is configured.
	defaultWorkers = 1
	// defaultQueueSize is the queue size used when none is configured.
	defaultQueueSize = 100
)

func NewIngestionQueue(logger logrus.FieldLogger, putter Putter, r prometheus.Registerer, c *Config) *IngestionQueue {
queueSize := c.queueSize
if queueSize == 0 {
queueSize = defaultQueueSize
}
queueWorkers := c.queueWorkers
if queueWorkers == 0 {
queueWorkers = defaultWorkers
}

q := IngestionQueue{
logger: logger,
putter: putter,
Expand Down
37 changes: 17 additions & 20 deletions pkg/storage/segment/segment.go
Expand Up @@ -410,28 +410,25 @@ func (s *Segment) WalkNodesToDelete(t *RetentionPolicy, cb func(depth int, t tim
return s.root.walkNodesToDelete(t.normalize(), cb)
}

// TODO: this should be refactored
func (s *Segment) SetMetadata(spyName string, sampleRate uint32, units metadata.Units, aggregationType metadata.AggregationType) {
s.spyName = spyName
s.sampleRate = sampleRate
s.units = units
s.aggregationType = aggregationType
}

func (s *Segment) SpyName() string {
return s.spyName
}

func (s *Segment) SampleRate() uint32 {
return s.sampleRate
}

func (s *Segment) Units() metadata.Units {
return s.units
// SetMetadata overwrites the segment's spy name, sample rate, units and
// aggregation type with the values from md. The segment mutex is held for
// the duration of the update so that all four fields change together.
func (s *Segment) SetMetadata(md metadata.Metadata) {
	s.m.Lock()
	defer s.m.Unlock()

	s.spyName, s.sampleRate = md.SpyName, md.SampleRate
	s.units, s.aggregationType = md.Units, md.AggregationType
}

func (s *Segment) AggregationType() metadata.AggregationType {
return s.aggregationType
// GetMetadata returns a copy of the segment's spy name, sample rate, units
// and aggregation type. The segment mutex is held while the fields are read,
// so the returned value is a consistent snapshot.
func (s *Segment) GetMetadata() metadata.Metadata {
	s.m.Lock()
	defer s.m.Unlock()

	return metadata.Metadata{
		SpyName:         s.spyName,
		SampleRate:      s.sampleRate,
		Units:           s.units,
		AggregationType: s.aggregationType,
	}
}

var zeroTime time.Time
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/segment/timeline.go
Expand Up @@ -58,9 +58,11 @@ func GenerateTimeline(st, et time.Time) *Timeline {
}

// PopulateTimeline fills tl with data from segment s by walking the segment
// tree starting at its root. A segment without a root leaves tl untouched.
//
// The segment mutex is held for the whole walk. It is released with defer so
// that a panic raised during the walk (and recovered further up the stack)
// cannot leave the segment locked forever and block later callers.
func (tl *Timeline) PopulateTimeline(s *Segment) {
	s.m.Lock()
	defer s.m.Unlock()

	if s.root == nil {
		return
	}
	s.root.populateTimeline(tl, s)
}

func (sn streeNode) populateTimeline(tl *Timeline, s *Segment) {
Expand Down
23 changes: 9 additions & 14 deletions pkg/storage/storage_get.go
Expand Up @@ -87,11 +87,9 @@ func (s *Storage) Get(ctx context.Context, gi *GetInput) (*GetOutput, error) {
resultTrie *tree.Tree
lastSegment *segment.Segment
writesTotal uint64

aggregationType = "sum"
timeline = segment.GenerateTimeline(gi.StartTime, gi.EndTime)
timeline = segment.GenerateTimeline(gi.StartTime, gi.EndTime)
timelines = make(map[string]*segment.Timeline)
)
timelines := make(map[string]*segment.Timeline)

for _, k := range dimensionKeys() {
// TODO: refactor, store `Key`s in dimensions
Expand All @@ -107,11 +105,6 @@ func (s *Storage) Get(ctx context.Context, gi *GetInput) (*GetOutput, error) {
}

st := res.(*segment.Segment)
switch st.AggregationType() {
case averageAggregationType, "avg":
aggregationType = averageAggregationType
}

timelineKey := "*"
if v, ok := parsedKey.Labels()[gi.GroupBy]; ok {
timelineKey = v
Expand Down Expand Up @@ -145,19 +138,21 @@ func (s *Storage) Get(ctx context.Context, gi *GetInput) (*GetOutput, error) {
return nil, nil
}

if writesTotal > 0 && aggregationType == averageAggregationType {
md := lastSegment.GetMetadata()
switch md.AggregationType {
case averageAggregationType, "avg":
resultTrie = resultTrie.Clone(big.NewRat(1, int64(writesTotal)))
}

return &GetOutput{
Tree: resultTrie,
Timeline: timeline,
Groups: timelines,
SpyName: lastSegment.SpyName(),
SampleRate: lastSegment.SampleRate(),
SpyName: md.SpyName,
SampleRate: md.SampleRate,
Units: md.Units,
AggregationType: md.AggregationType,
Count: writesTotal,
Units: lastSegment.Units(),
AggregationType: lastSegment.AggregationType(),
}, nil
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/storage/storage_get_exemplar.go
Expand Up @@ -48,10 +48,11 @@ func (s *Storage) GetExemplar(ctx context.Context, gi GetExemplarInput) (out Get
}

if m.segment != nil {
out.SpyName = m.segment.SpyName()
out.Units = m.segment.Units()
out.SampleRate = m.segment.SampleRate()
out.AggregationType = m.segment.AggregationType()
md := m.segment.GetMetadata()
out.SpyName = md.SpyName
out.Units = md.Units
out.SampleRate = md.SampleRate
out.AggregationType = md.AggregationType
}

return out, nil
Expand Down
9 changes: 5 additions & 4 deletions pkg/storage/storage_merge_exemplars.go
Expand Up @@ -40,10 +40,11 @@ func (s *Storage) MergeExemplars(ctx context.Context, mi MergeExemplarsInput) (o
out.Tree = m.tree
out.Count = m.count
if m.segment != nil {
out.SpyName = m.segment.SpyName()
out.Units = m.segment.Units()
out.SampleRate = m.segment.SampleRate()
out.AggregationType = m.segment.AggregationType()
md := m.segment.GetMetadata()
out.SpyName = md.SpyName
out.Units = md.Units
out.SampleRate = md.SampleRate
out.AggregationType = md.AggregationType
}

if out.Count > 1 && out.AggregationType == metadata.AverageAggregationType {
Expand Down
16 changes: 9 additions & 7 deletions pkg/storage/storage_put.go
Expand Up @@ -26,9 +26,6 @@ type PutInput struct {
}

func (s *Storage) Put(ctx context.Context, pi *PutInput) error {
// TODO: This is a pretty broad lock. We should find a way to make these locks more selective.
s.putMutex.Lock()
defer s.putMutex.Unlock()
if s.hc.IsOutOfDiskSpace() {
return errOutOfSpace
}
Expand All @@ -53,8 +50,8 @@ func (s *Storage) Put(ctx context.Context, pi *PutInput) error {
"aggregationType": pi.AggregationType,
}).Debug("storage.Put")

for k, v := range pi.Key.Labels() {
s.labels.Put(k, v)
if err := s.labels.PutLabels(pi.Key.Labels()); err != nil {
return fmt.Errorf("unable to write labels: %w", err)
}

sk := pi.Key.SegmentKey()
Expand All @@ -75,9 +72,14 @@ func (s *Storage) Put(ctx context.Context, pi *PutInput) error {
}

st := r.(*segment.Segment)
st.SetMetadata(pi.SpyName, pi.SampleRate, pi.Units, pi.AggregationType)
samples := pi.Val.Samples()
st.SetMetadata(metadata.Metadata{
SpyName: pi.SpyName,
SampleRate: pi.SampleRate,
Units: pi.Units,
AggregationType: pi.AggregationType,
})

samples := pi.Val.Samples()
err = st.Put(pi.StartTime, pi.EndTime, samples, func(depth int, t time.Time, r *big.Rat, addons []segment.Addon) {
tk := pi.Key.TreeKey(depth, t)
res, err := s.trees.GetOrCreate(tk)
Expand Down

0 comments on commit ec5f8b6

Please sign in to comment.