Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamo use gloabal worker #624

Merged
merged 16 commits into from
Aug 19, 2020
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions storage/database/db_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func databaseDBManager(dbc *DBConfig) (*databaseManager, error) {
fallthrough
case StateTrieDB:
newDBC := getDBEntryConfig(dbc, entryType, dir)
if dbc.NumStateTrieShards > 1 {
if dbc.NumStateTrieShards > 1 && !dbc.DBType.selfSharding() { // make non-sharding db if the db is sharding itself
db, err = newShardedDB(newDBC, entryType, dbc.NumStateTrieShards)
} else {
db, err = newDatabase(newDBC, entryType)
Expand Down Expand Up @@ -413,7 +413,7 @@ func newDatabase(dbc *DBConfig, entryType DBEntryType) (Database, error) {
case MemoryDB:
return NewMemDB(), nil
case DynamoDB:
return NewDynamoDB(dbc.DynamoDBConfig)
return NewDynamoDB(dbc.DynamoDBConfig, entryType)
default:
logger.Info("database type is not set, fall back to default LevelDB")
return NewLevelDB(dbc, 0)
Expand Down
138 changes: 71 additions & 67 deletions storage/database/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,19 @@ import (
"sync"
"time"

"github.com/aws/aws-sdk-go/aws/session"
"github.com/rcrowley/go-metrics"

"github.com/pkg/errors"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/klaytn/klaytn/log"
)

var overSizedDataPrefix = []byte("oversizeditem")

// errors
var dataNotFoundErr = errors.New("data is not found with the given key")
var nilDynamoConfigErr = errors.New("attempt to create DynamoDB with nil configuration")
Expand All @@ -58,32 +60,34 @@ const dynamoMaxRetry = 5
const WorkerNum = 10
const itemChanSize = WorkerNum * 2

var overSizedDataPrefix = []byte("oversizeditem")
var (
dynamoDBClient *dynamodb.DynamoDB // handles dynamoDB connections
dynamoOnceWorker sync.Once // makes sure worker is created once
dynamoWriteCh chan *batchWriteWorkerInput // use global write channel for shared worker
dynamoOpenedDBNum uint
)

type DynamoDBConfig struct {
TableName string
Region string // AWS region
Endpoint string // Where DynamoDB reside
IsProvisioned bool // Billing mode
ReadCapacityUnits int64 // read capacity when provsioned
WriteCapacityUnits int64 // write capacity when provsioned
ReadCapacityUnits int64 // read capacity when provisioned
WriteCapacityUnits int64 // write capacity when provisioned
}

type batchWriteWorkerInput struct {
items []*dynamodb.WriteRequest
wg *sync.WaitGroup
tableName string
items []*dynamodb.WriteRequest
wg *sync.WaitGroup
logger log.Logger
aidan-kwon marked this conversation as resolved.
Show resolved Hide resolved
}

type dynamoDB struct {
config *DynamoDBConfig
db *dynamodb.DynamoDB
config DynamoDBConfig
fdb fileDB // where over size items are stored
logger log.Logger // Contextual logger tracking the database path

// worker pool
quitCh chan struct{}
writeCh chan *batchWriteWorkerInput

// metrics
batchWriteTimeMeter metrics.Meter
batchWriteCountMeter metrics.Meter
Expand Down Expand Up @@ -114,7 +118,7 @@ func GetDefaultDynamoDBConfig() *DynamoDBConfig {
}
}

func NewDynamoDB(config *DynamoDBConfig) (*dynamoDB, error) {
func NewDynamoDB(config *DynamoDBConfig, dbtype DBEntryType) (*dynamoDB, error) {
if config == nil {
return nil, nilDynamoConfigErr
}
Expand All @@ -125,26 +129,34 @@ func NewDynamoDB(config *DynamoDBConfig) (*dynamoDB, error) {
config.Endpoint = "https://dynamodb." + config.Region + ".amazonaws.com"
}

logger.Info("creating s3FileDB ", "bucket", config.TableName)
dynamoOpenedDBNum++
aidan-kwon marked this conversation as resolved.
Show resolved Hide resolved

s3FileDB, err := newS3FileDB(config.Region, "https://s3."+config.Region+".amazonaws.com", config.TableName)
if err != nil {
logger.Error("Unable to create/get S3FileDB", "DB", config.TableName)
return nil, err
}

dynamoDB := &dynamoDB{
config: config,
logger: logger.NewWith("region", config.Region, "tableName", config.TableName),
quitCh: make(chan struct{}),
writeCh: make(chan *batchWriteWorkerInput, itemChanSize),
fdb: s3FileDB,
db: dynamodb.New(session.Must(session.NewSessionWithOptions(session.Options{
dynamoOnceWorker.Do(func() {
dynamoDBClient = dynamodb.New(session.Must(session.NewSessionWithOptions(session.Options{
Config: aws.Config{
Endpoint: aws.String(config.Endpoint),
Region: aws.String(config.Region),
},
}))),
})))
dynamoWriteCh = make(chan *batchWriteWorkerInput, itemChanSize)
for i := 0; i < WorkerNum; i++ {
go createBatchWriteWorker(dynamoWriteCh)
}
logger.Info("made dynamo batch write workers", "workerNum", WorkerNum)
aidan-kwon marked this conversation as resolved.
Show resolved Hide resolved
})

dynamoDB := &dynamoDB{
config: *config,
fdb: s3FileDB,
}
dynamoDB.config.TableName += "-" + dbBaseDirs[dbtype]
dynamoDB.logger = logger.NewWith("region", config.Region, "tableName", dynamoDB.config.TableName)

// Check if the table is ready to serve
for {
Expand All @@ -164,11 +176,7 @@ func NewDynamoDB(config *DynamoDBConfig) (*dynamoDB, error) {

switch tableStatus {
case dynamodb.TableStatusActive:
dynamoDB.logger.Warn("Successfully created dynamoDB table. You will be charged until the DB is deleted.", "endPoint", config.Endpoint)
for i := 0; i < WorkerNum; i++ {
go dynamoDB.createBatchWriteWorker()
}
dynamoDB.logger.Info("made dynamo batch write workers", "workerNum", WorkerNum)
dynamoDB.logger.Warn("Successfully created dynamoDB table. You will be CHARGED until the DB is deleted.", "endPoint", config.Endpoint)
return dynamoDB, nil
case dynamodb.TableStatusDeleting, dynamodb.TableStatusArchiving, dynamodb.TableStatusArchived:
return nil, errors.New("failed to get DynamoDB table, table status : " + tableStatus)
Expand Down Expand Up @@ -207,7 +215,7 @@ func (dynamo *dynamoDB) createTable() error {
dynamo.logger.Warn("Billing mode is provisioned. You will be charged every hour.", "RCU", dynamo.config.ReadCapacityUnits, "WRU", dynamo.config.WriteCapacityUnits)
}

_, err := dynamo.db.CreateTable(input)
_, err := dynamoDBClient.CreateTable(input)
if err != nil {
dynamo.logger.Error("Error while creating the DynamoDB table", "err", err, "tableName", dynamo.config.TableName)
return err
Expand All @@ -217,7 +225,7 @@ func (dynamo *dynamoDB) createTable() error {
}

func (dynamo *dynamoDB) deleteTable() error {
if _, err := dynamo.db.DeleteTable(&dynamodb.DeleteTableInput{TableName: &dynamo.config.TableName}); err != nil {
if _, err := dynamoDBClient.DeleteTable(&dynamodb.DeleteTableInput{TableName: &dynamo.config.TableName}); err != nil {
dynamo.logger.Error("Error while deleting the DynamoDB table", "tableName", dynamo.config.TableName)
return err
}
Expand All @@ -235,7 +243,7 @@ func (dynamo *dynamoDB) tableStatus() (string, error) {
}

func (dynamo *dynamoDB) tableDescription() (*dynamodb.TableDescription, error) {
describe, err := dynamo.db.DescribeTable(&dynamodb.DescribeTableInput{TableName: aws.String(dynamo.config.TableName)})
describe, err := dynamoDBClient.DescribeTable(&dynamodb.DescribeTableInput{TableName: aws.String(dynamo.config.TableName)})
if describe == nil {
return nil, err
}
Expand Down Expand Up @@ -272,7 +280,7 @@ func (dynamo *dynamoDB) Put(key []byte, val []byte) error {
Item: marshaledData,
}

_, err = dynamo.db.PutItem(params)
_, err = dynamoDBClient.PutItem(params)
if err != nil {
fmt.Printf("Put ERROR: %v\n", err.Error())
return err
Expand Down Expand Up @@ -301,7 +309,7 @@ func (dynamo *dynamoDB) Get(key []byte) ([]byte, error) {
ConsistentRead: aws.Bool(true),
}

result, err := dynamo.db.GetItem(params)
result, err := dynamoDBClient.GetItem(params)
if err != nil {
fmt.Printf("Get ERROR: %v\n", err.Error())
return nil, err
Expand Down Expand Up @@ -338,7 +346,7 @@ func (dynamo *dynamoDB) Delete(key []byte) error {
},
}

_, err := dynamo.db.DeleteItem(params)
_, err := dynamoDBClient.DeleteItem(params)
if err != nil {
fmt.Printf("ERROR: %v\n", err.Error())
return err
Expand All @@ -347,7 +355,10 @@ func (dynamo *dynamoDB) Delete(key []byte) error {
}

func (dynamo *dynamoDB) Close() {
close(dynamo.quitCh)
dynamoOpenedDBNum--
KimKyungup marked this conversation as resolved.
Show resolved Hide resolved
if dynamoOpenedDBNum == 0 {
close(dynamoWriteCh)
}
KimKyungup marked this conversation as resolved.
Show resolved Hide resolved
}

func (dynamo *dynamoDB) Meter(prefix string) {
Expand All @@ -374,49 +385,42 @@ func (dynamo *dynamoDB) NewIteratorWithPrefix(prefix []byte) Iterator {
return nil
}

func (dynamo *dynamoDB) createBatchWriteWorker() {
func createBatchWriteWorker(writeCh <-chan *batchWriteWorkerInput) {
failCount := 0
tableName := dynamo.config.TableName
batchWriteInput := &dynamodb.BatchWriteItemInput{
RequestItems: map[string][]*dynamodb.WriteRequest{},
}
dynamo.logger.Info("generate a dynamoDB batchWrite worker")

for {
select {
case <-dynamo.quitCh:
dynamo.logger.Info("close a dynamoDB batchWrite worker")
return
case batchInput := <-dynamo.writeCh:
batchWriteInput.RequestItems[tableName] = batchInput.items

BatchWriteItemOutput, err := dynamo.db.BatchWriteItem(batchWriteInput)
numUnprocessed := len(BatchWriteItemOutput.UnprocessedItems[tableName])
for err != nil || numUnprocessed != 0 {
if err != nil {
failCount++
dynamo.logger.Warn("dynamoDB failed to write batch items", "err", err, "failCnt", failCount)
if failCount > dynamoMaxRetry {
dynamo.logger.Error("dynamoDB failed many times. sleep a second and retry",
"failCnt", failCount)
time.Sleep(time.Second)
}
}
for batchInput := range writeCh {
batchWriteInput.RequestItems[batchInput.tableName] = batchInput.items

if numUnprocessed != 0 {
dynamo.logger.Debug("dynamoDB batchWrite remains unprocessedItem",
"numUnprocessedItem", numUnprocessed)
batchWriteInput.RequestItems[tableName] = BatchWriteItemOutput.UnprocessedItems[tableName]
BatchWriteItemOutput, err := dynamoDBClient.BatchWriteItem(batchWriteInput)
numUnprocessed := len(BatchWriteItemOutput.UnprocessedItems[batchInput.tableName])
for err != nil || numUnprocessed != 0 {
if err != nil {
failCount++
logger.Warn("dynamoDB failed to write batch items", "err", err, "failCnt", failCount)
if failCount > dynamoMaxRetry {
logger.Error("dynamoDB failed many times. sleep a second and retry",
"failCnt", failCount)
time.Sleep(time.Second)
}
}

BatchWriteItemOutput, err = dynamo.db.BatchWriteItem(batchWriteInput)
numUnprocessed = len(BatchWriteItemOutput.UnprocessedItems)
if numUnprocessed != 0 {
logger.Debug("dynamoDB batchWrite remains unprocessedItem",
"numUnprocessedItem", numUnprocessed)
batchWriteInput.RequestItems[batchInput.tableName] = BatchWriteItemOutput.UnprocessedItems[batchInput.tableName]
}

failCount = 0
batchInput.wg.Done()
BatchWriteItemOutput, err = dynamoDBClient.BatchWriteItem(batchWriteInput)
numUnprocessed = len(BatchWriteItemOutput.UnprocessedItems)
}

failCount = 0
batchInput.wg.Done()
}
logger.Debug("close a dynamoDB batchWrite worker")
}

func (dynamo *dynamoDB) NewBatch() Batch {
Expand Down Expand Up @@ -474,7 +478,7 @@ func (batch *dynamoBatch) Put(key, val []byte) error {

if len(batch.batchItems) == dynamoBatchSize {
batch.wg.Add(1)
batch.db.writeCh <- &batchWriteWorkerInput{batch.batchItems, batch.wg}
dynamoWriteCh <- &batchWriteWorkerInput{batch.tableName, batch.batchItems, batch.wg, batch.db.logger}
batch.Reset()
}
return nil
Expand All @@ -492,7 +496,7 @@ func (batch *dynamoBatch) Write() error {
writeRequest = batch.batchItems
}
batch.wg.Add(1)
batch.db.writeCh <- &batchWriteWorkerInput{writeRequest, batch.wg}
dynamoWriteCh <- &batchWriteWorkerInput{batch.tableName, writeRequest, batch.wg, batch.db.logger}
numRemainedItems -= len(writeRequest)
}

Expand Down
6 changes: 3 additions & 3 deletions storage/database/dynamodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

func testDynamoDB_Put(t *testing.T) {
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig())
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig(), StateTrieDB)
defer dynamo.deletedDB()
if err != nil {
t.Fatal(err)
Expand All @@ -50,7 +50,7 @@ func testDynamoDB_Put(t *testing.T) {
}

func testDynamoBatch_Write(t *testing.T) {
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig())
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig(), StateTrieDB)
defer dynamo.deletedDB()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -82,7 +82,7 @@ func testDynamoBatch_Write(t *testing.T) {
}

func testDynamoBatch_WriteLargeData(t *testing.T) {
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig())
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig(), StateTrieDB)
defer dynamo.deletedDB()
if err != nil {
t.Fatal(err)
Expand Down
9 changes: 9 additions & 0 deletions storage/database/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ func (db DBType) ToValid() DBType {
return ""
}

// selfSharding returns if the db is able to shard by itself or not
func (db DBType) selfSharding() bool {
joowon-byun marked this conversation as resolved.
Show resolved Hide resolved
switch db {
case DynamoDB:
return true
}
return false
}

const IdealBatchSize = 100 * 1024

// Putter wraps the database write operation supported by both batches and regular databases.
Expand Down
2 changes: 1 addition & 1 deletion storage/database/s3filedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func newS3FileDB(region, endpoint, bucketName string) (*s3FileDB, error) {
}

if !exist {
localLogger.Warn("Requesting create S3 bucket. You will be charged until the bucket is deleted.")
_, err = s3DB.s3.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String(bucketName),
})
Expand All @@ -85,6 +84,7 @@ func newS3FileDB(region, endpoint, bucketName string) (*s3FileDB, error) {
return nil, err
}
}
localLogger.Warn("Successfully created S3 bucket. You will be CHARGED until the bucket is deleted.")
return s3DB, nil
}

Expand Down