forked from gruntwork-io/terragrunt
/
dynamo_lock_table.go
182 lines (150 loc) · 6.65 KB
/
dynamo_lock_table.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package dynamodb
import (
"time"
"fmt"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/gruntwork-io/terragrunt/util"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/gruntwork-io/terragrunt/errors"
)
// createLockTableIfNecessary makes sure the DynamoDB lock table named tableName is usable. It asks
// lockTableExistsAndIsActive for the table's state and, when that reports false, creates the table with
// DEFAULT_READ_CAPACITY_UNITS and DEFAULT_WRITE_CAPACITY_UNITS. Errors from the state check or from table creation
// are returned as-is.
func createLockTableIfNecessary(tableName string, client *dynamodb.DynamoDB) error {
	alreadyActive, err := lockTableExistsAndIsActive(tableName, client)
	switch {
	case err != nil:
		return err
	case alreadyActive:
		// The table is already there and active, so there is nothing to create.
		return nil
	}

	// This path is also taken when the table exists but is not active yet; createLockTable copes with that by
	// waiting for the table instead of failing.
	util.Logger.Printf("Lock table %s does not exist in DynamoDB. Will need to create it just this first time.", tableName)
	return createLockTable(tableName, DEFAULT_READ_CAPACITY_UNITS, DEFAULT_WRITE_CAPACITY_UNITS, client)
}
// lockTableExistsAndIsActive reports whether the table named tableName exists in DynamoDB and is in the "active"
// state.
//
// It returns (false, nil) when DescribeTable fails with a ResourceNotFoundException (the table does not exist), when
// the table is in any state other than active, or when the response carries no table description or status. Any
// other DescribeTable error is returned wrapped with a stack trace.
func lockTableExistsAndIsActive(tableName string, client *dynamodb.DynamoDB) (bool, error) {
	output, err := client.DescribeTable(&dynamodb.DescribeTableInput{TableName: aws.String(tableName)})
	if err != nil {
		if awsErr, isAwsErr := err.(awserr.Error); isAwsErr && awsErr.Code() == "ResourceNotFoundException" {
			return false, nil
		}
		return false, errors.WithStackTrace(err)
	}

	// Table and TableStatus are pointers in the SDK's output types. Treat a missing value as "not active" rather
	// than dereferencing blindly and panicking on an unexpected response.
	if output == nil || output.Table == nil {
		return false, nil
	}
	return aws.StringValue(output.Table.TableStatus) == dynamodb.TableStatusActive, nil
}
// createLockTable issues a CreateTable request for tableName and then blocks until the table is in "active" state.
// The table gets a single string attribute, ATTR_STATE_FILE_ID, which is also its hash key, and is provisioned with
// the given read and write capacity units.
//
// If CreateTable fails with the error recognised by isTableAlreadyBeingCreatedError, the failure is only logged and
// the function still waits for the table to become active. Any other CreateTable error is returned wrapped with a
// stack trace. The final result is whatever waitForTableToBeActive returns.
func createLockTable(tableName string, readCapacityUnits int, writeCapacityUnits int, client *dynamodb.DynamoDB) error {
	util.Logger.Printf("Creating table %s in DynamoDB", tableName)

	stateFileIdAttribute := &dynamodb.AttributeDefinition{
		AttributeName: aws.String(ATTR_STATE_FILE_ID),
		AttributeType: aws.String(dynamodb.ScalarAttributeTypeS),
	}
	stateFileIdHashKey := &dynamodb.KeySchemaElement{
		AttributeName: aws.String(ATTR_STATE_FILE_ID),
		KeyType:       aws.String(dynamodb.KeyTypeHash),
	}
	createInput := &dynamodb.CreateTableInput{
		TableName:            aws.String(tableName),
		AttributeDefinitions: []*dynamodb.AttributeDefinition{stateFileIdAttribute},
		KeySchema:            []*dynamodb.KeySchemaElement{stateFileIdHashKey},
		ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
			ReadCapacityUnits:  aws.Int64(int64(readCapacityUnits)),
			WriteCapacityUnits: aws.Int64(int64(writeCapacityUnits)),
		},
	}

	if _, err := client.CreateTable(createInput); err != nil {
		if !isTableAlreadyBeingCreatedError(err) {
			return errors.WithStackTrace(err)
		}
		// Losing the creation race is not a failure: fall through and wait for the other creator's table.
		util.Logger.Printf("Looks like someone created table %s at the same time. Will wait for it to be in active state.", tableName)
	}

	return waitForTableToBeActive(tableName, client, MAX_RETRIES_WAITING_FOR_TABLE_TO_BE_ACTIVE, SLEEP_BETWEEN_TABLE_STATUS_CHECKS)
}
// isTableAlreadyBeingCreatedError reports whether err is an AWS error whose code is "ResourceInUseException", the
// code AWS uses when the resource already exists. The error is inspected directly, without unwrapping, so it must be
// the value returned by the SDK call itself.
func isTableAlreadyBeingCreatedError(err error) bool {
	if awsErr, ok := err.(awserr.Error); ok {
		return awsErr.Code() == "ResourceInUseException"
	}
	return false
}
// waitForTableToBeActive polls DynamoDB until the table named tableName is in the "active" state.
//
// The table status is checked at most maxRetries times, sleeping sleepBetweenRetries between consecutive checks. It
// returns nil as soon as a check finds the table active, returns the error from a failed status check immediately,
// and returns a TableActiveRetriesExceeded error (with a stack trace) if the table is still not active after the
// last check. With maxRetries <= 0 no check is made and TableActiveRetriesExceeded is returned right away.
func waitForTableToBeActive(tableName string, client *dynamodb.DynamoDB, maxRetries int, sleepBetweenRetries time.Duration) error {
	for attempt := 1; attempt <= maxRetries; attempt++ {
		tableReady, err := lockTableExistsAndIsActive(tableName, client)
		if err != nil {
			return err
		}

		if tableReady {
			util.Logger.Printf("Success! Table %s is now in active state.", tableName)
			return nil
		}

		// No further check follows the last attempt, so announcing one and sleeping would only delay the error.
		if attempt == maxRetries {
			break
		}

		util.Logger.Printf("Table %s is not yet in active state. Will check again after %s.", tableName, sleepBetweenRetries)
		time.Sleep(sleepBetweenRetries)
	}

	return errors.WithStackTrace(TableActiveRetriesExceeded{TableName: tableName, Retries: maxRetries})
}
// removeItemFromLockTable deletes the item identified by itemId from the DynamoDB table tableName. The key for the
// delete request is built by createKeyFromItemId. The outcome of the DeleteItem call is passed through
// errors.WithStackTrace and returned.
func removeItemFromLockTable(itemId string, tableName string, client *dynamodb.DynamoDB) error {
	// TODO: should we check that the entry has our own metadata and not someone else's?
	deleteInput := &dynamodb.DeleteItemInput{
		Key:       createKeyFromItemId(itemId),
		TableName: aws.String(tableName),
	}

	_, deleteErr := client.DeleteItem(deleteInput)
	return errors.WithStackTrace(deleteErr)
}
// writeItemToLockTable puts an item for itemId into the DynamoDB table tableName, but only if no such item is there
// yet: the request carries an attribute_not_exists condition on ATTR_STATE_FILE_ID, so an existing item makes the
// write fail with an error.
//
// The item's attributes come from createItemAttributes; an error from that helper is returned unchanged. The outcome
// of the PutItem call is passed through errors.WithStackTrace and returned.
func writeItemToLockTable(itemId string, tableName string, client *dynamodb.DynamoDB) error {
	attributes, err := createItemAttributes(itemId, client)
	if err != nil {
		return err
	}

	// Conditional writes in DynamoDB should be strongly consistent: http://stackoverflow.com/a/23371813/483528
	// https://r.32k.io/locking-with-dynamodb
	putInput := &dynamodb.PutItemInput{
		TableName:           aws.String(tableName),
		Item:                attributes,
		ConditionExpression: aws.String(fmt.Sprintf("attribute_not_exists(%s)", ATTR_STATE_FILE_ID)),
	}

	_, err = client.PutItem(putInput)
	return errors.WithStackTrace(err)
}
// writeItemToLockTableUntilSuccess tries to acquire the lock for itemId by writing its item to the DynamoDB table
// tableName, retrying while someone else holds the lock.
//
// At most maxRetries write attempts are made. When an attempt fails because the item already exists (see
// isItemAlreadyExistsErr), the current lock metadata is displayed and, unless that was the last attempt, the
// function sleeps for sleepBetweenRetries before trying again. It returns nil once a write succeeds, returns any
// other write error immediately, and returns an AcquireLockRetriesExceeded error (with a stack trace) when every
// attempt found the item already present. With maxRetries <= 0 no attempt is made and AcquireLockRetriesExceeded is
// returned right away.
func writeItemToLockTableUntilSuccess(itemId string, tableName string, client *dynamodb.DynamoDB, maxRetries int, sleepBetweenRetries time.Duration) error {
	for attempt := 1; attempt <= maxRetries; attempt++ {
		util.Logger.Printf("Attempting to create lock item for state file %s in DynamoDB table %s", itemId, tableName)

		err := writeItemToLockTable(itemId, tableName, client)
		if err == nil {
			util.Logger.Printf("Lock acquired!")
			return nil
		}

		// Only "item already exists" means the lock is held by someone else; anything else is a real failure.
		if !isItemAlreadyExistsErr(err) {
			return err
		}

		displayLockMetadata(itemId, tableName, client)

		// No further attempt follows the last one, so announcing a retry and sleeping would only delay the error.
		if attempt == maxRetries {
			break
		}

		util.Logger.Printf("Will try to acquire lock again in %s.", sleepBetweenRetries)
		time.Sleep(sleepBetweenRetries)
	}

	return errors.WithStackTrace(AcquireLockRetriesExceeded{ItemId: itemId, Retries: maxRetries})
}
// isItemAlreadyExistsErr reports whether err, once passed through errors.Unwrap, is an AWS error whose code is
// "ConditionalCheckFailedException". AWS returns this when a conditional check fails, which usually indicates that
// an item you tried to create already exists.
func isItemAlreadyExistsErr(err error) bool {
	if awsErr, ok := errors.Unwrap(err).(awserr.Error); ok {
		return awsErr.Code() == "ConditionalCheckFailedException"
	}
	return false
}
// TableActiveRetriesExceeded is the error reported when a DynamoDB table is still not in "active" state after the
// allowed number of status checks.
type TableActiveRetriesExceeded struct {
	// TableName is the name of the table that was being waited on.
	TableName string
	// Retries is the retry limit that was exhausted.
	Retries int
}

// Error implements the error interface, naming the table and the number of retries that were used up.
func (e TableActiveRetriesExceeded) Error() string {
	const format = "Table %s is still not in active state after %d retries."
	return fmt.Sprintf(format, e.TableName, e.Retries)
}
// AcquireLockRetriesExceeded is the error reported when the lock for an item could not be acquired within the
// allowed number of attempts.
type AcquireLockRetriesExceeded struct {
	// ItemId identifies the item whose lock could not be acquired.
	ItemId string
	// Retries is the retry limit that was exhausted.
	Retries int
}

// Error implements the error interface, naming the item and the number of retries that were used up.
func (e AcquireLockRetriesExceeded) Error() string {
	const format = "Unable to acquire lock for item %s after %d retries."
	return fmt.Sprintf(format, e.ItemId, e.Retries)
}