-
Notifications
You must be signed in to change notification settings - Fork 0
/
query_stream.go
81 lines (65 loc) · 2.06 KB
/
query_stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package dynamoutils
import (
"context"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/pkg/errors"
)
// queryStream is a stream over the items returned by a list of DynamoDB
// queries. Queries are executed one after the other, and each query is
// paginated until it has no more pages before the next one is started.
type queryStream struct {
	// ctx is the context passed to every paginator.NextPage call.
	ctx context.Context
	// executor is the client handed to dynamodb.NewQueryPaginator for each query.
	executor dynamodb.QueryAPIClient
	// queries holds the queries that have not been started yet; the first item
	// is removed and turned into a paginator once the current paginator is nil
	// or has no more pages.
	queries []*dynamodb.QueryInput
	// paginator pages through the query currently being executed; it is nil
	// until the first query has been started.
	paginator *dynamodb.QueryPaginator
	// internalStream wraps the items of the fetched page(s) and also carries
	// the error and count exposed by Error and Count.
	internalStream arrayStream
}
// NewQueryStream builds a Stream that runs the given queries in order.
// Every page of a paginated query is requested before moving on to the next
// query. With no queries at all, an array stream built from nil is returned.
// Otherwise the stream is primed through populateNextChunk before it is
// returned, so requests may already be issued during construction.
func NewQueryStream(ctx context.Context, executor DynamoQuery, queries []*dynamodb.QueryInput) Stream {
	if len(queries) < 1 {
		return NewArrayStream(nil)
	}

	// internalStream is left at its zero value.
	qs := new(queryStream)
	qs.ctx = ctx
	qs.executor = executor
	qs.queries = queries

	qs.populateNextChunk()
	return qs
}
// HasNext reports whether the internal array stream has a next item.
func (s *queryStream) HasNext() bool {
	more := s.internalStream.HasNext()
	return more
}
// Next returns the next item of the internal array stream. When taking that
// item leaves the internal stream without a next item, populateNextChunk is
// called to refill it before returning.
func (s *queryStream) Next() map[string]types.AttributeValue {
	item := s.internalStream.Next()
	if s.internalStream.HasNext() {
		return item
	}

	// Buffer drained: try to load more data for the following call.
	s.populateNextChunk()
	return item
}
// populateNextChunk loops until the internal stream has a next item, a page
// request fails, or there is nothing left to query. A failed request is
// recorded on the internal stream through WithError.
func (s *queryStream) populateNextChunk() {
	for {
		if s.internalStream.HasNext() {
			return
		}

		switch {
		case s.paginator != nil && s.paginator.HasMorePages():
			// The current query still has pages: fetch the next one.
			out, err := s.paginator.NextPage(s.ctx)
			if err != nil {
				s.internalStream.WithError(errors.Wrapf(err, "couldn't query %+v", s.paginator))
				return
			}
			s.internalStream.appendNextChunk(out.Items)

		case len(s.queries) > 0:
			// Start the next pending query; its first page is fetched on the
			// next loop iteration.
			s.paginator = dynamodb.NewQueryPaginator(s.executor, s.queries[0])
			s.queries = s.queries[1:]

		default:
			// No more pages and no more queries.
			return
		}
	}
}
// Count returns the count field of the internal array stream
// (presumably the number of items seen so far; confirm against arrayStream).
func (s *queryStream) Count() int64 {
	total := s.internalStream.count
	return total
}
// Error returns the error reported by the internal array stream, which is
// where populateNextChunk records a failed page request.
func (s *queryStream) Error() error {
	err := s.internalStream.Error()
	return err
}