Skip to content

Commit

Permalink
Using RequestHeader that without context for WebSocket handling (#3669)
Browse files Browse the repository at this point in the history
  • Loading branch information
GreyPlane committed Apr 11, 2024
1 parent c673b18 commit bc6d4e5
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ trait PlayServerInterpreter {
}

override def apply(header: RequestHeader): Handler =
if (isWebSocket(header))
WebSocket.acceptOrResult { header =>
getResponse(header, header.withBody(Source.empty))
if (isWebSocket(header)) {
WebSocket.acceptOrResult { request =>
getResponse(header, request.withBody(Source.empty))
}
else
} else
playServerOptions.defaultActionBuilder.async(streamParser) { request =>
getResponse(header, request).flatMap {
case Left(result) => Future.successful(result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,22 @@ package sttp.tapir.server.play
import org.apache.pekko.actor.ActorSystem
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import org.apache.pekko.stream.scaladsl.Flow
import org.scalatest.matchers.should.Matchers._
import play.api.Mode
import play.api.routing.Router
import play.core.server.{DefaultPekkoHttpServerComponents, ServerConfig}
import sttp.capabilities.WebSockets
import sttp.capabilities.fs2.Fs2Streams
import sttp.capabilities.pekko.PekkoStreams
import sttp.client3._
import sttp.tapir._
import sttp.tapir.tests.Test
import sttp.ws.{WebSocket, WebSocketFrame}

import scala.concurrent.Future

class PlayServerWithContextTest(backend: SttpBackend[IO, Any])(implicit _actorSystem: ActorSystem) {
class PlayServerWithContextTest(backend: SttpBackend[IO, Fs2Streams[IO] with WebSockets])(implicit _actorSystem: ActorSystem) {
import _actorSystem.dispatcher

def tests(): List[Test] = List(
Expand All @@ -34,6 +39,34 @@ class PlayServerWithContextTest(backend: SttpBackend[IO, Any])(implicit _actorSy
}
r.onComplete(_ => s.stop())
r
},
Test("websocket with prefixed play route") {
val e = endpoint.get
.in("hello")
.out(webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](PekkoStreams))
.serverLogicSuccess[Future](_ => Future.successful(Flow[String].map(_ => "world")))

val components = new DefaultPekkoHttpServerComponents {
override lazy val serverConfig: ServerConfig = ServerConfig(port = Some(0), address = "127.0.0.1", mode = Mode.Test)
override lazy val actorSystem: ActorSystem = ActorSystem("tapir", defaultExecutionContext = Some(_actorSystem.dispatcher))
override def router: Router = Router.from(PlayServerInterpreter().toRoutes(e)).withPrefix("/test")
}
val s = components.server
val r = basicRequest
.response(asWebSocket { (ws: WebSocket[IO]) =>
for {
_ <- ws.sendText("test1")
m1 <- ws.receiveText()
_ <- ws.close()
m3 <- ws.eitherClose(ws.receiveText())
} yield List(m1, m3)
})
.get(uri"ws://localhost:${s.mainAddress.getPort}/test/hello")
.send(backend)
.map(_.body shouldBe Right(List("world", Left(WebSocketFrame.Close(1000, "normal closure")))))
.unsafeToFuture()
r.onComplete(_ => s.stop())
r
}
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ trait PlayServerInterpreter {

override def apply(header: RequestHeader): Handler =
if (isWebSocket(header))
WebSocket.acceptOrResult { header =>
getResponse(header, header.withBody(Source.empty))
WebSocket.acceptOrResult { request =>
getResponse(header, request.withBody(Source.empty))
}
else
playServerOptions.defaultActionBuilder.async(streamParser) { request =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
package sttp.tapir.server.play

import akka.actor.ActorSystem
import akka.stream.scaladsl.Flow
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import org.scalatest.matchers.should.Matchers._
import play.api.Mode
import play.api.routing.Router
import play.core.server.{DefaultAkkaHttpServerComponents, ServerConfig}
import sttp.capabilities.WebSockets
import sttp.capabilities.akka.AkkaStreams
import sttp.capabilities.fs2.Fs2Streams
import sttp.client3._
import sttp.tapir._
import sttp.tapir.tests.Test
import sttp.ws.{WebSocket, WebSocketFrame}

import scala.concurrent.Future

class PlayServerWithContextTest(backend: SttpBackend[IO, Any])(implicit _actorSystem: ActorSystem) {
class PlayServerWithContextTest(backend: SttpBackend[IO, Fs2Streams[IO] with WebSockets])(implicit _actorSystem: ActorSystem) {
import _actorSystem.dispatcher

def tests(): List[Test] = List(
Expand All @@ -34,6 +39,34 @@ class PlayServerWithContextTest(backend: SttpBackend[IO, Any])(implicit _actorSy
}
r.onComplete(_ => s.stop())
r
},
Test("websocket with prefixed play route") {
val e = endpoint.get
.in("hello")
.out(webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](AkkaStreams))
.serverLogicSuccess[Future](_ => Future.successful(Flow[String].map(_ => "world")))

val components = new DefaultAkkaHttpServerComponents {
override lazy val serverConfig: ServerConfig = ServerConfig(port = Some(0), address = "127.0.0.1", mode = Mode.Test)
override lazy val actorSystem: ActorSystem = ActorSystem("tapir", defaultExecutionContext = Some(_actorSystem.dispatcher))
override def router: Router = Router.from(PlayServerInterpreter().toRoutes(e)).withPrefix("/test")
}
val s = components.server
val r = basicRequest
.response(asWebSocket { (ws: WebSocket[IO]) =>
for {
_ <- ws.sendText("test1")
m1 <- ws.receiveText()
_ <- ws.close()
m3 <- ws.eitherClose(ws.receiveText())
} yield List(m1, m3)
})
.get(uri"ws://localhost:${s.mainAddress.getPort}/test/hello")
.send(backend)
.map(_.body shouldBe Right(List("world", Left(WebSocketFrame.Close(1000, "normal closure")))))
.unsafeToFuture()
r.onComplete(_ => s.stop())
r
}
)
}

0 comments on commit bc6d4e5

Please sign in to comment.