Skip to content

Commit

Permalink
Use SystemMaterializer in AkkaSpecWithMaterializer (akka#3075)
Browse files Browse the repository at this point in the history
* Use SystemMaterializer in AkkaSpecWithMaterializer

* core: fix ClientCancellationSpec

 * use a proper test setup
 * enable TLS tests

Why was the extra materializer needed? To make sure assertAllStagesStopped
doesn't stumble over long-running streams (server binding and pool). Now
using assertAllStagesStopped is a bit useless as we force more of the background
resources to terminate but still cannot hurt.

If assertAllStagesStopped fails with weird errors (or just an ask timeout turns
up), it's probably because TlsActor doesn't support assertAllStagesStopped.
See akka/akka#28691
  • Loading branch information
jrudolph committed May 11, 2020
1 parent 8b8baad commit 15f108c
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,71 @@

package akka.http.impl.engine.client

import javax.net.ssl.SSLContext
import akka.http.impl.util.{ AkkaSpecWithMaterializer, ExampleHttpContexts }
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, headers }
import akka.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings }
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.stream.testkit.{ TestPublisher, TestSubscriber, Utils }

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.http.impl.util.AkkaSpecWithMaterializer
import akka.http.scaladsl.{ ConnectionContext, Http }
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse }
import akka.stream.SystemMaterializer
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.stream.testkit.{ TestPublisher, TestSubscriber, Utils }
import akka.http.scaladsl.model.headers

class ClientCancellationSpec extends AkkaSpecWithMaterializer {
// TODO document why this explicit materializer is needed here?
val noncheckedMaterializer = SystemMaterializer(system).materializer

"Http client connections" must {
val address = Await.result(
"support cancellation in simple outgoing connection" in Utils.assertAllStagesStopped(new TestSetup {
testCase(
Http().outgoingConnection(address.getHostName, address.getPort))
})

"support cancellation in pooled outgoing connection" in Utils.assertAllStagesStopped(new TestSetup {
testCase(
Flow[HttpRequest]
.map((_, ()))
.via(Http().cachedHostConnectionPool(address.getHostName, address.getPort))
.map(_._1.get)
)
})

"support cancellation in simple outgoing connection with TLS" in Utils.assertAllStagesStopped(new TestSetup {
pending
testCase(
Http().outgoingConnectionHttps("akka.example.org", 443, settings = settingsWithProxyTransport, connectionContext = ExampleHttpContexts.exampleClientContext)
)
})

"support cancellation in pooled outgoing connection with TLS" in Utils.assertAllStagesStopped(new TestSetup {
testCase(
Flow[HttpRequest]
.map((_, ()))
.via(Http().cachedHostConnectionPoolHttps("akka.example.org", 443,
settings = ConnectionPoolSettings(system).withConnectionSettings(settingsWithProxyTransport),
connectionContext = ExampleHttpContexts.exampleClientContext))
.map(_._1.get))
})

}

class TestSetup {
lazy val binding = Await.result(
Http().bindAndHandleSync(
{ req => HttpResponse(headers = headers.Connection("close") :: Nil) },
"localhost", 0)(noncheckedMaterializer),
{ _ => HttpResponse(headers = headers.Connection("close") :: Nil) },
"localhost", 0),
5.seconds
).localAddress
)
lazy val address = binding.localAddress

val addressTls = Await.result(
lazy val bindingTls = Await.result(
Http().bindAndHandleSync(
{ req => HttpResponse() }, // TLS client does full-close, no need for the connection:close header
{ _ => HttpResponse() }, // TLS client does full-close, no need for the connection:close header
"localhost",
0,
connectionContext = ConnectionContext.https(SSLContext.getDefault))(noncheckedMaterializer),
connectionContext = ExampleHttpContexts.exampleServerContext),
5.seconds
).localAddress
)
lazy val addressTls = bindingTls.localAddress

def testCase(connection: Flow[HttpRequest, HttpResponse, Any]): Unit = Utils.assertAllStagesStopped {
def testCase(connection: Flow[HttpRequest, HttpResponse, Any]): Unit = {
val requests = TestPublisher.probe[HttpRequest]()
val responses = TestSubscriber.probe[HttpResponse]()
Source.fromPublisher(requests).via(connection).runWith(Sink.fromSubscriber(responses))
Expand All @@ -47,36 +77,15 @@ class ClientCancellationSpec extends AkkaSpecWithMaterializer {
responses.expectNext().entity.dataBytes.runWith(Sink.cancelled)
responses.cancel()
requests.expectCancellation()
}

"support cancellation in simple outgoing connection" in {
testCase(
Http().outgoingConnection(address.getHostName, address.getPort))
}

"support cancellation in pooled outgoing connection" in {
testCase(
Flow[HttpRequest]
.map((_, ()))
.via(Http().cachedHostConnectionPool(address.getHostName, address.getPort))
.map(_._1.get))
}

"support cancellation in simple outgoing connection with TLS" in {
pending
testCase(
Http().outgoingConnectionHttps(addressTls.getHostName, addressTls.getPort))
}
binding.terminate(1.second)
bindingTls.terminate(1.second)

"support cancellation in pooled outgoing connection with TLS" in {
pending
testCase(
Flow[HttpRequest]
.map((_, ()))
.via(Http().cachedHostConnectionPoolHttps(addressTls.getHostName, addressTls.getPort))
.map(_._1.get))
Http().shutdownAllConnectionPools()
}

def settingsWithProxyTransport: ClientConnectionSettings =
ClientConnectionSettings(system)
.withTransport(ExampleHttpContexts.proxyTransport(addressTls))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import akka.event.LoggingAdapter
import akka.http.ParsingErrorHandler
import akka.http.impl.engine.ws.ByteStringSinkProbe
import akka.http.impl.util._
import akka.http.javadsl.model
import akka.http.scaladsl.Http.ServerLayer
import akka.http.scaladsl.model.HttpEntity._
import akka.http.scaladsl.model.HttpMethods._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package akka.http.impl.util

import akka.stream.ActorMaterializer
import akka.stream.{ ActorMaterializer, SystemMaterializer }
import akka.testkit.AkkaSpec
import akka.testkit.EventFilter

Expand All @@ -16,15 +16,15 @@ abstract class AkkaSpecWithMaterializer(s: String)

def this() = this("")

implicit val materializer = ActorMaterializer()
implicit val materializer = SystemMaterializer(system).materializer

override protected def beforeTermination(): Unit =
// don't log anything during shutdown, especially not AbruptTerminationExceptions
EventFilter.custom { case x => true }.intercept {
// shutdown materializer first, otherwise it will only be shutdown during
// main system guardian being shutdown which will be after the logging has
// reverted to stdout logging that cannot be intercepted
materializer.shutdown()
materializer.asInstanceOf[ActorMaterializer].shutdown()
// materializer shutdown is async but cannot be watched
Thread.sleep(10)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,19 @@
package akka.http.impl.util

import java.io.InputStream
import java.security.{ SecureRandom, KeyStore }
import java.security.cert.{ CertificateFactory, Certificate }
import javax.net.ssl.{ SSLParameters, SSLContext, TrustManagerFactory, KeyManagerFactory }
import java.net.InetSocketAddress
import java.security.{ KeyStore, SecureRandom }
import java.security.cert.{ Certificate, CertificateFactory }

import akka.http.scaladsl.HttpsConnectionContext
import akka.actor.ActorSystem
import javax.net.ssl.{ KeyManagerFactory, SSLContext, SSLParameters, TrustManagerFactory }
import akka.http.scaladsl.{ ClientTransport, Http, HttpsConnectionContext }
import akka.http.impl.util.JavaMapping.Implicits._
import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.stream.scaladsl.Flow
import akka.util.ByteString

import scala.concurrent.Future

/**
* These are HTTPS example configurations that take key material from the resources/key folder.
Expand Down Expand Up @@ -62,4 +69,14 @@ object ExampleHttpContexts {

def loadX509Certificate(resourceName: String): Certificate =
CertificateFactory.getInstance("X.509").generateCertificate(resourceStream(resourceName))

/**
* A client transport that will rewrite the target address to a fixed address. This can be used
* to pretend to connect to akka.example.org which is required to connect to the example server certificate.
*/
def proxyTransport(realAddress: InetSocketAddress): ClientTransport =
new ClientTransport {
override def connectTo(host: String, port: Int, settings: ClientConnectionSettings)(implicit system: ActorSystem): Flow[ByteString, ByteString, Future[Http.OutgoingConnection]] =
ClientTransport.TCP.connectTo(realAddress.getHostString, realAddress.getPort, settings)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import akka.http.scaladsl.settings.RoutingSettings
import akka.http.scaladsl.settings.ServerSettings
import akka.http.scaladsl.unmarshalling._
import akka.http.scaladsl.util.FastFuture._
import akka.stream.{ Materializer, SystemMaterializer }
import akka.stream.SystemMaterializer
import akka.testkit.TestKit
import akka.util.ConstantFun
import com.typesafe.config.{ Config, ConfigFactory }
Expand Down

0 comments on commit 15f108c

Please sign in to comment.