Skip to content

Commit

Permalink
Fix sync finished notice cancel
Browse files Browse the repository at this point in the history
- Fix possible deadlock when cancelling sync finished event notifications with unread notifications.
- Add unit test to test deadlock fix.
- Update libp2p to get fixes.
  • Loading branch information
gammazero committed Apr 17, 2023
1 parent 82e2352 commit 8213ff1
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 306 deletions.
6 changes: 6 additions & 0 deletions dagsync/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,12 @@ func (s *Subscriber) OnSyncFinished() (<-chan SyncFinished, context.CancelFunc)
s.outEventsMutex.Unlock()

cncl := func() {
// Drain channel to prevent deadlock if there are blocked writes to the
// channel.
go func() {
for range ch {
}
}()
s.outEventsMutex.Lock()
defer s.outEventsMutex.Unlock()
for i, ca := range s.outEventsChans {
Expand Down
71 changes: 70 additions & 1 deletion dagsync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,75 @@ func TestAnnounce(t *testing.T) {
require.NoError(t, err)
}

func TestCancelDeadlock(t *testing.T) {
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
dstStore := dssync.MutexWrap(datastore.NewMapDatastore())
srcHost := test.MkTestHost()
srcLnkS := test.MkLinkSystem(srcStore)
dstHost := test.MkTestHost()
defer srcHost.Close()
defer dstHost.Close()

srcHost.Peerstore().AddAddrs(dstHost.ID(), dstHost.Addrs(), time.Hour)
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)
dstLnkS := test.MkLinkSystem(dstStore)

pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic)
require.NoError(t, err)
defer pub.Close()

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, nil)
require.NoError(t, err)
defer sub.Close()

require.NoError(t, test.WaitForP2PPublisher(pub, dstHost, testTopic))

watcher, cncl := sub.OnSyncFinished()

// Store the whole chain in source node
chainLnks := test.MkChain(srcLnkS, true)

c := chainLnks[2].(cidlink.Link).Cid
err = pub.SetRoot(context.Background(), c)
require.NoError(t, err)

peerInfo := peer.AddrInfo{
ID: srcHost.ID(),
Addrs: srcHost.Addrs(),
}
_, err = sub.Sync(context.Background(), peerInfo, cid.Undef, nil)
require.NoError(t, err)
// Now there should be an event on the watcher channel.

c = chainLnks[1].(cidlink.Link).Cid
err = pub.SetRoot(context.Background(), c)
require.NoError(t, err)

_, err = sub.Sync(context.Background(), peerInfo, cid.Undef, nil)
require.NoError(t, err)
// Now the event dispatcher is blocked writing to the watcher channel.

// It is necessary to wait a bit for the event dispatcher to block.
time.Sleep(time.Second)

// Cancel watching for sync finished. This should unblock the event
// dispatcher, otherwise it will deadlock.
done := make(chan struct{})
go func() {
cncl()
close(done)
}()

select {
case <-done:
case <-time.After(5 * time.Second):
// Drain the watcher so sub can close and test can exit.
for range watcher {
}
t.Fatal("cancel did not return")
}
}

func newAnnounceTest(pub dagsync.Publisher, sub *dagsync.Subscriber, dstStore datastore.Batching, watcher <-chan dagsync.SyncFinished, peerID peer.ID, peerAddrs []multiaddr.Multiaddr, lnk ipld.Link, expectedSync cid.Cid) error {
var err error
c := lnk.(cidlink.Link).Cid
Expand All @@ -404,7 +473,7 @@ func newAnnounceTest(pub dagsync.Publisher, sub *dagsync.Subscriber, dstStore da
return errors.New("timed out waiting for sync to propagate")
case downstream, open := <-watcher:
if !open {
return errors.New("event channle closed without receiving event")
return errors.New("event channel closed without receiving event")
}
if !downstream.Cid.Equals(c) {
return fmt.Errorf("sync returned unexpected cid %s, expected %s", downstream.Cid, c)
Expand Down
62 changes: 31 additions & 31 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,30 @@ require (
github.com/ipfs/go-graphsync v0.14.5
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipld/go-ipld-prime v0.20.0
github.com/libp2p/go-libp2p v0.26.4
github.com/libp2p/go-libp2p v0.27.1
github.com/libp2p/go-libp2p-gostream v0.6.0
github.com/libp2p/go-libp2p-pubsub v0.9.3
github.com/libp2p/go-msgio v0.3.0
github.com/mr-tron/base58 v1.2.0
github.com/multiformats/go-multiaddr v0.8.0
github.com/multiformats/go-multiaddr v0.9.0
github.com/multiformats/go-multicodec v0.8.1
github.com/multiformats/go-multihash v0.2.1
github.com/multiformats/go-multistream v0.4.1
github.com/multiformats/go-varint v0.0.7
github.com/stretchr/testify v1.8.2
github.com/whyrusleeping/cbor-gen v0.0.0-20230126041949-52956bd4c9aa
golang.org/x/crypto v0.6.0
golang.org/x/crypto v0.7.0
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2
google.golang.org/protobuf v1.28.1
google.golang.org/protobuf v1.30.0
)

require (
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bep/debounce v1.2.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/cgroups v1.0.4 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand All @@ -49,19 +49,19 @@ require (
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.0 // indirect
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/pprof v0.0.0-20221203041831-ce31453925ec // indirect
github.com/google/pprof v0.0.0-20230405160723-4a4c7d95572b // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hannahhoward/cbor-gen-for v0.0.0-20230214144701-5d17c9d5243c // indirect
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.1 // indirect
github.com/huin/goupnp v1.0.3 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.2 // indirect
github.com/huin/goupnp v1.1.0 // indirect
github.com/ipfs/go-block-format v0.0.3 // indirect
github.com/ipfs/go-ipfs-pq v0.0.2 // indirect
github.com/ipfs/go-ipfs-util v0.0.2 // indirect
Expand All @@ -74,30 +74,30 @@ require (
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/klauspost/compress v1.15.12 // indirect
github.com/klauspost/cpuid/v2 v2.2.3 // indirect
github.com/koron/go-ssdp v0.0.3 // indirect
github.com/klauspost/compress v1.16.4 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-cidranger v1.1.0 // indirect
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.3.0 // indirect
github.com/libp2p/go-nat v0.1.0 // indirect
github.com/libp2p/go-netroute v0.2.1 // indirect
github.com/libp2p/go-reuseport v0.2.0 // indirect
github.com/libp2p/go-yamux/v4 v4.0.0 // indirect
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/mattn/go-isatty v0.0.18 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/miekg/dns v1.1.50 // indirect
github.com/miekg/dns v1.1.53 // indirect
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect
github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect
github.com/minio/sha256-simd v1.0.0 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect
github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect
github.com/multiformats/go-multibase v0.1.1 // indirect
github.com/onsi/ginkgo/v2 v2.5.1 // indirect
github.com/multiformats/go-multibase v0.2.0 // indirect
github.com/onsi/ginkgo/v2 v2.9.2 // indirect
github.com/opencontainers/runtime-spec v1.0.2 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
Expand All @@ -106,11 +106,11 @@ require (
github.com/polydawn/refmt v0.89.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/qtls-go1-19 v0.2.1 // indirect
github.com/quic-go/qtls-go1-20 v0.1.1 // indirect
github.com/quic-go/qtls-go1-19 v0.3.2 // indirect
github.com/quic-go/qtls-go1-20 v0.2.2 // indirect
github.com/quic-go/quic-go v0.33.0 // indirect
github.com/quic-go/webtransport-go v0.5.2 // indirect
github.com/raulk/go-watchdog v1.3.0 // indirect
Expand All @@ -122,17 +122,17 @@ require (
go.opentelemetry.io/otel v1.3.0 // indirect
go.opentelemetry.io/otel/trace v1.3.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/dig v1.15.0 // indirect
go.uber.org/fx v1.18.2 // indirect
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/dig v1.16.1 // indirect
go.uber.org/fx v1.19.2 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb // indirect
golang.org/x/mod v0.7.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect
golang.org/x/mod v0.10.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/tools v0.3.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/tools v0.7.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.1.7 // indirect
nhooyr.io/websocket v1.8.7 // indirect
Expand Down
Loading

0 comments on commit 8213ff1

Please sign in to comment.