-
Notifications
You must be signed in to change notification settings - Fork 0
/
array_stream.go
65 lines (51 loc) · 1.42 KB
/
array_stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package dynamoutils
import (
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/pkg/errors"
)
type arrayStream struct {
err error
results []map[string]types.AttributeValue
count int64
}
// NewArrayStream creates a stream from a slice ; the slice will be updated.
func NewArrayStream(results []map[string]types.AttributeValue) Stream {
return &arrayStream{
results: results,
}
}
func (s *arrayStream) HasNext() bool {
return s.err == nil && len(s.results) > 0
}
func (s *arrayStream) IsLast() bool {
return s.err == nil && len(s.results) > 1
}
func (s *arrayStream) Next() (current map[string]types.AttributeValue) {
if s.err != nil {
panic(errors.Wrapf(s.err, "Next() can't be called when an error occured"))
}
if !s.HasNext() {
panic(errors.Wrapf(s.err, "Next() can't be called when an HasNext() returns false"))
}
current, s.results = s.results[0], s.results[1:]
s.count++
return current
}
func (s *arrayStream) appendNextChunk(chunk []map[string]types.AttributeValue) {
results := make([]map[string]types.AttributeValue, len(s.results)+len(chunk))
copy(results, s.results)
copy(results[len(s.results):], chunk)
s.results = results
}
func (s *arrayStream) Count() int64 {
return s.count
}
func (s *arrayStream) Error() error {
return s.err
}
// WithError keeps exiting error if already present. err can be nil.
func (s *arrayStream) WithError(err error) {
if s.err == nil {
s.err = err
}
}