Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions mgr/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ func Test_QiniulogRun(t *testing.T) {
time.Sleep(time.Second)
timer := time.NewTimer(20 * time.Second).C
for {
if s.SendCount() >= 4 {
if s.SendCount() >= 5 {
break
}
select {
Expand Down Expand Up @@ -739,6 +739,21 @@ func Test_QiniulogRun(t *testing.T) {
assert.Equal(t, expreqid[idx], dt["reqid"], "equal reqid test")
}
}
if !strings.HasSuffix(dts[0]["testtag"].(string), "log1") {
t.Errorf("datasource should be log1, but now is %s", dts[0]["testtag"].(string))
}
if !strings.HasSuffix(dts[1]["testtag"].(string), "log1") {
t.Errorf("datasource should be log1, but now is %s", dts[1]["testtag"].(string))
}
if !strings.HasSuffix(dts[2]["testtag"].(string), "log2") {
t.Errorf("datasource should be log2, but now is %s", dts[2]["testtag"].(string))
}
if !strings.HasSuffix(dts[3]["testtag"].(string), "log2") {
t.Errorf("datasource should be log2, but now is %s", dts[3]["testtag"].(string))
}
if !strings.HasSuffix(dts[4]["testtag"].(string), "log3") {
t.Errorf("datasource should be log3, but now is %s", dts[4]["testtag"].(string))
}
ls, err := runner.LagStats()
assert.NoError(t, err)
assert.Equal(t, &LagInfo{0, "bytes", 0, 0}, ls)
Expand Down Expand Up @@ -2076,21 +2091,21 @@ func TestTailxCleaner(t *testing.T) {
assert.NotNil(t, rr)
go rr.Run()

time.Sleep(60 * time.Second)
time.Sleep(5 * time.Second)

logPatha1 := filepath.Join(dira, "a.log.1")
assert.NoError(t, os.Rename(logPatha, logPatha1))

assert.NoError(t, ioutil.WriteFile(logPatha, []byte("bbbb\n"), 0666))

time.Sleep(60 * time.Second)
time.Sleep(5 * time.Second)

logPatha2 := filepath.Join(dira, "a.log.2")
assert.NoError(t, os.Rename(logPatha, logPatha2))

assert.NoError(t, ioutil.WriteFile(logPatha, []byte("cccc\n"), 0666))

time.Sleep(60 * time.Second)
time.Sleep(5 * time.Second)

assert.NotNil(t, rr.Cleaner())

Expand All @@ -2113,7 +2128,7 @@ DONE:
break
}
}
time.Sleep(50 * time.Second)
time.Sleep(5 * time.Second)
assert.Equal(t, 1, ret)
}

Expand Down
103 changes: 54 additions & 49 deletions reader/bufreader/bufreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,25 +54,26 @@ type LastSync struct {

// BufReader implements buffering for an FileReader object.
type BufReader struct {
stopped int32
buf []byte
delim []byte
mutiLineCache *LineCache
waitForWholeLine bool //readWholeLine
rd reader.FileReader // reader provided by the client
r, w int // buf read and write positions
err error
lastByte int
lastRuneSize int
lastSync LastSync
stopped int32
buf []byte
delim []byte
mutiLineCache *LineCache
rd reader.FileReader // reader provided by the client
r, w int // buf read and write positions
err error
lastByte int
lastRuneSize int
lastSync LastSync

runTime reader.RunTime

mux sync.Mutex
decoder mahonia.Decoder

Meta *reader.Meta // 存放offset的元信息
multiLineRegexp *regexp.Regexp
Meta *reader.Meta // 存放offset的元信息
multiLineRegexp *regexp.Regexp
lastLineSource string // 存放多行上一次source信息
lastSecondLineSource string // 存放多行上上次source信息

stats StatsInfo
statsLock sync.RWMutex
Expand All @@ -89,7 +90,7 @@ type BufReader struct {
const minReadBufferSize = 16

//最大连续读到空的尝试次数
const maxConsecutiveEmptyReads = 40
const maxConsecutiveEmptyReads = 10

// NewReaderSize returns a new Reader whose buffer has at least the specified
// size. If the argument FileReader is already a Reader with large enough
Expand Down Expand Up @@ -190,11 +191,6 @@ func (b *BufReader) SetMode(mode string, v interface{}) (err error) {
return
}

func (b *BufReader) SetWaitFlagForWholeLine() {
b.waitForWholeLine = true
return
}

func (b *BufReader) SetRunTime(mode string, v interface{}) (err error) {
b.runTime, err = reader.ParseRunTimeWithMode(mode, v)
return err
Expand Down Expand Up @@ -249,7 +245,6 @@ func (b *BufReader) fill() {
b.updateLastRdSource()
b.r = 0
}

if b.w >= len(b.buf) {
panic(fmt.Sprintf("Runner[%v] bufio: tried to fill full buffer", b.Meta.RunnerName))
}
Expand All @@ -264,10 +259,14 @@ func (b *BufReader) fill() {
if n < 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

err 不为nil的时候依然需要b.w += n

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

err 不为nil的时候依然需要b.w += n

done

panic(errNegativeRead)
}
if b.latestSource != b.rd.Source() {
//这个情况表示文件的数据源出现了变化,在buf中已经出现了至少2个数据源的数据,要定位是哪个位置的数据出现的分隔

if n > 0 {
if rc, ok := b.rd.(reader.NewSourceRecorder); ok {
SIdx := rc.NewSourceIndex()
if len(b.lastRdSource) > 0 && SIdx[0].Source == b.lastRdSource[len(b.lastRdSource)-1].Source {
b.lastRdSource[len(b.lastRdSource)-1].Index = SIdx[0].Index + b.w
SIdx = SIdx[1:]
}
for _, v := range SIdx {
// 从 NewSourceIndex 函数中返回的index值就是本次读取的批次中上一个DataSource的数据量,加上b.w就是上个DataSource的整体数据
b.lastRdSource = append(b.lastRdSource, reader.SourceIndex{
Expand All @@ -276,37 +275,24 @@ func (b *BufReader) fill() {
})
}
} else {
//如果没实现这个接口,那么就认为到上次读到的为止都是前一次source的文件
b.lastRdSource = append(b.lastRdSource, reader.SourceIndex{
Source: b.latestSource,
Index: b.w,
})
if b.latestSource != b.rd.Source() {
//如果没实现这个接口,那么就认为到上次读到的为止都是前一次source的文件
b.lastRdSource = append(b.lastRdSource, reader.SourceIndex{
Source: b.latestSource,
Index: b.w,
})
}
}
b.latestSource = b.rd.Source()
}

b.w += n

if err == io.EOF && b.waitForWholeLine {
if i == 1 { //when last attempts,return err info;
b.err = err
return
}

time.Sleep(1 * time.Second)
continue
b.w += n
}

if err != nil {
b.err = err
return
}
if n > 0 {
b.err = err
if err != nil && err != io.EOF {
return
}

}
b.err = io.ErrNoProgress
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里把ErrNoProgress去掉,在上面readSlice里面就不会返回err,会一直for循环

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

上面已经有了b.err = err,所以这里可以忽略

}

func (b *BufReader) readErr() error {
Expand Down Expand Up @@ -455,6 +441,7 @@ func (b *BufReader) ReadPattern() (string, error) {
//读取到line的情况
if len(line) > 0 {
if b.mutiLineCache.Size() <= 0 {
b.lastSecondLineSource = b.lineSource()
b.mutiLineCache.Set([]string{line})
continue
}
Expand All @@ -464,6 +451,10 @@ func (b *BufReader) ReadPattern() (string, error) {
line = string(b.mutiLineCache.Combine())
b.mutiLineCache.Set(make([]string, 0, 16))
b.mutiLineCache.Append(tmp)
if b.lastLineSource != "" {
b.lastSecondLineSource = b.lastLineSource
}
b.lastLineSource = b.lineSource()
return line, err
}
b.mutiLineCache.Append(line)
Expand All @@ -472,6 +463,9 @@ func (b *BufReader) ReadPattern() (string, error) {
if err != nil {
line = string(b.mutiLineCache.Combine())
b.mutiLineCache.Set(make([]string, 0, 16))
if b.lastLineSource != "" {
b.lastSecondLineSource = b.lastLineSource
}
return line, err
}
maxTimes++
Expand Down Expand Up @@ -550,6 +544,13 @@ func (b *BufReader) Name() string {
}

// Source reports the origin (data source) of the line most recently handed
// to the caller. In multi-line (regexp) mode the returned lines lag behind
// the raw read cursor by one pattern match, so the source remembered from
// two lines back is reported instead of the current read position's source.
func (b *BufReader) Source() string {
	if b.multiLineRegexp == nil || b.lastSecondLineSource == "" {
		return b.lineSource()
	}
	return b.lastSecondLineSource
}

func (b *BufReader) lineSource() string {
//如果我当前读取的位置在上个数据源index之前,则返回上个数据源
for _, v := range b.lastRdSource {
if (b.r < v.Index) || (v.Index > 0 && b.r == v.Index) {
Expand Down Expand Up @@ -586,9 +587,11 @@ func (b *BufReader) Lag() (rl *LagInfo, err error) {
}

func (b *BufReader) ReadDone() bool {
b.mux.Lock()
defer b.mux.Unlock()
lr, ok := b.rd.(reader.OnceReader)
if ok {
return lr.ReadDone()
return lr.ReadDone() && b.r == 0 && b.w == 0
}
return false
}
Expand Down Expand Up @@ -684,9 +687,7 @@ func NewSingleFileReader(meta *reader.Meta, conf conf.MapConf) (reader reader.Re
return
}
maxLineLen, _ := conf.GetInt64Or(KeyRunnerMaxLineLen, 0)
r, err := NewReaderSize(fr, meta, bufSize, maxLineLen)
r.SetWaitFlagForWholeLine()
return r, err
return NewReaderSize(fr, meta, bufSize, maxLineLen)
}

func init() {
Expand All @@ -704,3 +705,7 @@ func getDelimByEncodingWay(encodingWay string) []byte {
return []byte("\n")
}
}

// GetDelimiter returns the byte sequence currently used to split the input
// stream into lines (e.g. "\n"; see getDelimByEncodingWay).
func (b *BufReader) GetDelimiter() []byte {
	return b.delim
}
Loading