Skip to content

Commit

Permalink
[FIXED] Sending publish async completion signal after reconnect (#1391)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed Sep 12, 2023
1 parent 6740b12 commit 3acdc37
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 0 deletions.
4 changes: 4 additions & 0 deletions jetstream/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,10 @@ func (js *jetStream) resetPendingAcksOnReconnect() {
paf.err = nats.ErrDisconnected
}
js.publisher.acks = nil
if js.publisher.doneCh != nil {
close(js.publisher.doneCh)
js.publisher.doneCh = nil
}
js.publisher.Unlock()
}
}
Expand Down
37 changes: 37 additions & 0 deletions jetstream/test/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1233,6 +1233,43 @@ func TestPublishMsgAsyncWithPendingMsgs(t *testing.T) {
t.Fatalf("Expected error: %v; got: %v", jetstream.ErrTooManyStalledMsgs, err)
}
})

t.Run("with server restart", func(t *testing.T) {
srv := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, srv)
nc, err := nats.Connect(srv.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

_, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

go func() {
for i := 0; i < 50; i++ {
_, _ = js.PublishAsync("FOO.1", []byte("msg"))
}
}()
srv = restartBasicJSServer(t, srv)
defer shutdownJSServerAndRemoveStorage(t, srv)

select {
case <-js.PublishAsyncComplete():
case <-time.After(10 * time.Second):
t.Fatalf("Did not receive completion signal")
}
})
}

func TestPublishAsyncResetPendingOnReconnect(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,10 @@ func (js *js) resetPendingAcksOnReconnect() {
paf.err = ErrDisconnected
}
js.pafs = nil
if js.dch != nil {
close(js.dch)
js.dch = nil
}
js.mu.Unlock()
}
}
Expand Down

0 comments on commit 3acdc37

Please sign in to comment.