s2c+s2d: Add readahead #192

Merged · 3 commits · Jan 4, 2020
22 changes: 22 additions & 0 deletions s2/cmd/internal/readahead/LICENSE
@@ -0,0 +1,22 @@
The MIT License (MIT)

Copyright (c) 2015 Klaus Post

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

57 changes: 57 additions & 0 deletions s2/cmd/internal/readahead/README.md
@@ -0,0 +1,57 @@
# readahead
Asynchronous read-ahead for Go readers

This package allows you to add read-ahead to any reader: a separate goroutine performs the reads from your upstream reader, so reads from this reader can be served without delay.

This is helpful when splitting an input stream for concurrent processing, and it also helps smooth out **bursts** of input or output.

This should be fully transparent, except that once an error has been returned from the Reader, it will not recover. A panic will be caught and returned as an error.

The readahead object also fulfills the [`io.WriterTo`](https://golang.org/pkg/io/#WriterTo) interface, which is likely to speed up `io.Copy` and other code that uses the interface.
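
As a minimal sketch of that fast path (here `ra` is assumed to be a read-ahead reader created as in the usage example below, and `dst` is any `io.Writer`):

```Go
// io.Copy performs this type assertion itself; it is only needed
// when calling WriteTo directly.
if wt, ok := ra.(io.WriterTo); ok {
	// Data is written to dst straight from the read-ahead buffers,
	// without an intermediate copy buffer.
	_, _ = wt.WriteTo(dst)
}
```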

See an introduction: [An Async Read-ahead Package for Go](https://blog.klauspost.com/an-async-read-ahead-package-for-go/)

[![GoDoc][1]][2] [![Build Status][3]][4]

[1]: https://godoc.org/github.com/klauspost/readahead?status.svg
[2]: https://godoc.org/github.com/klauspost/readahead
[3]: https://travis-ci.org/klauspost/readahead.svg
[4]: https://travis-ci.org/klauspost/readahead

# usage

To get the package use `go get -u github.com/klauspost/readahead`.

Here is a simple example that copies a file. Error handling has been omitted for brevity.
```Go
input, _ := os.Open("input.txt")
output, _ := os.Create("output.txt")
defer input.Close()
defer output.Close()

// Create a read-ahead Reader with default settings
ra := readahead.NewReader(input)
defer ra.Close()

// Copy the content to our output
_, _ = io.Copy(output, ra)
```

# settings

You can fine-tune the read-ahead for your specific use case by adjusting the number of buffers and the size of each buffer.

By default each buffer is 1 MB and there are 4 buffers. Do not make your buffers too small, since there is a small overhead for passing buffers between goroutines. Beyond that you are free to experiment with buffer sizes.
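
As a minimal sketch (the file name and sizes here are only examples, not recommendations), custom settings are passed to `NewReaderSize`:

```Go
input, err := os.Open("input.txt")
if err != nil {
	log.Fatal(err)
}
defer input.Close()

// 8 buffers of 512 KB each: up to 4 MB may be read ahead of the consumer.
ra, err := readahead.NewReaderSize(input, 8, 512<<10)
if err != nil {
	log.Fatal(err)
}
defer ra.Close()
```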

# contributions

Contributions of new features to this project are limited to:

* Features that are widely usable and
* Features that have extensive tests

This package is meant to be simple and stable, hence the strict requirements.

# license

This package is released under the MIT license. See the supplied LICENSE file for more info.
275 changes: 275 additions & 0 deletions s2/cmd/internal/readahead/reader.go
@@ -0,0 +1,275 @@
// Copyright (c) 2015 Klaus Post, released under MIT License. See LICENSE file.

// The readahead package will do asynchronous read-ahead from an input io.Reader
// and make the data available as an io.Reader.
//
// This should be fully transparent, except that once an error
// has been returned from the Reader, it will not recover.
//
// The readahead object also fulfills the io.WriterTo interface, which
// is likely to speed up copies.
//
// Package home: https://github.com/klauspost/readahead
//
package readahead

import (
"errors"
"fmt"
"io"
)

type seekable struct {
*reader
}

// ReadSeekCloser combines io.ReadCloser and io.Seeker.
// When the supplied input reader is seekable, the reader returned by
// NewReaderSize can be asserted to this interface.
type ReadSeekCloser interface {
io.ReadCloser
io.Seeker
}

type reader struct {
in io.Reader // Input reader
closer io.Closer // Optional closer
ready chan *buffer // Buffers ready to be handed to the reader
reuse chan *buffer // Buffers to reuse for input reading
exit chan struct{} // Closes when finished
buffers int // Number of buffers
size int // Size of each buffer
err error // If an error has occurred it is here
cur *buffer // Current buffer being served
exited chan struct{} // Channel is closed when the async reader shuts down
}

// NewReaderSize returns a reader with a custom number of buffers and size.
// buffers is the number of queued buffers and size is the size of each
// buffer in bytes.
func NewReaderSize(rd io.Reader, buffers, size int) (res io.ReadCloser, err error) {
if size <= 0 {
return nil, fmt.Errorf("buffer size too small")
}
if buffers <= 0 {
return nil, fmt.Errorf("number of buffers too small")
}
if rd == nil {
return nil, fmt.Errorf("nil input reader supplied")
}
a := &reader{}
if _, ok := rd.(io.Seeker); ok {
res = &seekable{a}
} else {
res = a
}
a.init(rd, buffers, size)
return
}

// initialize the reader
func (a *reader) init(rd io.Reader, buffers, size int) {
a.in = rd
a.ready = make(chan *buffer, buffers)
a.reuse = make(chan *buffer, buffers)
a.exit = make(chan struct{})
a.exited = make(chan struct{})
a.buffers = buffers
a.size = size
a.cur = nil
a.err = nil

// Create buffers
for i := 0; i < buffers; i++ {
a.reuse <- newBuffer(size)
}

// Start async reader
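// It cycles buffers between the reuse and ready channels: a free buffer
// is taken from reuse, filled from the input and handed to ready.
// The loop stops when a read returns an error or when exit is signalled.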
go func() {
// Ensure that when we exit this is signalled.
defer close(a.exited)
defer close(a.ready)
for {
select {
case b := <-a.reuse:
err := b.read(a.in)
a.ready <- b
if err != nil {
return
}
case <-a.exit:
return
}
}
}()
}

// fill will check if the current buffer is empty and fill it if it is.
// If an error was returned at the end of the current buffer it is returned.
func (a *reader) fill() (err error) {
if a.cur.isEmpty() {
if a.cur != nil {
a.reuse <- a.cur
a.cur = nil
}
b, ok := <-a.ready
if !ok {
if a.err == nil {
a.err = errors.New("readahead: read after Close")
}
return a.err
}
a.cur = b
}
return nil
}

// Read will return the next available data.
func (a *reader) Read(p []byte) (n int, err error) {
if a.err != nil {
return 0, a.err
}
// Swap buffer and maybe return error
err = a.fill()
if err != nil {
return 0, err
}

// Copy what we can
n = copy(p, a.cur.buffer())
a.cur.inc(n)

// If at end of buffer, return any error, if present
if a.cur.isEmpty() {
a.err = a.cur.err
return n, a.err
}
return n, nil
}

func (a *seekable) Seek(offset int64, whence int) (res int64, err error) {
// Not checking the result, as the seekable receiver guarantees it to be assertable.
seeker, _ := a.in.(io.Seeker)
// Make sure the async routine has shut down.
select {
case <-a.exited:
case a.exit <- struct{}{}:
<-a.exited
}
if whence == io.SeekCurrent {
// When seeking relative to the current position, account for the bytes
// that have already been read ahead but not yet consumed.
err = nil
for a.cur != nil {
if err = a.fill(); err == nil && a.cur != nil {
offset -= int64(len(a.cur.buffer()))
a.cur.offset = len(a.cur.buf)
}
}
}
// Seek the actual Seeker.
if res, err = seeker.Seek(offset, whence); err == nil {
// If the seek was successful, reinitialize ourselves (with the new position).
a.init(a.in, a.buffers, a.size)
}
return
}

// WriteTo writes data to w until there's no more data to write or when an error occurs.
// The return value n is the number of bytes written.
// Any error encountered during the write is also returned.
func (a *reader) WriteTo(w io.Writer) (n int64, err error) {
if a.err != nil {
return 0, a.err
}
n = 0
for {
err = a.fill()
if err != nil {
return n, err
}
n2, err := w.Write(a.cur.buffer())
a.cur.inc(n2)
n += int64(n2)
if err != nil {
return n, err
}
if a.cur.err != nil {
// WriteTo must not return io.EOF; report nil once all data has been written.
if a.cur.err == io.EOF {
a.err = a.cur.err
return n, nil
}
a.err = a.cur.err
return n, a.cur.err
}
}
}

// Close will ensure that the underlying async reader is shut down.
// It will also close the input if a closer was supplied.
func (a *reader) Close() (err error) {
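// Either the async reader has already exited (exited is closed), or we
// ask it to stop by sending on exit and then wait for it to finish.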
select {
case <-a.exited:
case a.exit <- struct{}{}:
<-a.exited
}
if a.closer != nil {
// Only call once
c := a.closer
a.closer = nil
return c.Close()
}
a.err = errors.New("readahead: read after Close")
return nil
}

// Internal buffer representing a single read.
// If an error is present, it must be returned
// once all buffer content has been served.
type buffer struct {
buf []byte
err error
offset int
size int
}

func newBuffer(size int) *buffer {
return &buffer{buf: make([]byte, size), err: nil, size: size}
}

// isEmpty returns true if the offset is at the end of
// the buffer, or if the buffer is nil.
func (b *buffer) isEmpty() bool {
if b == nil {
return true
}
if len(b.buf)-b.offset <= 0 {
return true
}
return false
}

// read reads into the start of the buffer from the supplied reader,
// resets the offset, and re-slices the buffer to the number of bytes read.
// Any error encountered during the read is returned.
func (b *buffer) read(rd io.Reader) (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic reading: %v", r)
b.err = err
}
}()
var n int
n, b.err = rd.Read(b.buf[0:b.size])
b.buf = b.buf[0:n]
b.offset = 0
return b.err
}

// buffer returns the unread portion of the buffer at the current offset.
func (b *buffer) buffer() []byte {
return b.buf[b.offset:]
}

// inc will increment the read offset
func (b *buffer) inc(n int) {
b.offset += n
}
8 changes: 7 additions & 1 deletion s2/cmd/s2c/main.go
@@ -17,6 +17,7 @@ import (
"unicode"

"github.com/klauspost/compress/s2"
"github.com/klauspost/compress/s2/cmd/internal/readahead"
)

var (
@@ -88,14 +89,19 @@ Options:`)
func() {
var closeOnce sync.Once
dstFilename := fmt.Sprintf("%s%s", filename, ".s2")
+if *bench > 0 {
+dstFilename = "(discarded)"
+}
if !*quiet {
fmt.Println("Compressing", filename, "->", dstFilename)
}
// Input file.
file, err := os.Open(filename)
exitErr(err)
defer closeOnce.Do(func() { file.Close() })
-src := bufio.NewReaderSize(file, int(sz)*2)
+src, err := readahead.NewReaderSize(file, *cpu, int(sz))
+exitErr(err)
+defer src.Close()
finfo, err := file.Stat()
exitErr(err)
var out io.Writer