Skip to content
This repository has been archived by the owner on Nov 16, 2023. It is now read-only.

Pass optional readers to override and cache metadata objects #249

Merged
merged 5 commits into from
Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion bloom/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,12 @@ func CheckSplitBlock(r io.ReaderAt, n int64, x uint64) (bool, error) {
block := acquireBlock()
defer releaseBlock(block)
offset := BlockSize * fasthash1x64(x, int32(n/BlockSize))
_, err := r.ReadAt(block.Bytes(), int64(offset))

buf := block.Bytes()
if cast, ok := r.(interface{ SetBloomFilterSection(offset, length int64) }); ok {
cast.SetBloomFilterSection(int64(offset), int64(len(buf)))
}
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
_, err := r.ReadAt(buf, int64(offset))
return block.Check(uint32(x)), err
}

Expand Down
15 changes: 15 additions & 0 deletions file.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,19 @@ func OpenFile(r io.ReaderAt, size int64, options ...FileOption) (*File, error) {
return nil, err
}

if cast, ok := f.reader.(interface{ SetMagicHeaderSection(offset, length int64) }); ok {
cast.SetMagicHeaderSection(0, 4)
}
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
if _, err := r.ReadAt(b[:4], 0); err != nil {
return nil, fmt.Errorf("reading magic header of parquet file: %w", err)
}
if string(b[:4]) != "PAR1" {
return nil, fmt.Errorf("invalid magic header of parquet file: %q", b[:4])
}

if cast, ok := f.reader.(interface{ SetMagicFooterSection(offset, length int64) }); ok {
cast.SetMagicFooterSection(size-8, 8)
}
annanay25 marked this conversation as resolved.
Show resolved Hide resolved
if _, err := r.ReadAt(b[:8], size-8); err != nil {
return nil, fmt.Errorf("reading magic footer of parquet file: %w", err)
}
Expand All @@ -64,6 +70,9 @@ func OpenFile(r io.ReaderAt, size int64, options ...FileOption) (*File, error) {
footerSize := int64(binary.LittleEndian.Uint32(b[:4]))
footerData := make([]byte, footerSize)

if cast, ok := f.reader.(interface{ SetFooterSection(offset, length int64) }); ok {
cast.SetFooterSection(size-(footerSize+8), footerSize)
}
if _, err := f.reader.ReadAt(footerData, size-(footerSize+8)); err != nil {
return nil, fmt.Errorf("reading footer of parquet file: %w", err)
}
Expand Down Expand Up @@ -192,6 +201,9 @@ func (f *File) ReadPageIndex() ([]format.ColumnIndex, []format.OffsetIndex, erro
if columnIndexOffset > 0 {
columnIndexData := indexBuffer[:columnIndexLength]

if cast, ok := f.reader.(interface{ SetColumnIndexSection(offset, length int64) }); ok {
cast.SetColumnIndexSection(columnIndexOffset, columnIndexLength)
}
if _, err := f.reader.ReadAt(columnIndexData, columnIndexOffset); err != nil {
return nil, nil, fmt.Errorf("reading %d bytes column index at offset %d: %w", columnIndexLength, columnIndexOffset, err)
}
Expand All @@ -213,6 +225,9 @@ func (f *File) ReadPageIndex() ([]format.ColumnIndex, []format.OffsetIndex, erro
if offsetIndexOffset > 0 {
offsetIndexData := indexBuffer[:offsetIndexLength]

if cast, ok := f.reader.(interface{ SetOffsetIndexSection(offset, length int64) }); ok {
cast.SetOffsetIndexSection(offsetIndexOffset, offsetIndexLength)
}
if _, err := f.reader.ReadAt(offsetIndexData, offsetIndexOffset); err != nil {
return nil, nil, fmt.Errorf("reading %d bytes offset index at offset %d: %w", offsetIndexLength, offsetIndexOffset, err)
}
Expand Down