diff --git a/mgr/dataflow_test.go b/mgr/dataflow_test.go index aabc873d9..fe4cc207f 100644 --- a/mgr/dataflow_test.go +++ b/mgr/dataflow_test.go @@ -679,7 +679,7 @@ func Test_RawData_MultiLines(t *testing.T) { assert.Nil(t, err) assert.Equal(t, []string{"abc\n", "abc\n"}, actual) - readConfig[readerconf.KeyRawDataTimeout] = "3" + readConfig[readerconf.KeyRawDataTimeout] = "40" os.RemoveAll(fileName) createRawDataFile(fileName, "abc\n") actual, err = RawData(readConfig) diff --git a/mgr/runner_test.go b/mgr/runner_test.go index ae80251b8..ae405419c 100644 --- a/mgr/runner_test.go +++ b/mgr/runner_test.go @@ -1469,7 +1469,7 @@ func TestAddDatatags(t *testing.T) { assert.NoError(t, err) go rr.Run() - time.Sleep(2 * time.Second) + time.Sleep(60 * time.Second) data, err := ioutil.ReadFile("./TestAddDatatags/filesend.json") assert.Nil(t, err) var res []Data @@ -1539,7 +1539,7 @@ func TestRunWithExtra(t *testing.T) { assert.NoError(t, err) go rr.Run() - time.Sleep(2 * time.Second) + time.Sleep(60 * time.Second) data, err := ioutil.ReadFile("./TestRunWithExtra/filesend.json") assert.Nil(t, err) var res []Data @@ -1671,7 +1671,7 @@ func TestRunWithDataSourceFail(t *testing.T) { assert.NotNil(t, rr) go rr.Run() - time.Sleep(2 * time.Second) + time.Sleep(60 * time.Second) data, err := ioutil.ReadFile("./TestRunWithDataSourceFail/filesend.json") assert.Nil(t, err) var res []Data @@ -2076,21 +2076,21 @@ func TestTailxCleaner(t *testing.T) { assert.NotNil(t, rr) go rr.Run() - time.Sleep(2 * time.Second) + time.Sleep(60 * time.Second) logPatha1 := filepath.Join(dira, "a.log.1") assert.NoError(t, os.Rename(logPatha, logPatha1)) assert.NoError(t, ioutil.WriteFile(logPatha, []byte("bbbb\n"), 0666)) - time.Sleep(5 * time.Second) + time.Sleep(60 * time.Second) logPatha2 := filepath.Join(dira, "a.log.2") assert.NoError(t, os.Rename(logPatha, logPatha2)) assert.NoError(t, ioutil.WriteFile(logPatha, []byte("cccc\n"), 0666)) - time.Sleep(2 * time.Second) + time.Sleep(60 * time.Second) assert.NotNil(t, rr.Cleaner()) @@ -2113,6 +2113,7 @@ DONE: break } } + time.Sleep(50 * time.Second) assert.Equal(t, 1, ret) } diff --git a/reader/bufreader/bufreader.go b/reader/bufreader/bufreader.go index d02f2813a..ab803ab58 100644 --- a/reader/bufreader/bufreader.go +++ b/reader/bufreader/bufreader.go @@ -54,16 +54,17 @@ type LastSync struct { // BufReader implements buffering for an FileReader object. type BufReader struct { - stopped int32 - buf []byte - delim []byte - mutiLineCache *LineCache - rd reader.FileReader // reader provided by the client - r, w int // buf read and write positions - err error - lastByte int - lastRuneSize int - lastSync LastSync + stopped int32 + buf []byte + delim []byte + mutiLineCache *LineCache + waitForWholeLine bool //readWholeLine + rd reader.FileReader // reader provided by the client + r, w int // buf read and write positions + err error + lastByte int + lastRuneSize int + lastSync LastSync runTime reader.RunTime @@ -88,7 +89,7 @@ type BufReader struct { const minReadBufferSize = 16 //最大连续读到空的尝试次数 -const maxConsecutiveEmptyReads = 10 +const maxConsecutiveEmptyReads = 40 // NewReaderSize returns a new Reader whose buffer has at least the specified // size. If the argument FileReader is already a Reader with large enough @@ -189,6 +190,11 @@ func (b *BufReader) SetMode(mode string, v interface{}) (err error) { return } +func (b *BufReader) SetWaitFlagForWholeLine() { + b.waitForWholeLine = true + return +} + func (b *BufReader) SetRunTime(mode string, v interface{}) (err error) { b.runTime, err = reader.ParseRunTimeWithMode(mode, v) return err @@ -280,6 +286,17 @@ func (b *BufReader) fill() { } b.w += n + + if err == io.EOF && b.waitForWholeLine { + if i == 1 { //when last attempts,return err info; + b.err = err + return + } + + time.Sleep(1 * time.Second) + continue + } + if err != nil { b.err = err return @@ -287,6 +304,7 @@ func (b *BufReader) fill() { if n > 0 { return } + } b.err = io.ErrNoProgress } @@ -666,7 +684,9 @@ func NewSingleFileReader(meta *reader.Meta, conf conf.MapConf) (reader reader.Re return } maxLineLen, _ := conf.GetInt64Or(KeyRunnerMaxLineLen, 0) - return NewReaderSize(fr, meta, bufSize, maxLineLen) + r, err := NewReaderSize(fr, meta, bufSize, maxLineLen) + r.SetWaitFlagForWholeLine() + return r, err } func init() { diff --git a/reader/dirx/dirx_test.go b/reader/dirx/dirx_test.go index d16b1512e..bd4e587f3 100644 --- a/reader/dirx/dirx_test.go +++ b/reader/dirx/dirx_test.go @@ -1079,6 +1079,7 @@ func readerExpireDeleteTarTest(t *testing.T) { } } t.Log("maxNum ", maxNum, "emptyNum", emptyNum) + time.Sleep(60 * time.Second) assert.EqualValues(t, len(expectResults), len(actualResults)) for k, v := range expectResults { actualV, ok := actualResults[k]