From 3ecc4e1cf677b4713529d4b8a73050ec47350684 Mon Sep 17 00:00:00 2001 From: jf-tech Date: Fri, 9 Oct 2020 18:10:39 +1300 Subject: [PATCH 1/4] Adding CSV reader. The key diff/improvement is now we add xpath to the csv reader so that we can perform both positive row selection as well as negative row skipping. Much more powerful than the old row skipping thingy. Next PR will bring the FileFormat implementation in, which will complete the csv fileformat. --- errs/errs.go | 4 - errs/errs_test.go | 3 +- handlers/omni/v2/fileformat/csv/decl.go | 25 +++ handlers/omni/v2/fileformat/csv/decl_test.go | 13 ++ handlers/omni/v2/fileformat/csv/reader.go | 156 +++++++++++++++ .../omni/v2/fileformat/csv/reader_test.go | 179 ++++++++++++++++++ handlers/omni/v2/fileformat/fileformat.go | 2 +- .../omni/v2/fileformat/json/format_test.go | 3 +- handlers/omni/v2/fileformat/json/reader.go | 7 +- .../omni/v2/fileformat/json/reader_test.go | 5 +- .../omni/v2/fileformat/xml/format_test.go | 3 +- handlers/omni/v2/fileformat/xml/reader.go | 7 +- .../omni/v2/fileformat/xml/reader_test.go | 5 +- handlers/omni/v2/ingester_test.go | 3 +- .../jsonlog/jsonlogformat/reader.go | 7 +- transform.go | 4 +- transform_test.go | 7 +- 17 files changed, 404 insertions(+), 29 deletions(-) create mode 100644 handlers/omni/v2/fileformat/csv/decl.go create mode 100644 handlers/omni/v2/fileformat/csv/decl_test.go create mode 100644 handlers/omni/v2/fileformat/csv/reader.go create mode 100644 handlers/omni/v2/fileformat/csv/reader_test.go diff --git a/errs/errs.go b/errs/errs.go index 6bdfb30..1533037 100644 --- a/errs/errs.go +++ b/errs/errs.go @@ -2,15 +2,11 @@ package errs import ( "errors" - "io" ) // ErrSchemaNotSupported indicates a schema is not supported by a handler. var ErrSchemaNotSupported = errors.New("schema not supported") -// ErrEOF indicates the end of input stream has reached. -var ErrEOF = io.EOF - // ErrTransformFailed indicates a particular record transform has failed. In general // this isn't fatal, and processing can continue. type ErrTransformFailed string diff --git a/errs/errs_test.go b/errs/errs_test.go index 8f916ed..b659507 100644 --- a/errs/errs_test.go +++ b/errs/errs_test.go @@ -1,6 +1,7 @@ package errs import ( + "io" "testing" "github.com/stretchr/testify/assert" @@ -9,5 +10,5 @@ import ( func TestIsErrTransformFailed(t *testing.T) { assert.True(t, IsErrTransformFailed(ErrTransformFailed("test"))) assert.Equal(t, "test", ErrTransformFailed("test").Error()) - assert.False(t, IsErrTransformFailed(ErrEOF)) + assert.False(t, IsErrTransformFailed(io.EOF)) } diff --git a/handlers/omni/v2/fileformat/csv/decl.go b/handlers/omni/v2/fileformat/csv/decl.go new file mode 100644 index 0000000..1622a5c --- /dev/null +++ b/handlers/omni/v2/fileformat/csv/decl.go @@ -0,0 +1,25 @@ +package omniv2csv + +import ( + "github.com/jf-tech/go-corelib/strs" +) + +type column struct { + Name string `json:"name"` + // If the CSV column 'name' contains characters (such as space, or special letters) that are + // not suitable for *idr.Node construction and xpath query, this gives schema writer an + // alternate way to name/label the column. Optional. + Alias *string `json:"alias"` +} + +func (c column) name() string { + return strs.StrPtrOrElse(c.Alias, c.Name) +} + +type fileDecl struct { + Delimiter string `json:"delimiter"` + ReplaceDoubleQuotes bool `json:"replace_double_quotes"` + HeaderRowIndex *int `json:"header_row_index"` + DataRowIndex int `json:"data_row_index"` + Columns []column `json:"columns"` +} diff --git a/handlers/omni/v2/fileformat/csv/decl_test.go b/handlers/omni/v2/fileformat/csv/decl_test.go new file mode 100644 index 0000000..b6ff812 --- /dev/null +++ b/handlers/omni/v2/fileformat/csv/decl_test.go @@ -0,0 +1,13 @@ +package omniv2csv + +import ( + "testing" + + "github.com/jf-tech/go-corelib/strs" + "github.com/stretchr/testify/assert" +) + +func TestColumnName(t *testing.T) { + assert.Equal(t, "name", column{Name: "name"}.name()) + assert.Equal(t, "alias", column{Name: "name", Alias: strs.StrPtr("alias")}.name()) +} diff --git a/handlers/omni/v2/fileformat/csv/reader.go b/handlers/omni/v2/fileformat/csv/reader.go new file mode 100644 index 0000000..fea3e7d --- /dev/null +++ b/handlers/omni/v2/fileformat/csv/reader.go @@ -0,0 +1,156 @@ +package omniv2csv + +import ( + "errors" + "fmt" + "io" + "strings" + + "github.com/antchfx/xpath" + "github.com/jf-tech/go-corelib/caches" + "github.com/jf-tech/go-corelib/ios" + "github.com/jf-tech/go-corelib/maths" + + "github.com/jf-tech/omniparser/idr" +) + +// ErrInvalidHeader indicates the header of the CSV input is corrupted, mismatched, or simply +// unreadable. This is a fatal, non-continuable error. +type ErrInvalidHeader string + +func (e ErrInvalidHeader) Error() string { return string(e) } + +// IsErrInvalidHeader checks if an err is of ErrInvalidHeader type. +func IsErrInvalidHeader(err error) bool { + switch err.(type) { + case ErrInvalidHeader: + return true + default: + return false + } +} + +type reader struct { + inputName string + decl *fileDecl + xpath *xpath.Expr + r *ios.LineNumReportingCsvReader + headerChecked bool +} + +func (r *reader) Read() (*idr.Node, error) { + if !r.headerChecked { + err := r.checkHeader() + r.headerChecked = true + if err != nil { + return nil, err + } + } +read: + record, err := r.r.Read() + if err == io.EOF { + return nil, io.EOF + } + if err != nil { + return nil, r.FmtErr("failed to fetch record: %s", err.Error()) + } + n := r.recordToNode(record) + if r.xpath != nil && !idr.MatchAny(n, r.xpath) { + goto read + } + return n, nil +} + +func (r *reader) checkHeader() error { + if r.decl.HeaderRowIndex == nil { + _ = r.jumpTo(r.decl.DataRowIndex - 1) + return nil + } + err := r.jumpTo(*r.decl.HeaderRowIndex - 1) + if err == io.EOF { + return ErrInvalidHeader(r.fmtErrStr("unable to read header: %s", err.Error())) + } + header, err := r.r.Read() + if err != nil { + return ErrInvalidHeader(r.fmtErrStr("unable to read header: %s", err.Error())) + } + if len(header) < len(r.decl.Columns) { + return ErrInvalidHeader(r.fmtErrStr( + "actual header column size (%d) is less than the size (%d) declared in file_declaration.columns in schema", + len(header), len(r.decl.Columns))) + } + for index, column := range r.decl.Columns { + if strings.TrimSpace(header[index]) != strings.TrimSpace(column.Name) { + return ErrInvalidHeader(r.fmtErrStr( + "header column[%d] '%s' does not match declared column name '%s' in schema", + index+1, strings.TrimSpace(header[index]), strings.TrimSpace(column.Name))) + } + } + _ = r.jumpTo(r.decl.DataRowIndex - 1) + return nil +} + +func (r *reader) jumpTo(rowIndex int) error { + for r.r.LineNum() < rowIndex { + _, err := r.r.Read() + if err == io.EOF { + return err + } + } + return nil +} + +func (r *reader) recordToNode(record []string) *idr.Node { + root := idr.CreateNode(idr.ElementNode, "") + // - If actual record has more columns than declared in schema, we'll only use up to + // what's declared in the schema; + // - conversely, if the actual record has less columns than declared in schema, we'll + // use all that are in the record. + for i := 0; i < maths.MinInt(len(record), len(r.decl.Columns)); i++ { + col := idr.CreateNode(idr.ElementNode, r.decl.Columns[i].name()) + idr.AddChild(root, col) + data := idr.CreateNode(idr.TextNode, record[i]) + idr.AddChild(col, data) + } + return root +} + +func (r *reader) IsContinuableError(err error) bool { + return !IsErrInvalidHeader(err) && err != io.EOF +} + +func (r *reader) FmtErr(format string, args ...interface{}) error { + return errors.New(r.fmtErrStr(format, args...)) +} + +func (r *reader) fmtErrStr(format string, args ...interface{}) string { + return fmt.Sprintf("input '%s' line %d: %s", r.inputName, r.r.LineNum(), fmt.Sprintf(format, args...)) +} + +// NewReader creates an FormatReader for CSV file format for omniv2 schema handler. +func NewReader(inputName string, r io.Reader, decl *fileDecl, xpathStr string) (*reader, error) { + var expr *xpath.Expr + var err error + xpathStr = strings.TrimSpace(xpathStr) + if xpathStr != "" && xpathStr != "." { + expr, err = caches.GetXPathExpr(xpathStr) + if err != nil { + return nil, fmt.Errorf("invalid xpath '%s', err: %s", xpathStr, err.Error()) + } + } + if decl.ReplaceDoubleQuotes { + r = ios.NewBytesReplacingReader(r, []byte(`"`), []byte(`'`)) + } + csv := ios.NewLineNumReportingCsvReader(r) + delim := []rune(decl.Delimiter) + csv.Comma = delim[0] + csv.FieldsPerRecord = -1 + csv.ReuseRecord = true + return &reader{ + inputName: inputName, + decl: decl, + r: csv, + headerChecked: false, + xpath: expr, + }, nil +} diff --git a/handlers/omni/v2/fileformat/csv/reader_test.go b/handlers/omni/v2/fileformat/csv/reader_test.go new file mode 100644 index 0000000..3a5c18e --- /dev/null +++ b/handlers/omni/v2/fileformat/csv/reader_test.go @@ -0,0 +1,179 @@ +package omniv2csv + +import ( + "errors" + "io" + "strings" + "testing" + + "github.com/jf-tech/go-corelib/jsons" + "github.com/jf-tech/go-corelib/strs" + "github.com/jf-tech/go-corelib/testlib" + "github.com/stretchr/testify/assert" + + "github.com/jf-tech/omniparser/idr" +) + +func TestIsErrInvalidHeader(t *testing.T) { + assert.True(t, IsErrInvalidHeader(ErrInvalidHeader("test"))) + assert.Equal(t, "test", ErrInvalidHeader("test").Error()) + assert.False(t, IsErrInvalidHeader(errors.New("test"))) +} + +func TestNewReader_InvalidXPath(t *testing.T) { + r, err := NewReader("test", nil, nil, "[invalid") + assert.Error(t, err) + assert.Equal(t, `invalid xpath '[invalid', err: expression must evaluate to a node-set`, err.Error()) + assert.Nil(t, r) +} + +func lf(s string) string { + return s + "\n" +} + +func TestReader(t *testing.T) { + for _, test := range []struct { + name string + decl *fileDecl + xpath string + input io.Reader + expected []interface{} + }{ + { + name: "header row; alias used; with xpath; variable data row column size; replace double quote", + decl: &fileDecl{ + Delimiter: "|", + ReplaceDoubleQuotes: true, + HeaderRowIndex: testlib.IntPtr(2), + DataRowIndex: 4, + Columns: []column{ + {Name: "a"}, + {Name: "b with space", Alias: strs.StrPtr("b_with_space")}, + {Name: "c"}, + }, + }, + xpath: ".[a != 'skip']", + input: strings.NewReader(lf("line1") + + lf("a|b with space|c") + + lf("line3") + + lf(`1|"2|3`) + // put an unescaped double quote here to see if our replacement works or not. + lf("") + // auto skipped by csv reader. + lf("skip") + // skipped by the xpath. + lf("one|two")), + expected: []interface{}{ + `{ "a": "1", "b_with_space": "'2", "c": "3" }`, + `{ "a": "one", "b_with_space": "two" }`, + }, + }, + { + name: "cannot jump to header row", + decl: &fileDecl{ + Delimiter: "|", + HeaderRowIndex: testlib.IntPtr(3), + }, + input: strings.NewReader(lf("line1")), + expected: []interface{}{ + ErrInvalidHeader("input 'test-input' line 2: unable to read header: EOF"), + }, + }, + { + name: "cannot read to header row", + decl: &fileDecl{ + Delimiter: "|", + HeaderRowIndex: testlib.IntPtr(2), + }, + input: strings.NewReader(lf("line1")), + expected: []interface{}{ + ErrInvalidHeader("input 'test-input' line 2: unable to read header: EOF"), + }, + }, + { + name: "header columns less than the declared", + decl: &fileDecl{ + Delimiter: "|", + HeaderRowIndex: testlib.IntPtr(2), + Columns: []column{ + {Name: "a"}, + {Name: "b"}, + }, + }, + input: strings.NewReader(lf("line1") + lf("a")), + expected: []interface{}{ + ErrInvalidHeader("input 'test-input' line 2: actual header column size (1) is less than the size (2) declared in file_declaration.columns in schema"), + }, + }, + { + name: "header column not matching the declared", + decl: &fileDecl{ + Delimiter: "|", + HeaderRowIndex: testlib.IntPtr(2), + Columns: []column{ + {Name: "a"}, + {Name: "b"}, + }, + }, + input: strings.NewReader(lf("line1") + lf("a|B")), + expected: []interface{}{ + ErrInvalidHeader("input 'test-input' line 2: header column[2] 'B' does not match declared column name 'b' in schema"), + }, + }, + { + name: "header row but no data row", + decl: &fileDecl{ + Delimiter: ",", + HeaderRowIndex: testlib.IntPtr(2), + DataRowIndex: 4, + Columns: []column{ + {Name: "a"}, + {Name: "b"}, + }, + }, + input: strings.NewReader(lf("line1") + lf("a,b") + lf("line3")), + expected: nil, + }, + { + name: "unable to read data row", + decl: &fileDecl{ + Delimiter: ",", + DataRowIndex: 1, + Columns: []column{{Name: "a"}}, + }, + input: testlib.NewMockReadCloser("read failure", nil), + expected: []interface{}{ + errors.New("input 'test-input' line 1: failed to fetch record: read failure"), + }, + }, + } { + t.Run(test.name, func(t *testing.T) { + r, err := NewReader("test-input", test.input, test.decl, test.xpath) + assert.NoError(t, err) + for { + n, err := r.Read() + if err == io.EOF { + assert.Equal(t, 0, len(test.expected)) + assert.Nil(t, n) + break + } + assert.True(t, len(test.expected) > 0) + if expectedErr, ok := test.expected[0].(error); ok { + assert.Error(t, err) + assert.Equal(t, expectedErr, err) + assert.Nil(t, n) + assert.Equal(t, 1, len(test.expected)) // if there is an error, it will be the last one. + break + } + expectedJSON, ok := test.expected[0].(string) + assert.True(t, ok) + assert.Equal(t, jsons.BPJ(expectedJSON), jsons.BPJ(idr.JSONify2(n))) + test.expected = test.expected[1:] + } + }) + } +} + +func TestIsContinuableError(t *testing.T) { + r := &reader{} + assert.True(t, r.IsContinuableError(errors.New("some error"))) + assert.False(t, r.IsContinuableError(ErrInvalidHeader("invalid header"))) + assert.False(t, r.IsContinuableError(io.EOF)) +} diff --git a/handlers/omni/v2/fileformat/fileformat.go b/handlers/omni/v2/fileformat/fileformat.go index 14f1abf..d63c88f 100644 --- a/handlers/omni/v2/fileformat/fileformat.go +++ b/handlers/omni/v2/fileformat/fileformat.go @@ -26,7 +26,7 @@ type FileFormat interface { // stream content before doing the xpath/node based parsing. type FormatReader interface { // Read returns a *Node and its subtree that will eventually be parsed and transformed into an - // output record. + // output record. If EOF has been reached, io.EOF must be returned. Read() (*idr.Node, error) // IsContinuableError determines whether an FormatReader returned error is continuable or not. // For certain errors (like EOF or corruption) there is no point to keep on trying; while others diff --git a/handlers/omni/v2/fileformat/json/format_test.go b/handlers/omni/v2/fileformat/json/format_test.go index 1fa1b21..1a7ec1d 100644 --- a/handlers/omni/v2/fileformat/json/format_test.go +++ b/handlers/omni/v2/fileformat/json/format_test.go @@ -1,6 +1,7 @@ package omniv2json import ( + "io" "strings" "testing" @@ -91,7 +92,7 @@ func TestCreateFormatReader(t *testing.T) { t.Run("EOF", func(t *testing.T) { n3, err := r.Read() assert.Error(t, err) - assert.Equal(t, errs.ErrEOF, err) + assert.Equal(t, io.EOF, err) assert.Nil(t, n3) }) diff --git a/handlers/omni/v2/fileformat/json/reader.go b/handlers/omni/v2/fileformat/json/reader.go index 9e53ab3..62d1119 100644 --- a/handlers/omni/v2/fileformat/json/reader.go +++ b/handlers/omni/v2/fileformat/json/reader.go @@ -5,7 +5,6 @@ import ( "fmt" "io" - "github.com/jf-tech/omniparser/errs" "github.com/jf-tech/omniparser/idr" ) @@ -33,7 +32,7 @@ type reader struct { func (r *reader) Read() (*idr.Node, error) { n, err := r.r.Read() if err == io.EOF { - return nil, errs.ErrEOF + return nil, io.EOF } if err != nil { return nil, ErrNodeReadingFailed(r.fmtErrStr(err.Error())) @@ -42,7 +41,7 @@ func (r *reader) Read() (*idr.Node, error) { } func (r *reader) IsContinuableError(err error) bool { - return !IsErrNodeReadingFailed(err) && err != errs.ErrEOF + return !IsErrNodeReadingFailed(err) && err != io.EOF } func (r *reader) FmtErr(format string, args ...interface{}) error { @@ -53,7 +52,7 @@ func (r *reader) fmtErrStr(format string, args ...interface{}) string { return fmt.Sprintf("input '%s' before/near line %d: %s", r.inputName, r.r.AtLine(), fmt.Sprintf(format, args...)) } -// NewReader creates an InputReader for JSON file format for omniv2 schema handler. +// NewReader creates an FormatReader for JSON file format for omniv2 schema handler. func NewReader(inputName string, src io.Reader, xpath string) (*reader, error) { sp, err := idr.NewJSONStreamReader(src, xpath) if err != nil { diff --git a/handlers/omni/v2/fileformat/json/reader_test.go b/handlers/omni/v2/fileformat/json/reader_test.go index 0606222..b8389ae 100644 --- a/handlers/omni/v2/fileformat/json/reader_test.go +++ b/handlers/omni/v2/fileformat/json/reader_test.go @@ -2,6 +2,7 @@ package omniv2json import ( "errors" + "io" "strings" "testing" @@ -55,7 +56,7 @@ func TestReader_Read_Success(t *testing.T) { n, err = r.Read() assert.Error(t, err) - assert.Equal(t, errs.ErrEOF, err) + assert.Equal(t, io.EOF, err) assert.Nil(t, n) } @@ -83,7 +84,7 @@ func TestReader_FmtErr(t *testing.T) { func TestReader_IsContinuableError(t *testing.T) { r, err := NewReader("test", strings.NewReader(""), "/A/B") assert.NoError(t, err) - assert.False(t, r.IsContinuableError(errs.ErrEOF)) + assert.False(t, r.IsContinuableError(io.EOF)) assert.False(t, r.IsContinuableError(ErrNodeReadingFailed("failure"))) assert.True(t, r.IsContinuableError(errs.ErrTransformFailed("failure"))) assert.True(t, r.IsContinuableError(errors.New("failure"))) diff --git a/handlers/omni/v2/fileformat/xml/format_test.go b/handlers/omni/v2/fileformat/xml/format_test.go index 4ce87d0..46cb49f 100644 --- a/handlers/omni/v2/fileformat/xml/format_test.go +++ b/handlers/omni/v2/fileformat/xml/format_test.go @@ -1,6 +1,7 @@ package omniv2xml import ( + "io" "strings" "testing" @@ -91,7 +92,7 @@ func TestCreateFormatReader(t *testing.T) { t.Run("EOF", func(t *testing.T) { n3, err := r.Read() assert.Error(t, err) - assert.Equal(t, errs.ErrEOF, err) + assert.Equal(t, io.EOF, err) assert.Nil(t, n3) }) diff --git a/handlers/omni/v2/fileformat/xml/reader.go b/handlers/omni/v2/fileformat/xml/reader.go index d1d8cbe..47722fd 100644 --- a/handlers/omni/v2/fileformat/xml/reader.go +++ b/handlers/omni/v2/fileformat/xml/reader.go @@ -5,7 +5,6 @@ import ( "fmt" "io" - "github.com/jf-tech/omniparser/errs" "github.com/jf-tech/omniparser/idr" ) @@ -33,7 +32,7 @@ type reader struct { func (r *reader) Read() (*idr.Node, error) { n, err := r.r.Read() if err == io.EOF { - return nil, errs.ErrEOF + return nil, io.EOF } if err != nil { return nil, ErrNodeReadingFailed(r.fmtErrStr(err.Error())) @@ -42,7 +41,7 @@ func (r *reader) Read() (*idr.Node, error) { } func (r *reader) IsContinuableError(err error) bool { - return !IsErrNodeReadingFailed(err) && err != errs.ErrEOF + return !IsErrNodeReadingFailed(err) && err != io.EOF } func (r *reader) FmtErr(format string, args ...interface{}) error { @@ -53,7 +52,7 @@ func (r *reader) fmtErrStr(format string, args ...interface{}) string { return fmt.Sprintf("input '%s' near line %d: %s", r.inputName, r.r.AtLine(), fmt.Sprintf(format, args...)) } -// NewReader creates an InputReader for XML file format for omniv2 schema handler. +// NewReader creates an FormatReader for XML file format for omniv2 schema handler. func NewReader(inputName string, src io.Reader, xpath string) (*reader, error) { sp, err := idr.NewXMLStreamReader(src, xpath) if err != nil { diff --git a/handlers/omni/v2/fileformat/xml/reader_test.go b/handlers/omni/v2/fileformat/xml/reader_test.go index f92e303..07a17b5 100644 --- a/handlers/omni/v2/fileformat/xml/reader_test.go +++ b/handlers/omni/v2/fileformat/xml/reader_test.go @@ -2,6 +2,7 @@ package omniv2xml import ( "errors" + "io" "strings" "testing" @@ -36,7 +37,7 @@ func TestReader_Read_Success(t *testing.T) { n, err = r.Read() assert.Error(t, err) - assert.Equal(t, errs.ErrEOF, err) + assert.Equal(t, io.EOF, err) assert.Nil(t, n) } @@ -72,7 +73,7 @@ func TestReader_FmtErr(t *testing.T) { func TestReader_IsContinuableError(t *testing.T) { r, err := NewReader("test", strings.NewReader(""), "Root/Node") assert.NoError(t, err) - assert.False(t, r.IsContinuableError(errs.ErrEOF)) + assert.False(t, r.IsContinuableError(io.EOF)) assert.False(t, r.IsContinuableError(ErrNodeReadingFailed("failure"))) assert.True(t, r.IsContinuableError(errs.ErrTransformFailed("failure"))) assert.True(t, r.IsContinuableError(errors.New("failure"))) diff --git a/handlers/omni/v2/ingester_test.go b/handlers/omni/v2/ingester_test.go index 055ded9..e8d8021 100644 --- a/handlers/omni/v2/ingester_test.go +++ b/handlers/omni/v2/ingester_test.go @@ -3,6 +3,7 @@ package omniv2 import ( "errors" "fmt" + "io" "testing" "github.com/stretchr/testify/assert" @@ -21,7 +22,7 @@ type testReader struct { func (r *testReader) Read() (*idr.Node, error) { if len(r.result) == 0 { - return nil, errs.ErrEOF + return nil, io.EOF } result := r.result[0] err := r.err[0] diff --git a/samples/omniv2/customfileformats/jsonlog/jsonlogformat/reader.go b/samples/omniv2/customfileformats/jsonlog/jsonlogformat/reader.go index 7d0edfa..01f530b 100644 --- a/samples/omniv2/customfileformats/jsonlog/jsonlogformat/reader.go +++ b/samples/omniv2/customfileformats/jsonlog/jsonlogformat/reader.go @@ -12,7 +12,6 @@ import ( "github.com/jf-tech/go-corelib/ios" "github.com/jf-tech/go-corelib/strs" - "github.com/jf-tech/omniparser/errs" "github.com/jf-tech/omniparser/idr" ) @@ -44,7 +43,7 @@ func (r *reader) Read() (*idr.Node, error) { r.line++ l, err := ios.ReadLine(r.r) if err == io.EOF { - return nil, errs.ErrEOF + return nil, io.EOF } if err != nil { // If we fail to read a log line out (permission issue, disk issue, whatever) @@ -80,7 +79,7 @@ func parseJSON(b []byte) (*idr.Node, error) { } func (r *reader) IsContinuableError(err error) bool { - return !IsErrLogReadingFailed(err) && err != errs.ErrEOF + return !IsErrLogReadingFailed(err) && err != io.EOF } func (r *reader) FmtErr(format string, args ...interface{}) error { @@ -91,7 +90,7 @@ func (r *reader) fmtErrStr(format string, args ...interface{}) string { return fmt.Sprintf("input '%s' line %d: %s", r.inputName, r.line, fmt.Sprintf(format, args...)) } -// NewReader creates an InputReader for this sample jsonlog file format. +// NewReader creates an FormatReader for this sample jsonlog file format. func NewReader(inputName string, src io.Reader, filterXPath string) (*reader, error) { filter, err := caches.GetXPathExpr(filterXPath) if err != nil { diff --git a/transform.go b/transform.go index 8de1b0b..28f73a8 100644 --- a/transform.go +++ b/transform.go @@ -1,6 +1,8 @@ package omniparser import ( + "io" + "github.com/jf-tech/omniparser/errs" "github.com/jf-tech/omniparser/handlers" ) @@ -39,7 +41,7 @@ func (o *transform) Next() bool { o.curRecord = record o.curErr = nil return true - case err == errs.ErrEOF: + case err == io.EOF: o.curErr = err o.curRecord = nil // No more processing needed. diff --git a/transform_test.go b/transform_test.go index a79d1e9..29e2345 100644 --- a/transform_test.go +++ b/transform_test.go @@ -3,6 +3,7 @@ package omniparser import ( "errors" "fmt" + "io" "testing" "github.com/stretchr/testify/assert" @@ -47,7 +48,7 @@ func TestTransform_EndWithEOF(t *testing.T) { {record: []byte("1st good read")}, {err: continuableErr1}, {record: []byte("2nd good read")}, - {err: errs.ErrEOF}, + {err: io.EOF}, }, continuableErrs: map[error]bool{continuableErr1: true}, }, @@ -72,14 +73,14 @@ func TestTransform_EndWithEOF(t *testing.T) { assert.False(t, tfm.Next()) record, err = tfm.Read() assert.Error(t, err) - assert.Equal(t, errs.ErrEOF, err) + assert.Equal(t, io.EOF, err) assert.Nil(t, record) // Verifying when EOF is reached, repeatedly calling Next will still get you EOF. assert.False(t, tfm.Next()) record, err = tfm.Read() assert.Error(t, err) - assert.Equal(t, errs.ErrEOF, err) + assert.Equal(t, io.EOF, err) assert.Nil(t, record) } From 103e1faf0ec06531c24d0d2ab9eb313c3e806e27 Mon Sep 17 00:00:00 2001 From: jf-tech Date: Sat, 10 Oct 2020 06:35:06 +1300 Subject: [PATCH 2/4] pr comments --- handlers/omni/v2/fileformat/csv/reader.go | 24 +++++++++++++------ .../omni/v2/fileformat/csv/reader_test.go | 2 +- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/handlers/omni/v2/fileformat/csv/reader.go b/handlers/omni/v2/fileformat/csv/reader.go index fea3e7d..dbb0a40 100644 --- a/handlers/omni/v2/fileformat/csv/reader.go +++ b/handlers/omni/v2/fileformat/csv/reader.go @@ -62,15 +62,16 @@ read: } func (r *reader) checkHeader() error { + var err error + var header []string if r.decl.HeaderRowIndex == nil { - _ = r.jumpTo(r.decl.DataRowIndex - 1) - return nil + goto skipToDataRow } - err := r.jumpTo(*r.decl.HeaderRowIndex - 1) - if err == io.EOF { + err = r.jumpTo(*r.decl.HeaderRowIndex - 1) + if err != nil { return ErrInvalidHeader(r.fmtErrStr("unable to read header: %s", err.Error())) } - header, err := r.r.Read() + header, err = r.r.Read() if err != nil { return ErrInvalidHeader(r.fmtErrStr("unable to read header: %s", err.Error())) } @@ -86,15 +87,24 @@ func (r *reader) checkHeader() error { index+1, strings.TrimSpace(header[index]), strings.TrimSpace(column.Name))) } } - _ = r.jumpTo(r.decl.DataRowIndex - 1) +skipToDataRow: + if err = r.jumpTo(r.decl.DataRowIndex - 1); err != nil { + return err + } return nil } +// the only possible error this jumpTo returns is io.EOF. if there is any reading error, we'll ignore +// because we really don't care about what's corrupted in a line. Now it's possible, but very very +// rarely, that the input reader's underlying media fails to read due memory/disk/IO issue. Since we +// can't reliably tease apart those failures from a simple line corruption failure, we'll choose to +// ignore them equally. And those underlying media failures most likely will repeat and cause subsequent +// read to fail, and then the reader will fail out entirely. func (r *reader) jumpTo(rowIndex int) error { for r.r.LineNum() < rowIndex { _, err := r.r.Read() if err == io.EOF { - return err + return io.EOF } } return nil diff --git a/handlers/omni/v2/fileformat/csv/reader_test.go b/handlers/omni/v2/fileformat/csv/reader_test.go index 3a5c18e..3171990 100644 --- a/handlers/omni/v2/fileformat/csv/reader_test.go +++ b/handlers/omni/v2/fileformat/csv/reader_test.go @@ -122,7 +122,7 @@ func TestReader(t *testing.T) { decl: &fileDecl{ Delimiter: ",", HeaderRowIndex: testlib.IntPtr(2), - DataRowIndex: 4, + DataRowIndex: 5, Columns: []column{ {Name: "a"}, {Name: "b"}, From 9cf9102b0fe85e7e05738062beed00c28eefd5e8 Mon Sep 17 00:00:00 2001 From: jf-tech Date: Sat, 10 Oct 2020 10:12:12 +1300 Subject: [PATCH 3/4] Introduce `idr.Node` caching and recycling As a result, adding `Release(*idr.Node)` to `FormatReader` so that `Ingester` will free up `*idr.Node` allocations for recycling Updated all benchmarks. No degradation so far. The reason the json sample bench mark didn't improve much is because there isn't much nodes allocated per read. However typically in the CSV parsing/transform scenarios, files are usually very long and node allocation caching saving would start to show significantly. --- customfuncs/javascript.go | 6 +- customfuncs/javascript_test.go | 12 +-- handlers/omni/v2/fileformat/csv/reader.go | 6 ++ .../omni/v2/fileformat/csv/reader_test.go | 1 + handlers/omni/v2/fileformat/fileformat.go | 3 + handlers/omni/v2/fileformat/json/reader.go | 6 ++ .../omni/v2/fileformat/json/reader_test.go | 3 + handlers/omni/v2/fileformat/xml/reader.go | 6 ++ .../omni/v2/fileformat/xml/reader_test.go | 9 ++ handlers/omni/v2/handler_test.go | 1 + handlers/omni/v2/ingester.go | 5 +- handlers/omni/v2/ingester_test.go | 15 +++- handlers/omni/v2/transform/parse.go | 10 +-- ..._its_parents_first_child_but_not_the_last} | 0 ..._its_parents_last_child_but_not_the_first} | 0 ...s_middle_child_not_the_first_not_the_last} | 0 ...move_a_node_who_is_its_parents_only_child} | 0 ...t_does_nothing_(when_node_caching_is_off)} | 0 idr/jsonreader.go | 17 +++- idr/navigator_test.go | 5 +- idr/node.go | 85 ++++++++++++++++--- idr/node_test.go | 83 +++++++++++++----- idr/xmlreader.go | 17 +++- .../jsonlog/jsonlogformat/reader.go | 14 +-- samples/omniv2/json/json_test.go | 2 +- 25 files changed, 232 insertions(+), 74 deletions(-) rename idr/.snapshots/{TestRemoveNodeAndSubTree-remove_a_node_who_is_its_parents_first_child_but_not_the_last => TestRemoveAndReleaseTree-remove_a_node_who_is_its_parents_first_child_but_not_the_last} (100%) rename idr/.snapshots/{TestRemoveNodeAndSubTree-remove_a_node_who_is_its_parents_last_child_but_not_the_first => TestRemoveAndReleaseTree-remove_a_node_who_is_its_parents_last_child_but_not_the_first} (100%) rename idr/.snapshots/{TestRemoveNodeAndSubTree-remove_a_node_who_is_its_parents_middle_child_not_the_first_not_the_last => TestRemoveAndReleaseTree-remove_a_node_who_is_its_parents_middle_child_not_the_first_not_the_last} (100%) rename idr/.snapshots/{TestRemoveNodeAndSubTree-remove_a_node_who_is_its_parents_only_child => TestRemoveAndReleaseTree-remove_a_node_who_is_its_parents_only_child} (100%) rename idr/.snapshots/{TestRemoveNodeAndSubTree-remove_a_root_does_nothing => TestRemoveAndReleaseTree-remove_a_root_does_nothing_(when_node_caching_is_off)} (100%) diff --git a/customfuncs/javascript.go b/customfuncs/javascript.go index ab815c8..e59969b 100644 --- a/customfuncs/javascript.go +++ b/customfuncs/javascript.go @@ -6,7 +6,6 @@ import ( "strconv" "strings" "sync" - "unsafe" "github.com/dop251/goja" "github.com/jf-tech/go-corelib/caches" @@ -113,10 +112,7 @@ func getNodeJSON(n *idr.Node) string { if disableCaching { return idr.JSONify2(n) } - // TODO if in the future we have *idr.Node allocation recycling, then this by-addr caching - // won't work. Ideally, we should have a node ID which refreshes upon recycling. - addr := strconv.FormatUint(uint64(uintptr(unsafe.Pointer(n))), 16) - j, _ := NodeToJSONCache.Get(addr, func(interface{}) (interface{}, error) { + j, _ := NodeToJSONCache.Get(n.ID, func(interface{}) (interface{}, error) { return idr.JSONify2(n), nil }) return j.(string) diff --git a/customfuncs/javascript_test.go b/customfuncs/javascript_test.go index a84b716..64353d1 100644 --- a/customfuncs/javascript_test.go +++ b/customfuncs/javascript_test.go @@ -223,12 +223,12 @@ func TestJavascriptClearVarsAfterRunProgram(t *testing.T) { } // go test -bench=. -benchmem -benchtime=30s -// BenchmarkIfElse-4 368017143 98.1 ns/op 8 B/op 1 allocs/op -// BenchmarkEval-4 26409430 1386 ns/op 418 B/op 8 allocs/op -// BenchmarkJavascriptWithNoCache-4 172803 210958 ns/op 136608 B/op 1698 allocs/op -// BenchmarkJavascriptWithCache-4 23059004 1572 ns/op 129 B/op 8 allocs/op -// BenchmarkConcurrentJavascriptWithNoCache-4 1140 32729941 ns/op 27328924 B/op 339654 allocs/op -// BenchmarkConcurrentJavascriptWithCache-4 70977 504870 ns/op 26568 B/op 1745 allocs/op +// BenchmarkIfElse-4 352522934 97.9 ns/op 8 B/op 1 allocs/op +// BenchmarkEval-4 25887961 1405 ns/op 418 B/op 8 allocs/op +// BenchmarkJavascriptWithNoCache-4 172342 210129 ns/op 136607 B/op 1698 allocs/op +// BenchmarkJavascriptWithCache-4 15996846 2062 ns/op 128 B/op 8 allocs/op +// BenchmarkConcurrentJavascriptWithNoCache-4 1143 33140598 ns/op 27328719 B/op 339653 allocs/op +// BenchmarkConcurrentJavascriptWithCache-4 65140 543091 ns/op 26608 B/op 1746 allocs/op var ( benchTitles = []string{"", "Dr", "Sir"} diff --git a/handlers/omni/v2/fileformat/csv/reader.go b/handlers/omni/v2/fileformat/csv/reader.go index dbb0a40..6c81181 100644 --- a/handlers/omni/v2/fileformat/csv/reader.go +++ b/handlers/omni/v2/fileformat/csv/reader.go @@ -125,6 +125,12 @@ func (r *reader) recordToNode(record []string) *idr.Node { return root } +func (r *reader) Release(n *idr.Node) { + if n != nil { + idr.RemoveAndReleaseTree(n) + } +} + func (r *reader) IsContinuableError(err error) bool { return !IsErrInvalidHeader(err) && err != io.EOF } diff --git a/handlers/omni/v2/fileformat/csv/reader_test.go b/handlers/omni/v2/fileformat/csv/reader_test.go index 3171990..09a41c0 100644 --- a/handlers/omni/v2/fileformat/csv/reader_test.go +++ b/handlers/omni/v2/fileformat/csv/reader_test.go @@ -165,6 +165,7 @@ func TestReader(t *testing.T) { expectedJSON, ok := test.expected[0].(string) assert.True(t, ok) assert.Equal(t, jsons.BPJ(expectedJSON), jsons.BPJ(idr.JSONify2(n))) + r.Release(n) test.expected = test.expected[1:] } }) diff --git a/handlers/omni/v2/fileformat/fileformat.go b/handlers/omni/v2/fileformat/fileformat.go index d63c88f..41aa673 100644 --- a/handlers/omni/v2/fileformat/fileformat.go +++ b/handlers/omni/v2/fileformat/fileformat.go @@ -28,6 +28,9 @@ type FormatReader interface { // Read returns a *Node and its subtree that will eventually be parsed and transformed into an // output record. If EOF has been reached, io.EOF must be returned. Read() (*idr.Node, error) + // Release gives the reader a chance to free resources of the *Node and its subtree that it returned + // to caller in a previous Read() call. + Release(*idr.Node) // IsContinuableError determines whether an FormatReader returned error is continuable or not. // For certain errors (like EOF or corruption) there is no point to keep on trying; while others // can be safely ignored. diff --git a/handlers/omni/v2/fileformat/json/reader.go b/handlers/omni/v2/fileformat/json/reader.go index 62d1119..c88b53f 100644 --- a/handlers/omni/v2/fileformat/json/reader.go +++ b/handlers/omni/v2/fileformat/json/reader.go @@ -40,6 +40,12 @@ func (r *reader) Read() (*idr.Node, error) { return n, nil } +func (r *reader) Release(n *idr.Node) { + if n != nil { + r.r.Release(n) + } +} + func (r *reader) IsContinuableError(err error) bool { return !IsErrNodeReadingFailed(err) && err != io.EOF } diff --git a/handlers/omni/v2/fileformat/json/reader_test.go b/handlers/omni/v2/fileformat/json/reader_test.go index b8389ae..1fe1c80 100644 --- a/handlers/omni/v2/fileformat/json/reader_test.go +++ b/handlers/omni/v2/fileformat/json/reader_test.go @@ -47,12 +47,15 @@ func TestReader_Read_Success(t *testing.T) { name, err := idr.MatchSingle(n, "name") assert.NoError(t, err) assert.Equal(t, "john", name.InnerText()) + // intentionally not calling r.Release(n) to verify that the + // stream node is freed up by a subsequent Read() call. n, err = r.Read() assert.NoError(t, err) name, err = idr.MatchSingle(n, "name") assert.NoError(t, err) assert.Equal(t, "jane", name.InnerText()) + r.Release(n) n, err = r.Read() assert.Error(t, err) diff --git a/handlers/omni/v2/fileformat/xml/reader.go b/handlers/omni/v2/fileformat/xml/reader.go index 47722fd..8991190 100644 --- a/handlers/omni/v2/fileformat/xml/reader.go +++ b/handlers/omni/v2/fileformat/xml/reader.go @@ -40,6 +40,12 @@ func (r *reader) Read() (*idr.Node, error) { return n, nil } +func (r *reader) Release(n *idr.Node) { + if n != nil { + r.r.Release(n) + } +} + func (r *reader) IsContinuableError(err error) bool { return !IsErrNodeReadingFailed(err) && err != io.EOF } diff --git a/handlers/omni/v2/fileformat/xml/reader_test.go b/handlers/omni/v2/fileformat/xml/reader_test.go index 07a17b5..0a4d887 100644 --- a/handlers/omni/v2/fileformat/xml/reader_test.go +++ b/handlers/omni/v2/fileformat/xml/reader_test.go @@ -24,6 +24,7 @@ func TestReader_Read_Success(t *testing.T) { 1 2 + 3 `), "Root/Node[. != '2']") assert.NoError(t, err) @@ -34,6 +35,14 @@ func TestReader_Read_Success(t *testing.T) { assert.Equal(t, "1", n.InnerText()) // xml.Decoder seems to keeps line at the end of whatever inside an element closing tag. assert.Equal(t, 3, r.r.AtLine()) + // intentionally not calling r.Release(n) to verify that the + // stream node is freed up by a subsequent Read() call. + + n, err = r.Read() + assert.NoError(t, err) + assert.Equal(t, "3", n.InnerText()) + assert.Equal(t, 5, r.r.AtLine()) + r.Release(n) n, err = r.Read() assert.Error(t, err) diff --git a/handlers/omni/v2/handler_test.go b/handlers/omni/v2/handler_test.go index a2143fb..dc7e6ec 100644 --- a/handlers/omni/v2/handler_test.go +++ b/handlers/omni/v2/handler_test.go @@ -52,6 +52,7 @@ type testFormatReader struct { } func (r testFormatReader) Read() (*idr.Node, error) { panic("implement me") } +func (r testFormatReader) Release(*idr.Node) { panic("implement me") } func (r testFormatReader) IsContinuableError(error) bool { panic("implement me") } func (r testFormatReader) FmtErr(string, ...interface{}) error { panic("implement me") } diff --git a/handlers/omni/v2/ingester.go b/handlers/omni/v2/ingester.go index 7a3ae7a..603310f 100644 --- a/handlers/omni/v2/ingester.go +++ b/handlers/omni/v2/ingester.go @@ -20,13 +20,14 @@ type ingester struct { } func (g *ingester) Read() ([]byte, error) { - node, err := g.reader.Read() + n, err := g.reader.Read() if err != nil { // Read() supposed to have already done CtxAwareErr error wrapping. So directly return. return nil, err } + defer g.reader.Release(n) result, err := transform.NewParseCtx( - g.ctx, g.customFuncs, g.customParseFuncs).ParseNode(node, g.finalOutputDecl) + g.ctx, g.customFuncs, g.customParseFuncs).ParseNode(n, g.finalOutputDecl) if err != nil { // ParseNode() error not CtxAwareErr wrapped, so wrap it. // Note errs.ErrorTransformFailed is a continuable error. diff --git a/handlers/omni/v2/ingester_test.go b/handlers/omni/v2/ingester_test.go index e8d8021..0740c61 100644 --- a/handlers/omni/v2/ingester_test.go +++ b/handlers/omni/v2/ingester_test.go @@ -14,10 +14,12 @@ import ( ) var errContinuableInTest = errors.New("continuable error") +var ingesterTestNode = idr.CreateNode(idr.ElementNode, "test") type testReader struct { - result []*idr.Node - err []error + result []*idr.Node + err []error + releaseCalled int } func (r *testReader) Read() (*idr.Node, error) { @@ -31,6 +33,8 @@ func (r *testReader) Read() (*idr.Node, error) { return result, err } +func (r *testReader) Release(n *idr.Node) { r.releaseCalled++ } + func (r *testReader) IsContinuableError(err error) bool { return err == errContinuableInTest } func (r *testReader) FmtErr(format string, args ...interface{}) error { @@ -45,6 +49,7 @@ func TestIngester_Read_ReadFailure(t *testing.T) { assert.Error(t, err) assert.Equal(t, "test failure", err.Error()) assert.Nil(t, b) + assert.Equal(t, 0, g.reader.(*testReader).releaseCalled) } func TestIngester_Read_ParseNodeFailure(t *testing.T) { @@ -57,7 +62,7 @@ func TestIngester_Read_ParseNodeFailure(t *testing.T) { assert.NoError(t, err) g := &ingester{ finalOutputDecl: finalOutputDecl, - reader: &testReader{result: []*idr.Node{nil}, err: []error{nil}}, + reader: &testReader{result: []*idr.Node{ingesterTestNode}, err: []error{nil}}, } b, err := g.Read() assert.Error(t, err) @@ -67,6 +72,7 @@ func TestIngester_Read_ParseNodeFailure(t *testing.T) { `ctx: fail to transform. err: fail to convert value 'abc' to type 'int' on 'FINAL_OUTPUT', err: strconv.ParseFloat: parsing "abc": invalid syntax`, err.Error()) assert.Nil(t, b) + assert.Equal(t, 1, g.reader.(*testReader).releaseCalled) } func TestIngester_Read_Success(t *testing.T) { @@ -79,11 +85,12 @@ func TestIngester_Read_Success(t *testing.T) { assert.NoError(t, err) g := &ingester{ finalOutputDecl: finalOutputDecl, - reader: &testReader{result: []*idr.Node{nil}, err: []error{nil}}, + reader: &testReader{result: []*idr.Node{ingesterTestNode}, err: []error{nil}}, } b, err := g.Read() assert.NoError(t, err) assert.Equal(t, "123", string(b)) + assert.Equal(t, 1, g.reader.(*testReader).releaseCalled) } func TestIsContinuableError(t *testing.T) { diff --git a/handlers/omni/v2/transform/parse.go b/handlers/omni/v2/transform/parse.go index 5283184..bdd29c2 100644 --- a/handlers/omni/v2/transform/parse.go +++ b/handlers/omni/v2/transform/parse.go @@ -6,7 +6,6 @@ import ( "reflect" "strconv" "strings" - "unsafe" "github.com/jf-tech/go-corelib/strs" @@ -37,11 +36,6 @@ func NewParseCtx( } } -func nodePtrAddrStr(n *idr.Node) string { - // `uintptr` is faster than `fmt.Sprintf("%p"...)` - return strconv.FormatUint(uint64(uintptr(unsafe.Pointer(n))), 16) -} - func resultTypeConversion(decl *Decl, value string) (interface{}, error) { if decl.resultType() == ResultTypeString { return value, nil @@ -131,9 +125,7 @@ func normalizeAndReturnValue(decl *Decl, value interface{}) (interface{}, error) func (p *parseCtx) ParseNode(n *idr.Node, decl *Decl) (interface{}, error) { var cacheKey string if !p.disableTransformCache { - // TODO if in the future we have *idr.Node allocation recycling, then this by-addr caching won't work. - // Ideally, we should have a node ID which refreshes upon recycling. - cacheKey = nodePtrAddrStr(n) + "/" + decl.hash + cacheKey = strconv.FormatInt(n.ID, 16) + "/" + decl.hash if cacheValue, found := p.transformCache[cacheKey]; found { return cacheValue, nil } diff --git a/idr/.snapshots/TestRemoveNodeAndSubTree-remove_a_node_who_is_its_parents_first_child_but_not_the_last b/idr/.snapshots/TestRemoveAndReleaseTree-remove_a_node_who_is_its_parents_first_child_but_not_the_last similarity index 100% rename from idr/.snapshots/TestRemoveNodeAndSubTree-remove_a_node_who_is_its_parents_first_child_but_not_the_last rename to idr/.snapshots/TestRemoveAndReleaseTree-remove_a_node_who_is_its_parents_first_child_but_not_the_last diff --git a/idr/.snapshots/TestRemoveNodeAndSubTree-remove_a_node_who_is_its_parents_last_child_but_not_the_first b/idr/.snapshots/TestRemoveAndReleaseTree-remove_a_node_who_is_its_parents_last_child_but_not_the_first similarity index 100% rename from idr/.snapshots/TestRemoveNodeAndSubTree-remove_a_node_who_is_its_parents_last_child_but_not_the_first rename to idr/.snapshots/TestRemoveAndReleaseTree-remove_a_node_who_is_its_parents_last_child_but_not_the_first diff --git a/idr/.snapshots/TestRemoveNodeAndSubTree-remove_a_node_who_is_its_parents_middle_child_not_the_first_not_the_last b/idr/.snapshots/TestRemoveAndReleaseTree-remove_a_node_who_is_its_parents_middle_child_not_the_first_not_the_last similarity index 100% rename from idr/.snapshots/TestRemoveNodeAndSubTree-remove_a_node_who_is_its_parents_middle_child_not_the_first_not_the_last rename to idr/.snapshots/TestRemoveAndReleaseTree-remove_a_node_who_is_its_parents_middle_child_not_the_first_not_the_last diff --git a/idr/.snapshots/TestRemoveNodeAndSubTree-remove_a_node_who_is_its_parents_only_child b/idr/.snapshots/TestRemoveAndReleaseTree-remove_a_node_who_is_its_parents_only_child similarity index 100% rename from idr/.snapshots/TestRemoveNodeAndSubTree-remove_a_node_who_is_its_parents_only_child rename to idr/.snapshots/TestRemoveAndReleaseTree-remove_a_node_who_is_its_parents_only_child diff --git a/idr/.snapshots/TestRemoveNodeAndSubTree-remove_a_root_does_nothing b/idr/.snapshots/TestRemoveAndReleaseTree-remove_a_root_does_nothing_(when_node_caching_is_off) similarity index 100% rename from idr/.snapshots/TestRemoveNodeAndSubTree-remove_a_root_does_nothing rename to idr/.snapshots/TestRemoveAndReleaseTree-remove_a_root_does_nothing_(when_node_caching_is_off) diff --git a/idr/jsonreader.go b/idr/jsonreader.go index 08f2ea1..34fe8b2 100644 --- a/idr/jsonreader.go +++ b/idr/jsonreader.go @@ -48,7 +48,7 @@ func (sp *JSONStreamReader) wrapUpCurAndTargetCheck() *Node { // node fully and discovered sp.xpathFilterExpr can't be satisfied, so this // sp.stream isn't a target. To prevent future mismatch for other stream candidate, // we need to remove it from Node tree completely. And reset sp.stream. - RemoveFromTree(sp.stream) + RemoveAndReleaseTree(sp.stream) sp.stream = nil return nil } @@ -192,14 +192,25 @@ func (sp *JSONStreamReader) parse() (*Node, error) { // Read returns a *Node that matches the xpath streaming criteria. func (sp *JSONStreamReader) Read() (*Node, error) { // Because this is a streaming read, we need to release/remove last - // stream node from the node tree to free up memory. + // stream node from the node tree to free up memory. If Release() is + // called after Read() call, then sp.stream is already cleaned up; + // adding this piece of code here just in case Release() isn't called. if sp.stream != nil { - RemoveFromTree(sp.stream) + RemoveAndReleaseTree(sp.stream) sp.stream = nil } return sp.parse() } +// Release releases the *Node (and its subtree) that Read() has previously +// returned. +func (sp *JSONStreamReader) Release(n *Node) { + if n == sp.stream { + sp.stream = nil + } + RemoveAndReleaseTree(n) +} + // AtLine returns the **rough** line number of the current JSON decoder. func (sp *JSONStreamReader) AtLine() int { return sp.r.AtLine() diff --git a/idr/navigator_test.go b/idr/navigator_test.go index 0aca245..e5c03f7 100644 --- a/idr/navigator_test.go +++ b/idr/navigator_test.go @@ -9,6 +9,7 @@ import ( ) func navTestSetup(t *testing.T) (*testTree, *navigator, func(*Node)) { + setupTestNodeCaching(testNodeCachingOff) tt := newTestTree(t, testTreeXML) nav := createNavigator(tt.root) moveTo := func(n *Node) { nav.cur = n } @@ -120,8 +121,8 @@ func TestMoveToChild(t *testing.T) { moveTo(tt.attrC1) assert.False(t, nav.MoveToChild()) // remove elemC3/elemC4, so elemC now only has attrC1 and attrC2 - RemoveFromTree(tt.elemC3) - RemoveFromTree(tt.elemC4) + RemoveAndReleaseTree(tt.elemC3) + RemoveAndReleaseTree(tt.elemC4) moveTo(tt.elemC) assert.False(t, nav.MoveToChild()) moveTo(tt.elemB) diff --git a/idr/node.go b/idr/node.go index 1d66b20..f0757b8 100644 --- a/idr/node.go +++ b/idr/node.go @@ -3,6 +3,8 @@ package idr import ( "fmt" "strings" + "sync" + "sync/atomic" ) // NodeType is the type of a Node in an IDR. @@ -44,6 +46,12 @@ func (nt NodeType) String() string { // for each format. // - Node allocation recycling. type Node struct { + // ID uniquely identifies a Node, whether it's newly created or recycled and reused from + // the node allocation cache. Previously we sometimes used a *Node's pointer address as a + // unique ID which isn't sufficiently unique anymore given the introduction of using + // sync.Pool for node allocation caching. + ID int64 + Parent, FirstChild, LastChild, PrevSibling, NextSibling *Node Type NodeType @@ -52,12 +60,55 @@ type Node struct { FormatSpecific interface{} } +// Give test a chance to turn node caching on/off. Not exported; always caching in production code. +var nodeCaching = true +var nodePool sync.Pool + +func allocNode() *Node { + n := &Node{} + n.reset() + return n +} + +func resetNodePool() { + nodePool = sync.Pool{ + New: func() interface{} { + return allocNode() + }, + } +} + +func init() { + resetNodePool() +} + // CreateNode creates a generic *Node. func CreateNode(ntype NodeType, data string) *Node { - return &Node{ - Type: ntype, - Data: data, + if nodeCaching { + // Node out of pool has already been reset. + n := nodePool.Get().(*Node) + n.Type = ntype + n.Data = data + return n } + n := allocNode() + n.Type = ntype + n.Data = data + return n +} + +var nodeID = int64(0) + +func newNodeID() int64 { + return atomic.AddInt64(&nodeID, 1) +} + +func (n *Node) reset() { + n.ID = newNodeID() + n.Parent, n.FirstChild, n.LastChild, n.PrevSibling, n.NextSibling = nil, nil, nil, nil, nil + n.Type = 0 + n.Data = "" + n.FormatSpecific = nil } // InnerText returns a Node's children's texts concatenated. @@ -95,11 +146,11 @@ func AddChild(parent, n *Node) { parent.LastChild = n } -// RemoveFromTree removes a node and its subtree from an IDR -// tree it is in. If the node is the root of the tree, it's a no-op. -func RemoveFromTree(n *Node) { +// RemoveAndReleaseTree removes a node and its subtree from an IDR tree it is in and +// release the resources (Node allocation) associated with the node and its subtree. +func RemoveAndReleaseTree(n *Node) { if n.Parent == nil { - return + goto recycle } if n.Parent.FirstChild == n { if n.Parent.LastChild == n { @@ -118,7 +169,21 @@ func RemoveFromTree(n *Node) { n.NextSibling.PrevSibling = n.PrevSibling } } - n.Parent = nil - n.PrevSibling = nil - n.NextSibling = nil +recycle: + recycle(n) +} + +func recycle(n *Node) { + if !nodeCaching { + return + } + for c := n.FirstChild; c != nil; { + // Have to save c.NextSibling before recycle(c) call or + // c.NextSibling would be wiped out during the call. + next := c.NextSibling + recycle(c) + c = next + } + n.reset() + nodePool.Put(n) } diff --git a/idr/node_test.go b/idr/node_test.go index 81d7fb4..2db73ec 100644 --- a/idr/node_test.go +++ b/idr/node_test.go @@ -21,37 +21,37 @@ func rootOf(n *Node) *Node { return n } -func checkPointersInTree(t *testing.T, n *Node) { +func checkPointersInTree(tb testing.TB, n *Node) { if n == nil { return } if n.FirstChild != nil { - assert.True(t, n == n.FirstChild.Parent) + assert.True(tb, n == n.FirstChild.Parent) } if n.LastChild != nil { - assert.True(t, n == n.LastChild.Parent) + assert.True(tb, n == n.LastChild.Parent) } - checkPointersInTree(t, n.FirstChild) - // There is no need to call checkPointersInTree(t, n.LastChild) - // because checkPointersInTree(t, n.FirstChild) will traverse all its + checkPointersInTree(tb, n.FirstChild) + // There is no need to call checkPointersInTree(tb, n.LastChild) + // because checkPointersInTree(tb, n.FirstChild) will traverse all its // siblings to the end, and if the last one isn't n.LastChild then it will fail. parent := n.Parent // could be nil if n is the root of a tree. // Verify the PrevSibling chain cur, prev := n, n.PrevSibling for ; prev != nil; cur, prev = prev, prev.PrevSibling { - assert.True(t, prev.Parent == parent) - assert.True(t, prev.NextSibling == cur) + assert.True(tb, prev.Parent == parent) + assert.True(tb, prev.NextSibling == cur) } - assert.True(t, cur.PrevSibling == nil) - assert.True(t, parent == nil || parent.FirstChild == cur) + assert.True(tb, cur.PrevSibling == nil) + assert.True(tb, parent == nil || parent.FirstChild == cur) // Verify the NextSibling chain cur, next := n, n.NextSibling for ; next != nil; cur, next = next, next.NextSibling { - assert.True(t, next.Parent == parent) - assert.True(t, next.PrevSibling == cur) + assert.True(tb, next.Parent == parent) + assert.True(tb, next.PrevSibling == cur) } - assert.True(t, cur.NextSibling == nil) - assert.True(t, parent == nil || parent.LastChild == cur) + assert.True(tb, cur.NextSibling == nil) + assert.True(tb, parent == nil || parent.LastChild == cur) } type testTree struct { @@ -81,7 +81,7 @@ const ( testTreeNotXML = false ) -func newTestTree(t *testing.T, xmlNode bool) *testTree { +func newTestTree(tb testing.TB, xmlNode bool) *testTree { mkNode := func(parent *Node, ntype NodeType, name string) *Node { var node *Node if xmlNode { @@ -115,7 +115,7 @@ func newTestTree(t *testing.T, xmlNode bool) *testTree { elemC3, textC3 := mkPair(elemC, ElementNode, "elemC3", "textC3") elemC4, textC4 := mkPair(elemC, ElementNode, "elemC4", "textC4") - checkPointersInTree(t, root) + checkPointersInTree(tb, root) return &testTree{ root: root, @@ -135,50 +135,87 @@ func newTestTree(t *testing.T, xmlNode bool) *testTree { } } +const ( + testNodeCachingOn = true + testNodeCachingOff = false +) + +func setupTestNodeCaching(caching bool) { + resetNodePool() + nodeCaching = caching +} + func TestDumpTestTree(t *testing.T) { + setupTestNodeCaching(testNodeCachingOff) cupaloy.SnapshotT(t, JSONify1(newTestTree(t, testTreeXML).root)) } func TestInnerText(t *testing.T) { + setupTestNodeCaching(testNodeCachingOff) tt := newTestTree(t, testTreeNotXML) assert.Equal(t, tt.textA1.Data+tt.textA2.Data, tt.elemA.InnerText()) // Note attribute's texts are skipped in InnerText(), by design. assert.Equal(t, tt.textC3.Data+tt.textC4.Data, tt.elemC.InnerText()) } -func TestRemoveNodeAndSubTree(t *testing.T) { +func TestRemoveAndReleaseTree(t *testing.T) { t.Run("remove a node who is its parents only child", func(t *testing.T) { + setupTestNodeCaching(testNodeCachingOn) tt := newTestTree(t, testTreeXML) - RemoveFromTree(tt.elemB1) + RemoveAndReleaseTree(tt.elemB1) checkPointersInTree(t, tt.root) cupaloy.SnapshotT(t, JSONify1(tt.root)) }) t.Run("remove a node who is its parents first child but not the last", func(t *testing.T) { + setupTestNodeCaching(testNodeCachingOn) tt := newTestTree(t, testTreeXML) - RemoveFromTree(tt.elemA) + RemoveAndReleaseTree(tt.elemA) checkPointersInTree(t, tt.root) cupaloy.SnapshotT(t, JSONify1(tt.root)) }) t.Run("remove a node who is its parents middle child not the first not the last", func(t *testing.T) { + setupTestNodeCaching(testNodeCachingOn) tt := newTestTree(t, testTreeXML) - RemoveFromTree(tt.elemB) + RemoveAndReleaseTree(tt.elemB) checkPointersInTree(t, tt.root) cupaloy.SnapshotT(t, JSONify1(tt.root)) }) t.Run("remove a node who is its parents last child but not the first", func(t *testing.T) { + setupTestNodeCaching(testNodeCachingOn) tt := newTestTree(t, testTreeXML) - RemoveFromTree(tt.elemC) + RemoveAndReleaseTree(tt.elemC) checkPointersInTree(t, tt.root) cupaloy.SnapshotT(t, JSONify1(tt.root)) }) - t.Run("remove a root does nothing", func(t *testing.T) { + t.Run("remove a root does nothing (when node caching is off)", func(t *testing.T) { + setupTestNodeCaching(testNodeCachingOff) tt := newTestTree(t, testTreeXML) - RemoveFromTree(tt.root) + RemoveAndReleaseTree(tt.root) checkPointersInTree(t, tt.root) cupaloy.SnapshotT(t, JSONify1(tt.root)) }) } + +// go test -bench=. -benchmem -benchtime=30s +// BenchmarkCreateAndDestroyTree_NoCache-4 20421031 1736 ns/op 1872 B/op 19 allocs/op +// BenchmarkCreateAndDestroyTree_WithCache-4 22744428 1559 ns/op 144 B/op 1 allocs/op + +func BenchmarkCreateAndDestroyTree_NoCache(b *testing.B) { + setupTestNodeCaching(testNodeCachingOff) + for i := 0; i < b.N; i++ { + tt := newTestTree(b, false) + RemoveAndReleaseTree(tt.root) + } +} + +func BenchmarkCreateAndDestroyTree_WithCache(b *testing.B) { + setupTestNodeCaching(testNodeCachingOn) + for i := 0; i < b.N; i++ { + tt := newTestTree(b, false) + RemoveAndReleaseTree(tt.root) + } +} diff --git a/idr/xmlreader.go b/idr/xmlreader.go index a6ac69d..88843ed 100644 --- a/idr/xmlreader.go +++ b/idr/xmlreader.go @@ -49,7 +49,7 @@ func (sp *XMLStreamReader) wrapUpCurAndTargetCheck() *Node { // discovered sp.xpathFilterExpr can't be satisfied, so this sp.stream isn't a // stream target. To prevent future mismatch for other stream candidate, we need to // remove it from Node tree completely. And reset sp.stream. - RemoveFromTree(sp.stream) + RemoveAndReleaseTree(sp.stream) sp.stream = nil return nil } @@ -167,15 +167,26 @@ func (sp *XMLStreamReader) Read() (n *Node, err error) { return nil, sp.err } // Because this is a streaming read, we need to remove the last - // stream node from the node tree to free up memory. + // stream node from the node tree to free up memory. If Release() + // is called after Read() call, then sp.stream is already cleaned up; + // adding this piece of code here just in case Release() isn't called. if sp.stream != nil { - RemoveFromTree(sp.stream) + RemoveAndReleaseTree(sp.stream) sp.stream = nil } n, sp.err = sp.parse() return n, sp.err } +// Release releases the *Node (and its subtree) that Read() has previously +// returned. +func (sp *XMLStreamReader) Release(n *Node) { + if n == sp.stream { + sp.stream = nil + } + RemoveAndReleaseTree(n) +} + // AtLine returns the **rough** line number of the current XML decoder. func (sp *XMLStreamReader) AtLine() int { // Given all the libraries are of fixed versions in go modules, we're fine. diff --git a/samples/omniv2/customfileformats/jsonlog/jsonlogformat/reader.go b/samples/omniv2/customfileformats/jsonlog/jsonlogformat/reader.go index 01f530b..aabb556 100644 --- a/samples/omniv2/customfileformats/jsonlog/jsonlogformat/reader.go +++ b/samples/omniv2/customfileformats/jsonlog/jsonlogformat/reader.go @@ -54,7 +54,11 @@ func (r *reader) Read() (*idr.Node, error) { if !strs.IsStrNonBlank(l) { continue } - n, err := parseJSON([]byte(l)) + p, err := idr.NewJSONStreamReader(bytes.NewReader([]byte(l)), ".") + if err != nil { + return nil, err + } + n, err := p.Read() if err != nil { // If we read out a log line fine, but unable to parse it, that shouldn't be // a fatal error, thus not wrapping the error in non-continuable error @@ -70,12 +74,10 @@ func (r *reader) Read() (*idr.Node, error) { } } -func parseJSON(b []byte) (*idr.Node, error) { - p, err := idr.NewJSONStreamReader(bytes.NewReader(b), ".") - if err != nil { - return nil, err +func (r *reader) Release(n *idr.Node) { + if n != nil { + idr.RemoveAndReleaseTree(n) } - return p.Read() } func (r *reader) IsContinuableError(err error) bool { diff --git a/samples/omniv2/json/json_test.go b/samples/omniv2/json/json_test.go index daf7835..47ab3f5 100644 --- a/samples/omniv2/json/json_test.go +++ b/samples/omniv2/json/json_test.go @@ -49,7 +49,7 @@ func init() { } // go test -bench=. -benchmem -benchtime=30s -// Benchmark2_Multiple_Objects-4 95299 377800 ns/op 99312 B/op 2351 allocs/op +// Benchmark2_Multiple_Objects-4 95775 381437 ns/op 84870 B/op 2180 allocs/op func Benchmark2_Multiple_Objects(b *testing.B) { for i := 0; i < b.N; i++ { From 601656371a3abd8e8810c56d1ca64fbde284439e Mon Sep 17 00:00:00 2001 From: jf-tech Date: Sat, 10 Oct 2020 10:38:09 +1300 Subject: [PATCH 4/4] fix cov --- idr/jsonreader_test.go | 1 + idr/xmlreader_test.go | 7 +++++++ 2 files changed, 8 insertions(+) diff --git a/idr/jsonreader_test.go b/idr/jsonreader_test.go index fddc797..8ae2613 100644 --- a/idr/jsonreader_test.go +++ b/idr/jsonreader_test.go @@ -119,6 +119,7 @@ func TestJSONStreamReader(t *testing.T) { assert.NoError(t, err) assert.True(t, len(test.expected) > 0) assert.Equal(t, test.expected[0], JSONify2(n)) + sp.Release(n) test.expected = test.expected[1:] } assert.Equal(t, 2, sp.AtLine()) diff --git a/idr/xmlreader_test.go b/idr/xmlreader_test.go index 503feff..d420470 100644 --- a/idr/xmlreader_test.go +++ b/idr/xmlreader_test.go @@ -43,6 +43,8 @@ func TestXMLStreamReader_SuccessWithXPathWithFilter(t *testing.T) { t.Run("IDR snapshot after 1st Read", func(t *testing.T) { cupaloy.SnapshotT(t, JSONify1(rootOf(n))) }) + // intentionally not calling sp.Release() to see if subsequent sp.Read() + // call would do the right thing and free the stream node or not. assert.Equal(t, 5, sp.AtLine()) // Second `` read @@ -52,6 +54,7 @@ func TestXMLStreamReader_SuccessWithXPathWithFilter(t *testing.T) { t.Run("IDR snapshot after 2nd Read", func(t *testing.T) { cupaloy.SnapshotT(t, JSONify1(rootOf(n))) }) + sp.Release(n) assert.Equal(t, 7, sp.AtLine()) // Third `` read (Note we will skip 'b3' since the streamElementFilter excludes it) @@ -61,6 +64,7 @@ func TestXMLStreamReader_SuccessWithXPathWithFilter(t *testing.T) { t.Run("IDR snapshot after 3rd Read", func(t *testing.T) { cupaloy.SnapshotT(t, JSONify1(rootOf(n))) }) + sp.Release(n) assert.Equal(t, 11, sp.AtLine()) n, err = sp.Read() @@ -104,6 +108,7 @@ func TestXMLStreamReader_SuccessWithXPathWithoutFilter(t *testing.T) { t.Run("IDR snapshot after 2nd Read", func(t *testing.T) { cupaloy.SnapshotT(t, JSONify1(rootOf(n))) }) + sp.Release(n) assert.Equal(t, 7, sp.AtLine()) // Third `` read @@ -113,6 +118,7 @@ func TestXMLStreamReader_SuccessWithXPathWithoutFilter(t *testing.T) { t.Run("IDR snapshot after 3rd Read", func(t *testing.T) { cupaloy.SnapshotT(t, JSONify1(rootOf(n))) }) + sp.Release(n) assert.Equal(t, 8, sp.AtLine()) // Fourth `` read @@ -122,6 +128,7 @@ func TestXMLStreamReader_SuccessWithXPathWithoutFilter(t *testing.T) { t.Run("IDR snapshot after 4th Read", func(t *testing.T) { cupaloy.SnapshotT(t, JSONify1(rootOf(n))) }) + sp.Release(n) assert.Equal(t, 11, sp.AtLine()) n, err = sp.Read()