diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml new file mode 100644 index 0000000..54cd5a4 --- /dev/null +++ b/.github/workflows/test.yaml @@ -0,0 +1,18 @@ +name: Test + +on: [push] + +jobs: + go-test: + runs-on: ubuntu-latest + steps: + - name: set up go + uses: actions/setup-go@v2 + with: + go-version: 1.22 + - name: checkout the code + uses: actions/checkout@v2 + - name: check format + run: if [ "$(gofmt -s -l . | wc -l)" -gt 0 ]; then exit 1; fi + # - name: test + # run: make test diff --git a/.gitignore b/.gitignore index 50d62a7..436debc 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,4 @@ +.idea dynamodb-sync + +dist/ diff --git a/.goreleaser.yaml b/.goreleaser.yaml new file mode 100644 index 0000000..e4cb740 --- /dev/null +++ b/.goreleaser.yaml @@ -0,0 +1,23 @@ + +version: 2 + +before: + hooks: + - rm -rf ./dist + - go mod tidy + - go generate ./... + +builds: + - env: [CGO_ENABLED=0] + goos: + - linux + goarch: + - amd64 + - arm64 + +changelog: + sort: asc + filters: + exclude: + - "^docs:" + - "^test:" diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..83ed0f6 --- /dev/null +++ b/Makefile @@ -0,0 +1,15 @@ +.PHONY: clean +clean: + rm -f dynamodb-sync + +.PHONY: fmt +fmt: + gofmt -s -w . + +.PHONY: test +test: + go test -v ./... + +.PHONY: build +build: clean + GOOS=linux GOARCH=amd64 go build diff --git a/README.md b/README.md index b9025b0..fd0e79f 100644 --- a/README.md +++ b/README.md @@ -1 +1,3 @@ -# dynamodb-sync \ No newline at end of file +[![Test build status](https://github.com/thumbtack/dynamodb-sync/workflows/Test/badge.svg)](https://github.com/thumbtack/dynamodb-sync/actions?query=workflow%3ATest) + +## dynamodb-sync diff --git a/checkpoint.go b/checkpoint.go index 52e9b18..dca3e41 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -128,7 +128,8 @@ func (sync *syncState) isCheckpointFound(key primaryKey) bool { // Update the checkpoint for `key's` local state // sync : timestamp, -// checkpoint[`shardId`]: `sequenceNumber` +// +// checkpoint[`shardId`]: `sequenceNumber` func (sync *syncState) updateCheckpointLocal( key primaryKey, sequenceNumber string, diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..e915622 --- /dev/null +++ b/go.mod @@ -0,0 +1,15 @@ +module github.com/thumbtack/dynamodb-sync + +go 1.22 + +require ( + github.com/aws/aws-sdk-go v1.54.1 + github.com/sirupsen/logrus v1.9.3 + golang.org/x/time v0.5.0 +) + +require ( + github.com/jmespath/go-jmespath v0.4.0 // indirect + github.com/stretchr/testify v1.9.0 // indirect + golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..775dd3c --- /dev/null +++ b/go.sum @@ -0,0 +1,27 @@ +github.com/aws/aws-sdk-go v1.54.1 h1:+ULL7oLC+v3T00fOMIohUarPI3SR3oyDd6FBEvgdhvs= +github.com/aws/aws-sdk-go v1.54.1/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go index bad7a58..ad74269 100644 --- a/main.go +++ b/main.go @@ -25,12 +25,8 @@ package main import ( "encoding/json" "errors" - "github.com/aws/aws-sdk-go/aws/credentials/stscreds" "strings" - //"github.com/aws/aws-sdk-go/aws/credentials/stscreds" - "github.com/thumbtack/go/lib/metrics" - //"github.com/thumbtack/go/lib/monitoring" "io/ioutil" "net/http" _ "net/http/pprof" @@ -40,6 +36,7 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials/stscreds" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodbstreams" @@ -61,7 +58,6 @@ var ddbTable = os.Getenv(paramCheckpointTable) var ddbRegion = os.Getenv(paramCheckpointRegion) var ddbEndpoint = os.Getenv(paramCheckpointEndpoint) var ddbClient = ddbConfigConnect(ddbRegion, ddbEndpoint, maxRetries, *logger) -var metricsClient = newMetricsClient() type config struct { SrcTable string `json:"src_table"` @@ -77,7 +73,7 @@ type config struct { ReadQps int64 `json:"read_qps"` WriteQps int64 `json:"write_qps"` UpdateCheckpointThreshold int `json:"update_checkpoint_threshold"` - EnableStreaming *bool `json:"enable_streaming"` + EnableStreaming *bool `json:"enable_streaming"` } // Config file is read and dumped into this struct @@ -108,12 +104,12 @@ func NewSyncState(tableConfig config) *syncState { var stream *dynamodbstreams.DynamoDBStreams tr := &http.Transport{ - MaxIdleConns: 2048, - MaxConnsPerHost: 1024, + MaxIdleConns: 2048, + MaxConnsPerHost: 1024, } httpClient := &http.Client{ - Timeout:8*time.Second, - Transport:tr} + Timeout: 8 * time.Second, + Transport: tr} srcSess := session.Must( session.NewSession( @@ -169,8 +165,8 @@ func NewSyncState(tableConfig config) *syncState { } type appConfig struct { - sync []config - verbose bool + sync []config + verbose bool } // The primary key of the Checkpoint ddb table, of the stream etc @@ -199,15 +195,6 @@ func ddbConfigConnect(region string, endpoint string, maxRetries int, logger log ))) } -func newMetricsClient() (client metrics.Client) { - client, err := metrics.NewAlfredAppClient() - if err != nil { - logger.WithFields(logging.Fields{"Error":err}).Error("Error in initializing metrics") - os.Exit(1) - } - return client -} - // app constructor func NewApp() *appConfig { logger.SetLevel(logging.InfoLevel) @@ -239,8 +226,8 @@ func NewApp() *appConfig { } return &appConfig{ - sync: tableConfig, - verbose: true, + sync: tableConfig, + verbose: true, } } @@ -277,7 +264,6 @@ func setDefaults(tableConfig []config) ([]config, error) { continue } - if tableConfig[i].ReadQps == 0 { tableConfig[i].ReadQps = 500 } diff --git a/replicate.go b/replicate.go index 2a653ea..c231084 100644 --- a/replicate.go +++ b/replicate.go @@ -1,10 +1,11 @@ package main import ( - "golang.org/x/time/rate" "sync" "time" + "golang.org/x/time/rate" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodbstreams" @@ -196,8 +197,8 @@ func (sync *syncState) streamSync(key primaryKey, streamArn string) error { numShards := 0 type shardStats struct { - numShards int - tableName string + numShards int + tableName string } type activeShardStats struct { @@ -227,7 +228,7 @@ func (sync *syncState) streamSync(key primaryKey, streamArn string) error { } numShards += len(result.StreamDescription.Shards) - + for _, shard := range result.StreamDescription.Shards { sync.checkpointLock.RLock() _, ok := sync.expiredShards[*shard.ShardId] diff --git a/shard.go b/shard.go index d356c50..bed51a7 100644 --- a/shard.go +++ b/shard.go @@ -12,7 +12,7 @@ import ( func backoff(i int, s string) { wait := math.Pow(2, float64(i)) logger.WithFields(logging.Fields{ - "Backoff Caller": s, + "Backoff Caller": s, "Backoff Time(seconds)": wait, }).Info("Backing off") time.Sleep(time.Duration(wait) * time.Second) diff --git a/sync_test.go b/sync_test.go index e768e6f..e0289c8 100644 --- a/sync_test.go +++ b/sync_test.go @@ -272,7 +272,7 @@ func (sync *syncState) testStreamSyncWait() { } func (sync *syncState) testExpireShards() { - for random, _ := range sync.checkpoint { + for random := range sync.checkpoint { k := primaryKey{sync.tableConfig.SrcTable, sync.tableConfig.DstTable} sync.expireCheckpointLocal(k, aws.String(random)) sync.expireCheckpointRemote(k, random) diff --git a/table.go b/table.go index e276977..f875a23 100644 --- a/table.go +++ b/table.go @@ -2,12 +2,13 @@ package main import ( "errors" - "golang.org/x/time/rate" "os" "strings" "sync" "time" + "golang.org/x/time/rate" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/dynamodb" logging "github.com/sirupsen/logrus" @@ -23,36 +24,36 @@ func (sync *syncState) writeBatch( batch map[string][]*dynamodb.WriteRequest, key primaryKey, rl *rate.Limiter, reqCapacity float64, writeBatchSize int64) []*dynamodb.ConsumedCapacity { - i := 0 - r := rl.ReserveN(time.Now(), int(reqCapacity)) - if !r.OK() { - r = rl.ReserveN(time.Now(), int(writeBatchSize)) - } - time.Sleep(r.Delay()) - consumedCapacity := make([]*dynamodb.ConsumedCapacity, 0) - - for len(batch) > 0 { - output,_ := sync.dstDynamo.BatchWriteItem( - &dynamodb.BatchWriteItemInput{ - RequestItems: batch, - }) - - consumedCapacity = append(consumedCapacity, output.ConsumedCapacity...) - - if output.UnprocessedItems != nil { - logger.WithFields(logging.Fields{ - "Unprocessed Items Size": len(output.UnprocessedItems), - "Source Table": key.sourceTable, - "Destination Table": key.dstTable, - }).Debug("Some items failed to be processed") - // exponential backoff before retrying - backoff(i, "BatchWrite") - i++ - // Retry writing items that were not processed - batch = output.UnprocessedItems - } + i := 0 + r := rl.ReserveN(time.Now(), int(reqCapacity)) + if !r.OK() { + r = rl.ReserveN(time.Now(), int(writeBatchSize)) + } + time.Sleep(r.Delay()) + consumedCapacity := make([]*dynamodb.ConsumedCapacity, 0) + + for len(batch) > 0 { + output, _ := sync.dstDynamo.BatchWriteItem( + &dynamodb.BatchWriteItemInput{ + RequestItems: batch, + }) + + consumedCapacity = append(consumedCapacity, output.ConsumedCapacity...) + + if output.UnprocessedItems != nil { + logger.WithFields(logging.Fields{ + "Unprocessed Items Size": len(output.UnprocessedItems), + "Source Table": key.sourceTable, + "Destination Table": key.dstTable, + }).Debug("Some items failed to be processed") + // exponential backoff before retrying + backoff(i, "BatchWrite") + i++ + // Retry writing items that were not processed + batch = output.UnprocessedItems } - return consumedCapacity + } + return consumedCapacity } // Group items from the `items` channel into @@ -192,13 +193,13 @@ func (sync *syncState) updateCapacity( var err error logger.WithFields(logging.Fields{ - "Table":tableName, - "New Read Capacity": newThroughput.readCapacity, + "Table": tableName, + "New Read Capacity": newThroughput.readCapacity, "New Write Capacity": newThroughput.writeCapacity}).Info("Updating capacity") input := &dynamodb.UpdateTableInput{ TableName: aws.String(tableName), ProvisionedThroughput: &dynamodb.ProvisionedThroughput{ - ReadCapacityUnits: aws.Int64(newThroughput.readCapacity), + ReadCapacityUnits: aws.Int64(newThroughput.readCapacity), WriteCapacityUnits: aws.Int64(newThroughput.writeCapacity), }, } @@ -274,9 +275,9 @@ func (sync *syncState) getCapacity(tableName string, dynamo *dynamodb.DynamoDB) } else { result := output.Table.ProvisionedThroughput logger.WithFields(logging.Fields{ - "Table": tableName, - "Read Capacity": *result.ReadCapacityUnits, - "Write Capacity": *result.WriteCapacityUnits, + "Table": tableName, + "Read Capacity": *result.ReadCapacityUnits, + "Write Capacity": *result.WriteCapacityUnits, }).Info("Fetched provisioned throughput of table") return provisionedThroughput{ *result.ReadCapacityUnits, @@ -316,9 +317,9 @@ func (sync *syncState) createTable(key primaryKey, properties *dynamodb.Describe }).Info("Creating table") input := &dynamodb.CreateTableInput{ - TableName: aws.String(sync.tableConfig.DstTable), - KeySchema: properties.Table.KeySchema, - AttributeDefinitions: properties.Table.AttributeDefinitions, + TableName: aws.String(sync.tableConfig.DstTable), + KeySchema: properties.Table.KeySchema, + AttributeDefinitions: properties.Table.AttributeDefinitions, ProvisionedThroughput: &dynamodb.ProvisionedThroughput{ ReadCapacityUnits: properties.Table.ProvisionedThroughput.ReadCapacityUnits, WriteCapacityUnits: properties.Table.ProvisionedThroughput.WriteCapacityUnits,