Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mgr/dataflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ func Test_RawData_MultiLines(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, []string{"abc\n", "abc\n"}, actual)

readConfig[readerconf.KeyRawDataTimeout] = "3"
readConfig[readerconf.KeyRawDataTimeout] = "40"
os.RemoveAll(fileName)
createRawDataFile(fileName, "abc\n")
actual, err = RawData(readConfig)
Expand Down
13 changes: 7 additions & 6 deletions mgr/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1469,7 +1469,7 @@ func TestAddDatatags(t *testing.T) {
assert.NoError(t, err)
go rr.Run()

time.Sleep(2 * time.Second)
time.Sleep(60 * time.Second)
data, err := ioutil.ReadFile("./TestAddDatatags/filesend.json")
assert.Nil(t, err)
var res []Data
Expand Down Expand Up @@ -1539,7 +1539,7 @@ func TestRunWithExtra(t *testing.T) {
assert.NoError(t, err)
go rr.Run()

time.Sleep(2 * time.Second)
time.Sleep(60 * time.Second)
data, err := ioutil.ReadFile("./TestRunWithExtra/filesend.json")
assert.Nil(t, err)
var res []Data
Expand Down Expand Up @@ -1671,7 +1671,7 @@ func TestRunWithDataSourceFail(t *testing.T) {
assert.NotNil(t, rr)
go rr.Run()

time.Sleep(2 * time.Second)
time.Sleep(60 * time.Second)
data, err := ioutil.ReadFile("./TestRunWithDataSourceFail/filesend.json")
assert.Nil(t, err)
var res []Data
Expand Down Expand Up @@ -2076,21 +2076,21 @@ func TestTailxCleaner(t *testing.T) {
assert.NotNil(t, rr)
go rr.Run()

time.Sleep(2 * time.Second)
time.Sleep(60 * time.Second)

logPatha1 := filepath.Join(dira, "a.log.1")
assert.NoError(t, os.Rename(logPatha, logPatha1))

assert.NoError(t, ioutil.WriteFile(logPatha, []byte("bbbb\n"), 0666))

time.Sleep(5 * time.Second)
time.Sleep(60 * time.Second)

logPatha2 := filepath.Join(dira, "a.log.2")
assert.NoError(t, os.Rename(logPatha, logPatha2))

assert.NoError(t, ioutil.WriteFile(logPatha, []byte("cccc\n"), 0666))

time.Sleep(2 * time.Second)
time.Sleep(60 * time.Second)

assert.NotNil(t, rr.Cleaner())

Expand All @@ -2113,6 +2113,7 @@ DONE:
break
}
}
time.Sleep(50 * time.Second)
assert.Equal(t, 1, ret)
}

Expand Down
44 changes: 32 additions & 12 deletions reader/bufreader/bufreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,17 @@ type LastSync struct {

// BufReader implements buffering for an FileReader object.
type BufReader struct {
stopped int32
buf []byte
delim []byte
mutiLineCache *LineCache
rd reader.FileReader // reader provided by the client
r, w int // buf read and write positions
err error
lastByte int
lastRuneSize int
lastSync LastSync
stopped int32
buf []byte
delim []byte
mutiLineCache *LineCache
waitForWholeLine bool //readWholeLine
rd reader.FileReader // reader provided by the client
r, w int // buf read and write positions
err error
lastByte int
lastRuneSize int
lastSync LastSync

runTime reader.RunTime

Expand All @@ -88,7 +89,7 @@ type BufReader struct {
const minReadBufferSize = 16

//最大连续读到空的尝试次数
const maxConsecutiveEmptyReads = 10
const maxConsecutiveEmptyReads = 40

// NewReaderSize returns a new Reader whose buffer has at least the specified
// size. If the argument FileReader is already a Reader with large enough
Expand Down Expand Up @@ -189,6 +190,11 @@ func (b *BufReader) SetMode(mode string, v interface{}) (err error) {
return
}

func (b *BufReader) SetWaitFlagForWholeLine() {
b.waitForWholeLine = true
return
}

func (b *BufReader) SetRunTime(mode string, v interface{}) (err error) {
b.runTime, err = reader.ParseRunTimeWithMode(mode, v)
return err
Expand Down Expand Up @@ -280,13 +286,25 @@ func (b *BufReader) fill() {
}

b.w += n

if err == io.EOF && b.waitForWholeLine {
if i == 1 { //when last attempts,return err info;
b.err = err
return
}

time.Sleep(1 * time.Second)
continue
}

if err != nil {
b.err = err
return
}
if n > 0 {
return
}

}
b.err = io.ErrNoProgress
}
Expand Down Expand Up @@ -666,7 +684,9 @@ func NewSingleFileReader(meta *reader.Meta, conf conf.MapConf) (reader reader.Re
return
}
maxLineLen, _ := conf.GetInt64Or(KeyRunnerMaxLineLen, 0)
return NewReaderSize(fr, meta, bufSize, maxLineLen)
r, err := NewReaderSize(fr, meta, bufSize, maxLineLen)
r.SetWaitFlagForWholeLine()
return r, err
}

func init() {
Expand Down
1 change: 1 addition & 0 deletions reader/dirx/dirx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,7 @@ func readerExpireDeleteTarTest(t *testing.T) {
}
}
t.Log("maxNum ", maxNum, "emptyNum", emptyNum)
time.Sleep(60 * time.Second)
assert.EqualValues(t, len(expectResults), len(actualResults))
for k, v := range expectResults {
actualV, ok := actualResults[k]
Expand Down
You are viewing a condensed version of this merge commit. You can view the full changes here.