Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
name: Test

on: [push]

jobs:
go-test:
runs-on: ubuntu-latest
steps:
- name: set up go
uses: actions/setup-go@v2
with:
go-version: 1.22
- name: checkout the code
uses: actions/checkout@v2
- name: check format
run: if [ "$(gofmt -s -l . | wc -l)" -gt 0 ]; then exit 1; fi
# - name: test
# run: make test
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
.idea
dynamodb-sync

dist/
23 changes: 23 additions & 0 deletions .goreleaser.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@

version: 2

before:
hooks:
- rm -rf ./dist
- go mod tidy
- go generate ./...

builds:
- env: [CGO_ENABLED=0]
goos:
- linux
goarch:
- amd64
- arm64

changelog:
sort: asc
filters:
exclude:
- "^docs:"
- "^test:"
15 changes: 15 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
.PHONY: clean
clean:
rm -f dynamodb-sync

.PHONY: fmt
fmt:
gofmt -s -w .

.PHONY: test
test:
go test -v ./...

.PHONY: build
build: clean
GOOS=linux GOARCH=amd64 go build
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
# dynamodb-sync
[![Test build status](https://github.com/thumbtack/dynamodb-sync/workflows/Test/badge.svg)](https://github.com/thumbtack/dynamodb-sync/actions?query=workflow%3ATest)

## dynamodb-sync
3 changes: 2 additions & 1 deletion checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ func (sync *syncState) isCheckpointFound(key primaryKey) bool {

// Update the checkpoint for `key's` local state
// sync : timestamp,
// checkpoint[`shardId`]: `sequenceNumber`
//
// checkpoint[`shardId`]: `sequenceNumber`
func (sync *syncState) updateCheckpointLocal(
key primaryKey,
sequenceNumber string,
Expand Down
15 changes: 15 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module github.com/thumbtack/dynamodb-sync

go 1.22

require (
github.com/aws/aws-sdk-go v1.54.1
github.com/sirupsen/logrus v1.9.3
golang.org/x/time v0.5.0
)

require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/stretchr/testify v1.9.0 // indirect
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
)
27 changes: 27 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
github.com/aws/aws-sdk-go v1.54.1 h1:+ULL7oLC+v3T00fOMIohUarPI3SR3oyDd6FBEvgdhvs=
github.com/aws/aws-sdk-go v1.54.1/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
34 changes: 10 additions & 24 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,8 @@ package main
import (
"encoding/json"
"errors"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"strings"

//"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/thumbtack/go/lib/metrics"
//"github.com/thumbtack/go/lib/monitoring"
"io/ioutil"
"net/http"
_ "net/http/pprof"
Expand All @@ -40,6 +36,7 @@ import (
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodbstreams"
Expand All @@ -61,7 +58,6 @@ var ddbTable = os.Getenv(paramCheckpointTable)
var ddbRegion = os.Getenv(paramCheckpointRegion)
var ddbEndpoint = os.Getenv(paramCheckpointEndpoint)
var ddbClient = ddbConfigConnect(ddbRegion, ddbEndpoint, maxRetries, *logger)
var metricsClient = newMetricsClient()

type config struct {
SrcTable string `json:"src_table"`
Expand All @@ -77,7 +73,7 @@ type config struct {
ReadQps int64 `json:"read_qps"`
WriteQps int64 `json:"write_qps"`
UpdateCheckpointThreshold int `json:"update_checkpoint_threshold"`
EnableStreaming *bool `json:"enable_streaming"`
EnableStreaming *bool `json:"enable_streaming"`
}

// Config file is read and dumped into this struct
Expand Down Expand Up @@ -108,12 +104,12 @@ func NewSyncState(tableConfig config) *syncState {
var stream *dynamodbstreams.DynamoDBStreams

tr := &http.Transport{
MaxIdleConns: 2048,
MaxConnsPerHost: 1024,
MaxIdleConns: 2048,
MaxConnsPerHost: 1024,
}
httpClient := &http.Client{
Timeout:8*time.Second,
Transport:tr}
Timeout: 8 * time.Second,
Transport: tr}

srcSess := session.Must(
session.NewSession(
Expand Down Expand Up @@ -169,8 +165,8 @@ func NewSyncState(tableConfig config) *syncState {
}

type appConfig struct {
sync []config
verbose bool
sync []config
verbose bool
}

// The primary key of the Checkpoint ddb table, of the stream etc
Expand Down Expand Up @@ -199,15 +195,6 @@ func ddbConfigConnect(region string, endpoint string, maxRetries int, logger log
)))
}

func newMetricsClient() (client metrics.Client) {
client, err := metrics.NewAlfredAppClient()
if err != nil {
logger.WithFields(logging.Fields{"Error":err}).Error("Error in initializing metrics")
os.Exit(1)
}
return client
}

// app constructor
func NewApp() *appConfig {
logger.SetLevel(logging.InfoLevel)
Expand Down Expand Up @@ -239,8 +226,8 @@ func NewApp() *appConfig {
}

return &appConfig{
sync: tableConfig,
verbose: true,
sync: tableConfig,
verbose: true,
}
}

Expand Down Expand Up @@ -277,7 +264,6 @@ func setDefaults(tableConfig []config) ([]config, error) {
continue
}


if tableConfig[i].ReadQps == 0 {
tableConfig[i].ReadQps = 500
}
Expand Down
9 changes: 5 additions & 4 deletions replicate.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package main

import (
"golang.org/x/time/rate"
"sync"
"time"

"golang.org/x/time/rate"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodbstreams"
Expand Down Expand Up @@ -196,8 +197,8 @@ func (sync *syncState) streamSync(key primaryKey, streamArn string) error {
numShards := 0

type shardStats struct {
numShards int
tableName string
numShards int
tableName string
}

type activeShardStats struct {
Expand Down Expand Up @@ -227,7 +228,7 @@ func (sync *syncState) streamSync(key primaryKey, streamArn string) error {
}

numShards += len(result.StreamDescription.Shards)

for _, shard := range result.StreamDescription.Shards {
sync.checkpointLock.RLock()
_, ok := sync.expiredShards[*shard.ShardId]
Expand Down
2 changes: 1 addition & 1 deletion shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func backoff(i int, s string) {
wait := math.Pow(2, float64(i))
logger.WithFields(logging.Fields{
"Backoff Caller": s,
"Backoff Caller": s,
"Backoff Time(seconds)": wait,
}).Info("Backing off")
time.Sleep(time.Duration(wait) * time.Second)
Expand Down
2 changes: 1 addition & 1 deletion sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (sync *syncState) testStreamSyncWait() {
}

func (sync *syncState) testExpireShards() {
for random, _ := range sync.checkpoint {
for random := range sync.checkpoint {
k := primaryKey{sync.tableConfig.SrcTable, sync.tableConfig.DstTable}
sync.expireCheckpointLocal(k, aws.String(random))
sync.expireCheckpointRemote(k, random)
Expand Down
79 changes: 40 additions & 39 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package main

import (
"errors"
"golang.org/x/time/rate"
"os"
"strings"
"sync"
"time"

"golang.org/x/time/rate"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
logging "github.com/sirupsen/logrus"
Expand All @@ -23,36 +24,36 @@ func (sync *syncState) writeBatch(
batch map[string][]*dynamodb.WriteRequest,
key primaryKey, rl *rate.Limiter, reqCapacity float64,
writeBatchSize int64) []*dynamodb.ConsumedCapacity {
i := 0
r := rl.ReserveN(time.Now(), int(reqCapacity))
if !r.OK() {
r = rl.ReserveN(time.Now(), int(writeBatchSize))
}
time.Sleep(r.Delay())
consumedCapacity := make([]*dynamodb.ConsumedCapacity, 0)

for len(batch) > 0 {
output,_ := sync.dstDynamo.BatchWriteItem(
&dynamodb.BatchWriteItemInput{
RequestItems: batch,
})

consumedCapacity = append(consumedCapacity, output.ConsumedCapacity...)

if output.UnprocessedItems != nil {
logger.WithFields(logging.Fields{
"Unprocessed Items Size": len(output.UnprocessedItems),
"Source Table": key.sourceTable,
"Destination Table": key.dstTable,
}).Debug("Some items failed to be processed")
// exponential backoff before retrying
backoff(i, "BatchWrite")
i++
// Retry writing items that were not processed
batch = output.UnprocessedItems
}
i := 0
r := rl.ReserveN(time.Now(), int(reqCapacity))
if !r.OK() {
r = rl.ReserveN(time.Now(), int(writeBatchSize))
}
time.Sleep(r.Delay())
consumedCapacity := make([]*dynamodb.ConsumedCapacity, 0)

for len(batch) > 0 {
output, _ := sync.dstDynamo.BatchWriteItem(
&dynamodb.BatchWriteItemInput{
RequestItems: batch,
})

consumedCapacity = append(consumedCapacity, output.ConsumedCapacity...)

if output.UnprocessedItems != nil {
logger.WithFields(logging.Fields{
"Unprocessed Items Size": len(output.UnprocessedItems),
"Source Table": key.sourceTable,
"Destination Table": key.dstTable,
}).Debug("Some items failed to be processed")
// exponential backoff before retrying
backoff(i, "BatchWrite")
i++
// Retry writing items that were not processed
batch = output.UnprocessedItems
}
return consumedCapacity
}
return consumedCapacity
}

// Group items from the `items` channel into
Expand Down Expand Up @@ -192,13 +193,13 @@ func (sync *syncState) updateCapacity(

var err error
logger.WithFields(logging.Fields{
"Table":tableName,
"New Read Capacity": newThroughput.readCapacity,
"Table": tableName,
"New Read Capacity": newThroughput.readCapacity,
"New Write Capacity": newThroughput.writeCapacity}).Info("Updating capacity")
input := &dynamodb.UpdateTableInput{
TableName: aws.String(tableName),
ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(newThroughput.readCapacity),
ReadCapacityUnits: aws.Int64(newThroughput.readCapacity),
WriteCapacityUnits: aws.Int64(newThroughput.writeCapacity),
},
}
Expand Down Expand Up @@ -274,9 +275,9 @@ func (sync *syncState) getCapacity(tableName string, dynamo *dynamodb.DynamoDB)
} else {
result := output.Table.ProvisionedThroughput
logger.WithFields(logging.Fields{
"Table": tableName,
"Read Capacity": *result.ReadCapacityUnits,
"Write Capacity": *result.WriteCapacityUnits,
"Table": tableName,
"Read Capacity": *result.ReadCapacityUnits,
"Write Capacity": *result.WriteCapacityUnits,
}).Info("Fetched provisioned throughput of table")
return provisionedThroughput{
*result.ReadCapacityUnits,
Expand Down Expand Up @@ -316,9 +317,9 @@ func (sync *syncState) createTable(key primaryKey, properties *dynamodb.Describe
}).Info("Creating table")

input := &dynamodb.CreateTableInput{
TableName: aws.String(sync.tableConfig.DstTable),
KeySchema: properties.Table.KeySchema,
AttributeDefinitions: properties.Table.AttributeDefinitions,
TableName: aws.String(sync.tableConfig.DstTable),
KeySchema: properties.Table.KeySchema,
AttributeDefinitions: properties.Table.AttributeDefinitions,
ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
ReadCapacityUnits: properties.Table.ProvisionedThroughput.ReadCapacityUnits,
WriteCapacityUnits: properties.Table.ProvisionedThroughput.WriteCapacityUnits,
Expand Down