[FIXED] Possible slow consumers when routes exchange sub list
If each server has a long list of subscriptions, sending that list
when a route is established could cause each server to treat its
peer as a slow consumer, forcing a disconnect, a reconnect, and the
same exchange all over again. The subscription list is now sent
from a separate go routine so that processing of the route is not
blocked.
Also bumping the fan-in threshold for route connections.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
kozlovic authored and derekcollison committed Feb 20, 2019
1 parent c329da7 commit 04d824c
Showing 6 changed files with 336 additions and 49 deletions.
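
For context on the failure mode the commit message describes: when a route comes up, each side writes its entire subscription snapshot to the peer, and if that write cannot complete within the write deadline, the healthy peer gets flagged as a slow consumer and the route is torn down, only to reconnect and repeat. The sketch below is a minimal, hypothetical illustration of that hazard — the function name and framing are invented here, not taken from the server code.

package sketch

import (
	"bytes"
	"fmt"
	"net"
	"time"
)

// Hypothetical sketch of the failure mode: one large synchronous write
// of the subscription snapshot racing the write deadline.
func sendSubsSnapshot(conn net.Conn, subs []string, wdl time.Duration) error {
	var buf bytes.Buffer
	for _, sub := range subs {
		buf.WriteString(sub) // one "SUB <subject> <sid>\r\n" per subscription
	}
	conn.SetWriteDeadline(time.Now().Add(wdl))
	defer conn.SetWriteDeadline(time.Time{})
	if _, err := conn.Write(buf.Bytes()); err != nil {
		// A deadline timeout lands here even when the peer is reading as
		// fast as it can; the peer is then treated as a slow consumer.
		return fmt.Errorf("slow consumer: %v", err)
	}
	return nil
}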
2 changes: 1 addition & 1 deletion .travis.yml
@@ -21,6 +21,6 @@ before_script:
script:
- go test -i $EXCLUDE_VENDOR
- go test -run=TestNoRace $EXCLUDE_VENDOR
- if [[ "$TRAVIS_GO_VERSION" =~ 1.11 ]]; then ./scripts/cov.sh TRAVIS; else GOGC=10 go test -race -p=1 $EXCLUDE_VENDOR; fi
- if [[ "$TRAVIS_GO_VERSION" =~ 1.11 ]]; then ./scripts/cov.sh TRAVIS; else GOGC=10 go test -v -race -p=1 --failfast $EXCLUDE_VENDOR; fi
after_success:
- if [[ "$TRAVIS_GO_VERSION" =~ 1.11 ]] && [ "$TRAVIS_TAG" != "" ]; then ghr --owner nats-io --token $GITHUB_TOKEN --draft --replace $TRAVIS_TAG pkg/; fi
4 changes: 2 additions & 2 deletions scripts/cov.sh
@@ -5,8 +5,8 @@ rm -rf ./cov
mkdir cov
go test -v -covermode=atomic -coverprofile=./cov/conf.out ./conf
go test -v -covermode=atomic -coverprofile=./cov/log.out ./logger
go test -v -covermode=atomic -coverprofile=./cov/server.out ./server
go test -v -covermode=atomic -coverprofile=./cov/test.out -coverpkg=./server ./test
go test -v -covermode=atomic -coverprofile=./cov/server.out --failfast ./server
go test -v -covermode=atomic -coverprofile=./cov/test.out -coverpkg=./server --failfast ./test
gocovmerge ./cov/*.out > acc.out
rm -rf ./cov

166 changes: 122 additions & 44 deletions server/client.go
@@ -67,6 +67,7 @@ const (
startBufSize = 512 // For INFO/CONNECT block
minBufSize = 64 // Smallest to shrink to for PING/PONG
maxBufSize = 65536 // 64k
backedUpSize = maxBufSize * 4 // 256k; threshold for considering a connection backed up
shortsToShrink = 2
)

@@ -202,11 +203,12 @@ type outbound struct {
sws int32 // Number of short writes, used for dynamic resizing.
pb int32 // Total pending/queued bytes.
pm int32 // Total pending/queued messages.
sg *sync.Cond // Flusher conditional for signaling.
sg *sync.Cond // Flusher conditional for signaling to writeLoop.
wdl time.Duration // Snapshot of write deadline.
mp int32 // Snapshot of max pending.
fsp int32 // Flush signals that are pending from readLoop's pcd.
mp int32 // Snapshot of max pending for client.
fsp int32 // Flush signals that are pending per producer from readLoop's pcd.
lft time.Duration // Last flush time.
lwb int32 // Last byte size of Write().
sgw bool // Indicate flusher is waiting on condition wait.
}

@@ -264,6 +266,7 @@ type readCache struct {
msgs int32
bytes int32
subs int32
psd time.Duration // Possible stall duration for fast producers.

rsz int32 // Read buffer size
srs int32 // Short reads, used for dynamic buffer resizing.
@@ -634,7 +637,8 @@ func (c *client) writeLoop() {
// buffered outbound structure for efficient writev to the underlying socket.
for {
c.mu.Lock()
if waitOk && (c.out.pb == 0 || c.out.fsp > 0) && len(c.out.nb) == 0 && !c.flags.isSet(clearConnection) {
owtf := c.out.fsp > 0 && c.out.pb < maxBufSize && c.out.fsp < 10
if waitOk && (c.out.pb == 0 || owtf) && !c.flags.isSet(clearConnection) {
// Wait on pending data.
c.out.sgw = true
c.out.sg.Wait()
@@ -652,29 +656,96 @@ }
}
}

// Will determine if a client connection is considered behind due to outbound buffering. This will
// allow us to react and try to stabilize bursty fast producers or fan-in scenarios.
// Assume lock is held.
func (c *client) outboundBackedUp() bool {
return c.out.pb > backedUpSize && c.out.pm > 1
}

// Given a raw estimated delay we will cap it with a ceiling.
func (c *client) boxedDelay(psd time.Duration) time.Duration {
// Ceiling at 250ms.
if psd > 250*time.Millisecond {
c.Debugf("Stall delay of %v reset to max of %v", psd, 250*time.Millisecond)
psd = 250 * time.Millisecond
}
return psd
}

// This determines how much of this client's pending backlog we
// should consider ourselves responsible for when computing our
// fast producer stall delay.
// Assume lock is held.
func (c *client) percentBacklog() int32 {
// If behind 50.0% of our max, wait for 100%.
// If behind 12.5% of our max, wait for 33%.
// If behind 6.25% of our max, wait for 25%.
// Otherwise wait for 20%.
// We also note how many others are writing. This will be at
// least one, but we dampen by half.
fsp := c.out.fsp / 2
if fsp == 0 {
fsp = 1
}
switch {
case c.out.pb > c.out.mp/2:
return c.out.pb / 1 / fsp
case c.out.pb > c.out.mp/8:
return c.out.pb / 3 / fsp
case c.out.pb > c.out.mp/16:
return c.out.pb / 4 / fsp
default:
return c.out.pb / 5 / fsp
}
}
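
As a worked example with assumed numbers: suppose the max pending snapshot is mp = 128MB and this client's backlog is pb = 20MB. That is above mp/8 (16MB) but below mp/2 (64MB), so the middle case applies and we wait for a third of the backlog; with fsp = 4 writers, dampened by half to 2, the producer is held responsible for 20MB / 3 / 2 ≈ 3.3MB.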

// flushClients will make sure to flush any clients we may have
// sent to during processing. We pass in a budget as a time.Duration
// for how much time to spend in place flushing for this client. This
// will normally be called in the readLoop of the client who sent the
// message that now is being delivered.
func (c *client) flushClients(budget time.Duration) {
func (c *client) flushClients(budget time.Duration) time.Time {
last := time.Now()
// Check pending clients for flush.
for cp := range c.pcd {
// TODO(dlc) - Wonder if it makes more sense to create a new map?
delete(c.pcd, cp)

// Queue up a flush for those in the set
cp.mu.Lock()
// Update last activity for message delivery
cp.last = last
// Remove ourselves from the pending list.
cp.out.fsp--

// Just ignore if this was closed.
if cp.flags.isSet(clearConnection) {
cp.mu.Unlock()
continue
}

// Check if this client is backed up and, if so, calculate any delay intention.
// We only delay clients, not routes, gateways, etc.
if c.kind == CLIENT && cp.outboundBackedUp() {
td := cp.percentBacklog()
delay := time.Duration(td) * cp.out.lft / time.Duration(cp.out.lwb)
c.in.psd += delay
// Attempt to flush in place if we can. If we do, subtract the flush time from our potential stall delay.
if cp.flushOutbound() {
c.in.psd -= cp.out.lft
}
}

if budget > 0 && cp.flushOutbound() {
budget -= cp.out.lft
} else {
cp.flushSignal()
}

cp.mu.Unlock()
// TODO(dlc) - Wonder if it makes more sense to create a new map?
delete(c.pcd, cp)
}
return last
}
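
flushClients turns that byte share into a time delay using the consumer's most recent write rate: delay = td × lft / lwb, i.e. the bytes this producer is responsible for, times the observed time-per-byte of the last flush. A small sketch with assumed numbers (all values hypothetical; the real ones come from the outbound stats above):

package sketch

import (
	"fmt"
	"time"
)

// Hypothetical numbers: the consumer's last flush wrote lwb = 1MB in
// lft = 10ms (~100MB/s), and percentBacklog assigned this producer
// td = 3.3MB of the backlog.
func stallExample() {
	lft := 10 * time.Millisecond
	lwb := int32(1 << 20)
	td := int32(3300000)
	delay := time.Duration(td) * lft / time.Duration(lwb)
	fmt.Println(delay) // ~31ms, later capped at 250ms by boxedDelay
}

The readLoop accumulates these intentions in c.in.psd and, as shown further down, sleeps the boxed total before reading more from this producer.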

// readLoop is the main socket read functionality.
@@ -722,6 +793,9 @@ func (c *client) readLoop() {
c.in.bytes = 0
c.in.subs = 0

// Clear possible stall delay factor.
c.in.psd = 0

// Main call into parser for inbound data. This will generate callouts
// to process messages, etc.
if err := c.parse(b[:n]); err != nil {
@@ -752,15 +826,25 @@ }
}

// Flush, or signal to writeLoop to flush to socket.
c.flushClients(budget)
last := c.flushClients(budget)

// If we have detected that clients we are sending to are behind, let's
// throttle a bit and see if they catch up. This allows servers to
// survive bursts and high fan-in scenarios. We do not hold our lock
// here, so outbound processing is free to continue.
if psd := c.in.psd; psd > 25*time.Microsecond {
psd = c.boxedDelay(psd)
c.Debugf("Delaying %v due to fast producer", psd)
time.Sleep(psd)
}

// Update activity, check read buffer size.
c.mu.Lock()
nc := c.nc

// Activity based on interest changes or data/msgs.
if c.in.msgs > 0 || c.in.subs > 0 {
c.last = time.Now()
c.last = last
}

if n >= cap(b) {
@@ -837,18 +921,8 @@ func (c *client) flushOutbound() bool {
attempted := c.out.pb
apm := c.out.pm

// What we are doing here is seeing if we are getting behind. This is
// generally not a gradual thing and will spike quickly. Use some basic
// logic to try to understand when this is happening through no fault of
// our own. How we attempt to get back into a more balanced state under
// load will be to hold our lock during IO, forcing others to wait and
// applying back pressure to the publishers sending to us.
releaseLock := c.out.pb < maxBufSize*4

// Do NOT hold lock during actual IO unless we are behind
if releaseLock {
c.mu.Unlock()
}
// Do NOT hold lock during actual IO.
c.mu.Unlock()

// flush here
now := time.Now()
@@ -861,23 +935,22 @@ func (c *client) flushOutbound() bool {
nc.SetWriteDeadline(time.Time{})
lft := time.Since(now)

// Re-acquire client lock if we let it go during IO
if releaseLock {
c.mu.Lock()
}
// Re-acquire client lock.
c.mu.Lock()

// Update flush time statistics
// Update flush time statistics.
c.out.lft = lft
c.out.lwb = int32(n)

// Subtract from pending bytes and messages.
c.out.pb -= int32(n)
c.out.pb -= c.out.lwb
c.out.pm -= apm // FIXME(dlc) - this will not be totally accurate on partials.

// Check for partial writes
// TODO(dlc) - zero write with no error will cause lost message and the writeLoop to spin.
if n != int64(attempted) && n > 0 {
if c.out.lwb != attempted && n > 0 {
c.handlePartialWrite(nb)
} else if n >= int64(c.out.sz) {
} else if c.out.lwb >= c.out.sz {
c.out.sws = 0
}

@@ -918,7 +991,7 @@ func (c *client) flushOutbound() bool {
}

// Adjust based on what we wrote plus any pending.
pt := int32(n) + c.out.pb
pt := c.out.lwb + c.out.pb

// Adjust sz as needed downward, keeping power of 2.
// We do this at a slower rate.
@@ -947,7 +1020,7 @@ func (c *client) flushOutbound() bool {
}

// Check that if there is still data to send and writeLoop is in wait,
// we need to signal.
// then we need to signal.
if c.out.pb > 0 {
c.flushSignal()
}
@@ -957,10 +1030,12 @@ func (c *client) flushOutbound() bool {

// flushSignal will signal the writeLoop if it is waiting, and report whether a signal was sent.
// Lock must be held.
func (c *client) flushSignal() {
func (c *client) flushSignal() bool {
if c.out.sgw {
c.out.sg.Signal()
return true
}
return false
}

func (c *client) traceMsg(msg []byte) {
@@ -1378,11 +1453,11 @@ func (c *client) sendErr(err string) {
}

func (c *client) sendOK() {
proto := []byte("+OK\r\n")
c.mu.Lock()
c.traceOutOp("OK", nil)
// Can not autoflush this one, needs to be async.
c.sendProto([]byte("+OK\r\n"), false)
// FIXME(dlc) - ??
c.sendProto(proto, false)
c.pcd[c] = needFlush
c.mu.Unlock()
}
@@ -2001,7 +2076,7 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
}

// Check for closed connection
if client.nc == nil {
if client.flags.isSet(clearConnection) {
client.mu.Unlock()
return false
}
@@ -2016,20 +2091,22 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
// This is specifically looking at situations where we are getting behind and may want
// to intervene before this producer goes back to the top of the readLoop. We are in the
// producer's readLoop go routine at this point.
// FIXME(dlc) - We may call this a lot, maybe suppress after first call?
if client.out.pm > 1 && client.out.pb > maxBufSize*2 {
client.flushSignal()
}

if c.trace {
client.traceOutOp(string(mh[:len(mh)-LEN_CR_LF]), nil)
}

// Increment the flush pending signals if we are setting for the first time.
// Add the data size we are responsible for here. This will be processed when we
// return to the top of the readLoop.
if _, ok := c.pcd[client]; !ok {
client.out.fsp++
// Remember for when we return to the top of the loop.
c.pcd[client] = needFlush
}

if c.trace {
client.traceOutOp(string(mh[:len(mh)-LEN_CR_LF]), nil)
}

client.mu.Unlock()

return true
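
Tying the fsp bookkeeping together: a producer increments a consumer's fsp the first time it queues data for that consumer during a readLoop pass (here), and flushClients decrements it once the producer comes back around to flush on the consumer's behalf. The writeLoop's new wait condition (owtf above) uses that count to stay parked while the consumer's pending buffer is still under 64k and fewer than ten producers have signaled — letting producers flush in place instead of contending with the writeLoop.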
@@ -2587,7 +2664,8 @@ func (c *client) clearConnection(reason ClosedState) {
c.flags.set(clearConnection)

nc := c.nc
if nc == nil || c.srv == nil {
srv := c.srv
if nc == nil || srv == nil {
return
}
// Flush any pending.
@@ -2609,8 +2687,8 @@
nc.SetWriteDeadline(time.Time{})

// Save off the connection if its a client.
if c.kind == CLIENT && c.srv != nil {
go c.srv.saveClosedClient(c, nc, reason)
if c.kind == CLIENT {
go srv.saveClosedClient(c, nc, reason)
}
}

2 changes: 1 addition & 1 deletion server/const.go
@@ -68,7 +68,7 @@ const (
MAX_PAYLOAD_SIZE = (1024 * 1024)

// MAX_PENDING_SIZE is the maximum outbound pending bytes per client.
MAX_PENDING_SIZE = (64 * 1024 * 1024)
MAX_PENDING_SIZE = (128 * 1024 * 1024)

// DEFAULT_MAX_CONNECTIONS is the default maximum connections allowed.
DEFAULT_MAX_CONNECTIONS = (64 * 1024)
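
A worked estimate of why the ceiling doubles, with assumed numbers: if a SUB protocol line averages around 64 bytes, a peer advertising one million subscriptions queues roughly 64MB in a single burst — already at the old 64MB limit before any message traffic is added on top.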
5 changes: 4 additions & 1 deletion server/route.go
@@ -440,7 +440,10 @@ func (c *client) processRouteInfo(info *Info) {
c.Debugf("Registering remote route %q", info.ID)

// Send our subs to the other side.
s.sendSubsToRoute(c)
s.startGoRoutine(func() {
s.sendSubsToRoute(c)
s.grWG.Done()
})

// Send info about the known gateways to this route.
s.sendGatewayConfigsToRoute(c)
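
This last hunk is the heart of the fix: the subscription snapshot is now written from a server-tracked go routine instead of inline in processRouteInfo, so the route's read loop keeps draining the peer while the bulk send proceeds. Below is a minimal sketch of the pattern; the body of startGoRoutine is an assumption inferred from the grWG.Done() call (a WaitGroup used to track server go routines), not the server's actual implementation.

package sketch

import "sync"

type server struct {
	grWG sync.WaitGroup // tracks server go routines for shutdown
}

// Assumed shape of startGoRoutine: register on the WaitGroup, then run.
func (s *server) startGoRoutine(f func()) {
	s.grWG.Add(1)
	go f()
}

func (s *server) sendSubsToRoute() {
	// Potentially a very large, slow write -- now off the read loop.
}

func (s *server) registerRoute() {
	s.startGoRoutine(func() {
		s.sendSubsToRoute()
		s.grWG.Done()
	})
}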
