Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Use concurrent.Writer in place of bufio.Writer #149

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
82 changes: 36 additions & 46 deletions wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package tsdb

import (
"bufio"
"encoding/binary"
"fmt"
"hash"
Expand All @@ -27,6 +26,7 @@ import (
"sync"
"time"

"github.com/alin-sinpalean/concurrent-writer/concurrent"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
Expand Down Expand Up @@ -185,7 +185,7 @@ type SegmentWAL struct {
segmentSize int64

crc32 hash.Hash32
cur *bufio.Writer
cur *concurrent.Writer
curN int64

stopc chan struct{}
Expand Down Expand Up @@ -551,15 +551,20 @@ func (w *SegmentWAL) createSegmentFile(name string) (*os.File, error) {

// cut finishes the currently active segments and opens the next one.
// The encoder is reset to point to the new segment.
func (w *SegmentWAL) cut() error {
// Optionally, a channel may be provided so the caller can wait for the
// current segment to be finished, as finishing happens asynchronously.
func (w *SegmentWAL) cut(done chan interface{}) error {
// Sync current head to disk and close.
if hf := w.head(); hf != nil {
if err := w.flush(); err != nil {
return err
}
cur := w.cur
// Finish last segment asynchronously to not block the WAL moving along
// in the new segment.
go func() {
if cur != nil {
if err := cur.Flush(); err != nil {
w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
}
}
off, err := hf.Seek(0, os.SEEK_CUR)
if err != nil {
level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
Expand All @@ -573,6 +578,9 @@ func (w *SegmentWAL) cut() error {
if err := hf.Close(); err != nil {
level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err)
}
if done != nil {
close(done)
}
}()
}

Expand All @@ -594,7 +602,7 @@ func (w *SegmentWAL) cut() error {
w.files = append(w.files, newSegmentFile(f))

// TODO(gouthamve): make the buffer size a constant.
w.cur = bufio.NewWriterSize(f, 8*1024*1024)
w.cur = concurrent.NewWriterSize(f, 8*1024*1024)
w.curN = 8

return nil
Expand All @@ -609,52 +617,32 @@ func (w *SegmentWAL) head() *segmentFile {

// Sync flushes the changes to disk.
func (w *SegmentWAL) Sync() error {
var head *segmentFile
var err error
// Retrieve references to the head segment and current writer under mutex lock.
w.mtx.Lock()
cur := w.cur
head := w.head()
w.mtx.Unlock()

// Flush the writer and retrieve the reference to the head segment under mutex lock.
func() {
w.mtx.Lock()
defer w.mtx.Unlock()
if err = w.flush(); err != nil {
return
}
head = w.head()
}()
if err != nil {
// But only flush and fsync after releasing the mutex as it will block on disk I/O.
return syncImpl(cur, head, w.metrics.fsyncDuration)
}

// syncImpl flushes cur and fdatasyncs head's file, recording the fsync
// latency in fsyncDuration. A nil writer makes the whole call a no-op and
// a nil head skips the fsync. It is a free function (rather than a method)
// so callers can snapshot the writer and head under their own lock and
// run the blocking I/O outside of it.
func syncImpl(cur *concurrent.Writer, head *segmentFile, fsyncDuration prometheus.Summary) error {
	if cur == nil {
		return nil
	}
	if err := cur.Flush(); err != nil {
		return errors.Wrap(err, "flush buffer")
	}
	if head != nil {
		// Only fsync the head segment after releasing the mutex as it will block on disk I/O.
		start := time.Now()
		err := fileutil.Fdatasync(head.File)
		fsyncDuration.Observe(time.Since(start).Seconds())
		return err
	}
	return nil
}

// sync pushes buffered writes out and fdatasyncs the current head segment,
// observing the fsync latency. A missing head segment is not an error.
func (w *SegmentWAL) sync() error {
	err := w.flush()
	if err != nil {
		return err
	}
	if w.head() == nil {
		return nil
	}

	begin := time.Now()
	err = fileutil.Fdatasync(w.head().File)
	w.metrics.fsyncDuration.Observe(time.Since(begin).Seconds())
	return err
}

// flush drains the buffered writer for the active segment, if one exists.
func (w *SegmentWAL) flush() error {
	// Before the first segment is cut there is nothing to flush.
	if w.cur != nil {
		return w.cur.Flush()
	}
	return nil
}

func (w *SegmentWAL) run(interval time.Duration) {
var tick <-chan time.Time

Expand Down Expand Up @@ -683,14 +671,16 @@ func (w *SegmentWAL) Close() error {
<-w.donec

w.mtx.Lock()
defer w.mtx.Unlock()
cur := w.cur
hf := w.head()
w.mtx.Unlock()

if err := w.sync(); err != nil {
if err := syncImpl(cur, hf, w.metrics.fsyncDuration); err != nil {
return err
}
// On opening, a WAL must be fully consumed once. Afterwards
// only the current segment will still be open.
if hf := w.head(); hf != nil {
if hf != nil {
return errors.Wrapf(hf.Close(), "closing WAL head %s", hf.Name())
}
return nil
Expand All @@ -716,7 +706,7 @@ func (w *SegmentWAL) write(t WALEntryType, flag uint8, buf []byte) error {
// XXX(fabxc): this currently cuts a new file whenever the WAL was newly opened.
// Probably fine in general but may yield a lot of short files in some cases.
if w.cur == nil || w.curN > w.segmentSize || newsz > w.segmentSize && sz <= w.segmentSize {
if err := w.cut(); err != nil {
if err := w.cut(nil); err != nil {
return err
}
}
Expand Down
12 changes: 6 additions & 6 deletions wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ func TestSegmentWAL_cut(t *testing.T) {

require.NoError(t, w.write(WALEntrySeries, 1, []byte("Hello World!!")))

require.NoError(t, w.cut(), "cut failed")
done := make(chan interface{})
require.NoError(t, w.cut(done), "cut failed")
<-done

// Cutting creates a new file.
require.Equal(t, 2, len(w.files))
Expand Down Expand Up @@ -389,17 +391,15 @@ func TestWALRestoreCorrupted(t *testing.T) {
require.NoError(t, w.LogSamples([]RefSample{{T: 1, V: 2}}))
require.NoError(t, w.LogSamples([]RefSample{{T: 2, V: 3}}))

require.NoError(t, w.cut())
done := make(chan interface{})
require.NoError(t, w.cut(done))
<-done

require.NoError(t, w.LogSamples([]RefSample{{T: 3, V: 4}}))
require.NoError(t, w.LogSamples([]RefSample{{T: 5, V: 6}}))

require.NoError(t, w.Close())

// cut() truncates and fsyncs the first segment async. If it happens after
// the corruption we apply below, the corruption will be overwritten again.
// Fire and forget a sync to avoid flakiness.
w.files[0].Sync()
// Corrupt the second entry in the first file.
// After re-opening we must be able to read the first entry
// and the rest, including the second file, must be truncated for clean further
Expand Down