fs2 utilities to interact with AWS
Latest commit 7036905 Jan 7, 2019




fs2 Streaming utilities for interacting with AWS

Scope of the project

fs2-aws provides an fs2 interface to AWS services

The design goals are the same as fs2:

compositionality, expressiveness, resource safety, and speed


Streaming a file from S3

Creates a stream of Bytes; the size of each downloaded part is the chunkSize.

Example using IO for effects (any effect type F with an Effect instance can be used):

readS3FileMultipart[IO]("testBucket", "testFile", 25)
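Because the result is an ordinary fs2 stream of Bytes, the standard fs2 pipes can be applied to it. The following is a minimal sketch, assuming fs2 1.0.x with cats-effect 1.x, that decodes such a stream into lines of text. The `objectBytes` value is an in-memory stand-in introduced for illustration; it is not read from S3.

```scala
import cats.effect.IO
import fs2.{Stream, text}

// In-memory stand-in for the bytes of an S3 object (assumption: no real bucket is read here).
val objectBytes: Stream[IO, Byte] =
  Stream.emits("first line\nsecond line".getBytes("UTF-8").toList).covary[IO]

// Decode the bytes as UTF-8 and split the text into lines, using fs2's own text pipes.
val lines: List[String] =
  objectBytes
    .through(text.utf8Decode)
    .through(text.lines)
    .compile
    .toList
    .unsafeRunSync()
```

In real use, `objectBytes` would be replaced by the stream returned from readS3FileMultipart.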

Writing to a file in S3

A Pipe and Sink allow for writing a stream of Bytes to S3; the size of each uploaded part is the chunkSize.

Example using IO for effects (any effect type F with an Effect instance can be used):

Stream("test data")
  .uploadS3FileMultipart[IO]("testBucket", "testFile")
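Since the upload pipe consumes Bytes, text has to be encoded before it reaches the pipe. The following is a minimal sketch, assuming fs2 1.0.x with cats-effect 1.x, that turns a stream of Strings into a stream of Bytes with fs2's `text.utf8Encode`. The upload itself is not called here because it needs AWS credentials.

```scala
import cats.effect.IO
import fs2.{Stream, text}

// Encode each String as UTF-8 so the stream carries Bytes instead of Strings.
val encoded: Stream[IO, Byte] =
  Stream("test data").covary[IO].through(text.utf8Encode[IO])

// Materialize the bytes only to inspect them; a real program would pipe
// `encoded` into the S3 upload pipe instead.
val bytes: List[Byte] = encoded.compile.toList.unsafeRunSync()
```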


Streaming records from Kinesis with KCL

Example using IO for effects (any effect type F with a ConcurrentEffect instance can be used):

val stream: Stream[IO, CommittableRecord] = readFromKinesisStream[IO]("appName", "streamName")

There are a number of other stream constructors available where you can provide more specific configuration for the KCL worker.


TODO: Implement better test consumer

For now, you can stub CommittableRecord and create an fs2.Stream to emit these records:

val record = new Record()
  .withApproximateArrivalTimestamp(new Date())

val testRecord = CommittableRecord(

Checkpointing records

Records must be checkpointed in Kinesis to keep track of which messages each consumer has received. Checkpointing a record in the KCL automatically checkpoints all records up to that record. To checkpoint records, a Pipe and Sink are available. To help distinguish whether a record has been checkpointed, a CommittableRecord class denotes a record that hasn't been checkpointed, while the base Record class denotes a committed record.

readFromKinesisStream[IO]("appName", "streamName")
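Because checkpointing one record covers every record before it, it can be enough to checkpoint only the last record of each batch. The following is a minimal sketch of that idea, assuming fs2 1.0.x with cats-effect 1.x. `FakeRecord` and `lastOfEvery` are hypothetical names introduced for illustration; they are not part of fs2-aws, and the library's own checkpoint Pipe is not shown.

```scala
import cats.effect.IO
import fs2.{Pipe, Stream}

// Hypothetical stand-in for a record awaiting checkpoint (not the library's CommittableRecord).
final case class FakeRecord(sequenceNumber: Long)

// Group the stream into batches of at most `n` records and keep only the last
// record of each batch; checkpointing it would also cover the earlier ones.
def lastOfEvery[F[_]](n: Int): Pipe[F, FakeRecord, FakeRecord] =
  _.chunkN(n, allowFewer = true).map(_.toList.lastOption).unNone

val toCheckpoint: List[FakeRecord] =
  Stream
    .emits((1L to 10L).map(n => FakeRecord(n)))
    .covary[IO]
    .through(lastOfEvery[IO](4))
    .compile
    .toList
    .unsafeRunSync()
```

With ten records and batches of four, only the records with sequence numbers 4, 8 and 10 would be passed on for checkpointing.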

Publishing records to Kinesis with KPL

A Pipe and Sink allow for writing a stream of Tuple2 (partitionKey, ByteBuffer) to a Kinesis stream.


  .map { d => ("partitionKey", ByteBuffer.wrap(d.getBytes))}
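Kinesis routes records with the same partition key to the same shard, so a key derived from the payload is often more useful than a constant. The following is a minimal sketch of building the (partitionKey, ByteBuffer) tuples that way. The payload format `"<user>:<event>"` is an assumption made for illustration, and no Kinesis call is made.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8
import fs2.Stream

// Hypothetical payloads of the form "<user>:<event>"; the part before ':' becomes the partition key.
val records: List[(String, ByteBuffer)] =
  Stream("user-1:login", "user-2:logout")
    .map { d => (d.takeWhile(_ != ':'), ByteBuffer.wrap(d.getBytes(UTF_8))) }
    .toList
```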

The AWS credentials chain and region can be configured by overriding the respective fields in the KinesisProducerClient parameter to writeToKinesis. By default, the default AWS credentials chain and the us-east-1 region are used.

Kinesis Firehose

TODO: Stream get data, Stream send data



implicit val messageDecoder: Message => Either[Throwable, Quote] = { sqs_msg =>
      .sqsStream[IO, Quote](
        (config, callback) => SQSConsumerBuilder(config, callback))


//create stream for testing
def stream(deferedListener: Deferred[IO, MessageListener]) =
              .sqsStream[IO, Quote](deferedListener)
//create the program for testing the stream               
import io.circe.syntax._
import io.circe.generic.auto._
val quote = Quote(...)
val program: IO[List[(Quote, MessageListener)]] = for {
  d <- Deferred[IO, MessageListener]
  r <- IO.racePair(stream(d), d.get).flatMap {
    case Right((streamFiber, listener)) =>
      //simulate SQS stream fan-in here
      listener.onMessage(new SQSTextMessage(Printer.noSpaces.pretty(quote.asJson)))
    case _ => IO(Nil)
  }
} yield r
//Assert results
val result = program
result should be(...)
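The test above serializes a Quote to JSON with circe before handing it to the listener. The following is a minimal sketch of the opposite direction, assuming circe with the circe-generic and circe-parser modules. The fields of `Quote` and the helper `decodeQuote` are hypothetical, since the README does not show them, and the decoder works on the message body as a String rather than on an SQS message object.

```scala
import io.circe.generic.auto._
import io.circe.parser.decode
import io.circe.syntax._

// Hypothetical shape of Quote; the real fields are not given in this README.
final case class Quote(symbol: String, price: Double)

// Parse a JSON message body into a Quote; parsing and decoding failures end up in the Left.
def decodeQuote(body: String): Either[Throwable, Quote] = decode[Quote](body)

val quote = Quote("ACME", 12.5)
val body: String = quote.asJson.noSpaces // compact JSON text, as a message body would carry it
val roundTrip: Either[Throwable, Quote] = decodeQuote(body)
val failed: Either[Throwable, Quote] = decodeQuote("not json")
```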

TODO: Stream send SQS messages