forked from decred/dcrd
-
Notifications
You must be signed in to change notification settings - Fork 0
/
chainindexer.go
482 lines (428 loc) · 14.2 KB
/
chainindexer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
// Copyright (c) 2013-2014 The btcsuite developers
// Copyright (c) 2015 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package main
import (
"fmt"
"sync"
"sync/atomic"
"github.com/decred/dcrd/blockchain"
"github.com/decred/dcrd/blockchain/stake"
"github.com/decred/dcrd/chaincfg/chainhash"
"github.com/decred/dcrd/database"
"github.com/decred/dcrd/txscript"
"github.com/decred/dcrd/wire"
"github.com/decred/dcrutil"
"github.com/btcsuite/golangcrypto/ripemd160"
)
// indexState identifies which operating mode the address indexer is in.
type indexState int
const (
	// The two operating modes of the address indexer.
	//
	// indexCatchUp is selected by newAddrIndexer when the best chain
	// height differs from the height of the last indexed block.  While in
	// this mode, initialize walks every block between the index tip and
	// the chain tip and writes its address index to the database, then
	// switches the indexer to indexMaintain.
	//
	// NOTE(review): earlier documentation described several concurrent
	// worker goroutines coordinated by a manager goroutine and a backlog of
	// queued jobs; the catch-up code visible in this file is a single
	// serial loop -- confirm whether that description is still accurate.
	indexCatchUp indexState = iota
	// indexMaintain is the steady state: the index has reached the chain
	// tip recorded at construction and IsCaughtUp reports true.  Blocks are
	// then indexed or unindexed one at a time via InsertBlock and
	// RemoveBlock.
	indexMaintain
)
// addrIndexer indexes the transactions of blocks by the addresses involved in
// each transaction and persists the result through the server's database.
type addrIndexer struct {
	// server provides the database (server.db) that blocks are read from
	// and that address index entries are written to.
	server *server
	// started is incremented atomically by Start; only the call that
	// observes the value 1 performs initialization.
	started int32
	// shutdown is incremented atomically by Stop; calls that observe a
	// value other than 1 only log a warning.
	shutdown int32
	// state is the current operating mode.  It is written under the
	// embedded mutex at the end of initialize and read under it by
	// IsCaughtUp.
	state indexState
	// progressLogger is created by newAddrIndexer with the message
	// "Indexed addresses of".
	progressLogger *blockProgressLogger
	// currentIndexTip is the height returned by FetchAddrIndexTip when the
	// indexer was constructed; catch-up starts at the following height.
	currentIndexTip int64
	// chainTip is the height returned by NewestSha when the indexer was
	// constructed; catch-up stops after indexing this height.
	chainTip int64
	// The embedded mutex guards state.
	sync.Mutex
}
// newAddrIndexer returns an address indexer bound to the passed server.  The
// best chain height and the address index tip height are read from the
// server's database; when they are equal the indexer starts in maintenance
// mode, otherwise it starts in catch-up mode.  A missing address index
// (database.ErrAddrIndexDoesNotExist) is not treated as an error.
//
// Call Start to bring the index up to date.
func newAddrIndexer(s *server) (*addrIndexer, error) {
	_, bestHeight, err := s.db.NewestSha()
	if err != nil {
		return nil, err
	}

	_, indexedHeight, err := s.db.FetchAddrIndexTip()
	if err != nil && err != database.ErrAddrIndexDoesNotExist {
		return nil, err
	}

	// Assume there is work to do unless the index already sits at the
	// chain tip.
	mode := indexCatchUp
	if bestHeight == indexedHeight {
		mode = indexMaintain
	}

	return &addrIndexer{
		server:          s,
		state:           mode,
		currentIndexTip: indexedHeight,
		chainTip:        bestHeight,
		progressLogger: newBlockProgressLogger("Indexed addresses of",
			adxrLog),
	}, nil
}
// Start runs the indexer's initialization exactly once; subsequent calls are
// no-ops.  A failure to initialize is logged rather than returned.
func (a *addrIndexer) Start() {
	// Only the very first caller gets past this guard.
	if atomic.AddInt32(&a.started, 1) != 1 {
		return
	}

	adxrLog.Trace("Starting address indexer")
	if err := a.initialize(); err != nil {
		adxrLog.Errorf("Couldn't start address indexer: %v", err.Error())
	}
}
// Stop marks the address indexer as shutting down.  The first call logs the
// shutdown; any later call only logs a warning that shutdown is already in
// progress.  It always returns nil.
func (a *addrIndexer) Stop() error {
	if atomic.AddInt32(&a.shutdown, 1) == 1 {
		adxrLog.Infof("Address indexer shutting down")
		return nil
	}

	adxrLog.Warnf("Address indexer is already in the process of " +
		"shutting down")
	return nil
}
// IsCaughtUp reports whether the address indexer has left catch-up mode and
// is now in maintenance mode.  The state is read while holding the indexer's
// mutex.
func (a *addrIndexer) IsCaughtUp() bool {
	a.Lock()
	caughtUp := a.state == indexMaintain
	a.Unlock()
	return caughtUp
}
// initialize brings the address index up to the chain tip recorded when the
// indexer was created.  When the indexer is in catch-up mode, every block
// from the height after the current index tip through the chain tip (the
// genesis block excepted) is loaded together with its parent, indexed, and
// written to the database, after which the indexer switches to maintenance
// mode.  The first failure aborts the walk and is returned.
func (a *addrIndexer) initialize() error {
	if a.state == indexCatchUp {
		adxrLog.Infof("Building up address index from height %v to %v.",
			a.currentIndexTip+1, a.chainTip)

		// Walk forward one block at a time from the block following the
		// current index tip until the chain tip has been indexed.
		for height := a.currentIndexTip + 1; height <= a.chainTip; height++ {
			// The genesis block is never indexed.
			if height == 0 {
				continue
			}

			sha, err := a.server.db.FetchBlockShaByHeight(height)
			if err != nil {
				return fmt.Errorf("Unable to look up the sha of the "+
					"next target block (height %v): %v",
					height, err)
			}

			// Failing to load a block whose hash was just returned by
			// the database may indicate an inconsistent database.
			block, err := a.server.db.FetchBlockBySha(sha)
			if err != nil {
				return fmt.Errorf("Unable to look up the next "+
					"target block (sha %v): %v", sha, err)
			}

			// The parent is needed as well since indexing a block
			// covers the parent's regular transactions.
			parentBlock, err := a.server.db.FetchBlockBySha(
				&block.MsgBlock().Header.PrevBlock)
			if err != nil {
				return fmt.Errorf("Unable to look up the next "+
					"target block parent (sha %v): %v",
					block.MsgBlock().Header.PrevBlock, err)
			}

			entries, err := a.indexBlockAddrs(block, parentBlock)
			if err != nil {
				return fmt.Errorf("Unable to index transactions of"+
					" block: %v", err)
			}

			err = a.server.db.UpdateAddrIndexForBlock(sha, height, entries)
			if err != nil {
				return fmt.Errorf("Unable to insert block: %v", err.Error())
			}
		}

		a.Lock()
		a.state = indexMaintain
		a.Unlock()
	}

	adxrLog.Debugf("Address indexer has queued up to best height, safe " +
		"to begin maintainence mode")
	return nil
}
// convertToAddrIndex produces the address index entries for a single script.
//
// The script is classified and its addresses extracted with
// txscript.ExtractPkScriptAddrs; one entry is emitted per extracted address.
// Pay-to-pubkey style scripts are keyed by the address's Hash160, all other
// recognized classes by the address's ScriptAddress.  In addition:
//
//   - a null data script inside a ticket purchase (stake.TxTypeSStx) is
//     treated as a commitment and keyed by the address recovered from it;
//   - otherwise, a script that yielded no addresses is keyed by the hash160
//     of the script itself.
//
// Every entry records the passed height and the transaction's offset and
// length from locInBlock.  An error is returned when scr or locInBlock is
// nil, or when address extraction fails.
//
// NOTE(review): keys are filled with copy into a fixed ripemd160.Size array,
// so a ScriptAddress of a different length is truncated or leaves trailing
// bytes from a previous address -- confirm this is intended for multisig.
func convertToAddrIndex(scrVersion uint16, scr []byte, height int64,
	locInBlock *wire.TxLoc, txType stake.TxType) ([]*database.TxAddrIndex, error) {
	var tais []*database.TxAddrIndex
	if scr == nil || locInBlock == nil {
		return nil, fmt.Errorf("passed nil pointer")
	}

	// indexKey is shared by every entry built below; record snapshots its
	// current contents since arrays are copied by value.
	var indexKey [ripemd160.Size]byte
	record := func() {
		tais = append(tais, &database.TxAddrIndex{
			Hash160:  indexKey,
			Height:   uint32(height),
			TxOffset: uint32(locInBlock.TxStart),
			TxLen:    uint32(locInBlock.TxLen),
		})
	}

	// Classify the script and pull out whatever addresses it encodes.
	class, addrs, _, err := txscript.ExtractPkScriptAddrs(scrVersion, scr,
		activeNetParams.Params)
	if err != nil {
		return nil, fmt.Errorf("script conversion error: %v", err.Error())
	}

	for _, addr := range addrs {
		switch class {
		case txscript.PubKeyTy, txscript.PubkeyAltTy:
			copy(indexKey[:], addr.Hash160()[:])
		case txscript.PubKeyHashTy, txscript.PubkeyHashAltTy,
			txscript.StakeSubmissionTy, txscript.StakeGenTy,
			txscript.StakeRevocationTy, txscript.StakeSubChangeTy,
			txscript.MultiSigTy, txscript.ScriptHashTy:
			copy(indexKey[:], addr.ScriptAddress()[:])
		}
		record()
	}

	// Any extracted address counts as a known type, whichever class it
	// belonged to.
	knownType := len(addrs) > 0

	switch {
	case txType == stake.TxTypeSStx && class == txscript.NullDataTy:
		// A commitment for a future vote or revocation: index the
		// address it commits to.
		addr, err := stake.AddrFromSStxPkScrCommitment(scr,
			activeNetParams.Params)
		if err != nil {
			return nil, fmt.Errorf("ticket commit pkscr conversion error: %v",
				err.Error())
		}
		copy(indexKey[:], addr.ScriptAddress()[:])
		record()

	case !knownType:
		// Nothing recognizable was found, so fall back to hashing the
		// script itself.
		copy(indexKey[:], dcrutil.Hash160(scr))
		record()
	}

	return tais, nil
}
// lookupTransaction finds the transaction with the passed hash, consulting
// the supplied blocks before falling back to the database.  The search order
// is:
//
//  1. the stake transactions of parent;
//  2. the regular transactions of parent, but only when blk's vote bits mark
//     the parent's regular transaction tree as valid;
//  3. the regular transactions of blk;
//  4. the database, from which the last entry returned for the hash is used.
//
// The blocks are searched because indexBlockAddrs runs after the block
// manager has already added or removed the block, so transactions spent by
// the block may live in the block or its parent.
//
// An error is returned when the database lookup fails or yields no
// transactions.
func (a *addrIndexer) lookupTransaction(txHash chainhash.Hash, blk *dcrutil.Block,
	parent *dcrutil.Block) (*wire.MsgTx, error) {
	// scan reports the first transaction in txs whose hash equals txHash.
	scan := func(txs []*dcrutil.Tx) (*wire.MsgTx, bool) {
		for _, candidate := range txs {
			if candidate.Sha().IsEqual(&txHash) {
				return candidate.MsgTx(), true
			}
		}
		return nil, false
	}

	parentRegularValid := dcrutil.IsFlagSet16(blk.MsgBlock().Header.VoteBits,
		dcrutil.BlockValid)

	// The parent's stake tree is always eligible.
	if msgTx, found := scan(parent.STransactions()); found {
		return msgTx, nil
	}

	// The parent's regular tree only counts when this block validated it.
	if parentRegularValid {
		if msgTx, found := scan(parent.Transactions()); found {
			return msgTx, nil
		}
	}

	// This block's own regular tree.
	if msgTx, found := scan(blk.Transactions()); found {
		return msgTx, nil
	}

	// Not in either block, so ask the database.
	stored, err := a.server.db.FetchTxBySha(&txHash)
	if err != nil {
		adxrLog.Errorf("Error fetching tx %v: %v",
			txHash, err)
		return nil, err
	}
	if len(stored) == 0 {
		return nil, fmt.Errorf("transaction %v not found",
			txHash)
	}

	return stored[len(stored)-1].Tx, nil
}
// indexBlockAddrs returns the address index entries associated with blk.
//
// Two sets of transactions contribute entries:
//
//   - When blk's vote bits have the dcrutil.BlockValid flag set, every regular
//     transaction of parent is indexed at parent's height using its location
//     within parent.
//   - Every stake transaction of blk is indexed at blk's height using its
//     location within blk.
//
// For each transaction, the script of every output is indexed along with the
// script of every previous output spent by its inputs.  Coinbase inputs and
// the first (stakebase) input of a vote are skipped.  Scripts that cannot be
// converted are logged at trace level and skipped.
//
// An error is returned when transaction locations cannot be computed, or when
// a previous output referenced by an input cannot be found; in the latter
// case no partial index is returned.
func (a *addrIndexer) indexBlockAddrs(blk *dcrutil.Block,
	parent *dcrutil.Block) (database.BlockAddrIndex, error) {
	var addrIndex database.BlockAddrIndex

	_, stxLocs, err := blk.TxLoc()
	if err != nil {
		return nil, err
	}

	// prevOutTx resolves the transaction funding the passed outpoint and
	// verifies the referenced output exists, so callers may index into
	// TxOut without risking a nil dereference or out-of-range panic.
	prevOutTx := func(op wire.OutPoint) (*wire.MsgTx, error) {
		fundingTx, err := a.lookupTransaction(op.Hash, blk, parent)
		if err != nil {
			return nil, fmt.Errorf("unable to look up previous "+
				"transaction %v: %v", op.Hash, err)
		}
		if fundingTx == nil ||
			uint64(op.Index) >= uint64(len(fundingTx.TxOut)) {
			return nil, fmt.Errorf("previous output %v:%v does not "+
				"exist", op.Hash, op.Index)
		}
		return fundingTx, nil
	}

	txTreeRegularValid := dcrutil.IsFlagSet16(blk.MsgBlock().Header.VoteBits,
		dcrutil.BlockValid)

	// Add the parent's regular transactions iff this block validated them.
	if txTreeRegularValid {
		txLocs, _, err := parent.TxLoc()
		if err != nil {
			return nil, err
		}

		for txIdx, tx := range parent.Transactions() {
			// Tx's offset and length in the parent block.
			locInBlock := &txLocs[txIdx]

			// Coinbases don't have any inputs.
			if !blockchain.IsCoinBase(tx) {
				// Index the script of each input's previous output.
				for _, txIn := range tx.MsgTx().TxIn {
					fundingTx, err := prevOutTx(txIn.PreviousOutPoint)
					if err != nil {
						return nil, err
					}
					inputOutPoint := fundingTx.TxOut[txIn.PreviousOutPoint.Index]

					toAppend, err := convertToAddrIndex(inputOutPoint.Version,
						inputOutPoint.PkScript, parent.Height(), locInBlock,
						stake.TxTypeRegular)
					if err != nil {
						adxrLog.Tracef("Error converting tx txin %v: %v",
							txIn.PreviousOutPoint.Hash, err)
						continue
					}
					addrIndex = append(addrIndex, toAppend...)
				}
			}

			for _, txOut := range tx.MsgTx().TxOut {
				toAppend, err := convertToAddrIndex(txOut.Version, txOut.PkScript,
					parent.Height(), locInBlock, stake.TxTypeRegular)
				if err != nil {
					adxrLog.Tracef("Error converting tx txout %v: %v",
						tx.MsgTx().TxSha(), err)
					continue
				}
				addrIndex = append(addrIndex, toAppend...)
			}
		}
	}

	// Add this block's stake transactions.
	for stxIdx, stx := range blk.STransactions() {
		// Tx's offset and length in the block.
		locInBlock := &stxLocs[stxIdx]
		txType := stake.DetermineTxType(stx)

		// Index the script of each input's previous output.
		for i, txIn := range stx.MsgTx().TxIn {
			// The stakebase input of a vote references no real output.
			if txType == stake.TxTypeSSGen && i == 0 {
				continue
			}

			fundingTx, err := prevOutTx(txIn.PreviousOutPoint)
			if err != nil {
				return nil, err
			}
			inputOutPoint := fundingTx.TxOut[txIn.PreviousOutPoint.Index]

			toAppend, err := convertToAddrIndex(inputOutPoint.Version,
				inputOutPoint.PkScript, blk.Height(), locInBlock,
				txType)
			if err != nil {
				adxrLog.Tracef("Error converting stx txin %v: %v",
					txIn.PreviousOutPoint.Hash, err)
				continue
			}
			addrIndex = append(addrIndex, toAppend...)
		}

		for _, txOut := range stx.MsgTx().TxOut {
			toAppend, err := convertToAddrIndex(txOut.Version, txOut.PkScript,
				blk.Height(), locInBlock, txType)
			if err != nil {
				adxrLog.Tracef("Error converting stx txout %v: %v",
					stx.MsgTx().TxSha(), err)
				continue
			}
			addrIndex = append(addrIndex, toAppend...)
		}
	}

	return addrIndex, nil
}
// InsertBlock computes the address index for the passed block (using parent
// for the regular transactions the block validates) and synchronously writes
// it to the database under the block's hash and height.
func (a *addrIndexer) InsertBlock(block *dcrutil.Block, parent *dcrutil.Block) error {
	entries, err := a.indexBlockAddrs(block, parent)
	if err != nil {
		return fmt.Errorf("Unable to index transactions of"+
			" block: %v", err)
	}

	if err := a.server.db.UpdateAddrIndexForBlock(block.Sha(),
		block.Height(), entries); err != nil {
		return fmt.Errorf("Unable to insert block: %v", err.Error())
	}

	return nil
}
// RemoveBlock recomputes the address index for the passed block (using parent
// for the regular transactions the block validates) and asks the database to
// drop those entries for the block's hash and height.
func (a *addrIndexer) RemoveBlock(block *dcrutil.Block,
	parent *dcrutil.Block) error {
	entries, err := a.indexBlockAddrs(block, parent)
	if err != nil {
		return fmt.Errorf("Unable to index transactions of"+
			" block: %v", err)
	}

	if err := a.server.db.DropAddrIndexForBlock(block.Sha(),
		block.Height(), entries); err != nil {
		return fmt.Errorf("Unable to remove block: %v", err.Error())
	}

	return nil
}