Skip to content

Commit

Permalink
Fixed panic when server needs to send message to more than 8 routes
Browse files Browse the repository at this point in the history
Resolves #955

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Apr 17, 2019
1 parent 0c8bf0e commit 288f00f
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 4 deletions.
8 changes: 4 additions & 4 deletions server/client.go
Expand Up @@ -2352,15 +2352,15 @@ func (c *client) addSubToRouteTargets(sub *subscription) {
}
}

var rt *routeTarget
lrts := len(c.in.rts)

// If we are here we do not have the sub yet in our list
// If we have to grow do so here.
if len(c.in.rts) == cap(c.in.rts) {
if lrts == cap(c.in.rts) {
c.in.rts = append(c.in.rts, routeTarget{})
}

var rt *routeTarget

lrts := len(c.in.rts)
c.in.rts = c.in.rts[:lrts+1]
rt = &c.in.rts[lrts]
rt.sub = sub
Expand Down
49 changes: 49 additions & 0 deletions server/routes_test.go
Expand Up @@ -20,6 +20,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -1025,3 +1026,51 @@ func TestRouteSendLocalSubsWithLowMaxPending(t *testing.T) {
// Check that all subs have been sent ok
checkExpectedSubs(t, numSubs, srvA, srvB)
}

func TestRouteNoCrashOnAddingSubToRoute(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()

numRoutes := routeTargetInit + 2
total := int32(numRoutes)
count := int32(0)
ch := make(chan bool, 1)
cb := func(_ *nats.Msg) {
if n := atomic.AddInt32(&count, 1); n == total {
ch <- true
}
}

var servers []*Server
servers = append(servers, s)

seedURL := fmt.Sprintf("nats://%s:%d", opts.Cluster.Host, opts.Cluster.Port)
for i := 0; i < numRoutes; i++ {
ropts := DefaultOptions()
ropts.Routes = RoutesFromStr(seedURL)
rs := RunServer(ropts)
defer rs.Shutdown()
servers = append(servers, rs)

// Create a sub on each routed server
nc := natsConnect(t, fmt.Sprintf("nats://%s:%d", ropts.Host, ropts.Port))
defer nc.Close()
natsSub(t, nc, "foo", cb)
}
checkClusterFormed(t, servers...)

// Make sure all subs are registered in s.
checkFor(t, time.Second, 15*time.Millisecond, func() error {
if s.globalAccount().TotalSubs() != int(numRoutes) {
return fmt.Errorf("Not all %v routed subs were registered", numRoutes)
}
return nil
})

pubNC := natsConnect(t, fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
defer pubNC.Close()
natsPub(t, pubNC, "foo", []byte("hello world!"))

waitCh(t, ch, "Did not get all messages")
}

0 comments on commit 288f00f

Please sign in to comment.