From a16b0fa250da235680b893741811e39fb7f46cfc Mon Sep 17 00:00:00 2001 From: Valere JEANTET Date: Sat, 27 Jan 2018 23:01:39 +0100 Subject: [PATCH] fix #71 Self-added fileds "message":"" --- core/agent.go | 6 +- core/event.go | 9 +- processors/filter-blacklist/blacklist_test.go | 64 +++++------ processors/filter-change/change_test.go | 106 +++++++++--------- processors/filter-date/date_test.go | 8 +- processors/filter-digest/digest.go | 2 +- processors/filter-digest/digest_test.go | 42 +++---- processors/filter-drop/drop_test.go | 6 +- processors/filter-eval/eval_test.go | 15 +-- processors/filter-exec/exec.go | 2 +- processors/filter-geoip/geoip_test.go | 2 +- processors/filter-grok/grok_test.go | 24 ++-- processors/filter-html/html_test.go | 2 +- processors/filter-json/json_test.go | 6 +- processors/filter-kv/kv_test.go | 32 +++--- processors/filter-mutate/mutate_test.go | 6 +- processors/filter-newterm/newterm_test.go | 34 +++--- processors/filter-split/split.go | 2 +- processors/filter-uuid/uuid_test.go | 4 +- processors/filter-whitelist/whitelist_test.go | 30 ++--- processors/input-beats/beats.go | 2 +- .../input-elasticsearch/elasticinput.go | 6 +- processors/input-event/inputevent.go | 3 +- processors/input-exec/execinput.go | 5 +- processors/input-file/file.go | 7 +- processors/input-httpserver/httpserver.go | 9 +- processors/input-imap/handler.go | 4 +- processors/input-rabbitmq/rabbitmq.go | 10 +- processors/input-stdin/stdin.go | 9 +- processors/input-syslog/sysloginput.go | 2 +- processors/input-tail/tail.go | 11 +- processors/input-twitter/twitter.go | 4 +- processors/input-udp/udpinput.go | 5 +- processors/input-unix/unixinput.go | 10 +- processors/input-websocket/websocket.go | 6 +- processors/ldap/ldap.go | 2 +- processors/output-http/httpoutput_test.go | 12 +- processors/output-statsd/statsd_test.go | 12 +- processors/packet.go | 2 +- processors/pop3/pop3.go | 2 +- processors/testutils/context.go | 4 +- processors/testutils/event.go | 11 +- processors/when/evaluator_test.go | 2 +- 43 files changed, 277 insertions(+), 265 deletions(-) diff --git a/core/agent.go b/core/agent.go index aba32126..da15532c 100644 --- a/core/agent.go +++ b/core/agent.go @@ -165,7 +165,7 @@ func (a *Agent) addOutput(in chan *event, portNumber int) error { // Start agent func (a *Agent) start() error { // Start processor - a.processor.Start(newPacket("start", map[string]interface{}{})) + a.processor.Start(newPacket(map[string]interface{}{"message": "start"})) // Maximum number of concurent packet consumption ? var maxConcurentPackets = a.PoolSize @@ -187,7 +187,7 @@ func (a *Agent) start() error { wg.Wait() Log().Debugf("processor (%d) - stopping (no more packets)", a.ID) - if err := a.processor.Stop(newPacket("", nil)); err != nil { + if err := a.processor.Stop(newPacket(nil)); err != nil { Log().Errorf("%s %d : %v", a.Type, a.ID, err) } close(a.Done) @@ -198,7 +198,7 @@ func (a *Agent) start() error { if a.Schedule != "" { Log().Debugf("agent %s : schedule=%s", a.Label, a.Schedule) err := myScheduler.Add(a.Label, a.Schedule, func() { - go a.processor.Tick(newPacket("", nil)) + go a.processor.Tick(newPacket(nil)) }) if err != nil { Log().Errorf("schedule start failed - %s : %v", a.Label, err) diff --git a/core/event.go b/core/event.go index deef7ae6..8e519c70 100644 --- a/core/event.go +++ b/core/event.go @@ -31,19 +31,14 @@ func (e *event) SetMessage(s string) { func (e *event) Clone() processors.IPacket { nf, _ := e.Fields().Copy() nf["@timestamp"], _ = e.Fields().ValueForPath("@timestamp") - return newPacket(e.Message(), nf) + return newPacket(nf) } -func newPacket(message string, fields map[string]interface{}) processors.IPacket { +func newPacket(fields map[string]interface{}) processors.IPacket { if fields == nil { fields = mxj.Map{} } - // Add message to its field if empty - if _, ok := fields["message"]; !ok { - fields["message"] = message - } - if _, k := fields["@timestamp"]; !k { fields["@timestamp"] = time.Now() } diff --git a/processors/filter-blacklist/blacklist_test.go b/processors/filter-blacklist/blacklist_test.go index f05d3583..96aa5cbc 100644 --- a/processors/filter-blacklist/blacklist_test.go +++ b/processors/filter-blacklist/blacklist_test.go @@ -52,13 +52,13 @@ func TestReceiveMatch(t *testing.T) { "terms": []string{"val1", "val2"}, }, ) - p.Receive(testutils.NewPacket("test", nil)) - p.Receive(testutils.NewPacket("fqsdf", nil)) - p.Receive(testutils.NewPacket("valo", nil)) - p.Receive(testutils.NewPacket("al1", nil)) - p.Receive(testutils.NewPacket("val1", nil)) - p.Receive(testutils.NewPacket("val3", nil)) - p.Receive(testutils.NewPacket("val2", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) + p.Receive(testutils.NewPacketOld("fqsdf", nil)) + p.Receive(testutils.NewPacketOld("valo", nil)) + p.Receive(testutils.NewPacketOld("al1", nil)) + p.Receive(testutils.NewPacketOld("val1", nil)) + p.Receive(testutils.NewPacketOld("val3", nil)) + p.Receive(testutils.NewPacketOld("val2", nil)) if assert.Equal(t, 2, ctx.SentPacketsCount(0), "2 events should pass") { assert.Equal(t, "val1", ctx.SentPackets(0)[0].Message()) assert.Equal(t, "val2", ctx.SentPackets(0)[1].Message()) @@ -75,13 +75,13 @@ func TestReceiveDuplicateTermsInConfig(t *testing.T) { "terms": []string{"val1", "val1", "val1", "val1", "val1"}, }, ) - p.Receive(testutils.NewPacket("test", nil)) - p.Receive(testutils.NewPacket("fqsdf", nil)) - p.Receive(testutils.NewPacket("valo", nil)) - p.Receive(testutils.NewPacket("al1", nil)) - p.Receive(testutils.NewPacket("val1", nil)) - p.Receive(testutils.NewPacket("val3", nil)) - p.Receive(testutils.NewPacket("val2", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) + p.Receive(testutils.NewPacketOld("fqsdf", nil)) + p.Receive(testutils.NewPacketOld("valo", nil)) + p.Receive(testutils.NewPacketOld("al1", nil)) + p.Receive(testutils.NewPacketOld("val1", nil)) + p.Receive(testutils.NewPacketOld("val3", nil)) + p.Receive(testutils.NewPacketOld("val2", nil)) if assert.Equal(t, 1, ctx.SentPacketsCount(0), "2 events should pass") { assert.Equal(t, "val1", ctx.SentPackets(0)[0].Message()) } @@ -97,13 +97,13 @@ func TestReceiveAllMessagesMatch(t *testing.T) { "terms": []string{"valo", "test", "val2", "al1", "val3", "val1", "fqsdf"}, }, ) - p.Receive(testutils.NewPacket("test", nil)) - p.Receive(testutils.NewPacket("fqsdf", nil)) - p.Receive(testutils.NewPacket("valo", nil)) - p.Receive(testutils.NewPacket("al1", nil)) - p.Receive(testutils.NewPacket("val1", nil)) - p.Receive(testutils.NewPacket("val3", nil)) - p.Receive(testutils.NewPacket("val2", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) + p.Receive(testutils.NewPacketOld("fqsdf", nil)) + p.Receive(testutils.NewPacketOld("valo", nil)) + p.Receive(testutils.NewPacketOld("al1", nil)) + p.Receive(testutils.NewPacketOld("val1", nil)) + p.Receive(testutils.NewPacketOld("val3", nil)) + p.Receive(testutils.NewPacketOld("val2", nil)) if assert.Equal(t, 7, ctx.SentPacketsCount(0), "2 events should pass") { assert.Equal(t, "test", ctx.SentPackets(0)[0].Message()) assert.Equal(t, "fqsdf", ctx.SentPackets(0)[1].Message()) @@ -125,7 +125,7 @@ func TestReceiveMessageIncludingTermsDoNotMatch(t *testing.T) { "terms": []string{"test"}, }, ) - p.Receive(testutils.NewPacket("testtest", nil)) + p.Receive(testutils.NewPacketOld("testtest", nil)) assert.Equal(t, 0, ctx.SentPacketsCount(0), "0 event should pass") } @@ -140,7 +140,7 @@ func TestReceiveFieldIncludingTermsDoNotMatch(t *testing.T) { }, ) - p.Receive(testutils.NewPacket("hello", map[string]interface{}{"MyField": "testtest"})) + p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"MyField": "testtest"})) assert.Equal(t, 0, ctx.SentPacketsCount(0), "0 event should pass") } @@ -155,9 +155,9 @@ func TestReceiveFieldNamesAreCaseSensitive(t *testing.T) { }, ) - p.Receive(testutils.NewPacket("hello", map[string]interface{}{"myfield": "test"})) + p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"myfield": "test"})) assert.Equal(t, 0, ctx.SentPacketsCount(0), "myfield != MyField") - p.Receive(testutils.NewPacket("hello", map[string]interface{}{"MyField": "test"})) + p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"MyField": "test"})) assert.Equal(t, 1, ctx.SentPacketsCount(0), "match !") } @@ -172,11 +172,11 @@ func TestReceiveFieldValuesAreCaseSensitive(t *testing.T) { }, ) - p.Receive(testutils.NewPacket("hello", map[string]interface{}{"MyField": "TEST"})) + p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"MyField": "TEST"})) assert.Equal(t, 0, ctx.SentPacketsCount(0), "TEST != test") - p.Receive(testutils.NewPacket("hello", map[string]interface{}{"MyField": "Test"})) + p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"MyField": "Test"})) assert.Equal(t, 0, ctx.SentPacketsCount(0), "TEST != test") - p.Receive(testutils.NewPacket("hello", map[string]interface{}{"MyField": "test"})) + p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"MyField": "test"})) assert.Equal(t, 1, ctx.SentPacketsCount(0), "match !") } func TestReceiveLongValue(t *testing.T) { @@ -190,10 +190,10 @@ func TestReceiveLongValue(t *testing.T) { }, ) - p.Receive(testutils.NewPacket("hello", map[string]interface{}{"MyField": "TEST"})) + p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"MyField": "TEST"})) assert.Equal(t, 0, ctx.SentPacketsCount(0), "No match") - p.Receive(testutils.NewPacket("hello", map[string]interface{}{"MyField": "Test"})) + p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"MyField": "Test"})) assert.Equal(t, 0, ctx.SentPacketsCount(0), "No match") - p.Receive(testutils.NewPacket("hello", map[string]interface{}{"MyField": "azertyuiopqsdfghjklmwxcvbnazertyuiopqsdfghjklmwxcvbnazertyuiopqsdfghjklmwxcvbnazertyuiopqsdfghjklmwxcvbn"})) + p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"MyField": "azertyuiopqsdfghjklmwxcvbnazertyuiopqsdfghjklmwxcvbnazertyuiopqsdfghjklmwxcvbnazertyuiopqsdfghjklmwxcvbn"})) assert.Equal(t, 1, ctx.SentPacketsCount(0), "match !") -} \ No newline at end of file +} diff --git a/processors/filter-change/change_test.go b/processors/filter-change/change_test.go index 30d2682a..0310a03e 100644 --- a/processors/filter-change/change_test.go +++ b/processors/filter-change/change_test.go @@ -32,20 +32,20 @@ func TestReceiveMatch(t *testing.T) { }, ) - p.Receive(testutils.NewPacket("test", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 0") - p.Receive(testutils.NewPacket("test", nil)) - p.Receive(testutils.NewPacket("test", nil)) - p.Receive(testutils.NewPacket("test", nil)) - p.Receive(testutils.NewPacket("test", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 1") - p.Receive(testutils.NewPacket("toto", nil)) + p.Receive(testutils.NewPacketOld("toto", nil)) assert.Equal(t, 1, ctx.SentPacketsCount(0), "changed ! 2") - p.Receive(testutils.NewPacket("toto", nil)) - p.Receive(testutils.NewPacket("toto", nil)) - p.Receive(testutils.NewPacket("toto", nil)) + p.Receive(testutils.NewPacketOld("toto", nil)) + p.Receive(testutils.NewPacketOld("toto", nil)) + p.Receive(testutils.NewPacketOld("toto", nil)) assert.Equal(t, 1, ctx.SentPacketsCount(0), "changed ! 3") - p.Receive(testutils.NewPacket("test", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) assert.Equal(t, 2, ctx.SentPacketsCount(0), "changed ! 4") } @@ -60,19 +60,19 @@ func TestReceiveIgnoreMissingTrue(t *testing.T) { }, ) - p.Receive(testutils.NewPacket("test", map[string]interface{}{"toto": "A"})) + p.Receive(testutils.NewPacketOld("test", map[string]interface{}{"toto": "A"})) assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 1") - p.Receive(testutils.NewPacket("test", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 2") - p.Receive(testutils.NewPacket("test", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 3") - p.Receive(testutils.NewPacket("test", map[string]interface{}{"toto": "A"})) + p.Receive(testutils.NewPacketOld("test", map[string]interface{}{"toto": "A"})) assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 4") - p.Receive(testutils.NewPacket("test", map[string]interface{}{"toto": "A"})) + p.Receive(testutils.NewPacketOld("test", map[string]interface{}{"toto": "A"})) assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 5") - p.Receive(testutils.NewPacket("test", map[string]interface{}{"toto": "B"})) + p.Receive(testutils.NewPacketOld("test", map[string]interface{}{"toto": "B"})) assert.Equal(t, 1, ctx.SentPacketsCount(0), "changed ! 6") - p.Receive(testutils.NewPacket("test", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) assert.Equal(t, 1, ctx.SentPacketsCount(0), "changed ! 7") } @@ -87,19 +87,19 @@ func TestReceiveIgnoreMissingFalse(t *testing.T) { }, ) - p.Receive(testutils.NewPacket("test", map[string]interface{}{"toto": "A"})) + p.Receive(testutils.NewPacketOld("test", map[string]interface{}{"toto": "A"})) assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 1") - p.Receive(testutils.NewPacket("test", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) assert.Equal(t, 1, ctx.SentPacketsCount(0), "changed ! 2") - p.Receive(testutils.NewPacket("test", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) assert.Equal(t, 1, ctx.SentPacketsCount(0), "changed ! 3") - p.Receive(testutils.NewPacket("test", map[string]interface{}{"toto": "A"})) + p.Receive(testutils.NewPacketOld("test", map[string]interface{}{"toto": "A"})) assert.Equal(t, 2, ctx.SentPacketsCount(0), "changed ! 4") - p.Receive(testutils.NewPacket("test", map[string]interface{}{"toto": "A"})) + p.Receive(testutils.NewPacketOld("test", map[string]interface{}{"toto": "A"})) assert.Equal(t, 2, ctx.SentPacketsCount(0), "changed ! 5") - p.Receive(testutils.NewPacket("test", map[string]interface{}{"toto": "B"})) + p.Receive(testutils.NewPacketOld("test", map[string]interface{}{"toto": "B"})) assert.Equal(t, 3, ctx.SentPacketsCount(0), "changed ! 6") - p.Receive(testutils.NewPacket("test", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) assert.Equal(t, 4, ctx.SentPacketsCount(0), "changed ! 7") } @@ -114,8 +114,8 @@ func TestStopNoTimeFrame(t *testing.T) { }, ) - p.Receive(testutils.NewPacket("test", nil)) - p.Receive(testutils.NewPacket("test2", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) + p.Receive(testutils.NewPacketOld("test2", nil)) assert.Equal(t, 1, ctx.SentPacketsCount(0), "changed ! 1") assert.NoError(t, p.Stop(nil), "no error") } @@ -131,8 +131,8 @@ func TestStopWithTimeFrame(t *testing.T) { }, ) - p.Receive(testutils.NewPacket("test", nil)) - p.Receive(testutils.NewPacket("test2", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) + p.Receive(testutils.NewPacketOld("test2", nil)) assert.Equal(t, 1, ctx.SentPacketsCount(0), "changed ! 1") time.Sleep(time.Second * 1) assert.NoError(t, p.Stop(nil), "no error") @@ -149,52 +149,52 @@ func TestReceiveMatchWithTimeframe(t *testing.T) { }, ) - p.Receive(testutils.NewPacket("test", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 0") - p.Receive(testutils.NewPacket("test", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 0") - p.Receive(testutils.NewPacket("test1", nil)) + p.Receive(testutils.NewPacketOld("test1", nil)) if assert.Equal(t, 1, ctx.SentPacketsCount(0), "changed ! 0") { assert.Equal(t, "test1", ctx.SentPackets(0)[0].Message()) } - p.Receive(testutils.NewPacket("test1", nil)) + p.Receive(testutils.NewPacketOld("test1", nil)) if assert.Equal(t, 1, ctx.SentPacketsCount(0), "changed ! 0") { assert.Equal(t, "test1", ctx.SentPackets(0)[0].Message()) } - p.Receive(testutils.NewPacket("test1", nil)) + p.Receive(testutils.NewPacketOld("test1", nil)) if assert.Equal(t, 1, ctx.SentPacketsCount(0), "changed ! 0") { assert.Equal(t, "test1", ctx.SentPackets(0)[0].Message()) } time.Sleep(time.Second * 2) - p.Receive(testutils.NewPacket("test1", nil)) + p.Receive(testutils.NewPacketOld("test1", nil)) if assert.Equal(t, 1, ctx.SentPacketsCount(0), "changed ! 0") { assert.Equal(t, "test1", ctx.SentPackets(0)[0].Message()) } time.Sleep(time.Second * 2) - p.Receive(testutils.NewPacket("test1", nil)) - p.Receive(testutils.NewPacket("test1", nil)) + p.Receive(testutils.NewPacketOld("test1", nil)) + p.Receive(testutils.NewPacketOld("test1", nil)) assert.Equal(t, 1, ctx.SentPacketsCount(0), "changed ! 0") - p.Receive(testutils.NewPacket("A", nil)) + p.Receive(testutils.NewPacketOld("A", nil)) assert.Equal(t, 2, ctx.SentPacketsCount(0), "changed ! 0") - p.Receive(testutils.NewPacket("B", nil)) + p.Receive(testutils.NewPacketOld("B", nil)) assert.Equal(t, 3, ctx.SentPacketsCount(0), "changed ! 0") time.Sleep(time.Second * 2) - p.Receive(testutils.NewPacket("B", nil)) + p.Receive(testutils.NewPacketOld("B", nil)) assert.Equal(t, 3, ctx.SentPacketsCount(0), "changed ! 0") - p.Receive(testutils.NewPacket("B", nil)) - p.Receive(testutils.NewPacket("test", nil)) + p.Receive(testutils.NewPacketOld("B", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) time.Sleep(time.Second * 2) - p.Receive(testutils.NewPacket("test", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) assert.Equal(t, 4, ctx.SentPacketsCount(0), "changed ! 0") } @@ -210,28 +210,28 @@ func TestReceiveRepetitionsWithinTimeFrameDoNotMatch(t *testing.T) { }, ) - p.Receive(testutils.NewPacket("test", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 0") // 3 repetitions in a row - p.Receive(testutils.NewPacket("test", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 0") - p.Receive(testutils.NewPacket("test", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 0") - p.Receive(testutils.NewPacket("test", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 0") // 1 repetition per second time.Sleep(time.Second * 1) - p.Receive(testutils.NewPacket("test", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 0") time.Sleep(time.Second * 1) - p.Receive(testutils.NewPacket("test", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 0") time.Sleep(time.Second * 1) - p.Receive(testutils.NewPacket("test", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 0") } @@ -246,19 +246,19 @@ func TestReceiveChangesOutOfTimeFrameDoNotMatch(t *testing.T) { }, ) - p.Receive(testutils.NewPacket("test", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 0") // Changes happen out of time frame time.Sleep(time.Second * 2) - p.Receive(testutils.NewPacket("test2", nil)) + p.Receive(testutils.NewPacketOld("test2", nil)) assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 0") time.Sleep(time.Second * 2) - p.Receive(testutils.NewPacket("test3", nil)) + p.Receive(testutils.NewPacketOld("test3", nil)) assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 0") time.Sleep(time.Second * 2) - p.Receive(testutils.NewPacket("test4", nil)) + p.Receive(testutils.NewPacketOld("test4", nil)) assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 0") } diff --git a/processors/filter-date/date_test.go b/processors/filter-date/date_test.go index c93c3c01..066ba3e1 100644 --- a/processors/filter-date/date_test.go +++ b/processors/filter-date/date_test.go @@ -27,7 +27,7 @@ func TestMatchUnix(t *testing.T) { "timezone": "UTC", } p.Configure(ctx, conf) - em := testutils.NewPacket("", map[string]interface{}{"ts": "1499254601"}) + em := testutils.NewPacketOld("", map[string]interface{}{"ts": "1499254601"}) p.Receive(em) var m msg em.Fields().Struct(&m) @@ -43,7 +43,7 @@ func TestMatchUnixWithMS(t *testing.T) { "timezone": "UTC", } p.Configure(ctx, conf) - em := testutils.NewPacket("", map[string]interface{}{"ts": "1499254601.343"}) + em := testutils.NewPacketOld("", map[string]interface{}{"ts": "1499254601.343"}) p.Receive(em) var m msg em.Fields().Struct(&m) @@ -59,7 +59,7 @@ func TestMatchUnixMS(t *testing.T) { "timezone": "UTC", } p.Configure(ctx, conf) - em := testutils.NewPacket("", map[string]interface{}{"ts": "1499254601343"}) + em := testutils.NewPacketOld("", map[string]interface{}{"ts": "1499254601343"}) p.Receive(em) var m msg em.Fields().Struct(&m) @@ -75,7 +75,7 @@ func TestMatchJODATime(t *testing.T) { "timezone": "Europe/Paris", } p.Configure(ctx, conf) - em := testutils.NewPacket("", map[string]interface{}{"ts": "2017-07-05T11:36:41"}) + em := testutils.NewPacketOld("", map[string]interface{}{"ts": "2017-07-05T11:36:41"}) p.Receive(em) var m msg em.Fields().Struct(&m) diff --git a/processors/filter-digest/digest.go b/processors/filter-digest/digest.go index 75687bcd..fc58adf2 100644 --- a/processors/filter-digest/digest.go +++ b/processors/filter-digest/digest.go @@ -106,7 +106,7 @@ func (p *processor) Tick(e processors.IPacket) error { } } - ne := p.NewPacket("", p.values) + ne := p.NewPacket(p.values) p.opt.ProcessCommonOptions(ne.Fields()) p.Send(ne, PORT_SUCCESS) p.values = map[string]interface{}{} diff --git a/processors/filter-digest/digest_test.go b/processors/filter-digest/digest_test.go index 02f3eab4..5bdb1329 100644 --- a/processors/filter-digest/digest_test.go +++ b/processors/filter-digest/digest_test.go @@ -3,10 +3,11 @@ package digest import ( "testing" + "time" + "github.com/stretchr/testify/assert" "github.com/vjeantet/bitfan/processors/doc" "github.com/vjeantet/bitfan/processors/testutils" - "time" ) func TestNew(t *testing.T) { @@ -23,8 +24,7 @@ func TestMaxConcurent(t *testing.T) { } func TestConfigureEmptyConfigIsInvalid(t *testing.T) { p := New().(*processor) - conf := map[string]interface{}{ - } + conf := map[string]interface{}{} ctx := testutils.NewProcessorContext() err := p.Configure(ctx, conf) assert.EqualError(t, err, "no interval and no Count settings set") @@ -60,13 +60,13 @@ func TestReceiveSimple(t *testing.T) { }, ) - p.Receive(testutils.NewPacket("hello", map[string]interface{}{"MyField": "TEST"})) + p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"MyField": "TEST"})) assert.Equal(t, 0, ctx.SentPacketsCount(0), "No match") - p.Receive(testutils.NewPacket("hello", map[string]interface{}{"type": "a random value"})) + p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"type": "a random value"})) assert.Equal(t, 1, ctx.SentPacketsCount(0), "One match") - p.Receive(testutils.NewPacket("hello", map[string]interface{}{"MyField": "azerty"})) + p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"MyField": "azerty"})) assert.Equal(t, 1, ctx.SentPacketsCount(0), "No match") - p.Receive(testutils.NewPacket("hello", map[string]interface{}{"type": "a random value"})) + p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"type": "a random value"})) assert.Equal(t, 2, ctx.SentPacketsCount(0), "Two match") } @@ -81,9 +81,9 @@ func TestReceiveMergeTwoEventsWithKeyMap(t *testing.T) { }, ) - p.Receive(testutils.NewPacket("hello", map[string]interface{}{"type": "first_value", "key": "value1"})) + p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"type": "first_value", "key": "value1"})) assert.Equal(t, 0, ctx.SentPacketsCount(0), "Not enough packets") - p.Receive(testutils.NewPacket("hello", map[string]interface{}{"type": "second_value", "key": "value2"})) + p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"type": "second_value", "key": "value2"})) if assert.Equal(t, 1, ctx.SentPacketsCount(0), "Two match") { firstValue, err := ctx.SentPackets(0)[0].Fields().ValueForPath("first_value.key") assert.Nil(t, err, "No error") @@ -105,9 +105,9 @@ func TestReceiveMergeTwoEventsWithoutKeyMap(t *testing.T) { }, ) - p.Receive(testutils.NewPacket("hello", map[string]interface{}{"key1": "value1", "key2": "value2"})) + p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"key1": "value1", "key2": "value2"})) assert.Equal(t, 0, ctx.SentPacketsCount(0), "Not enough packets") - p.Receive(testutils.NewPacket("hello", map[string]interface{}{"key3": "value3", "key4": "value4"})) + p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"key3": "value3", "key4": "value4"})) if assert.Equal(t, 1, ctx.SentPacketsCount(0), "Two match") { expected := map[string]interface{}{ "key1": "value1", @@ -129,8 +129,8 @@ func TestReceiveNoMatchBeforeCount(t *testing.T) { }, ) - p.Receive(testutils.NewPacket("hello", map[string]interface{}{"key1": "value1", "key2": "value2"})) - p.Receive(testutils.NewPacket("hello", map[string]interface{}{"key3": "value3", "key4": "value4"})) + p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"key1": "value1", "key2": "value2"})) + p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"key3": "value3", "key4": "value4"})) assert.Equal(t, 0, ctx.SentPacketsCount(0), "Two match") } @@ -147,29 +147,29 @@ func TestReceiveTickEverySecond(t *testing.T) { ) // RECEIVE - p.Receive(testutils.NewPacket("hello1", map[string]interface{}{"key1": "value1", "key2": "value2"})) + p.Receive(testutils.NewPacketOld("hello1", map[string]interface{}{"key1": "value1", "key2": "value2"})) assert.Equal(t, 0, ctx.SentPacketsCount(0), "No time elapsed and not enough packets") // TICK ! time.Sleep(time.Second) - p.Tick(testutils.NewPacket("", map[string]interface{}{})) + p.Tick(testutils.NewPacketOld("", map[string]interface{}{})) assert.Equal(t, 0, ctx.SentPacketsCount(0), "1 second elapsed but not enough packets") // RECEIVE - p.Receive(testutils.NewPacket("hello2", map[string]interface{}{"key3": "value3", "key4": "value4"})) + p.Receive(testutils.NewPacketOld("hello2", map[string]interface{}{"key3": "value3", "key4": "value4"})) assert.Equal(t, 0, ctx.SentPacketsCount(0), "Enough packets but not enough time elapsed") // TICK ! time.Sleep(time.Second) - p.Tick(testutils.NewPacket("", map[string]interface{}{})) + p.Tick(testutils.NewPacketOld("", map[string]interface{}{})) if assert.Equal(t, 1, ctx.SentPacketsCount(0), "Enough packets and enough time sleeping: Go !") { expected := map[string]interface{}{ "message": "hello2", - "key1": "value1", - "key2": "value2", - "key3": "value3", - "key4": "value4", + "key1": "value1", + "key2": "value2", + "key3": "value3", + "key4": "value4", } testutils.AssertValuesForPaths(t, ctx, expected) } diff --git a/processors/filter-drop/drop_test.go b/processors/filter-drop/drop_test.go index f4722c13..6b9a2253 100644 --- a/processors/filter-drop/drop_test.go +++ b/processors/filter-drop/drop_test.go @@ -48,7 +48,7 @@ func TestReceiveDropAll(t *testing.T) { ctx := testutils.NewProcessorContext() p.Configure(ctx, getExampleConfiguration()) for i := 1; i <= 1000; i++ { - em := testutils.NewPacket("", nil) + em := testutils.NewPacketOld("", nil) p.Receive(em) } @@ -62,7 +62,7 @@ func TestReceiveDrop99p(t *testing.T) { p.opt.Percentage = 99 total := 1000 for i := 1; i <= total; i++ { - em := testutils.NewPacket("", nil) + em := testutils.NewPacketOld("", nil) p.Receive(em) } @@ -79,7 +79,7 @@ func TestReceiveDrop80p(t *testing.T) { p.opt.Percentage = 80 total := 10000 for i := 1; i <= total; i++ { - em := testutils.NewPacket("", nil) + em := testutils.NewPacketOld("", nil) p.Receive(em) } diff --git a/processors/filter-eval/eval_test.go b/processors/filter-eval/eval_test.go index c1951991..c1cb53ac 100644 --- a/processors/filter-eval/eval_test.go +++ b/processors/filter-eval/eval_test.go @@ -23,8 +23,7 @@ func TestMaxConcurent(t *testing.T) { func TestConfigureNoExpressionNorTemplate(t *testing.T) { p := New().(*processor) - conf := map[string]interface{}{ - } + conf := map[string]interface{}{} ctx := testutils.NewProcessorContext() err := p.Configure(ctx, conf) assert.EqualError(t, err, "set one expression or go template") @@ -40,7 +39,7 @@ func TestReceiveSimpleMultiplication(t *testing.T) { }, ) - p.Receive(testutils.NewPacket("stats", map[string]interface{}{"usage": float64(1738)})) + p.Receive(testutils.NewPacketOld("stats", map[string]interface{}{"usage": float64(1738)})) if assert.Equal(t, 1, ctx.SentPacketsCount(0), "One match") { expected := map[string]interface{}{ "usage": float64(173800), @@ -70,7 +69,7 @@ func TestReceiveSimpleExpressions(t *testing.T) { }, ) - p.Receive(testutils.NewPacket("stats", map[string]interface{}{"usage": oneData[1]})) + p.Receive(testutils.NewPacketOld("stats", map[string]interface{}{"usage": oneData[1]})) if assert.Equal(t, 1, ctx.SentPacketsCount(0), "One match") { expected := map[string]interface{}{ "usage": oneData[2], @@ -90,7 +89,7 @@ func TestReceiveLeaveOtherFieldsUnchanged(t *testing.T) { }, ) - p.Receive(testutils.NewPacket("stats", map[string]interface{}{"usage": float64(19), "size": int64(4938), "label": "hello", "percent": float64(18.59)})) + p.Receive(testutils.NewPacketOld("stats", map[string]interface{}{"usage": float64(19), "size": int64(4938), "label": "hello", "percent": float64(18.59)})) if assert.Equal(t, 1, ctx.SentPacketsCount(0), "One match") { expected := map[string]interface{}{ "usage": float64(1900), @@ -102,8 +101,6 @@ func TestReceiveLeaveOtherFieldsUnchanged(t *testing.T) { } } - - func TestTemplate(t *testing.T) { p := New().(*processor) ctx := testutils.NewProcessorContext() @@ -116,7 +113,7 @@ func TestTemplate(t *testing.T) { ) fields := map[string]interface{}{"name": "Jon Doe", "templateName": "EVAL-TEST.tpl", "filter": "EVAL"} - p.Receive(testutils.NewPacket("stats", fields)) + p.Receive(testutils.NewPacketOld("stats", fields)) if assert.Equal(t, 1, ctx.SentPacketsCount(0), "One match") { expected := map[string]interface{}{ "mytest": "Hello Jon Doe !\n\nThis template named \"EVAL-TEST.tpl\" (version 1.0) was created for testing the \"EVAL\" filter.\n", @@ -137,7 +134,7 @@ func TestTemplateWithVar(t *testing.T) { ) fields := map[string]interface{}{} - p.Receive(testutils.NewPacket("stats", fields)) + p.Receive(testutils.NewPacketOld("stats", fields)) if assert.Equal(t, 1, ctx.SentPacketsCount(0), "One match") { expected := map[string]interface{}{ "mytest": "Hello Doe Jon !\n\nThis template named \"EVAL-VAR-TEST.tpl\" (version 1.0) was created for testing the \"EVAL\" filter with var template.\n", diff --git a/processors/filter-exec/exec.go b/processors/filter-exec/exec.go index f2c4335a..375981da 100644 --- a/processors/filter-exec/exec.go +++ b/processors/filter-exec/exec.go @@ -90,7 +90,7 @@ func (p *processor) Receive(e processors.IPacket) error { // recover @timestamp dat["@timestamp"], _ = e.Fields().ValueForPath("@timestamp") - e = p.NewPacket("", dat) + e = p.NewPacket(dat) } else { value := strings.TrimSpace(string(d)) err := e.Fields().SetValueForPath(value, p.opt.Target) diff --git a/processors/filter-geoip/geoip_test.go b/processors/filter-geoip/geoip_test.go index c4a1b459..222df893 100644 --- a/processors/filter-geoip/geoip_test.go +++ b/processors/filter-geoip/geoip_test.go @@ -50,7 +50,7 @@ func TestInvalidConfiguration(t *testing.T) { func TestNormalCases(t *testing.T) { Convey("Given an existing event with a valid ip", t, func() { - event := testutils.NewPacket("", map[string]interface{}{}) + event := testutils.NewPacketOld("", map[string]interface{}{}) conf := map[string]interface{}{ "database": setupTmpDatabase("GeoIP2-City-Test.mmdb"), "source": "ip", diff --git a/processors/filter-grok/grok_test.go b/processors/filter-grok/grok_test.go index 79659e18..7bac90e8 100644 --- a/processors/filter-grok/grok_test.go +++ b/processors/filter-grok/grok_test.go @@ -68,7 +68,7 @@ func TestReceive(t *testing.T) { p.Configure(ctx, conf) //NewTestEvent(sourceAgentName string, message string, fields map[string]interface{}) Event { - em := testutils.NewPacket("Mar 16 00:01:25 evita postfix/smtpd[1713]: connect from camomile.cloud9.net[168.100.1.3]", nil) + em := testutils.NewPacketOld("Mar 16 00:01:25 evita postfix/smtpd[1713]: connect from camomile.cloud9.net[168.100.1.3]", nil) p.Receive(em) assert.Equal(t, "evita", em.Fields().ValueOrEmptyForPathString("logsource"), "field value not proprely groked") @@ -86,7 +86,7 @@ func TestReceiveFailure(t *testing.T) { p.Configure(ctx, conf) //NewTestEvent(sourceAgentName string, message string, fields map[string]interface{}) Event { - em := testutils.NewPacket("hello world", nil) + em := testutils.NewPacketOld("hello world", nil) em.Fields().SetValueForPath("VALUE", "field1") // em.On("Pipe", PORT_SUCCESS).Return(nil) @@ -112,7 +112,7 @@ func TestRemoveTagNoTags(t *testing.T) { p.Configure(ctx, conf) //NewTestEvent(sourceAgentName string, message string, fields map[string]interface{}) Event { - em := testutils.NewPacket("Mar 16 00:01:25 evita postfix/smtpd[1713]: connect from camomile.cloud9.net[168.100.1.3]", nil) + em := testutils.NewPacketOld("Mar 16 00:01:25 evita postfix/smtpd[1713]: connect from camomile.cloud9.net[168.100.1.3]", nil) // em.Fields().SetValueForPath([]string{"myTag", "field1", "myTag2"}, "notags") // em.On("Pipe", PORT_SUCCESS).Return(nil) @@ -134,7 +134,7 @@ func TestRemoveTag(t *testing.T) { p.Configure(ctx, conf) //NewTestEvent(sourceAgentName string, message string, fields map[string]interface{}) Event { - em := testutils.NewPacket("Mar 16 00:01:25 evita postfix/smtpd[1713]: connect from camomile.cloud9.net[168.100.1.3]", nil) + em := testutils.NewPacketOld("Mar 16 00:01:25 evita postfix/smtpd[1713]: connect from camomile.cloud9.net[168.100.1.3]", nil) em.Fields().SetValueForPath([]string{"myTag", "field1", "myTag2"}, "tags") // em.Fields().SetValueForPath("newvalue", "upfield3") // em.Fields().SetValueForPath("myValue", "rnfieldA") @@ -163,7 +163,7 @@ func TestAddTagToNoTags(t *testing.T) { p.Configure(ctx, conf) //NewTestEvent(sourceAgentName string, message string, fields map[string]interface{}) Event { - em := testutils.NewPacket("Mar 16 00:01:25 evita postfix/smtpd[1713]: connect from camomile.cloud9.net[168.100.1.3]", nil) + em := testutils.NewPacketOld("Mar 16 00:01:25 evita postfix/smtpd[1713]: connect from camomile.cloud9.net[168.100.1.3]", nil) // em.On("Pipe", PORT_SUCCESS).Return(nil) p.Receive(em) @@ -185,7 +185,7 @@ func TestAddTag(t *testing.T) { p.Configure(ctx, conf) //NewTestEvent(sourceAgentName string, message string, fields map[string]interface{}) Event { - em := testutils.NewPacket("Mar 16 00:01:25 evita postfix/smtpd[1713]: connect from camomile.cloud9.net[168.100.1.3]", nil) + em := testutils.NewPacketOld("Mar 16 00:01:25 evita postfix/smtpd[1713]: connect from camomile.cloud9.net[168.100.1.3]", nil) em.Fields().SetValueForPath([]string{"myTag", "field1", "myTag2"}, "tags") // em.Fields().SetValueForPath("newvalue", "upfield3") // em.Fields().SetValueForPath("myValue", "rnfieldA") @@ -213,7 +213,7 @@ func TestRemoveField(t *testing.T) { p.Configure(ctx, conf) //NewTestEvent(sourceAgentName string, message string, fields map[string]interface{}) Event { - em := testutils.NewPacket("Mar 16 00:01:25 evita postfix/smtpd[1713]: connect from camomile.cloud9.net[168.100.1.3]", nil) + em := testutils.NewPacketOld("Mar 16 00:01:25 evita postfix/smtpd[1713]: connect from camomile.cloud9.net[168.100.1.3]", nil) em.Fields().SetValueForPath("valueA", "field1") em.Fields().SetValueForPath("valueB", "field2") // em.Fields().SetValueForPath("newvalue", "upfield3") @@ -240,7 +240,7 @@ func TestAddField(t *testing.T) { p.Configure(ctx, conf) //NewTestEvent(sourceAgentName string, message string, fields map[string]interface{}) Event { - em := testutils.NewPacket("Mar 16 00:01:25 evita postfix/smtpd[1713]: connect from camomile.cloud9.net[168.100.1.3]", nil) + em := testutils.NewPacketOld("Mar 16 00:01:25 evita postfix/smtpd[1713]: connect from camomile.cloud9.net[168.100.1.3]", nil) em.Fields().SetValueForPath("valueB", "field2") // em.Fields().SetValueForPath("newvalue", "upfield3") // em.Fields().SetValueForPath("myValue", "rnfieldA") @@ -281,7 +281,7 @@ func TestKeep_empty_captures(t *testing.T) { }) //NewTestEvent(sourceAgentName string, message string, fields map[string]interface{}) Event { - em := testutils.NewPacket(`128.0.0.1 - - [11/Dec/2013:00:01:45 -0800] "GET /xampp/status.php HTTP/1.1" 200 3891 "http://cadenza/xampp/navi.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0"`, nil) + em := testutils.NewPacketOld(`128.0.0.1 - - [11/Dec/2013:00:01:45 -0800] "GET /xampp/status.php HTTP/1.1" 200 3891 "http://cadenza/xampp/navi.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0"`, nil) // em.On("Pipe", PORT_SUCCESS).Return(nil) p.Receive(em) // em.AssertExpectations(t) @@ -298,7 +298,7 @@ func TestKeep_empty_capturesFalse(t *testing.T) { }) //NewTestEvent(sourceAgentName string, message string, fields map[string]interface{}) Event { - em := testutils.NewPacket(`127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] "GET /xampp/status.php HTTP/1.1" 200 3891 "http://cadenza/xampp/navi.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0"`, nil) + em := testutils.NewPacketOld(`127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] "GET /xampp/status.php HTTP/1.1" 200 3891 "http://cadenza/xampp/navi.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0"`, nil) // em.On("Pipe", PORT_SUCCESS).Return(nil) p.Receive(em) // em.AssertExpectations(t) @@ -324,7 +324,7 @@ func TestBreak_on_matchFalse(t *testing.T) { p.Configure(ctx, conf) //NewTestEvent(sourceAgentName string, message string, fields map[string]interface{}) Event { - em := testutils.NewPacket("Mar 16 00:01:25 evita postfix/smtpd[1713]: connect from camomile.cloud9.net[168.100.1.3]", nil) + em := testutils.NewPacketOld("Mar 16 00:01:25 evita postfix/smtpd[1713]: connect from camomile.cloud9.net[168.100.1.3]", nil) // em.On("Pipe", PORT_SUCCESS).Return(nil) p.Receive(em) // em.AssertExpectations(t) @@ -348,7 +348,7 @@ func TestBreak_on_matchTrue(t *testing.T) { p.Configure(ctx, conf) //NewTestEvent(sourceAgentName string, message string, fields map[string]interface{}) Event { - em := testutils.NewPacket("Mar 16 00:01:25 evita postfix/smtpd[1713]: connect from camomile.cloud9.net[168.100.1.3]", nil) + em := testutils.NewPacketOld("Mar 16 00:01:25 evita postfix/smtpd[1713]: connect from camomile.cloud9.net[168.100.1.3]", nil) // em.On("Pipe", PORT_SUCCESS).Return(nil) p.Receive(em) // em.AssertExpectations(t) diff --git a/processors/filter-html/html_test.go b/processors/filter-html/html_test.go index 8760e8cf..fa7b0ab3 100644 --- a/processors/filter-html/html_test.go +++ b/processors/filter-html/html_test.go @@ -63,7 +63,7 @@ func TestReceiveHTML1(t *testing.T) { ctx := testutils.NewProcessorContext() p.Configure(ctx, getExampleConfiguration()) - em := testutils.NewPacket("", nil) + em := testutils.NewPacketOld("", nil) em.Fields().SetValueForPath(HTML1, "source") p.Receive(em) diff --git a/processors/filter-json/json_test.go b/processors/filter-json/json_test.go index 53c5a26d..2d1620b1 100644 --- a/processors/filter-json/json_test.go +++ b/processors/filter-json/json_test.go @@ -45,7 +45,7 @@ func TestInvalidConfiguration(t *testing.T) { func TestJsonFilter(t *testing.T) { Convey("Given an existing event source field contains JSON", t, func() { - event := testutils.NewPacket("", map[string]interface{}{ + event := testutils.NewPacketOld("", map[string]interface{}{ "thejson": `{ "hello": "world", "list": [ 1, 2, 3 ], "hash": { "k": "v" } }`, }) conf := map[string]interface{}{ @@ -117,7 +117,7 @@ func TestJsonFilter(t *testing.T) { func TestMissingSourceField(t *testing.T) { - event := testutils.NewPacket("", map[string]interface{}{ + event := testutils.NewPacketOld("", map[string]interface{}{ "thejson": `{ "hello": "world", "list": [ 1, 2, 3 ], "hash": { "k": "v" } }`, }) conf := map[string]interface{}{ @@ -144,7 +144,7 @@ func TestMissingSourceField(t *testing.T) { } func TestInvalidJsonData(t *testing.T) { - event := testutils.NewPacket("", map[string]interface{}{ + event := testutils.NewPacketOld("", map[string]interface{}{ "thejson": `, 3 ], "hash": { "k": "v" } }`, }) conf := map[string]interface{}{ diff --git a/processors/filter-kv/kv_test.go b/processors/filter-kv/kv_test.go index df32f9c9..b48d1f3f 100644 --- a/processors/filter-kv/kv_test.go +++ b/processors/filter-kv/kv_test.go @@ -52,7 +52,7 @@ func TestInvalidSource(t *testing.T) { "source": "foo", } p, _ := testutils.NewProcessor(New, conf) - event := testutils.NewPacket("blababla", map[string]interface{}{ + event := testutils.NewPacketOld("blababla", map[string]interface{}{ "foo": []string{ "hello=world foo=bar", "hello2=world2 foo2=bar2", @@ -83,7 +83,7 @@ func TestInvalidSource(t *testing.T) { "source": "foo", } p, _ := testutils.NewProcessor(New, conf) - event := testutils.NewPacket("blababla", map[string]interface{}{ + event := testutils.NewPacketOld("blababla", map[string]interface{}{ "foo": 43, }) p.Receive(event) @@ -104,7 +104,7 @@ func TestAllowDuplicateValues(t *testing.T) { "allow_duplicate_values": false, } p, _ := testutils.NewProcessor(New, conf) - event := testutils.NewPacket("foo=yeah&foo=yeah&foo=bar", nil) + event := testutils.NewPacketOld("foo=yeah&foo=yeah&foo=bar", nil) p.Receive(event) So(p.SentPacketsCount(0), ShouldEqual, 1) em := p.SentPackets(0)[0] @@ -124,7 +124,7 @@ func TestAllowDuplicateValues(t *testing.T) { "allow_duplicate_values": false, } p, _ := testutils.NewProcessor(New, conf) - event := testutils.NewPacket("foo=bar&foo=yeah&foo=yeah", nil) + event := testutils.NewPacketOld("foo=bar&foo=yeah&foo=yeah", nil) p.Receive(event) So(p.SentPacketsCount(0), ShouldEqual, 1) em := p.SentPackets(0)[0] @@ -145,7 +145,7 @@ func TestAllowDuplicateValues(t *testing.T) { "allow_duplicate_values": true, } p, _ := testutils.NewProcessor(New, conf) - event := testutils.NewPacket("foo=yeah&foo=yeah&foo=yeah&foo=bar", nil) + event := testutils.NewPacketOld("foo=yeah&foo=yeah&foo=yeah&foo=bar", nil) p.Receive(event) So(p.SentPacketsCount(0), ShouldEqual, 1) em := p.SentPackets(0)[0] @@ -172,7 +172,7 @@ func TestDefaultKeys(t *testing.T) { Convey("Then ...", func() { p, _ := testutils.NewProcessor(New, conf) - event := testutils.NewPacket("hello=world foo=bar baz=fizz doublequoted=\"hello world\" singlequoted='hello world'", nil) + event := testutils.NewPacketOld("hello=world foo=bar baz=fizz doublequoted=\"hello world\" singlequoted='hello world'", nil) p.Receive(event) So(p.SentPacketsCount(0), ShouldEqual, 1) em := p.SentPackets(0)[0] @@ -193,7 +193,7 @@ func TestDefaultKeys(t *testing.T) { Convey("Then with a specific target", func() { conf["target"] = "kv" p, _ := testutils.NewProcessor(New, conf) - event := testutils.NewPacket("hello=world foo=bar baz=fizz doublequoted=\"hello world\" singlequoted='hello world'", nil) + event := testutils.NewPacketOld("hello=world foo=bar baz=fizz doublequoted=\"hello world\" singlequoted='hello world'", nil) p.Receive(event) So(p.SentPacketsCount(0), ShouldEqual, 1) em := p.SentPackets(0)[0] @@ -221,7 +221,7 @@ func TestTarget(t *testing.T) { p, _ := testutils.NewProcessor(New, conf) Convey("Then ...", func() { - event := testutils.NewPacket("hello=world foo=bar baz=fizz doublequoted=\"hello world\" singlequoted='hello world'", nil) + event := testutils.NewPacketOld("hello=world foo=bar baz=fizz doublequoted=\"hello world\" singlequoted='hello world'", nil) p.Receive(event) So(p.SentPacketsCount(0), ShouldEqual, 1) em := p.SentPackets(0)[0] @@ -251,7 +251,7 @@ func TestTrimKey(t *testing.T) { p, _ := testutils.NewProcessor(New, conf) Convey("Then ...", func() { - event := testutils.NewPacket("key1= value1 with spaces | key2 with spaces =value2", nil) + event := testutils.NewPacketOld("key1= value1 with spaces | key2 with spaces =value2", nil) p.Receive(event) So(p.SentPacketsCount(0), ShouldEqual, 1) em := p.SentPackets(0)[0] @@ -275,7 +275,7 @@ func TestExcludeKeys(t *testing.T) { p, _ := testutils.NewProcessor(New, conf) Convey("Then the specified keys are not valued into the event", func() { - event := testutils.NewPacket("hello=world foo=bar baz=fizz doublequoted=\"hello world\" singlequoted='hello world'", nil) + event := testutils.NewPacketOld("hello=world foo=bar baz=fizz doublequoted=\"hello world\" singlequoted='hello world'", nil) p.Receive(event) So(p.SentPacketsCount(0), ShouldEqual, 1) em := p.SentPackets(0)[0] @@ -306,7 +306,7 @@ func TestIncludeKeys(t *testing.T) { p, _ := testutils.NewProcessor(New, conf) Convey("Then only then specified keys are valued into the event", func() { - event := testutils.NewPacket("hello=world foo=bar baz=fizz doublequoted=\"hello world\" singlequoted='hello world'", nil) + event := testutils.NewPacketOld("hello=world foo=bar baz=fizz doublequoted=\"hello world\" singlequoted='hello world'", nil) p.Receive(event) So(p.SentPacketsCount(0), ShouldEqual, 1) em := p.SentPackets(0)[0] @@ -337,7 +337,7 @@ func TestValueSplitUsingAlternateSplitter(t *testing.T) { "value_split": ":", } p, _ := testutils.NewProcessor(New, conf) - event := testutils.NewPacket("hello:=world foo:bar baz=:fizz doublequoted:\"hello world\" singlequoted:'hello world' brackets:(hello world)", nil) + event := testutils.NewPacketOld("hello:=world foo:bar baz=:fizz doublequoted:\"hello world\" singlequoted:'hello world' brackets:(hello world)", nil) p.Receive(event) So(p.SentPacketsCount(0), ShouldEqual, 1) @@ -363,7 +363,7 @@ func TestSpacesAttachedFields(t *testing.T) { Convey("When spaces are arround key pair value", t, func() { conf := map[string]interface{}{} p, _ := testutils.NewProcessor(New, conf) - event := testutils.NewPacket("hello = world foo =bar baz= fizz doublequoted = \"hello world\" singlequoted= 'hello world' brackets =(hello world)", nil) + event := testutils.NewPacketOld("hello = world foo =bar baz= fizz doublequoted = \"hello world\" singlequoted= 'hello world' brackets =(hello world)", nil) p.Receive(event) So(p.SentPacketsCount(0), ShouldEqual, 1) @@ -389,7 +389,7 @@ func TestSpacesAttachedFields(t *testing.T) { "value_split": ":", } p, _ := testutils.NewProcessor(New, conf) - event := testutils.NewPacket(`IKE:=Quick\ Mode\ completion IKE\ IDs:=subnet:\ x.x.x.x\ (mask=\ 255.255.255.254)\ and\ host:\ y.y.y.y`, nil) + event := testutils.NewPacketOld(`IKE:=Quick\ Mode\ completion IKE\ IDs:=subnet:\ x.x.x.x\ (mask=\ 255.255.255.254)\ and\ host:\ y.y.y.y`, nil) Convey("Then the produced event results in new fields/values for each keypair", func() { p.Receive(event) @@ -413,7 +413,7 @@ func TestDefaults(t *testing.T) { conf := map[string]interface{}{} Convey("When processor receive an event with a message containing key=value pairs", func() { p, _ := testutils.NewProcessor(New, conf) - event := testutils.NewPacket("hello=world foo=bar baz=fizz doublequoted=\"hello world\" singlequoted='hello world' bracketsone=(hello world) bracketstwo=[hello world] bracketsthree=", nil) + event := testutils.NewPacketOld("hello=world foo=bar baz=fizz doublequoted=\"hello world\" singlequoted='hello world' bracketsone=(hello world) bracketstwo=[hello world] bracketsthree=", nil) p.Receive(event) Convey("Then the produced event results in new fields/values for each keypair", func() { @@ -438,7 +438,7 @@ func TestDefaults(t *testing.T) { }) Convey("When processor receive an event with a invalid message", func() { p, _ := testutils.NewProcessor(New, conf) - event := testutils.NewPacket("hree", nil) + event := testutils.NewPacketOld("hree", nil) eventCopy := event.Clone() p.Receive(event) diff --git a/processors/filter-mutate/mutate_test.go b/processors/filter-mutate/mutate_test.go index 1897a361..c7d38cf6 100644 --- a/processors/filter-mutate/mutate_test.go +++ b/processors/filter-mutate/mutate_test.go @@ -87,7 +87,7 @@ func TestReceive(t *testing.T) { p.Configure(ctx, getExampleConfiguration()) - em := testutils.NewPacket("test", nil) + em := testutils.NewPacketOld("test", nil) em.Fields().SetValueForPath("VALUE", "field1") em.Fields().SetValueForPath("loRem", "ucfield2") em.Fields().SetValueForPath("newvalue", "upfield3") @@ -139,7 +139,7 @@ func TestReceiveRemoveAllBut(t *testing.T) { } p.Configure(ctx, conf) - em := testutils.NewPacket("test", nil) + em := testutils.NewPacketOld("test", nil) em.Fields().SetValueForPath("VALUE", "field1") em.Fields().SetValueForPath("loRem", "ucfield2") em.Fields().SetValueForPath("newvalue", "upfield3") @@ -713,7 +713,7 @@ func TestMerge(t *testing.T) { // https://github.com/vjeantet/bitfan/issues/71 func TestNoEmptyTags(t *testing.T) { Convey("When no option about tags is involved", t, func() { - event := testutils.NewPacket("", map[string]interface{}{}) + event := testutils.NewPacketOld("", map[string]interface{}{}) conf := map[string]interface{}{ "target": "name1", } diff --git a/processors/filter-newterm/newterm_test.go b/processors/filter-newterm/newterm_test.go index 05747518..eb0d28bc 100644 --- a/processors/filter-newterm/newterm_test.go +++ b/processors/filter-newterm/newterm_test.go @@ -33,19 +33,19 @@ func TestReceiveMatch(t *testing.T) { }, ) - p.Receive(testutils.NewPacket("test", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) assert.Equal(t, 1, ctx.SentPacketsCount(0), "new term") - p.Receive(testutils.NewPacket("test", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) assert.Equal(t, 1, ctx.SentPacketsCount(0), "no new term") - p.Receive(testutils.NewPacket("val1", nil)) + p.Receive(testutils.NewPacketOld("val1", nil)) assert.Equal(t, 1, ctx.SentPacketsCount(0), "no new term") - p.Receive(testutils.NewPacket("val1", nil)) - p.Receive(testutils.NewPacket("test", nil)) - p.Receive(testutils.NewPacket("val1", nil)) + p.Receive(testutils.NewPacketOld("val1", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) + p.Receive(testutils.NewPacketOld("val1", nil)) assert.Equal(t, 1, ctx.SentPacketsCount(0), "no new term") - p.Receive(testutils.NewPacket("valo", nil)) - p.Receive(testutils.NewPacket("al1", nil)) - p.Receive(testutils.NewPacket("val3", nil)) + p.Receive(testutils.NewPacketOld("valo", nil)) + p.Receive(testutils.NewPacketOld("al1", nil)) + p.Receive(testutils.NewPacketOld("val3", nil)) assert.Equal(t, 4, ctx.SentPacketsCount(0), "3 new term") } @@ -61,10 +61,10 @@ func TestReceiveMissingFieldIgnoreTrue(t *testing.T) { }, ) - p.Receive(testutils.NewPacket("val1", nil)) - p.Receive(testutils.NewPacket("val2", nil)) - p.Receive(testutils.NewPacket("val3", nil)) - p.Receive(testutils.NewPacket("val4", nil)) + p.Receive(testutils.NewPacketOld("val1", nil)) + p.Receive(testutils.NewPacketOld("val2", nil)) + p.Receive(testutils.NewPacketOld("val3", nil)) + p.Receive(testutils.NewPacketOld("val4", nil)) assert.Equal(t, 0, ctx.SentPacketsCount(0), "no event pass") } @@ -80,9 +80,9 @@ func TestReceiveMissingFieldIgnoreFalse(t *testing.T) { }, ) - p.Receive(testutils.NewPacket("val1", nil)) - p.Receive(testutils.NewPacket("val1", nil)) - p.Receive(testutils.NewPacket("val1", nil)) - p.Receive(testutils.NewPacket("val1", nil)) + p.Receive(testutils.NewPacketOld("val1", nil)) + p.Receive(testutils.NewPacketOld("val1", nil)) + p.Receive(testutils.NewPacketOld("val1", nil)) + p.Receive(testutils.NewPacketOld("val1", nil)) assert.Equal(t, 4, ctx.SentPacketsCount(0), "all events pass") } diff --git a/processors/filter-split/split.go b/processors/filter-split/split.go index ab656627..207b300f 100644 --- a/processors/filter-split/split.go +++ b/processors/filter-split/split.go @@ -59,7 +59,7 @@ func (p *processor) Receive(e processors.IPacket) error { p.opt.ProcessCommonOptions(&cp) // e := processors.NewEvent(e.ToAgentName(), e.Message(), cp) - e2 := p.NewPacket(e.Message(), cp) + e2 := p.NewPacket(cp) p.Send(e2, 0) } diff --git a/processors/filter-uuid/uuid_test.go b/processors/filter-uuid/uuid_test.go index 7327a0f8..874d16ae 100644 --- a/processors/filter-uuid/uuid_test.go +++ b/processors/filter-uuid/uuid_test.go @@ -43,7 +43,7 @@ func TestInvalidConfiguration(t *testing.T) { func TestUuidFilterGeneration(t *testing.T) { Convey("When the target field name does not exist", t, func() { - event := testutils.NewPacket("", map[string]interface{}{}) + event := testutils.NewPacketOld("", map[string]interface{}{}) conf := map[string]interface{}{ "target": "name1", } @@ -60,7 +60,7 @@ func TestUuidFilterGeneration(t *testing.T) { }) Convey("When the target field exists", t, func() { - event := testutils.NewPacket("", map[string]interface{}{ + event := testutils.NewPacketOld("", map[string]interface{}{ "name1": "test", }) diff --git a/processors/filter-whitelist/whitelist_test.go b/processors/filter-whitelist/whitelist_test.go index d7b46bdc..008d0a5a 100644 --- a/processors/filter-whitelist/whitelist_test.go +++ b/processors/filter-whitelist/whitelist_test.go @@ -53,13 +53,13 @@ func TestReceiveMatch(t *testing.T) { }, ) - p.Receive(testutils.NewPacket("test", nil)) - p.Receive(testutils.NewPacket("fqsdf", nil)) - p.Receive(testutils.NewPacket("valo", nil)) - p.Receive(testutils.NewPacket("al1", nil)) - p.Receive(testutils.NewPacket("val1", nil)) - p.Receive(testutils.NewPacket("val3", nil)) - p.Receive(testutils.NewPacket("val2", nil)) + p.Receive(testutils.NewPacketOld("test", nil)) + p.Receive(testutils.NewPacketOld("fqsdf", nil)) + p.Receive(testutils.NewPacketOld("valo", nil)) + p.Receive(testutils.NewPacketOld("al1", nil)) + p.Receive(testutils.NewPacketOld("val1", nil)) + p.Receive(testutils.NewPacketOld("val3", nil)) + p.Receive(testutils.NewPacketOld("val2", nil)) assert.Equal(t, 5, ctx.SentPacketsCount(0), "5 events should pass") } @@ -75,10 +75,10 @@ func TestReceiveMissingFieldIgnoreTrue(t *testing.T) { }, ) - p.Receive(testutils.NewPacket("val1", nil)) - p.Receive(testutils.NewPacket("val2", nil)) - p.Receive(testutils.NewPacket("val3", nil)) - p.Receive(testutils.NewPacket("val4", nil)) + p.Receive(testutils.NewPacketOld("val1", nil)) + p.Receive(testutils.NewPacketOld("val2", nil)) + p.Receive(testutils.NewPacketOld("val3", nil)) + p.Receive(testutils.NewPacketOld("val4", nil)) assert.Equal(t, 0, ctx.SentPacketsCount(0), "no event pass") } @@ -94,9 +94,9 @@ func TestReceiveMissingFieldIgnoreFalse(t *testing.T) { }, ) - p.Receive(testutils.NewPacket("val2d", nil)) - p.Receive(testutils.NewPacket("val1", nil)) - p.Receive(testutils.NewPacket("val1", nil)) - p.Receive(testutils.NewPacket("val1", nil)) + p.Receive(testutils.NewPacketOld("val2d", nil)) + p.Receive(testutils.NewPacketOld("val1", nil)) + p.Receive(testutils.NewPacketOld("val1", nil)) + p.Receive(testutils.NewPacketOld("val1", nil)) assert.Equal(t, 4, ctx.SentPacketsCount(0), "all events pass") } diff --git a/processors/input-beats/beats.go b/processors/input-beats/beats.go index ab6fa37c..2971ba00 100644 --- a/processors/input-beats/beats.go +++ b/processors/input-beats/beats.go @@ -144,7 +144,7 @@ func (p *processor) Start(e processors.IPacket) error { fields["@timestamp"], _ = time.Parse("2006-01-02T15:04:05Z07:00", val.(string)) } - ev := p.NewPacket("", fields) + ev := p.NewPacket(fields) p.opt.ProcessCommonOptions(ev.Fields()) p.Send(ev, 0) } diff --git a/processors/input-elasticsearch/elasticinput.go b/processors/input-elasticsearch/elasticinput.go index 757c21d1..b0f0744b 100644 --- a/processors/input-elasticsearch/elasticinput.go +++ b/processors/input-elasticsearch/elasticinput.go @@ -88,9 +88,11 @@ func (p *processor) Start(e processors.IPacket) error { fields, err := mxj.NewMapJson(*hit.Source) if err != nil { p.Logger.Error(err.Error()) - e = p.NewPacket(string(*hit.Source), nil) + e = p.NewPacket(map[string]interface{}{ + "message": string(*hit.Source), + }) } else { - e = p.NewPacket("", fields) + e = p.NewPacket(fields) } // if err != nil { diff --git a/processors/input-event/inputevent.go b/processors/input-event/inputevent.go index 6880d1b8..99b4505a 100644 --- a/processors/input-event/inputevent.go +++ b/processors/input-event/inputevent.go @@ -55,8 +55,7 @@ func (p *processor) Tick(e processors.IPacket) error { p.wg.Add(1) for i := 1; i <= p.opt.Count; i++ { e := p.NewPacket( - p.opt.Message, - map[string]interface{}{"number": i}, + map[string]interface{}{"message": p.opt.Message, "number": i}, ) p.opt.ProcessCommonOptions(e.Fields()) p.Send(e) diff --git a/processors/input-exec/execinput.go b/processors/input-exec/execinput.go index f8aa9b3d..b592cb6d 100644 --- a/processors/input-exec/execinput.go +++ b/processors/input-exec/execinput.go @@ -100,8 +100,9 @@ func (p *processor) Tick(e processors.IPacket) error { return err } } else { - ne := p.NewPacket(data, map[string]interface{}{ - "host": p.host, + ne := p.NewPacket(map[string]interface{}{ + "message": data, + "host": p.host, }) ne.Fields().SetValueForPath(record, "stdout") ne.Fields().SetValueForPath(p.opt.Command, "command") diff --git a/processors/input-file/file.go b/processors/input-file/file.go index 124ec2c5..873624cb 100644 --- a/processors/input-file/file.go +++ b/processors/input-file/file.go @@ -301,18 +301,19 @@ func (p *processor) readfile(pathfile string) error { var e processors.IPacket switch v := record.(type) { case string: - e = p.NewPacket(v, map[string]interface{}{ + e = p.NewPacket(map[string]interface{}{ + "message": v, "host": p.host, "basename": filepath.Base(pathfile), "path": pathfile, }) case map[string]interface{}: - e = p.NewPacket("", v) + e = p.NewPacket(v) e.Fields().SetValueForPath(p.host, "host") e.Fields().SetValueForPath(filepath.Base(pathfile), "basename") e.Fields().SetValueForPath(pathfile, "path") case []interface{}: - e = p.NewPacket("", map[string]interface{}{ + e = p.NewPacket(map[string]interface{}{ "host": p.host, "basename": filepath.Base(pathfile), "path": pathfile, diff --git a/processors/input-httpserver/httpserver.go b/processors/input-httpserver/httpserver.go index 08a5b15c..60f97f45 100644 --- a/processors/input-httpserver/httpserver.go +++ b/processors/input-httpserver/httpserver.go @@ -173,18 +173,19 @@ func (p *processor) HttpHandler(w http.ResponseWriter, r *http.Request) { var e processors.IPacket switch v := record.(type) { case nil: - e = p.NewPacket("", map[string]interface{}{ + e = p.NewPacket(map[string]interface{}{ "request": req, }) case string: - e = p.NewPacket(v, map[string]interface{}{ + e = p.NewPacket(map[string]interface{}{ + "message": v, "request": req, }) case map[string]interface{}: - e = p.NewPacket("", v) + e = p.NewPacket(v) e.Fields().SetValueForPath(req, "request") case []interface{}: - e = p.NewPacket("", map[string]interface{}{ + e = p.NewPacket(map[string]interface{}{ "request": req, "data": v, }) diff --git a/processors/input-imap/handler.go b/processors/input-imap/handler.go index a4b7779c..dcc83383 100644 --- a/processors/input-imap/handler.go +++ b/processors/input-imap/handler.go @@ -14,7 +14,9 @@ type toJsonHandler struct { func (hnd *toJsonHandler) Deliver(email string) error { docJSON, _ := json.Marshal(getMsg(email)) - e := hnd.packetFactory(string(docJSON), nil) + e := hnd.packetFactory(map[string]interface{}{ + "message": string(docJSON), + }) hnd.send(e, 0) return nil } diff --git a/processors/input-rabbitmq/rabbitmq.go b/processors/input-rabbitmq/rabbitmq.go index deab5f10..608e107a 100644 --- a/processors/input-rabbitmq/rabbitmq.go +++ b/processors/input-rabbitmq/rabbitmq.go @@ -255,13 +255,17 @@ func (p *processor) parse(message []byte) processors.IPacket { fields, err := mxj.NewMapJson(message) if err != nil { p.Logger.Errorf(err.Error()) - event = p.NewPacket(string(message), nil) + event = p.NewPacket(map[string]interface{}{ + "message": string(message), + }) } else { - event = p.NewPacket("", fields) + event = p.NewPacket(fields) } default: - event = p.NewPacket(string(message), nil) + event = p.NewPacket(map[string]interface{}{ + "message": string(message), + }) } return event diff --git a/processors/input-stdin/stdin.go b/processors/input-stdin/stdin.go index e83e1450..4348f9c0 100644 --- a/processors/input-stdin/stdin.go +++ b/processors/input-stdin/stdin.go @@ -108,14 +108,15 @@ func (p *processor) Start(e processors.IPacket) error { switch v := msg.(type) { case string: - ne = p.NewPacket(v, map[string]interface{}{ - "host": p.host, + ne = p.NewPacket(map[string]interface{}{ + "message": v, + "host": p.host, }) case map[string]interface{}: - ne = p.NewPacket("", v) + ne = p.NewPacket(v) ne.Fields().SetValueForPath(p.host, "host") case []interface{}: - ne = p.NewPacket("", map[string]interface{}{ + ne = p.NewPacket(map[string]interface{}{ "host": p.host, "data": v, }) diff --git a/processors/input-syslog/sysloginput.go b/processors/input-syslog/sysloginput.go index 93ffeaf5..bc6d1a7b 100644 --- a/processors/input-syslog/sysloginput.go +++ b/processors/input-syslog/sysloginput.go @@ -94,7 +94,7 @@ func (p *processor) Start(e processors.IPacket) error { message["@timestamp"] = message["timestamp"].(time.Time) delete(message, "timestamp") - ne := p.NewPacket("", message) + ne := p.NewPacket(message) p.opt.ProcessCommonOptions(ne.Fields()) p.Send(ne) } diff --git a/processors/input-tail/tail.go b/processors/input-tail/tail.go index 2d1e10a6..93dfc526 100644 --- a/processors/input-tail/tail.go +++ b/processors/input-tail/tail.go @@ -335,16 +335,17 @@ func (p *processor) tailFile(path string, q chan bool) error { var e processors.IPacket switch v := record.(type) { case string: - e = p.NewPacket(v, map[string]interface{}{ - "host": p.host, - "path": path, + e = p.NewPacket(map[string]interface{}{ + "message": v, + "host": p.host, + "path": path, }) case map[string]interface{}: - e = p.NewPacket("", v) + e = p.NewPacket(v) e.Fields().SetValueForPath(p.host, "host") e.Fields().SetValueForPath(path, "path") case []interface{}: - e = p.NewPacket("", map[string]interface{}{ + e = p.NewPacket(map[string]interface{}{ "host": p.host, "path": path, "data": v, diff --git a/processors/input-twitter/twitter.go b/processors/input-twitter/twitter.go index d155a7c4..5111a166 100644 --- a/processors/input-twitter/twitter.go +++ b/processors/input-twitter/twitter.go @@ -150,8 +150,8 @@ func (p *processor) doStream(stream *anaconda.Stream, packet processors.IPacket, if err == nil { r["@timestamp"] = createdAtTime } - - e := p.NewPacket(t.Text, r) + r["message"] = t.Text + e := p.NewPacket(r) processors.AddFields(opt.Add_field, e.Fields()) if len(opt.Tags) > 0 { processors.AddTags(opt.Tags, e.Fields()) diff --git a/processors/input-udp/udpinput.go b/processors/input-udp/udpinput.go index 5f3ff6fc..71e750f3 100644 --- a/processors/input-udp/udpinput.go +++ b/processors/input-udp/udpinput.go @@ -66,8 +66,9 @@ func (p *processor) Start(e processors.IPacket) error { p.Logger.Errorf("ReadFromUDP: %v input-udp goroutine exiting", err) return } - ne := p.NewPacket(string(buf[:buflen]), map[string]interface{}{ - "host": saddr.IP.String(), + ne := p.NewPacket(map[string]interface{}{ + "message": string(buf[:buflen]), + "host": saddr.IP.String(), }) p.opt.ProcessCommonOptions(ne.Fields()) diff --git a/processors/input-unix/unixinput.go b/processors/input-unix/unixinput.go index c4b0b294..09de2d41 100644 --- a/processors/input-unix/unixinput.go +++ b/processors/input-unix/unixinput.go @@ -146,7 +146,7 @@ func (p *processor) parse(conn net.Conn) { p.Logger.Errorf(err.Error()) } message := strings.TrimSpace(string(buf[:buflen])) - event := p.NewPacket(message, mxj.Map{}) + event := p.NewPacket(mxj.Map{"message": message}) p.opt.ProcessCommonOptions(event.Fields()) p.Send(event) @@ -154,9 +154,9 @@ func (p *processor) parse(conn net.Conn) { json, raw, err := mxj.NewMapJsonReaderRaw(conn) if err != nil { p.Logger.Errorf(err.Error()) - event = p.NewPacket(string(raw), nil) + event = p.NewPacket(mxj.Map{"message": string(raw)}) } else { - event = p.NewPacket("", json) + event = p.NewPacket(json) } p.opt.ProcessCommonOptions(event.Fields()) p.Send(event) @@ -165,9 +165,9 @@ func (p *processor) parse(conn net.Conn) { xml, raw, err := mxj.NewMapXmlReaderRaw(conn) if err != nil { p.Logger.Errorf(err.Error()) - event = p.NewPacket(string(raw), nil) + event = p.NewPacket(mxj.Map{"message": string(raw)}) } else { - event = p.NewPacket("", xml) + event = p.NewPacket(xml) } p.opt.ProcessCommonOptions(event.Fields()) p.Send(event) diff --git a/processors/input-websocket/websocket.go b/processors/input-websocket/websocket.go index 9c26390e..4a6124ce 100644 --- a/processors/input-websocket/websocket.go +++ b/processors/input-websocket/websocket.go @@ -130,11 +130,11 @@ func (p *processor) processMessage(m []byte) { case nil: continue case string: - e = p.NewPacket(v, map[string]interface{}{}) + e = p.NewPacket(map[string]interface{}{"message": v}) case map[string]interface{}: - e = p.NewPacket("", v) + e = p.NewPacket(v) case []interface{}: - e = p.NewPacket("", map[string]interface{}{ + e = p.NewPacket(map[string]interface{}{ "request": v, }) default: diff --git a/processors/ldap/ldap.go b/processors/ldap/ldap.go index f9d54588..0dd2d7f9 100644 --- a/processors/ldap/ldap.go +++ b/processors/ldap/ldap.go @@ -229,7 +229,7 @@ func (p *processor) Receive(e processors.IPacket) error { } if p.opt.EventBy == "row" { - e2 := p.NewPacket("", nil) + e2 := p.NewPacket(nil) e2.Fields().SetValueForPath(p.opt.Host, "host") if len(p.opt.Var) > 0 { e2.Fields().SetValueForPath(p.opt.Var, "var") diff --git a/processors/output-http/httpoutput_test.go b/processors/output-http/httpoutput_test.go index e306ff20..9d02d940 100644 --- a/processors/output-http/httpoutput_test.go +++ b/processors/output-http/httpoutput_test.go @@ -46,9 +46,9 @@ func TestDefault(t *testing.T) { assert.NoError(t, p.Configure(ctx, conf), "configuration is correct, error should be nil") assert.NoError(t, p.Start(nil)) - assert.NoError(t, p.Receive(testutils.NewPacket("msg 1", map[string]interface{}{"abc1": "def1", "1": 123, "@timestamp": "ts"}))) + assert.NoError(t, p.Receive(testutils.NewPacketOld("msg 1", map[string]interface{}{"abc1": "def1", "1": 123, "@timestamp": "ts"}))) assert.Equal(t, map[string]interface{}{"message": "msg 1", "abc1": "def1", "1": 123.0, "@timestamp": "ts"}, <-c) - assert.NoError(t, p.Receive(testutils.NewPacket("message 2", map[string]interface{}{"abc2": "def2", "2": 456, "@timestamp": "ts"}))) + assert.NoError(t, p.Receive(testutils.NewPacketOld("message 2", map[string]interface{}{"abc2": "def2", "2": 456, "@timestamp": "ts"}))) assert.Equal(t, map[string]interface{}{"message": "message 2", "abc2": "def2", "2": 456.0, "@timestamp": "ts"}, <-c) assert.NoError(t, p.Stop(nil)) } @@ -76,8 +76,8 @@ func TestLine(t *testing.T) { assert.NoError(t, p.Configure(ctx, conf), "configuration is correct, error should be nil") assert.NoError(t, p.Start(nil)) - assert.NoError(t, p.Receive(testutils.NewPacket("message1", map[string]interface{}{"abc": "def1", "n": 123}))) - assert.NoError(t, p.Receive(testutils.NewPacket("message2", map[string]interface{}{"abc": "def2", "n": 456}))) + assert.NoError(t, p.Receive(testutils.NewPacketOld("message1", map[string]interface{}{"abc": "def1", "n": 123}))) + assert.NoError(t, p.Receive(testutils.NewPacketOld("message2", map[string]interface{}{"abc": "def2", "n": 456}))) assert.Equal(t, "message1\tdef1\t123\nmessage2\tdef2\t456\n", <-c) assert.NoError(t, p.Stop(nil)) } @@ -113,7 +113,7 @@ func TestRetry(t *testing.T) { } assert.NoError(t, p.Configure(ctx, conf), "configuration is correct, error should be nil") assert.NoError(t, p.Start(nil)) - assert.NoError(t, p.Receive(testutils.NewPacket("message", nil))) + assert.NoError(t, p.Receive(testutils.NewPacketOld("message", nil))) assert.Equal(t, "500", <-c) assert.Equal(t, "message\n", <-c) assert.NoError(t, p.Stop(nil)) @@ -130,7 +130,7 @@ func TestStopInRetry(t *testing.T) { } assert.NoError(t, p.Configure(ctx, conf), "configuration is correct, error should be nil") assert.NoError(t, p.Start(nil)) - assert.NoError(t, p.Receive(testutils.NewPacket("doom message", nil))) + assert.NoError(t, p.Receive(testutils.NewPacketOld("doom message", nil))) time.Sleep(time.Second) assert.NoError(t, p.Stop(nil)) } diff --git a/processors/output-statsd/statsd_test.go b/processors/output-statsd/statsd_test.go index c794c72d..5c829465 100644 --- a/processors/output-statsd/statsd_test.go +++ b/processors/output-statsd/statsd_test.go @@ -24,17 +24,17 @@ func TestMetricBuild(t *testing.T) { "sender": "%{message}", } assert.NoError(t, p.Configure(ctx, conf), "configuration is correct, error should be nil") - assert.Equal(t, "200.response.total.200", p.dynamicKey("response.total.%{message}", testutils.NewPacket("200", nil))) - assert.Equal(t, "400.response.total.400.100", p.dynamicKey("response.total.%{message}.%{int}", testutils.NewPacket("400", map[string]interface{}{"int": 100}))) - assert.Equal(t, "message.message.message", p.dynamicKey("%{message}.%{message}", testutils.NewPacket("message", nil))) + assert.Equal(t, "200.response.total.200", p.dynamicKey("response.total.%{message}", testutils.NewPacketOld("200", nil))) + assert.Equal(t, "400.response.total.400.100", p.dynamicKey("response.total.%{message}.%{int}", testutils.NewPacketOld("400", map[string]interface{}{"int": 100}))) + assert.Equal(t, "message.message.message", p.dynamicKey("%{message}.%{message}", testutils.NewPacketOld("message", nil))) - v, err := p.dynamicValue("%{float}", testutils.NewPacket("message", map[string]interface{}{"float": 12.123})) + v, err := p.dynamicValue("%{float}", testutils.NewPacketOld("message", map[string]interface{}{"float": 12.123})) assert.NoError(t, err) assert.Equal(t, 12.123, v) - v, err = p.dynamicValue("%{int}", testutils.NewPacket("message", map[string]interface{}{"int": 123})) + v, err = p.dynamicValue("%{int}", testutils.NewPacketOld("message", map[string]interface{}{"int": 123})) assert.NoError(t, err) assert.Equal(t, 123.0, v) - v, err = p.dynamicValue("%{str}", testutils.NewPacket("message", map[string]interface{}{"str": "4444.99"})) + v, err = p.dynamicValue("%{str}", testutils.NewPacketOld("message", map[string]interface{}{"str": "4444.99"})) assert.NoError(t, err) assert.Equal(t, 4444.99, v) } diff --git a/processors/packet.go b/processors/packet.go index 78d87c91..bdcb7025 100644 --- a/processors/packet.go +++ b/processors/packet.go @@ -12,6 +12,6 @@ type IPacket interface { Clone() IPacket } -type PacketBuilder func(string, map[string]interface{}) IPacket +type PacketBuilder func(map[string]interface{}) IPacket type PacketSender func(IPacket, ...int) bool diff --git a/processors/pop3/pop3.go b/processors/pop3/pop3.go index c0fe95ca..3f14fe62 100644 --- a/processors/pop3/pop3.go +++ b/processors/pop3/pop3.go @@ -281,7 +281,7 @@ func (p *processor) Receive(e processors.IPacket) error { packetFields["parts"] = parts } - ne := p.NewPacket(env.Text, packetFields) + ne := p.NewPacket(packetFields) p.opt.ProcessCommonOptions(e.Fields()) p.Send(ne) diff --git a/processors/testutils/context.go b/processors/testutils/context.go index ee77b770..ebb0403f 100644 --- a/processors/testutils/context.go +++ b/processors/testutils/context.go @@ -44,8 +44,8 @@ func newSender(p *DummyProcessorContext) processors.PacketSender { } func newPacket(p *DummyProcessorContext) processors.PacketBuilder { - return func(message string, fields map[string]interface{}) processors.IPacket { - e := NewPacket(message, fields) + return func(fields map[string]interface{}) processors.IPacket { + e := NewPacket(fields) p.builtPackets = append(p.builtPackets, e) return e } diff --git a/processors/testutils/event.go b/processors/testutils/event.go index c98366aa..a6f3eb1d 100644 --- a/processors/testutils/event.go +++ b/processors/testutils/event.go @@ -31,10 +31,10 @@ func (e *event) SetMessage(s string) { func (e *event) Clone() processors.IPacket { nf, _ := e.Fields().Copy() nf["@timestamp"], _ = e.Fields().ValueForPath("@timestamp") - return NewPacket(e.Message(), nf) + return NewPacket(nf) } -func NewPacket(message string, fields map[string]interface{}) processors.IPacket { +func NewPacketOld(message string, fields map[string]interface{}) processors.IPacket { if fields == nil { fields = mxj.Map{} } @@ -43,6 +43,13 @@ func NewPacket(message string, fields map[string]interface{}) processors.IPacket if _, ok := fields["message"]; !ok { fields["message"] = message } + return NewPacket(fields) +} + +func NewPacket(fields map[string]interface{}) processors.IPacket { + if fields == nil { + fields = mxj.Map{} + } if _, k := fields["@timestamp"]; !k { fields["@timestamp"] = time.Now() diff --git a/processors/when/evaluator_test.go b/processors/when/evaluator_test.go index 955b5672..64f4f3cc 100644 --- a/processors/when/evaluator_test.go +++ b/processors/when/evaluator_test.go @@ -82,7 +82,7 @@ func newTestEvent() processors.IPacket { }, } - return testutils.NewPacket("test", m) + return testutils.NewPacketOld("test", m) } func checkError(t *testing.T, event processors.IPacket, expression string) {