Skip to content

Commit

Permalink
Add support for array parsing in azure-eventhub input (elastic#18585)
Browse files Browse the repository at this point in the history
* add array as parser

* changelog

* update test

(cherry picked from commit c742662)
  • Loading branch information
narph committed May 20, 2020
1 parent 98972eb commit 3ab919e
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 11 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Improve ECS categorization field mappings in osquery module. {issue}16176[16176] {pull}17881[17881]
- Add support for v10, v11 and v12 logs on Postgres {issue}13810[13810] {pull}17732[17732]
- Add dashboard for Google Cloud Audit and AWS CloudTrail. {pull}17379[17379]
- Add support for array parsing in azure-eventhub input. {pull}18585[18585]
- Added `observer.vendor`, `observer.product`, and `observer.type` to PANW module events. {pull}18223[18223]
- The `logstash` module can now automatically detect the log file format (JSON or plaintext) and process it accordingly. {issue}9964[9964] {pull}18095[18095]
- Improve ECS categorization field mappings in coredns module. {issue}16159[16159] {pull}18424[18424]
- Improve ECS categorization field mappings in envoyproxy module. {issue}16161[16161] {pull}18395[18395]
Expand Down
32 changes: 25 additions & 7 deletions x-pack/filebeat/input/azureeventhub/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,14 +203,32 @@ func (a *azureInput) processEvents(event *eventhub.Event, partitionID string) bo

// parseMultipleMessages will try to split the message into multiple ones based on the group field provided by the configuration
func (a *azureInput) parseMultipleMessages(bMessage []byte) []string {
var obj map[string][]interface{}
err := json.Unmarshal(bMessage, &obj)
if err != nil {
a.log.Errorw(fmt.Sprintf("deserializing multiple messages using the group object `records`"), "error", err)
}
var mapObject map[string][]interface{}
var messages []string
if len(obj[expandEventListFromField]) > 0 {
for _, ms := range obj[expandEventListFromField] {
// check if the message is a "records" object containing a list of events
err := json.Unmarshal(bMessage, &mapObject)
if err == nil {
if len(mapObject[expandEventListFromField]) > 0 {
for _, ms := range mapObject[expandEventListFromField] {
js, err := json.Marshal(ms)
if err == nil {
messages = append(messages, string(js))
} else {
a.log.Errorw(fmt.Sprintf("serializing message %s", ms), "error", err)
}
}
}
} else {
a.log.Debugf("deserializing multiple messages to a `records` object returning error: %s", err)
// in some cases the message is an array
var arrayObject []interface{}
err = json.Unmarshal(bMessage, &arrayObject)
if err != nil {
// return entire message
a.log.Debugf("deserializing multiple messages to an array returning error: %s", err)
return []string{string(bMessage)}
}
for _, ms := range arrayObject {
js, err := json.Marshal(ms)
if err == nil {
messages = append(messages, string(js))
Expand Down
31 changes: 27 additions & 4 deletions x-pack/filebeat/input/azureeventhub/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/logp"

eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/stretchr/testify/assert"

Expand Down Expand Up @@ -70,17 +72,38 @@ func TestProcessEvents(t *testing.T) {
}

func TestParseMultipleMessages(t *testing.T) {
// records object
msg := "{\"records\":[{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
"{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
"{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}]}"
input := azureInput{}
messages := input.parseMultipleMessages([]byte(msg))
assert.NotNil(t, messages)
assert.Equal(t, len(messages), 3)
msgs := []string{
fmt.Sprintf("{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
fmt.Sprintf("{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"),
fmt.Sprintf("{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}")}
input := azureInput{log: logp.NewLogger(fmt.Sprintf("%s test for input", inputName))}
messages := input.parseMultipleMessages([]byte(msg))
assert.NotNil(t, messages)
assert.Equal(t, len(messages), 3)
for _, ms := range messages {
assert.Contains(t, msgs, ms)
}

// array of events
msg1 := "[{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
"{\"test\":\"this is 2nd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
"{\"test\":\"this is 3rd message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}]"
messages = input.parseMultipleMessages([]byte(msg1))
assert.NotNil(t, messages)
assert.Equal(t, len(messages), 3)
for _, ms := range messages {
assert.Contains(t, msgs, ms)
}

// one event only
msg2 := "{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"
messages = input.parseMultipleMessages([]byte(msg2))
assert.NotNil(t, messages)
assert.Equal(t, len(messages), 1)
for _, ms := range messages {
assert.Contains(t, msgs, ms)
}
Expand Down

0 comments on commit 3ab919e

Please sign in to comment.