Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE] Stub for testing websocket stream #1493

Open
Kirill5k opened this issue Jul 11, 2022 · 6 comments
Open

[FEATURE] Stub for testing websocket stream #1493

Kirill5k opened this issue Jul 11, 2022 · 6 comments

Comments

@Kirill5k
Copy link

Original question/feature request was posted in gitter: https://gitter.im/softwaremill/sttp?at=605d9dbd563232374c459a79

Consider I have a function which sends a request via asWebSocketStream:

basicRequest
      .get(uri)
      .response(asWebSocketStream(Fs2Streams[F])(pipe))
      .send(backend)

where backend is SttpBackend[F, Fs2Streams[F] with capabilities.WebSockets].

Given that web socket streams aren't supported by the stub, it is impossible to test such function.

A possible solution would be to provide a stub for testing web socket streams.

@adamw
Copy link
Member

adamw commented Jul 15, 2022

Did you try wrapping the stubbed web socket stream with RawStream? (https://sttp.softwaremill.com/en/latest/testing.html#testing-streams)

@Kirill5k
Copy link
Author

I've tried it just now with the following backend:

  val testingBackend: SttpBackend[IO, Fs2Streams[IO] with WebSockets] = AsyncHttpClientFs2Backend
    .stub[IO]
    .whenAnyRequest
    .thenRespond(
      SttpBackendStub.RawStream(
        WebSocketStub
          .initialReceive(List(WebSocketFrame.text("Hello, World!")))
          .thenRespond(_ => List(WebSocketFrame.text("Hello, World!")))
      )
    )

However it does not seem to be working (unless I've done something wrong).

@adamw
Copy link
Member

adamw commented Jul 20, 2022

Right, there's case ResponseAsWebSocketStream(_, _) => None in the stub :)

However, I don't think we can make this work, as it would require fusing the client-side pipe specified in the .response(asWebSocketStream(...)) with the server-side pipe returned by the stub response. And sttp doesn't have any knowledge of the underlying streaming implementation, meaning that it doesn't also know how to combine two pipes. So I think you'll have to test the pipes by hand.

@adamw
Copy link
Member

adamw commented Jul 20, 2022

I'll close this as won't fix as we can't really do much with a stream pipe - it's opaque for sttp. If you would have some ideas how to fix this, please reopen :)

@adamw adamw closed this as completed Jul 20, 2022
@Zuchos
Copy link
Contributor

Zuchos commented Apr 5, 2024

I want to propose a solution to this one. I solved it by writing my own sttp backend stub.

import cats.effect.IO
import cats.effect.std.Queue
import sttp.{capabilities, monad}
import sttp.capabilities.fs2.Fs2Streams
import sttp.client3.*
import sttp.client3.impl.cats.implicits.*
import sttp.ws.WebSocketFrame

import sttp.model.StatusCode

class Fs2StreamsWebsocketBackend(
    initialFrames: List[WebSocketFrame],
    serverPipe: fs2.Pipe[IO, WebSocketFrame, WebSocketFrame],
    serverQueue: Queue[IO, WebSocketFrame]
) extends SttpBackend[IO, Fs2Streams[IO] & capabilities.WebSockets] {

  override def send[T, R >: Fs2Streams[IO] with capabilities.WebSockets with capabilities.Effect[IO]](request: Request[T, R]): IO[Response[T]] = {
    respond(request.response).getOrElse(throw new RuntimeException("Not implemented"))
  }

  private def respond[T](
      ra: ResponseAs[T, _],
  ): Option[IO[Response[T]]] = {
    ra match {
      case ResponseAsWebSocketStream(_, pipe1) =>
        val clientSideWebsocketPipe   = pipe1.asInstanceOf[fs2.Pipe[IO, WebSocketFrame, WebSocketFrame]]
        val response: IO[Response[T]] = IO.never[Response[T]]
        val source                    = fs2.Stream.fromQueueUnterminated(serverQueue)
        val runStream = source.through(clientSideWebsocketPipe)
          .through(serverPipe).flatMap {
            frame => fs2.Stream.eval(serverQueue.offer(frame)) >> fs2.Stream.empty
          }.compile.drain.void.start
        (serverQueue.tryOfferN(initialFrames) >> runStream >> response).some
      case MappedResponseAs(raw: ResponseAs[_, _], _, _) =>
        respond(raw).map(_.map(_.asInstanceOf[Response[T]]))
      case ResponseAsFromMetadata(conditions, _) =>
        respond(conditions.head.responseAs)
      case _ =>
        throw new RuntimeException("Not implemented")
    }
  }

  override def close(): IO[Unit] = IO.unit

  override def responseMonad: monad.MonadError[IO] = monad.MonadError[IO]
}
import cats.effect.IO
import cats.effect.std.Queue
import com.typesafe.scalalogging.StrictLogging
import sttp.capabilities.fs2.Fs2Streams
import sttp.client3.*
import sttp.ws.WebSocketFrame
import cats.effect.unsafe.implicits.global

import scala.concurrent.duration.DurationInt

class SttpStubSpec extends org.scalatest.freespec.AnyFreeSpec with StrictLogging {

  "sttp should allow to mock streams" in {
    val serverLogic: fs2.Pipe[IO, WebSocketFrame, WebSocketFrame] = {
      _.flatMap {
        case text: WebSocketFrame.Text if text.payload.startsWith("numbers") =>
          fs2.Stream.eval(IO(logger.info("S: Received message: " + text.payload))) >>
            fs2.Stream.emits(List(WebSocketFrame.text("1"), WebSocketFrame.text("2")))
        case text: WebSocketFrame.Text =>
          fs2.Stream.eval(IO(logger.info("Server - received message: " + text.payload))) >>
            fs2.Stream.empty
        case _ => fs2.Stream.empty
      }
    }

    val backend: IO[Fs2StreamsWebsocketBackend] =
      for {
        queue <- Queue.unbounded[IO, WebSocketFrame]
      } yield new Fs2StreamsWebsocketBackend(
        List(WebSocketFrame.text("hello")),
        serverLogic,
        queue
      )

    backend.flatMap { backend =>
      val pipe: fs2.Pipe[IO, WebSocketFrame, WebSocketFrame] = {
        input =>
          val readFromWebSocket: fs2.Stream[IO, WebSocketFrame] = input.flatMap {
            case WebSocketFrame.Text(payload, _, _) =>
              fs2.Stream.eval(IO(logger.info("Client - received message: " + payload))) >>
                fs2.Stream.empty[IO]
            case _ =>
              fs2.Stream.emit(WebSocketFrame.text("error"))
          }
          val subscribeToWallets = fs2.Stream.emit[IO, WebSocketFrame](WebSocketFrame.text("numbers"))
          subscribeToWallets.merge(readFromWebSocket)
      }

      val value = basicRequest
        .response(asWebSocketStream(Fs2Streams[IO])(pipe))
        .get(uri"wss://echo.websocket.org")
        .send(backend)
      (value >> IO.sleep(3.seconds))
    }.unsafeRunSync()
  }

}

It's mvp-ish implementation to satisfy my needs but with a few changes to SttpBackendStab it could be fairly easy to convert it into an extension of it. If tryAdjustResponseBody wouldn't be an object method, but a part of the class and have an implementation like:

private[client3] def tryAdjustResponseBody[F[_], T, U](
     ra: ResponseAs[T, _],
     b: U,
     meta: ResponseMetadata
 )(implicit monad: MonadError[F]): Option[F[T]] = {
   customResponseHandling(ra,b,meta) orElse defaultImplementation(ra,b,meta)
}

then it would be enough to extend the stub for particular types and override customResponseHandling (which would be empty by default) to achieve the same.

@adamw
Copy link
Member

adamw commented Apr 5, 2024

@Zuchos yeah you're right, we could add customBodyAdjustments or sth like that as a parameter to the backend stub, just as we have matchers, or customEncodingHandler in regular backends. This should be possible in sttp4 since we don't have to keep bincompat there (yet :) ). I'll reopen this then.

@adamw adamw reopened this Apr 5, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

4 participants