Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
added tests around PayloadXmlDecoder for issue #310
Browse files Browse the repository at this point in the history
  • Loading branch information
crankycoder committed Sep 18, 2013
1 parent 2253546 commit 3fa8ea3
Show file tree
Hide file tree
Showing 2 changed files with 224 additions and 0 deletions.
79 changes: 79 additions & 0 deletions pipeline/decoders_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,85 @@ func DecodersSpec(c gospec.Context) {
c.Expect(name, gs.Equals, "some.counter")
})
})
// Spec for PayloadXmlDecoder: XPath expressions from the config are run
// against an XML payload and the captured values are interpolated into
// message fields via the message template.
c.Specify("A PayloadXmlDecoder", func() {
decoder := new(PayloadXmlDecoder)
conf := decoder.ConfigStruct().(*PayloadXmlDecoderConfig)
// A one-slot supply channel is enough to construct a single pack.
supply := make(chan *PipelinePack, 1)
pack := NewPipelinePack(supply)

c.Specify("decodes simple messages", func() {
// Sample document containing a comment, a processing instruction,
// attributes and repeated <character> elements.
xml_data := `<library>
<!-- Great book. -->
<book id="b0836217462" available="true">
<isbn>0836217462</isbn>
<title lang="en">Being a Dog Is a Full-Time Job</title>
<quote>I'd dog paddle the deepest ocean.</quote>
<author id="CMS">
<?echo "go rocks"?>
<name>Charles M Schulz</name>
<born>1922-11-26</born>
<dead>2000-02-12</dead>
</author>
<character id="PP">
<name>Peppermint Patty</name>
<born>1966-08-22</born>
<qualificati>bold, brash and tomboyish</qualificati>
</character>
<character id="Snoopy">
<name>Snoopy</name>
<born>1950-10-04</born>
<qualificati>extroverted beagle</qualificati>
</character>
</book>
</library>`

// Capture name -> XPath expression. The expressions cover a wildcard
// step, a child-text predicate, a descendant search with an attribute
// predicate, a nested attribute predicate and the preceding:: axis.
conf.XPathMapConfig = map[string]string{"Isbn": "library/*/isbn",
"Name": "/library/book/character[born='1950-10-04']/name",
"Patty": "/library/book//node()[@id='PP']/name",
"Title": "//book[author/@id='CMS']/title",
"Comment": "/library/book/preceding::comment()",
}

// Each message field is filled from the capture of the same name.
conf.MessageFields = MessageTemplate{
"Isbn": "%Isbn%",
"Name": "%Name%",
"Patty": "%Patty%",
"Title": "%Title%",
"Comment": "%Comment%",
}
err := decoder.Init(conf)
c.Assume(err, gs.IsNil)
// NOTE(review): ctrl is defined outside this block — presumably the
// gomock controller created earlier in DecodersSpec; confirm.
dRunner := NewMockDecoderRunner(ctrl)
decoder.SetDecoderRunner(dRunner)
pack.Message.SetPayload(xml_data)
err = decoder.Decode(pack)
c.Assume(err, gs.IsNil)

var isbn, name, patty, title, comment interface{}
var ok bool

// Every templated field must be present on the decoded message.
isbn, ok = pack.Message.GetFieldValue("Isbn")
c.Expect(ok, gs.Equals, true)

name, ok = pack.Message.GetFieldValue("Name")
c.Expect(ok, gs.Equals, true)

patty, ok = pack.Message.GetFieldValue("Patty")
c.Expect(ok, gs.Equals, true)

title, ok = pack.Message.GetFieldValue("Title")
c.Expect(ok, gs.Equals, true)

comment, ok = pack.Message.GetFieldValue("Comment")
c.Expect(ok, gs.Equals, true)

// Field values must equal the text selected by the matching XPath.
c.Expect(isbn, gs.Equals, "0836217462")
c.Expect(name, gs.Equals, "Snoopy")
c.Expect(patty, gs.Equals, "Peppermint Patty")
c.Expect(title, gs.Equals, "Being a Dog Is a Full-Time Job")
c.Expect(comment, gs.Equals, " Great book. ")
})
})

c.Specify("A PayloadRegexDecoder", func() {
decoder := new(PayloadRegexDecoder)
Expand Down
145 changes: 145 additions & 0 deletions pipeline/payloadxml_decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/***** BEGIN LICENSE BLOCK *****
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# The Initial Developer of the Original Code is the Mozilla Foundation.
# Portions created by the Initial Developer are Copyright (C) 2012
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
# Victor Ng (vng@mozilla.com)
#
# ***** END LICENSE BLOCK *****/

package pipeline

import (
"fmt"
"github.com/crankycoder/xmlpath"
"strings"
"time"
)

// PayloadXmlDecoderConfig holds the TOML-configurable settings of a
// PayloadXmlDecoder.
type PayloadXmlDecoderConfig struct {
// Maps a capture name to the XPath expression that is evaluated against
// the message payload to produce that capture's value.
XPathMapConfig map[string]string `toml:"xpath_map"`

// Maps severity strings to their int version
SeverityMap map[string]int32 `toml:"severity_map"`

// Keyed to the message field that should be filled in, the value will be
// interpolated so it can use the captures extracted from the payload.
MessageFields MessageTemplate `toml:"message_fields"`

// User specified timestamp layout string, used for parsing a timestamp
// string into an actual time object. If not specified or it fails to
// match, all the default time layouts will be tried.
TimestampLayout string `toml:"timestamp_layout"`

// Time zone in which the timestamps in the text are presumed to be in.
// Should be a location name corresponding to a file in the IANA Time Zone
// database (e.g. "America/Los_Angeles"), as parsed by Go's
// `time.LoadLocation()` function (see
// http://golang.org/pkg/time/#LoadLocation). Defaults to "UTC". Not
// required if valid time zone info is embedded in every parsed timestamp,
// since those can be parsed as specified in the `timestamp_layout`.
TimestampLocation string `toml:"timestamp_location"`
}

// PayloadXmlDecoder extracts values from an XML message payload using
// XPath expressions and uses them to populate the message.
type PayloadXmlDecoder struct {
// Compiled XPath expressions keyed by capture name; built in Init.
XPathMap map[string]*xmlpath.Path
// Copy of the configured severity string -> int mapping.
SeverityMap map[string]int32
// Copy of the configured message field templates.
MessageFields MessageTemplate
// Layout string handed to the timestamp decoding helper.
TimestampLayout string
// Location loaded from the configured timestamp_location in Init.
tzLocation *time.Location
// Runner supplied through SetDecoderRunner.
dRunner DecoderRunner
}

// ConfigStruct returns a *PayloadXmlDecoderConfig pre-populated with the
// decoder's default settings, ready to be overlaid with user configuration.
func (pxd *PayloadXmlDecoder) ConfigStruct() interface{} {
	return &PayloadXmlDecoderConfig{
		// Go time layouts must be spelled using the reference time
		// (Mon Jan 2 15:04:05 MST 2006). This layout matches ISO 8601
		// timestamps with millisecond precision such as
		// "2012-04-23T18:25:43.511Z"; "Z07:00" accepts either a literal
		// "Z" (UTC) or a numeric zone offset.
		TimestampLayout: "2006-01-02T15:04:05.000Z07:00",
	}
}

// Init configures the decoder from a *PayloadXmlDecoderConfig.
//
// It compiles every XPath expression in the config's xpath_map, copies the
// severity map and message field templates so later changes to the config
// do not affect the decoder, and loads the configured timestamp location.
//
// An error is returned if any XPath expression fails to compile or if the
// timestamp location cannot be loaded.
func (pxd *PayloadXmlDecoder) Init(config interface{}) (err error) {
	conf := config.(*PayloadXmlDecoderConfig)

	pxd.XPathMap = make(map[string]*xmlpath.Path, len(conf.XPathMapConfig))
	for captureName, xpathExpr := range conf.XPathMapConfig {
		// Compile rather than MustCompile: the expressions come from user
		// configuration, so a typo must surface as a config error, not a
		// panic.
		path, compileErr := xmlpath.Compile(xpathExpr)
		if compileErr != nil {
			return fmt.Errorf("PayloadXmlDecoder invalid xpath_map expression '%s' for '%s': %s",
				xpathExpr, captureName, compileErr)
		}
		pxd.XPathMap[captureName] = path
	}

	// Ranging over a nil map is a no-op, so no nil checks are needed.
	pxd.SeverityMap = make(map[string]int32, len(conf.SeverityMap))
	for codeString, codeInt := range conf.SeverityMap {
		pxd.SeverityMap[codeString] = codeInt
	}

	pxd.MessageFields = make(MessageTemplate, len(conf.MessageFields))
	for field, action := range conf.MessageFields {
		pxd.MessageFields[field] = action
	}

	pxd.TimestampLayout = conf.TimestampLayout
	if pxd.tzLocation, err = time.LoadLocation(conf.TimestampLocation); err != nil {
		return fmt.Errorf("PayloadXmlDecoder unknown timestamp_location '%s': %s",
			conf.TimestampLocation, err)
	}
	return nil
}

// SetDecoderRunner stores the given DecoderRunner on the decoder so that
// Decode can hand it to the payload decoding helper.
// Heka will call this to give us access to the runner.
func (pxd *PayloadXmlDecoder) SetDecoderRunner(dr DecoderRunner) {
pxd.dRunner = dr
}

// match parses s as an XML document and evaluates every compiled XPath
// expression in pxd.XPathMap against it. The returned map holds, for each
// expression that matched, the string value of the match keyed by its
// capture name. Expressions that do not match are simply absent.
//
// If s is not well-formed XML the returned map is empty.
func (pxd *PayloadXmlDecoder) match(s string) (captures map[string]string) {
	captures = make(map[string]string, len(pxd.XPathMap))
	if len(pxd.XPathMap) == 0 {
		// Nothing to extract; skip parsing the payload entirely.
		return captures
	}

	// Parse the payload once and share the node tree between all paths
	// instead of re-parsing the same document for every expression.
	root, err := xmlpath.Parse(strings.NewReader(s))
	if err != nil {
		return captures
	}

	for captureGroup, path := range pxd.XPathMap {
		if value, ok := path.String(root); ok {
			captures[captureGroup] = value
		}
	}
	return captures
}

// Decode evaluates the decoder's XPath expressions against the pack's
// message payload and uses the resulting captures to update the message.
// The captures are first passed to a PayloadDecoderHelper for timestamp
// and severity decoding, then interpolated into the decoder's message
// template. The error from populating the message fields is returned.
func (pxd *PayloadXmlDecoder) Decode(pack *PipelinePack) (err error) {
	payload := pack.Message.GetPayload()
	xpathCaptures := pxd.match(payload)

	// The helper carries everything the shared timestamp/severity decoding
	// routines need.
	helper := &PayloadDecoderHelper{
		Captures:        xpathCaptures,
		dRunner:         pxd.dRunner,
		TimestampLayout: pxd.TimestampLayout,
		TzLocation:      pxd.tzLocation,
		SeverityMap:     pxd.SeverityMap,
	}
	helper.DecodeTimestamp(pack)
	helper.DecodeSeverity(pack)

	// Fill in the templated message fields from the captured values.
	err = pxd.MessageFields.PopulateMessage(pack.Message, xpathCaptures)
	return err
}

0 comments on commit 3fa8ea3

Please sign in to comment.