Skip to content

Commit

Permalink
Feat: add addonMetaSchema in file source
Browse files Browse the repository at this point in the history
  • Loading branch information
ethfoo committed Jul 25, 2023
1 parent 96516da commit 610ae9c
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 11 deletions.
7 changes: 7 additions & 0 deletions pkg/source/file/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type CollectConfig struct {
RereadTruncated bool `yaml:"rereadTruncated,omitempty" default:"true"` // Read from the beginning when the file is truncated
FirstNBytesForIdentifier int `yaml:"firstNBytesForIdentifier,omitempty" default:"128" validate:"gte=10"` // If the file size is smaller than `firstNBytesForIdentifier`, it will not be collected
AddonMeta bool `yaml:"addonMeta,omitempty"`
AddonMetaSchema AddonMetaSchema `yaml:"addonMetaSchema,omitempty"`
excludeFilePatterns []*regexp.Regexp
Charset string `yaml:"charset,omitempty" default:"utf-8"`

Expand All @@ -54,6 +55,12 @@ type CollectConfig struct {
FdHoldTimeoutWhenRemove time.Duration `yaml:"fdHoldTimeoutWhenRemove,omitempty" default:"5m"`
}

type AddonMetaSchema struct {
Fields map[string]string `yaml:"fields,omitempty"`
FieldsUnderRoot bool `yaml:"underRoot,omitempty"`
FieldsUnderKey string `yaml:"key,omitempty" default:"state"`
}

type LineDelimiterValue struct {
Charset string `yaml:"charset,omitempty" default:"utf-8"`
LineType string `yaml:"type,omitempty" default:"auto"`
Expand Down
113 changes: 102 additions & 11 deletions pkg/source/file/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,19 @@ type Source struct {
multilineProcessor *MultiProcessor
mTask *MultiTask
codec codec.Codec

addonMetaField *AddonMetaFields
}

type AddonMetaFields struct {
Pipeline string `yaml:"pipeline,omitempty"`
Source string `yaml:"source,omitempty"`
Filename string `yaml:"filename,omitempty"`
Timestamp string `yaml:"timestamp,omitempty"`
Offset string `yaml:"offset,omitempty"`
Bytes string `yaml:"bytes,omitempty"`
Line string `yaml:"line,omitempty"`
Hostname string `yaml:"hostname,omitempty"`
}

func (s *Source) Config() interface{} {
Expand Down Expand Up @@ -109,6 +122,10 @@ func (s *Source) Init(context api.Context) error {
s.config.ReaderConfig.MultiConfig.Timeout = 2 * inactiveTimeout
}

if s.config.CollectConfig.AddonMeta {
s.addonMetaField = addonMetaFieldsConvert(s.config.CollectConfig.AddonMetaSchema.Fields)
}

// init reader chan size
s.config.ReaderConfig.readChanSize = s.config.WatchConfig.MaxOpenFds

Expand Down Expand Up @@ -186,7 +203,7 @@ func (s *Source) ProductLoop(productFunc api.ProductFunc) {
s.productFunc = productFunc
s.productFunc = jobFieldsProductFunc(s.productFunc, s.rawSourceConfig)
if s.config.CollectConfig.AddonMeta {
s.productFunc = addonMetaProductFunc(s.productFunc)
s.productFunc = addonMetaProductFunc(s.productFunc, s.addonMetaField, s.config.CollectConfig.AddonMetaSchema)
}
if s.config.ReaderConfig.MultiConfig.Active {
s.mTask = NewMultiTask(s.epoch, s.name, s.config.ReaderConfig.MultiConfig, s.eventPool, s.productFunc)
Expand Down Expand Up @@ -238,21 +255,95 @@ func jobFieldsProductFunc(productFunc api.ProductFunc, srcCfg *source.Config) ap
}
}

func addonMetaProductFunc(productFunc api.ProductFunc) api.ProductFunc {
func addonMetaProductFunc(productFunc api.ProductFunc, fields *AddonMetaFields, schema AddonMetaSchema) api.ProductFunc {
return func(event api.Event) api.Result {
s, _ := event.Meta().Get(SystemStateKey)
state := s.(*persistence.State)
addonMeta := make(map[string]interface{})
addonMeta["pipeline"] = state.PipelineName
addonMeta["source"] = state.SourceName
addonMeta["filename"] = state.Filename
addonMeta["timestamp"] = state.CollectTime.Local().Format(tsLayout)
addonMeta["offset"] = state.Offset
addonMeta["bytes"] = state.ContentBytes
addonMeta["hostname"] = global.NodeName

event.Header()["state"] = addonMeta

// if fields is nil, use default config
if fields == nil {
addonMeta["pipeline"] = state.PipelineName
addonMeta["source"] = state.SourceName
addonMeta["filename"] = state.Filename
addonMeta["timestamp"] = state.CollectTime.Local().Format(tsLayout)
addonMeta["offset"] = state.Offset
addonMeta["bytes"] = state.ContentBytes
addonMeta["hostname"] = global.NodeName
} else {

if fields.Pipeline != "" {
addonMeta[fields.Pipeline] = state.PipelineName
}
if fields.Source != "" {
addonMeta[fields.Source] = state.SourceName
}
if fields.Filename != "" {
addonMeta[fields.Filename] = state.Filename
}
if fields.Timestamp != "" {
addonMeta[fields.Timestamp] = state.CollectTime.Local().Format(tsLayout)
}
if fields.Offset != "" {
addonMeta[fields.Offset] = state.Offset
}
if fields.Bytes != "" {
addonMeta[fields.Bytes] = state.ContentBytes
}
if fields.Line != "" {
addonMeta[fields.Line] = state.LineNumber
}
if fields.Hostname != "" {
addonMeta[fields.Hostname] = global.NodeName
}
}

if schema.FieldsUnderRoot {
for k, v := range addonMeta {
event.Header()[k] = v
}
} else {
event.Header()[schema.FieldsUnderKey] = addonMeta
}

productFunc(event)
return result.Success()
}
}

func addonMetaFieldsConvert(fields map[string]string) *AddonMetaFields {
if len(fields) == 0 {
return nil
}

amf := &AddonMetaFields{}
for k, v := range fields {
switch v {
case "${_meta.pipeline}":
amf.Pipeline = k

case "${_meta.source}":
amf.Source = k

case "${_meta.filename}":
amf.Filename = k

case "${_meta.timestamp}":
amf.Timestamp = k

case "${_meta.offset}":
amf.Offset = k

case "${_meta.bytes}":
amf.Bytes = k

case "${_meta.line}":
amf.Line = k

case "${_meta.hostname}":
amf.Hostname = k
}
}

return amf
}

0 comments on commit 610ae9c

Please sign in to comment.