Skip to content

Commit

Permalink
[IMPROVED] Better attempt at delivering messages to routed queue subs
Browse files Browse the repository at this point in the history
This PR is based out of #633. It imroves parsing QRSID so that the
TestRouteQueueSemantics test now passes (when dealing with malformed
QRSID).
A test similar to what is reported in #632 was also added. This
test however, uncovers a race condition that will be fixed in a
separate PR.

Resolves #632
  • Loading branch information
kozlovic committed Mar 9, 2018
1 parent 9d7dda1 commit ee8f001
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 31 deletions.
21 changes: 11 additions & 10 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1141,23 +1141,27 @@ func (c *client) processMsg(msg []byte) {
// since they are sent direct via L2 semantics. If the match is a queue
// subscription, we will return from here regardless if we find a sub.
if isRoute {
if sub, ok := srv.routeSidQueueSubscriber(c.pa.sid); ok {
isQueue, sub, err := srv.routeSidQueueSubscriber(c.pa.sid)
if isQueue {
// We got an invalid QRSID, so stop here
if err != nil {
c.Errorf("Unable to deliver messaage: %v", err)
return
}
if sub != nil {
mh := c.msgHeader(msgh[:si], sub)
if c.deliverMsg(sub, mh, msg) {
return
}
}
isRouteQsub = true
// for queue subscription try hard to deliver a message at least once.
// Right now we know fo sure that it's a queue subscription and
// At this point we know fo sure that it's a queue subscription and
// we didn't make a delivery attempt, because either a subscriber limit
// was exceeded or a subscription is already gone.
// So, let a code below find yet another matching subscription.
// So, let the code below find yet another matching subscription.
// We are at risk that a message might go forth and back
// between routes during these attempts, but at the end
// it shall either be delivered (at most once) or drop.
c.Debugf("Re-sending message of a detached queue sid %s", c.pa.sid)
}
}

Expand Down Expand Up @@ -1218,15 +1222,12 @@ func (c *client) processMsg(msg []byte) {
// Iterate over all subscribed clients starting at a random index
// until we find one that's able to deliver a message.
// Drop a message on the floor if there are noone.
start_index := c.cache.prand.Intn(len(qsubs))
startIndex := c.cache.prand.Intn(len(qsubs))
for i := 0; i < len(qsubs); i++ {
index := (start_index + i) % len(qsubs)
index := (startIndex + i) % len(qsubs)
sub := qsubs[index]
if sub != nil {
mh := c.msgHeader(msgh[:si], sub)
if isRouteQsub {
c.Tracef("Re-sending msg of %s to %s", c.pa.sid, sub.sid)
}
if c.deliverMsg(sub, mh, msg) {
break
}
Expand Down
64 changes: 46 additions & 18 deletions server/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,27 +453,34 @@ const (
QRSID_LEN = len(QRSID)
)

func (s *Server) routeSidQueueSubscriber(rsid []byte) (*subscription, bool) {
cid, sid, ok := parseRouteSid(rsid)
if !ok {
return nil, false
// Parse the given rsid. If the protocol does not start with QRSID,
// returns false and no subscription nor error.
// If it does start with QRSID, returns true and possibly a subscription
// or an error if the QRSID protocol is malformed.
func (s *Server) routeSidQueueSubscriber(rsid []byte) (bool, *subscription, error) {
if !bytes.HasPrefix(rsid, []byte(QRSID)) {
return false, nil, nil
}
cid, sid, err := parseRouteQueueSid(rsid)
if err != nil {
return true, nil, err
}

s.mu.Lock()
client := s.clients[cid]
s.mu.Unlock()

if client == nil {
return nil, true
return true, nil, nil
}

client.mu.Lock()
sub, ok := client.subs[string(sid)]
client.mu.Unlock()
if ok {
return sub, true
return true, sub, nil
}
return nil, true
return true, nil, nil
}

func routeSid(sub *subscription) string {
Expand All @@ -484,19 +491,40 @@ func routeSid(sub *subscription) string {
return fmt.Sprintf("%s%s:%d:%s", qi, RSID, sub.client.cid, sub.sid)
}

func parseRouteSid(rsid []byte) (uint64, []byte, bool) {
if !bytes.HasPrefix(rsid, []byte(QRSID)) {
return 0, nil, false
}

// We don't care what's char of rsid[QRSID_LEN+1], it should be ':'
for i, count := QRSID_LEN+1, len(rsid); i < count; i++ {
switch rsid[i] {
case ':':
return uint64(parseInt64(rsid[QRSID_LEN+1 : i])), rsid[i+1:], true
// Parse the given `rsid` knowing that it starts with `QRSID`.
// Returns the cid and sid or an error not a valid QRSID.
func parseRouteQueueSid(rsid []byte) (uint64, []byte, error) {
var (
cid uint64
sid []byte
cidFound bool
sidFound bool
)
// A valid QRSID needs to be at least QRSID:x:y
// First character here should be `:`
if len(rsid) >= QRSID_LEN+4 {
if rsid[QRSID_LEN] == ':' {
for i, count := QRSID_LEN+1, len(rsid); i < count; i++ {
switch rsid[i] {
case ':':
cid = uint64(parseInt64(rsid[QRSID_LEN+1 : i]))
cidFound = true
sid = rsid[i+1:]
}
}
if cidFound {
// We can't assume the content of sid, so as long
// as it is not len 0, we have to say it is a valid one.
if len(rsid) > 0 {
sidFound = true
}
}
}
}
return 0, nil, true
if cidFound && sidFound {
return cid, sid, nil
}
return 0, nil, fmt.Errorf("invalid QRSID: %s", rsid)
}

func (s *Server) addRoute(c *client, info *Info) (bool, bool) {
Expand Down
69 changes: 69 additions & 0 deletions server/routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -858,3 +859,71 @@ func TestServerPoolUpdatedWhenRouteGoesAway(t *testing.T) {
checkPool(expected)
nc.Close()
}

func TestRoutedQueueUnsubscribe(t *testing.T) {
optsA, _ := ProcessConfigFile("./configs/seed.conf")
optsA.NoSigs, optsA.NoLog = true, true
srvA := RunServer(optsA)
defer srvA.Shutdown()

srvARouteURL := fmt.Sprintf("nats://%s:%d", optsA.Cluster.Host, srvA.ClusterAddr().Port)
optsB := nextServerOpts(optsA)
optsB.Routes = RoutesFromStr(srvARouteURL)

srvB := RunServer(optsB)
defer srvB.Shutdown()

// Wait for these 2 to connect to each other
checkClusterFormed(t, srvA, srvB)

// Have a client connection to each server
ncA, err := nats.Connect(fmt.Sprintf("nats://%s:%d", optsA.Host, optsA.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer ncA.Close()
ncB, err := nats.Connect(fmt.Sprintf("nats://%s:%d", optsB.Host, optsB.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer ncB.Close()

received := int32(0)
cb := func(m *nats.Msg) {
atomic.AddInt32(&received, 1)
}

// Create 50 queue subs with auto-unsubscribe to each server.
cons := []*nats.Conn{ncA, ncB}
for _, c := range cons {
for i := 0; i < 50; i++ {
qsub, err := c.QueueSubscribe("foo", "bar", cb)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
if err := qsub.AutoUnsubscribe(1); err != nil {
t.Fatalf("Error on auto-unsubscribe: %v", err)
}
}
c.Flush()
}

total := 100
// Now send messages from each server
for i := 0; i < total; i++ {
c := cons[i%2]
c.Publish("foo", []byte("hello"))
}
for _, c := range cons {
c.Flush()
}

timeout := time.Now().Add(2 * time.Second)
for time.Now().Before(timeout) {
if atomic.LoadInt32(&received) == int32(total) {
return
}
time.Sleep(15 * time.Millisecond)
}
t.Fatalf("Should have received %v messages, got %v", total, atomic.LoadInt32(&received))
}
6 changes: 3 additions & 3 deletions test/routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,9 @@ func TestRouteQueueSemantics(t *testing.T) {
// Queue group one.
routeSend("MSG foo QRSID:1:2 2\r\nok\r\n")
// Invlaid queue sid.
routeSend("MSG foo QRSID 2\r\nok\r\n")
routeSend("MSG foo QRSID:1 2\r\nok\r\n")
routeSend("MSG foo QRSID:1: 2\r\nok\r\n")
routeSend("MSG foo QRSID 2\r\nok\r\n") // cid and sid missing
routeSend("MSG foo QRSID:1 2\r\nok\r\n") // cid not terminated with ':'
routeSend("MSG foo QRSID:1: 2\r\nok\r\n") // cid==1 but sid missing. It needs to be at least one character.

// Use ping roundtrip to make sure its processed.
routeSend("PING\r\n")
Expand Down

0 comments on commit ee8f001

Please sign in to comment.