Skip to content

Commit

Permalink
finagle-mysql: Move Connection Phase to Session Acquisition
Browse files Browse the repository at this point in the history
Problem / Solution

We have added the ability for SSL/TLS to be used with MySQL. To do so
for TLS, the MySQL Connection Phase occurs as part of session acquisition
in the `MysqlTransporter`. This change moves the Connection Phase for plain
text connections there as well.

JIRA Issues: CSL-8683

Differential Revision: https://phabricator.twitter.biz/D355549
  • Loading branch information
ryanoneill authored and jenkins committed Aug 15, 2019
1 parent ffd3f46 commit cd4877c
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 308 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ Breaking API Changes
`private[this] count(Long, Response)` and is no longer part of the public API.
``PHAB_ID=D350733``

Runtime Behavior Changes
~~~~~~~~~~~~~~~~~~~~~~~~

* finagle-mysql: Handshaking for the MySQL 'Connection Phase' now occurs as part of session
acquisition. As part of this change, the
`com.twitter.finagle.mysql.IncludeHandshakeInServiceAcquisition` toggle
has been removed and it no longer applies. ``PHAB_ID=D355549``

19.8.0
------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import com.twitter.finagle.http.codec.HttpClientDispatcher
import com.twitter.finagle.http.exp.IdentityStreamTransport
import com.twitter.finagle.liveness.FailureDetector
import com.twitter.finagle.mux.pushsession.{MessageWriter, MuxMessageDecoder}
import com.twitter.finagle.mysql.param.Credentials
import com.twitter.finagle.stats.{InMemoryStatsReceiver, NullStatsReceiver}
import com.twitter.finagle.transport.{QueueTransport, Transport}
import com.twitter.finagle.util.DefaultTimer
Expand Down Expand Up @@ -130,8 +129,8 @@ class ClientSessionTest extends FunSuite with MockitoSugar {

testSessionStatus(
"mysql-dispatcher", { tr: Transport[mysql.transport.Packet, mysql.transport.Packet] =>
val params = Stack.Params.empty + Credentials(Some("username"), Some("password"))
val dispatcher = new mysql.ClientDispatcher(tr, params, performHandshake = true)
val params = Stack.Params.empty
val dispatcher = new mysql.ClientDispatcher(tr, params)
() =>
dispatcher.status
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
{
"toggles": [
{
"id": "com.twitter.finagle.mysql.IncludeHandshakeInServiceAcquisition",
"description": "Perform MySQL session connection phase (handshaking) as part of service acquisition",
"fraction": 0.0
}
]
"toggles": []
}
27 changes: 2 additions & 25 deletions finagle-mysql/src/main/scala/com/twitter/finagle/Mysql.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@ import com.twitter.finagle.param.{
Tracer => _,
_
}
import com.twitter.finagle.server.ServerInfo
import com.twitter.finagle.service.{ResponseClassifier, RetryBudget}
import com.twitter.finagle.stats.{ExceptionStatsHandler, NullStatsReceiver, StatsReceiver}
import com.twitter.finagle.toggle.Toggle
import com.twitter.finagle.tracing._
import com.twitter.finagle.transport.{Transport, TransportContext}
import com.twitter.util.{Duration, Future, Monitor}
Expand Down Expand Up @@ -99,14 +97,6 @@ object MySqlClientTracingFilter {
*/
object Mysql extends com.twitter.finagle.Client[Request, Result] with MysqlRichClient {

private[this] val includeHandshakeInServiceAcquisitionToggle: Toggle[Int] =
Toggles("com.twitter.finagle.mysql.IncludeHandshakeInServiceAcquisition")

// This param having a value other than `None` indicates that
// the client is setup to try to use SSL/TLS.
private[this] def wantsToUseSsl(params: Stack.Params): Boolean =
params[Transport.ClientSsl].sslClientConfiguration.isDefined

protected val supportUnsigned: Boolean = UnsignedColumns.param.default.supported

object Client {
Expand Down Expand Up @@ -163,15 +153,6 @@ object Mysql extends com.twitter.finagle.Client[Request, Result] with MysqlRichC
with WithDefaultLoadBalancer[Client]
with MysqlRichClient {

// If the param is set to use SSL/TLS, we force handshaking to
// be done as part of service acquisition. We want all handshaking
// to be done eventually as part of service acquisition, and SSL/TLS
// is functionality that didn't exist previously, so make SSL/TLS take
// the preferred path.
private[this] val includeHandshakeInServiceAcquisition: Boolean =
wantsToUseSsl(params) ||
includeHandshakeInServiceAcquisitionToggle(ServerInfo().id.hashCode)

protected val supportUnsigned: Boolean = params[UnsignedColumns].supported

protected def copy1(
Expand All @@ -184,16 +165,12 @@ object Mysql extends com.twitter.finagle.Client[Request, Result] with MysqlRichC
protected type Context = TransportContext

protected def newTransporter(addr: SocketAddress): Transporter[In, Out, Context] =
new MysqlTransporter(addr, params, includeHandshakeInServiceAcquisition)
new MysqlTransporter(addr, params)

protected def newDispatcher(
transport: Transport[In, Out] { type Context <: Client.this.Context }
): Service[Request, Result] =
// If we're performing handshaking during the service acquisition phase
// (via the transporter, see the `newTransporter` method above), then we
// don't want to perform it again here. If we didn't there, then we do here.
// Eventually `newTransporter` should be the only place where it's done.
mysql.ClientDispatcher(transport, params, !includeHandshakeInServiceAcquisition)
mysql.ClientDispatcher(transport, params)

/**
* The maximum number of concurrent prepare statements.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,11 @@ private[finagle] object ClientDispatcher {
* Creates a mysql client dispatcher with write-through caches for optimization.
* @param trans A transport that reads a writes logical mysql packets.
* @param params A collection of `Stack.Params` useful for configuring a mysql client.
* @param performHandshake Indicates whether MySQL session establishment should be
* performed in the dispatcher. This is preferably performed in the transporter during
* service acquisition.
*/
def apply(
trans: Transport[Packet, Packet],
params: Stack.Params,
performHandshake: Boolean
): Service[Request, Result] = {
def apply(trans: Transport[Packet, Packet], params: Stack.Params): Service[Request, Result] = {
val maxConcurrentPrepareStatements = params[MaxConcurrentPrepareStatements].num
new PrepareCache(
new ClientDispatcher(trans, params, performHandshake),
new ClientDispatcher(trans, params),
Caffeine.newBuilder().maximumSize(maxConcurrentPrepareStatements)
)
}
Expand All @@ -93,35 +86,22 @@ private[finagle] object ClientDispatcher {
*/
private[finagle] final class ClientDispatcher(
trans: Transport[Packet, Packet],
params: Stack.Params,
performHandshake: Boolean)
params: Stack.Params)
extends GenSerialClientDispatcher[Request, Result, Packet, Packet](
trans,
params[Stats].statsReceiver) {
import ClientDispatcher._

// We only support plain handshaking when it's done inside
// the dispatcher. Eventually it should be entirely removed.
private[this] val handshake = new PlainHandshake(params, trans)

// Perform the handshake (possibly) once
private[this] val connectionPhase: Future[Unit] =
if (performHandshake) handshake.connectionPhase().unit
else Future.Done

private[this] val supportUnsigned: Boolean = params[UnsignedColumns].supported

override def apply(req: Request): Future[Result] =
connectionPhase
.flatMap { _ =>
super.apply(req)
}.onFailure {
// a LostSyncException represents a fatal state between
// the client / server. The error is unrecoverable
// so we close the service.
case e @ LostSyncException(_) => close()
case _ =>
}
super.apply(req).onFailure {
// a LostSyncException represents a fatal state between
// the client / server. The error is unrecoverable
// so we close the service.
case e @ LostSyncException(_) => close()
case _ =>
}

override def close(deadline: Time): Future[Unit] = trans.close()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,11 @@ import java.net.SocketAddress

/**
* A MySQL specific `framedBuf` `Transporter` which is responsible
* for connection establishment and framing. When the `performHandshake`
* parameter is provided a value of `true`, it is additionally responsible
* for session establishment for a plain MySQL session.
* for connection establishment, framing, and session establishment.
*/
private[finagle] final class MysqlTransporter(
val remoteAddress: SocketAddress,
params: Stack.Params,
performHandshake: Boolean)
params: Stack.Params)
extends Transporter[Packet, Packet, TransportContext] {

private[this] val framerFactory = () => {
Expand All @@ -38,21 +35,12 @@ private[finagle] final class MysqlTransporter(
MysqlTransporter.paramsWithoutSsl(params)
)

private[this] def createTransport(): Future[MysqlTransport] =
netty4Transporter().map { transport =>
new MysqlTransport(transport.map(_.toBuf, Packet.fromBuf))
}

private[this] def createTransportWithSession(): Future[MysqlTransport] = {
createTransport().flatMap { transport =>
val handshake = Handshake(params, transport)
handshake.connectionPhase().map(_ => transport)
}
}

def apply(): Future[Transport[Packet, Packet] { type Context <: TransportContext }] =
if (performHandshake) createTransportWithSession()
else createTransport()
netty4Transporter().flatMap { transport =>
val mysqlTransport = new MysqlTransport(transport.map(_.toBuf, Packet.fromBuf))
val handshake = Handshake(params, mysqlTransport)
handshake.connectionPhase().map(_ => mysqlTransport)
}

}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,41 +1,14 @@
package com.twitter.finagle.mysql.integration

import com.twitter.finagle.toggle.flag
import com.twitter.util.{Await, Duration}
import org.scalatest.FunSuite

class PingTest extends FunSuite with IntegrationClient {

test("ping default - (toggle off)") {
test("ping default") {
val theClient = client.orNull
val result = Await.result(theClient.ping(), Duration.fromSeconds(1))
// If we get here, result is Unit, and all is good
}

}

class PingToggleOffTest extends FunSuite with IntegrationClient {

// For this test, handshaking is done in the dispatcher.
test("ping - (turn toggle off)") {
flag.overrides.let("com.twitter.finagle.mysql.IncludeHandshakeInServiceAcquisition", 0.0) {
val theClient = client.orNull
val result = Await.result(theClient.ping(), Duration.fromSeconds(1))
// If we get here, result is Unit, and all is good
}
}

}

class PingToggleOnTest extends FunSuite with IntegrationClient {

// For this test, handshaking is done in the transporter.
test("ping - (turn toggle on)") {
flag.overrides.let("com.twitter.finagle.mysql.IncludeHandshakeInServiceAcquisition", 1.0) {
val theClient = client.orNull
val result = Await.result(theClient.ping(), Duration.fromSeconds(1))
// If we get here, result is Unit, and all is good
}
}

}
Loading

0 comments on commit cd4877c

Please sign in to comment.