-
Notifications
You must be signed in to change notification settings - Fork 3
/
indexer.go
114 lines (97 loc) · 2.51 KB
/
indexer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package indexer
import (
"sort"
"unsafe"
"github.com/RoaringBitmap/roaring"
"github.com/apache/arrow/go/v15/arrow"
"github.com/apache/arrow/go/v15/arrow/array"
"github.com/vinceanalytics/vince/internal/filters"
"github.com/vinceanalytics/vince/internal/index"
"github.com/vinceanalytics/vince/internal/logger"
)
// ArrowIndexer builds column indexes from arrow records. It carries no state
// of its own; all work happens in its Index method.
type ArrowIndexer struct{}
// New returns a ready to use ArrowIndexer.
func New() *ArrowIndexer {
	return &ArrowIndexer{}
}
// Compile-time assertion that *ArrowIndexer satisfies index.Index.
var _ index.Index = (*ArrowIndexer)(nil)
// Index builds a full index for the record r.
//
// Only dictionary encoded columns that contain at least one non-null value are
// indexed; every other column is skipped. Each indexed column is stored under
// its column name. The resulting index also records the lowest and highest
// timestamps of r (see Timestamps) and its number of rows.
//
// An error returned while building any column index aborts the whole call.
func (idx *ArrowIndexer) Index(r arrow.Record) (index.Full, error) {
	builder := index.NewColIdx()
	defer builder.Release()

	columns := make(map[string]*index.FullColumn)
	for pos := 0; pos < int(r.NumCols()); pos++ {
		col := r.Column(pos)
		// Columns that hold nothing but nulls carry no information worth
		// indexing. This happens, for instance, when geo ip is not configured
		// or for utm* properties that were never set.
		indexable := col.DataType().ID() == arrow.DICTIONARY && col.NullN() != col.Len()
		if !indexable {
			continue
		}
		builder.Index(col.(*array.Dictionary))
		colName := r.ColumnName(pos)
		built, err := builder.Build(colName)
		if err != nil {
			return nil, err
		}
		columns[colName] = built
		// The same builder is reused for the next column.
		builder.Reset()
	}

	first, last := Timestamps(r)
	full := NewFullIdx(columns, uint64(first), uint64(last), uint64(r.NumRows()))
	return full, nil
}
// Timestamps returns the first and last values of the first column of r as
// lo and hi. Both are zero when the column is empty.
//
// The first column must be an *array.Int64; the unchecked type assertion
// panics otherwise. Taking the first and last values as the bounds relies on
// the record being sorted by timestamp.
func Timestamps(r arrow.Record) (lo, hi int64) {
	ts := r.Column(0).(*array.Int64).Int64Values()
	if len(ts) == 0 {
		return 0, 0
	}
	// record is sorted by timestamp
	return ts[0], ts[len(ts)-1]
}
// FullIndex is a collection of per-column indexes together with summary
// values supplied when it is constructed by NewFullIdx.
type FullIndex struct {
	// m holds the column indexes, keyed by name.
	m map[string]*index.FullColumn
	// keys lists the keys of m in sorted order; Columns iterates in this order.
	keys []string
	// min and max are the bounds reported by Min and Max.
	min uint64
	max uint64
	// size is the value reported by Size, computed once in NewFullIdx.
	size uint64
	// rows is the row count passed to NewFullIdx.
	rows uint64
}
var (
	// Compile-time assertion that *FullIndex satisfies index.Full.
	_ index.Full = (*FullIndex)(nil)

	// baseIndexSize is the size in bytes of an empty FullIndex value. It is
	// the starting point of the size computed by NewFullIdx.
	baseIndexSize = uint64(unsafe.Sizeof(FullIndex{}))
)
// NewFullIdx assembles a FullIndex from the column indexes in m.
//
// min, max and rows are stored as given. The keys of m are collected and
// sorted so that iteration over the columns is deterministic. The reported
// size starts at baseIndexSize and adds, for every column, twice the length
// of its name plus the column's own Size, and finally twice the number of
// columns.
//
// m is retained by the returned index, not copied.
func NewFullIdx(m map[string]*index.FullColumn, min, max, rows uint64) *FullIndex {
	names := make([]string, 0, len(m))
	total := baseIndexSize
	for name, col := range m {
		names = append(names, name)
		total += uint64(len(name)*2) + col.Size()
	}
	total += uint64(len(names) * 2)
	sort.Strings(names)

	return &FullIndex{
		m:    m,
		keys: names,
		min:  min,
		max:  max,
		size: total,
		rows: rows,
	}
}
// CanIndex always reports true.
func (idx *FullIndex) CanIndex() bool {
return true
}
// Match narrows b, in place, to the rows accepted by every filter in m.
//
// For each filter the index of the filter's column is looked up and asked for
// its matches, which are then intersected into b.
//
// A filter on a column that has no index is reported through logger.Fail.
// NOTE(review): the lookup result is used unconditionally afterwards, so this
// presumably relies on logger.Fail not returning — confirm against the logger
// package.
func (idx *FullIndex) Match(b *roaring.Bitmap, m []*filters.CompiledFilter) {
	for i := range m {
		filter := m[i]
		col, found := idx.m[filter.Column]
		if !found {
			logger.Fail("Missing index column", "column", filter.Column)
		}
		b.And(col.Match(filter))
	}
}
// Size returns the size value stored in the index when it was constructed.
func (idx *FullIndex) Size() (n uint64) {
	n = idx.size
	return
}
// Min returns the lower bound stored in the index when it was constructed.
func (idx *FullIndex) Min() (n uint64) {
	n = idx.min
	return
}
// Max returns the upper bound stored in the index when it was constructed.
func (idx *FullIndex) Max() (n uint64) {
	n = idx.max
	return
}
// Columns calls f once for every column index, visiting columns in the order
// of idx.keys. Iteration stops at the first error returned by f, and that
// error is returned to the caller; nil is returned when every call succeeds.
func (idx *FullIndex) Columns(f func(column index.Column) error) error {
	for i := range idx.keys {
		if err := f(idx.m[idx.keys[i]]); err != nil {
			return err
		}
	}
	return nil
}