diff --git a/blockmanager.go b/blockmanager.go index 376ed40c3..e6f2cbf77 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -123,6 +123,9 @@ type blockManager struct { // newHeadersMtx is the mutex that should be held when reading/writing // the headerTip variable above. + // + // NOTE: When using this mutex along with newFilterHeadersMtx at the + // same time, newHeadersMtx should always be acquired first. newHeadersMtx sync.RWMutex // newHeadersSignal is condition variable which will be used to notify @@ -144,6 +147,9 @@ type blockManager struct { // newFilterHeadersMtx is the mutex that should be held when // reading/writing the filterHeaderTip variable above. + // + // NOTE: When using this mutex along with newHeadersMtx at the same + // time, newHeadersMtx should always be acquired first. newFilterHeadersMtx sync.RWMutex // newFilterHeadersSignal is condition variable which will be used to @@ -477,10 +483,10 @@ waitForHeaders: log.Infof("Waiting for more block headers, then will start "+ "cfheaders sync from height %v...", b.filterHeaderTip) - // NOTE: We can grab the filterHeaderTip here without a lock, as this - // is the only goroutine that can modify this value. b.newHeadersSignal.L.Lock() + b.newFilterHeadersMtx.RLock() for !(b.filterHeaderTip+wire.CFCheckptInterval <= b.headerTip || b.BlockHeadersSynced()) { + b.newFilterHeadersMtx.RUnlock() b.newHeadersSignal.Wait() // While we're awake, we'll quickly check to see if we need to @@ -492,7 +498,12 @@ waitForHeaders: default: } + + // Re-acquire the lock in order to check for the filter header + // tip at the next iteration of the loop. + b.newFilterHeadersMtx.RLock() } + b.newFilterHeadersMtx.RUnlock() b.newHeadersSignal.L.Unlock() // Now that the block headers are finished or ahead of the filter @@ -505,10 +516,12 @@ waitForHeaders: } lastHash := lastHeader.BlockHash() + b.newFilterHeadersMtx.RLock() log.Infof("Starting cfheaders sync from (block_height=%v, "+ "block_hash=%v) to (block_height=%v, block_hash=%v)", b.filterHeaderTip, b.filterHeaderTipHash, lastHeight, lastHeader.BlockHash()) + b.newFilterHeadersMtx.RUnlock() fType := wire.GCSFilterRegular store := b.server.RegFilterHeaders @@ -610,10 +623,13 @@ waitForHeaders: // we also go back to the loop to utilize the faster check pointed // fetching. b.newHeadersMtx.RLock() + b.newFilterHeadersMtx.RLock() if b.filterHeaderTip+wire.CFCheckptInterval <= b.headerTip { + b.newFilterHeadersMtx.RUnlock() b.newHeadersMtx.RUnlock() goto waitForHeaders } + b.newFilterHeadersMtx.RUnlock() b.newHeadersMtx.RUnlock() log.Infof("Fully caught up with cfheaders at height "+ @@ -626,14 +642,12 @@ waitForHeaders: for { // We'll wait until the filter header tip and the header tip // are mismatched. - // - // NOTE: We can grab the filterHeaderTipHash here without a - // lock, as this is the only goroutine that can modify this - // value. b.newHeadersSignal.L.Lock() + b.newFilterHeadersMtx.RLock() for b.filterHeaderTipHash == b.headerTipHash { // We'll wait here until we're woken up by the // broadcast signal. + b.newFilterHeadersMtx.RUnlock() b.newHeadersSignal.Wait() // Before we proceed, we'll check if we need to exit at @@ -644,8 +658,12 @@ waitForHeaders: return default: } - } + // Re-acquire the lock in order to check for the filter + // header tip at the next iteration of the loop. + b.newFilterHeadersMtx.RLock() + } + b.newFilterHeadersMtx.RUnlock() b.newHeadersSignal.L.Unlock() // At this point, we know that there're a set of new filter @@ -1083,9 +1101,6 @@ func (b *blockManager) getCheckpointedCFHeaders(checkpoints []*chainhash.Hash, func (b *blockManager) writeCFHeadersMsg(msg *wire.MsgCFHeaders, store *headerfs.FilterHeaderStore) (*chainhash.Hash, error) { - b.newFilterHeadersMtx.Lock() - defer b.newFilterHeadersMtx.Unlock() - // Check that the PrevFilterHeader is the same as the last stored so we // can prevent misalignment. tip, tipHeight, err := store.ChainTip() @@ -1165,8 +1180,10 @@ func (b *blockManager) writeCFHeadersMsg(msg *wire.MsgCFHeaders, // has changed as well. Unlike the set of notifications above, this is // for sub-system that only need to know the height has changed rather // than know each new header that's been added to the tip. + b.newFilterHeadersMtx.Lock() b.filterHeaderTip = lastHeight b.filterHeaderTipHash = lastHash + b.newFilterHeadersMtx.Unlock() b.newFilterHeadersSignal.Broadcast() return &lastHeader, nil @@ -1946,19 +1963,6 @@ func (b *blockManager) BlockHeadersSynced() bool { return b.syncPeer.LastBlock() >= b.syncPeer.StartingHeight() } -// SynchronizeFilterHeaders allows the caller to execute a function closure -// that depends on synchronization with the current set of filter headers. This -// allows the caller to execute an action that depends on the current filter -// header state, thereby ensuring that the state would shift from underneath -// them. Each execution of the closure will have the current filter header tip -// passed in to ensue that the caller gets a consistent view. -func (b *blockManager) SynchronizeFilterHeaders(f func(uint32) error) error { - b.newFilterHeadersMtx.RLock() - defer b.newFilterHeadersMtx.RUnlock() - - return f(b.filterHeaderTip) -} - // QueueInv adds the passed inv message and peer to the block handling queue. func (b *blockManager) QueueInv(inv *wire.MsgInv, sp *ServerPeer) { // No channel handling here because peers do not need to block on inv @@ -2625,9 +2629,8 @@ func (b *blockManager) NotificationsSinceHeight( height uint32) ([]blockntfns.BlockNtfn, uint32, error) { b.newFilterHeadersMtx.RLock() - defer b.newFilterHeadersMtx.RUnlock() - bestHeight := b.filterHeaderTip + b.newFilterHeadersMtx.RUnlock() // If a height of 0 is provided by the caller, then a backlog of // notifications is not needed.