Skip to content

Commit

Permalink
Merge pull request #81 from coreos/fixLog
Browse files Browse the repository at this point in the history
Fix log
  • Loading branch information
benbjohnson committed Jul 23, 2013
2 parents 59de4e7 + ee35508 commit 34aa0be
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 65 deletions.
114 changes: 65 additions & 49 deletions log.go
Expand Up @@ -139,61 +139,56 @@ func (l *Log) open(path string) error {
defer l.mutex.Unlock()

// Read all the entries from the log if one exists.
var lastIndex int = 0
if _, err := os.Stat(path); !os.IsNotExist(err) {
// Open the log file.
file, err := os.Open(path)
if err != nil {
var readBytes int64

var err error
debugln("log.open.open ", path)
// open log file
l.file, err = os.OpenFile(path, os.O_RDWR, 0600)
l.path = path

if err != nil {
// if the log file does not exist before
// we create the log file and set commitIndex to 0
if os.IsNotExist(err) {
l.file, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0600)
debugln("log.open.create ", path)

return err
}
defer file.Close()
reader := bufio.NewReader(file)
return err
}
debugln("log.open.exist ", path)

// Read the file and decode entries.
for {
if _, err := reader.Peek(1); err == io.EOF {
break
}
reader := bufio.NewReader(l.file)

// Instantiate log entry and decode into it.
entry, _ := newLogEntry(l, 0, 0, nil)
n, err := entry.decode(reader)
if err != nil {
file.Close()
if err = os.Truncate(path, int64(lastIndex)); err != nil {
return fmt.Errorf("raft.Log: Unable to recover: %v", err)
}
break
}
// Read the file and decode entries.
for {
if _, err := reader.Peek(1); err == io.EOF {
debugln("open.log.append: finish ")
break
}

// Append entry.
l.entries = append(l.entries, entry)
l.commitIndex = entry.Index
// Instantiate log entry and decode into it.
entry, _ := newLogEntry(l, 0, 0, nil)
entry.Position, _ = l.file.Seek(0, os.SEEK_CUR)

// Lookup and decode command.
command, err := newCommand(entry.CommandName, entry.Command)
if err != nil {
file.Close()
return err
n, err := entry.decode(reader)
if err != nil {
if err = os.Truncate(path, readBytes); err != nil {
return fmt.Errorf("raft.Log: Unable to recover: %v", err)
}

// Apply the command.
returnValue, err := l.ApplyFunc(command)
l.results = append(l.results, &logResult{returnValue: returnValue, err: err})

lastIndex += n
break
}

file.Close()
}
// Append entry.
l.entries = append(l.entries, entry)
debugln("open.log.append log index ", entry.Index)

// Open the file for appending.
var err error
l.file, err = os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
return err
readBytes += int64(n)
}
l.path = path
l.results = make([]*logResult, len(l.entries))
debugln("open.log.recovery number of log ", len(l.entries))
return nil
}

Expand Down Expand Up @@ -385,11 +380,6 @@ func (l *Log) setCommitIndex(index uint64) error {
entryIndex := i - 1 - l.startIndex
entry := l.entries[entryIndex]

// Write to storage.
if _, err := entry.encode(l.file); err != nil {
return err
}

// Update commit index.
l.commitIndex = entry.Index

Expand All @@ -406,6 +396,14 @@ func (l *Log) setCommitIndex(index uint64) error {
return nil
}

// Set the commitIndex at the head of the log file to the current
// commit Index. This should be called after obtained a log lock
func (l *Log) flushCommitIndex() {
l.file.Seek(0, os.SEEK_SET)
fmt.Fprintf(l.file, "%8x\n", l.commitIndex)
l.file.Seek(0, os.SEEK_END)
}

//--------------------------------------
// Truncation
//--------------------------------------
Expand All @@ -431,6 +429,9 @@ func (l *Log) truncate(index uint64, term uint64) error {

// If we're truncating everything then just clear the entries.
if index == l.startIndex {
debugln("log.truncate.clear")
l.file.Truncate(0)
l.file.Seek(0, os.SEEK_SET)
l.entries = []*LogEntry{}
} else {
// Do not truncate if the entry at index does not have the matching term.
Expand All @@ -443,6 +444,9 @@ func (l *Log) truncate(index uint64, term uint64) error {
// Otherwise truncate up to the desired entry.
if index < l.startIndex+uint64(len(l.entries)) {
debugln("log.truncate.finish")
position := l.entries[index-l.startIndex].Position
l.file.Truncate(position)
l.file.Seek(position, os.SEEK_SET)
l.entries = l.entries[0 : index-l.startIndex]
}
}
Expand Down Expand Up @@ -488,6 +492,15 @@ func (l *Log) appendEntry(entry *LogEntry) error {
}
}

position, _ := l.file.Seek(0, os.SEEK_CUR)

entry.Position = position

// Write to storage.
if _, err := entry.encode(l.file); err != nil {
return err
}

// Append to entries list if stored on disk.
l.entries = append(l.entries, entry)
l.results = append(l.results, nil)
Expand Down Expand Up @@ -523,6 +536,9 @@ func (l *Log) compact(index uint64, term uint64) error {
return err
}
for _, entry := range entries {
position, _ := l.file.Seek(0, os.SEEK_CUR)
entry.Position = position

if _, err = entry.encode(file); err != nil {
return err
}
Expand Down
9 changes: 4 additions & 5 deletions log_entry.go
Expand Up @@ -16,6 +16,7 @@ type LogEntry struct {
Term uint64
CommandName string
Command []byte
Position int64 // position in the log file
commit chan bool
}

Expand Down Expand Up @@ -62,11 +63,10 @@ func (e *LogEntry) encode(w io.Writer) (int, error) {
err := p.Marshal(pb)

if err != nil {
panic(err)
return -1, err
}

_, err = fmt.Fprintf(w, "%x", len(p.Bytes()))
_, err = fmt.Fprintf(w, "%x\n", len(p.Bytes()))

if err != nil {
return -1, err
Expand All @@ -80,10 +80,9 @@ func (e *LogEntry) encode(w io.Writer) (int, error) {
func (e *LogEntry) decode(r io.Reader) (int, error) {

var length int
_, err := fmt.Fscanf(r, "%x", &length)
_, err := fmt.Fscanf(r, "%x\n", &length)

if err != nil {
panic(err)
return -1, err
}

Expand All @@ -100,7 +99,7 @@ func (e *LogEntry) decode(r io.Reader) (int, error) {
err = p.Unmarshal(pb)

if err != nil {
return 0, err
return -1, err
}

e.Term = pb.GetTerm()
Expand Down
28 changes: 27 additions & 1 deletion log_test.go
Expand Up @@ -115,6 +115,7 @@ func TestLogRecovery(t *testing.T) {
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20})
e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X: 100})
f, _ := ioutil.TempFile("", "raft-log-")

e0.encode(f)
e1.encode(f)
f.WriteString("CORRUPT!")
Expand Down Expand Up @@ -160,7 +161,7 @@ func TestLogTruncate(t *testing.T) {
if err := log.open(path); err != nil {
t.Fatalf("Unable to open log: %v", err)
}
defer log.close()

defer os.Remove(path)

entry1, _ := newLogEntry(log, 1, 1, &testCommand1{Val: "foo", I: 20})
Expand Down Expand Up @@ -200,4 +201,29 @@ func TestLogTruncate(t *testing.T) {
t.Fatalf("Truncating at last commit should work: %v\n\nEntries:\nActual: %v\nExpected: %v", err, log.entries, []*LogEntry{entry1, entry2})
}

// Append after truncate
if err := log.appendEntry(entry3); err != nil {
t.Fatalf("Unable to append after truncate: %v", err)
}

log.close()

// Recovery the truncated log
log = newLog()
if err := log.open(path); err != nil {
t.Fatalf("Unable to open log: %v", err)
}
// Validate existing log entries.
if len(log.entries) != 3 {
t.Fatalf("Expected 3 entries, got %d", len(log.entries))
}
if log.entries[0].Index != 1 || log.entries[0].Term != 1 {
t.Fatalf("Unexpected entry[0]: %v", log.entries[0])
}
if log.entries[1].Index != 2 || log.entries[1].Term != 1 {
t.Fatalf("Unexpected entry[1]: %v", log.entries[1])
}
if log.entries[2].Index != 3 || log.entries[2].Term != 2 {
t.Fatalf("Unexpected entry[2]: %v", log.entries[2])
}
}
2 changes: 1 addition & 1 deletion peer.go
Expand Up @@ -252,7 +252,7 @@ func (p *Peer) sendSnapshotRecoveryRequest() {
return
}
// Send response to server for processing.
p.server.send(&AppendEntriesResponse{Term: resp.Term, Success: resp.Success, append: (resp.Term == p.server.currentTerm)})
p.server.send(&AppendEntriesResponse{Term: resp.Term, Success: resp.Success, append: (resp.Term == p.server.currentTerm)})
}

//--------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion protobuf/snapshot_recovery_request.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 34aa0be

Please sign in to comment.