forked from marcetin/parallelcoin
/
reconcile.go
108 lines (102 loc) · 4.18 KB
/
reconcile.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package ffldb
import (
"fmt"
"hash/crc32"
"github.com/p9c/pod/pkg/database"
)
// serializeWriteRow encodes the write cursor position - the current block file number and the offset within that file
// at which new data will be written - into the fixed-size form that is stored in the metadata.
//
// The returned slice is always 12 bytes long and is laid out as:
//
//	[0:4]  Block file number (4 bytes)
//	[4:8]  File offset (4 bytes)
//	[8:12] Castagnoli CRC-32 checksum of bytes [0:8] (4 bytes)
func serializeWriteRow(curBlockFileNum, curFileOffset uint32) []byte {
	row := make([]byte, 12)
	// The checksum covers only the two cursor fields, so fill those in first.
	payload := row[:8]
	byteOrder.PutUint32(payload[:4], curBlockFileNum)
	byteOrder.PutUint32(payload[4:], curFileOffset)
	byteOrder.PutUint32(row[8:], crc32.Checksum(payload, castagnoli))
	return row
}
// deserializeWriteRow deserializes the write cursor location stored in the metadata. Returns ErrCorruption if the
// checksum of the entry doesn't match.
func deserializeWriteRow(writeRow []byte) (uint32, uint32, error) {
// Ensure the checksum matches. The checksum is at the end.
gotChecksum := crc32.Checksum(writeRow[:8], castagnoli)
wantChecksumBytes := writeRow[8:12]
wantChecksum := byteOrder.Uint32(wantChecksumBytes)
if gotChecksum != wantChecksum {
str := fmt.Sprintf(
"metadata for write cursor does not match "+
"the expected checksum - got %d, want %d", gotChecksum,
wantChecksum,
)
return 0, 0, makeDbErr(database.ErrCorruption, str, nil)
}
fileNum := byteOrder.Uint32(writeRow[0:4])
fileOffset := byteOrder.Uint32(writeRow[4:8])
return fileNum, fileOffset, nil
}
// reconcileDB brings the metadata and the flat block files on disk back into agreement, first initializing the
// underlying database when the create flag is set.
//
// It returns pdb on success. An error is returned when initialization fails, when the stored write cursor is missing
// or cannot be decoded, or when the block files end before the position the metadata records.
func reconcileDB(pdb *db, create bool) (database.DB, error) {
	// A database that is being created needs its internal buckets and values set up first.
	if create {
		e := initDB(pdb.cache.ldb)
		if E.Chk(e) {
			return nil, e
		}
	}
	// Read the write cursor position recorded in the metadata.
	var metaFileNum, metaOffset uint32
	loadCursor := func(tx database.Tx) error {
		row := tx.Metadata().Get(writeLocKeyName)
		if row == nil {
			return makeDbErr(database.ErrCorruption, "write cursor does not exist", nil)
		}
		var e error
		metaFileNum, metaOffset, e = deserializeWriteRow(row)
		return e
	}
	if e := pdb.View(loadCursor); e != nil {
		return nil, e
	}
	// Block files extending AFTER the position recorded in the metadata are rolled back to that position. This can
	// be a fairly common occurrence after an unclean shutdown that interrupted a block write: the metadata isn't
	// updated until after the block data is written, so this simply returns to the last known good point.
	disk := pdb.store.writeCursor
	diskAhead := disk.curFileNum > metaFileNum
	if disk.curFileNum == metaFileNum {
		diskAhead = disk.curOffset > metaOffset
	}
	if diskAhead {
		D.Ln("detected unclean shutdown - repairing")
		D.F(
			"metadata claims file %d, offset %d. block data is at file %d, offset %d",
			metaFileNum, metaOffset, disk.curFileNum, disk.curOffset,
		)
		pdb.store.handleRollback(metaFileNum, metaOffset)
		D.Ln("database sync complete")
	}
	// Block files ending BEFORE the position recorded in the metadata are reported as corruption. Since sync is
	// called after each block is written and before the metadata is updated, this should only happen in the case of
	// missing, deleted, or truncated block files, which generally is not an easily recoverable scenario. The
	// comparison is made after the rollback step above, against the cursor as it stands at that point.
	//
	// todo: In the future, it might be possible to rescan and rebuild the metadata from the block files, however,
	// that would need to happen with coordination from a higher layer since it could invalidate other metadata.
	diskBehind := disk.curFileNum < metaFileNum
	if disk.curFileNum == metaFileNum {
		diskBehind = disk.curOffset < metaOffset
	}
	if diskBehind {
		msg := fmt.Sprintf(
			"metadata claims file %d, offset %d, but block data is at file %d, offset %d",
			metaFileNum, metaOffset, disk.curFileNum, disk.curOffset,
		)
		W.Ln("***Database corruption detected***:", msg)
		return nil, makeDbErr(database.ErrCorruption, msg, nil)
	}
	return pdb, nil
}