Skip to content

Commit

Permalink
clean up the rest of the codecs, tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
mariusae committed Mar 24, 2011
1 parent 5524385 commit f4ed163
Show file tree
Hide file tree
Showing 10 changed files with 93 additions and 83 deletions.
12 changes: 7 additions & 5 deletions finagle-core/src/main/scala/com/twitter/finagle/http/Codec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.jboss.netty.handler.codec.http._
import com.twitter.util.StorageUnit
import com.twitter.conversions.storage._

import com.twitter.finagle.Codec
import com.twitter.finagle.{Codec, ClientCodec, ServerCodec}

case class Http(
_compressionLevel: Int = 0,
Expand All @@ -22,8 +22,8 @@ case class Http(
def maxRequestSize(bufferSize: StorageUnit) = copy(_maxRequestSize = bufferSize)
def maxResponseSize(bufferSize: StorageUnit) = copy(_maxResponseSize = bufferSize)

override val clientPipelineFactory: ChannelPipelineFactory =
new ChannelPipelineFactory {
override def clientCodec = new ClientCodec[HttpRequest, HttpResponse] {
def pipelineFactory = new ChannelPipelineFactory {
def getPipeline() = {
val pipeline = Channels.pipeline()
pipeline.addLast("httpCodec", new HttpClientCodec())
Expand All @@ -40,9 +40,10 @@ case class Http(
pipeline
}
}
}

override val serverPipelineFactory =
new ChannelPipelineFactory {
override def serverCodec = new ServerCodec[HttpRequest, HttpResponse] {
def pipelineFactory = new ChannelPipelineFactory {
def getPipeline() = {
val pipeline = Channels.pipeline()
pipeline.addLast("httpCodec", new HttpServerCodec)
Expand All @@ -65,6 +66,7 @@ case class Http(
pipeline
}
}
}
}

object Http {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.twitter.finagle.example.echo

import com.twitter.finagle.Codec
import com.twitter.finagle.{Codec, ClientCodec, ServerCodec}
import org.jboss.netty.handler.codec.string.{StringEncoder, StringDecoder}
import org.jboss.netty.channel.{Channels, ChannelPipelineFactory}
import org.jboss.netty.handler.codec.frame.{Delimiters, DelimiterBasedFrameDecoder}
Expand All @@ -14,23 +14,27 @@ import org.jboss.netty.util.CharsetUtil
object StringCodec extends StringCodec

class StringCodec extends Codec[String, String] {
override val serverPipelineFactory = new ChannelPipelineFactory {
def getPipeline = {
val pipeline = Channels.pipeline()
pipeline.addLast("line",
new DelimiterBasedFrameDecoder(100, Delimiters.lineDelimiter: _*))
pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8))
pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8))
pipeline
override def serverCodec = new ServerCodec[String, String] {
def pipelineFactory = new ChannelPipelineFactory {
def getPipeline = {
val pipeline = Channels.pipeline()
pipeline.addLast("line",
new DelimiterBasedFrameDecoder(100, Delimiters.lineDelimiter: _*))
pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8))
pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8))
pipeline
}
}
}

override val clientPipelineFactory = new ChannelPipelineFactory {
def getPipeline = {
val pipeline = Channels.pipeline()
pipeline.addLast("stringEncode", new StringEncoder(CharsetUtil.UTF_8))
pipeline.addLast("stringDecode", new StringDecoder(CharsetUtil.UTF_8))
pipeline
override def clientCodec = new ClientCodec[String, String] {
def pipelineFactory = new ChannelPipelineFactory {
def getPipeline = {
val pipeline = Channels.pipeline()
pipeline.addLast("stringEncode", new StringEncoder(CharsetUtil.UTF_8))
pipeline.addLast("stringDecode", new StringDecoder(CharsetUtil.UTF_8))
pipeline
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.twitter.finagle.kestrel.protocol

import com.twitter.finagle.Codec
import com.twitter.finagle.{Codec, ClientCodec, ServerCodec}
import org.jboss.netty.channel._
import org.jboss.netty.buffer.ChannelBuffer
import com.twitter.finagle.memcached.util.ChannelBufferUtils._
Expand All @@ -9,8 +9,8 @@ import com.twitter.finagle.memcached.protocol.text.{Encoder, server, client}
class Kestrel extends Codec[Command, Response] {
private[this] val storageCommands = collection.Set[ChannelBuffer]("set")

override val serverPipelineFactory = {
new ChannelPipelineFactory {
override def serverCodec = new ServerCodec[Command, Response] {
def pipelineFactory = new ChannelPipelineFactory {
def getPipeline() = {
val pipeline = Channels.pipeline()

Expand All @@ -26,9 +26,8 @@ class Kestrel extends Codec[Command, Response] {
}
}


override val clientPipelineFactory = {
new ChannelPipelineFactory {
override def clientCodec = new ClientCodec[Command, Response] {
def pipelineFactory = new ChannelPipelineFactory {
def getPipeline() = {
val pipeline = Channels.pipeline()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.twitter.finagle.memcached.protocol.text

import client.DecodingToResponse
import com.twitter.finagle.Codec
import com.twitter.finagle.{Codec, ClientCodec, ServerCodec}
import org.jboss.netty.channel._
import com.twitter.finagle.memcached.protocol._
import org.jboss.netty.buffer.ChannelBuffer
Expand All @@ -14,8 +14,8 @@ class Memcached extends Codec[Command, Response] {
private[this] val storageCommands = collection.Set[ChannelBuffer](
"set", "add", "replace", "append", "prepend")

override val serverPipelineFactory = {
new ChannelPipelineFactory {
override def serverCodec = new ServerCodec[Command, Response] {
def pipelineFactory = new ChannelPipelineFactory {
def getPipeline() = {
val pipeline = Channels.pipeline()

Expand All @@ -31,9 +31,8 @@ class Memcached extends Codec[Command, Response] {
}
}


override val clientPipelineFactory = {
new ChannelPipelineFactory {
override def clientCodec = new ClientCodec[Command, Response] {
def pipelineFactory = new ChannelPipelineFactory {
def getPipeline() = {
val pipeline = Channels.pipeline()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,30 @@ import org.jboss.netty.handler.codec.frame.{Delimiters, DelimiterBasedFrameDecod
import com.twitter.util.{Future, RandomSocket}
import com.twitter.conversions.time._
import org.jboss.netty.channel._
import com.twitter.finagle.{Codec, Service}
import com.twitter.finagle.{Codec, ClientCodec, ServerCodec, Service}

class StringCodec extends Codec[String, String] {
override val serverPipelineFactory = new ChannelPipelineFactory {
def getPipeline = {
val pipeline = Channels.pipeline()
pipeline.addLast("line",
new DelimiterBasedFrameDecoder(100, Delimiters.lineDelimiter: _*))
pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8))
pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8))
pipeline
override def serverCodec = new ServerCodec[String, String] {
def pipelineFactory = new ChannelPipelineFactory {
def getPipeline = {
val pipeline = Channels.pipeline()
pipeline.addLast("line",
new DelimiterBasedFrameDecoder(100, Delimiters.lineDelimiter: _*))
pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8))
pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8))
pipeline
}
}
}

override val clientPipelineFactory = new ChannelPipelineFactory {
def getPipeline = {
val pipeline = Channels.pipeline()
pipeline.addLast("stringEncode", new StringEncoder(CharsetUtil.UTF_8))
pipeline.addLast("stringDecode", new StringDecoder(CharsetUtil.UTF_8))
pipeline
override def clientCodec = new ClientCodec[String, String] {
def pipelineFactory = new ChannelPipelineFactory {
def getPipeline = {
val pipeline = Channels.pipeline()
pipeline.addLast("stringEncode", new StringEncoder(CharsetUtil.UTF_8))
pipeline.addLast("stringDecode", new StringDecoder(CharsetUtil.UTF_8))
pipeline
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,34 @@
package com.twitter.finagle.stream

import com.twitter.finagle.Codec
import com.twitter.finagle.{Codec, ClientCodec, ServerCodec}
import org.jboss.netty.channel._
import org.jboss.netty.handler.codec.http.{HttpServerCodec, HttpClientCodec, HttpRequest}
import org.jboss.netty.buffer.ChannelBuffer

object Stream extends Stream

class Stream extends Codec[HttpRequest, com.twitter.concurrent.Channel[ChannelBuffer]] {
override val serverPipelineFactory = new ChannelPipelineFactory {
def getPipeline = {
val pipeline = Channels.pipeline()
pipeline.addLast("httpCodec", new HttpServerCodec)
pipeline.addLast("dechannelize", new ChannelToHttpChunk)
pipeline
override def serverCodec =
new ServerCodec[HttpRequest, com.twitter.concurrent.Channel[ChannelBuffer]] {
def pipelineFactory = new ChannelPipelineFactory {
def getPipeline = {
val pipeline = Channels.pipeline()
pipeline.addLast("httpCodec", new HttpServerCodec)
pipeline.addLast("dechannelize", new ChannelToHttpChunk)
pipeline
}
}
}
}

override val clientPipelineFactory = new ChannelPipelineFactory {
def getPipeline = {
val pipeline = Channels.pipeline()
pipeline.addLast("httpCodec", new HttpClientCodec)
pipeline.addLast("bechannelfy", new HttpChunkToChannel)
pipeline
override def clientCodec =
new ClientCodec[HttpRequest, com.twitter.concurrent.Channel[ChannelBuffer]] {
def pipelineFactory = new ChannelPipelineFactory {
def getPipeline = {
val pipeline = Channels.pipeline()
pipeline.addLast("httpCodec", new HttpClientCodec)
pipeline.addLast("bechannelfy", new HttpChunkToChannel)
pipeline
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ import org.apache.thrift.protocol.TProtocolFactory
import com.twitter.finagle.Codec

class ThriftClientBufferedCodec(protocolFactory: TProtocolFactory)
extends Codec[ThriftClientRequest, Array[Byte]]
extends ThriftClientFramedCodec
{
private[this] val framedCodec = new ThriftClientFramedCodec

override val clientPipelineFactory = {
val framedPipelineFactory = framedCodec.clientPipelineFactory
override def pipelineFactory = {
val framedPipelineFactory = super.pipelineFactory

new ChannelPipelineFactory {
def getPipeline() = {
Expand All @@ -23,7 +21,5 @@ class ThriftClientBufferedCodec(protocolFactory: TProtocolFactory)
}
}
}

override val serverPipelineFactory = clientPipelineFactory
}

Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ object ThriftClientFramedCodec {
def apply() = new ThriftClientFramedCodec
}

class ThriftClientFramedCodec extends Codec[ThriftClientRequest, Array[Byte]]
class ThriftClientFramedCodec extends ClientCodec[ThriftClientRequest, Array[Byte]]
{
override val clientPipelineFactory =
def pipelineFactory =
new ChannelPipelineFactory {
def getPipeline() = {
val pipeline = Channels.pipeline()
Expand All @@ -38,10 +38,7 @@ class ThriftClientFramedCodec extends Codec[ThriftClientRequest, Array[Byte]]
}
}

override val serverPipelineFactory = clientPipelineFactory

override def prepareClientChannel(underlying: Service[ThriftClientRequest, Array[Byte]]) =
{
override def prepareService(underlying: Service[ThriftClientRequest, Array[Byte]]) = {
// Attempt to upgrade the protocol the first time around by
// sending a magic method invocation.
val buffer = new OutputBuffer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,10 @@ private[thrift] class ThriftServerTracingFilter
}
}

class ThriftServerFramedCodec extends Codec[Array[Byte], Array[Byte]] {
override val clientPipelineFactory =
class ThriftServerFramedCodec
extends ServerCodec[Array[Byte], Array[Byte]]
{
def pipelineFactory =
new ChannelPipelineFactory {
def getPipeline() = {
val pipeline = Channels.pipeline()
Expand All @@ -107,8 +109,6 @@ class ThriftServerFramedCodec extends Codec[Array[Byte], Array[Byte]] {
}
}

override val serverPipelineFactory = clientPipelineFactory

override def wrapServerChannel(service: Service[Array[Byte], Array[Byte]]) =
(new ThriftServerTracingFilter) andThen service
override def prepareService(service: Service[Array[Byte], Array[Byte]]) =
Future.value((new ThriftServerTracingFilter) andThen service)
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import org.apache.thrift.async.AsyncMethodCallback
import com.twitter.test.{B, AnException, SomeStruct}
import com.twitter.util.{RandomSocket, Promise, Return, Throw, Future}

import com.twitter.finagle.Codec
import com.twitter.finagle.{Codec, ClientCodec}
import com.twitter.finagle.builder.ClientBuilder

object FinagleClientThriftServerSpec extends Specification {
Expand Down Expand Up @@ -62,7 +62,10 @@ object FinagleClientThriftServerSpec extends Specification {
}


def doit(transportFactory: TTransportFactory, codec: Codec[ThriftClientRequest, Array[Byte]]) {
def doit(
transportFactory: TTransportFactory,
codec: ClientCodec[ThriftClientRequest, Array[Byte]]
) {
"talk to each other" in {
// TODO: interleave requests (to test seqids, etc.)

Expand Down

0 comments on commit f4ed163

Please sign in to comment.