Skip to content

Commit

Permalink
Always write SyncFinished events (#67)
Browse files Browse the repository at this point in the history
* Always write SyncFinished events

SyncFinished events are always written to a queue without blocking.
- Subscriber does not block all other sync goroutines to send a SyncFinished event, when readers are busy.
- SyncFinished events are not lost
- Subscriber can continue to update ad chains asynchronously to processing those chains.

Fixes deadlock when:
- sti workers are blocked waiting for sync handler
- sync handler is blocked writing SyncFinished event
- SyncFinished not being read because sti workers busy
  • Loading branch information
gammazero committed Jun 28, 2023
1 parent fd14056 commit 25aed4f
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 79 deletions.
90 changes: 47 additions & 43 deletions dagsync/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/gammazero/channelqueue"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
Expand Down Expand Up @@ -68,10 +69,8 @@ type Subscriber struct {
// distributeEvents goroutine.
inEvents chan SyncFinished

// outEventsChans is a slice of channels, where each channel delivers a
// copy of a SyncFinished to an OnSyncFinished reader.
outEventsChans []chan<- SyncFinished
outEventsMutex sync.Mutex
addEventChan chan chan<- SyncFinished
rmEventChan chan chan<- SyncFinished

// closing signals that the Subscriber is closing.
closing chan struct{}
Expand Down Expand Up @@ -208,6 +207,9 @@ func NewSubscriber(host host.Host, ds datastore.Batching, lsys ipld.LinkSystem,
handlers: make(map[peer.ID]*handler),
inEvents: make(chan SyncFinished, 1),

addEventChan: make(chan chan<- SyncFinished),
rmEventChan: make(chan chan<- SyncFinished),

dtSync: dtSync,
httpSync: httpsync.NewSync(lsys, opts.httpClient, blockHook),
syncRecLimit: opts.syncRecLimit,
Expand Down Expand Up @@ -307,14 +309,6 @@ func (s *Subscriber) doClose() error {

err = s.dtSync.Close()

// Dismiss any event readers.
s.outEventsMutex.Lock()
for _, ch := range s.outEventsChans {
close(ch)
}
s.outEventsChans = nil
s.outEventsMutex.Unlock()

// Stop the distribution goroutine.
close(s.inEvents)

Expand All @@ -333,33 +327,23 @@ func (s *Subscriber) OnSyncFinished() (<-chan SyncFinished, context.CancelFunc)
// Channel is buffered to prevent distribute() from blocking if a reader is
// not reading the channel immediately.
log.Info("Configuring subscriber OnSyncFinished...")
ch := make(chan SyncFinished, 1)

s.outEventsMutex.Lock()
s.outEventsChans = append(s.outEventsChans, ch)
s.outEventsMutex.Unlock()
cq := channelqueue.New[SyncFinished](-1)
ch := cq.In()
s.addEventChan <- ch

cncl := func() {
// Drain channel to prevent deadlock if blocked writes are preventing
// the mutex from being unlocked.
go func() {
for range ch {
}
}()
s.outEventsMutex.Lock()
defer s.outEventsMutex.Unlock()
for i, ca := range s.outEventsChans {
if ca == ch {
s.outEventsChans[i] = s.outEventsChans[len(s.outEventsChans)-1]
s.outEventsChans[len(s.outEventsChans)-1] = nil
s.outEventsChans = s.outEventsChans[:len(s.outEventsChans)-1]
close(ch)
break
}
if ch == nil {
return
}
select {
case s.rmEventChan <- ch:
case <-s.closing:
}
ch = nil
}
log.Info("Subscriber OnSyncFinished configured.")
return ch, cncl
return cq.Out(), cncl
}

// RemoveHandler removes a handler for a publisher.
Expand Down Expand Up @@ -525,16 +509,35 @@ func (s *Subscriber) Sync(ctx context.Context, peerInfo peer.AddrInfo, nextCid c
// the even to all channels in outEventsChans. This delivers the SyncFinished
// to all OnSyncFinished channel readers.
func (s *Subscriber) distributeEvents() {
for event := range s.inEvents {
if !event.Cid.Defined() {
panic("SyncFinished event with undefined cid")
}
// Send update to all change notification channels.
s.outEventsMutex.Lock()
for _, ch := range s.outEventsChans {
ch <- event
var outEventsChans []chan<- SyncFinished

for {
select {
case event, ok := <-s.inEvents:
if !ok {
// Dismiss any event readers.
for _, ch := range outEventsChans {
close(ch)
}
return
}
// Send update to all change notification channels.
for _, ch := range outEventsChans {
ch <- event
}
case ch := <-s.addEventChan:
outEventsChans = append(outEventsChans, ch)
case ch := <-s.rmEventChan:
for i, ca := range outEventsChans {
if ca == ch {
outEventsChans[i] = outEventsChans[len(outEventsChans)-1]
outEventsChans[len(outEventsChans)-1] = nil
outEventsChans = outEventsChans[:len(outEventsChans)-1]
close(ch)
break
}
}
}
s.outEventsMutex.Unlock()
}
}

Expand Down Expand Up @@ -682,9 +685,9 @@ func (h *handler) handleAsync(ctx context.Context, nextCid cid.Cid, syncer Synce
// Wait for any other goroutine, for this handler, to finish
// updating the latest sync.
h.syncMutex.Lock()
defer h.syncMutex.Unlock()

if ctx.Err() != nil {
h.syncMutex.Unlock()
log.Warnw("Abandoned pending sync", "err", ctx.Err(), "publisher", h.peerID)
return
}
Expand All @@ -698,6 +701,7 @@ func (h *handler) handleAsync(ctx context.Context, nextCid cid.Cid, syncer Synce
h.qlock.Unlock()

syncCount, err := h.handle(ctx, c, h.subscriber.dss, true, syncer, h.subscriber.generalBlockHook, h.subscriber.segDepthLimit)
h.syncMutex.Unlock()
if err != nil {
// Failed to handle the sync, so allow another announce for the same CID.
h.subscriber.receiver.UncacheCid(c)
Expand Down
54 changes: 18 additions & 36 deletions dagsync/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipni/go-libipni/announce"
"github.com/ipni/go-libipni/announce/p2psender"
"github.com/ipni/go-libipni/dagsync"
Expand Down Expand Up @@ -509,7 +508,7 @@ func TestHttpPeerAddrPeerstore(t *testing.T) {
require.NoError(t, err)
}

func TestBackpressureDoesntDeadlock(t *testing.T) {
func TestSyncFinishedAlwaysDelivered(t *testing.T) {
t.Parallel()
pubHostSys := newHostSystem(t)
subHostSys := newHostSystem(t)
Expand All @@ -534,7 +533,7 @@ func TestBackpressureDoesntDeadlock(t *testing.T) {
Seed: 2,
}.BuildWithPrev(t, pubHostSys.lsys, nextLL)

// Purposefully not pulling from this channel yet to create backpressure
// Purposefully not pulling from this channel yet to build up events.
onSyncFinishedChan, cncl := sub.OnSyncFinished()
defer cncl()

Expand Down Expand Up @@ -575,52 +574,35 @@ func TestBackpressureDoesntDeadlock(t *testing.T) {
syncDoneCh <- err
}()

specificSyncDoneCh := make(chan error)
go func() {
// A sleep so that the upper sync starts first
time.Sleep(time.Second)
sel := dagsync.ExploreRecursiveWithStopNode(selector.RecursionLimitDepth(1), nil, nil)
_, err = sub.Sync(context.Background(), peerInfo, nextLL.(cidlink.Link).Cid, sel)
specificSyncDoneCh <- err
}()

timer := time.NewTimer(10 * time.Second)
select {
case <-syncDoneCh:
t.Fatal("sync should not have finished because it should be blocked by backpressure on the onSyncFinishedChan")
case err := <-specificSyncDoneCh:
// This sync should finish because it's not blocked by backpressure. This is
// because it's a simple sync not trying to setLatest. This is the kind of
// sync a user will call explicitly, so it should not be blocked by the
// backpressure of SyncFinishedEvent (it doesn't emit a SyncFinishedEvent).
// Sync should have finished SyncFinished events are always delivered.
require.NoError(t, err)
case <-time.After(10 * time.Second):
case <-timer.C:
t.Fatal("timed out waiting for sync to finish")
}
timer.Stop()

// Now pull from onSyncFinishedChan
timer.Reset(2 * time.Second)
select {
case <-onSyncFinishedChan:
default:
t.Fatal("Expected event to be ready to read from onSyncFinishedChan")
case <-timer.C:
t.Fatal("did not get event from onSyncFinishedChan")
}
emptySyncFinishedChan(onSyncFinishedChan)

// Now the syncDoneCh should be able to proceed
err = <-syncDoneCh
require.NoError(t, err)

// So that we can close properly
emptySyncFinishedChan(onSyncFinishedChan)
}

func emptySyncFinishedChan(ch <-chan dagsync.SyncFinished) {
for {
var count int
for done := false; !done; {
select {
case <-ch:
default:
return
case <-onSyncFinishedChan:
count++
case <-timer.C:
done = true
}
}
timer.Stop()

require.Equal(t, 3, count)
}

func waitForSync(t *testing.T, logPrefix string, store *dssync.MutexDatastore, expectedCid cidlink.Link, watcher <-chan dagsync.SyncFinished) {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.19

require (
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc7
github.com/gammazero/channelqueue v0.2.1
github.com/hashicorp/go-multierror v1.1.1
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-datastore v0.6.0
Expand Down Expand Up @@ -47,6 +48,7 @@ require (
github.com/filecoin-project/go-statestore v0.2.0 // indirect
github.com/flynn/noise v1.0.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/gammazero/deque v0.2.1 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.0 // indirect
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ github.com/francoispqt/gojay v1.2.13 h1:d2m3sFjloqoIUQU3TsHBgj6qg/BVGlTBeHDUmyJn
github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiDsoyrBGkyDY=
github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gammazero/channelqueue v0.2.1 h1:AcK6wnLrj8koTTn3RxjRCyfmS677TjhIZb1FSMi14qc=
github.com/gammazero/channelqueue v0.2.1/go.mod h1:824o5HHE+yO1xokh36BIuSv8YWwXW0364ku91eRMFS4=
github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0=
github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0=
github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98=
Expand Down

0 comments on commit 25aed4f

Please sign in to comment.