Ruben’s MySQL codec. #98

Closed
wants to merge 36 commits into
from
Select commit
Jump to file
+3,381 −1
View
2 ...emcached/src/test/scala/com/twitter/finagle/memcached/integration/ExternalMemcached.scala
@@ -17,7 +17,7 @@ object ExternalMemcached { self =>
p.waitFor()
require(p.exitValue() == 0, "memcached binary must be present.")
}
-
+
private[this] def findAddress() {
var tries = 100
while (address == None && tries >= 0) {
View
67 finagle-mysql/README.md
@@ -0,0 +1,67 @@
+A MySQL client built for finagle.
+
+---
+## Overview
+
+*This is meant to give a very brief overview of the MySQL Client/Server protocol and reference relevant source code within finagle-mysql. For an exposition of the MySQL Client/Server protocol, refer to [MySQL Forge](http://forge.mysql.com/wiki/MySQL_Internals_ClientServer_Protocol)
+and [Understanding MySQL Internal](http://my.safaribooksonline.com/book/databases/mysql/0596009577/client-server-communication/orm9780596009571-chp-4).*
+
+**Packets** - The basic unit of communication for the MySQL Client/Server protocol is an application-layer packet. A MySQL packet consists of a header (size and sequence number) followed by a body. Packets can be fragmented across frames during transmission. To simplify the decoding of results received from the server, the codec includes a packet frame decoder on the pipeline.
+
+`* protocol/Packet.scala, codec/PacketFrameDecoder.scala`
+
+**Handshake and Authentication** - When a client connects to the server, the server sends a greeting. This greeting contains information about the server version, protocol, capabilities, etc. The client replies with a login request containing, among other information, authentication credentials. To ensure connections are authenticated before they are issued by finagle, the codec implements prepareConnFactory with an AuthenticationProxy.
+
+`* protocol/Handshake.scala, Codec.scala`
+
+**Capabilities** - The client and server express their capability succinctly as bit vectors. Each set bit in the vector represents what the client/server is capable of or willing to do. Finagle-mysql provides constants of all the capability flags available (at the time of writing) and a comprehensive way to build capability bit vectors.
+
+ // This clients capabilities
+ val clientCapability = Capability(
+ LongFlag,
+ Transactions,
+ Protocol41,
+ FoundRows,
+ Interactive,
+ LongPassword,
+ ConnectWithDB,
+ SecureConnection,
+ LocalFiles
+ )
+`* protocol/Capability.scala, Codec.scala`
+
+ Note: This client only supports protocol version 4.1 (available in MySQL version 4.1 and above). This is strictly enforced during authentication with a MySQL server.
+
+**Requests** - Most requests sent to the server are Command Requests. They contain a command byte and any arguments that are specific to the command. Command Requests are sent to the server as a MySQL Packet. The first byte of the packet body must contain a valid command byte followed by the arguments. Within finagle-mysql, each Request object has a data field which defines the body of the Packet. Requests are translated into a logical MySQL Packet by the toChannelBuffer method when they reach the Encoder.
+
+`* protocol/Request.scala`
+
+**Results** - finagle-mysql translates packets received from the server into Scala objects. Each result object has a relevant decode method that translates the packet(s) into the object according to the protocol. Result packets can be distinguished by their first byte. Some result packets denote the start of a longer transmission and need to be defragged by the decoder.
+
+ResultSets are returned from the server for queries that return Rows. A Row can be String encoded or Binary encoded depending on the Request used to execute the query. For example, a QueryRequest uses the String protocol and a PreparedStatement uses the binary protocol.
+
+`* codec/Endec.scala, protocol/{Result.scala, ResultSet.scala, PreparedStatement.scala}`
+
+**Value** - finagle-mysql provides a Value ADT that can represent all values returned from MySQL. However, this does not include logic to decode every data type. For unsupported values finagle-mysql will return a RawStringValue and RawBinaryValue for the String and Binary protocols, respectively. Other note worthy Value objects include NullValue (SQL NULL) and EmptyValue.
+
+The following code depicts a robust, safe, and idiomatic way to extract and deconstruct a Value from a Row.
+
+ // The row.valueOf(...) method returns an Option[Value].
+ val userId: Option[Long] = row.valueOf("id") map {
+ case LongValue(id) => id
+ case _ => -1
+ }
+
+Pattern matching all possible values of the Value ADT gives great flexibility and control. For example, it allows the programmer to handle NullValues and EmptyValues with specific application logic.
+
+`* protocol/Value.scala`
+
+**Byte Buffers** - The BufferReader and BufferWriter interfaces provide convenient methods for reading/writing primitive data types exchanged between the client/server. This includes all primitive numeric types and strings (null-terminated and length coded). All Buffer methods are side-effecting, that is, each call to a read*/write* method will increase the current read and write position. Note, the bytes exchanged between the client/server are encoded in little-endian byte order.
+
+`* protocol/Buffer.scala`
+
+**Charset** - Currently, finagle-mysql only supports UTF-8 character encodings. This is strictly enforced when authenticating with a MySQL server. For more information about how to configure a MySQL instance to use UTF-8 refer to the [MySQL Documentation](http://dev.mysql.com/doc/refman/5.0/en/charset-applications.html).
+
+Note: MySQL also supports variable charsets at the table and field level. This charset data is part of the [Field Packet](http://dev.mysql.com/doc/internals/en/field-packet.html).
+
+`* protocol/Charset.scala`
View
43 finagle-mysql/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>com.twitter</groupId>
+ <artifactId>finagle-mysql</artifactId>
+ <packaging>jar</packaging>
+ <version>4.0.3-SNAPSHOT</version>
@mariusae
Twitter, Inc. member
mariusae added a line comment Aug 10, 2012

version needs update.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ <parent>
+ <groupId>com.twitter</groupId>
+ <artifactId>scala-parent</artifactId>
+ <version>0.0.2</version>
+ <relativePath>../../parents/scala-parent/pom.xml</relativePath>
+ </parent>
+ <properties>
+ <git.dir>${project.basedir}/../../.git</git.dir>
+ </properties>
+ <dependencies>
+ <!-- library dependencies -->
+ <!-- project dependencies -->
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>finagle-core</artifactId>
+ <version>4.0.3-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>util-logging</artifactId>
+ <version>4.0.1</version>
+ </dependency>
@mariusae
Twitter, Inc. member
mariusae added a line comment Aug 10, 2012

ditto for these.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <excludes>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
View
169 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/Client.scala
@@ -0,0 +1,169 @@
+package com.twitter.finagle.mysql
+
+import com.twitter.finagle.builder.ClientBuilder
+import com.twitter.finagle.mysql.protocol._
+import com.twitter.finagle.mysql.util.Query
+import com.twitter.finagle.Service
+import com.twitter.finagle.{ServiceFactory, Codec, CodecFactory}
+import com.twitter.util.Future
+
+object Client {
+ /**
+ * Constructs a Client given a ServiceFactory.
+ */
+ def apply(factory: ServiceFactory[Request, Result]): Client = {
+ new Client(factory)
+ }
+
+ /**
+ * Constructs a ServiceFactory using a single host.
+ * @param host a String of host:port combination.
+ * @param username the username used to authenticate to the mysql instance
+ * @param password the password used to authenticate to the mysql instance
+ * @param dbname database to initially use
+ */
+ def apply(host: String, username: String, password: String, dbname: Option[String]): Client = {
+ val factory = ClientBuilder()
+ .codec(new MySQL(username, password, dbname))
+ .hosts(host)
+ .hostConnectionLimit(1)
+ .buildFactory()
+
+ apply(factory)
+ }
+
+ def apply(host: String, username: String, password: String): Client = {
+ apply(host, username, password, None)
+ }
+
+ def apply(host: String, username: String, password: String, dbname: String): Client = {
+ apply(host, username, password, Some(dbname))
+ }
+}
+
+class Client(factory: ServiceFactory[Request, Result]) {
+ private[this] lazy val fService = factory.apply()
+
+ /**
+ * Sends a query to the server without using
+ * prepared statements.
+ * @param sql An sql statement to be executed.
+ * @return an OK Result or a ResultSet for queries that return
+ * rows.
+ */
+ def query(sql: String) = send(QueryRequest(sql)) {
+ case rs: ResultSet => Future.value(rs)
+ case ok: OK => Future.value(ok)
+ }
+
+ /**
+ * Runs a query that returns a result set. For each row
+ * in the ResultSet, call f on the row and return the results.
+ * @param sql A sql statement that returns a result set.
+ * @param f A function from ResultSet to any type T.
+ * @return a Future of Seq[T]
+ */
+ def select[T](sql: String)(f: Row => T): Future[Seq[T]] = query(sql) map {
+ case rs: ResultSet => rs.rows.map(f)
+ case ok: OK => Seq()
+ }
+
+ /**
+ * Sends a query to server to be prepared for execution.
+ * @param sql A query to be prepared on the server.
+ * @return PreparedStatement
+ */
+ def prepare(sql: String, params: Any*) = {
@mariusae
Twitter, Inc. member
mariusae added a line comment Aug 10, 2012

explicitly annotate return type here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ val stmt = Query.expandParams(sql, params)
+ send(PrepareRequest(stmt)) {
+ case ps: PreparedStatement =>
+ ps.statement.setValue(stmt)
+ if(params.size > 0)
+ ps.parameters = Query.flatten(params).toArray
+
+ Future.value(ps)
+ }
+ }
+
+ /**
+ * Execute a prepared statement.
+ * @return an OK Result or a ResultSet for queries that return
+ * rows.
+ */
+ def execute(ps: PreparedStatement) = send(ExecuteRequest(ps)) {
+ case rs: ResultSet =>
+ ps.bindParameters()
+ Future.value(rs)
+ case ok: OK =>
+ ps.bindParameters()
+ Future.value(ok)
+ }
+
+ /**
+ * Combines the prepare and execute operations.
+ * @return a Future[(PreparedStatement, Result)] tuple.
+ */
+ def prepareAndExecute(sql: String, params: Any*) =
+ prepare(sql, params: _*) flatMap { ps =>
+ execute(ps) map {
+ res => (ps, res)
+ }
+ }
+
+
+ /**
+ * Runs a query that returns a result set. For each row
+ * in the ResultSet, call f on the row and return the results.
+ * @param ps A prepared statement.
+ * @param f A function from ResultSet to any type T.
+ * @return a Future of Seq[T]
+ */
+ def select[T](ps: PreparedStatement)(f: Row => T): Future[Seq[T]] = execute(ps) map {
+ case rs: ResultSet => rs.rows.map(f)
+ case ok: OK => Seq()
+ }
+
+ /**
+ * Combines the prepare and select operations.
+ * @return a Future[(PreparedStatement, Seq[T])] tuple.
+ */
+ def prepareAndSelect[T](sql: String, params: Any*)(f: Row => T) =
+ prepare(sql, params: _*) flatMap { ps =>
+ select(ps)(f) map {
+ seq => (ps, seq)
+ }
+ }
+
+ /**
+ * Close a prepared statement on the server.
+ * @return OK result.
+ */
+ def closeStatement(ps: PreparedStatement) = send(CloseRequest(ps)) {
+ case ok: OK => Future.value(ok)
+ }
+
+ def selectDB(schema: String) = send(UseRequest(schema)) {
+ case ok: OK => Future.value(ok)
+ }
+
+ def ping = send(PingRequest) {
+ case ok: OK => Future.value(ok)
+ }
+
+ /**
+ * Close the ServiceFactory and its underlying resources.
+ */
+ def close() = factory.close()
+
+ /**
+ * Helper function to send requests to the ServiceFactory
+ * and handle Error responses from the server.
+ */
+ private[this] def send[T](r: Request)(handler: PartialFunction[Result, Future[T]]) =
+ fService flatMap { service =>
+ service(r) flatMap (handler orElse {
+ case Error(c, s, m) => Future.exception(ServerError(c + " - " + m))
+ case result => Future.exception(ClientError("Unhandled result from server: " + result))
+ })
+ }
+}
@mariusae
Twitter, Inc. member
mariusae added a line comment Aug 10, 2012

for all public interfaces here: include return types. it serves both as (in code) documentation and it also fixes the types, which is important; otherwise refinement types may be exposed unintentionally, causing API conflicts later:

http://twitter.github.com/effectivescala/#Types%20and%20Generics-Return%20type%20annotations

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
View
88 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/Codec.scala
@@ -0,0 +1,88 @@
+package com.twitter.finagle.mysql
+
+import com.twitter.finagle._
+import com.twitter.finagle.mysql.codec.{PacketFrameDecoder, Endec}
+import com.twitter.finagle.mysql.protocol._
+import com.twitter.finagle.mysql.protocol.Capability._
+import com.twitter.util.Future
+import org.jboss.netty.channel.{ChannelPipelineFactory, Channels, Channel}
+
+class MySQL(username: String, password: String, database: Option[String])
+ extends CodecFactory[Request, Result] {
+ private[this] val clientCapability = Capability(
+ LongFlag,
+ Transactions,
+ Protocol41,
+ FoundRows,
+ Interactive,
+ LongPassword,
+ ConnectWithDB,
+ SecureConnection,
+ LocalFiles
+ )
+
+ def server = throw new Exception("Not yet implemented...")
+
+ def client = Function.const {
+ new Codec[Request, Result] {
+
+ def pipelineFactory = new ChannelPipelineFactory {
+ def getPipeline = {
+ val pipeline = Channels.pipeline()
+
+ pipeline.addLast("frameDecoder", new PacketFrameDecoder)
+ pipeline.addLast("EncoderDecoder", new Endec)
+
+ pipeline
+ }
+ }
+
+ // Authenticate each connection before returning it via a ServiceFactoryProxy.
+ override def prepareConnFactory(underlying: ServiceFactory[Request, Result]) =
+ new AuthenticationProxy(underlying, username, password, database, clientCapability)
+
+ }
+ }
+}
+
+class AuthenticationProxy(
+ underlying: ServiceFactory[Request, Result],
+ username: String,
+ password: String,
+ database: Option[String],
+ clientCap: Capability)
+ extends ServiceFactoryProxy(underlying) {
+
+ def makeLoginReq(sg: ServersGreeting) =
+ LoginRequest(username, password, database, clientCap, sg.salt, sg.serverCap)
+
+ def acceptGreeting(res: Result) = res match {
+ case sg: ServersGreeting if !sg.serverCap.has(Capability.Protocol41) =>
+ Future.exception(IncompatibleServer("This client is only compatible with MySQL version 4.1 and later."))
+
+ case sg: ServersGreeting if !Charset.isUTF8(sg.charset) =>
+ Future.exception(IncompatibleServer("This client is only compatible with UTF-8 charset encoding."))
+
+ case sg: ServersGreeting =>
+ Future.value(sg)
+
+ case r =>
+ Future.exception(new ClientError("Invalid Reply type %s".format(r.getClass.getName)))
+ }
+
+ def acceptLogin(res: Result) = res match {
+ case r: OK =>
+ Future.value(res)
+
+ case Error(c, _, m) =>
+ Future.exception(ServerError("Error when authenticating the client "+ c + " - " + m))
+ }
+
+ override def apply(conn: ClientConnection) = for {
+ service <- self(conn)
+ result <- service(ClientInternalGreet)
+ sg <- acceptGreeting(result)
+ loginRes <- service(makeLoginReq(sg))
+ _ <- acceptLogin(loginRes)
+ } yield service
+}
View
141 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/Example.scala
@@ -0,0 +1,141 @@
+import com.twitter.conversions.time._
+import com.twitter.util.Try
+import com.twitter.finagle.mysql._
+import com.twitter.finagle.mysql.protocol._
+import com.twitter.util.Future
+import java.net.InetSocketAddress
+import java.sql.Date
+
+case class SwimmingRecord(
+ event: String,
+ time: Float,
+ name: String,
+ nationality: String,
+ date: Date
+) extends {
+
+ def toArray = Array(event, time, name, nationality, date)
+
+ override def toString = {
+ def q(s: String) = "'" + s + "'"
+ "(" + q(event) + "," + time + "," + q(name) + "," + q(nationality) + "," + q(date.toString) + ")"
+ }
+}
+
+object SwimmingRecord {
+ val createTableSQL =
+ """CREATE TEMPORARY TABLE IF NOT EXISTS `finagle-mysql-example` (
+ `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
+ `event` varchar(30) DEFAULT NULL,
+ `time` float DEFAULT NULL,
+ `name` varchar(40) DEFAULT NULL,
+ `nationality` varchar(20) DEFAULT NULL,
+ `date` date DEFAULT NULL,
+ PRIMARY KEY (`id`)
+ ) ENGINE=InnoDB DEFAULT CHARSET=utf8"""
+
+ val records = List(
+ SwimmingRecord("50 m freestyle", 20.91F, "Cesar Cielo", "Brazil", Date.valueOf("2009-12-18")),
+ SwimmingRecord("100 m freestyle", 46.91F, "Cesar Cielo", "Brazil", Date.valueOf("2009-08-02")),
+ SwimmingRecord("50 m backstroke", 24.04F, "Liam Tancock", "Great Britain", Date.valueOf("2009-08-02")),
+ SwimmingRecord("100 m backstroke", 51.94F, "Aaron Peirsol", "United States", Date.valueOf("2009-07-08")),
+ SwimmingRecord("50 m butterfly", 22.43F, "Rafael Munoz", "Spain", Date.valueOf("2009-05-05")),
+ SwimmingRecord("100 m butterfly", 49.82F, "Michael Phelps", "United States", Date.valueOf("2009-07-29"))
+ )
+}
+
+object Main {
+ def main(args: Array[String]): Unit = {
+ val options = parseArgs(Map(), args.toList)
+ val host = options.getOrElse("host", "localhost").asInstanceOf[String]
+ val port = options.getOrElse("port", 3306).asInstanceOf[Int]
+ val username = options.getOrElse("username", "<user>").asInstanceOf[String]
+ val password = options.getOrElse("password", "<password>").asInstanceOf[String]
+ val dbname = "test"
+
+ val client = Client(host+":"+port, username, password, dbname)
+ if (createTable(client) && insertValues(client)) {
+ val query = "SELECT * FROM `finagle-mysql-example` WHERE `date` BETWEEN '2009-06-01' AND '2009-8-31'"
+ val qres: Future[Seq[_]] = client.select(query) { row =>
+ // row.valueOf returns an Option[Value]
+ val event = row.valueOf("event") map {
+ case StringValue(s) => s
+ case _ => "Default"
+ }
+
+ val nationality = row.valueOf("nationality") map {
+ case StringValue(s) => s
+ case _ => "Default"
+ }
+
+ val date = row.valueOf("date") map {
+ case DateValue(d) => d
+ case _ => new Date(0)
+ }
+
+ // Of course this could cause a runtime error if there isn't a field name
+ // "time" or if the Value at the field named time is not of type
+ // FloatValue(_)
+ val FloatValue(time) = row.valueOf("time").get
+
+ val StringValue(name) = row.valueOf("name").get
+
+ (name, time)
+ }
+
+ qres onSuccess { res =>
+ res.foreach(println)
+ } onFailure {
+ e => println("Failed query with %s".format(e))
+ }
+ }
+ }
+
+ def createTable(client: Client): Boolean = {
+ val res = client.query(SwimmingRecord.createTableSQL).get(1.second) onFailure {
+ case e => println("Failed with %s when attempting to create table".format(e))
+ }
+
+ res.isReturn
+ }
+
+ def insertValues(client: Client): Boolean = {
+ val insertSQL = "INSERT INTO `finagle-mysql-example` (`event`, `time`, `name`, `nationality`, `date`) VALUES (?,?,?,?,?)"
+ var insertResults: Seq[Try[_]] = Seq()
+ // create a prepared statement on the server
+ // and insert swimming records.
+ val preparedFuture = client.prepare(insertSQL).get(1.second)
+ preparedFuture onSuccess { ps =>
+ insertResults = for (r <- SwimmingRecord.records) yield {
+ ps.parameters = r.toArray
+ client.execute(ps).get(1.second) onFailure {
+ case e => println("Failed with %s when attempting to insert %s".format(e, r))
+ }
+ }
+ } onFailure {
+ case e => println("Failed with %s when attempting to create a prepared statement".format(e))
+ }
+
+ // close prepared statement on the server
+ for(ps <- preparedFuture) {
+ client.closeStatement(ps).get(1.second) onFailure {
+ e => println("Unable to close PreparedStatement: %s".format(e))
+ }
+ }
+
+ preparedFuture.isReturn && insertResults.forall(_.isReturn)
+ }
+
+ def parseArgs(parsed: Map[String, Any], args: List[String]): Map[String, Any] = args match {
+ case Nil => parsed
+ case "-host" :: value :: tail =>
+ parseArgs(parsed + ("host" -> value), tail)
+ case "-port" :: value :: tail =>
+ parseArgs(parsed + ("port" -> value.toInt), tail)
+ case "-u" :: value :: tail =>
+ parseArgs(parsed + ("username" -> value), tail)
+ case "-p" :: value :: tail =>
+ parseArgs(parsed + ("password" -> value), tail)
+ case unknown :: tail => parsed
+ }
+}
@mariusae
Twitter, Inc. member
mariusae added a line comment Aug 10, 2012

love this. we'll move it under finagle-example when it's ready for prime time.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
View
5 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/Exceptions.scala
@@ -0,0 +1,5 @@
+package com.twitter.finagle.mysql
+
+case class ClientError(msg: String) extends Exception(msg)
+case class ServerError(msg: String) extends Exception(msg)
+case class IncompatibleServer(msg: String) extends Exception(msg)
View
177 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/codec/Endec.scala
@@ -0,0 +1,177 @@
+package com.twitter.finagle.mysql.codec
+
+import com.twitter.finagle.mysql.ClientError
+import com.twitter.finagle.mysql.protocol._
+import com.twitter.logging.Logger
+import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers}
+import org.jboss.netty.channel._
+
+sealed trait State
+case object Idle extends State
+case object WaitingForGreeting extends State
+case class Defragging(
+ expected: Int,
+ packets: Seq[Seq[Packet]]
+) extends State
+
+/**
+ * Encoder: Encodes a Request into a ChannelBuffer.
+ * Decoder: Decodes a Packet into a POJO.
+ *
+ * There are specific packets received from MySQL that can
+ * be easily decoded based on their first byte. However, more complex
+ * results need to be defragged as they arrive in the pipeline.
+ * To accomplish this, this handler needs to contain some state.
+ *
+ * Some of state is volatile because it is shared between handleDownstream
+ * and handleUpstream events which are usually executed
+ * on separate threads.
+ */
+class Endec extends SimpleChannelHandler {
+ private[this] val log = Logger("finagle-mysql")
+ private[this] var state: State = WaitingForGreeting
+ private[this] var defragDecoder: (Packet, Seq[Packet], Seq[Packet]) => Result = _
+ @volatile private[this] var expectPrepareOK: Boolean = false
+ @volatile private[this] var expectBinaryResults: Boolean = false
+
+ /**
+ * Netty downstream handler. The message should contain a packet from
+ * the MySQL server.
+ */
+ override def messageReceived(ctx: ChannelHandlerContext, evt: MessageEvent) = evt.getMessage match {
+ case packet: Packet =>
+ val decodedResult = decode(packet)
+ decodedResult map { Channels.fireMessageReceived(ctx, _) }
+
+ case unknown =>
+ Channels.disconnect(ctx.getChannel)
+ log.error("Endec: Expected Packet and received: " + unknown.getClass.getName)
+ }
+
+ /**
+ * Netty upstream handler. The message should contain a
+ * Request object.
+ */
+ override def writeRequested(ctx: ChannelHandlerContext, evt: MessageEvent) = evt.getMessage match {
+ // Synthesize a response for a CloseRequest because we don't
+ // expect one from the server.
+ case req: CommandRequest if req.cmd == Command.COM_STMT_CLOSE =>
+ val buffer = encode(req)
+ Channels.write(ctx, evt.getFuture, buffer, evt.getRemoteAddress)
+ Channels.fireMessageReceived(ctx, CloseStatementOK)
+
+ case req: Request =>
+ val buffer = encode(req)
+ Channels.write(ctx, evt.getFuture, buffer, evt.getRemoteAddress)
+
+ case unknown =>
+ log.error("Endec: Expected Request and received: " + unknown.getClass.getName)
+ }
+
+ /**
+ * Logical entry point for the Decoder.
+ * Decodes a packet based on the current state
+ * of this decoder.
+ */
+ def decode(packet: Packet): Option[Result] = state match {
+ case WaitingForGreeting =>
+ transition(Idle)
+ Some(ServersGreeting.decode(packet))
+
+ case Idle => decodePacket(packet)
+ case Defragging(_,_) => defrag(packet)
+ }
+
+ /**
+ * Logical entry point for the Encoder.
+ * Encodes a request into ChannelBuffer.
+ * Note, some requests change the state of the decoder.
+ */
+ def encode(req: Request): ChannelBuffer = req match {
+ case r: CommandRequest =>
+ expectPrepareOK = (r.cmd == Command.COM_STMT_PREPARE)
+ expectBinaryResults = (r.cmd == Command.COM_STMT_EXECUTE)
+ r.toChannelBuffer
+
+ case r: Request =>
+ r.toChannelBuffer
+ }
+
+ private[this] def transition(s: State) = state = s
+
+ /**
+ * Decode the packet into a Result object based on the
+ * first byte in the packet body (field_count). Some bytes denote the
+ * start of a longer transmission. In those cases, transition
+ * into the Defragging state.
+ */
+ private[this] def decodePacket(packet: Packet): Option[Result] = packet.body(0) match {
+ case Packet.OkByte if expectPrepareOK =>
+ def expected(n: Int) = if (n > 0) 1 else 0
+
+ val ok = PreparedOK.decode(packet)
+ val numSetsExpected = expected(ok.numOfParams) + expected(ok.numOfColumns)
+
+ defragDecoder = PreparedStatement.decode
+ transition(Defragging(numSetsExpected, Nil))
+ defrag(packet)
+
+ case Packet.OkByte => Some(OK.decode(packet))
+ case Packet.EofByte => Some(EOF.decode(packet))
+ case Packet.ErrorByte => Some(Error.decode(packet))
+
+ case byte =>
+ defragDecoder = ResultSet.decode(expectBinaryResults)
+ transition(Defragging(2, Nil))
+ defrag(packet)
+ }
+
+ /**
+ * Defrags a set of packets expected from the server. This handles defragging
+ * packets for a ResultSet and a PreparedStatement.
+ *
+ * For a PreparedStatement the packet sequences are not neccessarily
+ * defragged in order and the order needs to be determined based on the
+ * PreparedOK meta data. This happens when the PreparedStatement is decoded
+ * in order to simplify this method.
+ */
+ private[this] def defrag(packet: Packet): Option[Result] = (state, packet.body(0)) match {
+ // header packet, no sets expected to follow
+ case (Defragging(0, Nil), _) =>
+ transition(Idle)
+ Some(defragDecoder(packet, Nil, Nil))
+
+ // header packet, some sets expected to follow
+ case (Defragging(expected, Nil), _) =>
+ transition(Defragging(expected, Seq(Seq(packet), Nil)))
+ None
+
+ // first set complete, no sets expected to follow
+ case (Defragging(1, Seq(header, xs)), Packet.EofByte) =>
+ transition(Idle)
+ Some(defragDecoder(header(0), xs.reverse, Nil))
+
+ // first set complete, 1 set expected to follow
+ case (Defragging(2, Seq(header, xs)), Packet.EofByte) =>
+ transition(Defragging(2, Seq(header, xs, Nil)))
+ None
+
+ // prepend onto first set
+ case (Defragging(expected, Seq(header, xs)), _) =>
+ transition(Defragging(expected, Seq(header, packet +: xs)))
+ None
+
+ // second set complete - no sets can follow.
+ case (Defragging(2, Seq(header, xs, ys)), Packet.EofByte) =>
+ transition(Idle)
+ Some(defragDecoder(header(0), xs.reverse, ys.reverse))
+
+ // prepend onto second set
+ case (Defragging(2, Seq(header, xs, ys)), _) =>
+ transition(Defragging(2, Seq(header, xs, packet +: ys)))
+ None
+
+ case _ =>
+ throw new ClientError("Endec: Unexpected state when defragmenting packets.")
+ }
+}
@mariusae
Twitter, Inc. member
mariusae added a line comment Aug 10, 2012

very nice. FOR LATER: i think we could improve this a bit further by embedding expectOK and defragDecoder into the state as well; but let’s leave it as-is for now. (I find it much easier to understand than the previous version)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
View
44 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/codec/PacketFrameDecoder.scala
@@ -0,0 +1,44 @@
+package com.twitter.finagle.mysql.codec
+
+import com.twitter.finagle.mysql.protocol.{Packet, BufferReader}
+import com.twitter.finagle.mysql.util.BufferUtil
+import com.twitter.logging.Logger
+import org.jboss.netty.buffer.ChannelBuffer
+import org.jboss.netty.channel.{Channel, ChannelHandlerContext}
+import org.jboss.netty.handler.codec.frame.FrameDecoder
+
+/**
+ * Decodes logical MySQL packets that could be fragmented across
+ * frames. MySQL packets are a length encoded set of bytes written
+ * in little endian byte order.
+ */
+class PacketFrameDecoder extends FrameDecoder {
+ private[this] val log = Logger("finagle-mysql")
+ override def decode(ctx: ChannelHandlerContext, channel: Channel, buffer: ChannelBuffer): Packet = {
+ if (buffer.readableBytes < Packet.HeaderSize)
+ return null
+
+ buffer.markReaderIndex()
+
+ val header = new Array[Byte](Packet.HeaderSize)
+ buffer.readBytes(header)
+ val br = BufferReader(header)
+
+ val length = br.readInt24()
+ val seq = br.readUnsignedByte()
+
+ if (buffer.readableBytes < length) {
+ buffer.resetReaderIndex()
+ return null
+ }
+
+ log.debug("<- Decoding MySQL packet (length=%d, seq=%d)".format(length, seq))
+
+ val body = new Array[Byte](length)
+ buffer.readBytes(body)
+
+ log.debug(BufferUtil.hex(body))
+
+ Packet(length, seq, body)
+ }
+}
View
469 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/protocol/Buffer.scala
@@ -0,0 +1,469 @@
+package com.twitter.finagle.mysql.protocol
+
+import com.twitter.finagle.mysql.ClientError
+import org.jboss.netty.buffer.ChannelBuffer
+import org.jboss.netty.buffer.ChannelBuffers._
+import java.nio.charset.{Charset => JCharset}
+import java.nio.ByteOrder
+
+/**
+ * The BufferReader and BufferWriter interfaces provide methods for
+ * reading/writing primitive data types exchanged between the client/server.
+ * This includes all primitive numeric types and strings (null-terminated and length coded).
+ * All Buffer methods are side-effecting. That is, each call to a read* or write*
+ * method will increase the current offset.
+ *
+ * Both BufferReader and BufferWriter assume bytes are written
+ * in little endian. This conforms with the MySQL protocol.
@mariusae
Twitter, Inc. member
mariusae added a line comment Aug 10, 2012

The reader will be curious as to why you wouldn’t just use a ChannelBuffer (why didn’t you?)

@roanta
Twitter, Inc. member
roanta added a line comment Aug 10, 2012

There are several reason why I thought it would be better to wrap ChannelBuffer and provide a separate interface.

  1. Each time a user intends to read from a MySQL packet, a ChannelBuffer needs to be created with the correct ByteOrder. It just seemed more naturally to offer an interface specific to the codec that assures this.

  2. There are specific methods that a ChannelBuffer doesn't offer and are at the core of the protocol (readLengthCodedString/Bytes and writeLengthCodedString/Bytes). This could have easily been offered as a method in a an object ex. Buffer.readLengthCodedString(c: ChannelBuffer). Again, I thought it would be more naturally to provide this as part of an interface.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ */
+
+object Buffer {
+ val NULL_LENGTH = -1 // denotes a SQL NULL value when reading a length coded binary.
+ val EMPTY_STRING = new String
+ val EMPTY_BYTE_ARRAY = new Array[Byte](0)
+
+ case object CorruptBufferException
+ extends Exception("Corrupt data or client/server are out of sync.")
+
+ /**
+ * Calculates the size required to store a length
+ * according to the MySQL protocol for length coded
+ * binary.
+ */
+ def sizeOfLen(l: Long) =
+ if (l < 251) 1 else if (l < 65536) 3 else if (l < 16777216) 4 else 9
+
+ /**
+ * Wraps the arrays into a ChannelBuffer with the
+ * appropriate MySQL protocol byte order. A wrappedBuffer
+ * avoids copying the underlying arrays.
+ */
+ def toChannelBuffer(bytes: Array[Byte]*) =
+ wrappedBuffer(ByteOrder.LITTLE_ENDIAN, bytes: _*)
+}
+
+trait BufferReader {
+ /**
+ * Buffer capacity.
+ */
+ def capacity: Int = array.size
+
+ /**
+ * Current offset in the buffer.
+ */
+ def offset: Int
+
+ /**
+ * Access the underlying array. Note, this
+ * is not always a safe operation because the
+ * the buffer could contain a composition of
+ * arrays, in which case this will throw an
+ * exception.
+ */
+ def array: Array[Byte]
+
+ /**
+ * Denotes if the buffer is readable upto the given width
+ * based on the current offset.
+ */
+ def readable(width: Int): Boolean
+
+ def readByte(): Byte
+ def readUnsignedByte(): Short
+ def readShort(): Short
+ def readUnsignedShort(): Int
+ def readInt24(): Int
+ def readUnsignedInt24(): Int
+ def readInt(): Int
+ def readUnsignedInt(): Long
+ def readLong(): Long
+ def readFloat(): Float
+ def readDouble(): Double
+
+ /**
+ * Increases offset by n.
+ */
+ def skip(n: Int): Unit
+
+ /**
+ * Consumes the rest of the buffer and returns
+ * it in a new Array[Byte].
+ * @return Array[Byte] containing the rest of the buffer.
+ */
+ def takeRest(): Array[Byte] = take(capacity - offset)
+
+ /**
+ * Consumes n bytes in the buffer and
+ * returns them in a new Array.
+ * @return An Array[Byte] containing bytes from offset to offset+n
+ */
+ def take(n: Int): Array[Byte]
+
+ /**
+ * Reads a MySQL data field. A variable-length numeric value.
+ * Depending on the first byte, reads a different width from
+ * the buffer. For more info, refer to MySQL Client/Server protocol
+ * documentation.
+ * @return a numeric value representing the number of
+ * bytes expected to follow.
+ */
+ def readLengthCodedBinary(): Int = {
+ val firstByte = readUnsignedByte()
+ if (firstByte < 251)
+ firstByte
+ else
+ firstByte match {
+ case 251 => Buffer.NULL_LENGTH
+ case 252 => readUnsignedShort()
+ case 253 => readUnsignedInt24()
+
+ // 254 Indicates a set of bytes with length >= 2^24.
+ // The current implementation does not support
+ // this.
+ case 254 =>
+ throw new ClientError("BufferReader: LONG_BLOB is not supported!")
+ // readLong()
+
+ case _ =>
+ throw Buffer.CorruptBufferException
+ }
+ }
+
+ /**
+ * Reads a null-terminated string where
+ * null is denoted by '\0'. Uses Charset.defaultCharset
+ * to decode strings.
+ * @return a null-terminated String starting at offset.
+ */
+ def readNullTerminatedString(): String = {
+ val start = offset
+ var length = 0
+
+ while (readByte() != 0x00)
+ length += 1
+
+ this.toString(start, length, Charset.defaultCharset)
+ }
+
+ /**
+ * Reads a length encoded string according to the MySQL
+ * Client/Server protocol. Uses Charset.defaultCharset to
+ * decode strings. For more details refer to MySQL
+ * documentation.
+ * @return a MySQL length coded String starting at
+ * offset.
+ */
+ def readLengthCodedString(): String = {
+ val length = readLengthCodedBinary()
+ if (length == Buffer.NULL_LENGTH)
+ null
+ else if (length == 0)
+ Buffer.EMPTY_STRING
+ else {
+ val start = offset
+ skip(length)
+ this.toString(start, length, Charset.defaultCharset)
+ }
+ }
+
+ /**
+ * Reads a length encoded set of bytes according to the MySQL
+ * Client/Server protocol. This is indentical to a length coded
+ * string except the bytes are returned raw.
+ * @return an Array[Byte] containing the length coded set of
+ * bytes starting at offset.
+ */
+ def readLengthCodedBytes(): Array[Byte] = {
+ val len = readLengthCodedBinary()
+ if (len == Buffer.NULL_LENGTH)
+ null
+ else if (len == 0)
+ Buffer.EMPTY_BYTE_ARRAY
+ else
+ take(len)
+ }
+
+ /**
+ * Returns the bytes from start to start+length
+ * into a string using the given java.nio.charset.Charset.
+ */
+ def toString(start: Int, length: Int, charset: JCharset) =
+ new String(array, start, length, charset)
+
+ /**
+ * Returns a Netty ChannelBuffer representing
+ * the underlying array. The ChannelBuffer
+ * is guaranteed ByteOrder.LITTLE_ENDIAN.
+ */
+ def toChannelBuffer: ChannelBuffer
+}
+
+object BufferReader {
+ /**
+ * Creates a BufferReader from an Array[Byte].
+ * @param bytes Byte array to read from.
+ * @param startOffset initial offset.
+ */
+ def apply(bytes: Array[Byte], startOffset: Int = 0): BufferReader = {
+ require(bytes != null)
+ require(startOffset >= 0)
+
+ val underlying = Buffer.toChannelBuffer(bytes)
+ underlying.readerIndex(startOffset)
+ new BufferReaderImpl(underlying)
+ }
+
+ /**
+ * Creates a BufferReader from a Netty ChannelBuffer.
+ * The ChannelBuffer must have ByteOrder.LITTLE_ENDIAN.
+ */
+ def apply(underlying: ChannelBuffer): BufferReader = {
+ require(underlying.order == ByteOrder.LITTLE_ENDIAN)
+ new BufferReaderImpl(underlying)
+ }
+
+ /**
+ * BufferReader implementation backed by a Netty ChannelBuffer.
+ */
+ private[this] class BufferReaderImpl(underlying: ChannelBuffer) extends BufferReader {
+ override def capacity = underlying.capacity
+ def offset = underlying.readerIndex
+ def array = underlying.array
+
+ def readable(width: Int) = underlying.readableBytes >= width
+
+ def readByte(): Byte = underlying.readByte()
+ def readUnsignedByte(): Short = underlying.readUnsignedByte()
+ def readShort(): Short = underlying.readShort()
+ def readUnsignedShort(): Int = underlying.readUnsignedShort()
+ def readInt24(): Int = underlying.readMedium()
+ def readUnsignedInt24(): Int = underlying.readUnsignedMedium()
+ def readInt(): Int = underlying.readInt()
+ def readUnsignedInt(): Long = underlying.readUnsignedInt()
+ def readLong(): Long = underlying.readLong()
+ def readFloat() = underlying.readFloat()
+ def readDouble() = underlying.readDouble()
+
+ def skip(n: Int) = underlying.skipBytes(n)
+
+ def take(n: Int) = {
+ val res = new Array[Byte](n)
+ underlying.readBytes(res)
+ res
+ }
+
+ /**
+ * Forward to ChannelBuffer in case underlying is a composition of
+ * arrays.
+ */
+ override def toString(start: Int, length: Int, charset: JCharset) =
+ underlying.toString(start, length, charset)
+
+ def toChannelBuffer = underlying
+ }
+}
+
+trait BufferWriter {
+ /**
+ * Buffer capacity.
+ */
+ def capacity: Int = array.size
+
+ /**
+ * Current writer offset.
+ */
+ def offset: Int
+
+ /**
+ * Access the underlying array. Note, this
+ * is not always a safe operation because the
+ * the buffer could contain a composition of
+ * arrays, in which case this will throw an
+ * exception.
+ */
+ def array: Array[Byte]
+
+ /**
+ * Denotes if the buffer is writable upto the given width
+ * based on the current offset.
+ */
+ def writable(width: Int): Boolean
+
+ def writeBoolean(b: Boolean): BufferWriter
+ def writeByte(n: Int): BufferWriter
+ def writeShort(n: Int): BufferWriter
+ def writeInt24(n: Int): BufferWriter
+ def writeInt(n: Int): BufferWriter
+ def writeLong(n: Long): BufferWriter
+ def writeFloat(f: Float): BufferWriter
+ def writeDouble(d: Double): BufferWriter
+
+ def skip(n: Int): BufferWriter
+
+ /**
+ * Fills the rest of the buffer with the given byte.
+ * @param b Byte used to fill.
+ */
+ def fillRest(b: Byte) = fill(capacity - offset, b)
+
+ /**
+ * Fills the buffer from current offset to offset+n with b.
+ * @param n width to fill
+ * @param b Byte used to fill.
+ */
+ def fill(n: Int, b: Byte) = {
+ (offset until offset + n) foreach { j => writeByte(b) }
+ this
+ }
+
+ /**
+ * Writes bytes onto the buffer.
+ * @param bytes Array[Byte] to copy onto the buffer.
+ */
+ def writeBytes(bytes: Array[Byte]): BufferWriter
+
+ /**
+ * Writes a length coded binary according the the MySQL
+ * Client/Server protocol. Refer to MySQL documentation for
+ * more information.
+ */
+ def writeLengthCodedBinary(length: Long): BufferWriter = {
+ if (length < 251) {
+ writeByte(length.toInt)
+ } else if (length < 65536) {
+ writeByte(252)
+ writeShort(length.toInt)
+ } else if (length < 16777216) {
+ writeByte(253)
+ writeInt24(length.toInt)
+ } else {
+ writeByte(254)
+ writeLong(length)
+ }
+ }
+
+ /**
+ * Writes a null terminated string onto the buffer where
+ * '\0' denotes null. Uses Charset.defaultCharset to decode the given
+ * String.
+ * @param s String to write.
+ */
+ def writeNullTerminatedString(s: String): BufferWriter = {
+ writeBytes(s.getBytes(Charset.defaultCharset))
+ writeByte('\0')
+ this
+ }
+
+ /**
+ * Writes a length coded string using the MySQL Client/Server
+ * protocol. Uses Charset.defaultCharset to decode the given
+ * String.
+ * @param s String to write to buffer.
+ */
+ def writeLengthCodedString(s: String): BufferWriter = {
+ writeLengthCodedBinary(s.length)
+ writeBytes(s.getBytes(Charset.defaultCharset))
+ this
+ }
+
+ /**
+ * Writes a length coded set of bytes according to the MySQL
+ * client/server protocol.
+ */
+ def writeLengthCodedBytes(bytes: Array[Byte]): BufferWriter = {
+ writeLengthCodedBinary(bytes.length)
+ writeBytes(bytes)
+ this
+ }
+
+ /**
+ * Returns a Netty ChannelBuffer representing
+ * the underlying buffer. The ChannelBuffer
+ * is guaranteed ByteOrder.LITTLE_ENDIAN.
+ */
+ def toChannelBuffer: ChannelBuffer
+}
+
+object BufferWriter {
+ /**
+ * Creates a BufferWriter from an Array[Byte].
+ * @param bytes Byte array to read from.
+ * @param startOffset initial offset.
+ */
+ def apply(bytes: Array[Byte], startOffset: Int = 0): BufferWriter = {
+ require(bytes != null)
+ require(startOffset >= 0)
+
+ // Note, a wrappedBuffer avoids copying the the array.
+ val underlying = Buffer.toChannelBuffer(bytes)
+ underlying.writerIndex(startOffset)
+ new BufferWriterImpl(underlying)
+ }
+
+ /**
+ * Creates a BufferWriter from a Netty ChannelBuffer.
+ */
+ def apply(underlying: ChannelBuffer): BufferWriter = {
+ require(underlying.order == ByteOrder.LITTLE_ENDIAN)
+ new BufferWriterImpl(underlying)
+ }
+
+ /**
+ * BufferWriter implementation backed by a Netty ChannelBuffer.
+ */
+ private[this] class BufferWriterImpl(underlying: ChannelBuffer) extends BufferWriter {
+ override def capacity = underlying.capacity
+ def offset = underlying.writerIndex
+ def array = underlying.array
+ def writable(width: Int = 1): Boolean = underlying.writableBytes >= width
+
+ def writeBoolean(b: Boolean): BufferWriter = if(b) writeByte(1) else writeByte(0)
+
+ def writeByte(n: Int): BufferWriter = {
+ underlying.writeByte(n)
+ this
+ }
+
+ def writeShort(n: Int): BufferWriter = {
+ underlying.writeShort(n)
+ this
+ }
+
+ def writeInt24(n: Int): BufferWriter = {
+ underlying.writeMedium(n)
+ this
+ }
+
+ def writeInt(n: Int): BufferWriter = {
+ underlying.writeInt(n)
+ this
+ }
+
+ def writeLong(n: Long): BufferWriter = {
+ underlying.writeLong(n)
+ this
+ }
+
+ def writeFloat(f: Float): BufferWriter = {
+ underlying.writeFloat(f)
+ this
+ }
+
+ def writeDouble(d: Double): BufferWriter = {
+ underlying.writeDouble(d)
+ this
+ }
+
+ def skip(n: Int) = {
+ underlying.writerIndex(offset + n)
+ this
+ }
+
+ def writeBytes(bytes: Array[Byte]) = {
+ underlying.writeBytes(bytes)
+ this
+ }
+
+ def toChannelBuffer = underlying
+ }
+}
View
57 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/protocol/Capability.scala
@@ -0,0 +1,57 @@
+package com.twitter.finagle.mysql.protocol
+
+object Capability {
+ val LongPassword = 0x0001 // new more secure passwords
+ val FoundRows = 0x0002 // Found instead of affected rows
+ val LongFlag = 0x0004 // Get all column flags
+ val ConnectWithDB = 0x0008 // One can specify db on connect
+ val NoSchema = 0x0010 // Don't allow database.table.column
+ val Compress = 0x0020 // Can use compression protocol
+ val ODBC = 0x0040 // Odbc client
+ val LocalFiles = 0x0080 // Can use LOAD DATA LOCAL
+ val IgnoreSpace = 0x0100 // Ignore spaces before '('
+ val Protocol41 = 0x0200 // New 4.1 protocol
+ val Interactive = 0x0400 // This is an interactive client
+ val SSL = 0x0800 // Switch to SSL after handshake
+ val IgnoreSigPipe = 0x1000 // IGNORE sigpipes
+ val Transactions = 0x2000 // Client knows about transactions
+ val SecureConnection = 0x8000 // New 4.1 authentication
+ val MultiStatements = 0x10000 // Enable/disable multi-stmt support
+ val MultiResults = 0x20000 // Enable/disable multi-results */
+
+ val CapabilityMap = Map(
+ "CLIENT_LONG_PASSWORD" -> LongPassword,
+ "CLIENT_FOUND_ROWS" -> FoundRows,
+ "CLIENT_LONG_FLAG" -> LongFlag,
+ "CLIENT_CONNECT_WITH_DB" -> ConnectWithDB,
+ "CLIENT_NO_SCHEMA" -> NoSchema,
+ "CLIENT_COMPRESS" -> Compress,
+ "CLIENT_ODBC" -> ODBC,
+ "CLIENT_LOCAL_FILES" -> LocalFiles,
+ "CLIENT_IGNORE_SPACE" -> IgnoreSpace,
+ "CLIENT_PROTOCOL_41" -> Protocol41,
+ "CLIENT_INTERACTIVE" -> Interactive,
+ "CLIENT_SSL" -> SSL,
+ "CLIENT_IGNORE_SIGPIPE" -> IgnoreSigPipe,
+ "CLIENT_TRANSACTIONS" -> Transactions,
+ "CLIENT_SECURE_CONNECTION" -> SecureConnection,
+ "CLIENT_MULTI_STATEMENTS" -> MultiStatements,
+ "CLIENT_MULTI_RESULTS" -> MultiResults
+ )
+
+ def apply(flags: Int*): Capability = {
+ val m = flags.foldLeft(0)(_|_)
+ Capability(m)
+ }
+}
+
+case class Capability(mask: Int) {
+ def +(flag: Int) = Capability(mask, flag)
+ def -(flag: Int) = Capability(mask & ~flag)
+ def has(flag: Int) = hasAll(flag)
+ def hasAll(flags: Int*) = flags map {f: Int => (f & mask) > 0} reduceLeft {_ && _}
+ override def toString() = {
+ val cs = Capability.CapabilityMap filter {t => has(t._2)} map {_._1} mkString(", ")
+ "Capability(" + mask + ": " + cs + ")"
+ }
+}
View
63 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/protocol/Charset.scala
@@ -0,0 +1,63 @@
+package com.twitter.finagle.mysql.protocol
+
+import java.nio.charset.{Charset => JCharset}
+
+object Charset {
+ /**
+ * Default Charset to use when decoding strings.
+ */
+ val defaultCharset = JCharset.forName("UTF-8")
+
+ /**
+ * MySQL UTF-8 Collations.
+ */
+ val Utf8_bin = 83.toShort
+ val Utf8_czech_ci = 202.toShort
+ val Utf8_danish_ci = 203.toShort
+ val Utf8_esperanto_ci = 209.toShort
+ val Utf8_estonian_ci = 198.toShort
+ val Utf8_general_ci = 33.toShort
+ val Utf8_general_mysql500_ci = 223.toShort
+ val Utf8_hungarian_ci = 210.toShort
+ val Utf8_icelandic_ci = 193.toShort
+ val Utf8_latvian_ci = 194.toShort
+ val Utf8_lithuanian_ci = 204.toShort
+ val Utf8_persian_ci = 208.toShort
+ val Utf8_polish_ci = 197.toShort
+ val Utf8_romanian_ci = 195.toShort
+ val Utf8_roman_ci = 207.toShort
+ val Utf8_sinhala_ci = 211.toShort
+ val Utf8_slovak_ci = 205.toShort
+ val Utf8_spanish2_ci = 206.toShort
+ val Utf8_spanish_ci = 199.toShort
+ val Utf8_swedish_ci = 200.toShort
+ val Utf8_turkish_ci = 201.toShort
+ val Utf8_unicode_ci = 192.toShort
+
+ private[this] val Utf8Set = Set(
+ Utf8_bin,
+ Utf8_czech_ci,
+ Utf8_danish_ci,
+ Utf8_esperanto_ci,
+ Utf8_estonian_ci,
+ Utf8_general_ci,
+ Utf8_general_mysql500_ci,
+ Utf8_hungarian_ci,
+ Utf8_icelandic_ci,
+ Utf8_latvian_ci,
+ Utf8_lithuanian_ci,
+ Utf8_persian_ci,
+ Utf8_polish_ci,
+ Utf8_romanian_ci,
+ Utf8_roman_ci,
+ Utf8_sinhala_ci,
+ Utf8_slovak_ci,
+ Utf8_spanish2_ci,
+ Utf8_spanish_ci,
+ Utf8_swedish_ci,
+ Utf8_turkish_ci,
+ Utf8_unicode_ci
+ )
+
+ def isUTF8(code: Short) = Utf8Set.contains(code)
+}
View
93 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/protocol/Handshake.scala
@@ -0,0 +1,93 @@
+package com.twitter.finagle.mysql.protocol
+
+import com.twitter.finagle.mysql.protocol.Capability._
+import java.security.MessageDigest
+
+/**
+ * Initial Result received from server during handshaking.
+ */
+case class ServersGreeting(
+ protocol: Byte,
+ version: String,
+ threadId: Int,
+ salt: Array[Byte], // 20 bytes from 2 different fields
+ serverCap: Capability,
+ charset: Short,
+ status: Short
+) extends Result
+
+object ServersGreeting {
+ def decode(packet: Packet): ServersGreeting = {
+ val br = BufferReader(packet.body)
+ val protocol = br.readByte()
+ val version = br.readNullTerminatedString()
+ val threadId = br.readInt()
+ val salt1 = br.take(8)
+ br.skip(1) // 1 filler byte always 0x00
+ val serverCap = Capability(br.readUnsignedShort())
+ val charset = br.readUnsignedByte()
+ val status = br.readShort()
+ br.skip(13)
+ val salt2 = br.take(12)
+
+ ServersGreeting(
+ protocol,
+ version,
+ threadId,
+ Array.concat(salt1, salt2),
+ serverCap,
+ charset,
+ status
+ )
+ }
+}
+
+/**
+ * Reply to ServerGreeting sent during handshaking phase.
+ */
+case class LoginRequest(
+ username: String,
+ password: String,
+ database: Option[String],
+ clientCap: Capability,
+ salt: Array[Byte],
+ serverCap: Capability,
+ charset: Short = Charset.Utf8_general_ci,
+ maxPacket: Int = 0x10000000
+) extends Request(seq = 1) {
+ private[this] val fixedBodySize = 34
+ private[this] val dbNameSize = database map { _.size+1 } getOrElse(0)
+ private[this] val dataSize = username.size + hashPassword.size + dbNameSize + fixedBodySize
+ lazy val hashPassword = encryptPassword(password, salt)
+
+ override val data = {
+ val bw = BufferWriter(new Array[Byte](dataSize))
+ val capability = if (dbNameSize == 0) clientCap - ConnectWithDB else clientCap
+ bw.writeInt(capability.mask)
+ bw.writeInt(maxPacket)
+ bw.writeByte(charset)
+ bw.fill(23, 0.toByte) // 23 reserved bytes - zeroed out
+ bw.writeNullTerminatedString(username)
+ bw.writeLengthCodedBytes(hashPassword)
+ if (clientCap.has(ConnectWithDB) && serverCap.has(ConnectWithDB))
+ bw.writeNullTerminatedString(database.get)
+
+ bw.toChannelBuffer
+ }
+
+ private[this] def encryptPassword(password: String, salt: Array[Byte]) = {
+ val md = MessageDigest.getInstance("SHA-1")
+ val hash1 = md.digest(password.getBytes(Charset.defaultCharset.displayName))
+ md.reset()
+ val hash2 = md.digest(hash1)
+ md.reset()
+ md.update(salt)
+ md.update(hash2)
+
+ val digest = md.digest()
+ (0 until digest.length) foreach { i =>
+ digest(i) = (digest(i) ^ hash1(i)).toByte
+ }
+ digest
+ }
+}
View
42 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/protocol/Packet.scala
@@ -0,0 +1,42 @@
+package com.twitter.finagle.mysql.protocol
+
+import org.jboss.netty.buffer.ChannelBuffer
+import org.jboss.netty.buffer.ChannelBuffers._
+
+/**
+ * Represents a logical packet received from MySQL.
+ * A MySQL packet consists of a header and body.
+ */
+case class Packet(header: PacketHeader, body: Array[Byte])
+
+case class PacketHeader(size: Int, seq: Short) {
+ lazy val toChannelBuffer = {
+ val bw = BufferWriter(new Array[Byte](Packet.HeaderSize))
+ bw.writeInt24(size)
+ bw.writeByte(seq)
+ bw.toChannelBuffer
+ }
+}
+
+object Packet {
+ val HeaderSize = 0x04
+ val OkByte = 0x00.toByte
+ val ErrorByte = 0xFF.toByte
+ val EofByte = 0xFE.toByte
+
+ def apply(size: Int, seq: Short, body: Array[Byte]): Packet =
+ Packet(PacketHeader(size, seq), body)
+
+ /**
+ * Creates a MySQL Packet as a Netty
+ * ChannelBuffer. Useful for sending a Packet
+ * down stream through Netty.
+ */
+ def toChannelBuffer(size: Int, seq: Short, body: ChannelBuffer) = {
+ val headerBuffer = PacketHeader(size, seq).toChannelBuffer
+ wrappedBuffer(headerBuffer, body)
+ }
+
+ def toChannelBuffer(size: Int, seq: Short, body: Array[Byte]): ChannelBuffer =
+ toChannelBuffer(size, seq, wrappedBuffer(body))
+}
View
74 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/protocol/PreparedStatement.scala
@@ -0,0 +1,74 @@
+package com.twitter.finagle.mysql.protocol
+
+import com.twitter.util.Promise
+
+case class PreparedStatement(statementId: Int, numberOfParams: Int) extends Result {
+ val statement: Promise[String] = new Promise[String]()
+ private[this] var params: Array[Any] = new Array[Any](numberOfParams)
+ private[this] var hasNewParams: Boolean = false
+
+ def parameters: Array[Any] = params
+ def hasNewParameters: Boolean = hasNewParams
+
+ def bindParameters() = hasNewParams = false
@mariusae
Twitter, Inc. member
mariusae added a line comment Aug 10, 2012

for side effecting methods like this, convention dicatates the use of {}:

def bindParameters() { hasNewParams = false }

otherwise it’s difficult to discern whether you mean to do do the assignment, or if it’s a bug and you instead meant:

def bindParameters() = hasNewParams == false
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+
+ def parameters_=(arr: Array[Any]) = {
+ require(arr.size == numberOfParams, "Invalid number of parameters.")
+ hasNewParams = true
+ params = arr
+ }
+
+ def updateParameter(index: Int, value: Any): Unit = {
+ hasNewParams = true
+ params(index) = value
+ }
+}
+
+object PreparedStatement {
+ /**
+ * A prepared statement is made up of the following packets:
+ * - A PrepareOK packet with meta data about the prepared statement.
+ * - A set of field packets with parameter data if number of parameters > 0
+ * - A set of field packets with column data if number of columns > 0
+ *
+ * The pipeline does not guarantee that the two sets are in order, so
+ * we need to verify which set contains what data based on the PrepareOK packet.
+ */
+ def decode(header: Packet, seqOne: Seq[Packet], seqTwo: Seq[Packet]): PreparedStatement = {
+ val ok = PreparedOK.decode(header)
+ // The current PreparedStatement implementation does not make use of this
+ // data. However, it can be used to perform parameter type verification before
+ // sending an ExecuteRequest to the server.
+ val (paramPackets, columnPackets) = (ok.numOfParams, ok.numOfColumns) match {
+ case (0, 0) => (Nil, Nil)
+ case (0, n) if n > 0 => (Nil, seqOne)
+ case (n, 0) if n > 0 => (seqOne, Nil)
+ case (_, _) => (seqOne, seqTwo)
+ }
+
+ PreparedStatement(ok.statementId, ok.numOfParams)
+ }
+}
+
+/**
+ * Prepared statement header returned from the server
+ * in response to a prepared statement initialization request
+ * COM_STMT_PREPARE.
+ */
+case class PreparedOK(statementId: Int,
+ numOfColumns: Int,
+ numOfParams: Int,
+ warningCount: Int)
+
+object PreparedOK {
+ def decode(packet: Packet) = {
+ val br = BufferReader(packet.body, 1)
+ val stmtId = br.readInt()
+ val col = br.readUnsignedShort()
+ val params = br.readUnsignedShort()
+ br.skip(1)
+ val warningCount = br.readUnsignedShort()
+ PreparedOK(stmtId, col, params, warningCount)
+ }
+}
+
View
186 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/protocol/Request.scala
@@ -0,0 +1,186 @@
+package com.twitter.finagle.mysql.protocol
+
+import com.twitter.logging.Logger
+import org.jboss.netty.buffer.ChannelBuffer
+import org.jboss.netty.buffer.ChannelBuffers._
+import scala.math.BigInt
+
+object Command {
+ val COM_SLEEP = 0x00.toByte // internal thread state
+ val COM_QUIT = 0x01.toByte // mysql_close
+ val COM_INIT_DB = 0x02.toByte // mysql_select_db
+ val COM_QUERY = 0x03.toByte // mysql_real_query
+ val COM_FIELD_LIST = 0x04.toByte // mysql_list_fields
+ val COM_CREATE_DB = 0x05.toByte // mysql_create_db (deperacted)
+ val COM_DROP_DB = 0x06.toByte // mysql_drop_db (deprecated)
+ val COM_REFRESH = 0x07.toByte // mysql_refresh
+ val COM_SHUTDOWN = 0x08.toByte // mysql_shutdown
+ val COM_STATISTICS = 0x09.toByte // mysql_stat
+ val COM_PROCESS_INFO = 0x0A.toByte // mysql_list_processes
+ val COM_CONNECT = 0x0B.toByte // internal thread state
+ val COM_PROCESS_KILL = 0x0C.toByte // mysql_kill
+ val COM_DEBUG = 0x0D.toByte // mysql_dump_debug_info
+ val COM_PING = 0x0E.toByte // mysql_ping
+ val COM_TIME = 0x0F.toByte // internal thread state
+ val COM_DELAYED_INSERT = 0x10.toByte // internal thread state
+ val COM_CHANGE_USER = 0x11.toByte // mysql_change_user
+ val COM_BINLOG_DUMP = 0x12.toByte // sent by slave IO thread to req a binlog
+ val COM_TABLE_DUMP = 0x13.toByte // deprecated
+ val COM_CONNECT_OUT = 0x14.toByte // internal thread state
+ val COM_REGISTER_SLAVE = 0x15.toByte // sent by the slave to register with the master (optional)
+ val COM_STMT_PREPARE = 0x16.toByte // mysql_stmt_prepare
+ val COM_STMT_EXECUTE = 0x17.toByte // mysql_stmt_execute
+ val COM_STMT_SEND_LONG_DATA = 0x18.toByte // mysql_stmt_send_long_data
+ val COM_STMT_CLOSE = 0x19.toByte // mysql_stmt_close
+ val COM_STMT_RESET = 0x1A.toByte // mysql_stmt_reset
+ val COM_SET_OPTION = 0x1B.toByte // mysql_set_server_option
+ val COM_STMT_FETCH = 0x1C.toByte // mysql_stmt_fetch
+}
+
+abstract class Request(seq: Short) {
+ /**
+ * Request data translates to the body of the MySQL
+ * Packet sent to the server. This field becomes
+ * part of a compisition of ChannelBuffers. To ensure
+ * that it has the correct byte order use Buffer.toChannelBuffer(...)
+ * to create the ChannelBuffer.
+ */
+ val data: ChannelBuffer
+
+ def toChannelBuffer: ChannelBuffer =
+ Packet.toChannelBuffer(data.capacity, seq, data)
+}
+
+abstract class CommandRequest(val cmd: Byte) extends Request(0)
+
+class SimpleCommandRequest(command: Byte, buffer: Array[Byte])
+ extends CommandRequest(command) {
+ override val data = Buffer.toChannelBuffer(Array(cmd), buffer)
+}
+
+/**
+ * NOOP Request used internally by this client.
+ */
+case object ClientInternalGreet extends Request(0) {
+ override val data = EMPTY_BUFFER
+ override def toChannelBuffer = EMPTY_BUFFER
+}
+
+case object PingRequest
+ extends SimpleCommandRequest(Command.COM_PING, Buffer.EMPTY_BYTE_ARRAY)
+
+case class UseRequest(dbName: String)
+ extends SimpleCommandRequest(Command.COM_INIT_DB, dbName.getBytes)
+
+case class QueryRequest(sqlStatement: String)
+ extends SimpleCommandRequest(Command.COM_QUERY, sqlStatement.getBytes)
+
+case class PrepareRequest(sqlStatement: String)
+ extends SimpleCommandRequest(Command.COM_STMT_PREPARE, sqlStatement.getBytes)
+
+/**
+ * An Execute Request.
+ * Uses the binary protocol to build an execute request for
+ * a prepared statement.
+ */
+case class ExecuteRequest(ps: PreparedStatement, flags: Byte = 0, iterationCount: Int = 1)
+ extends CommandRequest(Command.COM_STMT_EXECUTE) {
+ private[this] val log = Logger("finagle-mysql")
+
+ private[this] def isNull(param: Any): Boolean = param match {
+ case null => true
+ case _ => false
+ }
+
+ private[this] def makeNullBitmap(parameters: List[Any], bit: Int = 0, result: BigInt = BigInt(0)): Array[Byte] =
+ parameters match {
+ case Nil => result.toByteArray.reverse // As little-endian byte array
+ case param :: rest =>
+ val bits = if (isNull(param)) result.setBit(bit) else result
+ makeNullBitmap(rest, bit+1, bits)
+ }
+
+ private[this] def writeTypeCode(param: Any, writer: BufferWriter): Unit = {
+ val typeCode = Type.getCode(param)
+
+ if (typeCode != -1)
+ writer.writeShort(typeCode)
+ else {
+ // Unsupported type. Write the error to log, and write the type as null.
+ // This allows us to safely skip writing the parameter without corrupting the buffer.
+ log.error("ExecuteRequest: Unknown parameter %s will be treated as SQL NULL.".format(param.getClass.getName))
+ writer.writeShort(Type.NULL)
+ }
+ }
+
+ /**
+ * Returns sizeof all the parameters in the List.
+ */
+ private[this] def sizeOfParameters(parameters: List[Any], size: Int = 0): Int = parameters match {
+ case Nil => size
+ case p :: rest =>
+ val typeSize = Type.sizeOf(p)
+ // We can safely convert unknown sizes to 0 because
+ // any unknown type is being sent as NULL.
+ val sizeOfParam = if (typeSize == -1) 0 else typeSize
+ sizeOfParameters(rest, size + sizeOfParam)
+ }
+
+ /**
+ * Writes the parameter into its MySQL binary representation.
+ */
+ private[this] def writeParam(param: Any, writer: BufferWriter) = param match {
+ case s: String => writer.writeLengthCodedString(s)
+ case b: Boolean => writer.writeBoolean(b)
+ case b: Byte => writer.writeByte(b)
+ case s: Short => writer.writeShort(s)
+ case i: Int => writer.writeInt(i)
+ case l: Long => writer.writeLong(l)
+ case f: Float => writer.writeFloat(f)
+ case d: Double => writer.writeDouble(d)
+ case b: Array[Byte] => writer.writeBytes(b)
+ // Dates
+ case t: java.sql.Timestamp => TimestampValue.write(t, writer)
+ case d: java.sql.Date => DateValue.write(d, writer)
+ case d: java.util.Date => TimestampValue.write(new java.sql.Timestamp(d.getTime), writer)
+ case _ => writer // skip null and uknown values
+ }
+
+ override val data = {
+ val bw = BufferWriter(new Array[Byte](10))
+ bw.writeByte(cmd)
+ bw.writeInt(ps.statementId)
+ bw.writeByte(flags)
+ bw.writeInt(iterationCount)
+
+ val paramsList = ps.parameters.toList
+ val nullBytes = makeNullBitmap(paramsList)
+ val newParamsBound: Byte = if (ps.hasNewParameters) 1 else 0
+
+ val initialBuffer = Buffer.toChannelBuffer(bw.array, nullBytes, Array(newParamsBound))
+
+ // convert parameters to binary representation.
+ val sizeOfParams = sizeOfParameters(paramsList)
+ val values = BufferWriter(new Array[Byte](sizeOfParams))
+ paramsList foreach { writeParam(_, values) }
+
+ // parameters are tagged on to the end of the buffer
+ // after types or initialBuffer depending if the prepared statement
+ // has new parameters.
+ if (ps.hasNewParameters) {
+ // only add type data if the prepared statement has new parameters.
+ val types = BufferWriter(new Array[Byte](ps.numberOfParams * 2))
+ paramsList foreach { writeTypeCode(_, types) }
+ wrappedBuffer(initialBuffer, types.toChannelBuffer, values.toChannelBuffer)
+ } else
+ wrappedBuffer(initialBuffer, values.toChannelBuffer)
+ }
+}
+
+case class CloseRequest(ps: PreparedStatement) extends CommandRequest(Command.COM_STMT_CLOSE) {
+ override val data = {
+ val bw = BufferWriter(new Array[Byte](5))
+ bw.writeByte(cmd).writeInt(ps.statementId)
+ bw.toChannelBuffer
+ }
+}
View
69 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/protocol/Result.scala
@@ -0,0 +1,69 @@
+package com.twitter.finagle.mysql.protocol
+
+trait Result
+
+/**
+ * Represents the OK Packet received from the server. It is sent
+ * to indicate that a command has completed succesfully. The following
+ * commands receive OK packets:
+ * - COM_PING
+ * - COM_QUERY (INSERT, UPDATE, or ALTER TABLE)
+ * - COM_REFRESH
+ * - COM_REGISTER_SLAVE
+ */
+case class OK(affectedRows: Long,
+ insertId: Long,
+ serverStatus: Int,
+ warningCount: Int,
+ message: String) extends Result
+
+object OK {
+ def decode(packet: Packet) = {
+ // start reading after flag byte
+ val br = BufferReader(packet.body, 1)
+ OK(
+ br.readLengthCodedBinary(),
+ br.readLengthCodedBinary(),
+ br.readUnsignedShort(),
+ br.readUnsignedShort(),
+ new String(br.takeRest())
+ )
+ }
+}
+
+/**
+ * Used internally to synthesize a response from
+ * the server when sending a prepared statement
+ * CloseRequest
+ */
+object CloseStatementOK extends OK(0,0,0,0, "Internal Close OK")
+
+/**
+ * Represents the Error Packet received from the server
+ * and the data sent along with it.
+ */
+case class Error(code: Short, sqlState: String, message: String) extends Result
+
+object Error {
+ def decode(packet: Packet) = {
+ // start reading after flag byte
+ val br = BufferReader(packet.body, 1)
+ val code = br.readShort()
+ val state = new String(br.take(6))
+ val msg = new String(br.takeRest())
+ Error(code, state, msg)
+ }
+}
+
+/**
+ * Represents and EOF result received from the server which
+ * contains any warnings and the server status.
+ */
+case class EOF(warnings: Short, serverStatus: Short) extends Result
+
+object EOF {
+ def decode(packet: Packet) = {
+ val br = BufferReader(packet.body, 1)
+ EOF(br.readShort(), br.readShort())
+ }
+}
View
179 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/protocol/ResultSet.scala
@@ -0,0 +1,179 @@
+package com.twitter.finagle.mysql.protocol
+
+import com.twitter.finagle.mysql.ClientError
+import com.twitter.logging.Logger
+import scala.math.BigInt
+import java.sql.{Timestamp, Date}
+
+trait ResultSet extends Result {
+ val fields: Seq[Field]
+ val rows: Seq[Row]
+}
+
+class SimpleResultSet(val fields: Seq[Field], val rows: Seq[Row]) extends ResultSet {
+ override def toString = {
+ val header = fields map { _.id } mkString("\t")
+ val content = rows map { _.values.mkString("\t") } mkString("\n")
+ header + "\n" + content
+ }
+}
+
+object ResultSet {
+ def decode(isBinaryEncoded: Boolean)(header: Packet, fieldPackets: Seq[Packet], rowPackets: Seq[Packet]) = {
+ val fields = fieldPackets map { Field.decode(_) }
+
+ // A name -> index map used to allow quick lookups for rows based on name.
+ val indexMap = fields.map(_.id).zipWithIndex.toMap
+
+ /**
+ * Rows can be encoded as Strings or Binary depending
+ * on if the ResultSet is created by a normal query or
+ * a prepared statement, respectively.
+ */
+ val rows = rowPackets map { p: Packet =>
+ if (!isBinaryEncoded)
+ new StringEncodedRow(p.body, fields, indexMap)
+ else
+ new BinaryEncodedRow(p.body, fields, indexMap)
+ }
+
+ new SimpleResultSet(fields, rows)
+ }
+}
+
+/**
+ * Defines an interface that allows for easily
+ * decoding a row into its appropriate values.
+ */
+trait Row {
+ /**
+ * Contains a Field object for each
+ * Column in the Row.
+ */
+ val fields: Seq[Field]
+
+ /** The values for this Row. */
+ val values: IndexedSeq[Value]
+
+ /**
+ * Retrieves the index of the column with the given
+ * name.
+ * @param columnName name of the column.
+ * @return Some(Int) if the column
+ * exists with the given name. Otherwise, None.
+ */
+ def indexOf(columnName: String): Option[Int]
+
+ /**
+ * Retrieves the Value in the column with the
+ * given name.
+ * @param columnName name of the column.
+ * @return Some(Value) if the column
+ * exists with the given name. Otherwise, None.
+ */
+ def valueOf(columnName: String): Option[Value] =
+ valueOf(indexOf(columnName))
+
+ protected def valueOf(columnIndex: Option[Int]): Option[Value] =
+ for (idx <- columnIndex) yield values(idx)
+}
@mariusae
Twitter, Inc. member
mariusae added a line comment Aug 10, 2012

it might make sense to name "valueOf" -> "apply".

by convention in scala, maps use that for their keys, then you can access columns thus:

val row: Row
row("theColumn")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+
+class StringEncodedRow(row: Array[Byte], val fields: Seq[Field], indexMap: Map[String, Int]) extends Row {
+ val br = BufferReader(row)
+
+ /**
+ * Convert the string representation of each value
+ * into an appropriate Value object.
+ */
+ val values: IndexedSeq[Value] = for (idx <- 0 until fields.size) yield {
+ Value(fields(idx).fieldType, br.readLengthCodedString())
+ }
+
+ def indexOf(name: String) = indexMap.get(name)
+}
+
+class BinaryEncodedRow(row: Array[Byte], val fields: Seq[Field], indexMap: Map[String, Int]) extends Row {
+ val buffer = BufferReader(row, 1) // skip first byte
+
+ /**
+ * In a binary encoded row, null values are not sent from the
+ * server. Instead, the server sends a bit vector where
+ * each bit corresponds to the index of the column. If the bit
+ * is set, the value is null.
+ */
+ val nullBitmap: BigInt = {
+ val len = ((fields.size + 7 + 2) / 8).toInt
+ val bytesAsBigEndian = buffer.take(len).reverse
+ BigInt(bytesAsBigEndian)
+ }
+
+ /**
+ * Check if the bit is set. Note, the
+ * first 2 bits are reserved.
+ */
+ def isNull(index: Int) = nullBitmap.testBit(index + 2)
+
+ /**
+ * Convert the binary representation of each value
+ * into an appropriate Value object.
+ */
+ val values: IndexedSeq[Value] = for (idx <- 0 until fields.size) yield {
+ if (isNull(idx))
+ NullValue
+ else
+ Value(fields(idx).fieldType, buffer)
+ }
+
+ def indexOf(name: String) = indexMap.get(name)
+}
+
+/**
+ * A ResultSet contains a Field packet for each column.
+ */
+case class Field(
+ catalog: String,
+ db: String,
+ table: String,
+ origTable: String,
+ name: String,
+ origName: String,
+ charset: Short,
+ displayLength: Int,
+ fieldType: Int,
+ flags: Short,
+ decimals: Byte
+) {
+ def id: String = if (name.isEmpty) origName else name
+}
+
+object Field {
+ def decode(packet: Packet): Field = {
+ val br = BufferReader(packet.body)
+ val catalog = br.readLengthCodedString()
+ val db = br.readLengthCodedString()
+ val table = br.readLengthCodedString()
+ val origTable = br.readLengthCodedString()
+ val name = br.readLengthCodedString()
+ val origName = br.readLengthCodedString()
+ br.skip(1) // filler
+ val charset = br.readShort()
+ val length = br.readInt()
+ val fieldType = br.readUnsignedByte()
+ val flags = br.readShort()
+ val decimals = br.readByte()
+ new Field(
+ catalog,
+ db,
+ table,
+ origTable,
+ name,
+ origName,
+ charset,
+ length,
+ fieldType,
+ flags,
+ decimals
+ )
+ }
+}
+
View
107 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/protocol/Type.scala
@@ -0,0 +1,107 @@
+package com.twitter.finagle.mysql.protocol
+
+object Type {
+ /** MySQL type codes */
+ val DECIMAL = 0x00;
+ val TINY = 0x01;
+ val SHORT = 0x02;
+ val LONG = 0x03;
+ val FLOAT = 0x04;
+ val DOUBLE = 0x05;
+ val NULL = 0x06;
+ val TIMESTAMP = 0x07;
+ val LONGLONG = 0x08;
+ val INT24 = 0x09;
+ val DATE = 0x0a;
+ val TIME = 0x0b;
+ val DATETIME = 0x0c;
+ val YEAR = 0x0d;
+ val NEWDATE = 0x0e;
+ val VARCHAR = 0x0f;
+ val BIT = 0x10;
+ val NEWDECIMAL = 0xf6;
+ val ENUM = 0xf7;
+ val SET = 0xf8;
+ val TINY_BLOB = 0xf9;
+ val MEDIUM_BLOB = 0xfa;
+ val LONG_BLOB = 0xfb;
+ val BLOB = 0xfc;
+ val VAR_STRING = 0xfd;
+ val STRING = 0xfe;
+ val GEOMETRY = 0xff;
+
+ /**
+ * Returns the sizeof the given parameter in
+ * its MySQL binary representation. If the size
+ * is unknown -1 is returned.
+ */
+ def sizeOf(any: Any) = any match {
+ case s: String => Buffer.sizeOfLen(s.size) + s.size
+ case b: Array[Byte] => Buffer.sizeOfLen(b.size) + b.size
+ case b: Boolean => 1
+ case b: Byte => 1
+ case s: Short => 2
+ case i: Int => 4
+ case l: Long => 8
+ case f: Float => 4
+ case d: Double => 8
+ case null => 0
+ // Date and Time
+ case t: java.sql.Timestamp => 12
+ case d: java.sql.Date => 5
+ case d: java.util.Date => 12
+ case _ => -1
+ }
+
+ /**
+ * Retrieves the MySQL type code for the
+ * given parameter. If the parameter type
+ * mapping is unknown -1 is returned.
+ */
+ def getCode(any: Any) = any match {
+ // primitives
+ case s: String => VARCHAR
+ case b: Boolean => TINY
+ case b: Byte => TINY
+ case s: Short => SHORT
+ case i: Int => LONG
+ case l: Long => LONGLONG
+ case f: Float => FLOAT
+ case d: Double => DOUBLE
+ case null => NULL
+ // blobs
+ case b: Array[Byte] if b.size <= 255 => TINY_BLOB
+ case b: Array[Byte] if b.size <= 65535 => BLOB
+ case b: Array[Byte] if b.size <= 16777215 => MEDIUM_BLOB
+
+ // No support for LONG_BLOBS. In order to implement this correctly
+ // in Java/Scala we need to represent this set of bytes as a composition
+ // of buffers.
+ // case b: Array[Byte] if b.size <= 4294967295L => LONG_BLOB
+
+ // Date and Time
+ case t: java.sql.Timestamp => TIMESTAMP
+ case d: java.sql.Date => DATE
+ case d: java.util.Date => DATETIME
+ case _ => -1
+ }
+}
+
+/**
+ * Timestamp object that can appropriately
+ * represent MySQL zero Timestamp.
+ */
+case object SQLZeroTimestamp extends java.sql.Timestamp(0) {
+ override val getTime = 0L
+ override val toString = "0000-00-00 00:00:00"
+}
+
+/**
+ * Date object that can appropriately
+ * represent MySQL zero Date.
+ */
+case object SQLZeroDate extends java.sql.Date(0) {
+ override val getTime = 0L
+ override val toString = "0000-00-00"
+}
+
View
242 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/protocol/Value.scala
@@ -0,0 +1,242 @@
+package com.twitter.finagle.mysql.protocol
+
+import java.sql.{Timestamp, Date => SQLDate}
+import java.util.Calendar
+
+