Skip to content

Commit

Permalink
Feat: add timestamp and bodyKey in source
Browse files Browse the repository at this point in the history
  • Loading branch information
ethfoo committed Jul 25, 2023
1 parent c452784 commit 6288da0
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 14 deletions.
18 changes: 10 additions & 8 deletions loggie.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ loggie:
sink: ~
queue: ~
pipeline: ~
normalize: ~
sys: ~

discovery:
enabled: false
Expand All @@ -31,15 +31,17 @@ loggie:
defaults:
sink:
type: dev
interceptors:
- type: schema
name: global
order: 700
addMeta:
timestamp:
key: "@timestamp"
sources:
- type: file
timestampKey: "@timestamp"
bodyKey: "message"
fieldsUnderRoot: true
addonMeta: true
addonMetaSchema:
underRoot: true
fields:
filename: "${_meta.filename}"
line: "${_meta.line}"
watcher:
maxOpenFds: 6000
http:
Expand Down
23 changes: 23 additions & 0 deletions pkg/core/source/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ type Config struct {
FieldsFromEnv map[string]string `yaml:"fieldsFromEnv,omitempty"`
FieldsFromPath map[string]string `yaml:"fieldsFromPath,omitempty"`
Codec *codec.Config `yaml:"codec,omitempty"`

TimestampKey string `yaml:"timestampKey,omitempty"`
TimestampLocation string `yaml:"timestampLocation,omitempty"`
TimestampLayout string `yaml:"timestampLayout,omitempty"`
BodyKey string `yaml:"bodyKey,omitempty"`
}

func (c *Config) DeepCopy() *Config {
Expand Down Expand Up @@ -82,6 +87,11 @@ func (c *Config) DeepCopy() *Config {
FieldsFromEnv: newFieldsFromEnv,
FieldsFromPath: newFieldsFromPath,
Codec: c.Codec.DeepCopy(),

TimestampKey: c.TimestampKey,
TimestampLocation: c.TimestampLocation,
TimestampLayout: c.TimestampLayout,
BodyKey: c.BodyKey,
}

return out
Expand Down Expand Up @@ -155,6 +165,19 @@ func (c *Config) Merge(from *Config) {
} else {
c.Codec.Merge(from.Codec)
}

if c.TimestampKey == "" {
c.TimestampKey = from.TimestampKey
}
if c.TimestampLocation == "" {
c.TimestampLocation = from.TimestampLocation
}
if c.TimestampLayout == "" {
c.TimestampLayout = from.TimestampLayout
}
if c.BodyKey == "" {
c.BodyKey = from.BodyKey
}
}

func MergeSourceList(base []*Config, from []*Config) []*Config {
Expand Down
33 changes: 27 additions & 6 deletions pkg/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package pipeline

import (
"fmt"
timeutil "github.com/loggie-io/loggie/pkg/util/time"
"io/ioutil"
"os"
"strconv"
Expand Down Expand Up @@ -46,10 +47,9 @@ import (
)

const (
FieldsUnderRoot = event.PrivateKeyPrefix + "FieldsUnderRoot"
FieldsUnderKey = event.PrivateKeyPrefix + "FieldsUnderKey"

fieldsFromPathMaxBytes = 1024

defaultTsLayout = "2006-01-02T15:04:05.000Z"
)

var (
Expand Down Expand Up @@ -1054,11 +1054,10 @@ func (p *Pipeline) initFieldsFromPath(fieldsFromPath map[string]string) {

func (p *Pipeline) fillEventMetaAndHeader(e api.Event, config source.Config) {
// add meta fields
e.Meta().Set(event.SystemProductTimeKey, time.Now())
now := time.Now()
e.Meta().Set(event.SystemProductTimeKey, now)
e.Meta().Set(event.SystemPipelineKey, p.name)
e.Meta().Set(event.SystemSourceKey, config.Name)
e.Meta().Set(FieldsUnderRoot, config.FieldsUnderRoot)
e.Meta().Set(FieldsUnderKey, config.FieldsUnderKey)

header := e.Header()
if header == nil {
Expand All @@ -1073,6 +1072,28 @@ func (p *Pipeline) fillEventMetaAndHeader(e api.Event, config source.Config) {

// add header source fields from file
AddSourceFields(header, p.pathMap, config.FieldsUnderRoot, config.FieldsUnderKey)

// remap timestamp
if config.TimestampKey != "" {
layout := config.TimestampLayout
if layout == "" {
layout = defaultTsLayout
}

// conf.Location could be "" or "UTC" or "Local"
// default "" indicate "UTC"
ts, err := timeutil.Format(now, config.TimestampLocation, layout)
if err != nil {
log.Warn("time format system product timestamp err: %+v", err)
return
}
header[config.TimestampKey] = ts
}

if config.BodyKey != "" {
header[config.BodyKey] = util.ByteToStringUnsafe(e.Body())
e.Fill(e.Meta(), header, []byte{})
}
}

func AddSourceFields(header map[string]interface{}, fields map[string]interface{}, underRoot bool, fieldsKey string) {
Expand Down

0 comments on commit 6288da0

Please sign in to comment.