diff --git a/plugins/metrics/pkg/agent/buffer.go b/plugins/metrics/pkg/agent/buffer.go index 82e2136da6..12cd77b41a 100644 --- a/plugins/metrics/pkg/agent/buffer.go +++ b/plugins/metrics/pkg/agent/buffer.go @@ -13,7 +13,7 @@ import ( "github.com/google/uuid" ) -var ErrBufferNotFound error = fmt.Errorf("buffer not found") +var ErrBufferNotFound = fmt.Errorf("buffer not found") type ChunkBuffer interface { // Add blo cks until the value can be added to the buffer. @@ -35,7 +35,6 @@ type diskBuffer struct { chunkChans map[string]chan string } -// todo: reconcile pre-existing chunks (useful for pod restarts during import) func NewDiskBuffer(dir string) (ChunkBuffer, error) { buffer := &diskBuffer{ dir: path.Join(BufferDir), @@ -46,9 +45,43 @@ func NewDiskBuffer(dir string) (ChunkBuffer, error) { return nil, fmt.Errorf("could not create buffer directory: %w", err) } + if err := buffer.reconcileExistingChunks(); err != nil { + return nil, err + } + return buffer, nil } +func (b *diskBuffer) reconcileExistingChunks() error { + entries, err := os.ReadDir(b.dir) + if err != nil { + return fmt.Errorf("could not reconcile existing chunks: %w", err) + } + + for _, e := range entries { + if !e.IsDir() { + continue + } + + chunkChan := make(chan string, 100) + + b.chanLocker.Lock() + b.chunkChans[e.Name()] = chunkChan + b.chanLocker.Unlock() + + subBufferDir := path.Join(b.dir, e.Name()) + subEntries, err := os.ReadDir(subBufferDir) + if err != nil { + return fmt.Errorf("could not reconcile existing chunks: %w", err) + } + + for _, se := range subEntries { + chunkChan <- path.Join(subBufferDir, se.Name()) + } + } + return nil +} + func (b *diskBuffer) Add(_ context.Context, name string, meta WriteMetadata) error { b.chanLocker.RLock() chunkChan, found := b.chunkChans[name] @@ -62,7 +95,6 @@ func (b *diskBuffer) Add(_ context.Context, name string, meta WriteMetadata) err b.chanLocker.Unlock() } - // todo: will create a new directory for each target name which will not be cleaned up internally filePath := path.Join(b.dir, name, uuid.New().String()) if err := os.MkdirAll(path.Dir(filePath), 0755); err != nil && !errors.Is(err, os.ErrExist) { @@ -123,7 +155,7 @@ func (b *diskBuffer) Get(ctx context.Context, name string) (WriteMetadata, error } } -func (b *diskBuffer) Delete(ctx context.Context, name string) error { +func (b *diskBuffer) Delete(_ context.Context, name string) error { b.chanLocker.Lock() delete(b.chunkChans, name) b.chanLocker.Unlock() diff --git a/plugins/metrics/pkg/agent/runner.go b/plugins/metrics/pkg/agent/runner.go index 14683f1cac..934d2c666a 100644 --- a/plugins/metrics/pkg/agent/runner.go +++ b/plugins/metrics/pkg/agent/runner.go @@ -29,8 +29,7 @@ import ( var TimeDeltaMillis = time.Minute.Milliseconds() -// const BufferDir = "/var/lib/opni-agent/import-buffer" -const BufferDir = "import-buffer" +const BufferDir = "/var/lib/opni-agent/import-buffer" func toLabelMatchers(rrLabelMatchers []*remoteread.LabelMatcher) []*prompb.LabelMatcher { pbLabelMatchers := make([]*prompb.LabelMatcher, 0, len(rrLabelMatchers)) @@ -81,8 +80,6 @@ type TargetRunMetadata struct { Query *remoteread.Query } -// todo: could probably find a better name for this -// todo: replace ProgressRatio and Query with a ProgressDelta type WriteMetadata struct { Query *prompb.Query WriteChunk *prompb.WriteRequest