Skip to content

Commit

Permalink
feat(zstd):add new interface Read2/Write2/Flush2 for zstd.
Browse files Browse the repository at this point in the history
  • Loading branch information
wqshr12345 committed Nov 6, 2023
1 parent dc4151f commit 451beda
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 0 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/klauspost/compress

go 1.19

require github.com/wqshr12345/golib v0.10.6

retract (
// https://github.com/klauspost/compress/pull/503
v1.14.3
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
github.com/wqshr12345/golib v0.10.6 h1:GrlezZMK5hGXYaddNBCgEwKihNOXiRPb1/ko2HHuZ+A=
github.com/wqshr12345/golib v0.10.6/go.mod h1:IizEJl7nZ2grhULV4g1Lbuod3X4dnv1Q/U+MwIW6Ms4=
44 changes: 44 additions & 0 deletions zstd/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"encoding/binary"
"io"
"sync"
"time"

"github.com/klauspost/compress/zstd/internal/xxhash"
"github.com/wqshr12345/golib/common"
)

// Decoder provides decoding of zstandard streams.
Expand Down Expand Up @@ -163,6 +165,48 @@ func (d *Decoder) Read(p []byte) (int, error) {
return n, d.current.err
}

func (d *Decoder) Read2(p []byte) (n int, cmprInfo common.CompressInfo, isNil bool, err error) {
for {
if len(d.current.b) > 0 {
filled := copy(p, d.current.b)
p = p[filled:]
d.current.b = d.current.b[filled:]
n += filled
}
if len(p) == 0 {
break
}
if len(d.current.b) == 0 {
// We have an error and no more data
if d.current.err != nil {
break
}
// code for compression information.
startTs := time.Now().UnixNano()
flag := d.nextBlock(n == 0)
endTs := time.Now().UnixNano()
cmprInfo.DecompressTime = endTs - startTs
if !flag {
return n, cmprInfo, false, d.current.err
}
}
}
if len(d.current.b) > 0 {
if debugDecoder {
println("returning", n, "still bytes left:", len(d.current.b))
}
// Only return error at end of block
return n, cmprInfo, true, nil
}
if d.current.err != nil {
d.drainOutput()
}
if debugDecoder {
println("returning", n, d.current.err, len(d.decoders))
}
return n, cmprInfo, true, d.current.err
}

// Reset will reset the decoder the supplied stream after the current has finished processing.
// Note that this functionality cannot be used after Close has been called.
// Reset can be called with a nil reader to release references to the previous reader.
Expand Down
79 changes: 79 additions & 0 deletions zstd/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"math"
rdebug "runtime/debug"
"sync"
"time"

"github.com/klauspost/compress/zstd/internal/xxhash"
"github.com/wqshr12345/golib/common"
)

// Encoder provides encoding to Zstandard.
Expand Down Expand Up @@ -181,6 +183,54 @@ func (e *Encoder) Write(p []byte) (n int, err error) {
return n, nil
}

func (e *Encoder) Write2(p []byte) (n int, cmprInfo common.CompressInfo, isNil bool, err error) {
s := &e.state
for len(p) > 0 {
if len(p)+len(s.filling) < e.o.blockSize {
if e.o.crc {
_, _ = s.encoder.CRC().Write(p)
}
s.filling = append(s.filling, p...)
return n + len(p), cmprInfo, true, nil
}
add := p
if len(p)+len(s.filling) > e.o.blockSize {
add = add[:e.o.blockSize-len(s.filling)]
}
if e.o.crc {
_, _ = s.encoder.CRC().Write(add)
}
s.filling = append(s.filling, add...)
p = p[len(add):]
n += len(add)
if len(s.filling) < e.o.blockSize {
return n, cmprInfo, true, nil
}
// code for compression information.
startInput := s.nInput
startWritten := s.nWritten
startTs := time.Now().UnixNano()

err := e.nextBlock(false)

endInput := s.nInput
endWritten := s.nWritten
endTs := time.Now().UnixNano()

cmprInfo.DataLen = int(endInput - startInput)
cmprInfo.CompressTime = endTs - startTs
cmprInfo.CompressRatio = float64(endWritten-startWritten) / float64(endInput-startInput)

if err != nil {
return n, cmprInfo, false, err
}
if debugAsserts && len(s.filling) > 0 {
panic(len(s.filling))
}
}
return n, cmprInfo, false, nil
}

// nextBlock will synchronize and start compressing input in e.state.filling.
// If an error has occurred during encoding it will be returned.
func (e *Encoder) nextBlock(final bool) error {
Expand Down Expand Up @@ -412,6 +462,35 @@ func (e *Encoder) Flush() error {
return s.writeErr
}

func (e *Encoder) Flush2() (cmprInfo common.CompressInfo, err error) {
s := &e.state
if len(s.filling) > 0 {
// code for compression information.
startInput := s.nInput
startWritten := s.nWritten
startTs := time.Now().UnixNano()

err = e.nextBlock(false)

endInput := s.nInput
endWritten := s.nWritten
endTs := time.Now().UnixNano()

cmprInfo.DataLen = int(endInput - startInput)
cmprInfo.CompressTime = endTs - startTs
cmprInfo.CompressRatio = float64(endWritten-startWritten) / float64(endInput-startInput)
if err != nil {
return cmprInfo, err
}
}
s.wg.Wait()
s.wWg.Wait()
if s.err != nil {
return cmprInfo, s.err
}
return cmprInfo, s.writeErr
}

// Close will flush the final output and close the stream.
// The function will block until everything has been written.
// The Encoder can still be re-used after calling this.
Expand Down

0 comments on commit 451beda

Please sign in to comment.