Skip to content

Commit

Permalink
flesh out details for walstats
Browse files Browse the repository at this point in the history
  • Loading branch information
rfratto committed Jul 15, 2020
1 parent d3debcb commit 3a40bbb
Showing 1 changed file with 93 additions and 3 deletions.
96 changes: 93 additions & 3 deletions pkg/agentctl/walstats.go
Expand Up @@ -3,6 +3,7 @@ package agentctl
import (
"time"

"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/wal"
)

Expand Down Expand Up @@ -63,16 +64,105 @@ type WALTargetStats struct {
// CalculateStats calculates the statistics of the WAL for the given directory.
// walDir must be a folder containing segment files and checkpoint directories.
func CalculateStats(walDir string) (WALStats, error) {
w, err := wal.Open(nil, walDir)
if err != nil {
return WALStats{}, err
}
defer w.Close()

c := walStatsCalculator{w: w}
return c.Calculate()
}

type walStatsCalculator struct {
w *wal.WAL
}

func (c *walStatsCalculator) Calculate() (WALStats, error) {
var (
stats WALStats
err error
)

w, err := wal.Open(nil, walDir)
checkpointDir, checkpointIdx, err := wal.LastCheckpoint(c.w.Dir())
if err != nil && err != record.ErrNotFound {
return stats, err
}

firstSegment, lastSegment, err := c.w.Segments()
if err != nil {
return stats, err
}
defer w.Close()

return stats, err
stats.FirstSegment = firstSegment
stats.LastSegment = lastSegment
stats.CheckpointNumber = checkpointIdx

// Calculate stats from the ckecpoint.
if checkpointDir != "" {
err := c.readCheckpoint(checkpointDir)
if err != nil {
return stats, err
}
}

// Calculate stats from the segments.
startFrom := 0
if checkpointDir != "" {
startFrom = checkpointIdx + 1
}
for i := startFrom; i <= lastSegment; i++ {
err := c.readSegment(wal.SegmentName(c.w.Dir(), i))
if err != nil {
return stats, err
}
}

return stats, nil
}

func (c *walStatsCalculator) readCheckpoint(dir string) error {
sr, err := wal.NewSegmentsReader(dir)
if err != nil {
return err
}
defer func() {
_ = sr.Close()
}()

return c.readWAL(wal.NewReader(sr))
}

func (c *walStatsCalculator) readSegment(file string) error {
s, err := wal.OpenReadSegment(file)
if err != nil {
return err
}

sr := wal.NewSegmentBufReader(s)
defer func() {
_ = sr.Close()
}()
return c.readWAL(wal.NewReader(sr))
}

func (c *walStatsCalculator) readWAL(r *wal.Reader) error {
var dec record.Decoder

for r.Next() {
rec := r.Record()

// We ignore other record types here; we only write records and samples
// but we don't want to return an error for an unexpected record type;
// doing so would prevent users from getting stats on a traditional
// Prometheus WAL, which would be nice to support.
switch dec.Type(rec) {
case record.Series:
// TODO
case record.Samples:
// TODO
}
}

return r.Err()
}

0 comments on commit 3a40bbb

Please sign in to comment.