Skip to content

Commit

Permalink
Merge branch 'VidScale-master' into fix_632
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Mar 9, 2018
2 parents dd3dccc + 1bd40f9 commit 9d7dda1
Showing 1 changed file with 78 additions and 46 deletions.
124 changes: 78 additions & 46 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,9 +915,9 @@ func (c *client) msgHeader(mh []byte, sub *subscription) []byte {
var needFlush = struct{}{}
var routeSeen = struct{}{}

func (c *client) deliverMsg(sub *subscription, mh, msg []byte) {
func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
if sub.client == nil {
return
return false
}
client := sub.client
client.mu.Lock()
Expand All @@ -944,13 +944,13 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) {
if shouldForward {
client.srv.broadcastUnSubscribe(sub)
}
return
return false
}
}

if client.nc == nil {
client.mu.Unlock()
return
return false
}

// Update statistics
Expand Down Expand Up @@ -999,7 +999,7 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) {

client.mu.Unlock()
c.pcd[client] = needFlush
return
return true

writeErr:
if deadlineSet {
Expand All @@ -1014,6 +1014,10 @@ writeErr:
} else {
c.Debugf("Error writing msg: %v", err)
}
// Honor at most once semantic:
// treat message that we attempted to send as actually sent
// and don't let a higher-level code an attempt to resend it.
return true
}

// processMsg is called to process an inbound msg from a client.
Expand Down Expand Up @@ -1131,6 +1135,7 @@ func (c *client) processMsg(msg []byte) {
si := len(msgh)

isRoute := c.typ == ROUTER
isRouteQsub := false

// If we are a route and we have a queue subscription, deliver direct
// since they are sent direct via L2 semantics. If the match is a queue
Expand All @@ -1139,53 +1144,69 @@ func (c *client) processMsg(msg []byte) {
if sub, ok := srv.routeSidQueueSubscriber(c.pa.sid); ok {
if sub != nil {
mh := c.msgHeader(msgh[:si], sub)
c.deliverMsg(sub, mh, msg)
if c.deliverMsg(sub, mh, msg) {
return
}
}
return
isRouteQsub = true
// for queue subscription try hard to deliver a message at least once.
// Right now we know fo sure that it's a queue subscription and
// we didn't make a delivery attempt, because either a subscriber limit
// was exceeded or a subscription is already gone.
// So, let a code below find yet another matching subscription.
// We are at risk that a message might go forth and back
// between routes during these attempts, but at the end
// it shall either be delivered (at most once) or drop.
c.Debugf("Re-sending message of a detached queue sid %s", c.pa.sid)
}
}

// Used to only send normal subscriptions once across a given route.
var rmap map[string]struct{}
// Don't process normal subscriptions in case of a queue subscription resend.
// Otherwise, we'd end up with potentially delivering the same message twice.
if !isRouteQsub {
// Used to only send normal subscriptions once across a given route.
var rmap map[string]struct{}

// Loop over all normal subscriptions that match.
// Loop over all normal subscriptions that match.

for _, sub := range r.psubs {
// Check if this is a send to a ROUTER, make sure we only send it
// once. The other side will handle the appropriate re-processing
// and fan-out. Also enforce 1-Hop semantics, so no routing to another.
if sub.client.typ == ROUTER {
// Skip if sourced from a ROUTER and going to another ROUTER.
// This is 1-Hop semantics for ROUTERs.
if isRoute {
continue
}
// Check to see if we have already sent it here.
if rmap == nil {
rmap = make(map[string]struct{}, srv.numRoutes())
}
sub.client.mu.Lock()
if sub.client.nc == nil || sub.client.route == nil ||
sub.client.route.remoteID == "" {
c.Debugf("Bad or Missing ROUTER Identity, not processing msg")
sub.client.mu.Unlock()
continue
}
if _, ok := rmap[sub.client.route.remoteID]; ok {
c.Debugf("Ignoring route, already processed")
for _, sub := range r.psubs {
// Check if this is a send to a ROUTER, make sure we only send it
// once. The other side will handle the appropriate re-processing
// and fan-out. Also enforce 1-Hop semantics, so no routing to another.
if sub.client.typ == ROUTER {
// Skip if sourced from a ROUTER and going to another ROUTER.
// This is 1-Hop semantics for ROUTERs.
if isRoute {
continue
}
// Check to see if we have already sent it here.
if rmap == nil {
rmap = make(map[string]struct{}, srv.numRoutes())
}
sub.client.mu.Lock()
if sub.client.nc == nil || sub.client.route == nil ||
sub.client.route.remoteID == "" {
c.Debugf("Bad or Missing ROUTER Identity, not processing msg")
sub.client.mu.Unlock()
continue
}
if _, ok := rmap[sub.client.route.remoteID]; ok {
c.Debugf("Ignoring route, already processed")
sub.client.mu.Unlock()
continue
}
rmap[sub.client.route.remoteID] = routeSeen
sub.client.mu.Unlock()
continue
}
rmap[sub.client.route.remoteID] = routeSeen
sub.client.mu.Unlock()
// Normal delivery
mh := c.msgHeader(msgh[:si], sub)
c.deliverMsg(sub, mh, msg)
}
// Normal delivery
mh := c.msgHeader(msgh[:si], sub)
c.deliverMsg(sub, mh, msg)
}

// Now process any queue subs we have if not a route
if !isRoute {
// Now process any queue subs we have if not a route.
// ... Or if it's a route and we need to resend.
if isRouteQsub || !isRoute {
// Check to see if we have our own rand yet. Global rand
// has contention with lots of clients, etc.
if c.cache.prand == nil {
Expand All @@ -1194,11 +1215,22 @@ func (c *client) processMsg(msg []byte) {
// Process queue subs
for i := 0; i < len(r.qsubs); i++ {
qsubs := r.qsubs[i]
index := c.cache.prand.Intn(len(qsubs))
sub := qsubs[index]
if sub != nil {
mh := c.msgHeader(msgh[:si], sub)
c.deliverMsg(sub, mh, msg)
// Iterate over all subscribed clients starting at a random index
// until we find one that's able to deliver a message.
// Drop a message on the floor if there are noone.
start_index := c.cache.prand.Intn(len(qsubs))
for i := 0; i < len(qsubs); i++ {
index := (start_index + i) % len(qsubs)
sub := qsubs[index]
if sub != nil {
mh := c.msgHeader(msgh[:si], sub)
if isRouteQsub {
c.Tracef("Re-sending msg of %s to %s", c.pa.sid, sub.sid)
}
if c.deliverMsg(sub, mh, msg) {
break
}
}
}
}
}
Expand Down

0 comments on commit 9d7dda1

Please sign in to comment.