diff --git a/granule.go b/granule.go
index b1a4d9dbe..bf26e40dc 100644
--- a/granule.go
+++ b/granule.go
@@ -4,8 +4,6 @@ import (
 	"bytes"
 	"fmt"
 	"io"
-	"sort"
-	"sync"
 	"sync/atomic"
 
 	"github.com/google/btree"
@@ -33,13 +31,6 @@ type GranuleMetadata struct {
 	// This is used for quick insertion into the btree, without requiring an iterator
 	least atomic.Pointer[dynparquet.DynamicRow]
 
-	// min contains the minimum value found for each column in the granule. It is used during iteration to validate if the granule contains interesting data
-	minlock sync.RWMutex
-	min     map[string]*parquet.Value
-	// max contains the maximum value found for each column in the granule. It is used during iteration to validate if the granule contains interesting data
-	maxlock sync.RWMutex
-	max     map[string]*parquet.Value
-
 	// size is the raw commited, and uncommited size of the granule. It is used as a suggestion for potential compaction
 	size *atomic.Uint64
 
@@ -54,8 +45,6 @@ func NewGranule(granulesCreated prometheus.Counter, tableConfig *TableConfig, fi
 		tableConfig: tableConfig,
 
 		metadata: GranuleMetadata{
-			min:     map[string]*parquet.Value{},
-			max:     map[string]*parquet.Value{},
 			least:   atomic.Pointer[dynparquet.DynamicRow]{},
 			size:    &atomic.Uint64{},
 			pruned:  &atomic.Uint64{},
@@ -71,11 +60,6 @@ func NewGranule(granulesCreated prometheus.Counter, tableConfig *TableConfig, fi
 			return nil, err
 		}
 		g.metadata.least.Store(least)
-
-		// Set the minmaxes on the new granule
-		if err := g.minmaxes(firstPart); err != nil {
-			return nil, err
-		}
 	}
 
 	granulesCreated.Inc()
@@ -102,11 +86,6 @@ func (g *Granule) addPart(p *Part, r *dynparquet.DynamicRow) (uint64, error) {
 		}
 	}
 
-	// Set the minmaxes for the granule
-	if err := g.minmaxes(p); err != nil {
-		return 0, err
-	}
-
 	// If the prepend returned that we're adding to the compacted list; then we need to propogate the Part to the new granules
 	if node.sentinel == Compacted {
 		err := addPartToGranule(g.newGranules, p)
@@ -257,128 +236,3 @@ func (g *Granule) Less(than btree.Item) bool {
 func (g *Granule) Least() *dynparquet.DynamicRow {
 	return (*dynparquet.DynamicRow)(g.metadata.least.Load())
 }
-
-// minmaxes finds the mins and maxes of every column in a part.
-func (g *Granule) minmaxes(p *Part) error {
-	f := p.Buf.ParquetFile()
-
-	for _, rowGroup := range f.RowGroups() {
-		for _, columnChunk := range rowGroup.ColumnChunks() {
-			idx := columnChunk.ColumnIndex()
-			minvalues := make([]parquet.Value, 0, idx.NumPages())
-			maxvalues := make([]parquet.Value, 0, idx.NumPages())
-			for k := 0; k < idx.NumPages(); k++ {
-				minvalues = append(minvalues, idx.MinValue(k))
-				maxvalues = append(maxvalues, idx.MaxValue(k))
-			}
-
-			// Check for min
-			min := findMin(columnChunk.Type(), minvalues)
-
-			g.metadata.minlock.RLock()
-			val := g.metadata.min[rowGroup.Schema().Fields()[columnChunk.Column()].Name()]
-			g.metadata.minlock.RUnlock()
-			if val == nil || columnChunk.Type().Compare(*val, *min) == 1 {
-				if !min.IsNull() {
-					g.metadata.minlock.Lock() // Check again after acquiring the write lock
-					if val := g.metadata.min[rowGroup.Schema().Fields()[columnChunk.Column()].Name()]; val == nil || columnChunk.Type().Compare(*val, *min) == 1 {
-						g.metadata.min[rowGroup.Schema().Fields()[columnChunk.Column()].Name()] = min
-					}
-					g.metadata.minlock.Unlock()
-				}
-			}
-
-			// Check for max
-			max := findMax(columnChunk.Type(), maxvalues)
-			g.metadata.maxlock.RLock()
-			val = g.metadata.max[rowGroup.Schema().Fields()[columnChunk.Column()].Name()]
-			g.metadata.maxlock.RUnlock()
-			if val == nil || columnChunk.Type().Compare(*val, *max) == -1 {
-				if !max.IsNull() {
-					g.metadata.maxlock.Lock() // Check again after acquiring the write lock
-					if val := g.metadata.max[rowGroup.Schema().Fields()[columnChunk.Column()].Name()]; val == nil || columnChunk.Type().Compare(*val, *max) == -1 {
-						g.metadata.max[rowGroup.Schema().Fields()[columnChunk.Column()].Name()] = max
-					}
-					g.metadata.maxlock.Unlock()
-				}
-			}
-		}
-	}
-
-	return nil
-}
-
-func find(minmax int, t parquet.Type, values []parquet.Value) *parquet.Value {
-	if len(values) == 0 {
-		return nil
-	}
-
-	val := values[0]
-	for i := 1; i < len(values); i++ {
-		if t.Compare(val, values[i]) != minmax {
-			val = values[i]
-		}
-	}
-
-	return &val
-}
-
-func findMax(t parquet.Type, values []parquet.Value) *parquet.Value {
-	return find(1, t, values)
-}
-
-func findMin(t parquet.Type, values []parquet.Value) *parquet.Value {
-	return find(-1, t, values)
-}
-
-// Schema implements the Particulate interface. It generates a parquet.Schema from the min/max fields of the Granule.
-func (g *Granule) Schema() *parquet.Schema {
-	group := parquet.Group{}
-	g.metadata.maxlock.RLock()
-	defer g.metadata.maxlock.RUnlock()
-	for name, v := range g.metadata.max {
-		switch v.Kind() {
-		case parquet.Int32:
-			group[name] = parquet.Int(32)
-		case parquet.Int64:
-			group[name] = parquet.Int(64)
-		case parquet.Float:
-			group[name] = parquet.Leaf(parquet.FloatType)
-		case parquet.Double:
-			group[name] = parquet.Leaf(parquet.DoubleType)
-		case parquet.ByteArray:
-			group[name] = parquet.String()
-		case parquet.FixedLenByteArray:
-			group[name] = parquet.Leaf(parquet.ByteArrayType)
-		default:
-			group[name] = parquet.Leaf(parquet.DoubleType)
-		}
-	}
-	return parquet.NewSchema("granule", group)
-}
-
-// ColumnChunks implements the Particulate interface.
-func (g *Granule) ColumnChunks() []parquet.ColumnChunk {
-	var chunks []parquet.ColumnChunk
-	g.metadata.maxlock.RLock()
-	defer g.metadata.maxlock.RUnlock()
-	g.metadata.minlock.RLock()
-	defer g.metadata.minlock.RUnlock()
-
-	names := []string{}
-	for name := range g.metadata.max {
-		names = append(names, name)
-	}
-	sort.Strings(names)
-
-	for _, name := range names {
-		chunks = append(chunks, VirtualSparseColumnChunk{
-			i: VirtualSparseColumnIndex{
-				Min: *g.metadata.min[name],
-				Max: *g.metadata.max[name],
-			},
-		})
-	}
-
-	return chunks
-}
diff --git a/table.go b/table.go
index d4e920caa..af149b333 100644
--- a/table.go
+++ b/table.go
@@ -1106,12 +1106,6 @@ func (t *TableBlock) RowGroupIterator(
 	index.Ascend(func(i btree.Item) bool {
 		g := i.(*Granule)
 
-		// Check if the entire granule can be skipped due to the filter
-		mayContainUsefulData, err := filter.Eval(g)
-		if err != nil || !mayContainUsefulData {
-			return true
-		}
-
 		g.PartBuffersForTx(tx, func(buf *dynparquet.SerializedBuffer) bool {
 			f := buf.ParquetFile()
 			for i := range f.RowGroups() {