Skip to content
This repository has been archived by the owner on Nov 19, 2020. It is now read-only.

Commit

Permalink
"change" processor : add test, fix code
Browse files Browse the repository at this point in the history
  • Loading branch information
vjeantet committed Feb 12, 2017
1 parent ff21104 commit 0d363e4
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 52 deletions.
85 changes: 47 additions & 38 deletions processors/filter-change/change.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type options struct {

// If true, events without a compare_key field will not count as changed.
// @Default true
IgnoreNull bool `mapstructure:"ignore_null"`
IgnoreMissing bool `mapstructure:"ignore_missing"`

// The maximum time in seconds between changes. After this time period, Bitfan will forget the old value of the compare_field field.
// @Default 0 (no timeframe)
Expand All @@ -70,58 +70,67 @@ type options struct {

func (p *processor) Configure(ctx processors.ProcessorContext, conf map[string]interface{}) (err error) {
defaults := options{
IgnoreNull: true,
Timeframe: 0,
IgnoreMissing: true,
Timeframe: 0,
}
p.opt = &defaults
p.first = true
return p.ConfigureAndValidate(ctx, conf, p.opt)
}

func (p *processor) Receive(e processors.IPacket) error {
if p.opt.IgnoreNull == true && e.Fields().Exists(p.opt.CompareField) == false {
p.Logger.Debugf("event does not have a field [%s]", p.opt.CompareField)
}

p.mu.Lock()
eValue, err := e.Fields().ValueForPathString(p.opt.CompareField)
if err != nil { // path not found
if p.opt.IgnoreMissing == true {
return nil
}
p.Logger.Debugf("missing field [%s]", p.opt.CompareField)
} else {
p.mu.Lock()

if p.lastValue == e.Fields().ValueOrEmptyForPathString(p.opt.CompareField) {
p.mu.Unlock()
return nil
}
if p.lastValue == eValue {
p.mu.Unlock()
return nil
}

p.Logger.Debugf("[%s] value change from '%s' to '%s'", p.opt.CompareField, p.lastValue, e.Fields().ValueOrEmptyForPathString(p.opt.CompareField))
p.lastValue = e.Fields().ValueOrEmptyForPathString(p.opt.CompareField)
p.Logger.Debugf("[%s] value change from '%s' to '%s'", p.opt.CompareField, p.lastValue, eValue)
p.lastValue = eValue

if p.first == true {
p.Logger.Debugf("ignore first change on field [%s]", p.opt.CompareField)
p.first = false
p.mu.Unlock()
return nil
}
if p.first == true {
p.Logger.Debugf("ignore first change on field [%s]", p.opt.CompareField)
p.first = false
p.mu.Unlock()
return nil
}

// Change occured !
if p.opt.Timeframe > 0 {
if p.hop == nil { // Initiate timer
p.Logger.Debugf("Timer inited")
p.hop = time.AfterFunc(time.Second*3, func() { // when timeframe expires, reset old value
p.Logger.Debugf("expired !")
p.lastValue = ""
p.hop = nil
})
} else { // Change occured before timeout -> reset timeframe
p.Logger.Debugf("change before timeout")
if !p.hop.Stop() {
<-p.hop.C
p.Logger.Debugf("expired ! B")
p.lastValue = ""
// Change occured !

if p.opt.Timeframe > 0 {
if p.hop == nil { // Initiate timer
p.Logger.Debugf("Timer inited")
p.hop = time.AfterFunc(time.Second*time.Duration(p.opt.Timeframe), func() { // when timeframe expires, reset old value
p.Logger.Debugf("expired !")
p.mu.Lock()
p.lastValue = ""
p.hop = nil
p.mu.Unlock()
})
} else { // Change occured before timeout -> reset timeframe
p.Logger.Debugf("change before timeout")
if !p.hop.Stop() {
<-p.hop.C
p.Logger.Debugf("expired ! B")
p.lastValue = ""
}
p.Logger.Debugf("reset")
p.hop.Reset(time.Second * time.Duration(p.opt.Timeframe))
}
p.Logger.Debugf("reset")
p.hop.Reset(time.Second * 3)
}

p.mu.Unlock()

}

p.mu.Unlock()
processors.ProcessCommonFields2(e.Fields(),
p.opt.AddField,
p.opt.AddTag,
Expand Down
148 changes: 147 additions & 1 deletion processors/filter-change/change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package change

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/vjeantet/bitfan/processors/doc"
Expand All @@ -16,8 +17,12 @@ func TestNew(t *testing.T) {
func TestDoc(t *testing.T) {
assert.IsType(t, &doc.Processor{}, New().(*processor).Doc())
}
func TestMaxConcurent(t *testing.T) {
max := New().(*processor).MaxConcurent()
assert.Equal(t, 1, max, "this processor does not support concurency")
}

func TestReceiveDropAll(t *testing.T) {
func TestReceiveMatch(t *testing.T) {
p := New().(*processor)
ctx := testutils.NewProcessorContext()
p.Configure(
Expand All @@ -42,5 +47,146 @@ func TestReceiveDropAll(t *testing.T) {
assert.Equal(t, 1, ctx.SentPacketsCount(0), "changed ! 3")
p.Receive(testutils.NewPacket("test", nil))
assert.Equal(t, 2, ctx.SentPacketsCount(0), "changed ! 4")
}

func TestReceiveIgnoreMissingTrue(t *testing.T) {
p := New().(*processor)
ctx := testutils.NewProcessorContext()
p.Configure(
ctx,
map[string]interface{}{
"Compare_Field": "toto",
"Ignore_missing": true,
},
)

p.Receive(testutils.NewPacket("test", map[string]interface{}{"toto": "A"}))
assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 1")
p.Receive(testutils.NewPacket("test", nil))
assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 2")
p.Receive(testutils.NewPacket("test", nil))
assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 3")
p.Receive(testutils.NewPacket("test", map[string]interface{}{"toto": "A"}))
assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 4")
p.Receive(testutils.NewPacket("test", map[string]interface{}{"toto": "A"}))
assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 5")
p.Receive(testutils.NewPacket("test", map[string]interface{}{"toto": "B"}))
assert.Equal(t, 1, ctx.SentPacketsCount(0), "changed ! 6")
p.Receive(testutils.NewPacket("test", nil))
assert.Equal(t, 1, ctx.SentPacketsCount(0), "changed ! 7")
}

func TestReceiveIgnoreMissingFalse(t *testing.T) {
p := New().(*processor)
ctx := testutils.NewProcessorContext()
p.Configure(
ctx,
map[string]interface{}{
"Compare_Field": "toto",
"Ignore_missing": false,
},
)

p.Receive(testutils.NewPacket("test", map[string]interface{}{"toto": "A"}))
assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 1")
p.Receive(testutils.NewPacket("test", nil))
assert.Equal(t, 1, ctx.SentPacketsCount(0), "changed ! 2")
p.Receive(testutils.NewPacket("test", nil))
assert.Equal(t, 2, ctx.SentPacketsCount(0), "changed ! 3")
p.Receive(testutils.NewPacket("test", map[string]interface{}{"toto": "A"}))
assert.Equal(t, 2, ctx.SentPacketsCount(0), "changed ! 4")
p.Receive(testutils.NewPacket("test", map[string]interface{}{"toto": "A"}))
assert.Equal(t, 2, ctx.SentPacketsCount(0), "changed ! 5")
p.Receive(testutils.NewPacket("test", map[string]interface{}{"toto": "B"}))
assert.Equal(t, 3, ctx.SentPacketsCount(0), "changed ! 6")
p.Receive(testutils.NewPacket("test", nil))
assert.Equal(t, 4, ctx.SentPacketsCount(0), "changed ! 7")
}

func TestStopNoTimeFrame(t *testing.T) {
p := New().(*processor)
ctx := testutils.NewProcessorContext()
p.Configure(
ctx,
map[string]interface{}{
"Compare_Field": "message",
"Timeframe": 0,
},
)

p.Receive(testutils.NewPacket("test", nil))
p.Receive(testutils.NewPacket("test2", nil))
assert.Equal(t, 1, ctx.SentPacketsCount(0), "changed ! 1")
assert.NoError(t, p.Stop(nil), "no error")
}

func TestStopWithTimeFrame(t *testing.T) {
p := New().(*processor)
ctx := testutils.NewProcessorContext()
p.Configure(
ctx,
map[string]interface{}{
"Compare_Field": "message",
"Timeframe": 3,
},
)

p.Receive(testutils.NewPacket("test", nil))
p.Receive(testutils.NewPacket("test2", nil))
assert.Equal(t, 1, ctx.SentPacketsCount(0), "changed ! 1")
time.Sleep(time.Second * 1)
assert.NoError(t, p.Stop(nil), "no error")
}

func TestReceiveMatchWithTimeframe(t *testing.T) {
p := New().(*processor)
ctx := testutils.NewProcessorContext()
p.Configure(
ctx,
map[string]interface{}{
"Compare_Field": "message",
"timeframe": 1,
},
)

p.Receive(testutils.NewPacket("test", nil))
assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 0")

p.Receive(testutils.NewPacket("test", nil))
assert.Equal(t, 0, ctx.SentPacketsCount(0), "changed ! 0")

p.Receive(testutils.NewPacket("test1", nil))
assert.Equal(t, 1, ctx.SentPacketsCount(0), "changed ! 0")

p.Receive(testutils.NewPacket("test1", nil))
assert.Equal(t, 1, ctx.SentPacketsCount(0), "changed ! 0")

p.Receive(testutils.NewPacket("test1", nil))
assert.Equal(t, 1, ctx.SentPacketsCount(0), "changed ! 0")

time.Sleep(time.Second * 2)
p.Receive(testutils.NewPacket("test1", nil))
assert.Equal(t, 2, ctx.SentPacketsCount(0), "changed ! 0")

time.Sleep(time.Second * 2)
p.Receive(testutils.NewPacket("test1", nil))
p.Receive(testutils.NewPacket("test1", nil))
assert.Equal(t, 3, ctx.SentPacketsCount(0), "changed ! 0")

p.Receive(testutils.NewPacket("A", nil))
assert.Equal(t, 4, ctx.SentPacketsCount(0), "changed ! 0")

p.Receive(testutils.NewPacket("B", nil))
assert.Equal(t, 5, ctx.SentPacketsCount(0), "changed ! 0")

time.Sleep(time.Second * 2)
p.Receive(testutils.NewPacket("B", nil))
assert.Equal(t, 6, ctx.SentPacketsCount(0), "changed ! 0")

p.Receive(testutils.NewPacket("B", nil))
p.Receive(testutils.NewPacket("test", nil))
time.Sleep(time.Second * 2)
p.Receive(testutils.NewPacket("test", nil))
assert.Equal(t, 8, ctx.SentPacketsCount(0), "changed ! 0")

}
4 changes: 2 additions & 2 deletions processors/filter-change/docdoc.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 11 additions & 11 deletions processors/filter-change/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ This rule will monitor a certain field and match if that field changes. The fiel
## Synopsys


| SETTING | TYPE | REQUIRED | DEFAULT VALUE |
|---------------|--------|----------|------------------|
| add_field | hash | false | {} |
| add_tag | array | false | [] |
| remove_field | array | false | [] |
| remove_tag | array | false | [] |
| compare_field | string | true | "" |
| ignore_null | bool | false | true |
| timeframe | int | false | 0 (no timeframe) |
| SETTING | TYPE | REQUIRED | DEFAULT VALUE |
|----------------|--------|----------|------------------|
| add_field | hash | false | {} |
| add_tag | array | false | [] |
| remove_field | array | false | [] |
| remove_tag | array | false | [] |
| compare_field | string | true | "" |
| ignore_missing | bool | false | true |
| timeframe | int | false | 0 (no timeframe) |


## Details
Expand Down Expand Up @@ -58,7 +58,7 @@ If the event has field "somefield" == "hello" this filter, on success, would rem
The name of the field to use to compare to the blacklist.
If the field is null, those events will be ignored.

### ignore_null
### ignore_missing
* Value type is bool
* Default value is `true`

Expand All @@ -81,7 +81,7 @@ change{
remove_field => []
remove_tag => []
compare_field => "message"
ignore_null => true
ignore_missing => true
timeframe => 10
}
```

0 comments on commit 0d363e4

Please sign in to comment.