Skip to content

Commit

Permalink
Merge pull request #1059 from nats-io/fix_1058
Browse files Browse the repository at this point in the history
[FIXED] Don't send message to subscription that has just been closed
  • Loading branch information
kozlovic committed Jun 7, 2020
2 parents e15a1b0 + 8251c30 commit f47739e
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 1 deletion.
2 changes: 1 addition & 1 deletion server/server.go
Expand Up @@ -3952,7 +3952,7 @@ func (s *StanServer) processReplicatedSendAndAck(ssa *spb.SubSentAndAck) {
// are not sent and subscriber is marked as stalled.
// Sub lock should be held before calling.
func (s *StanServer) sendMsgToSub(sub *subState, m *pb.MsgProto, force bool) (bool, bool) {
if sub == nil || m == nil || !sub.initialized || (sub.newOnHold && !m.Redelivered) {
if sub == nil || m == nil || !sub.initialized || sub.ClientID == "" || (sub.newOnHold && !m.Redelivered) {
return false, false
}

Expand Down
103 changes: 103 additions & 0 deletions server/server_delivery_test.go
Expand Up @@ -384,3 +384,106 @@ func TestDeliveryRaceBetweenNextMsgAndStoring(t *testing.T) {
t.Fatal("Timeout!")
}
}

func TestDeliveryStopsWhenSubClosed(t *testing.T) {
s := runServer(t, clusterName)
defer s.Shutdown()

// Use a bare NATS connection to send incorrect requests
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
t.Fatalf("Unexpected error on connect: %v", err)
}
defer nc.Close()

// Get the connect subject
connSubj := fmt.Sprintf("%s.%s", s.opts.DiscoverPrefix, clusterName)
connReq := &pb.ConnectRequest{
ClientID: clientName,
HeartbeatInbox: nats.NewInbox(),
}
crb, _ := connReq.Marshal()
respMsg, err := nc.Request(connSubj, crb, 5*time.Second)
if err != nil {
t.Fatalf("Request error: %v", err)
}
connResponse := &pb.ConnectResponse{}
connResponse.Unmarshal(respMsg.Data)

subSubj := connResponse.SubRequests
subCloseSubj := connResponse.SubCloseRequests

subReq := &pb.SubscriptionRequest{
ClientID: clientName,
MaxInFlight: 1024,
Subject: "foo",
StartPosition: pb.StartPosition_NewOnly,
AckWaitInSecs: 1,
}
subCloseReq := &pb.UnsubscribeRequest{
ClientID: clientName,
Subject: "foo",
}

inbox := nats.NewInbox()
sub, err := nc.SubscribeSync(inbox)
if err != nil {
t.Fatalf("Unable to create nats subscriber: %v", err)
}
// Send a subscription request
subReq.Inbox = inbox
bytes, _ := subReq.Marshal()
msg, err := nc.Request(subSubj, bytes, time.Second)
if err != nil {
t.Fatalf("Error sending subscription request: %v", err)
}
// Parse the response
subReqResp := &pb.SubscriptionResponse{}
if err := subReqResp.Unmarshal(msg.Data); err != nil {
t.Fatalf(" Invalid subscription create response: %v", err)
}
if subReqResp.Error != "" {
t.Fatalf("Received response error: %q", subReqResp.Error)
}

ssub := s.clients.getSubs(clientName)[0]

// Close
subCloseReq.Inbox = subReqResp.AckInbox
bytes, _ = subCloseReq.Marshal()
msg, err = nc.Request(subCloseSubj, bytes, time.Second)
if err != nil {
t.Fatalf("Unable to sending unsub request: %v", err)
}
// Parse the response
subCloseResp := &pb.SubscriptionResponse{}
if err := subCloseResp.Unmarshal(msg.Data); err != nil {
t.Fatalf("Invalid subscription close response: %v", err)
}
if subCloseResp.Error != "" {
t.Fatalf("Error on close: %q", subCloseResp.Error)
}

// Now check that if the server was trying to send a message, the
// message would not be sent and ack timer not setup.
m := &pb.MsgProto{
Subject: "foo",
Sequence: 1,
Data: []byte("hello"),
Timestamp: time.Now().UnixNano(),
}
sent, sendMore := s.sendMsgToSub(ssub, m, false)
if sent || sendMore {
t.Fatalf("Should have returned false and false, but returned %v and %v", sent, sendMore)
}
// Expect to no receive anything on the sub
if msg, err := sub.NextMsg(100 * time.Millisecond); msg != nil || err == nil {
t.Fatalf("No message expected, got %+v", msg)
}
ssub.RLock()
timerSet := ssub.ackTimer != nil
ssub.RUnlock()
if timerSet {
t.Fatal("Ack timer should not be set, but it was")
}
}

0 comments on commit f47739e

Please sign in to comment.