Skip to content

Commit

Permalink
promql.ActiveQueryTracker: Unmap mmapped file when done
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
  • Loading branch information
aknuds1 committed May 8, 2024
1 parent a25160e commit f2b4f76
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 17 deletions.
4 changes: 3 additions & 1 deletion promql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ func TestQueryConcurrency(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dir)
queryTracker := NewActiveQueryTracker(dir, maxConcurrency, nil)
t.Cleanup(queryTracker.Close)
t.Cleanup(func() {
require.NoError(t, queryTracker.Close())
})

opts := EngineOpts{
Logger: nil,
Expand Down
32 changes: 28 additions & 4 deletions promql/query_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package promql
import (
"context"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
Expand All @@ -36,6 +37,8 @@ type ActiveQueryTracker struct {
maxConcurrent int
}

var _ io.Closer = &ActiveQueryTracker{}

type Entry struct {
Query string `json:"query"`
Timestamp int64 `json:"timestamp_sec"`
Expand Down Expand Up @@ -83,6 +86,23 @@ func logUnfinishedQueries(filename string, filesize int, logger log.Logger) {
}
}

type mmapedFile struct {
f io.Closer
m mmap.MMap
}

func (f *mmapedFile) Close() error {
err := f.m.Unmap()
if fErr := f.f.Close(); fErr != nil && err == nil {
return fmt.Errorf("close file mmapedFile.f: %w", fErr)
}

if err != nil {
return fmt.Errorf("mmapedFile: unmapping: %w", err)
}
return nil
}

func getMMapedFile(filename string, filesize int, logger log.Logger) ([]byte, io.Closer, error) {
file, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0o666)
if err != nil {
Expand All @@ -108,7 +128,7 @@ func getMMapedFile(filename string, filesize int, logger log.Logger) ([]byte, io
return nil, nil, err
}

return fileAsBytes, file, err
return fileAsBytes, &mmapedFile{f: file, m: fileAsBytes}, err
}

func NewActiveQueryTracker(localStoragePath string, maxConcurrent int, logger log.Logger) *ActiveQueryTracker {
Expand Down Expand Up @@ -204,9 +224,13 @@ func (tracker ActiveQueryTracker) Insert(ctx context.Context, query string) (int
}
}

func (tracker *ActiveQueryTracker) Close() {
// Close closes tracker.
func (tracker *ActiveQueryTracker) Close() error {
if tracker == nil || tracker.closer == nil {
return
return nil
}
if err := tracker.closer.Close(); err != nil {
return fmt.Errorf("close ActiveQueryTracker.closer: %w", err)
}
tracker.closer.Close()
return nil
}
25 changes: 13 additions & 12 deletions promql/query_logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package promql
import (
"context"
"os"
"path/filepath"
"testing"

"github.com/grafana/regexp"
Expand Down Expand Up @@ -104,26 +105,26 @@ func TestIndexReuse(t *testing.T) {
}

func TestMMapFile(t *testing.T) {
file, err := os.CreateTemp("", "mmapedFile")
require.NoError(t, err)

filename := file.Name()
defer os.Remove(filename)

fileAsBytes, _, err := getMMapedFile(filename, 2, nil)
dir := t.TempDir()
fpath := filepath.Join(dir, "mmapedFile")
const data = "ab"

fileAsBytes, closer, err := getMMapedFile(fpath, 2, nil)
require.NoError(t, err)
copy(fileAsBytes, "ab")
copy(fileAsBytes, data)
require.NoError(t, closer.Close())

f, err := os.Open(filename)
f, err := os.Open(fpath)
require.NoError(t, err)
t.Cleanup(func() {
_ = f.Close()
})

bytes := make([]byte, 4)
n, err := f.Read(bytes)
require.Equal(t, 2, n)
require.NoError(t, err, "Unexpected error while reading file.")

require.Equal(t, fileAsBytes, bytes[:2], "Mmap failed")
require.Equal(t, 2, n)
require.Equal(t, []byte(data), bytes[:2], "Mmap failed")
}

func TestParseBrokenJSON(t *testing.T) {
Expand Down

0 comments on commit f2b4f76

Please sign in to comment.