Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Reactive Streams to add stream support to VertxFutureServerInterpreter #2552

Merged
merged 4 commits into from
Nov 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import io.vertx.core.logging.LoggerFactory
import io.vertx.core.{Handler, Future => VFuture}
import io.vertx.ext.web.{Route, Router, RoutingContext}
import sttp.monad.FutureMonad
import sttp.tapir.capabilities.NoStreams
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.interceptor.RequestResult
import sttp.tapir.server.interpreter.{BodyListener, ServerInterpreter}
Expand All @@ -13,7 +12,7 @@ import sttp.tapir.server.vertx.decoders.{VertxRequestBody, VertxServerRequest}
import sttp.tapir.server.vertx.encoders.{VertxOutputEncoders, VertxToResponseBody}
import sttp.tapir.server.vertx.interpreters.{CommonServerInterpreter, FromVFuture}
import sttp.tapir.server.vertx.routing.PathMapping.extractRouteDefinition
import sttp.tapir.server.vertx.streams.ReadStreamCompatible
import sttp.tapir.server.vertx.streams.{VertxStreams, ReadStreamCompatible}

import scala.concurrent.{ExecutionContext, Future, Promise}

Expand All @@ -28,7 +27,7 @@ trait VertxFutureServerInterpreter extends CommonServerInterpreter {
* @return
* A function, that given a router, will attach this endpoint to it
*/
def route[A, U, I, E, O](e: ServerEndpoint[Any, Future]): Router => Route = { router =>
def route[A, U, I, E, O](e: ServerEndpoint[VertxStreams, Future]): Router => Route = { router =>
mountWithDefaultHandlers(e)(router, extractRouteDefinition(e.endpoint))
.handler(endpointHandler(e))
}
Expand All @@ -39,21 +38,22 @@ trait VertxFutureServerInterpreter extends CommonServerInterpreter {
* @return
* A function, that given a router, will attach this endpoint to it
*/
def blockingRoute(e: ServerEndpoint[Any, Future]): Router => Route = { router =>
def blockingRoute(e: ServerEndpoint[VertxStreams, Future]): Router => Route = { router =>
mountWithDefaultHandlers(e)(router, extractRouteDefinition(e.endpoint))
.blockingHandler(endpointHandler(e))
}

private def endpointHandler(
e: ServerEndpoint[Any, Future]
e: ServerEndpoint[VertxStreams, Future]
): Handler[RoutingContext] = { rc =>
implicit val ec: ExecutionContext = vertxFutureServerOptions.executionContextOrCurrentCtx(rc)
implicit val monad: FutureMonad = new FutureMonad()
implicit val bodyListener: BodyListener[Future, RoutingContext => VFuture[Void]] = new VertxBodyListener[Future]
val interpreter = new ServerInterpreter[Any, Future, RoutingContext => VFuture[Void], NoStreams](
val reactiveStreamsReadStream: ReadStreamCompatible[VertxStreams] = streams.reactiveStreamsReadStreamCompatible()
val interpreter = new ServerInterpreter[VertxStreams, Future, RoutingContext => VFuture[Void], VertxStreams](
_ => List(e),
new VertxRequestBody(vertxFutureServerOptions, FutureFromVFuture)(ReadStreamCompatible.incompatible),
new VertxToResponseBody(vertxFutureServerOptions)(ReadStreamCompatible.incompatible),
new VertxRequestBody(vertxFutureServerOptions, FutureFromVFuture)(reactiveStreamsReadStream),
new VertxToResponseBody(vertxFutureServerOptions)(reactiveStreamsReadStream),
vertxFutureServerOptions.interceptors,
vertxFutureServerOptions.deleteFile
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ trait ReadStreamCompatible[S <: Streams[S]] {

object ReadStreamCompatible {
def apply[S <: Streams[S]](implicit ev: ReadStreamCompatible[S]): ReadStreamCompatible[S] = ev
}

val incompatible: ReadStreamCompatible[NoStreams] = new ReadStreamCompatible[NoStreams] {
override val streams: NoStreams = NoStreams
override def asReadStream(s: Nothing): ReadStream[Buffer] = ???
override def fromReadStream(s: ReadStream[Buffer]): Nothing = ???
}
trait VertxStreams extends Streams[VertxStreams] {
override type BinaryStream = ReadStream[Buffer]
override type Pipe[A, B] = ReadStream[A] => ReadStream[B]
}

object VertxStreams extends VertxStreams
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package sttp.tapir.server.vertx

import io.vertx.core.buffer.Buffer
import io.vertx.core.streams.ReadStream

package object streams {

def reactiveStreamsReadStreamCompatible(): ReadStreamCompatible[VertxStreams] = new ReadStreamCompatible[VertxStreams] {

override val streams: VertxStreams = VertxStreams

override def asReadStream(readStream: ReadStream[Buffer]): ReadStream[Buffer] =
readStream

override def fromReadStream(readStream: ReadStream[Buffer]): ReadStream[Buffer] =
readStream
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import cats.effect.{IO, Resource}
import io.vertx.core.Vertx
import sttp.monad.FutureMonad
import sttp.tapir.server.tests._
import sttp.tapir.server.vertx.streams.VertxStreams
import sttp.tapir.tests.{Test, TestSuite}

import scala.concurrent.ExecutionContext
Expand All @@ -24,7 +25,7 @@ class VertxServerTest extends TestSuite {
createServerTest,
partContentTypeHeaderSupport = false, // README: doesn't seem supported but I may be wrong
partOtherHeaderSupport = false
).tests()
).tests() ++ new ServerStreamingTests(createServerTest, VertxStreams).tests()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package sttp.tapir.server.vertx
import io.vertx.core.Vertx
import io.vertx.ext.web.{Route, Router}
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.vertx.streams.VertxStreams

import scala.concurrent.Future

class VertxTestServerBlockingInterpreter(vertx: Vertx) extends VertxTestServerInterpreter(vertx) {
override def route(es: List[ServerEndpoint[Any, Future]], interceptors: Interceptors): Router => Route = { router =>
override def route(es: List[ServerEndpoint[VertxStreams, Future]], interceptors: Interceptors): Router => Route = { router =>
val options: VertxFutureServerOptions = interceptors(VertxFutureServerOptions.customiseInterceptors).options
val interpreter = VertxFutureServerInterpreter(options)
es.map(interpreter.blockingRoute(_)(router)).last
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import io.vertx.core.{Vertx, Future => VFuture}
import io.vertx.ext.web.{Route, Router}
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.tests.TestServerInterpreter
import sttp.tapir.server.vertx.streams.VertxStreams
import sttp.tapir.tests.Port

import scala.concurrent.Future

class VertxTestServerInterpreter(vertx: Vertx) extends TestServerInterpreter[Future, Any, VertxFutureServerOptions, Router => Route] {
class VertxTestServerInterpreter(vertx: Vertx) extends TestServerInterpreter[Future, VertxStreams, VertxFutureServerOptions, Router => Route] {
import VertxTestServerInterpreter._

override def route(es: List[ServerEndpoint[Any, Future]], interceptors: Interceptors): Router => Route = { router =>
override def route(es: List[ServerEndpoint[VertxStreams, Future]], interceptors: Interceptors): Router => Route = { router =>
val options: VertxFutureServerOptions = interceptors(VertxFutureServerOptions.customiseInterceptors).options
val interpreter = VertxFutureServerInterpreter(options)
es.map(interpreter.route(_)(router)).last
Expand Down