Skip to content

Commit

Permalink
Refactor formatters to use metadata as intermediate (#249)
Browse files Browse the repository at this point in the history
* convert grokToJSON to grok to metadata
* remove collectd to influx formatter.
Resons:
- This formatter is too specific.
- A collectd consumer and an influxdb producer should handle this instead.
- There is other software doing this kind of work better than gollum (e.g. telegraph).

* remove extractToJSON.
Reason: This kind of functionality will be handled by formatters converting metadata to a specific output format like JSON.

* remove jsonToInflux. Same reason as collectdToInflux.
* convert jsonToArray to ToArray to match the new metadata concept.
* remove serialize formatter as spooling does not need it anymore.
* add a formater that can convert metadata to json
* remove filter.json go as it will be replaced by filter.metadata.
* rename 'ApplyTo' to 'Target'.
- includes renaming format.MetadataCopy to format.Copy.
- includes adding format.Move

As we now put more emphasis on metadata, 'target' becomes a much clearer identifier.
Along with this, the names for Set and Get functions have been simplified.
This should make formatter configurations more readable and easier to understand.

* use a better example for format.double
* add an option to keep the last separator
* finish format.ToJSON
* remove format.TextToJSON.

- Nobody understood the syntax anyways.
- format.Grok can handle most of its usecases.
- A high maintenance burden for a too small usecase.

* rename format.Clear to format.Delete
* remove unittest for texttojson
* rename format.Trim to format.Bounds.
There will be a new format.Trim that uses strings.Trim.

* add format.Trim
* rename format.Bounds to format.TrimToBounds
* introduce Source parameter to all SimpleFormatter.
- Also includes targeting a specific metadata key for parser formatter.

* add a metadata access function for parsers
* switch aggregate from Target to Source
* fix metadata being properly cloned on msg.Clone
* add target data getter
* fix base64 encoding tests
* fix copy unittests and simplify code
* make format.Double respect source and target fields
* fix source/target related unittest failures
* fix expected error checking
* fix source/target related unittest errors
* rename applyTo to identifier for setter functions
* retain type when moving metadata
* add unittest for format.Move
* add split formatter
* change splitpick default delimiter to ","
* add documentation for dynamic functions
* add format.Replace
* add a unittest for format.Trim
* add format.flatten
* add format.agent
* add format.GeoIP
* remove process* formatters.
All subformatters have been moved to be separate formaters working on metadata.

* rename toArray to toCSV
* add format.ConvertTime
* allow full string replacement
* reintroduce ApplyTo to set Source and Target to the same value
* change templateJSON to template
* convert SplitToJSON into SplitToFields
* fix trim to bounds behavior and test
* make geoip file loading a non-breaking error
* add json parser formatter
* allow split to fields to target root
* update json benchmark
* fix config for regexp filter integration test
* delete removed formatters/filters
* update dependencies
* exchange strings.ReplaceAll with strings.Replace.
This enables downwards compatibility.
  • Loading branch information
arnecls authored Jun 7, 2019
1 parent eb7c85e commit edc8a43
Show file tree
Hide file tree
Showing 667 changed files with 17,221 additions and 132,837 deletions.
19 changes: 12 additions & 7 deletions config/profile_json.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@
KeepRunning: false
ModulatorRoutines: 0
Modulators:
- format.ProcessJSON:
Directives:
- "test:rename:foobar"
- "bar:remove"
- "foo:split:|:foo1:foo2"
- format.ExtractJSON:
Field: thisisquitealongstring
- format.JSON: {}
- format.Move:
Source: "test"
Target: "foobar"
- format.Delete:
Target: "bar"
- format.SplitToFields:
Source: "foo"
Delimiter: "|"
Fields: ["foo1","foo2"]
- format.Copy:
Source: "thisisquitealongstring"

"Benchmark":
Type: "producer.Benchmark"
Expand Down
18 changes: 9 additions & 9 deletions consumer/syslogd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,25 @@ package consumer
import (
"testing"

"github.com/trivago/gollum/core"
"github.com/trivago/tgo/tcontainer"
"github.com/trivago/tgo/ttesting"
)

func TestSyslogStructuredDataParser(t *testing.T) {
expect := ttesting.NewExpect(t)

data1 := "[id@12345 key=\"value\" key2=\"quoted \\\"data\\\"\" key3=\"line\nbreak\" key4 = \"value\"]"
metadata := core.Metadata{}
metadata := tcontainer.MarshalMap{}

parseCustomFields(data1, &metadata)
expect.MapEqual(metadata, "key", []byte("value"))
expect.MapEqual(metadata, "key2", []byte("quoted \"data\""))
expect.MapEqual(metadata, "key3", []byte("line\nbreak"))
expect.MapEqual(metadata, "key4", []byte("value"))
expect.MapEqual(metadata, "key", "value")
expect.MapEqual(metadata, "key2", "quoted \"data\"")
expect.MapEqual(metadata, "key3", "line\nbreak")
expect.MapEqual(metadata, "key4", "value")

data2 := "[id@12345 key=\"value\"][id@12345 key2=\"value\"]"
metadata = core.Metadata{}
metadata = tcontainer.MarshalMap{}
parseCustomFields(data2, &metadata)
expect.MapEqual(metadata, "key", []byte("value"))
expect.MapEqual(metadata, "key2", []byte("value"))
expect.MapEqual(metadata, "key", "value")
expect.MapEqual(metadata, "key2", "value")
}
4 changes: 4 additions & 0 deletions core/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ func (msg *Message) Clone() *Message {
clone.data.payload = make([]byte, len(msg.data.payload))
copy(clone.data.payload, msg.data.payload)

if clone.data.metadata != nil {
clone.data.metadata = msg.data.metadata.Clone()
}

return &clone
}

Expand Down
1 change: 1 addition & 0 deletions core/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func TestMessageCloneOriginalMetadata(t *testing.T) {
expect.Equal("bar", value)

value, err = clone.GetMetadata().String("foo")
expect.NoError(err)
expect.Equal("original_bar", value)

expect.Equal(MessageStreamID(1), clone.GetStreamID())
Expand Down
117 changes: 94 additions & 23 deletions core/messagehelper.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,34 @@
package core

import "fmt"
import (
"fmt"

// GetAppliedContentFunc is a func() to get message content from payload or meta data
"github.com/trivago/tgo/tcontainer"
)

// GetDataFunc is a func() to get message content from payload or meta data
// for later handling by plugins
type GetAppliedContentFunc func(msg *Message) interface{}
type GetDataFunc func(msg *Message) interface{}

// GetAppliedContentAsStringFunc acts as a wrapper around GetAppliedContentFunc
// GetDataAsStringFunc acts as a wrapper around GetDataFunc
// if only string data can be processed.
type GetAppliedContentAsStringFunc func(msg *Message) string
type GetDataAsStringFunc func(msg *Message) string

// GetAppliedContentAsBytesFunc acts as a wrapper around GetAppliedContentFunc
// GetDataAsBytesFunc acts as a wrapper around GetDataFunc
// if only []byte] data can be processed.
type GetAppliedContentAsBytesFunc func(msg *Message) []byte
type GetDataAsBytesFunc func(msg *Message) []byte

// GetMetadataRootFunc acts as a wrapper around a function that returns the
// metadata value as MarshalMap for a fixed key. The function returns an
// error if the value behind the fixed key is not a MarshalMap.
type GetMetadataRootFunc func(msg *Message) (tcontainer.MarshalMap, error)

// SetAppliedContentFunc is a func() to store message content to payload or meta data
type SetAppliedContentFunc func(msg *Message, content interface{})
// ForceMetadataRootFunc works like GetMetadataRootFunc but makes sure that
// the targeted key is existing and usable.
type ForceMetadataRootFunc func(msg *Message) tcontainer.MarshalMap

// SetDataFunc is a func() to store message content to payload or meta data
type SetDataFunc func(msg *Message, content interface{})

func getPayloadContent(msg *Message) interface{} {
return msg.GetPayload()
Expand All @@ -32,36 +45,94 @@ func getMetadataContent(msg *Message, key string) interface{} {
return []byte{}
}

// NewGetAppliedContentFunc returns a GetAppliedContentFunc function
func NewGetAppliedContentFunc(applyTo string) GetAppliedContentFunc {
if applyTo == "" {
func getMetadataRoot(msg *Message, root string) (tcontainer.MarshalMap, error) {
metadata := msg.TryGetMetadata()
if metadata == nil {
return nil, fmt.Errorf("no metadata set")
}

if len(root) == 0 {
return metadata, nil
}

val, exists := metadata.Value(root)
if !exists {
rootValue := tcontainer.MarshalMap{}
metadata.Set(root, rootValue)
return rootValue, nil
}

return tcontainer.ConvertToMarshalMap(val, nil)
}

func forceMetadataRoot(msg *Message, root string) tcontainer.MarshalMap {
metadata := msg.GetMetadata()
if len(root) == 0 {
return metadata
}

val, exists := metadata.Value(root)
if !exists {
rootValue := tcontainer.MarshalMap{}
metadata.Set(root, rootValue)
return rootValue
}

rootValue, err := tcontainer.ConvertToMarshalMap(val, nil)
if err != nil {
rootValue = tcontainer.MarshalMap{}
metadata.Set(root, rootValue)
}
return rootValue
}

// NewGetterFor returns a GetDataFunc function
func NewGetterFor(identifier string) GetDataFunc {
if identifier == "" {
return getPayloadContent
}

// we need a lambda to hide away the second parameter
return func(msg *Message) interface{} {
return getMetadataContent(msg, applyTo)
return getMetadataContent(msg, identifier)
}
}

// NewGetAppliedContentAsStringFunc returns a function that gets message content
// NewStringGetterFor returns a function that gets message content
// as string.
func NewGetAppliedContentAsStringFunc(applyTo string) GetAppliedContentAsStringFunc {
get := NewGetAppliedContentFunc(applyTo)
func NewStringGetterFor(identifier string) GetDataAsStringFunc {
get := NewGetterFor(identifier)
return func(msg *Message) string {
return ConvertToString(get(msg))
}
}

// NewGetAppliedContentAsBytesFunc returns a function that gets message content
// NewBytesGetterFor returns a function that gets message content
// as bytes.
func NewGetAppliedContentAsBytesFunc(applyTo string) GetAppliedContentAsBytesFunc {
get := NewGetAppliedContentFunc(applyTo)
func NewBytesGetterFor(identifier string) GetDataAsBytesFunc {
get := NewGetterFor(identifier)
return func(msg *Message) []byte {
return ConvertToBytes(get(msg))
}
}

// NewMetadataRootGetterFor returns a function that gets a metadata value
// if it is set and if it is a MarshalMap
func NewMetadataRootGetterFor(identifier string) GetMetadataRootFunc {
return func(msg *Message) (tcontainer.MarshalMap, error) {
return getMetadataRoot(msg, identifier)
}
}

// NewForceMetadataRootGetterFor returns a function that always returns a valid
// metadata root. If the key does not exist, it is created. If the key exists
// but is not a MarshalMap, it will be overridden.
func NewForceMetadataRootGetterFor(identifier string) ForceMetadataRootFunc {
return func(msg *Message) tcontainer.MarshalMap {
return forceMetadataRoot(msg, identifier)
}
}

func setMetadataContent(msg *Message, key string, content interface{}) {
if content == nil {
msg.GetMetadata().Delete(key)
Expand All @@ -78,15 +149,15 @@ func setPayloadContent(msg *Message, content interface{}) {
}
}

// NewSetAppliedContentFunc returns SetAppliedContentFunc function to store message content
func NewSetAppliedContentFunc(applyTo string) SetAppliedContentFunc {
if applyTo == "" {
// NewSetterFor returns SetDataFunc function to store message content
func NewSetterFor(identifier string) SetDataFunc {
if identifier == "" {
return setPayloadContent
}

// we need a lambda to hide away the second parameter
return func(msg *Message, content interface{}) {
setMetadataContent(msg, applyTo, content)
setMetadataContent(msg, identifier, content)
}
}

Expand Down
12 changes: 6 additions & 6 deletions core/messagehelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,24 +39,24 @@ func getMockRouterMessageHelper(streamName string) mockRouterMessageHelper {
}
}

func TestGetAppliedContentFunction(t *testing.T) {
func TestGetTargetDataFunction(t *testing.T) {
expect := ttesting.NewExpect(t)
resultFunc := NewGetAppliedContentFunc("")
resultFunc := NewGetterFor("")

expect.Equal(reflect.Func, reflect.TypeOf(resultFunc).Kind())
}

func TestGetAppliedContentFromPayload(t *testing.T) {
func TestGetTargetDataFromPayload(t *testing.T) {
expect := ttesting.NewExpect(t)
resultFunc := NewGetAppliedContentFunc("")
resultFunc := NewGetterFor("")
msg := NewMessage(nil, []byte("message payload"), nil, 1)

expect.Equal([]byte("message payload"), resultFunc(msg).([]byte))
}

func TestGetAppliedContentFromMetadata(t *testing.T) {
func TestGetTargetDataFromMetadata(t *testing.T) {
expect := ttesting.NewExpect(t)
resultFunc := NewGetAppliedContentFunc("foo")
resultFunc := NewGetterFor("foo")
msg := NewMessage(nil, []byte("message payload"), nil, 1)
msg.GetMetadata().Set("foo", "foo content")

Expand Down
Loading

0 comments on commit edc8a43

Please sign in to comment.