Skip to content

Commit

Permalink
pump/storage: Refactor, encapsulate writing and offset updating in lo…
Browse files Browse the repository at this point in the history
…gFile
  • Loading branch information
suzaku committed Jun 24, 2019
1 parent 0b0000d commit 7ee62a6
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 31 deletions.
33 changes: 33 additions & 0 deletions pump/storage/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"io"
"os"
"sync"
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand Down Expand Up @@ -51,6 +52,8 @@ type logFile struct {
fid uint32
path string

writeOffset int64

// guard fd
lock sync.RWMutex
fd *os.File
Expand Down Expand Up @@ -139,6 +142,7 @@ func newLogFile(fid uint32, name string) (lf *logFile, err error) {
fd: fd,
path: name,
corruptionReporter: logReporter,
writeOffset: info.Size(),
}

if info.Size() >= fileFooterLength {
Expand Down Expand Up @@ -186,6 +190,35 @@ func (lf *logFile) updateMaxTS(ts int64) {
}
}

// IncWriteOffset moves the write offset forward for n bytes and returns the new offset.
func (lf *logFile) IncWriteOffset(n int64) int64 {
return atomic.AddInt64(&lf.writeOffset, n)
}

// GetWriteOffset returns the write offset of the log file.
func (lf *logFile) GetWriteOffset() int64 {
return atomic.LoadInt64(&lf.writeOffset)
}

// Write writes data to disk and update the write offset.
// If sync is set, it also cares to call `fsync` to make sure
// the buffered data is flushed to disk.
func (lf *logFile) Write(data []byte, sync bool) error {
n, err := lf.fd.Write(data)
lf.IncWriteOffset(int64(n))

if err != nil {
return errors.Annotatef(err, "unable to write to log file: %s", lf.path)
}
if sync {
err = lf.fdatasync()
if err != nil {
return errors.Annotatef(err, "fdatasync file %s failed", lf.path)
}
}
return nil
}

// finalize write the footer to the file, then we never write this file anymore
func (lf *logFile) finalize() error {
if lf.end {
Expand Down
14 changes: 13 additions & 1 deletion pump/storage/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"math/rand"
"os"
"path"
"path/filepath"
"strconv"

fuzz "github.com/google/gofuzz"
Expand Down Expand Up @@ -73,6 +74,18 @@ func (lfs *LogFileSuit) TearDownTest(c *check.C) {
os.Remove(lfs.lf.path)
}

func (lfs *LogFileSuit) TestWriteOffset(c *check.C) {
dir := c.MkDir()
f, err := newLogFile(1024, filepath.Join(dir, "1024.vlog"))
c.Assert(err, check.IsNil)
c.Assert(f.GetWriteOffset(), check.Equals, int64(0))

data := make([]byte, 379)
err = f.Write(data, true)
c.Assert(err, check.IsNil)
c.Assert(f.GetWriteOffset(), check.Equals, int64(379))
}

func (lfs *LogFileSuit) TestSeekToNextRecord(c *check.C) {
buffer := new(bytes.Buffer)

Expand Down Expand Up @@ -107,7 +120,6 @@ func (lfs *LogFileSuit) TestSeekToNextRecord(c *check.C) {
c.Assert(err, check.NotNil)
c.Assert(bytes, check.Equals, len(data)-idx)
}

}
}

Expand Down
36 changes: 6 additions & 30 deletions pump/storage/vlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,6 @@ func (vp *valuePointer) UnmarshalBinary(data []byte) error {
type valueLog struct {
buf *bytes.Buffer // buf to write to the current log file

// writable offset of the curFile(the max fid file)
writableLogOffset int64

dirPath string
sync bool
maxFid uint32
Expand Down Expand Up @@ -245,13 +242,6 @@ func (vlog *valueLog) openOrCreateFiles() error {
if err != nil {
return errors.Annotatef(err, "error create new file")
}
} else {
info, err := curFile.fd.Stat()
if err != nil {
return errors.Trace(err)
}

vlog.writableLogOffset = info.Size()
}
}

Expand All @@ -265,8 +255,6 @@ func (vlog *valueLog) createLogFile(fid uint32) (*logFile, error) {
return nil, errors.Annotate(err, "unable to create log file")
}

vlog.writableLogOffset = 0

vlog.filesLock.Lock()
vlog.filesMap[fid] = logFile
vlog.filesLock.Unlock()
Expand All @@ -282,7 +270,7 @@ func (vlog *valueLog) close() error {
curFile := vlog.filesMap[vlog.maxFid]

// finalize the curFile when it's tool big, so when restart, we don't need to scan the too big curFile to recover the maxTS of the file
if vlog.writableOffset() >= finalizeFileSizeAtClose {
if curFile.GetWriteOffset() >= finalizeFileSizeAtClose {
err = curFile.finalize()
if err != nil {
return errors.Annotatef(err, "finalize file %s failed", curFile.path)
Expand Down Expand Up @@ -315,10 +303,6 @@ func (vlog *valueLog) readValue(vp valuePointer) ([]byte, error) {
return record.payload, nil
}

func (vlog *valueLog) writableOffset() int64 {
return atomic.LoadInt64(&vlog.writableLogOffset)
}

// write is thread-unsafe by design and should not be called concurrently.
func (vlog *valueLog) write(reqs []*request) error {
vlog.filesLock.RLock()
Expand All @@ -328,17 +312,9 @@ func (vlog *valueLog) write(reqs []*request) error {
var bufReqs []*request

toDisk := func() error {
n, err := curFile.fd.Write(vlog.buf.Bytes())
atomic.AddInt64(&vlog.writableLogOffset, int64(n))

err := curFile.Write(vlog.buf.Bytes(), vlog.sync)
if err != nil {
return errors.Annotatef(err, "unable to write to log file: %s", curFile.path)
}
if vlog.sync {
err = curFile.fdatasync()
if err != nil {
return errors.Annotatef(err, "fdatasync file %s failed", curFile.path)
}
return errors.Trace(err)
}

for _, req := range bufReqs {
Expand All @@ -348,7 +324,7 @@ func (vlog *valueLog) write(reqs []*request) error {
bufReqs = bufReqs[:0]

// rotate file
if vlog.writableOffset() > vlog.opt.ValueLogFileSize {
if curFile.GetWriteOffset() > vlog.opt.ValueLogFileSize {
err := curFile.finalize()
if err != nil {
return errors.Annotatef(err, "finalize file %s failed", curFile.path)
Expand All @@ -365,15 +341,15 @@ func (vlog *valueLog) write(reqs []*request) error {

for _, req := range reqs {
req.valuePointer.Fid = curFile.fid
req.valuePointer.Offset = vlog.writableOffset() + int64(vlog.buf.Len())
req.valuePointer.Offset = curFile.GetWriteOffset() + int64(vlog.buf.Len())
_, err := encodeRecord(vlog.buf, req.payload)
if err != nil {
return errors.Trace(err)
}

bufReqs = append(bufReqs, req)

writeNow := vlog.writableOffset()+int64(vlog.buf.Len()) > vlog.opt.ValueLogFileSize
writeNow := curFile.GetWriteOffset()+int64(vlog.buf.Len()) > vlog.opt.ValueLogFileSize

if writeNow {
if err := toDisk(); err != nil {
Expand Down

0 comments on commit 7ee62a6

Please sign in to comment.