forked from p9c/pod-archive
-
Notifications
You must be signed in to change notification settings - Fork 0
/
txindex.go
429 lines (401 loc) · 16.2 KB
/
txindex.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
package indexers
import (
"errors"
"fmt"
qu "github.com/l0k18/pod/pkg/util/quit"
blockchain "github.com/l0k18/pod/pkg/chain"
chainhash "github.com/l0k18/pod/pkg/chain/hash"
"github.com/l0k18/pod/pkg/chain/wire"
database "github.com/l0k18/pod/pkg/db"
"github.com/l0k18/pod/pkg/util"
)
const (
// txIndexName is the human-readable name for the index.
txIndexName = "transaction index"
)
var (
// txIndexKey is the key of the transaction index and the db bucket used
// to house it.
txIndexKey = []byte("txbyhashidx")
// idByHashIndexBucketName is the name of the db bucket used to house the
// block hash -> block id index (entries are keyed by block hash and hold
// the serialized block id).
idByHashIndexBucketName = []byte("idbyhashidx")
// hashByIDIndexBucketName is the name of the db bucket used to house the
// block id -> block hash index (entries are keyed by the serialized block
// id and hold the block hash).
hashByIDIndexBucketName = []byte("hashbyididx")
// errNoBlockIDEntry is an error that indicates a requested entry does
// not exist in the block ID index.
errNoBlockIDEntry = errors.New("no entry in the block ID index")
)
// The transaction index consists of an entry for every transaction in the main chain. In order to significantly
// optimize the space requirements, a separate index is maintained which provides an internal mapping between each
// block that has been indexed and a unique ID for use within the hash to location mappings. The ID is simply a
// sequentially incremented uint32.
//
// This is useful because it is only 4 bytes versus 32 bytes hashes and thus saves a ton of space in the index. There
// are three buckets used in total.
//
// The first bucket maps the hash of each transaction to the specific block location. The second bucket maps the hash of
// each block to the unique ID and the third maps that ID back to the block hash.
//
// NOTE: Although it is technically possible for multiple transactions to have the same hash as long as the previous
// transaction with the same hash is fully spent, this code only stores the most recent one because doing otherwise
// would add a non-trivial amount of space and overhead for something that will realistically never happen per the
// probability and even if it did, the old one must be fully spent and so the most likely transaction a caller would
// want for a given hash is the most recent one anyways.
//
// The serialized format for keys and values in the block hash to ID bucket is:
//
// <hash> = <ID>
// Field Type Size
// hash chainhash.Hash 32 bytes
// ID uint32 4 bytes
// -----
// Total: 36 bytes
//
// The serialized format for keys and values in the ID to block hash bucket is:
//
// <ID> = <hash>
// Field Type Size
// ID uint32 4 bytes
// hash chainhash.Hash 32 bytes
// -----
// Total: 36 bytes
// The serialized format for the keys and values in the tx index bucket is:
// <txhash> = <block id><start offset><tx length>
// Field Type Size
// txhash chainhash.Hash 32 bytes
// block id uint32 4 bytes
// start offset uint32 4 bytes
// tx length uint32 4 bytes
// -----
// Total: 44 bytes
//
// dbPutBlockIDIndexEntry adds or updates, within the supplied database transaction, both directions of the mapping
// between a block hash and its internal block ID: hash -> ID first, then ID -> hash. The first failing write is
// returned and the second write is not attempted.
func dbPutBlockIDIndexEntry(dbTx database.Tx, hash *chainhash.Hash, id uint32) error {
	// Encode the ID once; the same four bytes are the value in one bucket and the key in the other.
	var idBytes [4]byte
	byteOrder.PutUint32(idBytes[:], id)
	metadata := dbTx.Metadata()
	// Block hash -> block ID.
	err := metadata.Bucket(idByHashIndexBucketName).Put(hash[:], idBytes[:])
	if err != nil {
		return err
	}
	// Block ID -> block hash.
	return metadata.Bucket(hashByIDIndexBucketName).Put(idBytes[:], hash[:])
}
// dbRemoveBlockIDIndexEntry deletes, within the supplied database transaction, both directions of the block hash <->
// block ID mapping for the given hash. A hash with no entry is not an error: nil is returned and nothing is deleted.
func dbRemoveBlockIDIndexEntry(dbTx database.Tx, hash *chainhash.Hash) error {
	metadata := dbTx.Metadata()
	byHash := metadata.Bucket(idByHashIndexBucketName)
	// The stored ID has to be read before the forward entry is removed because it is the key of the reverse entry.
	idBytes := byHash.Get(hash[:])
	if idBytes == nil {
		return nil
	}
	// Drop hash -> ID.
	if err := byHash.Delete(hash[:]); err != nil {
		return err
	}
	// Drop ID -> hash.
	return metadata.Bucket(hashByIDIndexBucketName).Delete(idBytes)
}
// dbFetchBlockIDByHash returns the internal block ID recorded for the given block hash, read through the supplied
// database transaction. errNoBlockIDEntry is returned (with an ID of 0) when the hash has no entry.
func dbFetchBlockIDByHash(dbTx database.Tx, hash *chainhash.Hash) (uint32, error) {
	idBytes := dbTx.Metadata().Bucket(idByHashIndexBucketName).Get(hash[:])
	if idBytes != nil {
		return byteOrder.Uint32(idBytes), nil
	}
	return 0, errNoBlockIDEntry
}
// dbFetchBlockHashBySerializedID returns the block hash recorded for an already-serialized block ID, read through the
// supplied database transaction. errNoBlockIDEntry is returned when the ID has no entry. The returned hash is a copy
// of the stored bytes, not a view into the bucket's data.
func dbFetchBlockHashBySerializedID(dbTx database.Tx, serializedID []byte) (*chainhash.Hash, error) {
	stored := dbTx.Metadata().Bucket(hashByIDIndexBucketName).Get(serializedID)
	if stored == nil {
		return nil, errNoBlockIDEntry
	}
	blockHash := new(chainhash.Hash)
	copy(blockHash[:], stored)
	return blockHash, nil
}
// dbFetchBlockHashByID serializes the given internal block ID and delegates to dbFetchBlockHashBySerializedID to
// return the associated block hash, using the supplied database transaction.
func dbFetchBlockHashByID(dbTx database.Tx, id uint32) (*chainhash.Hash, error) {
	var key [4]byte
	byteOrder.PutUint32(key[:], id)
	blockHash, err := dbFetchBlockHashBySerializedID(dbTx, key[:])
	return blockHash, err
}
// putTxIndexEntry serializes a transaction index entry into target according to the format described above: the block
// ID at offset 0, the transaction start offset at offset 4 and the transaction length at offset 8, each as a uint32.
// The target byte slice must be large enough to hold all three fields (the original documentation ties this to the
// txEntrySize constant) or the function will panic.
func putTxIndexEntry(target []byte, blockID uint32, txLoc wire.TxLoc) {
	fields := [...]uint32{blockID, uint32(txLoc.TxStart), uint32(txLoc.TxLen)}
	for i, value := range fields {
		byteOrder.PutUint32(target[i*4:], value)
	}
}
// dbPutTxIndexEntry stores serializedData under the given transaction hash in the transaction index bucket, using the
// supplied database transaction. The data is expected to have been produced by putTxIndexEntry; it is written as-is
// without validation.
func dbPutTxIndexEntry(dbTx database.Tx, txHash *chainhash.Hash, serializedData []byte) error {
	bucket := dbTx.Metadata().Bucket(txIndexKey)
	key := txHash[:]
	return bucket.Put(key, serializedData)
}
// dbFetchTxIndexEntry uses an existing database transaction to fetch the block region for the provided transaction hash
// from the transaction index. When there is no entry for the provided hash, nil will be returned for the both the
// region and the error.
func dbFetchTxIndexEntry(dbTx database.Tx, txHash *chainhash.Hash) (*database.BlockRegion, error) {
// Load the record from the database and return now if it doesn't exist.
txIndex := dbTx.Metadata().Bucket(txIndexKey)
serializedData := txIndex.Get(txHash[:])
if len(serializedData) == 0 {
return nil, nil
}
// Ensure the serialized data has enough bytes to properly deserialize.
if len(serializedData) < 12 {
return nil, database.DBError{
ErrorCode: database.ErrCorruption,
Description: fmt.Sprintf("corrupt transaction index "+
"entry for %s", txHash),
}
}
// Load the block hash associated with the block ID.
hash, err := dbFetchBlockHashBySerializedID(dbTx, serializedData[0:4])
if err != nil {
Error(err)
return nil, database.DBError{
ErrorCode: database.ErrCorruption,
Description: fmt.Sprintf("corrupt transaction index "+
"entry for %s: %v", txHash, err),
}
}
// Deserialize the final entry.
region := database.BlockRegion{Hash: &chainhash.Hash{}}
copy(region.Hash[:], hash[:])
region.Offset = byteOrder.Uint32(serializedData[4:8])
region.Len = byteOrder.Uint32(serializedData[8:12])
return ®ion, nil
}
// dbAddTxIndexEntries writes one transaction index entry per transaction of the passed block, all tagged with blockID,
// using the supplied database transaction. It stops at and returns the first error, either from computing the
// transaction locations or from a database write.
func dbAddTxIndexEntries(dbTx database.Tx, block *util.Block, blockID uint32) error {
	// Offsets and lengths of the transactions inside the serialized block.
	locs, err := block.TxLoc()
	if err != nil {
		Error(err)
		return err
	}
	// One backing buffer holds every serialized entry for the block; each database write receives its own
	// txEntrySize-wide window of it. This keeps the number of allocations at one regardless of the transaction count.
	entries := make([]byte, len(block.Transactions())*txEntrySize)
	for i, tx := range block.Transactions() {
		start := i * txEntrySize
		end := start + txEntrySize
		putTxIndexEntry(entries[start:], blockID, locs[i])
		// The three-index slice caps the window's capacity so the value handed to the database cannot reach into the
		// neighbouring entry.
		if putErr := dbPutTxIndexEntry(dbTx, tx.Hash(), entries[start:end:end]); putErr != nil {
			Error(putErr)
			return putErr
		}
	}
	return nil
}
// dbRemoveTxIndexEntry deletes the transaction index entry stored for the given hash, using the supplied database
// transaction. Attempting to remove a hash that has no (or an empty) entry is reported as an error.
func dbRemoveTxIndexEntry(dbTx database.Tx, txHash *chainhash.Hash) error {
	bucket := dbTx.Metadata().Bucket(txIndexKey)
	if existing := bucket.Get(txHash[:]); len(existing) > 0 {
		return bucket.Delete(txHash[:])
	}
	return fmt.Errorf("can't remove non-existent transaction %s "+
		"from the transaction index", txHash)
}
// dbRemoveTxIndexEntries deletes the transaction index entry of every transaction in the passed block, in block order,
// using the supplied database transaction. The first failure is logged and returned; later transactions are not
// processed.
func dbRemoveTxIndexEntries(dbTx database.Tx, block *util.Block) error {
	txns := block.Transactions()
	for i := range txns {
		if err := dbRemoveTxIndexEntry(dbTx, txns[i].Hash()); err != nil {
			Error(err)
			return err
		}
	}
	return nil
}
// TxIndex implements a transaction by hash index. That is to say, it supports querying all transactions by their hash.
type TxIndex struct {
// db is the database handle; Init and TxBlockRegion use it to run View transactions.
db database.DB
// curBlockID is the highest internal block ID currently in use. Init derives it from the stored block ID index,
// ConnectBlock advances it by one and DisconnectBlock decrements it.
curBlockID uint32
}
// Ensure the TxIndex type implements the Indexer interface.
var _ Indexer = (*TxIndex)(nil)
// Init initializes the hash-based transaction index by discovering the highest internal block ID already stored and
// remembering it in curBlockID for later ConnectBlock/DisconnectBlock calls. This is part of the Indexer interface.
//
// The ID is rediscovered with a search at start-up rather than persisted, because a single search at initialization is
// far cheaper than writing one more value to the database on every update.
func (idx *TxIndex) Init() error {
	// stride is the gap between probes during the forward scan.
	const stride = uint32(100000)
	locate := func(dbTx database.Tx) error {
		// known reports whether the block ID index holds an entry for id. Any lookup error counts as "unknown".
		known := func(id uint32) bool {
			_, err := dbFetchBlockHashByID(dbTx, id)
			return err == nil
		}
		// Phase 1: jump forward in large strides until an unused ID is hit. That ID becomes the exclusive upper
		// bound for the bisection below, and the last ID that did exist becomes the lower bound.
		var highestKnown uint32
		probe := uint32(1)
		for known(probe) {
			highestKnown = probe
			probe += stride
		}
		nextUnknown := probe
		Tracef("forward scan (highest known %d, next unknown %d)", highestKnown, nextUnknown)
		// ID 1 itself is unused: fresh database, nothing to restore.
		if nextUnknown == 1 {
			return nil
		}
		// Phase 2: bisect between the bounds until they are adjacent. This needs at most ceil(log_2(stride)) probes.
		for {
			mid := (highestKnown + nextUnknown) / 2
			if known(mid) {
				highestKnown = mid
			} else {
				nextUnknown = mid
			}
			Tracef("binary scan (highest known %d, next unknown %d)", highestKnown, nextUnknown)
			if highestKnown+1 == nextUnknown {
				break
			}
		}
		idx.curBlockID = highestKnown
		return nil
	}
	if err := idx.db.View(locate); err != nil {
		Error(err)
		return err
	}
	Trace("current internal block ID:", idx.curBlockID)
	return nil
}
// Key reports the database key under which this index is stored (txIndexKey). The returned slice is the package-level
// value itself, not a copy. This is part of the Indexer interface.
func (idx *TxIndex) Key() []byte {
	return txIndexKey
}
// Name reports the human-readable name of this index (txIndexName). This is part of the Indexer interface.
func (idx *TxIndex) Name() string {
	return txIndexName
}
// Create is invoked when the indexer manager determines the index needs to be created for the first time. It creates,
// in order, the two internal block ID buckets and then the transaction index bucket, returning the first bucket
// creation error encountered. This is part of the Indexer interface.
func (idx *TxIndex) Create(dbTx database.Tx) error {
	metadata := dbTx.Metadata()
	bucketNames := [][]byte{idByHashIndexBucketName, hashByIDIndexBucketName, txIndexKey}
	for _, name := range bucketNames {
		if _, err := metadata.CreateBucket(name); err != nil {
			return err
		}
	}
	return nil
}
// ConnectBlock is invoked by the index manager when a new block has been connected to the main chain. It indexes every
// transaction of the block under the next internal block ID and records the block hash <-> ID mapping. The in-memory
// curBlockID is only advanced once both database steps have succeeded. The stxos argument is not used by this index.
// This is part of the Indexer interface.
func (idx *TxIndex) ConnectBlock(dbTx database.Tx, block *util.Block, stxos []blockchain.SpentTxOut) error {
	nextID := idx.curBlockID + 1
	// Index the block's transactions under the new ID.
	err := dbAddTxIndexEntries(dbTx, block, nextID)
	if err != nil {
		return err
	}
	// Register the ID for the block itself.
	if err = dbPutBlockIDIndexEntry(dbTx, block.Hash(), nextID); err != nil {
		Error(err)
		return err
	}
	idx.curBlockID = nextID
	return nil
}
// DisconnectBlock is invoked by the index manager when a block has been disconnected from the main chain.
//
// It removes the index entry of every transaction in the block, then the block's hash <-> ID mapping, and finally
// decrements the in-memory curBlockID. If either removal fails, its error is returned and curBlockID is left as is.
// The stxos argument is not used by this index.
//
// This is part of the Indexer interface.
func (idx *TxIndex) DisconnectBlock(dbTx database.Tx, block *util.Block,
	stxos []blockchain.SpentTxOut) error {
	// Transactions first, then the block ID entry; the second step only runs when the first succeeded.
	err := dbRemoveTxIndexEntries(dbTx, block)
	if err == nil {
		err = dbRemoveBlockIDIndexEntry(dbTx, block.Hash())
	}
	if err != nil {
		return err
	}
	idx.curBlockID--
	return nil
}
// TxBlockRegion looks up the provided transaction hash in the transaction index and returns the block region
// describing where that transaction lives.
//
// The block region can in turn be used to load the raw transaction bytes.
//
// When there is no entry for the provided hash, nil is returned for both the region and the error.
//
// The lookup runs inside a database View transaction and does not modify any TxIndex field; the original documentation
// states that this function is safe for concurrent access.
func (idx *TxIndex) TxBlockRegion(hash *chainhash.Hash) (*database.BlockRegion, error) {
	var found *database.BlockRegion
	err := idx.db.View(func(dbTx database.Tx) (fetchErr error) {
		found, fetchErr = dbFetchTxIndexEntry(dbTx, hash)
		return fetchErr
	})
	return found, err
}
// NewTxIndex builds a transaction index bound to the given database. The index maps the hash of every transaction in
// the blockchain to its block, its location within that block and its size.
//
// The returned value implements the Indexer interface, which plugs into the IndexManager that in turn is used by the
// blockchain package, so the index can be maintained along with the chain.
//
// The internal block ID counter starts at zero; Init must be called to restore it from an existing database.
func NewTxIndex(db database.DB) *TxIndex {
	index := new(TxIndex)
	index.db = db
	return index
}
// dropBlockIDIndex deletes both buckets of the internal block ID index (hash -> ID, then ID -> hash) inside a single
// database Update transaction. If deleting the first bucket fails, the error is logged and returned without attempting
// the second.
func dropBlockIDIndex(db database.DB) error {
	dropBuckets := func(dbTx database.Tx) error {
		metadata := dbTx.Metadata()
		if err := metadata.DeleteBucket(idByHashIndexBucketName); err != nil {
			Error(err)
			return err
		}
		return metadata.DeleteBucket(hashByIDIndexBucketName)
	}
	return db.Update(dropBuckets)
}
// DropTxIndex drops the transaction index from the provided database if it exists. Because the address index relies on
// the transaction index, the address index is dropped first; the transaction index is only dropped once that has
// succeeded. The interrupt channel is passed through to dropIndex for both operations.
func DropTxIndex(db database.DB, interrupt qu.C) error {
	if err := dropIndex(db, addrIndexKey, addrIndexName, interrupt); err != nil {
		Error(err)
		return err
	}
	return dropIndex(db, txIndexKey, txIndexName, interrupt)
}