Skip to content

Commit

Permalink
Merge pull request #14 from wolfeidau/feat_hooks
Browse files Browse the repository at this point in the history
feat(observability): added hook for parameters to enable debugging
  • Loading branch information
wolfeidau committed Aug 8, 2021
2 parents 42e7e33 + 2fb69f7 commit aae0e85
Show file tree
Hide file tree
Showing 8 changed files with 756 additions and 508 deletions.
148 changes: 73 additions & 75 deletions dynamo.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,21 @@ package dynastore
import (
"context"
"fmt"
"strconv"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
dexp "github.com/aws/aws-sdk-go/service/dynamodb/expression"
)

type contextKey int

const (
OperationNameKey contextKey = 1 + iota

listDefaultTimeout = time.Second * 10
)

Expand Down Expand Up @@ -77,13 +79,9 @@ func (kv *KVPair) DecodeFields(out interface{}) error {
return dynamodbattribute.UnmarshalMap(kv.fields, out)
}

func isReservedField(s string) bool {
_, ok := reservedFields[s]
return ok
}

type DynaSession struct {
dynamodbiface.DynamoDBAPI
*dynamodb.DynamoDB
storeHooks *StoreHooks
}

func (ds *DynaSession) Table(tableName string) *Dynatable {
Expand Down Expand Up @@ -115,8 +113,21 @@ func New(cfgs ...*aws.Config) *DynaSession {
sess := session.Must(session.NewSession(cfgs...))
dynamoSvc := dynamodb.New(sess)

return &DynaSession{
dynamoSvc, nil,
}
}

// New construct a DynamoDB backed store with default session / service
func NewWithOptions(awscfg *aws.Config, options ...SessionOption) *DynaSession {
sessionOptions := NewSessionOptions(options...)

sess := session.Must(session.NewSession(awscfg))
dynamoSvc := dynamodb.New(sess)

return &DynaSession{
dynamoSvc,
sessionOptions.storeHooks,
}
}

Expand All @@ -137,6 +148,8 @@ func (ddb *DynaPartition) Put(key string, options ...WriteOption) error {
func (ddb *DynaPartition) PutWithContext(ctx context.Context, key string, options ...WriteOption) error {
writeOptions := NewWriteOptions(options...)

ctx = setOperationName(ctx, "Put")

update, err := buildUpdate(writeOptions)
if err != nil {
return fmt.Errorf("failed to build update: %w", err)
Expand All @@ -147,14 +160,18 @@ func (ddb *DynaPartition) PutWithContext(ctx context.Context, key string, option
return fmt.Errorf("failed to build update expression: %w", err)
}

_, err = ddb.session.UpdateItemWithContext(ctx, &dynamodb.UpdateItemInput{
updateItem := &dynamodb.UpdateItemInput{
TableName: aws.String(ddb.GetTableName()),
Key: buildKeys(ddb.partition, key),
ExpressionAttributeNames: expr.Names(),
ExpressionAttributeValues: expr.Values(),
UpdateExpression: expr.Update(),
ReturnValues: aws.String(dynamodb.ReturnValueAllNew),
})
}

ctx = ddb.session.storeHooks.RequestBuilt(ctx, updateItem)

_, err = ddb.session.UpdateItemWithContext(ctx, updateItem)
if err != nil {
return fmt.Errorf("failed to update item: %w", err)
}
Expand All @@ -171,12 +188,17 @@ func (ddb *DynaPartition) Exists(key string, options ...ReadOption) (bool, error
func (ddb *DynaPartition) ExistsWithContext(ctx context.Context, key string, options ...ReadOption) (bool, error) {
readOptions := NewReadOptions(options...)

res, err := ddb.session.GetItemWithContext(ctx, &dynamodb.GetItemInput{
ctx = setOperationName(ctx, "Exists")

getItem := &dynamodb.GetItemInput{
TableName: aws.String(ddb.GetTableName()),
Key: buildKeys(ddb.partition, key),
ConsistentRead: aws.Bool(readOptions.consistent),
})
}

ctx = ddb.session.storeHooks.RequestBuilt(ctx, getItem)

res, err := ddb.session.GetItemWithContext(ctx, getItem)
if err != nil {
return false, fmt.Errorf("failed to get item: %w", err)
}
Expand All @@ -202,6 +224,8 @@ func (ddb *DynaPartition) Get(key string, options ...ReadOption) (*KVPair, error
func (ddb *DynaPartition) GetWithContext(ctx context.Context, key string, options ...ReadOption) (*KVPair, error) {
readOptions := NewReadOptions(options...)

ctx = setOperationName(ctx, "Get")

res, err := ddb.getKey(ctx, key, readOptions)
if err != nil {
return nil, fmt.Errorf("failed to get by key: %w", err)
Expand Down Expand Up @@ -230,10 +254,16 @@ func (ddb *DynaPartition) Delete(key string) error {

// Delete the value at the specified key
func (ddb *DynaPartition) DeleteWithContext(ctx context.Context, key string) error {
_, err := ddb.session.DeleteItemWithContext(ctx, &dynamodb.DeleteItemInput{
ctx = setOperationName(ctx, "Delete")

deleteItem := &dynamodb.DeleteItemInput{
TableName: aws.String(ddb.GetTableName()),
Key: buildKeys(ddb.partition, key),
})
}

ctx = ddb.session.storeHooks.RequestBuilt(ctx, deleteItem)

_, err := ddb.session.DeleteItemWithContext(ctx, deleteItem)
if err != nil {
return fmt.Errorf("failed to delete item: %w", err)
}
Expand All @@ -250,6 +280,8 @@ func (ddb *DynaPartition) ListPage(prefix string, options ...ReadOption) (*KVPai
func (ddb *DynaPartition) ListPageWithContext(ctx context.Context, prefix string, options ...ReadOption) (*KVPairPage, error) {
readOptions := NewReadOptions(options...)

ctx = setOperationName(ctx, "ListPage")

rangeKey := "name"

if readOptions.index != nil {
Expand All @@ -267,7 +299,7 @@ func (ddb *DynaPartition) ListPageWithContext(ctx context.Context, prefix string
return nil, fmt.Errorf("failed to build exp: %w", err)
}

si := &dynamodb.QueryInput{
query := &dynamodb.QueryInput{
TableName: aws.String(ddb.GetTableName()),
KeyConditionExpression: expr.KeyCondition(),
ExpressionAttributeNames: expr.Names(),
Expand All @@ -277,7 +309,7 @@ func (ddb *DynaPartition) ListPageWithContext(ctx context.Context, prefix string
}

if readOptions.index != nil {
si.IndexName = aws.String(readOptions.index.name)
query.IndexName = aws.String(readOptions.index.name)
}

var decodedKey map[string]*dynamodb.AttributeValue
Expand All @@ -289,10 +321,12 @@ func (ddb *DynaPartition) ListPageWithContext(ctx context.Context, prefix string
return nil, fmt.Errorf("failed to decompress key: %w", err)
}

si.ExclusiveStartKey = decodedKey
query.ExclusiveStartKey = decodedKey
}

res, err := ddb.session.QueryWithContext(ctx, si)
ctx = ddb.session.storeHooks.RequestBuilt(ctx, query)

res, err := ddb.session.QueryWithContext(ctx, query)
if err != nil {
return nil, fmt.Errorf("failed to run query: %w", err)
}
Expand Down Expand Up @@ -337,7 +371,9 @@ func (ddb *DynaPartition) List(prefix string, options ...ReadOption) ([]*KVPair,
func (ddb *DynaPartition) ListWithContext(ctx context.Context, prefix string, options ...ReadOption) ([]*KVPair, error) {
readOptions := NewReadOptions(options...)

si := &dynamodb.QueryInput{
ctx = setOperationName(ctx, "List")

query := &dynamodb.QueryInput{
TableName: aws.String(ddb.GetTableName()),
KeyConditionExpression: aws.String("#id = :partition AND begins_with(#name, :namePrefix)"),
ExpressionAttributeNames: map[string]*string{
Expand All @@ -355,7 +391,9 @@ func (ddb *DynaPartition) ListWithContext(ctx context.Context, prefix string, op

var items []map[string]*dynamodb.AttributeValue

err := ddb.session.QueryPagesWithContext(ctx, si,
ctx = ddb.session.storeHooks.RequestBuilt(ctx, query)

err := ddb.session.QueryPagesWithContext(ctx, query,
func(page *dynamodb.QueryOutput, lastPage bool) bool {
items = append(items, page.Items...)

Expand Down Expand Up @@ -414,16 +452,19 @@ func (ddb *DynaPartition) AtomicPutWithContext(ctx context.Context, key string,
return false, nil, fmt.Errorf("failed to build update expression: %w", err)
}

res, err := ddb.session.UpdateItemWithContext(ctx, &dynamodb.UpdateItemInput{
updateItem := &dynamodb.UpdateItemInput{
TableName: aws.String(ddb.GetTableName()),
Key: buildKeys(ddb.partition, key),
ExpressionAttributeNames: expr.Names(),
ExpressionAttributeValues: expr.Values(),
UpdateExpression: expr.Update(),
ConditionExpression: expr.Condition(),
ReturnValues: aws.String(dynamodb.ReturnValueAllNew),
})
}

ctx = ddb.session.storeHooks.RequestBuilt(setOperationName(ctx, "AtomicPut"), updateItem)

res, err := ddb.session.UpdateItemWithContext(ctx, updateItem)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == dynamodb.ErrCodeConditionalCheckFailedException {
Expand Down Expand Up @@ -462,6 +503,8 @@ func (ddb *DynaPartition) AtomicDelete(key string, previous *KVPair) (bool, erro
//
// FIXME: should the second case just return false, nil?
func (ddb *DynaPartition) AtomicDeleteWithContext(ctx context.Context, key string, previous *KVPair) (bool, error) {
ctx = setOperationName(ctx, "AtomicDelete")

getRes, err := ddb.getKey(ctx, key, NewReadOptions())
if err != nil {
return false, err
Expand All @@ -486,6 +529,8 @@ func (ddb *DynaPartition) AtomicDeleteWithContext(ctx context.Context, key strin
ExpressionAttributeValues: expr.Values(),
}

ctx = ddb.session.storeHooks.RequestBuilt(ctx, req)

_, err = ddb.session.DeleteItemWithContext(ctx, req)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
Expand All @@ -500,14 +545,18 @@ func (ddb *DynaPartition) AtomicDeleteWithContext(ctx context.Context, key strin
}

func (ddb *DynaPartition) getKey(ctx context.Context, key string, options *ReadOptions) (*dynamodb.GetItemOutput, error) {
return ddb.session.GetItemWithContext(ctx, &dynamodb.GetItemInput{
getItem := &dynamodb.GetItemInput{
TableName: aws.String(ddb.GetTableName()),
ConsistentRead: aws.Bool(options.consistent),
Key: map[string]*dynamodb.AttributeValue{
"id": {S: aws.String(ddb.partition)},
"name": {S: aws.String(key)},
},
})
}

ctx = ddb.session.storeHooks.RequestBuilt(ctx, getItem)

return ddb.session.GetItemWithContext(ctx, getItem)
}

func buildUpdate(options *WriteOptions) (dexp.UpdateBuilder, error) {
Expand Down Expand Up @@ -581,54 +630,3 @@ func updateWithConditions(previous *KVPair) dexp.ConditionBuilder {

return dexp.Or(checkExists, checkExpires)
}

// DecodeItem decode a DDB attribute value into a KVPair
func DecodeItem(item map[string]*dynamodb.AttributeValue) (*KVPair, error) {
kv := &KVPair{}

err := dynamodbattribute.UnmarshalMap(item, kv)
if err != nil {
return nil, err
}

if val, ok := item["payload"]; ok {
kv.value = val
}

kv.fields = make(map[string]*dynamodb.AttributeValue)

for k, v := range item {
if !isReservedField(k) {
kv.fields[k] = v
}
}

return kv, nil
}

func isItemExpired(item map[string]*dynamodb.AttributeValue) bool {
var ttl int64

if v, ok := item["expires"]; ok {
ttl, _ = strconv.ParseInt(aws.StringValue(v.N), 10, 64)
return time.Unix(ttl, 0).Before(time.Now())
}

return false
}

// MarshalStruct this helper method marshals a struct into an *dynamodb.AttributeValue which contains a map
// in the format required to provide to WriteWithAttributeValue.
func MarshalStruct(in interface{}) (*dynamodb.AttributeValue, error) {
item, err := dynamodbattribute.MarshalMap(in)
if err != nil {
return nil, err
}

return &dynamodb.AttributeValue{M: item}, nil
}

// UnmarshalStruct this helper method un-marshals a struct from an *dynamodb.AttributeValue returned by KVPair.AttributeValue.
func UnmarshalStruct(val *dynamodb.AttributeValue, out interface{}) error {
return dynamodbattribute.UnmarshalMap(val.M, out)
}
Loading

0 comments on commit aae0e85

Please sign in to comment.