Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Follow reader #66

Merged
merged 10 commits into from
Jun 3, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 1 addition & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ require (
github.com/araddon/dateparse v0.0.0-20210207001429-0eec95c9db7e
github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/hpcloud/tail v1.0.0
github.com/fsnotify/fsnotify v1.4.9
github.com/kr/pretty v0.1.0 // indirect
github.com/stretchr/testify v1.7.0
github.com/tidwall/gjson v1.3.5
Expand All @@ -16,13 +15,10 @@ require (
golang.org/x/term v0.0.0-20210503060354-a79de5458b56
golang.org/x/text v0.3.3
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/fsnotify.v1 v1.4.7 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
honnef.co/go/tools v0.2.1
)

require (
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/russross/blackfriday/v2 v2.0.1 // indirect
github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
Expand Down
7 changes: 0 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/araddon/dateparse v0.0.0-20210207001429-0eec95c9db7e h1:OjdSMCht0ZVX7IH0nTdf00xEustvbtUGRgMh3gbdmOg=
github.com/araddon/dateparse v0.0.0-20210207001429-0eec95c9db7e/go.mod h1:DCaWoUhZrYW9p1lxo/cm8EmUOOzAPSEZNGF2DK1dJgw=
Expand All @@ -10,8 +9,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
Expand Down Expand Up @@ -72,10 +69,6 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
63 changes: 3 additions & 60 deletions pkg/extractor/batchers/tailBatcher.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package batchers

import (
"rare/pkg/extractor"
"rare/pkg/followreader"
"rare/pkg/logger"
"sync"
"time"

"github.com/hpcloud/tail"
)

// TailFilesToChan tails a set of files to an input batcher that can be consumed by extractor
Expand All @@ -16,7 +13,6 @@ func TailFilesToChan(filenames <-chan string, batchSize int, reopen, poll bool)

go func() {
var wg sync.WaitGroup

for filename := range filenames {
wg.Add(1)
go func(filename string) {
Expand All @@ -26,18 +22,14 @@ func TailFilesToChan(filenames <-chan string, batchSize int, reopen, poll bool)
}()

out.startFileReading(filename)
fileTail, err := tail.TailFile(filename, tail.Config{Follow: true, ReOpen: reopen, Poll: poll})
r, err := followreader.New(filename, reopen, poll)
if err != nil {
logger.Print("Unable to open file: ", err)
out.incErrors()
return
}

err = out.tailLineToChan(filename, fileTail.Lines, batchSize)
if err != nil {
logger.Print("Error tailing file: ", err)
out.incErrors()
}
out.syncReaderToBatcher(filename, r, batchSize)
}(filename)
}

Expand All @@ -47,52 +39,3 @@ func TailFilesToChan(filenames <-chan string, batchSize int, reopen, poll bool)

return out
}

// tailLineToChan consumes lines from a tail channel and forwards them to the
// batcher's output channel as InputBatch values tagged with sourceName.
//
// A batch is emitted when it reaches batchSize lines, or when no line has
// arrived for 500ms and at least one line is buffered. BatchStart is the
// 1-based index of the first line in the emitted batch.
//
// The function returns when the lines channel yields nil (which is also what a
// closed channel yields) or when a line carries an error; in the latter case
// that error is returned. Lines that were buffered but not yet emitted are
// flushed before returning so that they are not lost.
func (s *Batcher) tailLineToChan(sourceName string, lines <-chan *tail.Line, batchSize int) (err error) {
	// Maximum quiet period before a partially filled batch is pushed downstream.
	const idleFlush = 500 * time.Millisecond

	batch := make([]extractor.BString, 0, batchSize)
	var batchStart uint64 = 1
	var batchBytes uint64

	// flush emits the buffered lines (if any), then resets the accumulators.
	flush := func() {
		if len(batch) == 0 {
			return
		}
		s.c <- extractor.InputBatch{
			Batch:      batch,
			Source:     sourceName,
			BatchStart: batchStart,
		}
		batchStart += uint64(len(batch))
		// The emitted slice is now owned by the consumer; start a fresh one.
		batch = make([]extractor.BString, 0, batchSize)

		s.incReadBytes(batchBytes)
		batchBytes = 0
	}
	// Whatever the exit reason, hand over lines that were already read.
	defer flush()

	for {
		select {
		case line := <-lines:
			if line == nil {
				return nil
			}
			if line.Err != nil {
				return line.Err
			}
			batch = append(batch, extractor.BString(line.Text))
			// +1 presumably accounts for the stripped line terminator.
			batchBytes += uint64(len(line.Text) + 1)
			if len(batch) >= batchSize {
				flush()
			}
		case <-time.After(idleFlush):
			// Since we're tailing, if nothing arrived for a while, flush what we have.
			flush()
		}
	}
}
19 changes: 0 additions & 19 deletions pkg/extractor/batchers/tailBatcher_test.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,11 @@
package batchers

import (
"rare/pkg/extractor"
"testing"

"github.com/hpcloud/tail"
"github.com/stretchr/testify/assert"
)

// TestTailLineToChan feeds a single line through tailLineToChan with a batch
// size of one and verifies the emitted batch's source, content and start index.
func TestTailLineToChan(t *testing.T) {
	const source = "test"

	lines := make(chan *tail.Line)
	b := newBatcher(10)
	go b.tailLineToChan(source, lines, 1)

	lines <- &tail.Line{Text: "Hello"}

	got := <-b.BatchChan()
	assert.Equal(t, source, got.Source)
	assert.Equal(t, extractor.BString("Hello"), got.Batch[0])
	assert.Equal(t, uint64(1), got.BatchStart)

	// Closing the channel lets the goroutine above terminate.
	close(lines)
}

func TestBatchTailFile(t *testing.T) {
filenames := make(chan string, 1)
filenames <- "tailBatcher_test.go" // me
Expand Down
17 changes: 17 additions & 0 deletions pkg/followreader/followreader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package followreader

import (
"io"
)

// FollowReader is the reader type returned by New: an io.ReadCloser extended
// with a Drain method.
type FollowReader interface {
io.ReadCloser
// Drain's contract is defined by the implementations, which are not shown here.
// NOTE(review): presumably it tells the reader to stop following once the
// currently available data has been consumed — confirm against implementations.
Drain() error
}

// New builds a FollowReader for filename. When poll is true the reader comes
// from NewPolling, otherwise from NewNotify; reopen is forwarded unchanged to
// whichever constructor is selected.
func New(filename string, reopen, poll bool) (FollowReader, error) {
	if !poll {
		return NewNotify(filename, reopen)
	}
	return NewPolling(filename, reopen)
}
98 changes: 98 additions & 0 deletions pkg/followreader/followreader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package followreader

import (
"io/ioutil"
"os"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// assertSequentialReads performs `reads` consecutive Read calls on tail using a
// 100-byte scratch buffer, asserting that each call returns no error and a
// non-zero byte count.
func assertSequentialReads(t *testing.T, tail FollowReader, reads int) {
	scratch := make([]byte, 100)
	for remaining := reads; remaining > 0; remaining-- {
		count, readErr := tail.Read(scratch)
		assert.NoError(t, readErr)
		assert.NotZero(t, count)
	}
}

// testAppendingFile is a test helper wrapping a file that a background
// goroutine keeps appending to: the fixed content of Line is written
// repeatedly at a fixed interval until Stop or Close is called.
type testAppendingFile struct {
// f is the file being appended to.
f *os.File
// Line is the byte sequence written on every tick of the writer goroutine.
Line []byte

// stop signals the writer goroutine to exit; it is nil when no writer is running.
stop chan<- bool
// wg tracks the writer goroutine so Stop can wait for it to finish.
wg sync.WaitGroup
}

// CreateAppendingFromFile opens filename for appending (creating it with mode
// 0644 if it does not exist) and starts a background writer on it. It panics
// if the file cannot be opened.
func CreateAppendingFromFile(filename string) *testAppendingFile {
	const openFlags = os.O_APPEND | os.O_WRONLY | os.O_CREATE

	file, openErr := os.OpenFile(filename, openFlags, 0644)
	if openErr != nil {
		panic(openErr)
	}
	return createAppendingFileEx(file)
}

// CreateAppendingTempFile creates a temporary file with the "go-test-" prefix
// in the default temp directory and starts a background writer on it. It
// panics if the temporary file cannot be created.
func CreateAppendingTempFile() *testAppendingFile {
	tmp, createErr := ioutil.TempFile("", "go-test-")
	if createErr != nil {
		panic(createErr)
	}
	return createAppendingFileEx(tmp)
}

// createAppendingFileEx wraps an already opened file in a testAppendingFile
// and immediately starts the background writer with a 1ms interval.
func createAppendingFileEx(f *os.File) *testAppendingFile {
	appender := new(testAppendingFile)
	appender.f = f
	appender.Line = []byte("test file 123\n")
	// The zero-value WaitGroup is ready for use; no explicit init is needed.

	appender.startWriteRandomData(time.Millisecond)
	return appender
}

// Name returns the name of the underlying file, as reported by os.File.Name.
func (s *testAppendingFile) Name() string {
	underlying := s.f
	return underlying.Name()
}

// startWriteRandomData launches a goroutine that appends s.Line to the file
// every interval until a value is received on the stop channel.
//
// Despite the name, the data written is the fixed content of s.Line. The stop
// channel is stored on s (for use by Stop) and also returned to the caller;
// the goroutine is registered on s.wg so its termination can be awaited.
func (s *testAppendingFile) startWriteRandomData(interval time.Duration) chan<- bool {
	stop := make(chan bool)
	s.stop = stop
	s.wg.Add(1)

	go func() {
		defer s.wg.Done()
		for {
			select {
			case <-stop:
				return
			case <-time.After(interval):
				// Best-effort write: this is a test fixture and a failed append
				// will surface as a failed read assertion in the test itself.
				_, _ = s.f.Write(s.Line)
			}
		}
	}()

	// Return the real channel (previously nil was returned, which made the
	// declared return value unusable for stopping the writer).
	return stop
}

// Stop signals the background writer to exit and waits for it to finish.
// It does nothing when no writer is running (stop channel is nil).
func (s *testAppendingFile) Stop() {
	if s.stop == nil {
		return
	}

	s.stop <- true
	s.wg.Wait()
	// Clear the channel so a later Stop is a no-op.
	s.stop = nil
}

// Close stops the background writer, closes the underlying file and removes
// it from disk. Errors from closing and removing are not reported.
func (s *testAppendingFile) Close() {
	s.Stop()

	path := s.f.Name()
	s.f.Close()
	os.Remove(path)
}