Skip to content
Permalink
Browse files

Merge f44462c into 2cb8dff

  • Loading branch information...
amirkarimi committed Oct 4, 2017
2 parents 2cb8dff + f44462c commit c483b7725ed01457b46c636525a5d33850219d36
@@ -1,7 +1,7 @@
package com.gu.scanamo

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync
import com.amazonaws.services.dynamodbv2.model.{BatchWriteItemResult, DeleteItemResult, PutItemResult}
import com.amazonaws.services.dynamodbv2.model.{BatchWriteItemResult, DeleteItemResult, PutItemResult, ReturnValue}
import com.gu.scanamo.error.DynamoReadError
import com.gu.scanamo.ops.{ScanamoInterpreters, ScanamoOps}
import com.gu.scanamo.query.{Query, UniqueKey, UniqueKeys}
@@ -21,9 +21,9 @@ object ScanamoAsync {
def exec[A](client: AmazonDynamoDBAsync)(op: ScanamoOps[A])(implicit ec: ExecutionContext) =
op.foldMap(ScanamoInterpreters.future(client)(ec))

def put[T: DynamoFormat](client: AmazonDynamoDBAsync)(tableName: String)(item: T)
def put[T: DynamoFormat](client: AmazonDynamoDBAsync)(tableName: String)(item: T, returnValue: Option[ReturnValue] = None)
(implicit ec: ExecutionContext): Future[PutItemResult] =
exec(client)(ScanamoFree.put(tableName)(item))
exec(client)(ScanamoFree.put(tableName)(item, returnValue))

def putAll[T: DynamoFormat](client: AmazonDynamoDBAsync)(tableName: String)(items: Set[T])
(implicit ec: ExecutionContext): Future[List[BatchWriteItemResult]] =
@@ -41,18 +41,18 @@ object ScanamoAsync {
(implicit ec: ExecutionContext): Future[Set[Either[DynamoReadError, T]]] =
exec(client)(ScanamoFree.getAll[T](tableName)(keys))

def delete[T](client: AmazonDynamoDBAsync)(tableName: String)(key: UniqueKey[_])
def delete[T](client: AmazonDynamoDBAsync)(tableName: String)(key: UniqueKey[_], returnValue: Option[ReturnValue] = None)
(implicit ec: ExecutionContext): Future[DeleteItemResult] =
exec(client)(ScanamoFree.delete(tableName)(key))
exec(client)(ScanamoFree.delete(tableName)(key, returnValue))

def deleteAll(client: AmazonDynamoDBAsync)(tableName: String)(items: UniqueKeys[_])
(implicit ec: ExecutionContext): Future[List[BatchWriteItemResult]] =
exec(client)(ScanamoFree.deleteAll(tableName)(items))

def update[V: DynamoFormat](client: AmazonDynamoDBAsync)(tableName: String)(
key: UniqueKey[_], expression: UpdateExpression)(implicit ec: ExecutionContext
): Future[Either[DynamoReadError,V]] =
exec(client)(ScanamoFree.update[V](tableName)(key)(expression))
key: UniqueKey[_], expression: UpdateExpression, returnValue: Option[ReturnValue] = None)
(implicit ec: ExecutionContext): Future[Either[DynamoReadError,V]] =
exec(client)(ScanamoFree.update[V](tableName)(key, returnValue)(expression))

def scan[T: DynamoFormat](client: AmazonDynamoDBAsync)(tableName: String)
(implicit ec: ExecutionContext): Future[List[Either[DynamoReadError, T]]] =
@@ -16,8 +16,8 @@ object ScanamoFree {

private val batchSize = 25

def put[T](tableName: String)(item: T)(implicit f: DynamoFormat[T]): ScanamoOps[PutItemResult] =
ScanamoOps.put(ScanamoPutRequest(tableName, f.write(item), None))
def put[T](tableName: String)(item: T, returnValue: Option[ReturnValue] = None)(implicit f: DynamoFormat[T]): ScanamoOps[PutItemResult] =
ScanamoOps.put(ScanamoPutRequest(tableName, f.write(item), None, returnValue))

def putAll[T](tableName: String)(items: Set[T])(implicit f: DynamoFormat[T]): ScanamoOps[List[BatchWriteItemResult]] =
items.grouped(batchSize).toList.traverse(batch =>
@@ -65,8 +65,8 @@ object ScanamoFree {
}.map(_.flatMap(_.getResponses.get(tableName).asScala.toSet.map(read[T])).toSet)
}

def delete(tableName: String)(key: UniqueKey[_]): ScanamoOps[DeleteItemResult] =
ScanamoOps.delete(ScanamoDeleteRequest(tableName, key.asAVMap, None))
def delete(tableName: String)(key: UniqueKey[_], returnValue: Option[ReturnValue] = None): ScanamoOps[DeleteItemResult] =
ScanamoOps.delete(ScanamoDeleteRequest(tableName, key.asAVMap, None, returnValue))

def scan[T: DynamoFormat](tableName: String): ScanamoOps[List[Either[DynamoReadError, T]]] =
ScanResultStream.stream[T](ScanamoScanRequest(tableName, None, ScanamoQueryOptions.default))
@@ -98,11 +98,11 @@ object ScanamoFree {
def queryIndexWithLimit[T: DynamoFormat](tableName: String, indexName: String)(query: Query[_], limit: Int): ScanamoOps[List[Either[DynamoReadError, T]]] =
QueryResultStream.stream[T](ScanamoQueryRequest(tableName, Some(indexName), query, ScanamoQueryOptions.default.copy(limit = Some(limit))))

def update[T](tableName: String)(key: UniqueKey[_])(update: UpdateExpression)(
def update[T](tableName: String)(key: UniqueKey[_], returnValue: Option[ReturnValue] = None)(update: UpdateExpression)(
implicit format: DynamoFormat[T]
): ScanamoOps[Either[DynamoReadError, T]] =
ScanamoOps.update(ScanamoUpdateRequest(
tableName, key.asAVMap, update.expression, update.attributeNames, update.attributeValues, None)
tableName, key.asAVMap, update.expression, update.attributeNames, update.attributeValues, None, returnValue)
).map(
r => format.read(new AttributeValue().withM(r.getAttributes))
)
@@ -158,7 +158,10 @@ private[ops] object JavaRequests {

def put(req: ScanamoPutRequest): PutItemRequest =
req.condition.foldLeft(
new PutItemRequest().withTableName(req.tableName).withItem(req.item.getM)
new PutItemRequest()
.withTableName(req.tableName)
.withItem(req.item.getM)
.withReturnValues(req.returnValue.getOrElse(ReturnValue.NONE))
)((r, c) =>
c.attributeValues.foldLeft(
r.withConditionExpression(c.expression).withExpressionAttributeNames(c.attributeNames.asJava)
@@ -167,7 +170,10 @@ private[ops] object JavaRequests {

def delete(req: ScanamoDeleteRequest): DeleteItemRequest =
req.condition.foldLeft(
new DeleteItemRequest().withTableName(req.tableName).withKey(req.key.asJava)
new DeleteItemRequest()
.withTableName(req.tableName)
.withKey(req.key.asJava)
.withReturnValues(req.returnValue.getOrElse(ReturnValue.NONE))
)((r, c) =>
c.attributeValues.foldLeft(
r.withConditionExpression(c.expression).withExpressionAttributeNames(c.attributeNames.asJava)
@@ -179,7 +185,7 @@ private[ops] object JavaRequests {
new UpdateItemRequest().withTableName(req.tableName).withKey(req.key.asJava)
.withUpdateExpression(req.updateExpression)
.withExpressionAttributeNames(req.attributeNames.asJava)
.withReturnValues(ReturnValue.ALL_NEW)
.withReturnValues(req.returnValue.getOrElse(ReturnValue.ALL_NEW))
)((r, c) =>
c.attributeValues.foldLeft(
r.withConditionExpression(c.expression).withExpressionAttributeNames(
@@ -11,23 +11,23 @@ import cats.syntax.either._

case class ConditionalOperation[V, T](tableName: String, t: T)(
implicit state: ConditionExpression[T], format: DynamoFormat[V]) {
def put(item: V): ScanamoOps[Either[ConditionalCheckFailedException, PutItemResult]] = {
val unconditionalRequest = ScanamoPutRequest(tableName, format.write(item), None)
def put(item: V, returnValue: Option[ReturnValue] = None): ScanamoOps[Either[ConditionalCheckFailedException, PutItemResult]] = {
val unconditionalRequest = ScanamoPutRequest(tableName, format.write(item), None, returnValue)
ScanamoOps.conditionalPut(unconditionalRequest.copy(
condition = Some(state.apply(t)(unconditionalRequest.condition))))
}

def delete(key: UniqueKey[_]): ScanamoOps[Either[ConditionalCheckFailedException, DeleteItemResult]] = {
val unconditionalRequest = ScanamoDeleteRequest(tableName = tableName, key = key.asAVMap, None)
def delete(key: UniqueKey[_], returnValue: Option[ReturnValue] = None): ScanamoOps[Either[ConditionalCheckFailedException, DeleteItemResult]] = {
val unconditionalRequest = ScanamoDeleteRequest(tableName = tableName, key = key.asAVMap, None, returnValue)
ScanamoOps.conditionalDelete(unconditionalRequest.copy(
condition = Some(state.apply(t)(unconditionalRequest.condition))))
}

def update(key: UniqueKey[_], update: UpdateExpression):
def update(key: UniqueKey[_], update: UpdateExpression, returnValue: Option[ReturnValue] = None):
ScanamoOps[Either[ScanamoError, V]] = {

val unconditionalRequest = ScanamoUpdateRequest(
tableName, key.asAVMap, update.expression, update.attributeNames, update.attributeValues, None)
tableName, key.asAVMap, update.expression, update.attributeNames, update.attributeValues, None, returnValue)
ScanamoOps.conditionalUpdate(unconditionalRequest.copy(
condition = Some(state.apply(t)(unconditionalRequest.condition)))
).map(either => either.leftMap[ScanamoError](ConditionNotMet(_)).flatMap(
@@ -1,18 +1,20 @@
package com.gu.scanamo.request

import com.amazonaws.services.dynamodbv2.model.AttributeValue
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, ReturnValue}
import com.gu.scanamo.query.{Condition, Query}

case class ScanamoPutRequest(
tableName: String,
item: AttributeValue,
condition: Option[RequestCondition]
condition: Option[RequestCondition],
returnValue: Option[ReturnValue]
)

case class ScanamoDeleteRequest(
tableName: String,
key: Map[String, AttributeValue],
condition: Option[RequestCondition]
condition: Option[RequestCondition],
returnValue: Option[ReturnValue]
)

case class ScanamoUpdateRequest(
@@ -21,7 +23,8 @@ case class ScanamoUpdateRequest(
updateExpression: String,
attributeNames: Map[String, String],
attributeValues: Map[String, AttributeValue],
condition: Option[RequestCondition]
condition: Option[RequestCondition],
returnValue: Option[ReturnValue]
)

case class ScanamoScanRequest(
@@ -1,13 +1,14 @@
package com.gu.scanamo

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, ReturnValue}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{Millis, Seconds, Span}
import org.scalatest.{FunSpec, Matchers}
import org.scalatest.{EitherValues, FunSpec, Matchers}
import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType._
import com.gu.scanamo.query._

class ScanamoAsyncTest extends FunSpec with Matchers with ScalaFutures {
class ScanamoAsyncTest extends FunSpec with Matchers with ScalaFutures with EitherValues {
implicit val defaultPatience =
PatienceConfig(timeout = Span(2, Seconds), interval = Span(15, Millis))

@@ -29,6 +30,29 @@ class ScanamoAsyncTest extends FunSpec with Matchers with ScalaFutures {
}
}

it("should put asynchronously and return values correctly") {
LocalDynamoDB.usingTable(client)("asyncFarmers")('name -> S) {
case class Farm(asyncAnimals: List[String])
case class Farmer(name: String, age: Long, farm: Farm)

val originalItem = Farmer("McDonald", 156L, Farm(List("sheep", "cow")))
val updatedItem = Farmer("McDonald", 100L, Farm(List("sheep", "pig")))

val resultF = for {
_ <- ScanamoAsync.put(client)("asyncFarmers")(originalItem)
result <- ScanamoAsync.put(client)("asyncFarmers")(
updatedItem,
Some(ReturnValue.ALL_OLD)
)
} yield result

// Parse the returned result
val format = implicitly[DynamoFormat[Farmer]]
val parsedResult = format.read(new AttributeValue().withM(resultF.futureValue.getAttributes))
parsedResult.right.value should equal(originalItem)
}
}

it("should get asynchronously") {
LocalDynamoDB.usingTable(client)("asyncFarmers")('name -> S) {
case class Farm(asyncAnimals: List[String])

0 comments on commit c483b77

Please sign in to comment.
You can’t perform that action at this time.