-
Notifications
You must be signed in to change notification settings - Fork 351
/
combined_iterator.go
124 lines (114 loc) · 2.43 KB
/
combined_iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package graveler
import (
"bytes"
"github.com/treeverse/lakefs/pkg/logging"
)
// CombinedIterator iterates over two listing iterators,
// in case of duplication (in values or in errors) returns value in iterA
type CombinedIterator struct {
iterA ValueIterator
iterB ValueIterator
p ValueIterator
}
func NewCombinedIterator(iterA, iterB ValueIterator) *CombinedIterator {
return &CombinedIterator{
iterA: iterA,
iterB: iterB,
p: nil,
}
}
// advanceInnerIterators advances the inner iterators and returns true if has more
func (c *CombinedIterator) advanceInnerIterators() bool {
valA := c.iterA.Value()
valB := c.iterB.Value()
var nextA bool
var nextB bool
switch {
case c.p == nil:
// first
nextA = c.iterA.Next()
nextB = c.iterB.Next()
case valA == nil && valB == nil:
// last
return false
case valA == nil:
// iterA is done
c.p = c.iterB
nextB = c.iterB.Next()
case valB == nil:
// iterB is done
c.p = c.iterA
nextA = c.iterA.Next()
case bytes.Equal(valA.Key, valB.Key):
nextA = c.iterA.Next()
nextB = c.iterB.Next()
case bytes.Compare(valA.Key, valB.Key) < 0:
nextA = c.iterA.Next()
nextB = true
default:
// value of iterA < value of iterB
nextB = c.iterB.Next()
nextA = true
}
if c.iterA.Err() != nil {
c.p = c.iterA
return false
}
if c.iterB.Err() != nil {
c.p = c.iterB
return false
}
return nextA || nextB
}
func (c *CombinedIterator) Next() bool {
for {
if !c.advanceInnerIterators() {
return false
}
// set c.p to be the next (smaller) value or continue in case of tombstone
valA := c.iterA.Value()
valB := c.iterB.Value()
switch {
case valA == nil && valB == nil:
c.p = c.iterA
return false
case valA == nil:
c.p = c.iterB
case valB == nil:
c.p = c.iterA
case bytes.Equal(valA.Key, valB.Key) && valA.IsTombstone():
// continue without tombstone error
continue
case bytes.Compare(valA.Key, valB.Key) <= 0:
c.p = c.iterA
default:
c.p = c.iterB
}
if c.p.Value().IsTombstone() {
logging.Default().Error("unexpected tombstone")
continue
}
return true
}
}
func (c *CombinedIterator) SeekGE(id Key) {
c.p = nil
c.iterA.SeekGE(id)
c.iterB.SeekGE(id)
}
func (c *CombinedIterator) Value() *ValueRecord {
if c.p == nil {
return nil
}
return c.p.Value()
}
func (c *CombinedIterator) Err() error {
if c.p == nil {
return nil
}
return c.p.Err()
}
func (c *CombinedIterator) Close() {
c.iterA.Close()
c.iterB.Close()
}