Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#632] - fix a race condition causing messages to be lost with non-zero max_msgs in a cluster #633

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 78 additions & 46 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,9 +866,9 @@ func (c *client) msgHeader(mh []byte, sub *subscription) []byte {
var needFlush = struct{}{}
var routeSeen = struct{}{}

func (c *client) deliverMsg(sub *subscription, mh, msg []byte) {
func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
if sub.client == nil {
return
return false
}
client := sub.client
client.mu.Lock()
Expand All @@ -895,13 +895,13 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) {
if shouldForward {
client.srv.broadcastUnSubscribe(sub)
}
return
return false
}
}

if client.nc == nil {
client.mu.Unlock()
return
return false
}

// Update statistics
Expand Down Expand Up @@ -950,7 +950,7 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) {

client.mu.Unlock()
c.pcd[client] = needFlush
return
return true

writeErr:
if deadlineSet {
Expand All @@ -965,6 +965,10 @@ writeErr:
} else {
c.Debugf("Error writing msg: %v", err)
}
// Honor at-most-once semantics:
// treat a message that we attempted to send as actually sent,
// and don't let higher-level code attempt to resend it.
return true
}

// processMsg is called to process an inbound msg from a client.
Expand Down Expand Up @@ -1084,6 +1088,7 @@ func (c *client) processMsg(msg []byte) {
si := len(msgh)

isRoute := c.typ == ROUTER
isRouteQsub := false

// If we are a route and we have a queue subscription, deliver direct
// since they are sent direct via L2 semantics. If the match is a queue
Expand All @@ -1092,53 +1097,69 @@ func (c *client) processMsg(msg []byte) {
if sub, ok := srv.routeSidQueueSubscriber(c.pa.sid); ok {
if sub != nil {
mh := c.msgHeader(msgh[:si], sub)
c.deliverMsg(sub, mh, msg)
if c.deliverMsg(sub, mh, msg) {
return
}
}
return
isRouteQsub = true
// For a queue subscription, try hard to deliver the message at least once.
// At this point we know for sure that it's a queue subscription and
// that we didn't make a delivery attempt, because either the subscriber
// limit was exceeded or the subscription is already gone.
// So, let the code below find another matching subscription.
// We run the risk that the message might go back and forth
// between routes during these attempts, but in the end
// it shall either be delivered (at most once) or dropped.
c.Debugf("Re-sending message of a detached queue sid %s", c.pa.sid)
}
}

// Used to only send normal subscriptions once across a given route.
var rmap map[string]struct{}
// Don't process normal subscriptions in case of a queue subscription resend.
// Otherwise, we'd end up with potentially delivering the same message twice.
if !isRouteQsub {
// Used to only send normal subscriptions once across a given route.
var rmap map[string]struct{}

// Loop over all normal subscriptions that match.
// Loop over all normal subscriptions that match.

for _, sub := range r.psubs {
// Check if this is a send to a ROUTER, make sure we only send it
// once. The other side will handle the appropriate re-processing
// and fan-out. Also enforce 1-Hop semantics, so no routing to another.
if sub.client.typ == ROUTER {
// Skip if sourced from a ROUTER and going to another ROUTER.
// This is 1-Hop semantics for ROUTERs.
if isRoute {
continue
}
// Check to see if we have already sent it here.
if rmap == nil {
rmap = make(map[string]struct{}, srv.numRoutes())
}
sub.client.mu.Lock()
if sub.client.nc == nil || sub.client.route == nil ||
sub.client.route.remoteID == "" {
c.Debugf("Bad or Missing ROUTER Identity, not processing msg")
sub.client.mu.Unlock()
continue
}
if _, ok := rmap[sub.client.route.remoteID]; ok {
c.Debugf("Ignoring route, already processed")
for _, sub := range r.psubs {
// Check if this is a send to a ROUTER, make sure we only send it
// once. The other side will handle the appropriate re-processing
// and fan-out. Also enforce 1-Hop semantics, so no routing to another.
if sub.client.typ == ROUTER {
// Skip if sourced from a ROUTER and going to another ROUTER.
// This is 1-Hop semantics for ROUTERs.
if isRoute {
continue
}
// Check to see if we have already sent it here.
if rmap == nil {
rmap = make(map[string]struct{}, srv.numRoutes())
}
sub.client.mu.Lock()
if sub.client.nc == nil || sub.client.route == nil ||
sub.client.route.remoteID == "" {
c.Debugf("Bad or Missing ROUTER Identity, not processing msg")
sub.client.mu.Unlock()
continue
}
if _, ok := rmap[sub.client.route.remoteID]; ok {
c.Debugf("Ignoring route, already processed")
sub.client.mu.Unlock()
continue
}
rmap[sub.client.route.remoteID] = routeSeen
sub.client.mu.Unlock()
continue
}
rmap[sub.client.route.remoteID] = routeSeen
sub.client.mu.Unlock()
// Normal delivery
mh := c.msgHeader(msgh[:si], sub)
c.deliverMsg(sub, mh, msg)
}
// Normal delivery
mh := c.msgHeader(msgh[:si], sub)
c.deliverMsg(sub, mh, msg)
}

// Now process any queue subs we have if not a route
if !isRoute {
// Now process any queue subs we have if not a route.
// ... Or if it's a route and we need to resend.
if isRouteQsub || !isRoute {
// Check to see if we have our own rand yet. Global rand
// has contention with lots of clients, etc.
if c.cache.prand == nil {
Expand All @@ -1147,11 +1168,22 @@ func (c *client) processMsg(msg []byte) {
// Process queue subs
for i := 0; i < len(r.qsubs); i++ {
qsubs := r.qsubs[i]
index := c.cache.prand.Intn(len(qsubs))
sub := qsubs[index]
if sub != nil {
mh := c.msgHeader(msgh[:si], sub)
c.deliverMsg(sub, mh, msg)
// Iterate over all subscribed clients, starting at a random index,
// until we find one that is able to deliver the message.
// Drop the message on the floor if there is none.
start_index := c.cache.prand.Intn(len(qsubs))
for i := 0; i < len(qsubs); i++ {
index := (start_index + i) % len(qsubs)
sub := qsubs[index]
if sub != nil {
mh := c.msgHeader(msgh[:si], sub)
if isRouteQsub {
c.Tracef("Re-sending msg of %s to %s", c.pa.sid, sub.sid)
}
if c.deliverMsg(sub, mh, msg) {
break
}
}
}
}
}
Expand Down