-
Notifications
You must be signed in to change notification settings - Fork 299
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Support Streaming for Armeria backend
Motivation: Armeria natively supports Streaming through Reactive Streams Modifications: - Introduce `AbstractArmeriaBackend` for common conversions - Support streaming using `Publisher` of Reactive Streams - Add new modules for better interops with other libraries - FS2 - Monix - Cats - Scalaz - ZIO Result: Better interop Armeria with sttp.
- Loading branch information
Showing
31 changed files
with
1,660 additions
and
19 deletions.
There are no files selected for viewing
61 changes: 61 additions & 0 deletions
61
armeria-backend/cats/src/main/scala/sttp/client3/armeria/cats/ArmeriaCatsBackend.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
package sttp.client3.armeria.cats | ||
|
||
import cats.effect.{Concurrent, Resource, Sync} | ||
import com.linecorp.armeria.client.WebClient | ||
import com.linecorp.armeria.common.HttpData | ||
import com.linecorp.armeria.common.stream.StreamMessage | ||
import org.reactivestreams.Publisher | ||
import sttp.client3.armeria.AbstractArmeriaBackend.newClient | ||
import sttp.client3.armeria.{AbstractArmeriaBackend, BodyFromStreamMessage} | ||
import sttp.client3.impl.cats.CatsMonadAsyncError | ||
import sttp.client3.internal.NoStreams | ||
import sttp.client3.{FollowRedirectsBackend, SttpBackend, SttpBackendOptions} | ||
import sttp.monad.MonadAsyncError | ||
|
||
private final class ArmeriaCatsBackend[F[_]: Concurrent](client: WebClient, closeFactory: Boolean) | ||
extends AbstractArmeriaBackend[F, Nothing](client, closeFactory, new CatsMonadAsyncError) { | ||
|
||
override val streams: NoStreams = NoStreams | ||
|
||
override protected def bodyFromStreamMessage: BodyFromStreamMessage[F, Nothing] = | ||
new BodyFromStreamMessage[F, Nothing] { | ||
|
||
override val streams: NoStreams = NoStreams | ||
|
||
override implicit val monad: MonadAsyncError[F] = new CatsMonadAsyncError | ||
|
||
override def publisherToStream(streamMessage: StreamMessage[HttpData]): Nothing = | ||
throw new UnsupportedOperationException("This backend does not support streaming") | ||
} | ||
|
||
override protected def streamToPublisher(stream: Nothing): Publisher[HttpData] = | ||
throw new UnsupportedOperationException("This backend does not support streaming") | ||
} | ||
|
||
object ArmeriaCatsBackend { | ||
|
||
/** Creates a new `SttpBackend`. */ | ||
def apply[F[_]: Concurrent](): SttpBackend[F, Any] = | ||
apply(newClient(), closeFactory = false) | ||
|
||
/** Creates a new `SttpBackend` with the specified `SttpBackendOptions`. */ | ||
def apply[F[_]: Concurrent](options: SttpBackendOptions): SttpBackend[F, Any] = | ||
apply(newClient(options), closeFactory = true) | ||
|
||
/** Creates a new `SttpBackend` with the specified `SttpBackendOptions`. */ | ||
def resource[F[_]: Concurrent](options: SttpBackendOptions): Resource[F, SttpBackend[F, Any]] = { | ||
Resource.make(Sync[F].delay(apply(newClient(options), closeFactory = true)))(_.close()) | ||
} | ||
|
||
/** Creates a new `SttpBackend` with | ||
* the specified `WebClient`. | ||
*/ | ||
def usingClient[F[_]: Concurrent](client: WebClient): SttpBackend[F, Any] = | ||
apply(client, closeFactory = false) | ||
|
||
private def apply[F[_]: Concurrent]( | ||
client: WebClient, | ||
closeFactory: Boolean | ||
): SttpBackend[F, Any] = | ||
new FollowRedirectsBackend(new ArmeriaCatsBackend(client, closeFactory)) | ||
} |
22 changes: 22 additions & 0 deletions
22
armeria-backend/cats/src/test/scala/sttp/client3/armeria/cats/ArmeriaCatsHttpTest.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
package sttp.client3.armeria.cats | ||
|
||
import cats.effect.IO | ||
import sttp.client3._ | ||
import sttp.client3.impl.cats.CatsTestBase | ||
import sttp.client3.testing.HttpTest | ||
|
||
class ArmeriaCatsHttpTest extends HttpTest[IO] with CatsTestBase { | ||
override val backend: SttpBackend[IO, Any] = ArmeriaCatsBackend[IO]() | ||
|
||
"illegal url exceptions" - { | ||
"should be wrapped in the effect wrapper" in { | ||
basicRequest.get(uri"ps://sth.com").send(backend).toFuture().failed.map { e => | ||
e shouldBe a[IllegalArgumentException] | ||
} | ||
} | ||
} | ||
|
||
override def supportsHostHeaderOverride = false | ||
override def supportsMultipart = false | ||
override def supportsCancellation = false | ||
} |
12 changes: 12 additions & 0 deletions
12
armeria-backend/cats/src/test/scala/sttp/client3/armeria/cats/SomeTest.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
package sttp.client3.armeria.cats | ||
|
||
import cats.effect.{ContextShift, IO} | ||
import org.scalatest.funsuite.AnyFunSuite | ||
|
||
class SomeTest extends AnyFunSuite { | ||
test("test") { | ||
|
||
implicit val cs: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.global) | ||
val backend = ArmeriaCatsBackend[IO]() | ||
} | ||
} |
67 changes: 67 additions & 0 deletions
67
armeria-backend/fs2/src/main/scala/sttp/client3/armeria/fs2/ArmeriaFs2Backend.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
package sttp.client3.armeria.fs2 | ||
|
||
import cats.effect.{ConcurrentEffect, Resource, Sync} | ||
import com.linecorp.armeria.client.WebClient | ||
import com.linecorp.armeria.common.HttpData | ||
import com.linecorp.armeria.common.stream.StreamMessage | ||
import fs2.interop.reactivestreams._ | ||
import fs2.{Chunk, Stream} | ||
import org.reactivestreams.Publisher | ||
import sttp.capabilities.fs2.Fs2Streams | ||
import sttp.client3.armeria.AbstractArmeriaBackend.newClient | ||
import sttp.client3.armeria.{AbstractArmeriaBackend, BodyFromStreamMessage} | ||
import sttp.client3.impl.cats.CatsMonadAsyncError | ||
import sttp.client3.{FollowRedirectsBackend, SttpBackend, SttpBackendOptions} | ||
import sttp.monad.MonadAsyncError | ||
|
||
private final class ArmeriaFs2Backend[F[_]: ConcurrentEffect](client: WebClient, closeFactory: Boolean) | ||
extends AbstractArmeriaBackend[F, Fs2Streams[F]](client, closeFactory, new CatsMonadAsyncError) { | ||
|
||
override val streams: Fs2Streams[F] = Fs2Streams[F] | ||
|
||
override protected def bodyFromStreamMessage: BodyFromStreamMessage[F, Fs2Streams[F]] = | ||
new BodyFromStreamMessage[F, Fs2Streams[F]] { | ||
|
||
override val streams: Fs2Streams[F] = Fs2Streams[F] | ||
|
||
override implicit val monad: MonadAsyncError[F] = new CatsMonadAsyncError | ||
|
||
override def publisherToStream(streamMessage: StreamMessage[HttpData]): Stream[F, Byte] = | ||
streamMessage.toStream[F].flatMap(httpData => Stream.chunk(Chunk.bytes(httpData.array()))) | ||
} | ||
|
||
override protected def streamToPublisher(stream: Stream[F, Byte]): Publisher[HttpData] = | ||
stream.chunks | ||
.map(chunk => { | ||
val bytes = chunk.toBytes | ||
HttpData.wrap(bytes.values, bytes.offset, bytes.length) | ||
}) | ||
.toUnicastPublisher | ||
} | ||
|
||
object ArmeriaFs2Backend { | ||
|
||
/** Creates a new `SttpBackend`. */ | ||
def apply[F[_]: ConcurrentEffect](): SttpBackend[F, Fs2Streams[F]] = | ||
apply(newClient(), closeFactory = false) | ||
|
||
/** Creates a new `SttpBackend` with the specified `SttpBackendOptions`. */ | ||
def apply[F[_]: ConcurrentEffect](options: SttpBackendOptions): SttpBackend[F, Fs2Streams[F]] = | ||
apply(newClient(options), closeFactory = true) | ||
|
||
/** Creates a new `SttpBackend` with the specified `SttpBackendOptions`. */ | ||
def resource[F[_]: ConcurrentEffect](options: SttpBackendOptions): Resource[F, SttpBackend[F, Fs2Streams[F]]] = { | ||
Resource.make(Sync[F].delay(apply(newClient(options), closeFactory = true)))(_.close()) | ||
} | ||
|
||
/** Creates a new `SttpBackend` with the specified `WebClient`. | ||
*/ | ||
def usingClient[F[_]: ConcurrentEffect](client: WebClient): SttpBackend[F, Fs2Streams[F]] = | ||
apply(client, closeFactory = false) | ||
|
||
private def apply[F[_]: ConcurrentEffect]( | ||
client: WebClient, | ||
closeFactory: Boolean | ||
): SttpBackend[F, Fs2Streams[F]] = | ||
new FollowRedirectsBackend(new ArmeriaFs2Backend(client, closeFactory)) | ||
} |
14 changes: 14 additions & 0 deletions
14
armeria-backend/fs2/src/test/scala/sttp/client3/armeria/fs2/ArmeriaFs2HttpTest.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
package sttp.client3.armeria.fs2 | ||
|
||
import cats.effect.IO | ||
import sttp.client3.SttpBackend | ||
import sttp.client3.impl.cats.CatsTestBase | ||
import sttp.client3.testing.HttpTest | ||
|
||
class ArmeriaFs2HttpTest extends HttpTest[IO] with CatsTestBase { | ||
override val backend: SttpBackend[IO, Any] = ArmeriaFs2Backend() | ||
|
||
override def supportsHostHeaderOverride = false | ||
override def supportsMultipart = false | ||
override def supportsCancellation = false | ||
} |
13 changes: 13 additions & 0 deletions
13
armeria-backend/fs2/src/test/scala/sttp/client3/armeria/fs2/ArmeriaFs2StreamingTest.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
package sttp.client3.armeria.fs2 | ||
|
||
import cats.effect.IO | ||
import sttp.capabilities.fs2.Fs2Streams | ||
import sttp.client3.SttpBackend | ||
import sttp.client3.impl.fs2.Fs2StreamingTest | ||
|
||
class ArmeriaFs2StreamingTest extends Fs2StreamingTest { | ||
override val backend: SttpBackend[IO, Fs2Streams[IO]] = | ||
ArmeriaFs2Backend() | ||
|
||
override protected def supportsStreamingMultipartParts: Boolean = false | ||
} |
53 changes: 53 additions & 0 deletions
53
armeria-backend/future/src/main/scala/sttp/client3/armeria/future/ArmeriaFutureBackend.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
package sttp.client3.armeria.future | ||
|
||
import com.linecorp.armeria.client.WebClient | ||
import com.linecorp.armeria.common.HttpData | ||
import com.linecorp.armeria.common.stream.StreamMessage | ||
import org.reactivestreams.Publisher | ||
import scala.concurrent.ExecutionContext.Implicits.global | ||
import scala.concurrent.Future | ||
import sttp.client3.armeria.AbstractArmeriaBackend.newClient | ||
import sttp.client3.armeria.{AbstractArmeriaBackend, BodyFromStreamMessage} | ||
import sttp.client3.internal.NoStreams | ||
import sttp.client3.{FollowRedirectsBackend, SttpBackend, SttpBackendOptions} | ||
import sttp.monad.{FutureMonad, MonadAsyncError} | ||
|
||
private final class ArmeriaFutureBackend(client: WebClient, closeFactory: Boolean) | ||
extends AbstractArmeriaBackend[Future, Nothing](client, closeFactory, new FutureMonad()) { | ||
|
||
override val streams: NoStreams = NoStreams | ||
|
||
override protected def bodyFromStreamMessage: BodyFromStreamMessage[Future, Nothing] = | ||
new BodyFromStreamMessage[Future, Nothing] { | ||
|
||
override val streams: NoStreams = NoStreams | ||
|
||
override implicit def monad: MonadAsyncError[Future] = new FutureMonad() | ||
|
||
override def publisherToStream(streamMessage: StreamMessage[HttpData]): streams.BinaryStream = | ||
throw new UnsupportedOperationException("This backend does not support streaming") | ||
} | ||
|
||
override protected def streamToPublisher(stream: streams.BinaryStream): Publisher[HttpData] = | ||
throw new UnsupportedOperationException("This backend does not support streaming") | ||
} | ||
|
||
object ArmeriaFutureBackend { | ||
|
||
/** Creates a new `SttpBackend`. | ||
*/ | ||
def apply(): SttpBackend[Future, Any] = apply(newClient(), closeFactory = false) | ||
|
||
/** Creates a new `SttpBackend` with the specified `SttpBackendOptions`. */ | ||
def apply(options: SttpBackendOptions): SttpBackend[Future, Any] = | ||
apply(newClient(options), closeFactory = true) | ||
|
||
/** Creates a new `SttpBackend` with | ||
* the specified `WebClient`. | ||
*/ | ||
def usingClient(client: WebClient): SttpBackend[Future, Any] = | ||
apply(client, closeFactory = false) | ||
|
||
private def apply(client: WebClient, closeFactory: Boolean): SttpBackend[Future, Any] = | ||
new FollowRedirectsBackend(new ArmeriaFutureBackend(client, closeFactory)) | ||
} |
19 changes: 19 additions & 0 deletions
19
...ria-backend/future/src/test/scala/sttp/client3/armeria/future/ArmeriaFutureHttpTest.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
package sttp.client3.armeria.future | ||
|
||
import scala.concurrent.Future | ||
import sttp.client3.SttpBackend | ||
import sttp.client3.testing.{ConvertToFuture, HttpTest} | ||
|
||
class ArmeriaFutureHttpTest extends HttpTest[Future] { | ||
|
||
override val backend: SttpBackend[Future, Any] = { | ||
ArmeriaFutureBackend() | ||
} | ||
override implicit val convertToFuture: ConvertToFuture[Future] = ConvertToFuture.future | ||
|
||
override def supportsHostHeaderOverride = false | ||
override def supportsMultipart = false | ||
override def supportsCancellation = false | ||
|
||
override def timeoutToNone[T](t: Future[T], timeoutMillis: Int): Future[Option[T]] = t.map(Some(_)) | ||
} |
58 changes: 58 additions & 0 deletions
58
armeria-backend/monix/src/main/scala/sttp/client3/armeria/monix/ArmeriaMonixBackend.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
package sttp.client3.armeria.monix | ||
|
||
import com.linecorp.armeria.client.WebClient | ||
import com.linecorp.armeria.common.HttpData | ||
import com.linecorp.armeria.common.stream.StreamMessage | ||
import monix.eval.Task | ||
import monix.execution.Scheduler | ||
import monix.reactive.Observable | ||
import org.reactivestreams.Publisher | ||
import sttp.capabilities.monix.MonixStreams | ||
import sttp.client3.armeria.AbstractArmeriaBackend.newClient | ||
import sttp.client3.armeria.{AbstractArmeriaBackend, BodyFromStreamMessage} | ||
import sttp.client3.impl.monix.TaskMonadAsyncError | ||
import sttp.client3.{FollowRedirectsBackend, SttpBackend, SttpBackendOptions} | ||
import sttp.monad.MonadAsyncError | ||
|
||
private final class ArmeriaMonixBackend(client: WebClient, closeFactory: Boolean)(implicit scheduler: Scheduler) | ||
extends AbstractArmeriaBackend[Task, MonixStreams](client, closeFactory, TaskMonadAsyncError) { | ||
|
||
override val streams: MonixStreams = MonixStreams | ||
|
||
override protected def bodyFromStreamMessage: BodyFromStreamMessage[Task, MonixStreams] = | ||
new BodyFromStreamMessage[Task, MonixStreams] { | ||
|
||
override val streams: MonixStreams = MonixStreams | ||
|
||
override implicit def monad: MonadAsyncError[Task] = TaskMonadAsyncError | ||
|
||
override def publisherToStream(streamMessage: StreamMessage[HttpData]): Observable[Array[Byte]] = | ||
Observable.fromReactivePublisher(streamMessage).map(_.array()) | ||
} | ||
|
||
override protected def streamToPublisher(stream: Observable[Array[Byte]]): Publisher[HttpData] = | ||
stream.map(HttpData.wrap).toReactivePublisher | ||
} | ||
|
||
object ArmeriaMonixBackend { | ||
|
||
/** Creates a new `SttpBackend`. | ||
*/ | ||
def apply()(implicit scheduler: Scheduler): SttpBackend[Task, MonixStreams] = | ||
apply(newClient(), closeFactory = false) | ||
|
||
/** Creates a new `SttpBackend` with the specified `SttpBackendOptions`. */ | ||
def apply(options: SttpBackendOptions)(implicit scheduler: Scheduler): SttpBackend[Task, MonixStreams] = | ||
apply(newClient(options), closeFactory = true) | ||
|
||
/** Creates a new `SttpBackend` with | ||
* the specified `WebClient`. | ||
*/ | ||
def usingClient(client: WebClient)(implicit scheduler: Scheduler): SttpBackend[Task, MonixStreams] = | ||
apply(client, closeFactory = false) | ||
|
||
private def apply(client: WebClient, closeFactory: Boolean)(implicit | ||
scheduler: Scheduler | ||
): SttpBackend[Task, MonixStreams] = | ||
new FollowRedirectsBackend(new ArmeriaMonixBackend(client, closeFactory)) | ||
} |
25 changes: 25 additions & 0 deletions
25
armeria-backend/monix/src/test/scala/sttp/client3/armeria/monix/ArmeriaMonixHttpTest.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
package sttp.client3.armeria.monix | ||
|
||
import java.util.concurrent.TimeoutException | ||
import monix.eval.Task | ||
import sttp.client3.SttpBackend | ||
import sttp.client3.impl.monix.convertMonixTaskToFuture | ||
import sttp.client3.testing.{ConvertToFuture, HttpTest} | ||
import monix.execution.Scheduler.Implicits.global | ||
import scala.concurrent.duration.DurationInt | ||
|
||
class ArmeriaMonixHttpTest extends HttpTest[Task] { | ||
override val backend: SttpBackend[Task, Any] = ArmeriaMonixBackend() | ||
override implicit val convertToFuture: ConvertToFuture[Task] = convertMonixTaskToFuture | ||
|
||
override def timeoutToNone[T](t: Task[T], timeoutMillis: Int): Task[Option[T]] = | ||
t.map(Some(_)) | ||
.timeout(timeoutMillis.milliseconds) | ||
.onErrorRecover { case _: TimeoutException => | ||
None | ||
} | ||
|
||
override def supportsHostHeaderOverride = false | ||
override def supportsMultipart = false | ||
override def supportsCancellation = false | ||
} |
14 changes: 14 additions & 0 deletions
14
...a-backend/monix/src/test/scala/sttp/client3/armeria/monix/ArmeriaMonixStreamingTest.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
package sttp.client3.armeria.monix | ||
|
||
import monix.eval.Task | ||
import sttp.capabilities.monix.MonixStreams | ||
import sttp.client3.SttpBackend | ||
import sttp.client3.impl.monix.MonixStreamingTest | ||
import monix.execution.Scheduler.Implicits.global | ||
|
||
class ArmeriaMonixStreamingTest extends MonixStreamingTest { | ||
override val backend: SttpBackend[Task, MonixStreams] = | ||
ArmeriaMonixBackend() | ||
|
||
override protected def supportsStreamingMultipartParts: Boolean = false | ||
} |
Oops, something went wrong.