Skip to content

Commit

Permalink
Merge pull request #103 from halseth/query-batch-logging
Browse files Browse the repository at this point in the history
Add more query batch logging
  • Loading branch information
Roasbeef committed Oct 19, 2018
2 parents 3467bc0 + 5c29ea8 commit 8018ab7
Show file tree
Hide file tree
Showing 5 changed files with 353 additions and 29 deletions.
34 changes: 19 additions & 15 deletions blockmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,8 @@ func (b *blockManager) getCheckpointedCFHeaders(checkpoints []*chainhash.Hash,
// latest known checkpoint.
curHeader, curHeight, err := store.ChainTip()
if err != nil {
panic("getting chaintip from store")
panic(fmt.Sprintf("failed getting chaintip from filter "+
"store: %v", err))
}

initialFilterHeader := curHeader
Expand Down Expand Up @@ -813,14 +814,8 @@ func (b *blockManager) getCheckpointedCFHeaders(checkpoints []*chainhash.Hash,
endHeightRange,
)
if err != nil {
// Try to recover this.
select {
case <-b.quit:
return
case <-time.After(QueryTimeout):
currentInterval--
continue
}
panic(fmt.Sprintf("failed getting block header at "+
"height %v: %v", endHeightRange, err))
}
stopHash := stopHeader.BlockHash()

Expand Down Expand Up @@ -901,8 +896,9 @@ func (b *blockManager) getCheckpointedCFHeaders(checkpoints []*chainhash.Hash,
startHeight := checkPointIndex*wire.CFCheckptInterval + 1
lastHeight := startHeight + wire.CFCheckptInterval

log.Debugf("Got cfheaders from height=%v to height=%v",
startHeight, lastHeight)
log.Debugf("Got cfheaders from height=%v to "+
"height=%v, prev_hash=%v", startHeight,
lastHeight, r.PrevFilterHeader)

// If this is out of order but not yet written, we can
// verify that the checkpoints match, and then store
Expand Down Expand Up @@ -939,6 +935,10 @@ func (b *blockManager) getCheckpointedCFHeaders(checkpoints []*chainhash.Hash,
r.PrevFilterHeader = *curHeader
offset := curHeight + 1 - startHeight
r.FilterHashes = r.FilterHashes[offset:]

log.Debugf("Using offset %d for initial "+
"filter header range (new prev_hash=%v)",
offset, r.PrevFilterHeader)
}

curHeader, err = b.writeCFHeadersMsg(r, store)
Expand Down Expand Up @@ -1009,7 +1009,9 @@ func (b *blockManager) writeCFHeadersMsg(msg *wire.MsgCFHeaders,
return nil, err
}
if *tip != msg.PrevFilterHeader {
return nil, fmt.Errorf("attempt to write cfheaders out of order")
return nil, fmt.Errorf("attempt to write cfheaders out of "+
"order! Tip=%v, prev_hash=%v.", *tip,
msg.PrevFilterHeader)
}

// Cycle through the headers and compute each header based on the prev
Expand Down Expand Up @@ -1045,14 +1047,16 @@ func (b *blockManager) writeCFHeadersMsg(msg *wire.MsgCFHeaders,
// particular checkpoint interval.
lastHeight := startHeight + uint32(numHeaders) - 1
lastBlockHeader := matchingBlockHeaders[numHeaders-1]
lastHash := lastBlockHeader.BlockHash()

// We only need to set the height and hash of the very last filter
// header in the range to ensure that the index properly updates the
// tip of the chain.
headerBatch[numHeaders-1].HeaderHash = lastBlockHeader.BlockHash()
headerBatch[numHeaders-1].HeaderHash = lastHash
headerBatch[numHeaders-1].Height = lastHeight

log.Debugf("Writing filter headers up to height=%v", lastHeight)
log.Debugf("Writing filter headers up to height=%v, hash=%v",
lastHeight, lastHash)

// Write the header batch.
err = store.WriteHeaders(headerBatch...)
Expand Down Expand Up @@ -1082,7 +1086,7 @@ func (b *blockManager) writeCFHeadersMsg(msg *wire.MsgCFHeaders,
// for sub-system that only need to know the height has changed rather
// than know each new header that's been added to the tip.
b.filterHeaderTip = lastHeight
b.filterHeaderTipHash = lastBlockHeader.BlockHash()
b.filterHeaderTipHash = lastHash
b.newFilterHeadersSignal.Broadcast()

return &lastHeader, nil
Expand Down
305 changes: 305 additions & 0 deletions blockmanager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,305 @@
package neutrino

import (
"encoding/binary"
"io/ioutil"
"math/rand"
"os"
"sync"
"testing"

"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil/gcs/builder"
"github.com/btcsuite/btcwallet/walletdb"
"github.com/lightninglabs/neutrino/headerfs"
)

// TestBlockManagerInitialInterval tests that the block manager is able to
// handle checkpointed filter header query responses in out of order, and when
// a partial interval is already written to the store.
func TestBlockManagerInitialInterval(t *testing.T) {
	t.Parallel()

	testCases := []struct {
		// name is the subtest name for this case.
		name string

		// permute indicates whether responses should be permutated.
		permute bool

		// partialInterval indicates whether we should write parts of
		// the first checkpoint interval to the filter header store
		// before starting the test.
		partialInterval bool
	}{
		{
			name:            "in order, empty store",
			permute:         false,
			partialInterval: false,
		},
		{
			name:            "in order, partial interval",
			permute:         false,
			partialInterval: true,
		},
		{
			name:            "out of order, empty store",
			permute:         true,
			partialInterval: false,
		},
		{
			name:            "out of order, partial interval",
			permute:         true,
			partialInterval: true,
		},
	}

	for _, test := range testCases {
		test := test

		// Run each case as a subtest so that the deferred cleanups
		// (temp dir removal, db close) fire at the end of each case
		// rather than accumulating until the whole test function
		// returns.
		t.Run(test.name, func(t *testing.T) {
			// Set up the block and filter header stores.
			tempDir, err := ioutil.TempDir("", "neutrino")
			if err != nil {
				t.Fatalf("Failed to create temporary "+
					"directory: %s", err)
			}
			defer os.RemoveAll(tempDir)

			db, err := walletdb.Create("bdb", tempDir+"/weks.db")
			if err != nil {
				t.Fatalf("Error opening DB: %s", err)
			}
			defer db.Close()

			hdrStore, err := headerfs.NewBlockHeaderStore(
				tempDir, db, &chaincfg.SimNetParams,
			)
			if err != nil {
				t.Fatalf("Error creating block header "+
					"store: %s", err)
			}

			cfStore, err := headerfs.NewFilterHeaderStore(
				tempDir, db, headerfs.RegularFilter,
				&chaincfg.SimNetParams,
			)
			if err != nil {
				t.Fatalf("Error creating filter header "+
					"store: %s", err)
			}

			// Keep track of the filter headers and block headers.
			// Since the genesis headers are written automatically
			// when the store is created, we query it to add to the
			// slices.
			genesisBlockHeader, _, err := hdrStore.ChainTip()
			if err != nil {
				t.Fatal(err)
			}

			var blockHeaders []headerfs.BlockHeader
			blockHeaders = append(blockHeaders, headerfs.BlockHeader{
				BlockHeader: genesisBlockHeader,
				Height:      0,
			})

			genesisFilterHeader, _, err := cfStore.ChainTip()
			if err != nil {
				t.Fatal(err)
			}

			var cfHeaders []headerfs.FilterHeader
			cfHeaders = append(cfHeaders, headerfs.FilterHeader{
				HeaderHash: genesisBlockHeader.BlockHash(),
				FilterHash: *genesisFilterHeader,
				Height:     0,
			})

			// The filter hashes (not the filter headers!) will be
			// sent as part of the CFHeaders response, so we also
			// keep track of them.
			genesisFilter, err := builder.BuildBasicFilter(
				chaincfg.SimNetParams.GenesisBlock, nil,
			)
			if err != nil {
				t.Fatalf("unable to build genesis filter: %v",
					err)
			}

			genesisFilterHash, err := builder.GetFilterHash(
				genesisFilter,
			)
			if err != nil {
				t.Fatalf("unable to get genesis filter "+
					"hash: %v", err)
			}

			var filterHashes []chainhash.Hash
			filterHashes = append(filterHashes, genesisFilterHash)

			// Also keep track of the current filter header. We use
			// this to calculate the next filter header, as it
			// commits to the previous.
			currentCFHeader := *genesisFilterHeader

			// checkpoints will be the checkpoints passed to
			// getCheckpointedCFHeaders.
			var checkpoints []*chainhash.Hash

			maxHeight := 20 * uint32(wire.CFCheckptInterval)
			for height := uint32(1); height <= maxHeight; height++ {
				header := heightToHeader(height)
				blockHeader := headerfs.BlockHeader{
					BlockHeader: header,
					Height:      height,
				}

				blockHeaders = append(blockHeaders, blockHeader)

				// It doesn't really matter what filter the
				// filter header commit to, so just use the
				// height as a nonce for the filters.
				filterHash := chainhash.Hash{}
				binary.BigEndian.PutUint32(
					filterHash[:], height,
				)
				filterHashes = append(filterHashes, filterHash)

				// Calculate the current filter header, and add
				// to our slice.
				currentCFHeader = chainhash.DoubleHashH(
					append(
						filterHash[:],
						currentCFHeader[:]...,
					),
				)
				cfHeaders = append(
					cfHeaders, headerfs.FilterHeader{
						HeaderHash: header.BlockHash(),
						FilterHash: currentCFHeader,
						Height:     height,
					},
				)

				// Each interval we must record a checkpoint.
				// Note that height starts at 1, so no extra
				// zero-height guard is needed.
				if height%wire.CFCheckptInterval == 0 {
					// We must make a copy of the current
					// header to avoid mutation.
					cfh := currentCFHeader
					checkpoints = append(checkpoints, &cfh)
				}
			}

			// Write all block headers but the genesis, since it is
			// already in the store.
			err = hdrStore.WriteHeaders(blockHeaders[1:]...)
			if err != nil {
				t.Fatalf("Error writing batch of headers: %s",
					err)
			}

			// We emulate the case where a few filter headers are
			// already written to the store by writing 1/3 of the
			// first interval.
			if test.partialInterval {
				err = cfStore.WriteHeaders(
					cfHeaders[1 : wire.CFCheckptInterval/3]...,
				)
				if err != nil {
					t.Fatalf("Error writing batch of "+
						"headers: %s", err)
				}
			}

			// Set up a chain service with a custom query batch
			// method.
			cs := &ChainService{
				BlockHeaders: hdrStore,
			}
			cs.queryBatch = func(msgs []wire.Message,
				f func(*ServerPeer, wire.Message,
					wire.Message) bool,
				quit <-chan struct{}, qo ...QueryOption) {

				// Craft response for each message.
				var responses []*wire.MsgCFHeaders
				for _, msg := range msgs {
					// Only GetCFHeaders expected. Use a
					// distinct name for the request so we
					// don't shadow the quit channel.
					req, ok := msg.(*wire.MsgGetCFHeaders)
					if !ok {
						t.Fatalf("got unexpected "+
							"message %T", msg)
					}

					// The start height must be set to a
					// checkpoint height+1.
					if req.StartHeight%wire.CFCheckptInterval != 1 {
						t.Fatalf("unexpected start "+
							"height %v",
							req.StartHeight)
					}

					var prevFilterHeader chainhash.Hash
					switch req.StartHeight {

					// If the start height is 1 the
					// prevFilterHeader is set to the
					// genesis header.
					case 1:
						prevFilterHeader = *genesisFilterHeader

					// Otherwise we use one of the created
					// checkpoints.
					default:
						j := req.StartHeight/
							wire.CFCheckptInterval - 1
						prevFilterHeader = *checkpoints[j]
					}

					resp := &wire.MsgCFHeaders{
						FilterType:       req.FilterType,
						StopHash:         req.StopHash,
						PrevFilterHeader: prevFilterHeader,
					}

					// Keep adding filter hashes until we
					// reach the stop hash.
					for h := req.StartHeight; ; h++ {
						resp.FilterHashes = append(
							resp.FilterHashes,
							&filterHashes[h],
						)

						blockHash := blockHeaders[h].BlockHash()
						if blockHash == req.StopHash {
							break
						}
					}

					responses = append(responses, resp)
				}

				// We permute the response order if the test
				// signals that.
				perm := rand.Perm(len(responses))
				for i, v := range perm {
					index := i
					if test.permute {
						index = v
					}
					if !f(nil, msgs[index], responses[index]) {
						t.Fatalf("got response false")
					}
				}
			}

			// Set up a blockManager with the chain service we
			// defined...
			bm := blockManager{
				server: cs,
				blkHeaderProgressLogger: newBlockProgressLogger(
					"Processed", "block", log,
				),
				fltrHeaderProgessLogger: newBlockProgressLogger(
					"Verified", "filter header", log,
				),
			}
			bm.newHeadersSignal = sync.NewCond(&bm.newHeadersMtx)
			bm.newFilterHeadersSignal = sync.NewCond(
				&bm.newFilterHeadersMtx,
			)

			// ...and call the get checkpointed cf headers method
			// with the checkpoints we created.
			bm.getCheckpointedCFHeaders(
				checkpoints, cfStore, wire.GCSFilterRegular,
			)

			// Finally make sure the filter header tip is what we
			// expect.
			tip, tipHeight, err := cfStore.ChainTip()
			if err != nil {
				t.Fatal(err)
			}

			if tipHeight != maxHeight {
				t.Fatalf("expected tip height to be %v, "+
					"was %v", maxHeight, tipHeight)
			}

			lastCheckpoint := checkpoints[len(checkpoints)-1]
			if *tip != *lastCheckpoint {
				t.Fatalf("expected tip to be %v, was %v",
					lastCheckpoint, tip)
			}
		})
	}
}
Loading

0 comments on commit 8018ab7

Please sign in to comment.