diff --git a/Godeps b/Godeps index 2ac95a90455eb..f47a578062707 100644 --- a/Godeps +++ b/Godeps @@ -47,6 +47,7 @@ github.com/shirou/gopsutil 586bb697f3ec9f8ec08ffefe18f521a64534037c github.com/soniah/gosnmp b1b4f885b12c5dcbd021c5cee1c904110de6db7d github.com/streadway/amqp b4f3ceab0337f013208d31348b578d83c0064744 github.com/stretchr/testify 1f4a1643a57e798696635ea4c126e9127adb7d3c +github.com/vjeantet/grok 83bfdfdfd1a8146795b28e547a8e3c8b28a466c2 github.com/wvanbergen/kafka 46f9a1cf3f670edec492029fadded9c2d9e18866 github.com/wvanbergen/kazoo-go 0f768712ae6f76454f987c3356177e138df258f8 github.com/zensqlmonitor/go-mssqldb ffe5510c6fa5e15e6d983210ab501c815b56b363 diff --git a/filter/filter.go b/filter/filter.go new file mode 100644 index 0000000000000..85eed17ac0e7c --- /dev/null +++ b/filter/filter.go @@ -0,0 +1,79 @@ +package filter + +import ( + "strings" + + "github.com/gobwas/glob" +) + +type Filter interface { + Match(string) bool +} + +// CompileFilter takes a list of string filters and returns a Filter interface +// for matching a given string against the filter list. The filter list +// supports glob matching too, ie: +// +// f, _ := CompileFilter([]string{"cpu", "mem", "net*"}) +// f.Match("cpu") // true +// f.Match("network") // true +// f.Match("memory") // false +// +func CompileFilter(filters []string) (Filter, error) { + // return if there is nothing to compile + if len(filters) == 0 { + return nil, nil + } + + // check if we can compile a non-glob filter + noGlob := true + for _, filter := range filters { + if hasMeta(filter) { + noGlob = false + break + } + } + + switch { + case noGlob: + // return non-globbing filter if not needed. + return compileFilterNoGlob(filters), nil + case len(filters) == 1: + return glob.Compile(filters[0]) + default: + return glob.Compile("{" + strings.Join(filters, ",") + "}") + } +} + +// hasMeta reports whether path contains any magic glob characters. +func hasMeta(s string) bool { + return strings.IndexAny(s, "*?[") >= 0 +} + +type filter struct { + m map[string]struct{} +} + +func (f *filter) Match(s string) bool { + _, ok := f.m[s] + return ok +} + +type filtersingle struct { + s string +} + +func (f *filtersingle) Match(s string) bool { + return f.s == s +} + +func compileFilterNoGlob(filters []string) Filter { + if len(filters) == 1 { + return &filtersingle{s: filters[0]} + } + out := filter{m: make(map[string]struct{})} + for _, filter := range filters { + out.m[filter] = struct{}{} + } + return &out +} diff --git a/filter/filter_test.go b/filter/filter_test.go new file mode 100644 index 0000000000000..85072e2ac5354 --- /dev/null +++ b/filter/filter_test.go @@ -0,0 +1,96 @@ +package filter + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCompileFilter(t *testing.T) { + f, err := CompileFilter([]string{}) + assert.NoError(t, err) + assert.Nil(t, f) + + f, err = CompileFilter([]string{"cpu"}) + assert.NoError(t, err) + assert.True(t, f.Match("cpu")) + assert.False(t, f.Match("cpu0")) + assert.False(t, f.Match("mem")) + + f, err = CompileFilter([]string{"cpu*"}) + assert.NoError(t, err) + assert.True(t, f.Match("cpu")) + assert.True(t, f.Match("cpu0")) + assert.False(t, f.Match("mem")) + + f, err = CompileFilter([]string{"cpu", "mem"}) + assert.NoError(t, err) + assert.True(t, f.Match("cpu")) + assert.False(t, f.Match("cpu0")) + assert.True(t, f.Match("mem")) + + f, err = CompileFilter([]string{"cpu", "mem", "net*"}) + assert.NoError(t, err) + assert.True(t, f.Match("cpu")) + assert.False(t, f.Match("cpu0")) + assert.True(t, f.Match("mem")) + assert.True(t, f.Match("network")) +} + +var benchbool bool + +func BenchmarkFilterSingleNoGlobFalse(b *testing.B) { + f, _ := CompileFilter([]string{"cpu"}) + var tmp bool + for n := 0; n < b.N; n++ { + tmp = f.Match("network") + } + benchbool = tmp +} + +func BenchmarkFilterSingleNoGlobTrue(b *testing.B) { + f, _ := CompileFilter([]string{"cpu"}) + var tmp bool + for n := 0; n < b.N; n++ { + tmp = f.Match("cpu") + } + benchbool = tmp +} + +func BenchmarkFilter(b *testing.B) { + f, _ := CompileFilter([]string{"cpu", "mem", "net*"}) + var tmp bool + for n := 0; n < b.N; n++ { + tmp = f.Match("network") + } + benchbool = tmp +} + +func BenchmarkFilterNoGlob(b *testing.B) { + f, _ := CompileFilter([]string{"cpu", "mem", "net"}) + var tmp bool + for n := 0; n < b.N; n++ { + tmp = f.Match("net") + } + benchbool = tmp +} + +func BenchmarkFilter2(b *testing.B) { + f, _ := CompileFilter([]string{"aa", "bb", "c", "ad", "ar", "at", "aq", + "aw", "az", "axxx", "ab", "cpu", "mem", "net*"}) + var tmp bool + for n := 0; n < b.N; n++ { + tmp = f.Match("network") + } + benchbool = tmp +} + +func BenchmarkFilter2NoGlob(b *testing.B) { + f, _ := CompileFilter([]string{"aa", "bb", "c", "ad", "ar", "at", "aq", + "aw", "az", "axxx", "ab", "cpu", "mem", "net"}) + var tmp bool + for n := 0; n < b.N; n++ { + tmp = f.Match("net") + } + benchbool = tmp +} diff --git a/internal/internal.go b/internal/internal.go index 27a24f02162e6..4c90d11b9a62a 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -17,8 +17,6 @@ import ( "strings" "time" "unicode" - - "github.com/gobwas/glob" ) const alphanum string = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" @@ -209,27 +207,6 @@ func WaitTimeout(c *exec.Cmd, timeout time.Duration) error { } } -// CompileFilter takes a list of glob "filters", ie: -// ["MAIN.*", "CPU.*", "NET"] -// and compiles them into a glob object. This glob object can -// then be used to match keys to the filter. -func CompileFilter(filters []string) (glob.Glob, error) { - var out glob.Glob - - // return if there is nothing to compile - if len(filters) == 0 { - return out, nil - } - - var err error - if len(filters) == 1 { - out, err = glob.Compile(filters[0]) - } else { - out, err = glob.Compile("{" + strings.Join(filters, ",") + "}") - } - return out, err -} - // RandomSleep will sleep for a random amount of time up to max. // If the shutdown channel is closed, it will return before it has finished // sleeping. diff --git a/internal/internal_test.go b/internal/internal_test.go index 31bb5ec612bb4..213e94d3d05d3 100644 --- a/internal/internal_test.go +++ b/internal/internal_test.go @@ -107,37 +107,6 @@ func TestRunError(t *testing.T) { assert.Error(t, err) } -func TestCompileFilter(t *testing.T) { - f, err := CompileFilter([]string{}) - assert.NoError(t, err) - assert.Nil(t, f) - - f, err = CompileFilter([]string{"cpu"}) - assert.NoError(t, err) - assert.True(t, f.Match("cpu")) - assert.False(t, f.Match("cpu0")) - assert.False(t, f.Match("mem")) - - f, err = CompileFilter([]string{"cpu*"}) - assert.NoError(t, err) - assert.True(t, f.Match("cpu")) - assert.True(t, f.Match("cpu0")) - assert.False(t, f.Match("mem")) - - f, err = CompileFilter([]string{"cpu", "mem"}) - assert.NoError(t, err) - assert.True(t, f.Match("cpu")) - assert.False(t, f.Match("cpu0")) - assert.True(t, f.Match("mem")) - - f, err = CompileFilter([]string{"cpu", "mem", "net*"}) - assert.NoError(t, err) - assert.True(t, f.Match("cpu")) - assert.False(t, f.Match("cpu0")) - assert.True(t, f.Match("mem")) - assert.True(t, f.Match("network")) -} - func TestRandomSleep(t *testing.T) { // test that zero max returns immediately s := time.Now() diff --git a/internal/models/filter.go b/internal/models/filter.go index 71d71c23edbcc..ac24ec667c4b5 100644 --- a/internal/models/filter.go +++ b/internal/models/filter.go @@ -3,80 +3,78 @@ package internal_models import ( "fmt" - "github.com/gobwas/glob" - "github.com/influxdata/telegraf" - "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/filter" ) // TagFilter is the name of a tag, and the values on which to filter type TagFilter struct { Name string Filter []string - filter glob.Glob + filter filter.Filter } // Filter containing drop/pass and tagdrop/tagpass rules type Filter struct { NameDrop []string - nameDrop glob.Glob + nameDrop filter.Filter NamePass []string - namePass glob.Glob + namePass filter.Filter FieldDrop []string - fieldDrop glob.Glob + fieldDrop filter.Filter FieldPass []string - fieldPass glob.Glob + fieldPass filter.Filter TagDrop []TagFilter TagPass []TagFilter TagExclude []string - tagExclude glob.Glob + tagExclude filter.Filter TagInclude []string - tagInclude glob.Glob + tagInclude filter.Filter IsActive bool } -// Compile all Filter lists into glob.Glob objects. +// Compile all Filter lists into filter.Filter objects. func (f *Filter) CompileFilter() error { var err error - f.nameDrop, err = internal.CompileFilter(f.NameDrop) + f.nameDrop, err = filter.CompileFilter(f.NameDrop) if err != nil { return fmt.Errorf("Error compiling 'namedrop', %s", err) } - f.namePass, err = internal.CompileFilter(f.NamePass) + f.namePass, err = filter.CompileFilter(f.NamePass) if err != nil { return fmt.Errorf("Error compiling 'namepass', %s", err) } - f.fieldDrop, err = internal.CompileFilter(f.FieldDrop) + f.fieldDrop, err = filter.CompileFilter(f.FieldDrop) if err != nil { return fmt.Errorf("Error compiling 'fielddrop', %s", err) } - f.fieldPass, err = internal.CompileFilter(f.FieldPass) + f.fieldPass, err = filter.CompileFilter(f.FieldPass) if err != nil { return fmt.Errorf("Error compiling 'fieldpass', %s", err) } - f.tagExclude, err = internal.CompileFilter(f.TagExclude) + f.tagExclude, err = filter.CompileFilter(f.TagExclude) if err != nil { return fmt.Errorf("Error compiling 'tagexclude', %s", err) } - f.tagInclude, err = internal.CompileFilter(f.TagInclude) + f.tagInclude, err = filter.CompileFilter(f.TagInclude) if err != nil { return fmt.Errorf("Error compiling 'taginclude', %s", err) } for i, _ := range f.TagDrop { - f.TagDrop[i].filter, err = internal.CompileFilter(f.TagDrop[i].Filter) + f.TagDrop[i].filter, err = filter.CompileFilter(f.TagDrop[i].Filter) if err != nil { return fmt.Errorf("Error compiling 'tagdrop', %s", err) } } for i, _ := range f.TagPass { - f.TagPass[i].filter, err = internal.CompileFilter(f.TagPass[i].Filter) + f.TagPass[i].filter, err = filter.CompileFilter(f.TagPass[i].Filter) if err != nil { return fmt.Errorf("Error compiling 'tagpass', %s", err) } diff --git a/internal/models/filter_test.go b/internal/models/filter_test.go index a374160950a54..454f10c4596f7 100644 --- a/internal/models/filter_test.go +++ b/internal/models/filter_test.go @@ -253,51 +253,6 @@ func TestFilter_TagDrop(t *testing.T) { } } -func TestFilter_CompileFilterError(t *testing.T) { - f := Filter{ - NameDrop: []string{"", ""}, - } - assert.Error(t, f.CompileFilter()) - f = Filter{ - NamePass: []string{"", ""}, - } - assert.Error(t, f.CompileFilter()) - f = Filter{ - FieldDrop: []string{"", ""}, - } - assert.Error(t, f.CompileFilter()) - f = Filter{ - FieldPass: []string{"", ""}, - } - assert.Error(t, f.CompileFilter()) - f = Filter{ - TagExclude: []string{"", ""}, - } - assert.Error(t, f.CompileFilter()) - f = Filter{ - TagInclude: []string{"", ""}, - } - assert.Error(t, f.CompileFilter()) - filters := []TagFilter{ - TagFilter{ - Name: "cpu", - Filter: []string{"{foobar}"}, - }} - f = Filter{ - TagDrop: filters, - } - require.Error(t, f.CompileFilter()) - filters = []TagFilter{ - TagFilter{ - Name: "cpu", - Filter: []string{"{foobar}"}, - }} - f = Filter{ - TagPass: filters, - } - require.Error(t, f.CompileFilter()) -} - func TestFilter_ShouldMetricsPass(t *testing.T) { m := testutil.TestMetric(1, "testmetric") f := Filter{ diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 1a386d97c7a0f..1d84724699629 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -29,6 +29,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/jolokia" _ "github.com/influxdata/telegraf/plugins/inputs/kafka_consumer" _ "github.com/influxdata/telegraf/plugins/inputs/leofs" + _ "github.com/influxdata/telegraf/plugins/inputs/logparser" _ "github.com/influxdata/telegraf/plugins/inputs/lustre2" _ "github.com/influxdata/telegraf/plugins/inputs/mailchimp" _ "github.com/influxdata/telegraf/plugins/inputs/memcached" diff --git a/plugins/inputs/logparser/README.md b/plugins/inputs/logparser/README.md new file mode 100644 index 0000000000000..7dfa04cb6b3b4 --- /dev/null +++ b/plugins/inputs/logparser/README.md @@ -0,0 +1,8 @@ +# logparser Input Plugin + +### Configuration: + +```toml + +``` + diff --git a/plugins/inputs/logparser/grok/grok.go b/plugins/inputs/logparser/grok/grok.go new file mode 100644 index 0000000000000..79fc00a09391a --- /dev/null +++ b/plugins/inputs/logparser/grok/grok.go @@ -0,0 +1,291 @@ +package grok + +import ( + "bufio" + "fmt" + "log" + "os" + "regexp" + "strconv" + "strings" + "time" + + "github.com/vjeantet/grok" + + "github.com/influxdata/telegraf" +) + +var timeFormats = map[string]string{ + "ts-ansic": "Mon Jan _2 15:04:05 2006", + "ts-unix": "Mon Jan _2 15:04:05 MST 2006", + "ts-ruby": "Mon Jan 02 15:04:05 -0700 2006", + "ts-rfc822": "02 Jan 06 15:04 MST", + "ts-rfc822z": "02 Jan 06 15:04 -0700", // RFC822 with numeric zone + "ts-rfc850": "Monday, 02-Jan-06 15:04:05 MST", + "ts-rfc1123": "Mon, 02 Jan 2006 15:04:05 MST", + "ts-rfc1123z": "Mon, 02 Jan 2006 15:04:05 -0700", // RFC1123 with numeric zone + "ts-rfc3339": "2006-01-02T15:04:05Z07:00", + "ts-rfc3339nano": "2006-01-02T15:04:05.999999999Z07:00", + "ts-httpd": "02/Jan/2006:15:04:05 -0700", +} + +const ( + INT = "int" + TAG = "tag" + FLOAT = "float" + STRING = "string" + DURATION = "duration" + DROP = "drop" +) + +var ( + // matches named captures that contain a type. + // ie, + // %{NUMBER:bytes:int} + // %{IPORHOST:clientip:tag} + // %{HTTPDATE:ts1:ts-http} + // %{HTTPDATE:ts2:ts-"02 Jan 06 15:04"} + typedRe = regexp.MustCompile(`%{\w+:(\w+):(ts-".+"|t?s?-?\w+)}`) + // matches a plain pattern name. ie, %{NUMBER} + patternOnlyRe = regexp.MustCompile(`%{(\w+)}`) +) + +type Parser struct { + Patterns []string + CustomPatterns string + CustomPatternFiles []string + + // typeMap is a map of patterns -> capture name -> modifier, + // ie, { + // "%{TESTLOG}": + // { + // "bytes": "int", + // "clientip": "tag" + // } + // } + typeMap map[string]map[string]string + // tsMap is a map of patterns -> capture name -> timestamp layout. + // ie, { + // "%{TESTLOG}": + // { + // "httptime": "02/Jan/2006:15:04:05 -0700" + // } + // } + tsMap map[string]map[string]string + // patterns is a map of all of the parsed patterns from CustomPatterns + // and CustomPatternFiles. + // ie, + // { + // "DURATION": "%{NUMBER}[nuµm]?s" + // "RESPONSE_CODE": "%{NUMBER:rc:tag}" + // } + patterns map[string]string + + g *grok.Grok +} + +func (p *Parser) Compile() error { + p.typeMap = make(map[string]map[string]string) + p.tsMap = make(map[string]map[string]string) + p.patterns = make(map[string]string) + var err error + p.g, err = grok.NewWithConfig(&grok.Config{NamedCapturesOnly: true}) + if err != nil { + return err + } + + if len(p.CustomPatternFiles) != 0 { + for _, filename := range p.CustomPatternFiles { + file, err := os.Open(filename) + if err != nil { + return err + } + + scanner := bufio.NewScanner(bufio.NewReader(file)) + p.addCustomPatterns(scanner) + } + } + + if len(p.CustomPatterns) != 0 { + scanner := bufio.NewScanner(strings.NewReader(p.CustomPatterns)) + p.addCustomPatterns(scanner) + } + + return p.compileCustomPatterns() +} + +func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { + var err error + var values map[string]string + // the matching pattern string + var patternName string + for _, pattern := range p.Patterns { + if values, err = p.g.Parse(pattern, line); err != nil { + return nil, err + } + if len(values) != 0 { + patternName = pattern + break + } + } + + if len(values) == 0 { + return nil, nil + } + + fields := make(map[string]interface{}) + tags := make(map[string]string) + timestamp := time.Now() + for k, v := range values { + if k == "" || v == "" { + continue + } + + var t string + // if this key has a type, use it + // Otherwise leave the type blank, which will be treated as a string field. + if types, ok := p.typeMap[patternName]; ok { + t = types[k] + } + + switch t { + case INT: + iv, err := strconv.ParseInt(v, 10, 64) + if err != nil { + log.Printf("ERROR parsing %s to int: %s", v, err) + } else { + fields[k] = iv + } + case FLOAT: + fv, err := strconv.ParseFloat(v, 64) + if err != nil { + log.Printf("ERROR parsing %s to float: %s", v, err) + } else { + fields[k] = fv + } + case DURATION: + d, err := time.ParseDuration(v) + if err != nil { + log.Printf("ERROR parsing %s to duration: %s", v, err) + } else { + fields[k] = int64(d) + } + case TAG: + tags[k] = v + case STRING: + fields[k] = v + case DROP: + // goodbye! + default: + // check if this pattern has timestamp types + if ts, ok := p.tsMap[patternName]; ok { + // check if the type is a timestamp type + if layout, ok := ts[k]; ok { + ts, err := time.Parse(layout, v) + if err == nil { + timestamp = ts + } else { + log.Printf("ERROR parsing %s to time layout [%s]: %s", v, layout, err) + } + } else { + // no types found, so just assume it's a string field. + fields[k] = v + } + } else { + // no types found, so just assume it's a string field. + fields[k] = v + } + } + } + + return telegraf.NewMetric("grok", tags, fields, timestamp) +} + +func (p *Parser) addCustomPatterns(scanner *bufio.Scanner) { + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if len(line) > 0 && line[0] != '#' { + names := strings.SplitN(line, " ", 2) + p.patterns[names[0]] = names[1] + } + } +} + +func (p *Parser) compileCustomPatterns() error { + var err error + // check if the pattern contains a subpattern that is already defined + // replace it with the subpattern for modifier inheritance. + for name, pattern := range p.patterns { + subNames := patternOnlyRe.FindAllStringSubmatch(pattern, -1) + for _, subName := range subNames { + if subPattern, ok := p.patterns[subName[1]]; ok { + pattern = strings.Replace(pattern, subName[0], subPattern, 1) + } + } + p.patterns[name] = pattern + } + + // check if pattern contains modifiers. Parse them out if it does. + for name, pattern := range p.patterns { + if typedRe.MatchString(pattern) { + // this pattern has modifiers, so parse out the modifiers + pattern, err = p.parseTypedCaptures(name, pattern) + if err != nil { + return err + } + p.patterns[name] = pattern + } + } + + return p.g.AddPatternsFromMap(p.patterns) +} + +// parseTypedCaptures parses the capture types, and then deletes the type from +// the line so that it is a valid "grok" pattern again. +// ie, +// %{NUMBER:bytes:int} => %{NUMBER:bytes} (stores %{NUMBER}->bytes->int) +// %{IPORHOST:clientip:tag} => %{IPORHOST:clientip} (stores %{IPORHOST}->clientip->tag) +func (p *Parser) parseTypedCaptures(name, pattern string) (string, error) { + matches := typedRe.FindAllStringSubmatch(pattern, -1) + if len(matches) == 0 { + return pattern, nil + } + + // grab the name of the capture pattern + patternName := "%{" + name + "}" + // create type map for this pattern + p.typeMap[patternName] = make(map[string]string) + p.tsMap[patternName] = make(map[string]string) + + // boolean to verify that each pattern only has a single ts- data type. + hasTimestamp := false + for _, match := range matches { + if len(match) < 3 { + continue + } + + // regex capture 1 is the name of the capture + // regex capture 2 is the type of the capture + if strings.HasPrefix(match[2], "ts-") { + if hasTimestamp { + return pattern, fmt.Errorf("logparser pattern compile error: "+ + "Each pattern is allowed only one named "+ + "timestamp data type. pattern: %s", pattern) + } + if f, ok := timeFormats[match[2]]; ok { + p.tsMap[patternName][match[1]] = f + } else { + p.tsMap[patternName][match[1]] = strings.TrimSuffix(strings.TrimPrefix(match[2], `ts-"`), `"`) + } + hasTimestamp = true + } else { + p.typeMap[patternName][match[1]] = match[2] + } + + // the modifier is not a valid part of a "grok" pattern, so remove it + // from the pattern. + pattern = strings.Replace(pattern, ":"+match[2]+"}", "}", 1) + } + + return pattern, nil +} diff --git a/plugins/inputs/logparser/grok/grok_test.go b/plugins/inputs/logparser/grok/grok_test.go new file mode 100644 index 0000000000000..9513b5f24404a --- /dev/null +++ b/plugins/inputs/logparser/grok/grok_test.go @@ -0,0 +1 @@ +package grok diff --git a/plugins/inputs/logparser/grok/patterns/influx-patterns b/plugins/inputs/logparser/grok/patterns/influx-patterns new file mode 100644 index 0000000000000..dbef1d85b1b3f --- /dev/null +++ b/plugins/inputs/logparser/grok/patterns/influx-patterns @@ -0,0 +1,55 @@ +# Captures are a slightly modified version of logstash "grok" patterns, with +# the format %{[:][:]} +# By default all named captures are converted into string fields. +# Modifiers can be used to convert captures to other types or tags. +# Timestamp modifiers can be used to convert captures to the timestamp of the +# parsed metric. + +# View logstash grok pattern docs here: +# https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html +# All default logstash patterns are supported, these can be viewed here: +# https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns + +# Available modifiers: +# int +# string +# float +# duration (gets converted to int64 nanoseconds) +# tag (converts the field into a tag) +# drop (drops the field completely) +# Timestamp modifiers: +# ts-ansic ("Mon Jan _2 15:04:05 2006") +# ts-unix ("Mon Jan _2 15:04:05 MST 2006") +# ts-ruby ("Mon Jan 02 15:04:05 -0700 2006") +# ts-rfc822 ("02 Jan 06 15:04 MST") +# ts-rfc822z ("02 Jan 06 15:04 -0700") +# ts-rfc850 ("Monday, 02-Jan-06 15:04:05 MST") +# ts-rfc1123 ("Mon, 02 Jan 2006 15:04:05 MST") +# ts-rfc1123z ("Mon, 02 Jan 2006 15:04:05 -0700") +# ts-rfc3339 ("2006-01-02T15:04:05Z07:00") +# ts-rfc3339nano ("2006-01-02T15:04:05.999999999Z07:00") +# ts-httpd ("02/Jan/2006:15:04:05 -0700") +# ts-"CUSTOM" + +# Test log file pattern, test log looks like this: +# [04/Jun/2016:12:41:45 +0100] 1.25 200 192.168.1.1 5.432ms +# Breakdown of the DURATION pattern below: +# NUMBER is a builtin logstash grok pattern matching float & int numbers. +# [nuµm]? is a regex specifying 0 or 1 of the characters within brackets. +# s is also regex, this match must end in "s". +DURATION %{NUMBER}[nuµm]?s +RESPONSE_CODE %{NUMBER:rc:tag} +TESTLOG \[%{HTTPDATE:timestamp:ts-httpd}\] %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{DURATION:response_time:duration} + +# InfluxDB log file patterns +TIMESTAMP %{YEAR}/%{MONTHNUM}/%{MONTHDAY} %{TIME} +CLIENT (?:%{IPORHOST}|%{HOSTPORT}|::1) +INFLUXDB_HTTPD_LOG \[httpd\] %{TIMESTAMP} %{CLIENT:clientip} %{HTTPDUSER:ident} %{USER:auth} \[%{HTTPDATE:timestamp:ts-httpd}\] %{WORD:httpmethod:tag} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} %{NUMBER:response:tag} (?:%{NUMBER:bytes:int}|-) (?:%{URI:referrer}|-) %{WORD:agent} %{UUID:uuid:drop} %{DURATION:response_time:duration} + +# Log Patterns +COMMONAPACHELOG %{IPORHOST:clientip} %{HTTPDUSER:ident} %{USER:auth} \[%{HTTPDATE:timestamp:ts-httpd}\] "(?:%{WORD:verb:tag} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion:float})?|%{DATA})" %{NUMBER:response:int} (?:%{NUMBER:bytes:int}|-) +# Combined Log Format is a general log format common to apache and nginx +COMBINED_LOG_FORMAT %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent} +HTTPD20_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{LOGLEVEL:loglevel:tag}\] (?:\[client %{IPORHOST:clientip}\] ){0,1}%{GREEDYDATA:errormsg} +HTTPD24_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{WORD:module}:%{LOGLEVEL:loglevel:tag}\] \[pid %{POSINT:pid:int}:tid %{NUMBER:tid:int}\]( \(%{POSINT:proxy_errorcode:int}\)%{DATA:proxy_errormessage}:)?( \[client %{IPORHOST:client}:%{POSINT:clientport}\])? %{DATA:errorcode}: %{GREEDYDATA:message} +HTTPD_ERRORLOG %{HTTPD20_ERRORLOG}|%{HTTPD24_ERRORLOG} diff --git a/plugins/inputs/logparser/logparser.go b/plugins/inputs/logparser/logparser.go new file mode 100644 index 0000000000000..791a3afe04c71 --- /dev/null +++ b/plugins/inputs/logparser/logparser.go @@ -0,0 +1,193 @@ +package logparser + +import ( + "fmt" + "log" + "reflect" + "sync" + + "github.com/hpcloud/tail" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal/globpath" + "github.com/influxdata/telegraf/plugins/inputs" + + // Parsers + "github.com/influxdata/telegraf/plugins/inputs/logparser/grok" +) + +type LogParser interface { + ParseLine(line string) (telegraf.Metric, error) + Compile() error +} + +type LogParserPlugin struct { + Files []string + FromBeginning bool + + tailers []*tail.Tail + wg sync.WaitGroup + acc telegraf.Accumulator + + parser LogParser + + sync.Mutex + + GrokParser *grok.Parser `toml:"grok"` +} + +func NewLogParserPlugin() *LogParserPlugin { + return &LogParserPlugin{ + FromBeginning: false, + } +} + +const sampleConfig = ` + ## files to tail. + ## These accept standard unix glob matching rules, but with the addition of + ## ** as a "super asterisk". ie: + ## "/var/log/**.log" -> recursively find all .log files in /var/log + ## "/var/log/*/*.log" -> find all .log files with a parent dir in /var/log + ## "/var/log/apache.log" -> just tail the apache log file + ## + ## See https://github.com/gobwas/glob for more examples + ## + files = ["/var/log/influxdb/influxdb.log"] + ## Read file from beginning. + from_beginning = false + + ## For parsing logstash-style "grok" patterns: + [inputs.logparser.grok] + patterns = ["%{INFLUXDB_HTTPD_LOG}"] + custom_pattern_files = ["$HOME/influx-patterns"] + custom_patterns = ''' + ''' +` + +func (l *LogParserPlugin) SampleConfig() string { + return sampleConfig +} + +func (l *LogParserPlugin) Description() string { + return "Stream and parse log file(s)." +} + +func (l *LogParserPlugin) Gather(acc telegraf.Accumulator) error { + return nil +} + +func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error { + l.Lock() + defer l.Unlock() + + l.acc = acc + + // Looks for fields which implement LogParser interface + s := reflect.ValueOf(l).Elem() + for i := 0; i < s.NumField(); i++ { + f := s.Field(i) + + if !f.CanInterface() { + continue + } + + if lpPlugin, ok := f.Interface().(LogParser); ok { + if reflect.ValueOf(lpPlugin).IsNil() { + continue + } + l.parser = lpPlugin + } + } + + if l.parser == nil { + return fmt.Errorf("ERROR: logparser input plugin: no parser defined.") + } + + // compile log parser patterns: + if err := l.parser.Compile(); err != nil { + return err + } + + var seek tail.SeekInfo + if !l.FromBeginning { + seek.Whence = 2 + seek.Offset = 0 + } + + var errS string + // Create a "tailer" for each file + for _, filepath := range l.Files { + g, err := globpath.Compile(filepath) + if err != nil { + log.Printf("ERROR Glob %s failed to compile, %s", filepath, err) + } + for file, _ := range g.Match() { + tailer, err := tail.TailFile(file, + tail.Config{ + ReOpen: true, + Follow: true, + Location: &seek, + }) + if err != nil { + errS += err.Error() + " " + continue + } + // create a goroutine for each "tailer" + l.wg.Add(1) + go l.receiver(tailer) + l.tailers = append(l.tailers, tailer) + } + } + + if errS != "" { + return fmt.Errorf(errS) + } + return nil +} + +// this is launched as a goroutine to continuously watch a tailed logfile +// for changes, parse any incoming msgs, and add to the accumulator. +func (l *LogParserPlugin) receiver(tailer *tail.Tail) { + defer l.wg.Done() + + var m telegraf.Metric + var err error + var line *tail.Line + for line = range tailer.Lines { + if line.Err != nil { + log.Printf("ERROR tailing file %s, Error: %s\n", + tailer.Filename, err) + continue + } + + m, err = l.parser.ParseLine(line.Text) + if err == nil { + if m != nil { + l.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) + } + } else { + log.Printf("Malformed log line in %s: [%s], Error: %s\n", + tailer.Filename, line.Text, err) + } + } +} + +func (l *LogParserPlugin) Stop() { + l.Lock() + defer l.Unlock() + + for _, t := range l.tailers { + err := t.Stop() + if err != nil { + log.Printf("ERROR stopping tail on file %s\n", t.Filename) + } + t.Cleanup() + } + l.wg.Wait() +} + +func init() { + inputs.Add("logparser", func() telegraf.Input { + return NewLogParserPlugin() + }) +} diff --git a/plugins/inputs/logparser/logparser_test.go b/plugins/inputs/logparser/logparser_test.go new file mode 100644 index 0000000000000..cc77ae1acbd1c --- /dev/null +++ b/plugins/inputs/logparser/logparser_test.go @@ -0,0 +1 @@ +package logparser diff --git a/plugins/inputs/varnish/varnish.go b/plugins/inputs/varnish/varnish.go index 1a3e4c5580162..2b0e84514d1ff 100644 --- a/plugins/inputs/varnish/varnish.go +++ b/plugins/inputs/varnish/varnish.go @@ -12,9 +12,8 @@ import ( "strings" "time" - "github.com/gobwas/glob" - "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -26,7 +25,7 @@ type Varnish struct { Stats []string Binary string - filter glob.Glob + filter filter.Filter run runner } @@ -78,13 +77,13 @@ func (s *Varnish) Gather(acc telegraf.Accumulator) error { if s.filter == nil { var err error if len(s.Stats) == 0 { - s.filter, err = internal.CompileFilter(defaultStats) + s.filter, err = filter.CompileFilter(defaultStats) } else { // legacy support, change "all" -> "*": if s.Stats[0] == "all" { s.Stats[0] = "*" } - s.filter, err = internal.CompileFilter(s.Stats) + s.filter, err = filter.CompileFilter(s.Stats) } if err != nil { return err