Skip to content

Commit

Permalink
Merge efd891d into f4fda53
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Dec 9, 2018
2 parents f4fda53 + efd891d commit 69b0557
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 19 deletions.
33 changes: 17 additions & 16 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2133,22 +2133,21 @@ func (c *client) processInboundClientMsg(msg []byte) {
c.checkForImportServices(c.acc, msg)
}

var qa [16][]byte
queues := qa[:0]
var qnames [][]byte

// Check for no interest, short circuit if so.
// This is the fanout scale.
if len(r.psubs)+len(r.qsubs) > 0 {
var qnames *[][]byte
var collect bool
// If we have queue subs in this cluster, then if we run in gateway
// mode and the remote gateways have queue subs, then we need to
// collect the queue groups this message was sent to so that we
// exclude them when sending to gateways.
if len(r.qsubs) > 0 && c.srv.gateway.enabled &&
atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
qnames = &queues
collect = true
}
c.processMsgResults(c.acc, r, msg, c.pa.subject, c.pa.reply, qnames)
qnames = c.processMsgResults(c.acc, r, msg, c.pa.subject, c.pa.reply, collect)
}

// Now deal with gateways
Expand All @@ -2161,7 +2160,7 @@ func (c *client) processInboundClientMsg(msg []byte) {
if len(r.psubs) == 1 && r.psubs[0].client.gw != nil {
c.sendReplyMsgDirectToGateway(c.acc, r.psubs[0], msg)
} else {
c.sendMsgToGateways(c.acc, msg, c.pa.subject, c.pa.reply, queues)
c.sendMsgToGateways(c.acc, msg, c.pa.subject, c.pa.reply, qnames)
}
}
}
Expand Down Expand Up @@ -2195,7 +2194,7 @@ func (c *client) checkForImportServices(acc *Account, msg []byte) {
if c.kind == ROUTER || c.kind == GATEWAY && c.pa.queues == nil && len(rr.qsubs) > 0 {
c.makeQFilter(rr.qsubs)
}
c.processMsgResults(rm.acc, rr, msg, []byte(rm.to), nrr, nil)
c.processMsgResults(rm.acc, rr, msg, []byte(rm.to), nrr, false)
// If this is not a gateway connection but gateway is enabled,
// try to send this converted message to all gateways.
if c.kind != GATEWAY && c.srv.gateway.enabled {
Expand Down Expand Up @@ -2240,7 +2239,8 @@ func (c *client) addSubToRouteTargets(sub *subscription) {
}

// This processes the sublist results for a given message.
func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, reply []byte, queues *[][]byte) {
func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject, reply []byte, collect bool) [][]byte {
var queues [][]byte
// msg header for clients.
msgh := c.msgb[1:msgHeadProtoLen]
msgh = append(msgh, subject...)
Expand Down Expand Up @@ -2283,7 +2283,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,

// If we are sourced from a route we need to have direct filtered queues.
if c.kind == ROUTER && c.pa.queues == nil {
return
return queues
}

// Set these up to optionally filter based on the queue lists.
Expand Down Expand Up @@ -2346,8 +2346,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
continue
} else {
c.addSubToRouteTargets(sub)
if queues != nil {
*queues = append(*queues, sub.queue)
if collect {
queues = append(queues, sub.queue)
}
}
break
Expand All @@ -2366,8 +2366,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
if c.deliverMsg(sub, mh, msg) {
// Clear rsub
rsub = nil
if queues != nil {
*queues = append(*queues, sub.queue)
if collect {
queues = append(queues, sub.queue)
}
break
}
Expand All @@ -2377,8 +2377,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, subject,
// If we are here we tried to deliver to a local qsub
// but failed. So we will send it to a remote.
c.addSubToRouteTargets(rsub)
if queues != nil {
*queues = append(*queues, rsub.queue)
if collect {
queues = append(queues, rsub.queue)
}
}
}
Expand All @@ -2387,7 +2387,7 @@ sendToRoutes:

// If no messages for routes return here.
if len(c.in.rts) == 0 {
return
return queues
}

// We address by index to avoid struct copy.
Expand Down Expand Up @@ -2418,6 +2418,7 @@ sendToRoutes:
mh = append(mh, _CRLF_...)
c.deliverMsg(rt.sub, mh, msg)
}
return queues
}

func (c *client) pubPermissionViolation(subject []byte) {
Expand Down
2 changes: 1 addition & 1 deletion server/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var (

const (
// VERSION is the current version for the server.
VERSION = "2.0.0-beta.6"
VERSION = "2.0.0-beta.7"

// PROTO is the currently supported protocol.
// 0 was the original
Expand Down
2 changes: 1 addition & 1 deletion server/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -2174,7 +2174,7 @@ func (c *client) processInboundGatewayMsg(msg []byte) {
}
}

c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, nil)
c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, false)
}

// Indicates that the remote which we are sending messages to
Expand Down
2 changes: 1 addition & 1 deletion server/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (c *client) processInboundRoutedMsg(msg []byte) {
}
}

c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, nil)
c.processMsgResults(acc, r, msg, c.pa.subject, c.pa.reply, false)
}

// Helper function for routes and gateways to create qfilters need for
Expand Down

0 comments on commit 69b0557

Please sign in to comment.