diff --git a/samples/omniv2/customfileformats/jsonlog/.snapshots/TestSample b/samples/omniv2/customfileformats/jsonlog/.snapshots/TestSample
new file mode 100644
index 0000000..e5b0b98
--- /dev/null
+++ b/samples/omniv2/customfileformats/jsonlog/.snapshots/TestSample
@@ -0,0 +1,21 @@
+[
+	{
+		"message": "something is bad",
+		"severity": "E",
+		"source": "api",
+		"timestamp": "2020-09-08T12:34:57.124Z"
+	},
+	{
+		"message": "something is really bad",
+		"severity": "F",
+		"source": "api",
+		"timestamp": "2020-09-08T12:34:57.125Z"
+	},
+	{
+		"message": "it is getting better",
+		"severity": "W",
+		"source": "api",
+		"timestamp": "2020-09-08T12:34:59.130Z"
+	}
+]
+
diff --git a/samples/omniv2/customfileformats/jsonlog/jsonlogformat/jsonlogformat.go b/samples/omniv2/customfileformats/jsonlog/jsonlogformat/jsonlogformat.go
new file mode 100644
index 0000000..1e40370
--- /dev/null
+++ b/samples/omniv2/customfileformats/jsonlog/jsonlogformat/jsonlogformat.go
@@ -0,0 +1,74 @@
+package jsonlogformat
+
+import (
+	"fmt"
+	"io"
+
+	"github.com/antchfx/xpath"
+
+	"github.com/jf-tech/omniparser/omniparser/errs"
+	omniv2fileformat "github.com/jf-tech/omniparser/omniparser/schemaplugin/omni/v2/fileformat"
+	"github.com/jf-tech/omniparser/omniparser/schemaplugin/omni/v2/transform"
+	"github.com/jf-tech/omniparser/strs"
+)
+
+const (
+	// FileFormatJSONLog is the 'file_format_type' value handled by this file format.
+	FileFormatJSONLog = "jsonlog"
+)
+
+// jsonLogFileFormat is the omniv2fileformat.FileFormat implementation for 'jsonlog'.
+type jsonLogFileFormat struct {
+	schemaName string // used by FmtErr to prefix error messages
+}
+
+// NewJSONLogFileFormat creates the 'jsonlog' FileFormat. schemaName is used to
+// prefix the errors produced by FmtErr.
+func NewJSONLogFileFormat(schemaName string) omniv2fileformat.FileFormat {
+	return &jsonLogFileFormat{schemaName: schemaName}
+}
+
+// ValidateSchema accepts only the 'jsonlog' format and requires 'FINAL_OUTPUT' to
+// carry a non-blank 'xpath' that compiles. On success it returns that xpath string,
+// which CreateFormatReader expects as its runtime argument.
+func (p *jsonLogFileFormat) ValidateSchema(
+	format string, _ []byte, finalOutputDecl *transform.Decl) (interface{}, error) {
+	if format != FileFormatJSONLog {
+		return nil, errs.ErrSchemaNotSupported
+	}
+	if finalOutputDecl == nil {
+		return nil, p.FmtErr("'FINAL_OUTPUT' decl is nil")
+	}
+	if !strs.IsStrPtrNonBlank(finalOutputDecl.XPath) {
+		return nil, p.FmtErr("'FINAL_OUTPUT' must have 'xpath' specified")
+	}
+	filterXPath := *finalOutputDecl.XPath
+	// Compile only to validate here; NewReader compiles the expression it uses.
+	if _, err := xpath.Compile(filterXPath); err != nil {
+		return nil, p.FmtErr("'xpath' on 'FINAL_OUTPUT' (value: '%s') is invalid, err: %s",
+			filterXPath, err.Error())
+	}
+	return filterXPath, nil
+}
+
+// CreateFormatReader creates a reader over r, using runtime (the xpath string
+// returned by ValidateSchema) as the record filter. It returns an error, instead
+// of panicking, when runtime is not a string.
+func (p *jsonLogFileFormat) CreateFormatReader(
+	name string, r io.Reader, runtime interface{}) (omniv2fileformat.FormatReader, error) {
+	filterXPath, ok := runtime.(string)
+	if !ok {
+		return nil, p.FmtErr("runtime is of type %T, expected string", runtime)
+	}
+	rd, err := NewReader(name, r, filterXPath)
+	if err != nil {
+		// Return a literal nil so the interface value isn't a typed nil pointer.
+		return nil, err
+	}
+	return rd, nil
+}
+
+// FmtErr formats an error message and prefixes it with the schema name.
+func (p *jsonLogFileFormat) FmtErr(format string, args ...interface{}) error {
+	return fmt.Errorf("schema '%s': %s", p.schemaName, fmt.Sprintf(format, args...))
+}
diff --git a/samples/omniv2/customfileformats/jsonlog/jsonlogformat/reader.go b/samples/omniv2/customfileformats/jsonlog/jsonlogformat/reader.go
new file mode 100644
index 0000000..b73fbd0
--- /dev/null
+++ b/samples/omniv2/customfileformats/jsonlog/jsonlogformat/reader.go
@@ -0,0 +1,140 @@
+package jsonlogformat
+
+import (
+	"bufio"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"sort"
+	"strconv"
+
+	node "github.com/antchfx/xmlquery"
+	"github.com/antchfx/xpath"
+	"github.com/jf-tech/iohelper"
+
+	"github.com/jf-tech/omniparser/omniparser/errs"
+	"github.com/jf-tech/omniparser/strs"
+)
+
+// ErrLogReadingFailed indicates the reader fails to read out a complete non-corrupted
+// log line. This is a fatal, non-continuable error.
+type ErrLogReadingFailed string
+
+func (e ErrLogReadingFailed) Error() string { return string(e) }
+
+// IsErrLogReadingFailed checks if an err is of ErrLogReadingFailed type.
+func IsErrLogReadingFailed(err error) bool {
+	// errors.As also finds an ErrLogReadingFailed wrapped inside another error.
+	var target ErrLogReadingFailed
+	return errors.As(err, &target)
+}
+
+// reader returns, one at a time, the JSON log lines of an input that match filter.
+type reader struct {
+	inputName string
+	r         *bufio.Reader
+	line      int // line number of the most recent read attempt
+	filter    *xpath.Expr
+}
+
+// Read returns the node tree of the next non-blank line that matches the filter,
+// or errs.ErrEOF when the input is exhausted.
+func (r *reader) Read() (*node.Node, error) {
+	for {
+		r.line++
+		l, err := iohelper.ReadLine(r.r)
+		if errors.Is(err, io.EOF) {
+			return nil, errs.ErrEOF
+		}
+		if err != nil {
+			// If we fail to read a log line out (permission issue, disk issue, whatever)
+			// there is really no point to continue anymore, thus wrap the error in this
+			// non-continuable error ErrLogReadingFailed.
+			return nil, ErrLogReadingFailed(r.fmtErrStr("%s", err.Error()))
+		}
+		if !strs.IsStrNonBlank(l) {
+			continue
+		}
+		n, err := parseJSON([]byte(l))
+		if err != nil {
+			// A line that was read fine but isn't valid JSON is not fatal, thus no
+			// ErrLogReadingFailed. The message is passed as an argument, not as the
+			// format: JSON syntax errors quote input characters, '%' included.
+			return nil, r.FmtErr("%s", err.Error())
+		}
+		// Test the node (and its subtree) against the filter; no match, next line.
+		if node.QuerySelector(n, r.filter) == nil {
+			continue
+		}
+		return n, nil
+	}
+}
+
+// parseJSONValue adds the decoded JSON value x under parent; object keys are
+// added in sorted order and a JSON null adds nothing.
+func parseJSONValue(x interface{}, parent *node.Node) {
+	switch v := x.(type) {
+	case []interface{}:
+		for _, vv := range v {
+			n := &node.Node{Type: node.ElementNode}
+			node.AddChild(parent, n)
+			parseJSONValue(vv, n)
+		}
+	case map[string]interface{}:
+		keys := make([]string, 0, len(v))
+		for k := range v {
+			keys = append(keys, k)
+		}
+		sort.Strings(keys)
+		for _, key := range keys {
+			n := &node.Node{Data: key, Type: node.ElementNode}
+			node.AddChild(parent, n)
+			parseJSONValue(v[key], n)
+		}
+	case string:
+		node.AddChild(parent, &node.Node{Data: v, Type: node.TextNode})
+	case float64:
+		// 'f' means no exponent (-ddd.dddd); precision -1 uses the fewest digits.
+		s := strconv.FormatFloat(v, 'f', -1, 64)
+		node.AddChild(parent, &node.Node{Data: s, Type: node.TextNode})
+	case bool:
+		s := strconv.FormatBool(v)
+		node.AddChild(parent, &node.Node{Data: s, Type: node.TextNode})
+	}
+}
+
+// parseJSON decodes b as JSON and returns the root of the resulting node tree.
+func parseJSON(b []byte) (*node.Node, error) {
+	var v interface{}
+	if err := json.Unmarshal(b, &v); err != nil {
+		return nil, err
+	}
+	doc := &node.Node{Type: node.DocumentNode}
+	parseJSONValue(v, doc)
+	return doc, nil
+}
+
+// IsContinuableError reports whether reading may go on after err.
+func (r *reader) IsContinuableError(err error) bool {
+	return !IsErrLogReadingFailed(err) && !errors.Is(err, errs.ErrEOF)
+}
+
+// FmtErr formats an error prefixed with the input name and current line number.
+func (r *reader) FmtErr(format string, args ...interface{}) error {
+	return errors.New(r.fmtErrStr(format, args...))
+}
+
+func (r *reader) fmtErrStr(format string, args ...interface{}) string {
+	return fmt.Sprintf("input '%s' line %d: %s", r.inputName, r.line, fmt.Sprintf(format, args...))
+}
+
+// NewReader creates a reader on src that only returns the lines matching filterXPath.
+// It fails if filterXPath does not compile.
+func NewReader(inputName string, src io.Reader, filterXPath string) (*reader, error) {
+	filter, err := xpath.Compile(filterXPath)
+	if err != nil {
+		return nil, err
+	}
+	return &reader{inputName: inputName, r: bufio.NewReader(src), filter: filter}, nil
+}
diff --git 
a/samples/omniv2/customfileformats/jsonlog/sample.log b/samples/omniv2/customfileformats/jsonlog/sample.log new file mode 100644 index 0000000..9a6eef5 --- /dev/null +++ b/samples/omniv2/customfileformats/jsonlog/sample.log @@ -0,0 +1,10 @@ +{"timestamp":"2020-09-08T12:34:56.123Z", "severity":"INFO", "source":"balancer", "message":"balancer is happy"} + +{"timestamp":"2020-09-08T12:34:57.123Z", "severity":"INFO", "source":"api", "message":"api is happy"} +{"timestamp":"2020-09-08T12:34:57.124Z", "severity":"ERROR", "source":"api", "message":"something is bad"} +{"timestamp":"2020-09-08T12:34:57.125Z", "severity":"CRITICAL", "source":"api", "message":"something is really bad"} +{"timestamp":"2020-09-08T12:34:58.321Z", "severity":"WARNING", "source":"balancer", "message":"balancer senses some trouble"} + +{"timestamp":"2020-09-08T12:34:59.130Z", "severity":"WARNING", "source":"api", "message":"it is getting better"} +{"timestamp":"2020-09-08T12:34:59.323Z", "severity":"INFO", "source":"api", "message":"api is happy"} +{"timestamp":"2020-09-08T12:34:59.623Z", "severity":"INFO", "source":"balancer", "message":"balancer is happy"} diff --git a/samples/omniv2/customfileformats/jsonlog/sample_schema.json b/samples/omniv2/customfileformats/jsonlog/sample_schema.json new file mode 100644 index 0000000..3c37506 --- /dev/null +++ b/samples/omniv2/customfileformats/jsonlog/sample_schema.json @@ -0,0 +1,17 @@ +{ + "parser_settings": { + "version": "omni.2.0", + "file_format_type": "jsonlog" + }, + "transform_declarations": { + "FINAL_OUTPUT": { "xpath": ".[(severity='WARNING' or severity='ERROR' or severity='CRITICAL') and source='api']", "object": { + "timestamp": { "xpath": "timestamp" }, + "source": { "const": "api" }, + "severity": { "custom_func": { + "name": "normalize_severity", + "args": [{ "xpath": "severity" }] + }}, + "message": { "xpath": "message" } + }} + } +} diff --git a/samples/omniv2/customfileformats/jsonlog/sample_test.go 
b/samples/omniv2/customfileformats/jsonlog/sample_test.go
new file mode 100644
index 0000000..1d2757d
--- /dev/null
+++ b/samples/omniv2/customfileformats/jsonlog/sample_test.go
@@ -0,0 +1,93 @@
+package jsonlog
+
+import (
+	"os"
+	"path/filepath"
+	"strings"
+	"testing"
+
+	"github.com/bradleyjkemp/cupaloy"
+	"github.com/stretchr/testify/assert"
+
+	"github.com/jf-tech/omniparser/jsons"
+	"github.com/jf-tech/omniparser/omniparser"
+	"github.com/jf-tech/omniparser/omniparser/customfuncs"
+	omniv2 "github.com/jf-tech/omniparser/omniparser/schemaplugin/omni/v2"
+	"github.com/jf-tech/omniparser/omniparser/transformctx"
+	"github.com/jf-tech/omniparser/samples/omniv2/customfileformats/jsonlog/jsonlogformat"
+)
+
+// normalizeSeverity is the 'normalize_severity' custom func: it maps a severity
+// name (case-insensitively) to a one-letter code, or "?" for an unknown name.
+func normalizeSeverity(_ *transformctx.Ctx, sev string) (string, error) {
+	switch strings.ToUpper(sev) {
+	case "DEBUG":
+		return "D", nil
+	case "INFO":
+		return "I", nil
+	case "WARNING":
+		return "W", nil
+	case "ERROR":
+		return "E", nil
+	case "CRITICAL":
+		return "F", nil
+	default:
+		return "?", nil
+	}
+}
+
+// requireNoError stops the test right away when err is non-nil, so that later
+// steps never run against a nil file, parser or transform op.
+func requireNoError(t *testing.T, err error) {
+	t.Helper()
+	if !assert.NoError(t, err) {
+		t.FailNow()
+	}
+}
+
+// TestSample runs sample.log through sample_schema.json using the custom 'jsonlog'
+// file format and compares the transformed records against the stored snapshot.
+func TestSample(t *testing.T) {
+	schemaFile := "./sample_schema.json"
+	schemaFileBaseName := filepath.Base(schemaFile)
+	schemaFileReader, err := os.Open(schemaFile)
+	requireNoError(t, err)
+	defer schemaFileReader.Close()
+
+	inputFile := "./sample.log"
+	inputFileBaseName := filepath.Base(inputFile)
+	inputFileReader, err := os.Open(inputFile)
+	requireNoError(t, err)
+	defer inputFileReader.Close()
+
+	parser, err := omniparser.NewParser(
+		schemaFileBaseName,
+		schemaFileReader,
+		// Use this SchemaPluginConfig to effectively replace the
+		// builtin omniv2 schema plugin (and its builtin fileformats)
+		// with, well, omniv2 schema plugin :) with our own custom
+		// fileformat. Also let's demo how to add a new custom func.
+		omniparser.SchemaPluginConfig{
+			ParseSchema: omniv2.ParseSchema,
+			PluginParams: &omniv2.PluginParams{
+				CustomFileFormat: jsonlogformat.NewJSONLogFileFormat(schemaFileBaseName),
+			},
+			CustomFuncs: customfuncs.CustomFuncs{
+				"normalize_severity": normalizeSeverity,
+			},
+		})
+	requireNoError(t, err)
+	op, err := parser.GetTransformOp(
+		inputFileBaseName,
+		inputFileReader,
+		&transformctx.Ctx{})
+	requireNoError(t, err)
+
+	var records []string
+	for op.Next() {
+		recordBytes, err := op.Read()
+		requireNoError(t, err)
+		records = append(records, string(recordBytes))
+	}
+	cupaloy.SnapshotT(t, jsons.BPJ("["+strings.Join(records, ",")+"]"))
+}