-
Notifications
You must be signed in to change notification settings - Fork 42
/
storage.go
265 lines (218 loc) · 7.43 KB
/
storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
//此源码被清华学神尹成大魔王专业翻译分析并修改
//尹成QQ77025077
//尹成微信18510341407
//尹成所在QQ群721929980
//尹成邮箱 yinc13@mails.tsinghua.edu.cn
//尹成毕业于清华大学,微软区块链领域全球最有价值专家
//https://mvp.microsoft.com/zh-cn/PublicProfile/4033620
/*
版权所有IBM公司。保留所有权利。
SPDX许可证标识符:Apache-2.0
**/
package etcdraft
import (
"os"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
"github.com/hyperledger/fabric/common/flogging"
"github.com/pkg/errors"
)
//memoryStorage目前由etcd/raft.memoryStorage支持。这个接口是
//定义为公开FSM的依赖项,以便在
//未来。todo(jay)在需要时向该接口添加其他必要的方法
//它们在实现中,例如applysnapshot。
type MemoryStorage interface {
raft.Storage
Append(entries []raftpb.Entry) error
SetHardState(st raftpb.HardState) error
CreateSnapshot(i uint64, cs *raftpb.ConfState, data []byte) (raftpb.Snapshot, error)
Compact(compactIndex uint64) error
ApplySnapshot(snap raftpb.Snapshot) error
}
//raftstorage封装了ETCD/raft数据所需的存储,即内存、wal
type RaftStorage struct {
SnapshotCatchUpEntries uint64
lg *flogging.FabricLogger
ram MemoryStorage
wal *wal.WAL
snap *snap.Snapshotter
}
//createStorage试图创建一个存储来保存etcd/raft数据。
//如果数据出现在指定的磁盘中,则加载它们以重建存储状态。
func CreateStorage(
lg *flogging.FabricLogger,
applied uint64,
walDir string,
snapDir string,
ram MemoryStorage,
) (*RaftStorage, error) {
sn, err := createSnapshotter(snapDir)
if err != nil {
return nil, err
}
snapshot, err := sn.Load()
if err != nil {
if err == snap.ErrNoSnapshot {
lg.Debugf("No snapshot found at %s", snapDir)
} else {
return nil, errors.Errorf("failed to load snapshot: %s", err)
}
} else {
//发现快照
lg.Debugf("Loaded snapshot at Term %d and Index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
}
w, err := createWAL(lg, walDir, applied, snapshot)
if err != nil {
return nil, err
}
_, st, ents, err := w.ReadAll()
if err != nil {
return nil, errors.Errorf("failed to read WAL: %s", err)
}
if snapshot != nil {
lg.Debugf("Applying snapshot to raft MemoryStorage")
if err := ram.ApplySnapshot(*snapshot); err != nil {
return nil, errors.Errorf("Failed to apply snapshot to memory: %s", err)
}
}
lg.Debugf("Setting HardState to {Term: %d, Commit: %d}", st.Term, st.Commit)
ram.SetHardState(st) //memoryStorage.setHardstate始终返回零
lg.Debugf("Appending %d entries to memory storage", len(ents))
ram.Append(ents) //memoryStorage.append始终返回nil
return &RaftStorage{lg: lg, ram: ram, wal: w, snap: sn}, nil
}
func createSnapshotter(snapDir string) (*snap.Snapshotter, error) {
if err := os.MkdirAll(snapDir, os.ModePerm); err != nil {
return nil, errors.Errorf("failed to mkdir '%s' for snapshot: %s", snapDir, err)
}
return snap.New(snapDir), nil
}
func createWAL(lg *flogging.FabricLogger, walDir string, applied uint64, snapshot *raftpb.Snapshot) (*wal.WAL, error) {
hasWAL := wal.Exist(walDir)
if !hasWAL && applied != 0 {
return nil, errors.Errorf("applied index is not zero but no WAL data found")
}
if !hasWAL {
lg.Infof("No WAL data found, creating new WAL at path '%s'", walDir)
//Todo(Jay_Guo)添加元数据,以便在需要时与Wal一起持久化。
//用例可以是新节点上的数据转储和恢复。
w, err := wal.Create(walDir, nil)
if err == os.ErrExist {
lg.Fatalf("programming error, we've just checked that WAL does not exist")
}
if err != nil {
return nil, errors.Errorf("failed to initialize WAL: %s", err)
}
if err = w.Close(); err != nil {
return nil, errors.Errorf("failed to close the WAL just created: %s", err)
}
} else {
lg.Infof("Found WAL data at path '%s', replaying it", walDir)
}
walsnap := walpb.Snapshot{}
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
}
lg.Debugf("Loading WAL at Term %d and Index %d", walsnap.Term, walsnap.Index)
w, err := wal.Open(walDir, walsnap)
if err != nil {
return nil, errors.Errorf("failed to open existing WAL: %s", err)
}
return w, nil
}
//快照返回存储在内存中的最新快照
func (rs *RaftStorage) Snapshot() raftpb.Snapshot {
sn, _ := rs.ram.Snapshot() //快照始终返回零错误
return sn
}
//保存保存的ETCD/RAFT数据
func (rs *RaftStorage) Store(entries []raftpb.Entry, hardstate raftpb.HardState, snapshot raftpb.Snapshot) error {
if err := rs.wal.Save(hardstate, entries); err != nil {
return err
}
if !raft.IsEmptySnap(snapshot) {
if err := rs.saveSnap(snapshot); err != nil {
return err
}
if err := rs.ram.ApplySnapshot(snapshot); err != nil {
if err == raft.ErrSnapOutOfDate {
rs.lg.Warnf("Attempted to apply out-of-date snapshot at Term %d and Index %d",
snapshot.Metadata.Term, snapshot.Metadata.Index)
} else {
rs.lg.Fatalf("Unexpected programming error: %s", err)
}
}
}
if err := rs.ram.Append(entries); err != nil {
return err
}
return nil
}
func (rs *RaftStorage) saveSnap(snap raftpb.Snapshot) error {
//必须先将快照索引保存到wal,然后才能保存
//快照保持不变,我们只打开
//以前保存的快照索引。
walsnap := walpb.Snapshot{
Index: snap.Metadata.Index,
Term: snap.Metadata.Term,
}
rs.lg.Debugf("Saving snapshot to WAL")
if err := rs.wal.SaveSnapshot(walsnap); err != nil {
return errors.Errorf("failed to save snapshot to WAL: %s", err)
}
rs.lg.Debugf("Saving snapshot to disk")
if err := rs.snap.SaveSnap(snap); err != nil {
return errors.Errorf("failed to save snapshot to disk: %s", err)
}
rs.lg.Debugf("Releasing lock to wal files prior to %d", snap.Metadata.Index)
if err := rs.wal.ReleaseLockTo(snap.Metadata.Index); err != nil {
return err
}
return nil
}
//takesnapshot从memorystorage在索引I处获取快照,并将其保存到wal和disk。
func (rs *RaftStorage) TakeSnapshot(i uint64, cs *raftpb.ConfState, data []byte) error {
rs.lg.Debugf("Creating snapshot at index %d from MemoryStorage", i)
snap, err := rs.ram.CreateSnapshot(i, cs, data)
if err != nil {
return errors.Errorf("failed to create snapshot from MemoryStorage: %s", err)
}
if err = rs.saveSnap(snap); err != nil {
return err
}
//在内存中保留一些条目,以便缓慢的追随者赶上
if i > rs.SnapshotCatchUpEntries {
compacti := i - rs.SnapshotCatchUpEntries
rs.lg.Debugf("Purging in-memory raft entries prior to %d", compacti)
if err = rs.ram.Compact(compacti); err != nil {
if err == raft.ErrCompacted {
rs.lg.Warnf("Raft entries prior to %d are already purged", compacti)
} else {
rs.lg.Fatalf("Failed to purg raft entries: %s", err)
}
}
}
rs.lg.Infof("Snapshot is taken at index %d", i)
return nil
}
//ApplySnapshot将快照应用于本地内存存储
func (rs *RaftStorage) ApplySnapshot(snap raftpb.Snapshot) {
if err := rs.ram.ApplySnapshot(snap); err != nil {
if err == raft.ErrSnapOutOfDate {
rs.lg.Warnf("Attempted to apply out-of-date snapshot at Term %d and Index %d",
snap.Metadata.Term, snap.Metadata.Index)
} else {
rs.lg.Fatalf("Unexpected programming error: %s", err)
}
}
}
//关闭关闭存储
func (rs *RaftStorage) Close() error {
if err := rs.wal.Close(); err != nil {
return err
}
return nil
}