Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: recover corrupted tail automatically #22

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
40 changes: 23 additions & 17 deletions wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,20 @@ type Options struct {
// Perms represents the datafiles modes and permission bits
DirPerms os.FileMode
FilePerms os.FileMode
// RecoverCorruptedTail will attempt to recover a corrupted tail in the last segment automatically.
RecoverCorruptedTail bool
}

// DefaultOptions for Open().
var DefaultOptions = &Options{
NoSync: false, // Fsync after every write
SegmentSize: 20971520, // 20 MB log segment files.
LogFormat: Binary, // Binary format is small and fast.
SegmentCacheSize: 2, // Number of cached in-memory segments
NoCopy: false, // Make a new copy of data for every Read call.
DirPerms: 0750, // Permissions for the created directories
FilePerms: 0640, // Permissions for the created data files
NoSync: false, // Fsync after every write
SegmentSize: 20971520, // 20 MB log segment files.
LogFormat: Binary, // Binary format is small and fast.
SegmentCacheSize: 2, // Number of cached in-memory segments
NoCopy: false, // Make a new copy of data for every Read call.
DirPerms: 0750, // Permissions for the created directories
FilePerms: 0640, // Permissions for the created data files
RecoverCorruptedTail: false, // Don't recover corrupted tail.
}

// Log represents a write ahead log
Expand Down Expand Up @@ -262,15 +265,15 @@ func (l *Log) load() error {
l.firstIndex = l.segments[0].index
// Open the last segment for appending
lseg := l.segments[len(l.segments)-1]
l.sfile, err = os.OpenFile(lseg.path, os.O_WRONLY, l.opts.FilePerms)
if err != nil {
// Load the last segment entries
if err := l.loadSegmentEntries(lseg, l.opts.RecoverCorruptedTail); err != nil {
return err
}
if _, err := l.sfile.Seek(0, 2); err != nil {
l.sfile, err = os.OpenFile(lseg.path, os.O_WRONLY, l.opts.FilePerms)
if err != nil {
return err
}
// Load the last segment entries
if err := l.loadSegmentEntries(lseg); err != nil {
if _, err := l.sfile.Seek(int64(len(lseg.ebuf)), 0); err != nil {
return err
}
l.lastIndex = lseg.index + uint64(len(lseg.epos)) - 1
Expand Down Expand Up @@ -529,7 +532,7 @@ func (l *Log) findSegment(index uint64) int {
return i - 1
}

func (l *Log) loadSegmentEntries(s *segment) error {
func (l *Log) loadSegmentEntries(s *segment, ignoreCorruptedTail bool) error {
data, err := ioutil.ReadFile(s.path)
if err != nil {
return err
Expand All @@ -544,14 +547,17 @@ func (l *Log) loadSegmentEntries(s *segment) error {
} else {
n, err = loadNextBinaryEntry(data)
}
if err == ErrCorrupt && ignoreCorruptedTail {
break
}
if err != nil {
return err
}
data = data[n:]
epos = append(epos, bpos{pos, pos + n})
pos += n
}
s.ebuf = ebuf
s.ebuf = ebuf[:pos]
s.epos = epos
return nil
}
Expand Down Expand Up @@ -607,7 +613,7 @@ func (l *Log) loadSegment(index uint64) (*segment, error) {
s := l.segments[idx]
if len(s.epos) == 0 {
// load the entries from cache
if err := l.loadSegmentEntries(s); err != nil {
if err := l.loadSegmentEntries(s, false); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -791,7 +797,7 @@ func (l *Log) truncateFront(index uint64) (err error) {
return err
}
// Load the last segment entries
if err = l.loadSegmentEntries(s); err != nil {
if err = l.loadSegmentEntries(s, false); err != nil {
return err
}
}
Expand Down Expand Up @@ -897,7 +903,7 @@ func (l *Log) truncateBack(index uint64) (err error) {
l.segments = append([]*segment{}, l.segments[:segIdx+1]...)
l.lastIndex = index
l.clearCache()
if err = l.loadSegmentEntries(s); err != nil {
if err = l.loadSegmentEntries(s, false); err != nil {
return err
}
return nil
Expand Down
18 changes: 18 additions & 0 deletions wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,8 @@ func TestOutliers(t *testing.T) {
t.Run("fail-corrupted-tail-json", func(t *testing.T) {
defer os.RemoveAll("testlog/corrupt-tail")
opts := makeOpts(512, true, JSON)
optsRecoverTail := *opts
optsRecoverTail.RecoverCorruptedTail = true
os.MkdirAll("testlog/corrupt-tail", 0777)
ioutil.WriteFile(
"testlog/corrupt-tail/00000000000000000001",
Expand All @@ -527,27 +529,43 @@ func TestOutliers(t *testing.T) {
l.Close()
t.Fatalf("expected %v, got %v", ErrCorrupt, err)
}
if l, err := Open("testlog/corrupt-tail", &optsRecoverTail); err != nil {
l.Close()
t.Fatalf("expected %v, got %v", nil, err)
}
ioutil.WriteFile(
"testlog/corrupt-tail/00000000000000000001",
[]byte(`{}`+"\n"), 0666)
if l, err := Open("testlog/corrupt-tail", opts); err != ErrCorrupt {
l.Close()
t.Fatalf("expected %v, got %v", ErrCorrupt, err)
}
if l, err := Open("testlog/corrupt-tail", &optsRecoverTail); err != nil {
l.Close()
t.Fatalf("expected %v, got %v", nil, err)
}
ioutil.WriteFile(
"testlog/corrupt-tail/00000000000000000001",
[]byte(`{"index":"1"}`+"\n"), 0666)
if l, err := Open("testlog/corrupt-tail", opts); err != ErrCorrupt {
l.Close()
t.Fatalf("expected %v, got %v", ErrCorrupt, err)
}
if l, err := Open("testlog/corrupt-tail", &optsRecoverTail); err != nil {
l.Close()
t.Fatalf("expected %v, got %v", nil, err)
}
ioutil.WriteFile(
"testlog/corrupt-tail/00000000000000000001",
[]byte(`{"index":"1","data":"?"}`), 0666)
if l, err := Open("testlog/corrupt-tail", opts); err != ErrCorrupt {
l.Close()
t.Fatalf("expected %v, got %v", ErrCorrupt, err)
}
if l, err := Open("testlog/corrupt-tail", &optsRecoverTail); err != nil {
l.Close()
t.Fatalf("expected %v, got %v", nil, err)
}
})

t.Run("start-marker-file", func(t *testing.T) {
Expand Down