Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

revert EndToEndSpec

  • Loading branch information...
commit 1136054519394a96dee9461fc86f849af1bf2960 1 parent 5179638
@bierbaum bierbaum authored
Showing with 69 additions and 97 deletions.
  1. +69 −97 src/test/scala/com/twitter/finagle/thrift/EndToEnd.scala
View
166 src/test/scala/com/twitter/finagle/thrift/EndToEnd.scala
@@ -7,7 +7,7 @@ import java.net.{InetAddress, InetSocketAddress, Socket}
import org.specs.Specification
-import org.apache.thrift.{TProcessor, TProcessorFactory}
+import org.apache.thrift.TProcessorFactory
import org.apache.thrift.transport.{
TSocket, TServerSocket, TFramedTransport}
import org.apache.thrift.protocol.TBinaryProtocol
@@ -15,8 +15,8 @@ import org.apache.thrift.server.TSimpleServer
import org.jboss.netty.bootstrap.{ClientBootstrap, ServerBootstrap}
import org.jboss.netty.channel.{
- Channels, ChannelPipeline, ChannelPipelineFactory,
- SimpleChannelUpstreamHandler, ChannelHandlerContext, MessageEvent}
+ Channels, ChannelPipelineFactory, SimpleChannelUpstreamHandler,
+ ChannelHandlerContext, MessageEvent}
import org.jboss.netty.channel.local.{
DefaultLocalServerChannelFactory, DefaultLocalClientChannelFactory,
LocalAddress}
@@ -54,109 +54,71 @@ object EndToEndSpec extends Specification {
bloop.reverse
}
- object Codecs {
- abstract class Appender {
- def append(pipeline: ChannelPipeline): Unit = throw new AbstractMethodError
- def processor(): TProcessor = throw new AbstractMethodError
- }
-
- object ThriftProcessorHandler extends Appender {
- override def processor =
- new Silly.Processor(new SillyImpl)
-
- override def append(pipeline: ChannelPipeline) {
- val processorFactory = new TProcessorFactory(processor)
- pipeline.addLast("processor", new ThriftProcessorHandler(processorFactory))
- }
- }
-
- object ThriftCodecAndSeparateHandler extends Appender {
- override def append(pipeline: ChannelPipeline) {
- pipeline.addLast("codec", new ThriftServerCodec)
- pipeline.addLast("handler", new SimpleChannelUpstreamHandler {
- override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
- val msg = e.getMessage.asInstanceOf[ThriftCall[_, _]]
- msg match {
- case bleep: ThriftCall[Silly.bleep_args, Silly.bleep_result] =>
- val args = bleep.args.asInstanceOf[Silly.bleep_args]
- val response = bleep.newResponseInstance
- response.setSuccess(args.request.reverse)
- Channels.write(ctx.getChannel, new ThriftReply[Silly.bleep_result](response, bleep))
- case _ =>
- throw new IllegalArgumentException
- }
- }
- })
- }
- }
- }
-
// TODO: test with a traditional thrift stack over local loopback
// TCP
"client & server" should {
- def commonClientServerTests(codecUnderTest: Codecs.Appender) = {
- "talk silly to each other (%s)".format(codecUnderTest.getClass.getName) in {
- // ** Set up the server.
- val serverBootstrap = new ServerBootstrap(new DefaultLocalServerChannelFactory())
- serverBootstrap.setPipelineFactory(new ChannelPipelineFactory {
- def getPipeline() = {
-
- val pipeline = Channels.pipeline()
- pipeline.addLast("framer", new ThriftFrameCodec)
- codecUnderTest.append(pipeline)
- pipeline
- }
- })
-
- val callResults = new Promise[Silly.bleep_result]
-
- // ** Set up the client.
- val clientBootstrap = new ClientBootstrap(new DefaultLocalClientChannelFactory)
- clientBootstrap.setPipelineFactory(new ChannelPipelineFactory {
- def getPipeline() = {
- val pipeline = Channels.pipeline()
- pipeline.addLast("framer", new ThriftFrameCodec)
- pipeline.addLast("codec", new ThriftCodec)
- pipeline.addLast("handler", new SimpleChannelUpstreamHandler {
- override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
- callResults() = Return(e.getMessage.asInstanceOf[Silly.bleep_result])
- Channels.close(ctx.getChannel)
- }
- })
-
- pipeline
- }
- })
-
- val addr = new LocalAddress("thrift")
- val serverChannel = serverBootstrap.bind(addr)
- for (ch <- clientBootstrap.connect(addr)) {
- val thriftCall =
- ThriftCall[Silly.bleep_args, Silly.bleep_result](
- "bleep", new Silly.bleep_args("heyhey"))
-
- Channels.write(ch, thriftCall)
+ "talk silly to each other" in {
+ // ** Set up the server.
+ val serverBootstrap = new ServerBootstrap(new DefaultLocalServerChannelFactory())
+ serverBootstrap.setPipelineFactory(new ChannelPipelineFactory {
+ def getPipeline() = {
+ val processor = new Silly.Processor(new Silly.Iface {
+ def bleep(bloop: String): String =
+ bloop.reverse
+ })
+ val processorFactory = new TProcessorFactory(processor)
+
+ val pipeline = Channels.pipeline()
+ pipeline.addLast("framer", new ThriftFrameCodec)
+ pipeline.addLast("processor", new ThriftProcessorHandler(processorFactory))
+ pipeline
}
+ })
+
+ val callResults = new Promise[Silly.bleep_result]
+
+ // ** Set up the client.
+ val clientBootstrap = new ClientBootstrap(new DefaultLocalClientChannelFactory)
+ clientBootstrap.setPipelineFactory(new ChannelPipelineFactory {
+ def getPipeline() = {
+ val pipeline = Channels.pipeline()
+ pipeline.addLast("framer", new ThriftFrameCodec)
+ pipeline.addLast("codec", new ThriftCodec)
+ pipeline.addLast("handler", new SimpleChannelUpstreamHandler {
+ override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
+ callResults() = Return(e.getMessage.asInstanceOf[Silly.bleep_result])
+ Channels.close(ctx.getChannel)
+ }
+ })
+
+ pipeline
+ }
+ })
+
+ val addr = new LocalAddress("thrift")
+ val serverChannel = serverBootstrap.bind(addr)
+ for (ch <- clientBootstrap.connect(addr)) {
+ val thriftCall =
+ ThriftCall[Silly.bleep_args, Silly.bleep_result](
+ "bleep", new Silly.bleep_args("heyhey"))
- val result = callResults.within(1.second)
- result.isReturn must beTrue
-
- result().success must be_==("yehyeh")
-
- // ** Shutdown
- serverChannel.close().awaitUninterruptibly()
- serverBootstrap.getFactory.releaseExternalResources()
+ Channels.write(ch, thriftCall)
}
- }
+
+ val result = callResults.within(1.second)
+ result.isReturn must beTrue
+
+ result().success must be_==("yehyeh")
- "ThriftProcessorHandler" in {
- commonClientServerTests(Codecs.ThriftProcessorHandler)
- commonClientServerTests(Codecs.ThriftCodecAndSeparateHandler)
+ // ** Shutdown
+ serverChannel.close().awaitUninterruptibly()
+ serverBootstrap.getFactory.releaseExternalResources()
}
}
"client" should {
+
"talk silly to an existing server" in {
// ** Set up a traditional thrift server.
val serverPort = PickRandomPort()
@@ -165,8 +127,12 @@ object EndToEndSpec extends Specification {
val transportFactory = new TFramedTransport.Factory
val protocolFactory = new TBinaryProtocol.Factory(true, true)
+ val processor = new Silly.Processor(new Silly.Iface {
+ def bleep(bloop: String): String =
+ bloop.reverse
+ })
val thriftServer = new TSimpleServer(
- Codecs.ThriftProcessorHandler.processor, serverSocket,
+ processor, serverSocket,
transportFactory, protocolFactory)
val callResults = new Promise[Silly.bleep_result]
@@ -187,7 +153,7 @@ object EndToEndSpec extends Specification {
Channels.close(ctx.getChannel)
}
})
-
+
pipeline
}
})
@@ -226,9 +192,15 @@ object EndToEndSpec extends Specification {
serverBootstrap.setPipelineFactory(new ChannelPipelineFactory {
def getPipeline() = {
+ val processor = new Silly.Processor(new Silly.Iface {
+ def bleep(bloop: String): String =
+ bloop.reverse
+ })
+ val processorFactory = new TProcessorFactory(processor)
+
val pipeline = Channels.pipeline()
pipeline.addLast("framer", new ThriftFrameCodec)
- Codecs.ThriftProcessorHandler.append(pipeline)
+ pipeline.addLast("processor", new ThriftProcessorHandler(processorFactory))
pipeline
}
})
Please sign in to comment.
Something went wrong with that request. Please try again.