
Commit

Merge 69cdc02 into cf3f51f
derekcollison committed Feb 20, 2019
2 parents cf3f51f + 69cdc02 commit 8d39940
Showing 5 changed files with 314 additions and 56 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -22,6 +22,6 @@ before_script:
script:
- go test -i $EXCLUDE_VENDOR
- go test -run=TestNoRace $EXCLUDE_VENDOR
- if [[ "$TRAVIS_GO_VERSION" =~ 1.11 ]]; then ./scripts/cov.sh TRAVIS; else GOGC=10 go test -race -p=1 $EXCLUDE_VENDOR; fi
- if [[ "$TRAVIS_GO_VERSION" =~ 1.11 ]]; then ./scripts/cov.sh TRAVIS; else GOGC=10 go test -v -race -p=1 --failfast $EXCLUDE_VENDOR; fi
after_success:
- if [[ "$TRAVIS_GO_VERSION" =~ 1.11 ]] && [ "$TRAVIS_TAG" != "" ]; then ghr --owner nats-io --token $GITHUB_TOKEN --draft --replace $TRAVIS_TAG pkg/; fi
4 changes: 2 additions & 2 deletions scripts/cov.sh
@@ -5,8 +5,8 @@ rm -rf ./cov
mkdir cov
go test -v -covermode=atomic -coverprofile=./cov/conf.out ./conf
go test -v -covermode=atomic -coverprofile=./cov/log.out ./logger
go test -v -covermode=atomic -coverprofile=./cov/server.out ./server
go test -v -covermode=atomic -coverprofile=./cov/test.out -coverpkg=./server ./test
go test -v -covermode=atomic -coverprofile=./cov/server.out --failfast ./server
go test -v -covermode=atomic -coverprofile=./cov/test.out -coverpkg=./server --failfast ./test
gocovmerge ./cov/*.out > acc.out
rm -rf ./cov

153 changes: 101 additions & 52 deletions server/client.go
@@ -60,14 +60,13 @@ const (
msgScratchSize = 1024
msgHeadProto = "RMSG "
msgHeadProtoLen = len(msgHeadProto)
)

// For controlling dynamic buffer sizes.
const (
startBufSize = 512 // For INFO/CONNECT block
minBufSize = 64 // Smallest to shrink to for PING/PONG
maxBufSize = 65536 // 64k
shortsToShrink = 2
// For controlling dynamic buffer sizes.
startBufSize = 512 // For INFO/CONNECT block
minBufSize = 64 // Smallest to shrink to for PING/PONG
maxBufSize = 65536 // 64k
shortsToShrink = 2 // Trigger to shrink dynamic buffers
maxFlushPending = 10 // Max fsps to have in order to wait for writeLoop
)

// Represent client booleans with a bitmask
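The constants above bound the dynamic sizing of a client's write buffer, which flushOutbound adjusts later in this diff ("Adjust sz as needed downward, keeping power of 2"). Purely as an illustration of the idea, and not code from this commit, a standalone sketch of how such bounds can drive a power-of-two grow/shrink policy could look like the following; nextBufSize and its arguments are hypothetical names.

package main

import "fmt"

// Bounds mirroring the constants in the diff above.
const (
    startBufSize   = 512   // initial size for INFO/CONNECT
    minBufSize     = 64    // smallest size to shrink to
    maxBufSize     = 65536 // 64k cap
    shortsToShrink = 2     // consecutive short writes before shrinking
)

// nextBufSize is a hypothetical helper: grow when the buffer filled completely,
// shrink only after several short writes in a row, always staying power-of-two
// and inside [minBufSize, maxBufSize].
func nextBufSize(cur, written, shortWrites int) int {
    switch {
    case written >= cur && cur < maxBufSize:
        return cur * 2 // buffer was full; double it, up to the cap
    case shortWrites > shortsToShrink && cur > minBufSize:
        return cur / 2 // repeatedly under-filled; halve it, down to the floor
    default:
        return cur // otherwise leave the size alone
    }
}

func main() {
    fmt.Println(nextBufSize(startBufSize, 512, 0)) // 1024: full buffer, grow
    fmt.Println(nextBufSize(1024, 100, 3))         // 512: several short writes, shrink
}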
@@ -202,11 +201,13 @@ type outbound struct {
sws int32 // Number of short writes, used for dynamic resizing.
pb int32 // Total pending/queued bytes.
pm int32 // Total pending/queued messages.
sg *sync.Cond // Flusher conditional for signaling.
sg *sync.Cond // Flusher conditional for signaling to writeLoop.
wdl time.Duration // Snapshot of write deadline.
mp int32 // Snapshot of max pending.
fsp int32 // Flush signals that are pending from readLoop's pcd.
lft time.Duration // Last flush time.
mp int32 // Snapshot of max pending for client.
fsp int32 // Flush signals that are pending per producer from readLoop's pcd.
lft time.Duration // Last flush time for Write.
lwb int32 // Last byte size of Write.
stc chan struct{} // Stall chan we create to slow down producers on overrun, e.g. fan-in.
sgw bool // Indicate flusher is waiting on condition wait.
}

@@ -634,7 +635,8 @@ func (c *client) writeLoop() {
// buffered outbound structure for efficient writev to the underlying socket.
for {
c.mu.Lock()
if waitOk && (c.out.pb == 0 || c.out.fsp > 0) && len(c.out.nb) == 0 && !c.flags.isSet(clearConnection) {
owtf := c.out.fsp > 0 && c.out.pb < maxBufSize && c.out.fsp < maxFlushPending
if waitOk && (c.out.pb == 0 || owtf) && !c.flags.isSet(clearConnection) {
// Wait on pending data.
c.out.sgw = true
c.out.sg.Wait()
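The new wait condition (owtf) lets the writeLoop keep sleeping while flush signals are pending (fsp > 0), there are not yet too many of them, and the backlog is still under maxBufSize, so many small writes get coalesced into one flush. Below is a rough, standalone sketch of the underlying condition-variable flusher pattern only, with a simplified wait condition and hypothetical names (flusher, queue), not the server's actual types.

package main

import (
    "fmt"
    "sync"
    "time"
)

// flusher is a stripped-down stand-in for the outbound side of a connection:
// producers append bytes and signal; a single write loop coalesces and "flushes".
type flusher struct {
    mu      sync.Mutex
    cond    *sync.Cond
    pending []byte
    waiting bool
    closed  bool
}

func newFlusher() *flusher {
    f := &flusher{}
    f.cond = sync.NewCond(&f.mu)
    return f
}

// queue appends data and wakes the write loop only if it is actually waiting.
func (f *flusher) queue(data []byte) {
    f.mu.Lock()
    f.pending = append(f.pending, data...)
    if f.waiting {
        f.cond.Signal()
    }
    f.mu.Unlock()
}

// writeLoop waits while there is nothing to do, then drains everything queued
// since the last wakeup in one batch.
func (f *flusher) writeLoop() {
    for {
        f.mu.Lock()
        for len(f.pending) == 0 && !f.closed {
            f.waiting = true
            f.cond.Wait()
            f.waiting = false
        }
        if f.closed {
            f.mu.Unlock()
            return
        }
        batch := f.pending
        f.pending = nil
        f.mu.Unlock()
        fmt.Printf("flushed %d bytes in one write\n", len(batch))
    }
}

func main() {
    f := newFlusher()
    go f.writeLoop()
    for i := 0; i < 3; i++ {
        f.queue([]byte("PING\r\n"))
    }
    time.Sleep(50 * time.Millisecond)
    f.mu.Lock()
    f.closed = true
    f.cond.Signal()
    f.mu.Unlock()
}

Producers signal only when the loop is actually waiting, which is the same role c.out.sgw and flushSignal play in the diff.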
@@ -657,24 +659,35 @@
// for how much time to spend in place flushing for this client. This
// will normally be called in the readLoop of the client who sent the
// message that now is being delivered.
func (c *client) flushClients(budget time.Duration) {
func (c *client) flushClients(budget time.Duration) time.Time {
last := time.Now()
// Check pending clients for flush.
for cp := range c.pcd {
// TODO(dlc) - Wonder if it makes more sense to create a new map?
delete(c.pcd, cp)

// Queue up a flush for those in the set
cp.mu.Lock()
// Update last activity for message delivery
cp.last = last
// Remove ourselves from the pending list.
cp.out.fsp--

// Just ignore if this was closed.
if cp.flags.isSet(clearConnection) {
cp.mu.Unlock()
continue
}

if budget > 0 && cp.flushOutbound() {
budget -= cp.out.lft
} else {
cp.flushSignal()
}

cp.mu.Unlock()
// TODO(dlc) - Wonder if it makes more sense to create a new map?
delete(c.pcd, cp)
}
return last
}
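flushClients now returns the timestamp it captured so the caller (readLoop below) can reuse it for last-activity tracking, and it flushes a consumer inline only while a time budget remains, otherwise signaling that consumer's writeLoop. A rough standalone sketch of that budgeted loop follows; conn, flush, and the printed signal are hypothetical stand-ins, not the server's client type.

package main

import (
    "fmt"
    "time"
)

// conn is a hypothetical consumer with an inline flush whose cost we can measure.
type conn struct {
    name    string
    pending int
}

// flush pretends to write pending bytes and reports how long that took.
func (c *conn) flush() time.Duration {
    start := time.Now()
    time.Sleep(time.Duration(c.pending) * time.Microsecond) // stand-in for socket I/O
    c.pending = 0
    return time.Since(start)
}

// flushClients mimics the budgeted loop in the diff: flush inline while budget
// remains, otherwise just signal (here: print) so the consumer's own write loop
// picks the data up later. It returns the timestamp used for activity tracking.
func flushClients(pending []*conn, budget time.Duration) time.Time {
    last := time.Now()
    for _, cp := range pending {
        if budget > 0 {
            budget -= cp.flush() // spend part of the budget on an inline flush
        } else {
            fmt.Printf("signal %s: flush deferred to its write loop\n", cp.name)
        }
    }
    return last
}

func main() {
    clients := []*conn{{"a", 2000}, {"b", 2000}, {"c", 2000}}
    flushClients(clients, 3*time.Millisecond)
}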

// readLoop is the main socket read functionality.
@@ -752,15 +765,15 @@ func (c *client) readLoop() {
}

// Flush, or signal to writeLoop to flush to socket.
c.flushClients(budget)
last := c.flushClients(budget)

// Update activity, check read buffer size.
c.mu.Lock()
nc := c.nc

// Activity based on interest changes or data/msgs.
if c.in.msgs > 0 || c.in.subs > 0 {
c.last = time.Now()
c.last = last
}

if n >= cap(b) {
Expand Down Expand Up @@ -837,18 +850,8 @@ func (c *client) flushOutbound() bool {
attempted := c.out.pb
apm := c.out.pm

// What we are doing here is seeing if we are getting behind. This is
// generally not a gradual thing and will spike quickly. Use some basic
// logic to try to understand when this is happening through no fault of
// our own. How we attempt to get back into a more balanced state under
// load will be to hold our lock during IO, forcing others to wait and
// applying back pressure to the publishers sending to us.
releaseLock := c.out.pb < maxBufSize*4

// Do NOT hold lock during actual IO unless we are behind
if releaseLock {
c.mu.Unlock()
}
// Do NOT hold lock during actual IO.
c.mu.Unlock()

// flush here
now := time.Now()
@@ -861,23 +864,22 @@
nc.SetWriteDeadline(time.Time{})
lft := time.Since(now)

// Re-acquire client lock if we let it go during IO
if releaseLock {
c.mu.Lock()
}
// Re-acquire client lock.
c.mu.Lock()

// Update flush time statistics
// Update flush time statistics.
c.out.lft = lft
c.out.lwb = int32(n)

// Subtract from pending bytes and messages.
c.out.pb -= int32(n)
c.out.pb -= c.out.lwb
c.out.pm -= apm // FIXME(dlc) - this will not be totally accurate on partials.

// Check for partial writes
// TODO(dlc) - zero write with no error will cause lost message and the writeloop to spin.
if n != int64(attempted) && n > 0 {
if c.out.lwb != attempted && n > 0 {
c.handlePartialWrite(nb)
} else if n >= int64(c.out.sz) {
} else if c.out.lwb >= c.out.sz {
c.out.sws = 0
}

@@ -918,7 +920,7 @@ func (c *client) flushOutbound() bool {
}

// Adjust based on what we wrote plus any pending.
pt := int32(n) + c.out.pb
pt := c.out.lwb + c.out.pb

// Adjust sz as needed downward, keeping power of 2.
// We do this at a slower rate.
@@ -947,20 +949,29 @@ func (c *client) flushOutbound() bool {
}

// Check that if there is still data to send and writeLoop is in wait,
// we need to signal.
// then we need to signal.
if c.out.pb > 0 {
c.flushSignal()
}

// Check if we have a stalled gate and, if so and we are recovering, release
// any stalled producers. Only kind==CLIENT will stall.
if c.out.stc != nil && (c.out.lwb == attempted || c.out.pb < c.out.mp/2) {
close(c.out.stc)
c.out.stc = nil
}

return true
}

// flushSignal will use server to queue the flush IO operation to a pool of flushers.
// Lock must be held.
func (c *client) flushSignal() {
func (c *client) flushSignal() bool {
if c.out.sgw {
c.out.sg.Signal()
return true
}
return false
}

func (c *client) traceMsg(msg []byte) {
@@ -1326,6 +1337,14 @@ func (c *client) queueOutbound(data []byte) bool {
} else {
c.out.p = append(c.out.p, data...)
}

// Check here if we should create a stall channel if we are falling behind.
// We do this here since if we wait for consumer's writeLoop it could be
// too late with a large number of fan-in producers.
if c.out.pb > c.out.mp/2 && c.out.stc == nil {
c.out.stc = make(chan struct{})
}

return referenced
}
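This new check arms a stall gate (out.stc) as soon as a consumer's backlog crosses half of its max-pending limit, so fan-in producers can be slowed down before the hard limit is reached. A minimal sketch of the idea, using a hypothetical outboundState type in place of the server's client/outbound structs:

package main

import "fmt"

// outboundState is a hypothetical slice of the consumer-side bookkeeping:
// pending bytes, the max-pending limit, and the stall gate channel.
type outboundState struct {
    pb  int           // pending bytes queued for this consumer
    mp  int           // max pending allowed before the connection is considered overrun
    stc chan struct{} // stall gate; producers block on it while it is non-nil
}

// queueOutbound appends data and arms the stall gate once the backlog passes
// half of the limit, mirroring the check added in the diff.
func (o *outboundState) queueOutbound(data []byte) {
    o.pb += len(data)
    if o.pb > o.mp/2 && o.stc == nil {
        o.stc = make(chan struct{}) // arm the gate; closed again once we catch up
    }
}

func main() {
    o := &outboundState{mp: 1000}
    o.queueOutbound(make([]byte, 600))
    fmt.Println("gate armed:", o.stc != nil) // true: 600 > 1000/2
}

Arming at half the limit, rather than at the limit itself, presumably leaves headroom for data already in flight from many producers.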

@@ -1378,11 +1397,11 @@ func (c *client) sendErr(err string) {
}

func (c *client) sendOK() {
proto := []byte("+OK\r\n")
c.mu.Lock()
c.traceOutOp("OK", nil)
// Can not autoflush this one, needs to be async.
c.sendProto([]byte("+OK\r\n"), false)
// FIXME(dlc) - ??
c.sendProto(proto, false)
c.pcd[c] = needFlush
c.mu.Unlock()
}
@@ -1921,6 +1940,19 @@ func (c *client) msgHeader(mh []byte, sub *subscription, reply []byte) []byte {
return mh
}

func (c *client) stalledWait(producer *client) {
stall := c.out.stc
c.mu.Unlock()
defer c.mu.Lock()

// TODO(dlc) - Make the stall timeout variable based on health of consumer.
select {
case <-stall:
case <-time.After(100 * time.Millisecond):
producer.Debugf("Timed out of fast producer stall")
}
}
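stalledWait parks a producer on the consumer's stall gate, but never for longer than a fixed bound; the gate is released by closing the channel, either when the consumer catches up in flushOutbound or when its connection is cleared. A standalone sketch of that select-with-timeout wait (the 100ms bound matches the diff; everything else is simplified and hypothetical):

package main

import (
    "fmt"
    "time"
)

// stalledWait blocks a producer on the consumer's stall gate, bounded by a
// timeout so a dead or very slow consumer cannot hold producers indefinitely.
func stalledWait(gate <-chan struct{}, timeout time.Duration) {
    select {
    case <-gate: // consumer caught up and closed the gate
        fmt.Println("released: consumer recovered")
    case <-time.After(timeout): // give up waiting after the bound
        fmt.Println("released: stall timed out")
    }
}

func main() {
    gate := make(chan struct{})

    // Pretend the consumer recovers shortly after the producer stalls.
    go func() {
        time.Sleep(20 * time.Millisecond)
        close(gate) // closing releases every stalled producer at once
    }()

    stalledWait(gate, 100*time.Millisecond) // same 100ms bound as the diff
}

Closing a channel, rather than signaling producers one by one, releases every stalled producer at once.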

// Used to treat maps as efficient set
var needFlush = struct{}{}

@@ -2000,8 +2032,15 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
return true
}

// If we are a client and we detect that the consumer we are
// sending to is in a stalled state, go ahead and wait here
// with a limit.
if c.kind == CLIENT && client.out.stc != nil {
client.stalledWait(c)
}

// Check for closed connection
if client.nc == nil {
if client.flags.isSet(clearConnection) {
client.mu.Unlock()
return false
}
@@ -2016,20 +2055,22 @@
// This is specifically looking at situations where we are getting behind and may want
// to intervene before this producer goes back to top of readloop. We are in the producer's
// readloop go routine at this point.
// FIXME(dlc) - We may call this a lot, maybe suppress after first call?
if client.out.pm > 1 && client.out.pb > maxBufSize*2 {
client.flushSignal()
}

if c.trace {
client.traceOutOp(string(mh[:len(mh)-LEN_CR_LF]), nil)
}

// Increment the flush pending signals if we are setting for the first time.
// Add the data size we are responsible for here. This will be processed when we
// return to the top of the readLoop.
if _, ok := c.pcd[client]; !ok {
client.out.fsp++
// Remember for when we return to the top of the loop.
c.pcd[client] = needFlush
}

if c.trace {
client.traceOutOp(string(mh[:len(mh)-LEN_CR_LF]), nil)
}

client.mu.Unlock()

return true
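On delivery the producer records the consumer in its pcd set and bumps the consumer's fsp counter only on first insertion, so each producer contributes at most one pending flush signal per trip through its readLoop; flushClients later reverses both. A small standalone sketch of that bookkeeping, with hypothetical producer/consumer types in place of the server's client:

package main

import "fmt"

// consumer is a hypothetical receiving connection with a pending-flush-signal
// counter (fsp in the diff) that its write loop consults before sleeping.
type consumer struct {
    name string
    fsp  int
}

// producer tracks which consumers it has queued data for since the top of its
// read loop, mirroring the pcd map in the diff.
type producer struct {
    pcd map[*consumer]struct{}
}

// deliver records that cons needs a flush, incrementing fsp only the first
// time the consumer enters this producer's pending set.
func (p *producer) deliver(cons *consumer) {
    if _, ok := p.pcd[cons]; !ok {
        cons.fsp++ // one outstanding flush signal per producer, not per message
        p.pcd[cons] = struct{}{}
    }
}

// flushClients drains the pending set and releases the flush signals.
func (p *producer) flushClients() {
    for cons := range p.pcd {
        cons.fsp--
        delete(p.pcd, cons)
        fmt.Printf("flushed %s, fsp now %d\n", cons.name, cons.fsp)
    }
}

func main() {
    p := &producer{pcd: make(map[*consumer]struct{})}
    c := &consumer{name: "c1"}
    p.deliver(c)
    p.deliver(c) // second delivery does not bump fsp again
    fmt.Println("fsp before flush:", c.fsp)
    p.flushClients()
}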
@@ -2587,9 +2628,17 @@ func (c *client) clearConnection(reason ClosedState) {
c.flags.set(clearConnection)

nc := c.nc
if nc == nil || c.srv == nil {
srv := c.srv
if nc == nil || srv == nil {
return
}

// Unblock anyone who is potentially stalled waiting on us.
if c.out.stc != nil {
close(c.out.stc)
c.out.stc = nil
}

// Flush any pending.
c.flushOutbound()

Expand All @@ -2609,8 +2658,8 @@ func (c *client) clearConnection(reason ClosedState) {
nc.SetWriteDeadline(time.Time{})

// Save off the connection if its a client.
if c.kind == CLIENT && c.srv != nil {
go c.srv.saveClosedClient(c, nc, reason)
if c.kind == CLIENT {
go srv.saveClosedClient(c, nc, reason)
}
}

5 changes: 4 additions & 1 deletion server/route.go
@@ -440,7 +440,10 @@ func (c *client) processRouteInfo(info *Info) {
c.Debugf("Registering remote route %q", info.ID)

// Send our subs to the other side.
s.sendSubsToRoute(c)
s.startGoRoutine(func() {
s.sendSubsToRoute(c)
s.grWG.Done()
})

// Send info about the known gateways to this route.
s.sendGatewayConfigsToRoute(c)
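Sending subscriptions to the newly registered route now happens on a server-tracked goroutine, with grWG.Done() letting shutdown wait for it instead of blocking route registration. A minimal sketch of that startGoRoutine/WaitGroup pattern, assuming a simplified server type (only grWG, startGoRoutine, and sendSubsToRoute are names from the diff; the rest is illustrative):

package main

import (
    "fmt"
    "sync"
    "time"
)

// server is a hypothetical holder of the goroutine WaitGroup (grWG in the diff).
type server struct {
    grWG sync.WaitGroup
}

// startGoRoutine runs f on a tracked goroutine so shutdown can wait for it;
// the real server also guards this with running-state checks.
func (s *server) startGoRoutine(f func()) {
    s.grWG.Add(1)
    go f()
}

// sendSubsToRoute stands in for the potentially slow work moved off the
// registration path by this commit.
func (s *server) sendSubsToRoute(route string) {
    time.Sleep(10 * time.Millisecond)
    fmt.Println("subs sent to", route)
}

func main() {
    s := &server{}
    s.startGoRoutine(func() {
        s.sendSubsToRoute("route-A")
        s.grWG.Done() // matches the explicit Done() in the diff
    })
    s.grWG.Wait() // shutdown-style wait for all tracked goroutines
}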