Skip to content

Commit

Permalink
Merge e7b6c57 into 69d89f7
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Dec 4, 2018
2 parents 69d89f7 + e7b6c57 commit 92e37f5
Show file tree
Hide file tree
Showing 7 changed files with 1,227 additions and 451 deletions.
22 changes: 20 additions & 2 deletions server/client.go
Expand Up @@ -629,6 +629,9 @@ func (c *client) readLoop() {
s := c.srv
c.in.rsz = startBufSize
defer s.grWG.Done()
if c.gw != nil && c.gw.outbound {
defer c.gatewayOutboundConnectionReadLoopExited()
}
c.mu.Unlock()

if nc == nil {
Expand Down Expand Up @@ -1489,13 +1492,16 @@ func (c *client) processSub(argo []byte) (err error) {
sid := string(sub.sid)
acc := c.acc

updateGWs := false
// Subscribe here.
if c.subs[sid] == nil {
c.subs[sid] = sub
if acc != nil && acc.sl != nil {
err = acc.sl.Insert(sub)
if err != nil {
delete(c.subs, sid)
} else {
updateGWs = c.srv.gateway.enabled
}
}
}
Expand All @@ -1515,6 +1521,9 @@ func (c *client) processSub(argo []byte) (err error) {
// If we are routing and this is a local sub, add to the route map for the associated account.
if kind == CLIENT || kind == SYSTEM {
c.srv.updateRouteSubscriptionMap(acc, sub, 1)
if updateGWs {
c.srv.gatewayUpdateSubInterest(acc.Name, sub, 1)
}
}
}

Expand Down Expand Up @@ -1739,6 +1748,7 @@ func (c *client) processUnsub(arg []byte) error {
kind := c.kind
var acc *Account

updateGWs := false
if sub, ok = c.subs[string(sid)]; ok {
acc = c.acc
if max > 0 {
Expand All @@ -1748,6 +1758,7 @@ func (c *client) processUnsub(arg []byte) error {
sub.max = 0
unsub = true
}
updateGWs = c.srv.gateway.enabled
}
c.mu.Unlock()

Expand All @@ -1759,6 +1770,9 @@ func (c *client) processUnsub(arg []byte) error {
c.unsubscribe(acc, sub, false)
if acc != nil && kind == CLIENT || kind == SYSTEM {
c.srv.updateRouteSubscriptionMap(acc, sub, -1)
if updateGWs {
c.srv.gatewayUpdateSubInterest(acc.Name, sub, -1)
}
}
}

Expand Down Expand Up @@ -2089,7 +2103,8 @@ func (c *client) processInboundClientMsg(msg []byte) {
// mode and the remote gateways have queue subs, then we need to
// collect the queue groups this message was sent to so that we
// exclude them when sending to gateways.
if len(r.qsubs) > 0 && c.srv.gateway.enabled && atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
if len(r.qsubs) > 0 && c.srv.gateway.enabled &&
atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
qnames = &queues
}
c.processMsgResults(c.acc, r, msg, c.pa.subject, c.pa.reply, qnames)
Expand Down Expand Up @@ -2619,7 +2634,7 @@ func (c *client) closeConnection(reason ClosedState) {
// Remove clients subscriptions.
if kind == CLIENT {
acc.sl.RemoveBatch(subs)
} else {
} else if kind == ROUTER {
go c.removeRemoteSubs()
}

Expand Down Expand Up @@ -2653,6 +2668,9 @@ func (c *client) closeConnection(reason ClosedState) {
qsubs[key] = &qsub{sub, 1}
}
}
if srv.gateway.enabled {
srv.gatewayUpdateSubInterest(acc.Name, sub, -1)
}
}
// Process any qsubs here.
for _, esub := range qsubs {
Expand Down

0 comments on commit 92e37f5

Please sign in to comment.