/
batch_get.go
127 lines (106 loc) · 3.58 KB
/
batch_get.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package procedure
import (
"context"
"errors"
"sync"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/nisimpson/ezddb"
)
// BatchGet functions generate dynamodb batch get input data given some context.
type BatchGet func(context.Context) (*dynamodb.BatchGetItemInput, error)
// NewBatchGetProcedure returns a BatchGet procedure that, on every
// invocation, produces a fresh BatchGetItemInput whose RequestItems map is
// already allocated (and empty). The returned procedure always reports a nil
// error and does not use its context argument.
func NewBatchGetProcedure() BatchGet {
	return func(context.Context) (*dynamodb.BatchGetItemInput, error) {
		input := new(dynamodb.BatchGetItemInput)
		input.RequestItems = make(map[string]types.KeysAndAttributes)
		return input, nil
	}
}
// Invoke calls the procedure with ctx and returns whatever it produces.
// It is equivalent to calling the function value directly and exists so that
// call sites read as a method call.
func (b BatchGet) Invoke(ctx context.Context) (*dynamodb.BatchGetItemInput, error) {
	return b(ctx)
}
// BatchGetModifier makes modifications to the input before the procedure is executed.
// Values implementing this interface are accepted by BatchGet.Modify.
type BatchGetModifier interface {
// ModifyBatchGetItemInput is invoked when this modifier is applied to the provided input.
// The input is received by pointer, so changes are made in place; the returned
// error is handed back to the caller that applied the modifier.
ModifyBatchGetItemInput(context.Context, *dynamodb.BatchGetItemInput) error
}
// Modify returns a new procedure that wraps b together with the given
// modifiers. Nothing runs until the returned procedure is invoked; at that
// point the modifiers are joined into a single group and handed, along with
// b, to the package's modify helper (defined elsewhere in this package),
// which produces the final input.
func (b BatchGet) Modify(modifiers ...BatchGetModifier) BatchGet {
	// apply adapts a BatchGetModifier to the callback shape the modifier
	// group expects.
	apply := func(ctx context.Context, in *dynamodb.BatchGetItemInput, m BatchGetModifier) error {
		return m.ModifyBatchGetItemInput(ctx, in)
	}
	return func(ctx context.Context) (*dynamodb.BatchGetItemInput, error) {
		joined := newModiferGroup(modifiers, apply).Join()
		return modify[dynamodb.BatchGetItemInput](ctx, b, joined)
	}
}
// Execute builds the request input by invoking the procedure and then sends
// it through getter.BatchGetItem, forwarding any client options. If building
// the input fails, that error is returned and getter is never called.
func (b BatchGet) Execute(ctx context.Context,
	getter ezddb.BatchGetter, options ...func(*dynamodb.Options)) (*dynamodb.BatchGetItemOutput, error) {
	input, err := b.Invoke(ctx)
	if err != nil {
		return nil, err
	}
	return getter.BatchGetItem(ctx, input, options...)
}
// MultiBatchGet is an ordered collection of BatchGet procedures that can be
// invoked or executed as a group.
type MultiBatchGet []BatchGet
// Invoke calls each procedure in order and gathers the generated inputs into
// a slice with one entry per procedure. It stops at the first procedure that
// fails, returning a nil slice together with that error.
func (m MultiBatchGet) Invoke(ctx context.Context) ([]*dynamodb.BatchGetItemInput, error) {
	inputs := make([]*dynamodb.BatchGetItemInput, len(m))
	for i, proc := range m {
		input, err := proc.Invoke(ctx)
		if err != nil {
			return nil, err
		}
		inputs[i] = input
	}
	return inputs, nil
}
// Execute runs every procedure sequentially against writer, forwarding the
// client options to each call. A failing procedure does not stop the loop:
// its error is recorded and the remaining procedures still run. The returned
// slice holds the outputs of the procedures that succeeded, in order; the
// returned error is the join of all recorded errors, or nil if there were none.
func (m MultiBatchGet) Execute(ctx context.Context,
	writer ezddb.BatchGetter, options ...func(*dynamodb.Options)) ([]*dynamodb.BatchGetItemOutput, error) {
	var failures []error
	results := make([]*dynamodb.BatchGetItemOutput, 0)
	for i := range m {
		res, err := m[i].Execute(ctx, writer, options...)
		if err == nil {
			results = append(results, res)
			continue
		}
		failures = append(failures, err)
	}
	// errors.Join yields nil when there is nothing to join, so the
	// success path needs no separate branch.
	return results, errors.Join(failures...)
}
// MultiBatchGetResult holds the pending outcome of MultiBatchGet.ExecuteAsync.
// Call Wait to block until every procedure has finished and to collect the
// results.
type MultiBatchGetResult struct {
	// awaiter tracks the goroutines started by ExecuteAsync. It is nil on a
	// zero-value result.
	awaiter *sync.WaitGroup
	// outputs[i] is the output of the i-th procedure.
	outputs []*dynamodb.BatchGetItemOutput
	// errors[i] is the error of the i-th procedure, or nil if it succeeded.
	errors []error
}

// Wait blocks until all procedures started by ExecuteAsync have completed.
// If at least one procedure failed, it returns a nil slice and the join of
// every failure. Otherwise it returns the outputs in the same order as the
// procedures in the originating MultiBatchGet.
func (m *MultiBatchGetResult) Wait() ([]*dynamodb.BatchGetItemOutput, error) {
	// A zero-value result has no goroutines to wait for.
	if m.awaiter != nil {
		m.awaiter.Wait()
	}
	// errors.Join skips nil entries and returns nil when all are nil.
	if err := errors.Join(m.errors...); err != nil {
		return nil, err
	}
	return m.outputs, nil
}

// ExecuteAsync starts one goroutine per procedure, each executing its
// procedure against getter with the given client options, and returns
// immediately. Use the returned result's Wait method to block for completion
// and retrieve the outputs or errors.
func (m MultiBatchGet) ExecuteAsync(ctx context.Context,
	getter ezddb.BatchGetter, options ...func(*dynamodb.Options)) *MultiBatchGetResult {
	result := &MultiBatchGetResult{
		// The wait group must be allocated up front; a nil *sync.WaitGroup
		// panics on Add.
		awaiter: &sync.WaitGroup{},
		outputs: make([]*dynamodb.BatchGetItemOutput, len(m)),
		errors:  make([]error, len(m)),
	}
	// Register every goroutine before any is started so Wait cannot observe
	// a zero counter while work is still being scheduled.
	result.awaiter.Add(len(m))
	for i, proc := range m {
		go func(i int, proc BatchGet) {
			defer result.awaiter.Done()
			// Each goroutine writes only to its own index, so no lock is
			// needed and the results keep the order of the procedures.
			result.outputs[i], result.errors[i] = proc.Execute(ctx, getter, options...)
		}(i, proc)
	}
	return result
}