Skip to content

Commit

Permalink
added Reader.WriteTo and Writer.ReadFrom
Browse files Browse the repository at this point in the history
BlockSize.Put disabled as buffer release is currently buggy
  • Loading branch information
pierrec committed Apr 23, 2020
1 parent bb3370a commit 3d212bc
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 16 deletions.
2 changes: 2 additions & 0 deletions internal/lz4block/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func (b BlockSizeIndex) Get() []byte {
}

func (b BlockSizeIndex) Put(buf []byte) {
//TODO disabled as releasing buffers introduces a bug in Reader.WriteTo and Writer.ReadFrom
return
switch b {
case 4:
BlockPool64K.Put(buf)
Expand Down
48 changes: 44 additions & 4 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func (r *Reader) Size() int {
return 0
}

func (r *Reader) init() error {
return r.frame.InitR(r.src)
}

func (r *Reader) Read(buf []byte) (n int, err error) {
defer r.state.check(&err)
switch r.state.state {
Expand All @@ -72,10 +76,11 @@ func (r *Reader) Read(buf []byte) (n int, err error) {
return 0, r.state.err
case newState:
// First initialization.
if err = r.frame.InitR(r.src); r.state.next(err) {
if err = r.init(); r.state.next(err) {
return
}
r.data = r.frame.Descriptor.Flags.BlockSizeIndex().Get()
size := r.frame.Descriptor.Flags.BlockSizeIndex()
r.data = size.Get()
default:
return 0, r.state.fail()
}
Expand Down Expand Up @@ -118,8 +123,6 @@ func (r *Reader) Read(buf []byte) (n int, err error) {
return
}
close:
r.handler(bn)
n += bn
if er := r.frame.CloseR(r.src); er != nil {
err = er
}
Expand Down Expand Up @@ -153,3 +156,40 @@ func (r *Reader) Reset(reader io.Reader) {
r.state.state = noState
r.state.next(nil)
}

// WriteTo efficiently uncompresses the data from the Reader underlying source to w.
func (r *Reader) WriteTo(w io.Writer) (n int64, err error) {
switch r.state.state {
case closedState, errorState:
return 0, r.state.err
case newState:
if err = r.init(); r.state.next(err) {
return
}
default:
return 0, r.state.fail()
}
defer r.state.nextd(&err)

var bn int
block := r.frame.Blocks.Block
size := r.frame.Descriptor.Flags.BlockSizeIndex()
data := size.Get()
defer size.Put(r.data)
for {
switch bn, err = block.Uncompress(r.frame, r.src, data); err {
case nil:
case io.EOF:
err = r.frame.CloseR(r.src)
return
default:
return
}
r.handler(bn)
n += int64(bn)
_, err = w.Write(data[:bn])
if err != nil {
return
}
}
}
4 changes: 4 additions & 0 deletions state.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func (s *_State) next(err error) bool {
return false
}

func (s *_State) nextd(errp *error) bool {
return s.next(*errp)
}

// check sets s in error if not already in error and if the error is not nil or io.EOF,
func (s *_State) check(errp *error) {
if s.state == errorState || errp == nil {
Expand Down
69 changes: 57 additions & 12 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,26 @@ func (w *Writer) isNotConcurrent() bool {
return w.num == 1
}

// init sets up the Writer when in newState. It does not change the Writer state.
func (w *Writer) init() error {
w.frame.InitW(w.src, w.num)
size := w.frame.Descriptor.Flags.BlockSizeIndex()
w.data = size.Get()
w.idx = 0
if w.isNotConcurrent() {
w.ht = lz4block.HashTablePool.Get().([]int)
}
return w.frame.Descriptor.Write(w.frame, w.src)
}

func (w *Writer) Write(buf []byte) (n int, err error) {
defer w.state.check(&err)
switch w.state.state {
case writeState:
case closedState, errorState:
return 0, w.state.err
case newState:
w.frame.InitW(w.src, w.num)
size := w.frame.Descriptor.Flags.BlockSizeIndex()
w.data = size.Get()
w.idx = 0
if w.isNotConcurrent() {
w.ht = lz4block.HashTablePool.Get().([]int)
}
if err = w.frame.Descriptor.Write(w.frame, w.src); w.state.next(err) {
if err = w.init(); w.state.next(err) {
return
}
default:
Expand Down Expand Up @@ -153,7 +158,7 @@ func (w *Writer) Close() (err error) {
default:
return nil
}
defer func() { w.state.next(err) }()
defer w.state.nextd(&err)
if w.idx > 0 {
// Flush pending data, disable w.data freeing as it is done later on.
if err = w.write(w.data[:w.idx], false); err != nil {
Expand All @@ -167,9 +172,11 @@ func (w *Writer) Close() (err error) {
w.ht = nil
}
// It is now safe to free the buffer.
size := w.frame.Descriptor.Flags.BlockSizeIndex()
size.Put(w.data)
w.data = nil
if w.data != nil {
size := w.frame.Descriptor.Flags.BlockSizeIndex()
size.Put(w.data)
w.data = nil
}
return
}

Expand All @@ -189,3 +196,41 @@ func (w *Writer) Reset(writer io.Writer) {
w.state.next(nil)
w.src = writer
}

// ReadFrom efficiently reads from r and compressed into the Writer destination.
func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
switch w.state.state {
case closedState, errorState:
return 0, w.state.err
case newState:
if err = w.init(); w.state.next(err) {
return
}
default:
return 0, w.state.fail()
}
defer w.state.check(&err)

size := w.frame.Descriptor.Flags.BlockSizeIndex()
var done bool
var rn int
for !done {
data := size.Get()
rn, err = io.ReadFull(r, data)
switch err {
case nil:
case io.EOF:
done = true
default:
return
}
n += int64(rn)
err = w.write(data[:rn], true)
if err != nil {
return
}
w.handler(rn)
}
err = w.Close()
return
}

0 comments on commit 3d212bc

Please sign in to comment.