From 4836e98452cb7be3f99aaca80b573459a2cb1152 Mon Sep 17 00:00:00 2001 From: papapiya <402561078@qq.com> Date: Mon, 31 May 2021 10:49:00 +0800 Subject: [PATCH] remove line cache --- reader/dirx/dir_reader.go | 80 +++++------------------------ reader/tailx/tailx.go | 104 +++++++++++--------------------------- 2 files changed, 41 insertions(+), 143 deletions(-) diff --git a/reader/dirx/dir_reader.go b/reader/dirx/dir_reader.go index dc75b4e09..fa1d58b3e 100644 --- a/reader/dirx/dir_reader.go +++ b/reader/dirx/dir_reader.go @@ -37,7 +37,6 @@ type dirReader struct { logPath string readLock sync.RWMutex readcache string - halfLineCache map[string]string //针对不同的数据源做一个缓存 numEmptyLines int msgChan chan<- message @@ -104,40 +103,8 @@ func (dr *dirReader) Run() { time.Sleep(2 * time.Second) continue } - source := dr.br.Source() - if _, ok := dr.halfLineCache[source]; !ok { - dr.readLock.Lock() - dr.halfLineCache[source] = "" - dr.readLock.Unlock() - } - - if dr.readcache != "" && err == io.EOF { - dr.readLock.Lock() - if len(dr.halfLineCache[source])+len(dr.readcache) > 20*utils.Mb { - log.Warnf("Runner[%v] log path[%v] reader[%v] single line size has exceed 2k", dr.runnerName, dr.originalPath, source) - dr.readcache = dr.halfLineCache[source] + dr.readcache - dr.halfLineCache[source] = "" - } else { - dr.halfLineCache[source] += dr.readcache - dr.readcache = "" - } - dr.readLock.Unlock() - } - if err == nil && dr.halfLineCache[source] != "" { - dr.readLock.Lock() - dr.readcache += dr.halfLineCache[source] - dr.halfLineCache[source] = "" - dr.readLock.Unlock() - } - - if len(dr.readcache) == 0 && dr.halfLineCache[source] == "" { - if key, exist := utils.GetKeyOfNotEmptyValueInMap(dr.halfLineCache); exist { - source = key - } - } - - if len(dr.readcache) == 0 && dr.halfLineCache[source] == "" { + if len(dr.readcache) == 0 { dr.numEmptyLines++ // 文件 EOF,同时没有任何内容,代表不是第一次 EOF,休息时间设置长一些 if err == io.EOF { @@ -155,19 +122,9 @@ func (dr *dirReader) Run() { // 读取的结果为空,无论如何都 sleep 1s time.Sleep(time.Second) continue - } else if len(dr.readcache) == 0 && dr.halfLineCache[source] != "" { - dr.numEmptyLines++ - if err == io.EOF && dr.numEmptyLines < 40{ - log.Debugf("Runner[%s] %s meet EOF, ActiveReader was inactive now, stop it", dr.runnerName, dr.originalPath) - time.Sleep(1 * time.Second) - continue - } - dr.readLock.Lock() - dr.readcache = dr.halfLineCache[source] - dr.halfLineCache[source] = "" - dr.readLock.Unlock() } } + log.Debugf("Runner[%v] %v >>>>>> read cache[%v] line cache [%v]", dr.runnerName, dr.originalPath, dr.readcache, string(dr.br.FormMutiLine())) repeat := 0 for { @@ -193,7 +150,6 @@ func (dr *dirReader) Run() { case dr.msgChan <- message{result: dr.readcache, logpath: dr.originalPath, currentFile: dr.br.Source()}: dr.readLock.Lock() dr.readcache = "" - dr.readLock.Unlock() case <-ticker.C: } @@ -239,20 +195,9 @@ func HasDirExpired(dir string, expire time.Duration) bool { return latestModTime.Add(expire).Before(time.Now()) } -//对于读完的直接认为过期,因为不会追加新数据;最后一次需要等待30s +//对于读完的直接认为过期,因为不会追加新数据 func (dr *dirReader) ReadDone() bool { - return dr.br.ReadDone() && dr.halfLineCacheIsEmpty() -} - -func (dr *dirReader) halfLineCacheIsEmpty() bool { - dr.readLock.Lock() - defer dr.readLock.Unlock() - for _, v := range dr.halfLineCache { - if v != "" { - return false - } - } - return true + return dr.br.ReadDone() } func (dr *dirReader) HasExpired(expire time.Duration) bool { @@ -399,15 +344,14 @@ func (drs *dirReaders) NewReader(opts newReaderOptions, notFirstTime bool, maxLi } dr := &dirReader{ - status: StatusInit, - inactive: 1, - br: br, - halfLineCache: make(map[string]string), - runnerName: opts.Meta.RunnerName, - originalPath: opts.OriginalPath, - logPath: opts.LogPath, - msgChan: opts.MsgChan, - errChan: opts.ErrChan, + status: StatusInit, + inactive: 1, + br: br, + runnerName: opts.Meta.RunnerName, + originalPath: opts.OriginalPath, + logPath: opts.LogPath, + msgChan: opts.MsgChan, + errChan: opts.ErrChan, } drs.lock.Lock() diff --git a/reader/tailx/tailx.go b/reader/tailx/tailx.go index 36516f7c1..d51f0b722 100644 --- a/reader/tailx/tailx.go +++ b/reader/tailx/tailx.go @@ -83,20 +83,19 @@ type Reader struct { } type ActiveReader struct { - cacheLineMux sync.RWMutex - br *bufreader.BufReader - realpath string - originpath string - readcache string - halfLineCache map[string]string //针对不同的数据源做一个缓存 - msgchan chan<- Result - errChan chan<- error - resetChan chan<- string - status int32 - inactive int32 //当inactive>0 时才会被expire回收 - runnerName string - runtime reader.RunTime - inodeStr string + cacheLineMux sync.RWMutex + br *bufreader.BufReader + realpath string + originpath string + readcache string + msgchan chan<- Result + errChan chan<- error + resetChan chan<- string + status int32 + inactive int32 //当inactive>0 时才会被expire回收 + runnerName string + runtime reader.RunTime + inodeStr string emptyLineCnt int @@ -158,21 +157,20 @@ func NewActiveReader(originPath, realPath, whence, inode string, r *Reader) (ar inode = strconv.Itoa(int(inodeInt)) } return &ActiveReader{ - cacheLineMux: sync.RWMutex{}, - br: bf, - realpath: realPath, - originpath: originPath, - msgchan: r.msgChan, - errChan: r.errChan, - halfLineCache: make(map[string]string), - resetChan: r.resetChan, - inodeStr: inode, - inactive: 1, - emptyLineCnt: 0, - runnerName: r.meta.RunnerName, - status: StatusInit, - statsLock: sync.RWMutex{}, - runtime: r.runTime, + cacheLineMux: sync.RWMutex{}, + br: bf, + realpath: realPath, + originpath: originPath, + msgchan: r.msgChan, + errChan: r.errChan, + resetChan: r.resetChan, + inodeStr: inode, + inactive: 1, + emptyLineCnt: 0, + runnerName: r.meta.RunnerName, + status: StatusInit, + statsLock: sync.RWMutex{}, + runtime: r.runTime, }, nil } @@ -319,40 +317,7 @@ func (ar *ActiveReader) Run() { ar.Stop() return } - - source := ar.br.Source() - if _, ok := ar.halfLineCache[source]; !ok { - ar.cacheLineMux.Lock() - ar.halfLineCache[source] = "" - ar.cacheLineMux.Unlock() - } - - if ar.readcache != "" && err == io.EOF { - ar.cacheLineMux.Lock() - if len(ar.halfLineCache[source])+len(ar.readcache) > 20*utils.Mb { - log.Warnf("Runner[%v] log path[%v] reader[%v] single line size has exceed 20mb", ar.runnerName, ar.originpath, source) - ar.readcache = ar.halfLineCache[source] + ar.readcache - ar.halfLineCache[source] = "" - } else { - ar.halfLineCache[source] += ar.readcache - ar.readcache = "" - } - ar.cacheLineMux.Unlock() - } - - if err == nil && ar.halfLineCache[source] != "" { - ar.cacheLineMux.Lock() - ar.readcache += ar.halfLineCache[source] - ar.halfLineCache[source] = "" - ar.cacheLineMux.Unlock() - } - if len(ar.readcache) == 0 && ar.halfLineCache[source] == "" { - if key, exist := utils.GetKeyOfNotEmptyValueInMap(ar.halfLineCache); exist { - source = key - } - } - - if ar.readcache == "" && ar.halfLineCache[source] == "" { + if ar.readcache == "" { ar.emptyLineCnt++ //文件EOF,同时没有任何内容,代表不是第一次EOF,休息时间设置长一些 if err == io.EOF { @@ -373,17 +338,6 @@ func (ar *ActiveReader) Run() { //读取的结果为空,无论如何都sleep 1s time.Sleep(time.Second) continue - } else if ar.readcache == "" && ar.halfLineCache[source] != "" { - ar.emptyLineCnt++ - if err == io.EOF && ar.emptyLineCnt < 40 { - log.Debugf("Runner[%s] %s meet EOF, ActiveReader was inactive now, stop it", ar.runnerName, ar.originpath) - time.Sleep(1 * time.Second) - continue - } - ar.cacheLineMux.Lock() - ar.readcache = ar.halfLineCache[source] - ar.halfLineCache[source] = "" - ar.cacheLineMux.Unlock() } } log.Debugf("Runner[%s] %s >>>>>>readcache <%s> linecache <%s>", ar.runnerName, ar.originpath, strings.TrimSpace(ar.readcache), string(ar.br.FormMutiLine())) @@ -663,7 +617,7 @@ func (r *Reader) sendError(err error) { }() select { case r.errChan <- err: - case <-time.After(time.Second): + case <- time.After(time.Second): } }