diff --git a/server/client.go b/server/client.go index 8d2fbd8db4..f67ea6d26d 100644 --- a/server/client.go +++ b/server/client.go @@ -915,9 +915,9 @@ func (c *client) msgHeader(mh []byte, sub *subscription) []byte { var needFlush = struct{}{} var routeSeen = struct{}{} -func (c *client) deliverMsg(sub *subscription, mh, msg []byte) { +func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool { if sub.client == nil { - return + return false } client := sub.client client.mu.Lock() @@ -944,13 +944,13 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) { if shouldForward { client.srv.broadcastUnSubscribe(sub) } - return + return false } } if client.nc == nil { client.mu.Unlock() - return + return false } // Update statistics @@ -999,7 +999,7 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) { client.mu.Unlock() c.pcd[client] = needFlush - return + return true writeErr: if deadlineSet { @@ -1014,6 +1014,10 @@ writeErr: } else { c.Debugf("Error writing msg: %v", err) } + // Honor at most once semantic: + // treat message that we attempted to send as actually sent + // and don't let a higher-level code an attempt to resend it. + return true } // processMsg is called to process an inbound msg from a client. @@ -1131,6 +1135,7 @@ func (c *client) processMsg(msg []byte) { si := len(msgh) isRoute := c.typ == ROUTER + isRouteQsub := false // If we are a route and we have a queue subscription, deliver direct // since they are sent direct via L2 semantics. If the match is a queue @@ -1139,53 +1144,69 @@ func (c *client) processMsg(msg []byte) { if sub, ok := srv.routeSidQueueSubscriber(c.pa.sid); ok { if sub != nil { mh := c.msgHeader(msgh[:si], sub) - c.deliverMsg(sub, mh, msg) + if c.deliverMsg(sub, mh, msg) { + return + } } - return + isRouteQsub = true + // for queue subscription try hard to deliver a message at least once. + // Right now we know fo sure that it's a queue subscription and + // we didn't make a delivery attempt, because either a subscriber limit + // was exceeded or a subscription is already gone. + // So, let a code below find yet another matching subscription. + // We are at risk that a message might go forth and back + // between routes during these attempts, but at the end + // it shall either be delivered (at most once) or drop. + c.Debugf("Re-sending message of a detached queue sid %s", c.pa.sid) } } - // Used to only send normal subscriptions once across a given route. - var rmap map[string]struct{} + // Don't process normal subscriptions in case of a queue subscription resend. + // Otherwise, we'd end up with potentially delivering the same message twice. + if !isRouteQsub { + // Used to only send normal subscriptions once across a given route. + var rmap map[string]struct{} - // Loop over all normal subscriptions that match. + // Loop over all normal subscriptions that match. - for _, sub := range r.psubs { - // Check if this is a send to a ROUTER, make sure we only send it - // once. The other side will handle the appropriate re-processing - // and fan-out. Also enforce 1-Hop semantics, so no routing to another. - if sub.client.typ == ROUTER { - // Skip if sourced from a ROUTER and going to another ROUTER. - // This is 1-Hop semantics for ROUTERs. - if isRoute { - continue - } - // Check to see if we have already sent it here. - if rmap == nil { - rmap = make(map[string]struct{}, srv.numRoutes()) - } - sub.client.mu.Lock() - if sub.client.nc == nil || sub.client.route == nil || - sub.client.route.remoteID == "" { - c.Debugf("Bad or Missing ROUTER Identity, not processing msg") - sub.client.mu.Unlock() - continue - } - if _, ok := rmap[sub.client.route.remoteID]; ok { - c.Debugf("Ignoring route, already processed") + for _, sub := range r.psubs { + // Check if this is a send to a ROUTER, make sure we only send it + // once. The other side will handle the appropriate re-processing + // and fan-out. Also enforce 1-Hop semantics, so no routing to another. + if sub.client.typ == ROUTER { + // Skip if sourced from a ROUTER and going to another ROUTER. + // This is 1-Hop semantics for ROUTERs. + if isRoute { + continue + } + // Check to see if we have already sent it here. + if rmap == nil { + rmap = make(map[string]struct{}, srv.numRoutes()) + } + sub.client.mu.Lock() + if sub.client.nc == nil || sub.client.route == nil || + sub.client.route.remoteID == "" { + c.Debugf("Bad or Missing ROUTER Identity, not processing msg") + sub.client.mu.Unlock() + continue + } + if _, ok := rmap[sub.client.route.remoteID]; ok { + c.Debugf("Ignoring route, already processed") + sub.client.mu.Unlock() + continue + } + rmap[sub.client.route.remoteID] = routeSeen sub.client.mu.Unlock() - continue } - rmap[sub.client.route.remoteID] = routeSeen - sub.client.mu.Unlock() + // Normal delivery + mh := c.msgHeader(msgh[:si], sub) + c.deliverMsg(sub, mh, msg) } - // Normal delivery - mh := c.msgHeader(msgh[:si], sub) - c.deliverMsg(sub, mh, msg) } - // Now process any queue subs we have if not a route - if !isRoute { + // Now process any queue subs we have if not a route. + // ... Or if it's a route and we need to resend. + if isRouteQsub || !isRoute { // Check to see if we have our own rand yet. Global rand // has contention with lots of clients, etc. if c.cache.prand == nil { @@ -1194,11 +1215,22 @@ func (c *client) processMsg(msg []byte) { // Process queue subs for i := 0; i < len(r.qsubs); i++ { qsubs := r.qsubs[i] - index := c.cache.prand.Intn(len(qsubs)) - sub := qsubs[index] - if sub != nil { - mh := c.msgHeader(msgh[:si], sub) - c.deliverMsg(sub, mh, msg) + // Iterate over all subscribed clients starting at a random index + // until we find one that's able to deliver a message. + // Drop a message on the floor if there are noone. + start_index := c.cache.prand.Intn(len(qsubs)) + for i := 0; i < len(qsubs); i++ { + index := (start_index + i) % len(qsubs) + sub := qsubs[index] + if sub != nil { + mh := c.msgHeader(msgh[:si], sub) + if isRouteQsub { + c.Tracef("Re-sending msg of %s to %s", c.pa.sid, sub.sid) + } + if c.deliverMsg(sub, mh, msg) { + break + } + } } } }