Skip to content

Commit

Permalink
Merge 1563eb7 into 0d12646
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Sep 3, 2019
2 parents 0d12646 + 1563eb7 commit 633675b
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 53 deletions.
27 changes: 17 additions & 10 deletions server/server.go
Expand Up @@ -4584,18 +4584,16 @@ func (s *StanServer) addSubscription(ss *subStore, sub *subState) error {
}

// updateDurable adds back `sub` to the client and updates the store.
// No lock is needed for `sub` since it has just been created.
func (s *StanServer) updateDurable(ss *subStore, sub *subState) error {
// Reset the hasFailedHB boolean since it may have been set
// if the client previously crashed and server set this
// flag to its subs.
sub.hasFailedHB = false
func (s *StanServer) updateDurable(ss *subStore, sub *subState, clientID string) error {
// Store in the client
if !s.clients.addSub(sub.ClientID, sub) {
return fmt.Errorf("can't find clientID: %v", sub.ClientID)
if !s.clients.addSub(clientID, sub) {
return fmt.Errorf("can't find clientID: %v", clientID)
}
// Update this subscription in the store
if err := sub.store.UpdateSub(&sub.SubState); err != nil {
sub.Lock()
err := sub.store.UpdateSub(&sub.SubState)
sub.Unlock()
if err != nil {
return err
}
ss.Lock()
Expand Down Expand Up @@ -4708,11 +4706,20 @@ func (s *StanServer) processSub(c *channel, sr *pb.SubscriptionRequest, ackInbox
if s.isClustered {
sub.norepl = false
}
// Reset the hasFailedHB boolean since it may have been set
// if the client previously crashed and server set this
// flag to its subs.
sub.hasFailedHB = false
// Reset initialized for restarting durable subscription.
// Without this (and if there is no message to be redelivered,
// which would set newOnHold), we could be sending avail
// messages before the response has been sent to client library.
sub.initialized = false
sub.Unlock()

// Case of restarted durable subscriber, or first durable queue
// subscriber re-joining a group that was left with pending messages.
err = s.updateDurable(ss, sub)
err = s.updateDurable(ss, sub, sr.ClientID)
} else {
subIsNew = true
// Create sub here (can be plain, durable or queue subscriber)
Expand Down
127 changes: 84 additions & 43 deletions server/server_test.go
Expand Up @@ -1172,7 +1172,7 @@ func TestMsgsNotSentToSubBeforeSubReqResponse(t *testing.T) {
connResponse.Unmarshal(respMsg.Data)

subSubj := connResponse.SubRequests
unsubSubj := connResponse.UnsubRequests
subCloseSubj := connResponse.SubCloseRequests
pubSubj := connResponse.PubPrefix + ".foo"

pubMsg := &pb.PubMsg{
Expand All @@ -1184,53 +1184,94 @@ func TestMsgsNotSentToSubBeforeSubReqResponse(t *testing.T) {
ClientID: clientName,
MaxInFlight: 1024,
Subject: "foo",
StartPosition: pb.StartPosition_First,
AckWaitInSecs: 30,
StartPosition: pb.StartPosition_NewOnly,
AckWaitInSecs: 1,
}
unsubReq := &pb.UnsubscribeRequest{
subCloseReq := &pb.UnsubscribeRequest{
ClientID: clientName,
Subject: "foo",
}
for i := 0; i < 100; i++ {
// Use the same subscriber for subscription request response and data,
// so we can reliably check if data comes before response.
inbox := nats.NewInbox()
sub, err := nc.SubscribeSync(inbox)
if err != nil {
t.Fatalf("Unable to create nats subscriber: %v", err)
}
subReq.Inbox = inbox
bytes, _ := subReq.Marshal()
// Send the request with inbox as the Reply
if err := nc.PublishRequest(subSubj, inbox, bytes); err != nil {
t.Fatalf("Error sending request: %v", err)
}
// Followed by a data message
pubMsg.Guid = nuid.Next()
bytes, _ = pubMsg.Marshal()
if err := nc.Publish(pubSubj, bytes); err != nil {
t.Fatalf("Error sending msg: %v", err)
}
nc.Flush()
// Dequeue
msg, err := sub.NextMsg(2 * time.Second)
if err != nil {
t.Fatalf("Did not get our message: %v", err)
}
// It should always be the subscription response!!!
msgProto := &pb.MsgProto{}
err = msgProto.Unmarshal(msg.Data)
if err == nil && msgProto.Sequence != 0 {
t.Fatalf("Iter=%v - Did not receive valid subscription response: %#v - %v", (i + 1), msgProto, err)
}
subReqResp := &pb.SubscriptionResponse{}
subReqResp.Unmarshal(msg.Data)
unsubReq.Inbox = subReqResp.AckInbox
bytes, _ = unsubReq.Marshal()
if err := nc.Publish(unsubSubj, bytes); err != nil {
t.Fatalf("Unable to send unsub request: %v", err)
tsubQueueName := []string{"", "queue"}
tsubDurName := []string{"", "dur"}
for _, qname := range tsubQueueName {
for _, dname := range tsubDurName {
for i := 0; i < 2; i++ {
// Use the same subscriber for subscription request response and data,
// so we can reliably check if data comes before response.
inbox := nats.NewInbox()
sub, err := nc.SubscribeSync(inbox)
if err != nil {
t.Fatalf("Unable to create nats subscriber: %v", err)
}
subReq.Inbox = inbox
subReq.QGroup = qname
subReq.DurableName = dname
bytes, _ := subReq.Marshal()
// Send the request with inbox as the Reply
if err := nc.PublishRequest(subSubj, inbox, bytes); err != nil {
t.Fatalf("Error sending request: %v", err)
}
// Followed by a data message
pubMsg.Guid = nuid.Next()
bytes, _ = pubMsg.Marshal()
if err := nc.Publish(pubSubj, bytes); err != nil {
t.Fatalf("Error sending msg: %v", err)
}
nc.Flush()
// Dequeue
msg, err := sub.NextMsg(2 * time.Second)
if err != nil {
t.Fatalf("Did not get our message: %v", err)
}
// It should always be the subscription response!!!
msgProto := &pb.MsgProto{}
err = msgProto.Unmarshal(msg.Data)
if err == nil && msgProto.Sequence != 0 {
t.Fatalf("Queue %q - Durable %q - Invalid subscription create response: %#v - %v",
qname, dname, msgProto, err)
}
subReqResp := &pb.SubscriptionResponse{}
if err := subReqResp.Unmarshal(msg.Data); err != nil {
t.Fatalf("Queue %q - Durable %q - Invalid subscription create response: %v",
qname, dname, err)
}
if subReqResp.Error != "" {
t.Fatalf("Received response error: %q", subReqResp.Error)
}
// If the message arrived just before the
// subscription request, and since we ask for
// new-only, it is possible that there is no
// message. Asking for all avail is not good for
// this test since - for durables - it would test
// the newOnHold more than the initialized flag.
msg, err = sub.NextMsg(time.Second)
if err == nil {
if err := msgProto.Unmarshal(msg.Data); err != nil {
t.Fatalf("Invalid message: %v", err)
}
ackReq := &pb.Ack{
Sequence: msgProto.Sequence,
Subject: msgProto.Subject,
}
bytes, _ = ackReq.Marshal()
nc.Publish(subReqResp.AckInbox, bytes)
}
subCloseReq.Inbox = subReqResp.AckInbox
bytes, _ = subCloseReq.Marshal()
msg, err = nc.Request(subCloseSubj, bytes, 2*time.Second)
if err != nil {
t.Fatalf("Unable to send unsub request: %v", err)
}
subCloseResp := &pb.SubscriptionResponse{}
if err := subCloseResp.Unmarshal(msg.Data); err != nil {
t.Fatalf("Invalid subscription close response: %v", err)
}
if subCloseResp.Error != "" {
t.Fatalf("Error on close: %q", subCloseResp.Error)
}
sub.Unsubscribe()
}
}
sub.Unsubscribe()
}
}

Expand Down

0 comments on commit 633675b

Please sign in to comment.