-
Notifications
You must be signed in to change notification settings - Fork 107
/
iterator.go
132 lines (115 loc) · 2.91 KB
/
iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package writelog
import (
"context"
"errors"
)
// Compile-time assertions that both iterator implementations satisfy Iterator.
var (
	_ Iterator = (*staticIterator)(nil)
	_ Iterator = (*PipeIterator)(nil)
)

// ErrIteratorInvalid is returned by Value() when the iterator is not pointing
// at an entry, i.e. it has not been advanced yet or it has already finished.
var ErrIteratorInvalid = errors.New("mkvs: write log iterator invalid")
// pipeIteratorQueueSize is the buffer capacity of a pipe iterator's queue: the
// number of items that can be pushed before further pushes start to block.
const pipeIteratorQueueSize = 100
// Iterator iterates over MKVS write log entries between two different storage instances.
//
// The implementations in this file start out positioned before the first
// entry: call Next to advance, then Value to read the entry pointed at.
type Iterator interface {
// Next advances the iterator to the next element and returns false if there are no more elements.
// A non-nil error means advancing failed; the boolean is false in that case
// for the implementations in this file.
Next() (bool, error)
// Value returns the log entry the iterator is currently pointing to.
// The implementations in this file return ErrIteratorInvalid when there is
// no current entry.
Value() (LogEntry, error)
}
// staticIterator is an Iterator over a fixed, in-memory WriteLog.
type staticIterator struct {
// cursor is the index into entries of the current element. NewStaticIterator
// starts it at -1, i.e. before the first entry; Next increments it.
cursor int
// entries is the write log being iterated over.
entries WriteLog
}
// Next moves the cursor one position forward and reports whether it now
// points at an entry of the backing write log. The returned error is always nil.
func (i *staticIterator) Next() (bool, error) {
	i.cursor++
	total := len(i.entries)
	inRange := total != 0 && i.cursor < total
	return inRange, nil
}
// Value returns the entry at the current cursor position, or
// ErrIteratorInvalid when the cursor lies outside the backing write log
// (before the first Next call or after the iterator ran out of entries).
func (i *staticIterator) Value() (LogEntry, error) {
	if pos := i.cursor; pos >= 0 && pos < len(i.entries) {
		return i.entries[pos], nil
	}
	return LogEntry{}, ErrIteratorInvalid
}
// NewStaticIterator wraps an in-memory write log in an Iterator. The write log
// is used as given (not copied) and the iterator starts before its first entry.
func NewStaticIterator(writeLog WriteLog) Iterator {
	it := new(staticIterator)
	it.entries = writeLog
	// Start one step before index 0 so the first Next lands on the first entry.
	it.cursor = -1
	return it
}
// PipeIterator is a queue-backed writelog iterator which can be asynchronously
// both pushed into and read from.
//
// Items are pushed with Put and PutError and the stream is ended with Close;
// the read side consumes them through Next and Value.
type PipeIterator struct {
// queue carries the pushed items: *LogEntry values sent by Put and error
// values sent by PutError.
queue chan interface{}
// cached is the entry most recently received by Next; Value reports
// ErrIteratorInvalid while it is nil.
cached *LogEntry
// ctx lets Next, Put and PutError give up waiting on queue: each selects on
// ctx.Done() and returns ctx.Err() when that case fires.
ctx context.Context
}
// Next waits until an item can be received from the queue, the queue is
// closed, or the iterator's context is done, and advances the iterator.
//
// It returns:
//   - (true, nil) when an item was received; if it was a log entry, Value
//     returns it afterwards.
//   - (false, nil) when the queue has been closed and drained (end of stream).
//   - (false, err) when an error pushed with PutError was received, or when
//     the context is done, in which case err is the context's error.
//
// Whenever Next does not yield a log entry the cached entry is cleared, so a
// following Value call reports ErrIteratorInvalid rather than a stale entry.
func (i *PipeIterator) Next() (bool, error) {
	select {
	case item, open := <-i.queue:
		if !open {
			i.cached = nil
			return false, nil
		}
		switch obj := item.(type) {
		case error:
			i.cached = nil
			return false, obj
		case *LogEntry:
			i.cached = obj
		default:
			// Reached for an untyped nil item (e.g. PutError(nil)), which
			// matches neither case above. Drop the previous entry so it is
			// not mistaken for the current one.
			i.cached = nil
		}
		return true, nil
	case <-i.ctx.Done():
		// Same invariant as the other failure paths: no current entry once
		// Next has reported failure.
		i.cached = nil
		return false, i.ctx.Err()
	}
}
// Value returns a copy of the log entry most recently received by Next, or
// ErrIteratorInvalid when no entry is currently cached.
func (i *PipeIterator) Value() (LogEntry, error) {
	if entry := i.cached; entry != nil {
		return *entry, nil
	}
	return LogEntry{}, ErrIteratorInvalid
}
// Put pushes a log entry to the iterator's read side.
//
// It waits while the queue is full and returns nil once the entry has been
// queued, or the context's error if it stops waiting because the iterator's
// context is done.
func (i *PipeIterator) Put(logEntry *LogEntry) error {
	done := i.ctx.Done()
	select {
	case <-done:
		return i.ctx.Err()
	case i.queue <- logEntry:
		return nil
	}
}
// PutError pushes an error to the iterator's read side, where Next returns it
// to the reader.
//
// It waits while the queue is full and returns nil once the error has been
// queued, or the context's error if it stops waiting because the iterator's
// context is done.
func (i *PipeIterator) PutError(err error) error {
	done := i.ctx.Done()
	select {
	case <-done:
		return i.ctx.Err()
	case i.queue <- err:
		return nil
	}
}
// Close signals an end of the log entry stream to the iterator's read side.
//
// It closes the underlying queue; after the items already queued have been
// received, Next reports the end of the stream with (false, nil). Close must
// be called at most once and no Put or PutError may follow it, because
// closing, or sending on, an already closed channel panics.
func (i *PipeIterator) Close() {
close(i.queue)
}
// NewPipeIterator builds a PipeIterator whose queue can buffer
// pipeIteratorQueueSize items and whose blocking operations are bounded by ctx.
func NewPipeIterator(ctx context.Context) PipeIterator {
	var it PipeIterator
	it.ctx = ctx
	it.queue = make(chan interface{}, pipeIteratorQueueSize)
	return it
}
// DrainIterator advances the iterator until it is exhausted or fails, without
// looking at any of the values. It returns the error that stopped iteration,
// or nil if the iterator simply ran out of elements.
func DrainIterator(it Iterator) error {
	more, err := it.Next()
	for more && err == nil {
		more, err = it.Next()
	}
	return err
}