Skip to content

Commit

Permalink
Feat: Get block by id directly on promtool analyze & get latest block…
Browse files Browse the repository at this point in the history
… if ID not provided (#12031)

* feat: analyze latest block or block by ID in CLI (promtool)

Signed-off-by: nidhey27 <nidhey.indurkar@infracloud.io>

* address remarks

Signed-off-by: nidhey60@gmail.com <nidhey.indurkar@infracloud.io>

* address latest review comments

Signed-off-by: nidhey60@gmail.com <nidhey.indurkar@infracloud.io>

---------

Signed-off-by: nidhey27 <nidhey.indurkar@infracloud.io>
Signed-off-by: nidhey60@gmail.com <nidhey.indurkar@infracloud.io>
  • Loading branch information
nidhey27 committed Jun 1, 2023
1 parent dfae954 commit a8772a4
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 18 deletions.
28 changes: 11 additions & 17 deletions cmd/promtool/tsdb.go
Expand Up @@ -398,26 +398,20 @@ func openBlock(path, blockID string) (*tsdb.DBReadOnly, tsdb.BlockReader, error)
if err != nil {
return nil, nil, err
}
blocks, err := db.Blocks()
if err != nil {
return nil, nil, err
}
var block tsdb.BlockReader
switch {
case blockID != "":
for _, b := range blocks {
if b.Meta().ULID.String() == blockID {
block = b
break
}

if blockID == "" {
blockID, err = db.LastBlockID()
if err != nil {
return nil, nil, err
}
case len(blocks) > 0:
block = blocks[len(blocks)-1]
}
if block == nil {
return nil, nil, fmt.Errorf("block %s not found", blockID)

b, err := db.Block(blockID)
if err != nil {
return nil, nil, err
}
return db, block, nil

return db, b, nil
}

func analyzeBlock(path, blockID string, limit int, runExtended bool) error {
Expand Down
54 changes: 54 additions & 0 deletions tsdb/db.go
Expand Up @@ -607,6 +607,60 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) {
return blockReaders, nil
}

// LastBlockID returns the BlockID of latest block.
func (db *DBReadOnly) LastBlockID() (string, error) {
entries, err := os.ReadDir(db.dir)
if err != nil {
return "", err
}

max := uint64(0)

lastBlockID := ""

for _, e := range entries {
// Check if dir is a block dir or not.
dirName := e.Name()
ulidObj, err := ulid.ParseStrict(dirName)
if err != nil {
continue // Not a block dir.
}
timestamp := ulidObj.Time()
if timestamp > max {
max = timestamp
lastBlockID = dirName
}
}

if lastBlockID == "" {
return "", errors.New("no blocks found")
}

return lastBlockID, nil
}

// Block returns a block reader by given block id.
func (db *DBReadOnly) Block(blockID string) (BlockReader, error) {
select {
case <-db.closed:
return nil, ErrClosed
default:
}

_, err := os.Stat(filepath.Join(db.dir, blockID))
if os.IsNotExist(err) {
return nil, errors.Errorf("invalid block ID %s", blockID)
}

block, err := OpenBlock(db.logger, filepath.Join(db.dir, blockID), nil)
if err != nil {
return nil, err
}
db.closers = append(db.closers, block)

return block, nil
}

// Close all block readers.
func (db *DBReadOnly) Close() error {
select {
Expand Down
19 changes: 18 additions & 1 deletion tsdb/db_test.go
Expand Up @@ -2384,6 +2384,7 @@ func TestDBReadOnly(t *testing.T) {
dbDir string
logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
expBlocks []*Block
expBlock *Block
expSeries map[string][]tsdbutil.Sample
expChunks map[string][][]tsdbutil.Sample
expDBHash []byte
Expand Down Expand Up @@ -2427,6 +2428,7 @@ func TestDBReadOnly(t *testing.T) {
require.NoError(t, app.Commit())

expBlocks = dbWritable.Blocks()
expBlock = expBlocks[0]
expDbSize, err := fileutil.DirSize(dbWritable.Dir())
require.NoError(t, err)
require.Greater(t, expDbSize, dbSizeBeforeAppend, "db size didn't increase after an append")
Expand Down Expand Up @@ -2455,7 +2457,22 @@ func TestDBReadOnly(t *testing.T) {
require.Equal(t, expBlock.Meta(), blocks[i].Meta(), "block meta mismatch")
}
})

t.Run("block", func(t *testing.T) {
blockID := expBlock.meta.ULID.String()
block, err := dbReadOnly.Block(blockID)
require.NoError(t, err)
require.Equal(t, expBlock.Meta(), block.Meta(), "block meta mismatch")
})
t.Run("invalid block ID", func(t *testing.T) {
blockID := "01GTDVZZF52NSWB5SXQF0P2PGF"
_, err := dbReadOnly.Block(blockID)
require.Error(t, err)
})
t.Run("last block ID", func(t *testing.T) {
blockID, err := dbReadOnly.LastBlockID()
require.NoError(t, err)
require.Equal(t, expBlocks[2].Meta().ULID.String(), blockID)
})
t.Run("querier", func(t *testing.T) {
// Open a read only db and ensure that the API returns the same result as the normal DB.
q, err := dbReadOnly.Querier(context.TODO(), math.MinInt64, math.MaxInt64)
Expand Down

0 comments on commit a8772a4

Please sign in to comment.