Skip to content

Commit

Permalink
=htc akka#1012 add ClientTransport tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jrudolph committed Jun 13, 2017
1 parent 2c4e299 commit fc796c4
Showing 1 changed file with 68 additions and 2 deletions.
Expand Up @@ -9,14 +9,16 @@ import java.nio.ByteBuffer
import java.nio.channels.{ ServerSocketChannel, SocketChannel }
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
import akka.http.impl.engine.client.PoolMasterActor.PoolInterfaceRunning
import akka.http.impl.engine.ws.ByteStringSinkProbe
import akka.http.impl.settings.ConnectionPoolSettingsImpl
import akka.http.impl.util.SingletonException
import akka.http.impl.util._
import akka.http.scaladsl.Http.OutgoingConnection
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers._
import akka.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings, ServerSettings }
import akka.http.scaladsl.Http
import akka.http.scaladsl.{ ClientTransport, ConnectionContext, Http }
import akka.stream.ActorMaterializer
import akka.stream.TLSProtocol._
import akka.stream.scaladsl._
Expand Down Expand Up @@ -334,6 +336,15 @@ class ConnectionPoolSpec extends AkkaSpec("""
Await.result(gateway.poolStatus(), 1500.millis.dilated).get shouldBe a[PoolInterfaceRunning]
awaitCond({ Await.result(gateway.poolStatus(), 1500.millis.dilated).isEmpty }, 2000.millis.dilated)
}

"use the configured ClientTransport" in new ClientTransportTestSetup {
def issueRequest(request: HttpRequest, settings: ConnectionPoolSettings): Future[HttpResponse] =
Source.single(request.withUri(request.uri.toRelative))
.via(Http().outgoingConnectionUsingTransport(
host = request.uri.authority.host.address, port = request.uri.effectivePort,
transport = settings.transport, settings = settings.connectionSettings, connectionContext = ConnectionContext.noEncryption()))
.runWith(Sink.head)
}
}

"The single-request client infrastructure" should {
Expand Down Expand Up @@ -368,6 +379,11 @@ class ConnectionPoolSpec extends AkkaSpec("""
val thrown = the[IllegalUriException] thrownBy Await.result(responseFuture, 1.second.dilated)
thrown should have message "Cannot determine request scheme and target endpoint as HttpMethod(GET) request to /foo doesn't have an absolute URI"
}

"use the configured ClientTransport" in new ClientTransportTestSetup {
def issueRequest(request: HttpRequest, settings: ConnectionPoolSettings): Future[HttpResponse] =
Http().singleRequest(request, settings = settings)
}
}

"The superPool client infrastructure" should {
Expand All @@ -388,6 +404,15 @@ class ConnectionPoolSpec extends AkkaSpec("""
case x fail(x.toString)
}
}

"use the configured ClientTransport" in new ClientTransportTestSetup {
def issueRequest(request: HttpRequest, settings: ConnectionPoolSettings): Future[HttpResponse] =
Source.single(request)
.map((_, ()))
.via(Http().superPool[Unit](settings = settings))
.map(_._1.get)
.runWith(Sink.head)
}
}

"be able to handle 500 `Connection: close` requests against the test server" in new TestSetup {
Expand Down Expand Up @@ -566,4 +591,45 @@ class ConnectionPoolSpec extends AkkaSpec("""
}

object NoErrorComplete extends SingletonException

abstract class ClientTransportTestSetup {
def issueRequest(request: HttpRequest, settings: ConnectionPoolSettings): Future[HttpResponse]

class CustomTransport extends ClientTransport {
val in = ByteStringSinkProbe()
val out = TestPublisher.probe[ByteString]()
val promise = Promise[(String, Int, ClientConnectionSettings)]()

def connectTo(host: String, port: Int, settings: ClientConnectionSettings)(implicit system: ActorSystem): Flow[ByteString, ByteString, Future[OutgoingConnection]] = {
promise.success((host, port, settings))
Flow.fromSinkAndSource(in.sink, Source.fromPublisher(out)).mapMaterializedValue(_ Promise().future)
}
}

import system.dispatcher

val transport = new CustomTransport
val poolSettings =
ConnectionPoolSettings(system)
.withTransport(transport)
.withConnectionSettings(ClientConnectionSettings(system).withIdleTimeout(23.hours))

val responseFuture = issueRequest(HttpRequest(uri = "http://example.org/test"), settings = poolSettings)

val (host, port, settings) = transport.promise.future.awaitResult(10.seconds)
host should ===("example.org")
port should ===(80)
settings.idleTimeout should ===(23.hours)

transport.in.ensureSubscription()
transport.in.expectUtf8EncodedString("GET /test HTTP/1.1\r\n")
transport.out.sendNext(ByteString("HTTP/1.1 200 OK\r\nConnection: close\r\n\r\n"))

val response = responseFuture.awaitResult(10.seconds)
response.status should ===(StatusCodes.OK)
transport.out.sendNext(ByteString("Hello World!"))
transport.out.sendComplete()

response.entity.dataBytes.utf8String.awaitResult(10.seconds) should ===("Hello World!")
}
}

0 comments on commit fc796c4

Please sign in to comment.