Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Reactive Streams to add stream support to VertxFutureServerInterpreter #2552

Merged
merged 4 commits into from Nov 14, 2022

Conversation

tdroxler
Copy link
Contributor

@tdroxler tdroxler commented Nov 7, 2022

Following #2533 I gave it a try.

The change is backward compatible and people using VertxFutureServerInterpreter shouldn't have to change their endpoints, but if they want they can now use streaming.

Using Reactive Streams rather than Vertx Streams offers more flexibility. Any streaming library implementing reactivestreams.Publisher can be used.

So even zio, fs2, akka-streams etc can be used to handles streams in VertxFutureServerInterpreter.

I could successfully use this in this commit of my project

My endpoint type changed like this:

- type BaseEndpoint[I, O] = Endpoint[Unit, I, ApiError[_ <: StatusCode], O, Any]
+ type BaseEndpoint[I, O] = Endpoint[Unit, I, ApiError[_ <: StatusCode], O, ReactiveStreams]

Using a streamBody give me: BaseEndpoint[Address, Publisher[Buffer]]

In my code I then used akka-streams to create a Publisher, but I could have used anything else.

…erpreter`

The change is backward compatible and people using
`VertxFutureServerInterpreter` shouldn't have to change their endpoints,
but if they want they can now use streaming.

Using `Reactive Streams` rather than `Vertx Streams` offers more
flexibility. Any streaming library implementing `reactivestreams.Publisher` can be used.

So even `zio`, `fs2`, `akka-streams` etc can be used to handles streams
in `VertxFutureServerInterpreter`.
@Pask423 Pask423 self-assigned this Nov 7, 2022
Copy link
Contributor

@Pask423 Pask423 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi
Thanks for PR could you please add server streaming tests for vertx.
Such line ++ new ServerStreamingTests(createServerTest, ReactiveStreams).tests() in VertxServerTest alongside adding ReactiveStreams type param in tests should do the job. If you will have any questions let me know.

@tdroxler
Copy link
Contributor Author

tdroxler commented Nov 7, 2022

Hi @Pask423,

I tried hard to make the tests pass, but there's something I don't understand in vertx :/ from what I get, pipeTo should be enough, but everything else I tried also failed.

Any chance to have a vertx expert to look at it?

I pushed a new commit with the tests.

Thx, cheers.

@Pask423
Copy link
Contributor

Pask423 commented Nov 9, 2022

I think that it is unlikely, unfortunately - at least I do not know anyone who we could ask. I will try to take a look tomorrow or today. Maybe together we will come up with something.

@tdroxler
Copy link
Contributor Author

tdroxler commented Nov 9, 2022

I think that it is unlikely, unfortunately - at least I do not know anyone who we could ask. I will try to take a look tomorrow or today. Maybe together we will come up with something.

Thx, I'll also try to ask the vertx team directly

@tdroxler
Copy link
Contributor Author

tdroxler commented Nov 10, 2022

Sooooo I tried the all day to make the tests pass with the reactive-streams without success, when I do small test using only the conversion Publisher -> ReadStream and the opposite everything is fine with pipeTo. But then the all streams tests aren't passing, I have absolutely no idea why.

So I went with another proposition (latest commit) to use directly the vertx.streams.*, it offer a bit less freedom, but in my project I can easily use those ReadStream from a Publisher : https://github.com/alephium/explorer-backend/blob/6d9cde147b0ba9ec7cdc8bc9971140b6ad5cc680/app/src/main/scala/org/alephium/explorer/web/AddressServer.scala#L95-L98 , just need to subscribe my Publisher

What do you think?

@Pask423
Copy link
Contributor

Pask423 commented Nov 14, 2022

Seems ok I came to similar conclusions some time ago - I think that this reactive streams lib is not intended to use in the way we want to use it here. However I would like you to change a few things in the code.

build.sbt Outdated
@@ -1258,7 +1258,8 @@ lazy val vertxServer: ProjectMatrix = (projectMatrix in file("server/vertx-serve
.settings(
name := "tapir-vertx-server",
libraryDependencies ++= Seq(
"io.vertx" % "vertx-web" % Versions.vertx
"io.vertx" % "vertx-web" % Versions.vertx,
"io.vertx" % "vertx-reactive-streams" % Versions.vertx
Copy link
Contributor

@Pask423 Pask423 Nov 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point I think it is not used

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


trait VertxStreams extends Streams[VertxStreams] {
override type BinaryStream = ReadStream[Buffer]
override type Pipe[A, B] = Processor[A, B]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would go with ReadStream[A] => ReadStream[B] to not mix reactive streams and vertx streams

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

import sttp.tapir.tests.{Test, TestSuite}

import scala.concurrent.ExecutionContext
import scala.concurrent.{ExecutionContext, Future}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this and .asInstanceOf - change VertxTestServerInterpreter to have VertxStreams instead of any. You will also have to change route contract in VertxTestServerBlockingInterpreter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -20,3 +21,10 @@ object ReadStreamCompatible {
override def fromReadStream(s: ReadStream[Buffer]): Nothing = ???
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove incompatible - not needed anymore

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@adamw adamw merged commit d3a462e into softwaremill:master Nov 14, 2022
@adamw
Copy link
Member

adamw commented Nov 14, 2022

Thanks! :)

@adamw
Copy link
Member

adamw commented Nov 14, 2022

It would also be great to add a Streaming subsection to https://tapir.softwaremill.com/en/latest/server/vertx.html like akka-http/http4s have

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants