From cc6f0ef3695ee335b901779c357bfba69d8a6c89 Mon Sep 17 00:00:00 2001 From: papapiya <402561078@qq.com> Date: Tue, 8 Jun 2021 20:36:43 +0800 Subject: [PATCH] fix origin cache --- mgr/runner_test.go | 25 ++++++-- reader/bufreader/bufreader.go | 103 +++++++++++++++++--------------- reader/dirx/dir_reader.go | 88 ++++++++++----------------- reader/dirx/dirx_test.go | 10 ++-- reader/extract/reader.go | 19 +++--- reader/seqfile/seqfile.go | 32 +++++++--- reader/singlefile/singlefile.go | 55 ++++++++++------- reader/tailx/tailx.go | 86 +++++++++----------------- reader/tailx/tailx_test.go | 2 +- 9 files changed, 208 insertions(+), 212 deletions(-) diff --git a/mgr/runner_test.go b/mgr/runner_test.go index ae405419c..0227a356f 100644 --- a/mgr/runner_test.go +++ b/mgr/runner_test.go @@ -711,7 +711,7 @@ func Test_QiniulogRun(t *testing.T) { time.Sleep(time.Second) timer := time.NewTimer(20 * time.Second).C for { - if s.SendCount() >= 4 { + if s.SendCount() >= 5 { break } select { @@ -739,6 +739,21 @@ func Test_QiniulogRun(t *testing.T) { assert.Equal(t, expreqid[idx], dt["reqid"], "equal reqid test") } } + if !strings.HasSuffix(dts[0]["testtag"].(string), "log1") { + t.Errorf("datasource should be log1, but now is %s", dts[0]["testtag"].(string)) + } + if !strings.HasSuffix(dts[1]["testtag"].(string), "log1") { + t.Errorf("datasource should be log1, but now is %s", dts[1]["testtag"].(string)) + } + if !strings.HasSuffix(dts[2]["testtag"].(string), "log2") { + t.Errorf("datasource should be log2, but now is %s", dts[2]["testtag"].(string)) + } + if !strings.HasSuffix(dts[3]["testtag"].(string), "log2") { + t.Errorf("datasource should be log2, but now is %s", dts[3]["testtag"].(string)) + } + if !strings.HasSuffix(dts[4]["testtag"].(string), "log3") { + t.Errorf("datasource should be log3, but now is %s", dts[4]["testtag"].(string)) + } ls, err := runner.LagStats() assert.NoError(t, err) assert.Equal(t, &LagInfo{0, "bytes", 0, 0}, ls) @@ -2076,21 +2091,21 @@ func TestTailxCleaner(t *testing.T) { assert.NotNil(t, rr) go rr.Run() - time.Sleep(60 * time.Second) + time.Sleep(5 * time.Second) logPatha1 := filepath.Join(dira, "a.log.1") assert.NoError(t, os.Rename(logPatha, logPatha1)) assert.NoError(t, ioutil.WriteFile(logPatha, []byte("bbbb\n"), 0666)) - time.Sleep(60 * time.Second) + time.Sleep(5 * time.Second) logPatha2 := filepath.Join(dira, "a.log.2") assert.NoError(t, os.Rename(logPatha, logPatha2)) assert.NoError(t, ioutil.WriteFile(logPatha, []byte("cccc\n"), 0666)) - time.Sleep(60 * time.Second) + time.Sleep(5 * time.Second) assert.NotNil(t, rr.Cleaner()) @@ -2113,7 +2128,7 @@ DONE: break } } - time.Sleep(50 * time.Second) + time.Sleep(5 * time.Second) assert.Equal(t, 1, ret) } diff --git a/reader/bufreader/bufreader.go b/reader/bufreader/bufreader.go index ab803ab58..2076d8b9b 100644 --- a/reader/bufreader/bufreader.go +++ b/reader/bufreader/bufreader.go @@ -54,25 +54,26 @@ type LastSync struct { // BufReader implements buffering for an FileReader object. type BufReader struct { - stopped int32 - buf []byte - delim []byte - mutiLineCache *LineCache - waitForWholeLine bool //readWholeLine - rd reader.FileReader // reader provided by the client - r, w int // buf read and write positions - err error - lastByte int - lastRuneSize int - lastSync LastSync + stopped int32 + buf []byte + delim []byte + mutiLineCache *LineCache + rd reader.FileReader // reader provided by the client + r, w int // buf read and write positions + err error + lastByte int + lastRuneSize int + lastSync LastSync runTime reader.RunTime mux sync.Mutex decoder mahonia.Decoder - Meta *reader.Meta // 存放offset的元信息 - multiLineRegexp *regexp.Regexp + Meta *reader.Meta // 存放offset的元信息 + multiLineRegexp *regexp.Regexp + lastLineSource string // 存放多行上一次source信息 + lastSecondLineSource string // 存放多行上上次source信息 stats StatsInfo statsLock sync.RWMutex @@ -89,7 +90,7 @@ type BufReader struct { const minReadBufferSize = 16 //最大连续读到空的尝试次数 -const maxConsecutiveEmptyReads = 40 +const maxConsecutiveEmptyReads = 10 // NewReaderSize returns a new Reader whose buffer has at least the specified // size. If the argument FileReader is already a Reader with large enough @@ -190,11 +191,6 @@ func (b *BufReader) SetMode(mode string, v interface{}) (err error) { return } -func (b *BufReader) SetWaitFlagForWholeLine() { - b.waitForWholeLine = true - return -} - func (b *BufReader) SetRunTime(mode string, v interface{}) (err error) { b.runTime, err = reader.ParseRunTimeWithMode(mode, v) return err @@ -249,7 +245,6 @@ func (b *BufReader) fill() { b.updateLastRdSource() b.r = 0 } - if b.w >= len(b.buf) { panic(fmt.Sprintf("Runner[%v] bufio: tried to fill full buffer", b.Meta.RunnerName)) } @@ -264,10 +259,14 @@ func (b *BufReader) fill() { if n < 0 { panic(errNegativeRead) } - if b.latestSource != b.rd.Source() { - //这个情况表示文件的数据源出现了变化,在buf中已经出现了至少2个数据源的数据,要定位是哪个位置的数据出现的分隔 + + if n > 0 { if rc, ok := b.rd.(reader.NewSourceRecorder); ok { SIdx := rc.NewSourceIndex() + if len(b.lastRdSource) > 0 && SIdx[0].Source == b.lastRdSource[len(b.lastRdSource)-1].Source { + b.lastRdSource[len(b.lastRdSource)-1].Index = SIdx[0].Index + b.w + SIdx = SIdx[1:] + } for _, v := range SIdx { // 从 NewSourceIndex 函数中返回的index值就是本次读取的批次中上一个DataSource的数据量,加上b.w就是上个DataSource的整体数据 b.lastRdSource = append(b.lastRdSource, reader.SourceIndex{ @@ -276,37 +275,24 @@ func (b *BufReader) fill() { }) } } else { - //如果没实现这个接口,那么就认为到上次读到的为止都是前一次source的文件 - b.lastRdSource = append(b.lastRdSource, reader.SourceIndex{ - Source: b.latestSource, - Index: b.w, - }) + if b.latestSource != b.rd.Source() { + //如果没实现这个接口,那么就认为到上次读到的为止都是前一次source的文件 + b.lastRdSource = append(b.lastRdSource, reader.SourceIndex{ + Source: b.latestSource, + Index: b.w, + }) + } } b.latestSource = b.rd.Source() - } - - b.w += n - if err == io.EOF && b.waitForWholeLine { - if i == 1 { //when last attempts,return err info; - b.err = err - return - } - - time.Sleep(1 * time.Second) - continue + b.w += n } - if err != nil { - b.err = err - return - } - if n > 0 { + b.err = err + if err != nil && err != io.EOF { return } - } - b.err = io.ErrNoProgress } func (b *BufReader) readErr() error { @@ -455,6 +441,7 @@ func (b *BufReader) ReadPattern() (string, error) { //读取到line的情况 if len(line) > 0 { if b.mutiLineCache.Size() <= 0 { + b.lastSecondLineSource = b.lineSource() b.mutiLineCache.Set([]string{line}) continue } @@ -464,6 +451,10 @@ func (b *BufReader) ReadPattern() (string, error) { line = string(b.mutiLineCache.Combine()) b.mutiLineCache.Set(make([]string, 0, 16)) b.mutiLineCache.Append(tmp) + if b.lastLineSource != "" { + b.lastSecondLineSource = b.lastLineSource + } + b.lastLineSource = b.lineSource() return line, err } b.mutiLineCache.Append(line) @@ -472,6 +463,9 @@ func (b *BufReader) ReadPattern() (string, error) { if err != nil { line = string(b.mutiLineCache.Combine()) b.mutiLineCache.Set(make([]string, 0, 16)) + if b.lastLineSource != "" { + b.lastSecondLineSource = b.lastLineSource + } return line, err } maxTimes++ @@ -550,6 +544,13 @@ func (b *BufReader) Name() string { } func (b *BufReader) Source() string { + if b.multiLineRegexp != nil && b.lastSecondLineSource != "" { + return b.lastSecondLineSource + } + return b.lineSource() +} + +func (b *BufReader) lineSource() string { //如果我当前读取的位置在上个数据源index之前,则返回上个数据源 for _, v := range b.lastRdSource { if (b.r < v.Index) || (v.Index > 0 && b.r == v.Index) { @@ -586,9 +587,11 @@ func (b *BufReader) Lag() (rl *LagInfo, err error) { } func (b *BufReader) ReadDone() bool { + b.mux.Lock() + defer b.mux.Unlock() lr, ok := b.rd.(reader.OnceReader) if ok { - return lr.ReadDone() + return lr.ReadDone() && b.r == 0 && b.w == 0 } return false } @@ -684,9 +687,7 @@ func NewSingleFileReader(meta *reader.Meta, conf conf.MapConf) (reader reader.Re return } maxLineLen, _ := conf.GetInt64Or(KeyRunnerMaxLineLen, 0) - r, err := NewReaderSize(fr, meta, bufSize, maxLineLen) - r.SetWaitFlagForWholeLine() - return r, err + return NewReaderSize(fr, meta, bufSize, maxLineLen) } func init() { @@ -704,3 +705,7 @@ func getDelimByEncodingWay(encodingWay string) []byte { return []byte("\n") } } + +func (b *BufReader) GetDelimiter() []byte { + return b.delim +} diff --git a/reader/dirx/dir_reader.go b/reader/dirx/dir_reader.go index dc75b4e09..9003bb9e4 100644 --- a/reader/dirx/dir_reader.go +++ b/reader/dirx/dir_reader.go @@ -92,7 +92,7 @@ func (dr *dirReader) Run() { log.Warnf("Runner[%v] log path[%v] reader has stopped", dr.runnerName, dr.originalPath) return } - + source := "" if len(dr.readcache) == 0 { dr.readLock.Lock() dr.readcache, err = dr.br.ReadLine() @@ -104,41 +104,27 @@ func (dr *dirReader) Run() { time.Sleep(2 * time.Second) continue } - source := dr.br.Source() - if _, ok := dr.halfLineCache[source]; !ok { - dr.readLock.Lock() - dr.halfLineCache[source] = "" - dr.readLock.Unlock() - } - - if dr.readcache != "" && err == io.EOF { + source = dr.br.Source() + // 连续2次没数据且当前没读取到数据,返回halfLineCache里的数据 + if dr.numEmptyLines > 2 && len(dr.readcache) == 0 && len(dr.halfLineCache) != 0 { dr.readLock.Lock() - if len(dr.halfLineCache[source])+len(dr.readcache) > 20*utils.Mb { - log.Warnf("Runner[%v] log path[%v] reader[%v] single line size has exceed 2k", dr.runnerName, dr.originalPath, source) - dr.readcache = dr.halfLineCache[source] + dr.readcache - dr.halfLineCache[source] = "" - } else { - dr.halfLineCache[source] += dr.readcache - dr.readcache = "" + for delaySource, line := range dr.halfLineCache { + dr.readcache = line + source = delaySource + break } + delete(dr.halfLineCache, source) dr.readLock.Unlock() } - if err == nil && dr.halfLineCache[source] != "" { - dr.readLock.Lock() - dr.readcache += dr.halfLineCache[source] - dr.halfLineCache[source] = "" - dr.readLock.Unlock() - } + if len(dr.readcache) == 0 { + dr.numEmptyLines++ - if len(dr.readcache) == 0 && dr.halfLineCache[source] == "" { - if key, exist := utils.GetKeyOfNotEmptyValueInMap(dr.halfLineCache); exist { - source = key + // 大约一小时没读到内容,设置为 inactive + if dr.numEmptyLines > 60*60 { + atomic.StoreInt32(&dr.inactive, 1) } - } - if len(dr.readcache) == 0 && dr.halfLineCache[source] == "" { - dr.numEmptyLines++ // 文件 EOF,同时没有任何内容,代表不是第一次 EOF,休息时间设置长一些 if err == io.EOF { atomic.StoreInt32(&dr.inactive, 1) @@ -147,25 +133,27 @@ func (dr *dirReader) Run() { continue } - // 大约一小时没读到内容,设置为 inactive - if dr.numEmptyLines > 60*60 { - atomic.StoreInt32(&dr.inactive, 1) - } - // 读取的结果为空,无论如何都 sleep 1s time.Sleep(time.Second) continue - } else if len(dr.readcache) == 0 && dr.halfLineCache[source] != "" { - dr.numEmptyLines++ - if err == io.EOF && dr.numEmptyLines < 40{ - log.Debugf("Runner[%s] %s meet EOF, ActiveReader was inactive now, stop it", dr.runnerName, dr.originalPath) - time.Sleep(1 * time.Second) - continue - } + } + + if cache, ok := dr.halfLineCache[source]; ok { + dr.readLock.Lock() + dr.readcache = cache + dr.readcache + dr.readLock.Unlock() + } + if !strings.HasSuffix(dr.readcache, string(dr.br.GetDelimiter())) && dr.numEmptyLines < 3 && len(dr.readcache) < 20*MB { + dr.halfLineCache[source] = dr.readcache dr.readLock.Lock() - dr.readcache = dr.halfLineCache[source] - dr.halfLineCache[source] = "" + dr.readcache = "" dr.readLock.Unlock() + continue + } else { + delete(dr.halfLineCache, source) + } + if len(dr.readcache) > 20*MB { + log.Warnf("Runner[%v] %v >>>>>> read cache has exceed 20 MB, will return current line", dr.runnerName, dr.originalPath) } } log.Debugf("Runner[%v] %v >>>>>> read cache[%v] line cache [%v]", dr.runnerName, dr.originalPath, dr.readcache, string(dr.br.FormMutiLine())) @@ -190,10 +178,9 @@ func (dr *dirReader) Run() { } select { - case dr.msgChan <- message{result: dr.readcache, logpath: dr.originalPath, currentFile: dr.br.Source()}: + case dr.msgChan <- message{result: dr.readcache, logpath: dr.originalPath, currentFile: source}: dr.readLock.Lock() dr.readcache = "" - dr.readLock.Unlock() case <-ticker.C: } @@ -241,18 +228,7 @@ func HasDirExpired(dir string, expire time.Duration) bool { //对于读完的直接认为过期,因为不会追加新数据;最后一次需要等待30s func (dr *dirReader) ReadDone() bool { - return dr.br.ReadDone() && dr.halfLineCacheIsEmpty() -} - -func (dr *dirReader) halfLineCacheIsEmpty() bool { - dr.readLock.Lock() - defer dr.readLock.Unlock() - for _, v := range dr.halfLineCache { - if v != "" { - return false - } - } - return true + return dr.br.ReadDone() && len(dr.halfLineCache) == 0 } func (dr *dirReader) HasExpired(expire time.Duration) bool { diff --git a/reader/dirx/dirx_test.go b/reader/dirx/dirx_test.go index bd4e587f3..6fe5a031a 100644 --- a/reader/dirx/dirx_test.go +++ b/reader/dirx/dirx_test.go @@ -727,7 +727,7 @@ func multiReaderNewestOffsetTest(t *testing.T) { } func multiReaderSameInodeTest(t *testing.T) { - + } func TestMultiReaderSameInodeTest(t *testing.T) { @@ -793,7 +793,7 @@ func TestMultiReaderSameInodeTest(t *testing.T) { } else { emptyNum++ } - if maxNum >= 60 || emptyNum > 60 { + if maxNum >= 10 || emptyNum > 60 { break } } @@ -820,7 +820,7 @@ func TestMultiReaderSameInodeTest(t *testing.T) { if err == io.EOF { break } - if maxNum >= 60 || emptyNum > 60 { + if maxNum >= 13 || emptyNum > 60 { break } } @@ -843,7 +843,7 @@ func TestMultiReaderSameInodeTest(t *testing.T) { if err == io.EOF { break } - if maxNum >= 60 || emptyNum > 60 { + if maxNum >= 17 || emptyNum > 60 { break } } @@ -1079,7 +1079,7 @@ func readerExpireDeleteTarTest(t *testing.T) { } } t.Log("maxNum ", maxNum, "emptyNum", emptyNum) - time.Sleep(60 * time.Second) + time.Sleep(1 * time.Second) assert.EqualValues(t, len(expectResults), len(actualResults)) for k, v := range expectResults { actualV, ok := actualResults[k] diff --git a/reader/extract/reader.go b/reader/extract/reader.go index 2f6889665..9e7efd0bf 100644 --- a/reader/extract/reader.go +++ b/reader/extract/reader.go @@ -224,13 +224,11 @@ func (t *Tar) Read(p []byte) (n int, err error) { }() t.sourceIndexes = []reader.SourceIndex{} for { - var lastSource string if t.header == nil || reader.IgnoreHidden(t.header.Name, t.opt.IgnoreHidden) || reader.IgnoreFileSuffixes(t.header.Name, t.opt.IgnoreFileSuffixes) || !reader.ValidFileRegex(filepath.Base(t.header.Name), t.opt.ValidFilesRegex) { if t.header != nil { - lastSource = t.header.Name log.Infof("ignore %s in path %s", t.header.Name, t.path) } err = t.next() @@ -239,12 +237,16 @@ func (t *Tar) Read(p []byte) (n int, err error) { } continue } - t.sourceIndexes = append(t.sourceIndexes, reader.SourceIndex{Source: lastSource}) log.Infof("start to read %s in path %s", t.header.Name, t.path) break } + t.sourceIndexes = []reader.SourceIndex{{Index: 0, Source: t.header.Name}} for { n, err = t.rd.Read(p) + if n > 0 { + t.sourceIndexes[0].Index = n + t.sourceIndexes[0].Source = t.header.Name + } if err == io.EOF { //如果已经EOF,但是上次还读到了,先返回上次的结果 if n > 0 { @@ -383,7 +385,7 @@ func (t *ZIP) next() (err error) { if t.f != nil { t.f.Close() } - for ;t.idx >= 0 && t.idx < len(t.rd.File); t.idx++ { + for ; t.idx >= 0 && t.idx < len(t.rd.File); t.idx++ { t.zipf = t.rd.File[t.idx] if t.zipf.FileInfo().IsDir() { continue @@ -406,14 +408,13 @@ func (t *ZIP) next() (err error) { } func (t *ZIP) Read(p []byte) (n int, err error) { + t.sourceIndexes = []reader.SourceIndex{} for { - var lastSource string if t.f == nil || reader.IgnoreHidden(t.zipf.Name, t.opt.IgnoreHidden) || reader.IgnoreFileSuffixes(t.zipf.Name, t.opt.IgnoreFileSuffixes) || !reader.ValidFileRegex(filepath.Base(t.zipf.Name), t.opt.ValidFilesRegex) { if t.f != nil { - lastSource = t.zipf.Name log.Infof("ignore %s in path %s", t.zipf.Name, t.path) } err = t.next() @@ -422,12 +423,16 @@ func (t *ZIP) Read(p []byte) (n int, err error) { } continue } - t.sourceIndexes = append(t.sourceIndexes, reader.SourceIndex{Source: lastSource}) log.Infof("start to read %s in path %s", t.zipf.Name, t.path) break } + t.sourceIndexes = []reader.SourceIndex{{Index: 0, Source: t.zipf.Name}} for { n, err = t.f.Read(p) + if n > 0 { + t.sourceIndexes[0].Index += n + t.sourceIndexes[0].Source = t.zipf.Name + } if err == io.EOF { //如果已经EOF,但是上次还读到了,先返回上次的结果 if n > 0 { diff --git a/reader/seqfile/seqfile.go b/reader/seqfile/seqfile.go index ef99c2380..b3ac8b79c 100644 --- a/reader/seqfile/seqfile.go +++ b/reader/seqfile/seqfile.go @@ -43,7 +43,6 @@ type SeqFile struct { newLineNotAdded bool //文件最后的部分正好填满buffer,导致\n符号加不上,此时要用这个变量 newLineBytesSourceIndex []reader.SourceIndex //新文件被读取时的bytes位置 - justOpenedNewFile bool //新文件刚刚打开 validFilePattern string // 合法的文件名正则表达式 stopped int32 // 停止标志位 @@ -313,10 +312,15 @@ func (sf *SeqFile) Read(p []byte) (n int, err error) { sf.mux.Lock() defer sf.mux.Unlock() n = 0 + eofTimes := 0 for n < len(p) { if sf.newLineNotAdded { p[n] = '\n' n++ + sourceIndexLen := len(sf.newLineBytesSourceIndex) + if sourceIndexLen != 0 { + sf.newLineBytesSourceIndex[sourceIndexLen-1].Index = n + } sf.newLineNotAdded = false } var n1 int @@ -343,20 +347,31 @@ func (sf *SeqFile) Read(p []byte) (n int, err error) { } continue } - if n1 > 0 && sf.justOpenedNewFile { - sf.justOpenedNewFile = false - sf.newLineBytesSourceIndex = append(sf.newLineBytesSourceIndex, reader.SourceIndex{ - Source: sf.lastFile, - Index: n, - }) - } sf.offset += int64(n1) n += n1 + if n1 > 0 { + eofTimes = 0 + sourceIndexLen := len(sf.newLineBytesSourceIndex) + if sourceIndexLen != 0 && sf.newLineBytesSourceIndex[sourceIndexLen-1].Source == sf.currFile { + sf.newLineBytesSourceIndex[sourceIndexLen-1].Index = n + } else { + sf.newLineBytesSourceIndex = append(sf.newLineBytesSourceIndex, reader.SourceIndex{ + Source: sf.currFile, + Index: n, + }) + } + } if err != nil { if err != io.EOF { sf.handleUnexpectErr(err) return n, err } + // wait eof twice 0.1s + if eofTimes < 2 { + time.Sleep(time.Millisecond * 10) + eofTimes++ + continue + } fi, err1 := sf.nextFile() if os.IsNotExist(err1) { if nextFileRetry >= 1 { @@ -391,7 +406,6 @@ func (sf *SeqFile) Read(p []byte) (n int, err error) { if err2 != nil { return n, err2 } - sf.justOpenedNewFile = true //已经获得了下一个文件,没有EOF err = nil } else { diff --git a/reader/singlefile/singlefile.go b/reader/singlefile/singlefile.go index 45a7d5d90..1df999b43 100644 --- a/reader/singlefile/singlefile.go +++ b/reader/singlefile/singlefile.go @@ -346,34 +346,43 @@ func (sf *SingleFile) Read(p []byte) (n int, err error) { } sf.mux.Lock() defer sf.mux.Unlock() - n, err = sf.ratereader.Read(p) - if err != nil && strings.Contains(err.Error(), "stale NFS file handle") { - nerr := sf.reopenForESTALE() - if nerr != nil { - if !IsSelfRunner(sf.meta.RunnerName) { - log.Errorf("Runner[%v] %v meet eror %v reopen error %v", sf.meta.RunnerName, sf.originpath, err, nerr) - } else { - log.Debugf("Runner[%v] %v meet eror %v reopen error %v", sf.meta.RunnerName, sf.originpath, err, nerr) + eofTimes := 0 + n1 := 0 + n = 0 + for n < len(p) && eofTimes <= 2 { + n1, err = sf.ratereader.Read(p[n:]) + if err != nil && strings.Contains(err.Error(), "stale NFS file handle") { + nerr := sf.reopenForESTALE() + if nerr != nil { + if !IsSelfRunner(sf.meta.RunnerName) { + log.Errorf("Runner[%v] %v meet eror %v reopen error %v", sf.meta.RunnerName, sf.originpath, err, nerr) + } else { + log.Debugf("Runner[%v] %v meet eror %v reopen error %v", sf.meta.RunnerName, sf.originpath, err, nerr) + } } + return n, err } - return - } - sf.offset += int64(n) - if err == io.EOF { - //读到了,如果n大于0,先把EOF抹去,返回 - if n > 0 { - err = nil - return + if n1 > 0 { + eofTimes = 0 + n += n1 + sf.offset += int64(n1) } - err = sf.Reopen() - if err != nil { - return + if err == io.EOF { + if n1 > 0 { + err = nil + } + time.Sleep(time.Millisecond * 10) + eofTimes++ + err1 := sf.Reopen() + if err1 != nil { + return n, err1 + } + continue + } else if err != nil { + return n, err } - n, err = sf.ratereader.Read(p) - sf.offset += int64(n) - return } - return + return n, err } func (sf *SingleFile) SyncMeta() error { diff --git a/reader/tailx/tailx.go b/reader/tailx/tailx.go index 36516f7c1..44f136ebf 100644 --- a/reader/tailx/tailx.go +++ b/reader/tailx/tailx.go @@ -88,7 +88,7 @@ type ActiveReader struct { realpath string originpath string readcache string - halfLineCache map[string]string //针对不同的数据源做一个缓存 + halfLineCache string // 断行缓存 msgchan chan<- Result errChan chan<- error resetChan chan<- string @@ -158,21 +158,20 @@ func NewActiveReader(originPath, realPath, whence, inode string, r *Reader) (ar inode = strconv.Itoa(int(inodeInt)) } return &ActiveReader{ - cacheLineMux: sync.RWMutex{}, - br: bf, - realpath: realPath, - originpath: originPath, - msgchan: r.msgChan, - errChan: r.errChan, - halfLineCache: make(map[string]string), - resetChan: r.resetChan, - inodeStr: inode, - inactive: 1, - emptyLineCnt: 0, - runnerName: r.meta.RunnerName, - status: StatusInit, - statsLock: sync.RWMutex{}, - runtime: r.runTime, + cacheLineMux: sync.RWMutex{}, + br: bf, + realpath: realPath, + originpath: originPath, + msgchan: r.msgChan, + errChan: r.errChan, + resetChan: r.resetChan, + inodeStr: inode, + inactive: 1, + emptyLineCnt: 0, + runnerName: r.meta.RunnerName, + status: StatusInit, + statsLock: sync.RWMutex{}, + runtime: r.runTime, }, nil } @@ -319,43 +318,18 @@ func (ar *ActiveReader) Run() { ar.Stop() return } - - source := ar.br.Source() - if _, ok := ar.halfLineCache[source]; !ok { - ar.cacheLineMux.Lock() - ar.halfLineCache[source] = "" - ar.cacheLineMux.Unlock() - } - - if ar.readcache != "" && err == io.EOF { - ar.cacheLineMux.Lock() - if len(ar.halfLineCache[source])+len(ar.readcache) > 20*utils.Mb { - log.Warnf("Runner[%v] log path[%v] reader[%v] single line size has exceed 20mb", ar.runnerName, ar.originpath, source) - ar.readcache = ar.halfLineCache[source] + ar.readcache - ar.halfLineCache[source] = "" - } else { - ar.halfLineCache[source] += ar.readcache - ar.readcache = "" - } - ar.cacheLineMux.Unlock() - } - - if err == nil && ar.halfLineCache[source] != "" { + // 超过1次空行返回cache内容 + if len(ar.halfLineCache) != 0 && (ar.readcache != "" || ar.emptyLineCnt > 1) { ar.cacheLineMux.Lock() - ar.readcache += ar.halfLineCache[source] - ar.halfLineCache[source] = "" + ar.readcache = ar.halfLineCache + ar.readcache + ar.halfLineCache = "" ar.cacheLineMux.Unlock() } - if len(ar.readcache) == 0 && ar.halfLineCache[source] == "" { - if key, exist := utils.GetKeyOfNotEmptyValueInMap(ar.halfLineCache); exist { - source = key - } - } - if ar.readcache == "" && ar.halfLineCache[source] == "" { + if ar.readcache == "" { ar.emptyLineCnt++ //文件EOF,同时没有任何内容,代表不是第一次EOF,休息时间设置长一些 - if err == io.EOF { + if err == io.EOF && ar.emptyLineCnt > 3 { atomic.StoreInt32(&ar.inactive, 1) log.Debugf("Runner[%s] %s meet EOF, ActiveReader was inactive now, stop it", ar.runnerName, ar.originpath) ar.ResetFileMeta() @@ -363,7 +337,7 @@ func (ar *ActiveReader) Run() { return } // 3s 没读到内容,设置为inactive - if ar.emptyLineCnt > 3 { + if ar.emptyLineCnt > 5 { atomic.StoreInt32(&ar.inactive, 1) log.Debugf("Runner[%s] %s meet EOF, ActiveReader was inactive now, stop it", ar.runnerName, ar.originpath) ar.ResetFileMeta() @@ -373,17 +347,15 @@ func (ar *ActiveReader) Run() { //读取的结果为空,无论如何都sleep 1s time.Sleep(time.Second) continue - } else if ar.readcache == "" && ar.halfLineCache[source] != "" { - ar.emptyLineCnt++ - if err == io.EOF && ar.emptyLineCnt < 40 { - log.Debugf("Runner[%s] %s meet EOF, ActiveReader was inactive now, stop it", ar.runnerName, ar.originpath) - time.Sleep(1 * time.Second) - continue - } + } + if !strings.HasSuffix(ar.readcache, string(ar.br.GetDelimiter())) && ar.emptyLineCnt <= 1 { ar.cacheLineMux.Lock() - ar.readcache = ar.halfLineCache[source] - ar.halfLineCache[source] = "" + ar.halfLineCache = ar.readcache + ar.readcache = "" ar.cacheLineMux.Unlock() + continue + } else { + ar.halfLineCache = "" } } log.Debugf("Runner[%s] %s >>>>>>readcache <%s> linecache <%s>", ar.runnerName, ar.originpath, strings.TrimSpace(ar.readcache), string(ar.br.FormMutiLine())) diff --git a/reader/tailx/tailx_test.go b/reader/tailx/tailx_test.go index 54782494e..570db3a91 100644 --- a/reader/tailx/tailx_test.go +++ b/reader/tailx/tailx_test.go @@ -1531,7 +1531,7 @@ func TestReaderErrMiddle(t *testing.T) { } } if err == nil || !strings.Contains(err.Error(), os.ErrPermission.Error()) { - t.Errorf("no matched error %v, expect %v", err, os.ErrPermission) + t.Errorf("no matched error %v, expect %v", err, os.ErrPermission.Error()) } err = mr.Close() assert.Nil(t, err)