forked from brave/go-sync
/
dynamo.go
152 lines (134 loc) · 4.93 KB
/
dynamo.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package datastoretest
import (
"encoding/json"
"fmt"
"io/ioutil"
"path/filepath"
"runtime"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/aws/aws-sdk-go/service/dynamodb/expression"
"github.com/odedlaz/go-sync/datastore"
)
// DeleteTable deletes datastore.Table in dynamoDB and blocks until the table
// no longer exists.
//
// A table that does not exist is treated as success and returns nil
// immediately. Any other failure from the delete request is returned wrapped,
// so callers never end up waiting on a table whose deletion was rejected.
func DeleteTable(dynamo *datastore.Dynamo) error {
	tableName := aws.String(datastore.Table)

	_, err := dynamo.DeleteTable(&dynamodb.DeleteTableInput{TableName: tableName})
	if err != nil {
		// Return as successful if the table does not exist.
		if aerr, ok := err.(awserr.Error); ok &&
			aerr.Code() == dynamodb.ErrCodeResourceNotFoundException {
			return nil
		}
		// Every other error (AWS-typed or not) is a real failure; previously
		// AWS errors with a different code were silently dropped here.
		return fmt.Errorf("error deleting table: %w", err)
	}

	return dynamo.WaitUntilTableNotExists(
		&dynamodb.DescribeTableInput{TableName: tableName})
}
// CreateTable creates datastore.Table in dynamoDB and blocks until the table
// exists.
//
// The table definition is read from schema/dynamodb/table.json, located two
// directories above the directory of this source file, and decoded into a
// dynamodb.CreateTableInput. Its TableName is then overridden with
// datastore.Table. Errors from locating, reading, or decoding the schema and
// from the create request are returned wrapped.
func CreateTable(dynamo *datastore.Dynamo) error {
	// Resolve the schema relative to this file rather than the working
	// directory. If the runtime cannot report the caller, fail loudly instead
	// of silently resolving the path against the working directory.
	_, thisFile, _, ok := runtime.Caller(0)
	if !ok {
		return fmt.Errorf("error locating source file to resolve table.json")
	}
	root := filepath.Join(filepath.Dir(thisFile), "../../")

	raw, err := ioutil.ReadFile(filepath.Join(root, "schema/dynamodb/table.json"))
	if err != nil {
		return fmt.Errorf("error reading table.json: %w", err)
	}

	var input dynamodb.CreateTableInput
	if err := json.Unmarshal(raw, &input); err != nil {
		return fmt.Errorf("error unmarshalling raw data from table.json: %w", err)
	}
	// The name in the schema file is ignored in favour of datastore.Table.
	input.TableName = aws.String(datastore.Table)

	if _, err := dynamo.CreateTable(&input); err != nil {
		return fmt.Errorf("error creating table: %w", err)
	}

	return dynamo.WaitUntilTableExists(
		&dynamodb.DescribeTableInput{TableName: aws.String(datastore.Table)})
}
// ResetTable recreates datastore.Table in dynamoDB by calling DeleteTable
// followed by CreateTable. A failure while deleting is returned wrapped and
// the create step is skipped; otherwise the result of CreateTable is returned
// as is.
func ResetTable(dynamo *datastore.Dynamo) error {
	err := DeleteTable(dynamo)
	if err != nil {
		return fmt.Errorf("error deleting table to reset table: %w", err)
	}

	err = CreateTable(dynamo)
	return err
}
// ScanSyncEntities scans the dynamoDB table and returns all sync items.
//
// Sync items are selected with a filter that keeps only items having a
// "Version" attribute. The scan is repeated with ExclusiveStartKey until
// DynamoDB stops returning a LastEvaluatedKey, so results spanning more than
// one Scan page are all included. The returned slice is non-nil on success.
func ScanSyncEntities(dynamo *datastore.Dynamo) ([]datastore.SyncEntity, error) {
	filter := expression.AttributeExists(expression.Name("Version"))
	expr, err := expression.NewBuilder().WithFilter(filter).Build()
	if err != nil {
		return nil, fmt.Errorf("error building expression to scan sync entitites: %w", err)
	}

	input := &dynamodb.ScanInput{
		ExpressionAttributeNames:  expr.Names(),
		ExpressionAttributeValues: expr.Values(),
		FilterExpression:          expr.Filter(),
		TableName:                 aws.String(datastore.Table),
	}

	// A single Scan response may cover only part of the table; keep
	// requesting pages until no continuation key is returned.
	var items []map[string]*dynamodb.AttributeValue
	for {
		out, err := dynamo.Scan(input)
		if err != nil {
			return nil, fmt.Errorf("error doing scan for sync entities: %w", err)
		}
		items = append(items, out.Items...)
		if len(out.LastEvaluatedKey) == 0 {
			break
		}
		input.ExclusiveStartKey = out.LastEvaluatedKey
	}

	syncItems := []datastore.SyncEntity{}
	err = dynamodbattribute.UnmarshalListOfMaps(items, &syncItems)
	if err != nil {
		return nil, fmt.Errorf("error unmarshalling sync entitites: %w", err)
	}
	return syncItems, nil
}
// ScanTagItems scans the dynamoDB table and returns all tag items.
//
// Tag items are selected with a filter that keeps only items having neither
// an "ExpireAt" nor a "Version" attribute. The scan is repeated with
// ExclusiveStartKey until DynamoDB stops returning a LastEvaluatedKey, so
// results spanning more than one Scan page are all included. The returned
// slice is non-nil on success.
func ScanTagItems(dynamo *datastore.Dynamo) ([]datastore.ServerClientUniqueTagItem, error) {
	filter := expression.And(
		expression.AttributeNotExists(expression.Name("ExpireAt")),
		expression.AttributeNotExists(expression.Name("Version")))
	expr, err := expression.NewBuilder().WithFilter(filter).Build()
	if err != nil {
		return nil, fmt.Errorf("error building expression to scan tag items: %w", err)
	}

	input := &dynamodb.ScanInput{
		ExpressionAttributeNames:  expr.Names(),
		ExpressionAttributeValues: expr.Values(),
		FilterExpression:          expr.Filter(),
		TableName:                 aws.String(datastore.Table),
	}

	// A single Scan response may cover only part of the table; keep
	// requesting pages until no continuation key is returned.
	var items []map[string]*dynamodb.AttributeValue
	for {
		out, err := dynamo.Scan(input)
		if err != nil {
			return nil, fmt.Errorf("error doing scan for tag items: %w", err)
		}
		items = append(items, out.Items...)
		if len(out.LastEvaluatedKey) == 0 {
			break
		}
		input.ExclusiveStartKey = out.LastEvaluatedKey
	}

	tagItems := []datastore.ServerClientUniqueTagItem{}
	err = dynamodbattribute.UnmarshalListOfMaps(items, &tagItems)
	if err != nil {
		return nil, fmt.Errorf("error unmarshalling tag items: %w", err)
	}
	return tagItems, nil
}
// ScanClientItemCounts scans the dynamoDB table and returns all client item
// counts.
//
// Item-count records are selected with a filter that keeps only items having
// an "ItemCount" attribute. The scan is repeated with ExclusiveStartKey until
// DynamoDB stops returning a LastEvaluatedKey, so results spanning more than
// one Scan page are all included. The returned slice is non-nil on success.
func ScanClientItemCounts(dynamo *datastore.Dynamo) ([]datastore.ClientItemCount, error) {
	filter := expression.AttributeExists(expression.Name("ItemCount"))
	expr, err := expression.NewBuilder().WithFilter(filter).Build()
	if err != nil {
		return nil, fmt.Errorf("error building expression to scan item counts: %w", err)
	}

	input := &dynamodb.ScanInput{
		ExpressionAttributeNames:  expr.Names(),
		ExpressionAttributeValues: expr.Values(),
		FilterExpression:          expr.Filter(),
		TableName:                 aws.String(datastore.Table),
	}

	// A single Scan response may cover only part of the table; keep
	// requesting pages until no continuation key is returned.
	var items []map[string]*dynamodb.AttributeValue
	for {
		out, err := dynamo.Scan(input)
		if err != nil {
			return nil, fmt.Errorf("error doing scan for item counts: %w", err)
		}
		items = append(items, out.Items...)
		if len(out.LastEvaluatedKey) == 0 {
			break
		}
		input.ExclusiveStartKey = out.LastEvaluatedKey
	}

	clientItemCounts := []datastore.ClientItemCount{}
	err = dynamodbattribute.UnmarshalListOfMaps(items, &clientItemCounts)
	if err != nil {
		return nil, fmt.Errorf("error unmarshalling item counts: %w", err)
	}
	return clientItemCounts, nil
}