diff --git a/server/client.go b/server/client.go index 1a43a14984..57e62d0cfe 100644 --- a/server/client.go +++ b/server/client.go @@ -67,7 +67,6 @@ const ( startBufSize = 512 // For INFO/CONNECT block minBufSize = 64 // Smallest to shrink to for PING/PONG maxBufSize = 65536 // 64k - backedUpSize = maxBufSize * 4 shortsToShrink = 2 ) @@ -207,8 +206,9 @@ type outbound struct { wdl time.Duration // Snapshot of write deadline. mp int32 // Snapshot of max pending for client. fsp int32 // Flush signals that are pending per producer from readLoop's pcd. - lft time.Duration // Last flush time. - lwb int32 // Last byte size of Write(). + lft time.Duration // Last flush time for Write. + lwb int32 // Last byte size of Write. + stc chan struct{} // Stall chan we create to slow down producers on overrun, e.g. fan-in. sgw bool // Indicate flusher is waiting on condition wait. } @@ -266,7 +266,6 @@ type readCache struct { msgs int32 bytes int32 subs int32 - psd time.Duration // Possible stall duration for fast producers. rsz int32 // Read buffer size srs int32 // Short reads, used for dynamic buffer resizing. @@ -656,50 +655,6 @@ func (c *client) writeLoop() { } } -// Will determine if a client connection is considered behind due to outbound buffering. This will -// allow us to react and try to stabilize bursty fast producers or fan in scenarios. -// Assume lock is held. -func (c *client) outboundBackedUp() bool { - return c.out.pb > backedUpSize && c.out.pm > 1 -} - -// Given a raw estimated delay we will box with a floor and ceiling. -func (c *client) boxedDelay(psd time.Duration) time.Duration { - // Ceiling at 250ms. - if psd > 250*time.Millisecond { - c.Debugf("Stall delay of %v reset to max of %v", psd, 250*time.Millisecond) - psd = 250 * time.Millisecond - } - return psd -} - -// This determines how much of the pending backlog for this client -// we should consider to be responsible for in our fast producer -// stall delay. -// Assume lock is held. -func (c *client) percentBacklog() int32 { - // If behind 50.0% of our max, wait for 100%. - // If behind 12.5% of our max, wait for 33% - // If behind 6.25% of our max, wait for 25% - // Otherwise wait 20% - // We also note how many others are writing. - // This will be at least one, but we dampen by half - fsp := c.out.fsp / 2 - if fsp == 0 { - fsp = 1 - } - switch { - case c.out.pb > c.out.mp/2: - return c.out.pb / 1 / fsp - case c.out.pb > c.out.mp/8: - return c.out.pb / 3 / fsp - case c.out.pb > c.out.mp/16: - return c.out.pb / 4 / fsp - default: - return c.out.pb / 5 / fsp - } -} - // flushClients will make sure to flush any clients we may have // sent to during processing. We pass in a budget as a time.Duration // for how much time to spend in place flushing for this client. This @@ -725,18 +680,6 @@ func (c *client) flushClients(budget time.Duration) time.Time { continue } - // Check if this client is backed up and if so calculate any delay intention. - // We only delay clients, not routes, gateways etc. - if c.kind == CLIENT && cp.outboundBackedUp() { - td := cp.percentBacklog() - delay := time.Duration(td) * cp.out.lft / time.Duration(cp.out.lwb) - c.in.psd += delay - // Attempt to flush in place if we can. If we do subtract off of our potential stall delay. - if cp.flushOutbound() { - c.in.psd -= cp.out.lft - } - } - if budget > 0 && cp.flushOutbound() { budget -= cp.out.lft } else { @@ -793,9 +736,6 @@ func (c *client) readLoop() { c.in.bytes = 0 c.in.subs = 0 - // Clear possible stall delay factor. - c.in.psd = 0 - // Main call into parser for inbound data. This will generate callouts // to process messages, etc. if err := c.parse(b[:n]); err != nil { @@ -828,16 +768,6 @@ func (c *client) readLoop() { // Flush, or signal to writeLoop to flush to socket. last := c.flushClients(budget) - // If we have detected clients that we are sending to are behind let's - // try to throttle a bit and see if they catch up. This allows servers to - // survive bursts and high fan-in scenarios. We do not hold our lock here - // so outbound is free to continue. - if psd := c.in.psd; psd > 25*time.Microsecond { - psd = c.boxedDelay(psd) - c.Debugf("Delaying %v due to fast producer", psd) - time.Sleep(psd) - } - // Update activity, check read buffer size. c.mu.Lock() nc := c.nc @@ -1025,6 +955,13 @@ func (c *client) flushOutbound() bool { c.flushSignal() } + // Check if we have a stalled gate and if so and we are recovering release + // any stalled producers. Only kind==CLIENT will stall. + if c.out.stc != nil && (c.out.lwb == attempted || c.out.pb < c.out.mp/2) { + close(c.out.stc) + c.out.stc = nil + } + return true } @@ -1401,6 +1338,14 @@ func (c *client) queueOutbound(data []byte) bool { } else { c.out.p = append(c.out.p, data...) } + + // Check here if we should create a stall channel if we are falling behind. + // We do this here since if we wait for consumer's writeLoop it could be + // too late with large number of fan in producers. + if c.out.pb > c.out.mp/2 && c.out.stc == nil { + c.out.stc = make(chan struct{}) + } + return referenced } @@ -1996,6 +1941,19 @@ func (c *client) msgHeader(mh []byte, sub *subscription, reply []byte) []byte { return mh } +func (c *client) stalledWait(producer *client) { + stall := c.out.stc + c.mu.Unlock() + defer c.mu.Lock() + + // TODO(dlc) - Make the stall timeout variable based on health of consumer. + select { + case <-stall: + case <-time.After(100 * time.Millisecond): + producer.Debugf("Timed out of fast producer stall") + } +} + // Used to treat maps as efficient set var needFlush = struct{}{} @@ -2075,6 +2033,13 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool { return true } + // If we are a client and we detect that the consumer we are + // sending to is in a stalled state, go ahead and wait here + // with a limit. + if c.kind == CLIENT && client.out.stc != nil { + client.stalledWait(c) + } + // Check for closed connection if client.flags.isSet(clearConnection) { client.mu.Unlock() @@ -2668,6 +2633,13 @@ func (c *client) clearConnection(reason ClosedState) { if nc == nil || srv == nil { return } + + // Unblock anyone who is potentially stalled waiting on us. + if c.out.stc != nil { + close(c.out.stc) + c.out.stc = nil + } + // Flush any pending. c.flushOutbound() diff --git a/server/const.go b/server/const.go index 03000956df..16b8bab9bc 100644 --- a/server/const.go +++ b/server/const.go @@ -68,7 +68,7 @@ const ( MAX_PAYLOAD_SIZE = (1024 * 1024) // MAX_PENDING_SIZE is the maximum outbound pending bytes per client. - MAX_PENDING_SIZE = (128 * 1024 * 1024) + MAX_PENDING_SIZE = (64 * 1024 * 1024) // DEFAULT_MAX_CONNECTIONS is the default maximum connections allowed. DEFAULT_MAX_CONNECTIONS = (64 * 1024)