Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamo use global worker #624

Merged
merged 16 commits into from
Aug 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions storage/database/db_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func databaseDBManager(dbc *DBConfig) (*databaseManager, error) {
fallthrough
case StateTrieDB:
newDBC := getDBEntryConfig(dbc, entryType, dir)
if dbc.NumStateTrieShards > 1 {
if dbc.NumStateTrieShards > 1 && !dbc.DBType.selfShardable() { // make non-sharding db if the db is sharding itself
db, err = newShardedDB(newDBC, entryType, dbc.NumStateTrieShards)
} else {
db, err = newDatabase(newDBC, entryType)
Expand Down Expand Up @@ -413,7 +413,7 @@ func newDatabase(dbc *DBConfig, entryType DBEntryType) (Database, error) {
case MemoryDB:
return NewMemDB(), nil
case DynamoDB:
return NewDynamoDB(dbc.DynamoDBConfig)
return NewDynamoDB(dbc.DynamoDBConfig, entryType)
default:
logger.Info("database type is not set, fall back to default LevelDB")
return NewLevelDB(dbc, 0)
Expand Down
157 changes: 85 additions & 72 deletions storage/database/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,20 @@ import (
"sync"
"time"

"github.com/aws/aws-sdk-go/aws/session"

"github.com/rcrowley/go-metrics"

"github.com/pkg/errors"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/klaytn/klaytn/log"
)

var overSizedDataPrefix = []byte("oversizeditem")

// errors
var dataNotFoundErr = errors.New("data is not found with the given key")
var nilDynamoConfigErr = errors.New("attempt to create DynamoDB with nil configuration")
Expand All @@ -58,32 +61,34 @@ const dynamoMaxRetry = 5
const WorkerNum = 10
const itemChanSize = WorkerNum * 2

var overSizedDataPrefix = []byte("oversizeditem")
var (
dynamoDBClient *dynamodb.DynamoDB // handles dynamoDB connections
dynamoOnceWorker sync.Once // makes sure worker is created once
dynamoWriteCh chan *batchWriteWorkerInput // use global write channel for shared worker
dynamoOpenedDBNum uint
)

type DynamoDBConfig struct {
TableName string
Region string // AWS region
Endpoint string // Where DynamoDB reside
IsProvisioned bool // Billing mode
ReadCapacityUnits int64 // read capacity when provsioned
WriteCapacityUnits int64 // write capacity when provsioned
ReadCapacityUnits int64 // read capacity when provisioned
WriteCapacityUnits int64 // write capacity when provisioned
}

type batchWriteWorkerInput struct {
items []*dynamodb.WriteRequest
wg *sync.WaitGroup
tableName string
items []*dynamodb.WriteRequest
wg *sync.WaitGroup
}

// TODO-Klaytn refactor the structure : there are common configs that are placed separated
type dynamoDB struct {
config *DynamoDBConfig
db *dynamodb.DynamoDB
config DynamoDBConfig
fdb fileDB // where over size items are stored
logger log.Logger // Contextual logger tracking the database path

// worker pool
quitCh chan struct{}
writeCh chan *batchWriteWorkerInput

// metrics
batchWriteTimeMeter metrics.Meter
batchWriteCountMeter metrics.Meter
Expand Down Expand Up @@ -114,7 +119,7 @@ func GetDefaultDynamoDBConfig() *DynamoDBConfig {
}
}

func NewDynamoDB(config *DynamoDBConfig) (*dynamoDB, error) {
func NewDynamoDB(config *DynamoDBConfig, dbtype DBEntryType) (*dynamoDB, error) {
if config == nil {
return nil, nilDynamoConfigErr
}
Expand All @@ -125,26 +130,18 @@ func NewDynamoDB(config *DynamoDBConfig) (*dynamoDB, error) {
config.Endpoint = "https://dynamodb." + config.Region + ".amazonaws.com"
}

logger.Info("creating s3FileDB ", "bucket", config.TableName)
s3FileDB, err := newS3FileDB(config.Region, "https://s3."+config.Region+".amazonaws.com", config.TableName)
if err != nil {
logger.Error("Unable to create/get S3FileDB", "DB", config.TableName)
return nil, err
}

dynamoDB := &dynamoDB{
config: config,
logger: logger.NewWith("region", config.Region, "tableName", config.TableName),
quitCh: make(chan struct{}),
writeCh: make(chan *batchWriteWorkerInput, itemChanSize),
fdb: s3FileDB,
db: dynamodb.New(session.Must(session.NewSessionWithOptions(session.Options{
Config: aws.Config{
Endpoint: aws.String(config.Endpoint),
Region: aws.String(config.Region),
},
}))),
config: *config,
fdb: s3FileDB,
}
dynamoDB.config.TableName += "-" + dbBaseDirs[dbtype]
dynamoDB.logger = logger.NewWith("region", config.Region, "tableName", dynamoDB.config.TableName)

// Check if the table is ready to serve
for {
Expand All @@ -164,11 +161,13 @@ func NewDynamoDB(config *DynamoDBConfig) (*dynamoDB, error) {

switch tableStatus {
case dynamodb.TableStatusActive:
dynamoDB.logger.Warn("Successfully created dynamoDB table. You will be charged until the DB is deleted.", "endPoint", config.Endpoint)
for i := 0; i < WorkerNum; i++ {
go dynamoDB.createBatchWriteWorker()
}
dynamoDB.logger.Info("made dynamo batch write workers", "workerNum", WorkerNum)
dynamoDB.logger.Warn("Successfully created dynamoDB table. You will be CHARGED until the DB is deleted.", "endPoint", config.Endpoint)
// count successful table creating
dynamoOpenedDBNum++
// create workers on the first successful table creation
dynamoOnceWorker.Do(func() {
createBatchWriteWorkerPool(config.Endpoint, config.Region)
})
return dynamoDB, nil
case dynamodb.TableStatusDeleting, dynamodb.TableStatusArchiving, dynamodb.TableStatusArchived:
return nil, errors.New("failed to get DynamoDB table, table status : " + tableStatus)
Expand Down Expand Up @@ -207,7 +206,7 @@ func (dynamo *dynamoDB) createTable() error {
dynamo.logger.Warn("Billing mode is provisioned. You will be charged every hour.", "RCU", dynamo.config.ReadCapacityUnits, "WRU", dynamo.config.WriteCapacityUnits)
}

_, err := dynamo.db.CreateTable(input)
_, err := dynamoDBClient.CreateTable(input)
if err != nil {
dynamo.logger.Error("Error while creating the DynamoDB table", "err", err, "tableName", dynamo.config.TableName)
return err
Expand All @@ -217,7 +216,7 @@ func (dynamo *dynamoDB) createTable() error {
}

func (dynamo *dynamoDB) deleteTable() error {
if _, err := dynamo.db.DeleteTable(&dynamodb.DeleteTableInput{TableName: &dynamo.config.TableName}); err != nil {
if _, err := dynamoDBClient.DeleteTable(&dynamodb.DeleteTableInput{TableName: &dynamo.config.TableName}); err != nil {
dynamo.logger.Error("Error while deleting the DynamoDB table", "tableName", dynamo.config.TableName)
return err
}
Expand All @@ -235,7 +234,7 @@ func (dynamo *dynamoDB) tableStatus() (string, error) {
}

func (dynamo *dynamoDB) tableDescription() (*dynamodb.TableDescription, error) {
describe, err := dynamo.db.DescribeTable(&dynamodb.DescribeTableInput{TableName: aws.String(dynamo.config.TableName)})
describe, err := dynamoDBClient.DescribeTable(&dynamodb.DescribeTableInput{TableName: aws.String(dynamo.config.TableName)})
if describe == nil {
return nil, err
}
Expand Down Expand Up @@ -272,7 +271,7 @@ func (dynamo *dynamoDB) Put(key []byte, val []byte) error {
Item: marshaledData,
}

_, err = dynamo.db.PutItem(params)
_, err = dynamoDBClient.PutItem(params)
if err != nil {
fmt.Printf("Put ERROR: %v\n", err.Error())
return err
Expand Down Expand Up @@ -301,7 +300,7 @@ func (dynamo *dynamoDB) Get(key []byte) ([]byte, error) {
ConsistentRead: aws.Bool(true),
}

result, err := dynamo.db.GetItem(params)
result, err := dynamoDBClient.GetItem(params)
if err != nil {
fmt.Printf("Get ERROR: %v\n", err.Error())
return nil, err
Expand Down Expand Up @@ -338,7 +337,7 @@ func (dynamo *dynamoDB) Delete(key []byte) error {
},
}

_, err := dynamo.db.DeleteItem(params)
_, err := dynamoDBClient.DeleteItem(params)
if err != nil {
fmt.Printf("ERROR: %v\n", err.Error())
return err
Expand All @@ -347,7 +346,12 @@ func (dynamo *dynamoDB) Delete(key []byte) error {
}

func (dynamo *dynamoDB) Close() {
close(dynamo.quitCh)
if dynamoOpenedDBNum > 0 {
KimKyungup marked this conversation as resolved.
Show resolved Hide resolved
dynamoOpenedDBNum--
}
if dynamoOpenedDBNum == 0 {
close(dynamoWriteCh)
}
KimKyungup marked this conversation as resolved.
Show resolved Hide resolved
}

func (dynamo *dynamoDB) Meter(prefix string) {
Expand All @@ -374,49 +378,58 @@ func (dynamo *dynamoDB) NewIteratorWithPrefix(prefix []byte) Iterator {
return nil
}

func (dynamo *dynamoDB) createBatchWriteWorker() {
func createBatchWriteWorkerPool(endpoint, region string) {
dynamoDBClient = dynamodb.New(session.Must(session.NewSessionWithOptions(session.Options{
Config: aws.Config{
Endpoint: aws.String(endpoint),
Region: aws.String(region),
},
})))
dynamoWriteCh = make(chan *batchWriteWorkerInput, itemChanSize)
for i := 0; i < WorkerNum; i++ {
go createBatchWriteWorker(dynamoWriteCh)
}
logger.Info("made dynamo batch write workers", "workerNum", WorkerNum)
}

func createBatchWriteWorker(writeCh <-chan *batchWriteWorkerInput) {
failCount := 0
tableName := dynamo.config.TableName
batchWriteInput := &dynamodb.BatchWriteItemInput{
RequestItems: map[string][]*dynamodb.WriteRequest{},
}
dynamo.logger.Info("generate a dynamoDB batchWrite worker")

for {
select {
case <-dynamo.quitCh:
dynamo.logger.Info("close a dynamoDB batchWrite worker")
return
case batchInput := <-dynamo.writeCh:
batchWriteInput.RequestItems[tableName] = batchInput.items

BatchWriteItemOutput, err := dynamo.db.BatchWriteItem(batchWriteInput)
numUnprocessed := len(BatchWriteItemOutput.UnprocessedItems[tableName])
for err != nil || numUnprocessed != 0 {
if err != nil {
failCount++
dynamo.logger.Warn("dynamoDB failed to write batch items", "err", err, "failCnt", failCount)
if failCount > dynamoMaxRetry {
dynamo.logger.Error("dynamoDB failed many times. sleep a second and retry",
"failCnt", failCount)
time.Sleep(time.Second)
}
}

if numUnprocessed != 0 {
dynamo.logger.Debug("dynamoDB batchWrite remains unprocessedItem",
"numUnprocessedItem", numUnprocessed)
batchWriteInput.RequestItems[tableName] = BatchWriteItemOutput.UnprocessedItems[tableName]
logger.Debug("generate a dynamoDB batchWrite worker")

for batchInput := range writeCh {
batchWriteInput.RequestItems[batchInput.tableName] = batchInput.items

BatchWriteItemOutput, err := dynamoDBClient.BatchWriteItem(batchWriteInput)
numUnprocessed := len(BatchWriteItemOutput.UnprocessedItems[batchInput.tableName])
for err != nil || numUnprocessed != 0 {
if err != nil {
failCount++
logger.Warn("dynamoDB failed to write batch items",
"tableName", batchInput.tableName, "err", err, "failCnt", failCount)
if failCount > dynamoMaxRetry {
logger.Error("dynamoDB failed many times. sleep a second and retry",
"tableName", batchInput.tableName, "failCnt", failCount)
time.Sleep(time.Second)
}
}

BatchWriteItemOutput, err = dynamo.db.BatchWriteItem(batchWriteInput)
numUnprocessed = len(BatchWriteItemOutput.UnprocessedItems)
if numUnprocessed != 0 {
logger.Debug("dynamoDB batchWrite remains unprocessedItem",
"tableName", batchInput.tableName, "numUnprocessedItem", numUnprocessed)
batchWriteInput.RequestItems[batchInput.tableName] = BatchWriteItemOutput.UnprocessedItems[batchInput.tableName]
}

failCount = 0
batchInput.wg.Done()
BatchWriteItemOutput, err = dynamoDBClient.BatchWriteItem(batchWriteInput)
numUnprocessed = len(BatchWriteItemOutput.UnprocessedItems)
}

failCount = 0
batchInput.wg.Done()
}
logger.Debug("close a dynamoDB batchWrite worker")
}

func (dynamo *dynamoDB) NewBatch() Batch {
Expand Down Expand Up @@ -474,7 +487,7 @@ func (batch *dynamoBatch) Put(key, val []byte) error {

if len(batch.batchItems) == dynamoBatchSize {
batch.wg.Add(1)
batch.db.writeCh <- &batchWriteWorkerInput{batch.batchItems, batch.wg}
dynamoWriteCh <- &batchWriteWorkerInput{batch.tableName, batch.batchItems, batch.wg}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any possibility of panic if Close is called while processing Put operation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just tested.
A panic will not occur even though Close() is called on non-empty channel. (call Put() -> Close()) It will close after all of the items are processed.
However, a panic will occur when put is called in a closed channel (call Close() -> Put()).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. To avoid the second case, we need to use switch & case. Do you have any other alternatives?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we use the original code, not all of the items in the channel will be processed. There might be some items we expect to be written but actually are not. Actually, in the original code, it will wait indefinitely in Write() because the workers will not process the input and wg.Done() is not called.
Are there other ways to handle both cases?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need to consider the 2nd case. It is misbehavior of users.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need to consider the 2nd case. It is misbehavior of users.

I think this is the case which rarely happens.
However, I don't think it is misbehavior of users. Since the user does not know whether it will cause panic or not.

However, again, as Database.Close is called after Blockchain.Stop, there is no possibility that Database.Put is called after Database.Close.

batch.Reset()
}
return nil
Expand All @@ -492,7 +505,7 @@ func (batch *dynamoBatch) Write() error {
writeRequest = batch.batchItems
}
batch.wg.Add(1)
batch.db.writeCh <- &batchWriteWorkerInput{writeRequest, batch.wg}
dynamoWriteCh <- &batchWriteWorkerInput{batch.tableName, writeRequest, batch.wg}
numRemainedItems -= len(writeRequest)
}

Expand Down
6 changes: 3 additions & 3 deletions storage/database/dynamodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

func testDynamoDB_Put(t *testing.T) {
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig())
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig(), StateTrieDB)
defer dynamo.deletedDB()
if err != nil {
t.Fatal(err)
Expand All @@ -50,7 +50,7 @@ func testDynamoDB_Put(t *testing.T) {
}

func testDynamoBatch_Write(t *testing.T) {
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig())
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig(), StateTrieDB)
defer dynamo.deletedDB()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -82,7 +82,7 @@ func testDynamoBatch_Write(t *testing.T) {
}

func testDynamoBatch_WriteLargeData(t *testing.T) {
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig())
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig(), StateTrieDB)
defer dynamo.deletedDB()
if err != nil {
t.Fatal(err)
Expand Down
9 changes: 9 additions & 0 deletions storage/database/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ func (db DBType) ToValid() DBType {
return ""
}

// selfShardable reports whether this database type shards its data by itself,
// in which case wrapping it in an additional sharded DB is unnecessary.
func (db DBType) selfShardable() bool {
	return db == DynamoDB
}

const IdealBatchSize = 100 * 1024

// Putter wraps the database write operation supported by both batches and regular databases.
Expand Down
2 changes: 1 addition & 1 deletion storage/database/s3filedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func newS3FileDB(region, endpoint, bucketName string) (*s3FileDB, error) {
}

if !exist {
localLogger.Warn("Requesting create S3 bucket. You will be charged until the bucket is deleted.")
_, err = s3DB.s3.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String(bucketName),
})
Expand All @@ -85,6 +84,7 @@ func newS3FileDB(region, endpoint, bucketName string) (*s3FileDB, error) {
return nil, err
}
}
localLogger.Warn("Successfully created S3 bucket. You will be CHARGED until the bucket is deleted.")
return s3DB, nil
}

Expand Down