Skip to content
This repository has been archived by the owner on Nov 19, 2020. It is now read-only.

Commit

Permalink
fix #71 Self-added fileds "message":""
Browse files Browse the repository at this point in the history
  • Loading branch information
vjeantet committed Jan 27, 2018
1 parent 149b186 commit a16b0fa
Show file tree
Hide file tree
Showing 43 changed files with 277 additions and 265 deletions.
6 changes: 3 additions & 3 deletions core/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (a *Agent) addOutput(in chan *event, portNumber int) error {
// Start agent
func (a *Agent) start() error {
// Start processor
a.processor.Start(newPacket("start", map[string]interface{}{}))
a.processor.Start(newPacket(map[string]interface{}{"message": "start"}))

// Maximum number of concurent packet consumption ?
var maxConcurentPackets = a.PoolSize
Expand All @@ -187,7 +187,7 @@ func (a *Agent) start() error {
wg.Wait()

Log().Debugf("processor (%d) - stopping (no more packets)", a.ID)
if err := a.processor.Stop(newPacket("", nil)); err != nil {
if err := a.processor.Stop(newPacket(nil)); err != nil {
Log().Errorf("%s %d : %v", a.Type, a.ID, err)
}
close(a.Done)
Expand All @@ -198,7 +198,7 @@ func (a *Agent) start() error {
if a.Schedule != "" {
Log().Debugf("agent %s : schedule=%s", a.Label, a.Schedule)
err := myScheduler.Add(a.Label, a.Schedule, func() {
go a.processor.Tick(newPacket("", nil))
go a.processor.Tick(newPacket(nil))
})
if err != nil {
Log().Errorf("schedule start failed - %s : %v", a.Label, err)
Expand Down
9 changes: 2 additions & 7 deletions core/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,14 @@ func (e *event) SetMessage(s string) {
func (e *event) Clone() processors.IPacket {
nf, _ := e.Fields().Copy()
nf["@timestamp"], _ = e.Fields().ValueForPath("@timestamp")
return newPacket(e.Message(), nf)
return newPacket(nf)
}

func newPacket(message string, fields map[string]interface{}) processors.IPacket {
func newPacket(fields map[string]interface{}) processors.IPacket {
if fields == nil {
fields = mxj.Map{}
}

// Add message to its field if empty
if _, ok := fields["message"]; !ok {
fields["message"] = message
}

if _, k := fields["@timestamp"]; !k {
fields["@timestamp"] = time.Now()
}
Expand Down
64 changes: 32 additions & 32 deletions processors/filter-blacklist/blacklist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ func TestReceiveMatch(t *testing.T) {
"terms": []string{"val1", "val2"},
},
)
p.Receive(testutils.NewPacket("test", nil))
p.Receive(testutils.NewPacket("fqsdf", nil))
p.Receive(testutils.NewPacket("valo", nil))
p.Receive(testutils.NewPacket("al1", nil))
p.Receive(testutils.NewPacket("val1", nil))
p.Receive(testutils.NewPacket("val3", nil))
p.Receive(testutils.NewPacket("val2", nil))
p.Receive(testutils.NewPacketOld("test", nil))
p.Receive(testutils.NewPacketOld("fqsdf", nil))
p.Receive(testutils.NewPacketOld("valo", nil))
p.Receive(testutils.NewPacketOld("al1", nil))
p.Receive(testutils.NewPacketOld("val1", nil))
p.Receive(testutils.NewPacketOld("val3", nil))
p.Receive(testutils.NewPacketOld("val2", nil))
if assert.Equal(t, 2, ctx.SentPacketsCount(0), "2 events should pass") {
assert.Equal(t, "val1", ctx.SentPackets(0)[0].Message())
assert.Equal(t, "val2", ctx.SentPackets(0)[1].Message())
Expand All @@ -75,13 +75,13 @@ func TestReceiveDuplicateTermsInConfig(t *testing.T) {
"terms": []string{"val1", "val1", "val1", "val1", "val1"},
},
)
p.Receive(testutils.NewPacket("test", nil))
p.Receive(testutils.NewPacket("fqsdf", nil))
p.Receive(testutils.NewPacket("valo", nil))
p.Receive(testutils.NewPacket("al1", nil))
p.Receive(testutils.NewPacket("val1", nil))
p.Receive(testutils.NewPacket("val3", nil))
p.Receive(testutils.NewPacket("val2", nil))
p.Receive(testutils.NewPacketOld("test", nil))
p.Receive(testutils.NewPacketOld("fqsdf", nil))
p.Receive(testutils.NewPacketOld("valo", nil))
p.Receive(testutils.NewPacketOld("al1", nil))
p.Receive(testutils.NewPacketOld("val1", nil))
p.Receive(testutils.NewPacketOld("val3", nil))
p.Receive(testutils.NewPacketOld("val2", nil))
if assert.Equal(t, 1, ctx.SentPacketsCount(0), "2 events should pass") {
assert.Equal(t, "val1", ctx.SentPackets(0)[0].Message())
}
Expand All @@ -97,13 +97,13 @@ func TestReceiveAllMessagesMatch(t *testing.T) {
"terms": []string{"valo", "test", "val2", "al1", "val3", "val1", "fqsdf"},
},
)
p.Receive(testutils.NewPacket("test", nil))
p.Receive(testutils.NewPacket("fqsdf", nil))
p.Receive(testutils.NewPacket("valo", nil))
p.Receive(testutils.NewPacket("al1", nil))
p.Receive(testutils.NewPacket("val1", nil))
p.Receive(testutils.NewPacket("val3", nil))
p.Receive(testutils.NewPacket("val2", nil))
p.Receive(testutils.NewPacketOld("test", nil))
p.Receive(testutils.NewPacketOld("fqsdf", nil))
p.Receive(testutils.NewPacketOld("valo", nil))
p.Receive(testutils.NewPacketOld("al1", nil))
p.Receive(testutils.NewPacketOld("val1", nil))
p.Receive(testutils.NewPacketOld("val3", nil))
p.Receive(testutils.NewPacketOld("val2", nil))
if assert.Equal(t, 7, ctx.SentPacketsCount(0), "2 events should pass") {
assert.Equal(t, "test", ctx.SentPackets(0)[0].Message())
assert.Equal(t, "fqsdf", ctx.SentPackets(0)[1].Message())
Expand All @@ -125,7 +125,7 @@ func TestReceiveMessageIncludingTermsDoNotMatch(t *testing.T) {
"terms": []string{"test"},
},
)
p.Receive(testutils.NewPacket("testtest", nil))
p.Receive(testutils.NewPacketOld("testtest", nil))
assert.Equal(t, 0, ctx.SentPacketsCount(0), "0 event should pass")
}

Expand All @@ -140,7 +140,7 @@ func TestReceiveFieldIncludingTermsDoNotMatch(t *testing.T) {
},
)

p.Receive(testutils.NewPacket("hello", map[string]interface{}{"MyField": "testtest"}))
p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"MyField": "testtest"}))
assert.Equal(t, 0, ctx.SentPacketsCount(0), "0 event should pass")
}

Expand All @@ -155,9 +155,9 @@ func TestReceiveFieldNamesAreCaseSensitive(t *testing.T) {
},
)

p.Receive(testutils.NewPacket("hello", map[string]interface{}{"myfield": "test"}))
p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"myfield": "test"}))
assert.Equal(t, 0, ctx.SentPacketsCount(0), "myfield != MyField")
p.Receive(testutils.NewPacket("hello", map[string]interface{}{"MyField": "test"}))
p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"MyField": "test"}))
assert.Equal(t, 1, ctx.SentPacketsCount(0), "match !")
}

Expand All @@ -172,11 +172,11 @@ func TestReceiveFieldValuesAreCaseSensitive(t *testing.T) {
},
)

p.Receive(testutils.NewPacket("hello", map[string]interface{}{"MyField": "TEST"}))
p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"MyField": "TEST"}))
assert.Equal(t, 0, ctx.SentPacketsCount(0), "TEST != test")
p.Receive(testutils.NewPacket("hello", map[string]interface{}{"MyField": "Test"}))
p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"MyField": "Test"}))
assert.Equal(t, 0, ctx.SentPacketsCount(0), "TEST != test")
p.Receive(testutils.NewPacket("hello", map[string]interface{}{"MyField": "test"}))
p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"MyField": "test"}))
assert.Equal(t, 1, ctx.SentPacketsCount(0), "match !")
}
func TestReceiveLongValue(t *testing.T) {
Expand All @@ -190,10 +190,10 @@ func TestReceiveLongValue(t *testing.T) {
},
)

p.Receive(testutils.NewPacket("hello", map[string]interface{}{"MyField": "TEST"}))
p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"MyField": "TEST"}))
assert.Equal(t, 0, ctx.SentPacketsCount(0), "No match")
p.Receive(testutils.NewPacket("hello", map[string]interface{}{"MyField": "Test"}))
p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"MyField": "Test"}))
assert.Equal(t, 0, ctx.SentPacketsCount(0), "No match")
p.Receive(testutils.NewPacket("hello", map[string]interface{}{"MyField": "azertyuiopqsdfghjklmwxcvbnazertyuiopqsdfghjklmwxcvbnazertyuiopqsdfghjklmwxcvbnazertyuiopqsdfghjklmwxcvbn"}))
p.Receive(testutils.NewPacketOld("hello", map[string]interface{}{"MyField": "azertyuiopqsdfghjklmwxcvbnazertyuiopqsdfghjklmwxcvbnazertyuiopqsdfghjklmwxcvbnazertyuiopqsdfghjklmwxcvbn"}))
assert.Equal(t, 1, ctx.SentPacketsCount(0), "match !")
}
}
Loading

0 comments on commit a16b0fa

Please sign in to comment.