remove min/maxes from granule metadata #205

Merged · 2 commits · Sep 22, 2022
146 changes: 0 additions & 146 deletions granule.go
@@ -4,8 +4,6 @@ import (
"bytes"
"fmt"
"io"
"sort"
"sync"
"sync/atomic"

"github.com/google/btree"
@@ -33,13 +31,6 @@ type GranuleMetadata struct {
// This is used for quick insertion into the btree, without requiring an iterator
least atomic.Pointer[dynparquet.DynamicRow]

// min contains the minimum value found for each column in the granule. It is used during iteration to validate whether the granule contains interesting data
minlock sync.RWMutex
min map[string]*parquet.Value
// max contains the maximum value found for each column in the granule. It is used during iteration to validate whether the granule contains interesting data
maxlock sync.RWMutex
max map[string]*parquet.Value

// size is the raw committed and uncommitted size of the granule. It is used as a suggestion for potential compaction
size *atomic.Uint64

@@ -54,8 +45,6 @@ func NewGranule(granulesCreated prometheus.Counter, tableConfig *TableConfig, fi
tableConfig: tableConfig,

metadata: GranuleMetadata{
min: map[string]*parquet.Value{},
max: map[string]*parquet.Value{},
least: atomic.Pointer[dynparquet.DynamicRow]{},
size: &atomic.Uint64{},
pruned: &atomic.Uint64{},
@@ -71,11 +60,6 @@
return nil, err
}
g.metadata.least.Store(least)

// Set the minmaxes on the new granule
if err := g.minmaxes(firstPart); err != nil {
return nil, err
}
}

granulesCreated.Inc()
@@ -102,11 +86,6 @@ func (g *Granule) addPart(p *Part, r *dynparquet.DynamicRow) (uint64, error) {
}
}

// Set the minmaxes for the granule
if err := g.minmaxes(p); err != nil {
return 0, err
}

// If the prepend returned that we're adding to the compacted list, then we need to propagate the Part to the new granules
if node.sentinel == Compacted {
err := addPartToGranule(g.newGranules, p)
@@ -257,128 +236,3 @@ func (g *Granule) Less(than btree.Item) bool {
func (g *Granule) Least() *dynparquet.DynamicRow {
return (*dynparquet.DynamicRow)(g.metadata.least.Load())
}

// minmaxes finds the mins and maxes of every column in a part.
func (g *Granule) minmaxes(p *Part) error {
f := p.Buf.ParquetFile()

for _, rowGroup := range f.RowGroups() {
for _, columnChunk := range rowGroup.ColumnChunks() {
idx := columnChunk.ColumnIndex()
minvalues := make([]parquet.Value, 0, idx.NumPages())
maxvalues := make([]parquet.Value, 0, idx.NumPages())
for k := 0; k < idx.NumPages(); k++ {
minvalues = append(minvalues, idx.MinValue(k))
maxvalues = append(maxvalues, idx.MaxValue(k))
}

// Check for min
min := findMin(columnChunk.Type(), minvalues)

g.metadata.minlock.RLock()
val := g.metadata.min[rowGroup.Schema().Fields()[columnChunk.Column()].Name()]
g.metadata.minlock.RUnlock()
if val == nil || columnChunk.Type().Compare(*val, *min) == 1 {
if !min.IsNull() {
g.metadata.minlock.Lock() // Check again after acquiring the write lock
if val := g.metadata.min[rowGroup.Schema().Fields()[columnChunk.Column()].Name()]; val == nil || columnChunk.Type().Compare(*val, *min) == 1 {
g.metadata.min[rowGroup.Schema().Fields()[columnChunk.Column()].Name()] = min
}
g.metadata.minlock.Unlock()
}
}

// Check for max
max := findMax(columnChunk.Type(), maxvalues)
g.metadata.maxlock.RLock()
val = g.metadata.max[rowGroup.Schema().Fields()[columnChunk.Column()].Name()]
g.metadata.maxlock.RUnlock()
if val == nil || columnChunk.Type().Compare(*val, *max) == -1 {
if !max.IsNull() {
g.metadata.maxlock.Lock() // Check again after acquiring the write lock
if val := g.metadata.max[rowGroup.Schema().Fields()[columnChunk.Column()].Name()]; val == nil || columnChunk.Type().Compare(*val, *max) == -1 {
g.metadata.max[rowGroup.Schema().Fields()[columnChunk.Column()].Name()] = max
}
g.metadata.maxlock.Unlock()
}
}
}
}

return nil
}

func find(minmax int, t parquet.Type, values []parquet.Value) *parquet.Value {
if len(values) == 0 {
return nil
}

val := values[0]
for i := 1; i < len(values); i++ {
if t.Compare(val, values[i]) != minmax {
val = values[i]
}
}

return &val
}

func findMax(t parquet.Type, values []parquet.Value) *parquet.Value {
return find(1, t, values)
}

func findMin(t parquet.Type, values []parquet.Value) *parquet.Value {
return find(-1, t, values)
}

// Schema implements the Particulate interface. It generates a parquet.Schema from the min/max fields of the Granule.
func (g *Granule) Schema() *parquet.Schema {
group := parquet.Group{}
g.metadata.maxlock.RLock()
defer g.metadata.maxlock.RUnlock()
for name, v := range g.metadata.max {
switch v.Kind() {
case parquet.Int32:
group[name] = parquet.Int(32)
case parquet.Int64:
group[name] = parquet.Int(64)
case parquet.Float:
group[name] = parquet.Leaf(parquet.FloatType)
case parquet.Double:
group[name] = parquet.Leaf(parquet.DoubleType)
case parquet.ByteArray:
group[name] = parquet.String()
case parquet.FixedLenByteArray:
group[name] = parquet.Leaf(parquet.ByteArrayType)
default:
group[name] = parquet.Leaf(parquet.DoubleType)
}
}
return parquet.NewSchema("granule", group)
}

// ColumnChunks implements the Particulate interface.
func (g *Granule) ColumnChunks() []parquet.ColumnChunk {
var chunks []parquet.ColumnChunk
g.metadata.maxlock.RLock()
defer g.metadata.maxlock.RUnlock()
g.metadata.minlock.RLock()
defer g.metadata.minlock.RUnlock()

names := []string{}
for name := range g.metadata.max {
names = append(names, name)
}
sort.Strings(names)

for _, name := range names {
chunks = append(chunks, VirtualSparseColumnChunk{
i: VirtualSparseColumnIndex{
Min: *g.metadata.min[name],
Max: *g.metadata.max[name],
},
})
}

return chunks
}
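
For context, the deleted minmaxes method above updates the shared min/max maps with a double-checked pattern: read under the read lock, then re-check under the write lock before storing, since another goroutine may have raced in between. A minimal, self-contained sketch of that pattern follows; the minTracker type and int64 values are hypothetical stand-ins, not FrostDB's API:

```go
package main

import (
	"fmt"
	"sync"
)

// minTracker is a hypothetical stand-in for the per-column minimum map
// that GranuleMetadata used to carry.
type minTracker struct {
	mu  sync.RWMutex
	min map[string]int64
}

// updateMin mirrors the deleted double-checked update: a cheap read-locked
// check first, then a re-check under the write lock before storing.
func (t *minTracker) updateMin(col string, candidate int64) {
	t.mu.RLock()
	cur, ok := t.min[col]
	t.mu.RUnlock()
	if ok && cur <= candidate {
		return // fast path: the stored minimum already covers the candidate
	}
	t.mu.Lock()
	defer t.mu.Unlock()
	// Re-check: another goroutine may have stored a smaller value
	// between RUnlock and Lock.
	if cur, ok := t.min[col]; !ok || candidate < cur {
		t.min[col] = candidate
	}
}

func main() {
	t := &minTracker{min: map[string]int64{}}
	t.updateMin("timestamp", 42)
	t.updateMin("timestamp", 7)
	fmt.Println(t.min["timestamp"]) // prints 7
}
```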
6 changes: 0 additions & 6 deletions table.go
@@ -1106,12 +1106,6 @@ func (t *TableBlock) RowGroupIterator(
index.Ascend(func(i btree.Item) bool {
g := i.(*Granule)

// Check if the entire granule can be skipped due to the filter
mayContainUsefulData, err := filter.Eval(g)
if err != nil || !mayContainUsefulData {
return true
}

g.PartBuffersForTx(tx, func(buf *dynparquet.SerializedBuffer) bool {
f := buf.ParquetFile()
for i := range f.RowGroups() {
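
For context, the block deleted from RowGroupIterator was the consumer of the granule min/maxes: filter.Eval checked whether a granule's column ranges could overlap the query before any of its parquet data was opened. A hedged sketch of that style of interval-overlap pruning; granuleStats and rangeFilter are hypothetical names, not the actual filter interface:

```go
package main

import "fmt"

// granuleStats is a hypothetical stand-in for a granule's recorded
// [min, max] for one column.
type granuleStats struct{ min, max int64 }

// rangeFilter is a hypothetical range predicate over the same column.
type rangeFilter struct{ lo, hi int64 }

// mayContain reports whether [lo, hi] can overlap [min, max]; when it
// cannot, the whole granule is skipped without reading any row groups.
func (f rangeFilter) mayContain(s granuleStats) bool {
	return f.hi >= s.min && f.lo <= s.max
}

func main() {
	g := granuleStats{min: 100, max: 200}
	fmt.Println(rangeFilter{lo: 150, hi: 300}.mayContain(g)) // true: must scan
	fmt.Println(rangeFilter{lo: 300, hi: 400}.mayContain(g)) // false: prune
}
```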