Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions samples/omniv2/customfileformats/jsonlog/.snapshots/TestSample
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[
{
"message": "something is bad",
"severity": "E",
"source": "api",
"timestamp": "2020-09-08T12:34:57.124Z"
},
{
"message": "something is really bad",
"severity": "F",
"source": "api",
"timestamp": "2020-09-08T12:34:57.125Z"
},
{
"message": "it is getting better",
"severity": "W",
"source": "api",
"timestamp": "2020-09-08T12:34:59.130Z"
}
]

Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package jsonlogformat

import (
"fmt"
"io"

"github.com/antchfx/xpath"

"github.com/jf-tech/omniparser/omniparser/errs"
omniv2fileformat "github.com/jf-tech/omniparser/omniparser/schemaplugin/omni/v2/fileformat"
"github.com/jf-tech/omniparser/omniparser/schemaplugin/omni/v2/transform"
"github.com/jf-tech/omniparser/strs"
)

const (
FileFormatJSONLog = "jsonlog"
)

type jsonLogFileFormat struct {
schemaName string
}

func NewJSONLogFileFormat(schemaName string) omniv2fileformat.FileFormat {
return &jsonLogFileFormat{schemaName: schemaName}
}

func (p *jsonLogFileFormat) ValidateSchema(
format string, _ []byte, finalOutputDecl *transform.Decl) (interface{}, error) {
if format != FileFormatJSONLog {
return nil, errs.ErrSchemaNotSupported
}
if finalOutputDecl == nil {
return nil, p.FmtErr("'FINAL_OUTPUT' decl is nil")
}
if !strs.IsStrPtrNonBlank(finalOutputDecl.XPath) {
return nil, p.FmtErr("'FINAL_OUTPUT' must have 'xpath' specified")
}
_, err := xpath.Compile(*finalOutputDecl.XPath)
if err != nil {
return nil, p.FmtErr("'xpath' on 'FINAL_OUTPUT' (value: '%s') is invalid, err: %s",
*finalOutputDecl.XPath, err.Error())
}
return *finalOutputDecl.XPath, nil
}

func (p *jsonLogFileFormat) CreateFormatReader(
name string, r io.Reader, runtime interface{}) (omniv2fileformat.FormatReader, error) {
return NewReader(name, r, runtime.(string))
}

func (p *jsonLogFileFormat) FmtErr(format string, args ...interface{}) error {
return fmt.Errorf("schema '%s': %s", p.schemaName, fmt.Sprintf(format, args...))
}
140 changes: 140 additions & 0 deletions samples/omniv2/customfileformats/jsonlog/jsonlogformat/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package jsonlogformat

import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io"
"sort"
"strconv"

node "github.com/antchfx/xmlquery"
"github.com/antchfx/xpath"
"github.com/jf-tech/iohelper"

"github.com/jf-tech/omniparser/omniparser/errs"
"github.com/jf-tech/omniparser/strs"
)

// ErrLogReadingFailed indicates the reader fails to read out a complete non-corrupted
// log line. This is a fatal, non-continuable error.
type ErrLogReadingFailed string

func (e ErrLogReadingFailed) Error() string { return string(e) }

// IsErrLogReadingFailed checks if an err is of ErrLogReadingFailed type.
func IsErrLogReadingFailed(err error) bool {
switch err.(type) {
case ErrLogReadingFailed:
return true
default:
return false
}
}

type reader struct {
inputName string
r *bufio.Reader
line int
filter *xpath.Expr
}

func (r *reader) Read() (*node.Node, error) {
for {
r.line++
l, err := iohelper.ReadLine(r.r)
if err == io.EOF {
return nil, errs.ErrEOF
}
if err != nil {
// If we fail to read a log line out (permission issue, disk issue, whatever)
// there is really no point to continue anymore, thus wrap the error in this
// non-continuable error ErrLogReadingFailed.
return nil, ErrLogReadingFailed(r.fmtErrStr(err.Error()))
}
if !strs.IsStrNonBlank(l) {
continue
}
n, err := parseJSON([]byte(l))
if err != nil {
// If we read out a log line fine, but unable to parse it, that shouldn't be
// a fatal error, thus not wrapping the error in non-continuable error
// ErrLogReadingFailed.
return nil, r.FmtErr(err.Error())
}
// Now we test this log-line-translated node (and its subtree) against the filter,
// if no match, then we'll move onto the next line.
if node.QuerySelector(n, r.filter) == nil {
continue
}
return n, nil
}
}

func parseJSONValue(x interface{}, parent *node.Node) {
switch v := x.(type) {
case []interface{}:
for _, vv := range v {
n := &node.Node{Type: node.ElementNode}
node.AddChild(parent, n)
parseJSONValue(vv, n)
}
case map[string]interface{}:
var keys []string
for k, _ := range v {
keys = append(keys, k)
}
sort.Strings(keys)
for _, key := range keys {
n := &node.Node{Data: key, Type: node.ElementNode}
node.AddChild(parent, n)
parseJSONValue(v[key], n)
}
case string:
node.AddChild(parent, &node.Node{Data: v, Type: node.TextNode})
case float64:
// The format fmt with 'f' means (-ddd.dddd, no exponent),
// The special precision -1 uses the smallest number of digits
s := strconv.FormatFloat(v, 'f', -1, 64)
node.AddChild(parent, &node.Node{Data: s, Type: node.TextNode})
case bool:
s := strconv.FormatBool(v)
node.AddChild(parent, &node.Node{Data: s, Type: node.TextNode})
}
}

func parseJSON(b []byte) (*node.Node, error) {
var v interface{}
if err := json.Unmarshal(b, &v); err != nil {
return nil, err
}
doc := &node.Node{Type: node.DocumentNode}
parseJSONValue(v, doc)
return doc, nil
}

func (r *reader) IsContinuableError(err error) bool {
return !IsErrLogReadingFailed(err) && err != errs.ErrEOF
}

func (r *reader) FmtErr(format string, args ...interface{}) error {
return errors.New(r.fmtErrStr(format, args...))
}

func (r *reader) fmtErrStr(format string, args ...interface{}) string {
return fmt.Sprintf("input '%s' line %d: %s", r.inputName, r.line, fmt.Sprintf(format, args...))
}

func NewReader(inputName string, src io.Reader, filterXPath string) (*reader, error) {
filter, err := xpath.Compile(filterXPath)
if err != nil {
return nil, err
}
return &reader{
inputName: inputName,
r: bufio.NewReader(src),
line: 0,
filter: filter,
}, nil
}
10 changes: 10 additions & 0 deletions samples/omniv2/customfileformats/jsonlog/sample.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{"timestamp":"2020-09-08T12:34:56.123Z", "severity":"INFO", "source":"balancer", "message":"balancer is happy"}

{"timestamp":"2020-09-08T12:34:57.123Z", "severity":"INFO", "source":"api", "message":"api is happy"}
{"timestamp":"2020-09-08T12:34:57.124Z", "severity":"ERROR", "source":"api", "message":"something is bad"}
{"timestamp":"2020-09-08T12:34:57.125Z", "severity":"CRITICAL", "source":"api", "message":"something is really bad"}
{"timestamp":"2020-09-08T12:34:58.321Z", "severity":"WARNING", "source":"balancer", "message":"balancer senses some trouble"}

{"timestamp":"2020-09-08T12:34:59.130Z", "severity":"WARNING", "source":"api", "message":"it is getting better"}
{"timestamp":"2020-09-08T12:34:59.323Z", "severity":"INFO", "source":"api", "message":"api is happy"}
{"timestamp":"2020-09-08T12:34:59.623Z", "severity":"INFO", "source":"balancer", "message":"balancer is happy"}
17 changes: 17 additions & 0 deletions samples/omniv2/customfileformats/jsonlog/sample_schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"parser_settings": {
"version": "omni.2.0",
"file_format_type": "jsonlog"
},
"transform_declarations": {
"FINAL_OUTPUT": { "xpath": ".[(severity='WARNING' or severity='ERROR' or severity='CRITICAL') and source='api']", "object": {
"timestamp": { "xpath": "timestamp" },
"source": { "const": "api" },
"severity": { "custom_func": {
"name": "normalize_severity",
"args": [{ "xpath": "severity" }]
}},
"message": { "xpath": "message" }
}}
}
}
80 changes: 80 additions & 0 deletions samples/omniv2/customfileformats/jsonlog/sample_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package jsonlog

import (
"os"
"path/filepath"
"strings"
"testing"

"github.com/bradleyjkemp/cupaloy"
"github.com/stretchr/testify/assert"

"github.com/jf-tech/omniparser/jsons"
"github.com/jf-tech/omniparser/omniparser"
"github.com/jf-tech/omniparser/omniparser/customfuncs"
omniv2 "github.com/jf-tech/omniparser/omniparser/schemaplugin/omni/v2"
"github.com/jf-tech/omniparser/omniparser/transformctx"
"github.com/jf-tech/omniparser/samples/omniv2/customfileformats/jsonlog/jsonlogformat"
)

func normalizeSeverity(_ *transformctx.Ctx, sev string) (string, error) {
switch strings.ToUpper(sev) {
case "DEBUG":
return "D", nil
case "INFO":
return "I", nil
case "WARNING":
return "W", nil
case "ERROR":
return "E", nil
case "CRITICAL":
return "F", nil
default:
return "?", nil
}
}

func TestSample(t *testing.T) {
schemaFile := "./sample_schema.json"
schemaFileBaseName := filepath.Base(schemaFile)
schemaFileReader, err := os.Open(schemaFile)
assert.NoError(t, err)
defer schemaFileReader.Close()

inputFile := "./sample.log"
inputFileBaseName := filepath.Base(inputFile)
inputFileReader, err := os.Open(inputFile)
assert.NoError(t, err)
defer inputFileReader.Close()

parser, err := omniparser.NewParser(
schemaFileBaseName,
schemaFileReader,
// Use this SchemaPluginConfig to effectively replace the
// builtin omniv2 schema plugin (and its builtin fileformats)
// with, well, omniv2 schema plugin :) with our own custom
// fileformat. Also let's demo how to add a new custom func.
omniparser.SchemaPluginConfig{
ParseSchema: omniv2.ParseSchema,
PluginParams: &omniv2.PluginParams{
CustomFileFormat: jsonlogformat.NewJSONLogFileFormat(schemaFileBaseName),
},
CustomFuncs: customfuncs.CustomFuncs{
"normalize_severity": normalizeSeverity,
},
})
assert.NoError(t, err)
op, err := parser.GetTransformOp(
inputFileBaseName,
inputFileReader,
&transformctx.Ctx{})
assert.NoError(t, err)

var records []string
for op.Next() {
recordBytes, err := op.Read()
assert.NoError(t, err)
records = append(records, string(recordBytes))
}
cupaloy.SnapshotT(t, jsons.BPJ("["+strings.Join(records, ",")+"]"))
}