-
Notifications
You must be signed in to change notification settings - Fork 348
/
dynamodb.go
104 lines (91 loc) · 2.69 KB
/
dynamodb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package testutil
import (
"context"
"fmt"
"net/http"
"testing"
nanoid "github.com/matoous/go-nanoid/v2"
"github.com/ory/dockertest/v3"
dc "github.com/ory/dockertest/v3/docker"
"github.com/treeverse/lakefs/pkg/kv"
"github.com/treeverse/lakefs/pkg/kv/dynamodb"
kvparams "github.com/treeverse/lakefs/pkg/kv/params"
)
const (
dbContainerTimeoutSeconds = 10 * 60 // 10 min
DynamodbLocalPort = "6432"
DynamodbLocalURI = "http://localhost:6432"
chars = "abcdef1234567890"
charsSize = 8
DynamoDBReadCapacity = 1000
DynamoDBWriteCapacity = 1000
DynamoDBScanLimit = 10
)
func GetDynamoDBInstance() (string, func(), error) {
dockerPool, err := dockertest.NewPool("")
if err != nil {
return "", nil, fmt.Errorf("could not connect to Docker: %w", err)
}
dynamodbDockerRunOptions := &dockertest.RunOptions{
Repository: "amazon/dynamodb-local",
Tag: "latest",
PortBindings: map[dc.Port][]dc.PortBinding{
"8000/tcp": {{HostPort: DynamodbLocalPort}},
},
}
resource, err := dockerPool.RunWithOptions(dynamodbDockerRunOptions)
if err != nil {
return "", nil, fmt.Errorf("could not start dynamodb local: %w", err)
}
// set cleanup
closer := func() {
err = dockerPool.Purge(resource)
if err != nil {
panic("could not kill dynamodb local container")
}
}
// expire, just to make sure
err = resource.Expire(dbContainerTimeoutSeconds)
if err != nil {
return "", nil, fmt.Errorf("could not expire dynamodb local container: %w", err)
}
err = dockerPool.Retry(func() error {
// waiting for dynamodb container to be ready by issuing an HTTP get request with
// exponential backoff retry. The response is not really meaningful for that case
// and so is ignored
resp, err := http.Get(DynamodbLocalURI)
if err != nil {
return err
}
_ = resp.Body.Close()
return nil
})
if err != nil {
return "", nil, fmt.Errorf("could not connect to dynamodb at %s: %w", DynamodbLocalURI, err)
}
// return DB URI
return DynamodbLocalURI, closer, nil
}
func UniqueKVTableName() string {
return "kvstore_" + nanoid.MustGenerate(chars, charsSize)
}
func GetDynamoDBProd(ctx context.Context, tb testing.TB) kv.Store {
table := UniqueKVTableName()
testParams := &kvparams.DynamoDB{
TableName: table,
ScanLimit: DynamoDBScanLimit,
AwsRegion: "us-east-1",
}
store, err := kv.Open(ctx, kvparams.Config{Type: dynamodb.DriverName, DynamoDB: testParams})
if err != nil {
tb.Fatalf("failed to open kv dynamodb store %s", err)
}
tb.Cleanup(func() {
defer store.Close()
err = store.(*dynamodb.Store).DropTable()
if err != nil {
tb.Fatalf("failed to delete table from DB %s %s", table, err)
}
})
return store
}