remove min/maxes from granule metadata (#205)
* remove min/maxes from granule metadata

* remove unused funcs
thorfour committed Sep 22, 2022
1 parent a767773 commit 72eb78a
Showing 2 changed files with 0 additions and 152 deletions.
granule.go: 146 changes (0 additions, 146 deletions)
@@ -4,8 +4,6 @@ import (
"bytes"
"fmt"
"io"
"sort"
"sync"
"sync/atomic"

"github.com/google/btree"
@@ -33,13 +31,6 @@ type GranuleMetadata struct {
// This is used for quick insertion into the btree, without requiring an iterator
least atomic.Pointer[dynparquet.DynamicRow]

// min contains the minimum value found for each column in the granule. It is used during iteration to validate if the granule contains interesting data
minlock sync.RWMutex
min map[string]*parquet.Value
// max contains the maximum value found for each column in the granule. It is used during iteration to validate if the granule contains interesting data
maxlock sync.RWMutex
max map[string]*parquet.Value

// size is the raw committed and uncommitted size of the granule. It is used as a suggestion for potential compaction
size *atomic.Uint64

@@ -54,8 +45,6 @@ func NewGranule(granulesCreated prometheus.Counter, tableConfig *TableConfig, fi
tableConfig: tableConfig,

metadata: GranuleMetadata{
min: map[string]*parquet.Value{},
max: map[string]*parquet.Value{},
least: atomic.Pointer[dynparquet.DynamicRow]{},
size: &atomic.Uint64{},
pruned: &atomic.Uint64{},
@@ -71,11 +60,6 @@ func NewGranule(granulesCreated prometheus.Counter, tableConfig *TableConfig, fi
return nil, err
}
g.metadata.least.Store(least)

// Set the minmaxes on the new granule
if err := g.minmaxes(firstPart); err != nil {
return nil, err
}
}

granulesCreated.Inc()
@@ -102,11 +86,6 @@ func (g *Granule) addPart(p *Part, r *dynparquet.DynamicRow) (uint64, error) {
}
}

// Set the minmaxes for the granule
if err := g.minmaxes(p); err != nil {
return 0, err
}

// If the prepend returned that we're adding to the compacted list, then we need to propagate the Part to the new granules
if node.sentinel == Compacted {
err := addPartToGranule(g.newGranules, p)
@@ -257,128 +236,3 @@ func (g *Granule) Less(than btree.Item) bool {
func (g *Granule) Least() *dynparquet.DynamicRow {
return (*dynparquet.DynamicRow)(g.metadata.least.Load())
}

// minmaxes finds the mins and maxes of every column in a part.
func (g *Granule) minmaxes(p *Part) error {
f := p.Buf.ParquetFile()

for _, rowGroup := range f.RowGroups() {
for _, columnChunk := range rowGroup.ColumnChunks() {
idx := columnChunk.ColumnIndex()
minvalues := make([]parquet.Value, 0, idx.NumPages())
maxvalues := make([]parquet.Value, 0, idx.NumPages())
for k := 0; k < idx.NumPages(); k++ {
minvalues = append(minvalues, idx.MinValue(k))
maxvalues = append(maxvalues, idx.MaxValue(k))
}

// Check for min
min := findMin(columnChunk.Type(), minvalues)

g.metadata.minlock.RLock()
val := g.metadata.min[rowGroup.Schema().Fields()[columnChunk.Column()].Name()]
g.metadata.minlock.RUnlock()
if val == nil || columnChunk.Type().Compare(*val, *min) == 1 {
if !min.IsNull() {
g.metadata.minlock.Lock() // Check again after acquiring the write lock
if val := g.metadata.min[rowGroup.Schema().Fields()[columnChunk.Column()].Name()]; val == nil || columnChunk.Type().Compare(*val, *min) == 1 {
g.metadata.min[rowGroup.Schema().Fields()[columnChunk.Column()].Name()] = min
}
g.metadata.minlock.Unlock()
}
}

// Check for max
max := findMax(columnChunk.Type(), maxvalues)
g.metadata.maxlock.RLock()
val = g.metadata.max[rowGroup.Schema().Fields()[columnChunk.Column()].Name()]
g.metadata.maxlock.RUnlock()
if val == nil || columnChunk.Type().Compare(*val, *max) == -1 {
if !max.IsNull() {
g.metadata.maxlock.Lock() // Check again after acquiring the write lock
if val := g.metadata.max[rowGroup.Schema().Fields()[columnChunk.Column()].Name()]; val == nil || columnChunk.Type().Compare(*val, *max) == -1 {
g.metadata.max[rowGroup.Schema().Fields()[columnChunk.Column()].Name()] = max
}
g.metadata.maxlock.Unlock()
}
}
}
}

return nil
}

func find(minmax int, t parquet.Type, values []parquet.Value) *parquet.Value {
if len(values) == 0 {
return nil
}

val := values[0]
for i := 1; i < len(values); i++ {
if t.Compare(val, values[i]) != minmax {
val = values[i]
}
}

return &val
}

func findMax(t parquet.Type, values []parquet.Value) *parquet.Value {
return find(1, t, values)
}

func findMin(t parquet.Type, values []parquet.Value) *parquet.Value {
return find(-1, t, values)
}

// Schema implements the Particulate interface. It generates a parquet.Schema from the min/max fields of the Granule.
func (g *Granule) Schema() *parquet.Schema {
group := parquet.Group{}
g.metadata.maxlock.RLock()
defer g.metadata.maxlock.RUnlock()
for name, v := range g.metadata.max {
switch v.Kind() {
case parquet.Int32:
group[name] = parquet.Int(32)
case parquet.Int64:
group[name] = parquet.Int(64)
case parquet.Float:
group[name] = parquet.Leaf(parquet.FloatType)
case parquet.Double:
group[name] = parquet.Leaf(parquet.DoubleType)
case parquet.ByteArray:
group[name] = parquet.String()
case parquet.FixedLenByteArray:
group[name] = parquet.Leaf(parquet.ByteArrayType)
default:
group[name] = parquet.Leaf(parquet.DoubleType)
}
}
return parquet.NewSchema("granule", group)
}

// ColumnChunks implements the Particulate interface.
func (g *Granule) ColumnChunks() []parquet.ColumnChunk {
var chunks []parquet.ColumnChunk
g.metadata.maxlock.RLock()
defer g.metadata.maxlock.RUnlock()
g.metadata.minlock.RLock()
defer g.metadata.minlock.RUnlock()

names := []string{}
for name := range g.metadata.max {
names = append(names, name)
}
sort.Strings(names)

for _, name := range names {
chunks = append(chunks, VirtualSparseColumnChunk{
i: VirtualSparseColumnIndex{
Min: *g.metadata.min[name],
Max: *g.metadata.max[name],
},
})
}

return chunks
}
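
For reference, the removed find/findMin/findMax helpers reduced a slice of page-level statistics to a single extreme value with parquet.Type.Compare. Below is a minimal standalone sketch of that behavior; the github.com/segmentio/parquet-go import path and the example column values are assumptions for illustration, not part of this repository.

package main

import (
	"fmt"

	"github.com/segmentio/parquet-go" // assumed parquet module path; adjust to the repository's actual dependency
)

// find reduces a slice of values to one extreme using t.Compare:
// pass 1 to keep the maximum, -1 to keep the minimum (mirrors the removed helper).
func find(minmax int, t parquet.Type, values []parquet.Value) *parquet.Value {
	if len(values) == 0 {
		return nil
	}
	val := values[0]
	for i := 1; i < len(values); i++ {
		if t.Compare(val, values[i]) != minmax {
			val = values[i]
		}
	}
	return &val
}

func main() {
	// Hypothetical page-level statistics for a float64 column.
	vals := []parquet.Value{
		parquet.ValueOf(3.0),
		parquet.ValueOf(-1.5),
		parquet.ValueOf(7.25),
	}
	min := find(-1, parquet.DoubleType, vals) // smallest page minimum
	max := find(1, parquet.DoubleType, vals)  // largest page maximum
	fmt.Println(min.Double(), max.Double())   // -1.5 7.25
}
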
table.go: 6 changes (0 additions, 6 deletions)
@@ -1106,12 +1106,6 @@ func (t *TableBlock) RowGroupIterator(
index.Ascend(func(i btree.Item) bool {
g := i.(*Granule)

// Check if the entire granule can be skipped due to the filter
mayContainUsefulData, err := filter.Eval(g)
if err != nil || !mayContainUsefulData {
return true
}

g.PartBuffersForTx(tx, func(buf *dynparquet.SerializedBuffer) bool {
f := buf.ParquetFile()
for i := range f.RowGroups() {
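
The lines removed from RowGroupIterator were the only place the iterator consulted the filter per granule. A minimal sketch of that skip pattern over a google/btree index follows; the granule and filter types here are hypothetical stand-ins for illustration, not the repository's actual types.

package main

import (
	"fmt"

	"github.com/google/btree"
)

// granule is a hypothetical stand-in for the items stored in the index.
type granule struct{ key int }

func (g *granule) Less(than btree.Item) bool { return g.key < than.(*granule).key }

// filter is a hypothetical stand-in for the predicate the removed code called
// as filter.Eval(g): it reports whether a granule may contain useful data.
type filter interface {
	Eval(g *granule) (bool, error)
}

type alwaysTrue struct{}

func (alwaysTrue) Eval(*granule) (bool, error) { return true, nil }

func main() {
	index := btree.New(2)
	for _, k := range []int{3, 1, 2} {
		index.ReplaceOrInsert(&granule{key: k})
	}

	var f filter = alwaysTrue{}
	index.Ascend(func(i btree.Item) bool {
		g := i.(*granule)

		// This mirrors the removed block: granules the filter rules out are
		// skipped by returning true so the ascent continues with the next item.
		ok, err := f.Eval(g)
		if err != nil || !ok {
			return true
		}

		fmt.Println("scan granule", g.key)
		return true
	})
}
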
