Skip to content

Commit

Permalink
Readthru (#64)
Browse files Browse the repository at this point in the history
* Test utils and messaging

* New immediate readahead mode provides more immediate results at similar performance

* Fix tests

* Better dropcr tests
  • Loading branch information
zix99 committed May 22, 2022
1 parent 36dd9dd commit fdbd885
Show file tree
Hide file tree
Showing 12 changed files with 376 additions and 55 deletions.
11 changes: 10 additions & 1 deletion cmd/helpers/extractorBuilder.go
Expand Up @@ -30,12 +30,21 @@ func BuildBatcherFromArguments(c *cli.Context) *batchers.Batcher {
logger.Fatalf("Batch size must be >= 1, is %d", batchSize)
}
if concurrentReaders < 1 {
logger.Fatalf("Must have at least 1 readers")
logger.Fatalf("Must have at least 1 reader")
}
if followPoll && !follow {
logger.Fatalf("Follow (-f) must be enabled for --poll")
}

fileglobs := c.Args()

if len(fileglobs) == 0 || fileglobs[0] == "-" { // Read from stdin
if gunzip {
logger.Fatalln("Cannot decompress (-z) with stdin")
}
if follow {
logger.Println("Cannot follow a stdin stream, not a file")
}
return batchers.OpenReaderToChan("<stdin>", os.Stdin, batchSize)
} else if follow { // Read from source file
if gunzip {
Expand Down
6 changes: 3 additions & 3 deletions pkg/extractor/batchers/batcher.go
Expand Up @@ -135,11 +135,11 @@ func (s *Batcher) StatusString() string {
// and writes the batch-sized results to a channel
func (s *Batcher) syncReaderToBatcher(sourceName string, reader io.Reader, batchSize int) {
readerMetrics := newReaderMetrics(reader)
readahead := readahead.New(readerMetrics, ReadAheadBufferSize)
readahead.OnError = func(e error) {
readahead := readahead.NewImmediate(readerMetrics, ReadAheadBufferSize)
readahead.OnError(func(e error) {
s.incErrors()
logger.Printf("Error reading %s: %v", sourceName, e)
}
})

batch := make([]extractor.BString, 0, batchSize)
var batchStart uint64 = 1
Expand Down
2 changes: 1 addition & 1 deletion pkg/extractor/utils_test.go
Expand Up @@ -15,7 +15,7 @@ func unbatchMatches(c <-chan []Match) []Match {

func convertReaderToBatches(sourceName string, reader io.Reader, batchSize int) <-chan InputBatch {
out := make(chan InputBatch)
ra := readahead.New(reader, 128*1024)
ra := readahead.NewImmediate(reader, 128*1024)

go func() {
batch := make([]BString, 0, batchSize)
Expand Down
63 changes: 24 additions & 39 deletions pkg/readahead/readahead.go → pkg/readahead/buffered.go
Expand Up @@ -5,22 +5,7 @@ import (
"io"
)

/*
Buffered read-ahead similar to Scanner, except it will leave the large-buffers in place
(rather than shifting them) so that a given slice is good for the duration of its life
This allows a slice reference to be passed around without worrying that the underlying data will change
which limits the amount the data needs to be copied around
Initial benchmarks shows a 8% savings over Scanner
*/

type LineScanner interface {
Scan() bool
Bytes() []byte
}

type ReadAhead struct {
type BufferedReadAhead struct {
r io.Reader
maxBufLen int

Expand All @@ -31,37 +16,33 @@ type ReadAhead struct {
token []byte
delim byte

OnError func(error) // OnError is called if there are any downstream errors
onError func(error) // OnError is called if there are any downstream errors
}

// dropCR drops a terminal \r from the data.
func dropCR(data []byte) []byte {
	if n := len(data); n > 0 && data[n-1] == '\r' {
		return data[:n-1]
	}
	return data
}
var _ Scanner = &BufferedReadAhead{}

// maxi returns the larger of two ints
func maxi(a, b int) int {
	if b > a {
		return b
	}
	return a
}
/*
Buffered read-ahead similar to Scanner, except it will leave the large-buffers in place
(rather than shifting them) so that a given slice is good for the duration of its life
func New(reader io.Reader, maxBufLen int) *ReadAhead {
if maxBufLen <= 0 {
panic("Buf length must be > 0")
This allows a slice reference to be passed around without worrying that the underlying data will change
which limits the amount the data needs to be copied around
Initial benchmarks show an 8% savings over Scanner
*/
func NewBuffered(reader io.Reader, maxBufLen int) *BufferedReadAhead {
if maxBufLen <= 1 {
panic("Buf length must be > 1")
}
return &ReadAhead{
return &BufferedReadAhead{
r: reader,
maxBufLen: maxBufLen,
delim: '\n',
}
}

// Scan for the next token with a new line
func (s *ReadAhead) Scan() bool {
func (s *BufferedReadAhead) Scan() bool {
for {
//var a chars
relIndex := bytes.IndexByte(s.buf[s.offset:], s.delim)
Expand Down Expand Up @@ -97,8 +78,8 @@ func (s *ReadAhead) Scan() bool {
n, err := s.r.Read(s.buf[readOffset:])
readOffset += n
if err != nil {
if err != io.EOF && s.OnError != nil {
s.OnError(err)
if err != io.EOF && s.onError != nil {
s.onError(err)
}
s.eof = true
break
Expand All @@ -117,14 +98,18 @@ func (s *ReadAhead) Scan() bool {
}

// Bytes retrieves the current bytes of the current token (line)
func (s *ReadAhead) Bytes() []byte {
func (s *BufferedReadAhead) Bytes() []byte {
return s.token
}

// ReadLine is shorthand for Scan() Token()
func (s *ReadAhead) ReadLine() []byte {
func (s *BufferedReadAhead) ReadLine() []byte {
if !s.Scan() {
return nil
}
return s.token
}

// OnError registers a callback invoked when a non-EOF read error occurs during Scan
func (s *BufferedReadAhead) OnError(onError OnScannerError) {
	s.onError = onError
}
47 changes: 36 additions & 11 deletions pkg/readahead/readahead_test.go → pkg/readahead/buffered_test.go
@@ -1,6 +1,8 @@
package readahead

import (
"bufio"
"rare/pkg/testutil"
"strings"
"testing"
"testing/iotest"
Expand All @@ -10,67 +12,90 @@ import (

func TestBasicReadingShortBuf(t *testing.T) {
r := strings.NewReader("Hello there you\nthis is line 2\n")
ra := New(r, 3)
ra := NewBuffered(r, 3)
assert.Equal(t, []byte("Hello there you"), ra.ReadLine())
assert.Equal(t, []byte("this is line 2"), ra.ReadLine())
assert.Nil(t, ra.ReadLine())
}

func TestBasicReadingLongBuf(t *testing.T) {
r := strings.NewReader("Hello there you\nthis is line 2\n")
ra := New(r, 1024)
ra := NewBuffered(r, 1024)
assert.Equal(t, []byte("Hello there you"), ra.ReadLine())
assert.Equal(t, []byte("this is line 2"), ra.ReadLine())
assert.Nil(t, ra.ReadLine())
}

func TestBasicReadingMidBuf(t *testing.T) {
r := strings.NewReader("Hello there you\nthis is line 2\n")
ra := New(r, 20) // Just enough to read first line, but not both
ra := NewBuffered(r, 20) // Just enough to read first line, but not both
assert.Equal(t, []byte("Hello there you"), ra.ReadLine())
assert.Equal(t, []byte("this is line 2"), ra.ReadLine())
assert.Nil(t, ra.ReadLine())
}

func TestBasicReadingNoNewTerm(t *testing.T) {
r := strings.NewReader("Hello there you\nthis is line 2")
ra := New(r, 3)
ra := NewBuffered(r, 3)
assert.Equal(t, []byte("Hello there you"), ra.ReadLine())
assert.Equal(t, []byte("this is line 2"), ra.ReadLine())
assert.Nil(t, ra.ReadLine())
}

func TestReadEmptyString(t *testing.T) {
r := strings.NewReader("")
ra := New(r, 3)
ra := NewBuffered(r, 3)
assert.Nil(t, ra.ReadLine())
}

func TestReadSingleCharString(t *testing.T) {
r := strings.NewReader("A")
ra := New(r, 3)
ra := NewBuffered(r, 3)
assert.Equal(t, []byte("A"), ra.ReadLine())
}

func TestDropCR(t *testing.T) {
func TestBufferedDropCR(t *testing.T) {
r := strings.NewReader("test\r\nthing")
ra := New(r, 3)
ra := NewBuffered(r, 3)
assert.Equal(t, []byte("test"), ra.ReadLine())
assert.Equal(t, []byte("thing"), ra.ReadLine())
assert.Nil(t, ra.ReadLine())
}

func TestErrorHandling(t *testing.T) {
errReader := iotest.TimeoutReader(strings.NewReader("Hello there you\nthis is a line\n"))
ra := New(errReader, 20)
ra := NewBuffered(errReader, 20)

var hadError bool
ra.OnError = func(e error) {
ra.OnError(func(e error) {
hadError = true
}
})

assert.Equal(t, []byte("Hello there you"), ra.ReadLine())
assert.Equal(t, []byte("this"), ra.ReadLine()) // up to twentyith char
assert.Nil(t, ra.ReadLine())
assert.True(t, hadError)
}

// BenchmarkBuffered measures per-line Scan throughput of BufferedReadAhead
// against a synthetic 1KB-per-line text source
func BenchmarkBuffered(b *testing.B) {
	gen := testutil.NewTextGenerator(1024)
	scanner := NewBuffered(gen, 128*128)

	for n := 0; n < b.N; n++ {
		scanner.Scan()
	}
}

// BenchmarkScanner is the bufio.Scanner baseline for BenchmarkBuffered,
// including the per-line copy that real consumers need (Scanner reuses
// its buffer between Scan calls)
func BenchmarkScanner(b *testing.B) {
	r := testutil.NewTextGenerator(1024)
	s := bufio.NewScanner(r)

	for i := 0; i < b.N; i++ {
		s.Scan()

		// Copy into a new memory slot as practically that's needed for how it's consumed.
		// Named `token` (not `r`) to avoid shadowing the reader above
		token := s.Bytes()
		data := make([]byte, len(token))
		copy(data, token)
	}
}
105 changes: 105 additions & 0 deletions pkg/readahead/immediate.go
@@ -0,0 +1,105 @@
package readahead

import (
"bytes"
"io"
)

// ImmediateReadAhead scans a reader token-by-token, returning each token as
// soon as a delimiter is found in the data read so far. Returned token slices
// point into the internal buffer, which is reallocated (not shifted) when it
// grows, so previously returned slices stay intact.
type ImmediateReadAhead struct {
	r io.Reader // underlying data source

	buf     []byte // read buffer; tokens are sub-slices of this
	bufSize int    // growth increment when buf fills up
	offset  int    // start of unconsumed data within buf
	end     int    // end of valid data within buf

	delim byte // token delimiter (set to '\n' by NewImmediate)

	token []byte // most recent token found by Scan
	eof   bool   // set once the reader returns any error (incl. io.EOF)

	onError OnScannerError // optional callback for non-EOF read errors
}

var _ Scanner = &ImmediateReadAhead{}

/*
NewImmediate creates an ImmediateReadAhead ("read-thru") scanner. It is slightly different than Scanner and read-ahead given that:
  - Unlike Scanner and like ReadAhead, ReadThru leaves the buffer in-place
  - Like Scanner, and unlike ReadAhead, Scan() returns immediately after finding a delim,
    rather than blocking and buffering up to bufSize
*/
func NewImmediate(reader io.Reader, bufSize int) *ImmediateReadAhead {
	// Guard degenerate buffers (matches NewBuffered): with bufSize <= 0 the
	// scanner could never make read progress
	if bufSize <= 1 {
		panic("Buf length must be > 1")
	}
	return &ImmediateReadAhead{
		r:       reader,
		bufSize: bufSize,
		buf:     make([]byte, bufSize),
		delim:   '\n',
	}
}

// Scan advances to the next token (line), returning true if one is available.
// Unlike BufferedReadAhead, it returns as soon as a delimiter is seen in the
// data read so far, rather than filling the buffer first.
func (s *ImmediateReadAhead) Scan() bool {
RESTART:

	// First try to satisfy the scan from already-buffered data
	if s.offset < s.end {
		if eol := bytes.IndexByte(s.buf[s.offset:s.end], s.delim); eol >= 0 {
			s.token = dropCR(s.buf[s.offset : s.offset+eol])
			s.offset += eol + 1
			return true
		}
		if s.eof {
			// Final unterminated token; drop a trailing \r for consistency
			// with delimited tokens (and bufio.ScanLines behavior)
			s.token = dropCR(s.buf[s.offset:s.end])
			s.offset = s.end
			return true
		}
	} else if s.eof {
		return false
	}

	// Read loop
	for {
		// Increase buf if needed (heuristically). Allocating a new buffer
		// (rather than shifting in-place) keeps previously returned tokens valid
		if s.end >= len(s.buf) {
			old := s.buf
			s.buf = make([]byte, s.end-s.offset+s.bufSize)
			copy(s.buf, old[s.offset:s.end])
			s.end -= s.offset
			s.offset = 0
		}

		// Read data and check for errors
		n, err := s.r.Read(s.buf[s.end:])
		s.end += n

		if err != nil {
			// Any error ends the stream; non-EOF errors are also reported.
			// Jump back to flush any bytes read before the error (a Read may
			// return n > 0 alongside an error)
			s.eof = true
			if err != io.EOF && s.onError != nil {
				s.onError(err)
			}
			goto RESTART
		}

		// Check only the most recently read bytes for a new line
		if eol := bytes.IndexByte(s.buf[s.end-n:s.end], s.delim); eol >= 0 {
			end := s.end - n + eol
			s.token = dropCR(s.buf[s.offset:end])
			s.offset = end + 1
			return true
		}
	}
}

// Bytes retrieves the current bytes of the current token (line)
func (s *ImmediateReadAhead) Bytes() []byte {
	return s.token
}

// ReadLine is shorthand for Scan() followed by Bytes(); returns nil at end of stream
func (s *ImmediateReadAhead) ReadLine() []byte {
	if !s.Scan() {
		return nil
	}
	return s.token
}

// OnError registers a callback invoked when a non-EOF read error occurs during Scan
func (s *ImmediateReadAhead) OnError(f OnScannerError) {
	s.onError = f
}

0 comments on commit fdbd885

Please sign in to comment.