Skip to content
This repository has been archived by the owner on Nov 16, 2023. It is now read-only.

Pass optional readers to override and cache metadata objects #249

Merged
merged 5 commits into from
Jun 30, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions file.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ func OpenFile(r io.ReaderAt, size int64, options ...FileOption) (*File, error) {
return nil, fmt.Errorf("invalid magic header of parquet file: %q", b[:4])
}

if cast, ok := f.reader.(interface{ SetMagicFooterSection(offset, length int64) }); ok {
cast.SetMagicFooterSection(size-8, 8)
}
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
if _, err := r.ReadAt(b[:8], size-8); err != nil {
return nil, fmt.Errorf("reading magic footer of parquet file: %w", err)
}
Expand All @@ -64,6 +67,9 @@ func OpenFile(r io.ReaderAt, size int64, options ...FileOption) (*File, error) {
footerSize := int64(binary.LittleEndian.Uint32(b[:4]))
footerData := make([]byte, footerSize)

if cast, ok := f.reader.(interface{ SetFooterSection(offset, length int64) }); ok {
cast.SetFooterSection(size-(footerSize+8), footerSize)
}
if _, err := f.reader.ReadAt(footerData, size-(footerSize+8)); err != nil {
return nil, fmt.Errorf("reading footer of parquet file: %w", err)
}
Expand Down Expand Up @@ -117,6 +123,12 @@ func OpenFile(r io.ReaderAt, size int64, options ...FileOption) (*File, error) {
return nil, err
}
offset, _ = s.Seek(0, io.SeekCurrent)
if cast, ok := r.(interface{ SetBloomFilterSection(offset, length int64) }); ok {
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
bloomFilterOffset := c.chunk.MetaData.BloomFilterOffset
bloomFilterLength := (offset - bloomFilterOffset) + int64(h.NumBytes)
cast.SetBloomFilterSection(bloomFilterOffset, bloomFilterLength)
}

c.bloomFilter = newBloomFilter(r, offset, &h)
}
}
Expand Down Expand Up @@ -192,6 +204,9 @@ func (f *File) ReadPageIndex() ([]format.ColumnIndex, []format.OffsetIndex, erro
if columnIndexOffset > 0 {
columnIndexData := indexBuffer[:columnIndexLength]

if cast, ok := f.reader.(interface{ SetColumnIndexSection(offset, length int64) }); ok {
cast.SetColumnIndexSection(columnIndexOffset, columnIndexLength)
}
if _, err := f.reader.ReadAt(columnIndexData, columnIndexOffset); err != nil {
return nil, nil, fmt.Errorf("reading %d bytes column index at offset %d: %w", columnIndexLength, columnIndexOffset, err)
}
Expand All @@ -213,6 +228,9 @@ func (f *File) ReadPageIndex() ([]format.ColumnIndex, []format.OffsetIndex, erro
if offsetIndexOffset > 0 {
offsetIndexData := indexBuffer[:offsetIndexLength]

if cast, ok := f.reader.(interface{ SetOffsetIndexSection(offset, length int64) }); ok {
cast.SetOffsetIndexSection(offsetIndexOffset, offsetIndexLength)
}
if _, err := f.reader.ReadAt(offsetIndexData, offsetIndexOffset); err != nil {
return nil, nil, fmt.Errorf("reading %d bytes offset index at offset %d: %w", offsetIndexLength, offsetIndexOffset, err)
}
Expand Down