Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
Merge pull request #162 from BasPH/fsync-duration
Browse files Browse the repository at this point in the history
Instrument WAL fsync
  • Loading branch information
fabxc committed Oct 5, 2017
2 parents da565f9 + 5e1c258 commit 27f1b8a
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 13 deletions.
2 changes: 1 addition & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
return nil, errors.Wrap(err, "create leveled compactor")
}

wal, err := OpenSegmentWAL(filepath.Join(dir, "wal"), l, opts.WALFlushInterval)
wal, err := OpenSegmentWAL(filepath.Join(dir, "wal"), l, opts.WALFlushInterval, r)
if err != nil {
return nil, err
}
Expand Down
36 changes: 33 additions & 3 deletions wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/client_golang/prometheus"
)

// WALEntryType indicates what data a WAL entry contains.
Expand Down Expand Up @@ -65,6 +66,26 @@ type SeriesCB func([]RefSeries) error
// is only valid until the call returns.
type DeletesCB func([]Stone) error

// walMetrics groups the Prometheus metrics kept for a SegmentWAL.
type walMetrics struct {
// fsyncDuration summarizes how long each fdatasync of the WAL head
// segment takes; observations are recorded in seconds.
fsyncDuration prometheus.Summary
}

// newWalMetrics builds the metric set used by a segment WAL and, when a
// registerer is supplied, registers every metric with it.
//
// The wal argument is accepted but not read by this function. A nil r is
// allowed: the metrics are still created and can be observed, they are
// just not registered anywhere.
func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics {
	metrics := &walMetrics{
		fsyncDuration: prometheus.NewSummary(prometheus.SummaryOpts{
			Name: "tsdb_wal_fsync_duration_seconds",
			Help: "Duration of WAL fsync.",
		}),
	}

	// Without a registerer there is nothing left to do.
	if r == nil {
		return metrics
	}

	r.MustRegister(metrics.fsyncDuration)
	return metrics
}

// WAL is a write ahead log that can log new series labels and samples.
// It must be completely read before new entries are logged.
type WAL interface {
Expand Down Expand Up @@ -150,6 +171,7 @@ func newCRC32() hash.Hash32 {
// SegmentWAL is a write ahead log for series data.
type SegmentWAL struct {
mtx sync.Mutex
metrics *walMetrics

dirFile *os.File
files []*segmentFile
Expand All @@ -169,7 +191,7 @@ type SegmentWAL struct {

// OpenSegmentWAL opens or creates a write ahead log in the given directory.
// The WAL must be read completely before new data is written.
func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration) (*SegmentWAL, error) {
func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration, r prometheus.Registerer) (*SegmentWAL, error) {
if err := os.MkdirAll(dir, 0777); err != nil {
return nil, err
}
Expand All @@ -190,6 +212,7 @@ func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration)
segmentSize: walSegmentSizeBytes,
crc32: newCRC32(),
}
w.metrics = newWalMetrics(w, r)

fns, err := sequenceFiles(w.dirFile.Name())
if err != nil {
Expand Down Expand Up @@ -592,7 +615,10 @@ func (w *SegmentWAL) Sync() error {
}
if head != nil {
// But only fsync the head segment after releasing the mutex as it will block on disk I/O.
return fileutil.Fdatasync(head.File)
start := time.Now()
err := fileutil.Fdatasync(head.File)
w.metrics.fsyncDuration.Observe(time.Since(start).Seconds())
return err
}
return nil
}
Expand All @@ -604,7 +630,11 @@ func (w *SegmentWAL) sync() error {
if w.head() == nil {
return nil
}
return fileutil.Fdatasync(w.head().File)

start := time.Now()
err := fileutil.Fdatasync(w.head().File)
w.metrics.fsyncDuration.Observe(time.Since(start).Seconds())
return err
}

func (w *SegmentWAL) flush() error {
Expand Down
18 changes: 9 additions & 9 deletions wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestSegmentWAL_Open(t *testing.T) {
}

// Initialize 5 correct segment files.
w, err := OpenSegmentWAL(tmpdir, nil, 0)
w, err := OpenSegmentWAL(tmpdir, nil, 0, nil)
require.NoError(t, err)

require.Equal(t, 5, len(w.files), "unexpected number of segments loaded")
Expand Down Expand Up @@ -74,7 +74,7 @@ func TestSegmentWAL_Open(t *testing.T) {
_, err = f.WriteAt([]byte{0}, 4)
require.NoError(t, err)

w, err = OpenSegmentWAL(tmpdir, nil, 0)
w, err = OpenSegmentWAL(tmpdir, nil, 0, nil)
require.Error(t, err, "open with corrupted segments")
}

Expand All @@ -84,7 +84,7 @@ func TestSegmentWAL_cut(t *testing.T) {
defer os.RemoveAll(tmpdir)

// This calls cut() implicitly the first time without a previous tail.
w, err := OpenSegmentWAL(tmpdir, nil, 0)
w, err := OpenSegmentWAL(tmpdir, nil, 0, nil)
require.NoError(t, err)

require.NoError(t, w.write(WALEntrySeries, 1, []byte("Hello World!!")))
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestSegmentWAL_Truncate(t *testing.T) {
require.NoError(t, err)
// defer os.RemoveAll(dir)

w, err := OpenSegmentWAL(dir, nil, 0)
w, err := OpenSegmentWAL(dir, nil, 0, nil)
require.NoError(t, err)
w.segmentSize = 10000

Expand Down Expand Up @@ -181,7 +181,7 @@ func TestSegmentWAL_Truncate(t *testing.T) {
require.NoError(t, w.Close())

// The same again with a new WAL.
w, err = OpenSegmentWAL(dir, nil, 0)
w, err = OpenSegmentWAL(dir, nil, 0, nil)
require.NoError(t, err)

var readSeries []RefSeries
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
// Open WAL a bunch of times, validate all previous data can be read,
// write more data to it, close it.
for k := 0; k < numMetrics; k += numMetrics / iterations {
w, err := OpenSegmentWAL(dir, nil, 0)
w, err := OpenSegmentWAL(dir, nil, 0, nil)
require.NoError(t, err)

// Set smaller segment size so we can actually write several files.
Expand Down Expand Up @@ -390,7 +390,7 @@ func TestWALRestoreCorrupted(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dir)

w, err := OpenSegmentWAL(dir, nil, 0)
w, err := OpenSegmentWAL(dir, nil, 0, nil)
require.NoError(t, err)

require.NoError(t, w.LogSamples([]RefSample{{T: 1, V: 2}}))
Expand All @@ -415,7 +415,7 @@ func TestWALRestoreCorrupted(t *testing.T) {

logger := log.NewLogfmtLogger(os.Stderr)

w2, err := OpenSegmentWAL(dir, logger, 0)
w2, err := OpenSegmentWAL(dir, logger, 0, nil)
require.NoError(t, err)

r := w2.Reader()
Expand Down Expand Up @@ -446,7 +446,7 @@ func TestWALRestoreCorrupted(t *testing.T) {

// We should see the first valid entry and the new one, everything after
// is truncated.
w3, err := OpenSegmentWAL(dir, logger, 0)
w3, err := OpenSegmentWAL(dir, logger, 0, nil)
require.NoError(t, err)

r = w3.Reader()
Expand Down

0 comments on commit 27f1b8a

Please sign in to comment.