/
dynamodb.go
231 lines (183 loc) · 6.01 KB
/
dynamodb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
package dynamoboot
import (
"context"
"errors"
"fmt"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/nielskrijger/goboot"
"github.com/rs/zerolog"
)
var (
defaultMigrationsTable = "migrations"
errMissingConfig = errors.New("missing dynamodb configuration")
errMissingRegion = errors.New("config \"dynamodb.region\" is required")
)
const (
tableWaitTimeout = 2 * time.Minute
tableWaitInterval = 5 * time.Second
)
type DynamodbConfig struct {
// The AWS region to connection to
Region string `yaml:"region"`
// When true connects to a DynamoDB instance on localhost:8000
Local bool `yaml:"local"`
// Name of the table keeping track of migration history
MigrationsTable string `yaml:"migrationsTable"`
}
type DynamoDB struct {
MigrationsTable string
Migrations []*Migration
Client *dynamodb.Client
Config *DynamodbConfig
log zerolog.Logger
}
// Configure connects to DynamoDB.
func (db *DynamoDB) Configure(env *goboot.AppEnv) error {
db.log = env.Log
// unmarshal Config and set defaults
db.Config = &DynamodbConfig{}
if !env.Config.InConfig("dynamodb") {
return errMissingConfig
}
if err := env.Config.Sub("dynamodb").Unmarshal(db.Config); err != nil {
return fmt.Errorf("parsing dynamodb configuration: %w", err)
}
if !env.Config.IsSet("dynamodb.region") {
return errMissingRegion
}
if err := env.Config.Sub("dynamodb").Unmarshal(db.Config); err != nil {
return fmt.Errorf("parsing dynamodb configuration: %w", err)
}
if !env.Config.IsSet("dynamodb.migrationsTable") {
db.Config.MigrationsTable = defaultMigrationsTable
}
if db.Config.Local {
client, err := db.createLocalClient(context.Background())
if err != nil {
return fmt.Errorf("connecting to local dynamodb Client: %w", err)
}
db.Client = client
} else {
client, err := db.createClient(context.Background())
if err != nil {
return fmt.Errorf("creating dynamodb client: %w", err)
}
db.Client = client
}
// check if we can connect to DynamoDB
if err := db.testConnectivity(context.Background()); err != nil {
return err
}
return nil
}
// Name is needed for the AppService interface.
func (db *DynamoDB) Name() string {
return "dynamodb"
}
// Init runs the DynamoDB migrations.
func (db *DynamoDB) Init() error {
if err := db.Migrate(context.Background()); err != nil {
return fmt.Errorf("running DynamoDB migrations: %w", err)
}
return nil
}
// Close is needed for the AppService interface.
func (db *DynamoDB) Close() error {
return nil // DynamoDB Client does not need closing
}
// createLocalClient connects to a dynamodb in given region.
func (db *DynamoDB) createClient(ctx context.Context) (*dynamodb.Client, error) {
cfg, err := config.LoadDefaultConfig(ctx,
config.WithRegion(db.Config.Region),
)
if err != nil {
return nil, fmt.Errorf("creating DynamoDB client: %w", err)
}
return dynamodb.NewFromConfig(cfg), nil //nolint:contextcheck
}
// createLocalClient connects to a local dynamodb emulator on port 8000.
func (db *DynamoDB) createLocalClient(ctx context.Context) (*dynamodb.Client, error) {
cfg, err := config.LoadDefaultConfig(ctx,
config.WithRegion(db.Config.Region), // value doesn't actually matter as long as it exists
config.WithEndpointResolverWithOptions(aws.EndpointResolverWithOptionsFunc(
func(service, region string, options ...interface{}) (aws.Endpoint, error) {
return aws.Endpoint{URL: "http://localhost:8000"}, nil
})),
config.WithCredentialsProvider(credentials.StaticCredentialsProvider{
Value: aws.Credentials{
AccessKeyID: "dummy",
SecretAccessKey: "dummy",
SessionToken: "dummy",
Source: "Hard-coded credentials; values are irrelevant for local DynamoDB",
},
}),
)
if err != nil {
return nil, fmt.Errorf("creating local DynamoDB client: %w", err)
}
return dynamodb.NewFromConfig(cfg), nil //nolint:contextcheck
}
func (db *DynamoDB) testConnectivity(ctx context.Context) error {
_, err := db.Client.ListTables(ctx, &dynamodb.ListTablesInput{})
if err != nil {
return fmt.Errorf("connecting to DynamoDB: %w", err)
}
db.log.Info().Msg("successfully connected to DynamoDB")
return nil
}
func (db *DynamoDB) TableExists(ctx context.Context, tableName string) (bool, error) {
tables, err := db.Client.ListTables(ctx, &dynamodb.ListTablesInput{})
if err != nil {
return false, fmt.Errorf("list tables: %w", err)
}
for _, n := range tables.TableNames {
if n == tableName {
return true, nil
}
}
return false, nil
}
// CreateTable creates a table and waits until it is ready.
func (db *DynamoDB) CreateTable(ctx context.Context, tableInput *dynamodb.CreateTableInput) error {
if _, err := db.Client.CreateTable(ctx, tableInput); err != nil {
return fmt.Errorf("creating table %q: %w", *tableInput.TableName, err)
}
if err := db.waitForTable(ctx, *tableInput.TableName); err != nil {
return err
}
db.log.Info().Msgf("created DynamoDB table %q", *tableInput.TableName)
return nil
}
// CreateTableIfNotExists creates a table if it does not exist and waits until it is ready.
func (db *DynamoDB) CreateTableIfNotExists(ctx context.Context, tableInput *dynamodb.CreateTableInput) error {
exists, err := db.TableExists(ctx, *tableInput.TableName)
if err != nil {
return err
}
if exists {
db.log.Info().Msgf("table %q already exists, nothing to do here", *tableInput.TableName)
return nil
}
return db.CreateTable(ctx, tableInput)
}
// waitForTable blocks until a DynamoDB table is ready for reading/writing.
func (db *DynamoDB) waitForTable(ctx context.Context, tableName string) error {
w := dynamodb.NewTableExistsWaiter(db.Client)
err := w.Wait(ctx,
&dynamodb.DescribeTableInput{
TableName: aws.String(tableName),
},
tableWaitTimeout,
func(o *dynamodb.TableExistsWaiterOptions) {
o.MaxDelay = tableWaitInterval
o.MinDelay = tableWaitInterval
})
if err != nil {
return fmt.Errorf("timed out while waiting for table to become active: %w", err)
}
return nil
}