#259: Added function to allow processing messages at least once #692

Open · wants to merge 4 commits into base: series/2.x
80 changes: 78 additions & 2 deletions zio-sqs/src/main/scala/zio/sqs/SqsStream.scala
@@ -2,8 +2,8 @@ package zio.sqs

import zio.aws.sqs._
import zio.aws.sqs.model._
import zio.{ RIO, ZIO }
import zio.stream.ZStream
import zio._
import zio.stream._
import zio.aws.sqs.model.primitives.MessageAttributeName

object SqsStream {
@@ -36,4 +36,80 @@ object SqsStream {

def deleteMessage(queueUrl: String, msg: Message.ReadOnly): RIO[Sqs, Unit] =
zio.aws.sqs.Sqs.deleteMessage(DeleteMessageRequest(queueUrl, msg.receiptHandle.getOrElse(""))).mapError(_.toThrowable)

/**
* Wraps each message in a scoped ZIO that deletes the message from the queue only if the ZIO succeeds.
* WARNING: Do NOT use .mapZIO(identity) unless the stream is short-lived, as this will only delete the messages when the stream is closed.
* Lower-level API for more advanced use cases, e.g.:
* - batch-process messages, deleting all messages when the batch succeeds: .rechunk(1000).chunks.map(chunk => ZIO.foreach(chunk)(identity).parallelFinalizers) (see the fuller sketch after this file's diff)
* - expose parsed messages to your users: .map(_.flatMap(_.getBody).flatMap(parseBody))
*
* Use `processMessages` for the common case.
*/
def deletingOnSuccess(
url: String,
settings: SqsStreamSettings = SqsStreamSettings(),
deleteRetrySchedule: Schedule[Any, Any, Any] = defaultDeleteRetryPolicy
): ZStream[Sqs, Throwable, ZIO[Scope, Nothing, Message.ReadOnly]] =
SqsStream(url, settings).mapZIO { msg: Message.ReadOnly =>
ZIO.serviceWith[Sqs] { sqs =>
ZIO.succeedNow(msg).withFinalizerExit { (msg, exit) =>
exit match {
case Exit.Success(_) =>
SqsStream
.deleteMessage(url, msg)
.provideEnvironment(ZEnvironment(sqs))
// Retry in case it fails to delete transiently
.retry(deleteRetrySchedule)
// Die if it fails and stop the stream, to avoid reprocessing validly processed messages
// This would be very rare and caught early in testing.
.orDie
case Exit.Failure(_) =>
ZIO.unit
}
}
}
}

/**
* Processes SQS messages from the queue using a given function and
* deletes the messages from the queue only if they were processed successfully.
* This ensures that no message is lost in case of an error.
*
* If a message fails to be processed (with an error or a defect),
* it will reappear in the queue after the `visibilityTimeout` expires.
* If a DLQ is attached to the queue, the message will move into the DLQ after a set number of retries.
*
* Returns a stream of processing outcomes, to allow attaching logging or metrics (see the usage sketch after this file's diff).
* The returned stream is also convenient for testing:
* it exposes what was processed and allows stopping the stream on some condition.
*
* @param url the URL of the SQS queue
* @param settings the settings for the stream of messages from the queue
* @param parallelism how many messages are processed in parallel
* @param deleteRetrySchedule the schedule by which to retry deleting the message (which should be rare)
* @param process the function that takes a Message.ReadOnly and returns a ZIO effect
* @tparam E the error type of the ZIO effect
* @tparam A the value type of the ZIO effect
* @return a ZStream that can either fail with a Throwable or emit values of type Either[Cause[E], A]
*/
def processMessages[E, A](
url: String,
settings: SqsStreamSettings,
parallelism: Int = 1,
deleteRetrySchedule: Schedule[Any, Any, Any] = defaultDeleteRetryPolicy
)(process: Message.ReadOnly => ZIO[Any, E, A]): ZStream[Sqs, Throwable, Either[Cause[E], A]] =
zio.sqs.SqsStream
.deletingOnSuccess(url, settings, deleteRetrySchedule)
.mapZIOPar(parallelism) { msg =>
ZIO
.scoped[Any] {
msg.flatMap(process)
}
.sandbox // Capture any errors and defects
.either // Move the error to the return value, so that it doesn't stop the stream
}

private val defaultDeleteRetryPolicy: Schedule[Any, Any, Any] =
Schedule.exponential(10.milliseconds) && Schedule.upTo(30.seconds)
}
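For orientation, here is a minimal usage sketch of `processMessages`, the common-case API added in this file. It is not part of the PR: the queue URL and the integer-parsing step are hypothetical placeholders, and only `SqsStream.processMessages`, `SqsStreamSettings` and the zio-aws `Sqs` service come from the diff above.

```scala
import zio._
import zio.aws.sqs.Sqs
import zio.sqs.{ SqsStream, SqsStreamSettings }

object ProcessMessagesExample {

  // Each message is deleted only after the processing function succeeds;
  // failed messages reappear once the visibility timeout expires.
  val consume: ZIO[Sqs, Throwable, Unit] =
    SqsStream
      .processMessages(
        "https://sqs.eu-west-1.amazonaws.com/000000000000/my-queue", // hypothetical queue URL
        SqsStreamSettings(),
        parallelism = 4
      ) { msg =>
        // hypothetical processing step: read the body and parse it as an Int
        msg.getBody.mapError(_.toThrowable).flatMap(body => ZIO.attempt(body.toInt))
      }
      .tap {
        case Right(value) => ZIO.logInfo(s"processed $value")
        case Left(cause)  => ZIO.logWarning(s"processing failed: ${cause.prettyPrint}")
      }
      .runDrain
}
```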
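Similarly, a sketch of the batch-processing use case mentioned in the `deletingOnSuccess` doc comment, under the same assumptions (`handleBatch` and the queue URL are hypothetical). All messages of a batch are acquired in a single scope, so their delete finalizers only observe a successful exit once the whole batch has been handled; the doc comment's `.parallelFinalizers` suggestion can additionally be applied to run those deletes concurrently.

```scala
import zio._
import zio.aws.sqs.Sqs
import zio.aws.sqs.model.Message
import zio.sqs.{ SqsStream, SqsStreamSettings }

object BatchProcessingExample {

  // Hypothetical batch handler, e.g. a bulk insert into a database.
  def handleBatch(messages: Chunk[Message.ReadOnly]): Task[Unit] = ZIO.unit

  val consumeInBatches: ZIO[Sqs, Throwable, Unit] =
    SqsStream
      .deletingOnSuccess("https://sqs.eu-west-1.amazonaws.com/000000000000/my-queue", SqsStreamSettings())
      .rechunk(100) // group into batches of up to 100 messages
      .chunks
      .mapZIO { scopedMessages =>
        ZIO.scoped[Any] {
          for {
            // Acquire every message in one scope, so all delete finalizers share its exit.
            messages <- ZIO.foreach(scopedMessages)(identity)
            // If this fails, the scope closes with a failure and nothing in the batch is deleted;
            // the messages then reappear after the visibility timeout (here the failure also ends the stream).
            _        <- handleBatch(messages)
          } yield ()
        }
      }
      .runDrain
}
```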
39 changes: 39 additions & 0 deletions zio-sqs/src/test/scala/zio/sqs/SqsConsumerSpec.scala
@@ -0,0 +1,39 @@
package zio.sqs
import zio.test._
import zio._
import zio.aws.sqs.Sqs
import zio.aws.sqs.model.SendMessageRequest
object SqsConsumerSpec extends ZIOSpecDefault {

def produce(url: String, message: String): ZIO[Sqs, Throwable, Unit] =
Sqs.sendMessage(SendMessageRequest(url, message)).unit.mapError(_.toThrowable)
override def spec: Spec[TestEnvironment with Scope, Any] =
suite("SQS Consumer spec")(
test("retry failed messages indefinitely") {
val settings = SqsStreamSettings(
autoDelete = false,
stopWhenQueueEmpty = true,
visibilityTimeout = Some(1),
waitTimeSeconds = Some(2) // more than visibilityTimeout, so failed messages become visible again while polling
)
for {
_ <- Utils.createQueue("test-queue")
url <- Utils.getQueueUrl("test-queue")
_ <- produce(url, "1")
_ <- produce(url, "not number1")
_ <- produce(url, "2")
messages <- SqsStream
.processMessages(url, settings)(
_.getBody.flatMap(body => ZIO.attempt(body.toInt))
)
.take(3 + 2)
.runCollect
successes = messages.collect { case Right(v) => v }
failures = messages.collect { case Left(v) => v }
} yield assertTrue(
successes == Chunk(1, 2),
failures.length == 3 // the failing message reappears and is retried multiple times
)
}
).provideLayer(MockSqsServerAndClient.layer)
}
56 changes: 54 additions & 2 deletions zio-sqs/src/test/scala/zio/sqs/ZioSqsMockServer.scala
@@ -1,15 +1,16 @@
package zio.sqs

import java.net.URI
import zio.aws.core.config.AwsConfig
import zio.aws.core.config.{ AwsConfig, CommonAwsConfig }
import zio.aws.sqs.Sqs
import org.elasticmq.rest.sqs.SQSRestServer
import org.elasticmq.RelaxedSQSLimits
import org.elasticmq.rest.sqs.TheSQSRestServerBuilder
import org.elasticmq.NodeAddress
import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, StaticCredentialsProvider }
import software.amazon.awssdk.regions.Region
import zio.{ Scope, ZIO, ZLayer }
import zio.aws.netty.NettyHttpClient
import zio._

object ZioSqsMockServer extends TheSQSRestServerBuilder(None, None, "", 9324, NodeAddress(), true, RelaxedSQSLimits, "elasticmq", "000000000000", None) {
private val staticCredentialsProvider: StaticCredentialsProvider =
@@ -31,3 +32,54 @@ object ZioSqsMockServer extends TheSQSRestServerBuilder(None, None, "", 9324, No
.endpointOverride(uri)
)
}

object MockSqsServerAndClient {

lazy val layer: ZLayer[Any, Throwable, Sqs] =
(NettyHttpClient.default ++ mockServer) >>> AwsConfig.configured() >>> zio.aws.sqs.Sqs.live

@V-Lamp (author) commented on May 30, 2023: I wanted to suggest this approach, where a new mock server is created with a random port to allow for parallel testing. The AWS config and SQS client are built together, so that tests just need this single layer and no setup within the tests themselves.

private val usedPorts = Ref.unsafe.make(Set.empty[Int])(Unsafe.unsafe)

private val getUnusedPort: ZIO[Scope, Throwable, Int] = ZIO
.randomWith(_.nextIntBetween(9000, 50000))
.repeatUntilZIO(port => usedPorts.get.map(!_.contains(port)))
.tap(port => usedPorts.update(_ + port))
.withFinalizer(port => usedPorts.update(_ - port))
.timeoutFail(new Exception("Could not find unused port"))(1.seconds)

private lazy val mockServer: ZLayer[Any, Throwable, CommonAwsConfig] = {
val dummyAwsKeys =
StaticCredentialsProvider.create(AwsBasicCredentials.create("key", "key"))
val region = Region.AP_NORTHEAST_2
ZLayer.scoped {
for {
// Random port to allow parallel tests
port <- getUnusedPort
serverBuilder <- ZIO.attempt(
TheSQSRestServerBuilder(
providedActorSystem = None,
providedQueueManagerActor = None,
interface = "",
port = port,
serverAddress = NodeAddress(),
generateServerAddress = true,
sqsLimits = RelaxedSQSLimits,
_awsRegion = region.toString,
_awsAccountId = "000000000000",
queueEventListener = None
)
)
server <- ZIO.acquireRelease(
ZIO.attempt(serverBuilder.start())
)(server => ZIO.succeed(server.stopAndWait()))
awsConfig = CommonAwsConfig(
region = Some(region),
credentialsProvider = dummyAwsKeys,
endpointOverride = Some(new URI(s"http://localhost:${port}")),
None
)
} yield awsConfig
}
}

}
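To illustrate the comment above, here is a hypothetical second spec (not part of this PR) reusing the composed layer: every suite that provides `MockSqsServerAndClient.layer` gets its own ElasticMQ server on a random unused port, so suites can run in parallel with no per-test setup. `Utils.createQueue` and `Utils.getQueueUrl` are the existing test helpers already referenced by `SqsConsumerSpec`; their exact signatures are assumed here.

```scala
package zio.sqs
import zio._
import zio.test._
import zio.aws.sqs.Sqs
import zio.aws.sqs.model.SendMessageRequest

// Hypothetical spec, only illustrating that other suites can reuse the single layer.
object AnotherSqsConsumerSpec extends ZIOSpecDefault {

  override def spec: Spec[TestEnvironment with Scope, Any] =
    suite("another SQS suite, running against its own ElasticMQ server")(
      test("processes and deletes a single message") {
        for {
          _       <- Utils.createQueue("another-queue")
          url     <- Utils.getQueueUrl("another-queue")
          _       <- Sqs.sendMessage(SendMessageRequest(url, "42")).mapError(_.toThrowable)
          results <- SqsStream
                       .processMessages(url, SqsStreamSettings(autoDelete = false, stopWhenQueueEmpty = true))(
                         _.getBody.flatMap(body => ZIO.attempt(body.toInt))
                       )
                       .take(1)
                       .runCollect
        } yield assertTrue(results == Chunk(Right(42)))
      }
    ).provideLayer(MockSqsServerAndClient.layer)
}
```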