-
Notifications
You must be signed in to change notification settings - Fork 2
/
skipfilter.go
193 lines (178 loc) · 4.52 KB
/
skipfilter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
package skipfilter
import (
"fmt"
"runtime"
"sync"
"sync/atomic"
"github.com/MauriceGit/skiplist"
"github.com/RoaringBitmap/roaring/roaring64"
"github.com/hashicorp/golang-lru"
)
// SkipFilter is a set of values that can be queried by filter keys.
//
// Every added value is stored in a skiplist under an id taken from an
// increasing counter, so list order is insertion order. For each filter key,
// the ids of matching values are cached as a bitmap in an LRU cache. A cached
// bitmap is extended incrementally as new values are added.
type SkipFilter struct {
	// i is the id that will be assigned to the next added value.
	i uint64
	// idx maps each value to the id of its entry in list.
	idx map[interface{}]uint64
	// set holds the ids of the values currently in the set.
	// NOTE(review): written by Add/Remove but not read anywhere in this file view — confirm it is still needed.
	set *roaring64.Bitmap
	// list holds the *entry elements, ordered by id.
	list skiplist.SkipList
	// cache maps a filter key to its *filter (cached match bitmap).
	cache *lru.Cache
	// test reports whether a value (first argument) passes a filter key (second argument).
	test func(interface{}, interface{}) bool
	// mutex guards i, idx, set and list; Add/Remove take it exclusively, queries take it shared.
	mutex sync.RWMutex
}
// New returns an empty SkipFilter.
//
// test reports whether value passes filter. It is called lazily, the first
// time a filter key is queried for values added since its last query.
//
// size is the capacity of the LRU cache that holds per-filter results. A size
// of 0 or less falls back to 100,000. Set it to at least the number of distinct
// filter keys you expect to query. An evicted filter is rebuilt by re-testing
// every value.
func New(test func(value interface{}, filter interface{}) bool, size int) *SkipFilter {
	const defaultSize = 1e5
	if size < 1 {
		size = defaultSize
	}
	// lru.New only reports an error for a non-positive size, which the guard
	// above rules out, so the error is safe to ignore.
	cache, _ := lru.New(size)
	sf := &SkipFilter{
		test:  test,
		cache: cache,
		list:  skiplist.New(),
		set:   roaring64.New(),
		idx:   map[interface{}]uint64{},
	}
	return sf
}
// Add adds value to the set and gives it the next id, placing it last in
// iteration order.
//
// If value is already present, its previous entry is dropped before the new
// one is inserted. Previously the old entry stayed in the list while idx
// pointed only at the new id. That left an orphan which Remove could never
// reach, which Len still counted, and which MatchAny returned as a duplicate.
// Cached filter bitmaps may still hold the old id; MatchAny prunes such ids
// lazily, exactly as it does after Remove.
func (sf *SkipFilter) Add(value interface{}) {
	sf.mutex.Lock()
	defer sf.mutex.Unlock()
	if old, ok := sf.idx[value]; ok {
		sf.list.Delete(&entry{id: old})
		sf.set.Remove(old)
	}
	sf.list.Insert(&entry{id: sf.i, val: value})
	sf.set.Add(sf.i)
	sf.idx[value] = sf.i
	sf.i++
}
// Remove deletes value from the set. Removing a value that is not present does
// nothing.
//
// Cached filter bitmaps are not touched here. They keep the removed id until a
// later MatchAny finds it missing from the list and prunes it.
func (sf *SkipFilter) Remove(value interface{}) {
	sf.mutex.Lock()
	defer sf.mutex.Unlock()
	id, present := sf.idx[value]
	if !present {
		return
	}
	sf.list.Delete(&entry{id: id})
	sf.set.Remove(id)
	delete(sf.idx, value)
}
// Len reports how many values are currently stored, i.e. the number of nodes
// in the underlying skiplist.
func (sf *SkipFilter) Len() int {
	sf.mutex.RLock()
	n := sf.list.GetNodeCount()
	sf.mutex.RUnlock()
	return n
}
// MatchAny returns the values in the set that pass the test for at least one
// of filterKeys, in ascending id (insertion) order. With no keys it returns
// no values.
//
// Each key's cached bitmap is first brought up to date (see getFilter). Ids of
// values that have been removed since a bitmap was built are then pruned from
// every bitmap involved in this query.
func (sf *SkipFilter) MatchAny(filterKeys ...interface{}) []interface{} {
	sf.mutex.RLock()
	defer sf.mutex.RUnlock()

	sets := make([]*roaring64.Bitmap, len(filterKeys))
	filters := make([]*filter, len(filterKeys))
	for i, k := range filterKeys {
		f := sf.getFilter(k)
		filters[i] = f
		// Snapshot the bitmap under the filter's own lock. Other MatchAny calls
		// hold only sf.mutex.RLock and may be pruning f.set concurrently (see
		// below). ParOr may also hand back one of its inputs instead of a fresh
		// bitmap. Reading the shared f.set without f.mutex would therefore be a
		// data race. Locks are taken one filter at a time, never nested, so
		// callers passing keys in different orders cannot deadlock.
		f.mutex.RLock()
		sets[i] = f.set.Clone()
		f.mutex.RUnlock()
	}

	set := roaring64.ParOr(runtime.NumCPU(), sets...)
	values, notfound := sf.getValues(set)
	if len(notfound) > 0 {
		// Prune ids whose values were removed so later queries skip them.
		for _, f := range filters {
			f.mutex.Lock()
			for _, id := range notfound {
				f.set.Remove(id)
			}
			f.mutex.Unlock()
		}
	}
	return values
}
// Walk hands callback each value whose id is >= start, in ascending id
// (insertion) order. It keeps going while callback returns true and stops right
// after the first false.
//
// The result is the id to pass as start to continue the walk. It is one past
// the id of the last value given to callback, or start itself when no value
// has an id >= start.
//
// callback runs while Walk holds the read lock. It must not call Add or Remove
// on the same SkipFilter, or it will deadlock.
func (sf *SkipFilter) Walk(start uint64, callback func(val interface{}) bool) uint64 {
	sf.mutex.RLock()
	defer sf.mutex.RUnlock()

	node, found := sf.list.FindGreaterOrEqual(&entry{id: start})
	if !found || node == nil {
		return start
	}
	var last uint64
	for visited := false; ; visited = true {
		cur := node.GetValue().(*entry)
		// The skiplist's Next wraps from the last element back to the first.
		// A non-increasing id therefore means the tail has been fully walked.
		if visited && cur.id <= last {
			return last + 1
		}
		if !callback(cur.val) {
			return cur.id + 1
		}
		last = cur.id
		node = sf.list.Next(node)
		if node == nil {
			return last
		}
	}
}
// getFilter returns the cached filter for key k, creating an empty one on a
// cache miss. It first brings the filter's bitmap up to date by testing every
// entry whose id is >= f.i (i.e. added since the filter was last refreshed).
//
// Callers must hold sf.mutex (shared is enough), so that sf.i and sf.list do
// not change during the call. Several callers may refresh concurrently, so
// f.i is accessed atomically outside f.mutex and updated only under it.
func (sf *SkipFilter) getFilter(k interface{}) *filter {
	var f *filter
	if val, ok := sf.cache.Get(k); ok {
		f = val.(*filter)
	} else {
		f = &filter{i: 0, set: roaring64.New()}
		sf.cache.Add(k, f)
	}

	// Fast path: already up to date.
	if atomic.LoadUint64(&f.i) >= sf.i {
		return f
	}
	f.mutex.Lock()
	defer f.mutex.Unlock()
	// Re-check under the lock: another goroutine may have completed the
	// refresh while this one was waiting.
	if f.i >= sf.i {
		return f
	}

	var prev uint64
	first := true
	for el, ok := sf.list.FindGreaterOrEqual(&entry{id: f.i}); ok && el != nil; el = sf.list.Next(el) {
		e := el.GetValue().(*entry)
		// skiplist loops back to first element so we have to detect loop and break manually
		if !first && e.id <= prev {
			break
		}
		if sf.test(e.val, k) {
			f.set.Add(e.id)
		}
		prev = e.id
		first = false
	}
	// Publish with an atomic store so it pairs with the atomic load above;
	// a plain write here races with concurrent fast-path readers.
	atomic.StoreUint64(&f.i, sf.i)
	return f
}
// getValues resolves every id in set, in ascending order, to the value stored
// under it in the skiplist. It returns the resolved values plus the ids that
// have no list entry any more (e.g. values removed after the id was recorded).
// Both slices are non-nil. Callers must hold sf.mutex.
func (sf *SkipFilter) getValues(set *roaring64.Bitmap) ([]interface{}, []uint64) {
	values := []interface{}{}
	notfound := []uint64{}
	// A single probe entry is reused for lookups; only its id is consulted.
	probe := &entry{}
	buf := make([]uint64, 512)
	it := set.ManyIterator()
	for {
		n := it.NextMany(buf)
		if n <= 0 {
			break
		}
		for _, id := range buf[:n] {
			probe.id = id
			if el, ok := sf.list.Find(probe); ok {
				values = append(values, el.GetValue().(*entry).val)
				continue
			}
			notfound = append(notfound, id)
		}
	}
	return values, notfound
}
// entry is one element of the skiplist: the id assigned when the value was
// added, paired with the value itself.
type entry struct {
	id  uint64
	val interface{}
}

// ExtractKey returns the key the skiplist orders entries by: the id as a
// float64. Ids above 2^53 would lose precision in this conversion.
func (e *entry) ExtractKey() float64 {
	key := float64(e.id)
	return key
}

// String renders the id in lowercase hex, right-aligned in a 16-character
// space-padded field.
func (e *entry) String() string {
	formatted := fmt.Sprintf("%16x", e.id)
	return formatted
}
// filter caches, for a single filter key, the ids of entries that passed the
// SkipFilter's test.
type filter struct {
	// i is the entry id up to which (exclusive) entries have been tested.
	// It is read with atomic.LoadUint64 outside mutex, so it must stay the
	// first field: sync/atomic requires 64-bit alignment on 32-bit platforms,
	// and only the first word of an allocated struct is guaranteed it.
	i uint64
	// mutex serialises refreshes of i/set and the pruning of stale ids.
	mutex sync.RWMutex
	// set holds the ids of entries that passed the test. It may still contain
	// ids of since-removed entries until MatchAny prunes them.
	set *roaring64.Bitmap
}