-
Notifications
You must be signed in to change notification settings - Fork 0
/
get_stream.go
87 lines (71 loc) · 1.99 KB
/
get_stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package dynamoutils
import (
"context"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)
// DynamoGetStreamExecutor builds BatchGetItem queries for a single table and
// executes them on DynamoDB.
type DynamoGetStreamExecutor struct {
	db                   DynamoBatchGetItem // client used to issue the BatchGetItem calls
	table                string             // name of the DynamoDB table the keys belong to
	projectionExpression *string            // optional projection limiting the attributes returned; nil fetches all
}
// getStream is a Stream implementation that fetches items in batches through
// a GetStreamAdapter, buffering keys until a full chunk can be requested.
type getStream struct {
	ctx            context.Context                  // context passed to every BatchGet call
	executor       GetStreamAdapter                 // performs the actual batch reads
	internalStream arrayStream                      // holds fetched records and any recorded error
	keys           []map[string]types.AttributeValue // keys not yet moved into buffer
	buffer         []map[string]types.AttributeValue // keys for the in-flight / next BatchGet; cap bounds the chunk size
}
// NewGetStream returns a Stream that fetches each item by its key, batching
// the reads through executor in chunks of at most bufferSize keys. The first
// chunk is requested eagerly so HasNext reflects reality right after
// construction.
func NewGetStream(ctx context.Context, executor GetStreamAdapter, keys []map[string]types.AttributeValue, bufferSize int64) Stream {
	s := &getStream{
		ctx:            ctx,
		executor:       executor,
		keys:           keys,
		buffer:         make([]map[string]types.AttributeValue, 0, bufferSize),
		internalStream: arrayStream{},
	}
	s.populateNextChunk()
	return s
}
// HasNext reports whether another record is available from the stream.
func (s *getStream) HasNext() bool {
	return s.internalStream.HasNext()
}
// Next returns the next record, then pre-fetches further chunks until another
// record is available or both the key backlog and the buffer are drained, so
// HasNext stays accurate.
//
// The refill loop stops as soon as an error has been recorded on the internal
// stream: a failing BatchGet leaves s.buffer non-empty (populateNextChunk
// returns early on error without clearing it), so without this guard a
// persistent failure would make this loop re-issue the same request forever.
func (s *getStream) Next() map[string]types.AttributeValue {
	next := s.internalStream.Next()
	for s.internalStream.Error() == nil && !s.internalStream.HasNext() && (len(s.buffer) > 0 || len(s.keys) > 0) {
		s.populateNextChunk()
	}
	return next
}
// Count returns the record count tracked by the internal stream.
func (s *getStream) Count() int64 {
	return s.internalStream.Count()
}
// Error returns the error recorded on the internal stream, or nil when none
// has occurred.
func (s *getStream) Error() error {
	return s.internalStream.Error()
}
// populateNextChunk tops the buffer up with pending keys, performs a single
// BatchGet for the buffered keys, re-queues whatever DynamoDB left
// unprocessed, and appends the fetched records to the internal stream.
// On a BatchGet error the error is recorded and the buffer is left as-is.
func (s *getStream) populateNextChunk() {
	// Move as many pending keys as fit into the buffer's spare capacity.
	free := cap(s.buffer) - len(s.buffer)
	if free > len(s.keys) {
		free = len(s.keys)
	}
	if free > 0 {
		s.buffer = append(s.buffer, s.keys[:free]...)
		s.keys = s.keys[free:]
	}
	// Nothing buffered means nothing left to request.
	if len(s.buffer) == 0 {
		return
	}
	result, err := s.executor.BatchGet(s.ctx, s.buffer)
	if err != nil {
		s.internalStream.WithError(err)
		return
	}
	// Reuse the buffer's backing array; keys the service did not process this
	// round are queued first so they are retried on the next call.
	s.buffer = s.buffer[:0]
	for _, pending := range result.UnprocessedKeys {
		s.buffer = append(s.buffer, pending.Keys...)
	}
	for _, records := range result.Responses {
		s.internalStream.appendNextChunk(records)
	}
}