Skip to content

Commit

Permalink
Feat: support copytruncate in file source (#571)
Browse files Browse the repository at this point in the history
* Feat: support copytruncate in file source
  • Loading branch information
ethfoo committed Jul 13, 2023
1 parent 71ce680 commit a577e59
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 41 deletions.
89 changes: 54 additions & 35 deletions pkg/source/file/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type jobEvent struct {
opt Operation
job *Job
newFilename string
newFileSize int64
}

type Watcher struct {
Expand Down Expand Up @@ -212,11 +213,10 @@ func (w *Watcher) reportMetric(job *Job) {
PipelineName: job.task.pipelineName,
SourceName: job.task.sourceName,
},
FileName: job.filename,
Offset: job.endOffset,
LineNumber: job.currentLineNumber,
Lines: job.currentLines,
// FileSize: fileSize,
FileName: job.filename,
Offset: job.endOffset,
LineNumber: job.currentLineNumber,
Lines: job.currentLines,
SourceFields: job.task.sourceFields,
}
job.currentLines = 0
Expand Down Expand Up @@ -249,23 +249,47 @@ func (w *Watcher) eventBus(e jobEvent) {
}
// only care about zombie job write event
watchJobId := job.WatchUid()
if existJob, ok := w.zombieJobs[watchJobId]; ok {
err, fdOpen := existJob.Active()
if fdOpen {
w.currentOpenFds++
}
if err != nil {
log.Error("active job fileName(%s) fail: %s", filename, err)
if existJob.Release() {
w.currentOpenFds--
existJob, ok := w.zombieJobs[watchJobId]
if !ok {
return
}

// check whether the file size is less than the offset in the job
filesize := e.newFileSize
currentOffset := job.endOffset
if filesize < currentOffset {
// maybe the file is truncated
log.Info("filesize: %d, currentOffset: %d", filesize, currentOffset)
existRegistry := w.findExistJobRegistry(job)
existAckOffset := existRegistry.Offset
if existAckOffset > filesize+int64(len(job.GetEncodeLineEnd())) {
log.Warn("the job(jobUid:%s) fileName(%s) existRegistry(%+v) ackOffset is larger than file size(%d), the file was truncate", job.Uid(), filename, existRegistry, filesize)
// file was truncated, need to reinitialize the job
job.Delete()
if w.isZombieJob(job) {
w.finalizeJob(job)
}
return
}
existJob.Read()
// zombie job change to active, so without os notify
w.removeOsNotify(existJob.filename)
delete(w.zombieJobs, watchJobId)
}

err, fdOpen := existJob.Active()
if fdOpen {
w.currentOpenFds++
}
if err != nil {
log.Error("active job fileName(%s) fail: %s", filename, err)
if existJob.Release() {
w.currentOpenFds--
}
return
}
existJob.Read()
// zombie job change to active, so without os notify
w.removeOsNotify(existJob.filename)
log.Debug("job fileName(%s) change to active", filename)
delete(w.zombieJobs, watchJobId)

case CREATE:
if w.currentOpenFds >= w.config.MaxOpenFds {
log.Error("maxCollectFiles reached. fileName(%s) will be ignore", filename)
Expand Down Expand Up @@ -557,12 +581,7 @@ func (w *Watcher) scanActiveJob() {
}
}

// check zombie job:
// 0. final status
// 1. remove
// 2. fd hold timeout,release fd
// 3. write
// 4. truncated file
// check zombie job
func (w *Watcher) scanZombieJob() {
for _, job := range w.zombieJobs {
if job.IsDelete() {
Expand All @@ -571,8 +590,6 @@ func (w *Watcher) scanZombieJob() {
}
filename := job.filename
stat, err := os.Stat(filename)
//var stat os.FileInfo
//var err error
var checkRemove = func() bool {
if err != nil {
if os.IsNotExist(err) {
Expand Down Expand Up @@ -621,8 +638,9 @@ func (w *Watcher) scanZombieJob() {
job.nextOffset = 0
job.currentLineNumber = 0
w.eventBus(jobEvent{
opt: WRITE,
job: job,
opt: WRITE,
job: job,
newFileSize: size,
})
continue
}
Expand All @@ -648,8 +666,9 @@ func (w *Watcher) scanZombieJob() {
size := stat.Size()
if size > job.nextOffset && !job.task.config.IsIgnoreOlder(stat) {
w.eventBus(jobEvent{
opt: WRITE,
job: job,
opt: WRITE,
job: job,
newFileSize: size,
})
continue
}
Expand Down Expand Up @@ -713,7 +732,6 @@ func (w *Watcher) run() {
case job := <-w.zombieJobChan:
w.decideZombieJob(job)
case e := <-osEvents:
// log.Info("os event: %v", e)
w.osNotify(e)
case <-scanFileTicker.C:
w.scan()
Expand Down Expand Up @@ -802,7 +820,6 @@ func (w *Watcher) decideZombieJob(job *Job) {
}

func (w *Watcher) osNotify(e fsnotify.Event) {
log.Debug("received os notify: %+v", e)
if e.Op == fsnotify.Chmod {
// File writing will also be received. Ignore it. Only check whether you have read permission when the file job is activated (job. Active())
return
Expand All @@ -819,6 +836,7 @@ func (w *Watcher) osNotify(e fsnotify.Event) {
if e.Op == fsnotify.Rename {
return
}
log.Debug("received os notify: %+v", e)

fileName := e.Name
if ignoreSystemFile(fileName) {
Expand Down Expand Up @@ -856,8 +874,9 @@ func (w *Watcher) osNotify(e fsnotify.Event) {
for _, existJob := range w.allJobs {
if existJob.Uid() == jobUid {
w.eventBus(jobEvent{
opt: WRITE,
job: existJob,
opt: WRITE,
job: existJob,
newFileSize: stat.Size(),
})
}
}
Expand Down Expand Up @@ -1080,7 +1099,7 @@ func (w *Watcher) handleRemoveJobs(jobs ...*Job) {
JobUid: jt.Uid(),
Filename: jt.filename,
}
log.Info("try to delete registry(%+v) because CleanWhenRemoved. deleteTime: %s", r, jt.deleteTime.Load().(time.Time).Format(persistence.TimeFormatPattern))
log.Info("try to delete registry(%+v). deleteTime: %s", r, jt.deleteTime.Load().(time.Time).Format(persistence.TimeFormatPattern))
w.dbHandler.HandleOpt(persistence.DbOpt{
R: r,
OptType: persistence.DeleteByJobUidOpt,
Expand Down
3 changes: 1 addition & 2 deletions pkg/util/persistence/driver/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,10 @@ func (e *Engine) DeleteBy(jobUid string, sourceName string, pipelineName string)
if err != nil {
resErr = errors.WithMessagef(err, "%s stmt exec fail", e.String())
}
affected, err := result.RowsAffected()
_, err = result.RowsAffected()
if err != nil {
resErr = errors.WithMessagef(err, "%s get result fail", e.String())
}
log.Info("delete registry(jobUid:%s, sourceName:%s, pipelineName:%s). affected: %d", jobUid, sourceName, pipelineName, affected)
})
return resErr
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/util/persistence/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,16 +258,15 @@ func (d *DbHandler) updateFileName(registries []reg.Registry) {
func (d *DbHandler) upsertOffsetByJobWatchId(r reg.Registry) {
r.CollectTime = time2text(time.Now())
r.Version = api.VERSION
rs := []reg.Registry{r}

or := d.FindBy(r.JobUid, r.SourceName, r.PipelineName)
if or.JobUid != "" {
// update
r.Id = or.Id
d.updateRegistry(rs)
d.updateRegistry([]reg.Registry{r})
} else {
// insert
d.insertRegistry(rs)
d.insertRegistry([]reg.Registry{r})
}
}

Expand All @@ -276,7 +275,7 @@ func (d *DbHandler) delete(r reg.Registry) {
log.Error("%s fail to delete registry %s : %s", d.String(), r.Key(), err)
return
}
log.Info("delete registry %s because db.cleanInactiveTimeout(%dh) reached. file: %s", r.Key(), d.config.CleanInactiveTimeout/time.Hour, r.Filename)
log.Info("delete registry %s. file: %s", r.Key(), r.Filename)
}

func (d *DbHandler) deleteRemoved(rs []reg.Registry) {
Expand All @@ -300,6 +299,7 @@ func (d *DbHandler) cleanData() {
t := text2time(collectTime)
if time.Since(t) >= d.config.CleanInactiveTimeout {
// delete
log.Info("clean inactive registry: %s because CleanInactiveTimeout(%dh) reached ", r.Key(), d.config.CleanInactiveTimeout/time.Hour)
d.delete(r)
}
}
Expand Down

0 comments on commit a577e59

Please sign in to comment.