PR feedback
rvrangel committed Jun 2, 2021
1 parent 9c7f9a7 commit ce81ab6
Showing 1 changed file with 26 additions and 12 deletions.
38 changes: 26 additions & 12 deletions writer.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"io"
 	"runtime"
+	"sync"
 
 	"github.com/pierrec/lz4/internal/xxh32"
 )
@@ -29,6 +30,7 @@ type Writer struct {
 	data      []byte       // Data to be compressed + buffer for compressed data.
 	idx       int          // Index into data.
 	hashtable [winSize]int // Hash table used in CompressBlock().
+	buffers   sync.Pool
 
 	// For concurrency.
 	c chan chan zResult // Channel for block compression goroutines and writer goroutine.
@@ -87,6 +89,9 @@ func (z *Writer) WithConcurrency(n int) *Writer {
 			if isCompressed := res.size&compressedBlockFlag == 0; isCompressed {
 				// It is now safe to release the buffer as no longer in use by any goroutine.
 				putBuffer(cap(res.data), res.data)
+			} else {
+				// if the block is uncompressed, return the buffer to the pool
+				z.buffers.Put(res.data)
 			}
 			if h := z.OnBlockDone; h != nil {
 				h(n)
@@ -173,11 +178,12 @@ func (z *Writer) writeHeader() error {
 
 // Write compresses data from the supplied buffer into the underlying io.Writer.
 // Write does not return until the data has been written.
-func (z *Writer) Write(buffer []byte) (int, error) {
-	buf := make([]byte, len(buffer))
-	copy(buf, buffer)
-
+func (z *Writer) Write(buf []byte) (int, error) {
 	if !z.Header.done {
+		// prepare the sync.Pool on the first write since we now know the block max size
+		z.buffers.New = func() interface{} {
+			return make([]byte, z.Header.BlockMaxSize)
+		}
 		if err := z.writeHeader(); err != nil {
 			return 0, err
 		}
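The hunk above wires the pool up lazily: buffers.New is only assigned on the first Write, because Header.BlockMaxSize is not fixed until the header is about to be written. A minimal, standalone sketch of that pattern (the 64 KiB size, the variable names, and the main wrapper are illustrative assumptions, not values taken from the library):

package main

import (
	"fmt"
	"sync"
)

func main() {
	// Assume the block size only becomes known at runtime, as with Header.BlockMaxSize.
	blockMaxSize := 64 * 1024

	var buffers sync.Pool
	// Configure New once the size is known; every Get that misses the pool
	// then allocates a slice of exactly that size.
	buffers.New = func() interface{} {
		return make([]byte, blockMaxSize)
	}

	buf := buffers.Get().([]byte) // a recycled slice, or a fresh 64 KiB one
	fmt.Println(len(buf))         // 65536
	buffers.Put(buf)              // hand it back for reuse by later blocks
}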
@@ -226,18 +232,20 @@ func (z *Writer) Write(buffer []byte) (int, error) {
 }
 
 // compressBlock compresses a block.
-func (z *Writer) compressBlock(dataBlock []byte) error {
-	data := make([]byte, len(dataBlock))
-	copy(data, dataBlock)
-
+func (z *Writer) compressBlock(data []byte) error {
 	if !z.NoChecksum {
 		_, _ = z.checksum.Write(data)
 	}
 
 	if z.c != nil {
 		c := make(chan zResult)
 		z.c <- c // Send now to guarantee order
-		go writerCompressBlock(c, z.Header, data)
+
+		// get a buffer from the pool and copy the data over
+		block := z.buffers.Get().([]byte)[:len(data)]
+		copy(block, data)
+
+		go writerCompressBlock(c, z.Header, block, &z.buffers)
 		return nil
 	}
 
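compressBlock no longer makes a throwaway copy with make: it borrows a block from the pool, trims it to the input length, copies the caller's bytes into it, and hands only that copy to the compression goroutine, which returns it to the pool when done. A hedged sketch of that borrow-copy-hand-off sequence (names are illustrative; the anonymous goroutine stands in for writerCompressBlock):

package main

import (
	"fmt"
	"sync"
)

func main() {
	pool := &sync.Pool{New: func() interface{} { return make([]byte, 1<<16) }}

	data := []byte("caller-owned bytes; the caller may reuse them right after this call")

	// Borrow a pooled buffer, trim it to the input length, and copy the input in.
	block := pool.Get().([]byte)[:len(data)]
	copy(block, data)

	done := make(chan struct{})
	go func() {
		// The goroutine only ever sees its private copy of the input...
		fmt.Printf("compressing %d bytes\n", len(block))
		// ...and is responsible for returning the pooled buffer when finished.
		pool.Put(block)
		close(done)
	}()
	<-done
}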
@@ -305,7 +313,9 @@ func (z *Writer) Flush() error {
 		return nil
 	}
 
-	data := z.data[:z.idx]
+	data := z.buffers.Get().([]byte)[:len(z.data[:z.idx])]
+	copy(data, z.data[:z.idx])
+
 	z.idx = 0
 	if z.c == nil {
 		return z.compressBlock(data)
@@ -315,7 +325,7 @@ }
 	}
 	c := make(chan zResult)
 	z.c <- c
-	writerCompressBlock(c, z.Header, data)
+	writerCompressBlock(c, z.Header, data, &z.buffers)
 	return nil
 }
 
@@ -394,7 +404,7 @@ func (z *Writer) writeUint32(x uint32) error {
 
 // writerCompressBlock compresses data into a pooled buffer and writes its result
 // out to the input channel.
-func writerCompressBlock(c chan zResult, header Header, data []byte) {
+func writerCompressBlock(c chan zResult, header Header, data []byte, pool *sync.Pool) {
 	zdata := getBuffer(header.BlockMaxSize)
 	// The compressed block size cannot exceed the input's.
 	var zn int
@@ -408,6 +418,10 @@
 	if zn > 0 && zn < len(data) {
 		res.size = uint32(zn)
 		res.data = zdata[:zn]
+		// if the block compressed, return the input buffer to the pool right away
+		if pool != nil {
+			pool.Put(data)
+		}
 	} else {
 		res.size = uint32(len(data)) | compressedBlockFlag
 		res.data = data
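Together with the new else branch in the WithConcurrency writer loop further up, this gives the pooled input buffer a single owner at every point: writerCompressBlock puts it back as soon as the block compresses, and otherwise the raw input travels on as res.data and is only put back after the writer goroutine has written it out. A rough, self-contained sketch of that two-path ownership rule (the result type and helper names are simplified assumptions, not the library's zResult API):

package main

import (
	"fmt"
	"sync"
)

// result stands in for zResult in this sketch: data is either compressed
// output, or the borrowed input itself when compression did not help.
type result struct {
	keptInput bool
	data      []byte
}

// compressOne mimics the two exit paths of writerCompressBlock; shrank says
// whether the (pretend) compressor produced something smaller than the input.
func compressOne(pool *sync.Pool, in []byte, shrank bool, out chan<- result) {
	if shrank {
		zdata := []byte("smaller") // placeholder for the real compressed block
		pool.Put(in)               // compressed path: the pooled input is free immediately
		out <- result{data: zdata}
		return
	}
	out <- result{keptInput: true, data: in} // incompressible: keep borrowing the input
}

func main() {
	pool := &sync.Pool{New: func() interface{} { return make([]byte, 32) }}
	out := make(chan result, 1)

	in := pool.Get().([]byte)[:16]
	compressOne(pool, in, false, out)

	res := <-out
	fmt.Println(len(res.data)) // stand-in for writing the block out
	if res.keptInput {
		pool.Put(res.data) // only now is the borrowed input safe to recycle
	}
}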
