-
Notifications
You must be signed in to change notification settings - Fork 124
/
ScanamoFree.scala
125 lines (101 loc) · 5.9 KB
/
ScanamoFree.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package com.gu.scanamo
import com.amazonaws.services.dynamodbv2.model.{PutRequest, WriteRequest, _}
import com.gu.scanamo.DynamoResultStream.{QueryResultStream, ScanResultStream}
import com.gu.scanamo.error.DynamoReadError
import com.gu.scanamo.ops.ScanamoOps
import com.gu.scanamo.query._
import com.gu.scanamo.request.{ScanamoDeleteRequest, ScanamoPutRequest, ScanamoUpdateRequest}
import com.gu.scanamo.update.UpdateExpression
object ScanamoFree {
import cats.instances.list._
import cats.syntax.traverse._
import collection.JavaConverters._
// Number of write requests grouped into a single BatchWriteItem call by
// `putAll` and `deleteAll`; 25 is the per-call maximum DynamoDB documents
// for BatchWriteItem.
private val batchSize: Int = 25
/**
 * Builds the operation that puts a single item into `tableName`.
 *
 * The item is serialised with the implicit [[DynamoFormat]] and wrapped in a
 * `ScanamoPutRequest` whose optional third field is left as `None`.
 */
def put[T](tableName: String)(item: T)(implicit f: DynamoFormat[T]): ScanamoOps[PutItemResult] = {
  val serialised = f.write(item)
  val request = ScanamoPutRequest(tableName, serialised, None)
  ScanamoOps.put(request)
}
/**
 * Builds the operations that put every element of `items` into `tableName`.
 *
 * Items are split into groups of at most `batchSize`, and one BatchWriteItem
 * request is issued per group. The raw result of each request is returned,
 * in group order; nothing here inspects those results.
 */
def putAll[T](tableName: String)(items: Set[T])(implicit f: DynamoFormat[T]): ScanamoOps[List[BatchWriteItemResult]] = {
  // One put-style WriteRequest per item; the item is written via the format
  // and its map representation (`getM`) becomes the request's item.
  def putRequestFor(item: T): WriteRequest =
    new WriteRequest().withPutRequest(new PutRequest().withItem(f.write(item).getM))

  // A single BatchWriteItem call carrying all writes of one group.
  def writeBatch(batch: Set[T]): ScanamoOps[BatchWriteItemResult] = {
    val writes = batch.toList.map(item => putRequestFor(item)).asJava
    ScanamoOps.batchWrite(
      new BatchWriteItemRequest().withRequestItems(Map(tableName -> writes).asJava)
    )
  }

  val batches = items.grouped(batchSize).toList
  batches.traverseU(batch => writeBatch(batch))
}
/**
 * Builds the operations that delete every key in `items` from `tableName`.
 *
 * Keys are split into groups of at most `batchSize`, and one BatchWriteItem
 * request (made of delete requests) is issued per group. The raw result of
 * each request is returned, in group order.
 */
def deleteAll(tableName: String)(items: UniqueKeys[_]): ScanamoOps[List[BatchWriteItemResult]] = {
  val keyBatches = items.asAVMap.grouped(batchSize).toList

  keyBatches.traverseU { keys =>
    // One delete-style WriteRequest per key in this group.
    val deletes = keys.toList.map { key =>
      new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(key.asJava))
    }
    val request =
      new BatchWriteItemRequest().withRequestItems(Map(tableName -> deletes.asJava).asJava)
    ScanamoOps.batchWrite(request)
  }
}
/**
 * Builds the operation that fetches the item stored under `key` in `tableName`.
 *
 * The result is `None` when the response carries no item (`getItem` is null);
 * otherwise it is the outcome of decoding the item with the implicit format.
 */
def get[T](tableName: String)(key: UniqueKey[_])
  (implicit ft: DynamoFormat[T]): ScanamoOps[Option[Either[DynamoReadError, T]]] = {
  val request = new GetItemRequest().withTableName(tableName).withKey(key.asAVMap.asJava)
  ScanamoOps.get(request).map(result => Option(result.getItem).map(read[T]))
}
/**
 * Same lookup as a plain get, but the GetItemRequest is flagged with
 * `withConsistentRead(true)`.
 *
 * The result is `None` when the response carries no item (`getItem` is null);
 * otherwise it is the outcome of decoding the item with the implicit format.
 */
def getWithConsistency[T](tableName: String)(key: UniqueKey[_])
  (implicit ft: DynamoFormat[T]): ScanamoOps[Option[Either[DynamoReadError, T]]] = {
  val request = new GetItemRequest()
    .withTableName(tableName)
    .withKey(key.asAVMap.asJava)
    .withConsistentRead(true)
  ScanamoOps.get(request).map(result => Option(result.getItem).map(read[T]))
}
/**
 * Builds the operations that fetch every item identified by `keys` from
 * `tableName` via BatchGetItem, decoding each returned item with the
 * `DynamoFormat` for `T`.
 *
 * Keys are requested in groups of at most 100, because DynamoDB rejects a
 * BatchGetItem call asking for more keys than that. An empty `keys` therefore
 * issues no request at all and yields an empty set. The decoded items of all
 * groups are merged into a single set.
 *
 * NOTE(review): keys that DynamoDB reports back as unprocessed are not
 * retried here, so the result may be incomplete under throttling.
 */
def getAll[T: DynamoFormat](tableName: String)(keys: UniqueKeys[_]): ScanamoOps[Set[Either[DynamoReadError, T]]] = {
  // Documented per-call key limit of BatchGetItem.
  val batchGetSize = 100

  keys.asAVMap.grouped(batchGetSize).toList.traverseU { batch =>
    ScanamoOps.batchGet(
      new BatchGetItemRequest().withRequestItems(Map(tableName ->
        new KeysAndAttributes().withKeys(batch.map(_.asJava).asJava)
      ).asJava)
    )
  }.map { results =>
    // The SDK hands back plain Java maps/lists; guard the lookups with Option
    // so a response without an entry for this table contributes nothing
    // instead of throwing a NullPointerException.
    val rows = for {
      res       <- results
      responses <- Option(res.getResponses).toList
      tableRows <- Option(responses.get(tableName)).toList
      row       <- tableRows.asScala.toList
    } yield row
    rows.map(read[T]).toSet
  }
}
/**
 * Builds the operation that deletes the item stored under `key` in `tableName`.
 *
 * The key is wrapped in a `ScanamoDeleteRequest` whose optional third field
 * is left as `None`.
 */
def delete(tableName: String)(key: UniqueKey[_]): ScanamoOps[DeleteItemResult] = {
  val request = ScanamoDeleteRequest(tableName, key.asAVMap, None)
  ScanamoOps.delete(request)
}
/**
 * Builds the operation that scans `tableName`, passing a plain ScanRequest to
 * `ScanResultStream.stream`; each element of the result is either a decoded
 * `T` or the read error for that item.
 */
def scan[T: DynamoFormat](tableName: String): ScanamoOps[List[Either[DynamoReadError, T]]] = {
  val request = new ScanRequest().withTableName(tableName)
  ScanResultStream.stream[T](request)
}
/**
 * Builds the operation that scans `tableName` with the ScanRequest flagged
 * `withConsistentRead(true)`; each element of the result is either a decoded
 * `T` or the read error for that item.
 */
def scanConsistent[T: DynamoFormat](tableName: String): ScanamoOps[List[Either[DynamoReadError, T]]] = {
  val request = new ScanRequest().withTableName(tableName).withConsistentRead(true)
  ScanResultStream.stream[T](request)
}
/**
 * Builds the operation that scans `tableName` with `limit` set on the
 * ScanRequest; each element of the result is either a decoded `T` or the read
 * error for that item.
 *
 * NOTE(review): how `limit` interacts with result paging depends on
 * `ScanResultStream.stream`, which is not visible here.
 */
def scanWithLimit[T: DynamoFormat](tableName: String, limit: Int): ScanamoOps[List[Either[DynamoReadError, T]]] = {
  val request = new ScanRequest().withTableName(tableName).withLimit(limit)
  ScanResultStream.stream[T](request)
}
/**
 * Builds the operation that scans the index `indexName` of `tableName`; each
 * element of the result is either a decoded `T` or the read error for that
 * item.
 */
def scanIndex[T: DynamoFormat](tableName: String, indexName: String): ScanamoOps[List[Either[DynamoReadError, T]]] = {
  val request = new ScanRequest().withTableName(tableName).withIndexName(indexName)
  ScanResultStream.stream[T](request)
}
/**
 * Builds the operation that scans the index `indexName` of `tableName` with
 * `limit` set on the ScanRequest; each element of the result is either a
 * decoded `T` or the read error for that item.
 *
 * NOTE(review): how `limit` interacts with result paging depends on
 * `ScanResultStream.stream`, which is not visible here.
 */
def scanIndexWithLimit[T: DynamoFormat](tableName: String, indexName: String, limit: Int): ScanamoOps[List[Either[DynamoReadError, T]]] = {
  val request = new ScanRequest()
    .withTableName(tableName)
    .withIndexName(indexName)
    .withLimit(limit)
  ScanResultStream.stream[T](request)
}
/**
 * Builds the operation that queries `tableName`.
 *
 * `query` is applied to a QueryRequest for the table and the resulting
 * request is passed to `QueryResultStream.stream`; each element of the result
 * is either a decoded `T` or the read error for that item.
 */
def query[T: DynamoFormat](tableName: String)(query: Query[_]): ScanamoOps[List[Either[DynamoReadError, T]]] = {
  val base = new QueryRequest().withTableName(tableName)
  val request = query(base)
  QueryResultStream.stream[T](request)
}
/**
 * Builds the operation that queries `tableName` with the QueryRequest flagged
 * `withConsistentRead(true)` before `query` is applied to it; each element of
 * the result is either a decoded `T` or the read error for that item.
 */
def queryConsistent[T: DynamoFormat](tableName: String)(query: Query[_]): ScanamoOps[List[Either[DynamoReadError, T]]] = {
  val base = new QueryRequest().withTableName(tableName).withConsistentRead(true)
  val request = query(base)
  QueryResultStream.stream[T](request)
}
/**
 * Builds the operation that queries `tableName` with a limit.
 *
 * `query` is applied to a QueryRequest for the table first, and `limit` is
 * then set on the request it returns. Each element of the result is either a
 * decoded `T` or the read error for that item.
 *
 * NOTE(review): how `limit` interacts with result paging depends on
 * `QueryResultStream.stream`, which is not visible here.
 */
def queryWithLimit[T: DynamoFormat](tableName: String)(query: Query[_], limit: Int): ScanamoOps[List[Either[DynamoReadError, T]]] = {
  val base = new QueryRequest().withTableName(tableName)
  val request = query(base).withLimit(limit)
  QueryResultStream.stream[T](request)
}
/**
 * Builds the operation that queries the index `indexName` of `tableName`.
 *
 * `query` is applied to a QueryRequest for the table first, and the index
 * name is then set on the request it returns. Each element of the result is
 * either a decoded `T` or the read error for that item.
 */
def queryIndex[T: DynamoFormat](tableName: String, indexName: String)(query: Query[_]): ScanamoOps[List[Either[DynamoReadError, T]]] = {
  val base = new QueryRequest().withTableName(tableName)
  val request = query(base).withIndexName(indexName)
  QueryResultStream.stream[T](request)
}
/**
 * Builds the operation that queries the index `indexName` of `tableName`
 * with a limit.
 *
 * `query` is applied to a QueryRequest for the table first; the index name
 * and then `limit` are set on the request it returns. Each element of the
 * result is either a decoded `T` or the read error for that item.
 *
 * NOTE(review): how `limit` interacts with result paging depends on
 * `QueryResultStream.stream`, which is not visible here.
 */
def queryIndexWithLimit[T: DynamoFormat](tableName: String, indexName: String)(query: Query[_], limit: Int): ScanamoOps[List[Either[DynamoReadError, T]]] = {
  val base = new QueryRequest().withTableName(tableName)
  val request = query(base).withIndexName(indexName).withLimit(limit)
  QueryResultStream.stream[T](request)
}
/**
 * Builds the operation that applies `update` to the item stored under `key`
 * in `tableName`.
 *
 * The expression, attribute names and attribute values of `update` are
 * packed into a `ScanamoUpdateRequest` (optional last field left as `None`).
 * The attributes returned by the update are then decoded with `format`.
 */
def update[T](tableName: String)(key: UniqueKey[_])(update: UpdateExpression)(
  implicit format: DynamoFormat[T]
): ScanamoOps[Either[DynamoReadError, T]] = {
  val request = ScanamoUpdateRequest(
    tableName,
    key.asAVMap,
    update.expression,
    update.attributeNames,
    update.attributeValues,
    None
  )
  for {
    result <- ScanamoOps.update(request)
  } yield format.read(new AttributeValue().withM(result.getAttributes))
}
/**
 * Decodes a raw DynamoDB item (attribute name to `AttributeValue`) into a `T`.
 *
 * The map is wrapped in a map-typed `AttributeValue` and passed to the
 * implicit format's `read`; a failure to decode is reported as a `Left`.
 *
 * {{{
 * prop> import collection.JavaConverters._
 * prop> import com.amazonaws.services.dynamodbv2.model._
 *
 * prop> (m: Map[String, Int]) =>
 *     | ScanamoFree.read[Map[String, Int]](
 *     |   m.mapValues(i => new AttributeValue().withN(i.toString)).asJava
 *     | ) == Right(m)
 * }}}
 */
def read[T](m: java.util.Map[String, AttributeValue])(implicit f: DynamoFormat[T]): Either[DynamoReadError, T] = {
  val wrapped = new AttributeValue().withM(m)
  f.read(wrapped)
}
}