/
dynamodb.go
411 lines (343 loc) · 15.7 KB
/
dynamodb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
package aws
import (
"context"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
dynamodbv1 "github.com/lyft/clutch/backend/api/aws/dynamodb/v1"
awsv1 "github.com/lyft/clutch/backend/api/config/service/aws/v1"
)
// Default DynamoDB scaling limits, mirroring the default throughput quotas set by AWS:
// https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ServiceQuotas.html#default-limits-throughput-capacity-modes
const (
// AwsMaxRCU is the default cap on read capacity units, used when the config supplies no scaling limits.
AwsMaxRCU = 40000
// AwsMaxWCU is the default cap on write capacity units, used when the config supplies no scaling limits.
AwsMaxWCU = 40000
// SafeScaleFactor is the default maximum ratio of target capacity to current capacity for a single update.
SafeScaleFactor = 2.0
)
// TableBillingUnspecified covers the edge case where the billing mode is not in
// the table description and cannot be inferred from the provisioned throughput numbers.
const TableBillingUnspecified = "UNSPECIFIED"
// getScalingLimits returns the DynamoDB scaling limits to enforce for cfg.
//
// Explicitly configured limits are returned as-is. When cfg is nil, or the
// DynamoDB config section or its scaling limits are missing, the AWS-derived
// defaults are returned instead, so the result is never nil.
func getScalingLimits(cfg *awsv1.Config) *awsv1.ScalingLimits {
	// Generated proto getters are nil-safe, so this single chain covers a nil
	// cfg, a nil DynamodbConfig and a nil ScalingLimits. The previous check
	// combined the two nil tests with &&, which let a config with a
	// DynamodbConfig but no ScalingLimits fall through and return nil.
	if limits := cfg.GetDynamodbConfig().GetScalingLimits(); limits != nil {
		return limits
	}

	return &awsv1.ScalingLimits{
		MaxReadCapacityUnits:  AwsMaxRCU,
		MaxWriteCapacityUnits: AwsMaxWCU,
		MaxScaleFactor:        float32(SafeScaleFactor),
		EnableOverride:        false,
	}
}
// DescribeTable fetches the description of tableName from the given account
// and region and converts it to the clutch Table proto.
//
// It returns the error from resolving the regional client or from the
// DescribeTable call unchanged; the latter is also logged at warn level.
func (c *client) DescribeTable(ctx context.Context, account, region, tableName string) (*dynamodbv1.Table, error) {
	regional, err := c.getAccountRegionClient(account, region)
	if err != nil {
		return nil, err
	}

	described, err := getTable(ctx, regional, tableName)
	if err != nil {
		c.log.Warn("unable to find table in region: "+region, zap.Error(err))
		return nil, err
	}

	return newProtoForTable(described.Table, account, region), nil
}
// getTable issues a DescribeTable call for tableName using the regional
// client's DynamoDB handle and returns the raw SDK output.
func getTable(ctx context.Context, client *regionalClient, tableName string) (*dynamodb.DescribeTableOutput, error) {
	return client.dynamodb.DescribeTable(ctx, &dynamodb.DescribeTableInput{
		TableName: aws.String(tableName),
	})
}
// getGlobalSecondaryIndexes converts every SDK global secondary index
// description into its proto form, preserving order. The result is always a
// non-nil slice with one entry per input index.
func getGlobalSecondaryIndexes(indexes []types.GlobalSecondaryIndexDescription) []*dynamodbv1.GlobalSecondaryIndex {
	converted := make([]*dynamodbv1.GlobalSecondaryIndex, 0, len(indexes))
	for n := range indexes {
		converted = append(converted, newProtoForGlobalSecondaryIndex(indexes[n]))
	}
	return converted
}
// newProtoForKeySchemas converts the SDK key schema elements of a table into
// their proto form, preserving order. The result always has one entry per
// input element.
func newProtoForKeySchemas(inputSchema []types.KeySchemaElement) []*dynamodbv1.KeySchema {
	schemaCollection := make([]*dynamodbv1.KeySchema, len(inputSchema))
	for idx, schema := range inputSchema {
		schemaCollection[idx] = &dynamodbv1.KeySchema{
			// aws.ToString maps a nil pointer to "" instead of panicking,
			// matching how the rest of this file reads SDK string pointers.
			AttributeName: aws.ToString(schema.AttributeName),
			Type:          newProtoForKeySchemaType(schema.KeyType),
		}
	}
	return schemaCollection
}
// newProtoForKeySchemaType maps an SDK key type onto the proto enum by name.
// Names with no entry in the generated value map yield KeySchema_UNKNOWN.
func newProtoForKeySchemaType(keyType types.KeyType) dynamodbv1.KeySchema_Type {
	if enumValue, found := dynamodbv1.KeySchema_Type_value[string(keyType)]; found {
		return dynamodbv1.KeySchema_Type(enumValue)
	}
	return dynamodbv1.KeySchema_UNKNOWN
}
// newProtoForAttributeDefinitions converts the SDK attribute definitions of a
// table into their proto form, preserving order. The result always has one
// entry per input definition.
func newProtoForAttributeDefinitions(inputAttributes []types.AttributeDefinition) []*dynamodbv1.AttributeDefinition {
	attributeDefs := make([]*dynamodbv1.AttributeDefinition, len(inputAttributes))
	for idx, attribute := range inputAttributes {
		attributeDefs[idx] = &dynamodbv1.AttributeDefinition{
			// aws.ToString maps a nil pointer to "" instead of panicking,
			// matching how the rest of this file reads SDK string pointers.
			AttributeName: aws.ToString(attribute.AttributeName),
			AttributeType: string(attribute.AttributeType),
		}
	}
	return attributeDefs
}
// getGlobalSecondaryIndex returns the entry of indexes whose name equals
// targetIndexName, or a NotFound status error when no entry matches.
//
// The returned pointer refers to the element inside indexes rather than to a
// copy, so it stays tied to the table description it came from.
func getGlobalSecondaryIndex(indexes []types.GlobalSecondaryIndexDescription, targetIndexName string) (*types.GlobalSecondaryIndexDescription, error) {
	// Iterate by position: ranging by value would copy each description and
	// hand back the address of the loop variable instead of the slice element.
	for idx := range indexes {
		name := indexes[idx].IndexName
		// An index without a name can never match; skip it instead of
		// dereferencing a nil pointer.
		if name != nil && *name == targetIndexName {
			return &indexes[idx], nil
		}
	}
	return nil, status.Error(codes.NotFound, "Global secondary index not found.")
}
// newProtoForTable builds the clutch Table proto from an SDK table
// description, tagging it with the account and region it was read from.
// Throughput, indexes, status, billing mode, key schema and attribute
// definitions are each converted by their dedicated helper.
func newProtoForTable(t *types.TableDescription, account, region string) *dynamodbv1.Table {
	return &dynamodbv1.Table{
		Name:                   aws.ToString(t.TableName),
		Account:                account,
		Region:                 region,
		GlobalSecondaryIndexes: getGlobalSecondaryIndexes(t.GlobalSecondaryIndexes),
		ProvisionedThroughput: &dynamodbv1.Throughput{
			WriteCapacityUnits: aws.ToInt64(t.ProvisionedThroughput.WriteCapacityUnits),
			ReadCapacityUnits:  aws.ToInt64(t.ProvisionedThroughput.ReadCapacityUnits),
		},
		Status:               newProtoForTableStatus(t.TableStatus),
		BillingMode:          newProtoForBillingMode(t),
		KeySchemas:           newProtoForKeySchemas(t.KeySchema),
		AttributeDefinitions: newProtoForAttributeDefinitions(t.AttributeDefinitions),
	}
}
// newProtoForContinuousBackups converts an SDK continuous-backups description
// into its proto form.
//
// The point-in-time recovery status is UNSPECIFIED when the recovery
// description is absent or carries an empty status. The earliest and latest
// restorable timestamps are left nil unless the recovery description provides
// them. The account and region arguments are accepted but not used here.
func newProtoForContinuousBackups(t *types.ContinuousBackupsDescription, account string, region string) *dynamodbv1.ContinuousBackups {
	out := &dynamodbv1.ContinuousBackups{
		ContinuousBackupsStatus:   newProtoForBackupStatus(string(t.ContinuousBackupsStatus)),
		PointInTimeRecoveryStatus: dynamodbv1.ContinuousBackups_UNSPECIFIED,
	}

	pitr := t.PointInTimeRecoveryDescription
	if pitr == nil {
		// No recovery description: keep UNSPECIFIED and nil timestamps.
		return out
	}

	if pitr.PointInTimeRecoveryStatus != "" {
		out.PointInTimeRecoveryStatus = newProtoForBackupStatus(string(pitr.PointInTimeRecoveryStatus))
	}
	if pitr.EarliestRestorableDateTime != nil {
		out.EarliestRestorableDateTime = timestamppb.New(*pitr.EarliestRestorableDateTime)
	}
	if pitr.LatestRestorableDateTime != nil {
		out.LatestRestorableDateTime = timestamppb.New(*pitr.LatestRestorableDateTime)
	}
	return out
}
// newProtoForTableStatus maps an SDK table status onto the proto enum by name.
// Names with no entry in the generated value map yield Table_UNKNOWN.
func newProtoForTableStatus(s types.TableStatus) dynamodbv1.Table_Status {
	if enumValue, found := dynamodbv1.Table_Status_value[string(s)]; found {
		return dynamodbv1.Table_Status(enumValue)
	}
	return dynamodbv1.Table_UNKNOWN
}
// newProtoForIndexStatus maps an SDK index status onto the proto enum by name.
// Names with no entry in the generated value map yield
// GlobalSecondaryIndex_UNKNOWN.
func newProtoForIndexStatus(s types.IndexStatus) dynamodbv1.GlobalSecondaryIndex_Status {
	if enumValue, found := dynamodbv1.GlobalSecondaryIndex_Status_value[string(s)]; found {
		return dynamodbv1.GlobalSecondaryIndex_Status(enumValue)
	}
	return dynamodbv1.GlobalSecondaryIndex_UNKNOWN
}
// newProtoForBackupStatus maps a backup status name onto the proto enum.
// Names with no entry in the generated value map yield
// ContinuousBackups_UNKNOWN.
func newProtoForBackupStatus(s string) dynamodbv1.ContinuousBackups_Status {
	if enumValue, found := dynamodbv1.ContinuousBackups_Status_value[s]; found {
		return dynamodbv1.ContinuousBackups_Status(enumValue)
	}
	return dynamodbv1.ContinuousBackups_UNKNOWN
}
// getBillingMode infers a table's billing mode from its provisioned
// throughput, for cases where AWS does not return the mode in the table
// description.
//
//   - PROVISIONED tables have at least 1 RCU or WCU provisioned.
//   - PAY_PER_REQUEST (on-demand) tables have 0 RCU and 0 WCU provisioned.
//
// When the throughput description is missing, or the reported values allow
// neither conclusion, TableBillingUnspecified is returned.
func getBillingMode(t *types.ProvisionedThroughputDescription) types.BillingMode {
	if t == nil {
		return TableBillingUnspecified
	}

	read, write := t.ReadCapacityUnits, t.WriteCapacityUnits
	switch {
	// Any known positive capacity is enough to conclude PROVISIONED.
	case read != nil && *read > 0, write != nil && *write > 0:
		return types.BillingModeProvisioned
	// On-demand requires both values to be reported and both to be zero.
	case read != nil && write != nil && *read == 0 && *write == 0:
		return types.BillingModePayPerRequest
	default:
		return TableBillingUnspecified // unable to infer what billing mode it is
	}
}
// newProtoForBillingMode resolves the proto billing mode for a table. It uses
// the billing mode summary when the description has one and otherwise infers
// the mode from the provisioned throughput. Mode names with no entry in the
// generated value map yield Table_BILLING_UNKNOWN.
func newProtoForBillingMode(t *types.TableDescription) dynamodbv1.Table_BillingMode {
	var mode types.BillingMode
	if summary := t.BillingModeSummary; summary != nil {
		mode = summary.BillingMode
	} else {
		mode = getBillingMode(t.ProvisionedThroughput)
	}

	if enumValue, found := dynamodbv1.Table_BillingMode_value[string(mode)]; found {
		return dynamodbv1.Table_BillingMode(enumValue)
	}
	return dynamodbv1.Table_BILLING_UNKNOWN
}
// newProtoForGlobalSecondaryIndex converts one SDK global secondary index
// description into its proto form, carrying over the index name, its
// provisioned read/write capacity and its status.
func newProtoForGlobalSecondaryIndex(index types.GlobalSecondaryIndexDescription) *dynamodbv1.GlobalSecondaryIndex {
	return &dynamodbv1.GlobalSecondaryIndex{
		Name: aws.ToString(index.IndexName),
		ProvisionedThroughput: &dynamodbv1.Throughput{
			ReadCapacityUnits:  aws.ToInt64(index.ProvisionedThroughput.ReadCapacityUnits),
			WriteCapacityUnits: aws.ToInt64(index.ProvisionedThroughput.WriteCapacityUnits),
		},
		Status: newProtoForIndexStatus(index.IndexStatus),
	}
}
// isValidIncrease checks whether moving from the current provisioned
// throughput to target is an allowed capacity change. It returns nil when the
// change is allowed and a FailedPrecondition status error otherwise.
//
// Rules, applied in order:
//  1. Neither read nor write capacity may go below its current value.
//  2. Unless the override is enabled in the client's scaling limits AND
//     ignore_maximums is true:
//     a. a target above the configured maximum is rejected, except for
//     resources whose current capacity is already above that maximum;
//     b. the ratio target/current may not exceed the configured scale factor.
//
// Unset (nil) capacity pointers are read as 0.
func isValidIncrease(client *regionalClient, current *types.ProvisionedThroughputDescription, target types.ProvisionedThroughput, ignore_maximums bool) error {
	if current == nil {
		// Without a baseline there is nothing to validate against; fail the
		// request instead of dereferencing a nil description.
		return status.Error(codes.FailedPrecondition, "Current provisioned throughput is unavailable, cannot validate target capacity")
	}

	curRead, curWrite := aws.ToInt64(current.ReadCapacityUnits), aws.ToInt64(current.WriteCapacityUnits)
	tgtRead, tgtWrite := aws.ToInt64(target.ReadCapacityUnits), aws.ToInt64(target.WriteCapacityUnits)

	// check for targets that are lower than current (can't scale down)
	if curRead > tgtRead {
		return status.Errorf(codes.FailedPrecondition, "Target read capacity [%d] is lower than current capacity [%d]", tgtRead, curRead)
	}
	if curWrite > tgtWrite {
		return status.Errorf(codes.FailedPrecondition, "Target write capacity [%d] is lower than current capacity [%d]", tgtWrite, curWrite)
	}

	limits := client.dynamodbCfg.ScalingLimits

	// The limit checks are skipped only when the override is both enabled in
	// config and requested by the caller.
	if limits.EnableOverride && ignore_maximums {
		return nil
	}

	// check for targets that exceed max limits; resources already above the
	// maximum are exempt. A resource sitting exactly at the maximum is not
	// above it, so it is still subject to the check (hence <=, not <).
	if tgtRead > limits.MaxReadCapacityUnits && curRead <= limits.MaxReadCapacityUnits {
		return status.Errorf(codes.FailedPrecondition, "Target read capacity exceeds maximum allowed limits [%d]", limits.MaxReadCapacityUnits)
	}
	if tgtWrite > limits.MaxWriteCapacityUnits && curWrite <= limits.MaxWriteCapacityUnits {
		return status.Errorf(codes.FailedPrecondition, "Target write capacity exceeds maximum allowed limits [%d]", limits.MaxWriteCapacityUnits)
	}

	// check for increases that exceed max increase scale. With a current
	// capacity of 0 the float division yields +Inf for any positive target
	// (rejected) and NaN for a zero target (accepted, as NaN compares false).
	if float32(tgtRead)/float32(curRead) > limits.MaxScaleFactor {
		return status.Errorf(codes.FailedPrecondition, "Target read capacity exceeds the scale limit of [%.1f]x current capacity", limits.MaxScaleFactor)
	}
	if float32(tgtWrite)/float32(curWrite) > limits.MaxScaleFactor {
		return status.Errorf(codes.FailedPrecondition, "Target write capacity exceeds the scale limit of [%.1f]x current capacity", limits.MaxScaleFactor)
	}

	return nil
}
// isProvisioned reports whether the described table uses PROVISIONED billing.
//
// Table descriptions may omit the billing mode summary even for provisioned
// tables, so when the summary is absent the mode is inferred from the table's
// throughput settings instead.
func isProvisioned(t *dynamodb.DescribeTableOutput) bool {
	table := t.Table
	if summary := table.BillingModeSummary; summary != nil {
		return summary.BillingMode == types.BillingModeProvisioned
	}
	return getBillingMode(table.ProvisionedThroughput) == types.BillingModeProvisioned
}
// UpdateCapacity raises the provisioned capacity of a table and/or its global
// secondary indexes and returns the updated table as a proto.
//
// The table must use PROVISIONED billing, otherwise a FailedPrecondition
// error is returned. targetTableCapacity (when non-nil) and every entry of
// indexUpdates are validated with isValidIncrease before anything is sent to
// AWS; the first validation failure aborts the whole request. All accepted
// changes are submitted in a single UpdateTable call.
func (c *client) UpdateCapacity(ctx context.Context, account, region, tableName string, targetTableCapacity *dynamodbv1.Throughput, indexUpdates []*dynamodbv1.IndexUpdateAction, ignore_maximums bool) (*dynamodbv1.Table, error) {
	regional, err := c.getAccountRegionClient(account, region)
	if err != nil {
		return nil, err
	}

	existing, err := getTable(ctx, regional, tableName)
	if err != nil {
		c.log.Error("unable to find table", zap.Error(err))
		return nil, err
	}

	if !isProvisioned(existing) {
		return nil, status.Error(codes.FailedPrecondition, "Table billing mode is not set to PROVISIONED, cannot scale capacities.")
	}

	// Request to send to AWS; filled in below with whichever parts changed.
	request := &dynamodb.UpdateTableInput{TableName: aws.String(tableName)}

	// Table-level capacity, when new values were supplied.
	if targetTableCapacity != nil {
		desired := types.ProvisionedThroughput{
			ReadCapacityUnits:  aws.Int64(targetTableCapacity.ReadCapacityUnits),
			WriteCapacityUnits: aws.Int64(targetTableCapacity.WriteCapacityUnits),
		}
		if err := isValidIncrease(regional, existing.Table.ProvisionedThroughput, desired, ignore_maximums); err != nil {
			return nil, err
		}
		request.ProvisionedThroughput = &desired
	}

	// Index-level capacities, when at least one index update was supplied.
	if len(indexUpdates) > 0 {
		gsiUpdates, err := generateIndexUpdates(regional, existing, indexUpdates, ignore_maximums)
		if err != nil {
			return nil, err
		}
		request.GlobalSecondaryIndexUpdates = append(request.GlobalSecondaryIndexUpdates, gsiUpdates...)
	}

	updated, err := regional.dynamodb.UpdateTable(ctx, request)
	if err != nil {
		return nil, err
	}

	return newProtoForTable(updated.TableDescription, account, region), nil
}
// generateIndexUpdates converts the requested index updates into the SDK
// update types, validating each one against the index's current capacity.
//
// It returns a NotFound error when a named index does not exist on the table,
// an InvalidArgument error when an update carries no target throughput, and
// the error from isValidIncrease when the requested increase is not allowed.
//
// TODO: this function currently terminates upon encountering one invalid
// index update, preserving the AWS SDK design of "all-or-nothing" UpdateTable
// functionality. Confirm if that is still the desired behavior.
// Alternatively, we can selectively make updates.
func generateIndexUpdates(cl *regionalClient, t *dynamodb.DescribeTableOutput, indexUpdates []*dynamodbv1.IndexUpdateAction, ignore_maximums bool) ([]types.GlobalSecondaryIndexUpdate, error) {
	currentIndexes := t.Table.GlobalSecondaryIndexes

	var updates []types.GlobalSecondaryIndexUpdate
	for _, update := range indexUpdates {
		// Generated proto getters are nil-safe, so a nil list element reads
		// as an update with an empty name instead of panicking.
		name := update.GetName()

		currentIndex, err := getGlobalSecondaryIndex(currentIndexes, name)
		if err != nil {
			return nil, err
		}

		// The throughput comes straight from the request; reject an update
		// that omits it rather than dereferencing a nil message.
		throughput := update.GetIndexThroughput()
		if throughput == nil {
			return nil, status.Errorf(codes.InvalidArgument, "Index update for [%s] is missing the target throughput", name)
		}

		// Declared inside the loop body, so each iteration gets its own
		// variable and taking its address below is safe.
		targetIndexCapacity := types.ProvisionedThroughput{
			ReadCapacityUnits:  aws.Int64(throughput.GetReadCapacityUnits()),
			WriteCapacityUnits: aws.Int64(throughput.GetWriteCapacityUnits()),
		}

		if err := isValidIncrease(cl, currentIndex.ProvisionedThroughput, targetIndexCapacity, ignore_maximums); err != nil {
			return nil, err
		}

		updates = append(updates, types.GlobalSecondaryIndexUpdate{
			Update: &types.UpdateGlobalSecondaryIndexAction{
				IndexName:             aws.String(name),
				ProvisionedThroughput: &targetIndexCapacity,
			},
		})
	}

	return updates, nil
}
// BatchGetItem forwards input to the DynamoDB BatchGetItem API of the given
// account and region and returns the SDK output and error unchanged. An error
// resolving the regional client is returned before any call is made.
func (c *client) BatchGetItem(ctx context.Context, account string, region string, input *dynamodb.BatchGetItemInput) (*dynamodb.BatchGetItemOutput, error) {
	regional, err := c.getAccountRegionClient(account, region)
	if err != nil {
		return nil, err
	}

	return regional.dynamodb.BatchGetItem(ctx, input)
}
// DescribeContinuousBackups fetches the continuous-backups description of
// tableName from the given account and region and converts it to its proto
// form.
//
// It returns the error from resolving the regional client or from the
// DescribeContinuousBackups call unchanged; the latter is also logged at warn
// level.
func (c *client) DescribeContinuousBackups(ctx context.Context, account string, region string, tableName string) (*dynamodbv1.ContinuousBackups, error) {
	regional, err := c.getAccountRegionClient(account, region)
	if err != nil {
		return nil, err
	}

	described, err := regional.dynamodb.DescribeContinuousBackups(ctx, &dynamodb.DescribeContinuousBackupsInput{
		TableName: aws.String(tableName),
	})
	if err != nil {
		c.log.Warn("unable to get backups description for table: "+tableName, zap.Error(err))
		return nil, err
	}

	return newProtoForContinuousBackups(described.ContinuousBackupsDescription, account, region), nil
}