From fdbd88557f63a3a8002d06c9870d107fc5ca791b Mon Sep 17 00:00:00 2001 From: Chris LaPointe Date: Sat, 21 May 2022 22:28:36 -0400 Subject: [PATCH] Readthru (#64) * Test utils and messaging * New immediate readahead mode provides more immediate results at similar performance * Fix tests * Better dropcr tests --- cmd/helpers/extractorBuilder.go | 11 +- pkg/extractor/batchers/batcher.go | 6 +- pkg/extractor/utils_test.go | 2 +- pkg/readahead/{readahead.go => buffered.go} | 63 ++++------- .../{readahead_test.go => buffered_test.go} | 47 ++++++-- pkg/readahead/immediate.go | 105 ++++++++++++++++++ pkg/readahead/immediate_test.go | 86 ++++++++++++++ pkg/readahead/interface.go | 9 ++ pkg/readahead/util.go | 16 +++ pkg/readahead/util_test.go | 18 +++ pkg/testutil/texGenerator_test.go | 34 ++++++ pkg/testutil/textGenerator.go | 34 ++++++ 12 files changed, 376 insertions(+), 55 deletions(-) rename pkg/readahead/{readahead.go => buffered.go} (74%) rename pkg/readahead/{readahead_test.go => buffered_test.go} (69%) create mode 100644 pkg/readahead/immediate.go create mode 100644 pkg/readahead/immediate_test.go create mode 100644 pkg/readahead/interface.go create mode 100644 pkg/readahead/util.go create mode 100644 pkg/readahead/util_test.go create mode 100644 pkg/testutil/texGenerator_test.go create mode 100644 pkg/testutil/textGenerator.go diff --git a/cmd/helpers/extractorBuilder.go b/cmd/helpers/extractorBuilder.go index 94c70ee..5d3b8b8 100644 --- a/cmd/helpers/extractorBuilder.go +++ b/cmd/helpers/extractorBuilder.go @@ -30,12 +30,21 @@ func BuildBatcherFromArguments(c *cli.Context) *batchers.Batcher { logger.Fatalf("Batch size must be >= 1, is %d", batchSize) } if concurrentReaders < 1 { - logger.Fatalf("Must have at least 1 readers") + logger.Fatalf("Must have at least 1 reader") + } + if followPoll && !follow { + logger.Fatalf("Follow (-f) must be enabled for --poll") } fileglobs := c.Args() if len(fileglobs) == 0 || fileglobs[0] == "-" { // Read from stdin + if gunzip { + logger.Fatalln("Cannot decompress (-z) with stdin") + } + if follow { + logger.Println("Cannot follow a stdin stream, not a file") + } return batchers.OpenReaderToChan("", os.Stdin, batchSize) } else if follow { // Read from source file if gunzip { diff --git a/pkg/extractor/batchers/batcher.go b/pkg/extractor/batchers/batcher.go index 3a28b8a..d3bd4be 100644 --- a/pkg/extractor/batchers/batcher.go +++ b/pkg/extractor/batchers/batcher.go @@ -135,11 +135,11 @@ func (s *Batcher) StatusString() string { // and writes the batch-sized results to a channel func (s *Batcher) syncReaderToBatcher(sourceName string, reader io.Reader, batchSize int) { readerMetrics := newReaderMetrics(reader) - readahead := readahead.New(readerMetrics, ReadAheadBufferSize) - readahead.OnError = func(e error) { + readahead := readahead.NewImmediate(readerMetrics, ReadAheadBufferSize) + readahead.OnError(func(e error) { s.incErrors() logger.Printf("Error reading %s: %v", sourceName, e) - } + }) batch := make([]extractor.BString, 0, batchSize) var batchStart uint64 = 1 diff --git a/pkg/extractor/utils_test.go b/pkg/extractor/utils_test.go index 455f2dd..f19fda4 100644 --- a/pkg/extractor/utils_test.go +++ b/pkg/extractor/utils_test.go @@ -15,7 +15,7 @@ func unbatchMatches(c <-chan []Match) []Match { func convertReaderToBatches(sourceName string, reader io.Reader, batchSize int) <-chan InputBatch { out := make(chan InputBatch) - ra := readahead.New(reader, 128*1024) + ra := readahead.NewImmediate(reader, 128*1024) go func() { batch := make([]BString, 0, batchSize) diff --git a/pkg/readahead/readahead.go b/pkg/readahead/buffered.go similarity index 74% rename from pkg/readahead/readahead.go rename to pkg/readahead/buffered.go index 9056bb8..528cf07 100644 --- a/pkg/readahead/readahead.go +++ b/pkg/readahead/buffered.go @@ -5,22 +5,7 @@ import ( "io" ) -/* -Buffered read-ahead similar to Scanner, except it will leave the large-buffers in place -(rather than shifting them) so that a given slice is good for the duration of its life - -This allows a slice reference to be passed around without worrying that the underlying data will change -which limits the amount the data needs to be copied around - -Initial benchmarks shows a 8% savings over Scanner -*/ - -type LineScanner interface { - Scan() bool - Bytes() []byte -} - -type ReadAhead struct { +type BufferedReadAhead struct { r io.Reader maxBufLen int @@ -31,29 +16,25 @@ type ReadAhead struct { token []byte delim byte - OnError func(error) // OnError is called if there are any downstream errors + onError func(error) // OnError is called if there are any downstream errors } -// dropCR drops a terminal \r from the data. -func dropCR(data []byte) []byte { - if len(data) > 0 && data[len(data)-1] == '\r' { - return data[0 : len(data)-1] - } - return data -} +var _ Scanner = &BufferedReadAhead{} -func maxi(a, b int) int { - if a > b { - return a - } - return b -} +/* +Buffered read-ahead similar to Scanner, except it will leave the large-buffers in place +(rather than shifting them) so that a given slice is good for the duration of its life -func New(reader io.Reader, maxBufLen int) *ReadAhead { - if maxBufLen <= 0 { - panic("Buf length must be > 0") +This allows a slice reference to be passed around without worrying that the underlying data will change +which limits the amount the data needs to be copied around + +Initial benchmarks shows a 8% savings over Scanner +*/ +func NewBuffered(reader io.Reader, maxBufLen int) *BufferedReadAhead { + if maxBufLen <= 1 { + panic("Buf length must be > 1") } - return &ReadAhead{ + return &BufferedReadAhead{ r: reader, maxBufLen: maxBufLen, delim: '\n', @@ -61,7 +42,7 @@ func New(reader io.Reader, maxBufLen int) *ReadAhead { } // Scan for the next token with a new line -func (s *ReadAhead) Scan() bool { +func (s *BufferedReadAhead) Scan() bool { for { //var a chars relIndex := bytes.IndexByte(s.buf[s.offset:], s.delim) @@ -97,8 +78,8 @@ func (s *ReadAhead) Scan() bool { n, err := s.r.Read(s.buf[readOffset:]) readOffset += n if err != nil { - if err != io.EOF && s.OnError != nil { - s.OnError(err) + if err != io.EOF && s.onError != nil { + s.onError(err) } s.eof = true break @@ -117,14 +98,18 @@ func (s *ReadAhead) Scan() bool { } // Bytes retrieves the current bytes of the current token (line) -func (s *ReadAhead) Bytes() []byte { +func (s *BufferedReadAhead) Bytes() []byte { return s.token } // ReadLine is shorthand for Scan() Token() -func (s *ReadAhead) ReadLine() []byte { +func (s *BufferedReadAhead) ReadLine() []byte { if !s.Scan() { return nil } return s.token } + +func (s *BufferedReadAhead) OnError(onError OnScannerError) { + s.onError = onError +} diff --git a/pkg/readahead/readahead_test.go b/pkg/readahead/buffered_test.go similarity index 69% rename from pkg/readahead/readahead_test.go rename to pkg/readahead/buffered_test.go index eb5577a..8c0c2bb 100644 --- a/pkg/readahead/readahead_test.go +++ b/pkg/readahead/buffered_test.go @@ -1,6 +1,8 @@ package readahead import ( + "bufio" + "rare/pkg/testutil" "strings" "testing" "testing/iotest" @@ -10,7 +12,7 @@ import ( func TestBasicReadingShortBuf(t *testing.T) { r := strings.NewReader("Hello there you\nthis is line 2\n") - ra := New(r, 3) + ra := NewBuffered(r, 3) assert.Equal(t, []byte("Hello there you"), ra.ReadLine()) assert.Equal(t, []byte("this is line 2"), ra.ReadLine()) assert.Nil(t, ra.ReadLine()) @@ -18,7 +20,7 @@ func TestBasicReadingShortBuf(t *testing.T) { func TestBasicReadingLongBuf(t *testing.T) { r := strings.NewReader("Hello there you\nthis is line 2\n") - ra := New(r, 1024) + ra := NewBuffered(r, 1024) assert.Equal(t, []byte("Hello there you"), ra.ReadLine()) assert.Equal(t, []byte("this is line 2"), ra.ReadLine()) assert.Nil(t, ra.ReadLine()) @@ -26,7 +28,7 @@ func TestBasicReadingLongBuf(t *testing.T) { func TestBasicReadingMidBuf(t *testing.T) { r := strings.NewReader("Hello there you\nthis is line 2\n") - ra := New(r, 20) // Just enough to read first line, but not both + ra := NewBuffered(r, 20) // Just enough to read first line, but not both assert.Equal(t, []byte("Hello there you"), ra.ReadLine()) assert.Equal(t, []byte("this is line 2"), ra.ReadLine()) assert.Nil(t, ra.ReadLine()) @@ -34,7 +36,7 @@ func TestBasicReadingMidBuf(t *testing.T) { func TestBasicReadingNoNewTerm(t *testing.T) { r := strings.NewReader("Hello there you\nthis is line 2") - ra := New(r, 3) + ra := NewBuffered(r, 3) assert.Equal(t, []byte("Hello there you"), ra.ReadLine()) assert.Equal(t, []byte("this is line 2"), ra.ReadLine()) assert.Nil(t, ra.ReadLine()) @@ -42,19 +44,19 @@ func TestBasicReadingNoNewTerm(t *testing.T) { func TestReadEmptyString(t *testing.T) { r := strings.NewReader("") - ra := New(r, 3) + ra := NewBuffered(r, 3) assert.Nil(t, ra.ReadLine()) } func TestReadSingleCharString(t *testing.T) { r := strings.NewReader("A") - ra := New(r, 3) + ra := NewBuffered(r, 3) assert.Equal(t, []byte("A"), ra.ReadLine()) } -func TestDropCR(t *testing.T) { +func TestBufferedDropCR(t *testing.T) { r := strings.NewReader("test\r\nthing") - ra := New(r, 3) + ra := NewBuffered(r, 3) assert.Equal(t, []byte("test"), ra.ReadLine()) assert.Equal(t, []byte("thing"), ra.ReadLine()) assert.Nil(t, ra.ReadLine()) @@ -62,15 +64,38 @@ func TestDropCR(t *testing.T) { func TestErrorHandling(t *testing.T) { errReader := iotest.TimeoutReader(strings.NewReader("Hello there you\nthis is a line\n")) - ra := New(errReader, 20) + ra := NewBuffered(errReader, 20) var hadError bool - ra.OnError = func(e error) { + ra.OnError(func(e error) { hadError = true - } + }) assert.Equal(t, []byte("Hello there you"), ra.ReadLine()) assert.Equal(t, []byte("this"), ra.ReadLine()) // up to twentyith char assert.Nil(t, ra.ReadLine()) assert.True(t, hadError) } + +func BenchmarkBuffered(b *testing.B) { + r := testutil.NewTextGenerator(1024) + ra := NewBuffered(r, 128*128) + + for i := 0; i < b.N; i++ { + ra.Scan() + } +} + +func BenchmarkScanner(b *testing.B) { + r := testutil.NewTextGenerator(1024) + s := bufio.NewScanner(r) + + for i := 0; i < b.N; i++ { + s.Scan() + + // Copy into a new memory slot as practically that's needed for how its consumed + r := s.Bytes() + data := make([]byte, len(r)) + copy(data, r) + } +} diff --git a/pkg/readahead/immediate.go b/pkg/readahead/immediate.go new file mode 100644 index 0000000..02b7455 --- /dev/null +++ b/pkg/readahead/immediate.go @@ -0,0 +1,105 @@ +package readahead + +import ( + "bytes" + "io" +) + +type ImmediateReadAhead struct { + r io.Reader + + buf []byte + bufSize int + offset int + end int + + delim byte + + token []byte + eof bool + + onError OnScannerError +} + +var _ Scanner = &ImmediateReadAhead{} + +/* +ReadThru is slightly different than Scanner and read-ahead given that: + - Unlike Scanner and like ReadAhead, ReadThru leaves the buffer in-place + - Like Scanner, and unlike ReadAhead, Scan() returns immediately after finding a delim, + rather than blocking and buffering up to bufSize +*/ +func NewImmediate(reader io.Reader, bufSize int) *ImmediateReadAhead { + return &ImmediateReadAhead{ + r: reader, + bufSize: bufSize, + buf: make([]byte, bufSize), + delim: '\n', + } +} + +func (s *ImmediateReadAhead) Scan() bool { +RESTART: + + if s.offset < s.end { + if eol := bytes.IndexByte(s.buf[s.offset:s.end], s.delim); eol >= 0 { + s.token = dropCR(s.buf[s.offset : s.offset+eol]) + s.offset += eol + 1 + return true + } + if s.eof { + s.token = s.buf[s.offset:s.end] + s.offset = s.end + return true + } + } else if s.eof { + return false + } + + // Read loop + for { + // Increase buf if needed (heuristically) + if s.end >= len(s.buf) { + old := s.buf + s.buf = make([]byte, s.end-s.offset+s.bufSize) + copy(s.buf, old[s.offset:s.end]) + s.end -= s.offset + s.offset = 0 + } + + // Read data and check for errors + n, err := s.r.Read(s.buf[s.end:]) + s.end += n + + if err != nil { + s.eof = true + if err != io.EOF && s.onError != nil { + s.onError(err) + } + goto RESTART + } + + // Check only the most recently read bytes for a new line + if eol := bytes.IndexByte(s.buf[s.end-n:s.end], s.delim); eol >= 0 { + end := s.end - n + eol + s.token = dropCR(s.buf[s.offset:end]) + s.offset = end + 1 + return true + } + } +} + +func (s *ImmediateReadAhead) Bytes() []byte { + return s.token +} + +func (s *ImmediateReadAhead) ReadLine() []byte { + if s.Scan() { + return s.token + } + return nil +} + +func (s *ImmediateReadAhead) OnError(f OnScannerError) { + s.onError = f +} diff --git a/pkg/readahead/immediate_test.go b/pkg/readahead/immediate_test.go new file mode 100644 index 0000000..dd8a5d7 --- /dev/null +++ b/pkg/readahead/immediate_test.go @@ -0,0 +1,86 @@ +package readahead + +import ( + "rare/pkg/testutil" + "strings" + "testing" + "testing/iotest" + + "github.com/stretchr/testify/assert" +) + +func TestImmediateBasicReadingShortBuf(t *testing.T) { + r := strings.NewReader("Hello there you\nthis is line 2\n") + ra := NewImmediate(r, 3) + assert.Equal(t, []byte("Hello there you"), ra.ReadLine()) + assert.Equal(t, []byte("this is line 2"), ra.ReadLine()) + assert.Nil(t, ra.ReadLine()) +} + +func TestImmediateBasicReadingLongBuf(t *testing.T) { + r := strings.NewReader("Hello there you\nthis is line 2\n") + ra := NewImmediate(r, 1024) + assert.Equal(t, []byte("Hello there you"), ra.ReadLine()) + assert.Equal(t, []byte("this is line 2"), ra.ReadLine()) + assert.Nil(t, ra.ReadLine()) +} + +func TestImmediateBasicReadingMidBuf(t *testing.T) { + r := strings.NewReader("Hello there you\nthis is line 2\n") + ra := NewImmediate(r, 20) // Just enough to read first line, but not both + assert.Equal(t, []byte("Hello there you"), ra.ReadLine()) + assert.Equal(t, []byte("this is line 2"), ra.ReadLine()) + assert.Nil(t, ra.ReadLine()) +} + +func TestImmediateBasicReadingNoNewTerm(t *testing.T) { + r := strings.NewReader("Hello there you\nthis is line 2") + ra := NewImmediate(r, 3) + assert.Equal(t, []byte("Hello there you"), ra.ReadLine()) + assert.Equal(t, []byte("this is line 2"), ra.ReadLine()) + assert.Nil(t, ra.ReadLine()) +} + +func TestImmediateReadEmptyString(t *testing.T) { + r := strings.NewReader("") + ra := NewImmediate(r, 3) + assert.Nil(t, ra.ReadLine()) +} + +func TestImmediateReadSingleCharString(t *testing.T) { + r := strings.NewReader("A") + ra := NewImmediate(r, 3) + assert.Equal(t, []byte("A"), ra.ReadLine()) +} + +func TestImmediateDropCR(t *testing.T) { + r := strings.NewReader("test\r\nthing") + ra := NewImmediate(r, 3) + assert.Equal(t, []byte("test"), ra.ReadLine()) + assert.Equal(t, []byte("thing"), ra.ReadLine()) + assert.Nil(t, ra.ReadLine()) +} + +func TestImmediateErrorHandling(t *testing.T) { + errReader := iotest.TimeoutReader(strings.NewReader("Hello there you\nthis is a line\n")) + ra := NewImmediate(errReader, 20) + + var hadError bool + ra.OnError(func(e error) { + hadError = true + }) + + assert.Equal(t, []byte("Hello there you"), ra.ReadLine()) + assert.Equal(t, []byte("this"), ra.ReadLine()) // up to twentyith char + assert.Nil(t, ra.ReadLine()) + assert.True(t, hadError) +} + +func BenchmarkImmediate(b *testing.B) { + r := testutil.NewTextGenerator(1024) + ra := NewImmediate(r, 128*128) + + for i := 0; i < b.N; i++ { + ra.Scan() + } +} diff --git a/pkg/readahead/interface.go b/pkg/readahead/interface.go new file mode 100644 index 0000000..0f3739b --- /dev/null +++ b/pkg/readahead/interface.go @@ -0,0 +1,9 @@ +package readahead + +type OnScannerError func(error) + +type Scanner interface { + Scan() bool // Scans for next string. True if exist, false if eof + Bytes() []byte // Returns the result of the Scan() + OnError(f OnScannerError) +} diff --git a/pkg/readahead/util.go b/pkg/readahead/util.go new file mode 100644 index 0000000..73bd94d --- /dev/null +++ b/pkg/readahead/util.go @@ -0,0 +1,16 @@ +package readahead + +// dropCR drops a terminal \r from the data. +func dropCR(data []byte) []byte { + if len(data) > 0 && data[len(data)-1] == '\r' { + return data[0 : len(data)-1] + } + return data +} + +func maxi(a, b int) int { + if a > b { + return a + } + return b +} diff --git a/pkg/readahead/util_test.go b/pkg/readahead/util_test.go new file mode 100644 index 0000000..4719837 --- /dev/null +++ b/pkg/readahead/util_test.go @@ -0,0 +1,18 @@ +package readahead + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDropCR(t *testing.T) { + assert.Equal(t, []byte("test"), dropCR([]byte("test"))) + assert.Equal(t, []byte("test\n"), dropCR([]byte("test\n"))) + assert.Equal(t, []byte("test"), dropCR([]byte("test\r"))) +} + +func TestMaxi(t *testing.T) { + assert.Equal(t, 1, maxi(0, 1)) + assert.Equal(t, 1, maxi(1, 0)) +} diff --git a/pkg/testutil/texGenerator_test.go b/pkg/testutil/texGenerator_test.go new file mode 100644 index 0000000..8af482a --- /dev/null +++ b/pkg/testutil/texGenerator_test.go @@ -0,0 +1,34 @@ +package testutil + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func zero(buf []byte) { + for i := 0; i < len(buf); i++ { + buf[i] = 0 + } +} + +func TestGeneratesData(t *testing.T) { + buf := make([]byte, 1000) + rg := NewTextGenerator(50) + + zero(buf) + n, err := rg.Read(buf[:100]) + assert.Equal(t, n, 50) + assert.NoError(t, err) + for i := 0; i < n; i++ { + assert.NotZero(t, buf[i]) + } + + zero(buf) + n, err = rg.Read(buf[:10]) + assert.Equal(t, n, 10) + assert.NoError(t, err) + for i := 0; i < n; i++ { + assert.NotZero(t, buf[i]) + } +} diff --git a/pkg/testutil/textGenerator.go b/pkg/testutil/textGenerator.go new file mode 100644 index 0000000..ee80145 --- /dev/null +++ b/pkg/testutil/textGenerator.go @@ -0,0 +1,34 @@ +package testutil + +import ( + "io" +) + +type textGeneratingReader struct { + maxChunk int +} + +var _ io.Reader = &textGeneratingReader{} + +var validText []byte = []byte("abcdefghijklmnopqrstuvwxyz\n") + +// NewTextGenerator creates a io.reader that generates random alphaetical text separated by new-lines +// Will generate infinitely +func NewTextGenerator(maxReadSize int) io.Reader { + return &textGeneratingReader{ + maxChunk: maxReadSize, + } +} + +func (s *textGeneratingReader) Read(buf []byte) (int, error) { + size := len(buf) + if size > s.maxChunk { + size = s.maxChunk + } + + for i := 0; i < size; i += len(validText) { + copy(buf[i:size], validText) + } + + return size, nil +}