diff --git a/omniparser/schemaplugin/omni/v2/fileformat/xml/reader.go b/omniparser/schemaplugin/omni/v2/fileformat/xml/reader.go
new file mode 100644
index 0000000..3619c53
--- /dev/null
+++ b/omniparser/schemaplugin/omni/v2/fileformat/xml/reader.go
@@ -0,0 +1,119 @@
+package omniv2xml
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "reflect"
+ "strings"
+
+ node "github.com/antchfx/xmlquery"
+
+ "github.com/jf-tech/omniparser/omniparser/errs"
+)
+
+// ErrNodeReadingFailed indicates the reader failed to read out a complete, non-corrupted
+// XML element node. This is a fatal, non-continuable error.
+type ErrNodeReadingFailed string
+
+// Error implements the error interface by returning the underlying message verbatim.
+func (e ErrNodeReadingFailed) Error() string { return string(e) }
+
+// IsErrNodeReadingFailed reports whether err is, or wraps, an ErrNodeReadingFailed.
+//
+// errors.As is used rather than a direct type switch so that an ErrNodeReadingFailed
+// which has been wrapped with extra context (any error exposing it through Unwrap)
+// is still recognized as fatal. A nil err yields false.
+func IsErrNodeReadingFailed(err error) bool {
+	var target ErrNodeReadingFailed
+	return errors.As(err, &target)
+}
+
+// reader pulls XML element nodes, one at a time, from an xmlquery stream parser and
+// decorates errors with the input name and current line number.
+type reader struct {
+ // inputName identifies the input in formatted error messages.
+ inputName string
+ // reader is the underlying xmlquery stream parser that Read pulls nodes from and
+ // that lineNumber inspects via reflection.
+ reader *node.StreamParser
+}
+
+// Read returns the next XML element node produced by the underlying stream parser.
+//
+// When the parser reports io.EOF, errs.ErrEOF is returned. Any other parser error is
+// converted into an ErrNodeReadingFailed whose message is prefixed with the input name
+// and the current line number. In both failure cases the returned node is nil.
+func (r *reader) Read() (*node.Node, error) {
+	n, err := r.reader.Read()
+	if err == io.EOF {
+		return nil, errs.ErrEOF
+	}
+	if err != nil {
+		// Pass the parser's message as an argument, never as the format string: the
+		// message may contain '%' characters, which fmt.Sprintf would otherwise treat
+		// as formatting verbs and corrupt the reported error text.
+		return nil, ErrNodeReadingFailed(r.fmtErrStr("%s", err.Error()))
+	}
+	return n, nil
+}
+
+// IsContinuableError reports whether processing may carry on after err. It returns
+// false for errs.ErrEOF and for any ErrNodeReadingFailed, and true for everything else.
+func (r *reader) IsContinuableError(err error) bool {
+	if IsErrNodeReadingFailed(err) {
+		return false
+	}
+	return err != errs.ErrEOF
+}
+
+// FmtErr builds an error from format and args (fmt.Sprintf semantics) whose message is
+// prefixed with the input name and the current line number.
+func (r *reader) FmtErr(format string, args ...interface{}) error {
+	msg := r.fmtErrStr(format, args...)
+	return errors.New(msg)
+}
+
+const (
+ // xmlParserLineFieldFQDN spells out the field path "reader.p.decoder.line", which
+ // matches the chain of fields that lineNumber walks via reflection.
+ // NOTE(review): not referenced in the visible part of this file; confirm its users.
+ xmlParserLineFieldFQDN = "reader.p.decoder.line"
+)
+
+// fmtErrStr renders format and args with fmt.Sprintf and prefixes the result with the
+// input name and the current line number.
+func (r *reader) fmtErrStr(format string, args ...interface{}) string {
+	name, line := r.inputName, r.lineNumber()
+	detail := fmt.Sprintf(format, args...)
+	return fmt.Sprintf("input '%s' near line %d: %s", name, line, detail)
+}
+
+// lineNumber returns the value of the "line" field reached by reflecting through the
+// stream parser's "p" and "decoder" fields.
+//
+// The field path is an assumption about the internal layout of
+// github.com/antchfx/xmlquery. If a library upgrade changes that layout, the tests in
+// this package are expected to fail, signalling that this function must be updated.
+func (r *reader) lineNumber() int {
+	streamParser := reflect.ValueOf(r.reader).Elem()
+	parser := streamParser.FieldByName("p").Elem()
+	decoder := parser.FieldByName("decoder").Elem()
+	line := decoder.FieldByName("line")
+	return int(line.Int())
+}
+
+// removeLastFilterInXPath strips a trailing "[...]" filter from xpath and returns what
+// precedes it. The input is returned unchanged when it does not end with ']' or when no
+// matching '[' can be found for that final ']'.
+//
+// The scan runs backward from the end. Brackets that sit inside single- or
+// double-quoted strings are ignored, and nested brackets are tracked by depth.
+func removeLastFilterInXPath(xpath string) string {
+	rs := []rune(xpath)
+	last := len(rs) - 1
+	if last < 0 || rs[last] != ']' {
+		return xpath
+	}
+	// depth counts closing brackets still waiting for their opening partner; the
+	// final ']' accounts for the initial 1.
+	depth := 1
+	for i := last - 1; i >= 0; i-- {
+		c := rs[i]
+		switch {
+		case c == '"' || c == '\'':
+			// Jump back to the matching quote so quoted brackets are not counted.
+			i--
+			for i >= 0 && rs[i] != c {
+				i--
+			}
+			if i < 0 {
+				// Unterminated quote: give up and leave the xpath as is.
+				return xpath
+			}
+		case c == ']':
+			depth++
+		case c == '[':
+			depth--
+			if depth == 0 {
+				return string(rs[:i])
+			}
+		}
+	}
+	return xpath
+}
+
+// NewReader creates a reader that streams XML element nodes from src.
+//
+// inputName is used only to label error messages. xpath is trimmed of surrounding
+// whitespace. If it ends with a "[...]" filter, the xpath without that filter is passed
+// to the stream parser as the element xpath, with the full xpath as an additional
+// argument; otherwise the xpath is passed on its own. Any error from
+// node.CreateStreamParser is returned unchanged, together with a nil reader.
+func NewReader(inputName string, src io.Reader, xpath string) (*reader, error) {
+	xpath = strings.TrimSpace(xpath)
+	elemXPath := removeLastFilterInXPath(xpath)
+	// Only supply the full xpath as a filter when a trailing filter was stripped.
+	var filters []string
+	if elemXPath != xpath {
+		filters = append(filters, xpath)
+	}
+	sp, err := node.CreateStreamParser(src, elemXPath, filters...)
+	if err != nil {
+		return nil, err
+	}
+	return &reader{inputName: inputName, reader: sp}, nil
+}
diff --git a/omniparser/schemaplugin/omni/v2/fileformat/xml/reader_test.go b/omniparser/schemaplugin/omni/v2/fileformat/xml/reader_test.go
new file mode 100644
index 0000000..387b491
--- /dev/null
+++ b/omniparser/schemaplugin/omni/v2/fileformat/xml/reader_test.go
@@ -0,0 +1,146 @@
+package omniv2xml
+
+import (
+ "errors"
+ "strings"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/jf-tech/omniparser/omniparser/errs"
+)
+
+// TestIsErrNodeReadingFailed checks that IsErrNodeReadingFailed accepts an
+// ErrNodeReadingFailed, rejects a plain error, and that Error returns the raw message.
+func TestIsErrNodeReadingFailed(t *testing.T) {
+	failure := ErrNodeReadingFailed("test")
+	assert.True(t, IsErrNodeReadingFailed(failure))
+	assert.Equal(t, "test", failure.Error())
+	plain := errors.New("test")
+	assert.False(t, IsErrNodeReadingFailed(plain))
+}
+
+// TestReader_Read_Success verifies a successful Read followed by end of input: the
+// first Read yields a node whose inner text is "1", and the second Read yields
+// errs.ErrEOF with a nil node. It also checks lineNumber before and after the first Read.
+func TestReader_Read_Success(t *testing.T) {
+ r, err := NewReader(
+ "test-input",
+ strings.NewReader(`
+
+ 1
+ 2
+ `),
+ // The trailing filter rejects nodes whose value is '2'; presumably this is why
+ // only one node is returned before EOF.
+ "Root/Node[. != '2']")
+ assert.NoError(t, err)
+ // Nothing has been read yet, so the line number is expected to be 1.
+ assert.Equal(t, 1, r.lineNumber())
+
+ n, err := r.Read()
+ assert.NoError(t, err)
+ assert.Equal(t, "1", n.InnerText())
+ // xml.Decoder seems to keep the line number at the end of whatever is inside an
+ // element's closing tag.
+ assert.Equal(t, 3, r.lineNumber())
+
+ // The second Read is expected to hit the end of the input.
+ n, err = r.Read()
+ assert.Error(t, err)
+ assert.Equal(t, errs.ErrEOF, err)
+ assert.Nil(t, n)
+}
+
+// TestReader_Read_InvalidXML verifies that Read on malformed input returns a nil node
+// and an ErrNodeReadingFailed whose message carries the input name and line number
+// prefix followed by the parser's syntax error text.
+func TestReader_Read_InvalidXML(t *testing.T) {
+ r, err := NewReader(
+ "test-input",
+ strings.NewReader(`
+
+ 1
+ 2
+ `),
+ "Root/Node[. != '2']")
+ // Creating the reader must succeed; the failure is only expected on Read.
+ assert.NoError(t, err)
+ assert.Equal(t, 1, r.lineNumber())
+
+ n, err := r.Read()
+ assert.Error(t, err)
+ assert.True(t, IsErrNodeReadingFailed(err))
+ // The exact message is pinned, including the "near line" prefix added by the reader.
+ assert.Equal(t,
+ `input 'test-input' near line 5: XML syntax error on line 5: element closed by `,
+ err.Error())
+ assert.Nil(t, n)
+}
+
+// TestReader_FmtErr checks that FmtErr formats its arguments and prefixes the message
+// with the input name and line number.
+func TestReader_FmtErr(t *testing.T) {
+	r, newErr := NewReader("test-input", strings.NewReader(""), "Root/Node")
+	assert.NoError(t, newErr)
+	fmtErr := r.FmtErr("golang is %s", "fun")
+	assert.Error(t, fmtErr)
+	assert.Equal(t, `input 'test-input' near line 1: golang is fun`, fmtErr.Error())
+}
+
+// TestReader_IsContinuableError checks that errs.ErrEOF and ErrNodeReadingFailed are
+// reported as non-continuable while other errors are continuable.
+func TestReader_IsContinuableError(t *testing.T) {
+	r, err := NewReader("test", strings.NewReader(""), "Root/Node")
+	assert.NoError(t, err)
+	for _, tc := range []struct {
+		err         error
+		continuable bool
+	}{
+		{err: errs.ErrEOF, continuable: false},
+		{err: ErrNodeReadingFailed("failure"), continuable: false},
+		{err: errs.ErrTransformFailed("failure"), continuable: true},
+		{err: errors.New("failure"), continuable: true},
+	} {
+		assert.Equal(t, tc.continuable, r.IsContinuableError(tc.err))
+	}
+}
+
+// TestRemoveLastFilterInXPath runs removeLastFilterInXPath over a table of xpaths that
+// covers empty and blank input, unbalanced brackets, brackets inside quotes, and
+// filters that are not at the end of the xpath.
+func TestRemoveLastFilterInXPath(t *testing.T) {
+	cases := []struct {
+		name   string
+		xpath  string
+		expect string
+	}{
+		{name: "empty", xpath: "", expect: ""},
+		{name: "blank", xpath: " ", expect: " "},
+		{name: " /A/B/C ", xpath: " /A/B/C ", expect: " /A/B/C "},
+		{name: "unbalanced brackets", xpath: "/A/B/C[...]]", expect: "/A/B/C[...]]"},
+		{name: "another unbalanced brackets", xpath: "/A/B/C']", expect: "/A/B/C']"},
+		{name: "balanced brackets", xpath: "/A/B/C[...]", expect: "/A/B/C"},
+		{name: "brackets in single quotes", xpath: "/A/B/C[.='[']", expect: "/A/B/C"},
+		{name: "brackets in double quotes", xpath: `/A/B/C[.="abc]"]`, expect: "/A/B/C"},
+		{name: "brackets not at the end", xpath: `/A/B/C[.="abc]"]/D`, expect: `/A/B/C[.="abc]"]/D`},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			got := removeLastFilterInXPath(tc.xpath)
+			assert.Equal(t, tc.expect, got)
+		})
+	}
+}
+
+func TestNewReader_InvalidXPath(t *testing.T) {
+ r, err := NewReader("test-input", strings.NewReader(""), "[not-valid")
+ assert.Error(t, err)
+ assert.Equal(t,
+ `invalid streamElementXPath '[not-valid', err: expression must evaluate to a node-set`,
+ err.Error())
+ assert.Nil(t, r)
+}