Skip to content

Commit

Permalink
Added ability to wait between Lock attempts
Browse files Browse the repository at this point in the history
This should stop clients from hammering Dynamo while waiting for a lock.

Also:

* Now only attempt to release a lock 3 times
* Tidied up the tests and moved away from Testify Suite
  • Loading branch information
thomshutt committed Mar 16, 2017
1 parent 3138a51 commit c31a917
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 151 deletions.
35 changes: 22 additions & 13 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,18 @@ type AWSDynamoer interface {

func (db *database) Put(name string, created int64) error {
i := map[string]*dynamodb.AttributeValue{
"Name": {
"Name": &dynamodb.AttributeValue{
S: aws.String(name),
},
"Created": {
"Created": &dynamodb.AttributeValue{
N: aws.String(strconv.FormatInt(created, 10)),
},
}

no := false
e := map[string]*dynamodb.ExpectedAttributeValue{
"Name": {
Exists: aws.Bool(false),
"Name": &dynamodb.ExpectedAttributeValue{
Exists: &no,
},
}

Expand All @@ -64,25 +65,29 @@ func (db *database) Put(name string, created int64) error {
Item: i,
Expected: e,
}

_, err := db.client.PutItem(pit)
return err
if err != nil {
return err
}

return nil
}

func (db *database) Get(name string) (*models.Item, error) {
kc := map[string]*dynamodb.Condition{
"Name": {
"Name": &dynamodb.Condition{
AttributeValueList: []*dynamodb.AttributeValue{
{
&dynamodb.AttributeValue{
S: aws.String(name),
},
},
ComparisonOperator: aws.String("EQ"),
},
}
yes := true
qi := &dynamodb.QueryInput{
TableName: aws.String(db.tableName),
ConsistentRead: aws.Bool(true),
ConsistentRead: &yes,
Select: aws.String("SPECIFIC_ATTRIBUTES"),
AttributesToGet: []*string{aws.String("Name"), aws.String("Created")},
KeyConditions: kc,
Expand All @@ -96,9 +101,9 @@ func (db *database) Get(name string) (*models.Item, error) {
// Make sure that no or 1 item is returned from DynamoDB
if qo.Count != nil {
if *qo.Count == 0 {
return nil, fmt.Errorf("No item for Name, %s", name)
return nil, errors.New(fmt.Sprintf("No item for Name, %s", name))
} else if *qo.Count > 1 {
return nil, fmt.Errorf("Expected only 1 item returned from Dynamo, got %d", *qo.Count)
return nil, errors.New(fmt.Sprintf("Expected only 1 item returned from Dynamo, got %d", *qo.Count))
}
} else {
return nil, errors.New("Count not returned")
Expand Down Expand Up @@ -130,7 +135,7 @@ func (db *database) Get(name string) (*models.Item, error) {

func (db *database) Delete(name string) error {
k := map[string]*dynamodb.AttributeValue{
"Name": {
"Name": &dynamodb.AttributeValue{
S: aws.String(name),
},
}
Expand All @@ -139,5 +144,9 @@ func (db *database) Delete(name string) error {
Key: k,
}
_, err := db.client.DeleteItem(dii)
return err
if err != nil {
return err
}

return nil
}
4 changes: 4 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ func TestDBSuite(t *testing.T) {
suite.Run(t, new(DBSuite))
}

func (s *DBSuite) SetupSuite() {

}

func (s *DBSuite) SetupTest() {
s.mock = new(mocks.AWSDynamoer)
s.db = NewDatabase(DB_VALID_TABLE_NAME, DB_VALID_REGION, DB_VALID_NO_ENDPOINT, DB_VALID_DISABLE_SSL_NO)
Expand Down
50 changes: 17 additions & 33 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 15 additions & 2 deletions glide.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@
package: github.com/zencoder/ddbsync
import:
- package: github.com/aws/aws-sdk-go
version: v1.7.9
version: v0.10.4
subpackages:
- aws
- internal/endpoints
- internal/protocol/json/jsonutil
- internal/protocol/jsonrpc
- internal/protocol/rest
- internal/signer/v4
- service/dynamodb
- package: github.com/stretchr/objx
version: cbeaeb16a013161a98496fad62933b1d21786672
- package: github.com/stretchr/testify
version: v1.1.4
version: c478a808a1b37e10c82f71a2172728f8742bad51
subpackages:
- assert
- mock
- require
- suite
- package: github.com/vaughan0/go-ini
version: a98ad7ee00ec53921f08832bc06ecf7fd600e6a1
23 changes: 16 additions & 7 deletions lock_service_test.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,29 @@
package ddbsync

import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"testing"
)

const LOCK_SERVICE_VALID_MUTEX_NAME string = "mut-test"
const LOCK_SERVICE_VALID_MUTEX_TTL int64 = 4

func TestNewLock(t *testing.T) {
type LockServiceSuite struct {
suite.Suite
}

func TestLockServiceSuite(t *testing.T) {
suite.Run(t, new(LockServiceSuite))
}

func (s *LockServiceSuite) TestNewLock() {
ls := &LockService{}
m := ls.NewLock(LOCK_SERVICE_VALID_MUTEX_NAME, LOCK_SERVICE_VALID_MUTEX_TTL)

require.NotNil(t, ls)
require.NotNil(t, m)
require.IsType(t, &LockService{}, ls)
require.IsType(t, &Mutex{}, m)
require.Equal(t, &Mutex{Name: LOCK_SERVICE_VALID_MUTEX_NAME, TTL: LOCK_SERVICE_VALID_MUTEX_TTL}, m)
assert.NotNil(s.T(), ls)
assert.NotNil(s.T(), m)
assert.IsType(s.T(), &LockService{}, ls)
assert.IsType(s.T(), &Mutex{}, m)
assert.Equal(s.T(), &Mutex{Name: LOCK_SERVICE_VALID_MUTEX_NAME, TTL: LOCK_SERVICE_VALID_MUTEX_TTL}, m)
}
28 changes: 4 additions & 24 deletions mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,14 @@ import (
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/dynamodb"
)

// A Mutex is a mutual exclusion lock.
// Mutexes can be created as part of other structures.
type Mutex struct {
Name string
TTL int64
LockReattemptWait time.Duration
db DBer
Name string
TTL int64
db DBer
}

var _ sync.Locker = (*Mutex)(nil) // Forces compile time checking of the interface
Expand All @@ -47,32 +43,16 @@ func (m *Mutex) Lock() {
if err == nil {
return
}

// Log the error if it's not one we expect to see
if awsErr, ok := err.(awserr.Error); ok {
switch awsErr.Code() {
case dynamodb.ErrCodeConditionalCheckFailedException: // Something already holds the mutex
default:
log.Printf("Lock. AWS error: %v", awsErr.Message())
}
} else {
log.Printf("Lock. Error: %v", err)
}

time.Sleep(m.LockReattemptWait)
}
}

// Unlock will delete an item in a DynamoDB table.
// If for some reason we can't (Dynamo is down / TTL of lock expired and something else deleted it) then
// we give up after a few attempts and let the TTL catch it (if it hasn't already).
func (m *Mutex) Unlock() {
for i := 0; i < 3; i++ {
for {
err := m.db.Delete(m.Name)
if err == nil {
return
}
log.Printf("Unlock. Error: %v", err)
}
}

Expand Down
Loading

0 comments on commit c31a917

Please sign in to comment.