forked from tobgu/qframe
-
Notifications
You must be signed in to change notification settings - Fork 0
/
grouper.go
184 lines (159 loc) · 4.68 KB
/
grouper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package grouper
import (
"github.com/tobgu/qframe/internal/column"
"github.com/tobgu/qframe/internal/hash"
"github.com/tobgu/qframe/internal/index"
"github.com/tobgu/qframe/internal/math/integer"
"math/bits"
)
/*
This package implements a basic hash table used for GroupBy and Distinct operations.
Hashing is done using MurmurHash3, and collisions are resolved with linear probing.
When the load factor exceeds maxLoadFactor, the table is reallocated into a new table
growthFactor times larger.
*/
// tableEntry is a single slot of the hash table. An occupied slot represents
// one group of rows with equal keys. For group-by operations (collectIx set)
// the positions of all rows in the group are recorded in ix. For distinct
// operations only firstPos is kept, which avoids the overhead of a slice.
type tableEntry struct {
	ix       index.Int // all positions of the group, starting with firstPos; nil until a second row joins
	hash     uint32    // cached key hash, reused when the table grows
	firstPos uint32    // first row position inserted for this key
	occupied bool      // whether this slot holds a group
}
// table is the linear-probing hash table used to group row positions. The
// length of entries is kept a power of two so that a hash can be mapped to a
// slot with a bit mask.
type table struct {
	entries       []tableEntry        // slots; len(entries) is a power of two
	occupiedCount int                 // NOTE(review): not read or written in this file — confirm it is still needed
	comparables   []column.Comparable // key columns hashed and compared for each row
	stats         GroupStats          // diagnostic counters, copied out when grouping finishes
	hashBuf       *hash.Murm32        // reusable hasher, reset for every row
	loadFactor    float64             // groupCount / len(entries)
	groupCount    uint32              // number of occupied slots, i.e. distinct keys
	collectIx     bool                // record every position per group, not just the first
}
// growthFactor is the multiplier applied to the table length on every
// reallocation. It must be a power of two so the length stays a power of two
// and `hash & (len-1)` remains a valid slot mask.
const growthFactor = 2

// grow reallocates the table into a slice growthFactor times larger and
// re-inserts every occupied entry with linear probing. It uses each entry's
// cached hash instead of rehashing the row.
func (t *table) grow() {
	newLen := uint32(growthFactor * len(t.entries))
	newEntries := make([]tableEntry, newLen)
	bitMask := newLen - 1
	for i := range t.entries {
		e := &t.entries[i]
		// Empty slots carry no data. "Relocating" them (hash == 0) would probe
		// from slot 0 through any cluster there, wasting work and inflating
		// RelocationCollisions.
		if !e.occupied {
			continue
		}
		for pos := e.hash & bitMask; ; pos = (pos + 1) & bitMask {
			if !newEntries[pos].occupied {
				newEntries[pos] = *e
				break
			}
			t.stats.RelocationCollisions++
		}
	}
	t.stats.RelocationCount++
	t.entries = newEntries
	// Same value as loadFactor/growthFactor, computed from its definition.
	t.loadFactor = float64(t.groupCount) / float64(newLen)
}
// hash returns the hash of row i, computed over the bytes that each of
// t.comparables contributes for that row. The shared t.hashBuf is reset and
// reused on every call, so concurrent calls on the same table would interfere.
func (t *table) hash(i uint32) uint32 {
	buf := t.hashBuf
	buf.Reset()
	for k := range t.comparables {
		t.comparables[k].HashBytes(i, buf)
	}
	return buf.Hash()
}
// maxLoadFactor is the occupancy ratio (groupCount / len(entries)) above which
// the table is grown before the next insertion.
const maxLoadFactor = 0.5

// insertEntry adds row position i to the table. If a group with an equal key
// already exists, i joins it. A key is equal when it has the same hash and all
// comparables match that group's first position. Otherwise a new group is
// created with i as its first position. The table is grown first when the
// load factor exceeds maxLoadFactor.
func (t *table) insertEntry(i uint32) {
	if t.loadFactor > maxLoadFactor {
		t.grow()
	}

	h := t.hash(i)
	mask := uint64(len(t.entries) - 1)
	var slot *tableEntry
	for p := uint64(h) & mask; ; p = (p + 1) & mask {
		candidate := &t.entries[p]
		if !candidate.occupied {
			slot = candidate
			break
		}
		if candidate.hash == h && equals(t.comparables, i, candidate.firstPos) {
			slot = candidate
			break
		}
		t.stats.InsertCollisions++
	}

	if !slot.occupied {
		// New group: i becomes its first position.
		slot.hash = h
		slot.firstPos = i
		slot.occupied = true
		t.groupCount++
		t.loadFactor = float64(t.groupCount) / float64(len(t.entries))
		return
	}

	// Existing group. When positions are not being collected, only the first
	// position matters and there is nothing more to record.
	if !t.collectIx {
		return
	}
	// The index slice is created lazily when a group gets its second member,
	// since the first position is already stored in firstPos. This saves an
	// allocation for every single-row group.
	if slot.ix == nil {
		slot.ix = index.Int{slot.firstPos, i}
	} else {
		slot.ix = append(slot.ix, i)
	}
}
// newTable allocates an empty table with 2^sizeExp slots. comparables are the
// columns whose values together form the grouping key. collectIx controls
// whether every row position of a group is recorded or only the first one.
func newTable(sizeExp int, comparables []column.Comparable, collectIx bool) *table {
	t := &table{
		comparables: comparables,
		collectIx:   collectIx,
		hashBuf:     new(hash.Murm32),
	}
	t.entries = make([]tableEntry, integer.Pow2(sizeExp))
	return t
}
// equals reports whether rows i and j compare as column.Equal in every one of
// the given columns. It returns true for an empty comparables slice.
func equals(comparables []column.Comparable, i, j uint32) bool {
	for k := 0; k < len(comparables); k++ {
		if comparables[k].Compare(i, j) == column.Equal {
			continue
		}
		return false
	}
	return true
}
// GroupStats holds diagnostic counters collected while building the hash
// table used by GroupBy and Distinct.
type GroupStats struct {
	// RelocationCount is the number of times the table was grown.
	RelocationCount int
	// RelocationCollisions counts occupied slots probed past while moving
	// entries into a grown table.
	RelocationCollisions int
	// InsertCollisions counts slots probed past during insertion because they
	// held a different key.
	InsertCollisions int
	// GroupCount is the number of distinct groups found.
	GroupCount int
	// LoadFactor is the final ratio of groups to table slots.
	LoadFactor float64
}
// calculateInitialSizeExp returns the exponent x such that the initial hash
// table has 2^x slots. The size is always a power of two (not merely a
// multiple of two), which lets slot selection use `hash & (size-1)`.
//
// The distribution of key values within the index is not known up front, so
// the heuristic is fairly arbitrary: the smallest power of two strictly
// greater than ixLen/4. The exponent is clamped to at least minSizeExp
// (8 slots). A non-positive ixLen also yields minSizeExp; converting a
// negative length to uint64 would otherwise request 2^64 slots.
func calculateInitialSizeExp(ixLen int) int {
	const minSizeExp = 3
	if ixLen <= 0 {
		return minSizeExp
	}
	exp := bits.Len64(uint64(ixLen) / 4)
	if exp < minSizeExp {
		return minSizeExp
	}
	return exp
}
// groupIndex inserts every row position in ix into a fresh table keyed on
// comparables. It returns the table's raw slot slice, including unoccupied
// slots, along with the collected statistics. collectIx selects whether all
// positions of each group are recorded or only the first.
func groupIndex(ix index.Int, comparables []column.Comparable, collectIx bool) ([]tableEntry, GroupStats) {
	// Named tbl rather than table to avoid shadowing the table type.
	tbl := newTable(calculateInitialSizeExp(len(ix)), comparables, collectIx)
	for _, pos := range ix {
		tbl.insertEntry(pos)
	}

	result := tbl.stats
	result.GroupCount = int(tbl.groupCount)
	result.LoadFactor = tbl.loadFactor
	return tbl.entries, result
}
// GroupBy partitions the row positions in ix into groups of rows whose values
// are equal in all of comparables. Within each returned index.Int, positions
// keep the order in which they appear in ix. The groups themselves are
// returned in hash-table slot order, not in order of first appearance. The
// GroupStats describes the work done by the underlying hash table.
func GroupBy(ix index.Int, comparables []column.Comparable) ([]index.Int, GroupStats) {
	entries, stats := groupIndex(ix, comparables, true)
	groups := make([]index.Int, 0, stats.GroupCount)
	for k := range entries {
		e := &entries[k]
		switch {
		case !e.occupied:
			// Unused slot.
		case e.ix != nil:
			groups = append(groups, e.ix)
		default:
			// Single-row group: the position slice is only created once a
			// group has a second row.
			groups = append(groups, index.Int{e.firstPos})
		}
	}
	return groups, stats
}
// Distinct returns one row position per distinct combination of values in
// comparables: the first position in ix at which that combination occurs.
// Positions are returned in hash-table slot order.
func Distinct(ix index.Int, comparables []column.Comparable) index.Int {
	entries, stats := groupIndex(ix, comparables, false)
	out := make(index.Int, 0, stats.GroupCount)
	for k := range entries {
		if !entries[k].occupied {
			continue
		}
		out = append(out, entries[k].firstPos)
	}
	return out
}