Skip to content

Commit

Permalink
Follow reader (#66)
Browse files Browse the repository at this point in the history
Switching from the gotail library to a notify-based or polling reader increases read performance while following by 10-25x.
  • Loading branch information
zix99 committed Jun 3, 2022
1 parent fdbd885 commit 7651b26
Show file tree
Hide file tree
Showing 10 changed files with 638 additions and 89 deletions.
5 changes: 1 addition & 4 deletions go.mod
Expand Up @@ -6,8 +6,7 @@ require (
github.com/araddon/dateparse v0.0.0-20210207001429-0eec95c9db7e
github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/hpcloud/tail v1.0.0
github.com/fsnotify/fsnotify v1.4.9
github.com/kr/pretty v0.1.0 // indirect
github.com/stretchr/testify v1.7.0
github.com/tidwall/gjson v1.3.5
Expand All @@ -16,8 +15,6 @@ require (
golang.org/x/term v0.0.0-20210503060354-a79de5458b56
golang.org/x/text v0.3.3
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/fsnotify.v1 v1.4.7 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
honnef.co/go/tools v0.2.1
)

Expand Down
6 changes: 0 additions & 6 deletions go.sum
Expand Up @@ -10,8 +10,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
Expand Down Expand Up @@ -72,10 +70,6 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
63 changes: 3 additions & 60 deletions pkg/extractor/batchers/tailBatcher.go
@@ -1,12 +1,9 @@
package batchers

import (
"rare/pkg/extractor"
"rare/pkg/followreader"
"rare/pkg/logger"
"sync"
"time"

"github.com/hpcloud/tail"
)

// TailFilesToChan tails a set of files to an input batcher that can be consumed by extractor
Expand All @@ -16,7 +13,6 @@ func TailFilesToChan(filenames <-chan string, batchSize int, reopen, poll bool)

go func() {
var wg sync.WaitGroup

for filename := range filenames {
wg.Add(1)
go func(filename string) {
Expand All @@ -26,18 +22,14 @@ func TailFilesToChan(filenames <-chan string, batchSize int, reopen, poll bool)
}()

out.startFileReading(filename)
fileTail, err := tail.TailFile(filename, tail.Config{Follow: true, ReOpen: reopen, Poll: poll})
r, err := followreader.New(filename, reopen, poll)
if err != nil {
logger.Print("Unable to open file: ", err)
out.incErrors()
return
}

err = out.tailLineToChan(filename, fileTail.Lines, batchSize)
if err != nil {
logger.Print("Error tailing file: ", err)
out.incErrors()
}
out.syncReaderToBatcher(filename, r, batchSize)
}(filename)
}

Expand All @@ -47,52 +39,3 @@ func TailFilesToChan(filenames <-chan string, batchSize int, reopen, poll bool)

return out
}

// tailLineToChan consumes tailed lines and forwards them to the batcher's
// channel as InputBatch values tagged with sourceName.
//
// A batch is emitted as soon as it holds batchSize lines, or after 500ms
// without a new line if anything is pending. BatchStart is the 1-based index
// of the first line in each batch. Read bytes are reported at flush time,
// counting one extra byte per line (presumably the stripped newline).
//
// The function returns nil when a nil line is received (which is also what a
// closed channel yields), or the line's Err when a line carries one. In both
// cases any lines already accumulated are flushed before returning, so lines
// read just before shutdown are not lost.
func (s *Batcher) tailLineToChan(sourceName string, lines <-chan *tail.Line, batchSize int) (err error) {
	batch := make([]extractor.BString, 0, batchSize)
	var batchStart uint64 = 1
	var batchBytes uint64

	// flush emits the pending batch, if any, and resets the accumulators.
	flush := func() {
		if len(batch) == 0 {
			return
		}
		s.c <- extractor.InputBatch{
			Batch:      batch,
			Source:     sourceName,
			BatchStart: batchStart,
		}
		batchStart += uint64(len(batch))
		// The emitted slice is now owned by the consumer; start a fresh one.
		batch = make([]extractor.BString, 0, batchSize)

		s.incReadBytes(batchBytes)
		batchBytes = 0
	}
	// Lines that were already read must not be dropped on any exit path.
	defer flush()

	for {
		select {
		case line := <-lines:
			if line == nil {
				return nil
			}
			if line.Err != nil {
				return line.Err
			}
			batch = append(batch, extractor.BString(line.Text))
			batchBytes += uint64(len(line.Text) + 1)
			if len(batch) >= batchSize {
				flush()
			}
		case <-time.After(500 * time.Millisecond):
			// Since we're tailing, if we haven't received any line in a bit,
			// flush what we have.
			flush()
		}
	}
}
19 changes: 0 additions & 19 deletions pkg/extractor/batchers/tailBatcher_test.go
@@ -1,30 +1,11 @@
package batchers

import (
"rare/pkg/extractor"
"testing"

"github.com/hpcloud/tail"
"github.com/stretchr/testify/assert"
)

// TestTailLineToChan checks that a single tailed line comes out of the batcher
// as a batch carrying the source name, the line content, and a start index of 1.
func TestTailLineToChan(t *testing.T) {
	lines := make(chan *tail.Line)
	b := newBatcher(10)
	go b.tailLineToChan("test", lines, 1)

	lines <- &tail.Line{Text: "Hello"}

	got := <-b.BatchChan()
	assert.Equal(t, "test", got.Source)
	assert.Equal(t, extractor.BString("Hello"), got.Batch[0])
	assert.Equal(t, uint64(1), got.BatchStart)

	// Closing the channel lets the tailing goroutine exit.
	close(lines)
}

func TestBatchTailFile(t *testing.T) {
filenames := make(chan string, 1)
filenames <- "tailBatcher_test.go" // me
Expand Down
17 changes: 17 additions & 0 deletions pkg/followreader/followreader.go
@@ -0,0 +1,17 @@
package followreader

import (
"io"
)

// FollowReader is the reader type returned by New. It combines io.ReadCloser
// with a Drain method.
type FollowReader interface {
io.ReadCloser
// NOTE(review): Drain's semantics are defined by the implementations, which
// are not visible here — presumably it lets the reader finish consuming the
// remaining data and stop following; confirm against NewPolling/NewNotify.
Drain() error
}

// New returns a FollowReader for filename. When poll is true the reader comes
// from NewPolling; otherwise it comes from NewNotify. The reopen flag is
// passed through unchanged to whichever constructor is used.
func New(filename string, reopen, poll bool) (FollowReader, error) {
	if !poll {
		return NewNotify(filename, reopen)
	}
	return NewPolling(filename, reopen)
}
103 changes: 103 additions & 0 deletions pkg/followreader/followreader_test.go
@@ -0,0 +1,103 @@
package followreader

import (
"io/ioutil"
"os"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// assertSequentialReads issues `reads` consecutive Read calls on tail using a
// 100-byte buffer, asserting that each call returns no error and at least one
// byte.
func assertSequentialReads(t *testing.T, tail FollowReader, reads int) {
	scratch := make([]byte, 100)
	for remaining := reads; remaining > 0; remaining-- {
		count, readErr := tail.Read(scratch)
		assert.NoError(t, readErr)
		assert.NotZero(t, count)
	}
}

// testAppendingFile is a test helper that owns a file and a background
// goroutine which repeatedly appends Line to that file at a fixed interval
// (the data and the interval are fixed, not random, despite the method name
// startWriteRandomData).
type testAppendingFile struct {
// f is the file being appended to.
f *os.File
// Line is the payload written on every tick.
Line []byte

// stop signals the writer goroutine to exit; nil when no writer is running.
stop chan<- bool
// wg tracks the writer goroutine so Stop can wait for it to finish.
wg sync.WaitGroup
}

// CreateAppendingFromFile opens filename in append/write-only mode, creating
// it with mode 0644 if necessary, and wraps it in a testAppendingFile whose
// background writer is already running. It panics if the file cannot be opened.
func CreateAppendingFromFile(filename string) *testAppendingFile {
	const flags = os.O_APPEND | os.O_WRONLY | os.O_CREATE

	file, openErr := os.OpenFile(filename, flags, 0644)
	if openErr != nil {
		panic(openErr)
	}
	return createAppendingFileEx(file)
}

// CreateAppendingTempFile creates a temporary file with the "go-test-" prefix
// in the default temp directory and wraps it in a testAppendingFile whose
// background writer is already running. It panics if the file cannot be created.
func CreateAppendingTempFile() *testAppendingFile {
	tmp, createErr := ioutil.TempFile("", "go-test-")
	if createErr != nil {
		panic(createErr)
	}
	return createAppendingFileEx(tmp)
}

// createAppendingFileEx wraps f in a testAppendingFile with the default
// payload line and immediately starts the background writer with a 1ms
// interval.
func createAppendingFileEx(f *os.File) *testAppendingFile {
	appender := new(testAppendingFile) // the zero-value WaitGroup is ready to use
	appender.f = f
	appender.Line = []byte("test file 123\n")

	appender.startWriteRandomData(1 * time.Millisecond)
	return appender
}

// Name reports the name of the underlying file, as given by os.File.Name.
func (s *testAppendingFile) Name() string {
	name := s.f.Name()
	return name
}

// startWriteRandomData launches a goroutine that writes s.Line to the file
// each time interval elapses, until a value is received on the stop channel.
// The goroutine is registered with s.wg so that Stop can wait for it.
func (s *testAppendingFile) startWriteRandomData(interval time.Duration) {
	quit := make(chan bool)
	s.stop = quit
	s.wg.Add(1)

	writeLoop := func() {
		defer s.wg.Done()
		for {
			select {
			case <-time.After(interval):
				s.f.Write(s.Line)
			case <-quit:
				return
			}
		}
	}
	go writeLoop()
}

// Stop signals the writer goroutine to exit and waits for it to finish.
// It does nothing when no writer is running.
func (s *testAppendingFile) Stop() {
	if s.stop == nil {
		return
	}
	s.stop <- true
	s.wg.Wait()
	s.stop = nil
}

// Close stops the writer, closes the underlying file, and then removes it
// from disk. It panics if closing or removing the file fails.
func (s *testAppendingFile) Close() {
	s.Stop()

	if closeErr := s.f.Close(); closeErr != nil {
		panic(closeErr)
	}
	if removeErr := os.Remove(s.f.Name()); removeErr != nil {
		panic(removeErr)
	}
}

0 comments on commit 7651b26

Please sign in to comment.