From 7651b26052f79f87cbda5608f645c2432babd8ec Mon Sep 17 00:00:00 2001 From: Chris LaPointe Date: Thu, 2 Jun 2022 20:40:55 -0400 Subject: [PATCH] Follow reader (#66) Switch from gotail library to a notify or polling reader increases read performance while following by 10-25x --- go.mod | 5 +- go.sum | 6 - pkg/extractor/batchers/tailBatcher.go | 63 +-------- pkg/extractor/batchers/tailBatcher_test.go | 19 --- pkg/followreader/followreader.go | 17 +++ pkg/followreader/followreader_test.go | 103 ++++++++++++++ pkg/followreader/notify.go | 152 +++++++++++++++++++++ pkg/followreader/notify_test.go | 141 +++++++++++++++++++ pkg/followreader/poller.go | 112 +++++++++++++++ pkg/followreader/poller_test.go | 109 +++++++++++++++ 10 files changed, 638 insertions(+), 89 deletions(-) create mode 100644 pkg/followreader/followreader.go create mode 100644 pkg/followreader/followreader_test.go create mode 100644 pkg/followreader/notify.go create mode 100644 pkg/followreader/notify_test.go create mode 100644 pkg/followreader/poller.go create mode 100644 pkg/followreader/poller_test.go diff --git a/go.mod b/go.mod index 897ae8c..558b3f7 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,7 @@ require ( github.com/araddon/dateparse v0.0.0-20210207001429-0eec95c9db7e github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/fsnotify/fsnotify v1.4.9 // indirect - github.com/hpcloud/tail v1.0.0 + github.com/fsnotify/fsnotify v1.4.9 github.com/kr/pretty v0.1.0 // indirect github.com/stretchr/testify v1.7.0 github.com/tidwall/gjson v1.3.5 @@ -16,8 +15,6 @@ require ( golang.org/x/term v0.0.0-20210503060354-a79de5458b56 golang.org/x/text v0.3.3 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect - gopkg.in/fsnotify.v1 v1.4.7 // indirect - gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect honnef.co/go/tools v0.2.1 ) diff --git a/go.sum b/go.sum index 1256000..0b17a77 100644 --- a/go.sum +++ b/go.sum @@ -10,8 +10,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= -github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= -github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -72,10 +70,6 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= -gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/extractor/batchers/tailBatcher.go b/pkg/extractor/batchers/tailBatcher.go index 730bfc0..f0dfe32 100644 --- a/pkg/extractor/batchers/tailBatcher.go +++ b/pkg/extractor/batchers/tailBatcher.go @@ -1,12 +1,9 @@ package batchers import ( - "rare/pkg/extractor" + "rare/pkg/followreader" "rare/pkg/logger" "sync" - "time" - - "github.com/hpcloud/tail" ) // TailFilesToChan tails a set of files to an input batcher that can be consumed by extractor @@ -16,7 +13,6 @@ func TailFilesToChan(filenames <-chan string, batchSize int, reopen, poll bool) go func() { var wg sync.WaitGroup - for filename := range filenames { wg.Add(1) go func(filename string) { @@ -26,18 +22,14 @@ func TailFilesToChan(filenames <-chan string, batchSize int, reopen, poll bool) }() out.startFileReading(filename) - fileTail, err := tail.TailFile(filename, tail.Config{Follow: true, ReOpen: reopen, Poll: poll}) + r, err := followreader.New(filename, reopen, poll) if err != nil { logger.Print("Unable to open file: ", err) out.incErrors() return } - err = out.tailLineToChan(filename, fileTail.Lines, batchSize) - if err != nil { - logger.Print("Error tailing file: ", err) - out.incErrors() - } + out.syncReaderToBatcher(filename, r, batchSize) }(filename) } @@ -47,52 +39,3 @@ func TailFilesToChan(filenames <-chan string, batchSize int, reopen, poll bool) return out } - -func (s *Batcher) tailLineToChan(sourceName string, lines <-chan *tail.Line, batchSize int) (err error) { - batch := make([]extractor.BString, 0, batchSize) - var batchStart uint64 = 1 - var batchBytes uint64 - -MAIN_LOOP: - for { - select { - case line := <-lines: - if line == nil { - break MAIN_LOOP - } - if line.Err != nil { - err = line.Err - break MAIN_LOOP - } - batch = append(batch, extractor.BString(line.Text)) - batchBytes += uint64(len(line.Text) + 1) - if len(batch) >= batchSize { - s.c <- extractor.InputBatch{ - Batch: batch, - Source: sourceName, - BatchStart: batchStart, - } - batchStart += uint64(len(batch)) - batch = make([]extractor.BString, 0, batchSize) - - s.incReadBytes(batchBytes) - batchBytes = 0 - } - case <-time.After(500 * time.Millisecond): - // Since we're tailing, if we haven't received any line in a bit, lets flush what we have - if len(batch) > 0 { - s.c <- extractor.InputBatch{ - Batch: batch, - Source: sourceName, - BatchStart: batchStart, - } - batchStart += uint64(len(batch)) - batch = make([]extractor.BString, 0, batchSize) - - s.incReadBytes(batchBytes) - batchBytes = 0 - } - } - } - return -} diff --git a/pkg/extractor/batchers/tailBatcher_test.go b/pkg/extractor/batchers/tailBatcher_test.go index 77a3b59..5818c96 100644 --- a/pkg/extractor/batchers/tailBatcher_test.go +++ b/pkg/extractor/batchers/tailBatcher_test.go @@ -1,30 +1,11 @@ package batchers import ( - "rare/pkg/extractor" "testing" - "github.com/hpcloud/tail" "github.com/stretchr/testify/assert" ) -func TestTailLineToChan(t *testing.T) { - tailchan := make(chan *tail.Line) - batcher := newBatcher(10) - go batcher.tailLineToChan("test", tailchan, 1) - - tailchan <- &tail.Line{ - Text: "Hello", - } - - val := <-batcher.BatchChan() - assert.Equal(t, "test", val.Source) - assert.Equal(t, extractor.BString("Hello"), val.Batch[0]) - assert.Equal(t, uint64(1), val.BatchStart) - - close(tailchan) -} - func TestBatchTailFile(t *testing.T) { filenames := make(chan string, 1) filenames <- "tailBatcher_test.go" // me diff --git a/pkg/followreader/followreader.go b/pkg/followreader/followreader.go new file mode 100644 index 0000000..7eb1362 --- /dev/null +++ b/pkg/followreader/followreader.go @@ -0,0 +1,17 @@ +package followreader + +import ( + "io" +) + +type FollowReader interface { + io.ReadCloser + Drain() error +} + +func New(filename string, reopen, poll bool) (FollowReader, error) { + if poll { + return NewPolling(filename, reopen) + } + return NewNotify(filename, reopen) +} diff --git a/pkg/followreader/followreader_test.go b/pkg/followreader/followreader_test.go new file mode 100644 index 0000000..0f0cdc1 --- /dev/null +++ b/pkg/followreader/followreader_test.go @@ -0,0 +1,103 @@ +package followreader + +import ( + "io/ioutil" + "os" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +// reads data and asserts all reads successful +func assertSequentialReads(t *testing.T, tail FollowReader, reads int) { + buf := make([]byte, 100) + for i := 0; i < reads; i++ { + n, err := tail.Read(buf) + assert.NoError(t, err) + assert.NotZero(t, n) + } +} + +// Helper to create a file and write random data at random intervals +type testAppendingFile struct { + f *os.File + Line []byte + + stop chan<- bool + wg sync.WaitGroup +} + +func CreateAppendingFromFile(filename string) *testAppendingFile { + f, err := os.OpenFile(filename, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + panic(err) + } + return createAppendingFileEx(f) +} + +func CreateAppendingTempFile() *testAppendingFile { + f, err := ioutil.TempFile("", "go-test-") + if err != nil { + panic(err) + } + return createAppendingFileEx(f) +} + +func createAppendingFileEx(f *os.File) *testAppendingFile { + ret := &testAppendingFile{ + f: f, + Line: []byte("test file 123\n"), + wg: sync.WaitGroup{}, + } + + ret.startWriteRandomData(1 * time.Millisecond) + + return ret +} + +func (s *testAppendingFile) Name() string { + return s.f.Name() +} + +func (s *testAppendingFile) startWriteRandomData(interval time.Duration) { + stop := make(chan bool) + s.stop = stop + s.wg.Add(1) + + go func() { + defer s.wg.Done() + for { + select { + case <-stop: + return + case <-time.After(interval): + s.f.Write(s.Line) + } + } + }() +} + +// Stop writing to the file +func (s *testAppendingFile) Stop() { + if s.stop != nil { + s.stop <- true + s.wg.Wait() + s.stop = nil + } +} + +// Close and stop writing to the file +func (s *testAppendingFile) Close() { + s.Stop() + err := s.f.Close() + if err != nil { + panic(err) + } + + err = os.Remove(s.f.Name()) + if err != nil { + panic(err) + } +} diff --git a/pkg/followreader/notify.go b/pkg/followreader/notify.go new file mode 100644 index 0000000..e6f4a9e --- /dev/null +++ b/pkg/followreader/notify.go @@ -0,0 +1,152 @@ +package followreader + +import ( + "fmt" + "io" + "os" + "path" + + "github.com/fsnotify/fsnotify" +) + +type NotifyFollowReader struct { + filename string + f *os.File + + ReOpen bool + + closed bool + watcher *fsnotify.Watcher + eventWrite chan struct{} + eventDelete chan struct{} +} + +var _ FollowReader = &NotifyFollowReader{} + +func NewNotify(filename string, reopen bool) (*NotifyFollowReader, error) { + f, err := os.Open(filename) + + if err != nil && !reopen { + return nil, fmt.Errorf("unable to open file and cannot reopen: %w", err) + } + + ret := &NotifyFollowReader{ + filename: filename, + f: f, + ReOpen: reopen, + eventWrite: make(chan struct{}, 1), + eventDelete: make(chan struct{}, 1), + } + + ret.watcher, err = ret.startWatcher() + if err != nil { + if f != nil { + f.Close() + } + return nil, fmt.Errorf("unable to start notify: %w", err) + } + + return ret, nil +} + +func (s *NotifyFollowReader) Close() error { + if !s.closed { + s.closeFile() + s.watcher.Close() + + s.closed = true + } + + return nil +} + +func (s *NotifyFollowReader) Drain() error { + if s.f != nil { + _, err := s.f.Seek(0, os.SEEK_END) + return err + } + return nil +} + +func (s *NotifyFollowReader) Read(buf []byte) (int, error) { + if s.closed { + return 0, io.EOF + } + + for { + if s.f != nil { + n, err := s.f.Read(buf) + + if n > 0 { + return n, nil + } + if err != nil && err != io.EOF { + return n, err + } + } + + // Wait for changes + select { + case <-s.eventWrite: + if s.f == nil && s.ReOpen { // Re-open if able and willing + if f, err := os.Open(s.filename); err == nil { + s.f = f + } + } + case <-s.eventDelete: + if s.ReOpen { + s.closeFile() + } else { + s.Close() + return 0, io.EOF + } + } + } +} + +func (s *NotifyFollowReader) startWatcher() (*fsnotify.Watcher, error) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + + if err := watcher.Add(path.Dir(s.filename)); err != nil { + watcher.Close() + return nil, err + } + + go func() { + defer watcher.Close() + for { + event, ok := <-watcher.Events + switch { + case !ok: + return + case path.Base(s.filename) != path.Base(event.Name): + // nop + case event.Op&fsnotify.Write != 0: + writeSignalNonBlock(s.eventWrite) + case event.Op&fsnotify.Remove != 0: + writeSignalNonBlock(s.eventDelete) + case event.Op&fsnotify.Create != 0: + writeSignalNonBlock(s.eventWrite) + } + } + }() + + return watcher, nil +} + +func (s *NotifyFollowReader) closeFile() { + if s.f != nil { + s.f.Close() + s.f = nil + } +} + +func writeSignalNonBlock(c chan<- struct{}) { + select { + case c <- struct{}{}: + default: + } +} diff --git a/pkg/followreader/notify_test.go b/pkg/followreader/notify_test.go new file mode 100644 index 0000000..7118ac1 --- /dev/null +++ b/pkg/followreader/notify_test.go @@ -0,0 +1,141 @@ +package followreader + +import ( + "fmt" + "io" + "math/rand" + "os" + "path" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSimpleFileNotifyTail(t *testing.T) { + af := CreateAppendingTempFile() + defer af.Close() + + tail, err := NewNotify(af.Name(), false) + assert.NoError(t, err) + assert.NotNil(t, tail) + + assertSequentialReads(t, tail, 10) + + assert.NoError(t, tail.Close()) +} + +func TestTailNotifyFileAppendingExisting(t *testing.T) { + af := CreateAppendingTempFile() + + tail, err := NewNotify(af.Name(), false) + assert.NoError(t, err) + assert.NotNil(t, tail) + + assertSequentialReads(t, tail, 10) + + // Re-open process + af.Stop() + af = CreateAppendingFromFile(af.Name()) + + assertSequentialReads(t, tail, 10) + + af.Close() + assert.NoError(t, tail.Close()) +} + +func TestTailNotifyFileRecreatedReopen(t *testing.T) { + af := CreateAppendingTempFile() + + tail, err := NewNotify(af.Name(), true) + assert.NoError(t, err) + assert.NotNil(t, tail) + + assertSequentialReads(t, tail, 10) + + // Re-open process + af.Stop() + tail.Drain() + af.Close() // Delete + + af = CreateAppendingFromFile(af.Name()) + + assertSequentialReads(t, tail, 10) + + af.Close() + assert.NoError(t, tail.Close()) +} + +func TestTailNotifyFileDeletedCloses(t *testing.T) { + af := CreateAppendingTempFile() + + tail, err := NewNotify(af.Name(), false) + assert.NoError(t, err) + assert.NotNil(t, tail) + + assertSequentialReads(t, tail, 10) + + // Close and should delete + af.Close() + + // Read until we receive an EOF + gotEof := false + buf := make([]byte, 100) + for i := 0; i < 100; i++ { + n, err := tail.Read(buf) + fmt.Printf("Got data: %d\n", n) + if err == io.EOF { + gotEof = true + break + } else if err != nil { + assert.Fail(t, "Non-eof error") + } + } + + assert.True(t, gotEof) + assert.NoError(t, tail.Close()) +} + +func TestWatchingNonExistantFile(t *testing.T) { + tp := path.Join(os.TempDir(), fmt.Sprintf("go-test-%d", rand.Int())) + + tail, err := NewNotify(tp, true) + assert.NoError(t, err) + + af := CreateAppendingFromFile(tp) + + assertSequentialReads(t, tail, 10) + + af.Close() + tail.Close() +} + +func TestWatchingNonExistingFileFails(t *testing.T) { + tp := path.Join(os.TempDir(), fmt.Sprintf("go-test-%d", rand.Int())) + tail, err := NewNotify(tp, false) + + assert.Nil(t, tail) + assert.Error(t, err) +} + +func TestNonBlockingSignal(t *testing.T) { + c := make(chan struct{}, 1) + assert.Len(t, c, 0) + writeSignalNonBlock(c) + writeSignalNonBlock(c) + assert.Len(t, c, 1) + assert.NotNil(t, <-c) +} + +func TestNotifyClosedReaderReturnsEOF(t *testing.T) { + af := CreateAppendingTempFile() + defer af.Close() + + tail, err := NewNotify(af.f.Name(), false) + assert.NoError(t, err) + assert.NotNil(t, tail) + + tail.Close() + n, err := tail.Read(nil) + assert.Zero(t, n) + assert.ErrorIs(t, err, io.EOF) +} diff --git a/pkg/followreader/poller.go b/pkg/followreader/poller.go new file mode 100644 index 0000000..a2bc44e --- /dev/null +++ b/pkg/followreader/poller.go @@ -0,0 +1,112 @@ +package followreader + +import ( + "fmt" + "io" + "os" + "time" +) + +type PollingFollowReader struct { + filename string + f *os.File + + // State + readBytes int64 + closed bool + + // Options + ReadAttempts int // Number of read-attempts before checking to re-open + PollDelay time.Duration // Delay between read attempts + Reopen bool // If true, will try to open() file again if it looks different after a delay; false will send EOF if file goes away +} + +var _ FollowReader = &PollingFollowReader{} + +func NewPolling(filename string, reopen bool) (*PollingFollowReader, error) { + f, err := os.Open(filename) + + if err != nil && !reopen { + return nil, fmt.Errorf("unable to open file and cannot reopen: %w", err) + } + + ret := &PollingFollowReader{ + filename: filename, + f: f, + PollDelay: 250 * time.Millisecond, + ReadAttempts: 5, + Reopen: reopen, + } + + return ret, nil +} + +// Drain navigates to the end of the stream +func (s *PollingFollowReader) Drain() error { + offset, err := s.f.Seek(0, os.SEEK_END) + if err == nil { + s.readBytes = offset + } + return err +} + +// Close file and underlying resources +func (s *PollingFollowReader) Close() error { + if s.f != nil { + s.f.Close() + s.f = nil + } + + s.closed = true + + return nil +} + +func (s *PollingFollowReader) Read(buf []byte) (int, error) { + if s.closed { + return 0, io.EOF + } + + for { + if s.f != nil { + for i := 0; i < s.ReadAttempts; i++ { + n, err := s.f.Read(buf) + s.readBytes += int64(n) + + if n > 0 { + return n, nil + } + + if err != nil && err != io.EOF { + return n, err + } + + time.Sleep(s.PollDelay) + } + } else { + time.Sleep(s.PollDelay) + } + + // Didn't read any bytes... has the file inode changed? + if s.Reopen { + st, _ := os.Stat(s.filename) + if st != nil && st.Size() != s.readBytes { + s.f, _ = os.Open(s.filename) + if st.Size() >= s.readBytes { + // Likely existing file that is re-opened, start reading where we left off + s.f.Seek(s.readBytes, io.SeekStart) + } else { + // Assume new file, restart reading from beginning + s.readBytes = 0 + } + } + } else { // No re-open, if the file's missing, that's EOF + _, err := os.Stat(s.filename) + if err != nil { + s.Close() + return 0, io.EOF + } + } + + } +} diff --git a/pkg/followreader/poller_test.go b/pkg/followreader/poller_test.go new file mode 100644 index 0000000..177423e --- /dev/null +++ b/pkg/followreader/poller_test.go @@ -0,0 +1,109 @@ +package followreader + +import ( + "io" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSimpleFilePollingTail(t *testing.T) { + af := CreateAppendingTempFile() + defer af.Close() + + tail, err := NewPolling(af.Name(), false) + assert.NoError(t, err) + assert.NotNil(t, tail) + + ret := make([]byte, 100) + for i := 0; i < 10; i++ { + n, err := tail.Read(ret) + assert.NoError(t, err) + assert.NotZero(t, n) + } + + assert.NoError(t, tail.Close()) +} + +func TestTailFileAppendingExisting(t *testing.T) { + af := CreateAppendingTempFile() + + tail, err := NewPolling(af.Name(), false) + assert.NoError(t, err) + assert.NotNil(t, tail) + + assertSequentialReads(t, tail, 10) + + // Re-open process + af.Stop() + af = CreateAppendingFromFile(af.Name()) + + assertSequentialReads(t, tail, 10) + + af.Close() + assert.NoError(t, tail.Close()) +} + +func TestTailFileRecreatedReopen(t *testing.T) { + af := CreateAppendingTempFile() + + tail, err := NewPolling(af.Name(), true) + assert.NoError(t, err) + assert.NotNil(t, tail) + + assertSequentialReads(t, tail, 10) + + // Re-open process + af.Stop() + tail.Drain() + af.Close() // Delete + + af = CreateAppendingFromFile(af.Name()) + + assertSequentialReads(t, tail, 10) + + af.Close() + assert.NoError(t, tail.Close()) +} + +func TestTailFileDeletedCloses(t *testing.T) { + af := CreateAppendingTempFile() + + tail, err := NewPolling(af.Name(), false) + assert.NoError(t, err) + assert.NotNil(t, tail) + + assertSequentialReads(t, tail, 10) + + // Close and should delete + af.Close() + + // Read until we receive an EOF + gotEof := false + buf := make([]byte, 100) + for i := 0; i < 100; i++ { + _, err := tail.Read(buf) + if err == io.EOF { + gotEof = true + break + } else if err != nil { + assert.Fail(t, "Non-eof error") + } + } + + assert.True(t, gotEof) + assert.NoError(t, tail.Close()) +} + +func TestPollingCloseReturnsEOF(t *testing.T) { + af := CreateAppendingTempFile() + defer af.Close() + + tail, err := NewPolling(af.Name(), true) + assert.NoError(t, err) + tail.Close() + + n, err := tail.Read(nil) + assert.Zero(t, n) + assert.ErrorIs(t, err, io.EOF) +}