Skip to content
This repository has been archived by the owner on Nov 16, 2023. It is now read-only.

Commit

Permalink
Use readers created upfront, add MetadataReaders interface
Browse files Browse the repository at this point in the history
Signed-off-by: Annanay Agarwal <annanay.agarwal@grafana.com>
  • Loading branch information
annanay25 committed Jun 22, 2022
1 parent 2e162cb commit b1aabf3
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 29 deletions.
16 changes: 8 additions & 8 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ const (
DefaultSkipBloomFilters = false
)

type MetadataCacher interface {
GetReader() io.ReaderAt
ShouldCacheBloomFilters() bool
ShouldCacheHeader() bool
ShouldCacheFooter() bool
// MetadataReaders supplies a separate io.ReaderAt for each metadata section
// of a parquet file. When a non-nil value is configured, the file code shown
// in this change asks it for a section-specific reader before touching that
// section, instead of consulting per-section "should cache" flags.
type MetadataReaders interface {
// HeaderReader returns the reader requested before the 4-byte magic
// header at offset 0 is read.
HeaderReader() io.ReaderAt
// FooterReader returns the reader requested before the trailing 8 bytes
// (footer size and magic) are read.
FooterReader() io.ReaderAt
// PageIndexReader returns the reader used by ReadPageIndex to fetch the
// column index and offset index bytes.
PageIndexReader() io.ReaderAt
// BloomFilterReader returns the reader that is wrapped in a section
// reader and thrift decoder where the bloom filter reader is selected.
BloomFilterReader() io.ReaderAt
}

// The FileConfig type carries configuration options for parquet files.
Expand All @@ -40,7 +40,7 @@ type MetadataCacher interface {
type FileConfig struct {
SkipPageIndex bool
SkipBloomFilters bool
MetadataCacher MetadataCacher
MetadataReaders MetadataReaders
}

// DefaultFileConfig returns a new FileConfig value initialized with the
Expand Down Expand Up @@ -326,8 +326,8 @@ func SkipBloomFilters(skip bool) FileOption {
return fileOption(func(config *FileConfig) { config.SkipBloomFilters = skip })
}

func AddMetadataCacher(m MetadataCacher) FileOption {
return fileOption(func(config *FileConfig) { config.MetadataCacher = m })
// AddMetadataCacher returns a FileOption that stores m in the MetadataReaders
// field of the FileConfig the option is applied to.
//
// NOTE(review): the function keeps its "Cacher" name although the parameter
// type is now MetadataReaders; presumably retained for caller compatibility —
// confirm whether a rename is wanted.
func AddMetadataCacher(m MetadataReaders) FileOption {
return fileOption(func(config *FileConfig) { config.MetadataReaders = m })
}

// PageBufferSize configures the size of column page buffers on parquet writers.
Expand Down
42 changes: 21 additions & 21 deletions file.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ const (
// File represents a parquet file. The layout of a Parquet file can be found
// here: https://github.com/apache/parquet-format#file-format
type File struct {
metadata format.FileMetaData
protocol thrift.CompactProtocol
reader io.ReaderAt
size int64
schema *Schema
root *Column
columnIndexes []format.ColumnIndex
offsetIndexes []format.OffsetIndex
rowGroups []RowGroup
MetadataCacher MetadataCacher
metadata format.FileMetaData
protocol thrift.CompactProtocol
reader io.ReaderAt
size int64
schema *Schema
root *Column
columnIndexes []format.ColumnIndex
offsetIndexes []format.OffsetIndex
rowGroups []RowGroup
MetadataReaders MetadataReaders
}

// OpenFile opens a parquet file and reads the content between offset 0 and the given
Expand All @@ -48,12 +48,12 @@ func OpenFile(r io.ReaderAt, size int64, options ...FileOption) (*File, error) {
return nil, err
}

f.MetadataCacher = c.MetadataCacher
f.MetadataReaders = c.MetadataReaders

// check if we should use cached reader for header
reader := r
if f.MetadataCacher != nil && f.MetadataCacher.ShouldCacheHeader() {
r = f.MetadataCacher.GetReader()
if f.MetadataReaders != nil {
r = f.MetadataReaders.HeaderReader()
}
if _, err := reader.ReadAt(b[:4], 0); err != nil {
return nil, fmt.Errorf("reading magic header of parquet file: %w", err)
Expand All @@ -64,8 +64,8 @@ func OpenFile(r io.ReaderAt, size int64, options ...FileOption) (*File, error) {

// check if we should use cached reader for footer
reader = r
if f.MetadataCacher != nil && f.MetadataCacher.ShouldCacheFooter() {
r = f.MetadataCacher.GetReader()
if f.MetadataReaders != nil {
r = f.MetadataReaders.FooterReader()
}
if _, err := reader.ReadAt(b[:8], size-8); err != nil {
return nil, fmt.Errorf("reading magic footer of parquet file: %w", err)
Expand Down Expand Up @@ -116,8 +116,8 @@ func OpenFile(r io.ReaderAt, size int64, options ...FileOption) (*File, error) {
p := thrift.CompactProtocol{}

reader := f.reader
if f.MetadataCacher != nil && f.MetadataCacher.ShouldCacheBloomFilters() {
reader = f.MetadataCacher.GetReader()
if f.MetadataReaders != nil {
reader = f.MetadataReaders.BloomFilterReader()
}
s := io.NewSectionReader(reader, 0, size)
d := thrift.NewDecoder(p.NewReader(s))
Expand Down Expand Up @@ -211,8 +211,8 @@ func (f *File) ReadPageIndex() ([]format.ColumnIndex, []format.OffsetIndex, erro
columnIndexData := indexBuffer[:columnIndexLength]

reader := f.reader
if f.MetadataCacher != nil && f.MetadataCacher.ShouldCachePageIndex() {
reader = f.MetadataCacher.GetReader()
if f.MetadataReaders != nil {
reader = f.MetadataReaders.PageIndexReader()
}
if _, err := reader.ReadAt(columnIndexData, columnIndexOffset); err != nil {
return nil, nil, fmt.Errorf("reading %d bytes column index at offset %d: %w", columnIndexLength, columnIndexOffset, err)
Expand All @@ -236,8 +236,8 @@ func (f *File) ReadPageIndex() ([]format.ColumnIndex, []format.OffsetIndex, erro
offsetIndexData := indexBuffer[:offsetIndexLength]

reader := f.reader
if f.MetadataCacher != nil && f.MetadataCacher.ShouldCachePageIndex() {
reader = f.MetadataCacher.GetReader()
if f.MetadataReaders != nil {
reader = f.MetadataReaders.PageIndexReader()
}
if _, err := reader.ReadAt(offsetIndexData, offsetIndexOffset); err != nil {
return nil, nil, fmt.Errorf("reading %d bytes offset index at offset %d: %w", offsetIndexLength, offsetIndexOffset, err)
Expand Down

0 comments on commit b1aabf3

Please sign in to comment.