Skip to content

Commit

Permalink
Restore outbound queue coalescing (#4093)
Browse files Browse the repository at this point in the history
This PR effectively reverts part of #4084 which removed the coalescing
from the outbound queues as I initially thought it was the source of a
race condition.

Further investigation has proven that not only was that untrue (the race
actually came from the WebSocket code, all coalescing operations happen
under the client lock) but removing the coalescing also worsens
performance.

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander authored Apr 25, 2023
2 parents 70b635e + 2206f9e commit 08d3418
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 9 deletions.
29 changes: 20 additions & 9 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,19 +328,14 @@ var nbPoolLarge = &sync.Pool{
}

func nbPoolGet(sz int) []byte {
var new []byte
switch {
case sz <= nbPoolSizeSmall:
ptr := nbPoolSmall.Get().(*[nbPoolSizeSmall]byte)
new = ptr[:0]
return nbPoolSmall.Get().(*[nbPoolSizeSmall]byte)[:0]
case sz <= nbPoolSizeMedium:
ptr := nbPoolMedium.Get().(*[nbPoolSizeMedium]byte)
new = ptr[:0]
return nbPoolMedium.Get().(*[nbPoolSizeMedium]byte)[:0]
default:
ptr := nbPoolLarge.Get().(*[nbPoolSizeLarge]byte)
new = ptr[:0]
return nbPoolLarge.Get().(*[nbPoolSizeLarge]byte)[:0]
}
return new
}

func nbPoolPut(b []byte) {
Expand Down Expand Up @@ -1447,7 +1442,8 @@ func (c *client) flushOutbound() bool {
// "nb" will be reset back to its starting position so it can be modified
// safely by queueOutbound calls.
c.out.wnb = append(c.out.wnb, collapsed...)
orig := append(net.Buffers{}, c.out.wnb...)
var _orig [1024][]byte
orig := append(_orig[:0], c.out.wnb...)
c.out.nb = c.out.nb[:0]

// Since WriteTo is lopping things off the beginning, we need to remember
Expand Down Expand Up @@ -2041,6 +2037,21 @@ func (c *client) queueOutbound(data []byte) {
// without affecting the original "data" slice.
toBuffer := data

// All of the queued []byte have a fixed capacity, so if there's a []byte
// at the tail of the buffer list that isn't full yet, we should top that
// up first. This helps to ensure we aren't pulling more []bytes from the
// pool than we need to.
if len(c.out.nb) > 0 {
last := &c.out.nb[len(c.out.nb)-1]
if free := cap(*last) - len(*last); free > 0 {
if l := len(toBuffer); l < free {
free = l
}
*last = append(*last, toBuffer[:free]...)
toBuffer = toBuffer[free:]
}
}

// Now we can push the rest of the data into new []bytes from the pool
// in fixed size chunks. This ensures we don't go over the capacity of any
// of the buffers and end up reallocating.
Expand Down
60 changes: 60 additions & 0 deletions server/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1483,6 +1483,66 @@ func TestWildcardCharsInLiteralSubjectWorks(t *testing.T) {
}
}

// This test ensures that coalescing into the fixed-size output
// queues works as expected. When bytes are queued up, they should
// not overflow a buffer until the capacity is exceeded, at which
// point a new buffer should be added.
func TestClientOutboundQueueCoalesce(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()

nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()

clients := s.GlobalAccount().getClients()
if len(clients) != 1 {
t.Fatal("Expecting a client to exist")
}
client := clients[0]
client.mu.Lock()
defer client.mu.Unlock()

// First up, queue something small into the queue.
client.queueOutbound([]byte{1, 2, 3, 4, 5})

if len(client.out.nb) != 1 {
t.Fatal("Expecting a single queued buffer")
}
if l := len(client.out.nb[0]); l != 5 {
t.Fatalf("Expecting only 5 bytes in the first queued buffer, found %d instead", l)
}

// Then queue up a few more bytes, but not enough
// to overflow into the next buffer.
client.queueOutbound([]byte{6, 7, 8, 9, 10})

if len(client.out.nb) != 1 {
t.Fatal("Expecting a single queued buffer")
}
if l := len(client.out.nb[0]); l != 10 {
t.Fatalf("Expecting 10 bytes in the first queued buffer, found %d instead", l)
}

// Finally, queue up something that is guaranteed
// to overflow.
b := nbPoolSmall.Get().(*[nbPoolSizeSmall]byte)[:]
b = b[:cap(b)]
client.queueOutbound(b)
if len(client.out.nb) != 2 {
t.Fatal("Expecting buffer to have overflowed")
}
if l := len(client.out.nb[0]); l != cap(b) {
t.Fatalf("Expecting %d bytes in the first queued buffer, found %d instead", cap(b), l)
}
if l := len(client.out.nb[1]); l != 10 {
t.Fatalf("Expecting 10 bytes in the second queued buffer, found %d instead", l)
}
}

// This test ensures that outbound queues don't cause a run on
// memory when sending something to lots of clients.
func TestClientOutboundQueueMemory(t *testing.T) {
Expand Down

0 comments on commit 08d3418

Please sign in to comment.