-
Notifications
You must be signed in to change notification settings - Fork 2
/
transaction.go
94 lines (77 loc) · 2.21 KB
/
transaction.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package dynamo
import (
"context"
"errors"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)
func (c *Client) WriteTransaction(ctx context.Context, transaction *Transaction) error {
if len(transaction.Puts) == 0 && len(transaction.Deletes) == 0 {
return errors.New("WriteTransaction requires at least one transaction")
}
var items []types.TransactWriteItem
for _, cr := range transaction.Creates {
values, err := attributevalue.MarshalMap(cr)
if err != nil {
return err
}
items = append(items, types.TransactWriteItem{Put: &types.Put{
TableName: aws.String(c.table),
Item: values,
ConditionExpression: aws.String("attribute_not_exists(PK) AND attribute_not_exists(SK)"),
}})
}
for _, p := range transaction.Puts {
values, err := attributevalue.MarshalMap(p)
if err != nil {
return err
}
items = append(items, types.TransactWriteItem{Put: &types.Put{
TableName: aws.String(c.table),
Item: values,
}})
}
for _, d := range transaction.Deletes {
items = append(items, types.TransactWriteItem{Delete: &types.Delete{
TableName: aws.String(c.table),
Key: map[string]types.AttributeValue{
"PK": &types.AttributeValueMemberS{Value: d.PK.PK()},
"SK": &types.AttributeValueMemberS{Value: d.SK.SK()},
},
}})
}
_, err := c.svc.TransactWriteItems(ctx, &dynamodb.TransactWriteItemsInput{
TransactItems: items,
})
var tce *types.TransactionCanceledException
if errors.As(err, &tce) {
for _, reason := range tce.CancellationReasons {
if *reason.Code == "ConditionalCheckFailed" {
return ConditionalCheckFailedError{}
}
}
}
return err
}
type Transaction struct {
Creates []any
Puts []any
Deletes []Keys
}
func NewTransaction() *Transaction {
return &Transaction{}
}
func (t *Transaction) Create(v interface{}) *Transaction {
t.Creates = append(t.Creates, v)
return t
}
func (t *Transaction) Put(v interface{}) *Transaction {
t.Puts = append(t.Puts, v)
return t
}
func (t *Transaction) Delete(keys Keys) *Transaction {
t.Deletes = append(t.Deletes, keys)
return t
}