This repository has been archived by the owner on Feb 12, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 7
/
wal.go
541 lines (465 loc) · 16.1 KB
/
wal.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
package wal
import (
"bufio"
"encoding/base64"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"path/filepath"
"time"
"github.com/pkg/errors"
auto "github.com/tendermint/classic/libs/autofile"
cmn "github.com/tendermint/classic/libs/common"
"github.com/tendermint/classic/libs/log"
tmtime "github.com/tendermint/classic/types/time"
"github.com/tendermint/go-amino-x"
)
const (
// how often the WAL should be sync'd during period sync'ing
walDefaultFlushInterval = 2 * time.Second
)
var (
crc32c = crc32.MakeTable(crc32.Castagnoli)
base64stdnp = base64.StdEncoding.WithPadding(base64.NoPadding)
)
//--------------------------------------------------------
// types and functions for savings consensus messages
type WALMessage interface {
AssertWALMessage()
}
// TimedWALMessage wraps WALMessage and adds Time for debugging purposes.
type TimedWALMessage struct {
Time time.Time `json:"time"`
Msg WALMessage `json:"msg"`
}
// Some lines are MetaMessages to denote new heights, etc.
// NOTE: The encoding must not contain the '#' character,
// so arbitrary strings will not work without some kind of escaping.
// TODO: consider alternative meta line schemas in the long run, or escape the
// '#' character in such a way that scanning randomly from any position in a
// file resumes correctly after the first and only likely corruption.
type MetaMessage struct {
Height int64 `json:"h"`
}
//--------------------------------------------------------
// Simple write-ahead logger
// WAL is an interface for any write-ahead logger.
type WAL interface {
// config methods
SetLogger(l log.Logger)
// write methods
Write(WALMessage) error
WriteSync(WALMessage) error
WriteMetaSync(MetaMessage) error
FlushAndSync() error
// search methods
SearchForHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error)
// service methods
Start() error
Stop() error
Wait()
}
// Write ahead logger writes msgs to disk before they are processed.
// Can be used for crash-recovery and deterministic replay.
// TODO: currently the wal is overwritten during replay catchup, give it a mode
// so it's either reading or appending - must read to end to start appending
// again.
type baseWAL struct {
cmn.BaseService
group *auto.Group
maxSize int64
enc *WALWriter
flushTicker *time.Ticker
flushInterval time.Duration
}
var _ WAL = &baseWAL{}
// NewWAL returns a new write-ahead logger based on `baseWAL`, which implements
// WAL. It's flushed and synced to disk every 2s and once when stopped.
// `maxSize` is the maximum allowable amino bytes of a TimedWALMessage
// including the amino (byte) size prefix, but excluding any crc checks.
func NewWAL(walFile string, maxSize int64, groupOptions ...func(*auto.Group)) (*baseWAL, error) {
err := cmn.EnsureDir(filepath.Dir(walFile), 0700)
if err != nil {
return nil, errors.Wrap(err, "failed to ensure WAL directory is in place")
}
group, err := auto.OpenGroup(walFile, groupOptions...)
if err != nil {
return nil, err
}
wal := &baseWAL{
group: group,
maxSize: maxSize,
enc: NewWALWriter(group, maxSize),
flushInterval: walDefaultFlushInterval,
}
wal.BaseService = *cmn.NewBaseService(nil, "baseWAL", wal)
return wal, nil
}
// SetFlushInterval allows us to override the periodic flush interval for the WAL.
func (wal *baseWAL) SetFlushInterval(i time.Duration) {
wal.flushInterval = i
}
func (wal *baseWAL) Group() *auto.Group {
return wal.group
}
func (wal *baseWAL) SetLogger(l log.Logger) {
wal.BaseService.Logger = l
wal.group.SetLogger(l)
}
func (wal *baseWAL) OnStart() error {
size := wal.group.ReadGroupInfo().TotalSize
if size == 0 {
wal.WriteMetaSync(MetaMessage{Height: 1})
}
err := wal.group.Start()
if err != nil {
return err
}
wal.flushTicker = time.NewTicker(wal.flushInterval)
go wal.processFlushTicks()
return nil
}
func (wal *baseWAL) processFlushTicks() {
for {
select {
case <-wal.flushTicker.C:
if err := wal.FlushAndSync(); err != nil {
wal.Logger.Error("Periodic WAL flush failed", "err", err)
}
case <-wal.Quit():
return
}
}
}
// FlushAndSync flushes and fsync's the underlying group's data to disk.
// See auto#FlushAndSync
func (wal *baseWAL) FlushAndSync() error {
return wal.group.FlushAndSync()
}
// Stop the underlying autofile group.
// Use Wait() to ensure it's finished shutting down
// before cleaning up files.
func (wal *baseWAL) OnStop() {
wal.flushTicker.Stop()
wal.FlushAndSync()
wal.group.Stop()
wal.group.Close()
}
// Wait for the underlying autofile group to finish shutting down
// so it's safe to cleanup files.
func (wal *baseWAL) Wait() {
wal.group.Wait()
}
// Write is called in newStep and for each receive on the
// peerMsgQueue and the timeoutTicker.
// NOTE: does not call fsync()
func (wal *baseWAL) Write(msg WALMessage) error {
if wal == nil {
return nil
}
if err := wal.enc.Write(TimedWALMessage{tmtime.Now(), msg}); err != nil {
wal.Logger.Error("Error writing msg to consensus wal. WARNING: recover may not be possible for the current height",
"err", err, "msg", msg)
return err
}
return nil
}
// WriteMetaSync writes the new height and finalizes the previous height.
// NOTE: It is useless to implement WriteMeta() (asyncronous) because there is
// usually something to do in sync after the aforementioned finalization
// occurs.
func (wal *baseWAL) WriteMetaSync(meta MetaMessage) error {
if wal == nil {
return nil
}
if err := wal.enc.WriteMeta(meta); err != nil {
wal.Logger.Error("Error writing height to consensus wal. WARNING: full recover may not be possible for the previous height",
"err", err)
return err
}
if err := wal.FlushAndSync(); err != nil {
wal.Logger.Error("WriteSync failed to flush consensus wal. WARNING: may result in creating alternative proposals / votes for the current height iff the node restarted",
"err", err)
return err
}
return nil
}
// WriteSync is called when we receive a msg from ourselves
// so that we write to disk before sending signed messages.
// NOTE: calls fsync()
func (wal *baseWAL) WriteSync(msg WALMessage) error {
if wal == nil {
return nil
}
if err := wal.Write(msg); err != nil {
return err
}
if err := wal.FlushAndSync(); err != nil {
wal.Logger.Error("WriteSync failed to flush consensus wal. WARNING: may result in creating alternative proposals / votes for the current height iff the node restarted",
"err", err)
return err
}
return nil
}
// WALSearchOptions are optional arguments to SearchForHeight.
type WALSearchOptions struct {
// IgnoreDataCorruptionErrors set to true will result in skipping data corruption errors.
IgnoreDataCorruptionErrors bool
}
// SearchForHeight scans meta lines to find the first line after height as
// denoted by a meta line, and returns an auto.GroupReader. Group reader will
// be nil if found equals false.
//
// The result is a buffered ReadCloser.
//
// CONTRACT: caller must close group reader.
func (wal *baseWAL) SearchForHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
var (
msg *TimedWALMessage
meta *MetaMessage
gr *auto.GroupReader
)
lastHeightFound := int64(-1)
// NOTE: starting from the last file in the group because we're usually
// searching for the last height. See replay.go
min, max := wal.group.MinIndex(), wal.group.MaxIndex()
wal.Logger.Info("Searching for height", "height", height, "min", min, "max", max)
for index := max; index >= min; index-- {
gr, err = wal.group.NewReader(index, index+1) // read only for index.
if err != nil {
return nil, false, err
}
dec := NewWALReader(gr, wal.maxSize)
for {
msg, meta, err = dec.ReadMessage()
// error case
if err != nil {
if err == io.EOF {
// No need to look for height in older files if we've seen h < height,
// since earlier blocks have lower heights than h.
if lastHeightFound > 0 && lastHeightFound < height {
dec.Close()
return nil, false, nil
}
// check next file
dec.Close()
break
} else if options != nil && options.IgnoreDataCorruptionErrors && IsDataCorruptionError(err) {
wal.Logger.Error("Corrupted entry. Skipping...", "err", err)
continue // skip corrupted line and ignore error.
} else {
dec.Close()
return nil, false, err
}
}
// meta case
if meta != nil {
lastHeightFound = meta.Height
if meta.Height == height { // found
wal.Logger.Info("Found", "height", height, "index", index)
// NOTE: dec itself is an io.ReadCloser for this purpose.
// NOTE: the result is buffered, specifically a bufio.Reader.
return dec, true, nil
}
}
// msg case
if msg != nil {
// do nothing, we're only interested in meta lines.
// TODO: optimize by implementing ReadNextMeta(),
// which skips decoding non-meta messages.
}
}
}
return nil, false, nil
}
///////////////////////////////////////////////////////////////////////////////
// A WALWriter writes custom-encoded WAL messages to an output stream.
// Each binary WAL entry is length encoded, then crc encoded,
// then base64 encoded and delimited by newlines.
// The base64 encoding used is encodeStd,
// `ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/`.
//
// Each WAL item is also newline delimited.
//
// Meta lines are prefixed with a '#' (which is not a valid base64 character)
// denote meta information, such as new height. The purpose of base64
// encoding is to enable backwards traversal of items (e.g. in search of some
// previous height).
//
// Base64 is chosen to optimize the worst-case scenario for encoding delimited
// binary bytes while enabling metadata insertion and also supporting
// backwards traversal of binary blobs (to enable binary search, etc). In the
// future, base64 could be replaced with a similar encoding formula, and the
// crc function could change too, but otherwise the structure of the WAL
// should not change, including the rule that all metalines should start with
// '#' and that all items be delimited with a newline. This implementation
// enforces ASCII text characters, but other implementations may choose
// otherwise.
//
// Format: base64(4 bytes CRC sum + length-prefixed amino bytes) + newline.
// e.g.
// ```
// ABCDEFGHIJKLMNOPQRSTUV00
// ABCDEFGHIJKLMNOPQRSTUV01
// ABCDEFGHIJKLMNOPQRSTUV02
// #{"h":"123"}
// ABCDEFGHIJKLMNOPQRSTUV03
// ABCDEFGHIJKLMNOPQRSTUV04
//
type WALWriter struct {
wr io.Writer
maxSize int64 // max WALMessage amino size excluding time/crc/base64.
}
// NewWALWriter returns a new encoder that writes to wr.
func NewWALWriter(wr io.Writer, maxSize int64) *WALWriter {
return &WALWriter{wr, maxSize}
}
// Write writes the custom encoding of v to the stream, followed by a newline
// byte. It returns an error if the amino-encoded size of v is greater than
// maxSize. Any error encountered during the write is also returned.
func (enc *WALWriter) Write(v TimedWALMessage) error {
twmBytes := amino.MustMarshalSized(v)
length := int64(len(twmBytes))
if 0 < enc.maxSize && enc.maxSize < length {
return fmt.Errorf("msg is too big: %d bytes, max: %d bytes", length, enc.maxSize)
}
totalLength := 4 + int(length)
crc := crc32.Checksum(twmBytes, crc32c)
line := make([]byte, totalLength)
binary.BigEndian.PutUint32(line[0:4], crc)
copy(line[4:], twmBytes)
line64 := base64stdnp.EncodeToString(line)
line64 += "\n"
_, err := enc.wr.Write([]byte(line64))
return err
}
// Meta lines are in JSON for readibility.
// TODO: CRC not used (yet), concatenate the CRC bytes with the JSON bytes.
func (enc *WALWriter) WriteMeta(meta MetaMessage) error {
metaJSON := amino.MustMarshalJSON(meta)
metaLine := "#" + string(metaJSON) + "\n"
_, err := enc.wr.Write([]byte(metaLine))
return err
}
///////////////////////////////////////////////////////////////////////////////
// IsDataCorruptionError returns true if data has been corrupted inside WAL.
func IsDataCorruptionError(err error) bool {
_, ok := err.(DataCorruptionError)
return ok
}
// DataCorruptionError is an error that occures if data on disk was corrupted.
type DataCorruptionError struct {
cause error
}
func (e DataCorruptionError) Error() string {
return fmt.Sprintf("DataCorruptionError[%v]", e.cause)
}
func (e DataCorruptionError) Cause() error {
return e.cause
}
// A WALReader reads and decodes custom-encoded WAL messages from an input
// stream. See WALWriter for the format used.
//
// It will also compare the checksums and make sure data size is equal to the
// length from the header. If that is not the case, error will be returned.
//
// WALReader is itself an io.ReaderCloser, and it uses a bufio reader under the
// hood, which means it will usually end up reading more bytes than was
// actually returned via calls to ReadMessage().
type WALReader struct {
rd io.Reader // NOTE: use brd instead.
brd *bufio.Reader
maxSize int64
}
// NewWALReader returns a new decoder that reads from rd.
func NewWALReader(rd io.Reader, maxSize int64) *WALReader {
return &WALReader{rd, bufio.NewReader(rd), maxSize}
}
// Reads a line until the "\n" delimiter byte, and returns that line without
// the delimiter.
// A line must end with "\n", otherwise EOF.
func (dec *WALReader) readline() ([]byte, error) {
bz, err := dec.brd.ReadBytes('\n')
if 0 < len(bz) {
bz = bz[:len(bz)-1]
}
return bz, err
}
// Implement io.ReadCloser for SearchForHeight() result reader.
func (dec *WALReader) Read(p []byte) (n int, err error) {
return dec.brd.Read(p)
}
// Implement io.ReadCloser for SearchForHeight() result reader.
func (dec *WALReader) Close() (err error) {
// There is no corresponding .Close on a bufio.
// we will set brd to nil and let the program panic
// when methods are called after a close.
dec.brd = nil
// Close rd if it is a Closer.
if cl, ok := dec.rd.(io.Closer); ok {
err = cl.Close()
return
}
return
}
// Decode reads the next custom-encoded value from its reader and returns it.
// One TimedWALMessage or MetaError or error is returned, the rest are nil.
func (dec *WALReader) ReadMessage() (*TimedWALMessage, *MetaMessage, error) {
line64, err := dec.readline()
if err != nil {
return nil, nil, err
}
if len(line64) == 0 {
return nil, nil, DataCorruptionError{fmt.Errorf("found empty line")}
}
// special case for MetaMessage.
if line64[0] == '#' {
var meta MetaMessage
err := amino.UnmarshalJSON(line64[1:], &meta)
return nil, &meta, err
}
// is usual TimedWALMessage.
// decode base64.
line, err := base64stdnp.DecodeString(string(line64))
if err != nil {
return nil, nil, DataCorruptionError{fmt.Errorf("failed to decode base64: %v", err)}
}
// read crc out of bytes.
crcSize := int64(4)
if int64(len(line)) < crcSize {
return nil, nil, DataCorruptionError{fmt.Errorf("failed to read checksum: %v", err)}
}
crc, twmBytes := binary.BigEndian.Uint32(line[:crcSize]), line[crcSize:]
if dec.maxSize < int64(len(twmBytes)) {
return nil, nil, DataCorruptionError{fmt.Errorf("length %d exceeded maximum possible value of %d bytes", int64(len(twmBytes)), dec.maxSize)}
}
// check checksum before decoding twmBytes
if len(twmBytes) == 0 {
return nil, nil, DataCorruptionError{fmt.Errorf("failed to read amino sized bytes: %v", err)}
}
actualCRC := crc32.Checksum(twmBytes, crc32c)
if actualCRC != crc {
return nil, nil, DataCorruptionError{fmt.Errorf("checksums do not match: read: %v, actual: %v", crc, actualCRC)}
}
// decode amino sized bytes.
var res = new(TimedWALMessage) // nolint: gosimple
err = amino.UnmarshalSized(twmBytes, res)
if err != nil {
return nil, nil, DataCorruptionError{fmt.Errorf("failed to decode twmBytes: %v", err)}
}
return res, nil, err
}
type NopWAL struct{}
var _ WAL = NopWAL{}
func (NopWAL) SetLogger(l log.Logger) {}
func (NopWAL) Write(m WALMessage) error { return nil }
func (NopWAL) WriteSync(m WALMessage) error { return nil }
func (NopWAL) WriteMetaSync(m MetaMessage) error { return nil }
func (NopWAL) FlushAndSync() error { return nil }
func (NopWAL) SearchForHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
return nil, false, nil
}
func (NopWAL) Start() error { return nil }
func (NopWAL) Stop() error { return nil }
func (NopWAL) Wait() {}