Skip to content

Commit

Permalink
finagle-netty4: Introduce SslConnectHandler
Browse files Browse the repository at this point in the history
Problem

We need to have a proper tooling around delaying the transport connection until
the SSL/TLS handshake is done and the session is verified.

Solution

`SslConectHandler` reuses the ideas introduced in `HttpProxyConnectHandler` and delays
the propagation of the connect promise back to a transport until SSL handshake is
succeed.

RB_ID=824682
  • Loading branch information
vkostyukov authored and jenkins committed Apr 27, 2016
1 parent 81f32b6 commit d03b308
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 0 deletions.
@@ -0,0 +1,94 @@
package com.twitter.finagle.netty4.ssl

import com.twitter.finagle.CancelledConnectionException
import com.twitter.finagle.netty4.channel.BufferingChannelOutboundHandler
import io.netty.channel._
import io.netty.handler.ssl.SslHandler
import io.netty.util.concurrent.{Future => NettyFuture, GenericFutureListener}
import java.net.SocketAddress
import javax.net.ssl.SSLSession

/**
* Delays the connect promise satisfaction (i.e., `ChannelTransport` creation) until the TLS/SSL
* handshake is done and [[SSLSession]] validation is succeed (optional).
*
* If `sessionValidation` returns `Some` exception, an [[SSLSession]] considered invalid and the
* connect promise failed with this exception and the channel is closed.
*/
private[netty4] class SslConnectHandler(
ssl: SslHandler,
sessionValidation: SSLSession => Option[Throwable]) extends ChannelOutboundHandlerAdapter
with BufferingChannelOutboundHandler { self =>

private[this] def fail(p: ChannelPromise, ctx: ChannelHandlerContext, t: Throwable): Unit = {
p.tryFailure(t)
failPendingWrites(ctx, t)
}

override def connect(
ctx: ChannelHandlerContext,
remote: SocketAddress,
local: SocketAddress,
promise: ChannelPromise
): Unit = {
val sslConnectPromise = ctx.newPromise()

// Cancel new promise if the original one is canceled.
promise.addListener(new GenericFutureListener[NettyFuture[Any]] {
override def operationComplete(f: NettyFuture[Any]): Unit =
if (f.isCancelled) {
if (!sslConnectPromise.cancel(true) && sslConnectPromise.isSuccess) {
// New connect promise wasn't cancelled because it was already satisfied (connected) so
// we need to close the channel to prevent resource leaks.
// See https://github.com/twitter/finagle/issues/345
failPendingWrites(ctx, new CancelledConnectionException())
ctx.close()
}
}
})

// Fail the original promise if a new one is failed.
sslConnectPromise.addListener(new GenericFutureListener[NettyFuture[Any]] {
override def operationComplete(f: NettyFuture[Any]): Unit =
// We filter cancellation here since we assume it was proxied from the original promise and
// is already being handled in a handler above.
if (!f.isSuccess && !f.isCancelled) {
// The connect request was failed so the channel was never active. Since no writes are
// expected from the previous handler, no need to fail the pending writes.
promise.setFailure(f.cause)
}
})

// React on satisfied handshake promise.
ssl.handshakeFuture().addListener(new GenericFutureListener[NettyFuture[Channel]] {
override def operationComplete(f: NettyFuture[Channel]): Unit =
if (f.isSuccess) {
// Perform session validation.
sessionValidation(ssl.engine().getSession).foreach { failure =>
fail(promise, ctx, failure)
ctx.close()
}

// If the original promise is not satisfied yet, then the hostname validation is ether
// succeed or wasn't performed at all. One way or another, `SslConnectHandler` is done
// and we don't need that anymore. If the promise already satisfied, the connection was
// failed and the failure is already propagated to the dispatcher so we don't need to
// worry about cleaning up the pipeline.
if (promise.trySuccess()) {
ctx.pipeline().remove(self) // drains pending writes when removed
}
} else {
// Handshake promise is failed so given `SslHandler` is going to close the channel we only
// need to fail pending writes and the connect promise.
fail(promise, ctx, f.cause())
}
})

// Propagate the connect event down to the pipeline, but use the new (detached) promise.
ctx.connect(remote, local, sslConnectPromise)
}

// We don't override either `exceptionCaught` or `channelInactive` here since `SslHandler`
// guarantees to fail the handshake promise (which we're already handling here) if any exception
// (caused by either inbound or outbound event or closed channel) occurs during the handshake.
}
@@ -0,0 +1,94 @@
package com.twitter.finagle.netty4.ssl

import com.twitter.finagle.CancelledConnectionException
import io.netty.channel.Channel
import io.netty.channel.embedded.EmbeddedChannel
import io.netty.handler.ssl.SslHandler
import io.netty.util.concurrent.DefaultPromise
import org.junit.runner.RunWith
import org.scalatest.{OneInstancePerTest, FunSuite}
import org.scalatest.junit.JUnitRunner
import org.scalatest.mock.MockitoSugar
import org.mockito.Mockito._
import java.net.InetSocketAddress
import javax.net.ssl.{SSLSession, SSLEngine}

@RunWith(classOf[JUnitRunner])
class SslConnectHandlerTest extends FunSuite with MockitoSugar with OneInstancePerTest {

val fakeAddress = InetSocketAddress.createUnresolved("ssl", 8081)

val (channel, sslHandler, handshakePromise) = {
val ch = new EmbeddedChannel()
val hd = mock[SslHandler]
val e = mock[SSLEngine]
val hp = new DefaultPromise[Channel](ch.eventLoop())
when(hd.handshakeFuture()).thenReturn(hp)
when(hd.engine).thenReturn(e)
when(e.getSession).thenReturn(mock[SSLSession])

(ch, hd, hp)
}

test("success") {
channel.pipeline().addFirst(new SslConnectHandler(sslHandler, _ => None))
val connectPromise = channel.connect(fakeAddress)
assert(!connectPromise.isDone)

channel.writeOutbound("pending write")
assert(channel.outboundMessages().size() == 0)

handshakePromise.setSuccess(channel)
assert(connectPromise.isSuccess)
assert(channel.readOutbound[String]() == "pending write")

channel.finishAndReleaseAll()
}

test("failed session validation") {
val e = new Exception("whoa")
channel.pipeline().addFirst(new SslConnectHandler(sslHandler, _ => Some(e)))
val connectPromise = channel.connect(fakeAddress)
assert(!connectPromise.isDone)

channel.writeOutbound("pending write")
assert(channel.outboundMessages().size() == 0)

handshakePromise.setSuccess(channel)
assert(connectPromise.cause() == e)
assert(intercept[Exception](channel.checkException()) == e)

channel.finishAndReleaseAll()
}

test("cancelled after connected") {
channel.pipeline().addFirst(new SslConnectHandler(sslHandler, _ => None))
val connectPromise = channel.connect(fakeAddress)
assert(!connectPromise.isDone)

channel.writeOutbound("pending write")
assert(channel.outboundMessages().size() == 0)

assert(connectPromise.cancel(true))
assert(!channel.isActive)
assert(intercept[Exception](channel.checkException()).isInstanceOf[CancelledConnectionException])

channel.finishAndReleaseAll()
}

test("failed handshake") {
channel.pipeline().addFirst(new SslConnectHandler(sslHandler, _ => None))
val connectPromise = channel.connect(fakeAddress)
assert(!connectPromise.isDone)

channel.writeOutbound("pending write")
assert(channel.outboundMessages().size() == 0)

val e = new Exception("not so good")
handshakePromise.setFailure(e)
assert(!connectPromise.isSuccess)
assert(intercept[Exception](channel.checkException()) == e)

channel.finishAndReleaseAll()
}
}

0 comments on commit d03b308

Please sign in to comment.