diff --git a/s2/cmd/internal/readahead/LICENSE b/s2/cmd/internal/readahead/LICENSE new file mode 100644 index 0000000000..eaeb61a87e --- /dev/null +++ b/s2/cmd/internal/readahead/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2015 Klaus Post + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/s2/cmd/internal/readahead/README.md b/s2/cmd/internal/readahead/README.md new file mode 100644 index 0000000000..e6d0e92362 --- /dev/null +++ b/s2/cmd/internal/readahead/README.md @@ -0,0 +1,57 @@ +# readahead +Asynchronous read-ahead for Go readers + +This package will allow you to add read-ahead to any reader. This means a separate goroutine will perform reads from your upstream reader, so you can request from this reader without delay. + +This is helpful for splitting an input stream into concurrent processing, and also helps smooth out **bursts** of input or output. 
+ +This should be fully transparent, except that once an error has been returned from the Reader, it will not recover. A panic will be caught and returned as an error. + +The readahead object also fulfills the [`io.WriterTo`](https://golang.org/pkg/io/#WriterTo) interface, which is likely to speed up `io.Copy` and other code that uses the interface. + +See an introduction: [An Async Read-ahead Package for Go](https://blog.klauspost.com/an-async-read-ahead-package-for-go/) + +[![GoDoc][1]][2] [![Build Status][3]][4] + +[1]: https://godoc.org/github.com/klauspost/readahead?status.svg +[2]: https://godoc.org/github.com/klauspost/readahead +[3]: https://travis-ci.org/klauspost/readahead.svg +[4]: https://travis-ci.org/klauspost/readahead + +# usage + +To get the package use `go get -u github.com/klauspost/readahead`. + +Here is a simple example that does file copy. Error handling has been omitted for brevity. +```Go +input, _ := os.Open("input.txt") +output, _ := os.Create("output.txt") +defer input.Close() +defer output.Close() + +// Create a read-ahead Reader with default settings +ra := readahead.NewReader(input) +defer ra.Close() + +// Copy the content to our output +_, _ = io.Copy(output, ra) +``` + +# settings + +You can finetune the read-ahead for your specific use case, and adjust the number of buffers and the size of each buffer. + +By default the size of each buffer is 1MB, and there are 4 buffers. Do not make your buffers too small since there is a small overhead for passing buffers between goroutines. Other than that you are free to experiment with buffer sizes. + +# contributions + +On this project contributions in terms of new features are limited to: + +* Features that are widely usable and +* Features that have extensive tests + +This package is meant to be simple and stable, hence these strict requirements. + +# license + +This package is released under the MIT license. See the supplied LICENSE file for more info. 
diff --git a/s2/cmd/internal/readahead/reader.go b/s2/cmd/internal/readahead/reader.go new file mode 100644 index 0000000000..168932cd4f --- /dev/null +++ b/s2/cmd/internal/readahead/reader.go @@ -0,0 +1,275 @@ +// Copyright (c) 2015 Klaus Post, released under MIT License. See LICENSE file. + +// The readahead package will do asynchronous read-ahead from an input io.Reader +// and make the data available as an io.Reader. +// +// This should be fully transparent, except that once an error +// has been returned from the Reader, it will not recover. +// +// The readahead object also fulfills the io.WriterTo interface, which +// is likely to speed up copies. +// +// Package home: https://github.com/klauspost/readahead +// +package readahead + +import ( + "errors" + "fmt" + "io" +) + +type seekable struct { + *reader +} + +type ReadSeekCloser interface { + io.ReadCloser + io.Seeker +} + +type reader struct { + in io.Reader // Input reader + closer io.Closer // Optional closer + ready chan *buffer // Buffers ready to be handed to the reader + reuse chan *buffer // Buffers to reuse for input reading + exit chan struct{} // Closes when finished + buffers int // Number of buffers + size int // Size of each buffer + err error // If an error has occurred it is here + cur *buffer // Current buffer being served + exited chan struct{} // Channel is closed when the async reader shuts down +} + +// NewReaderSize returns a reader with a custom number of buffers and size. +// buffers is the number of queued buffers and size is the size of each +// buffer in bytes. 
+func NewReaderSize(rd io.Reader, buffers, size int) (res io.ReadCloser, err error) { + if size <= 0 { + return nil, fmt.Errorf("buffer size too small") + } + if buffers <= 0 { + return nil, fmt.Errorf("number of buffers too small") + } + if rd == nil { + return nil, fmt.Errorf("nil input reader supplied") + } + a := &reader{} + if _, ok := rd.(io.Seeker); ok { + res = &seekable{a} + } else { + res = a + } + a.init(rd, buffers, size) + return +} + +// initialize the reader +func (a *reader) init(rd io.Reader, buffers, size int) { + a.in = rd + a.ready = make(chan *buffer, buffers) + a.reuse = make(chan *buffer, buffers) + a.exit = make(chan struct{}, 0) + a.exited = make(chan struct{}, 0) + a.buffers = buffers + a.size = size + a.cur = nil + a.err = nil + + // Create buffers + for i := 0; i < buffers; i++ { + a.reuse <- newBuffer(size) + } + + // Start async reader + go func() { + // Ensure that when we exit this is signalled. + defer close(a.exited) + defer close(a.ready) + for { + select { + case b := <-a.reuse: + err := b.read(a.in) + a.ready <- b + if err != nil { + return + } + case <-a.exit: + return + } + } + }() +} + +// fill will check if the current buffer is empty and fill it if it is. +// If an error was returned at the end of the current buffer it is returned. +func (a *reader) fill() (err error) { + if a.cur.isEmpty() { + if a.cur != nil { + a.reuse <- a.cur + a.cur = nil + } + b, ok := <-a.ready + if !ok { + if a.err == nil { + a.err = errors.New("readahead: read after Close") + } + return a.err + } + a.cur = b + } + return nil +} + +// Read will return the next available data. 
+func (a *reader) Read(p []byte) (n int, err error) { + if a.err != nil { + return 0, a.err + } + // Swap buffer and maybe return error + err = a.fill() + if err != nil { + return 0, err + } + + // Copy what we can + n = copy(p, a.cur.buffer()) + a.cur.inc(n) + + // If at end of buffer, return any error, if present + if a.cur.isEmpty() { + a.err = a.cur.err + return n, a.err + } + return n, nil +} + +func (a *seekable) Seek(offset int64, whence int) (res int64, err error) { + //Not checking the result as seekable receiver guarantees it to be assertable + seeker, _ := a.in.(io.Seeker) + //Make sure the async routine is closed + select { + case <-a.exited: + case a.exit <- struct{}{}: + <-a.exited + } + if whence == io.SeekCurrent { + //If need to seek based on current position, take into consideration the bytes we read but the consumer + //doesn't know about + err = nil + for a.cur != nil { + if err = a.fill(); err == nil && a.cur != nil { + offset -= int64(len(a.cur.buffer())) + a.cur.offset = len(a.cur.buf) + } + } + } + //Seek the actual Seeker + if res, err = seeker.Seek(offset, whence); err == nil { + //If the seek was successful, reinitialize ourselves (with the new position). + a.init(a.in, a.buffers, a.size) + } + return +} + +// WriteTo writes data to w until there's no more data to write or when an error occurs. +// The return value n is the number of bytes written. +// Any error encountered during the write is also returned. +func (a *reader) WriteTo(w io.Writer) (n int64, err error) { + if a.err != nil { + return 0, a.err + } + n = 0 + for { + err = a.fill() + if err != nil { + return n, err + } + n2, err := w.Write(a.cur.buffer()) + a.cur.inc(n2) + n += int64(n2) + if err != nil { + return n, err + } + if a.cur.err != nil { + // io.Writer should return nil if we are at EOF. 
+ if a.cur.err == io.EOF { + a.err = a.cur.err + return n, nil + } + a.err = a.cur.err + return n, a.cur.err + } + } +} + +// Close will ensure that the underlying async reader is shut down. +// It will also close the input supplied on newAsyncReader. +func (a *reader) Close() (err error) { + select { + case <-a.exited: + case a.exit <- struct{}{}: + <-a.exited + } + if a.closer != nil { + // Only call once + c := a.closer + a.closer = nil + return c.Close() + } + a.err = errors.New("readahead: read after Close") + return nil +} + +// Internal buffer representing a single read. +// If an error is present, it must be returned +// once all buffer content has been served. +type buffer struct { + buf []byte + err error + offset int + size int +} + +func newBuffer(size int) *buffer { + return &buffer{buf: make([]byte, size), err: nil, size: size} +} + +// isEmpty returns true if offset is at end of +// buffer, or if the buffer is nil +func (b *buffer) isEmpty() bool { + if b == nil { + return true + } + if len(b.buf)-b.offset <= 0 { + return true + } + return false } + +// read into start of the buffer from the supplied reader, +// resets the offset and updates the size of the buffer. +// Any error encountered during the read is returned. 
+func (b *buffer) read(rd io.Reader) (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panic reading: %v", r) + b.err = err + } + }() + var n int + n, b.err = rd.Read(b.buf[0:b.size]) + b.buf = b.buf[0:n] + b.offset = 0 + return b.err +} + +// Return the buffer at current offset +func (b *buffer) buffer() []byte { + return b.buf[b.offset:] +} + +// inc will increment the read offset +func (b *buffer) inc(n int) { + b.offset += n +} diff --git a/s2/cmd/s2c/main.go b/s2/cmd/s2c/main.go index 1481e34f0f..339e54f9b0 100644 --- a/s2/cmd/s2c/main.go +++ b/s2/cmd/s2c/main.go @@ -17,6 +17,7 @@ import ( "unicode" "github.com/klauspost/compress/s2" + "github.com/klauspost/compress/s2/cmd/internal/readahead" ) var ( @@ -88,6 +89,9 @@ Options:`) func() { var closeOnce sync.Once dstFilename := fmt.Sprintf("%s%s", filename, ".s2") + if *bench > 0 { + dstFilename = "(discarded)" + } if !*quiet { fmt.Println("Compressing", filename, "->", dstFilename) } @@ -95,7 +99,9 @@ Options:`) file, err := os.Open(filename) exitErr(err) defer closeOnce.Do(func() { file.Close() }) - src := bufio.NewReaderSize(file, int(sz)*2) + src, err := readahead.NewReaderSize(file, *cpu, int(sz)) + exitErr(err) + defer src.Close() finfo, err := file.Stat() exitErr(err) var out io.Writer diff --git a/s2/cmd/s2d/main.go b/s2/cmd/s2d/main.go index c2bf8761f2..70004c671a 100644 --- a/s2/cmd/s2d/main.go +++ b/s2/cmd/s2d/main.go @@ -14,6 +14,7 @@ import ( "time" "github.com/klauspost/compress/s2" + "github.com/klauspost/compress/s2/cmd/internal/readahead" ) var ( @@ -78,6 +79,9 @@ Options:`) fmt.Println("Skipping", filename) continue } + if *bench > 0 { + dstFilename = "(discarded)" + } func() { var closeOnce sync.Once @@ -89,7 +93,9 @@ Options:`) exitErr(err) defer closeOnce.Do(func() { file.Close() }) rc := rCounter{in: file} - src := bufio.NewReaderSize(&rc, 4<<20) + src, err := readahead.NewReaderSize(&rc, 2, 4<<20) + exitErr(err) + defer src.Close() finfo, err := file.Stat() 
exitErr(err) mode := finfo.Mode() // use the same mode for the output file