forked from decred/dcrd
-
Notifications
You must be signed in to change notification settings - Fork 0
/
reconcile.go
118 lines (103 loc) · 4.42 KB
/
reconcile.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
// Copyright (c) 2015-2016 The btcsuite developers
// Copyright (c) 2016 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package ffldb
import (
"fmt"
"hash/crc32"
"github.com/decred/dcrd/database"
)
// The serialized write cursor location format is:
//
// [0:4] Block file (4 bytes)
// [4:8] File offset (4 bytes)
// [8:12] Castagnoli CRC-32 checksum (4 bytes)
// serializeWriteRow encodes the provided block file number and file offset,
// which together describe where new data will be written, into the 12-byte
// format stored in the metadata: the file number, then the file offset, then
// a checksum computed over those first eight bytes.
func serializeWriteRow(curBlockFileNum, curFileOffset uint32) []byte {
	row := make([]byte, 12)

	// The first eight bytes carry the cursor position itself.
	payload := row[:8]
	byteOrder.PutUint32(payload[:4], curBlockFileNum)
	byteOrder.PutUint32(payload[4:], curFileOffset)

	// The trailing four bytes carry the checksum of the position bytes so
	// deserialization can detect a damaged entry.
	byteOrder.PutUint32(row[8:], crc32.Checksum(payload, castagnoli))
	return row
}
// deserializeWriteRow deserializes the write cursor location stored in the
// metadata. Returns ErrCorruption if the checksum of the entry doesn't match.
func deserializeWriteRow(writeRow []byte) (uint32, uint32, error) {
// Ensure the checksum matches. The checksum is at the end.
gotChecksum := crc32.Checksum(writeRow[:8], castagnoli)
wantChecksumBytes := writeRow[8:12]
wantChecksum := byteOrder.Uint32(wantChecksumBytes)
if gotChecksum != wantChecksum {
str := fmt.Sprintf("metadata for write cursor does not match "+
"the expected checksum - got %d, want %d", gotChecksum,
wantChecksum)
return 0, 0, makeDbErr(database.ErrCorruption, str, nil)
}
fileNum := byteOrder.Uint32(writeRow[0:4])
fileOffset := byteOrder.Uint32(writeRow[4:8])
return fileNum, fileOffset, nil
}
// reconcileDB brings the write cursor recorded in the metadata and the flat
// block files on disk into agreement and returns the database on success.
// When the create flag is set, the internal buckets and values are
// initialized first.
//
// Errors from initialization or from loading the write cursor are returned
// unchanged. ErrCorruption is returned when the write cursor is missing from
// the metadata or when the block files on disk end before the position the
// metadata has recorded.
func reconcileDB(pdb *db, create bool) (database.DB, error) {
	// A database that is being created needs its internal buckets and
	// values set up before the write cursor can be read back.
	if create {
		err := initDB(pdb.cache.ldb)
		if err != nil {
			return nil, err
		}
	}

	// Read the write cursor position that the metadata has on record.
	var metaFileNum, metaOffset uint32
	loadCursor := func(tx database.Tx) error {
		row := tx.Metadata().Get(writeLocKeyName)
		if row == nil {
			return makeDbErr(database.ErrCorruption,
				"write cursor does not exist", nil)
		}

		fileNum, offset, err := deserializeWriteRow(row)
		if err != nil {
			return err
		}
		metaFileNum, metaOffset = fileNum, offset
		return nil
	}
	if err := pdb.View(loadCursor); err != nil {
		return nil, err
	}

	// The block files being AHEAD of the metadata is a fairly common result
	// of an unclean shutdown while block data was being written. Since the
	// metadata isn't updated until after the block data is written, rolling
	// the files back to the recorded position simply restores the last
	// known good state.
	wc := pdb.store.writeCursor
	diskAhead := wc.curFileNum > metaFileNum ||
		(wc.curFileNum == metaFileNum && wc.curOffset > metaOffset)
	if diskAhead {
		log.Info("Detected unclean shutdown - Repairing...")
		log.Debugf("Metadata claims file %d, offset %d. Block data is "+
			"at file %d, offset %d", metaFileNum, metaOffset,
			wc.curFileNum, wc.curOffset)
		pdb.store.handleRollback(metaFileNum, metaOffset)
		log.Infof("Database sync complete")
	}

	// The block files being BEHIND the metadata should only happen when
	// block files are missing, deleted, or truncated, because sync is called
	// after each block is written and before the metadata is updated. That
	// is generally not easily recoverable, so it is reported as corruption.
	// Rebuilding the metadata from the block files might be possible in the
	// future, but it would need coordination from a higher layer since it
	// could invalidate other metadata.
	//
	// This comparison is deliberately made after any rollback above.
	diskBehind := wc.curFileNum < metaFileNum ||
		(wc.curFileNum == metaFileNum && wc.curOffset < metaOffset)
	if diskBehind {
		msg := fmt.Sprintf("metadata claims file %d, offset %d, but "+
			"block data is at file %d, offset %d", metaFileNum,
			metaOffset, wc.curFileNum, wc.curOffset)
		log.Warnf("***Database corruption detected***: %v", msg)
		return nil, makeDbErr(database.ErrCorruption, msg, nil)
	}

	return pdb, nil
}