diff --git a/loggie.yml b/loggie.yml index 64ba5814e..48835afb3 100644 --- a/loggie.yml +++ b/loggie.yml @@ -14,7 +14,7 @@ loggie: sink: ~ queue: ~ pipeline: ~ - normalize: ~ + sys: ~ discovery: enabled: false @@ -31,15 +31,17 @@ loggie: defaults: sink: type: dev - interceptors: - - type: schema - name: global - order: 700 - addMeta: - timestamp: - key: "@timestamp" sources: - type: file + timestampKey: "@timestamp" + bodyKey: "message" + fieldsUnderRoot: true + addonMeta: true + addonMetaSchema: + underRoot: true + fields: + filename: "${_meta.filename}" + line: "${_meta.line}" watcher: maxOpenFds: 6000 http: diff --git a/pkg/core/source/config.go b/pkg/core/source/config.go index 64bb44432..70c2cc3dd 100644 --- a/pkg/core/source/config.go +++ b/pkg/core/source/config.go @@ -38,6 +38,11 @@ type Config struct { FieldsFromEnv map[string]string `yaml:"fieldsFromEnv,omitempty"` FieldsFromPath map[string]string `yaml:"fieldsFromPath,omitempty"` Codec *codec.Config `yaml:"codec,omitempty"` + + TimestampKey string `yaml:"timestampKey,omitempty"` + TimestampLocation string `yaml:"timestampLocation,omitempty"` + TimestampLayout string `yaml:"timestampLayout,omitempty"` + BodyKey string `yaml:"bodyKey,omitempty"` } func (c *Config) DeepCopy() *Config { @@ -82,6 +87,11 @@ func (c *Config) DeepCopy() *Config { FieldsFromEnv: newFieldsFromEnv, FieldsFromPath: newFieldsFromPath, Codec: c.Codec.DeepCopy(), + + TimestampKey: c.TimestampKey, + TimestampLocation: c.TimestampLocation, + TimestampLayout: c.TimestampLayout, + BodyKey: c.BodyKey, } return out @@ -155,6 +165,19 @@ func (c *Config) Merge(from *Config) { } else { c.Codec.Merge(from.Codec) } + + if c.TimestampKey == "" { + c.TimestampKey = from.TimestampKey + } + if c.TimestampLocation == "" { + c.TimestampLocation = from.TimestampLocation + } + if c.TimestampLayout == "" { + c.TimestampLayout = from.TimestampLayout + } + if c.BodyKey == "" { + c.BodyKey = from.BodyKey + } } func MergeSourceList(base []*Config, from []*Config) []*Config { diff --git a/pkg/pipeline/pipeline.go b/pkg/pipeline/pipeline.go index 850657a48..269cf168b 100644 --- a/pkg/pipeline/pipeline.go +++ b/pkg/pipeline/pipeline.go @@ -18,6 +18,7 @@ package pipeline import ( "fmt" + timeutil "github.com/loggie-io/loggie/pkg/util/time" "io/ioutil" "os" "strconv" @@ -46,10 +47,9 @@ import ( ) const ( - FieldsUnderRoot = event.PrivateKeyPrefix + "FieldsUnderRoot" - FieldsUnderKey = event.PrivateKeyPrefix + "FieldsUnderKey" - fieldsFromPathMaxBytes = 1024 + + defaultTsLayout = "2006-01-02T15:04:05.000Z" ) var ( @@ -1054,11 +1054,10 @@ func (p *Pipeline) initFieldsFromPath(fieldsFromPath map[string]string) { func (p *Pipeline) fillEventMetaAndHeader(e api.Event, config source.Config) { // add meta fields - e.Meta().Set(event.SystemProductTimeKey, time.Now()) + now := time.Now() + e.Meta().Set(event.SystemProductTimeKey, now) e.Meta().Set(event.SystemPipelineKey, p.name) e.Meta().Set(event.SystemSourceKey, config.Name) - e.Meta().Set(FieldsUnderRoot, config.FieldsUnderRoot) - e.Meta().Set(FieldsUnderKey, config.FieldsUnderKey) header := e.Header() if header == nil { @@ -1073,6 +1072,28 @@ func (p *Pipeline) fillEventMetaAndHeader(e api.Event, config source.Config) { // add header source fields from file AddSourceFields(header, p.pathMap, config.FieldsUnderRoot, config.FieldsUnderKey) + + // remap timestamp + if config.TimestampKey != "" { + layout := config.TimestampLayout + if layout == "" { + layout = defaultTsLayout + } + + // conf.Location could be "" or "UTC" or "Local" + // default "" indicate "UTC" + ts, err := timeutil.Format(now, config.TimestampLocation, layout) + if err != nil { + log.Warn("time format system product timestamp err: %+v", err) + return + } + header[config.TimestampKey] = ts + } + + if config.BodyKey != "" { + header[config.BodyKey] = util.ByteToStringUnsafe(e.Body()) + e.Fill(e.Meta(), header, []byte{}) + } } func AddSourceFields(header map[string]interface{}, fields map[string]interface{}, underRoot bool, fieldsKey string) {