From 1ab57fc5ce59a04ab4391b259540d658c73a162b Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Wed, 12 Jun 2019 15:25:04 -0700 Subject: [PATCH 1/2] blockmanager: remove unused SynchronizeFilterHeaders method --- blockmanager.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/blockmanager.go b/blockmanager.go index f466b1d29..a3ea7674d 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -1942,19 +1942,6 @@ func (b *blockManager) BlockHeadersSynced() bool { return b.syncPeer.LastBlock() >= b.syncPeer.StartingHeight() } -// SynchronizeFilterHeaders allows the caller to execute a function closure -// that depends on synchronization with the current set of filter headers. This -// allows the caller to execute an action that depends on the current filter -// header state, thereby ensuring that the state would shift from underneath -// them. Each execution of the closure will have the current filter header tip -// passed in to ensue that the caller gets a consistent view. -func (b *blockManager) SynchronizeFilterHeaders(f func(uint32) error) error { - b.newFilterHeadersMtx.RLock() - defer b.newFilterHeadersMtx.RUnlock() - - return f(b.filterHeaderTip) -} - // QueueInv adds the passed inv message and peer to the block handling queue. func (b *blockManager) QueueInv(inv *wire.MsgInv, sp *ServerPeer) { // No channel handling here because peers do not need to block on inv From e9f6be15533f95c8b774d722ea54325bc231c4cc Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Wed, 12 Jun 2019 16:28:24 -0700 Subject: [PATCH 2/2] blockmanager: address filter header tip lock inconsistency There was a previous assumption that filterHeaderTip and filterHeaderTipHash were only being accessed within the cfHandler goroutine. This is no longer the case, as we require the filter header tip in order to deliver a backlog of notifications for block notification subscribers. We also establish a locking order when needing to acquire both newHeadersMtx and newFilterHeadersMtx in certain cases. --- blockmanager.go | 40 ++++++++++++++++++++++++++++------------ 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/blockmanager.go b/blockmanager.go index a3ea7674d..24e4004ae 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -119,6 +119,9 @@ type blockManager struct { // newHeadersMtx is the mutex that should be held when reading/writing // the headerTip variable above. + // + // NOTE: When using this mutex along with newFilterHeadersMtx at the + // same time, newHeadersMtx should always be acquired first. newHeadersMtx sync.RWMutex // newHeadersSignal is condition variable which will be used to notify @@ -140,6 +143,9 @@ type blockManager struct { // newFilterHeadersMtx is the mutex that should be held when // reading/writing the filterHeaderTip variable above. + // + // NOTE: When using this mutex along with newHeadersMtx at the same + // time, newHeadersMtx should always be acquired first. newFilterHeadersMtx sync.RWMutex // newFilterHeadersSignal is condition variable which will be used to @@ -473,10 +479,10 @@ waitForHeaders: log.Infof("Waiting for more block headers, then will start "+ "cfheaders sync from height %v...", b.filterHeaderTip) - // NOTE: We can grab the filterHeaderTip here without a lock, as this - // is the only goroutine that can modify this value. b.newHeadersSignal.L.Lock() + b.newFilterHeadersMtx.RLock() for !(b.filterHeaderTip+wire.CFCheckptInterval <= b.headerTip || b.BlockHeadersSynced()) { + b.newFilterHeadersMtx.RUnlock() b.newHeadersSignal.Wait() // While we're awake, we'll quickly check to see if we need to @@ -488,7 +494,12 @@ waitForHeaders: default: } + + // Re-acquire the lock in order to check for the filter header + // tip at the next iteration of the loop. + b.newFilterHeadersMtx.RLock() } + b.newFilterHeadersMtx.RUnlock() b.newHeadersSignal.L.Unlock() // Now that the block headers are finished or ahead of the filter @@ -501,10 +512,12 @@ waitForHeaders: } lastHash := lastHeader.BlockHash() + b.newFilterHeadersMtx.RLock() log.Infof("Starting cfheaders sync from (block_height=%v, "+ "block_hash=%v) to (block_height=%v, block_hash=%v)", b.filterHeaderTip, b.filterHeaderTipHash, lastHeight, lastHeader.BlockHash()) + b.newFilterHeadersMtx.RUnlock() fType := wire.GCSFilterRegular store := b.server.RegFilterHeaders @@ -606,10 +619,13 @@ waitForHeaders: // we also go back to the loop to utilize the faster check pointed // fetching. b.newHeadersMtx.RLock() + b.newFilterHeadersMtx.RLock() if b.filterHeaderTip+wire.CFCheckptInterval <= b.headerTip { + b.newFilterHeadersMtx.RUnlock() b.newHeadersMtx.RUnlock() goto waitForHeaders } + b.newFilterHeadersMtx.RUnlock() b.newHeadersMtx.RUnlock() log.Infof("Fully caught up with cfheaders at height "+ @@ -622,14 +638,12 @@ waitForHeaders: for { // We'll wait until the filter header tip and the header tip // are mismatched. - // - // NOTE: We can grab the filterHeaderTipHash here without a - // lock, as this is the only goroutine that can modify this - // value. b.newHeadersSignal.L.Lock() + b.newFilterHeadersMtx.RLock() for b.filterHeaderTipHash == b.headerTipHash { // We'll wait here until we're woken up by the // broadcast signal. + b.newFilterHeadersMtx.RUnlock() b.newHeadersSignal.Wait() // Before we proceed, we'll check if we need to exit at @@ -640,8 +654,12 @@ waitForHeaders: return default: } - } + // Re-acquire the lock in order to check for the filter + // header tip at the next iteration of the loop. + b.newFilterHeadersMtx.RLock() + } + b.newFilterHeadersMtx.RUnlock() b.newHeadersSignal.L.Unlock() // At this point, we know that there're a set of new filter @@ -1079,9 +1097,6 @@ func (b *blockManager) getCheckpointedCFHeaders(checkpoints []*chainhash.Hash, func (b *blockManager) writeCFHeadersMsg(msg *wire.MsgCFHeaders, store *headerfs.FilterHeaderStore) (*chainhash.Hash, error) { - b.newFilterHeadersMtx.Lock() - defer b.newFilterHeadersMtx.Unlock() - // Check that the PrevFilterHeader is the same as the last stored so we // can prevent misalignment. tip, tipHeight, err := store.ChainTip() @@ -1161,8 +1176,10 @@ func (b *blockManager) writeCFHeadersMsg(msg *wire.MsgCFHeaders, // has changed as well. Unlike the set of notifications above, this is // for sub-system that only need to know the height has changed rather // than know each new header that's been added to the tip. + b.newFilterHeadersMtx.Lock() b.filterHeaderTip = lastHeight b.filterHeaderTipHash = lastHash + b.newFilterHeadersMtx.Unlock() b.newFilterHeadersSignal.Broadcast() return &lastHeader, nil @@ -2608,9 +2625,8 @@ func (b *blockManager) NotificationsSinceHeight( height uint32) ([]blockntfns.BlockNtfn, uint32, error) { b.newFilterHeadersMtx.RLock() - defer b.newFilterHeadersMtx.RUnlock() - bestHeight := b.filterHeaderTip + b.newFilterHeadersMtx.RUnlock() // If a height of 0 is provided by the caller, then a backlog of // notifications is not needed.