Skip to content

Commit

Permalink
review reflect: move worker creation on successful table creation, en…
Browse files Browse the repository at this point in the history
…hance log
  • Loading branch information
winnie.byun committed Aug 18, 2020
1 parent 6052873 commit 4a497a0
Showing 1 changed file with 22 additions and 16 deletions.
38 changes: 22 additions & 16 deletions storage/database/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"time"

"github.com/aws/aws-sdk-go/aws/session"

"github.com/rcrowley/go-metrics"

"github.com/pkg/errors"
Expand Down Expand Up @@ -128,28 +129,12 @@ func NewDynamoDB(config *DynamoDBConfig, dbtype DBEntryType) (*dynamoDB, error)
config.Endpoint = "https://dynamodb." + config.Region + ".amazonaws.com"
}

dynamoOpenedDBNum++

s3FileDB, err := newS3FileDB(config.Region, "https://s3."+config.Region+".amazonaws.com", config.TableName)
if err != nil {
logger.Error("Unable to create/get S3FileDB", "DB", config.TableName)
return nil, err
}

dynamoOnceWorker.Do(func() {
dynamoDBClient = dynamodb.New(session.Must(session.NewSessionWithOptions(session.Options{
Config: aws.Config{
Endpoint: aws.String(config.Endpoint),
Region: aws.String(config.Region),
},
})))
dynamoWriteCh = make(chan *batchWriteWorkerInput, itemChanSize)
for i := 0; i < WorkerNum; i++ {
go createBatchWriteWorker(dynamoWriteCh)
}
logger.Info("made dynamo batch write workers", "workerNum", WorkerNum)
})

dynamoDB := &dynamoDB{
config: *config,
fdb: s3FileDB,
Expand All @@ -176,6 +161,12 @@ func NewDynamoDB(config *DynamoDBConfig, dbtype DBEntryType) (*dynamoDB, error)
switch tableStatus {
case dynamodb.TableStatusActive:
dynamoDB.logger.Warn("Successfully created dynamoDB table. You will be CHARGED until the DB is deleted.", "endPoint", config.Endpoint)
// count successful table creating
dynamoOpenedDBNum++
// create workers on the first successful table creation
dynamoOnceWorker.Do(func() {
createBatchWriteWorkerPool(config.Endpoint, config.Region)
})
return dynamoDB, nil
case dynamodb.TableStatusDeleting, dynamodb.TableStatusArchiving, dynamodb.TableStatusArchived:
return nil, errors.New("failed to get DynamoDB table, table status : " + tableStatus)
Expand Down Expand Up @@ -384,11 +375,26 @@ func (dynamo *dynamoDB) NewIteratorWithPrefix(prefix []byte) Iterator {
return nil
}

func createBatchWriteWorkerPool(endpoint, region string) {
dynamoDBClient = dynamodb.New(session.Must(session.NewSessionWithOptions(session.Options{
Config: aws.Config{
Endpoint: aws.String(endpoint),
Region: aws.String(region),
},
})))
dynamoWriteCh = make(chan *batchWriteWorkerInput, itemChanSize)
for i := 0; i < WorkerNum; i++ {
go createBatchWriteWorker(dynamoWriteCh)
}
logger.Info("made dynamo batch write workers", "workerNum", WorkerNum)
}

func createBatchWriteWorker(writeCh <-chan *batchWriteWorkerInput) {
failCount := 0
batchWriteInput := &dynamodb.BatchWriteItemInput{
RequestItems: map[string][]*dynamodb.WriteRequest{},
}
logger.Debug("generate a dynamoDB batchWrite worker")

for batchInput := range writeCh {
batchWriteInput.RequestItems[batchInput.tableName] = batchInput.items
Expand Down

0 comments on commit 4a497a0

Please sign in to comment.