forked from aergoio/aergo
/
waldb.go
221 lines (182 loc) · 5.58 KB
/
waldb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
package raftv2
import (
"errors"
"github.com/aergoio/aergo/consensus"
"github.com/aergoio/aergo/types"
"github.com/aergoio/etcd/raft"
"github.com/aergoio/etcd/raft/raftpb"
)
var (
ErrInvalidEntry = errors.New("Invalid raftpb.entry")
ErrWalEntryTooLowTerm = errors.New("term of wal entry is too low")
)
type WalDB struct {
consensus.ChainWAL
}
func NewWalDB(chainWal consensus.ChainWAL) *WalDB {
return &WalDB{chainWal}
}
func (wal *WalDB) SaveEntry(state raftpb.HardState, entries []raftpb.Entry) error {
if len(entries) != 0 {
walEnts, blocks, confChanges := wal.convertFromRaft(entries)
if err := wal.WriteRaftEntry(walEnts, blocks, confChanges); err != nil {
return err
}
}
// hardstate must save after entries since entries may include commited one
if !raft.IsEmptyHardState(state) {
// save hardstate
if err := wal.WriteHardState(&state); err != nil {
return err
}
}
return nil
}
func (wal *WalDB) convertFromRaft(entries []raftpb.Entry) ([]*consensus.WalEntry, []*types.Block, []*raftpb.ConfChange) {
lenEnts := len(entries)
if lenEnts == 0 {
return nil, nil, nil
}
getWalEntryType := func(entry *raftpb.Entry) consensus.EntryType {
switch entry.Type {
case raftpb.EntryNormal:
if entry.Data != nil {
return consensus.EntryBlock
} else {
return consensus.EntryEmpty
}
case raftpb.EntryConfChange:
return consensus.EntryConfChange
default:
panic("not support raftpb entrytype")
}
}
getWalData := func(entry *raftpb.Entry) (*types.Block, []byte, error) {
if entry.Type == raftpb.EntryNormal && entry.Data != nil {
block, err := unmarshalEntryData(entry.Data)
if err != nil {
logger.Error().Str("entry", types.RaftEntryToString(entry)).Msg("failed to unmarshal entry")
return nil, nil, ErrInvalidEntry
}
return block, block.BlockHash(), nil
} else {
return nil, entry.Data, nil
}
}
getConfChange := func(entry *raftpb.Entry) (*raftpb.ConfChange, error) {
if entry.Type == raftpb.EntryConfChange {
cc, _, err := unmarshalConfChangeEntry(entry)
if err != nil {
logger.Error().Str("entry", types.RaftEntryToString(entry)).Msg("failed to unmarshal entry")
return nil, ErrInvalidEntry
}
return cc, nil
}
return nil, nil
}
blocks := make([]*types.Block, lenEnts)
walents := make([]*consensus.WalEntry, lenEnts)
confChanges := make([]*raftpb.ConfChange, lenEnts)
var (
data []byte
err error
)
for i, entry := range entries {
if blocks[i], data, err = getWalData(&entry); err != nil {
panic("entry unmarshalEntryData error")
}
if confChanges[i], err = getConfChange(&entry); err != nil {
panic("entry unmarshalEntryConfChange error")
}
walents[i] = &consensus.WalEntry{
Type: getWalEntryType(&entry),
Term: entry.Term,
Index: entry.Index,
Data: data,
}
}
return walents, blocks, confChanges
}
var ErrInvalidWalEntry = errors.New("invalid wal entry")
var ErrWalConvBlock = errors.New("failed to convert bytes of block from wal entry")
func (wal *WalDB) convertWalToRaft(walEntry *consensus.WalEntry) (*raftpb.Entry, error) {
var raftEntry = &raftpb.Entry{Term: walEntry.Term, Index: walEntry.Index}
getDataFromWalEntry := func(walEntry *consensus.WalEntry) ([]byte, error) {
if walEntry.Type != consensus.EntryBlock {
return nil, ErrWalConvBlock
}
block, err := wal.GetBlock(walEntry.Data)
if err != nil {
return nil, err
}
data, err := marshalEntryData(block)
if err != nil {
return nil, err
}
return data, nil
}
switch walEntry.Type {
case consensus.EntryConfChange:
raftEntry.Type = raftpb.EntryConfChange
raftEntry.Data = walEntry.Data
case consensus.EntryEmpty:
raftEntry.Type = raftpb.EntryNormal
raftEntry.Data = nil
case consensus.EntryBlock:
data, err := getDataFromWalEntry(walEntry)
if err != nil {
return nil, err
}
raftEntry.Data = data
default:
return nil, ErrInvalidWalEntry
}
return raftEntry, nil
}
var (
ErrWalGetHardState = errors.New("failed to read hard state")
ErrWalGetLastIdx = errors.New("failed to read last Idx")
)
// ReadAll returns hard state, all uncommitted entries
// - read last hard state
// - read all uncommited entries after snapshot index
func (wal *WalDB) ReadAll(snapshot *raftpb.Snapshot) (id *consensus.RaftIdentity, state *raftpb.HardState, ents []raftpb.Entry, err error) {
if id, err = wal.GetIdentity(); err != nil {
return nil, state, ents, err
}
state, err = wal.GetHardState()
if err != nil {
return id, state, ents, ErrWalGetHardState
}
commitIdx := state.Commit
lastIdx, err := wal.GetRaftEntryLastIdx()
if err != nil {
return id, state, ents, ErrWalGetLastIdx
}
var snapIdx, snapTerm uint64
if snapshot != nil {
snapIdx = snapshot.Metadata.Index
snapTerm = snapshot.Metadata.Term
}
logger.Info().Uint64("snapidx", snapIdx).Uint64("snapterm", snapTerm).Uint64("commit", commitIdx).Uint64("last", lastIdx).Msg("read all entries of wal")
start := snapIdx + 1
for i := start; i <= lastIdx; i++ {
walEntry, err := wal.GetRaftEntry(i)
// if snapshot is nil, initial confchange entry isn't saved to db
if err != nil {
logger.Error().Err(err).Uint64("idx", i).Msg("failed to get raft entry")
return id, state, nil, err
}
if walEntry.Term < snapTerm {
logger.Error().Str("wal", walEntry.ToString()).Err(ErrWalEntryTooLowTerm).Msg("invalid wal entry")
return id, state, nil, ErrWalEntryTooLowTerm
}
raftEntry, err := wal.convertWalToRaft(walEntry)
if err != nil {
return id, state, nil, err
}
logger.Debug().Str("walentry", walEntry.ToString()).Msg("read wal entry")
ents = append(ents, *raftEntry)
}
return id, state, ents, nil
}