Skip to content

Commit

Permalink
Zstandard Decoder (#98)
Browse files Browse the repository at this point in the history
Add zstd decoder.
  • Loading branch information
klauspost committed Apr 23, 2019
1 parent 30be604 commit a0353f7
Show file tree
Hide file tree
Showing 438 changed files with 3,880 additions and 2 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Expand Up @@ -9,6 +9,8 @@ os:
go:
- 1.9.x
- 1.10.x
- 1.11.x
- 1.12.x
- master

install:
Expand Down
1 change: 1 addition & 0 deletions README.md
Expand Up @@ -14,6 +14,7 @@ It offers slightly better compression at lower compression settings, and up to 3

# changelog

* Apr 22, 2019: [zstd](https://github.com/klauspost/compress/tree/master/zstd) decompression added.
* Aug 1, 2018: Added [huff0 README](https://github.com/klauspost/compress/tree/master/huff0#huff0-entropy-compression).
* Jul 8, 2018: Added [Performance Update 2018](#performance-update-2018) below.
* Jun 23, 2018: Merged [Go 1.11 inflate optimizations](https://go-review.googlesource.com/c/go/+/102235). Go 1.9 is now required. Backwards compatible version tagged with [v1.3.0](https://github.com/klauspost/compress/releases/tag/v1.3.0).
Expand Down
2 changes: 1 addition & 1 deletion flate/deflate_test.go
Expand Up @@ -162,7 +162,7 @@ func TestVeryLongSparseChunk(t *testing.T) {
t.Errorf("NewWriter: %v", err)
return
}
if _, err = io.Copy(w, &sparseReader{l: 23E8}); err != nil {
if _, err = io.Copy(w, &sparseReader{l: 23e8}); err != nil {
t.Errorf("Compress failed: %v", err)
return
}
Expand Down
5 changes: 4 additions & 1 deletion huff0/decompress.go
Expand Up @@ -278,6 +278,9 @@ bigloop:
tmp[off+1+bufoff*3] = decode(&br[3])
off += 2
if off == bufoff {
if bufoff > dstEvery {
return nil, errors.New("corruption detected: stream overrun")
}
copy(dstOut, tmp[:bufoff])
copy(dstOut[dstEvery:], tmp[bufoff:bufoff*2])
copy(dstOut[dstEvery*2:], tmp[bufoff*2:bufoff*3])
Expand All @@ -292,7 +295,7 @@ bigloop:
}
if off > 0 {
ioff := int(off)
if len(dstOut) < dstEvery*3-ioff {
if len(dstOut) < dstEvery*3+ioff {
return nil, errors.New("corruption detected: stream overrun")
}
copy(dstOut, tmp[:off])
Expand Down
150 changes: 150 additions & 0 deletions zstd/README.md
@@ -0,0 +1,150 @@
# zstd

[Zstandard](https://facebook.github.io/zstd/) is a real-time compression algorithm, providing high compression ratios.
It offers a very wide range of compression / speed trade-off, while being backed by a very fast decoder.

Currently this package provides *decompression* of zstandard compressed content.
Note that custom dictionaries are not supported yet, so if your code relies on that,
you cannot use the package as-is.

This package is pure Go and without use of "unsafe".

The `zstd` package is provided as open source software using a Go standard license.


## Decompressor

STATUS: BETA - there may still be subtle bugs, but a wide variety of content has been tested.


### Usage

Install using `go get -u github.com/klauspost/compress`. The package is located in `github.com/klauspost/compress/zstd`.

Godoc Documentation: https://golang.org/github.com/klauspost/compress/zstd

You will also need the `github.com/cespare/xxhash` package.

The package has been designed for two main usages, big streams of data and smaller in-memory buffers.
There are two main usages of the package for these. Both of them are accessed by creating a `Decoder`.

For streaming use a simple setup could look like this:

```Go
import "github.com/klauspost/compress/zstd"

func Decompress(in io.Reader, out io.Writer) error {
d, err := zstd.NewReader(input)
if err != nil {
return err
}
defer d.Close()

// Copy content...
_, err := io.Copy(out, d)
return err
}
```

It is important to use the "Close" function when you no longer need the Reader to stop running goroutines.
See "Allocation-less operation" below.

For decoding buffers, it could look something like this:

```Go
import "github.com/klauspost/compress/zstd"

// Create a reader that caches decompressors.
// For this operation type we supply a nil Reader.
var decoder, _ = zstd.NewReader(nil)

// Decompress a buffer. We don't supply a destination buffer,
// so it will be allocated by the decoder.
func Decompress(src []byte) ([]byte, error) {
return decoder.DecodeAll(src, nil)
}
```

Both of these cases should provide the functionality needed.
The decoder can be used for *concurrent* decompression of multiple buffers.
It will only allow a certain number of concurrent operations to run.
To tweak that yourself use the `WithDecoderConcurrency(n)` option when creating the decoder.

### Allocation-less operation

The decoder has been designed to operate without allocations after a warmup.

This means that you should *store* the decoder for best performance.
To re-use a stream decoder, use the `Reset(r io.Reader) error` to switch to another stream.
A decoder can safely be re-used even if the previous stream failed.

To release the resources, you must call the `Close()` function on a decoder.
After this it can *no longer be reused*, but all running goroutines will be stopped.
So you *must* use this if you will no longer need the Reader.

For decompressing smaller buffers a single decoder can be used.
When decoding buffers, you can supply a destination slice with length 0 and your expected capacity.
In this case no unneeded allocations should be made.

### Concurrency

The buffer decoder does everything on the same goroutine and does nothing concurrently.
It can however decode several buffers concurrently. Use `WithDecoderConcurrency(n)` to limit that.

The stream decoder operates on

* One goroutine reads input and splits the input to several block decoders.
* A number of decoders will decode blocks.
* A goroutine coordinates these blocks and sends history from one to the next.

So effectively this also means the decoder will "read ahead" and prepare data to always be available for output.

Since "blocks" are quite dependent on the output of the previous block stream decoding will only have limited concurrency.

In practice this means that concurrency is often limited to utilizing about 2 cores effectively.


### Benchmarks

These are some examples of performance compared to [datadog cgo library](https://github.com/DataDog/zstd).

The first two are streaming decodes and the last are smaller inputs.

```
BenchmarkDecoderSilesia-8 20 642550210 ns/op 329.85 MB/s 3101 B/op 8 allocs/op
BenchmarkDecoderSilesiaCgo-8 100 384930000 ns/op 550.61 MB/s 451878 B/op 9713 allocs/op
BenchmarkDecoderEnwik9-2 10 3146000080 ns/op 317.86 MB/s 2649 B/op 9 allocs/op
BenchmarkDecoderEnwik9Cgo-2 20 1905900000 ns/op 524.69 MB/s 1125120 B/op 45785 allocs/op
BenchmarkDecoder_DecodeAll/z000000.zst-8 200 7049994 ns/op 138.26 MB/s 40 B/op 2 allocs/op
BenchmarkDecoder_DecodeAll/z000001.zst-8 100000 19560 ns/op 97.49 MB/s 40 B/op 2 allocs/op
BenchmarkDecoder_DecodeAll/z000002.zst-8 5000 297599 ns/op 236.99 MB/s 40 B/op 2 allocs/op
BenchmarkDecoder_DecodeAll/z000003.zst-8 2000 725502 ns/op 141.17 MB/s 40 B/op 2 allocs/op
BenchmarkDecoder_DecodeAll/z000004.zst-8 200000 9314 ns/op 54.54 MB/s 40 B/op 2 allocs/op
BenchmarkDecoder_DecodeAll/z000005.zst-8 10000 137500 ns/op 104.72 MB/s 40 B/op 2 allocs/op
BenchmarkDecoder_DecodeAll/z000006.zst-8 500 2316009 ns/op 206.06 MB/s 40 B/op 2 allocs/op
BenchmarkDecoder_DecodeAll/z000007.zst-8 20000 64499 ns/op 344.90 MB/s 40 B/op 2 allocs/op
BenchmarkDecoder_DecodeAll/z000008.zst-8 50000 24900 ns/op 219.56 MB/s 40 B/op 2 allocs/op
BenchmarkDecoder_DecodeAll/z000009.zst-8 1000 2348999 ns/op 154.01 MB/s 40 B/op 2 allocs/op
BenchmarkDecoder_DecodeAllCgo/z000000.zst-8 500 4268005 ns/op 228.38 MB/s 1228849 B/op 3 allocs/op
BenchmarkDecoder_DecodeAllCgo/z000001.zst-8 100000 15250 ns/op 125.05 MB/s 2096 B/op 3 allocs/op
BenchmarkDecoder_DecodeAllCgo/z000002.zst-8 10000 147399 ns/op 478.49 MB/s 73776 B/op 3 allocs/op
BenchmarkDecoder_DecodeAllCgo/z000003.zst-8 5000 320798 ns/op 319.27 MB/s 139312 B/op 3 allocs/op
BenchmarkDecoder_DecodeAllCgo/z000004.zst-8 200000 10004 ns/op 50.77 MB/s 560 B/op 3 allocs/op
BenchmarkDecoder_DecodeAllCgo/z000005.zst-8 20000 73599 ns/op 195.64 MB/s 19120 B/op 3 allocs/op
BenchmarkDecoder_DecodeAllCgo/z000006.zst-8 1000 1119003 ns/op 426.48 MB/s 557104 B/op 3 allocs/op
BenchmarkDecoder_DecodeAllCgo/z000007.zst-8 20000 103450 ns/op 215.04 MB/s 71296 B/op 9 allocs/op
BenchmarkDecoder_DecodeAllCgo/z000008.zst-8 100000 20130 ns/op 271.58 MB/s 6192 B/op 3 allocs/op
BenchmarkDecoder_DecodeAllCgo/z000009.zst-8 2000 1123500 ns/op 322.00 MB/s 368688 B/op 3 allocs/op
```

This reflects the performance around May 2019, but this may be out of date.

# Contributions

Contributions are always welcome.
For new features/fixes, remember to add tests and for performance enhancements include benchmarks.

For sending files for reproducing errors use a service like [goobox](https://goobox.io/#/upload) or similar to share your files.
121 changes: 121 additions & 0 deletions zstd/bitreader.go
@@ -0,0 +1,121 @@
// Copyright 2019+ Klaus Post. All rights reserved.
// License information can be found in the LICENSE file.
// Based on work by Yann Collet, released under BSD License.

package zstd

import (
"errors"
"io"
"math/bits"
)

// bitReader reads a bitstream in reverse.
// The last set bit indicates the start of the stream and is used
// for aligning the input.
type bitReader struct {
in []byte
off uint // next byte to read is at in[off - 1]
value uint64 // Maybe use [16]byte, but shifting is awkward.
bitsRead uint8
}

// init initializes and resets the bit reader.
func (b *bitReader) init(in []byte) error {
if len(in) < 1 {
return errors.New("corrupt stream: too short")
}
b.in = in
b.off = uint(len(in))
// The highest bit of the last byte indicates where to start
v := in[len(in)-1]
if v == 0 {
return errors.New("corrupt stream, did not find end of stream")
}
b.bitsRead = 64
b.value = 0
b.fill()
b.fill()
b.bitsRead += 8 - uint8(highBits(uint32(v)))
return nil
}

// getBits will return n bits. n can be 0.
func (b *bitReader) getBits(n uint8) int {
if n == 0 /*|| b.bitsRead >= 64 */ {
return 0
}
return b.getBitsFast(n)
}

// getBitsFast requires that at least one bit is requested every time.
// There are no checks if the buffer is filled.
func (b *bitReader) getBitsFast(n uint8) int {
const regMask = 64 - 1
v := uint32((b.value << (b.bitsRead & regMask)) >> ((regMask + 1 - n) & regMask))
b.bitsRead += n
return int(v)
}

// fillFast() will make sure at least 32 bits are available.
// There must be at least 4 bytes available.
func (b *bitReader) fillFast() {
if b.bitsRead < 32 {
return
}
// Do single re-slice to avoid bounds checks.
v := b.in[b.off-4 : b.off]
low := (uint32(v[0])) | (uint32(v[1]) << 8) | (uint32(v[2]) << 16) | (uint32(v[3]) << 24)
b.value = (b.value << 32) | uint64(low)
b.bitsRead -= 32
b.off -= 4
}

// fill() will make sure at least 32 bits are available.
func (b *bitReader) fill() {
if b.bitsRead < 32 {
return
}
if b.off > 4 {
v := b.in[b.off-4 : b.off]
low := (uint32(v[0])) | (uint32(v[1]) << 8) | (uint32(v[2]) << 16) | (uint32(v[3]) << 24)
b.value = (b.value << 32) | uint64(low)
b.bitsRead -= 32
b.off -= 4
return
}
for b.off > 0 {
b.value = (b.value << 8) | uint64(b.in[b.off-1])
b.bitsRead -= 8
b.off--
}
}

// finished returns true if all bits have been read from the bit stream.
func (b *bitReader) finished() bool {
return b.off == 0 && b.bitsRead >= 64
}

// overread returns true if more bits have been requested than is on the stream.
func (b *bitReader) overread() bool {
return b.bitsRead > 64
}

// remain returns the number of bits remaining.
func (b *bitReader) remain() uint {
return b.off*8 + 64 - uint(b.bitsRead)
}

// close the bitstream and returns an error if out-of-buffer reads occurred.
func (b *bitReader) close() error {
// Release reference.
b.in = nil
if b.bitsRead > 64 {
return io.ErrUnexpectedEOF
}
return nil
}

func highBits(val uint32) (n uint32) {
return uint32(bits.Len32(val) - 1)
}

0 comments on commit a0353f7

Please sign in to comment.