From 278430dc1c251a91d74df87ff7bdd7cfe3e5bf14 Mon Sep 17 00:00:00 2001 From: Aleksa Sarai Date: Fri, 19 Feb 2021 18:32:53 +1100 Subject: [PATCH 1/4] gunzip: implement NewReader and NewReaderN using Reset This matches NewWriter, and reduces the possibility of forgetting to update one of the initialisation functions when making changes. Signed-off-by: Aleksa Sarai --- gunzip.go | 36 +++--------------------------------- 1 file changed, 3 insertions(+), 33 deletions(-) diff --git a/gunzip.go b/gunzip.go index d1ae730..9fc1314 100644 --- a/gunzip.go +++ b/gunzip.go @@ -110,20 +110,7 @@ type read struct { // The implementation buffers input and may read more data than necessary from r. // It is the caller's responsibility to call Close on the Reader when done. func NewReader(r io.Reader) (*Reader, error) { - z := new(Reader) - z.blocks = defaultBlocks - z.blockSize = defaultBlockSize - z.r = makeReader(r) - z.digest = crc32.NewIEEE() - z.multistream = true - z.blockPool = make(chan []byte, z.blocks) - for i := 0; i < z.blocks; i++ { - z.blockPool <- make([]byte, z.blockSize) - } - if err := z.readHeader(true); err != nil { - return nil, err - } - return z, nil + return NewReaderN(r, defaultBlockSize, defaultBlocks) } // NewReaderN creates a new Reader reading the given reader. 
@@ -140,25 +127,8 @@ func NewReaderN(r io.Reader, blockSize, blocks int) (*Reader, error) { z := new(Reader) z.blocks = blocks z.blockSize = blockSize - z.r = makeReader(r) - z.digest = crc32.NewIEEE() - z.multistream = true - - // Account for too small values - if z.blocks <= 0 { - z.blocks = defaultBlocks - } - if z.blockSize <= 512 { - z.blockSize = defaultBlockSize - } - z.blockPool = make(chan []byte, z.blocks) - for i := 0; i < z.blocks; i++ { - z.blockPool <- make([]byte, z.blockSize) - } - if err := z.readHeader(true); err != nil { - return nil, err - } - return z, nil + err := z.Reset(r) + return z, err } // Reset discards the Reader z's state and makes it equivalent to the From d6725aabad192e05f8a231df2350d27ff202ceb0 Mon Sep 17 00:00:00 2001 From: Aleksa Sarai Date: Fri, 19 Feb 2021 18:56:15 +1100 Subject: [PATCH 2/4] gunzip: switch to sync.Pool for blockPool This stops us from causing goroutine deadlocks when we re-add a buffer after the stream has been read and the read-ahead goroutine is dead. Note that with this change, it is possible for more blocks to be allocated than the user requested. Signed-off-by: Aleksa Sarai --- gunzip.go | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/gunzip.go b/gunzip.go index 9fc1314..733f922 100644 --- a/gunzip.go +++ b/gunzip.go @@ -98,7 +98,7 @@ type Reader struct { activeRA bool // Indication if readahead is active mu sync.Mutex // Lock for above - blockPool chan []byte + blockPool *sync.Pool } type read struct { @@ -151,9 +151,16 @@ func (z *Reader) Reset(r io.Reader) error { } if z.blockPool == nil { - z.blockPool = make(chan []byte, z.blocks) + // Save this in a closure, so as to avoid races. + blockSize := z.blockSize + z.blockPool = &sync.Pool{ + New: func() interface{} { + return make([]byte, blockSize) + }, + } + // Pre-fill the pool. 
for i := 0; i < z.blocks; i++ { - z.blockPool <- make([]byte, z.blockSize) + z.blockPool.Put(z.blockPool.New()) } } @@ -304,11 +311,11 @@ func (z *Reader) killReadAhead() error { for blk := range z.readAhead { if blk.b != nil { - z.blockPool <- blk.b + z.blockPool.Put(blk.b) } } if cap(z.current) > 0 { - z.blockPool <- z.current + z.blockPool.Put(z.current) z.current = nil } if !ok { @@ -360,9 +367,10 @@ func (z *Reader) doReadAhead() { for { var buf []byte select { - case buf = <-z.blockPool: case <-closeReader: return + default: + buf = z.blockPool.Get().([]byte) } buf = buf[0:z.blockSize] // Try to fill the buffer @@ -398,7 +406,7 @@ func (z *Reader) doReadAhead() { case z.readAhead <- read{b: buf, err: err}: case <-closeReader: // Sent on close, we don't care about the next results - z.blockPool <- buf + z.blockPool.Put(buf) return } if err != nil { @@ -440,7 +448,7 @@ func (z *Reader) Read(p []byte) (n int, err error) { if len(p) >= len(avail) { // If len(p) >= len(current), return all content of current n = copy(p, avail) - z.blockPool <- z.current + z.blockPool.Put(z.current) z.current = nil if z.lastBlock { err = io.EOF @@ -514,7 +522,7 @@ func (z *Reader) WriteTo(w io.Writer) (n int64, err error) { return total, err } // Put block back - z.blockPool <- read.b + z.blockPool.Put(read.b) if z.lastBlock { break } From cb18aaa4accad314857503f49375ba381a3387c9 Mon Sep 17 00:00:00 2001 From: Aleksa Sarai Date: Fri, 19 Feb 2021 19:29:24 +1100 Subject: [PATCH 3/4] gunzip: handle io.EOF errors correctly in WriteTo WriteTo should not return io.EOF because it is assumed to have io.Copy semantics (namely, on success you return no error -- even if there were no bytes copied). Several parts of WriteTo would return io.EOF -- all of which need to be switched to special-case io.EOF. In addition, Read would save io.EOF in z.err in some specific corner cases -- these appear to be oversights and thus are fixed to not store io.EOF in z.err. 
Signed-off-by: Aleksa Sarai --- gunzip.go | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/gunzip.go b/gunzip.go index 733f922..8337ccc 100644 --- a/gunzip.go +++ b/gunzip.go @@ -481,7 +481,9 @@ func (z *Reader) Read(p []byte) (n int, err error) { // Is there another? if err = z.readHeader(false); err != nil { - z.err = err + if err != io.EOF { + z.err = err + } return } @@ -492,8 +494,11 @@ func (z *Reader) Read(p []byte) (n int, err error) { func (z *Reader) WriteTo(w io.Writer) (n int64, err error) { total := int64(0) for { - if z.err != nil { - return total, z.err + if err = z.err; err != nil { + if err == io.EOF { + err = nil + } + return total, err } // We write both to output and digest. for { @@ -530,8 +535,12 @@ func (z *Reader) WriteTo(w io.Writer) (n int64, err error) { // Finished file; check checksum + size. if _, err := io.ReadFull(z.r, z.buf[0:8]); err != nil { - z.err = err - return total, err + if total == 0 && err == io.EOF { + err = nil + } else { + z.err = err + return total, err + } } crc32, isize := get4(z.buf[0:4]), get4(z.buf[4:8]) sum := z.digest.Sum32() From 60383327defb3317ca49e0dbc0381696065907b3 Mon Sep 17 00:00:00 2001 From: Aleksa Sarai Date: Fri, 19 Feb 2021 19:31:40 +1100 Subject: [PATCH 4/4] gunzip: add EOF-handling tests Before this patchset, these tests would either lock up or fail with spurious EOF errors. Signed-off-by: Aleksa Sarai --- gunzip_test.go | 59 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/gunzip_test.go b/gunzip_test.go index 135a6a5..d73932d 100644 --- a/gunzip_test.go +++ b/gunzip_test.go @@ -570,6 +570,65 @@ func TestWriteTo(t *testing.T) { } } +func TestCopyAfterReadAll(t *testing.T) { + for _, tt := range gunzipTests { + if tt.err != nil { + // Only use valid gzip tests. + continue + } + + dec, err := NewReader(bytes.NewBuffer(tt.gzip)) + if err != nil { + t.Fatal(err) + } + + // Read the stream twice with io.Copy. 
+ if b, err := ioutil.ReadAll(ioutil.NopCloser(dec)); err != nil { + t.Fatalf("ioutil.ReadAll end of stream: unexpected error %v", err) + } else if !bytes.Equal(b, []byte(tt.raw)) { + t.Fatal("ioutil.ReadAll output didn't match input") + } + if n, err := io.Copy(ioutil.Discard, dec); err != nil { + t.Fatalf("io.Copy end of stream: unexpected error %v", err) + } else if n != 0 { + t.Fatalf("io.Copy at end of stream should read no bytes, got %v", n) + } + } +} + +func TestMultipleCopy(t *testing.T) { + for _, tt := range gunzipTests { + if tt.err != nil { + // Only use valid gzip tests. + continue + } + + dec, err := NewReader(bytes.NewBuffer(tt.gzip)) + if err != nil { + t.Fatal(err) + } + + // Read to the end of the stream (using WriteTo). + if n, err := io.Copy(ioutil.Discard, dec); err != nil { + t.Fatalf("io.Copy full stream: unexpected error %v", err) + } else if n != int64(len(tt.raw)) { + t.Fatal("did not decompress everything") + } + + // Now try to read again using Read and WriteTo. + if b, err := ioutil.ReadAll(ioutil.NopCloser(dec)); err != nil { + t.Fatalf("ioutil.ReadAll end of stream: unexpected error %v", err) + } else if len(b) != 0 { + t.Fatalf("ioutil.ReadAll at end of stream should read no bytes, got %v", b) + } + if n, err := io.Copy(ioutil.Discard, dec); err != nil { + t.Fatalf("io.Copy end of stream: unexpected error %v", err) + } else if n != 0 { + t.Fatalf("io.Copy at end of stream should read no bytes, got %v", n) + } + } +} + func BenchmarkGunzipCopy(b *testing.B) { dat, _ := ioutil.ReadFile("testdata/test.json") dat = append(dat, dat...)