Permalink
Browse files

Includes most of the changes suggested during the code review.

  • Loading branch information...
1 parent 6e5503f commit c6f7c9e59a350d71dbda1bb0a071c3ceff0a2b00 @roanta committed Jul 27, 2012
Showing with 850 additions and 829 deletions.
  1. +32 −0 finagle-mysql/README
  2. +43 −0 finagle-mysql/pom.xml
  3. +51 −57 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/Client.scala
  4. +72 −0 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/Codec.scala
  5. +6 −14 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/Example.scala
  6. +155 −0 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/codec/Endec.scala
  7. +0 −69 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/codec/MySQL.scala
  8. +20 −14 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/codec/PacketFrameDecoder.scala
  9. +0 −23 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/codec/RequestEncoder.scala
  10. +0 −168 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/codec/ResultDecoder.scala
  11. +110 −89 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/protocol/Buffer.scala
  12. +33 −33 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/protocol/Capability.scala
  13. +23 −22 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/protocol/Handshake.scala
  14. +21 −32 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/protocol/Packet.scala
  15. +30 −20 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/protocol/PreparedStatement.scala
  16. +144 −114 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/protocol/Request.scala
  17. +25 −25 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/protocol/Result.scala
  18. +68 −86 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/protocol/ResultSet.scala
  19. +0 −23 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/protocol/Types.scala
  20. +4 −4 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/util/BufferUtil.scala
  21. +13 −36 finagle-mysql/src/main/scala/com/twitter/finagle/mysql/util/Query.scala
View
@@ -0,0 +1,32 @@
+A brief overview of finagle-mysql with references to MySQL Client/Server Protocol.
+
+** Packets
+The basic unit of communication for the MySQL Client/Server protocol is an application-layer packet. A MySQL packet consists of a header (3 byte size, 1 byte seq number) and a body. Packets can be fragmented across frames during transmission. To simplify the decoding of results received from the server, the codec includes a packet frame decoder on the pipeline.
+
+Relevant source files: Packet.scala, PacketFrameDecoder.scala
+
+** Handshake
+When a client connects to the server, the server sends a greeting. The greeting contains information about the server version, protocol, capabilities, etc. The client replies with a login request containing, among other information, authentication credentials. To ensure connections are authenticated before issued by the ServiceFactory, the codec implements prepareConnFactory with an AuthenticationProxy.
+
+Relevant source files: Handshake.scala, Codec.scala
+
+** Requests
+Command requests to the server contain a PacketHeader and a body containing the data. Note that the first byte of the body must contain a command byte denoting which command to issue. Each Request object has a data field which defines the body of the Request. Requests are translated into netty ChannelBuffers when they reach the Encoder.
+
+Relevant source files: Request.scala
+
+** Results
+finagle-mysql translates packets received from the server into Scala objects. Result packets can be distinguished by their first byte. Some result packets denote the start of a longer transmissions and need to be defragged by the decoder.
+
+Relevant source files: Endec.scala, Result.scala, ResultSet.scala, PreparedStatement.scala
+
+** Buffers
+The BufferReader and BufferWriter class provide convenient methods for reading/writing common data types exchanged between the client/server. Note, data exchanged between the client/server needs to be encoded with little-endian byte order.
+
+Relevant source files: Buffer.scala
+
+** Detailed resources about the MySQL protocol
+
+MySQL Forge - http://forge.mysql.com/wiki/MySQL_Internals_ClientServer_Protocol
+
+Understanding MySQL Internals - http://my.safaribooksonline.com/book/databases/mysql/0596009577/client-server-communication/orm9780596009571-chp-4
View
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>com.twitter</groupId>
+ <artifactId>finagle-mysql</artifactId>
+ <packaging>jar</packaging>
+ <version>4.0.3-SNAPSHOT</version>
+ <parent>
+ <groupId>com.twitter</groupId>
+ <artifactId>scala-parent</artifactId>
+ <version>0.0.2</version>
+ <relativePath>../../parents/scala-parent/pom.xml</relativePath>
+ </parent>
+ <properties>
+ <git.dir>${project.basedir}/../../.git</git.dir>
+ </properties>
+ <dependencies>
+ <!-- library dependencies -->
+ <!-- project dependencies -->
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>finagle-core</artifactId>
+ <version>4.0.3-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>util-logging</artifactId>
+ <version>4.0.1</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <excludes>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
@@ -1,7 +1,6 @@
package com.twitter.finagle.mysql
import com.twitter.finagle.builder.ClientBuilder
-import com.twitter.finagle.mysql.codec.MySQL
import com.twitter.finagle.mysql.protocol._
import com.twitter.finagle.mysql.util.Query
import com.twitter.finagle.Service
@@ -10,19 +9,19 @@ import com.twitter.util.Future
object Client {
/**
- * Construct a Client given a ServiceFactory.
- */
+ * Constructs a Client given a ServiceFactory.
+ */
def apply(factory: ServiceFactory[Request, Result]): Client = {
new Client(factory)
}
/**
- * Construct a ServiceFactory using a single host.
- * @param host a String of host:port combination.
- * @param username the username used to authenticate to the mysql instance
- * @param password the password used to authenticate to the mysql instance
- * @param dbname database to initially use
- */
+ * Constructs a ServiceFactory using a single host.
+ * @param host a String of host:port combination.
+ * @param username the username used to authenticate to the mysql instance
+ * @param password the password used to authenticate to the mysql instance
+ * @param dbname database to initially use
+ */
def apply(host: String, username: String, password: String, dbname: Option[String]): Client = {
val factory = ClientBuilder()
.codec(new MySQL(username, password, dbname))
@@ -42,60 +41,55 @@ object Client {
}
class Client(factory: ServiceFactory[Request, Result]) {
- private lazy val fService = factory.apply()
+ private[this] lazy val fService = factory.apply()
/**
- * Sends a query to the server without using
- * prepared statements.
- * @param sql An sql statement to be executed.
- * @return an OK Result or a ResultSet for queries that return
- * rows.
- */
- def query(sql: String, params: Any*) = try {
- val stmt = Query.injectParams(sql, params)
- send(QueryRequest(stmt)) {
+ * Sends a query to the server without using
+ * prepared statements.
+ * @param sql An sql statement to be executed.
+ * @return an OK Result or a ResultSet for queries that return
+ * rows.
+ */
+ def query(sql: String) = send(QueryRequest(sql)) {
case rs: ResultSet => Future.value(rs)
case ok: OK => Future.value(ok)
- }
- } catch {
- case e => Future.exception(e)
}
/**
- * Runs a query that returns a result set. For each row
- * in the ResultSet, call f on the row and return the results.
- * @param sql A sql statement that returns a result set.
- * @param params: Any
- * @param f A function from ResultSet to any type T.
- * @return a Future of Seq[T]
- */
- def select[T](sql: String, params: Any*)(f: Row => T): Future[Seq[T]] = query(sql, params: _*) map {
+ * Runs a query that returns a result set. For each row
+ * in the ResultSet, call f on the row and return the results.
+ * @param sql A sql statement that returns a result set.
+ * @param params: Any
+ * @param f A function from ResultSet to any type T.
+ * @return a Future of Seq[T]
+ */
+ def select[T](sql: String)(f: Row => T): Future[Seq[T]] = query(sql) map {
case rs: ResultSet => rs.rows.map(f)
case ok: OK => Seq()
}
/**
- * Sends a query to server to be prepared for execution.
- * @param sql A query to be prepared on the server.
- * @return PreparedStatement
- */
+ * Sends a query to server to be prepared for execution.
+ * @param sql A query to be prepared on the server.
+ * @return PreparedStatement
+ */
def prepare(sql: String, params: Any*) = try {
val stmt = Query.expandParams(sql, params)
send(PrepareRequest(stmt)) {
case ps: PreparedStatement =>
ps.statement.setValue(stmt)
ps.parameters = Query.flatten(params).toArray
Future.value(ps)
- }
+ }
} catch {
case e => Future.exception(e)
}
/**
- * Execute a prepared statement.
- * @return an OK Result or a ResultSet for queries that return
- * rows.
- */
+ * Execute a prepared statement.
+ * @return an OK Result or a ResultSet for queries that return
+ * rows.
+ */
def execute(ps: PreparedStatement) = send(ExecuteRequest(ps)) {
case rs: ResultSet =>
ps.bindParameters()
@@ -106,31 +100,31 @@ object Client {
}
/**
- * Runs a query that returns a result set. For each row
- * in the ResultSet, call f on the row and return the results.
- * @param ps A prepared statement.
- * @param f A function from ResultSet to any type T.
- * @return a Future of Seq[T]
- */
+ * Runs a query that returns a result set. For each row
+ * in the ResultSet, call f on the row and return the results.
+ * @param ps A prepared statement.
+ * @param f A function from ResultSet to any type T.
+ * @return a Future of Seq[T]
+ */
def select[T](ps: PreparedStatement)(f: Row => T): Future[Seq[T]] = execute(ps) map {
case rs: ResultSet => rs.rows.map(f)
case ok: OK => Seq()
}
/**
- * Combines the prepare and select operation using prepared statements.
- * @return a Future[(PreparedStatement, Seq[T])] tuple.
- */
+ * Combines the prepare and select operation using prepared statements.
+ * @return a Future[(PreparedStatement, Seq[T])] tuple.
+ */
def prepareAndSelect[T](sql: String, params: Any*)(f: Row => T): Future[(PreparedStatement, Seq[T])] =
prepare(sql, params: _*) flatMap { ps => select(ps)(f) map {
seq => (ps, seq)
}
}
/**
- * Close a prepared statement on the server.
- * @return OK result.
- */
+ * Close a prepared statement on the server.
+ * @return OK result.
+ */
def closeStatement(ps: PreparedStatement) = send(CloseRequest(ps)) {
case ok: OK => Future.value(ok)
}
@@ -148,15 +142,15 @@ object Client {
}
/**
- * Close the ServiceFactory and its underlying resources.
- */
+ * Close the ServiceFactory and its underlying resources.
+ */
def close() = factory.close()
/**
- * Helper function to send requests to the ServiceFactory
- * and handle Error responses from the server.
- */
- private def send[T](r: Request)(handler: PartialFunction[Result, Future[T]]) =
+ * Helper function to send requests to the ServiceFactory
+ * and handle Error responses from the server.
+ */
+ private[this] def send[T](r: Request)(handler: PartialFunction[Result, Future[T]]) =
fService flatMap { service =>
service(r) flatMap (handler orElse {
case Error(c, s, m) => Future.exception(ServerError(c + " - " + m))
@@ -0,0 +1,72 @@
+package com.twitter.finagle.mysql
+
+import com.twitter.finagle._
+import com.twitter.finagle.mysql.codec.{PacketFrameDecoder, Endec}
+import com.twitter.finagle.mysql.protocol._
+import com.twitter.util.Future
+import org.jboss.netty.channel.{ChannelPipelineFactory, Channels, Channel}
+
+class MySQL(username: String, password: String, database: Option[String])
+ extends CodecFactory[Request, Result] {
+
+ def server = throw new Exception("Not yet implemented...")
+
+ def client = Function.const {
+ new Codec[Request, Result] {
+
+ def pipelineFactory = new ChannelPipelineFactory {
+ def getPipeline = {
+ val pipeline = Channels.pipeline()
+
+ pipeline.addLast("frameDecoder", new PacketFrameDecoder)
+ pipeline.addLast("EncoderDecoder", new Endec)
+
+ pipeline
+ }
+ }
+
+ // Authenticate each connection before returning it via a ServiceFactoryProxy.
+ override def prepareConnFactory(underlying: ServiceFactory[Request, Result]) =
+ new AuthenticationProxy(underlying, username, password, database)
+
+ }
+ }
+}
+
+class AuthenticationProxy(
+ underlying: ServiceFactory[Request, Result],
+ username: String,
+ password: String,
+ database: Option[String])
+ extends ServiceFactoryProxy(underlying) {
+
+ def makeLoginReq(sg: ServersGreeting) =
+ LoginRequest(username, password, database, sg.serverCap, sg.salt)
+
+ def acceptGreeting(res: Result) = res match {
+ case sg: ServersGreeting if sg.serverCap.has(Capability.Protocol41) =>
+ Future.value(sg)
+
+ case sg: ServersGreeting =>
+ Future.exception(IncompatibleServerVersion)
+
+ case r =>
+ Future.exception(new ClientError("Invalid Reply type %s".format(r.getClass.getName)))
+ }
+
+ def acceptLogin(res: Result) = res match {
+ case r: OK =>
+ Future.value(res)
+
+ case Error(c, _, m) =>
+ Future.exception(ServerError("Error when authenticating the client "+ c + " - " + m))
+ }
+
+ override def apply(conn: ClientConnection) = for {
+ service <- self(conn)
+ result <- service(ClientInternalGreet)
+ sg <- acceptGreeting(result)
+ loginRes <- service(makeLoginReq(sg))
+ _ <- acceptLogin(loginRes)
+ } yield service
+}
@@ -16,31 +16,23 @@ object Main {
val client = Client(host+":"+port, username, password, dbname)
case class City(id: Option[Int], name: Option[String], date: Option[Timestamp])
- //Select Query (Not using prepared statements) */
- client.select("SELECT * FROM cities WHERE id in (?)", 1 to 10) { row =>
+ // Select Query (Not using prepared statements) */
+ /*client.select("SELECT * FROM cities WHERE id in (?)", 1 to 10) { row =>
City(row.getInt("id"), row.getString("name"), row.getTimestamp("dateadded"))
} onSuccess {
result => println(result)
} onFailure {
case e => e.printStackTrace()
- }
-
- //Query (Not using prepared statements)
- /*client.query("INSERT INTO cities (name, dateadded) VALUES('?', NOW())", "Boston") onSuccess {
- case r: OK => println(r.insertId)
- case rs: ResultSet => println(rs)
- } onFailure {
- e => e.printStackTrace()
}*/
- //Prepared Statements
- /*client.prepareAndSelect("SELECT * FROM cities WHERE id in (?)", (1,2,3)) { row =>
+ // Prepared Statements
+ client.prepareAndSelect("SELECT * FROM cities WHERE id in (?)", (1,2,3)) { row =>
City(row.getInt("id"), row.getString("name"), row.getTimestamp("dateadded"))
} onSuccess {
- case (ps, seq) => println(seq)
+ case (ps, seq) => seq.foreach(println)
} onFailure {
e => e.printStackTrace()
- }*/
+ }
}
def parseArgs(parsed: Map[String, Any], args: List[String]): Map[String, Any] = args match {
Oops, something went wrong.

0 comments on commit c6f7c9e

Please sign in to comment.