Skip to content

Commit

Permalink
Added options to New() to pass StateLog and EntriesLog
Browse files Browse the repository at this point in the history
Created EntriesLog interface and added implementation for fileStore.
No replication yet.
  • Loading branch information
kozlovic committed Sep 28, 2016
1 parent 3e72b20 commit 6f5e239
Show file tree
Hide file tree
Showing 9 changed files with 752 additions and 90 deletions.
1 change: 1 addition & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ var (
HandlerReqErr = errors.New("graft: Handler is required")
RpcDriverReqErr = errors.New("graft: RPCDriver is required")
LogReqErr = errors.New("graft: Log is required")
MixedStateErr = errors.New("graft: Cannot mix use of SetStateLog option and logPath parameter")
LogNoExistErr = errors.New("graft: Log file does not exist")
LogNoStateErr = errors.New("graft: Log file does not have any state")
LogCorruptErr = errors.New("graft: Encountered corrupt log file")
Expand Down
2 changes: 1 addition & 1 deletion handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestErrorHandler(t *testing.T) {

// Force a write to err
node.mu.Lock()
node.log.Close()
node.log.(*fileStore).Close()
node.mu.Unlock()

err = errWait(t, errCh)
Expand Down
2 changes: 1 addition & 1 deletion heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func TestHeartBeatAsLeaderErrorOnWrite(t *testing.T) {

// Close log to cause error
node.mu.Lock()
node.log.Close()
node.log.(*fileStore).Close()
node.mu.Unlock()

// Send an heartbeat with higher term, which should cause the current leader
Expand Down
191 changes: 181 additions & 10 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
package graft

import (
"bufio"
"hash/crc32"
"io"
"io/ioutil"
"os"
"sync"
Expand All @@ -14,10 +16,14 @@ import (

type fileStore struct {
sync.RWMutex
stateLog *os.File
stateEmpty bool
state pb.State
closed bool
stateLog *os.File
stateEmpty bool
state pb.State
entriesLog *os.File
entriesOffsets map[uint64]int64
firstIndex uint64
lastIndex uint64
closed bool
}

const (
Expand Down Expand Up @@ -49,12 +55,30 @@ func newFileStateStore(path string) (*fileStore, error) {
return fs, nil
}

func (fs *fileStore) recoverState() error {
// Move to beginning of file (for cases where we readState other than
// when the node is created - namely in tests).
if _, err := fs.stateLog.Seek(0, os.SEEK_SET); err != nil {
return err
// newFileStateAndLogStore returns an object that allows storing/retrieving State
// and Log entries to/from stable storage.
func newFileStateAndLogStore(statePath, entriesPath string) (*fileStore, error) {
fs, err := newFileStateStore(statePath)
if err != nil {
return nil, err
}
fs.entriesOffsets = make(map[uint64]int64)
log, err := os.OpenFile(entriesPath, os.O_RDWR|os.O_CREATE, 0660)
if err != nil {
return nil, err
}
fs.entriesLog = log
// If state is not empty, try to recover entries
if !fs.stateEmpty {
if err := fs.recoverEntries(); err != nil {
fs.Close()
return nil, err
}
}
return fs, nil
}

func (fs *fileStore) recoverState() error {
buf, err := ioutil.ReadAll(fs.stateLog)
if err != nil {
return err
Expand All @@ -73,6 +97,63 @@ func (fs *fileStore) recoverState() error {
return nil
}

func (fs *fileStore) recoverEntries() error {
r := bufio.NewReaderSize(fs.entriesLog, 1024*1024)
entry := pb.Entry{}
offset := int64(0)
buf := make([]byte, 64)
for {
size, err := fs.recoverEntry(r, &buf, &entry)
if err != nil {
return err
}
if size == 0 {
return nil
}
index := entry.Index
if fs.firstIndex == 0 {
fs.firstIndex = index
}
fs.lastIndex = index
fs.entriesOffsets[index] = offset
offset += int64(headerSize + size)
}
}

func (fs *fileStore) recoverEntry(r io.Reader, buf *[]byte, entry *pb.Entry) (int, error) {
header := make([]byte, headerSize)
if _, err := io.ReadFull(r, header); err != nil {
if err == io.EOF {
return 0, nil
}
return 0, LogCorruptErr
}
size := int(util.ReadUInt32(header[:4]))
storedCRC := util.ReadUInt32(header[4:])
var b []byte
if buf == nil {
b = make([]byte, size)
} else {
b = *buf
if cap(b) <= size {
*buf = make([]byte, size+size/2)
b = *buf
}
}
n, err := io.ReadFull(r, b[:size])
if err != nil {
return 0, LogCorruptErr
}
crc := crc32.ChecksumIEEE(b[:n])
if crc != storedCRC {
return 0, LogCorruptErr
}
if err := entry.Unmarshal(b[:n]); err != nil {
return 0, LogCorruptErr
}
return size, nil
}

// GetState implements the StateLog interface.
func (fs *fileStore) GetState() (*pb.State, error) {
fs.RLock()
Expand All @@ -85,7 +166,7 @@ func (fs *fileStore) GetState() (*pb.State, error) {
return &ps, nil
}

// SetState stores the state on stable storage.
// SetState implements the StateLog interface.
func (fs *fileStore) SetState(state *pb.State) error {
fs.Lock()
defer fs.Unlock()
Expand Down Expand Up @@ -121,6 +202,93 @@ func (fs *fileStore) SetState(state *pb.State) error {
return nil
}

// StoreEntries implements the EntriesLog interface.
func (fs *fileStore) StoreEntries(entries []*pb.Entry) error {
fs.Lock()
defer fs.Unlock()

// Set the file offset to end of file
offset, err := fs.entriesLog.Seek(0, os.SEEK_END)
if err != nil {
return err
}
totalSize := 0
sizes := make([]int, len(entries))
for i, e := range entries {
sizes[i] = e.Size()
totalSize += headerSize + sizes[i]
}
bw := bufio.NewWriterSize(fs.entriesLog, totalSize)
for i, e := range entries {
// No error expected here since we use a buffered writer
// of a big enough size
size := sizes[i]
fs.writeEntry(bw, e, size)
fs.entriesOffsets[e.Index] = offset
if fs.firstIndex == 0 {
fs.firstIndex = e.Index
}
if e.Index > fs.lastIndex {
fs.lastIndex = e.Index
}
offset += int64(headerSize + size)
}
// Flush the buffered
if err := bw.Flush(); err != nil {
return err
}
// We need to sync the file
if err := fs.entriesLog.Sync(); err != nil {
return err
}
return nil
}

func (fs *fileStore) writeEntry(w io.Writer, e *pb.Entry, size int) error {
buf := make([]byte, headerSize+size)
e.MarshalTo(buf[headerSize:])
crc := crc32.ChecksumIEEE(buf[headerSize:])
util.WriteUInt32(buf[:4], uint32(size))
util.WriteUInt32(buf[4:], crc)
_, err := w.Write(buf)
return err
}

// GetEntry implements the EntriesLog interface.
func (fs *fileStore) GetEntry(index uint64) (*pb.Entry, error) {
fs.Lock()
defer fs.Unlock()

// If we don't have this entry, this is not an error
if index < fs.firstIndex || index > fs.lastIndex {
return nil, nil
}
offset := fs.entriesOffsets[index]
r := fs.entriesLog
// Set the file position
if _, err := r.Seek(offset, os.SEEK_SET); err != nil {
return nil, err
}
entry := pb.Entry{}
if _, err := fs.recoverEntry(r, nil, &entry); err != nil {
return nil, err
}
return &entry, nil
}

// FirstIndex implements the EntriesLog interface.
func (fs *fileStore) FirstIndex() (uint64, error) {
fs.RLock()
defer fs.RUnlock()
return fs.firstIndex, nil
}

func (fs *fileStore) LastIndex() (uint64, error) {
fs.RLock()
defer fs.RUnlock()
return fs.lastIndex, nil
}

// helper function to close a file and possibly remove it.
func closeFile(err *error, f *os.File) {
name := f.Name()
Expand All @@ -146,5 +314,8 @@ func (fs *fileStore) Close() error {
if fs.stateLog != nil {
closeFile(&err, fs.stateLog)
}
if fs.entriesLog != nil {
closeFile(&err, fs.stateLog)
}
return err
}

0 comments on commit 6f5e239

Please sign in to comment.