From ce81ab6b4f4accd3d5918c01151db78fa5f450a4 Mon Sep 17 00:00:00 2001 From: Renan Rangel Date: Wed, 2 Jun 2021 04:36:24 -0700 Subject: [PATCH] PR feedback --- writer.go | 38 ++++++++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/writer.go b/writer.go index 6723ac60..972d4677 100644 --- a/writer.go +++ b/writer.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "runtime" + "sync" "github.com/pierrec/lz4/internal/xxh32" ) @@ -29,6 +30,7 @@ type Writer struct { data []byte // Data to be compressed + buffer for compressed data. idx int // Index into data. hashtable [winSize]int // Hash table used in CompressBlock(). + buffers sync.Pool // For concurrency. c chan chan zResult // Channel for block compression goroutines and writer goroutine. @@ -87,6 +89,9 @@ func (z *Writer) WithConcurrency(n int) *Writer { if isCompressed := res.size&compressedBlockFlag == 0; isCompressed { // It is now safe to release the buffer as no longer in use by any goroutine. putBuffer(cap(res.data), res.data) + } else { + // if the block is uncompressed, return it the buffer to the pool + z.buffers.Put(res.data) } if h := z.OnBlockDone; h != nil { h(n) @@ -173,11 +178,12 @@ func (z *Writer) writeHeader() error { // Write compresses data from the supplied buffer into the underlying io.Writer. // Write does not return until the data has been written. -func (z *Writer) Write(buffer []byte) (int, error) { - buf := make([]byte, len(buffer)) - copy(buf, buffer) - +func (z *Writer) Write(buf []byte) (int, error) { if !z.Header.done { + // prepare the sync.Pool on the first write since we now know the block max size + z.buffers.New = func() interface{} { + return make([]byte, z.Header.BlockMaxSize) + } if err := z.writeHeader(); err != nil { return 0, err } @@ -226,10 +232,7 @@ func (z *Writer) Write(buffer []byte) (int, error) { } // compressBlock compresses a block. -func (z *Writer) compressBlock(dataBlock []byte) error { - data := make([]byte, len(dataBlock)) - copy(data, dataBlock) - +func (z *Writer) compressBlock(data []byte) error { if !z.NoChecksum { _, _ = z.checksum.Write(data) } @@ -237,7 +240,12 @@ func (z *Writer) compressBlock(dataBlock []byte) error { if z.c != nil { c := make(chan zResult) z.c <- c // Send now to guarantee order - go writerCompressBlock(c, z.Header, data) + + // get a buffer from the pool and copy the data over + block := z.buffers.Get().([]byte)[:len(data)] + copy(block, data) + + go writerCompressBlock(c, z.Header, block, &z.buffers) return nil } @@ -305,7 +313,9 @@ func (z *Writer) Flush() error { return nil } - data := z.data[:z.idx] + data := z.buffers.Get().([]byte)[:len(z.data[:z.idx])] + copy(data, z.data[:z.idx]) + z.idx = 0 if z.c == nil { return z.compressBlock(data) @@ -315,7 +325,7 @@ func (z *Writer) Flush() error { } c := make(chan zResult) z.c <- c - writerCompressBlock(c, z.Header, data) + writerCompressBlock(c, z.Header, data, &z.buffers) return nil } @@ -394,7 +404,7 @@ func (z *Writer) writeUint32(x uint32) error { // writerCompressBlock compresses data into a pooled buffer and writes its result // out to the input channel. -func writerCompressBlock(c chan zResult, header Header, data []byte) { +func writerCompressBlock(c chan zResult, header Header, data []byte, pool *sync.Pool) { zdata := getBuffer(header.BlockMaxSize) // The compressed block size cannot exceed the input's. var zn int @@ -408,6 +418,10 @@ func writerCompressBlock(c chan zResult, header Header, data []byte) { if zn > 0 && zn < len(data) { res.size = uint32(zn) res.data = zdata[:zn] + // if it is compressed, we return the input buffer sooner to the pool + if pool != nil { + pool.Put(data) + } } else { res.size = uint32(len(data)) | compressedBlockFlag res.data = data