Skip to content

Commit

Permalink
Internally name all patterns for log parsing flexibility
Browse files Browse the repository at this point in the history
closes #1436

This also fixes the bad behavior of waiting until runtime to return log
parsing pattern compile errors when a pattern was simply unfound.

closes #1418

Also protect against user error when the telegraf user does not have
permission to open the provided file. We will now error and exit in this
case, rather than silently waiting to get permission to open it.
  • Loading branch information
sparrc committed Jul 18, 2016
1 parent 281a4d5 commit dabb6f5
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 26 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ should now look like:
- [#1460](https://github.com/influxdata/telegraf/issues/1460): Remove PID from procstat plugin to fix cardinality issues.
- [#1427](https://github.com/influxdata/telegraf/issues/1427): Cassandra input: version 2.x "column family" fix.
- [#1463](https://github.com/influxdata/telegraf/issues/1463): Shared WaitGroup in Exec plugin
- [#1436](https://github.com/influxdata/telegraf/issues/1436): logparser: honor modifiers in "pattern" config.
- [#1418](https://github.com/influxdata/telegraf/issues/1418): logparser: error and exit on file permissions/missing errors.

## v1.0 beta 2 [2016-06-21]

Expand Down
22 changes: 19 additions & 3 deletions plugins/inputs/logparser/grok/grok.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@ var (
)

type Parser struct {
Patterns []string
Patterns []string
// namedPatterns is a list of internally-assigned names to the patterns
// specified by the user in Patterns.
// They will look like:
// GROK_INTERNAL_PATTERN_0, GROK_INTERNAL_PATTERN_1, etc.
namedPatterns []string
CustomPatterns string
CustomPatternFiles []string
Measurement string
Expand Down Expand Up @@ -98,13 +103,24 @@ func (p *Parser) Compile() error {
return err
}

p.CustomPatterns = DEFAULT_PATTERNS + p.CustomPatterns
// Give Patterns fake names so that they can be treated as named
// "custom patterns"
p.namedPatterns = make([]string, len(p.Patterns))
for i, pattern := range p.Patterns {
name := fmt.Sprintf("GROK_INTERNAL_PATTERN_%d", i)
p.CustomPatterns += "\n" + name + " " + pattern + "\n"
p.namedPatterns[i] = "%{" + name + "}"
}

// Combine user-supplied CustomPatterns with DEFAULT_PATTERNS and parse
// them together as the same type of pattern.
p.CustomPatterns = DEFAULT_PATTERNS + p.CustomPatterns
if len(p.CustomPatterns) != 0 {
scanner := bufio.NewScanner(strings.NewReader(p.CustomPatterns))
p.addCustomPatterns(scanner)
}

// Parse any custom pattern files supplied.
for _, filename := range p.CustomPatternFiles {
file, err := os.Open(filename)
if err != nil {
Expand All @@ -127,7 +143,7 @@ func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
var values map[string]string
// the matching pattern string
var patternName string
for _, pattern := range p.Patterns {
for _, pattern := range p.namedPatterns {
if values, err = p.g.Parse(pattern, line); err != nil {
return nil, err
}
Expand Down
39 changes: 37 additions & 2 deletions plugins/inputs/logparser/grok/grok_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func TestBuiltinCombinedLogFormat(t *testing.T) {

func TestCompileStringAndParse(t *testing.T) {
p := &Parser{
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
Patterns: []string{"%{TEST_LOG_A}"},
CustomPatterns: `
DURATION %{NUMBER}[nuµm]?s
RESPONSE_CODE %{NUMBER:response_code:tag}
Expand All @@ -230,6 +230,41 @@ func TestCompileStringAndParse(t *testing.T) {
assert.Equal(t, map[string]string{"response_code": "200"}, metricA.Tags())
}

func TestCompileErrorsOnInvalidPattern(t *testing.T) {
p := &Parser{
Patterns: []string{"%{TEST_LOG_A}", "%{TEST_LOG_B}"},
CustomPatterns: `
DURATION %{NUMBER}[nuµm]?s
RESPONSE_CODE %{NUMBER:response_code:tag}
RESPONSE_TIME %{DURATION:response_time:duration}
TEST_LOG_A %{NUMBER:myfloat:float} %{RESPONSE_CODE} %{IPORHOST:clientip} %{RESPONSE_TIME}
`,
}
assert.Error(t, p.Compile())

metricA, _ := p.ParseLine(`1.25 200 192.168.1.1 5.432µs`)
require.Nil(t, metricA)
}

func TestParsePatternsWithoutCustom(t *testing.T) {
p := &Parser{
Patterns: []string{"%{POSINT:ts:ts-epochnano} response_time=%{POSINT:response_time:int} mymetric=%{NUMBER:metric:float}"},
}
assert.NoError(t, p.Compile())

metricA, err := p.ParseLine(`1466004605359052000 response_time=20821 mymetric=10890.645`)
require.NotNil(t, metricA)
assert.NoError(t, err)
assert.Equal(t,
map[string]interface{}{
"response_time": int64(20821),
"metric": float64(10890.645),
},
metricA.Fields())
assert.Equal(t, map[string]string{}, metricA.Tags())
assert.Equal(t, time.Unix(0, 1466004605359052000), metricA.Time())
}

func TestParseEpochNano(t *testing.T) {
p := &Parser{
Patterns: []string{"%{MYAPP}"},
Expand Down Expand Up @@ -413,7 +448,7 @@ func TestParseErrors(t *testing.T) {
TEST_LOG_A %{HTTPDATE:ts:ts-httpd} %{WORD:myword:int} %{}
`,
}
assert.NoError(t, p.Compile())
assert.Error(t, p.Compile())
_, err := p.ParseLine(`[04/Jun/2016:12:41:45 +0100] notnumber 200 192.168.1.1 5.432µs 101`)
assert.Error(t, err)

Expand Down
33 changes: 17 additions & 16 deletions plugins/inputs/logparser/logparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/hpcloud/tail"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/internal/globpath"
"github.com/influxdata/telegraf/plugins/inputs"

Expand Down Expand Up @@ -110,11 +111,15 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
}

// compile log parser patterns:
errChan := errchan.New(len(l.parsers))
for _, parser := range l.parsers {
if err := parser.Compile(); err != nil {
return err
errChan.C <- err
}
}
if err := errChan.Error(); err != nil {
return err
}

var seek tail.SeekInfo
if !l.FromBeginning {
Expand All @@ -125,35 +130,33 @@ func (l *LogParserPlugin) Start(acc telegraf.Accumulator) error {
l.wg.Add(1)
go l.parser()

var errS string
// Create a "tailer" for each file
for _, filepath := range l.Files {
g, err := globpath.Compile(filepath)
if err != nil {
log.Printf("ERROR Glob %s failed to compile, %s", filepath, err)
continue
}
for file, _ := range g.Match() {
files := g.Match()
errChan = errchan.New(len(files))
for file, _ := range files {
tailer, err := tail.TailFile(file,
tail.Config{
ReOpen: true,
Follow: true,
Location: &seek,
ReOpen: true,
Follow: true,
Location: &seek,
MustExist: true,
})
if err != nil {
errS += err.Error() + " "
continue
}
errChan.C <- err

// create a goroutine for each "tailer"
l.wg.Add(1)
go l.receiver(tailer)
l.tailers = append(l.tailers, tailer)
}
}

if errS != "" {
return fmt.Errorf(errS)
}
return nil
return errChan.Error()
}

// receiver is launched as a goroutine to continuously watch a tailed logfile
Expand Down Expand Up @@ -201,8 +204,6 @@ func (l *LogParserPlugin) parser() {
if m != nil {
l.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
}
} else {
log.Printf("Malformed log line in [%s], Error: %s\n", line, err)
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions plugins/inputs/logparser/logparser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestGrokParseLogFilesNonExistPattern(t *testing.T) {
}

acc := testutil.Accumulator{}
assert.NoError(t, logparser.Start(&acc))
assert.Error(t, logparser.Start(&acc))

time.Sleep(time.Millisecond * 500)
logparser.Stop()
Expand Down Expand Up @@ -80,6 +80,8 @@ func TestGrokParseLogFiles(t *testing.T) {
map[string]string{})
}

// Test that test_a.log line gets parsed even though we don't have the correct
// pattern available for test_b.log
func TestGrokParseLogFilesOneBad(t *testing.T) {
thisdir := getCurrentDir()
p := &grok.Parser{
Expand All @@ -90,11 +92,12 @@ func TestGrokParseLogFilesOneBad(t *testing.T) {

logparser := &LogParserPlugin{
FromBeginning: true,
Files: []string{thisdir + "grok/testdata/*.log"},
Files: []string{thisdir + "grok/testdata/test_a.log"},
GrokParser: p,
}

acc := testutil.Accumulator{}
acc.SetDebug(true)
assert.NoError(t, logparser.Start(&acc))

time.Sleep(time.Millisecond * 500)
Expand Down
7 changes: 4 additions & 3 deletions plugins/inputs/tail/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,10 @@ func (t *Tail) Start(acc telegraf.Accumulator) error {
for file, _ := range g.Match() {
tailer, err := tail.TailFile(file,
tail.Config{
ReOpen: true,
Follow: true,
Location: &seek,
ReOpen: true,
Follow: true,
Location: &seek,
MustExist: true,
})
if err != nil {
errS += err.Error() + " "
Expand Down

0 comments on commit dabb6f5

Please sign in to comment.