Skip to content

Commit

Permalink
Merge pull request #155 from wpaulino/filter-header-tip-lock
Browse files Browse the repository at this point in the history
blockmanager: address filter header tip lock inconsistency
  • Loading branch information
halseth committed Jun 20, 2019
2 parents 3f503ac + e9f6be1 commit 32af2fb
Showing 1 changed file with 28 additions and 25 deletions.
53 changes: 28 additions & 25 deletions blockmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ type blockManager struct {

// newHeadersMtx is the mutex that should be held when reading/writing
// the headerTip variable above.
//
// NOTE: When using this mutex along with newFilterHeadersMtx at the
// same time, newHeadersMtx should always be acquired first.
newHeadersMtx sync.RWMutex

// newHeadersSignal is condition variable which will be used to notify
Expand All @@ -144,6 +147,9 @@ type blockManager struct {

// newFilterHeadersMtx is the mutex that should be held when
// reading/writing the filterHeaderTip variable above.
//
// NOTE: When using this mutex along with newHeadersMtx at the same
// time, newHeadersMtx should always be acquired first.
newFilterHeadersMtx sync.RWMutex

// newFilterHeadersSignal is condition variable which will be used to
Expand Down Expand Up @@ -477,10 +483,10 @@ waitForHeaders:
log.Infof("Waiting for more block headers, then will start "+
"cfheaders sync from height %v...", b.filterHeaderTip)

// NOTE: We can grab the filterHeaderTip here without a lock, as this
// is the only goroutine that can modify this value.
b.newHeadersSignal.L.Lock()
b.newFilterHeadersMtx.RLock()
for !(b.filterHeaderTip+wire.CFCheckptInterval <= b.headerTip || b.BlockHeadersSynced()) {
b.newFilterHeadersMtx.RUnlock()
b.newHeadersSignal.Wait()

// While we're awake, we'll quickly check to see if we need to
Expand All @@ -492,7 +498,12 @@ waitForHeaders:
default:

}

// Re-acquire the lock in order to check for the filter header
// tip at the next iteration of the loop.
b.newFilterHeadersMtx.RLock()
}
b.newFilterHeadersMtx.RUnlock()
b.newHeadersSignal.L.Unlock()

// Now that the block headers are finished or ahead of the filter
Expand All @@ -505,10 +516,12 @@ waitForHeaders:
}
lastHash := lastHeader.BlockHash()

b.newFilterHeadersMtx.RLock()
log.Infof("Starting cfheaders sync from (block_height=%v, "+
"block_hash=%v) to (block_height=%v, block_hash=%v)",
b.filterHeaderTip, b.filterHeaderTipHash, lastHeight,
lastHeader.BlockHash())
b.newFilterHeadersMtx.RUnlock()

fType := wire.GCSFilterRegular
store := b.server.RegFilterHeaders
Expand Down Expand Up @@ -610,10 +623,13 @@ waitForHeaders:
// we also go back to the loop to utilize the faster check pointed
// fetching.
b.newHeadersMtx.RLock()
b.newFilterHeadersMtx.RLock()
if b.filterHeaderTip+wire.CFCheckptInterval <= b.headerTip {
b.newFilterHeadersMtx.RUnlock()
b.newHeadersMtx.RUnlock()
goto waitForHeaders
}
b.newFilterHeadersMtx.RUnlock()
b.newHeadersMtx.RUnlock()

log.Infof("Fully caught up with cfheaders at height "+
Expand All @@ -626,14 +642,12 @@ waitForHeaders:
for {
// We'll wait until the filter header tip and the header tip
// are mismatched.
//
// NOTE: We can grab the filterHeaderTipHash here without a
// lock, as this is the only goroutine that can modify this
// value.
b.newHeadersSignal.L.Lock()
b.newFilterHeadersMtx.RLock()
for b.filterHeaderTipHash == b.headerTipHash {
// We'll wait here until we're woken up by the
// broadcast signal.
b.newFilterHeadersMtx.RUnlock()
b.newHeadersSignal.Wait()

// Before we proceed, we'll check if we need to exit at
Expand All @@ -644,8 +658,12 @@ waitForHeaders:
return
default:
}
}

// Re-acquire the lock in order to check for the filter
// header tip at the next iteration of the loop.
b.newFilterHeadersMtx.RLock()
}
b.newFilterHeadersMtx.RUnlock()
b.newHeadersSignal.L.Unlock()

// At this point, we know that there're a set of new filter
Expand Down Expand Up @@ -1083,9 +1101,6 @@ func (b *blockManager) getCheckpointedCFHeaders(checkpoints []*chainhash.Hash,
func (b *blockManager) writeCFHeadersMsg(msg *wire.MsgCFHeaders,
store *headerfs.FilterHeaderStore) (*chainhash.Hash, error) {

b.newFilterHeadersMtx.Lock()
defer b.newFilterHeadersMtx.Unlock()

// Check that the PrevFilterHeader is the same as the last stored so we
// can prevent misalignment.
tip, tipHeight, err := store.ChainTip()
Expand Down Expand Up @@ -1165,8 +1180,10 @@ func (b *blockManager) writeCFHeadersMsg(msg *wire.MsgCFHeaders,
// has changed as well. Unlike the set of notifications above, this is
// for sub-system that only need to know the height has changed rather
// than know each new header that's been added to the tip.
b.newFilterHeadersMtx.Lock()
b.filterHeaderTip = lastHeight
b.filterHeaderTipHash = lastHash
b.newFilterHeadersMtx.Unlock()
b.newFilterHeadersSignal.Broadcast()

return &lastHeader, nil
Expand Down Expand Up @@ -1946,19 +1963,6 @@ func (b *blockManager) BlockHeadersSynced() bool {
return b.syncPeer.LastBlock() >= b.syncPeer.StartingHeight()
}

// SynchronizeFilterHeaders allows the caller to execute a function closure
// that depends on synchronization with the current set of filter headers. This
// allows the caller to execute an action that depends on the current filter
// header state, thereby ensuring that the state would shift from underneath
// them. Each execution of the closure will have the current filter header tip
// passed in to ensue that the caller gets a consistent view.
func (b *blockManager) SynchronizeFilterHeaders(f func(uint32) error) error {
b.newFilterHeadersMtx.RLock()
defer b.newFilterHeadersMtx.RUnlock()

return f(b.filterHeaderTip)
}

// QueueInv adds the passed inv message and peer to the block handling queue.
func (b *blockManager) QueueInv(inv *wire.MsgInv, sp *ServerPeer) {
// No channel handling here because peers do not need to block on inv
Expand Down Expand Up @@ -2625,9 +2629,8 @@ func (b *blockManager) NotificationsSinceHeight(
height uint32) ([]blockntfns.BlockNtfn, uint32, error) {

b.newFilterHeadersMtx.RLock()
defer b.newFilterHeadersMtx.RUnlock()

bestHeight := b.filterHeaderTip
b.newFilterHeadersMtx.RUnlock()

// If a height of 0 is provided by the caller, then a backlog of
// notifications is not needed.
Expand Down

0 comments on commit 32af2fb

Please sign in to comment.