Conversation

@PapaPiya (Contributor) commented Jun 9, 2021

Fixes [issue number]

Changes

  • 1. Incorrect origin, including: the seqfile reader, the extract reader, and the origin recorded when merging lines.
  • 2. The cache problem should be solvable by reading again after hitting EOF.

Reviewers

Wiki Changes

  • options1...
  • options2...

Checklist

  • Rebased/mergeable
  • Tests pass
  • Wiki updated

@PapaPiya PapaPiya force-pushed the fix_cache branch 4 times, most recently from 1dec318 to eebb621 on June 9, 2021 07:12
@PapaPiya PapaPiya changed the title from "fix origin cache" to "1. Fix broken last line at end of file; 2. Fix incorrect origin" on Jun 9, 2021
@PapaPiya PapaPiya force-pushed the fix_cache branch 19 times, most recently from 1474111 to 038aa43 on June 10, 2021 18:33
// Read new data: try a limited number of times.
for i := maxConsecutiveEmptyReads; i > 0; i-- {
n, err := b.rd.Read(b.buf[b.w:])
if n < 0 {
Collaborator:

Even when err is not nil, b.w += n is still needed.

Contributor Author:

> Even when err is not nil, b.w += n is still needed.

done

}

}
b.err = io.ErrNoProgress
Collaborator:

If ErrNoProgress is removed here, readSlice above will never get an err back and will keep looping in its for loop.

Collaborator:

b.err = err is already set above, so this line can be ignored.

SIdx := rc.NewSourceIndex()
for _, v := range SIdx {
// The index returned by NewSourceIndex is the amount of data belonging to the previous DataSource within this read batch; adding b.w gives that DataSource's absolute extent in the buffer.
if len(SIdx) > 0 {
Collaborator:

No need for this check; len(SIdx) is guaranteed to be > 0 here, otherwise it is a bug.
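The index arithmetic described in the comment above can be sketched as follows. The sourceIndex type and its fields here are illustrative assumptions, not the actual logkit types:

```go
package main

import "fmt"

// sourceIndex is an illustrative stand-in for the values returned by
// rc.NewSourceIndex: Index is an offset within the current read batch.
type sourceIndex struct {
	Source string
	Index  int
}

// absoluteBoundaries adds the buffer write position w to each
// batch-relative index, yielding the absolute end of each previous
// DataSource in the buffer.
func absoluteBoundaries(w int, idx []sourceIndex) []int {
	out := make([]int, 0, len(idx))
	for _, v := range idx {
		out = append(out, w+v.Index)
	}
	return out
}

func main() {
	idx := []sourceIndex{{"a.log", 10}, {"b.log", 25}}
	fmt.Println(absoluteBoundaries(100, idx)) // [110 125]
}
```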

@liukai2012
Collaborator

liukai2012 commented Jul 19, 2021

I think the root of this problem is how "end of file" is defined. seqfile treats reading to EOF as the end of the file, but for real-time reading I don't think that is rigorous: if some data has not yet been written to the file at the instant EOF is hit, a \n gets appended anyway, and everything above is a fix for that scenario.

I think the true end of a file is when the file expires, but we cannot know in advance whether more data will be written between EOF and expiry, so there is no way to decide whether to append \n.

So I suggest splitting it into two cases:

  1. For modes that keep watching a single file, such as tailx, do not append \n after EOF; there is no need, since any later data still belongs to this file.
  2. For dirx mode, append \n as soon as EOF is reached; dirx keeps polling for new files to watch, so once a file hits EOF, the next file can be treated as its continuation.
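The two-case rule proposed above could be sketched like this. The mode names come from the discussion, but the helper itself is hypothetical, not an existing logkit API:

```go
package main

import "fmt"

// shouldAppendNewline encodes the proposed two-case rule for whether to
// append \n when a reader hits EOF.
func shouldAppendNewline(mode string) bool {
	switch mode {
	case "tailx":
		// Keeps watching a single file: later data still belongs to
		// this file, so never append \n at EOF.
		return false
	case "dirx":
		// Polls for new files: once a file hits EOF, the next file is
		// treated as its continuation, so append \n at EOF.
		return true
	}
	return false
}

func main() {
	fmt.Println(shouldAppendNewline("tailx"), shouldAppendNewline("dirx")) // false true
}
```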

@liukai2012
Collaborator

> (quoting the end-of-file comment above)

For now, we won't do the large refactor described above.

@PapaPiya PapaPiya force-pushed the fix_cache branch 4 times, most recently from d0588fc to 2db45ad on August 4, 2021 07:08
@liukai2012
Collaborator

lgtm

@redHJ redHJ closed this Aug 16, 2021
@redHJ redHJ reopened this Aug 16, 2021
if len(dr.readcache) == 0 && dr.halfLineCache[source] == "" {
if key, exist := utils.GetKeyOfNotEmptyValueInMap(dr.halfLineCache); exist {
source = key
// No content read for roughly an hour; mark it inactive.
Collaborator:

Why is this 1 hour?

Contributor Author:

> Why is this 1 hour?

The 1 hour is the pre-existing logic; this change only moves the code.
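The helper utils.GetKeyOfNotEmptyValueInMap used in the snippet above presumably scans for any source that still has pending half-line data. A plausible sketch, with the behavior inferred from the name rather than from the actual implementation:

```go
package main

import "fmt"

// getKeyOfNotEmptyValueInMap returns some key whose value is non-empty,
// and whether one was found. Iteration order over a Go map is
// deliberately unspecified, so any qualifying key may be returned.
func getKeyOfNotEmptyValueInMap(m map[string]string) (string, bool) {
	for k, v := range m {
		if v != "" {
			return k, true
		}
	}
	return "", false
}

func main() {
	cache := map[string]string{"a.log": "", "b.log": "half a li"}
	key, ok := getKeyOfNotEmptyValueInMap(cache)
	fmt.Println(key, ok) // b.log true
}
```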

dr.readLock.Unlock()
}

if cache, ok := dr.halfLineCache[source]; ok {
Contributor Author:

https://github.com/qiniu/logkit/pull/1181/files#diff-d48b01cd98a25cbcc145cb6eda975cc2d56e6ef77d5c08fe4776e966bde20accL116 I see the original logic compared the size; do we need to keep the original behavior here?

A size limit was added to keep data from accumulating indefinitely when no newline appears.

dr.readcache = ""
continue
} else {
delete(dr.halfLineCache, source)
Collaborator:

Don't these lines need readLock?

Contributor Author:

> Don't these lines need readLock?

Yes, they do; added.

n += n1
if n1 > 0 {
eofTimes = 0
if len(sf.newLineBytesSourceIndex) != 0 && sf.newLineBytesSourceIndex[len(sf.newLineBytesSourceIndex)-1].Source == sf.currFile {
Collaborator:

Pull len(sf.newLineBytesSourceIndex) into a variable, for readability.

Contributor Author:

> Pull len(sf.newLineBytesSourceIndex) into a variable, for readability.

done
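The suggested refactor, sketched below; the sourceIndex type is assumed purely for illustration:

```go
package main

import "fmt"

type sourceIndex struct{ Source string }

// lastIndexMatches hoists len(...) into a local variable, as the review
// suggests, instead of repeating the call inside the condition.
func lastIndexMatches(idx []sourceIndex, currFile string) bool {
	idxLen := len(idx)
	return idxLen != 0 && idx[idxLen-1].Source == currFile
}

func main() {
	idx := []sourceIndex{{Source: "a.log"}, {Source: "b.log"}}
	fmt.Println(lastIndexMatches(idx, "b.log"), lastIndexMatches(nil, "b.log")) // true false
}
```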

return
if err == io.EOF {
time.Sleep(time.Millisecond * 10)
eofTimes++
@redHJ redHJ Aug 19, 2021

Collaborator:

This logic needs to be verified with a real test.

@redHJ
Collaborator

redHJ commented Aug 19, 2021

There are a lot of changes. You need to test single/tailx/dir/dirx yourself, covering both normal reads and a broken line at end of file, then benchmark performance and compare with the previous version.

dr.readcache = cache + dr.readcache
dr.readLock.Unlock()
}
if !strings.HasSuffix(dr.readcache, string(dr.br.GetDelimiter())) && dr.numEmptyLines < 3 && len(dr.readcache) < 20*MB {
Collaborator:

When len(dr.readcache) >= 20*MB, should we write a log line to aid troubleshooting?

Contributor Author:

> When len(dr.readcache) >= 20*MB, should we write a log line to aid troubleshooting?

done

@PapaPiya PapaPiya force-pushed the fix_cache branch 3 times, most recently from 82da3dc to efc7caa on August 19, 2021 03:45
@PapaPiya
Contributor Author

> There are a lot of changes. You need to test single/tailx/dir/dirx yourself, covering both normal reads and a broken line at end of file, then benchmark performance and compare with the previous version.

Usage has been verified in a customer environment; performance testing needs support from QA.

@redHJ
Collaborator

redHJ commented Aug 19, 2021

> Usage has been verified in a customer environment; performance testing needs support from QA.

Were all of single/tailx/dir/dirx verified in the customer environment? After addressing the review comments, every scenario needs to be re-tested, to avoid fixing one problem while introducing more.

@redHJ
Collaborator

redHJ commented Aug 19, 2021

I'm somewhat worried about whether the test cases are comprehensive. Otherwise lgtm.

@redHJ redHJ merged commit 898cb43 into qiniu:master Sep 15, 2021