Permalink
Browse files

! can: get rid of Http.Register.keepOpenOnPeerClosed, fixes #401

As discussed in the ticket:

client side
-----------

 - keepOpenOnPeerClosed = settings.sslEncryption
 - because the SslTlsSupport pipeline stage might be active but encryption
   still be disabled, we need to always actively close the connection whenever
   we see a PeerClosed event

server side
-----------

 - keepOpenOnPeerClosed = true
 - if no response is currently pending we can close the connection immediately
   when we see a PeerClosed event
 - otherwise, we let the last pending response close the connection
  • Loading branch information...
jrudolph committed Oct 21, 2013
1 parent ce170d4 commit f6b029277a8156497bdc6ebd0d89505f322bd563
@@ -94,6 +94,7 @@ class HttpClientConnectionPipelineSpec extends Specification with RawSpecs2Pipel
connectionActor ! Tcp.Received(ByteString(rawResponse("123")))
connectionActor ! Tcp.PeerClosed
commands.expectMsg(Pipeline.Tell(probe.ref, response("123"), connectionActor))
+ commands.expectMsg(Tcp.Close)
commands.expectNoMsg(100.millis)
}
@@ -86,7 +86,6 @@ object Http extends ExtensionKey[HttpExt] {
}
case class Register(handler: ActorRef,
- keepOpenOnPeerClosed: Boolean = false,
fastPath: FastPath = EmptyFastPath) extends Command
case class RegisterChunkHandler(handler: ActorRef) extends Command
@@ -103,6 +103,7 @@ private object ClientFrontend {
case x: Tcp.ConnectionClosed
openRequests.foldLeft(closeCommanders)(_ + _.sender) foreach (dispatch(_, x))
+ if (x eq Tcp.PeerClosed) commandPL(Tcp.Close)
eventPL(x) // terminates the connection actor
case TickGenerator.Tick
@@ -45,7 +45,8 @@ private class HttpClientConnection(connectCommander: ActorRef,
context.setReceiveTimeout(Duration.Undefined)
log.debug("Connected to {}", connected.remoteAddress)
val tcpConnection = sender
- tcpConnection ! Tcp.Register(self)
+ // if sslEncryption is enabled we may need keepOpenOnPeerClosed
+ tcpConnection ! Tcp.Register(self, keepOpenOnPeerClosed = connect.sslEncryption)
context.watch(tcpConnection)
connectCommander ! connected
context.become(running(tcpConnection, pipelineStage, pipelineContext(connected)))
@@ -42,9 +42,12 @@ private class HttpServerConnection(tcpConnection: ActorRef,
override def supervisorStrategy = SupervisorStrategy.stoppingStrategy
def receive: Receive = {
- case Tcp.Register(handler, keepOpenOnPeerClosed, _) register(handler, keepOpenOnPeerClosed)
-
- case Http.Register(handler, keepOpenOnPeerClosed, fastPath) register(handler, keepOpenOnPeerClosed, fastPath)
+ // legacy, to support routing.HttpService without needing to depend on spray-can from spray-routing
+ case Tcp.Register(handler, keepOpenOnPeerClosed, _)
+ if (keepOpenOnPeerClosed)
+ log.warning("Tcp.Register(keepOpenOnPeerClosed = true) not supported for HTTP connections")
+ register(handler)
+ case Http.Register(handler, fastPath) register(handler, fastPath)
case ReceiveTimeout
log.warning("Configured registration timeout of {} expired, stopping", settings.registrationTimeout)
@@ -57,9 +60,9 @@ private class HttpServerConnection(tcpConnection: ActorRef,
}
}
- def register(handler: ActorRef, keepOpenOnPeerClosed: Boolean, fastPath: Http.FastPath = Http.EmptyFastPath): Unit = {
+ def register(handler: ActorRef, fastPath: Http.FastPath = Http.EmptyFastPath): Unit = {
context.setReceiveTimeout(Duration.Undefined)
- tcpConnection ! Tcp.Register(self, keepOpenOnPeerClosed)
+ tcpConnection ! Tcp.Register(self, keepOpenOnPeerClosed = true)
context.watch(tcpConnection)
context.watch(handler)
context.become(running(tcpConnection, pipelineStage, pipelineContext(handler, fastPath)))
@@ -62,7 +62,7 @@ private trait OpenRequestComponent { component ⇒
def timeoutTimeout: Duration
class DefaultOpenRequest(val request: HttpRequest,
- private[this] val closeAfterResponseCompletion: Boolean,
+ private[this] var closeAfterResponseCompletion: Boolean,
private[this] var state: RequestState) extends OpenRequest {
private[this] val receiverRef = new ResponseReceiverRef(this)
private[this] var nextInChain: OpenRequest = EmptyOpenRequest
@@ -206,7 +206,7 @@ private trait OpenRequestComponent { component ⇒
case WaitingForFinalResponseAck(lastSender) lastSender
case _ context.handler
}
-
+ if (nextInChain.isEmpty) closeAfterResponseCompletion = true
nextInChain.handleClosed(ev) + handler
}
@@ -147,7 +147,9 @@ private object ServerFrontend {
val interestedParties = firstUnconfirmed.handleClosed(ev) + context.handler
interestedParties.foreach(sendClosed)
- eventPL(ev) // terminates the connection actor
+ if (ev ne Http.PeerClosed) eventPL(ev) // will stop this actor
+ else if (firstUnconfirmed.isEmpty) downstreamCommandPL(Tcp.Close) // idle connection, close actively
+ // else if (ev == PeerClosed) last in chain will close the connection eventually
case TickGenerator.Tick
if (requestTimeout.isFinite())

0 comments on commit f6b0292

Please sign in to comment.