Skip to content

Commit

Permalink
Fix SeekToRow on mergedRowGroupRows (xitongsys#462)
Browse files Browse the repository at this point in the history
When a read is performed after SeekToRow on mergedRowGroupRows, the rowIndex is
compared against the seek index, and rows are read and discarded until
rowIndex == seek index. Previously, the rowIndex was not advanced in the normal
read path, so a later seek mistakenly discarded rows that had not been read yet.
  • Loading branch information
asubiotto committed Jan 19, 2023
1 parent 952b161 commit c3c521a
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 3 deletions.
11 changes: 8 additions & 3 deletions merge.go
Expand Up @@ -106,6 +106,12 @@ type mergedRowGroupRows struct {
schema *Schema
}

// readInternal reads rows from the underlying merge reader into rows and
// advances r.rowIndex by the number of rows that were read. The index is
// updated unconditionally, so it stays in step with the rows consumed even
// when the read also reports an error.
func (r *mergedRowGroupRows) readInternal(rows []Row) (int, error) {
	read, err := r.merge.ReadRows(rows)
	r.rowIndex = r.rowIndex + int64(read)
	return read, err
}

func (r *mergedRowGroupRows) Close() (lastErr error) {
r.merge.close()
r.rowIndex = 0
Expand All @@ -126,14 +132,13 @@ func (r *mergedRowGroupRows) ReadRows(rows []Row) (int, error) {
if n > len(rows) {
n = len(rows)
}
n, err := r.merge.ReadRows(rows[:n])
n, err := r.readInternal(rows[:n])
if err != nil {
return 0, err
}
r.rowIndex += int64(n)
}

return r.merge.ReadRows(rows)
return r.readInternal(rows)
}

func (r *mergedRowGroupRows) SeekToRow(rowIndex int64) error {
Expand Down
61 changes: 61 additions & 0 deletions merge_test.go
Expand Up @@ -423,6 +423,67 @@ func TestMergeRowGroupsCursorsAreClosed(t *testing.T) {
}
}

// TestMergeRowGroupsSeekToRow verifies that seeking to each successive row
// index of a merged row group and then reading a single row yields the row at
// that index, and that no rows are dropped along the way.
//
// Every row group is filled with consecutive values of a global counter, so
// the merged, ascending-sorted output is expected to hold value i at row
// index i.
func TestMergeRowGroupsSeekToRow(t *testing.T) {
	type model struct {
		A int
	}

	schema := parquet.SchemaOf(model{})
	options := []parquet.RowGroupOption{
		parquet.SortingRowGroupConfig(
			parquet.SortingColumns(
				parquet.Ascending(schema.Columns()[0]...),
			),
		),
	}

	rowGroups := make([]parquet.RowGroup, numRowGroups)

	// counter doubles as the total number of rows written once the loop ends.
	counter := 0
	for i := range rowGroups {
		rows := make([]interface{}, 0, rowsPerGroup)
		for j := 0; j < rowsPerGroup; j++ {
			rows = append(rows, model{A: counter})
			counter++
		}
		rowGroups[i] = sortedRowGroup(options, rows...)
	}

	m, err := parquet.MergeRowGroups(rowGroups, options...)
	if err != nil {
		t.Fatal(err)
	}

	mergedRows := m.Rows()
	defer mergedRows.Close()

	rbuf := make([]parquet.Row, 1)
	cursor := int64(0)
	for {
		if err := mergedRows.SeekToRow(cursor); err != nil {
			t.Fatal(err)
		}

		n, err := mergedRows.ReadRows(rbuf)
		if err != nil && !errors.Is(err, io.EOF) {
			t.Fatal(err)
		}
		if n == 0 {
			if err == nil {
				// Without this guard the stale content of rbuf[0] would be
				// validated as if it had just been read.
				t.Fatalf("ReadRows returned no rows and no error at row %d", cursor)
			}
			break // io.EOF with nothing read: end of the merged rows.
		}

		// A reader may hand back a row together with io.EOF, so the row is
		// validated before the error is acted upon.
		v := model{}
		if rerr := schema.Reconstruct(&v, rbuf[0]); rerr != nil {
			t.Fatal(rerr)
		}
		if v.A != int(cursor) {
			t.Fatalf("expected value %d, got %d", cursor, v.A)
		}
		cursor++

		if err != nil {
			break
		}
	}

	// Guard against a vacuous pass: every row that was written must have
	// been visited exactly once.
	if cursor != int64(counter) {
		t.Fatalf("expected to read %d rows, read %d", counter, cursor)
	}
}

func BenchmarkMergeRowGroups(b *testing.B) {
for _, test := range readerTests {
b.Run(test.scenario, func(b *testing.B) {
Expand Down

0 comments on commit c3c521a

Please sign in to comment.