ZIO Connector for AWS SQS

This library is a ZIO-powered client for AWS SQS. It is built on top of the AWS SDK for Java 2.0.

Add the dependency

To use zio-sqs, add the following line to your build.sbt file:

libraryDependencies += "dev.zio" %% "zio-sqs" % "0.1.1"

How to use

In order to use the connector, you need a SqsAsyncClient. Refer to the AWS SDK Documentation if you need help.
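As a minimal sketch, a client can be created with the AWS SDK builder. The names `ClientSketch` and `buildClient` are hypothetical and only serve the illustration. No credentials provider is set here, so the SDK falls back to its default credentials provider chain; the full example further down sets static credentials and an endpoint override instead.

```scala
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsAsyncClient

object ClientSketch {
  // Builds an async SQS client for the given region.
  // Credentials are resolved lazily by the SDK's default provider chain
  // (environment variables, profile file, instance role, ...).
  def buildClient(region: Region): SqsAsyncClient =
    SqsAsyncClient
      .builder()
      .region(region)
      .build()
}
```

The client holds network resources, so call `close()` on it when the application shuts down.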

Publish messages

Use SqsPublisher.send to publish messages to a queue.

def send(
  client: SqsAsyncClient,
  queueUrl: String,
  msg: String,
  settings: SqsPublisherSettings = SqsPublisherSettings()
): Task[Unit]

SqsPublisherSettings allows you to configure how the message is sent. With the default settings, a call looks like this:

import zio.sqs.SqsPublisher

SqsPublisher.send(client, queueUrl, msg)

Consume messages

Use SqsStream.apply to get a stream of messages from a queue. It returns a ZIO Stream that you can consume with any of the available stream operators.

def apply(
  client: SqsAsyncClient,
  queueUrl: String,
  settings: SqsStreamSettings = SqsStreamSettings()
): Stream[Throwable, Message]

SqsStreamSettings allows you to configure the following:

  • autoDelete: if true, messages are automatically deleted from the queue once the stream has consumed them; if false, you have to delete them explicitly by calling SqsStream.deleteMessage (default: true)
  • stopWhenQueueEmpty: if true, the stream closes when the queue is empty; if false, the stream runs forever (default: false)
  • attributeNames: see the related page in the AWS docs
  • maxNumberOfMessages: number of messages to request from SQS at once (default: 1)
  • messageAttributeNames: see the related page in the AWS docs
  • visibilityTimeout: see the related page in the AWS docs (default: 30)
  • waitTimeSeconds: see the related page in the AWS docs (default: 20)

Example:

import scalaz.zio.UIO
import zio.sqs.{SqsStream, SqsStreamSettings}

SqsStream(
  client,
  queueUrl,
  SqsStreamSettings(stopWhenQueueEmpty = true, waitTimeSeconds = 3)
).foreach(msg => UIO(println(msg.body)))
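When autoDelete is set to false, each message has to be deleted explicitly after it has been processed. The sketch below shows one way this might look. It assumes that SqsStream.deleteMessage takes the client, the queue URL and the message, and returns a Task; that signature is not spelled out in this README, so check it against the version you use. The names `ManualDeleteSketch` and `consume` are hypothetical.

```scala
import scalaz.zio.{ Task, UIO }
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import zio.sqs.{ SqsStream, SqsStreamSettings }

object ManualDeleteSketch {
  // Prints each message body, then deletes the message from the queue.
  // If the processing step fails, the delete is skipped and SQS makes the
  // message visible again once its visibility timeout expires.
  def consume(client: SqsAsyncClient, queueUrl: String): Task[Unit] =
    SqsStream(client, queueUrl, SqsStreamSettings(autoDelete = false))
      .foreach { msg =>
        for {
          _ <- UIO(println(msg.body))
          // ASSUMPTION: deleteMessage(client, queueUrl, msg): Task[Unit]
          _ <- SqsStream.deleteMessage(client, queueUrl, msg)
        } yield ()
      }
}
```

Deleting only after successful processing gives at-least-once handling: a message whose processing fails is redelivered rather than lost.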

Helpers

The zio.sqs.Utils object provides helper functions to create a queue and to look up a queue URL by its name.

def createQueue(
  client: SqsAsyncClient,
  name: String,
  attributes: Map[QueueAttributeName, String] = Map()
): Task[Unit]

def getQueueUrl(
  client: SqsAsyncClient,
  name: String
): Task[String]
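The two helpers compose naturally in a for-comprehension. The following sketch creates a queue with one attribute and then resolves its URL; the names `QueueSetupSketch` and `ensureQueue` and the 60-second visibility timeout are illustrative choices, not part of the library.

```scala
import scalaz.zio.Task
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.QueueAttributeName
import zio.sqs.Utils

object QueueSetupSketch {
  // Creates the queue with a default visibility timeout of 60 seconds,
  // then looks up and returns its URL.
  def ensureQueue(client: SqsAsyncClient, name: String): Task[String] =
    for {
      _   <- Utils.createQueue(
               client,
               name,
               Map(QueueAttributeName.VISIBILITY_TIMEOUT -> "60")
             )
      url <- Utils.getQueueUrl(client, name)
    } yield url
}
```

The returned URL is the value that SqsPublisher.send and SqsStream expect as their queueUrl argument.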

Full example

import java.net.URI
import scalaz.zio.{ App, IO, Task, UIO, ZIO }
import software.amazon.awssdk.auth.credentials.{ AwsBasicCredentials, StaticCredentialsProvider }
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import zio.sqs.{ SqsPublisher, SqsStream, SqsStreamSettings, Utils }

object TestApp extends App {

  override def run(args: List[String]): ZIO[Environment, Nothing, Int] =
    (for {
      client <- Task {
                 SqsAsyncClient
                   .builder()
                   .region(Region.of("ap-northeast-2"))
                   .credentialsProvider(
                     StaticCredentialsProvider.create(AwsBasicCredentials.create("key", "key"))
                   )
                   .endpointOverride(new URI("http://localhost:4576")) // point to localstack
                   .build()
               }
      queueName = "TestQueue"
      _         <- Utils.createQueue(client, queueName)
      queueUrl  <- Utils.getQueueUrl(client, queueName)
      _         <- SqsPublisher.send(client, queueUrl, "hello")
      _         <- SqsStream(
                     client,
                     queueUrl,
                     SqsStreamSettings(stopWhenQueueEmpty = true, waitTimeSeconds = 3)
                   ).foreach(msg => UIO(println(msg.body)))
    } yield 0).foldM(e => UIO(println(e.toString())).const(1), IO.succeed)
}