From 1563eb7a6baa8b77ec9d410cb4e7364d3de060b0 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 3 Sep 2019 15:00:54 -0600 Subject: [PATCH] [FIXED] Ensure Subscription response sent prior to messages Original issue was addressed in PR #247, but unfortunately the fix did not take into account durable subscriptions. If a durable subscription is restarted, the subscription object is in memory and would have the flag "initialized" already set to true, which would not prevent published messages at the time of the subscription request processing to allow messages to be sent prior to the subscription response. Resolves #930 Signed-off-by: Ivan Kozlovic --- server/server.go | 27 +++++---- server/server_test.go | 127 ++++++++++++++++++++++++++++-------------- 2 files changed, 101 insertions(+), 53 deletions(-) diff --git a/server/server.go b/server/server.go index d17235db..5022abaa 100644 --- a/server/server.go +++ b/server/server.go @@ -4584,18 +4584,16 @@ func (s *StanServer) addSubscription(ss *subStore, sub *subState) error { } // updateDurable adds back `sub` to the client and updates the store. -// No lock is needed for `sub` since it has just been created. -func (s *StanServer) updateDurable(ss *subStore, sub *subState) error { - // Reset the hasFailedHB boolean since it may have been set - // if the client previously crashed and server set this - // flag to its subs. - sub.hasFailedHB = false +func (s *StanServer) updateDurable(ss *subStore, sub *subState, clientID string) error { // Store in the client - if !s.clients.addSub(sub.ClientID, sub) { - return fmt.Errorf("can't find clientID: %v", sub.ClientID) + if !s.clients.addSub(clientID, sub) { + return fmt.Errorf("can't find clientID: %v", clientID) } // Update this subscription in the store - if err := sub.store.UpdateSub(&sub.SubState); err != nil { + sub.Lock() + err := sub.store.UpdateSub(&sub.SubState) + sub.Unlock() + if err != nil { return err } ss.Lock() @@ -4708,11 +4706,20 @@ func (s *StanServer) processSub(c *channel, sr *pb.SubscriptionRequest, ackInbox if s.isClustered { sub.norepl = false } + // Reset the hasFailedHB boolean since it may have been set + // if the client previously crashed and server set this + // flag to its subs. + sub.hasFailedHB = false + // Reset initialized for restarting durable subscription. + // Without this (and if there is no message to be redelivered, + // which would set newOnHold), we could be sending avail + // messages before the response has been sent to client library. + sub.initialized = false sub.Unlock() // Case of restarted durable subscriber, or first durable queue // subscriber re-joining a group that was left with pending messages. - err = s.updateDurable(ss, sub) + err = s.updateDurable(ss, sub, sr.ClientID) } else { subIsNew = true // Create sub here (can be plain, durable or queue subscriber) diff --git a/server/server_test.go b/server/server_test.go index 3c367b32..0327e30a 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -1172,7 +1172,7 @@ func TestMsgsNotSentToSubBeforeSubReqResponse(t *testing.T) { connResponse.Unmarshal(respMsg.Data) subSubj := connResponse.SubRequests - unsubSubj := connResponse.UnsubRequests + subCloseSubj := connResponse.SubCloseRequests pubSubj := connResponse.PubPrefix + ".foo" pubMsg := &pb.PubMsg{ @@ -1184,53 +1184,94 @@ func TestMsgsNotSentToSubBeforeSubReqResponse(t *testing.T) { ClientID: clientName, MaxInFlight: 1024, Subject: "foo", - StartPosition: pb.StartPosition_First, - AckWaitInSecs: 30, + StartPosition: pb.StartPosition_NewOnly, + AckWaitInSecs: 1, } - unsubReq := &pb.UnsubscribeRequest{ + subCloseReq := &pb.UnsubscribeRequest{ ClientID: clientName, Subject: "foo", } - for i := 0; i < 100; i++ { - // Use the same subscriber for subscription request response and data, - // so we can reliably check if data comes before response. - inbox := nats.NewInbox() - sub, err := nc.SubscribeSync(inbox) - if err != nil { - t.Fatalf("Unable to create nats subscriber: %v", err) - } - subReq.Inbox = inbox - bytes, _ := subReq.Marshal() - // Send the request with inbox as the Reply - if err := nc.PublishRequest(subSubj, inbox, bytes); err != nil { - t.Fatalf("Error sending request: %v", err) - } - // Followed by a data message - pubMsg.Guid = nuid.Next() - bytes, _ = pubMsg.Marshal() - if err := nc.Publish(pubSubj, bytes); err != nil { - t.Fatalf("Error sending msg: %v", err) - } - nc.Flush() - // Dequeue - msg, err := sub.NextMsg(2 * time.Second) - if err != nil { - t.Fatalf("Did not get our message: %v", err) - } - // It should always be the subscription response!!! - msgProto := &pb.MsgProto{} - err = msgProto.Unmarshal(msg.Data) - if err == nil && msgProto.Sequence != 0 { - t.Fatalf("Iter=%v - Did not receive valid subscription response: %#v - %v", (i + 1), msgProto, err) - } - subReqResp := &pb.SubscriptionResponse{} - subReqResp.Unmarshal(msg.Data) - unsubReq.Inbox = subReqResp.AckInbox - bytes, _ = unsubReq.Marshal() - if err := nc.Publish(unsubSubj, bytes); err != nil { - t.Fatalf("Unable to send unsub request: %v", err) + tsubQueueName := []string{"", "queue"} + tsubDurName := []string{"", "dur"} + for _, qname := range tsubQueueName { + for _, dname := range tsubDurName { + for i := 0; i < 2; i++ { + // Use the same subscriber for subscription request response and data, + // so we can reliably check if data comes before response. + inbox := nats.NewInbox() + sub, err := nc.SubscribeSync(inbox) + if err != nil { + t.Fatalf("Unable to create nats subscriber: %v", err) + } + subReq.Inbox = inbox + subReq.QGroup = qname + subReq.DurableName = dname + bytes, _ := subReq.Marshal() + // Send the request with inbox as the Reply + if err := nc.PublishRequest(subSubj, inbox, bytes); err != nil { + t.Fatalf("Error sending request: %v", err) + } + // Followed by a data message + pubMsg.Guid = nuid.Next() + bytes, _ = pubMsg.Marshal() + if err := nc.Publish(pubSubj, bytes); err != nil { + t.Fatalf("Error sending msg: %v", err) + } + nc.Flush() + // Dequeue + msg, err := sub.NextMsg(2 * time.Second) + if err != nil { + t.Fatalf("Did not get our message: %v", err) + } + // It should always be the subscription response!!! + msgProto := &pb.MsgProto{} + err = msgProto.Unmarshal(msg.Data) + if err == nil && msgProto.Sequence != 0 { + t.Fatalf("Queue %q - Durable %q - Invalid subscription create response: %#v - %v", + qname, dname, msgProto, err) + } + subReqResp := &pb.SubscriptionResponse{} + if err := subReqResp.Unmarshal(msg.Data); err != nil { + t.Fatalf("Queue %q - Durable %q - Invalid subscription create response: %v", + qname, dname, err) + } + if subReqResp.Error != "" { + t.Fatalf("Received response error: %q", subReqResp.Error) + } + // If the message arrived just before the + // subscription request, and since we ask for + // new-only, it is possible that there is no + // message. Asking for all avail is not good for + // this test since - for durables - it would test + // the newOnHold more than the initialized flag. + msg, err = sub.NextMsg(time.Second) + if err == nil { + if err := msgProto.Unmarshal(msg.Data); err != nil { + t.Fatalf("Invalid message: %v", err) + } + ackReq := &pb.Ack{ + Sequence: msgProto.Sequence, + Subject: msgProto.Subject, + } + bytes, _ = ackReq.Marshal() + nc.Publish(subReqResp.AckInbox, bytes) + } + subCloseReq.Inbox = subReqResp.AckInbox + bytes, _ = subCloseReq.Marshal() + msg, err = nc.Request(subCloseSubj, bytes, 2*time.Second) + if err != nil { + t.Fatalf("Unable to send unsub request: %v", err) + } + subCloseResp := &pb.SubscriptionResponse{} + if err := subCloseResp.Unmarshal(msg.Data); err != nil { + t.Fatalf("Invalid subscription close response: %v", err) + } + if subCloseResp.Error != "" { + t.Fatalf("Error on close: %q", subCloseResp.Error) + } + sub.Unsubscribe() + } } - sub.Unsubscribe() } }