6 changes: 1 addition & 5 deletions customfuncs/javascript.go
@@ -6,7 +6,6 @@ import (
"strconv"
"strings"
"sync"
"unsafe"

"github.com/dop251/goja"
"github.com/jf-tech/go-corelib/caches"
@@ -113,10 +112,7 @@ func getNodeJSON(n *idr.Node) string {
if disableCaching {
return idr.JSONify2(n)
}
// TODO if in the future we have *idr.Node allocation recycling, then this by-addr caching
// won't work. Ideally, we should have a node ID which refreshes upon recycling.
addr := strconv.FormatUint(uint64(uintptr(unsafe.Pointer(n))), 16)
j, _ := NodeToJSONCache.Get(addr, func(interface{}) (interface{}, error) {
j, _ := NodeToJSONCache.Get(n.ID, func(interface{}) (interface{}, error) {
return idr.JSONify2(n), nil
})
return j.(string)
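Side note for reviewers: the switch from pointer-address keys to n.ID matters once *idr.Node allocations get recycled, because a recycled allocation can reuse an address for a different logical node, and an address-keyed cache would then serve stale JSON. A minimal, self-contained sketch of the ID-keyed get-or-compute pattern (the cache type and JSON helper below are illustrative stand-ins, not the go-corelib cache used above):

package example

import "strconv"

// node stands in for *idr.Node: the real type now carries a stable ID that
// survives allocation recycling, which is what makes it a safe cache key.
type node struct {
	ID   int64
	Data string
}

// jsonCache is a tiny get-or-compute cache keyed by a string; the production
// code uses go-corelib's loading cache instead.
type jsonCache map[string]string

func (c jsonCache) get(key string, compute func() string) string {
	if v, ok := c[key]; ok {
		return v
	}
	v := compute()
	c[key] = v
	return v
}

// nodeJSON mirrors getNodeJSON above: key by the node's ID (hex), never by its
// pointer address, so recycled allocations cannot collide in the cache.
func nodeJSON(c jsonCache, n *node) string {
	key := strconv.FormatInt(n.ID, 16)
	return c.get(key, func() string { return `{"data":"` + n.Data + `"}` })
}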
12 changes: 6 additions & 6 deletions customfuncs/javascript_test.go
@@ -223,12 +223,12 @@ func TestJavascriptClearVarsAfterRunProgram(t *testing.T) {
}

// go test -bench=. -benchmem -benchtime=30s
// BenchmarkIfElse-4 368017143 98.1 ns/op 8 B/op 1 allocs/op
// BenchmarkEval-4 26409430 1386 ns/op 418 B/op 8 allocs/op
// BenchmarkJavascriptWithNoCache-4 172803 210958 ns/op 136608 B/op 1698 allocs/op
// BenchmarkJavascriptWithCache-4 23059004 1572 ns/op 129 B/op 8 allocs/op
// BenchmarkConcurrentJavascriptWithNoCache-4 1140 32729941 ns/op 27328924 B/op 339654 allocs/op
// BenchmarkConcurrentJavascriptWithCache-4 70977 504870 ns/op 26568 B/op 1745 allocs/op
// BenchmarkIfElse-4 352522934 97.9 ns/op 8 B/op 1 allocs/op
// BenchmarkEval-4 25887961 1405 ns/op 418 B/op 8 allocs/op
// BenchmarkJavascriptWithNoCache-4 172342 210129 ns/op 136607 B/op 1698 allocs/op
// BenchmarkJavascriptWithCache-4 15996846 2062 ns/op 128 B/op 8 allocs/op
// BenchmarkConcurrentJavascriptWithNoCache-4 1143 33140598 ns/op 27328719 B/op 339653 allocs/op
// BenchmarkConcurrentJavascriptWithCache-4 65140 543091 ns/op 26608 B/op 1746 allocs/op

var (
benchTitles = []string{"", "Dr", "Sir"}
6 changes: 6 additions & 0 deletions handlers/omni/v2/fileformat/csv/reader.go
@@ -125,6 +125,12 @@ func (r *reader) recordToNode(record []string) *idr.Node {
return root
}

func (r *reader) Release(n *idr.Node) {
if n != nil {
idr.RemoveAndReleaseTree(n)
}
}

func (r *reader) IsContinuableError(err error) bool {
return !IsErrInvalidHeader(err) && err != io.EOF
}
1 change: 1 addition & 0 deletions handlers/omni/v2/fileformat/csv/reader_test.go
@@ -165,6 +165,7 @@ func TestReader(t *testing.T) {
expectedJSON, ok := test.expected[0].(string)
assert.True(t, ok)
assert.Equal(t, jsons.BPJ(expectedJSON), jsons.BPJ(idr.JSONify2(n)))
r.Release(n)
test.expected = test.expected[1:]
}
})
3 changes: 3 additions & 0 deletions handlers/omni/v2/fileformat/fileformat.go
@@ -28,6 +28,9 @@ type FormatReader interface {
// Read returns a *Node and its subtree that will eventually be parsed and transformed into an
// output record. If EOF has been reached, io.EOF must be returned.
Read() (*idr.Node, error)
// Release gives the reader a chance to free resources of the *Node (and its subtree) it returned
// to the caller in a previous Read() call.
Release(*idr.Node)
// IsContinuableError determines whether a FormatReader-returned error is continuable or not.
// For certain errors (like EOF or corruption) there is no point in continuing; others
// can be safely ignored.
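For reviewers, a sketch of the caller-side contract the new Release method implies: every node handed out by Read() goes back through Release() once the caller is done with it. The loop and names below are illustrative stand-ins, not the actual call site (that is ingester.Read() further down in this PR):

package example

import "io"

// node stands in for *idr.Node so the sketch stays self-contained.
type node struct{}

// formatReader mirrors the three FormatReader methods relevant here.
type formatReader interface {
	Read() (*node, error)
	Release(*node)
	IsContinuableError(error) bool
}

// drain reads every record, hands it to handle, and releases the node subtree
// afterwards so the reader can reclaim its memory.
func drain(r formatReader, handle func(*node) error) error {
	for {
		n, err := r.Read()
		if err == io.EOF {
			return nil // done
		}
		if err != nil {
			if r.IsContinuableError(err) {
				continue // skip this record, keep reading
			}
			return err
		}
		herr := handle(n)
		r.Release(n) // always release, even if handle failed
		if herr != nil {
			return herr
		}
	}
}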
6 changes: 6 additions & 0 deletions handlers/omni/v2/fileformat/json/reader.go
@@ -40,6 +40,12 @@ func (r *reader) Read() (*idr.Node, error) {
return n, nil
}

func (r *reader) Release(n *idr.Node) {
if n != nil {
r.r.Release(n)
}
}

func (r *reader) IsContinuableError(err error) bool {
return !IsErrNodeReadingFailed(err) && err != io.EOF
}
3 changes: 3 additions & 0 deletions handlers/omni/v2/fileformat/json/reader_test.go
@@ -47,12 +47,15 @@ func TestReader_Read_Success(t *testing.T) {
name, err := idr.MatchSingle(n, "name")
assert.NoError(t, err)
assert.Equal(t, "john", name.InnerText())
// intentionally not calling r.Release(n) to verify that the
// stream node is freed up by a subsequent Read() call.

n, err = r.Read()
assert.NoError(t, err)
name, err = idr.MatchSingle(n, "name")
assert.NoError(t, err)
assert.Equal(t, "jane", name.InnerText())
r.Release(n)

n, err = r.Read()
assert.Error(t, err)
6 changes: 6 additions & 0 deletions handlers/omni/v2/fileformat/xml/reader.go
@@ -40,6 +40,12 @@ func (r *reader) Read() (*idr.Node, error) {
return n, nil
}

func (r *reader) Release(n *idr.Node) {
if n != nil {
r.r.Release(n)
}
}

func (r *reader) IsContinuableError(err error) bool {
return !IsErrNodeReadingFailed(err) && err != io.EOF
}
9 changes: 9 additions & 0 deletions handlers/omni/v2/fileformat/xml/reader_test.go
@@ -24,6 +24,7 @@ func TestReader_Read_Success(t *testing.T) {
<Root>
<Node>1</Node>
<Node>2</Node>
<Node>3</Node>
</Root>`),
"Root/Node[. != '2']")
assert.NoError(t, err)
@@ -34,6 +35,14 @@
assert.Equal(t, "1", n.InnerText())
// xml.Decoder seems to keep the line number at the end of whatever is inside an element's closing tag.
assert.Equal(t, 3, r.r.AtLine())
// intentionally not calling r.Release(n) to verify that the
// stream node is freed up by a subsequent Read() call.

n, err = r.Read()
assert.NoError(t, err)
assert.Equal(t, "3", n.InnerText())
assert.Equal(t, 5, r.r.AtLine())
r.Release(n)

n, err = r.Read()
assert.Error(t, err)
1 change: 1 addition & 0 deletions handlers/omni/v2/handler_test.go
@@ -52,6 +52,7 @@ type testFormatReader struct {
}

func (r testFormatReader) Read() (*idr.Node, error) { panic("implement me") }
func (r testFormatReader) Release(*idr.Node) { panic("implement me") }
func (r testFormatReader) IsContinuableError(error) bool { panic("implement me") }
func (r testFormatReader) FmtErr(string, ...interface{}) error { panic("implement me") }

5 changes: 3 additions & 2 deletions handlers/omni/v2/ingester.go
@@ -20,13 +20,14 @@ type ingester struct {
}

func (g *ingester) Read() ([]byte, error) {
node, err := g.reader.Read()
n, err := g.reader.Read()
if err != nil {
// Read() is supposed to have already done CtxAwareErr error wrapping, so return directly.
return nil, err
}
defer g.reader.Release(n)
result, err := transform.NewParseCtx(
g.ctx, g.customFuncs, g.customParseFuncs).ParseNode(node, g.finalOutputDecl)
g.ctx, g.customFuncs, g.customParseFuncs).ParseNode(n, g.finalOutputDecl)
if err != nil {
// ParseNode() errors are not CtxAwareErr wrapped, so wrap them here.
// Note errs.ErrorTransformFailed is a continuable error.
15 changes: 11 additions & 4 deletions handlers/omni/v2/ingester_test.go
@@ -14,10 +14,12 @@ import (
)

var errContinuableInTest = errors.New("continuable error")
var ingesterTestNode = idr.CreateNode(idr.ElementNode, "test")

type testReader struct {
result []*idr.Node
err []error
result []*idr.Node
err []error
releaseCalled int
}

func (r *testReader) Read() (*idr.Node, error) {
@@ -31,6 +33,8 @@ func (r *testReader) Read() (*idr.Node, error) {
return result, err
}

func (r *testReader) Release(n *idr.Node) { r.releaseCalled++ }

func (r *testReader) IsContinuableError(err error) bool { return err == errContinuableInTest }

func (r *testReader) FmtErr(format string, args ...interface{}) error {
@@ -45,6 +49,7 @@ func TestIngester_Read_ReadFailure(t *testing.T) {
assert.Error(t, err)
assert.Equal(t, "test failure", err.Error())
assert.Nil(t, b)
assert.Equal(t, 0, g.reader.(*testReader).releaseCalled)
}

func TestIngester_Read_ParseNodeFailure(t *testing.T) {
@@ -57,7 +62,7 @@ func TestIngester_Read_ParseNodeFailure(t *testing.T) {
assert.NoError(t, err)
g := &ingester{
finalOutputDecl: finalOutputDecl,
reader: &testReader{result: []*idr.Node{nil}, err: []error{nil}},
reader: &testReader{result: []*idr.Node{ingesterTestNode}, err: []error{nil}},
}
b, err := g.Read()
assert.Error(t, err)
@@ -67,6 +72,7 @@ func TestIngester_Read_ParseNodeFailure(t *testing.T) {
`ctx: fail to transform. err: fail to convert value 'abc' to type 'int' on 'FINAL_OUTPUT', err: strconv.ParseFloat: parsing "abc": invalid syntax`,
err.Error())
assert.Nil(t, b)
assert.Equal(t, 1, g.reader.(*testReader).releaseCalled)
}

func TestIngester_Read_Success(t *testing.T) {
@@ -79,11 +85,12 @@ func TestIngester_Read_Success(t *testing.T) {
assert.NoError(t, err)
g := &ingester{
finalOutputDecl: finalOutputDecl,
reader: &testReader{result: []*idr.Node{nil}, err: []error{nil}},
reader: &testReader{result: []*idr.Node{ingesterTestNode}, err: []error{nil}},
}
b, err := g.Read()
assert.NoError(t, err)
assert.Equal(t, "123", string(b))
assert.Equal(t, 1, g.reader.(*testReader).releaseCalled)
}

func TestIsContinuableError(t *testing.T) {
10 changes: 1 addition & 9 deletions handlers/omni/v2/transform/parse.go
@@ -6,7 +6,6 @@ import (
"reflect"
"strconv"
"strings"
"unsafe"

"github.com/jf-tech/go-corelib/strs"

@@ -37,11 +36,6 @@ func NewParseCtx(
}
}

func nodePtrAddrStr(n *idr.Node) string {
// `uintptr` is faster than `fmt.Sprintf("%p"...)`
return strconv.FormatUint(uint64(uintptr(unsafe.Pointer(n))), 16)
}

func resultTypeConversion(decl *Decl, value string) (interface{}, error) {
if decl.resultType() == ResultTypeString {
return value, nil
@@ -131,9 +125,7 @@ func normalizeAndReturnValue(decl *Decl, value interface{}) (interface{}, error)
func (p *parseCtx) ParseNode(n *idr.Node, decl *Decl) (interface{}, error) {
var cacheKey string
if !p.disableTransformCache {
// TODO if in the future we have *idr.Node allocation recycling, then this by-addr caching won't work.
// Ideally, we should have a node ID which refreshes upon recycling.
cacheKey = nodePtrAddrStr(n) + "/" + decl.hash
cacheKey = strconv.FormatInt(n.ID, 16) + "/" + decl.hash
if cacheValue, found := p.transformCache[cacheKey]; found {
return cacheValue, nil
}
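The transform cache key is now the node's ID (hex) joined with the decl's hash, so one (node, decl) pair maps to exactly one cache slot regardless of pointer identity. A tiny sketch of that key shape, with made-up values:

package example

import "strconv"

// transformCacheKey mirrors the key construction in ParseNode above: the node
// ID in hex, a separator, then the transform decl's hash.
func transformCacheKey(nodeID int64, declHash string) string {
	return strconv.FormatInt(nodeID, 16) + "/" + declHash
}

// For example, transformCacheKey(255, "decl-abc") yields "ff/decl-abc".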
17 changes: 14 additions & 3 deletions idr/jsonreader.go
@@ -48,7 +48,7 @@ func (sp *JSONStreamReader) wrapUpCurAndTargetCheck() *Node {
// node fully and discovered sp.xpathFilterExpr can't be satisfied, so this
// sp.stream isn't a target. To prevent future mismatches for other stream candidates,
// we need to remove it from the Node tree completely, and reset sp.stream.
RemoveFromTree(sp.stream)
RemoveAndReleaseTree(sp.stream)
sp.stream = nil
return nil
}
@@ -192,14 +192,25 @@ func (sp *JSONStreamReader) parse() (*Node, error) {
// Read returns a *Node that matches the xpath streaming criteria.
func (sp *JSONStreamReader) Read() (*Node, error) {
// Because this is a streaming read, we need to release/remove the last
// stream node from the node tree to free up memory.
// stream node from the node tree to free up memory. If Release() is
// called after a Read() call, then sp.stream is already cleaned up;
// this code is here just in case Release() isn't called.
if sp.stream != nil {
RemoveFromTree(sp.stream)
RemoveAndReleaseTree(sp.stream)
sp.stream = nil
}
return sp.parse()
}

// Release releases the *Node (and its subtree) that Read() has previously
// returned.
func (sp *JSONStreamReader) Release(n *Node) {
if n == sp.stream {
sp.stream = nil
}
RemoveAndReleaseTree(n)
}

// AtLine returns the **rough** line number of the current JSON decoder.
func (sp *JSONStreamReader) AtLine() int {
return sp.r.AtLine()
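A usage sketch of the stream-node lifecycle these changes set up: the caller releases each matched node after use, and even if it forgets, the next Read() frees the previous stream node. This is an illustration only; placing it in package idr avoids assuming the module's import path, and Read() returning io.EOF at end of input is an assumption consistent with the FormatReader contract above:

package idr // sketch placed alongside jsonreader.go for illustration only

import "io"

// consumeAll drains a JSONStreamReader: each matched stream node is JSONified
// and then released right away. Even if a caller skips Release(), the next
// Read() cleans up the previous stream node, per the comment in Read() above.
func consumeAll(sp *JSONStreamReader) ([]string, error) {
	var out []string
	for {
		n, err := sp.Read()
		if err == io.EOF { // assumed end-of-input sentinel
			return out, nil
		}
		if err != nil {
			return nil, err
		}
		out = append(out, JSONify2(n))
		sp.Release(n) // frees the subtree and resets the reader's stream node
	}
}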
1 change: 1 addition & 0 deletions idr/jsonreader_test.go
@@ -119,6 +119,7 @@ func TestJSONStreamReader(t *testing.T) {
assert.NoError(t, err)
assert.True(t, len(test.expected) > 0)
assert.Equal(t, test.expected[0], JSONify2(n))
sp.Release(n)
test.expected = test.expected[1:]
}
assert.Equal(t, 2, sp.AtLine())
5 changes: 3 additions & 2 deletions idr/navigator_test.go
@@ -9,6 +9,7 @@ import (
)

func navTestSetup(t *testing.T) (*testTree, *navigator, func(*Node)) {
setupTestNodeCaching(testNodeCachingOff)
tt := newTestTree(t, testTreeXML)
nav := createNavigator(tt.root)
moveTo := func(n *Node) { nav.cur = n }
@@ -120,8 +121,8 @@ func TestMoveToChild(t *testing.T) {
moveTo(tt.attrC1)
assert.False(t, nav.MoveToChild())
// remove elemC3/elemC4, so elemC now only has attrC1 and attrC2
RemoveFromTree(tt.elemC3)
RemoveFromTree(tt.elemC4)
RemoveAndReleaseTree(tt.elemC3)
RemoveAndReleaseTree(tt.elemC4)
moveTo(tt.elemC)
assert.False(t, nav.MoveToChild())
moveTo(tt.elemB)