Skip to content

Commit

Permalink
finagle-core: Support passing http proxy credentials to ClientBuilder
Browse files Browse the repository at this point in the history
Problem
- ClientBuilder allows you to set a httpProxy but you cannot specify credentials for auth, therefore it cannot work with authenticated http proxies (like the one we use with mesos).

Solution
- Add method to pass in credentials to ClientBuilder.
- If credentials are supplied, add a Proxy-Authorization header to the CONNECT request.

RB_ID=710481
  • Loading branch information
Ankur Dahiya authored and jenkins committed Jul 13, 2015
1 parent 6f7c6a8 commit 526aef0
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 29 deletions.
1 change: 1 addition & 0 deletions finagle-core/src/main/scala/BUILD
Expand Up @@ -10,6 +10,7 @@ scala_library(name='scala',
'3rdparty/jvm/io/netty',
'finagle/finagle-core/src/main/java',
'util/util-app',
'util/util-codec',
'util/util-collection',
'util/util-core',
'util/util-hashing',
Expand Down
Expand Up @@ -2,6 +2,7 @@ package com.twitter.finagle.builder

import com.twitter.conversions.time._
import com.twitter.finagle._
import com.twitter.finagle.client.Transporter.Credentials
import com.twitter.finagle.client.{DefaultPool, StackClient, StdStackClient}
import com.twitter.finagle.client.{StackBasedClient, Transporter}
import com.twitter.finagle.factory.{BindingFactory, TimeoutFactory}
Expand Down Expand Up @@ -682,7 +683,13 @@ class ClientBuilder[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit] priv
* If this is defined concurrently with socksProxy, the order in which they are applied is undefined.
*/
def httpProxy(httpProxy: SocketAddress): This =
configured(Transporter.HttpProxy(Some(httpProxy)))
configured(params[Transporter.HttpProxy].copy(sa = Some(httpProxy)))

/**
* For the http proxy use these [[Credentials]] for authentication.
*/
def httpProxyUsernameAndPassword(credentials: Credentials): This =
configured(params[Transporter.HttpProxy].copy(credentials = Some(credentials)))

@deprecated("Use socksProxy(socksProxy: Option[SocketAddress])", "2014-12-02")
def socksProxy(socksProxy: SocketAddress): This =
Expand Down
Expand Up @@ -83,11 +83,18 @@ object Transporter {
* $param a HttpProxy as the endpoint for a `Transporter`.
* @see http://www.w3.org/Protocols/rfc2616/rfc2616-sec9.html#9.9
*/
case class HttpProxy(sa: Option[SocketAddress]) {
case class HttpProxy(sa: Option[SocketAddress], credentials: Option[Credentials]) {
def mk(): (HttpProxy, Stack.Param[HttpProxy]) =
(this, HttpProxy.param)

def this(sa: Option[SocketAddress]) = this(sa, None)
}
object HttpProxy {
implicit val param = Stack.Param(HttpProxy(None))
implicit val param = Stack.Param(HttpProxy(None, None))
}

/**
* This class wraps the username, password that we use for http proxy auth
*/
case class Credentials(username: String, password: String)
}
@@ -1,33 +1,51 @@
package com.twitter.finagle.httpproxy

import com.twitter.finagle.client.Transporter.Credentials
import com.twitter.finagle.{ChannelClosedException, ConnectionFailedException, InconsistentStateException}
import com.twitter.io.Charsets
import com.twitter.util.Base64StringEncoder

import java.net.{InetSocketAddress, SocketAddress}
import java.util.concurrent.atomic.AtomicReference

import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers}
import org.jboss.netty.channel._
import org.jboss.netty.handler.codec.http._

import com.twitter.finagle.{ChannelClosedException, ConnectionFailedException, InconsistentStateException}

/**
* Handle SSL connections through a proxy that accepts HTTP CONNECT.
*
* See http://www.w3.org/Protocols/rfc2616/rfc2616-sec9.html#9.9
*
*/
object HttpConnectHandler {
def addHandler(proxyAddr: SocketAddress, addr: InetSocketAddress, pipeline: ChannelPipeline): HttpConnectHandler = {
def addHandler(
proxyAddr: SocketAddress,
addr: InetSocketAddress,
pipeline: ChannelPipeline,
proxyCredentials: Option[Credentials]
): HttpConnectHandler = {
val clientCodec = new HttpClientCodec()
val handler = new HttpConnectHandler(proxyAddr, addr, clientCodec)
val handler = new HttpConnectHandler(proxyAddr, addr, clientCodec, proxyCredentials)
pipeline.addFirst("httpProxyCodec", handler)
pipeline.addFirst("clientCodec", clientCodec)
handler
}

def addHandler(
proxyAddr: SocketAddress,
addr: InetSocketAddress,
pipeline: ChannelPipeline
): HttpConnectHandler = {
addHandler(proxyAddr, addr, pipeline, None)
}
}

class HttpConnectHandler(proxyAddr: SocketAddress, addr: InetSocketAddress, clientCodec: HttpClientCodec)
extends SimpleChannelHandler
{
class HttpConnectHandler(
proxyAddr: SocketAddress,
addr: InetSocketAddress,
clientCodec: HttpClientCodec,
proxyCredentials: Option[Credentials])
extends SimpleChannelHandler {
import HttpConnectHandler._

private[this] val connectFuture = new AtomicReference[ChannelFuture](null)
Expand All @@ -41,6 +59,9 @@ class HttpConnectHandler(proxyAddr: SocketAddress, addr: InetSocketAddress, clie
val hostNameWithPort = addr.getAddress.getHostName + ":" + addr.getPort
val req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.CONNECT, hostNameWithPort)
req.headers().set("Host", hostNameWithPort)
proxyCredentials.foreach { creds =>
req.headers().set(HttpHeaders.Names.PROXY_AUTHORIZATION, proxyAuthorizationHeader(creds))
}
Channels.write(ctx, Channels.future(ctx.getChannel), req, null)
}

Expand Down Expand Up @@ -120,4 +141,9 @@ class HttpConnectHandler(proxyAddr: SocketAddress, addr: InetSocketAddress, clie
fail(e.getChannel, new ConnectionFailedException(cause, addr))
}
}

private[this] def proxyAuthorizationHeader(creds: Credentials) = {
val bytes = "%s:%s".format(creds.username, creds.password).getBytes(Charsets.Utf8)
"Basic " + Base64StringEncoder.encode(bytes)
}
}
Expand Up @@ -129,8 +129,8 @@ object Netty3Transporter {
val Transporter.ConnectTimeout(connectTimeout) = params[Transporter.ConnectTimeout]
val LatencyCompensation.Compensation(compensation) = params[LatencyCompensation.Compensation]
val Transporter.TLSHostname(tlsHostname) = params[Transporter.TLSHostname]
val Transporter.HttpProxy(httpProxy) = params[Transporter.HttpProxy]
val Transporter.SocksProxy(socksProxy, credentials) = params[Transporter.SocksProxy]
val Transporter.HttpProxy(httpProxy, httpProxyCredentials) = params[Transporter.HttpProxy]
val Transporter.SocksProxy(socksProxy, socksCredentials) = params[Transporter.SocksProxy]
val Transport.BufferSizes(sendBufSize, recvBufSize) = params[Transport.BufferSizes]
val Transport.TLSClientEngine(tls) = params[Transport.TLSClientEngine]
val Transport.Liveness(readerTimeout, writerTimeout, keepAlive) = params[Transport.Liveness]
Expand All @@ -146,8 +146,9 @@ object Netty3Transporter {
newTransport = (ch: Channel) => newTransport(ch).cast[In, Out],
tlsConfig = tls map { case engine => Netty3TransporterTLSConfig(engine, tlsHostname) },
httpProxy = httpProxy,
httpProxyCredentials = httpProxyCredentials,
socksProxy = socksProxy,
socksUsernameAndPassword = credentials,
socksUsernameAndPassword = socksCredentials,
channelReaderTimeout = readerTimeout,
channelWriterTimeout = writerTimeout,
channelSnooper = snooper,
Expand Down Expand Up @@ -266,7 +267,8 @@ case class Netty3Transporter[In, Out](
channelReaderTimeout: Duration = Duration.Top,
channelWriterTimeout: Duration = Duration.Top,
channelSnooper: Option[ChannelSnooper] = None,
channelOptions: Map[String, Object] = Netty3Transporter.defaultChannelOptions
channelOptions: Map[String, Object] = Netty3Transporter.defaultChannelOptions,
httpProxyCredentials: Option[Transporter.Credentials] = None
) extends ((SocketAddress, StatsReceiver) => Future[Transport[In, Out]]) {
private[this] val statsHandlers = new IdentityHashMap[StatsReceiver, ChannelHandler]

Expand Down Expand Up @@ -354,7 +356,7 @@ case class Netty3Transporter[In, Out](

(httpProxy, addr) match {
case (Some(proxyAddr), inetAddr: InetSocketAddress) if !inetAddr.isUnresolved =>
HttpConnectHandler.addHandler(proxyAddr, inetAddr, pipeline)
HttpConnectHandler.addHandler(proxyAddr, inetAddr, pipeline, httpProxyCredentials)
case _ =>
}

Expand Down
Expand Up @@ -59,7 +59,10 @@ void testParams() {
new Transporter.SocksProxy(
SocksProxyFlags.socksProxy(),
SocksProxyFlags.socksUsernameAndPassword()).mk())
.configured(new Transporter.HttpProxy(Option.<SocketAddress>empty()).mk())
.configured(
new Transporter.HttpProxy(
Option.<SocketAddress>empty(),
Option.<Transporter.Credentials>empty()).mk())
.configured(
new BindingFactory.BaseDtab(new Function0<Dtab>() {
public Dtab apply() { return Dtab.empty(); }
Expand Down
@@ -1,15 +1,16 @@
package com.twitter.finagle.httpproxy

import org.scalatest.FunSuite
import org.scalatest.junit.JUnitRunner
import com.twitter.finagle.client.Transporter.Credentials
import java.net.{InetAddress, SocketAddress, InetSocketAddress}
import org.jboss.netty.channel._
import org.jboss.netty.handler.codec.http._
import org.junit.runner.RunWith
import org.scalatest.mock.MockitoSugar
import org.mockito.Mockito.{times, verify, when, atLeastOnce}
import org.mockito.ArgumentCaptor
import org.mockito.Matchers._
import org.jboss.netty.channel._
import java.net.{InetAddress, SocketAddress, InetSocketAddress}
import org.jboss.netty.handler.codec.http._
import org.mockito.Mockito.{times, verify, when, atLeastOnce}
import org.scalatest.FunSuite
import org.scalatest.junit.JUnitRunner
import org.scalatest.mock.MockitoSugar

@RunWith(classOf[JUnitRunner])
class HttpConnectHandlerTest extends FunSuite with MockitoSugar {
Expand All @@ -29,7 +30,7 @@ class HttpConnectHandlerTest extends FunSuite with MockitoSugar {
val connectFuture = Channels.future(channel, true)
val connectRequested = new DownstreamChannelStateEvent(
channel, connectFuture, ChannelState.CONNECTED, remoteAddress)
val ch = HttpConnectHandler.addHandler(proxyAddress, remoteAddress, pipeline)
val ch = HttpConnectHandler.addHandler(proxyAddress, remoteAddress, pipeline, None)
ch.handleDownstream(ctx, connectRequested)

def checkDidClose() {
Expand Down Expand Up @@ -134,6 +135,33 @@ class HttpConnectHandlerTest extends FunSuite with MockitoSugar {
}
}

test("HttpConnectHandler should add ProxyAuthorization header when proxy credentials are supplied") {
val h = new HttpConnectHandlerHelper
import h._

val handler = HttpConnectHandler.addHandler(
proxyAddress,
remoteAddress,
pipeline,
Some(Credentials("user", "pass")))

handler.handleDownstream(ctx, connectRequested)
handler.handleUpstream(ctx, new UpstreamChannelStateEvent(
channel, ChannelState.CONNECTED, remoteAddress))
assert(!connectFuture.isDone)
verify(ctx, times(0)).sendUpstream(any[ChannelEvent])

// send connect request
val ec = ArgumentCaptor.forClass(classOf[DownstreamMessageEvent])
verify(ctx, atLeastOnce).sendDownstream(ec.capture)
val e = ec.getValue
val req = e.getMessage.asInstanceOf[DefaultHttpRequest]
assert(req.getMethod === HttpMethod.CONNECT)
assert(req.getUri === "localhost:" + port)
assert(req.headers().get("Host") === "localhost:" + port)
assert(req.headers().get("Proxy-Authorization") === "Basic dXNlcjpwYXNz")
}

test("HttpConnectHandler should propagate connection failure") {
val h = new HttpConnectHandlerHelper
import h._
Expand Down
@@ -1,19 +1,20 @@
package com.twitter.finagle.netty3

import com.twitter.conversions.time._
import com.twitter.finagle.Stack
import com.twitter.finagle.client.Transporter.Credentials
import com.twitter.finagle.client.{LatencyCompensation, Transporter}
import com.twitter.finagle.param.Label
import com.twitter.finagle.netty3.socks.SocksConnectHandler
import com.twitter.finagle.netty3.transport.ChannelTransport
import com.twitter.finagle.Stack
import com.twitter.finagle.param.Label
import com.twitter.finagle.ssl.Engine
import com.twitter.finagle.stats.{NullStatsReceiver, InMemoryStatsReceiver}
import com.twitter.finagle.transport.Transport
import com.twitter.util.Duration
import java.net.InetSocketAddress
import javax.net.ssl.{SSLEngineResult, SSLEngine, SSLSession}
import org.jboss.netty.channel._
import org.jboss.netty.buffer.ChannelBuffers
import org.jboss.netty.channel._
import org.jboss.netty.handler.timeout.IdleStateHandler
import org.junit.runner.RunWith
import org.mockito.Matchers._
Expand All @@ -34,7 +35,7 @@ class Netty3TransporterTest extends FunSpec with MockitoSugar {
Transporter.ConnectTimeout(1.seconds) +
LatencyCompensation.Compensation(12.millis) +
Transporter.TLSHostname(Some("tls.host")) +
Transporter.HttpProxy(Some(new InetSocketAddress(0))) +
Transporter.HttpProxy(Some(new InetSocketAddress(0)), Some(Credentials("user", "pw"))) +
Transporter.SocksProxy(Some(new InetSocketAddress(0)), Some("user", "pw")) +
Transport.BufferSizes(Some(100), Some(200)) +
Transport.TLSClientEngine.param.default +
Expand All @@ -50,6 +51,7 @@ class Netty3TransporterTest extends FunSpec with MockitoSugar {
inputParams[Transport.TLSClientEngine].e.map(
Netty3TransporterTLSConfig(_, inputParams[Transporter.TLSHostname].hostname)))
assert(transporter.httpProxy == inputParams[Transporter.HttpProxy].sa)
assert(transporter.httpProxyCredentials == inputParams[Transporter.HttpProxy].credentials)
assert(transporter.socksProxy == inputParams[Transporter.SocksProxy].sa)
assert(transporter.socksUsernameAndPassword == inputParams[Transporter.SocksProxy].credentials)
assert(transporter.channelReaderTimeout == inputParams[Transport.Liveness].readTimeout)
Expand Down

0 comments on commit 526aef0

Please sign in to comment.