New fan-in logic
Reworked fan-in logic. We no longer hold locks during I/O, either read or write.
In scenarios where we can get behind, mostly due to fan-in from fast producers,
we detect this and create a stall channel. Once we catch up, we close the stall
channel to release all blocked producers. Producers have an upper bound on how
long they will be stalled.

Signed-off-by: Derek Collison <derek@nats.io>
derekcollison committed Feb 20, 2019
1 parent 04d824c commit 0696d5a
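
As a rough illustration of the mechanism described in the commit message, the sketch below is a minimal, self-contained Go program using hypothetical names (consumer, queue, flush, stalledWait) rather than the server's actual identifiers: producers that see a consumer falling behind park on a stall channel with an upper bound, and the consumer closes that channel once it catches up, releasing every blocked producer at once.

package main

import (
	"fmt"
	"sync"
	"time"
)

// consumer models a slow receiver with an outbound pending counter and a
// stall gate that is armed when it falls too far behind.
type consumer struct {
	mu      sync.Mutex
	pending int
	max     int
	stc     chan struct{} // stall gate; nil when the consumer is keeping up
}

// queue is called by producers. It may arm the stall gate when pending
// crosses half of the allowed maximum, and returns the gate (or nil).
func (c *consumer) queue(n int) chan struct{} {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.pending += n
	if c.pending > c.max/2 && c.stc == nil {
		c.stc = make(chan struct{})
	}
	return c.stc
}

// flush simulates draining the outbound buffer. Once the consumer has caught
// up, closing the gate releases every blocked producer at once.
func (c *consumer) flush(n int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.pending -= n
	if c.stc != nil && c.pending < c.max/2 {
		close(c.stc)
		c.stc = nil
	}
}

// stalledWait blocks a producer until the gate is closed, bounded by limit.
func stalledWait(stall chan struct{}, limit time.Duration) {
	select {
	case <-stall:
	case <-time.After(limit):
	}
}

func main() {
	c := &consumer{max: 8}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if stall := c.queue(2); stall != nil {
				start := time.Now()
				stalledWait(stall, 100*time.Millisecond)
				fmt.Printf("producer %d stalled for %v\n", id, time.Since(start))
			}
		}(i)
	}
	time.Sleep(10 * time.Millisecond)
	c.flush(8) // consumer catches up; all stalled producers are released together
	wg.Wait()
}

In the change itself, the gate is created in queueOutbound when pending bytes cross half of the max-pending limit, released in flushOutbound once the write catches up (and in clearConnection on teardown), and producers wait on it from deliverMsg via stalledWait with a 100ms cap.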
Showing 2 changed files with 46 additions and 74 deletions.
118 changes: 45 additions & 73 deletions server/client.go
@@ -67,7 +67,6 @@ const (
	startBufSize = 512 // For INFO/CONNECT block
	minBufSize = 64 // Smallest to shrink to for PING/PONG
	maxBufSize = 65536 // 64k
	backedUpSize = maxBufSize * 4
	shortsToShrink = 2
)

@@ -207,8 +206,9 @@ type outbound struct {
	wdl time.Duration // Snapshot of write deadline.
	mp int32 // Snapshot of max pending for client.
	fsp int32 // Flush signals that are pending per producer from readLoop's pcd.
	lft time.Duration // Last flush time.
	lwb int32 // Last byte size of Write().
	lft time.Duration // Last flush time for Write.
	lwb int32 // Last byte size of Write.
	stc chan struct{} // Stall chan we create to slow down producers on overrun, e.g. fan-in.
	sgw bool // Indicate flusher is waiting on condition wait.
}

@@ -266,7 +266,6 @@ type readCache struct {
	msgs int32
	bytes int32
	subs int32
	psd time.Duration // Possible stall duration for fast producers.

	rsz int32 // Read buffer size
	srs int32 // Short reads, used for dynamic buffer resizing.
@@ -656,50 +655,6 @@ func (c *client) writeLoop() {
	}
}

// Will determine if a client connection is considered behind due to outbound buffering. This will
// allow us to react and try to stabilize bursty fast producers or fan in scenarios.
// Assume lock is held.
func (c *client) outboundBackedUp() bool {
	return c.out.pb > backedUpSize && c.out.pm > 1
}

// Given a raw estimated delay we will box with a floor and ceiling.
func (c *client) boxedDelay(psd time.Duration) time.Duration {
	// Ceiling at 250ms.
	if psd > 250*time.Millisecond {
		c.Debugf("Stall delay of %v reset to max of %v", psd, 250*time.Millisecond)
		psd = 250 * time.Millisecond
	}
	return psd
}

// This determines how much of the pending backlog for this client
// we should consider to be responsible for in our fast producer
// stall delay.
// Assume lock is held.
func (c *client) percentBacklog() int32 {
	// If behind 50.0% of our max, wait for 100%.
	// If behind 12.5% of our max, wait for 33%
	// If behind 6.25% of our max, wait for 25%
	// Otherwise wait 20%
	// We also note how many others are writing.
	// This will be at least one, but we dampen by half
	fsp := c.out.fsp / 2
	if fsp == 0 {
		fsp = 1
	}
	switch {
	case c.out.pb > c.out.mp/2:
		return c.out.pb / 1 / fsp
	case c.out.pb > c.out.mp/8:
		return c.out.pb / 3 / fsp
	case c.out.pb > c.out.mp/16:
		return c.out.pb / 4 / fsp
	default:
		return c.out.pb / 5 / fsp
	}
}

// flushClients will make sure to flush any clients we may have
// sent to during processing. We pass in a budget as a time.Duration
// for how much time to spend in place flushing for this client. This
@@ -725,18 +680,6 @@ func (c *client) flushClients(budget time.Duration) time.Time {
			continue
		}

		// Check if this client is backed up and if so calculate any delay intention.
		// We only delay clients, not routes, gateways etc.
		if c.kind == CLIENT && cp.outboundBackedUp() {
			td := cp.percentBacklog()
			delay := time.Duration(td) * cp.out.lft / time.Duration(cp.out.lwb)
			c.in.psd += delay
			// Attempt to flush in place if we can. If we do subtract off of our potential stall delay.
			if cp.flushOutbound() {
				c.in.psd -= cp.out.lft
			}
		}

		if budget > 0 && cp.flushOutbound() {
			budget -= cp.out.lft
		} else {
@@ -793,9 +736,6 @@ func (c *client) readLoop() {
		c.in.bytes = 0
		c.in.subs = 0

		// Clear possible stall delay factor.
		c.in.psd = 0

		// Main call into parser for inbound data. This will generate callouts
		// to process messages, etc.
		if err := c.parse(b[:n]); err != nil {
@@ -828,16 +768,6 @@
		// Flush, or signal to writeLoop to flush to socket.
		last := c.flushClients(budget)

		// If we have detected clients that we are sending to are behind let's
		// try to throttle a bit and see if they catch up. This allows servers to
		// survive bursts and high fan-in scenarios. We do not hold our lock here
		// so outbound is free to continue.
		if psd := c.in.psd; psd > 25*time.Microsecond {
			psd = c.boxedDelay(psd)
			c.Debugf("Delaying %v due to fast producer", psd)
			time.Sleep(psd)
		}

		// Update activity, check read buffer size.
		c.mu.Lock()
		nc := c.nc
@@ -1025,6 +955,13 @@ func (c *client) flushOutbound() bool {
		c.flushSignal()
	}

	// Check if we have a stalled gate and if so and we are recovering release
	// any stalled producers. Only kind==CLIENT will stall.
	if c.out.stc != nil && (c.out.lwb == attempted || c.out.pb < c.out.mp/2) {
		close(c.out.stc)
		c.out.stc = nil
	}

	return true
}

@@ -1401,6 +1338,14 @@ func (c *client) queueOutbound(data []byte) bool {
	} else {
		c.out.p = append(c.out.p, data...)
	}

	// Check here if we should create a stall channel if we are falling behind.
	// We do this here since if we wait for consumer's writeLoop it could be
	// too late with large number of fan in producers.
	if c.out.pb > c.out.mp/2 && c.out.stc == nil {
		c.out.stc = make(chan struct{})
	}

	return referenced
}

@@ -1996,6 +1941,19 @@ func (c *client) msgHeader(mh []byte, sub *subscription, reply []byte) []byte {
	return mh
}

func (c *client) stalledWait(producer *client) {
	stall := c.out.stc
	c.mu.Unlock()
	defer c.mu.Lock()

	// TODO(dlc) - Make the stall timeout variable based on health of consumer.
	select {
	case <-stall:
	case <-time.After(100 * time.Millisecond):
		producer.Debugf("Timed out of fast producer stall")
	}
}

// Used to treat maps as efficient set
var needFlush = struct{}{}

@@ -2075,6 +2033,13 @@ func (c *client) deliverMsg(sub *subscription, mh, msg []byte) bool {
		return true
	}

	// If we are a client and we detect that the consumer we are
	// sending to is in a stalled state, go ahead and wait here
	// with a limit.
	if c.kind == CLIENT && client.out.stc != nil {
		client.stalledWait(c)
	}

	// Check for closed connection
	if client.flags.isSet(clearConnection) {
		client.mu.Unlock()
@@ -2668,6 +2633,13 @@ func (c *client) clearConnection(reason ClosedState) {
	if nc == nil || srv == nil {
		return
	}

	// Unblock anyone who is potentially stalled waiting on us.
	if c.out.stc != nil {
		close(c.out.stc)
		c.out.stc = nil
	}

	// Flush any pending.
	c.flushOutbound()

2 changes: 1 addition & 1 deletion server/const.go
@@ -68,7 +68,7 @@ const (
	MAX_PAYLOAD_SIZE = (1024 * 1024)

	// MAX_PENDING_SIZE is the maximum outbound pending bytes per client.
	MAX_PENDING_SIZE = (128 * 1024 * 1024)
	MAX_PENDING_SIZE = (64 * 1024 * 1024)

	// DEFAULT_MAX_CONNECTIONS is the default maximum connections allowed.
	DEFAULT_MAX_CONNECTIONS = (64 * 1024)
