Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/scanamo/scanamo into rk-a…
Browse files Browse the repository at this point in the history
…kka-stream-1.0.1
  • Loading branch information
Regis Kuckaertz committed May 21, 2019
2 parents 2e7f637 + 6bbe0ff commit 7fb50ea
Show file tree
Hide file tree
Showing 31 changed files with 824 additions and 750 deletions.
1 change: 1 addition & 0 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
version = "2.0.0-RC4"
maxColumn = 120
align = most
align.openParenCallSite = false
Expand Down
25 changes: 17 additions & 8 deletions alpakka/src/main/scala/org/scanamo/ScanamoAlpakka.scala
Original file line number Diff line number Diff line change
@@ -1,22 +1,31 @@
package org.scanamo

import org.scanamo.ops.{AlpakkaInterpreter, ScanamoOps}
import org.scanamo.ops.retrypolicy.RetryPolicy
import akka.stream.scaladsl.Source
import akka.NotUsed
import akka.stream.alpakka.dynamodb.DynamoClient
import akka.stream.scaladsl.Source
import cats.Monad
import org.scanamo.ops.{AlpakkaInterpreter, ScanamoOps}
import org.scanamo.ops.retrypolicy.RetryPolicy

/**
* Provides the same interface as [[org.scanamo.Scanamo]], except that it requires an
* [[https://github.com/akka/alpakka Alpakka]] client,
* and an implicit [[scala.concurrent.ExecutionContext]] and returns a [[scala.concurrent.Future]]
*/
class ScanamoAlpakka private (client: DynamoClient, retrySettings: RetryPolicy) {
import ScanamoAlpakka._

private final val interpreter = new AlpakkaInterpreter(client, retrySettings)

def exec[A](op: ScanamoOps[A]): AlpakkaInterpreter.Alpakka[A] =
op.foldMap(interpreter)
}

object ScanamoAlpakka extends AlpakkaInstances {
def exec[A](
client: DynamoClient
)(op: ScanamoOps[A], retrySettings: RetryPolicy = RetryPolicy.Max(numberOfRetries = 3)): Source[A, NotUsed] =
op.foldMap(AlpakkaInterpreter.future(client, retrySettings))
def apply(
client: DynamoClient,
retrySettings: RetryPolicy = RetryPolicy.Max(numberOfRetries = 3)
): ScanamoAlpakka = new ScanamoAlpakka(client, retrySettings)
}

private[scanamo] trait AlpakkaInstances {
Expand All @@ -30,4 +39,4 @@ private[scanamo] trait AlpakkaInstances {
case Right(b) => Source.single(b)
}
}
}
}
67 changes: 37 additions & 30 deletions alpakka/src/main/scala/org/scanamo/ops/AlpakkaInterpreter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,44 @@ import akka.stream.alpakka.dynamodb.scaladsl.DynamoDb
import akka.stream.scaladsl.Source
import akka.NotUsed

object AlpakkaInterpreter extends WithRetry {
private[scanamo] class AlpakkaInterpreter(client: DynamoClient, retryPolicy: RetryPolicy)
extends (ScanamoOpsA ~> AlpakkaInterpreter.Alpakka)
with WithRetry {

type Alpakka[A] = Source[A, NotUsed]

def future(client: DynamoClient, retryPolicy: RetryPolicy): ScanamoOpsA ~> Alpakka =
new (ScanamoOpsA ~> Alpakka) {
private final def run(op: AwsOp): Alpakka[op.B] =
retry(DynamoDb.source(op).withAttributes(DynamoAttributes.client(client)), retryPolicy)
private final def run(op: AwsOp): AlpakkaInterpreter.Alpakka[op.B] =
retry(DynamoDb.source(op).withAttributes(DynamoAttributes.client(client)), retryPolicy)

override def apply[A](ops: ScanamoOpsA[A]) =
ops match {
case Put(req) => run(JavaRequests.put(req))
case Get(req) => run(req)
case Delete(req) => run(JavaRequests.delete(req))
case Scan(req) => run(AwsPagedOp.create(JavaRequests.scan(req)))
case Query(req) => run(AwsPagedOp.create(JavaRequests.query(req)))
case Update(req) => run(JavaRequests.update(req))
case BatchWrite(req) => run(req)
case BatchGet(req) => run(req)
case ConditionalDelete(req) =>
run(JavaRequests.delete(req)).map(Either.right[ConditionalCheckFailedException, DeleteItemResult]).recover {
case e: ConditionalCheckFailedException => Either.left(e)
}
case ConditionalPut(req) =>
run(JavaRequests.put(req)).map(Either.right[ConditionalCheckFailedException, PutItemResult]).recover {
case e: ConditionalCheckFailedException => Either.left(e)
}
case ConditionalUpdate(req) =>
run(JavaRequests.update(req)).map(Either.right[ConditionalCheckFailedException, UpdateItemResult]).recover {
case e: ConditionalCheckFailedException => Either.left(e)
}
}
def apply[A](ops: ScanamoOpsA[A]) =
ops match {
case Put(req) => run(JavaRequests.put(req))
case Get(req) => run(req)
case Delete(req) => run(JavaRequests.delete(req))
case Scan(req) => run(AwsPagedOp.create(JavaRequests.scan(req)))
case Query(req) => run(AwsPagedOp.create(JavaRequests.query(req)))
case Update(req) => run(JavaRequests.update(req))
case BatchWrite(req) => run(req)
case BatchGet(req) => run(req)
case ConditionalDelete(req) =>
run(JavaRequests.delete(req))
.map(Either.right[ConditionalCheckFailedException, DeleteItemResult])
.recover {
case e: ConditionalCheckFailedException => Either.left(e)
}
case ConditionalPut(req) =>
run(JavaRequests.put(req))
.map(Either.right[ConditionalCheckFailedException, PutItemResult])
.recover {
case e: ConditionalCheckFailedException => Either.left(e)
}
case ConditionalUpdate(req) =>
run(JavaRequests.update(req))
.map(Either.right[ConditionalCheckFailedException, UpdateItemResult])
.recover {
case e: ConditionalCheckFailedException => Either.left(e)
}
}
}

object AlpakkaInterpreter {
type Alpakka[A] = Source[A, NotUsed]
}
Loading

0 comments on commit 7fb50ea

Please sign in to comment.