-
Notifications
You must be signed in to change notification settings - Fork 35
/
storage.go
188 lines (160 loc) · 4.57 KB
/
storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package raft
import (
"fmt"
"io"
"os"
"path"
"strconv"
etcdraft "github.com/kandoo/beehive/Godeps/_workspace/src/github.com/coreos/etcd/raft"
"github.com/kandoo/beehive/Godeps/_workspace/src/github.com/coreos/etcd/raft/raftpb"
"github.com/kandoo/beehive/Godeps/_workspace/src/github.com/coreos/etcd/snap"
"github.com/kandoo/beehive/Godeps/_workspace/src/github.com/coreos/etcd/wal"
"github.com/kandoo/beehive/Godeps/_workspace/src/github.com/coreos/etcd/wal/walpb"
"github.com/kandoo/beehive/Godeps/_workspace/src/github.com/golang/glog"
)
// This code is adapted from etcdserver/storage.go; keep it in sync with that file.
// DiskStorage is the durable side of the raft storage: it persists hard
// state, log entries and snapshots, and is returned by OpenStorage alongside
// the in-memory raft storage.
type DiskStorage interface {
	// Save saves ents and st to the underlying stable storage.
	// Save MUST block until st and ents are on stable storage.
	Save(st raftpb.HardState, ents []raftpb.Entry) error
	// SaveSnap saves snap to the underlying stable storage.
	SaveSnap(snap raftpb.Snapshot) error
	// Sync flushes previously written data to disk.
	Sync() error
	// Close closes the storage and performs finalization.
	Close() error
}
// storage is the DiskStorage implementation handed out by OpenStorage. It
// combines a write-ahead log with a snapshotter by embedding both.
//
// SaveSnap is defined on storage itself and drives both embedded values; the
// remaining DiskStorage methods (Save, Sync, Close) are presumably promoted
// from the embedded *wal.WAL -- confirm against the vendored wal package.
type storage struct {
	*wal.WAL
	*snap.Snapshotter
}
// SaveSnap persists snap in three steps, stopping at the first failure:
//
//  1. a snapshot marker (index and term) is recorded in the WAL,
//  2. the snapshot itself is written through the Snapshotter,
//  3. the locks on WAL files up to the snapshot index are released, since
//     those files will not be used any more.
//
// The error of the failing step is returned unchanged.
func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
	meta := snap.Metadata
	marker := walpb.Snapshot{Index: meta.Index, Term: meta.Term}

	if err := st.WAL.SaveSnapshot(marker); err != nil {
		return err
	}
	if err := st.Snapshotter.SaveSnap(snap); err != nil {
		return err
	}
	return st.WAL.ReleaseLockTo(meta.Index)
}
// mustMkdir creates the directory path, including any missing parents, with
// permission bits 0750. If the directory cannot be created the failure is
// reported through glog.Fatalf.
func mustMkdir(path string) {
	err := os.MkdirAll(path, 0750)
	if err == nil {
		return
	}
	glog.Fatalf("cannot create directory %v: %v", path, err)
}
// exist reports whether path is present on the file system. Only a
// "does not exist" error from os.Stat yields false; any other Stat outcome,
// including other errors, is reported as true.
func exist(path string) bool {
	if _, statErr := os.Stat(path); os.IsNotExist(statErr) {
		return false
	}
	return true
}
// OpenStorage creates or reloads the disk-backed raft storage rooted at dir.
//
// Snapshots live in dir/snap and the write-ahead log in dir/wal. When either
// is missing (or the WAL directory holds no WAL), both directories are created
// and a fresh WAL tagged with node is started; directory creation failures are
// fatal (see mustMkdir). Otherwise the latest snapshot, if any, is restored
// into stateMachine and applied to the in-memory raft storage, and the hard
// state and entries read back from the WAL are loaded on top of it.
//
// Results:
//   - raftStorage: the in-memory raft storage, populated as described above.
//   - diskStorage: the WAL plus snapshotter; nil whenever err is non-nil.
//   - lastSnapIdx: index of the restored snapshot, 0 if there was none.
//   - lastEntIdx: index of the last WAL entry read, falling back to the
//     snapshot index when the WAL had no entries, and 0 when there is neither.
//   - exists: whether storage was already present in dir.
//   - err: the first failure encountered.
func OpenStorage(node uint64, dir string, stateMachine StateMachine) (
	raftStorage *etcdraft.MemoryStorage, diskStorage DiskStorage,
	lastSnapIdx, lastEntIdx uint64, exists bool, err error) {

	// TODO(soheil): maybe store and return a custom metadata.
	glog.V(2).Infof("openning raft storage on %s", dir)

	sp := path.Join(dir, "snap")
	wp := path.Join(dir, "wal")
	exists = exist(sp) && exist(wp) && wal.Exist(wp)

	s := snap.New(sp)
	raftStorage = etcdraft.NewMemoryStorage()

	var w *wal.WAL
	if !exists {
		mustMkdir(sp)
		mustMkdir(wp)
		// Do not hand out a storage wrapping a nil WAL when creation fails.
		if w, err = createWAL(node, wp); err != nil {
			return
		}
		diskStorage = &storage{w, s}
		return
	}

	// A missing snapshot is not an error: the WAL is then replayed on its own.
	ss, err := s.Load()
	if err != nil && err != snap.ErrNoSnapshot {
		return
	}

	if ss != nil {
		if err = stateMachine.Restore(ss.Data); err != nil {
			err = fmt.Errorf("raft: cannot restore statemachine from snapshot: %v",
				err)
			return
		}

		if err = raftStorage.ApplySnapshot(*ss); err != nil {
			err = fmt.Errorf("raft: cannot apply snapshot: %v", err)
			return
		}

		lastSnapIdx = ss.Metadata.Index
		glog.Infof("raft: recovered statemachine from snapshot at index %d",
			lastSnapIdx)
	}

	var st raftpb.HardState
	var ents []raftpb.Entry
	w, st, ents, err = readWAL(node, wp, ss)
	if err != nil {
		return
	}

	// From here on the WAL is open, so every failure path must close it.
	if err = raftStorage.SetHardState(st); err != nil {
		w.Close()
		err = fmt.Errorf("raft: cannot set hard state: %v", err)
		return
	}
	if err = raftStorage.Append(ents); err != nil {
		w.Close()
		err = fmt.Errorf("raft: cannot append wal entries: %v", err)
		return
	}

	// lastEntIdx keeps its zero value when there are neither entries nor a
	// snapshot.
	switch {
	case len(ents) != 0:
		lastEntIdx = ents[len(ents)-1].Index
	case ss != nil:
		lastEntIdx = ss.Metadata.Index
	}

	diskStorage = &storage{w, s}
	return
}
// readWAL opens the write-ahead log in dir and reads back its metadata, hard
// state and entries.
//
// The WAL is opened at the index/term of snap, or with a zero-valued snapshot
// marker when snap is nil. If reading fails with io.ErrUnexpectedEOF, one
// repair of the WAL is attempted and the read is retried; any other read
// error, a second failure after a repair, or a failed repair is returned as an
// error.
//
// The WAL metadata must be the decimal representation of node; a WAL that
// carries undecodable metadata or belongs to a different node is rejected.
//
// On success the returned WAL is open and owned by the caller. On any failure
// the WAL is closed and the results other than err are zero values.
func readWAL(node uint64, dir string, snap *raftpb.Snapshot) (w *wal.WAL,
	st raftpb.HardState, ents []raftpb.Entry, err error) {

	var walsnap walpb.Snapshot
	if snap != nil {
		walsnap.Index, walsnap.Term = snap.Metadata.Index, snap.Metadata.Term
	}

	var md []byte
	repaired := false
	for {
		if w, err = wal.Open(dir, walsnap); err != nil {
			return nil, raftpb.HardState{}, nil, err
		}

		if md, st, ents, err = w.ReadAll(); err == nil {
			break
		}

		// Best-effort close: the read error is the one worth reporting.
		w.Close()

		// we can only repair ErrUnexpectedEOF and we never repair twice.
		if repaired || err != io.ErrUnexpectedEOF {
			err = fmt.Errorf("raft: cannot repair: %v", err)
			return nil, raftpb.HardState{}, nil, err
		}

		// Give up as soon as a repair fails; retrying would reopen the same
		// broken WAL and loop without making progress.
		if !wal.Repair(dir) {
			err = fmt.Errorf("raft: repair failed: %v", err)
			return nil, raftpb.HardState{}, nil, err
		}

		glog.Info("WAL successfully repaired")
		repaired = true
	}

	walNode, perr := strconv.ParseUint(string(md), 10, 64)
	if perr != nil {
		w.Close()
		err = fmt.Errorf("raft: cannot decode wal metadata: %v", perr)
		return nil, raftpb.HardState{}, nil, err
	}

	if walNode != node {
		w.Close()
		err = fmt.Errorf("raft: wal node is for %v instead of %v", walNode, node)
		return nil, raftpb.HardState{}, nil, err
	}

	return w, st, ents, nil
}
// createWAL starts a new write-ahead log in the directory path. The decimal
// representation of node is stored as the WAL's metadata. It returns whatever
// wal.Create returns.
func createWAL(node uint64, path string) (w *wal.WAL, err error) {
	glog.V(2).Infof("creating wal for %v in %s", node, path)
	metadata := []byte(strconv.FormatUint(node, 10))
	return wal.Create(path, metadata)
}