Skip to content
This repository has been archived by the owner on Nov 19, 2020. It is now read-only.

Commit

Permalink
Merge branch 'master' of https://github.com/vjeantet/bitfan
Browse files Browse the repository at this point in the history
  • Loading branch information
vjeantet committed Jan 12, 2018
2 parents 2e8c2c6 + c50b878 commit df9ee13
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 27 deletions.
59 changes: 43 additions & 16 deletions processors/filter-geoip/geoip.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
//go:generate bitfanDoc
// The GeoIP filter adds information about the geographical location of IP addresses,
// based on data from the Maxmind GeoLite2 databases
//
// This processor use a GeoLite2 City database. From Maxmind’s description — "GeoLite2 databases are free IP geolocation databases comparable to, but less accurate than, MaxMind’s GeoIP2 databases". Please see GeoIP Lite2 license for more details.
// Databae is not bundled in the processor, you can download directly from Maxmind’s website and use the
// database option to specify their location. The GeoLite2 databases can be downloaded from https://dev.maxmind.com/geoip/geoip2/geolite2.
package geoip

import (
Expand Down Expand Up @@ -39,6 +45,7 @@ type options struct {
// Path or URL to the MaxMind GeoIP2 database.
// Default value is "http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.mmdb.gz"
// Note that URL can point to gzipped database (*.mmdb.gz) but local path must point to an unzipped file.
// @Default "http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.mmdb.gz"
Database string `mapstructure:"database"`

// Type of GeoIP database. Default value is "city"
Expand Down Expand Up @@ -76,6 +83,7 @@ func (p *processor) Configure(ctx processors.ProcessorContext, conf map[string]i
defaults := options{
Fields: []string{
"city_name",
"metro_code",
"country_code",
"country_name",
"continent_code",
Expand All @@ -95,6 +103,7 @@ func (p *processor) Configure(ctx processors.ProcessorContext, conf map[string]i
},
Language: "en",
LruCacheSize: 1000,
Source: "",
Target: "",
Database: "http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.mmdb.gz",
DatabaseType: "city",
Expand All @@ -105,21 +114,25 @@ func (p *processor) Configure(ctx processors.ProcessorContext, conf map[string]i
p.cache = lrucache.New(p.opt.LruCacheSize)

err = p.ConfigureAndValidate(ctx, conf, p.opt)
if err != nil {
return err
}

err = p.refresh()
ticker := time.NewTicker(p.opt.RefreshInterval * time.Minute)
go func() {
defer ticker.Stop()
for range ticker.C {
err := p.refresh()
if err != nil {
p.Logger.Error(err)
return err
}

func (p *processor) Start(e processors.IPacket) error {

err := p.refresh()
if p.opt.RefreshInterval > 0 {
ticker := time.NewTicker(p.opt.RefreshInterval * time.Minute)
go func() {
defer ticker.Stop()
for range ticker.C {
err := p.refresh()
if err != nil {
p.Logger.Error(err)
}
}
}
}()
}()
}

return err
}
Expand Down Expand Up @@ -198,6 +211,22 @@ func (p *processor) Receive(e processors.IPacket) error {
data["isp"] = isp.ISP
}
}
case *geoip2.Country:
country := cache.(*geoip2.Country)
for _, field := range p.opt.Fields {
switch field {
case "country_code":
data["country_code"] = country.Country.IsoCode
case "country_name":
data["country_name"] = country.Country.Names[p.opt.Language]
case "continent_code":
data["continent_code"] = country.Continent.Code
case "continent_name":
data["continent_name"] = country.Continent.Names[p.opt.Language]
}
}
default:
p.Logger.Warn("Unsupported database type `%v`", cache)
}

if p.opt.Target != "" {
Expand Down Expand Up @@ -290,6 +319,7 @@ func (p *processor) refresh() error {
}

p.database, err = geoip2.FromBytes(db)

p.Logger.Infof("Geoip filter: %s database successfully downloaded.\n", p.opt.DatabaseType)
return err
}
Expand All @@ -298,6 +328,3 @@ func (p *processor) refresh() error {
p.database, err = geoip2.Open(p.opt.Database)
return err
}

func (p *processor) Tick(e processors.IPacket) error { return nil }
func (p *processor) Stop(e processors.IPacket) error { return nil }
179 changes: 168 additions & 11 deletions processors/filter-geoip/geoip_test.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,178 @@
package geoip

import (
"fmt"
"io"
"os"
"path"
"runtime"
"testing"

"github.com/stretchr/testify/assert"
"github.com/vjeantet/bitfan/processors/doc"
. "github.com/smartystreets/goconvey/convey"
"github.com/vjeantet/bitfan/processors/testutils"
)

func TestNew(t *testing.T) {
p := New()
_, ok := p.(*processor)
assert.Equal(t, ok, true, "New() should return a processor")
func TestCommonProcessorDetails(t *testing.T) {
Convey("This processor... ", t, func() {
p, _ := testutils.NewProcessor(New)

Convey("is a valid bitfan processor", func() {
_, ok := p.Processor.(*processor)
So(ok, ShouldBeTrue)
})

Convey("does not have limit on concurent event processing", func() {
So(p.MaxConcurent(), ShouldEqual, 0)
})

Convey("is self documented", func() {
if p.Doc().Doc == "" {
Println("Missing documentation for this processor")
} else {
So(true, ShouldBeTrue)
}
})
})
}

func TestInvalidConfiguration(t *testing.T) {
conf := map[string]interface{}{}

Convey("When source is missing", t, func() {
conf["database"] = "foo/bar"
_, err := testutils.NewProcessor(New, conf)
Convey("Then an error occurs", func() {
So(err, ShouldBeError)
})
})
}

func TestNormalCases(t *testing.T) {

Convey("Given an existing event with a valid ip", t, func() {
event := testutils.NewPacket("", map[string]interface{}{})
conf := map[string]interface{}{
"database": setupTmpDatabase("GeoIP2-City-Test.mmdb"),
"source": "ip",
}
event.Fields().SetValueForPath(`89.160.20.113`, "ip")

Convey("When using a city database", func() {
Convey("Then 1 event is produced with city and country name : ", func() {
p, _ := testutils.StartNewProcessor(New, conf)
defer p.Stop(nil)
p.Receive(event)

So(p.SentPacketsCount(0), ShouldEqual, 1)
pe := p.SentPackets(0)[0]
So(pe.Fields().ValueOrEmptyForPathString("city_name"), ShouldEqual, "Linköping")
So(pe.Fields().ValueOrEmptyForPathString("country_name"), ShouldEqual, "Sweden")
})

Convey("When target field is set ", func() {
conf["target"] = "geo"

Convey("Then geo data are valued in this target field ", func() {
p, _ := testutils.StartNewProcessor(New, conf)
defer p.Stop(nil)
p.Receive(event)

So(p.SentPacketsCount(0), ShouldEqual, 1)
pe := p.SentPackets(0)[0]
So(pe.Fields().Exists("city_name"), ShouldBeFalse)
So(pe.Fields().ValueOrEmptyForPathString("geo.city_name"), ShouldEqual, "Linköping")
So(pe.Fields().ValueOrEmptyForPathString("geo.country_name"), ShouldEqual, "Sweden")
})
})

})

Convey("When using a ISP database", func() {
conf := map[string]interface{}{
"database": setupTmpDatabase("GeoIP2-ISP-Test.mmdb"),
"source": "ip",
"database_type": "isp",
}

event.Fields().SetValueForPath(`84.128.23.4`, "ip")

Convey("Then 1 event is produced with ISP name and organization : ", func() {
p, _ := testutils.StartNewProcessor(New, conf)
defer p.Stop(nil)
p.Receive(event)

So(p.SentPacketsCount(0), ShouldEqual, 1)
pe := p.SentPackets(0)[0]

So(pe.Fields().ValueOrEmptyForPathString("isp"), ShouldEqual, "Deutsche Telekom AG")
So(pe.Fields().ValueOrEmptyForPathString("organization"), ShouldEqual, "Deutsche Telekom AG")
})
})

Convey("When using a Country database", func() {
conf := map[string]interface{}{
"database": setupTmpDatabase("GeoIP2-Country-Test.mmdb"),
"source": "ip",
"database_type": "country",
}

event.Fields().SetValueForPath(`111.235.160.9`, "ip")

Convey("Then 1 event is produced with country name", func() {
p, _ := testutils.StartNewProcessor(New, conf)
defer p.Stop(nil)
p.Receive(event)

So(p.SentPacketsCount(0), ShouldEqual, 1)

pe := p.SentPackets(0)[0]
So(pe.Fields().ValueOrEmptyForPathString("country_code"), ShouldEqual, "CN")
So(pe.Fields().ValueOrEmptyForPathString("country_name"), ShouldEqual, "People's Republic of China")
So(pe.Fields().ValueOrEmptyForPathString("continent_code"), ShouldEqual, "AS")
So(pe.Fields().ValueOrEmptyForPathString("continent_name"), ShouldEqual, "Asia")
})
})

})
}
func TestDoc(t *testing.T) {
assert.IsType(t, &doc.Processor{}, New().(*processor).Doc())

func TestFallbackCases(t *testing.T) {
// When the ip field value is empty
// When the ip field does not exist
// when ip field is valid
// When no geoIP

}
func TestMaxConcurent(t *testing.T) {
max := New().(*processor).MaxConcurent()
assert.Equal(t, 0, max, "this processor does support concurency")

func setupTmpDatabase(geoFileName string) string {
_, filename, _, ok := runtime.Caller(0)
if !ok {
panic("No caller information")
}

srcFilePath := path.Dir(filename) + string(os.PathSeparator) + "testdata" + string(os.PathSeparator) + geoFileName
destFilePath := os.TempDir() + string(os.PathSeparator) + geoFileName

srcFile, err := os.Open(srcFilePath)
if err != nil {
fmt.Println(err)
}
defer srcFile.Close()

destFile, err := os.Create(destFilePath) // creates if file doesn't exist
if err != nil {
fmt.Println(err)
}
defer destFile.Close()

_, err = io.Copy(destFile, srcFile) // check first var for number of bytes copied
if err != nil {
fmt.Println(err)
}
err = destFile.Sync()
if err != nil {
fmt.Println(err)
}

return destFilePath
}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
13 changes: 13 additions & 0 deletions processors/testutils/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,19 @@ type Processor struct {
ctx *DummyProcessorContext
}

func StartNewProcessor(f func() processors.Processor, conf ...map[string]interface{}) (Processor, error) {
p, err := NewProcessor(f, conf...)
if err != nil {
return p, err
}
err = p.Start(nil)
if err != nil {
return p, err
}

return p, nil

}
func NewProcessor(f func() processors.Processor, conf ...map[string]interface{}) (Processor, error) {
var err error
p := newMockedProcessor(f)
Expand Down

0 comments on commit df9ee13

Please sign in to comment.