Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Get block by id directly on promtool analyze & get latest block if ID not provided #12031

Merged
merged 3 commits into from Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 11 additions & 17 deletions cmd/promtool/tsdb.go
nidhey27 marked this conversation as resolved.
Show resolved Hide resolved
Expand Up @@ -398,26 +398,20 @@ func openBlock(path, blockID string) (*tsdb.DBReadOnly, tsdb.BlockReader, error)
if err != nil {
return nil, nil, err
}
blocks, err := db.Blocks()
if err != nil {
return nil, nil, err
}
var block tsdb.BlockReader
switch {
case blockID != "":
for _, b := range blocks {
if b.Meta().ULID.String() == blockID {
block = b
break
}

if blockID == "" {
blockID, err = db.LastBlockID()
if err != nil {
return nil, nil, err
}
case len(blocks) > 0:
block = blocks[len(blocks)-1]
}
if block == nil {
return nil, nil, fmt.Errorf("block %s not found", blockID)

b, err := db.Block(blockID)
if err != nil {
return nil, nil, err
}
return db, block, nil

return db, b, nil
}

func analyzeBlock(path, blockID string, limit int, runExtended bool) error {
Expand Down
54 changes: 54 additions & 0 deletions tsdb/db.go
Expand Up @@ -601,6 +601,60 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) {
return blockReaders, nil
}

// LastBlockID returns the BlockID of latest block.
func (db *DBReadOnly) LastBlockID() (string, error) {
entries, err := os.ReadDir(db.dir)
if err != nil {
return "", err
}

max := uint64(0)

lastBlockID := ""

for _, e := range entries {
// Check if dir is a block dir or not.
dirName := e.Name()
ulidObj, err := ulid.ParseStrict(dirName)
if err != nil {
continue // Not a block dir.
}
timestamp := ulidObj.Time()
if timestamp > max {
max = timestamp
lastBlockID = dirName
}
}

if lastBlockID == "" {
return "", errors.New("no blocks found")
}

return lastBlockID, nil
}

// Block returns a block reader by given block id.
func (db *DBReadOnly) Block(blockID string) (BlockReader, error) {
select {
case <-db.closed:
return nil, ErrClosed
default:
}

_, err := os.Stat(filepath.Join(db.dir, blockID))
if os.IsNotExist(err) {
return nil, errors.Errorf("invalid block ID %s", blockID)
}

block, err := OpenBlock(db.logger, filepath.Join(db.dir, blockID), nil)
if err != nil {
return nil, err
}
db.closers = append(db.closers, block)

return block, nil
}

// Close all block readers.
func (db *DBReadOnly) Close() error {
select {
Expand Down
19 changes: 18 additions & 1 deletion tsdb/db_test.go
Expand Up @@ -2384,6 +2384,7 @@ func TestDBReadOnly(t *testing.T) {
dbDir string
logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
expBlocks []*Block
expBlock *Block
expSeries map[string][]tsdbutil.Sample
expChunks map[string][][]tsdbutil.Sample
expDBHash []byte
Expand Down Expand Up @@ -2427,6 +2428,7 @@ func TestDBReadOnly(t *testing.T) {
require.NoError(t, app.Commit())

expBlocks = dbWritable.Blocks()
expBlock = expBlocks[0]
expDbSize, err := fileutil.DirSize(dbWritable.Dir())
require.NoError(t, err)
require.Greater(t, expDbSize, dbSizeBeforeAppend, "db size didn't increase after an append")
Expand Down Expand Up @@ -2455,7 +2457,22 @@ func TestDBReadOnly(t *testing.T) {
require.Equal(t, expBlock.Meta(), blocks[i].Meta(), "block meta mismatch")
}
})

t.Run("block", func(t *testing.T) {
blockID := expBlock.meta.ULID.String()
block, err := dbReadOnly.Block(blockID)
require.NoError(t, err)
require.Equal(t, expBlock.Meta(), block.Meta(), "block meta mismatch")
})
t.Run("invalid block ID", func(t *testing.T) {
blockID := "01GTDVZZF52NSWB5SXQF0P2PGF"
_, err := dbReadOnly.Block(blockID)
require.Error(t, err)
})
t.Run("last block ID", func(t *testing.T) {
blockID, err := dbReadOnly.LastBlockID()
require.NoError(t, err)
require.Equal(t, expBlocks[2].Meta().ULID.String(), blockID)
})
t.Run("querier", func(t *testing.T) {
// Open a read only db and ensure that the API returns the same result as the normal DB.
q, err := dbReadOnly.Querier(context.TODO(), math.MinInt64, math.MaxInt64)
Expand Down