This repository has been archived by the owner on Nov 16, 2023. It is now read-only.

Add support for caching parquet metadata #180

Closed
Changes from 5 commits
53 changes: 53 additions & 0 deletions README.md
@@ -430,6 +430,59 @@ Reading arrays of typed values is often preferable when performing aggregations
on the values as this model offers a more compact representation of the values
in memory, and pairs well with the use of optimizations like SIMD vectorization.

#### Caching Parquet File Metadata

In certain cases it can be useful to cache parquet metadata or column chunk
data for quick access. This can be achieved with custom options on the file
writer and reader that provide handlers for reading data from secondary
storage.

```go
// Option to apply a file path to the column chunk configuration
type ApplyColumnChunkFilePath struct {
    filepath string
}

func (a *ApplyColumnChunkFilePath) ConfigureWriter(wc *WriterConfig) {
    wc.ColumnChunkFilePath = a.filepath
}

var _ WriterOption = (*ApplyColumnChunkFilePath)(nil)

w := NewWriter(file, ..., &ApplyColumnChunkFilePath{filepath: "foobar"})
```

```go
// A custom ColumnChunkReader can be used to fetch column chunk data from a
// secondary backend
type CustomColumnChunkReader struct {
    ...
}

// FromMetadata returns an io.ReaderAt serving the data of the column chunk
// described by the metadata, e.g. from a cache or a remote store
func (c *CustomColumnChunkReader) FromMetadata(m format.ColumnMetaData) io.ReaderAt {
    // custom lookup of a secondary reader for this chunk
    ...
}

var _ ColumnChunkReader = (*CustomColumnChunkReader)(nil)

// Option to install the custom reader on the file configuration
type ApplyColumnChunkReader struct {
    reader ColumnChunkReader
}

func (a *ApplyColumnChunkReader) ConfigureFile(fc *FileConfig) {
    fc.ColumnChunkReader = a.reader
}

var _ FileOption = (*ApplyColumnChunkReader)(nil)

f, _ := OpenFile(file, ..., &ApplyColumnChunkReader{reader: &CustomColumnChunkReader{...}})
```
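
Instead of defining a custom `FileOption`, the `ConfigureColumnChunkReader`
option (added in `config.go` below) installs the reader directly. As a minimal
sketch, assume column chunks are cached as individual files in a local
directory keyed by chunk offset; the `CacheReader` and `offsetReaderAt` types
here are illustrative, not part of the library:

```go
// CacheReader serves column chunk bytes from files in a local cache
// directory, falling back to the primary file reader on a cache miss.
type CacheReader struct {
    dir     string      // one cached file per column chunk (hypothetical layout)
    primary io.ReaderAt // reader over the original parquet file
}

func (c *CacheReader) FromMetadata(m format.ColumnMetaData) io.ReaderAt {
    // The chunk's first byte in the original file is the dictionary page
    // offset when a dictionary is present, otherwise the data page offset.
    // That base offset is both a stable cache key and the amount by which a
    // zero-based cache file must be shifted, since the parquet reader reads
    // at the chunk's offsets within the original file.
    base := m.DataPageOffset
    if m.DictionaryPageOffset > 0 {
        base = m.DictionaryPageOffset
    }
    if f, err := os.Open(filepath.Join(c.dir, fmt.Sprintf("chunk-%d", base))); err == nil {
        return offsetReaderAt{r: f, base: base}
    }
    return c.primary // cache miss: read from the original file
}

var _ ColumnChunkReader = (*CacheReader)(nil)

// offsetReaderAt re-bases ReadAt calls so byte 0 of the cached file lines up
// with the chunk's position in the original file.
type offsetReaderAt struct {
    r    io.ReaderAt
    base int64
}

func (o offsetReaderAt) ReadAt(p []byte, off int64) (int, error) {
    return o.r.ReadAt(p, off-o.base)
}

f, _ := OpenFile(file, ..., ConfigureColumnChunkReader(&CacheReader{dir: "...", primary: file}))
```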

### Optimizing Writes

Applications that deal with columnar storage are sometimes designed to work with
20 changes: 18 additions & 2 deletions config.go
@@ -2,9 +2,11 @@ package parquet

import (
"fmt"
"io"
"strings"

"github.com/segmentio/parquet-go/compress"
"github.com/segmentio/parquet-go/format"
)

const (
@@ -19,6 +21,10 @@
DefaultSkipBloomFilters = false
)

// ColumnChunkReader returns a secondary io.ReaderAt for the data of a column
// chunk, selected from the chunk's metadata. The returned reader must serve
// bytes at the same offsets as the original parquet file.
type ColumnChunkReader interface {
FromMetadata(format.ColumnMetaData) io.ReaderAt
}

// The FileConfig type carries configuration options for parquet files.
//
// FileConfig implements the FileOption interface so it can be used directly
@@ -30,8 +36,9 @@ const (
// })
//
type FileConfig struct {
SkipPageIndex bool
SkipBloomFilters bool
SkipPageIndex bool
SkipBloomFilters bool
ColumnChunkReader ColumnChunkReader
}

// DefaultFileConfig returns a new FileConfig value initialized with the
@@ -317,6 +324,15 @@ func SkipBloomFilters(skip bool) FileOption {
return fileOption(func(config *FileConfig) { config.SkipBloomFilters = skip })
}

// ConfigureColumnChunkReader is a file configuration option which installs a
// ColumnChunkReader used to obtain secondary io.ReaderAt instances for column
// chunk data. This can be useful as a caching mechanism for the file metadata
// or the column chunk data.
//
// Defaults to nil.
func ConfigureColumnChunkReader(r ColumnChunkReader) FileOption {
return fileOption(func(config *FileConfig) { config.ColumnChunkReader = r })
}
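
// Usage sketch (CacheReader is a hypothetical ColumnChunkReader
// implementation like the one shown in the README example above):
//
//	f, err := parquet.OpenFile(file, size,
//		parquet.ConfigureColumnChunkReader(&CacheReader{...}),
//	)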

// PageBufferSize configures the size of column page buffers on parquet writers.
//
// Note that the page buffer size refers to the in-memory buffers where pages
31 changes: 21 additions & 10 deletions file.go
@@ -22,15 +22,16 @@ const (
// File represents a parquet file. The layout of a Parquet file can be found
// here: https://github.com/apache/parquet-format#file-format
type File struct {
metadata format.FileMetaData
protocol thrift.CompactProtocol
reader io.ReaderAt
size int64
schema *Schema
root *Column
columnIndexes []format.ColumnIndex
offsetIndexes []format.OffsetIndex
rowGroups []RowGroup
metadata format.FileMetaData
protocol thrift.CompactProtocol
reader io.ReaderAt
size int64
schema *Schema
root *Column
columnIndexes []format.ColumnIndex
offsetIndexes []format.OffsetIndex
rowGroups []RowGroup
columnChunkReader ColumnChunkReader
}

// OpenFile opens a parquet file and reads the content between offset 0 and the given
@@ -47,6 +48,9 @@ func OpenFile(r io.ReaderAt, size int64, options ...FileOption) (*File, error) {
return nil, err
}

// used to get secondary readers for column chunks
f.columnChunkReader = c.ColumnChunkReader

if _, err := r.ReadAt(b[:4], 0); err != nil {
return nil, fmt.Errorf("reading magic header of parquet file: %w", err)
}
@@ -452,7 +456,14 @@ func (f *filePages) init(c *fileColumnChunk) {
f.dictOffset = f.baseOffset
}

f.section = *io.NewSectionReader(c.file, f.baseOffset, c.chunk.MetaData.TotalCompressedSize)
// use the secondary reader for the column chunk data if one was configured
var reader io.ReaderAt = c.file
if c.file.columnChunkReader != nil {
reader = c.file.columnChunkReader.FromMetadata(c.chunk.MetaData)
}

f.section = *io.NewSectionReader(reader, f.baseOffset, c.chunk.MetaData.TotalCompressedSize)
f.rbuf = acquireReadBuffer(&f.section)
f.decoder.Reset(f.protocol.NewReader(f.rbuf))
}