Skip to content

Commit

Permalink
Merge pull request #558 from nats-io/fix_554
Browse files Browse the repository at this point in the history
[FIXED] Request() with UseOldRequest not kicked out on Close()
  • Loading branch information
kozlovic committed Apr 24, 2020
2 parents f7af4f2 + 365fccf commit 076024f
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 2 deletions.
2 changes: 1 addition & 1 deletion nats.go
Expand Up @@ -2749,7 +2749,7 @@ func (nc *Conn) oldRequest(subj string, data []byte, timeout time.Duration) (*Ms
inbox := NewInbox()
ch := make(chan *Msg, RequestChanLen)

s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, false)
s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true)
if err != nil {
return nil, err
}
Expand Down
18 changes: 17 additions & 1 deletion test/basic_test.go
Expand Up @@ -572,7 +572,7 @@ func TestOldRequest(t *testing.T) {

response := []byte("I will help you")
nc.Subscribe("foo", func(m *nats.Msg) {
nc.Publish(m.Reply, response)
m.Respond(response)
})
msg, err := nc.Request("foo", []byte("help"), 500*time.Millisecond)
if err != nil {
Expand All @@ -581,6 +581,22 @@ func TestOldRequest(t *testing.T) {
if !bytes.Equal(msg.Data, response) {
t.Fatalf("Received invalid response")
}

// Check that Close() kicks out a Request()
errCh := make(chan error, 1)
start := time.Now()
go func() {
_, err := nc.Request("checkClose", []byte("should be kicked out on close"), time.Second)
errCh <- err
}()
time.Sleep(100 * time.Millisecond)
nc.Close()
if e := <-errCh; e != nats.ErrConnectionClosed {
t.Fatalf("Unexpected error: %v", err)
}
if dur := time.Since(start); dur >= time.Second {
t.Fatalf("Request took too long to bail out: %v", dur)
}
}

func TestRequest(t *testing.T) {
Expand Down
29 changes: 29 additions & 0 deletions test/context_test.go
Expand Up @@ -257,6 +257,35 @@ func testContextRequestWithCancel(t *testing.T, nc *nats.Conn) {
}
}

func TestContextOldRequestClosed(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()

nc, err := nats.Connect(nats.DefaultURL, nats.UseOldRequestStyle())
if err != nil {
t.Fatalf("Failed to connect: %v", err)
}
defer nc.Close()

ctx, cancelCB := context.WithTimeout(context.Background(), time.Second)
defer cancelCB() // should always be called, not discarded, to prevent context leak

errCh := make(chan error, 1)
start := time.Now()
go func() {
_, err = nc.RequestWithContext(ctx, "checkClose", []byte("should be kicked out on close"))
errCh <- err
}()
time.Sleep(100 * time.Millisecond)
nc.Close()
if e := <-errCh; e != nats.ErrConnectionClosed {
t.Fatalf("Unexpected error: %v", err)
}
if dur := time.Since(start); dur >= time.Second {
t.Fatalf("Request took too long to bail out: %v", dur)
}
}

func TestContextRequestWithCancel(t *testing.T) {
s := RunDefaultServer()
defer s.Shutdown()
Expand Down

0 comments on commit 076024f

Please sign in to comment.