Skip to content

Commit

Permalink
Web Sockets for netty-loom (#3675)
Browse files Browse the repository at this point in the history
  • Loading branch information
kciesielski committed Apr 19, 2024
1 parent f1b14fd commit 1cf911d
Show file tree
Hide file tree
Showing 22 changed files with 820 additions and 189 deletions.
19 changes: 14 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,9 @@ lazy val rawAllAggregates = core.projectRefs ++
derevo.projectRefs ++
awsCdk.projectRefs

def buildWithLoom(project: String): Boolean =
project.contains("Loom") || project.contains("nima") || project.contains("perfTests") || project.contains("examples3")

lazy val allAggregates: Seq[ProjectReference] = {
val filteredByNative = if (sys.env.isDefinedAt("STTP_NATIVE")) {
println("[info] STTP_NATIVE defined, including native in the aggregate projects")
Expand All @@ -259,13 +262,13 @@ lazy val allAggregates: Seq[ProjectReference] = {
}
if (sys.env.isDefinedAt("ONLY_LOOM")) {
println("[info] ONLY_LOOM defined, including only loom-based projects")
filteredByNative.filter(p => (p.toString.contains("Loom") || p.toString.contains("nima") || p.toString.contains("perfTests")))
filteredByNative.filter(p => buildWithLoom(p.toString))
} else if (sys.env.isDefinedAt("ALSO_LOOM")) {
println("[info] ALSO_LOOM defined, including also loom-based projects")
filteredByNative
} else {
println("[info] ONLY_LOOM *not* defined, *not* including loom-based-projects")
filteredByNative.filterNot(p => (p.toString.contains("Loom") || p.toString.contains("nima") || p.toString.contains("perfTests")))
filteredByNative.filterNot(p => buildWithLoom(p.toString))
}

}
Expand Down Expand Up @@ -545,7 +548,6 @@ lazy val perfTests: ProjectMatrix = (projectMatrix in file("perf-tests"))
http4sServer,
nettyServer,
nettyServerCats,
nettyServerLoom,
playServer,
vertxServer,
vertxServerCats,
Expand Down Expand Up @@ -1454,9 +1456,15 @@ lazy val nettyServerLoom: ProjectMatrix =
.settings(
name := "tapir-netty-server-loom",
// needed because of https://github.com/coursier/coursier/issues/2016
useCoursier := false
useCoursier := false,
Test / run / fork := true,
libraryDependencies ++= Seq(
"com.softwaremill.ox" %% "core" % Versions.ox,
"org.reactivestreams" % "reactive-streams-tck" % Versions.reactiveStreams % Test,
"com.disneystreaming" %% "weaver-cats" % "0.8.4" % Test
)
)
.jvmPlatform(scalaVersions = scala2_13And3Versions)
.jvmPlatform(scalaVersions = List(scala3))
.dependsOn(nettyServer, serverTests % Test)

lazy val nettyServerCats: ProjectMatrix = nettyServerProject("cats", catsEffect)
Expand Down Expand Up @@ -2141,6 +2149,7 @@ lazy val examples: ProjectMatrix = (projectMatrix in file("examples"))
sttpClient,
swaggerUiBundle,
http4sServerZio,
nettyServerLoom,
nettyServerZio,
zioHttpServer,
zioJson,
Expand Down
28 changes: 13 additions & 15 deletions doc/server/netty.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,11 @@ val binding: Future[NettyFutureServerBinding] =
NettyFutureServer().addEndpoint(helloWorld).start()
```

The `tapir-netty-server-loom` server uses `Id[T]` as its wrapper effect for compatibility, while `Id[A]` means in fact just `A`, representing direct style.
The `tapir-netty-server-loom` server uses `Id[T]` as its wrapper effect for compatibility, while `Id[A]` means in fact just `A`, representing direct style. It is
available only for Scala 3.
See [examples/HelloWorldNettySyncServer.scala](https://github.com/softwaremill/tapir/blob/master/examples/src/main/scala/sttp/tapir/examples/HelloWorldNettySyncServer.scala) for a full example.
To learn more about handling concurrency with Ox, see the [documentation](https://ox.softwaremill.com/).

```scala
import sttp.tapir._
import sttp.tapir.server.netty.loom.{Id, NettySyncServer, NettySyncServerBinding}

val helloWorld = endpoint
.get
.in("hello").in(query[String]("name"))
.out(stringBody)
.serverLogicSuccess[Id](name => s"Hello, $name!")

val binding: NettySyncServerBinding =
NettySyncServer().addEndpoint(helloWorld).start()
```

## Configuration

Expand All @@ -85,7 +75,10 @@ NettyFutureServer(NettyConfig.default.socketBacklog(256))

## Web sockets

The netty-cats interpreter supports web sockets, with pipes of type `fs2.Pipe[F, REQ, RESP]`. See [web sockets](../endpoint/websockets.md)

### tapir-netty-server-cats

The Cats Effects interpreter supports web sockets, with pipes of type `fs2.Pipe[F, REQ, RESP]`. See [web sockets](../endpoint/websockets.md)
for more details.

To create a web socket endpoint, use Tapir's `out(webSocketBody)` output type:
Expand Down Expand Up @@ -148,6 +141,11 @@ object WebSocketsNettyCatsServer extends ResourceApp.Forever {
}
```

### tapir-netty-server-loom

In the Loom-based backend, Tapir uses [Ox](https://ox.softwaremill.com) to manage concurrency, and your transformation pipeline should be represented as `Ox ?=> Source[A] => Source[B]`. Any forks started within this function will be run under a safely isolated internal scope.
See [examples/websocket/WebSocketNettySyncServer.scala](https://github.com/softwaremill/tapir/blob/master/examples/src/main/scala/sttp/tapir/examples/websocket/WebSocketNettySyncServer.scala) for a full example.

## Graceful shutdown

A Netty server can be gracefully closed using the function `NettyFutureServerBinding.stop()` (and analogous functions available in Cats and ZIO bindings). This function ensures that the server will wait at most 10 seconds for in-flight requests to complete, while rejecting all new requests with 503 during this period. Afterwards, it closes all server resources.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package sttp.tapir.examples

import ox.*
import sttp.tapir.*
import sttp.tapir.server.netty.loom.{Id, NettySyncServer}

object HelloWorldNettySyncServer:
val helloWorld = endpoint.get
.in("hello")
.in(query[String]("name"))
.out(stringBody)
.serverLogicSuccess[Id](name => s"Hello, $name!")

NettySyncServer().addEndpoint(helloWorld).startAndWait()

// Alternatively, if you need manual control of the structured concurrency scope, server lifecycle,
// or just metadata from `NettySyncServerBinding` (like port number), use `start()`:
object HelloWorldNettySyncServer2:
val helloWorld = endpoint.get
.in("hello")
.in(query[String]("name"))
.out(stringBody)
.serverLogicSuccess[Id](name => s"Hello, $name!")

supervised {
val serverBinding = useInScope(NettySyncServer().addEndpoint(helloWorld).start())(_.stop())
println(s"Tapir is running on port ${serverBinding.port}")
never
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package sttp.tapir.examples.websocket

import ox.*
import ox.channels.*
import sttp.capabilities.WebSockets
import sttp.tapir.*
import sttp.tapir.server.netty.loom.Id
import sttp.tapir.server.netty.loom.OxStreams
import sttp.tapir.server.netty.loom.OxStreams.Pipe // alias for Ox ?=> Source[A] => Source[B]
import sttp.tapir.server.netty.loom.NettySyncServer
import sttp.ws.WebSocketFrame

import scala.concurrent.duration.*

object WebSocketNettySyncServer:
// Web socket endpoint
val wsEndpoint =
endpoint.get
.in("ws")
.out(
webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](OxStreams)
.concatenateFragmentedFrames(false) // All these options are supported by tapir-netty
.ignorePong(true)
.autoPongOnPing(true)
.decodeCloseRequests(false)
.decodeCloseResponses(false)
.autoPing(Some((10.seconds, WebSocketFrame.Ping("ping-content".getBytes))))
)

// Your processor transforming a stream of requests into a stream of responses
val wsPipe: Pipe[String, String] = requestStream => requestStream.map(_.toUpperCase)
// Alternatively, requests and responses can be treated separately, for example to emit frames to the client from another source:
val wsPipe2: Pipe[String, String] = { in =>
fork {
in.drain() // read and ignore requests
}
// emit periodic responses
Source.tick(1.second).map(_ => System.currentTimeMillis()).map(_.toString)
}

// The WebSocket endpoint, builds the pipeline in serverLogicSuccess
val wsServerEndpoint = wsEndpoint.serverLogicSuccess[Id](_ => wsPipe)

// A regular /GET endpoint
val helloWorldEndpoint =
endpoint.get.in("hello").in(query[String]("name")).out(stringBody)

val helloWorldServerEndpoint = helloWorldEndpoint
.serverLogicSuccess[Id](name => s"Hello, $name!")

def main(args: Array[String]): Unit =
NettySyncServer()
.host("0.0.0.0")
.port(8080)
.addEndpoints(List(wsServerEndpoint, helloWorldServerEndpoint))
.startAndWait()
8 changes: 7 additions & 1 deletion perf-tests/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Performance tests

To work with performance tests, make sure you are running JDK 21+, and that the `ALSO_LOOM` environment variable is set, because the `perf-tests` project includes `tapir-netty-loom` and `tapir-nima`, which require Loom JDK feature to be available.
To work with performance tests, make sure you are running JDK 21+, and that the `ALSO_LOOM` environment variable is set, because the `perf-tests` project includes `tapir-nima`, which require Loom JDK feature to be available.

Performance tests are executed by running `PerfTestSuiteRunner`, which is a standard "Main" Scala application, configured by command line parameters. It executes a sequence of tests, where
each test consist of:
Expand Down Expand Up @@ -122,6 +122,12 @@ For WebSockets we want to measure latency distribution, not throughput, so use g
```
perfTests/runMain sttp.tapir.perf.apis.ServerRunner http4s.Tapir
```
If you're testing `NettySyncServer` (tapir-server-netty-loom), its server runner is located elsewhere:
```
nettyServerLoom3/Test/runMain sttp.tapir.netty.loom.perf.NettySyncServerRunner
```
This is caused by `perf-tests` using Scala 2.13 forced by Gatling, while `NettySyncServer` is written excluisively for Scala 3.

3. Run the simulation using Gatling's task:
```
perfTests/Gatling/testOnly sttp.tapir.perf.WebSocketsSimulation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package sttp.tapir.perf.apis
import cats.effect.IO
import sttp.tapir._
import sttp.tapir.perf.Common._
import sttp.tapir.server.netty.loom.Id
import sttp.tapir.server.ServerEndpoint
import sttp.tapir.server.model.EndpointExtensions._

Expand Down Expand Up @@ -67,5 +66,4 @@ trait Endpoints {

def genEndpointsFuture(count: Int): List[ServerEndpoint[Any, Future]] = genServerEndpoints(count)(Future.successful)
def genEndpointsIO(count: Int): List[ServerEndpoint[Any, IO]] = genServerEndpoints(count)(IO.pure)
def genEndpointsId(count: Int): List[ServerEndpoint[Any, Id]] = genServerEndpoints[Id](count)(x => x: Id[String])
}

This file was deleted.

2 changes: 2 additions & 0 deletions project/Versions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ object Versions {
val json4s = "4.0.7"
val metrics4Scala = "4.2.9"
val nettyReactiveStreams = "3.0.2"
val ox = "0.0.26"
val reactiveStreams = "1.0.4"
val sprayJson = "1.3.6"
val scalaCheck = "1.17.1"
val scalaTest = "3.2.18"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package sttp.tapir.server.netty.loom

import ox.Ox
import ox.channels.Source
import sttp.capabilities.Streams

trait OxStreams extends Streams[OxStreams]:
override type BinaryStream = Nothing
override type Pipe[A, B] = Ox ?=> Source[A] => Source[B]

object OxStreams extends OxStreams

0 comments on commit 1cf911d

Please sign in to comment.