Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Debug] DynamoDB #633

Merged
merged 7 commits into from
Aug 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 7 additions & 1 deletion storage/database/db_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,12 @@ func getDBEntryConfig(originalDBC *DBConfig, i DBEntryType, dbDir string) *DBCon

// Update dir to each Database specific directory.
newDBC.Dir = filepath.Join(originalDBC.Dir, dbDir)
// Update dynmao table name to Database specific name.
if newDBC.DynamoDBConfig != nil {
newDynamoDBConfig := *originalDBC.DynamoDBConfig
newDynamoDBConfig.TableName += "-" + dbDir
newDBC.DynamoDBConfig = &newDynamoDBConfig
}

return &newDBC
}
Expand Down Expand Up @@ -413,7 +419,7 @@ func newDatabase(dbc *DBConfig, entryType DBEntryType) (Database, error) {
case MemoryDB:
return NewMemDB(), nil
case DynamoDB:
return NewDynamoDB(dbc.DynamoDBConfig, entryType)
return NewDynamoDB(dbc.DynamoDBConfig)
default:
logger.Info("database type is not set, fall back to default LevelDB")
return NewLevelDB(dbc, 0)
Expand Down
28 changes: 20 additions & 8 deletions storage/database/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func GetDefaultDynamoDBConfig() *DynamoDBConfig {
}
}

func NewDynamoDB(config *DynamoDBConfig, dbtype DBEntryType) (*dynamoDB, error) {
func NewDynamoDB(config *DynamoDBConfig) (*dynamoDB, error) {
if config == nil {
return nil, nilDynamoConfigErr
}
Expand All @@ -136,11 +136,20 @@ func NewDynamoDB(config *DynamoDBConfig, dbtype DBEntryType) (*dynamoDB, error)
return nil, err
}

if dynamoDBClient == nil {
dynamoDBClient = dynamodb.New(session.Must(session.NewSessionWithOptions(session.Options{
Config: aws.Config{
Endpoint: aws.String(config.Endpoint),
Region: aws.String(config.Region),
},
})))
}

dynamoDB := &dynamoDB{
config: *config,
fdb: s3FileDB,
}
dynamoDB.config.TableName += "-" + dbBaseDirs[dbtype]

dynamoDB.logger = logger.NewWith("region", config.Region, "tableName", dynamoDB.config.TableName)

// Check if the table is ready to serve
Expand Down Expand Up @@ -379,12 +388,6 @@ func (dynamo *dynamoDB) NewIteratorWithPrefix(prefix []byte) Iterator {
}

func createBatchWriteWorkerPool(endpoint, region string) {
dynamoDBClient = dynamodb.New(session.Must(session.NewSessionWithOptions(session.Options{
Config: aws.Config{
Endpoint: aws.String(endpoint),
Region: aws.String(region),
},
})))
dynamoWriteCh = make(chan *batchWriteWorkerInput, itemChanSize)
for i := 0; i < WorkerNum; i++ {
go createBatchWriteWorker(dynamoWriteCh)
Expand All @@ -406,6 +409,14 @@ func createBatchWriteWorker(writeCh <-chan *batchWriteWorkerInput) {
numUnprocessed := len(BatchWriteItemOutput.UnprocessedItems[batchInput.tableName])
for err != nil || numUnprocessed != 0 {
if err != nil {
// ValidationException occurs when a required parameter is missing, a value is out of range,
// or data types mismatch and so on. If this is the case, check if there is a duplicated key,
// batch length out of range, null value and so on.
// When ValidationException occurs, retrying won't fix the problem.
if strings.Contains(err.Error(), "ValidationException") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this error happen?

  • If we remove the dynamoDB table, it can be happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It happens when there is a input errror such as duplicated input in batch, 0 or more than 25 items in batch, empty input in item and so on.
When we remove teh dynamoDB table, ResourceNotFoundException will occur. But this error can also occur when the table is being created or archived. I don't think it is a good idea to crit on ResourceNotFoundException because it can be solved when time passes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is duplicated input? Is it same key/value item with another one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if there are two items with the same key, it is an error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I got it your intention.
Please write some comments about the case of the error and why this crit is needed.

logger.Crit("Invalid input for dynamoDB BatchWrite",
"err", err, "tableName", batchInput.tableName, "itemNum", len(batchInput.items))
}
failCount++
logger.Warn("dynamoDB failed to write batch items",
"tableName", batchInput.tableName, "err", err, "failCnt", failCount)
Expand Down Expand Up @@ -444,6 +455,7 @@ type dynamoBatch struct {
wg *sync.WaitGroup
}

// TODO-klaytn need to check for duplicated keys in batch
func (batch *dynamoBatch) Put(key, val []byte) error {
data := DynamoData{Key: key, Val: val}
dataSize := len(val)
Expand Down
6 changes: 3 additions & 3 deletions storage/database/dynamodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

func testDynamoDB_Put(t *testing.T) {
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig(), StateTrieDB)
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig())
defer dynamo.deletedDB()
if err != nil {
t.Fatal(err)
Expand All @@ -50,7 +50,7 @@ func testDynamoDB_Put(t *testing.T) {
}

func testDynamoBatch_Write(t *testing.T) {
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig(), StateTrieDB)
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig())
defer dynamo.deletedDB()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -82,7 +82,7 @@ func testDynamoBatch_Write(t *testing.T) {
}

func testDynamoBatch_WriteLargeData(t *testing.T) {
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig(), StateTrieDB)
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig())
defer dynamo.deletedDB()
if err != nil {
t.Fatal(err)
Expand Down