Skip to content

Commit

Permalink
Merge pull request #624 from winnie-byun/dynamo-gloabal-worker
Browse files Browse the repository at this point in the history
Dynamo uses global worker
  • Loading branch information
joowon-byun committed Aug 19, 2020
2 parents 53c1c50 + da0f741 commit acfcd55
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 78 deletions.
4 changes: 2 additions & 2 deletions storage/database/db_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func databaseDBManager(dbc *DBConfig) (*databaseManager, error) {
fallthrough
case StateTrieDB:
newDBC := getDBEntryConfig(dbc, entryType, dir)
if dbc.NumStateTrieShards > 1 {
if dbc.NumStateTrieShards > 1 && !dbc.DBType.selfShardable() { // make non-sharding db if the db is sharding itself
db, err = newShardedDB(newDBC, entryType, dbc.NumStateTrieShards)
} else {
db, err = newDatabase(newDBC, entryType)
Expand Down Expand Up @@ -413,7 +413,7 @@ func newDatabase(dbc *DBConfig, entryType DBEntryType) (Database, error) {
case MemoryDB:
return NewMemDB(), nil
case DynamoDB:
return NewDynamoDB(dbc.DynamoDBConfig)
return NewDynamoDB(dbc.DynamoDBConfig, entryType)
default:
logger.Info("database type is not set, fall back to default LevelDB")
return NewLevelDB(dbc, 0)
Expand Down
157 changes: 85 additions & 72 deletions storage/database/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,20 @@ import (
"sync"
"time"

"github.com/aws/aws-sdk-go/aws/session"

"github.com/rcrowley/go-metrics"

"github.com/pkg/errors"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/klaytn/klaytn/log"
)

var overSizedDataPrefix = []byte("oversizeditem")

// errors
var dataNotFoundErr = errors.New("data is not found with the given key")
var nilDynamoConfigErr = errors.New("attempt to create DynamoDB with nil configuration")
Expand All @@ -58,32 +61,34 @@ const dynamoMaxRetry = 5
const WorkerNum = 10
const itemChanSize = WorkerNum * 2

var overSizedDataPrefix = []byte("oversizeditem")
var (
dynamoDBClient *dynamodb.DynamoDB // handles dynamoDB connections
dynamoOnceWorker sync.Once // makes sure worker is created once
dynamoWriteCh chan *batchWriteWorkerInput // use global write channel for shared worker
dynamoOpenedDBNum uint
)

// DynamoDBConfig holds the connection and billing settings for an AWS
// DynamoDB-backed database instance.
type DynamoDBConfig struct {
	TableName          string
	Region             string // AWS region
	Endpoint           string // Where DynamoDB reside
	IsProvisioned      bool   // Billing mode
	ReadCapacityUnits  int64  // read capacity when provisioned
	WriteCapacityUnits int64  // write capacity when provisioned
}

// batchWriteWorkerInput is one unit of work sent to the shared batch-write
// worker pool: the target table, the items to write, and a WaitGroup the
// worker signals via Done when the batch has been fully written.
type batchWriteWorkerInput struct {
	tableName string
	items     []*dynamodb.WriteRequest
	wg        *sync.WaitGroup
}

// TODO-Klaytn refactor the structure : there are common configs that are placed separated
type dynamoDB struct {
config *DynamoDBConfig
db *dynamodb.DynamoDB
config DynamoDBConfig
fdb fileDB // where over size items are stored
logger log.Logger // Contextual logger tracking the database path

// worker pool
quitCh chan struct{}
writeCh chan *batchWriteWorkerInput

// metrics
batchWriteTimeMeter metrics.Meter
batchWriteCountMeter metrics.Meter
Expand Down Expand Up @@ -114,7 +119,7 @@ func GetDefaultDynamoDBConfig() *DynamoDBConfig {
}
}

func NewDynamoDB(config *DynamoDBConfig) (*dynamoDB, error) {
func NewDynamoDB(config *DynamoDBConfig, dbtype DBEntryType) (*dynamoDB, error) {
if config == nil {
return nil, nilDynamoConfigErr
}
Expand All @@ -125,26 +130,18 @@ func NewDynamoDB(config *DynamoDBConfig) (*dynamoDB, error) {
config.Endpoint = "https://dynamodb." + config.Region + ".amazonaws.com"
}

logger.Info("creating s3FileDB ", "bucket", config.TableName)
s3FileDB, err := newS3FileDB(config.Region, "https://s3."+config.Region+".amazonaws.com", config.TableName)
if err != nil {
logger.Error("Unable to create/get S3FileDB", "DB", config.TableName)
return nil, err
}

dynamoDB := &dynamoDB{
config: config,
logger: logger.NewWith("region", config.Region, "tableName", config.TableName),
quitCh: make(chan struct{}),
writeCh: make(chan *batchWriteWorkerInput, itemChanSize),
fdb: s3FileDB,
db: dynamodb.New(session.Must(session.NewSessionWithOptions(session.Options{
Config: aws.Config{
Endpoint: aws.String(config.Endpoint),
Region: aws.String(config.Region),
},
}))),
config: *config,
fdb: s3FileDB,
}
dynamoDB.config.TableName += "-" + dbBaseDirs[dbtype]
dynamoDB.logger = logger.NewWith("region", config.Region, "tableName", dynamoDB.config.TableName)

// Check if the table is ready to serve
for {
Expand All @@ -164,11 +161,13 @@ func NewDynamoDB(config *DynamoDBConfig) (*dynamoDB, error) {

switch tableStatus {
case dynamodb.TableStatusActive:
dynamoDB.logger.Warn("Successfully created dynamoDB table. You will be charged until the DB is deleted.", "endPoint", config.Endpoint)
for i := 0; i < WorkerNum; i++ {
go dynamoDB.createBatchWriteWorker()
}
dynamoDB.logger.Info("made dynamo batch write workers", "workerNum", WorkerNum)
dynamoDB.logger.Warn("Successfully created dynamoDB table. You will be CHARGED until the DB is deleted.", "endPoint", config.Endpoint)
// count successful table creating
dynamoOpenedDBNum++
// create workers on the first successful table creation
dynamoOnceWorker.Do(func() {
createBatchWriteWorkerPool(config.Endpoint, config.Region)
})
return dynamoDB, nil
case dynamodb.TableStatusDeleting, dynamodb.TableStatusArchiving, dynamodb.TableStatusArchived:
return nil, errors.New("failed to get DynamoDB table, table status : " + tableStatus)
Expand Down Expand Up @@ -207,7 +206,7 @@ func (dynamo *dynamoDB) createTable() error {
dynamo.logger.Warn("Billing mode is provisioned. You will be charged every hour.", "RCU", dynamo.config.ReadCapacityUnits, "WRU", dynamo.config.WriteCapacityUnits)
}

_, err := dynamo.db.CreateTable(input)
_, err := dynamoDBClient.CreateTable(input)
if err != nil {
dynamo.logger.Error("Error while creating the DynamoDB table", "err", err, "tableName", dynamo.config.TableName)
return err
Expand All @@ -217,7 +216,7 @@ func (dynamo *dynamoDB) createTable() error {
}

func (dynamo *dynamoDB) deleteTable() error {
if _, err := dynamo.db.DeleteTable(&dynamodb.DeleteTableInput{TableName: &dynamo.config.TableName}); err != nil {
if _, err := dynamoDBClient.DeleteTable(&dynamodb.DeleteTableInput{TableName: &dynamo.config.TableName}); err != nil {
dynamo.logger.Error("Error while deleting the DynamoDB table", "tableName", dynamo.config.TableName)
return err
}
Expand All @@ -235,7 +234,7 @@ func (dynamo *dynamoDB) tableStatus() (string, error) {
}

func (dynamo *dynamoDB) tableDescription() (*dynamodb.TableDescription, error) {
describe, err := dynamo.db.DescribeTable(&dynamodb.DescribeTableInput{TableName: aws.String(dynamo.config.TableName)})
describe, err := dynamoDBClient.DescribeTable(&dynamodb.DescribeTableInput{TableName: aws.String(dynamo.config.TableName)})
if describe == nil {
return nil, err
}
Expand Down Expand Up @@ -272,7 +271,7 @@ func (dynamo *dynamoDB) Put(key []byte, val []byte) error {
Item: marshaledData,
}

_, err = dynamo.db.PutItem(params)
_, err = dynamoDBClient.PutItem(params)
if err != nil {
fmt.Printf("Put ERROR: %v\n", err.Error())
return err
Expand Down Expand Up @@ -301,7 +300,7 @@ func (dynamo *dynamoDB) Get(key []byte) ([]byte, error) {
ConsistentRead: aws.Bool(true),
}

result, err := dynamo.db.GetItem(params)
result, err := dynamoDBClient.GetItem(params)
if err != nil {
fmt.Printf("Get ERROR: %v\n", err.Error())
return nil, err
Expand Down Expand Up @@ -338,7 +337,7 @@ func (dynamo *dynamoDB) Delete(key []byte) error {
},
}

_, err := dynamo.db.DeleteItem(params)
_, err := dynamoDBClient.DeleteItem(params)
if err != nil {
fmt.Printf("ERROR: %v\n", err.Error())
return err
Expand All @@ -347,7 +346,12 @@ func (dynamo *dynamoDB) Delete(key []byte) error {
}

// Close unregisters this dynamoDB instance and, when it is the last open
// instance, shuts down the shared batch-write worker pool by closing the
// global write channel.
func (dynamo *dynamoDB) Close() {
	if dynamoOpenedDBNum > 0 {
		dynamoOpenedDBNum--
	}
	if dynamoOpenedDBNum == 0 {
		// NOTE(review): dynamoOpenedDBNum is read and written without
		// synchronization, so concurrent open/close calls race; and since the
		// worker pool is created under a sync.Once, reopening a DynamoDB after
		// the last Close would leave dynamoWriteCh closed. Confirm callers
		// serialize database lifecycle operations.
		close(dynamoWriteCh)
	}
}

func (dynamo *dynamoDB) Meter(prefix string) {
Expand All @@ -374,49 +378,58 @@ func (dynamo *dynamoDB) NewIteratorWithPrefix(prefix []byte) Iterator {
return nil
}

func (dynamo *dynamoDB) createBatchWriteWorker() {
// createBatchWriteWorkerPool initializes the package-global DynamoDB client
// and write channel, then starts WorkerNum goroutines that drain the channel.
// It is called exactly once (guarded by dynamoOnceWorker in NewDynamoDB), so
// every dynamoDB instance shares one client and one worker pool.
func createBatchWriteWorkerPool(endpoint, region string) {
	// Shared client for all tables; endpoint/region come from the first
	// successfully opened database.
	dynamoDBClient = dynamodb.New(session.Must(session.NewSessionWithOptions(session.Options{
		Config: aws.Config{
			Endpoint: aws.String(endpoint),
			Region:   aws.String(region),
		},
	})))
	// Buffered (itemChanSize = WorkerNum * 2) so batch producers can queue
	// work ahead of the workers.
	dynamoWriteCh = make(chan *batchWriteWorkerInput, itemChanSize)
	for i := 0; i < WorkerNum; i++ {
		go createBatchWriteWorker(dynamoWriteCh)
	}
	logger.Info("made dynamo batch write workers", "workerNum", WorkerNum)
}

func createBatchWriteWorker(writeCh <-chan *batchWriteWorkerInput) {
failCount := 0
tableName := dynamo.config.TableName
batchWriteInput := &dynamodb.BatchWriteItemInput{
RequestItems: map[string][]*dynamodb.WriteRequest{},
}
dynamo.logger.Info("generate a dynamoDB batchWrite worker")

for {
select {
case <-dynamo.quitCh:
dynamo.logger.Info("close a dynamoDB batchWrite worker")
return
case batchInput := <-dynamo.writeCh:
batchWriteInput.RequestItems[tableName] = batchInput.items

BatchWriteItemOutput, err := dynamo.db.BatchWriteItem(batchWriteInput)
numUnprocessed := len(BatchWriteItemOutput.UnprocessedItems[tableName])
for err != nil || numUnprocessed != 0 {
if err != nil {
failCount++
dynamo.logger.Warn("dynamoDB failed to write batch items", "err", err, "failCnt", failCount)
if failCount > dynamoMaxRetry {
dynamo.logger.Error("dynamoDB failed many times. sleep a second and retry",
"failCnt", failCount)
time.Sleep(time.Second)
}
}

if numUnprocessed != 0 {
dynamo.logger.Debug("dynamoDB batchWrite remains unprocessedItem",
"numUnprocessedItem", numUnprocessed)
batchWriteInput.RequestItems[tableName] = BatchWriteItemOutput.UnprocessedItems[tableName]
logger.Debug("generate a dynamoDB batchWrite worker")

for batchInput := range writeCh {
batchWriteInput.RequestItems[batchInput.tableName] = batchInput.items

BatchWriteItemOutput, err := dynamoDBClient.BatchWriteItem(batchWriteInput)
numUnprocessed := len(BatchWriteItemOutput.UnprocessedItems[batchInput.tableName])
for err != nil || numUnprocessed != 0 {
if err != nil {
failCount++
logger.Warn("dynamoDB failed to write batch items",
"tableName", batchInput.tableName, "err", err, "failCnt", failCount)
if failCount > dynamoMaxRetry {
logger.Error("dynamoDB failed many times. sleep a second and retry",
"tableName", batchInput.tableName, "failCnt", failCount)
time.Sleep(time.Second)
}
}

BatchWriteItemOutput, err = dynamo.db.BatchWriteItem(batchWriteInput)
numUnprocessed = len(BatchWriteItemOutput.UnprocessedItems)
if numUnprocessed != 0 {
logger.Debug("dynamoDB batchWrite remains unprocessedItem",
"tableName", batchInput.tableName, "numUnprocessedItem", numUnprocessed)
batchWriteInput.RequestItems[batchInput.tableName] = BatchWriteItemOutput.UnprocessedItems[batchInput.tableName]
}

failCount = 0
batchInput.wg.Done()
BatchWriteItemOutput, err = dynamoDBClient.BatchWriteItem(batchWriteInput)
numUnprocessed = len(BatchWriteItemOutput.UnprocessedItems)
}

failCount = 0
batchInput.wg.Done()
}
logger.Debug("close a dynamoDB batchWrite worker")
}

func (dynamo *dynamoDB) NewBatch() Batch {
Expand Down Expand Up @@ -474,7 +487,7 @@ func (batch *dynamoBatch) Put(key, val []byte) error {

if len(batch.batchItems) == dynamoBatchSize {
batch.wg.Add(1)
batch.db.writeCh <- &batchWriteWorkerInput{batch.batchItems, batch.wg}
dynamoWriteCh <- &batchWriteWorkerInput{batch.tableName, batch.batchItems, batch.wg}
batch.Reset()
}
return nil
Expand All @@ -492,7 +505,7 @@ func (batch *dynamoBatch) Write() error {
writeRequest = batch.batchItems
}
batch.wg.Add(1)
batch.db.writeCh <- &batchWriteWorkerInput{writeRequest, batch.wg}
dynamoWriteCh <- &batchWriteWorkerInput{batch.tableName, writeRequest, batch.wg}
numRemainedItems -= len(writeRequest)
}

Expand Down
6 changes: 3 additions & 3 deletions storage/database/dynamodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

func testDynamoDB_Put(t *testing.T) {
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig())
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig(), StateTrieDB)
defer dynamo.deletedDB()
if err != nil {
t.Fatal(err)
Expand All @@ -50,7 +50,7 @@ func testDynamoDB_Put(t *testing.T) {
}

func testDynamoBatch_Write(t *testing.T) {
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig())
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig(), StateTrieDB)
defer dynamo.deletedDB()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -82,7 +82,7 @@ func testDynamoBatch_Write(t *testing.T) {
}

func testDynamoBatch_WriteLargeData(t *testing.T) {
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig())
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig(), StateTrieDB)
defer dynamo.deletedDB()
if err != nil {
t.Fatal(err)
Expand Down
9 changes: 9 additions & 0 deletions storage/database/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ func (db DBType) ToValid() DBType {
return ""
}

// selfShardable reports whether this database type shards its data on its
// own, so the manager does not need to wrap it in a shardedDB.
func (db DBType) selfShardable() bool {
	return db == DynamoDB
}

const IdealBatchSize = 100 * 1024

// Putter wraps the database write operation supported by both batches and regular databases.
Expand Down
2 changes: 1 addition & 1 deletion storage/database/s3filedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func newS3FileDB(region, endpoint, bucketName string) (*s3FileDB, error) {
}

if !exist {
localLogger.Warn("Requesting create S3 bucket. You will be charged until the bucket is deleted.")
_, err = s3DB.s3.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String(bucketName),
})
Expand All @@ -85,6 +84,7 @@ func newS3FileDB(region, endpoint, bucketName string) (*s3FileDB, error) {
return nil, err
}
}
localLogger.Warn("Successfully created S3 bucket. You will be CHARGED until the bucket is deleted.")
return s3DB, nil
}

Expand Down

0 comments on commit acfcd55

Please sign in to comment.