This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Added storage size based retention method and new metrics #343

Merged
Changes from 7 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,5 +1,6 @@
## master / unreleased
- [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller WAL files, for example with tmpfs on a Raspberry Pi to minimise SD card wear from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed.
- [FEATURE] Size-based retention through `Options.MaxBytes`. As part of this change, new metrics were added: `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, and `prometheus_tsdb_time_retentions_total`, along with a new public interface, `SizeReader`, exposing `Size() int64` (see the usage sketch after this diff).

## 0.3.1
- [BUGFIX] Fixed most Windows tests and some actual bugs with unclosed file readers.
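Taken together, the two changelog entries above boil down to two new fields on `tsdb.Options`. Below is a minimal, hypothetical usage sketch, assuming the `tsdb.Open(dir, logger, registerer, opts)` entry point and `DefaultOptions` exposed by this repository; the concrete sizes are arbitrary:

```go
package main

import (
	"log"

	"github.com/prometheus/tsdb"
)

func main() {
	// Start from the package defaults and override only the two new knobs.
	opts := *tsdb.DefaultOptions
	// Smaller WAL segments than DefaultOptions.WALSegmentSize,
	// e.g. for tmpfs on a Raspberry Pi to limit SD card wear.
	opts.WALSegmentSize = 4 * 1024 * 1024 // 4 MiB
	// Size-based retention: once total block storage exceeds this
	// budget, the oldest blocks become eligible for deletion.
	opts.MaxBytes = 512 * 1024 * 1024 // 512 MiB

	db, err := tsdb.Open("data", nil, nil, &opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
```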
38 changes: 36 additions & 2 deletions block.go
@@ -21,6 +21,8 @@ import (
"path/filepath"
"sync"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/chunkenc"
@@ -140,6 +142,12 @@ type Appendable interface {
Appender() Appender
}

// SizeReader returns the size of the object in bytes.
type SizeReader interface {
// Size returns the size in bytes.
Size() int64
}

// BlockMeta provides meta information about a block.
type BlockMeta struct {
// Unique identifier for the block and its contents. Changes on compaction.
@@ -166,6 +174,7 @@ type BlockStats struct {
NumSeries uint64 `json:"numSeries,omitempty"`
NumChunks uint64 `json:"numChunks,omitempty"`
NumTombstones uint64 `json:"numTombstones,omitempty"`
NumBytes int64 `json:"numBytes,omitempty"`
}

// BlockDesc describes a block by ULID and time range.
@@ -257,7 +266,10 @@ type Block struct {

// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
// to instantiate chunk structs.
-func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
+func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error) {
if logger == nil {
logger = log.NewNopLogger()
}
meta, err := readMetaFile(dir)
if err != nil {
return nil, err
@@ -272,11 +284,20 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
return nil, err
}

-tr, err := readTombstones(dir)
+tr, tsr, err := readTombstones(dir)
if err != nil {
return nil, err
}

// TODO refactor to set this at block creation time as
// that would be the logical place for a block size to be calculated.
bs := blockSize(cr, ir, tsr)
meta.Stats.NumBytes = bs
err = writeMetaFile(dir, meta)
if err != nil {
level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err)
}

pb := &Block{
dir: dir,
meta: *meta,
@@ -288,6 +309,16 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
return pb, nil
}

func blockSize(rr ...SizeReader) int64 {
var total int64
for _, r := range rr {
if r != nil {
total += r.Size()
}
}
return total
}

// Close closes the on-disk block. It blocks as long as there are readers reading from the block.
func (pb *Block) Close() error {
pb.mtx.Lock()
@@ -315,6 +346,9 @@ func (pb *Block) Dir() string { return pb.dir }
// Meta returns meta information about the block.
func (pb *Block) Meta() BlockMeta { return pb.meta }

// Size returns the number of bytes that the block takes up.
func (pb *Block) Size() int64 { return pb.meta.Stats.NumBytes }
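`Block.Size` is the hook that makes retention by size possible; the enforcement itself lives in `db.go`, outside this excerpt. A rough sketch of the idea, with `maxBytes` standing in for `Options.MaxBytes` and none of it taken from the PR's actual code:

```go
// blocksToDelete keeps a running byte total over blocks ordered newest
// first and returns every block that falls past the size budget.
// Illustrative only; the real logic lives in the DB's retention path.
func blocksToDelete(blocks []*Block, maxBytes int64) []*Block {
	var (
		total     int64
		deletable []*Block
	)
	for _, b := range blocks { // assumes blocks sorted newest first
		total += b.Size()
		if total > maxBytes {
			deletable = append(deletable, b)
		}
	}
	return deletable
}
```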

// ErrClosing is returned when a block is in the process of being closed.
var ErrClosing = errors.New("block is closing")

8 changes: 4 additions & 4 deletions block_test.go
@@ -53,7 +53,7 @@ func TestSetCompactionFailed(t *testing.T) {
testutil.Equals(t, true, b.meta.Compaction.Failed)
testutil.Ok(t, b.Close())

-b, err = OpenBlock(tmpdir, nil)
+b, err = OpenBlock(nil, tmpdir, nil)
testutil.Ok(t, err)
testutil.Equals(t, true, b.meta.Compaction.Failed)
}
@@ -72,7 +72,7 @@ func createEmptyBlock(t *testing.T, dir string, meta *BlockMeta) *Block {

testutil.Ok(t, writeTombstoneFile(dir, newMemTombstones()))

-b, err := OpenBlock(dir, nil)
+b, err := OpenBlock(nil, dir, nil)
testutil.Ok(t, err)
return b
}
@@ -97,7 +97,7 @@ func createPopulatedBlock(tb testing.TB, dir string, nSeries int, mint, maxt int
continue
}
}
-ref, err := app.Add(lbl, int64(ts), rand.Float64())
+ref, err := app.Add(lbl, ts, rand.Float64())
testutil.Ok(tb, err)
refs[i] = ref
}
@@ -113,7 +113,7 @@
ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime(), nil)
testutil.Ok(tb, err)

-blk, err := OpenBlock(filepath.Join(dir, ulid.String()), nil)
+blk, err := OpenBlock(nil, filepath.Join(dir, ulid.String()), nil)
testutil.Ok(tb, err)
return blk
}
24 changes: 15 additions & 9 deletions chunks/chunks.go
@@ -284,17 +284,15 @@ func (b realByteSlice) Sub(start, end int) ByteSlice {
// Reader implements a SeriesReader for a serialized byte stream
// of series data.
type Reader struct {
-// The underlying bytes holding the encoded series data.
-bs []ByteSlice
-
-// Closers for resources behind the byte slices.
-cs []io.Closer
-
+bs   []ByteSlice // The underlying bytes holding the encoded series data.
+cs   []io.Closer // Closers for resources behind the byte slices.
+size int64       // The total size of bytes in the reader.
pool chunkenc.Pool
}

func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) {
cr := Reader{pool: pool, bs: bs, cs: cs}
var totalSize int64

for i, b := range cr.bs {
if b.Len() < 4 {
@@ -304,7 +302,9 @@ func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, err
if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks {
return nil, errors.Errorf("invalid magic number %x", m)
}
totalSize += int64(b.Len())
}
cr.size = totalSize
return &cr, nil
}

@@ -327,9 +327,10 @@ func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) {
pool = chunkenc.NewPool()
}

-var bs []ByteSlice
-var cs []io.Closer
-
+var (
+	bs []ByteSlice
+	cs []io.Closer
+)
for _, fn := range files {
f, err := fileutil.OpenMmapFile(fn)
if err != nil {
@@ -345,6 +346,11 @@ func (s *Reader) Close() error {
return closeAll(s.cs...)
}

// Size returns the size of the chunks.
func (s *Reader) Size() int64 {
return s.size
}

func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
var (
seq = int(ref >> 32)
2 changes: 1 addition & 1 deletion compact.go
@@ -347,7 +347,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u

if b == nil {
var err error
-b, err = OpenBlock(d, c.chunkPool)
+b, err = OpenBlock(c.logger, d, c.chunkPool)
if err != nil {
return uid, err
}