Skip to content

Commit

Permalink
Merge pull request #2552 from tdroxler/feature/vertx-reactive-streams
Browse files Browse the repository at this point in the history
Use `Reactive Streams` to add stream support to `VertxFutureServerInterpreter`
  • Loading branch information
adamw committed Nov 14, 2022
2 parents 10b13ce + 27991f4 commit d3a462e
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import io.vertx.core.logging.LoggerFactory
import io.vertx.core.{Handler, Future => VFuture}
import io.vertx.ext.web.{Route, Router, RoutingContext}
import sttp.monad.FutureMonad
import sttp.tapir.capabilities.NoStreams
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.interceptor.RequestResult
import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter}
Expand All @@ -13,7 +12,7 @@ import sttp.tapir.server.vertx.decoders.{VertxRequestBody, VertxServerRequest}
import sttp.tapir.server.vertx.encoders.{VertxOutputEncoders, VertxToResponseBody}
import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture}
import sttp.tapir.server.vertx.routing.PathMapping.extractRouteDefinition
import sttp.tapir.server.vertx.streams.ReadStreamCompatible
import sttp.tapir.server.vertx.streams.{VertxStreams, ReadStreamCompatible}

import scala.concurrent.{ExecutionContext, Future, Promise}

Expand All @@ -28,7 +27,7 @@ trait VertxFutureServerInterpreter extends CommonServerInterpreter {
* @return
* A function, that given a router, will attach this endpoint to it
*/
def route[A, U, I, E, O](e: ServerEndpoint[Any, Future]): Router => Route = { router =>
def route[A, U, I, E, O](e: ServerEndpoint[VertxStreams, Future]): Router => Route = { router =>
mountWithDefaultHandlers(e)(router, extractRouteDefinition(e.endpoint))
.handler(endpointHandler(e))
}
Expand All @@ -39,21 +38,22 @@ trait VertxFutureServerInterpreter extends CommonServerInterpreter {
* @return
* A function, that given a router, will attach this endpoint to it
*/
def blockingRoute(e: ServerEndpoint[Any, Future]): Router => Route = { router =>
def blockingRoute(e: ServerEndpoint[VertxStreams, Future]): Router => Route = { router =>
mountWithDefaultHandlers(e)(router, extractRouteDefinition(e.endpoint))
.blockingHandler(endpointHandler(e))
}

private def endpointHandler(
e: ServerEndpoint[Any, Future]
e: ServerEndpoint[VertxStreams, Future]
): Handler[RoutingContext] = { rc =>
implicit val ec: ExecutionContext = vertxFutureServerOptions.executionContextOrCurrentCtx(rc)
implicit val monad: FutureMonad = new FutureMonad()
implicit val bodyListener: BodyListener[Future, RoutingContext => VFuture[Void]] = new VertxBodyListener[Future]
val interpreter = new ServerInterpreter[Any, Future, RoutingContext => VFuture[Void], NoStreams](
val reactiveStreamsReadStream: ReadStreamCompatible[VertxStreams] = streams.reactiveStreamsReadStreamCompatible()
val interpreter = new ServerInterpreter[VertxStreams, Future, RoutingContext => VFuture[Void], VertxStreams](
_ => List(e),
new VertxRequestBody(vertxFutureServerOptions, FutureFromVFuture)(ReadStreamCompatible.incompatible),
new VertxToResponseBody(vertxFutureServerOptions)(ReadStreamCompatible.incompatible),
new VertxRequestBody(vertxFutureServerOptions, FutureFromVFuture)(reactiveStreamsReadStream),
new VertxToResponseBody(vertxFutureServerOptions)(reactiveStreamsReadStream),
vertxFutureServerOptions.interceptors,
vertxFutureServerOptions.deleteFile
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ trait ReadStreamCompatible[S <: Streams[S]] {

object ReadStreamCompatible {
def apply[S <: Streams[S]](implicit ev: ReadStreamCompatible[S]): ReadStreamCompatible[S] = ev
}

val incompatible: ReadStreamCompatible[NoStreams] = new ReadStreamCompatible[NoStreams] {
override val streams: NoStreams = NoStreams
override def asReadStream(s: Nothing): ReadStream[Buffer] = ???
override def fromReadStream(s: ReadStream[Buffer]): Nothing = ???
}
trait VertxStreams extends Streams[VertxStreams] {
override type BinaryStream = ReadStream[Buffer]
override type Pipe[A, B] = ReadStream[A] => ReadStream[B]
}

object VertxStreams extends VertxStreams
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package sttp.tapir.server.vertx

import io.vertx.core.buffer.Buffer
import io.vertx.core.streams.ReadStream

package object streams {

def reactiveStreamsReadStreamCompatible(): ReadStreamCompatible[VertxStreams] = new ReadStreamCompatible[VertxStreams] {

override val streams: VertxStreams = VertxStreams

override def asReadStream(readStream: ReadStream[Buffer]): ReadStream[Buffer] =
readStream

override def fromReadStream(readStream: ReadStream[Buffer]): ReadStream[Buffer] =
readStream
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import cats.effect.{IO, Resource}
import io.vertx.core.Vertx
import sttp.monad.FutureMonad
import sttp.tapir.server.tests._
import sttp.tapir.server.vertx.streams.VertxStreams
import sttp.tapir.tests.{Test, TestSuite}

import scala.concurrent.ExecutionContext
Expand All @@ -24,7 +25,7 @@ class VertxServerTest extends TestSuite {
createServerTest,
partContentTypeHeaderSupport = false, // README: doesn't seem supported but I may be wrong
partOtherHeaderSupport = false
).tests()
).tests() ++ new ServerStreamingTests(createServerTest, VertxStreams).tests()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package sttp.tapir.server.vertx
import io.vertx.core.Vertx
import io.vertx.ext.web.{Route, Router}
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.vertx.streams.VertxStreams

import scala.concurrent.Future

class VertxTestServerBlockingInterpreter(vertx: Vertx) extends VertxTestServerInterpreter(vertx) {
override def route(es: List[ServerEndpoint[Any, Future]], interceptors: Interceptors): Router => Route = { router =>
override def route(es: List[ServerEndpoint[VertxStreams, Future]], interceptors: Interceptors): Router => Route = { router =>
val options: VertxFutureServerOptions = interceptors(VertxFutureServerOptions.customiseInterceptors).options
val interpreter = VertxFutureServerInterpreter(options)
es.map(interpreter.blockingRoute(_)(router)).last
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import io.vertx.core.{Vertx, Future => VFuture}
import io.vertx.ext.web.{Route, Router}
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.tests.TestServerInterpreter
import sttp.tapir.server.vertx.streams.VertxStreams
import sttp.tapir.tests.Port

import scala.concurrent.Future

class VertxTestServerInterpreter(vertx: Vertx) extends TestServerInterpreter[Future, Any, VertxFutureServerOptions, Router => Route] {
class VertxTestServerInterpreter(vertx: Vertx) extends TestServerInterpreter[Future, VertxStreams, VertxFutureServerOptions, Router => Route] {
import VertxTestServerInterpreter._

override def route(es: List[ServerEndpoint[Any, Future]], interceptors: Interceptors): Router => Route = { router =>
override def route(es: List[ServerEndpoint[VertxStreams, Future]], interceptors: Interceptors): Router => Route = { router =>
val options: VertxFutureServerOptions = interceptors(VertxFutureServerOptions.customiseInterceptors).options
val interpreter = VertxFutureServerInterpreter(options)
es.map(interpreter.route(_)(router)).last
Expand Down

0 comments on commit d3a462e

Please sign in to comment.