Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
Use deadlock-detecting mutexes and waitgroups
Browse files Browse the repository at this point in the history
  • Loading branch information
guseggert committed Oct 3, 2021
1 parent 2b51297 commit 043c070
Show file tree
Hide file tree
Showing 18 changed files with 54 additions and 47 deletions.
12 changes: 9 additions & 3 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
"errors"
"fmt"

"sync"
"time"

delay "github.com/ipfs/go-ipfs-delay"
"github.com/sasha-s/go-deadlock"

deciface "github.com/ipfs/go-bitswap/decision"
bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
Expand Down Expand Up @@ -58,6 +58,12 @@ var (
timeMetricsBuckets = []float64{1, 10, 30, 60, 90, 120, 600}
)

func init() {
deadlock.Opts.OnPotentialDeadlock = func() {}
deadlock.Opts.DeadlockTimeout = 1 * time.Minute
deadlock.Opts.MaxMapSize = 1024 * 640
}

// Option defines the functional option type that can be used to configure
// bitswap instances
type Option func(*Bitswap)
Expand Down Expand Up @@ -325,7 +331,7 @@ type Bitswap struct {
process process.Process

// Counters for various statistics
counterLk sync.Mutex
counterLk deadlock.Mutex
counters *counters

// Metrics interface metrics
Expand Down Expand Up @@ -585,7 +591,7 @@ func (bs *Bitswap) updateReceiveCounters(blocks []blocks.Block) {
func (bs *Bitswap) blockstoreHas(blks []blocks.Block) []bool {
res := make([]bool, len(blks))

wg := sync.WaitGroup{}
wg := deadlock.WaitGroup{}
for i, block := range blks {
wg.Add(1)
go func(i int, b blocks.Block) {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (
github.com/libp2p/go-msgio v0.0.6
github.com/multiformats/go-multiaddr v0.3.3
github.com/multiformats/go-multistream v0.2.2
github.com/sasha-s/go-deadlock v0.3.1
github.com/stretchr/testify v1.7.0
go.uber.org/zap v1.16.0
)
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,8 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FI
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac=
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ=
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o=
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -763,6 +765,8 @@ github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/sasha-s/go-deadlock v0.3.1 h1:sqv7fDNShgjcaxkO0JNcOAlr8B9+cV5Ey/OB71efZx0=
github.com/sasha-s/go-deadlock v0.3.1/go.mod h1:F73l+cr82YSh10GxyRI6qZiCgK64VaZjwesgfQ1/iLM=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shurcooL/component v0.0.0-20170202220835-f88ec8f54cc4/go.mod h1:XhFIlyj5a1fBNx5aJTbKoIq0mNaPvOagO+HjB3EtxrY=
Expand Down
5 changes: 2 additions & 3 deletions internal/blockpresencemanager/blockpresencemanager.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package blockpresencemanager

import (
"sync"

cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/sasha-s/go-deadlock"
)

// BlockPresenceManager keeps track of which peers have indicated that they
// have or explicitly don't have a block
type BlockPresenceManager struct {
sync.RWMutex
deadlock.RWMutex
presence map[cid.Cid]map[peer.ID]bool
}

Expand Down
8 changes: 4 additions & 4 deletions internal/decision/blockstoremanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package decision
import (
"context"
"fmt"
"sync"

blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
bstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-metrics-interface"
process "github.com/jbenet/goprocess"
"github.com/sasha-s/go-deadlock"
)

// blockstoreManager maintains a pool of workers that make requests to the blockstore.
Expand Down Expand Up @@ -83,7 +83,7 @@ func (bsm *blockstoreManager) getBlockSizes(ctx context.Context, ks []cid.Cid) (
return res, nil
}

var lk sync.Mutex
var lk deadlock.Mutex
return res, bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
size, err := bsm.bs.GetSize(c)
if err != nil {
Expand All @@ -105,7 +105,7 @@ func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) (map[
return res, nil
}

var lk sync.Mutex
var lk deadlock.Mutex
return res, bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
blk, err := bsm.bs.Get(c)
if err != nil {
Expand All @@ -123,7 +123,7 @@ func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) (map[

func (bsm *blockstoreManager) jobPerKey(ctx context.Context, ks []cid.Cid, jobFn func(c cid.Cid)) error {
var err error
wg := sync.WaitGroup{}
wg := deadlock.WaitGroup{}
for _, k := range ks {
c := k
wg.Add(1)
Expand Down
8 changes: 4 additions & 4 deletions internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ package decision
import (
"context"
"fmt"
"sync"
"time"

"github.com/google/uuid"
"github.com/sasha-s/go-deadlock"

bsmsg "github.com/ipfs/go-bitswap/message"
pb "github.com/ipfs/go-bitswap/message/pb"
Expand Down Expand Up @@ -142,7 +142,7 @@ type Engine struct {

tagQueued, tagUseful string

lock sync.RWMutex // protects the fields immediately below
lock deadlock.RWMutex // protects the fields immediately below

// ledgerMap lists block-related Ledgers by their Partner key.
ledgerMap map[peer.ID]*ledger
Expand All @@ -155,7 +155,7 @@ type Engine struct {

ticker *time.Ticker

taskWorkerLock sync.Mutex
taskWorkerLock deadlock.Mutex
taskWorkerCount int

// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
Expand All @@ -173,7 +173,7 @@ type Engine struct {
activeGauge metrics.Gauge

// used to ensure metrics are reported each fixed number of operation
metricsLock sync.Mutex
metricsLock deadlock.Mutex
metricUpdateCounter int
}

Expand Down
5 changes: 2 additions & 3 deletions internal/decision/ledger.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package decision

import (
"sync"

pb "github.com/ipfs/go-bitswap/message/pb"
wl "github.com/ipfs/go-bitswap/wantlist"
"github.com/sasha-s/go-deadlock"

"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"
Expand All @@ -25,7 +24,7 @@ type ledger struct {
// wantList is a (bounded, small) set of keys that Partner desires.
wantList *wl.Wantlist

lk sync.RWMutex
lk deadlock.RWMutex
}

func (l *ledger) Wants(k cid.Cid, priority int32, wantType pb.Message_Wantlist_WantType) {
Expand Down
6 changes: 3 additions & 3 deletions internal/decision/scoreledger.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package decision

import (
"sync"
"time"

"github.com/benbjohnson/clock"
peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/sasha-s/go-deadlock"
)

const (
Expand Down Expand Up @@ -55,7 +55,7 @@ type scoreledger struct {
exchangeCount uint64

// the record lock
lock sync.RWMutex
lock deadlock.RWMutex

clock clock.Clock
}
Expand Down Expand Up @@ -110,7 +110,7 @@ type DefaultScoreLedger struct {
// is closed on Close
closing chan struct{}
// protects the fields immediatly below
lock sync.RWMutex
lock deadlock.RWMutex
// ledgerMap lists score ledgers by their partner key.
ledgerMap map[peer.ID]*scoreledger
// how frequently the engine should sample peer usefulness
Expand Down
4 changes: 2 additions & 2 deletions internal/messagequeue/donthavetimeoutmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package messagequeue

import (
"context"
"sync"
"time"

"github.com/benbjohnson/clock"
cid "github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/sasha-s/go-deadlock"
)

const (
Expand Down Expand Up @@ -73,7 +73,7 @@ type dontHaveTimeoutMgr struct {
maxExpectedWantProcessTime time.Duration

// All variables below here must be protected by the lock
lk sync.RWMutex
lk deadlock.RWMutex
// has the timeout manager started
started bool
// wants that are active (waiting for a response or timeout)
Expand Down
6 changes: 3 additions & 3 deletions internal/messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package messagequeue
import (
"context"
"math"
"sync"
"time"

"github.com/benbjohnson/clock"
Expand All @@ -15,6 +14,7 @@ import (
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/sasha-s/go-deadlock"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -83,15 +83,15 @@ type MessageQueue struct {
responses chan []cid.Cid

// Take lock whenever any of these variables are modified
wllock sync.Mutex
wllock deadlock.Mutex
bcstWants recallWantlist
peerWants recallWantlist
cancels *cid.Set
priority int32

// Dont touch any of these variables outside of run loop
sender bsnet.MessageSender
rebroadcastIntervalLk sync.RWMutex
rebroadcastIntervalLk deadlock.RWMutex
rebroadcastInterval time.Duration
rebroadcastTimer *clock.Timer
// For performance reasons we just clear out the fields of the message
Expand Down
4 changes: 2 additions & 2 deletions internal/notifications/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package notifications

import (
"context"
"sync"

pubsub "github.com/cskr/pubsub"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
"github.com/sasha-s/go-deadlock"
)

const bufferSize = 16
Expand All @@ -29,7 +29,7 @@ func New() PubSub {
}

type impl struct {
lk sync.RWMutex
lk deadlock.RWMutex
wrapped pubsub.PubSub

closed chan struct{}
Expand Down
6 changes: 3 additions & 3 deletions internal/peermanager/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package peermanager

import (
"context"
"sync"

logging "github.com/ipfs/go-log"
"github.com/ipfs/go-metrics-interface"
"github.com/sasha-s/go-deadlock"

cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -34,15 +34,15 @@ type PeerQueueFactory func(ctx context.Context, p peer.ID) PeerQueue
// PeerManager manages a pool of peers and sends messages to peers in the pool.
type PeerManager struct {
// sync access to peerQueues and peerWantManager
pqLk sync.RWMutex
pqLk deadlock.RWMutex
// peerQueues -- interact through internal utility functions get/set/remove/iterate
peerQueues map[peer.ID]PeerQueue
pwm *peerWantManager

createPeerQueue PeerQueueFactory
ctx context.Context

psLk sync.RWMutex
psLk deadlock.RWMutex
sessions map[uint64]Session
peerSessions map[peer.ID]map[uint64]struct{}

Expand Down
6 changes: 3 additions & 3 deletions internal/providerquerymanager/providerquerymanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package providerquerymanager
import (
"context"
"fmt"
"sync"
"time"

"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/sasha-s/go-deadlock"
)

var log = logging.Logger("bitswap")
Expand Down Expand Up @@ -77,7 +77,7 @@ type ProviderQueryManager struct {
incomingFindProviderRequests chan *findProviderRequest

findProviderTimeout time.Duration
timeoutMutex sync.RWMutex
timeoutMutex deadlock.RWMutex

// do not touch outside the run loop
inProgressRequestStatuses map[cid.Cid]*inProgressRequestStatus
Expand Down Expand Up @@ -232,7 +232,7 @@ func (pqm *ProviderQueryManager) findProviderWorker() {
findProviderCtx, cancel := context.WithTimeout(fpr.ctx, pqm.findProviderTimeout)
pqm.timeoutMutex.RUnlock()
providers := pqm.network.FindProvidersAsync(findProviderCtx, k, maxProviders)
wg := &sync.WaitGroup{}
wg := &deadlock.WaitGroup{}
for p := range providers {
wg.Add(1)
go func(p peer.ID) {
Expand Down
5 changes: 2 additions & 3 deletions internal/sessioninterestmanager/sessioninterestmanager.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package sessioninterestmanager

import (
"sync"

blocks "github.com/ipfs/go-block-format"
"github.com/sasha-s/go-deadlock"

cid "github.com/ipfs/go-cid"
)

// SessionInterestManager records the CIDs that each session is interested in.
type SessionInterestManager struct {
lk sync.RWMutex
lk deadlock.RWMutex
wants map[cid.Cid]map[uint64]bool
}

Expand Down
Loading

0 comments on commit 043c070

Please sign in to comment.