Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 12 additions & 68 deletions reader/dirx/dir_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ type dirReader struct {
logPath string
readLock sync.RWMutex
readcache string
halfLineCache map[string]string //针对不同的数据源做一个缓存
numEmptyLines int

msgChan chan<- message
Expand Down Expand Up @@ -104,40 +103,8 @@ func (dr *dirReader) Run() {
time.Sleep(2 * time.Second)
continue
}
source := dr.br.Source()
if _, ok := dr.halfLineCache[source]; !ok {
dr.readLock.Lock()
dr.halfLineCache[source] = ""
dr.readLock.Unlock()
}

if dr.readcache != "" && err == io.EOF {
dr.readLock.Lock()
if len(dr.halfLineCache[source])+len(dr.readcache) > 20*utils.Mb {
log.Warnf("Runner[%v] log path[%v] reader[%v] single line size has exceed 2k", dr.runnerName, dr.originalPath, source)
dr.readcache = dr.halfLineCache[source] + dr.readcache
dr.halfLineCache[source] = ""
} else {
dr.halfLineCache[source] += dr.readcache
dr.readcache = ""
}
dr.readLock.Unlock()
}

if err == nil && dr.halfLineCache[source] != "" {
dr.readLock.Lock()
dr.readcache += dr.halfLineCache[source]
dr.halfLineCache[source] = ""
dr.readLock.Unlock()
}

if len(dr.readcache) == 0 && dr.halfLineCache[source] == "" {
if key, exist := utils.GetKeyOfNotEmptyValueInMap(dr.halfLineCache); exist {
source = key
}
}

if len(dr.readcache) == 0 && dr.halfLineCache[source] == "" {
if len(dr.readcache) == 0 {
dr.numEmptyLines++
// 文件 EOF,同时没有任何内容,代表不是第一次 EOF,休息时间设置长一些
if err == io.EOF {
Expand All @@ -155,19 +122,9 @@ func (dr *dirReader) Run() {
// 读取的结果为空,无论如何都 sleep 1s
time.Sleep(time.Second)
continue
} else if len(dr.readcache) == 0 && dr.halfLineCache[source] != "" {
dr.numEmptyLines++
if err == io.EOF && dr.numEmptyLines < 40{
log.Debugf("Runner[%s] %s meet EOF, ActiveReader was inactive now, stop it", dr.runnerName, dr.originalPath)
time.Sleep(1 * time.Second)
continue
}
dr.readLock.Lock()
dr.readcache = dr.halfLineCache[source]
dr.halfLineCache[source] = ""
dr.readLock.Unlock()
}
}

log.Debugf("Runner[%v] %v >>>>>> read cache[%v] line cache [%v]", dr.runnerName, dr.originalPath, dr.readcache, string(dr.br.FormMutiLine()))
repeat := 0
for {
Expand All @@ -193,7 +150,6 @@ func (dr *dirReader) Run() {
case dr.msgChan <- message{result: dr.readcache, logpath: dr.originalPath, currentFile: dr.br.Source()}:
dr.readLock.Lock()
dr.readcache = ""

dr.readLock.Unlock()
case <-ticker.C:
}
Expand Down Expand Up @@ -239,20 +195,9 @@ func HasDirExpired(dir string, expire time.Duration) bool {
return latestModTime.Add(expire).Before(time.Now())
}

//对于读完的直接认为过期,因为不会追加新数据;最后一次需要等待30s
//对于读完的直接认为过期,因为不会追加新数据
func (dr *dirReader) ReadDone() bool {
return dr.br.ReadDone() && dr.halfLineCacheIsEmpty()
}

func (dr *dirReader) halfLineCacheIsEmpty() bool {
dr.readLock.Lock()
defer dr.readLock.Unlock()
for _, v := range dr.halfLineCache {
if v != "" {
return false
}
}
return true
return dr.br.ReadDone()
}

func (dr *dirReader) HasExpired(expire time.Duration) bool {
Expand Down Expand Up @@ -399,15 +344,14 @@ func (drs *dirReaders) NewReader(opts newReaderOptions, notFirstTime bool, maxLi
}

dr := &dirReader{
status: StatusInit,
inactive: 1,
br: br,
halfLineCache: make(map[string]string),
runnerName: opts.Meta.RunnerName,
originalPath: opts.OriginalPath,
logPath: opts.LogPath,
msgChan: opts.MsgChan,
errChan: opts.ErrChan,
status: StatusInit,
inactive: 1,
br: br,
runnerName: opts.Meta.RunnerName,
originalPath: opts.OriginalPath,
logPath: opts.LogPath,
msgChan: opts.MsgChan,
errChan: opts.ErrChan,
}

drs.lock.Lock()
Expand Down
104 changes: 29 additions & 75 deletions reader/tailx/tailx.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,20 +83,19 @@ type Reader struct {
}

type ActiveReader struct {
cacheLineMux sync.RWMutex
br *bufreader.BufReader
realpath string
originpath string
readcache string
halfLineCache map[string]string //针对不同的数据源做一个缓存
msgchan chan<- Result
errChan chan<- error
resetChan chan<- string
status int32
inactive int32 //当inactive>0 时才会被expire回收
runnerName string
runtime reader.RunTime
inodeStr string
cacheLineMux sync.RWMutex
br *bufreader.BufReader
realpath string
originpath string
readcache string
msgchan chan<- Result
errChan chan<- error
resetChan chan<- string
status int32
inactive int32 //当inactive>0 时才会被expire回收
runnerName string
runtime reader.RunTime
inodeStr string

emptyLineCnt int

Expand Down Expand Up @@ -158,21 +157,20 @@ func NewActiveReader(originPath, realPath, whence, inode string, r *Reader) (ar
inode = strconv.Itoa(int(inodeInt))
}
return &ActiveReader{
cacheLineMux: sync.RWMutex{},
br: bf,
realpath: realPath,
originpath: originPath,
msgchan: r.msgChan,
errChan: r.errChan,
halfLineCache: make(map[string]string),
resetChan: r.resetChan,
inodeStr: inode,
inactive: 1,
emptyLineCnt: 0,
runnerName: r.meta.RunnerName,
status: StatusInit,
statsLock: sync.RWMutex{},
runtime: r.runTime,
cacheLineMux: sync.RWMutex{},
br: bf,
realpath: realPath,
originpath: originPath,
msgchan: r.msgChan,
errChan: r.errChan,
resetChan: r.resetChan,
inodeStr: inode,
inactive: 1,
emptyLineCnt: 0,
runnerName: r.meta.RunnerName,
status: StatusInit,
statsLock: sync.RWMutex{},
runtime: r.runTime,
}, nil

}
Expand Down Expand Up @@ -319,40 +317,7 @@ func (ar *ActiveReader) Run() {
ar.Stop()
return
}

source := ar.br.Source()
if _, ok := ar.halfLineCache[source]; !ok {
ar.cacheLineMux.Lock()
ar.halfLineCache[source] = ""
ar.cacheLineMux.Unlock()
}

if ar.readcache != "" && err == io.EOF {
ar.cacheLineMux.Lock()
if len(ar.halfLineCache[source])+len(ar.readcache) > 20*utils.Mb {
log.Warnf("Runner[%v] log path[%v] reader[%v] single line size has exceed 20mb", ar.runnerName, ar.originpath, source)
ar.readcache = ar.halfLineCache[source] + ar.readcache
ar.halfLineCache[source] = ""
} else {
ar.halfLineCache[source] += ar.readcache
ar.readcache = ""
}
ar.cacheLineMux.Unlock()
}

if err == nil && ar.halfLineCache[source] != "" {
ar.cacheLineMux.Lock()
ar.readcache += ar.halfLineCache[source]
ar.halfLineCache[source] = ""
ar.cacheLineMux.Unlock()
}
if len(ar.readcache) == 0 && ar.halfLineCache[source] == "" {
if key, exist := utils.GetKeyOfNotEmptyValueInMap(ar.halfLineCache); exist {
source = key
}
}

if ar.readcache == "" && ar.halfLineCache[source] == "" {
if ar.readcache == "" {
ar.emptyLineCnt++
//文件EOF,同时没有任何内容,代表不是第一次EOF,休息时间设置长一些
if err == io.EOF {
Expand All @@ -373,17 +338,6 @@ func (ar *ActiveReader) Run() {
//读取的结果为空,无论如何都sleep 1s
time.Sleep(time.Second)
continue
} else if ar.readcache == "" && ar.halfLineCache[source] != "" {
ar.emptyLineCnt++
if err == io.EOF && ar.emptyLineCnt < 40 {
log.Debugf("Runner[%s] %s meet EOF, ActiveReader was inactive now, stop it", ar.runnerName, ar.originpath)
time.Sleep(1 * time.Second)
continue
}
ar.cacheLineMux.Lock()
ar.readcache = ar.halfLineCache[source]
ar.halfLineCache[source] = ""
ar.cacheLineMux.Unlock()
}
}
log.Debugf("Runner[%s] %s >>>>>>readcache <%s> linecache <%s>", ar.runnerName, ar.originpath, strings.TrimSpace(ar.readcache), string(ar.br.FormMutiLine()))
Expand Down Expand Up @@ -663,7 +617,7 @@ func (r *Reader) sendError(err error) {
}()
select {
case r.errChan <- err:
case <-time.After(time.Second):
case <- time.After(time.Second):
}
}

Expand Down