Skip to content

Commit

Permalink
Support Streaming for Armeria backend
Browse files Browse the repository at this point in the history
Motivation:

Armeria natively supports Streaming through Reactive Streams

Modifications:

- Introduce `AbstractArmeriaBackend` for common conversions
- Support streaming using `Publisher` of Reactive Streams
- Add new modules for better interops with other libraries
  - FS2
  - Monix
  - Cats
  - Scalaz
  - ZIO

Result:

Better interop Armeria with sttp.
  • Loading branch information
ikhoon committed Feb 14, 2021
1 parent f686fb3 commit 1b42d29
Show file tree
Hide file tree
Showing 30 changed files with 1,648 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package sttp.client3.armeria.cats

import cats.effect.{Concurrent, Resource, Sync}
import com.linecorp.armeria.client.WebClient
import com.linecorp.armeria.common.HttpData
import com.linecorp.armeria.common.stream.StreamMessage
import org.reactivestreams.Publisher
import sttp.client3.armeria.AbstractArmeriaBackend.newClient
import sttp.client3.armeria.{AbstractArmeriaBackend, BodyFromStreamMessage}
import sttp.client3.impl.cats.CatsMonadAsyncError
import sttp.client3.internal.NoStreams
import sttp.client3.{FollowRedirectsBackend, SttpBackend, SttpBackendOptions}
import sttp.monad.MonadAsyncError

private final class ArmeriaCatsBackend[F[_]: Concurrent](client: WebClient, closeFactory: Boolean)
extends AbstractArmeriaBackend[F, Nothing](client, closeFactory, new CatsMonadAsyncError) {

override val streams: NoStreams = NoStreams

override protected def bodyFromStreamMessage: BodyFromStreamMessage[F, Nothing] =
new BodyFromStreamMessage[F, Nothing] {

override val streams: NoStreams = NoStreams

override implicit val monad: MonadAsyncError[F] = new CatsMonadAsyncError

override def publisherToStream(streamMessage: StreamMessage[HttpData]): Nothing =
throw new UnsupportedOperationException("This backend does not support streaming")
}

override protected def streamToPublisher(stream: Nothing): Publisher[HttpData] =
throw new UnsupportedOperationException("This backend does not support streaming")
}

object ArmeriaCatsBackend {

/** Creates a new `SttpBackend`. */
def apply[F[_]: Concurrent](): SttpBackend[F, Any] =
apply(newClient(), closeFactory = false)

/** Creates a new `SttpBackend` with the specified `SttpBackendOptions`. */
def apply[F[_]: Concurrent](options: SttpBackendOptions): SttpBackend[F, Any] =
apply(newClient(options), closeFactory = true)

/** Creates a new `SttpBackend` with the specified `SttpBackendOptions`. */
def resource[F[_]: Concurrent](options: SttpBackendOptions): Resource[F, SttpBackend[F, Any]] = {
Resource.make(Sync[F].delay(apply(newClient(options), closeFactory = true)))(_.close())
}

/** Creates a new `SttpBackend` with
* the specified `WebClient`.
*/
def usingClient[F[_]: Concurrent](client: WebClient): SttpBackend[F, Any] =
apply(client, closeFactory = false)

private def apply[F[_]: Concurrent](
client: WebClient,
closeFactory: Boolean
): SttpBackend[F, Any] =
new FollowRedirectsBackend(new ArmeriaCatsBackend(client, closeFactory))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package sttp.client3.armeria.cats

import cats.effect.IO
import sttp.client3._
import sttp.client3.impl.cats.CatsTestBase
import sttp.client3.testing.HttpTest

class ArmeriaCatsHttpTest extends HttpTest[IO] with CatsTestBase {
override val backend: SttpBackend[IO, Any] = ArmeriaCatsBackend[IO]()

"illegal url exceptions" - {
"should be wrapped in the effect wrapper" in {
basicRequest.get(uri"ps://sth.com").send(backend).toFuture().failed.map { e =>
e shouldBe a[IllegalArgumentException]
}
}
}

override def supportsHostHeaderOverride = false
override def supportsMultipart = false
override def supportsCancellation = false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package sttp.client3.armeria.fs2

import cats.effect.{ConcurrentEffect, Resource, Sync}
import com.linecorp.armeria.client.WebClient
import com.linecorp.armeria.common.HttpData
import com.linecorp.armeria.common.stream.StreamMessage
import fs2.interop.reactivestreams._
import fs2.{Chunk, Stream}
import org.reactivestreams.Publisher
import sttp.capabilities.fs2.Fs2Streams
import sttp.client3.armeria.AbstractArmeriaBackend.newClient
import sttp.client3.armeria.{AbstractArmeriaBackend, BodyFromStreamMessage}
import sttp.client3.impl.cats.CatsMonadAsyncError
import sttp.client3.{FollowRedirectsBackend, SttpBackend, SttpBackendOptions}
import sttp.monad.MonadAsyncError

private final class ArmeriaFs2Backend[F[_]: ConcurrentEffect](client: WebClient, closeFactory: Boolean)
extends AbstractArmeriaBackend[F, Fs2Streams[F]](client, closeFactory, new CatsMonadAsyncError) {

override val streams: Fs2Streams[F] = Fs2Streams[F]

override protected def bodyFromStreamMessage: BodyFromStreamMessage[F, Fs2Streams[F]] =
new BodyFromStreamMessage[F, Fs2Streams[F]] {

override val streams: Fs2Streams[F] = Fs2Streams[F]

override implicit val monad: MonadAsyncError[F] = new CatsMonadAsyncError

override def publisherToStream(streamMessage: StreamMessage[HttpData]): Stream[F, Byte] =
streamMessage.toStream[F].flatMap(httpData => Stream.chunk(Chunk.bytes(httpData.array())))
}

override protected def streamToPublisher(stream: Stream[F, Byte]): Publisher[HttpData] =
stream.chunks
.map(chunk => {
val bytes = chunk.toBytes
HttpData.wrap(bytes.values, bytes.offset, bytes.length)
})
.toUnicastPublisher
}

object ArmeriaFs2Backend {

/** Creates a new `SttpBackend`. */
def apply[F[_]: ConcurrentEffect](): SttpBackend[F, Fs2Streams[F]] =
apply(newClient(), closeFactory = false)

/** Creates a new `SttpBackend` with the specified `SttpBackendOptions`. */
def apply[F[_]: ConcurrentEffect](options: SttpBackendOptions): SttpBackend[F, Fs2Streams[F]] =
apply(newClient(options), closeFactory = true)

/** Creates a new `SttpBackend` with the specified `SttpBackendOptions`. */
def resource[F[_]: ConcurrentEffect](options: SttpBackendOptions): Resource[F, SttpBackend[F, Fs2Streams[F]]] = {
Resource.make(Sync[F].delay(apply(newClient(options), closeFactory = true)))(_.close())
}

/** Creates a new `SttpBackend` with the specified `WebClient`.
*/
def usingClient[F[_]: ConcurrentEffect](client: WebClient): SttpBackend[F, Fs2Streams[F]] =
apply(client, closeFactory = false)

private def apply[F[_]: ConcurrentEffect](
client: WebClient,
closeFactory: Boolean
): SttpBackend[F, Fs2Streams[F]] =
new FollowRedirectsBackend(new ArmeriaFs2Backend(client, closeFactory))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package sttp.client3.armeria.fs2

import cats.effect.IO
import sttp.client3.SttpBackend
import sttp.client3.impl.cats.CatsTestBase
import sttp.client3.testing.HttpTest

class ArmeriaFs2HttpTest extends HttpTest[IO] with CatsTestBase {
override val backend: SttpBackend[IO, Any] = ArmeriaFs2Backend()

override def supportsHostHeaderOverride = false
override def supportsMultipart = false
override def supportsCancellation = false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package sttp.client3.armeria.fs2

import cats.effect.IO
import sttp.capabilities.fs2.Fs2Streams
import sttp.client3.SttpBackend
import sttp.client3.impl.fs2.Fs2StreamingTest

class ArmeriaFs2StreamingTest extends Fs2StreamingTest {
override val backend: SttpBackend[IO, Fs2Streams[IO]] =
ArmeriaFs2Backend()

override protected def supportsStreamingMultipartParts: Boolean = false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package sttp.client3.armeria.future

import com.linecorp.armeria.client.WebClient
import com.linecorp.armeria.common.HttpData
import com.linecorp.armeria.common.stream.StreamMessage
import org.reactivestreams.Publisher
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import sttp.client3.armeria.AbstractArmeriaBackend.newClient
import sttp.client3.armeria.{AbstractArmeriaBackend, BodyFromStreamMessage}
import sttp.client3.internal.NoStreams
import sttp.client3.{FollowRedirectsBackend, SttpBackend, SttpBackendOptions}
import sttp.monad.{FutureMonad, MonadAsyncError}

private final class ArmeriaFutureBackend(client: WebClient, closeFactory: Boolean)
extends AbstractArmeriaBackend[Future, Nothing](client, closeFactory, new FutureMonad()) {

override val streams: NoStreams = NoStreams

override protected def bodyFromStreamMessage: BodyFromStreamMessage[Future, Nothing] =
new BodyFromStreamMessage[Future, Nothing] {

override val streams: NoStreams = NoStreams

override implicit def monad: MonadAsyncError[Future] = new FutureMonad()

override def publisherToStream(streamMessage: StreamMessage[HttpData]): streams.BinaryStream =
throw new UnsupportedOperationException("This backend does not support streaming")
}

override protected def streamToPublisher(stream: streams.BinaryStream): Publisher[HttpData] =
throw new UnsupportedOperationException("This backend does not support streaming")
}

object ArmeriaFutureBackend {

/** Creates a new `SttpBackend`.
*/
def apply(): SttpBackend[Future, Any] = apply(newClient(), closeFactory = false)

/** Creates a new `SttpBackend` with the specified `SttpBackendOptions`. */
def apply(options: SttpBackendOptions): SttpBackend[Future, Any] =
apply(newClient(options), closeFactory = true)

/** Creates a new `SttpBackend` with
* the specified `WebClient`.
*/
def usingClient(client: WebClient): SttpBackend[Future, Any] =
apply(client, closeFactory = false)

private def apply(client: WebClient, closeFactory: Boolean): SttpBackend[Future, Any] =
new FollowRedirectsBackend(new ArmeriaFutureBackend(client, closeFactory))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package sttp.client3.armeria.future

import scala.concurrent.Future
import sttp.client3.SttpBackend
import sttp.client3.testing.{ConvertToFuture, HttpTest}

class ArmeriaFutureHttpTest extends HttpTest[Future] {

override val backend: SttpBackend[Future, Any] = {
ArmeriaFutureBackend()
}
override implicit val convertToFuture: ConvertToFuture[Future] = ConvertToFuture.future

override def supportsHostHeaderOverride = false
override def supportsMultipart = false
override def supportsCancellation = false

override def timeoutToNone[T](t: Future[T], timeoutMillis: Int): Future[Option[T]] = t.map(Some(_))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package sttp.client3.armeria.monix

import com.linecorp.armeria.client.WebClient
import com.linecorp.armeria.common.HttpData
import com.linecorp.armeria.common.stream.StreamMessage
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.Observable
import org.reactivestreams.Publisher
import sttp.capabilities.monix.MonixStreams
import sttp.client3.armeria.AbstractArmeriaBackend.newClient
import sttp.client3.armeria.{AbstractArmeriaBackend, BodyFromStreamMessage}
import sttp.client3.impl.monix.TaskMonadAsyncError
import sttp.client3.{FollowRedirectsBackend, SttpBackend, SttpBackendOptions}
import sttp.monad.MonadAsyncError

private final class ArmeriaMonixBackend(client: WebClient, closeFactory: Boolean)(implicit scheduler: Scheduler)
extends AbstractArmeriaBackend[Task, MonixStreams](client, closeFactory, TaskMonadAsyncError) {

override val streams: MonixStreams = MonixStreams

override protected def bodyFromStreamMessage: BodyFromStreamMessage[Task, MonixStreams] =
new BodyFromStreamMessage[Task, MonixStreams] {

override val streams: MonixStreams = MonixStreams

override implicit def monad: MonadAsyncError[Task] = TaskMonadAsyncError

override def publisherToStream(streamMessage: StreamMessage[HttpData]): Observable[Array[Byte]] =
Observable.fromReactivePublisher(streamMessage).map(_.array())
}

override protected def streamToPublisher(stream: Observable[Array[Byte]]): Publisher[HttpData] =
stream.map(HttpData.wrap).toReactivePublisher
}

object ArmeriaMonixBackend {

/** Creates a new `SttpBackend`.
*/
def apply()(implicit scheduler: Scheduler): SttpBackend[Task, MonixStreams] =
apply(newClient(), closeFactory = false)

/** Creates a new `SttpBackend` with the specified `SttpBackendOptions`. */
def apply(options: SttpBackendOptions)(implicit scheduler: Scheduler): SttpBackend[Task, MonixStreams] =
apply(newClient(options), closeFactory = true)

/** Creates a new `SttpBackend` with
* the specified `WebClient`.
*/
def usingClient(client: WebClient)(implicit scheduler: Scheduler): SttpBackend[Task, MonixStreams] =
apply(client, closeFactory = false)

private def apply(client: WebClient, closeFactory: Boolean)(implicit
scheduler: Scheduler
): SttpBackend[Task, MonixStreams] =
new FollowRedirectsBackend(new ArmeriaMonixBackend(client, closeFactory))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package sttp.client3.armeria.monix

import java.util.concurrent.TimeoutException
import monix.eval.Task
import sttp.client3.SttpBackend
import sttp.client3.impl.monix.convertMonixTaskToFuture
import sttp.client3.testing.{ConvertToFuture, HttpTest}
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration.DurationInt

class ArmeriaMonixHttpTest extends HttpTest[Task] {
override val backend: SttpBackend[Task, Any] = ArmeriaMonixBackend()
override implicit val convertToFuture: ConvertToFuture[Task] = convertMonixTaskToFuture

override def timeoutToNone[T](t: Task[T], timeoutMillis: Int): Task[Option[T]] =
t.map(Some(_))
.timeout(timeoutMillis.milliseconds)
.onErrorRecover { case _: TimeoutException =>
None
}

override def supportsHostHeaderOverride = false
override def supportsMultipart = false
override def supportsCancellation = false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package sttp.client3.armeria.monix

import monix.eval.Task
import sttp.capabilities.monix.MonixStreams
import sttp.client3.SttpBackend
import sttp.client3.impl.monix.MonixStreamingTest
import monix.execution.Scheduler.Implicits.global

class ArmeriaMonixStreamingTest extends MonixStreamingTest {
override val backend: SttpBackend[Task, MonixStreams] =
ArmeriaMonixBackend()

override protected def supportsStreamingMultipartParts: Boolean = false
}

0 comments on commit 1b42d29

Please sign in to comment.