diff --git a/server/client.go b/server/client.go index 1c4725282f..e3364c8a80 100644 --- a/server/client.go +++ b/server/client.go @@ -346,23 +346,20 @@ func nbPoolGet(sz int) []byte { } } -func nbPoolPut(in []byte) { - ca := cap(in) - for in = in[:ca]; ca >= nbPoolSizeSmall; ca = cap(in) { - switch { - case ca >= nbPoolSizeLarge: - b := (*[nbPoolSizeLarge]byte)(in[0:nbPoolSizeLarge:nbPoolSizeLarge]) - nbPoolLarge.Put(b) - in = in[nbPoolSizeLarge:] - case ca >= nbPoolSizeMedium: - b := (*[nbPoolSizeMedium]byte)(in[0:nbPoolSizeMedium:nbPoolSizeMedium]) - nbPoolMedium.Put(b) - in = in[nbPoolSizeMedium:] - case ca >= nbPoolSizeSmall: - b := (*[nbPoolSizeSmall]byte)(in[0:nbPoolSizeSmall:nbPoolSizeSmall]) - nbPoolSmall.Put(b) - in = in[nbPoolSizeSmall:] - } +func nbPoolPut(b []byte) { + switch cap(b) { + case nbPoolSizeSmall: + b := (*[nbPoolSizeSmall]byte)(b[0:nbPoolSizeSmall]) + nbPoolSmall.Put(b) + case nbPoolSizeMedium: + b := (*[nbPoolSizeMedium]byte)(b[0:nbPoolSizeMedium]) + nbPoolMedium.Put(b) + case nbPoolSizeLarge: + b := (*[nbPoolSizeLarge]byte)(b[0:nbPoolSizeLarge]) + nbPoolLarge.Put(b) + default: + // Ignore frames that are the wrong size, this might happen + // with WebSocket/MQTT messages as they are framed } } diff --git a/server/websocket.go b/server/websocket.go index 8163568d05..014a1d72fc 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -1345,6 +1345,11 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) { } csz = len(h) + ol } + // Make sure that the compressor no longer holds a reference to + // the bytes.Buffer, so that the underlying memory gets cleaned + // up after flushOutbound/flushAndClose. For this to be safe, we + // always cp.Reset(...) before reusing the compressor again. + cp.Reset(nil) // Add to pb the compressed data size (including headers), but // remove the original uncompressed data size that was added // during the queueing.