/
reconcile.go
executable file
·90 lines (84 loc) · 4.19 KB
/
reconcile.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package ffldb
import (
"fmt"
"hash/crc32"
database "github.com/p9c/node9/pkg/db"
"github.com/p9c/node9/pkg/log"
)
func // serializeWriteRow serialize the current block file and offset where
// new will be written into a format suitable for storage into the metadata.
// The serialized write cursor location format is:
// [0:4] Block file (4 bytes)
// [4:8] File offset (4 bytes)
// [8:12] Castagnoli CRC-32 checksum (4 bytes)
serializeWriteRow(curBlockFileNum, curFileOffset uint32) []byte {
var serializedRow [12]byte
byteOrder.PutUint32(serializedRow[0:4], curBlockFileNum)
byteOrder.PutUint32(serializedRow[4:8], curFileOffset)
checksum := crc32.Checksum(serializedRow[:8], castagnoli)
byteOrder.PutUint32(serializedRow[8:12], checksum)
return serializedRow[:]
}
func // deserializeWriteRow deserializes the write cursor location stored in the
// metadata. Returns ErrCorruption if the checksum of the entry doesn't match.
deserializeWriteRow(writeRow []byte) (uint32, uint32, error) {
// Ensure the checksum matches. The checksum is at the end.
gotChecksum := crc32.Checksum(writeRow[:8], castagnoli)
wantChecksumBytes := writeRow[8:12]
wantChecksum := byteOrder.Uint32(wantChecksumBytes)
if gotChecksum != wantChecksum {
str := fmt.Sprintf("metadata for write cursor does not match "+
"the expected checksum - got %d, want %d", gotChecksum,
wantChecksum)
return 0, 0, makeDbErr(database.ErrCorruption, str, nil)
}
fileNum := byteOrder.Uint32(writeRow[0:4])
fileOffset := byteOrder.Uint32(writeRow[4:8])
return fileNum, fileOffset, nil
}
func // reconcileDB reconciles the metadata with the flat block files on
// disk. It will also initialize the underlying database if the create flag is set.
reconcileDB(pdb *db, create bool) (database.DB, error) {
// Perform initial internal bucket and value creation during database creation.
if create {
if err := initDB(pdb.cache.ldb); err != nil {
return nil, err
}
}
// Load the current write cursor position from the metadata.
var curFileNum, curOffset uint32
err := pdb.View(func(tx database.Tx) error {
writeRow := tx.Metadata().Get(writeLocKeyName)
if writeRow == nil {
str := "write cursor does not exist"
return makeDbErr(database.ErrCorruption, str, nil)
}
var err error
curFileNum, curOffset, err = deserializeWriteRow(writeRow)
return err
})
if err != nil {
return nil, err
}
// When the write cursor position found by scanning the block files on disk is AFTER the position the metadata believes to be true, truncate the files on disk to match the metadata. This can be a fairly common occurrence in unclean shutdown scenarios while the block files are in the middle of being written. Since the metadata isn't updated until after the block data is written, this is effectively just a rollback to the known good point before the unclean shutdown.
wc := pdb.store.writeCursor
if wc.curFileNum > curFileNum || (wc.curFileNum == curFileNum &&
wc.curOffset > curOffset) {
log.WARN("detected unclean shutdown - repairing")
log.DEBUGF("metadata claims file %d, " +
"offset %d. block data is at file %d, offset %d",
curFileNum, curOffset, wc.curFileNum, wc.curOffset)
pdb.store.handleRollback(curFileNum, curOffset)
log.DEBUG("database sync complete")
}
// When the write cursor position found by scanning the block files on disk is BEFORE the position the metadata believes to be true, return a corruption error. Since sync is called after each block is written and before the metadata is updated, this should only happen in the case of missing, deleted, or truncated block files, which generally is not an easily recoverable scenario. In the future, it might be possible to rescan and rebuild the metadata from the block files, however, that would need to happen with coordination from a higher layer since it could invalidate other metadata.
if wc.curFileNum < curFileNum || (wc.curFileNum == curFileNum &&
wc.curOffset < curOffset) {
str := fmt.Sprintf("metadata claims file %d, offset %d, but "+
"block data is at file %d, offset %d", curFileNum,
curOffset, wc.curFileNum, wc.curOffset)
log.WARN("***Database corruption detected***:", str)
return nil, makeDbErr(database.ErrCorruption, str, nil)
}
return pdb, nil
}