Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Debug] DynamoDB #633

Merged
merged 7 commits into from
Aug 23, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 7 additions & 1 deletion storage/database/db_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,12 @@ func getDBEntryConfig(originalDBC *DBConfig, i DBEntryType, dbDir string) *DBCon

// Update dir to each Database specific directory.
newDBC.Dir = filepath.Join(originalDBC.Dir, dbDir)
// Update dynmao table name to Database specific name.
if newDBC.DynamoDBConfig != nil {
newDynamoDBConfig := *originalDBC.DynamoDBConfig
newDynamoDBConfig.TableName += "-" + dbDir
newDBC.DynamoDBConfig = &newDynamoDBConfig
}

return &newDBC
}
Expand Down Expand Up @@ -413,7 +419,7 @@ func newDatabase(dbc *DBConfig, entryType DBEntryType) (Database, error) {
case MemoryDB:
return NewMemDB(), nil
case DynamoDB:
return NewDynamoDB(dbc.DynamoDBConfig, entryType)
return NewDynamoDB(dbc.DynamoDBConfig)
default:
logger.Info("database type is not set, fall back to default LevelDB")
return NewLevelDB(dbc, 0)
Expand Down
27 changes: 16 additions & 11 deletions storage/database/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const WorkerNum = 10
const itemChanSize = WorkerNum * 2

var (
dynamoOnceClient sync.Once // makes sure dynamo client is created once
dynamoDBClient *dynamodb.DynamoDB // handles dynamoDB connections
dynamoOnceWorker sync.Once // makes sure worker is created once
dynamoWriteCh chan *batchWriteWorkerInput // use global write channel for shared worker
Expand Down Expand Up @@ -119,7 +120,7 @@ func GetDefaultDynamoDBConfig() *DynamoDBConfig {
}
}

func NewDynamoDB(config *DynamoDBConfig, dbtype DBEntryType) (*dynamoDB, error) {
func NewDynamoDB(config *DynamoDBConfig) (*dynamoDB, error) {
if config == nil {
return nil, nilDynamoConfigErr
}
Expand All @@ -136,11 +137,20 @@ func NewDynamoDB(config *DynamoDBConfig, dbtype DBEntryType) (*dynamoDB, error)
return nil, err
}

dynamoOnceClient.Do(func() {
joowon-byun marked this conversation as resolved.
Show resolved Hide resolved
dynamoDBClient = dynamodb.New(session.Must(session.NewSessionWithOptions(session.Options{
Config: aws.Config{
Endpoint: aws.String(config.Endpoint),
Region: aws.String(config.Region),
},
})))
})

dynamoDB := &dynamoDB{
config: *config,
fdb: s3FileDB,
}
dynamoDB.config.TableName += "-" + dbBaseDirs[dbtype]

dynamoDB.logger = logger.NewWith("region", config.Region, "tableName", dynamoDB.config.TableName)

// Check if the table is ready to serve
Expand Down Expand Up @@ -379,12 +389,6 @@ func (dynamo *dynamoDB) NewIteratorWithPrefix(prefix []byte) Iterator {
}

func createBatchWriteWorkerPool(endpoint, region string) {
dynamoDBClient = dynamodb.New(session.Must(session.NewSessionWithOptions(session.Options{
Config: aws.Config{
Endpoint: aws.String(endpoint),
Region: aws.String(region),
},
})))
dynamoWriteCh = make(chan *batchWriteWorkerInput, itemChanSize)
for i := 0; i < WorkerNum; i++ {
go createBatchWriteWorker(dynamoWriteCh)
Expand Down Expand Up @@ -485,10 +489,11 @@ func (batch *dynamoBatch) Put(key, val []byte) error {
})
batch.size += dataSize

if len(batch.batchItems) == dynamoBatchSize {
if len(batch.batchItems) >= dynamoBatchSize {
thisTimeItems := batch.batchItems[:25]
batch.batchItems = batch.batchItems[25:]
batch.wg.Add(1)
dynamoWriteCh <- &batchWriteWorkerInput{batch.tableName, batch.batchItems, batch.wg}
batch.Reset()
dynamoWriteCh <- &batchWriteWorkerInput{batch.tableName, thisTimeItems, batch.wg}
aidan-kwon marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions storage/database/dynamodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

func testDynamoDB_Put(t *testing.T) {
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig(), StateTrieDB)
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig())
defer dynamo.deletedDB()
if err != nil {
t.Fatal(err)
Expand All @@ -50,7 +50,7 @@ func testDynamoDB_Put(t *testing.T) {
}

func testDynamoBatch_Write(t *testing.T) {
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig(), StateTrieDB)
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig())
defer dynamo.deletedDB()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -82,7 +82,7 @@ func testDynamoBatch_Write(t *testing.T) {
}

func testDynamoBatch_WriteLargeData(t *testing.T) {
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig(), StateTrieDB)
dynamo, err := NewDynamoDB(GetDefaultDynamoDBConfig())
defer dynamo.deletedDB()
if err != nil {
t.Fatal(err)
Expand Down